# Copyright (c) 2010 Spotify AB
# Copyright (c) 2010-2011 Yelp
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

"""
Represents a connection to the EMR service
"""
import types

import boto
import boto.utils
from boto.ec2.regioninfo import RegionInfo
from boto.emr.emrobject import AddInstanceGroupsResponse, BootstrapActionList, \
                               Cluster, ClusterSummaryList, HadoopStep, \
                               InstanceGroupList, InstanceList, JobFlow, \
                               JobFlowStepList, \
                               ModifyInstanceGroupsResponse, \
                               RunJobFlowResponse, StepSummaryList
from boto.emr.step import JarStep
from boto.connection import AWSQueryConnection
from boto.exception import EmrResponseError
from boto.compat import six


class EmrConnection(AWSQueryConnection):
    """
    Query-API connection to Amazon Elastic MapReduce (EMR).

    The API version, default region name and default endpoint are read
    from the ``[Boto]`` section of the boto config (``emr_version``,
    ``emr_region_name``, ``emr_region_endpoint``), falling back to the
    literal defaults below. Requests are signed with SigV4 ('hmac-v4').
    """

    APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
    DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
    DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint',
                                            'elasticmapreduce.us-east-1.amazonaws.com')
    ResponseError = EmrResponseError

    # Constants for AWS Console debugging
    DebuggingJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
    DebuggingArgs = 's3n://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch'

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, debug=0,
                 https_connection_factory=None, region=None, path='/',
                 security_token=None, validate_certs=True, profile_name=None):
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        self.region = region
        super(EmrConnection, self).__init__(aws_access_key_id,
                                            aws_secret_access_key,
                                            is_secure, port, proxy, proxy_port,
                                            proxy_user, proxy_pass,
                                            self.region.endpoint, debug,
                                            https_connection_factory, path,
                                            security_token,
                                            validate_certs=validate_certs,
                                            profile_name=profile_name)
        # Many of the EMR hostnames are of the form:
        #     <region>.<service_name>.amazonaws.com
        # rather than the more common:
        #     <service_name>.<region>.amazonaws.com
        # so we need to explicitly set the region_name and service_name
        # for the SigV4 signing.
        self.auth_region_name = self.region.name
        self.auth_service_name = 'elasticmapreduce'

    def _required_auth_capability(self):
        return ['hmac-v4']

    def describe_cluster(self, cluster_id):
        """
        Describes an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        """
        params = {
            'ClusterId': cluster_id
        }
        return self.get_object('DescribeCluster', params, Cluster)

    def describe_jobflow(self, jobflow_id):
        """
        Describes a single Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: The job flow id of interest

        :return: The matching JobFlow, or None if the service returned none.
        """
        jobflows = self.describe_jobflows(jobflow_ids=[jobflow_id])
        if jobflows:
            return jobflows[0]
        return None

    def describe_jobflows(self, states=None, jobflow_ids=None,
                          created_after=None, created_before=None):
        """
        Retrieve all the Elastic MapReduce job flows on your account

        :type states: list
        :param states: A list of strings with job flow states wanted

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs

        :type created_after: datetime
        :param created_after: Bound on job flow creation time

        :type created_before: datetime
        :param created_before: Bound on job flow creation time
        """
        params = {}

        if states:
            self.build_list_params(params, states, 'JobFlowStates.member')
        if jobflow_ids:
            self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)

        return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])

    def describe_step(self, cluster_id, step_id):
        """
        Describe an Elastic MapReduce step

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type step_id: str
        :param step_id: The step id of interest
        """
        params = {
            'ClusterId': cluster_id,
            'StepId': step_id
        }

        return self.get_object('DescribeStep', params, HadoopStep)

    def list_bootstrap_actions(self, cluster_id, marker=None):
        """
        Get a list of bootstrap actions for an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListBootstrapActions', params,
                               BootstrapActionList)

    def list_clusters(self, created_after=None, created_before=None,
                      cluster_states=None, marker=None):
        """
        List Elastic MapReduce clusters with optional filtering

        :type created_after: datetime
        :param created_after: Bound on cluster creation time
        :type created_before: datetime
        :param created_before: Bound on cluster creation time
        :type cluster_states: list
        :param cluster_states: Bound on cluster states
        :type marker: str
        :param marker: Pagination marker
        """
        params = {}
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)
        if marker:
            params['Marker'] = marker

        if cluster_states:
            self.build_list_params(params, cluster_states,
                                   'ClusterStates.member')

        return self.get_object('ListClusters', params, ClusterSummaryList)

    def list_instance_groups(self, cluster_id, marker=None):
        """
        List EC2 instance groups in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListInstanceGroups', params, InstanceGroupList)

    def list_instances(self, cluster_id, instance_group_id=None,
                       instance_group_types=None, marker=None):
        """
        List EC2 instances in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type instance_group_id: str
        :param instance_group_id: The EC2 instance group id of interest
        :type instance_group_types: list
        :param instance_group_types: Filter by EC2 instance group type
        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if instance_group_id:
            params['InstanceGroupId'] = instance_group_id
        if marker:
            params['Marker'] = marker

        if instance_group_types:
            self.build_list_params(params, instance_group_types,
                                   'InstanceGroupTypeList.member')

        return self.get_object('ListInstances', params, InstanceList)

    def list_steps(self, cluster_id, step_states=None, marker=None):
        """
        List cluster steps

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        :type step_states: list
        :param step_states: Filter by step states
        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        if step_states:
            self.build_list_params(params, step_states, 'StepStateList.member')

        return self.get_object('ListSteps', params, StepSummaryList)

    def add_tags(self, resource_id, tags):
        """
        Create new metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: dict
        :param tags: A dictionary containing the name/value pairs.
                     If you want to create only a tag name, the
                     value for that tag should be the empty string
                     (e.g. '') or None.
        """
        assert isinstance(resource_id, six.string_types)
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_tag_list(tags))
        return self.get_status('AddTags', params, verb='POST')

    def remove_tags(self, resource_id, tags):
        """
        Remove metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: list
        :param tags: A list of tag names to remove.
        """
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_string_list('TagKeys', tags))
        return self.get_status('RemoveTags', params, verb='POST')

    def terminate_jobflow(self, jobflow_id):
        """
        Terminate an Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: A jobflow id

        :return: The status returned by :meth:`terminate_jobflows`.
        """
        return self.terminate_jobflows([jobflow_id])

    def terminate_jobflows(self, jobflow_ids):
        """
        Terminate an Elastic MapReduce job flow

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs
        """
        params = {}
        self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        return self.get_status('TerminateJobFlows', params, verb='POST')

    def add_jobflow_steps(self, jobflow_id, steps):
        """
        Adds steps to a jobflow

        :type jobflow_id: str
        :param jobflow_id: The job flow id
        :type steps: list(boto.emr.Step)
        :param steps: A list of steps to add to the job
        """
        if not isinstance(steps, list):
            steps = [steps]
        params = {}
        params['JobFlowId'] = jobflow_id

        # Step args
        step_args = [self._build_step_args(step) for step in steps]
        params.update(self._build_step_list(step_args))

        return self.get_object(
            'AddJobFlowSteps', params, JobFlowStepList, verb='POST')

    def add_instance_groups(self, jobflow_id, instance_groups):
        """
        Adds instance groups to a running cluster.

        :type jobflow_id: str
        :param jobflow_id: The id of the jobflow which will take the
            new instance groups

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: A list of instance groups to add to the job
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]
        params = {}
        params['JobFlowId'] = jobflow_id
        params.update(self._build_instance_group_list_args(instance_groups))

        return self.get_object('AddInstanceGroups', params,
                               AddInstanceGroupsResponse, verb='POST')

    def modify_instance_groups(self, instance_group_ids, new_sizes):
        """
        Modify the number of nodes and configuration settings in an
        instance group.

        :type instance_group_ids: list(str)
        :param instance_group_ids: A list of the ID's of the instance
            groups to be modified

        :type new_sizes: list(int)
        :param new_sizes: A list of the new sizes for each instance group.
            Paired positionally with ``instance_group_ids``; extra entries
            in the longer list are ignored (``zip`` semantics).
        """
        if not isinstance(instance_group_ids, list):
            instance_group_ids = [instance_group_ids]
        if not isinstance(new_sizes, list):
            new_sizes = [new_sizes]

        params = {}
        for member, (group_id, size) in enumerate(
                zip(instance_group_ids, new_sizes), start=1):
            # could be wrong - the example amazon gives uses
            # InstanceRequestCount, while the api documentation
            # says InstanceCount
            params['InstanceGroups.member.%d.InstanceGroupId' % member] = group_id
            params['InstanceGroups.member.%d.InstanceCount' % member] = size

        return self.get_object('ModifyInstanceGroups', params,
                               ModifyInstanceGroupsResponse, verb='POST')

    def run_jobflow(self, name, log_uri=None, ec2_keyname=None,
                    availability_zone=None,
                    master_instance_type='m1.small',
                    slave_instance_type='m1.small', num_instances=1,
                    action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
                    enable_debugging=False,
                    hadoop_version=None,
                    steps=None,
                    bootstrap_actions=None,
                    instance_groups=None,
                    additional_info=None,
                    ami_version=None,
                    api_params=None,
                    visible_to_all_users=None,
                    job_flow_role=None,
                    service_role=None):
        """
        Runs a job flow

        :type name: str
        :param name: Name of the job flow

        :type log_uri: str
        :param log_uri: URI of the S3 bucket to place logs

        :type ec2_keyname: str
        :param ec2_keyname: EC2 key used for the instances

        :type availability_zone: str
        :param availability_zone: EC2 availability zone of the cluster

        :type master_instance_type: str
        :param master_instance_type: EC2 instance type of the master

        :type slave_instance_type: str
        :param slave_instance_type: EC2 instance type of the slave nodes

        :type num_instances: int
        :param num_instances: Number of instances in the Hadoop cluster

        :type action_on_failure: str
        :param action_on_failure: Action to take if a step terminates

        :type keep_alive: bool
        :param keep_alive: Denotes whether the cluster should stay
            alive upon completion

        :type enable_debugging: bool
        :param enable_debugging: Denotes whether AWS console debugging
            should be enabled. When True, a 'Setup Hadoop Debugging'
            JarStep is prepended to the submitted steps.

        :type hadoop_version: str
        :param hadoop_version: Version of Hadoop to use. This no longer
            defaults to '0.20' and now uses the AMI default.

        :type steps: list(boto.emr.Step)
        :param steps: List of steps to add with the job. Defaults to no
            steps. The caller's list is never modified.

        :type bootstrap_actions: list(boto.emr.BootstrapAction)
        :param bootstrap_actions: List of bootstrap actions that run
            before Hadoop starts. Defaults to none.

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: Optional list of instance groups to
            use when creating this job.
            NB: When provided, this argument supersedes num_instances
            and master/slave_instance_type.

        :type ami_version: str
        :param ami_version: Amazon Machine Image (AMI) version to use
            for instances. Values accepted by EMR are '1.0', '2.0', and
            'latest'; EMR currently defaults to '1.0' if you don't set
            'ami_version'.

        :type additional_info: JSON str
        :param additional_info: A JSON string for selecting additional features

        :type api_params: dict
        :param api_params: a dictionary of additional parameters to pass
            directly to the EMR API (so you don't have to upgrade boto to
            use new EMR features). You can also delete an API parameter
            by setting it to None.

        :type visible_to_all_users: bool
        :param visible_to_all_users: Whether the job flow is visible to all IAM
            users of the AWS account associated with the job flow. If this
            value is set to ``True``, all IAM users of that AWS
            account can view and (if they have the proper policy permissions
            set) manage the job flow. If it is set to ``False``, only
            the IAM user that created the job flow can view and manage
            it.

        :type job_flow_role: str
        :param job_flow_role: An IAM role for the job flow. The EC2
            instances of the job flow assume this role. The default role is
            ``EMRJobflowDefault``. In order to use the default role,
            you must have already created it using the CLI.

        :type service_role: str
        :param service_role: The IAM role that will be assumed by the Amazon
            EMR service to access AWS resources on your behalf.

        :rtype: str
        :return: The jobflow id
        """
        # Work on private copies: the debugging step is inserted below, and
        # that must never leak into the caller's list (or into a shared
        # default, as happened with the old ``steps=[]`` default).
        steps = list(steps) if steps else []
        bootstrap_actions = list(bootstrap_actions) if bootstrap_actions else []

        params = {}
        if action_on_failure:
            params['ActionOnFailure'] = action_on_failure
        if log_uri:
            params['LogUri'] = log_uri
        params['Name'] = name

        # Common instance args
        common_params = self._build_instance_common_args(ec2_keyname,
                                                         availability_zone,
                                                         keep_alive,
                                                         hadoop_version)
        params.update(common_params)

        # NB: according to the AWS API's error message, we must
        # "configure instances either using instance count, master and
        # slave instance type or instance groups but not both."
        #
        # Thus we switch here on the truthiness of instance_groups.
        if not instance_groups:
            # Instance args (the common case)
            instance_params = self._build_instance_count_and_type_args(
                master_instance_type,
                slave_instance_type,
                num_instances)
            params.update(instance_params)
        else:
            # Instance group args (for spot instances or a heterogenous cluster)
            list_args = self._build_instance_group_list_args(instance_groups)
            instance_params = dict(
                ('Instances.%s' % k, v) for k, v in six.iteritems(list_args)
            )
            params.update(instance_params)

        # Debugging step from EMR API docs
        if enable_debugging:
            debugging_step = JarStep(name='Setup Hadoop Debugging',
                                     action_on_failure='TERMINATE_JOB_FLOW',
                                     main_class=None,
                                     jar=self.DebuggingJar,
                                     step_args=self.DebuggingArgs)
            steps.insert(0, debugging_step)

        # Step args
        if steps:
            step_args = [self._build_step_args(step) for step in steps]
            params.update(self._build_step_list(step_args))

        if bootstrap_actions:
            bootstrap_action_args = [
                self._build_bootstrap_action_args(bootstrap_action)
                for bootstrap_action in bootstrap_actions
            ]
            params.update(
                self._build_bootstrap_action_list(bootstrap_action_args))

        if ami_version:
            params['AmiVersion'] = ami_version

        if additional_info is not None:
            params['AdditionalInfo'] = additional_info

        # Raw overrides are applied last so they can replace or remove
        # (value None) anything computed above.
        if api_params:
            for key, value in six.iteritems(api_params):
                if value is None:
                    params.pop(key, None)
                else:
                    params[key] = value

        if visible_to_all_users is not None:
            params['VisibleToAllUsers'] = (
                'true' if visible_to_all_users else 'false')

        if job_flow_role is not None:
            params['JobFlowRole'] = job_flow_role

        if service_role is not None:
            params['ServiceRole'] = service_role

        response = self.get_object(
            'RunJobFlow', params, RunJobFlowResponse, verb='POST')
        return response.jobflowid

    @staticmethod
    def _as_jobflow_id_list(jobflow_ids):
        """Normalize a single job flow id or a list/tuple of ids to a list."""
        if isinstance(jobflow_ids, (list, tuple)):
            return list(jobflow_ids)
        return [jobflow_ids]

    def set_termination_protection(self, jobflow_id,
                                   termination_protection_status):
        """
        Set termination protection on specified Elastic MapReduce job flows

        :type jobflow_id: list or str
        :param jobflow_id: A single job flow ID or a list of job flow IDs

        :type termination_protection_status: bool
        :param termination_protection_status: Termination protection status
        """
        assert termination_protection_status in (True, False)

        params = {}
        params['TerminationProtected'] = (
            'true' if termination_protection_status else 'false')
        # Previously a list argument was wrapped again as [[...]], producing
        # a bogus JobFlowIds.member.1 value; normalize instead.
        self.build_list_params(params, self._as_jobflow_id_list(jobflow_id),
                               'JobFlowIds.member')

        return self.get_status('SetTerminationProtection', params, verb='POST')

    def set_visible_to_all_users(self, jobflow_id, visibility):
        """
        Set whether specified Elastic Map Reduce job flows are visible to all IAM users

        :type jobflow_id: list or str
        :param jobflow_id: A single job flow ID or a list of job flow IDs

        :type visibility: bool
        :param visibility: Visibility
        """
        assert visibility in (True, False)

        params = {}
        params['VisibleToAllUsers'] = 'true' if visibility else 'false'
        self.build_list_params(params, self._as_jobflow_id_list(jobflow_id),
                               'JobFlowIds.member')

        return self.get_status('SetVisibleToAllUsers', params, verb='POST')

    def _build_bootstrap_action_args(self, bootstrap_action):
        bootstrap_action_params = {}
        bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path

        # Name is optional on the action object; omit it when absent.
        try:
            bootstrap_action_params['Name'] = bootstrap_action.name
        except AttributeError:
            pass

        args = bootstrap_action.args()
        if args:
            self.build_list_params(bootstrap_action_params, args,
                                   'ScriptBootstrapAction.Args.member')

        return bootstrap_action_params

    def _build_step_args(self, step):
        step_params = {}
        step_params['ActionOnFailure'] = step.action_on_failure
        step_params['HadoopJarStep.Jar'] = step.jar()

        main_class = step.main_class()
        if main_class:
            step_params['HadoopJarStep.MainClass'] = main_class

        args = step.args()
        if args:
            self.build_list_params(step_params, args, 'HadoopJarStep.Args.member')

        step_params['Name'] = step.name
        return step_params

    def _build_bootstrap_action_list(self, bootstrap_actions):
        if not isinstance(bootstrap_actions, list):
            bootstrap_actions = [bootstrap_actions]

        params = {}
        for i, bootstrap_action in enumerate(bootstrap_actions):
            for key, value in six.iteritems(bootstrap_action):
                params['BootstrapActions.member.%s.%s' % (i + 1, key)] = value
        return params

    def _build_step_list(self, steps):
        if not isinstance(steps, list):
            steps = [steps]

        params = {}
        for i, step in enumerate(steps):
            for key, value in six.iteritems(step):
                params['Steps.member.%s.%s' % (i + 1, key)] = value
        return params

    def _build_string_list(self, field, items):
        if not isinstance(items, list):
            items = [items]

        params = {}
        for i, item in enumerate(items):
            params['%s.member.%s' % (field, i + 1)] = item
        return params

    def _build_tag_list(self, tags):
        assert isinstance(tags, dict)

        params = {}
        # Sorted so the generated member indices are deterministic.
        for i, key_value in enumerate(sorted(six.iteritems(tags)), start=1):
            key, value = key_value
            current_prefix = 'Tags.member.%s' % i
            params['%s.Key' % current_prefix] = key
            # Falsy values ('' or None) create a name-only tag.
            if value:
                params['%s.Value' % current_prefix] = value
        return params

    def _build_instance_common_args(self, ec2_keyname, availability_zone,
                                    keep_alive, hadoop_version):
        """
        Takes a number of parameters used when starting a jobflow (as
        specified in run_jobflow() above). Returns a comparable dict for
        use in making a RunJobFlow request.
        """
        params = {
            'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
        }

        if hadoop_version:
            params['Instances.HadoopVersion'] = hadoop_version
        if ec2_keyname:
            params['Instances.Ec2KeyName'] = ec2_keyname
        if availability_zone:
            params['Instances.Placement.AvailabilityZone'] = availability_zone

        return params

    def _build_instance_count_and_type_args(self, master_instance_type,
                                            slave_instance_type, num_instances):
        """
        Takes a master instance type (string), a slave instance type
        (string), and a number of instances. Returns a comparable dict
        for use in making a RunJobFlow request.
        """
        params = {'Instances.MasterInstanceType': master_instance_type,
                  'Instances.SlaveInstanceType': slave_instance_type,
                  'Instances.InstanceCount': num_instances}
        return params

    def _build_instance_group_args(self, instance_group):
        """
        Takes an InstanceGroup; returns a dict that, when its keys are
        properly prefixed, can be used for describing InstanceGroups in
        RunJobFlow or AddInstanceGroups requests.
        """
        params = {'InstanceCount': instance_group.num_instances,
                  'InstanceRole': instance_group.role,
                  'InstanceType': instance_group.type,
                  'Name': instance_group.name,
                  'Market': instance_group.market}
        if instance_group.market == 'SPOT':
            params['BidPrice'] = instance_group.bidprice
        return params

    def _build_instance_group_list_args(self, instance_groups):
        """
        Takes a list of InstanceGroups, or a single InstanceGroup. Returns
        a comparable dict for use in making a RunJobFlow or AddInstanceGroups
        request.
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]

        params = {}
        for i, instance_group in enumerate(instance_groups):
            ig_dict = self._build_instance_group_args(instance_group)
            for key, value in six.iteritems(ig_dict):
                params['InstanceGroups.member.%d.%s' % (i + 1, key)] = value
        return params