# Copyright (c) 2014 Amazon.com, Inc. or its affiliates.  All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#

import base64
import boto

from boto.connection import AWSQueryConnection
from boto.regioninfo import RegionInfo
from boto.exception import JSONResponseError
from boto.kinesis import exceptions
from boto.compat import json
from boto.compat import six


class KinesisConnection(AWSQueryConnection):
    """
    Amazon Kinesis Service API Reference
    Amazon Kinesis is a managed service that scales elastically for
    real-time processing of streaming big data.
    """
    APIVersion = "2013-12-02"
    DefaultRegionName = "us-east-1"
    DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
    ServiceName = "Kinesis"
    TargetPrefix = "Kinesis_20131202"
    ResponseError = JSONResponseError

    _faults = {
        "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
        "LimitExceededException": exceptions.LimitExceededException,
        "ExpiredIteratorException": exceptions.ExpiredIteratorException,
        "ResourceInUseException": exceptions.ResourceInUseException,
        "ResourceNotFoundException": exceptions.ResourceNotFoundException,
        "InvalidArgumentException": exceptions.InvalidArgumentException,
        "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
    }

    def __init__(self, **kwargs):
        region = kwargs.pop('region', None)
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        if 'host' not in kwargs:
            kwargs['host'] = region.endpoint
        super(KinesisConnection, self).__init__(**kwargs)
        self.region = region

    def _required_auth_capability(self):
        return ['hmac-v4']

    def add_tags_to_stream(self, stream_name, tags):
        """
        Adds or updates tags for the specified Amazon Kinesis stream.
        Each stream can have up to 10 tags.

        If tags have already been assigned to the stream,
        `AddTagsToStream` overwrites any existing tags that correspond
        to the specified tag keys.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tags: map
        :param tags: The set of key-value pairs to use to create the tags.

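        A minimal usage sketch (assuming a connection created with
        `boto.kinesis.connect_to_region`; the stream name and tags are
        illustrative)::

            from boto import kinesis

            conn = kinesis.connect_to_region('us-east-1')
            conn.add_tags_to_stream('my-stream',
                                    {'environment': 'test', 'team': 'data'})
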
        """
        params = {'StreamName': stream_name, 'Tags': tags, }
        return self.make_request(action='AddTagsToStream',
                                 body=json.dumps(params))

    def create_stream(self, stream_name, shard_count):
        """
        Creates an Amazon Kinesis stream. A stream captures and
        transports data records that are continuously emitted from
        different data sources or producers. Scale-out within an
        Amazon Kinesis stream is explicitly supported by means of
        shards, which are uniquely identified groups of data records
        in an Amazon Kinesis stream.

        You specify and control the number of shards that a stream is
        composed of. Each open shard can support up to 5 read
        transactions per second, up to a maximum total of 2 MB of data
        read per second. Each shard can support up to 1000 records
        written per second, up to a maximum total of 1 MB data written
        per second. You can add shards to a stream if the amount of
        data input increases and you can remove shards if the amount
        of data input decreases.

        The stream name identifies the stream. The name is scoped to
        the AWS account used by the application. It is also scoped by
        region. That is, two streams in two different accounts can
        have the same name, and two streams in the same account, but
        in two different regions, can have the same name.

        `CreateStream` is an asynchronous operation. Upon receiving a
        `CreateStream` request, Amazon Kinesis immediately returns and
        sets the stream status to `CREATING`. After the stream is
        created, Amazon Kinesis sets the stream status to `ACTIVE`.
        You should perform read and write operations only on an
        `ACTIVE` stream.

        You receive a `LimitExceededException` when making a
        `CreateStream` request if you try to do one of the following:

        + Have more than five streams in the `CREATING` state at any
          point in time.
        + Create more shards than are authorized for your account.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        You can use `DescribeStream` to check the stream status, which
        is returned in `StreamStatus`.

        `CreateStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: A name to identify the stream. The stream name is
            scoped to the AWS account used by the application that creates the
            stream. It is also scoped by region. That is, two streams in two
            different AWS accounts can have the same name, and two streams in
            the same AWS account, but in two different regions, can have the
            same name.

        :type shard_count: integer
        :param shard_count: The number of shards that the stream will use. The
            throughput of the stream is a function of the number of shards;
            more shards are required for greater provisioned throughput.
            **Note:** The default limit for an AWS account is 10 shards per
            stream. If you need to create a stream with more than 10 shards,
            `contact AWS Support`_ to increase the limit on your account.

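        A minimal usage sketch (assuming a connection created with
        `boto.kinesis.connect_to_region`; the stream name and shard count
        are illustrative)::

            from boto import kinesis

            conn = kinesis.connect_to_region('us-east-1')
            conn.create_stream('my-stream', shard_count=2)
            # The stream starts in CREATING; poll DescribeStream until
            # StreamStatus is ACTIVE before reading or writing.
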
        """
        params = {
            'StreamName': stream_name,
            'ShardCount': shard_count,
        }
        return self.make_request(action='CreateStream',
                                 body=json.dumps(params))

    def delete_stream(self, stream_name):
        """
        Deletes a stream and all its shards and data. You must shut
        down any applications that are operating on the stream before
        you delete the stream. If an application attempts to operate
        on a deleted stream, it will receive the exception
        `ResourceNotFoundException`.

        If the stream is in the `ACTIVE` state, you can delete it.
        After a `DeleteStream` request, the specified stream is in the
        `DELETING` state until Amazon Kinesis completes the deletion.

        **Note:** Amazon Kinesis might continue to accept data read
        and write operations, such as PutRecord, PutRecords, and
        GetRecords, on a stream in the `DELETING` state until the
        stream deletion is complete.

        When you delete a stream, any shards in that stream are also
        deleted, and any tags are dissociated from the stream.

        You can use the DescribeStream operation to check the state of
        the stream, which is returned in `StreamStatus`.

        `DeleteStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to delete.

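        A minimal usage sketch (`conn` is assumed to be a
        `KinesisConnection`; the stream name is illustrative)::

            conn.delete_stream('my-stream')
            # Deletion is asynchronous; the stream moves to DELETING and
            # disappears from ListStreams when deletion is complete.
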
        """
        params = {'StreamName': stream_name, }
        return self.make_request(action='DeleteStream',
                                 body=json.dumps(params))

    def describe_stream(self, stream_name, limit=None,
                        exclusive_start_shard_id=None):
        """
        Describes the specified stream.

        The information about the stream includes its current status,
        its Amazon Resource Name (ARN), and an array of shard objects.
        For each shard object, there is information about the hash key
        and sequence number ranges that the shard spans, and the IDs
        of any earlier shards that played a role in creating the
        shard. A sequence number is the identifier associated with
        every record ingested in the Amazon Kinesis stream. The
        sequence number is assigned when a record is put into the
        stream.

        You can limit the number of returned shards using the `Limit`
        parameter. The number of shards in a stream may be too large
        to return from a single call to `DescribeStream`. You can
        detect this by using the `HasMoreShards` flag in the returned
        output. `HasMoreShards` is set to `True` when there is more
        data available.

        `DescribeStream` is a paginated operation. If there are more
        shards available, you can request them using the shard ID of
        the last shard returned. Specify this ID in the
        `ExclusiveStartShardId` parameter in a subsequent request to
        `DescribeStream`.

        `DescribeStream` has a limit of 10 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to describe.

        :type limit: integer
        :param limit: The maximum number of shards to return.

        :type exclusive_start_shard_id: string
        :param exclusive_start_shard_id: The shard ID of the shard to start
            with.

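        A pagination sketch following the process described above (`conn`
        is assumed to be a `KinesisConnection`; the stream name is
        illustrative)::

            shards = []
            last_shard_id = None
            while True:
                result = conn.describe_stream(
                    'my-stream', exclusive_start_shard_id=last_shard_id)
                description = result['StreamDescription']
                shards.extend(description['Shards'])
                if not description['HasMoreShards']:
                    break
                last_shard_id = shards[-1]['ShardId']
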
        """
        params = {'StreamName': stream_name, }
        if limit is not None:
            params['Limit'] = limit
        if exclusive_start_shard_id is not None:
            params['ExclusiveStartShardId'] = exclusive_start_shard_id
        return self.make_request(action='DescribeStream',
                                 body=json.dumps(params))

    def get_records(self, shard_iterator, limit=None, b64_decode=True):
        """
        Gets data records from a shard.

        Specify a shard iterator using the `ShardIterator` parameter.
        The shard iterator specifies the position in the shard from
        which you want to start reading data records sequentially. If
        there are no records available in the portion of the shard
        that the iterator points to, `GetRecords` returns an empty
        list. Note that it might take multiple calls to get to a
        portion of the shard that contains records.

        You can scale by provisioning multiple shards. Your
        application should have one thread per shard, each reading
        continuously from its shard. To read from a stream
        continually, call `GetRecords` in a loop. Use GetShardIterator
        to get the shard iterator to specify in the first `GetRecords`
        call. `GetRecords` returns a new shard iterator in
        `NextShardIterator`. Specify the shard iterator returned in
        `NextShardIterator` in subsequent calls to `GetRecords`. Note
        that if the shard has been closed, the shard iterator can't
        return more data and `GetRecords` returns `null` in
        `NextShardIterator`. You can terminate the loop when the shard
        is closed, or when the shard iterator reaches the record with
        the sequence number or other attribute that marks it as the
        last record to process.

        Each data record can be up to 50 KB in size, and each shard
        can read up to 2 MB per second. You can ensure that your calls
        don't exceed the maximum supported size or throughput by using
        the `Limit` parameter to specify the maximum number of records
        that `GetRecords` can return. Consider your average record
        size when determining this limit. For example, if your average
        record size is 40 KB, you can limit the data returned to about
        1 MB per call by specifying 25 as the limit.

        The size of the data returned by `GetRecords` will vary
        depending on the utilization of the shard. The maximum size of
        data that `GetRecords` can return is 10 MB. If a call returns
        10 MB of data, subsequent calls made within the next 5 seconds
        throw `ProvisionedThroughputExceededException`. If there is
        insufficient provisioned throughput on the shard, subsequent
        calls made within the next 1 second throw
        `ProvisionedThroughputExceededException`. Note that
        `GetRecords` won't return any data when it throws an
        exception. For this reason, we recommend that you wait one
        second between calls to `GetRecords`; however, it's possible
        that the application will get exceptions for longer than 1
        second.

        To detect whether the application is falling behind in
        processing, add a timestamp to your records and note how long
        it takes to process them. You can also monitor how much data
        is in a stream using the CloudWatch metrics for write
        operations (`PutRecord` and `PutRecords`). For more
        information, see `Monitoring Amazon Kinesis with Amazon
        CloudWatch`_ in the Amazon Kinesis Developer Guide.

        :type shard_iterator: string
        :param shard_iterator: The position in the shard from which you want to
            start sequentially reading data records. A shard iterator specifies
            this position using the sequence number of a data record in the
            shard.

        :type limit: integer
        :param limit: The maximum number of records to return. Specify a value
            of up to 10,000. If you specify a value that is greater than
            10,000, `GetRecords` throws `InvalidArgumentException`.

        :type b64_decode: boolean
        :param b64_decode: Decode the Base64-encoded ``Data`` field of records.

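        A read-loop sketch following the pattern described above (`conn`
        is assumed to be a `KinesisConnection`; the stream and shard
        names are illustrative)::

            import time

            it = conn.get_shard_iterator('my-stream',
                                         'shardId-000000000000',
                                         'TRIM_HORIZON')['ShardIterator']
            while it is not None:
                result = conn.get_records(it, limit=100)
                for record in result['Records']:
                    print(record['Data'])
                it = result.get('NextShardIterator')
                time.sleep(1)  # stay under the per-shard read limits
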
        """
        params = {'ShardIterator': shard_iterator, }
        if limit is not None:
            params['Limit'] = limit

        response = self.make_request(action='GetRecords',
                                     body=json.dumps(params))

        # Base64 decode the data
        if b64_decode:
            for record in response.get('Records', []):
                record['Data'] = base64.b64decode(
                    record['Data'].encode('utf-8')).decode('utf-8')

        return response

    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
                           starting_sequence_number=None):
        """
        Gets a shard iterator. A shard iterator expires five minutes
        after it is returned to the requester.

        A shard iterator specifies the position in the shard from
        which to start reading data records sequentially. A shard
        iterator specifies this position using the sequence number of
        a data record in a shard. A sequence number is the identifier
        associated with every record ingested in the Amazon Kinesis
        stream. The sequence number is assigned when a record is put
        into the stream.

        You must specify the shard iterator type. For example, you can
        set the `ShardIteratorType` parameter to read exactly from the
        position denoted by a specific sequence number by using the
        `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
        sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
        iterator type, using sequence numbers returned by earlier
        calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
        You can specify the shard iterator type `TRIM_HORIZON` in the
        request to cause `ShardIterator` to point to the last
        untrimmed record in the shard in the system, which is the
        oldest data record in the shard. Or you can point to just
        after the most recent record in the shard, by using the shard
        iterator type `LATEST`, so that you always read the most
        recent data in the shard.

        When you repeatedly read from an Amazon Kinesis stream, use a
        GetShardIterator request to get the first shard iterator to
        use in your first `GetRecords` request, and then use the shard
        iterator returned by the `GetRecords` request in
        `NextShardIterator` for subsequent reads. A new shard iterator
        is returned by every `GetRecords` request in
        `NextShardIterator`, which you use in the `ShardIterator`
        parameter of the next `GetRecords` request.

        If a `GetShardIterator` request is made too often, you receive
        a `ProvisionedThroughputExceededException`. For more
        information about throughput limits, see GetRecords.

        If the shard is closed, the iterator can't return more data,
        and `GetShardIterator` returns `null` for its `ShardIterator`.
        A shard can be closed using SplitShard or MergeShards.

        `GetShardIterator` has a limit of 5 transactions per second
        per account per open shard.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type shard_id: string
        :param shard_id: The shard ID of the shard to get the iterator for.

        :type shard_iterator_type: string
        :param shard_iterator_type: Determines how the shard iterator is used
            to start reading data records from the shard. The following are
            the valid shard iterator types:

            + AT_SEQUENCE_NUMBER - Start reading exactly from the position
              denoted by a specific sequence number.
            + AFTER_SEQUENCE_NUMBER - Start reading right after the position
              denoted by a specific sequence number.
            + TRIM_HORIZON - Start reading at the last untrimmed record in
              the shard in the system, which is the oldest data record in
              the shard.
            + LATEST - Start reading just after the most recent record in
              the shard, so that you always read the most recent data in
              the shard.

        :type starting_sequence_number: string
        :param starting_sequence_number: The sequence number of the data record
            in the shard from which to start reading.

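        A minimal usage sketch (`conn` is assumed to be a
        `KinesisConnection`; the stream and shard names are
        illustrative)::

            result = conn.get_shard_iterator('my-stream',
                                             'shardId-000000000000',
                                             'TRIM_HORIZON')
            shard_iterator = result['ShardIterator']
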
        """
        params = {
            'StreamName': stream_name,
            'ShardId': shard_id,
            'ShardIteratorType': shard_iterator_type,
        }
        if starting_sequence_number is not None:
            params['StartingSequenceNumber'] = starting_sequence_number
        return self.make_request(action='GetShardIterator',
                                 body=json.dumps(params))

    def list_streams(self, limit=None, exclusive_start_stream_name=None):
        """
        Lists your streams.

        The number of streams may be too large to return from a single
        call to `ListStreams`. You can limit the number of returned
        streams using the `Limit` parameter. If you do not specify a
        value for the `Limit` parameter, Amazon Kinesis uses the
        default limit, which is currently 10.

        You can detect if there are more streams available to list by
        using the `HasMoreStreams` flag from the returned output. If
        there are more streams available, you can request more streams
        by using the name of the last stream returned by the
        `ListStreams` request in the `ExclusiveStartStreamName`
        parameter in a subsequent request to `ListStreams`. The group
        of stream names returned by the subsequent request is then
        added to the list. You can continue this process until all the
        stream names have been collected in the list.

        `ListStreams` has a limit of 5 transactions per second per
        account.

        :type limit: integer
        :param limit: The maximum number of streams to list.

        :type exclusive_start_stream_name: string
        :param exclusive_start_stream_name: The name of the stream to start the
            list with.

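        A pagination sketch following the process described above (`conn`
        is assumed to be a `KinesisConnection`)::

            names = []
            start_name = None
            while True:
                result = conn.list_streams(
                    exclusive_start_stream_name=start_name)
                names.extend(result['StreamNames'])
                if not result['HasMoreStreams']:
                    break
                start_name = names[-1]
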
        """
        params = {}
        if limit is not None:
            params['Limit'] = limit
        if exclusive_start_stream_name is not None:
            params['ExclusiveStartStreamName'] = exclusive_start_stream_name
        return self.make_request(action='ListStreams',
                                 body=json.dumps(params))

    def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
                             limit=None):
        """
        Lists the tags for the specified Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type exclusive_start_tag_key: string
        :param exclusive_start_tag_key: The key to use as the starting point
            for the list of tags. If this parameter is set, `ListTagsForStream`
            gets all tags that occur after `ExclusiveStartTagKey`.

        :type limit: integer
        :param limit: The number of tags to return. If this number is less than
            the total number of tags associated with the stream, `HasMoreTags`
            is set to `True`. To list additional tags, set
            `ExclusiveStartTagKey` to the last key in the response.

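        A minimal usage sketch (`conn` is assumed to be a
        `KinesisConnection`; the stream name is illustrative)::

            result = conn.list_tags_for_stream('my-stream')
            for tag in result['Tags']:
                print('%s=%s' % (tag['Key'], tag['Value']))
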
        """
        params = {'StreamName': stream_name, }
        if exclusive_start_tag_key is not None:
            params['ExclusiveStartTagKey'] = exclusive_start_tag_key
        if limit is not None:
            params['Limit'] = limit
        return self.make_request(action='ListTagsForStream',
                                 body=json.dumps(params))

    def merge_shards(self, stream_name, shard_to_merge,
                     adjacent_shard_to_merge):
        """
        Merges two adjacent shards in a stream and combines them into
        a single shard to reduce the stream's capacity to ingest and
        transport data. Two shards are considered adjacent if the
        union of the hash key ranges for the two shards forms a
        contiguous set with no gaps. For example, if you have two
        shards, one with a hash key range of 276...381 and the other
        with a hash key range of 382...454, then you could merge these
        two shards into a single shard that would have a hash key
        range of 276...454. After the merge, the single child shard
        receives data for all hash key values covered by the two
        parent shards.

        `MergeShards` is called when there is a need to reduce the
        overall capacity of a stream because of excess capacity that
        is not being used. You must specify the shard to be merged and
        the adjacent shard for a stream. For more information about
        merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
        Developer Guide.

        If the stream is in the `ACTIVE` state, you can call
        `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
        or `DELETING` state, `MergeShards` returns a
        `ResourceInUseException`. If the specified stream does not
        exist, `MergeShards` returns a `ResourceNotFoundException`.

        You can use DescribeStream to check the state of the stream,
        which is returned in `StreamStatus`.

        `MergeShards` is an asynchronous operation. Upon receiving a
        `MergeShards` request, Amazon Kinesis immediately returns a
        response and sets the `StreamStatus` to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the `StreamStatus`
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You use DescribeStream to determine the shard IDs that are
        specified in the `MergeShards` request.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, `MergeShards` or SplitShard, you
        will receive a `LimitExceededException`.

        `MergeShards` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the merge.

        :type shard_to_merge: string
        :param shard_to_merge: The shard ID of the shard to combine with the
            adjacent shard for the merge.

        :type adjacent_shard_to_merge: string
        :param adjacent_shard_to_merge: The shard ID of the adjacent shard for
            the merge.

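        A minimal usage sketch (`conn` is assumed to be a
        `KinesisConnection`; the shard IDs are illustrative and would
        normally come from `describe_stream`)::

            conn.merge_shards('my-stream',
                              'shardId-000000000000',
                              'shardId-000000000001')
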
        """
        params = {
            'StreamName': stream_name,
            'ShardToMerge': shard_to_merge,
            'AdjacentShardToMerge': adjacent_shard_to_merge,
        }
        return self.make_request(action='MergeShards',
                                 body=json.dumps(params))

    def put_record(self, stream_name, data, partition_key,
                   explicit_hash_key=None,
                   sequence_number_for_ordering=None,
                   exclusive_minimum_sequence_number=None,
                   b64_encode=True):
        """
        This operation puts a data record into an Amazon Kinesis
        stream from a producer. This operation must be called to send
        data from the producer into the Amazon Kinesis stream for
        real-time ingestion and subsequent processing. The `PutRecord`
        operation requires the name of the stream that captures,
        stores, and transports the data; a partition key; and the data
        blob itself. The data blob could be a segment from a log file,
        geographic/location data, website clickstream data, or any
        other data type.

        The partition key is used to distribute data across shards.
        Amazon Kinesis segregates the data records that belong to a
        data stream into multiple shards, using the partition key
        associated with each data record to determine which shard a
        given data record belongs to.

        Partition keys are Unicode strings, with a maximum length
        limit of 256 bytes. An MD5 hash function is used to map
        partition keys to 128-bit integer values and to map associated
        data records to shards using the hash key ranges of the
        shards. You can override hashing the partition key to
        determine the shard by explicitly specifying a hash value
        using the `ExplicitHashKey` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        `PutRecord` returns the shard ID of where the data record was
        placed and the sequence number that was assigned to the data
        record.

        Sequence numbers generally increase over time. To guarantee
        strictly increasing ordering, use the
        `SequenceNumberForOrdering` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        If a `PutRecord` request cannot be processed because of
        insufficient provisioned throughput on the shard involved in
        the request, `PutRecord` throws
        `ProvisionedThroughputExceededException`.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream to put the data record into.

        :type data: blob
        :param data: The data blob to put into the record, which is
            Base64-encoded when the blob is serialized. The maximum size of
            the data blob (the payload after Base64-decoding) is 50
            kilobytes (KB). Set `b64_encode` to ``False`` to disable
            automatic Base64 encoding.

        :type partition_key: string
        :param partition_key: Determines which shard in the stream the data
            record is assigned to. Partition keys are Unicode strings with a
            maximum length limit of 256 bytes. Amazon Kinesis uses the
            partition key as input to a hash function that maps the partition
            key and associated data to a specific shard. Specifically, an MD5
            hash function is used to map partition keys to 128-bit integer
            values and to map associated data records to shards. As a result of
            this hashing mechanism, all data records with the same partition
            key will map to the same shard within the stream.

        :type explicit_hash_key: string
        :param explicit_hash_key: The hash value used to explicitly determine
            the shard the data record is assigned to by overriding the
            partition key hash.

        :type sequence_number_for_ordering: string
        :param sequence_number_for_ordering: Guarantees strictly increasing
            sequence numbers, for puts from the same client and to the same
            partition key. Usage: set the `SequenceNumberForOrdering` of record
            n to the sequence number of record n-1 (as returned in the
            PutRecordResult when putting record n-1). If this parameter is not
            set, records will be coarsely ordered based on arrival time.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode `data`. Can be set to
            ``False`` if `data` is already encoded to prevent double encoding.

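        A minimal usage sketch (`conn` is assumed to be a
        `KinesisConnection`; the stream name and payload are
        illustrative)::

            result = conn.put_record('my-stream', 'hello world',
                                     partition_key='user-42')
            shard_id = result['ShardId']
            sequence_number = result['SequenceNumber']
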
        """
        params = {
            'StreamName': stream_name,
            'Data': data,
            'PartitionKey': partition_key,
        }
        if explicit_hash_key is not None:
            params['ExplicitHashKey'] = explicit_hash_key
        if sequence_number_for_ordering is not None:
            params['SequenceNumberForOrdering'] = sequence_number_for_ordering
        # Base64 encode the data, converting text to bytes first so the
        # payload survives JSON serialization.
        if b64_encode:
            if not isinstance(params['Data'], six.binary_type):
                params['Data'] = params['Data'].encode('utf-8')
            params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
        return self.make_request(action='PutRecord',
                                 body=json.dumps(params))

    def put_records(self, records, stream_name, b64_encode=True):
        """
        Puts (writes) multiple data records from a producer into an
        Amazon Kinesis stream in a single call (also referred to as a
        `PutRecords` request). Use this operation to send data from a
        data producer into the Amazon Kinesis stream for real-time
        ingestion and processing. Each shard can support up to 1000
        records written per second, up to a maximum total of 1 MB data
        written per second.

        You must specify the name of the stream that captures, stores,
        and transports the data; and an array of request `Records`,
        with each record in the array requiring a partition key and
        data blob.

        The data blob can be any type of data; for example, a segment
        from a log file, geographic/location data, website clickstream
        data, and so on.

        The partition key is used by Amazon Kinesis as input to a hash
        function that maps the partition key and associated data to a
        specific shard. An MD5 hash function is used to map partition
        keys to 128-bit integer values and to map associated data
        records to shards. As a result of this hashing mechanism, all
        data records with the same partition key map to the same shard
        within the stream. For more information, see `Partition Key`_
        in the Amazon Kinesis Developer Guide.

        Each record in the `Records` array may include an optional
        parameter, `ExplicitHashKey`, which overrides the partition
        key to shard mapping. This parameter allows a data producer to
        determine explicitly the shard where the record is stored. For
        more information, see `Adding Multiple Records with
        PutRecords`_ in the Amazon Kinesis Developer Guide.

        The `PutRecords` response includes an array of response
        `Records`. Each record in the response array directly
        correlates with a record in the request array using natural
        ordering, from the top to the bottom of the request and
        response. The response `Records` array always includes the
        same number of records as the request array.

        The response `Records` array includes both successfully and
        unsuccessfully processed records. Amazon Kinesis attempts to
        process all records in each `PutRecords` request. A single
        record failure does not stop the processing of subsequent
        records.

        A successfully processed record includes `ShardId` and
        `SequenceNumber` values. The `ShardId` parameter identifies
        the shard in the stream where the record is stored. The
        `SequenceNumber` parameter is an identifier assigned to the
        put record, unique to all records in the stream.

        An unsuccessfully processed record includes `ErrorCode` and
        `ErrorMessage` values. `ErrorCode` reflects the type of error
        and can be one of the following values:
        `ProvisionedThroughputExceededException` or `InternalFailure`.
        `ErrorMessage` provides more detailed information about the
        `ProvisionedThroughputExceededException` exception including
        the account ID, stream name, and shard ID of the record that
        was throttled.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type records: list
        :param records: The records associated with the request. Each record
            is a dict with a `Data` key, a `PartitionKey` key, and an optional
            `ExplicitHashKey` key.

        :type stream_name: string
        :param stream_name: The stream name associated with the request.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode the `Data` field of each
            record. Can be set to ``False`` if the data is already encoded to
            prevent double encoding.

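        A minimal usage sketch (`conn` is assumed to be a
        `KinesisConnection`; names and payloads are illustrative)::

            result = conn.put_records(
                [{'Data': 'first', 'PartitionKey': 'user-42'},
                 {'Data': 'second', 'PartitionKey': 'user-43'}],
                'my-stream')
            failed = result['FailedRecordCount']
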
        """
        params = {'Records': records, 'StreamName': stream_name, }
        if b64_encode:
            # Base64 encode each record's Data field (note: this modifies
            # the caller's record dicts in place).
            for i in range(len(params['Records'])):
                data = params['Records'][i]['Data']
                if not isinstance(data, six.binary_type):
                    data = data.encode('utf-8')
                params['Records'][i]['Data'] = base64.b64encode(
                    data).decode('utf-8')
        return self.make_request(action='PutRecords',
                                 body=json.dumps(params))

    def remove_tags_from_stream(self, stream_name, tag_keys):
        """
        Deletes tags from the specified Amazon Kinesis stream.

        If you specify a tag that does not exist, it is ignored.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tag_keys: list
        :param tag_keys: A list of tag keys. Each corresponding tag is removed
            from the stream.

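        A minimal usage sketch (`conn` is assumed to be a
        `KinesisConnection`; names are illustrative)::

            conn.remove_tags_from_stream('my-stream', ['environment'])
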
        """
        params = {'StreamName': stream_name, 'TagKeys': tag_keys, }
        return self.make_request(action='RemoveTagsFromStream',
                                 body=json.dumps(params))

    def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
        """
        Splits a shard into two new shards in the stream, to increase
        the stream's capacity to ingest and transport data.
        `SplitShard` is called when there is a need to increase the
        overall capacity of a stream because of an expected increase
        in the volume of data records being ingested.

        You can also use `SplitShard` when a shard appears to be
        approaching its maximum utilization, for example, when the set
        of producers sending data into the specific shard are suddenly
        sending more than previously anticipated. You can also call
        `SplitShard` to increase stream capacity, so that more Amazon
        Kinesis applications can simultaneously read data from the
        stream for real-time processing.

        You must specify the shard to be split and the new hash key,
        which is the position in the shard where the shard gets split
        in two. In many cases, the new hash key might simply be the
        average of the beginning and ending hash key, but it can be
        any hash key value in the range being mapped into the shard.
        For more information about splitting shards, see `Split a
        Shard`_ in the Amazon Kinesis Developer Guide.

        You can use DescribeStream to determine the shard ID and hash
        key values for the `ShardToSplit` and `NewStartingHashKey`
        parameters that are specified in the `SplitShard` request.

        `SplitShard` is an asynchronous operation. Upon receiving a
        `SplitShard` request, Amazon Kinesis immediately returns a
        response and sets the stream status to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the stream status
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You can use `DescribeStream` to check the status of the
        stream, which is returned in `StreamStatus`. If the stream is
        in the `ACTIVE` state, you can call `SplitShard`. If a stream
        is in the `CREATING`, `UPDATING`, or `DELETING` state,
        `SplitShard` returns a `ResourceInUseException`.

        If the specified stream does not exist, `SplitShard` returns
        a `ResourceNotFoundException`. If you try to create more
        shards than are authorized for your account, you receive a
        `LimitExceededException`.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, MergeShards or SplitShard, you
        receive a `LimitExceededException`.

        `SplitShard` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the shard split.

        :type shard_to_split: string
        :param shard_to_split: The shard ID of the shard to split.

        :type new_starting_hash_key: string
        :param new_starting_hash_key: A hash key value for the starting hash
            key of one of the child shards created by the split. The hash key
            range for a given shard constitutes a set of ordered contiguous
            positive integers. The value for `NewStartingHashKey` must be in
            the range of hash keys being mapped into the shard. The
            `NewStartingHashKey` hash key value and all higher hash key values
            in the hash key range are distributed to one of the child shards.
            All the lower hash key values in the range are distributed to the
            other child shard.

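        A usage sketch splitting a shard at the midpoint of its hash key
        range, as suggested above (`conn` is assumed to be a
        `KinesisConnection`; names are illustrative)::

            desc = conn.describe_stream('my-stream')['StreamDescription']
            shard = desc['Shards'][0]
            key_range = shard['HashKeyRange']
            midpoint = (int(key_range['StartingHashKey']) +
                        int(key_range['EndingHashKey'])) // 2
            conn.split_shard('my-stream', shard['ShardId'], str(midpoint))
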
        """
        params = {
            'StreamName': stream_name,
            'ShardToSplit': shard_to_split,
            'NewStartingHashKey': new_starting_hash_key,
        }
        return self.make_request(action='SplitShard',
                                 body=json.dumps(params))

    def make_request(self, action, body):
        headers = {
            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
            'Host': self.region.endpoint,
            'Content-Type': 'application/x-amz-json-1.1',
            'Content-Length': str(len(body)),
        }
        http_request = self.build_base_http_request(
            method='POST', path='/', auth_path='/', params={},
            headers=headers, data=body)
        response = self._mexe(http_request, sender=None,
                              override_num_retries=10)
        response_body = response.read().decode('utf-8')
        boto.log.debug(response.getheaders())
        boto.log.debug(response_body)
        if response.status == 200:
            # Some operations return an empty body on success, in which
            # case this method returns None.
            if response_body:
                return json.loads(response_body)
        else:
            # Map the service fault type to a boto exception class,
            # falling back to the generic ResponseError.
            json_body = json.loads(response_body)
            fault_name = json_body.get('__type', None)
            exception_class = self._faults.get(fault_name, self.ResponseError)
            raise exception_class(response.status, response.reason,
                                  body=json_body)