1# Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved 2# 3# Permission is hereby granted, free of charge, to any person obtaining a 4# copy of this software and associated documentation files (the 5# "Software"), to deal in the Software without restriction, including 6# without limitation the rights to use, copy, modify, merge, publish, dis- 7# tribute, sublicense, and/or sell copies of the Software, and to permit 8# persons to whom the Software is furnished to do so, subject to the fol- 9# lowing conditions: 10# 11# The above copyright notice and this permission notice shall be included 12# in all copies or substantial portions of the Software. 13# 14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 16# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 17# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 18# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 20# IN THE SOFTWARE. 21# 22 23import base64 24import boto 25 26from boto.connection import AWSQueryConnection 27from boto.regioninfo import RegionInfo 28from boto.exception import JSONResponseError 29from boto.kinesis import exceptions 30from boto.compat import json 31from boto.compat import six 32 33 34class KinesisConnection(AWSQueryConnection): 35 """ 36 Amazon Kinesis Service API Reference 37 Amazon Kinesis is a managed service that scales elastically for 38 real time processing of streaming big data. 39 """ 40 APIVersion = "2013-12-02" 41 DefaultRegionName = "us-east-1" 42 DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com" 43 ServiceName = "Kinesis" 44 TargetPrefix = "Kinesis_20131202" 45 ResponseError = JSONResponseError 46 47 _faults = { 48 "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException, 49 "LimitExceededException": exceptions.LimitExceededException, 50 "ExpiredIteratorException": exceptions.ExpiredIteratorException, 51 "ResourceInUseException": exceptions.ResourceInUseException, 52 "ResourceNotFoundException": exceptions.ResourceNotFoundException, 53 "InvalidArgumentException": exceptions.InvalidArgumentException, 54 "SubscriptionRequiredException": exceptions.SubscriptionRequiredException 55 } 56 57 58 def __init__(self, **kwargs): 59 region = kwargs.pop('region', None) 60 if not region: 61 region = RegionInfo(self, self.DefaultRegionName, 62 self.DefaultRegionEndpoint) 63 if 'host' not in kwargs: 64 kwargs['host'] = region.endpoint 65 super(KinesisConnection, self).__init__(**kwargs) 66 self.region = region 67 68 def _required_auth_capability(self): 69 return ['hmac-v4'] 70 71 def add_tags_to_stream(self, stream_name, tags): 72 """ 73 Adds or updates tags for the specified Amazon Kinesis stream. 74 Each stream can have up to 10 tags. 75 76 If tags have already been assigned to the stream, 77 `AddTagsToStream` overwrites any existing tags that correspond 78 to the specified tag keys. 79 80 :type stream_name: string 81 :param stream_name: The name of the stream. 82 83 :type tags: map 84 :param tags: The set of key-value pairs to use to create the tags. 85 86 """ 87 params = {'StreamName': stream_name, 'Tags': tags, } 88 return self.make_request(action='AddTagsToStream', 89 body=json.dumps(params)) 90 91 def create_stream(self, stream_name, shard_count): 92 """ 93 Creates a Amazon Kinesis stream. A stream captures and 94 transports data records that are continuously emitted from 95 different data sources or producers . Scale-out within an 96 Amazon Kinesis stream is explicitly supported by means of 97 shards, which are uniquely identified groups of data records 98 in an Amazon Kinesis stream. 99 100 You specify and control the number of shards that a stream is 101 composed of. Each open shard can support up to 5 read 102 transactions per second, up to a maximum total of 2 MB of data 103 read per second. Each shard can support up to 1000 records 104 written per second, up to a maximum total of 1 MB data written 105 per second. You can add shards to a stream if the amount of 106 data input increases and you can remove shards if the amount 107 of data input decreases. 108 109 The stream name identifies the stream. The name is scoped to 110 the AWS account used by the application. It is also scoped by 111 region. That is, two streams in two different accounts can 112 have the same name, and two streams in the same account, but 113 in two different regions, can have the same name. 114 115 `CreateStream` is an asynchronous operation. Upon receiving a 116 `CreateStream` request, Amazon Kinesis immediately returns and 117 sets the stream status to `CREATING`. After the stream is 118 created, Amazon Kinesis sets the stream status to `ACTIVE`. 119 You should perform read and write operations only on an 120 `ACTIVE` stream. 121 122 You receive a `LimitExceededException` when making a 123 `CreateStream` request if you try to do one of the following: 124 125 126 + Have more than five streams in the `CREATING` state at any 127 point in time. 128 + Create more shards than are authorized for your account. 129 130 131 The default limit for an AWS account is 10 shards per stream. 132 If you need to create a stream with more than 10 shards, 133 `contact AWS Support`_ to increase the limit on your account. 134 135 You can use `DescribeStream` to check the stream status, which 136 is returned in `StreamStatus`. 137 138 `CreateStream` has a limit of 5 transactions per second per 139 account. 140 141 :type stream_name: string 142 :param stream_name: A name to identify the stream. The stream name is 143 scoped to the AWS account used by the application that creates the 144 stream. It is also scoped by region. That is, two streams in two 145 different AWS accounts can have the same name, and two streams in 146 the same AWS account, but in two different regions, can have the 147 same name. 148 149 :type shard_count: integer 150 :param shard_count: The number of shards that the stream will use. The 151 throughput of the stream is a function of the number of shards; 152 more shards are required for greater provisioned throughput. 153 **Note:** The default limit for an AWS account is 10 shards per stream. 154 If you need to create a stream with more than 10 shards, `contact 155 AWS Support`_ to increase the limit on your account. 156 157 """ 158 params = { 159 'StreamName': stream_name, 160 'ShardCount': shard_count, 161 } 162 return self.make_request(action='CreateStream', 163 body=json.dumps(params)) 164 165 def delete_stream(self, stream_name): 166 """ 167 Deletes a stream and all its shards and data. You must shut 168 down any applications that are operating on the stream before 169 you delete the stream. If an application attempts to operate 170 on a deleted stream, it will receive the exception 171 `ResourceNotFoundException`. 172 173 If the stream is in the `ACTIVE` state, you can delete it. 174 After a `DeleteStream` request, the specified stream is in the 175 `DELETING` state until Amazon Kinesis completes the deletion. 176 177 **Note:** Amazon Kinesis might continue to accept data read 178 and write operations, such as PutRecord, PutRecords, and 179 GetRecords, on a stream in the `DELETING` state until the 180 stream deletion is complete. 181 182 When you delete a stream, any shards in that stream are also 183 deleted, and any tags are dissociated from the stream. 184 185 You can use the DescribeStream operation to check the state of 186 the stream, which is returned in `StreamStatus`. 187 188 `DeleteStream` has a limit of 5 transactions per second per 189 account. 190 191 :type stream_name: string 192 :param stream_name: The name of the stream to delete. 193 194 """ 195 params = {'StreamName': stream_name, } 196 return self.make_request(action='DeleteStream', 197 body=json.dumps(params)) 198 199 def describe_stream(self, stream_name, limit=None, 200 exclusive_start_shard_id=None): 201 """ 202 Describes the specified stream. 203 204 The information about the stream includes its current status, 205 its Amazon Resource Name (ARN), and an array of shard objects. 206 For each shard object, there is information about the hash key 207 and sequence number ranges that the shard spans, and the IDs 208 of any earlier shards that played in a role in creating the 209 shard. A sequence number is the identifier associated with 210 every record ingested in the Amazon Kinesis stream. The 211 sequence number is assigned when a record is put into the 212 stream. 213 214 You can limit the number of returned shards using the `Limit` 215 parameter. The number of shards in a stream may be too large 216 to return from a single call to `DescribeStream`. You can 217 detect this by using the `HasMoreShards` flag in the returned 218 output. `HasMoreShards` is set to `True` when there is more 219 data available. 220 221 `DescribeStream` is a paginated operation. If there are more 222 shards available, you can request them using the shard ID of 223 the last shard returned. Specify this ID in the 224 `ExclusiveStartShardId` parameter in a subsequent request to 225 `DescribeStream`. 226 227 `DescribeStream` has a limit of 10 transactions per second per 228 account. 229 230 :type stream_name: string 231 :param stream_name: The name of the stream to describe. 232 233 :type limit: integer 234 :param limit: The maximum number of shards to return. 235 236 :type exclusive_start_shard_id: string 237 :param exclusive_start_shard_id: The shard ID of the shard to start 238 with. 239 240 """ 241 params = {'StreamName': stream_name, } 242 if limit is not None: 243 params['Limit'] = limit 244 if exclusive_start_shard_id is not None: 245 params['ExclusiveStartShardId'] = exclusive_start_shard_id 246 return self.make_request(action='DescribeStream', 247 body=json.dumps(params)) 248 249 def get_records(self, shard_iterator, limit=None, b64_decode=True): 250 """ 251 Gets data records from a shard. 252 253 Specify a shard iterator using the `ShardIterator` parameter. 254 The shard iterator specifies the position in the shard from 255 which you want to start reading data records sequentially. If 256 there are no records available in the portion of the shard 257 that the iterator points to, `GetRecords` returns an empty 258 list. Note that it might take multiple calls to get to a 259 portion of the shard that contains records. 260 261 You can scale by provisioning multiple shards. Your 262 application should have one thread per shard, each reading 263 continuously from its stream. To read from a stream 264 continually, call `GetRecords` in a loop. Use GetShardIterator 265 to get the shard iterator to specify in the first `GetRecords` 266 call. `GetRecords` returns a new shard iterator in 267 `NextShardIterator`. Specify the shard iterator returned in 268 `NextShardIterator` in subsequent calls to `GetRecords`. Note 269 that if the shard has been closed, the shard iterator can't 270 return more data and `GetRecords` returns `null` in 271 `NextShardIterator`. You can terminate the loop when the shard 272 is closed, or when the shard iterator reaches the record with 273 the sequence number or other attribute that marks it as the 274 last record to process. 275 276 Each data record can be up to 50 KB in size, and each shard 277 can read up to 2 MB per second. You can ensure that your calls 278 don't exceed the maximum supported size or throughput by using 279 the `Limit` parameter to specify the maximum number of records 280 that `GetRecords` can return. Consider your average record 281 size when determining this limit. For example, if your average 282 record size is 40 KB, you can limit the data returned to about 283 1 MB per call by specifying 25 as the limit. 284 285 The size of the data returned by `GetRecords` will vary 286 depending on the utilization of the shard. The maximum size of 287 data that `GetRecords` can return is 10 MB. If a call returns 288 10 MB of data, subsequent calls made within the next 5 seconds 289 throw `ProvisionedThroughputExceededException`. If there is 290 insufficient provisioned throughput on the shard, subsequent 291 calls made within the next 1 second throw 292 `ProvisionedThroughputExceededException`. Note that 293 `GetRecords` won't return any data when it throws an 294 exception. For this reason, we recommend that you wait one 295 second between calls to `GetRecords`; however, it's possible 296 that the application will get exceptions for longer than 1 297 second. 298 299 To detect whether the application is falling behind in 300 processing, add a timestamp to your records and note how long 301 it takes to process them. You can also monitor how much data 302 is in a stream using the CloudWatch metrics for write 303 operations ( `PutRecord` and `PutRecords`). For more 304 information, see `Monitoring Amazon Kinesis with Amazon 305 CloudWatch`_ in the Amazon Kinesis Developer Guide . 306 307 :type shard_iterator: string 308 :param shard_iterator: The position in the shard from which you want to 309 start sequentially reading data records. A shard iterator specifies 310 this position using the sequence number of a data record in the 311 shard. 312 313 :type limit: integer 314 :param limit: The maximum number of records to return. Specify a value 315 of up to 10,000. If you specify a value that is greater than 316 10,000, `GetRecords` throws `InvalidArgumentException`. 317 318 :type b64_decode: boolean 319 :param b64_decode: Decode the Base64-encoded ``Data`` field of records. 320 321 """ 322 params = {'ShardIterator': shard_iterator, } 323 if limit is not None: 324 params['Limit'] = limit 325 326 response = self.make_request(action='GetRecords', 327 body=json.dumps(params)) 328 329 # Base64 decode the data 330 if b64_decode: 331 for record in response.get('Records', []): 332 record['Data'] = base64.b64decode( 333 record['Data'].encode('utf-8')).decode('utf-8') 334 335 return response 336 337 def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, 338 starting_sequence_number=None): 339 """ 340 Gets a shard iterator. A shard iterator expires five minutes 341 after it is returned to the requester. 342 343 A shard iterator specifies the position in the shard from 344 which to start reading data records sequentially. A shard 345 iterator specifies this position using the sequence number of 346 a data record in a shard. A sequence number is the identifier 347 associated with every record ingested in the Amazon Kinesis 348 stream. The sequence number is assigned when a record is put 349 into the stream. 350 351 You must specify the shard iterator type. For example, you can 352 set the `ShardIteratorType` parameter to read exactly from the 353 position denoted by a specific sequence number by using the 354 `AT_SEQUENCE_NUMBER` shard iterator type, or right after the 355 sequence number by using the `AFTER_SEQUENCE_NUMBER` shard 356 iterator type, using sequence numbers returned by earlier 357 calls to PutRecord, PutRecords, GetRecords, or DescribeStream. 358 You can specify the shard iterator type `TRIM_HORIZON` in the 359 request to cause `ShardIterator` to point to the last 360 untrimmed record in the shard in the system, which is the 361 oldest data record in the shard. Or you can point to just 362 after the most recent record in the shard, by using the shard 363 iterator type `LATEST`, so that you always read the most 364 recent data in the shard. 365 366 When you repeatedly read from an Amazon Kinesis stream use a 367 GetShardIterator request to get the first shard iterator to to 368 use in your first `GetRecords` request and then use the shard 369 iterator returned by the `GetRecords` request in 370 `NextShardIterator` for subsequent reads. A new shard iterator 371 is returned by every `GetRecords` request in 372 `NextShardIterator`, which you use in the `ShardIterator` 373 parameter of the next `GetRecords` request. 374 375 If a `GetShardIterator` request is made too often, you receive 376 a `ProvisionedThroughputExceededException`. For more 377 information about throughput limits, see GetRecords. 378 379 If the shard is closed, the iterator can't return more data, 380 and `GetShardIterator` returns `null` for its `ShardIterator`. 381 A shard can be closed using SplitShard or MergeShards. 382 383 `GetShardIterator` has a limit of 5 transactions per second 384 per account per open shard. 385 386 :type stream_name: string 387 :param stream_name: The name of the stream. 388 389 :type shard_id: string 390 :param shard_id: The shard ID of the shard to get the iterator for. 391 392 :type shard_iterator_type: string 393 :param shard_iterator_type: 394 Determines how the shard iterator is used to start reading data records 395 from the shard. 396 397 The following are the valid shard iterator types: 398 399 400 + AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted 401 by a specific sequence number. 402 + AFTER_SEQUENCE_NUMBER - Start reading right after the position 403 denoted by a specific sequence number. 404 + TRIM_HORIZON - Start reading at the last untrimmed record in the 405 shard in the system, which is the oldest data record in the shard. 406 + LATEST - Start reading just after the most recent record in the 407 shard, so that you always read the most recent data in the shard. 408 409 :type starting_sequence_number: string 410 :param starting_sequence_number: The sequence number of the data record 411 in the shard from which to start reading from. 412 413 """ 414 params = { 415 'StreamName': stream_name, 416 'ShardId': shard_id, 417 'ShardIteratorType': shard_iterator_type, 418 } 419 if starting_sequence_number is not None: 420 params['StartingSequenceNumber'] = starting_sequence_number 421 return self.make_request(action='GetShardIterator', 422 body=json.dumps(params)) 423 424 def list_streams(self, limit=None, exclusive_start_stream_name=None): 425 """ 426 Lists your streams. 427 428 The number of streams may be too large to return from a single 429 call to `ListStreams`. You can limit the number of returned 430 streams using the `Limit` parameter. If you do not specify a 431 value for the `Limit` parameter, Amazon Kinesis uses the 432 default limit, which is currently 10. 433 434 You can detect if there are more streams available to list by 435 using the `HasMoreStreams` flag from the returned output. If 436 there are more streams available, you can request more streams 437 by using the name of the last stream returned by the 438 `ListStreams` request in the `ExclusiveStartStreamName` 439 parameter in a subsequent request to `ListStreams`. The group 440 of stream names returned by the subsequent request is then 441 added to the list. You can continue this process until all the 442 stream names have been collected in the list. 443 444 `ListStreams` has a limit of 5 transactions per second per 445 account. 446 447 :type limit: integer 448 :param limit: The maximum number of streams to list. 449 450 :type exclusive_start_stream_name: string 451 :param exclusive_start_stream_name: The name of the stream to start the 452 list with. 453 454 """ 455 params = {} 456 if limit is not None: 457 params['Limit'] = limit 458 if exclusive_start_stream_name is not None: 459 params['ExclusiveStartStreamName'] = exclusive_start_stream_name 460 return self.make_request(action='ListStreams', 461 body=json.dumps(params)) 462 463 def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None, 464 limit=None): 465 """ 466 Lists the tags for the specified Amazon Kinesis stream. 467 468 :type stream_name: string 469 :param stream_name: The name of the stream. 470 471 :type exclusive_start_tag_key: string 472 :param exclusive_start_tag_key: The key to use as the starting point 473 for the list of tags. If this parameter is set, `ListTagsForStream` 474 gets all tags that occur after `ExclusiveStartTagKey`. 475 476 :type limit: integer 477 :param limit: The number of tags to return. If this number is less than 478 the total number of tags associated with the stream, `HasMoreTags` 479 is set to `True`. To list additional tags, set 480 `ExclusiveStartTagKey` to the last key in the response. 481 482 """ 483 params = {'StreamName': stream_name, } 484 if exclusive_start_tag_key is not None: 485 params['ExclusiveStartTagKey'] = exclusive_start_tag_key 486 if limit is not None: 487 params['Limit'] = limit 488 return self.make_request(action='ListTagsForStream', 489 body=json.dumps(params)) 490 491 def merge_shards(self, stream_name, shard_to_merge, 492 adjacent_shard_to_merge): 493 """ 494 Merges two adjacent shards in a stream and combines them into 495 a single shard to reduce the stream's capacity to ingest and 496 transport data. Two shards are considered adjacent if the 497 union of the hash key ranges for the two shards form a 498 contiguous set with no gaps. For example, if you have two 499 shards, one with a hash key range of 276...381 and the other 500 with a hash key range of 382...454, then you could merge these 501 two shards into a single shard that would have a hash key 502 range of 276...454. After the merge, the single child shard 503 receives data for all hash key values covered by the two 504 parent shards. 505 506 `MergeShards` is called when there is a need to reduce the 507 overall capacity of a stream because of excess capacity that 508 is not being used. You must specify the shard to be merged and 509 the adjacent shard for a stream. For more information about 510 merging shards, see `Merge Two Shards`_ in the Amazon Kinesis 511 Developer Guide . 512 513 If the stream is in the `ACTIVE` state, you can call 514 `MergeShards`. If a stream is in the `CREATING`, `UPDATING`, 515 or `DELETING` state, `MergeShards` returns a 516 `ResourceInUseException`. If the specified stream does not 517 exist, `MergeShards` returns a `ResourceNotFoundException`. 518 519 You can use DescribeStream to check the state of the stream, 520 which is returned in `StreamStatus`. 521 522 `MergeShards` is an asynchronous operation. Upon receiving a 523 `MergeShards` request, Amazon Kinesis immediately returns a 524 response and sets the `StreamStatus` to `UPDATING`. After the 525 operation is completed, Amazon Kinesis sets the `StreamStatus` 526 to `ACTIVE`. Read and write operations continue to work while 527 the stream is in the `UPDATING` state. 528 529 You use DescribeStream to determine the shard IDs that are 530 specified in the `MergeShards` request. 531 532 If you try to operate on too many streams in parallel using 533 CreateStream, DeleteStream, `MergeShards` or SplitShard, you 534 will receive a `LimitExceededException`. 535 536 `MergeShards` has limit of 5 transactions per second per 537 account. 538 539 :type stream_name: string 540 :param stream_name: The name of the stream for the merge. 541 542 :type shard_to_merge: string 543 :param shard_to_merge: The shard ID of the shard to combine with the 544 adjacent shard for the merge. 545 546 :type adjacent_shard_to_merge: string 547 :param adjacent_shard_to_merge: The shard ID of the adjacent shard for 548 the merge. 549 550 """ 551 params = { 552 'StreamName': stream_name, 553 'ShardToMerge': shard_to_merge, 554 'AdjacentShardToMerge': adjacent_shard_to_merge, 555 } 556 return self.make_request(action='MergeShards', 557 body=json.dumps(params)) 558 559 def put_record(self, stream_name, data, partition_key, 560 explicit_hash_key=None, 561 sequence_number_for_ordering=None, 562 exclusive_minimum_sequence_number=None, 563 b64_encode=True): 564 """ 565 This operation puts a data record into an Amazon Kinesis 566 stream from a producer. This operation must be called to send 567 data from the producer into the Amazon Kinesis stream for 568 real-time ingestion and subsequent processing. The `PutRecord` 569 operation requires the name of the stream that captures, 570 stores, and transports the data; a partition key; and the data 571 blob itself. The data blob could be a segment from a log file, 572 geographic/location data, website clickstream data, or any 573 other data type. 574 575 The partition key is used to distribute data across shards. 576 Amazon Kinesis segregates the data records that belong to a 577 data stream into multiple shards, using the partition key 578 associated with each data record to determine which shard a 579 given data record belongs to. 580 581 Partition keys are Unicode strings, with a maximum length 582 limit of 256 bytes. An MD5 hash function is used to map 583 partition keys to 128-bit integer values and to map associated 584 data records to shards using the hash key ranges of the 585 shards. You can override hashing the partition key to 586 determine the shard by explicitly specifying a hash value 587 using the `ExplicitHashKey` parameter. For more information, 588 see the `Amazon Kinesis Developer Guide`_. 589 590 `PutRecord` returns the shard ID of where the data record was 591 placed and the sequence number that was assigned to the data 592 record. 593 594 Sequence numbers generally increase over time. To guarantee 595 strictly increasing ordering, use the 596 `SequenceNumberForOrdering` parameter. For more information, 597 see the `Amazon Kinesis Developer Guide`_. 598 599 If a `PutRecord` request cannot be processed because of 600 insufficient provisioned throughput on the shard involved in 601 the request, `PutRecord` throws 602 `ProvisionedThroughputExceededException`. 603 604 Data records are accessible for only 24 hours from the time 605 that they are added to an Amazon Kinesis stream. 606 607 :type stream_name: string 608 :param stream_name: The name of the stream to put the data record into. 609 610 :type data: blob 611 :param data: The data blob to put into the record, which is 612 Base64-encoded when the blob is serialized. 613 The maximum size of the data blob (the payload after 614 Base64-decoding) is 50 kilobytes (KB) 615 Set `b64_encode` to disable automatic Base64 encoding. 616 617 :type partition_key: string 618 :param partition_key: Determines which shard in the stream the data 619 record is assigned to. Partition keys are Unicode strings with a 620 maximum length limit of 256 bytes. Amazon Kinesis uses the 621 partition key as input to a hash function that maps the partition 622 key and associated data to a specific shard. Specifically, an MD5 623 hash function is used to map partition keys to 128-bit integer 624 values and to map associated data records to shards. As a result of 625 this hashing mechanism, all data records with the same partition 626 key will map to the same shard within the stream. 627 628 :type explicit_hash_key: string 629 :param explicit_hash_key: The hash value used to explicitly determine 630 the shard the data record is assigned to by overriding the 631 partition key hash. 632 633 :type sequence_number_for_ordering: string 634 :param sequence_number_for_ordering: Guarantees strictly increasing 635 sequence numbers, for puts from the same client and to the same 636 partition key. Usage: set the `SequenceNumberForOrdering` of record 637 n to the sequence number of record n-1 (as returned in the 638 PutRecordResult when putting record n-1 ). If this parameter is not 639 set, records will be coarsely ordered based on arrival time. 640 641 :type b64_encode: boolean 642 :param b64_encode: Whether to Base64 encode `data`. Can be set to 643 ``False`` if `data` is already encoded to prevent double encoding. 644 645 """ 646 params = { 647 'StreamName': stream_name, 648 'Data': data, 649 'PartitionKey': partition_key, 650 } 651 if explicit_hash_key is not None: 652 params['ExplicitHashKey'] = explicit_hash_key 653 if sequence_number_for_ordering is not None: 654 params['SequenceNumberForOrdering'] = sequence_number_for_ordering 655 if b64_encode: 656 if not isinstance(params['Data'], six.binary_type): 657 params['Data'] = params['Data'].encode('utf-8') 658 params['Data'] = base64.b64encode(params['Data']).decode('utf-8') 659 return self.make_request(action='PutRecord', 660 body=json.dumps(params)) 661 662 def put_records(self, records, stream_name, b64_encode=True): 663 """ 664 Puts (writes) multiple data records from a producer into an 665 Amazon Kinesis stream in a single call (also referred to as a 666 `PutRecords` request). Use this operation to send data from a 667 data producer into the Amazon Kinesis stream for real-time 668 ingestion and processing. Each shard can support up to 1000 669 records written per second, up to a maximum total of 1 MB data 670 written per second. 671 672 You must specify the name of the stream that captures, stores, 673 and transports the data; and an array of request `Records`, 674 with each record in the array requiring a partition key and 675 data blob. 676 677 The data blob can be any type of data; for example, a segment 678 from a log file, geographic/location data, website clickstream 679 data, and so on. 680 681 The partition key is used by Amazon Kinesis as input to a hash 682 function that maps the partition key and associated data to a 683 specific shard. An MD5 hash function is used to map partition 684 keys to 128-bit integer values and to map associated data 685 records to shards. As a result of this hashing mechanism, all 686 data records with the same partition key map to the same shard 687 within the stream. For more information, see `Partition Key`_ 688 in the Amazon Kinesis Developer Guide . 689 690 Each record in the `Records` array may include an optional 691 parameter, `ExplicitHashKey`, which overrides the partition 692 key to shard mapping. This parameter allows a data producer to 693 determine explicitly the shard where the record is stored. For 694 more information, see `Adding Multiple Records with 695 PutRecords`_ in the Amazon Kinesis Developer Guide . 696 697 The `PutRecords` response includes an array of response 698 `Records`. Each record in the response array directly 699 correlates with a record in the request array using natural 700 ordering, from the top to the bottom of the request and 701 response. The response `Records` array always includes the 702 same number of records as the request array. 703 704 The response `Records` array includes both successfully and 705 unsuccessfully processed records. Amazon Kinesis attempts to 706 process all records in each `PutRecords` request. A single 707 record failure does not stop the processing of subsequent 708 records. 709 710 A successfully-processed record includes `ShardId` and 711 `SequenceNumber` values. The `ShardId` parameter identifies 712 the shard in the stream where the record is stored. The 713 `SequenceNumber` parameter is an identifier assigned to the 714 put record, unique to all records in the stream. 715 716 An unsuccessfully-processed record includes `ErrorCode` and 717 `ErrorMessage` values. `ErrorCode` reflects the type of error 718 and can be one of the following values: 719 `ProvisionedThroughputExceededException` or `InternalFailure`. 720 `ErrorMessage` provides more detailed information about the 721 `ProvisionedThroughputExceededException` exception including 722 the account ID, stream name, and shard ID of the record that 723 was throttled. 724 725 Data records are accessible for only 24 hours from the time 726 that they are added to an Amazon Kinesis stream. 727 728 :type records: list 729 :param records: The records associated with the request. 730 731 :type stream_name: string 732 :param stream_name: The stream name associated with the request. 733 734 :type b64_encode: boolean 735 :param b64_encode: Whether to Base64 encode `data`. Can be set to 736 ``False`` if `data` is already encoded to prevent double encoding. 737 738 """ 739 params = {'Records': records, 'StreamName': stream_name, } 740 if b64_encode: 741 for i in range(len(params['Records'])): 742 data = params['Records'][i]['Data'] 743 if not isinstance(data, six.binary_type): 744 data = data.encode('utf-8') 745 params['Records'][i]['Data'] = base64.b64encode( 746 data).decode('utf-8') 747 return self.make_request(action='PutRecords', 748 body=json.dumps(params)) 749 750 def remove_tags_from_stream(self, stream_name, tag_keys): 751 """ 752 Deletes tags from the specified Amazon Kinesis stream. 753 754 If you specify a tag that does not exist, it is ignored. 755 756 :type stream_name: string 757 :param stream_name: The name of the stream. 758 759 :type tag_keys: list 760 :param tag_keys: A list of tag keys. Each corresponding tag is removed 761 from the stream. 762 763 """ 764 params = {'StreamName': stream_name, 'TagKeys': tag_keys, } 765 return self.make_request(action='RemoveTagsFromStream', 766 body=json.dumps(params)) 767 768 def split_shard(self, stream_name, shard_to_split, new_starting_hash_key): 769 """ 770 Splits a shard into two new shards in the stream, to increase 771 the stream's capacity to ingest and transport data. 772 `SplitShard` is called when there is a need to increase the 773 overall capacity of stream because of an expected increase in 774 the volume of data records being ingested. 775 776 You can also use `SplitShard` when a shard appears to be 777 approaching its maximum utilization, for example, when the set 778 of producers sending data into the specific shard are suddenly 779 sending more than previously anticipated. You can also call 780 `SplitShard` to increase stream capacity, so that more Amazon 781 Kinesis applications can simultaneously read data from the 782 stream for real-time processing. 783 784 You must specify the shard to be split and the new hash key, 785 which is the position in the shard where the shard gets split 786 in two. In many cases, the new hash key might simply be the 787 average of the beginning and ending hash key, but it can be 788 any hash key value in the range being mapped into the shard. 789 For more information about splitting shards, see `Split a 790 Shard`_ in the Amazon Kinesis Developer Guide . 791 792 You can use DescribeStream to determine the shard ID and hash 793 key values for the `ShardToSplit` and `NewStartingHashKey` 794 parameters that are specified in the `SplitShard` request. 795 796 `SplitShard` is an asynchronous operation. Upon receiving a 797 `SplitShard` request, Amazon Kinesis immediately returns a 798 response and sets the stream status to `UPDATING`. After the 799 operation is completed, Amazon Kinesis sets the stream status 800 to `ACTIVE`. Read and write operations continue to work while 801 the stream is in the `UPDATING` state. 802 803 You can use `DescribeStream` to check the status of the 804 stream, which is returned in `StreamStatus`. If the stream is 805 in the `ACTIVE` state, you can call `SplitShard`. If a stream 806 is in `CREATING` or `UPDATING` or `DELETING` states, 807 `DescribeStream` returns a `ResourceInUseException`. 808 809 If the specified stream does not exist, `DescribeStream` 810 returns a `ResourceNotFoundException`. If you try to create 811 more shards than are authorized for your account, you receive 812 a `LimitExceededException`. 813 814 The default limit for an AWS account is 10 shards per stream. 815 If you need to create a stream with more than 10 shards, 816 `contact AWS Support`_ to increase the limit on your account. 817 818 If you try to operate on too many streams in parallel using 819 CreateStream, DeleteStream, MergeShards or SplitShard, you 820 receive a `LimitExceededException`. 821 822 `SplitShard` has limit of 5 transactions per second per 823 account. 824 825 :type stream_name: string 826 :param stream_name: The name of the stream for the shard split. 827 828 :type shard_to_split: string 829 :param shard_to_split: The shard ID of the shard to split. 830 831 :type new_starting_hash_key: string 832 :param new_starting_hash_key: A hash key value for the starting hash 833 key of one of the child shards created by the split. The hash key 834 range for a given shard constitutes a set of ordered contiguous 835 positive integers. The value for `NewStartingHashKey` must be in 836 the range of hash keys being mapped into the shard. The 837 `NewStartingHashKey` hash key value and all higher hash key values 838 in hash key range are distributed to one of the child shards. All 839 the lower hash key values in the range are distributed to the other 840 child shard. 841 842 """ 843 params = { 844 'StreamName': stream_name, 845 'ShardToSplit': shard_to_split, 846 'NewStartingHashKey': new_starting_hash_key, 847 } 848 return self.make_request(action='SplitShard', 849 body=json.dumps(params)) 850 851 def make_request(self, action, body): 852 headers = { 853 'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action), 854 'Host': self.region.endpoint, 855 'Content-Type': 'application/x-amz-json-1.1', 856 'Content-Length': str(len(body)), 857 } 858 http_request = self.build_base_http_request( 859 method='POST', path='/', auth_path='/', params={}, 860 headers=headers, data=body) 861 response = self._mexe(http_request, sender=None, 862 override_num_retries=10) 863 response_body = response.read().decode('utf-8') 864 boto.log.debug(response.getheaders()) 865 boto.log.debug(response_body) 866 if response.status == 200: 867 if response_body: 868 return json.loads(response_body) 869 else: 870 json_body = json.loads(response_body) 871 fault_name = json_body.get('__type', None) 872 exception_class = self._faults.get(fault_name, self.ResponseError) 873 raise exception_class(response.status, response.reason, 874 body=json_body) 875 876