import boto
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
                                   AllIndex, KeysOnlyIndex, IncludeIndex,
                                   GlobalAllIndex, GlobalKeysOnlyIndex,
                                   GlobalIncludeIndex)
from boto.dynamodb2.items import Item
from boto.dynamodb2.layer1 import DynamoDBConnection
from boto.dynamodb2.results import ResultSet, BatchGetResultSet
from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer,
                                  FILTER_OPERATORS, QUERY_OPERATORS, STRING)
from boto.exception import JSONResponseError


class Table(object):
    """
    Interacts with & models the behavior of a DynamoDB table.

    The ``Table`` object represents a set (or rough categorization) of
    records within DynamoDB. The important part is that all records within the
    table, while largely schema-free, share the same key schema & are
    essentially namespaced for use in your application. For example, you might
    have a ``users`` table or a ``forums`` table.
    """
    max_batch_get = 100

    _PROJECTION_TYPE_TO_INDEX = dict(
        global_indexes=dict(
            ALL=GlobalAllIndex,
            KEYS_ONLY=GlobalKeysOnlyIndex,
            INCLUDE=GlobalIncludeIndex,
        ),
        local_indexes=dict(
            ALL=AllIndex,
            KEYS_ONLY=KeysOnlyIndex,
            INCLUDE=IncludeIndex,
        )
    )

    def __init__(self, table_name, schema=None, throughput=None, indexes=None,
                 global_indexes=None, connection=None):
        """
        Sets up a new in-memory ``Table``.

        This is useful if the table already exists within DynamoDB & you simply
        want to use it for additional interactions. The only required parameter
        is the ``table_name``. However, under the hood, the object will call
        ``describe_table`` to determine the schema/indexes/throughput. You
        can avoid this extra call by passing in ``schema`` & ``indexes``.

        **IMPORTANT** - If you're creating a new ``Table`` for the first time,
        you should use the ``Table.create`` method instead, as it will
        persist the table structure to DynamoDB.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Optionally accepts a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            # The simple, it-already-exists case.
            >>> users = Table('users')

            # The full, minimum-extra-calls case.
            >>> from boto import dynamodb2
            >>> users = Table('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         HashKey('username'),
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ], connection=dynamodb2.connect_to_region('us-west-2',
            ...     aws_access_key_id='key',
            ...     aws_secret_access_key='key',
            ... ))

        """
        self.table_name = table_name
        self.connection = connection
        self.throughput = {
            'read': 5,
            'write': 5,
        }
        self.schema = schema
        self.indexes = indexes
        self.global_indexes = global_indexes

        if self.connection is None:
            self.connection = DynamoDBConnection()

        if throughput is not None:
            self.throughput = throughput

        self._dynamizer = NonBooleanDynamizer()

    def use_boolean(self):
        """
        Switches the ``Table`` over to a ``Dynamizer`` that supports the
        native DynamoDB boolean type, rather than storing booleans as numbers.
        """
        self._dynamizer = Dynamizer()

    @classmethod
    def create(cls, table_name, schema, throughput=None, indexes=None,
               global_indexes=None, connection=None):
        """
        Creates a new table in DynamoDB & returns an in-memory ``Table`` object.

        This will setup a brand new table within DynamoDB. The ``table_name``
        must be unique for your AWS account. The ``schema`` is also required
        to define the key structure of the table.

        **IMPORTANT** - You should consider the usage pattern of your table
        up-front, as the schema can **NOT** be modified once the table is
        created, requiring the creation of a new table & migrating the data
        should you wish to revise it.

        **IMPORTANT** - If the table already exists in DynamoDB, additional
        calls to this method will result in an error. If you just need
        a ``Table`` object to interact with the existing table, you should
        just initialize a new ``Table`` object, which requires only the
        ``table_name``.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Requires a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            >>> users = Table.create('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ])

        """
        table = cls(table_name=table_name, connection=connection)
        table.schema = schema

        if throughput is not None:
            table.throughput = throughput

        if indexes is not None:
            table.indexes = indexes

        if global_indexes is not None:
            table.global_indexes = global_indexes

        # Prep the schema.
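        # For a schema of ``[HashKey('username'), RangeKey('date_joined',
        # data_type=NUMBER)]``, the loop below produces (a sketch of the wire
        # format, assuming the stock field classes):
        #   raw_schema -> [{'AttributeName': 'username', 'KeyType': 'HASH'},
        #                  {'AttributeName': 'date_joined', 'KeyType': 'RANGE'}]
        #   attr_defs  -> [{'AttributeName': 'username', 'AttributeType': 'S'},
        #                  {'AttributeName': 'date_joined', 'AttributeType': 'N'}]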
        raw_schema = []
        attr_defs = []
        seen_attrs = set()

        for field in table.schema:
            raw_schema.append(field.schema())
            # Build the attributes off what we know.
            seen_attrs.add(field.name)
            attr_defs.append(field.definition())

        raw_throughput = {
            'ReadCapacityUnits': int(table.throughput['read']),
            'WriteCapacityUnits': int(table.throughput['write']),
        }
        kwargs = {}

        kwarg_map = {
            'indexes': 'local_secondary_indexes',
            'global_indexes': 'global_secondary_indexes',
        }
        for index_attr in ('indexes', 'global_indexes'):
            table_indexes = getattr(table, index_attr)
            if table_indexes:
                raw_indexes = []
                for index_field in table_indexes:
                    raw_indexes.append(index_field.schema())
                    # Make sure all attributes specified in the indexes are
                    # added to the definition.
                    for field in index_field.parts:
                        if field.name not in seen_attrs:
                            seen_attrs.add(field.name)
                            attr_defs.append(field.definition())

                kwargs[kwarg_map[index_attr]] = raw_indexes

        table.connection.create_table(
            table_name=table.table_name,
            attribute_definitions=attr_defs,
            key_schema=raw_schema,
            provisioned_throughput=raw_throughput,
            **kwargs
        )
        return table

    def _introspect_schema(self, raw_schema, raw_attributes=None):
        """
        Given a raw schema structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        schema = []
        sane_attributes = {}

        if raw_attributes:
            for field in raw_attributes:
                sane_attributes[field['AttributeName']] = field['AttributeType']

        for field in raw_schema:
            data_type = sane_attributes.get(field['AttributeName'], STRING)

            if field['KeyType'] == 'HASH':
                schema.append(
                    HashKey(field['AttributeName'], data_type=data_type)
                )
            elif field['KeyType'] == 'RANGE':
                schema.append(
                    RangeKey(field['AttributeName'], data_type=data_type)
                )
            else:
                raise exceptions.UnknownSchemaFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % field['KeyType']
                )

        return schema

    def _introspect_all_indexes(self, raw_indexes, map_indexes_projection):
        """
        Given a raw index/global index structure back from a DynamoDB response,
        parse out & build the high-level Python objects that represent them.
        """
        indexes = []

        for field in raw_indexes:
            index_klass = map_indexes_projection.get('ALL')
            kwargs = {
                'parts': []
            }

            if field['Projection']['ProjectionType'] == 'ALL':
                index_klass = map_indexes_projection.get('ALL')
            elif field['Projection']['ProjectionType'] == 'KEYS_ONLY':
                index_klass = map_indexes_projection.get('KEYS_ONLY')
            elif field['Projection']['ProjectionType'] == 'INCLUDE':
                index_klass = map_indexes_projection.get('INCLUDE')
                kwargs['includes'] = field['Projection']['NonKeyAttributes']
            else:
                raise exceptions.UnknownIndexFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." %
                    field['Projection']['ProjectionType']
                )

            name = field['IndexName']
            kwargs['parts'] = self._introspect_schema(field['KeySchema'], None)
            indexes.append(index_klass(name, **kwargs))

        return indexes

    def _introspect_indexes(self, raw_indexes):
        """
        Given a raw index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes'))

    def _introspect_global_indexes(self, raw_global_indexes):
        """
        Given a raw global index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_global_indexes,
            self._PROJECTION_TYPE_TO_INDEX.get('global_indexes'))

    def describe(self):
        """
        Describes the current structure of the table in DynamoDB.

        This information will be used to update the ``schema``, ``indexes``,
        ``global_indexes`` and ``throughput`` information on the ``Table``. Some
        calls, such as those involving creating keys or querying, will require
        this information to be populated.

        It also returns the full raw data structure from DynamoDB, in the
        event you'd like to parse out additional information (such as the
        ``ItemCount`` or usage information).

        Example::

            >>> users.describe()
            {
                # Lots of keys here...
            }
            >>> len(users.schema)
            2

        """
        result = self.connection.describe_table(self.table_name)

        # Blindly update throughput, since what's on DynamoDB's end is likely
        # more correct.
        raw_throughput = result['Table']['ProvisionedThroughput']
        self.throughput['read'] = int(raw_throughput['ReadCapacityUnits'])
        self.throughput['write'] = int(raw_throughput['WriteCapacityUnits'])

        if not self.schema:
            # Since we have the data, build the schema.
            raw_schema = result['Table'].get('KeySchema', [])
            raw_attributes = result['Table'].get('AttributeDefinitions', [])
            self.schema = self._introspect_schema(raw_schema, raw_attributes)

        if not self.indexes:
            # Build the index information as well.
            raw_indexes = result['Table'].get('LocalSecondaryIndexes', [])
            self.indexes = self._introspect_indexes(raw_indexes)

        # Build the global index information as well.
        raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', [])
        self.global_indexes = self._introspect_global_indexes(raw_global_indexes)

        # This is leaky.
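        # A sketch of pulling extra details out of the raw response (field
        # names follow DynamoDB's ``DescribeTable`` response shape):
        #   info = users.describe()
        #   status = info['Table']['TableStatus']
        #   size_in_bytes = info['Table'].get('TableSizeBytes', 0)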
        return result

    def update(self, throughput=None, global_indexes=None):
        """
        Updates table attributes and global indexes in DynamoDB.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        dictionary. If provided, it should map each index name to a dict
        containing a ``read`` & ``write`` key, both of which should have an
        integer value associated with them. If you are writing new code,
        please use ``Table.update_global_secondary_index``.

        Returns ``True`` on success.

        Example::

            # For a read-heavier application...
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... })
            True

            # To also update the global index(es) throughput.
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... },
            ... global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True
        """

        data = None

        if throughput:
            self.throughput = throughput
            data = {
                'ReadCapacityUnits': int(self.throughput['read']),
                'WriteCapacityUnits': int(self.throughput['write']),
            }

        gsi_data = None

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

        if throughput or global_indexes:
            self.connection.update_table(
                self.table_name,
                provisioned_throughput=data,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide either the throughput or the ' \
                  'global_indexes to update method'
            boto.log.error(msg)

            return False

    def create_global_secondary_index(self, global_index):
        """
        Creates a global index in DynamoDB after the table has been created.

        Requires a ``global_index`` parameter, which should be a
        ``GlobalBaseIndexField`` subclass representing the desired index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To create a global index
            >>> users.create_global_secondary_index(
            ...     global_index=GlobalAllIndex(
            ...         'TheIndexNameHere', parts=[
            ...             HashKey('requiredHashkey', data_type=STRING),
            ...             RangeKey('optionalRangeKey', data_type=STRING)
            ...         ],
            ...         throughput={
            ...             'read': 2,
            ...             'write': 1,
            ...         })
            ... )
            True

        """

        if global_index:
            gsi_data = []
            gsi_data_attr_def = []

            gsi_data.append({
                "Create": global_index.schema()
            })

            for attr_def in global_index.parts:
                gsi_data_attr_def.append(attr_def.definition())

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
                attribute_definitions=gsi_data_attr_def
            )

            return True
        else:
            msg = 'You need to provide the global_index to ' \
                  'create_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete_global_secondary_index(self, global_index_name):
        """
        Deletes a global index in DynamoDB after the table has been created.

        Requires a ``global_index_name`` parameter, which should be a simple
        string of the name of the global secondary index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To delete a global index
            >>> users.delete_global_secondary_index('TheIndexNameHere')
            True

        """

        if global_index_name:
            gsi_data = [
                {
                    "Delete": {
                        "IndexName": global_index_name
                    }
                }
            ]

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide the global index name to ' \
                  'delete_global_secondary_index method'
            boto.log.error(msg)

            return False

    def update_global_secondary_index(self, global_indexes):
        """
        Updates a global index(es) in DynamoDB after the table has been created.

        Requires a ``global_indexes`` parameter, which should be a
        dictionary. It should map each index name to a dict containing a
        ``read`` & ``write`` key, both of which should have an integer value
        associated with them.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To update a global index
            >>> users.update_global_secondary_index(global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True

        """

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )
            return True
        else:
            msg = 'You need to provide the global indexes to ' \
                  'update_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete(self):
        """
        Deletes a table in DynamoDB.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        Returns ``True`` on success.

        Example::

            >>> users.delete()
            True

        """
        self.connection.delete_table(self.table_name)
        return True

    def _encode_keys(self, keys):
        """
        Given a flat Python dictionary of keys/values, converts it into the
        nested dictionary DynamoDB expects.

        Converts::

            {
                'username': 'john',
                'tags': [1, 2, 5],
            }

        ...to...::

            {
                'username': {'S': 'john'},
                'tags': {'NS': ['1', '2', '5']},
            }

        """
        raw_key = {}

        for key, value in keys.items():
            raw_key[key] = self._dynamizer.encode(value)

        return raw_key

    def get_item(self, consistent=False, attributes=None, **kwargs):
        """
        Fetches an item (record) from a table in DynamoDB.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns an ``Item`` instance containing all the data for that record.

        Raises an ``ItemNotFound`` exception if the item is not found.
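
        A minimal sketch of guarding against a missing record (assuming the
        ``users`` table from the examples below)::

            >>> from boto.dynamodb2.exceptions import ItemNotFound
            >>> try:
            ...     john = users.get_item(username='johndoe')
            ... except ItemNotFound:
            ...     john = None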

        Example::

            # A simple hash key.
            >>> john = users.get_item(username='johndoe')
            >>> john['first_name']
            'John'

            # A complex hash+range key.
            >>> john = users.get_item(username='johndoe', last_name='Doe')
            >>> john['first_name']
            'John'

            # A consistent read (assuming the data might have just changed).
            >>> john = users.get_item(username='johndoe', consistent=True)
            >>> john['first_name']
            'Johann'

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> john = users.get_item(**{
            ...     'date-joined': 127549192,
            ... })
            >>> john['first_name']
            'John'

        """
        raw_key = self._encode_keys(kwargs)
        item_data = self.connection.get_item(
            self.table_name,
            raw_key,
            attributes_to_get=attributes,
            consistent_read=consistent
        )
        if 'Item' not in item_data:
            raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs)
        item = Item(self)
        item.load(item_data)
        return item

    def has_item(self, **kwargs):
        """
        Returns whether an item (record) exists within a table in DynamoDB.

        To specify the key of the item you'd like to check, you can specify
        the key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns ``True`` if an ``Item`` is present, ``False`` if not.

        Example::

            # Simple, just hash-key schema.
            >>> users.has_item(username='johndoe')
            True

            # Complex schema, item not present.
            >>> users.has_item(
            ...     username='johndoe',
            ...     date_joined='2014-01-07'
            ... )
            False

        """
        try:
            self.get_item(**kwargs)
        except (JSONResponseError, exceptions.ItemNotFound):
            return False

        return True

    def lookup(self, *args, **kwargs):
        """
        Looks up an entry in DynamoDB. This is mostly backwards compatible
        with boto.dynamodb. Unlike ``get_item``, it takes the hash key and
        range key positionally first, although you may still specify keyword
        arguments instead.

        Also unlike the ``get_item`` command, if the returned item has no keys
        (i.e., it does not exist in DynamoDB), ``None`` is returned instead
        of an empty key object.

        Example::

            >>> user = users.lookup(username)
            >>> user = users.lookup(username, consistent=True)
            >>> app = apps.lookup('my_customer_id', 'my_app_id')

        """
        if not self.schema:
            self.describe()
        for x, arg in enumerate(args):
            kwargs[self.schema[x].name] = arg
        ret = self.get_item(**kwargs)
        if not ret.keys():
            return None
        return ret

    def new_item(self, *args):
        """
        Returns a new, blank item.

        This is mostly for consistency with boto.dynamodb.
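
        Example (a sketch, assuming the hash-key-only ``users`` schema from
        earlier examples)::

            >>> jane = users.new_item('jane')
            >>> jane['first_name'] = 'Jane'
            >>> jane.save()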
        """
        if not self.schema:
            self.describe()
        data = {}
        for x, arg in enumerate(args):
            data[self.schema[x].name] = arg
        return Item(self, data=data)

    def put_item(self, data, overwrite=False):
        """
        Saves an entire item to DynamoDB.

        By default, if any part of the ``Item``'s original data doesn't match
        what's currently in DynamoDB, this request will fail. This prevents
        other processes from updating the data in between when you read the
        item & when your request to update the item's data is processed, which
        would typically result in some data loss.

        Requires a ``data`` parameter, which should be a dictionary of the data
        you'd like to store in DynamoDB.

        Optionally accepts an ``overwrite`` parameter, which should be a
        boolean. If you provide ``True``, this will tell DynamoDB to blindly
        overwrite whatever data is present, if any.

        Returns ``True`` on success.

        Example::

            >>> users.put_item(data={
            ...     'username': 'jane',
            ...     'first_name': 'Jane',
            ...     'last_name': 'Doe',
            ...     'date_joined': 126478915,
            ... })
            True

        """
        item = Item(self, data=data)
        return item.save(overwrite=overwrite)

    def _put_item(self, item_data, expects=None):
        """
        The internal variant of ``put_item`` (full data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.put_item(self.table_name, item_data, **kwargs)
        return True

    def _update_item(self, key, item_data, expects=None):
        """
        The internal variant of ``put_item`` (partial data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        raw_key = self._encode_keys(key)
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.update_item(self.table_name, raw_key, item_data, **kwargs)
        return True

    def delete_item(self, expected=None, conditional_operator=None, **kwargs):
        """
        Deletes a single item. You can perform a conditional delete operation
        that deletes the item if it exists, or if it has an expected attribute
        value.

        Conditional deletes are useful for only deleting items if specific
        conditions are met. If those conditions are met, DynamoDB performs
        the delete. Otherwise, the item is not deleted.

        To specify the expected attribute values of the item, you can pass a
        dictionary of conditions to ``expected``. Each condition should follow
        the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        To specify the key of the item you'd like to delete, you can specify
        the key attributes as kwargs.

        Optionally accepts an ``expected`` parameter which is a dictionary of
        expected attribute value conditions.

        Optionally accepts a ``conditional_operator`` which applies to the
        expected attribute value conditions:

        + `AND` - If all of the conditions evaluate to true (default)
        + `OR` - True if at least one condition evaluates to true

        Returns ``True`` on success, ``False`` on failed conditional delete.

        Example::

            # A simple hash key.
            >>> users.delete_item(username='johndoe')
            True

            # A complex hash+range key.
            >>> users.delete_item(username='jane', last_name='Doe')
            True

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> users.delete_item(**{
            ...     'date-joined': 127549192,
            ... })
            True

            # Conditional delete
            >>> users.delete_item(username='johndoe',
            ...                   expected={'balance__eq': 0})
            True
        """
        expected = self._build_filters(expected, using=FILTER_OPERATORS)
        raw_key = self._encode_keys(kwargs)

        try:
            self.connection.delete_item(self.table_name, raw_key,
                                        expected=expected,
                                        conditional_operator=conditional_operator)
        except exceptions.ConditionalCheckFailedException:
            return False

        return True

    def get_key_fields(self):
        """
        Returns the fields necessary to make a key for a table.

        If the ``Table`` does not already have a populated ``schema``,
        this will request it via a ``Table.describe`` call.

        Returns a list of fieldnames (strings).

        Example::

            # A simple hash key.
            >>> users.get_key_fields()
            ['username']

            # A complex hash+range key.
            >>> users.get_key_fields()
            ['username', 'last_name']

        """
        if not self.schema:
            # We don't know the structure of the table. Get a description to
            # populate the schema.
            self.describe()

        return [field.name for field in self.schema]

    def batch_write(self):
        """
        Allows the batching of writes to DynamoDB.

        Since each write/delete call to DynamoDB has a cost associated with it,
        when loading lots of data, it makes sense to batch them, creating as
        few calls as possible.

        This returns a context manager that will transparently handle creating
        these batches. The object you get back lightly resembles a ``Table``
        object, sharing just the ``put_item`` & ``delete_item`` methods
        (which are all that DynamoDB can batch in terms of writing data).

        DynamoDB's maximum batch size is 25 items per request. If you attempt
        to put/delete more than that, the context manager will batch as many
        as it can up to that number, then flush them to DynamoDB & continue
        batching as more calls come in.

        Example::

            # Assuming a table with one record...
            >>> with users.batch_write() as batch:
            ...     batch.put_item(data={
            ...         'username': 'johndoe',
            ...         'first_name': 'John',
            ...         'last_name': 'Doe',
            ...         'owner': 1,
            ...     })
            ...     # Nothing across the wire yet.
            ...     batch.delete_item(username='bob')
            ...     # Still no requests sent.
            ...     batch.put_item(data={
            ...         'username': 'jane',
            ...         'first_name': 'Jane',
            ...         'last_name': 'Doe',
            ...         'date_joined': 127436192,
            ...     })
            ...     # Nothing yet, but once we leave the context, the
            ...     # put/deletes will be sent.

        """
        # PHENOMENAL COSMIC DOCS!!! itty-bitty code.
        return BatchTable(self)

    def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS):
        """
        An internal method for taking query/scan-style ``**kwargs`` & turning
        them into the raw structure DynamoDB expects for filtering.
        """
        if filter_kwargs is None:
            return

        filters = {}

        for field_and_op, value in filter_kwargs.items():
            field_bits = field_and_op.split('__')
            fieldname = '__'.join(field_bits[:-1])

            try:
                op = using[field_bits[-1]]
            except KeyError:
                raise exceptions.UnknownFilterTypeError(
                    "Operator '%s' from '%s' is not recognized." % (
                        field_bits[-1],
                        field_and_op
                    )
                )

            lookup = {
                'AttributeValueList': [],
                'ComparisonOperator': op,
            }

            # Special-case the ``NULL/NOT_NULL`` case.
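            # A sketch of the resulting structure (assuming a ``balance``
            # attribute):
            #   ``balance__null=True``  -> {'ComparisonOperator': 'NULL'}
            #   ``balance__null=False`` -> {'ComparisonOperator': 'NOT_NULL'}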
            if field_bits[-1] == 'null':
                del lookup['AttributeValueList']

                if value is False:
                    lookup['ComparisonOperator'] = 'NOT_NULL'
                else:
                    lookup['ComparisonOperator'] = 'NULL'
            # Special-case the ``BETWEEN`` case.
            elif field_bits[-1] == 'between':
                if len(value) == 2 and isinstance(value, (list, tuple)):
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[0])
                    )
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[1])
                    )
            # Special-case the ``IN`` case.
            elif field_bits[-1] == 'in':
                for val in value:
                    lookup['AttributeValueList'].append(self._dynamizer.encode(val))
            else:
                # Fix up the value for encoding, because it was built to only
                # work with ``set``s.
                if isinstance(value, (list, tuple)):
                    value = set(value)
                lookup['AttributeValueList'].append(
                    self._dynamizer.encode(value)
                )

            # Finally, insert it into the filters.
            filters[fieldname] = lookup

        return filters

    def query(self, limit=None, index=None, reverse=False, consistent=False,
              attributes=None, max_page_size=None, **filter_kwargs):
        """
        **WARNING:** This method is provided **strictly** for
        backward-compatibility. It returns results in an incorrect order.

        If you are writing new code, please use ``Table.query_2``.
        """
        reverse = not reverse
        return self.query_2(limit=limit, index=index, reverse=reverse,
                            consistent=consistent, attributes=attributes,
                            max_page_size=max_page_size, **filter_kwargs)

    def query_2(self, limit=None, index=None, reverse=False,
                consistent=False, attributes=None, max_page_size=None,
                query_filter=None, conditional_operator=None,
                **filter_kwargs):
        """
        Queries for a set of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        **Note** - You cannot query against arbitrary fields within the data
        stored in DynamoDB unless you specify ``query_filter`` values.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``reverse`` parameter, which will present the
        results in reverse order. (Default: ``False`` - normal order)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses the ``AttributesToGet`` API parameter & sets
        ``Select`` to ``SPECIFIC_ATTRIBUTES``.
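
        A sketch of fetching only certain attributes (assuming the ``users``
        table from the examples below)::

            >>> results = users.query_2(
            ...     last_name__eq='Doe',
            ...     attributes=('username',)
            ... )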

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the query from drowning out other requests. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            # Look for last names equal to "Doe".
            >>> results = users.query_2(last_name__eq='Doe')
            >>> for res in results:
            ...     print res['first_name']
            'John'
            'Jane'

            # Look for last names beginning with "D", in reverse order, limit 3.
            >>> results = users.query_2(
            ...     last_name__beginswith='D',
            ...     reverse=True,
            ...     limit=3
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'Jane'
            'John'

            # Use an LSI & a consistent read.
            >>> results = users.query_2(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'Bob'
            'John'
            'Fred'

            # Filter by non-indexed field(s)
            >>> results = users.query_2(
            ...     last_name__eq='Doe',
            ...     reverse=True,
            ...     query_filter={
            ...         'first_name__beginswith': 'A'
            ...     }
            ... )
            >>> for res in results:
            ...     print res['first_name'] + ' ' + res['last_name']
            'Alice Doe'

        """
        if self.schema:
            if len(self.schema) == 1:
                if len(filter_kwargs) <= 1:
                    if not self.global_indexes or not len(self.global_indexes):
                        # If the schema only has one field, there's <= 1 filter
                        # param & no Global Secondary Indexes, this is user
                        # error. Bail early.
                        raise exceptions.QueryError(
                            "You must specify more than one key to filter on."
                        )

        if attributes is not None:
            select = 'SPECIFIC_ATTRIBUTES'
        else:
            select = None

        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'index': index,
            'reverse': reverse,
            'consistent': consistent,
            'select': select,
            'attributes_to_get': attributes,
            'query_filter': query_filter,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._query, **kwargs)
        return results

    def query_count(self, index=None, consistent=False, conditional_operator=None,
                    query_filter=None, scan_index_forward=True, limit=None,
                    exclusive_start_key=None, **filter_kwargs):
        """
        Queries the exact count of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs.
        Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Optionally accepts an ``exclusive_start_key`` which is used to get
        the remaining items when a query cannot return the complete count.

        Returns an integer which represents the exact number of matched
        items.

        :type scan_index_forward: boolean
        :param scan_index_forward: Specifies ascending (true) or descending
            (false) traversal of the index. DynamoDB returns results reflecting
            the requested order determined by the range key. If the data type
            is Number, the results are returned in numeric order. For String,
            the results are returned in order of ASCII character code values.
            For Binary, DynamoDB treats each byte of the binary data as
            unsigned when it compares binary values.

            If ScanIndexForward is not specified, the results are returned in
            ascending order.

        :type limit: integer
        :param limit: The maximum number of items to evaluate (not necessarily
            the number of matching items).

        Example::

            # Look for last names equal to "Doe".
            >>> users.query_count(last_name__eq='Doe')
            5

            # Use an LSI & a consistent read.
            >>> users.query_count(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            2

        """
        key_conditions = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        built_query_filter = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        count_buffer = 0
        last_evaluated_key = exclusive_start_key

        while True:
            raw_results = self.connection.query(
                self.table_name,
                index_name=index,
                consistent_read=consistent,
                select='COUNT',
                key_conditions=key_conditions,
                query_filter=built_query_filter,
                conditional_operator=conditional_operator,
                limit=limit,
                scan_index_forward=scan_index_forward,
                exclusive_start_key=last_evaluated_key
            )

            count_buffer += int(raw_results.get('Count', 0))
            last_evaluated_key = raw_results.get('LastEvaluatedKey')
            if not last_evaluated_key or count_buffer < 1:
                break

        return count_buffer

    def _query(self, limit=None, index=None, reverse=False, consistent=False,
               exclusive_start_key=None, select=None, attributes_to_get=None,
               query_filter=None, conditional_operator=None, **filter_kwargs):
        """
        The internal method that performs the actual queries.
        Used extensively by ``ResultSet`` to perform each (paginated) request.
        """
        kwargs = {
            'limit': limit,
            'index_name': index,
            'consistent_read': consistent,
            'select': select,
            'attributes_to_get': attributes_to_get,
            'conditional_operator': conditional_operator,
        }

        if reverse:
            kwargs['scan_index_forward'] = False

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['key_conditions'] = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        kwargs['query_filter'] = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.query(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

    def scan(self, limit=None, segment=None, total_segments=None,
             max_page_size=None, attributes=None, conditional_operator=None,
             **filter_kwargs):
        """
        Scans across all items within a DynamoDB table.

        Scans can be performed against a hash key or a hash+range key. You can
        additionally filter the results after the table has been read but
        before the response is returned by using query filters.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts a ``segment`` parameter, which should be an integer
        of the segment to retrieve on. Please see the documentation about
        Parallel Scans (Default: ``None`` - no segments)

        Optionally accepts a ``total_segments`` parameter, which should be an
        integer count of number of segments to divide the table into.
        Please see the documentation about Parallel Scans (Default: ``None`` -
        no segments)

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the scan from drowning out other queries. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses the ``AttributesToGet`` API parameter & sets
        ``Select`` to ``SPECIFIC_ATTRIBUTES``.

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            # All results.
            >>> everything = users.scan()

            # Look for last names beginning with "D".
            >>> results = users.scan(last_name__beginswith='D')
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'John'
            'Jane'

            # Use an ``IN`` filter & limit.
            >>> results = users.scan(
            ...     age__in=[25, 26, 27, 28, 29],
            ...     limit=1
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'

        """
        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes': attributes,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._scan, **kwargs)
        return results

    def _scan(self, limit=None, exclusive_start_key=None, segment=None,
              total_segments=None, attributes=None, conditional_operator=None,
              **filter_kwargs):
        """
        The internal method that performs the actual scan. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
        """
        kwargs = {
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes_to_get': attributes,
            'conditional_operator': conditional_operator,
        }

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['scan_filter'] = self._build_filters(
            filter_kwargs,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.scan(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

    def batch_get(self, keys, consistent=False, attributes=None):
        """
        Fetches many specific items in batch from a table.

        Requires a ``keys`` parameter, which should be a list of dictionaries.
        Each dictionary should consist of the key values to specify.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, a strongly consistent read will be
        used. (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB.

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            >>> results = users.batch_get(keys=[
            ...     {
            ...         'username': 'johndoe',
            ...     },
            ...     {
            ...         'username': 'jane',
            ...     },
            ...     {
            ...         'username': 'fred',
            ...     },
            ... ])
            >>> for res in results:
            ...     print res['first_name']
            'John'
            'Jane'
            'Fred'

        """
        # We pass the keys to the constructor instead, so it can maintain its
        # own internal state as to what keys have been processed.
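        # For instance (a sketch): 150 keys with ``max_batch_get = 100`` means
        # the result set issues one request for the first 100 keys & a second
        # for the remaining 50, re-queuing any unprocessed keys along the way.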
        results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get)
        results.to_call(self._batch_get, consistent=consistent, attributes=attributes)
        return results

    def _batch_get(self, keys, consistent=False, attributes=None):
        """
        The internal method that performs the actual batch get. Used extensively
        by ``BatchGetResultSet`` to perform each (paginated) request.
        """
        items = {
            self.table_name: {
                'Keys': [],
            },
        }

        if consistent:
            items[self.table_name]['ConsistentRead'] = True

        if attributes is not None:
            items[self.table_name]['AttributesToGet'] = attributes

        for key_data in keys:
            raw_key = {}

            for key, value in key_data.items():
                raw_key[key] = self._dynamizer.encode(value)

            items[self.table_name]['Keys'].append(raw_key)

        raw_results = self.connection.batch_get_item(request_items=items)
        results = []
        unprocessed_keys = []

        for raw_item in raw_results['Responses'].get(self.table_name, []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        raw_unprocessed = raw_results.get('UnprocessedKeys', {})

        for raw_key in raw_unprocessed.get('Keys', []):
            py_key = {}

            for key, value in raw_key.items():
                py_key[key] = self._dynamizer.decode(value)

            unprocessed_keys.append(py_key)

        return {
            'results': results,
            # NEVER return a ``last_key``. Just in case any part of
            # ``ResultSet`` peeks through, since much of the
            # original underlying implementation is based on this key.
            'last_key': None,
            'unprocessed_keys': unprocessed_keys,
        }

    def count(self):
        """
        Returns a (very) eventually consistent count of the number of items
        in a table.

        Lag time is about 6 hours, so don't expect a high degree of accuracy.

        Example::

            >>> users.count()
            6

        """
        info = self.describe()
        return info['Table'].get('ItemCount', 0)


class BatchTable(object):
    """
    Used by ``Table`` as the context manager for batch writes.

    You likely don't want to try to use this object directly.
    """
    def __init__(self, table):
        self.table = table
        self._to_put = []
        self._to_delete = []
        self._unprocessed = []

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if self._to_put or self._to_delete:
            # Flush anything that's left.
            self.flush()

        if self._unprocessed:
            # Finally, handle anything that wasn't processed.
            self.resend_unprocessed()

    def put_item(self, data, overwrite=False):
        self._to_put.append(data)

        if self.should_flush():
            self.flush()

    def delete_item(self, **kwargs):
        self._to_delete.append(kwargs)

        if self.should_flush():
            self.flush()

    def should_flush(self):
        if len(self._to_put) + len(self._to_delete) == 25:
            return True

        return False

    def flush(self):
        batch_data = {
            self.table.table_name: [
                # We'll insert data here shortly.
            ],
        }
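        # A sketch of what ``batch_data`` looks like once populated (assuming
        # one queued put & one queued delete on a ``users`` table):
        #   {'users': [
        #       {'PutRequest': {'Item': {'username': {'S': 'jane'}, ...}}},
        #       {'DeleteRequest': {'Key': {'username': {'S': 'bob'}}}},
        #   ]}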

        for put in self._to_put:
            item = Item(self.table, data=put)
            batch_data[self.table.table_name].append({
                'PutRequest': {
                    'Item': item.prepare_full(),
                }
            })

        for delete in self._to_delete:
            batch_data[self.table.table_name].append({
                'DeleteRequest': {
                    'Key': self.table._encode_keys(delete),
                }
            })

        resp = self.table.connection.batch_write_item(batch_data)
        self.handle_unprocessed(resp)

        self._to_put = []
        self._to_delete = []
        return True

    def handle_unprocessed(self, resp):
        if len(resp.get('UnprocessedItems', [])):
            table_name = self.table.table_name
            unprocessed = resp['UnprocessedItems'].get(table_name, [])

            # Some items have not been processed. Stow them for now &
            # re-attempt processing on ``__exit__``.
            msg = "%s items were unprocessed. Storing for later."
            boto.log.info(msg % len(unprocessed))
            self._unprocessed.extend(unprocessed)

    def resend_unprocessed(self):
        # If there are unprocessed records (for instance, the user was over
        # their throughput limitations), iterate over them & send until they're
        # all there.
        boto.log.info(
            "Re-sending %s unprocessed items." % len(self._unprocessed)
        )

        while len(self._unprocessed):
            # Again, do 25 at a time.
            to_resend = self._unprocessed[:25]
            # Remove them from the list.
            self._unprocessed = self._unprocessed[25:]
            batch_data = {
                self.table.table_name: to_resend
            }
            boto.log.info("Sending %s items" % len(to_resend))
            resp = self.table.connection.batch_write_item(batch_data)
            self.handle_unprocessed(resp)
            boto.log.info(
                "%s unprocessed items left" % len(self._unprocessed)
            )