import boto
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
                                   AllIndex, KeysOnlyIndex, IncludeIndex,
                                   GlobalAllIndex, GlobalKeysOnlyIndex,
                                   GlobalIncludeIndex)
from boto.dynamodb2.items import Item
from boto.dynamodb2.layer1 import DynamoDBConnection
from boto.dynamodb2.results import ResultSet, BatchGetResultSet
from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer, FILTER_OPERATORS,
                                  QUERY_OPERATORS, STRING)
from boto.exception import JSONResponseError


class Table(object):
    """
    Interacts with & models the behavior of a DynamoDB table.

    The ``Table`` object represents a set (or rough categorization) of
    records within DynamoDB. The important part is that all records within
    the table, while largely schema-free, share the same key schema & are
    essentially namespaced for use in your application. For example, you
    might have a ``users`` table or a ``forums`` table.
    """
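    # DynamoDB's ``BatchGetItem`` API accepts at most 100 keys per request.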
    max_batch_get = 100

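    # Maps a projection type from a DynamoDB response to the index class
    # used when introspecting an existing table's indexes.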
    _PROJECTION_TYPE_TO_INDEX = dict(
        global_indexes=dict(
            ALL=GlobalAllIndex,
            KEYS_ONLY=GlobalKeysOnlyIndex,
            INCLUDE=GlobalIncludeIndex,
        ), local_indexes=dict(
            ALL=AllIndex,
            KEYS_ONLY=KeysOnlyIndex,
            INCLUDE=IncludeIndex,
        )
    )

    def __init__(self, table_name, schema=None, throughput=None, indexes=None,
                 global_indexes=None, connection=None):
        """
        Sets up a new in-memory ``Table``.

        This is useful if the table already exists within DynamoDB & you simply
        want to use it for additional interactions. The only required parameter
        is the ``table_name``. However, under the hood, the object will call
        ``describe_table`` to determine the schema/indexes/throughput. You
        can avoid this extra call by passing in ``schema`` & ``indexes``.

        **IMPORTANT** - If you're creating a new ``Table`` for the first time,
        you should use the ``Table.create`` method instead, as it will
        persist the table structure to DynamoDB.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Optionally accepts a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            # The simple, it-already-exists case.
            >>> users = Table('users')

            # The full, minimum-extra-calls case.
            >>> from boto import dynamodb2
            >>> users = Table('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         HashKey('username'),
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ], connection=dynamodb2.connect_to_region('us-west-2',
            ...     aws_access_key_id='key',
            ...     aws_secret_access_key='key',
            ... ))

        """
        self.table_name = table_name
        self.connection = connection
        self.throughput = {
            'read': 5,
            'write': 5,
        }
        self.schema = schema
        self.indexes = indexes
        self.global_indexes = global_indexes

        if self.connection is None:
            self.connection = DynamoDBConnection()

        if throughput is not None:
            self.throughput = throughput

        self._dynamizer = NonBooleanDynamizer()

    def use_boolean(self):
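        """
        Switches the internal ``Dynamizer`` to the full ``Dynamizer``, which
        supports DynamoDB's native boolean type rather than representing
        booleans as numbers.
        """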
        self._dynamizer = Dynamizer()

    @classmethod
    def create(cls, table_name, schema, throughput=None, indexes=None,
               global_indexes=None, connection=None):
        """
        Creates a new table in DynamoDB & returns an in-memory ``Table`` object.

        This will set up a brand new table within DynamoDB. The ``table_name``
        must be unique for your AWS account. The ``schema`` is also required
        to define the key structure of the table.

        **IMPORTANT** - You should consider the usage pattern of your table
        up-front, as the schema can **NOT** be modified once the table is
        created, requiring the creation of a new table & migrating the data
        should you wish to revise it.

        **IMPORTANT** - If the table already exists in DynamoDB, additional
        calls to this method will result in an error. If you just need
        a ``Table`` object to interact with the existing table, you should
        just initialize a new ``Table`` object, which requires only the
        ``table_name``.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Requires a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            >>> users = Table.create('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ])

        """
        table = cls(table_name=table_name, connection=connection)
        table.schema = schema

        if throughput is not None:
            table.throughput = throughput

        if indexes is not None:
            table.indexes = indexes

        if global_indexes is not None:
            table.global_indexes = global_indexes

        # Prep the schema.
        raw_schema = []
        attr_defs = []
        seen_attrs = set()

        for field in table.schema:
            raw_schema.append(field.schema())
            # Build the attributes off what we know.
            seen_attrs.add(field.name)
            attr_defs.append(field.definition())

        raw_throughput = {
            'ReadCapacityUnits': int(table.throughput['read']),
            'WriteCapacityUnits': int(table.throughput['write']),
        }
        kwargs = {}

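        # Map the user-facing index kwargs onto the parameter names the
        # ``create_table`` connection method expects.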
        kwarg_map = {
            'indexes': 'local_secondary_indexes',
            'global_indexes': 'global_secondary_indexes',
        }
        for index_attr in ('indexes', 'global_indexes'):
            table_indexes = getattr(table, index_attr)
            if table_indexes:
                raw_indexes = []
                for index_field in table_indexes:
                    raw_indexes.append(index_field.schema())
                    # Make sure all attributes specified in the indexes are
                    # added to the definition
                    for field in index_field.parts:
                        if field.name not in seen_attrs:
                            seen_attrs.add(field.name)
                            attr_defs.append(field.definition())

                kwargs[kwarg_map[index_attr]] = raw_indexes

        table.connection.create_table(
            table_name=table.table_name,
            attribute_definitions=attr_defs,
            key_schema=raw_schema,
            provisioned_throughput=raw_throughput,
            **kwargs
        )
        return table

    def _introspect_schema(self, raw_schema, raw_attributes=None):
        """
        Given a raw schema structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        schema = []
        sane_attributes = {}

        if raw_attributes:
            for field in raw_attributes:
                sane_attributes[field['AttributeName']] = field['AttributeType']

        for field in raw_schema:
            data_type = sane_attributes.get(field['AttributeName'], STRING)

            if field['KeyType'] == 'HASH':
                schema.append(
                    HashKey(field['AttributeName'], data_type=data_type)
                )
            elif field['KeyType'] == 'RANGE':
                schema.append(
                    RangeKey(field['AttributeName'], data_type=data_type)
                )
            else:
                raise exceptions.UnknownSchemaFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % field['KeyType']
                )

        return schema

    def _introspect_all_indexes(self, raw_indexes, map_indexes_projection):
        """
        Given a raw index/global index structure back from a DynamoDB response,
        parse out & build the high-level Python objects that represent them.
        """
        indexes = []

        for field in raw_indexes:
            index_klass = map_indexes_projection.get('ALL')
            kwargs = {
                'parts': []
            }

            if field['Projection']['ProjectionType'] == 'ALL':
                index_klass = map_indexes_projection.get('ALL')
            elif field['Projection']['ProjectionType'] == 'KEYS_ONLY':
                index_klass = map_indexes_projection.get('KEYS_ONLY')
            elif field['Projection']['ProjectionType'] == 'INCLUDE':
                index_klass = map_indexes_projection.get('INCLUDE')
                kwargs['includes'] = field['Projection']['NonKeyAttributes']
            else:
                raise exceptions.UnknownIndexFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % \
                    field['Projection']['ProjectionType']
                )

            name = field['IndexName']
            kwargs['parts'] = self._introspect_schema(field['KeySchema'], None)
            indexes.append(index_klass(name, **kwargs))

        return indexes

    def _introspect_indexes(self, raw_indexes):
        """
        Given a raw index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes'))

    def _introspect_global_indexes(self, raw_global_indexes):
        """
        Given a raw global index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_global_indexes,
            self._PROJECTION_TYPE_TO_INDEX.get('global_indexes'))

    def describe(self):
        """
        Describes the current structure of the table in DynamoDB.

        This information will be used to update the ``schema``, ``indexes``,
        ``global_indexes`` and ``throughput`` information on the ``Table``. Some
        calls, such as those involving creating keys or querying, will require
        this information to be populated.

        It also returns the full raw data structure from DynamoDB, in the
        event you'd like to parse out additional information (such as the
        ``ItemCount`` or usage information).

        Example::

            >>> users.describe()
            {
                # Lots of keys here...
            }
            >>> len(users.schema)
            2

        """
        result = self.connection.describe_table(self.table_name)

        # Blindly update throughput, since what's on DynamoDB's end is likely
        # more correct.
        raw_throughput = result['Table']['ProvisionedThroughput']
        self.throughput['read'] = int(raw_throughput['ReadCapacityUnits'])
        self.throughput['write'] = int(raw_throughput['WriteCapacityUnits'])

        if not self.schema:
            # Since we have the data, build the schema.
            raw_schema = result['Table'].get('KeySchema', [])
            raw_attributes = result['Table'].get('AttributeDefinitions', [])
            self.schema = self._introspect_schema(raw_schema, raw_attributes)

        if not self.indexes:
            # Build the index information as well.
            raw_indexes = result['Table'].get('LocalSecondaryIndexes', [])
            self.indexes = self._introspect_indexes(raw_indexes)

        # Build the global index information as well.
        raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', [])
        self.global_indexes = self._introspect_global_indexes(raw_global_indexes)

        # This is leaky.
        return result

    def update(self, throughput=None, global_indexes=None):
        """
        Updates table attributes and global indexes in DynamoDB.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        dictionary. If provided, it should map index names to dicts, each
        containing a ``read`` & ``write`` key, both of which should have an
        integer value associated with them. If you are writing new code,
        please use ``Table.update_global_secondary_index``.

        Returns ``True`` on success.

        Example::

            # For a read-heavier application...
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... })
            True

            # To also update the global index(es) throughput.
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... },
            ... global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True
        """

        data = None

        if throughput:
            self.throughput = throughput
            data = {
                'ReadCapacityUnits': int(self.throughput['read']),
                'WriteCapacityUnits': int(self.throughput['write']),
            }

        gsi_data = None

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

        if throughput or global_indexes:
            self.connection.update_table(
                self.table_name,
                provisioned_throughput=data,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide either the throughput or the ' \
                  'global_indexes to update method'
            boto.log.error(msg)

            return False

    def create_global_secondary_index(self, global_index):
        """
        Creates a global index in DynamoDB after the table has been created.

        Requires a ``global_index`` parameter, which should be a
        ``GlobalBaseIndexField`` subclass representing the desired index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To create a global index
            >>> users.create_global_secondary_index(
            ...     global_index=GlobalAllIndex(
            ...         'TheIndexNameHere', parts=[
            ...             HashKey('requiredHashkey', data_type=STRING),
            ...             RangeKey('optionalRangeKey', data_type=STRING)
            ...         ],
            ...         throughput={
            ...             'read': 2,
            ...             'write': 1,
            ...         })
            ... )
            True

        """

        if global_index:
            gsi_data = []
            gsi_data_attr_def = []

            gsi_data.append({
                "Create": global_index.schema()
            })

            for attr_def in global_index.parts:
                gsi_data_attr_def.append(attr_def.definition())

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
                attribute_definitions=gsi_data_attr_def
            )

            return True
        else:
            msg = 'You need to provide the global_index to ' \
                  'create_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete_global_secondary_index(self, global_index_name):
        """
        Deletes a global index in DynamoDB after the table has been created.

        Requires a ``global_index_name`` parameter, which should be a simple
        string of the name of the global secondary index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To delete a global index
            >>> users.delete_global_secondary_index('TheIndexNameHere')
            True

        """

        if global_index_name:
            gsi_data = [
                {
                    "Delete": {
                        "IndexName": global_index_name
                    }
                }
            ]

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide the global index name to ' \
                  'delete_global_secondary_index method'
            boto.log.error(msg)

            return False

    def update_global_secondary_index(self, global_indexes):
        """
        Updates the provisioned throughput of the given global index(es) in
        DynamoDB after the table has been created.

        Requires a ``global_indexes`` parameter, which should be a
        dictionary mapping index names to dicts, each containing a ``read``
        & ``write`` key, both of which should have an integer value
        associated with them.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To update a global index
            >>> users.update_global_secondary_index(global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True

        """

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )
            return True
        else:
            msg = 'You need to provide the global indexes to ' \
                  'update_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete(self):
        """
        Deletes a table in DynamoDB.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        Returns ``True`` on success.

        Example::

            >>> users.delete()
            True

        """
        self.connection.delete_table(self.table_name)
        return True

    def _encode_keys(self, keys):
        """
        Given a flat Python dictionary of keys/values, converts it into the
        nested dictionary DynamoDB expects.

        Converts::

            {
                'username': 'john',
                'tags': [1, 2, 5],
            }

        ...to...::

            {
                'username': {'S': 'john'},
                'tags': {'NS': ['1', '2', '5']},
            }

        """
        raw_key = {}

        for key, value in keys.items():
            raw_key[key] = self._dynamizer.encode(value)

        return raw_key

    def get_item(self, consistent=False, attributes=None, **kwargs):
        """
        Fetches an item (record) from a table in DynamoDB.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns an ``Item`` instance containing all the data for that record.

        Raises an ``ItemNotFound`` exception if the item is not found.

        Example::

            # A simple hash key.
            >>> john = users.get_item(username='johndoe')
            >>> john['first_name']
            'John'

            # A complex hash+range key.
            >>> john = users.get_item(username='johndoe', last_name='Doe')
            >>> john['first_name']
            'John'

            # A consistent read (assuming the data might have just changed).
            >>> john = users.get_item(username='johndoe', consistent=True)
            >>> john['first_name']
            'Johann'

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> john = users.get_item(**{
            ...     'date-joined': 127549192,
            ... })
            >>> john['first_name']
            'John'

        """
        raw_key = self._encode_keys(kwargs)
        item_data = self.connection.get_item(
            self.table_name,
            raw_key,
            attributes_to_get=attributes,
            consistent_read=consistent
        )
        if 'Item' not in item_data:
            raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs)
        item = Item(self)
        item.load(item_data)
        return item

    def has_item(self, **kwargs):
        """
        Returns whether an item (record) exists within a table in DynamoDB.

        To specify the key of the item you'd like to check, you can specify
        the key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns ``True`` if an ``Item`` is present, ``False`` if not.

        Example::

            # Simple, just hash-key schema.
            >>> users.has_item(username='johndoe')
            True

            # Complex schema, item not present.
            >>> users.has_item(
            ...     username='johndoe',
            ...     date_joined='2014-01-07'
            ... )
            False

        """
        try:
            self.get_item(**kwargs)
        except (JSONResponseError, exceptions.ItemNotFound):
            return False

        return True

    def lookup(self, *args, **kwargs):
        """
        Looks up an entry in DynamoDB. This is mostly backwards-compatible
        with ``boto.dynamodb``. Unlike ``get_item``, it takes the hash key and
        range key as positional arguments first, although you may still
        specify keyword arguments instead.

        Also unlike ``get_item``, if the returned item has no keys
        (i.e., it does not exist in DynamoDB), ``None`` is returned instead
        of an empty key object.

        Example::

            >>> user = users.lookup(username)
            >>> user = users.lookup(username, consistent=True)
            >>> app = apps.lookup('my_customer_id', 'my_app_id')

        """
        if not self.schema:
            self.describe()
        for x, arg in enumerate(args):
            kwargs[self.schema[x].name] = arg
        ret = self.get_item(**kwargs)
        if not ret.keys():
            return None
        return ret

    def new_item(self, *args):
        """
        Returns a new, blank ``Item``. Any positional arguments are taken as
        key values, in schema order.

        This is mostly for consistency with ``boto.dynamodb``.
        """
        if not self.schema:
            self.describe()
        data = {}
        for x, arg in enumerate(args):
            data[self.schema[x].name] = arg
        return Item(self, data=data)

    def put_item(self, data, overwrite=False):
        """
        Saves an entire item to DynamoDB.

        By default, if any part of the ``Item``'s original data doesn't match
        what's currently in DynamoDB, this request will fail. This prevents
        other processes from updating the data in between when you read the
        item & when your request to update the item's data is processed, which
        would typically result in some data loss.

        Requires a ``data`` parameter, which should be a dictionary of the data
        you'd like to store in DynamoDB.

        Optionally accepts an ``overwrite`` parameter, which should be a
        boolean. If you provide ``True``, this will tell DynamoDB to blindly
        overwrite whatever data is present, if any.

        Returns ``True`` on success.

        Example::

            >>> users.put_item(data={
            ...     'username': 'jane',
            ...     'first_name': 'Jane',
            ...     'last_name': 'Doe',
            ...     'date_joined': 126478915,
            ... })
            True

        """
        item = Item(self, data=data)
        return item.save(overwrite=overwrite)

    def _put_item(self, item_data, expects=None):
        """
        The internal variant of ``put_item`` (full data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.put_item(self.table_name, item_data, **kwargs)
        return True

    def _update_item(self, key, item_data, expects=None):
        """
        The internal variant of ``put_item`` (partial data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        raw_key = self._encode_keys(key)
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.update_item(self.table_name, raw_key, item_data, **kwargs)
        return True

    def delete_item(self, expected=None, conditional_operator=None, **kwargs):
        """
        Deletes a single item. You can perform a conditional delete operation
        that deletes the item if it exists, or if it has an expected attribute
        value.

        Conditional deletes are useful for only deleting items if specific
        conditions are met. If those conditions are met, DynamoDB performs
        the delete. Otherwise, the item is not deleted.

        To specify the expected attribute values of the item, you can pass a
        dictionary of conditions to ``expected``. Each condition should follow
        the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        To specify the key of the item you'd like to delete, you can specify
        the key attributes as kwargs.

        Optionally accepts an ``expected`` parameter which is a dictionary of
        expected attribute value conditions.

        Optionally accepts a ``conditional_operator`` which applies to the
        expected attribute value conditions:

        + `AND` - True if all of the conditions evaluate to true (default)
        + `OR` - True if at least one condition evaluates to true

        Returns ``True`` on success, ``False`` on failed conditional delete.

        Example::

            # A simple hash key.
            >>> users.delete_item(username='johndoe')
            True

            # A complex hash+range key.
            >>> users.delete_item(username='jane', last_name='Doe')
            True

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> users.delete_item(**{
            ...     'date-joined': 127549192,
            ... })
            True

            # Conditional delete
            >>> users.delete_item(username='johndoe',
            ...                   expected={'balance__eq': 0})
            True
        """
        expected = self._build_filters(expected, using=FILTER_OPERATORS)
        raw_key = self._encode_keys(kwargs)

        try:
            self.connection.delete_item(self.table_name, raw_key,
                                        expected=expected,
                                        conditional_operator=conditional_operator)
        except exceptions.ConditionalCheckFailedException:
            return False

        return True

    def get_key_fields(self):
        """
        Returns the fields necessary to make a key for a table.

        If the ``Table`` does not already have a populated ``schema``,
        this will request it via a ``Table.describe`` call.

        Returns a list of fieldnames (strings).

        Example::

            # A simple hash key.
            >>> users.get_key_fields()
            ['username']

            # A complex hash+range key.
            >>> users.get_key_fields()
            ['username', 'last_name']

        """
        if not self.schema:
            # We don't know the structure of the table. Get a description to
            # populate the schema.
            self.describe()

        return [field.name for field in self.schema]

    def batch_write(self):
        """
        Allows the batching of writes to DynamoDB.

        Since each write/delete call to DynamoDB has a cost associated with it,
        when loading lots of data, it makes sense to batch them, creating as
        few calls as possible.

        This returns a context manager that will transparently handle creating
        these batches. The object you get back lightly resembles a ``Table``
        object, sharing just the ``put_item`` & ``delete_item`` methods
        (which are all that DynamoDB can batch in terms of writing data).

        DynamoDB's maximum batch size is 25 items per request. If you attempt
        to put/delete more than that, the context manager will batch as many
        as it can up to that number, then flush them to DynamoDB & continue
        batching as more calls come in.

        Example::

            # Assuming a table with one record...
            >>> with users.batch_write() as batch:
            ...     batch.put_item(data={
            ...         'username': 'johndoe',
            ...         'first_name': 'John',
            ...         'last_name': 'Doe',
            ...         'owner': 1,
            ...     })
            ...     # Nothing across the wire yet.
            ...     batch.delete_item(username='bob')
            ...     # Still no requests sent.
            ...     batch.put_item(data={
            ...         'username': 'jane',
            ...         'first_name': 'Jane',
            ...         'last_name': 'Doe',
            ...         'date_joined': 127436192,
            ...     })
            ...     # Nothing yet, but once we leave the context, the
            ...     # put/deletes will be sent.

        """
        # PHENOMENAL COSMIC DOCS!!! itty-bitty code.
        return BatchTable(self)

    def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS):
        """
        An internal method for taking query/scan-style ``**kwargs`` & turning
        them into the raw structure DynamoDB expects for filtering.
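
        For example, assuming the default ``QUERY_OPERATORS`` mapping, a
        kwarg like ``username__eq='john'`` is turned into::

            {
                'username': {
                    'AttributeValueList': [{'S': 'john'}],
                    'ComparisonOperator': 'EQ',
                }
            }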
        """
        if filter_kwargs is None:
            return

        filters = {}

        for field_and_op, value in filter_kwargs.items():
            field_bits = field_and_op.split('__')
            fieldname = '__'.join(field_bits[:-1])

            try:
                op = using[field_bits[-1]]
            except KeyError:
                raise exceptions.UnknownFilterTypeError(
                    "Operator '%s' from '%s' is not recognized." % (
                        field_bits[-1],
                        field_and_op
                    )
                )

            lookup = {
                'AttributeValueList': [],
                'ComparisonOperator': op,
            }

            # Special-case the ``NULL/NOT_NULL`` case.
            if field_bits[-1] == 'null':
                del lookup['AttributeValueList']

                if value is False:
                    lookup['ComparisonOperator'] = 'NOT_NULL'
                else:
                    lookup['ComparisonOperator'] = 'NULL'
            # Special-case the ``BETWEEN`` case.
            elif field_bits[-1] == 'between':
                if len(value) == 2 and isinstance(value, (list, tuple)):
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[0])
                    )
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[1])
                    )
            # Special-case the ``IN`` case
            elif field_bits[-1] == 'in':
                for val in value:
                    lookup['AttributeValueList'].append(self._dynamizer.encode(val))
            else:
                # Fix up the value for encoding, because it was built to only work
                # with ``set``s.
                if isinstance(value, (list, tuple)):
                    value = set(value)
                lookup['AttributeValueList'].append(
                    self._dynamizer.encode(value)
                )

            # Finally, insert it into the filters.
            filters[fieldname] = lookup

        return filters

    def query(self, limit=None, index=None, reverse=False, consistent=False,
              attributes=None, max_page_size=None, **filter_kwargs):
        """
        **WARNING:** This method is provided **strictly** for
        backward-compatibility. It returns results in an incorrect order.

        If you are writing new code, please use ``Table.query_2``.
        """
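        # ``query`` historically returned results in the wrong order, so flip
        # ``reverse`` here to reproduce that legacy ordering via ``query_2``.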
        reverse = not reverse
        return self.query_2(limit=limit, index=index, reverse=reverse,
                            consistent=consistent, attributes=attributes,
                            max_page_size=max_page_size, **filter_kwargs)

    def query_2(self, limit=None, index=None, reverse=False,
                consistent=False, attributes=None, max_page_size=None,
                query_filter=None, conditional_operator=None,
                **filter_kwargs):
        """
        Queries for a set of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        **Note** - You cannot query against arbitrary fields within the data
        stored in DynamoDB unless you specify ``query_filter`` values.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``reverse`` parameter, which will present the
        results in reverse order. (Default: ``False`` - normal order)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses the ``AttributesToGet`` API parameter & sets
        ``Select`` to ``SPECIFIC_ATTRIBUTES``.

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the scan from drowning out other queries. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            # Look for last names equal to "Doe".
            >>> results = users.query_2(last_name__eq='Doe')
            >>> for res in results:
            ...     print res['first_name']
            'John'
            'Jane'

            # Look for last names beginning with "D", in reverse order, limit 3.
            >>> results = users.query_2(
            ...     last_name__beginswith='D',
            ...     reverse=True,
            ...     limit=3
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'Jane'
            'John'

            # Use an LSI & a consistent read.
            >>> results = users.query_2(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'Bob'
            'John'
            'Fred'

            # Filter by non-indexed field(s)
            >>> results = users.query_2(
            ...     last_name__eq='Doe',
            ...     reverse=True,
            ...     query_filter={
            ...         'first_name__beginswith': 'A'
            ...     }
            ... )
            >>> for res in results:
            ...     print res['first_name'] + ' ' + res['last_name']
            'Alice Doe'

        """
        if self.schema:
            if len(self.schema) == 1:
                if len(filter_kwargs) <= 1:
                    if not self.global_indexes or not len(self.global_indexes):
                        # If the schema only has one field, there's <= 1 filter
                        # param & no Global Secondary Indexes, this is user
                        # error. Bail early.
                        raise exceptions.QueryError(
                            "You must specify more than one key to filter on."
                        )

        if attributes is not None:
            select = 'SPECIFIC_ATTRIBUTES'
        else:
            select = None

        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'index': index,
            'reverse': reverse,
            'consistent': consistent,
            'select': select,
            'attributes_to_get': attributes,
            'query_filter': query_filter,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._query, **kwargs)
        return results

    def query_count(self, index=None, consistent=False, conditional_operator=None,
                    query_filter=None, scan_index_forward=True, limit=None,
                    exclusive_start_key=None, **filter_kwargs):
        """
        Queries the exact count of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Optionally accepts an ``exclusive_start_key`` which is used to get
        the remaining items when a query cannot return the complete count.

        Returns an integer which represents the exact count of matched
        items.

        :type scan_index_forward: boolean
        :param scan_index_forward: Specifies ascending (true) or descending
            (false) traversal of the index. DynamoDB returns results reflecting
            the requested order determined by the range key. If the data type
            is Number, the results are returned in numeric order. For String,
            the results are returned in order of ASCII character code values.
            For Binary, DynamoDB treats each byte of the binary data as
            unsigned when it compares binary values.

            If ``ScanIndexForward`` is not specified, the results are returned
            in ascending order.

        :type limit: integer
        :param limit: The maximum number of items to evaluate (not necessarily
            the number of matching items).

        Example::

            # Look for last names equal to "Doe".
            >>> users.query_count(last_name__eq='Doe')
            5

            # Use an LSI & a consistent read.
            >>> users.query_count(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            2

        """
        key_conditions = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        built_query_filter = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        count_buffer = 0
        last_evaluated_key = exclusive_start_key

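        # A single Query call stops once it has examined 1MB of data, so keep
        # paging with ``LastEvaluatedKey`` until the full count is gathered.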
        while True:
            raw_results = self.connection.query(
                self.table_name,
                index_name=index,
                consistent_read=consistent,
                select='COUNT',
                key_conditions=key_conditions,
                query_filter=built_query_filter,
                conditional_operator=conditional_operator,
                limit=limit,
                scan_index_forward=scan_index_forward,
                exclusive_start_key=last_evaluated_key
            )

            count_buffer += int(raw_results.get('Count', 0))
            last_evaluated_key = raw_results.get('LastEvaluatedKey')
            if not last_evaluated_key or count_buffer < 1:
                break

        return count_buffer

    def _query(self, limit=None, index=None, reverse=False, consistent=False,
               exclusive_start_key=None, select=None, attributes_to_get=None,
               query_filter=None, conditional_operator=None, **filter_kwargs):
        """
        The internal method that performs the actual queries. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
        """
        kwargs = {
            'limit': limit,
            'index_name': index,
            'consistent_read': consistent,
            'select': select,
            'attributes_to_get': attributes_to_get,
            'conditional_operator': conditional_operator,
        }

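        # DynamoDB expresses ordering via ``ScanIndexForward``; a reversed
        # query is simply a descending traversal of the index.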
        if reverse:
            kwargs['scan_index_forward'] = False

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['key_conditions'] = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        kwargs['query_filter'] = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.query(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

1372    def scan(self, limit=None, segment=None, total_segments=None,
1373             max_page_size=None, attributes=None, conditional_operator=None,
1374             **filter_kwargs):
1375        """
1376        Scans across all items within a DynamoDB table.
1377
1378        Scans can be performed against a hash key or a hash+range key. You can
1379        additionally filter the results after the table has been read but
1380        before the response is returned by using query filters.
1381
1382        To specify the filters of the items you'd like to get, you can specify
1383        the filters as kwargs. Each filter kwarg should follow the pattern
1384        ``<fieldname>__<filter_operation>=<value_to_look_for>``.
1385
1386        Optionally accepts a ``limit`` parameter, which should be an integer
1387        count of the total number of items to return. (Default: ``None`` -
1388        all results)
1389
1390        Optionally accepts a ``segment`` parameter, which should be an integer
1391        of the segment to retrieve on. Please see the documentation about
1392        Parallel Scans (Default: ``None`` - no segments)
1393
1394        Optionally accepts a ``total_segments`` parameter, which should be an
1395        integer count of number of segments to divide the table into.
1396        Please see the documentation about Parallel Scans (Default: ``None`` -
1397        no segments)
1398
1399        Optionally accepts a ``max_page_size`` parameter, which should be an
1400        integer count of the maximum number of items to retrieve
1401        **per-request**. This is useful in making faster requests & prevent
1402        the scan from drowning out other queries. (Default: ``None`` -
1403        fetch as many as DynamoDB will return)
1404
1405        Optionally accepts an ``attributes`` parameter, which should be a
1406        tuple. If you provide any attributes only these will be fetched
1407        from DynamoDB. This uses the ``AttributesToGet`` and set's
1408        ``Select`` to ``SPECIFIC_ATTRIBUTES`` API.
1409
1410        Returns a ``ResultSet``, which transparently handles the pagination of
1411        results you get back.
1412
1413        Example::
1414
1415            # All results.
1416            >>> everything = users.scan()
1417
1418            # Look for last names beginning with "D".
1419            >>> results = users.scan(last_name__beginswith='D')
1420            >>> for res in results:
1421            ...     print res['first_name']
1422            'Alice'
1423            'John'
1424            'Jane'
1425
1426            # Use an ``IN`` filter & limit.
1427            >>> results = users.scan(
1428            ...     age__in=[25, 26, 27, 28, 29],
1429            ...     limit=1
1430            ... )
1431            >>> for res in results:
1432            ...     print res['first_name']
1433            'Alice'
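
            # A sketch of a parallel scan, assuming two cooperating workers;
            # each call reads only its own segment of the table.
            >>> first_half = users.scan(segment=0, total_segments=2)
            >>> second_half = users.scan(segment=1, total_segments=2)

            # Fetch only the ``first_name`` attribute, in smaller pages.
            >>> names = users.scan(
            ...     attributes=('first_name',),
            ...     max_page_size=50
            ... )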
1434
1435        """
1436        results = ResultSet(
1437            max_page_size=max_page_size
1438        )
1439        kwargs = filter_kwargs.copy()
1440        kwargs.update({
1441            'limit': limit,
1442            'segment': segment,
1443            'total_segments': total_segments,
1444            'attributes': attributes,
1445            'conditional_operator': conditional_operator,
1446        })
1447        results.to_call(self._scan, **kwargs)
1448        return results
1449
1450    def _scan(self, limit=None, exclusive_start_key=None, segment=None,
1451              total_segments=None, attributes=None, conditional_operator=None,
1452              **filter_kwargs):
1453        """
1454        The internal method that performs the actual scan. Used extensively
1455        by ``ResultSet`` to perform each (paginated) request.
1456        """
1457        kwargs = {
1458            'limit': limit,
1459            'segment': segment,
1460            'total_segments': total_segments,
1461            'attributes_to_get': attributes,
1462            'conditional_operator': conditional_operator,
1463        }
1464
1465        if exclusive_start_key:
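            # Encode the last-seen key back into DynamoDB's raw wire format
            # so it can act as the pagination marker for this request.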
1466            kwargs['exclusive_start_key'] = {}
1467
1468            for key, value in exclusive_start_key.items():
1469                kwargs['exclusive_start_key'][key] = \
1470                    self._dynamizer.encode(value)
1471
1472        # Convert the filters into something we can actually use.
1473        kwargs['scan_filter'] = self._build_filters(
1474            filter_kwargs,
1475            using=FILTER_OPERATORS
1476        )
1477
1478        raw_results = self.connection.scan(
1479            self.table_name,
1480            **kwargs
1481        )
1482        results = []
1483        last_key = None
1484
1485        for raw_item in raw_results.get('Items', []):
1486            item = Item(self)
1487            item.load({
1488                'Item': raw_item,
1489            })
1490            results.append(item)
1491
1492        if raw_results.get('LastEvaluatedKey', None):
1493            last_key = {}
1494
1495            for key, value in raw_results['LastEvaluatedKey'].items():
1496                last_key[key] = self._dynamizer.decode(value)
1497
1498        return {
1499            'results': results,
1500            'last_key': last_key,
1501        }
1502
1503    def batch_get(self, keys, consistent=False, attributes=None):
1504        """
1505        Fetches many specific items in batch from a table.
1506
        Requires a ``keys`` parameter, which should be a list of dictionaries.
        Each dictionary should map the key field name(s) to the value(s)
        identifying an item to fetch.
1509
        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, a strongly consistent read will be
        used. (Default: ``False``)
1513
1514        Optionally accepts an ``attributes`` parameter, which should be a
1515        tuple. If you provide any attributes only these will be fetched
1516        from DynamoDB.
1517
1518        Returns a ``ResultSet``, which transparently handles the pagination of
1519        results you get back.
1520
1521        Example::
1522
1523            >>> results = users.batch_get(keys=[
1524            ...     {
1525            ...         'username': 'johndoe',
1526            ...     },
1527            ...     {
1528            ...         'username': 'jane',
1529            ...     },
1530            ...     {
1531            ...         'username': 'fred',
1532            ...     },
1533            ... ])
1534            >>> for res in results:
1535            ...     print res['first_name']
1536            'John'
1537            'Jane'
1538            'Fred'
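
            # A sketch of a strongly consistent read, fetching only the
            # ``first_name`` attribute for each item.
            >>> results = users.batch_get(
            ...     keys=[{'username': 'johndoe'}],
            ...     consistent=True,
            ...     attributes=('first_name',)
            ... )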
1539
1540        """
        # We pass the keys to the constructor instead, so it can maintain its
        # own internal state as to which keys have been processed.
1543        results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get)
1544        results.to_call(self._batch_get, consistent=consistent, attributes=attributes)
1545        return results
1546
1547    def _batch_get(self, keys, consistent=False, attributes=None):
1548        """
1549        The internal method that performs the actual batch get. Used extensively
1550        by ``BatchGetResultSet`` to perform each (paginated) request.
1551        """
1552        items = {
1553            self.table_name: {
1554                'Keys': [],
1555            },
1556        }
1557
1558        if consistent:
1559            items[self.table_name]['ConsistentRead'] = True
1560
1561        if attributes is not None:
1562            items[self.table_name]['AttributesToGet'] = attributes
1563
1564        for key_data in keys:
1565            raw_key = {}
1566
1567            for key, value in key_data.items():
1568                raw_key[key] = self._dynamizer.encode(value)
1569
1570            items[self.table_name]['Keys'].append(raw_key)
1571
1572        raw_results = self.connection.batch_get_item(request_items=items)
1573        results = []
1574        unprocessed_keys = []
1575
1576        for raw_item in raw_results['Responses'].get(self.table_name, []):
1577            item = Item(self)
1578            item.load({
1579                'Item': raw_item,
1580            })
1581            results.append(item)
1582
        # ``UnprocessedKeys`` is keyed by table name, with the keys that
        # still need fetching nested under ``Keys``.
        raw_unprocessed = raw_results.get('UnprocessedKeys', {})
        table_unprocessed = raw_unprocessed.get(self.table_name, {})

        for raw_key in table_unprocessed.get('Keys', []):
            py_key = {}

            for key, value in raw_key.items():
                py_key[key] = self._dynamizer.decode(value)

            unprocessed_keys.append(py_key)
1592
1593        return {
1594            'results': results,
            # NEVER return a ``last_key``, just in case any part of
            # ``ResultSet`` peeks through, since much of the original
            # underlying implementation is based on this key.
1598            'last_key': None,
1599            'unprocessed_keys': unprocessed_keys,
1600        }
1601
1602    def count(self):
1603        """
1604        Returns a (very) eventually consistent count of the number of items
1605        in a table.
1606
1607        Lag time is about 6 hours, so don't expect a high degree of accuracy.
1608
1609        Example::
1610
1611            >>> users.count()
1612            6
1613
1614        """
1615        info = self.describe()
1616        return info['Table'].get('ItemCount', 0)
1617
1618
1619class BatchTable(object):
1620    """
1621    Used by ``Table`` as the context manager for batch writes.
1622
    You likely don't want to use this object directly.
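
    Typical use goes through ``Table.batch_write``, which constructs one of
    these as a context manager & flushes any remaining writes on exit.

    Example::

        >>> with users.batch_write() as batch:
        ...     batch.put_item(data={'username': 'jane'})
        ...     batch.delete_item(username='johndoe')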
1624    """
1625    def __init__(self, table):
1626        self.table = table
1627        self._to_put = []
1628        self._to_delete = []
1629        self._unprocessed = []
1630
1631    def __enter__(self):
1632        return self
1633
    def __exit__(self, exc_type, exc_value, traceback):
1635        if self._to_put or self._to_delete:
1636            # Flush anything that's left.
1637            self.flush()
1638
1639        if self._unprocessed:
1640            # Finally, handle anything that wasn't processed.
1641            self.resend_unprocessed()
1642
1643    def put_item(self, data, overwrite=False):
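        # NOTE: ``overwrite`` is accepted for signature parity with
        # ``Table.put_item`` but is unused here; ``BatchWriteItem`` always
        # overwrites an existing item with the same key.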
1644        self._to_put.append(data)
1645
1646        if self.should_flush():
1647            self.flush()
1648
1649    def delete_item(self, **kwargs):
1650        self._to_delete.append(kwargs)
1651
1652        if self.should_flush():
1653            self.flush()
1654
    def should_flush(self):
        # A single ``BatchWriteItem`` call accepts at most 25 put/delete
        # requests, so flush once that many operations are buffered.
        return len(self._to_put) + len(self._to_delete) >= 25
1660
1661    def flush(self):
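        # Build a single ``BatchWriteItem`` payload: a map of table name ->
        # a list of ``PutRequest``/``DeleteRequest`` entries.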
1662        batch_data = {
1663            self.table.table_name: [
1664                # We'll insert data here shortly.
1665            ],
1666        }
1667
1668        for put in self._to_put:
1669            item = Item(self.table, data=put)
1670            batch_data[self.table.table_name].append({
1671                'PutRequest': {
1672                    'Item': item.prepare_full(),
1673                }
1674            })
1675
1676        for delete in self._to_delete:
1677            batch_data[self.table.table_name].append({
1678                'DeleteRequest': {
1679                    'Key': self.table._encode_keys(delete),
1680                }
1681            })
1682
1683        resp = self.table.connection.batch_write_item(batch_data)
1684        self.handle_unprocessed(resp)
1685
1686        self._to_put = []
1687        self._to_delete = []
1688        return True
1689
1690    def handle_unprocessed(self, resp):
        if resp.get('UnprocessedItems'):
1692            table_name = self.table.table_name
1693            unprocessed = resp['UnprocessedItems'].get(table_name, [])
1694
1695            # Some items have not been processed. Stow them for now &
1696            # re-attempt processing on ``__exit__``.
1697            msg = "%s items were unprocessed. Storing for later."
1698            boto.log.info(msg % len(unprocessed))
1699            self._unprocessed.extend(unprocessed)
1700
    def resend_unprocessed(self):
        # If there are unprocessed records (for instance, the user went over
        # their throughput limit), iterate over them & re-send batches until
        # they've all been processed.
        boto.log.info(
            "Re-sending %s unprocessed items." % len(self._unprocessed)
        )

        while self._unprocessed:
1710            # Again, do 25 at a time.
1711            to_resend = self._unprocessed[:25]
1712            # Remove them from the list.
1713            self._unprocessed = self._unprocessed[25:]
1714            batch_data = {
1715                self.table.table_name: to_resend
1716            }
1717            boto.log.info("Sending %s items" % len(to_resend))
1718            resp = self.table.connection.batch_write_item(batch_data)
1719            self.handle_unprocessed(resp)
1720            boto.log.info(
1721                "%s unprocessed items left" % len(self._unprocessed)
1722            )
1723