# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import contextlib
import os
import tempfile

from perf_insights import cloud_storage


class FilePreparationError(Exception):
  """Raised if something goes wrong while preparing a file for processing."""


class FileHandle(object):
  """Base class for handles identifying a file by its canonical URL.

  Subclasses implement _WillProcess() and _DidProcess(), which are invoked
  on entry to and exit from PrepareFileForProcessing() respectively.
  """

  def __init__(self, canonical_url):
    """Store the canonical URL identifying the file.

    Args:
      canonical_url: The URL exposed through the canonical_url property.
    """
    self._canonical_url = canonical_url

  @property
  def canonical_url(self):
    """The canonical URL this handle was constructed with."""
    return self._canonical_url

  @contextlib.contextmanager
  def PrepareFileForProcessing(self):
    """Ensure that the URL to the file will be accessible during processing.

    This function must do any pre-work to ensure that mappers and reducers will
    be able to read from the URL contained in the file handle.

    Yields:
      Whatever the subclass's _WillProcess() returns.

    Raises:
      FilePreparationError: If something went wrong while preparing the file.
    """
    # Preparation happens outside the try block: if it fails there is nothing
    # to clean up, so _DidProcess() must not run.
    prepared = self._WillProcess()
    try:
      yield prepared
    finally:
      # Run cleanup even when the body of the with-statement raises, so that
      # resources created by _WillProcess() (e.g. temp files) are not leaked.
      self._DidProcess()

  def _WillProcess(self):
    """Prepare the file; the return value is yielded to the caller."""
    raise NotImplementedError()

  def _DidProcess(self):
    """Release anything that _WillProcess() set up."""
    raise NotImplementedError()


class URLFileHandle(FileHandle):
  """A handle that pairs a canonical URL with the URL to actually load."""

  def __init__(self, canonical_url, url_to_load):
    """Store both URLs.

    Args:
      canonical_url: The URL exposed through the canonical_url property.
      url_to_load: The URL reported as 'url_to_load' by AsDict().
    """
    super(URLFileHandle, self).__init__(canonical_url)

    self._url_to_load = url_to_load

  def AsDict(self):
    """Return a dict with the handle's type tag and both of its URLs."""
    return {
        'type': 'url',
        'canonical_url': self._canonical_url,
        'url_to_load': self._url_to_load
    }

  def _WillProcess(self):
    """No preparation is done; the handle itself is returned."""
    return self

  def _DidProcess(self):
    """Nothing to clean up."""
    pass


class GCSFileHandle(FileHandle):
  """A handle whose file is copied into a local cache directory before use."""

  def __init__(self, canonical_url, cache_directory):
    """Compute the local cache path for the file.

    Args:
      canonical_url: The URL passed to cloud_storage.Copy() as the source.
      cache_directory: Directory in which the cached copy is stored.
    """
    super(GCSFileHandle, self).__init__(canonical_url)
    # The cache entry is named after the last path component of the URL,
    # with a '.gz' suffix appended.
    file_name = canonical_url.split('/')[-1]
    self.cache_file = os.path.join(
        cache_directory, file_name + '.gz')

  def _WillProcess(self):
    """Copy the file into the cache if it is not already present.

    Returns:
      A URLFileHandle whose URL to load is a file:// URL for the cached copy,
      or None if cloud_storage.Copy() raised CloudStorageError.
    """
    if not os.path.exists(self.cache_file):
      try:
        cloud_storage.Copy(self.canonical_url, self.cache_file)
      except cloud_storage.CloudStorageError:
        # NOTE(review): PrepareFileForProcessing() documents
        # FilePreparationError for failures, but this path yields None
        # instead. Kept as-is since callers presumably check for None --
        # confirm before changing.
        return None
    return URLFileHandle(self.canonical_url, 'file://' + self.cache_file)

  def _DidProcess(self):
    """Nothing to clean up; the cached copy is left in place."""
    pass


class InMemoryFileHandle(FileHandle):
  """A handle for in-memory data, written to a temp file while processing."""

  def __init__(self, canonical_url, data):
    """Store the data to be written out when processing starts.

    Args:
      canonical_url: The URL exposed through the canonical_url property.
      data: The content written to the temporary file by _WillProcess().
    """
    super(InMemoryFileHandle, self).__init__(canonical_url)

    self.data = data
    # Path of the temp file created by _WillProcess(); None when no temp
    # file currently exists.
    self._temp_file_path = None

  def _WillProcess(self):
    """Write the data to a new temporary file.

    Returns:
      A URLFileHandle whose URL to load is a file:// URL for the temp file.
    """
    # delete=False keeps the file on disk after it is closed, so that it can
    # be read back through the file:// URL; _DidProcess() removes it.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    try:
      with temp_file:
        temp_file.write(self.data)
    except Exception:
      # The write failed, so _DidProcess() will never run for this file;
      # remove it here rather than leaking it.
      os.remove(temp_file.name)
      raise
    self._temp_file_path = temp_file.name

    return URLFileHandle(self.canonical_url, 'file://' + self._temp_file_path)

  def _DidProcess(self):
    """Delete the temporary file created by _WillProcess(), if any."""
    if self._temp_file_path is None:
      return
    os.remove(self._temp_file_path)
    self._temp_file_path = None