# -*- coding: utf-8 -*-
# Copyright (c) 2012 Thomas Parslow http://almostobsolete.net/
# Copyright (c) 2012 Robie Basak <robie@justgohome.co.uk>
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
import codecs
from boto.glacier.exceptions import UploadArchiveError
from boto.glacier.job import Job
from boto.glacier.writer import compute_hashes_from_fileobj, \
    resume_file_upload, Writer
from boto.glacier.concurrent import ConcurrentUploader
from boto.glacier.utils import minimum_part_size, DEFAULT_PART_SIZE
import os.path


_MEGABYTE = 1024 * 1024
_GIGABYTE = 1024 * _MEGABYTE

MAXIMUM_ARCHIVE_SIZE = 10000 * 4 * _GIGABYTE
MAXIMUM_NUMBER_OF_PARTS = 10000


class Vault(object):

    DefaultPartSize = DEFAULT_PART_SIZE
    SingleOperationThreshold = 100 * _MEGABYTE

    ResponseDataElements = (('VaultName', 'name', None),
                            ('VaultARN', 'arn', None),
                            ('CreationDate', 'creation_date', None),
                            ('LastInventoryDate', 'last_inventory_date', None),
                            ('SizeInBytes', 'size', 0),
                            ('NumberOfArchives', 'number_of_archives', 0))

    def __init__(self, layer1, response_data=None):
        self.layer1 = layer1
        if response_data:
            for response_name, attr_name, default in self.ResponseDataElements:
                value = response_data[response_name]
                setattr(self, attr_name, value)
        else:
            for response_name, attr_name, default in self.ResponseDataElements:
                setattr(self, attr_name, default)

    def __repr__(self):
        return 'Vault("%s")' % self.arn

    def delete(self):
        """
        Delete this vault. WARNING! This cannot be undone.
        """
        self.layer1.delete_vault(self.name)

    def upload_archive(self, filename, description=None):
        """
        Adds an archive to a vault. For archives larger than 100MB a
        multipart upload will be used.

        :type filename: str
        :param filename: A filename to upload

        :type description: str
        :param description: An optional description for the archive.

        :rtype: str
        :return: The archive id of the newly created archive
        """
        if os.path.getsize(filename) > self.SingleOperationThreshold:
            return self.create_archive_from_file(filename,
                                                 description=description)
        return self._upload_archive_single_operation(filename, description)
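
    # A minimal usage sketch for upload_archive, kept as a comment so the
    # module itself is unchanged at import time. It assumes a Vault obtained
    # through Layer2 (boto.glacier.connect_to_region / get_vault); the
    # region, vault name and filename are placeholders.
    #
    #   import boto.glacier
    #   layer2 = boto.glacier.connect_to_region('us-east-1')
    #   vault = layer2.get_vault('my-vault')
    #   archive_id = vault.upload_archive('backup.tar.gz',
    #                                     description='nightly backup')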

    def _upload_archive_single_operation(self, filename, description):
        """
        Adds an archive to a vault in a single operation. This is
        recommended for archives smaller than 100MB.

        :type filename: str
        :param filename: A filename to upload

        :type description: str
        :param description: A description for the archive.

        :rtype: str
        :return: The archive id of the newly created archive
        """
        with open(filename, 'rb') as fileobj:
            linear_hash, tree_hash = compute_hashes_from_fileobj(fileobj)
            fileobj.seek(0)
            response = self.layer1.upload_archive(self.name, fileobj,
                                                  linear_hash, tree_hash,
                                                  description)
        return response['ArchiveId']

    def create_archive_writer(self, part_size=DefaultPartSize,
                              description=None):
        """
        Create a new archive and begin a multipart upload to it.
        Returns a file-like object to which the data for the archive
        can be written. Once all the data has been written, the
        file-like object should be closed; you can then call its
        get_archive_id method to get the ID of the created archive.

        :type part_size: int
        :param part_size: The part size for the multipart upload.

        :type description: str
        :param description: An optional description for the archive.

        :rtype: :class:`boto.glacier.writer.Writer`
        :return: A Writer object to which the archive data
            should be written.
        """
        response = self.layer1.initiate_multipart_upload(self.name,
                                                         part_size,
                                                         description)
        return Writer(self, response['UploadId'], part_size=part_size)
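
    # A sketch of driving the Writer returned by create_archive_writer by
    # hand, as described in the docstring above: write all the data, close
    # the writer, then read the archive id. Kept as a comment; the filename
    # and description are placeholders.
    #
    #   writer = vault.create_archive_writer(description='log bundle')
    #   with open('logs.tar', 'rb') as fileobj:
    #       while True:
    #           chunk = fileobj.read(vault.DefaultPartSize)
    #           if not chunk:
    #               break
    #           writer.write(chunk)
    #   writer.close()
    #   archive_id = writer.get_archive_id()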

    def create_archive_from_file(self, filename=None, file_obj=None,
                                 description=None, upload_id_callback=None):
        """
        Create a new archive and upload the data from the given file
        or file-like object.

        :type filename: str
        :param filename: A filename to upload

        :type file_obj: file
        :param file_obj: A file-like object to upload

        :type description: str
        :param description: An optional description for the archive.

        :type upload_id_callback: function
        :param upload_id_callback: if set, call with the upload_id as the
            only parameter when it becomes known, to enable future calls
            to resume_archive_from_file in case resume is needed.

        :rtype: str
        :return: The archive id of the newly created archive
        """
        part_size = self.DefaultPartSize
        if not file_obj:
            file_size = os.path.getsize(filename)
            try:
                part_size = minimum_part_size(file_size, part_size)
            except ValueError:
                raise UploadArchiveError("File size of %s bytes exceeds "
                                         "40,000 GB archive limit of Glacier."
                                         % file_size)
            file_obj = open(filename, "rb")
        writer = self.create_archive_writer(
            description=description,
            part_size=part_size)
        if upload_id_callback:
            upload_id_callback(writer.upload_id)
        while True:
            data = file_obj.read(part_size)
            if not data:
                break
            writer.write(data)
        writer.close()
        return writer.get_archive_id()

    @staticmethod
    def _range_string_to_part_index(range_string, part_size):
        start, inside_end = [int(value) for value in range_string.split('-')]
        end = inside_end + 1
        length = end - start
        if length == part_size + 1:
            # Off-by-one bug in Amazon's Glacier implementation,
            # see: https://forums.aws.amazon.com/thread.jspa?threadID=106866
            # Workaround: since part_size is too big by one byte, adjust it
            end -= 1
            inside_end -= 1
            length -= 1
        assert not (start % part_size), (
            "upload part start byte is not on a part boundary")
        assert (length <= part_size), "upload part is bigger than part size"
        return start // part_size

    def resume_archive_from_file(self, upload_id, filename=None,
                                 file_obj=None):
        """Resume upload of a file already part-uploaded to Glacier.

        The resumption of an upload where the part-uploaded section is empty
        is a valid degenerate case that this function can handle.

        One and only one of filename or file_obj must be specified.

        :type upload_id: str
        :param upload_id: existing Glacier upload id of upload being resumed.

        :type filename: str
        :param filename: file to open for resume

        :type file_obj: file
        :param file_obj: file-like object containing local data to resume.
            This must read from the start of the entire upload, not just
            from the point being resumed. Use file_obj.seek(0) to achieve
            this if necessary.

        :rtype: str
        :return: The archive id of the newly created archive

        """
        part_list_response = self.list_all_parts(upload_id)
        part_size = part_list_response['PartSizeInBytes']

        part_hash_map = {}
        for part_desc in part_list_response['Parts']:
            part_index = self._range_string_to_part_index(
                part_desc['RangeInBytes'], part_size)
            part_tree_hash = codecs.decode(part_desc['SHA256TreeHash'],
                                           'hex_codec')
            part_hash_map[part_index] = part_tree_hash

        if not file_obj:
            file_obj = open(filename, "rb")

        return resume_file_upload(
            self, upload_id, part_size, file_obj, part_hash_map)
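
    # A sketch of the resume flow: record the upload id as soon as
    # create_archive_from_file reports it, then hand it back to
    # resume_archive_from_file if the first attempt fails part-way.
    # save_upload_id and load_saved_upload_id are hypothetical helpers, not
    # part of boto, and the exception type caught here is only illustrative.
    #
    #   try:
    #       vault.create_archive_from_file('large-backup.tar',
    #                                      upload_id_callback=save_upload_id)
    #   except UploadArchiveError:
    #       upload_id = load_saved_upload_id()
    #       archive_id = vault.resume_archive_from_file(
    #           upload_id, filename='large-backup.tar')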

    def concurrent_create_archive_from_file(self, filename, description,
                                            **kwargs):
        """
        Create a new archive from a file and upload the given
        file.

        This is a convenience method around the
        :class:`boto.glacier.concurrent.ConcurrentUploader`
        class. This method will perform a multipart upload
        and upload the parts of the file concurrently.

        :type filename: str
        :param filename: A filename to upload

        :param kwargs: Additional kwargs to pass through to
            :py:class:`boto.glacier.concurrent.ConcurrentUploader`.
            You can pass any argument besides the ``api`` and
            ``vault_name`` param (these arguments are already
            passed to the ``ConcurrentUploader`` for you).

        :raises: `boto.glacier.exceptions.UploadArchiveError` if an error
            occurs during the upload process.

        :rtype: str
        :return: The archive id of the newly created archive

        """
        uploader = ConcurrentUploader(self.layer1, self.name, **kwargs)
        archive_id = uploader.upload(filename, description)
        return archive_id

    def retrieve_archive(self, archive_id, sns_topic=None,
                         description=None):
        """
        Initiate an archive retrieval job to download the data from an
        archive. You will need to wait for the notification from
        Amazon (via SNS) before you can actually download the data;
        this typically takes around 4 hours.

        :type archive_id: str
        :param archive_id: The id of the archive

        :type description: str
        :param description: An optional description for the job.

        :type sns_topic: str
        :param sns_topic: The Amazon SNS topic ARN where Amazon Glacier
            sends a notification when the job is completed and the output
            is ready for you to download.

        :rtype: :class:`boto.glacier.job.Job`
        :return: A Job object representing the retrieval job.
        """
        job_data = {'Type': 'archive-retrieval',
                    'ArchiveId': archive_id}
        if sns_topic is not None:
            job_data['SNSTopic'] = sns_topic
        if description is not None:
            job_data['Description'] = description

        response = self.layer1.initiate_job(self.name, job_data)
        return self.get_job(response['JobId'])
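
    # A sketch of the retrieval flow described above: start the job, come
    # back once Glacier has prepared the output (usually hours later,
    # ideally prompted by the SNS notification), then download it. The Job
    # attributes and download_to_file method are assumed from
    # boto.glacier.job.Job; the topic ARN and filename are placeholders.
    #
    #   job = vault.retrieve_archive(archive_id,
    #                                sns_topic='arn:aws:sns:...:my-topic')
    #   # ... later, after the SNS notification arrives ...
    #   job = vault.get_job(job.id)
    #   if job.completed:
    #       job.download_to_file('restored.tar.gz')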

    def retrieve_inventory(self, sns_topic=None,
                           description=None, byte_range=None,
                           start_date=None, end_date=None,
                           limit=None):
        """
        Initiate an inventory retrieval job to list the items in the
        vault. You will need to wait for the notification from
        Amazon (via SNS) before you can actually download the data;
        this typically takes around 4 hours.

        :type description: str
        :param description: An optional description for the job.

        :type sns_topic: str
        :param sns_topic: The Amazon SNS topic ARN where Amazon Glacier
            sends a notification when the job is completed and the output
            is ready for you to download.

        :type byte_range: str
        :param byte_range: Range of bytes to retrieve.

        :type start_date: DateTime
        :param start_date: Beginning of the date range to query.

        :type end_date: DateTime
        :param end_date: End of the date range to query.

        :type limit: int
        :param limit: Limits the number of results returned.

        :rtype: str
        :return: The ID of the job
        """
        job_data = {'Type': 'inventory-retrieval'}
        if sns_topic is not None:
            job_data['SNSTopic'] = sns_topic
        if description is not None:
            job_data['Description'] = description
        if byte_range is not None:
            job_data['RetrievalByteRange'] = byte_range
        if start_date is not None or end_date is not None or limit is not None:
            rparams = {}

            if start_date is not None:
                rparams['StartDate'] = start_date.strftime(
                    '%Y-%m-%dT%H:%M:%S%Z')
            if end_date is not None:
                rparams['EndDate'] = end_date.strftime('%Y-%m-%dT%H:%M:%S%Z')
            if limit is not None:
                rparams['Limit'] = limit

            job_data['InventoryRetrievalParameters'] = rparams

        response = self.layer1.initiate_job(self.name, job_data)
        return response['JobId']

    def retrieve_inventory_job(self, **kwargs):
        """
        Identical to ``retrieve_inventory``, but returns a ``Job`` instance
        instead of just the job ID.

        :type description: str
        :param description: An optional description for the job.

        :type sns_topic: str
        :param sns_topic: The Amazon SNS topic ARN where Amazon Glacier
            sends a notification when the job is completed and the output
            is ready for you to download.

        :type byte_range: str
        :param byte_range: Range of bytes to retrieve.

        :type start_date: DateTime
        :param start_date: Beginning of the date range to query.

        :type end_date: DateTime
        :param end_date: End of the date range to query.

        :type limit: int
        :param limit: Limits the number of results returned.

        :rtype: :class:`boto.glacier.job.Job`
        :return: A Job object representing the retrieval job.
        """
        job_id = self.retrieve_inventory(**kwargs)
        return self.get_job(job_id)

    def delete_archive(self, archive_id):
        """
        This operation deletes an archive from the vault.

        :type archive_id: str
        :param archive_id: The ID for the archive to be deleted.
        """
        return self.layer1.delete_archive(self.name, archive_id)

    def get_job(self, job_id):
        """
        Get an object representing a job in progress.

        :type job_id: str
        :param job_id: The ID of the job

        :rtype: :class:`boto.glacier.job.Job`
        :return: A Job object representing the job.
        """
        response_data = self.layer1.describe_job(self.name, job_id)
        return Job(self, response_data)

    def list_jobs(self, completed=None, status_code=None):
        """
        Return a list of Job objects related to this vault.

        :type completed: boolean
        :param completed: Specifies the state of the jobs to return.
            If a value of True is passed, only completed jobs will
            be returned. If a value of False is passed, only
            uncompleted jobs will be returned. If no value is
            passed, all jobs will be returned.

        :type status_code: string
        :param status_code: Specifies the type of job status to return.
            Valid values are: InProgress|Succeeded|Failed. If not
            specified, jobs with all status codes are returned.

        :rtype: list of :class:`boto.glacier.job.Job`
        :return: A list of Job objects related to this vault.
        """
        response_data = self.layer1.list_jobs(self.name, completed,
                                              status_code)
        return [Job(self, jd) for jd in response_data['JobList']]
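
    # A sketch that combines retrieve_inventory_job and list_jobs: request a
    # date-bounded inventory, then look for finished jobs on a later pass.
    # The dates are placeholders and the Job attributes printed here (id,
    # status_code) are assumed from boto.glacier.job.Job.
    #
    #   from datetime import datetime
    #   job = vault.retrieve_inventory_job(start_date=datetime(2012, 1, 1),
    #                                      end_date=datetime(2012, 12, 31),
    #                                      limit=100)
    #   for finished in vault.list_jobs(completed=True):
    #       print(finished.id, finished.status_code)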

    def list_all_parts(self, upload_id):
        """Automatically make and combine multiple calls to list_parts.

        Call list_parts as necessary, combining the results in case multiple
        calls were required to get data on all available parts.

        """
        result = self.layer1.list_parts(self.name, upload_id)
        marker = result['Marker']
        while marker:
            additional_result = self.layer1.list_parts(
                self.name, upload_id, marker=marker)
            result['Parts'].extend(additional_result['Parts'])
            marker = additional_result['Marker']
        # The marker makes no sense in an unpaginated result, and clearing it
        # makes testing easier. This also has the nice property that the result
        # is a normal (but expanded) response.
        result['Marker'] = None
        return result
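
# A sketch of the shape list_all_parts produces once pagination has been
# collapsed: a single list_parts-style response whose 'Parts' list covers
# every page and whose 'Marker' has been cleared. Values are placeholders
# and only the keys the code above touches are shown.
#
#   parts = vault.list_all_parts(upload_id)
#   # parts == {'PartSizeInBytes': 4194304,
#   #           'Marker': None,
#   #           'Parts': [{'RangeInBytes': '0-4194303',
#   #                      'SHA256TreeHash': '<hex tree hash>'},
#   #                     ...]}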