1# Copyright (c) 2010 Spotify AB
2# Copyright (c) 2010-2011 Yelp
3#
4# Permission is hereby granted, free of charge, to any person obtaining a
5# copy of this software and associated documentation files (the
6# "Software"), to deal in the Software without restriction, including
7# without limitation the rights to use, copy, modify, merge, publish, dis-
8# tribute, sublicense, and/or sell copies of the Software, and to permit
9# persons to whom the Software is furnished to do so, subject to the fol-
10# lowing conditions:
11#
12# The above copyright notice and this permission notice shall be included
13# in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21# IN THE SOFTWARE.
22
23"""
24Represents a connection to the EMR service
25"""
26import types
27
28import boto
29import boto.utils
30from boto.ec2.regioninfo import RegionInfo
31from boto.emr.emrobject import AddInstanceGroupsResponse, BootstrapActionList, \
32                               Cluster, ClusterSummaryList, HadoopStep, \
33                               InstanceGroupList, InstanceList, JobFlow, \
34                               JobFlowStepList, \
35                               ModifyInstanceGroupsResponse, \
36                               RunJobFlowResponse, StepSummaryList
37from boto.emr.step import JarStep
38from boto.connection import AWSQueryConnection
39from boto.exception import EmrResponseError
40from boto.compat import six
41
42
class EmrConnection(AWSQueryConnection):
    """
    Connection to the Elastic MapReduce (EMR) query API.

    Each public method assembles a flat parameter dict for one EMR action
    and submits it through ``get_object`` / ``get_list`` / ``get_status``.
    Those helpers are not defined in this class (presumably they are
    inherited from ``AWSQueryConnection`` -- confirm against the base class).
    """

    # API version, default region name and default endpoint are looked up
    # under the 'Boto' section of boto.config; the literal third argument
    # is presumably the fallback used when the option is not configured.
    APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
    DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
    DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint',
                                            'elasticmapreduce.us-east-1.amazonaws.com')
    # Exception class associated with this connection
    # (NOTE(review): assumed to be what the base class raises on error
    # responses -- confirm).
    ResponseError = EmrResponseError

    # Constants for AWS Console debugging: run_jobflow() uses them as the
    # jar and the step argument of the "Setup Hadoop Debugging" step.
    DebuggingJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
    DebuggingArgs = 's3n://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch'
54
55    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
56                 is_secure=True, port=None, proxy=None, proxy_port=None,
57                 proxy_user=None, proxy_pass=None, debug=0,
58                 https_connection_factory=None, region=None, path='/',
59                 security_token=None, validate_certs=True, profile_name=None):
60        if not region:
61            region = RegionInfo(self, self.DefaultRegionName,
62                                self.DefaultRegionEndpoint)
63        self.region = region
64        super(EmrConnection, self).__init__(aws_access_key_id,
65                                    aws_secret_access_key,
66                                    is_secure, port, proxy, proxy_port,
67                                    proxy_user, proxy_pass,
68                                    self.region.endpoint, debug,
69                                    https_connection_factory, path,
70                                    security_token,
71                                    validate_certs=validate_certs,
72                                    profile_name=profile_name)
73        # Many of the EMR hostnames are of the form:
74        #     <region>.<service_name>.amazonaws.com
75        # rather than the more common:
76        #     <service_name>.<region>.amazonaws.com
77        # so we need to explicitly set the region_name and service_name
78        # for the SigV4 signing.
79        self.auth_region_name = self.region.name
80        self.auth_service_name = 'elasticmapreduce'
81
82    def _required_auth_capability(self):
83        return ['hmac-v4']
84
85    def describe_cluster(self, cluster_id):
86        """
87        Describes an Elastic MapReduce cluster
88
89        :type cluster_id: str
90        :param cluster_id: The cluster id of interest
91        """
92        params = {
93            'ClusterId': cluster_id
94        }
95        return self.get_object('DescribeCluster', params, Cluster)
96
97    def describe_jobflow(self, jobflow_id):
98        """
99        Describes a single Elastic MapReduce job flow
100
101        :type jobflow_id: str
102        :param jobflow_id: The job flow id of interest
103        """
104        jobflows = self.describe_jobflows(jobflow_ids=[jobflow_id])
105        if jobflows:
106            return jobflows[0]
107
108    def describe_jobflows(self, states=None, jobflow_ids=None,
109                           created_after=None, created_before=None):
110        """
111        Retrieve all the Elastic MapReduce job flows on your account
112
113        :type states: list
114        :param states: A list of strings with job flow states wanted
115
116        :type jobflow_ids: list
117        :param jobflow_ids: A list of job flow IDs
118        :type created_after: datetime
119        :param created_after: Bound on job flow creation time
120
121        :type created_before: datetime
122        :param created_before: Bound on job flow creation time
123        """
124        params = {}
125
126        if states:
127            self.build_list_params(params, states, 'JobFlowStates.member')
128        if jobflow_ids:
129            self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
130        if created_after:
131            params['CreatedAfter'] = created_after.strftime(
132                boto.utils.ISO8601)
133        if created_before:
134            params['CreatedBefore'] = created_before.strftime(
135                boto.utils.ISO8601)
136
137        return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])
138
139    def describe_step(self, cluster_id, step_id):
140        """
141        Describe an Elastic MapReduce step
142
143        :type cluster_id: str
144        :param cluster_id: The cluster id of interest
145        :type step_id: str
146        :param step_id: The step id of interest
147        """
148        params = {
149            'ClusterId': cluster_id,
150            'StepId': step_id
151        }
152
153        return self.get_object('DescribeStep', params, HadoopStep)
154
155    def list_bootstrap_actions(self, cluster_id, marker=None):
156        """
157        Get a list of bootstrap actions for an Elastic MapReduce cluster
158
159        :type cluster_id: str
160        :param cluster_id: The cluster id of interest
161        :type marker: str
162        :param marker: Pagination marker
163        """
164        params = {
165            'ClusterId': cluster_id
166        }
167
168        if marker:
169            params['Marker'] = marker
170
171        return self.get_object('ListBootstrapActions', params, BootstrapActionList)
172
173    def list_clusters(self, created_after=None, created_before=None,
174                      cluster_states=None, marker=None):
175        """
176        List Elastic MapReduce clusters with optional filtering
177
178        :type created_after: datetime
179        :param created_after: Bound on cluster creation time
180        :type created_before: datetime
181        :param created_before: Bound on cluster creation time
182        :type cluster_states: list
183        :param cluster_states: Bound on cluster states
184        :type marker: str
185        :param marker: Pagination marker
186        """
187        params = {}
188        if created_after:
189            params['CreatedAfter'] = created_after.strftime(
190                boto.utils.ISO8601)
191        if created_before:
192            params['CreatedBefore'] = created_before.strftime(
193                boto.utils.ISO8601)
194        if marker:
195            params['Marker'] = marker
196
197        if cluster_states:
198            self.build_list_params(params, cluster_states, 'ClusterStates.member')
199
200        return self.get_object('ListClusters', params, ClusterSummaryList)
201
202    def list_instance_groups(self, cluster_id, marker=None):
203        """
204        List EC2 instance groups in a cluster
205
206        :type cluster_id: str
207        :param cluster_id: The cluster id of interest
208        :type marker: str
209        :param marker: Pagination marker
210        """
211        params = {
212            'ClusterId': cluster_id
213        }
214
215        if marker:
216            params['Marker'] = marker
217
218        return self.get_object('ListInstanceGroups', params, InstanceGroupList)
219
220    def list_instances(self, cluster_id, instance_group_id=None,
221                       instance_group_types=None, marker=None):
222        """
223        List EC2 instances in a cluster
224
225        :type cluster_id: str
226        :param cluster_id: The cluster id of interest
227        :type instance_group_id: str
228        :param instance_group_id: The EC2 instance group id of interest
229        :type instance_group_types: list
230        :param instance_group_types: Filter by EC2 instance group type
231        :type marker: str
232        :param marker: Pagination marker
233        """
234        params = {
235            'ClusterId': cluster_id
236        }
237
238        if instance_group_id:
239            params['InstanceGroupId'] = instance_group_id
240        if marker:
241            params['Marker'] = marker
242
243        if instance_group_types:
244            self.build_list_params(params, instance_group_types,
245                                   'InstanceGroupTypeList.member')
246
247        return self.get_object('ListInstances', params, InstanceList)
248
249    def list_steps(self, cluster_id, step_states=None, marker=None):
250        """
251        List cluster steps
252
253        :type cluster_id: str
254        :param cluster_id: The cluster id of interest
255        :type step_states: list
256        :param step_states: Filter by step states
257        :type marker: str
258        :param marker: Pagination marker
259        """
260        params = {
261            'ClusterId': cluster_id
262        }
263
264        if marker:
265            params['Marker'] = marker
266
267        if step_states:
268            self.build_list_params(params, step_states, 'StepStateList.member')
269
270        return self.get_object('ListSteps', params, StepSummaryList)
271
272    def add_tags(self, resource_id, tags):
273        """
274        Create new metadata tags for the specified resource id.
275
276        :type resource_id: str
277        :param resource_id: The cluster id
278
279        :type tags: dict
280        :param tags: A dictionary containing the name/value pairs.
281                     If you want to create only a tag name, the
282                     value for that tag should be the empty string
283                     (e.g. '') or None.
284        """
285        assert isinstance(resource_id, six.string_types)
286        params = {
287            'ResourceId': resource_id,
288        }
289        params.update(self._build_tag_list(tags))
290        return self.get_status('AddTags', params, verb='POST')
291
292    def remove_tags(self, resource_id, tags):
293        """
294        Remove metadata tags for the specified resource id.
295
296        :type resource_id: str
297        :param resource_id: The cluster id
298
299        :type tags: list
300        :param tags: A list of tag names to remove.
301        """
302        params = {
303            'ResourceId': resource_id,
304        }
305        params.update(self._build_string_list('TagKeys', tags))
306        return self.get_status('RemoveTags', params, verb='POST')
307
308    def terminate_jobflow(self, jobflow_id):
309        """
310        Terminate an Elastic MapReduce job flow
311
312        :type jobflow_id: str
313        :param jobflow_id: A jobflow id
314        """
315        self.terminate_jobflows([jobflow_id])
316
317    def terminate_jobflows(self, jobflow_ids):
318        """
319        Terminate an Elastic MapReduce job flow
320
321        :type jobflow_ids: list
322        :param jobflow_ids: A list of job flow IDs
323        """
324        params = {}
325        self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
326        return self.get_status('TerminateJobFlows', params, verb='POST')
327
328    def add_jobflow_steps(self, jobflow_id, steps):
329        """
330        Adds steps to a jobflow
331
332        :type jobflow_id: str
333        :param jobflow_id: The job flow id
334        :type steps: list(boto.emr.Step)
335        :param steps: A list of steps to add to the job
336        """
337        if not isinstance(steps, list):
338            steps = [steps]
339        params = {}
340        params['JobFlowId'] = jobflow_id
341
342        # Step args
343        step_args = [self._build_step_args(step) for step in steps]
344        params.update(self._build_step_list(step_args))
345
346        return self.get_object(
347            'AddJobFlowSteps', params, JobFlowStepList, verb='POST')
348
349    def add_instance_groups(self, jobflow_id, instance_groups):
350        """
351        Adds instance groups to a running cluster.
352
353        :type jobflow_id: str
354        :param jobflow_id: The id of the jobflow which will take the
355            new instance groups
356
357        :type instance_groups: list(boto.emr.InstanceGroup)
358        :param instance_groups: A list of instance groups to add to the job
359        """
360        if not isinstance(instance_groups, list):
361            instance_groups = [instance_groups]
362        params = {}
363        params['JobFlowId'] = jobflow_id
364        params.update(self._build_instance_group_list_args(instance_groups))
365
366        return self.get_object('AddInstanceGroups', params,
367                               AddInstanceGroupsResponse, verb='POST')
368
369    def modify_instance_groups(self, instance_group_ids, new_sizes):
370        """
371        Modify the number of nodes and configuration settings in an
372        instance group.
373
374        :type instance_group_ids: list(str)
375        :param instance_group_ids: A list of the ID's of the instance
376            groups to be modified
377
378        :type new_sizes: list(int)
379        :param new_sizes: A list of the new sizes for each instance group
380        """
381        if not isinstance(instance_group_ids, list):
382            instance_group_ids = [instance_group_ids]
383        if not isinstance(new_sizes, list):
384            new_sizes = [new_sizes]
385
386        instance_groups = zip(instance_group_ids, new_sizes)
387
388        params = {}
389        for k, ig in enumerate(instance_groups):
390            # could be wrong - the example amazon gives uses
391            # InstanceRequestCount, while the api documentation
392            # says InstanceCount
393            params['InstanceGroups.member.%d.InstanceGroupId' % (k+1) ] = ig[0]
394            params['InstanceGroups.member.%d.InstanceCount' % (k+1) ] = ig[1]
395
396        return self.get_object('ModifyInstanceGroups', params,
397                               ModifyInstanceGroupsResponse, verb='POST')
398
399    def run_jobflow(self, name, log_uri=None, ec2_keyname=None,
400                    availability_zone=None,
401                    master_instance_type='m1.small',
402                    slave_instance_type='m1.small', num_instances=1,
403                    action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
404                    enable_debugging=False,
405                    hadoop_version=None,
406                    steps=[],
407                    bootstrap_actions=[],
408                    instance_groups=None,
409                    additional_info=None,
410                    ami_version=None,
411                    api_params=None,
412                    visible_to_all_users=None,
413                    job_flow_role=None,
414                    service_role=None):
415        """
416        Runs a job flow
417        :type name: str
418        :param name: Name of the job flow
419
420        :type log_uri: str
421        :param log_uri: URI of the S3 bucket to place logs
422
423        :type ec2_keyname: str
424        :param ec2_keyname: EC2 key used for the instances
425
426        :type availability_zone: str
427        :param availability_zone: EC2 availability zone of the cluster
428
429        :type master_instance_type: str
430        :param master_instance_type: EC2 instance type of the master
431
432        :type slave_instance_type: str
433        :param slave_instance_type: EC2 instance type of the slave nodes
434
435        :type num_instances: int
436        :param num_instances: Number of instances in the Hadoop cluster
437
438        :type action_on_failure: str
439        :param action_on_failure: Action to take if a step terminates
440
441        :type keep_alive: bool
442        :param keep_alive: Denotes whether the cluster should stay
443            alive upon completion
444
445        :type enable_debugging: bool
446        :param enable_debugging: Denotes whether AWS console debugging
447            should be enabled.
448
449        :type hadoop_version: str
450        :param hadoop_version: Version of Hadoop to use. This no longer
451            defaults to '0.20' and now uses the AMI default.
452
453        :type steps: list(boto.emr.Step)
454        :param steps: List of steps to add with the job
455
456        :type bootstrap_actions: list(boto.emr.BootstrapAction)
457        :param bootstrap_actions: List of bootstrap actions that run
458            before Hadoop starts.
459
460        :type instance_groups: list(boto.emr.InstanceGroup)
461        :param instance_groups: Optional list of instance groups to
462            use when creating this job.
463            NB: When provided, this argument supersedes num_instances
464            and master/slave_instance_type.
465
466        :type ami_version: str
467        :param ami_version: Amazon Machine Image (AMI) version to use
468            for instances. Values accepted by EMR are '1.0', '2.0', and
469            'latest'; EMR currently defaults to '1.0' if you don't set
470            'ami_version'.
471
472        :type additional_info: JSON str
473        :param additional_info: A JSON string for selecting additional features
474
475        :type api_params: dict
476        :param api_params: a dictionary of additional parameters to pass
477            directly to the EMR API (so you don't have to upgrade boto to
478            use new EMR features). You can also delete an API parameter
479            by setting it to None.
480
481        :type visible_to_all_users: bool
482        :param visible_to_all_users: Whether the job flow is visible to all IAM
483            users of the AWS account associated with the job flow. If this
484            value is set to ``True``, all IAM users of that AWS
485            account can view and (if they have the proper policy permissions
486            set) manage the job flow. If it is set to ``False``, only
487            the IAM user that created the job flow can view and manage
488            it.
489
490        :type job_flow_role: str
491        :param job_flow_role: An IAM role for the job flow. The EC2
492            instances of the job flow assume this role. The default role is
493            ``EMRJobflowDefault``. In order to use the default role,
494            you must have already created it using the CLI.
495
496        :type service_role: str
497        :param service_role: The IAM role that will be assumed by the Amazon
498            EMR service to access AWS resources on your behalf.
499
500        :rtype: str
501        :return: The jobflow id
502        """
503        params = {}
504        if action_on_failure:
505            params['ActionOnFailure'] = action_on_failure
506        if log_uri:
507            params['LogUri'] = log_uri
508        params['Name'] = name
509
510        # Common instance args
511        common_params = self._build_instance_common_args(ec2_keyname,
512                                                         availability_zone,
513                                                         keep_alive,
514                                                         hadoop_version)
515        params.update(common_params)
516
517        # NB: according to the AWS API's error message, we must
518        # "configure instances either using instance count, master and
519        # slave instance type or instance groups but not both."
520        #
521        # Thus we switch here on the truthiness of instance_groups.
522        if not instance_groups:
523            # Instance args (the common case)
524            instance_params = self._build_instance_count_and_type_args(
525                                                        master_instance_type,
526                                                        slave_instance_type,
527                                                        num_instances)
528            params.update(instance_params)
529        else:
530            # Instance group args (for spot instances or a heterogenous cluster)
531            list_args = self._build_instance_group_list_args(instance_groups)
532            instance_params = dict(
533                ('Instances.%s' % k, v) for k, v in six.iteritems(list_args)
534                )
535            params.update(instance_params)
536
537        # Debugging step from EMR API docs
538        if enable_debugging:
539            debugging_step = JarStep(name='Setup Hadoop Debugging',
540                                     action_on_failure='TERMINATE_JOB_FLOW',
541                                     main_class=None,
542                                     jar=self.DebuggingJar,
543                                     step_args=self.DebuggingArgs)
544            steps.insert(0, debugging_step)
545
546        # Step args
547        if steps:
548            step_args = [self._build_step_args(step) for step in steps]
549            params.update(self._build_step_list(step_args))
550
551        if bootstrap_actions:
552            bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap_action) for bootstrap_action in bootstrap_actions]
553            params.update(self._build_bootstrap_action_list(bootstrap_action_args))
554
555        if ami_version:
556            params['AmiVersion'] = ami_version
557
558        if additional_info is not None:
559            params['AdditionalInfo'] = additional_info
560
561        if api_params:
562            for key, value in six.iteritems(api_params):
563                if value is None:
564                    params.pop(key, None)
565                else:
566                    params[key] = value
567
568        if visible_to_all_users is not None:
569            if visible_to_all_users:
570                params['VisibleToAllUsers'] = 'true'
571            else:
572                params['VisibleToAllUsers'] = 'false'
573
574        if job_flow_role is not None:
575            params['JobFlowRole'] = job_flow_role
576
577        if service_role is not None:
578            params['ServiceRole'] = service_role
579
580        response = self.get_object(
581            'RunJobFlow', params, RunJobFlowResponse, verb='POST')
582        return response.jobflowid
583
584    def set_termination_protection(self, jobflow_id,
585                                   termination_protection_status):
586        """
587        Set termination protection on specified Elastic MapReduce job flows
588
589        :type jobflow_ids: list or str
590        :param jobflow_ids: A list of job flow IDs
591
592        :type termination_protection_status: bool
593        :param termination_protection_status: Termination protection status
594        """
595        assert termination_protection_status in (True, False)
596
597        params = {}
598        params['TerminationProtected'] = (termination_protection_status and "true") or "false"
599        self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')
600
601        return self.get_status('SetTerminationProtection', params, verb='POST')
602
603    def set_visible_to_all_users(self, jobflow_id, visibility):
604        """
605        Set whether specified Elastic Map Reduce job flows are visible to all IAM users
606
607        :type jobflow_ids: list or str
608        :param jobflow_ids: A list of job flow IDs
609
610        :type visibility: bool
611        :param visibility: Visibility
612        """
613        assert visibility in (True, False)
614
615        params = {}
616        params['VisibleToAllUsers'] = (visibility and "true") or "false"
617        self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')
618
619        return self.get_status('SetVisibleToAllUsers', params, verb='POST')
620
621    def _build_bootstrap_action_args(self, bootstrap_action):
622        bootstrap_action_params = {}
623        bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path
624
625        try:
626            bootstrap_action_params['Name'] = bootstrap_action.name
627        except AttributeError:
628            pass
629
630        args = bootstrap_action.args()
631        if args:
632            self.build_list_params(bootstrap_action_params, args, 'ScriptBootstrapAction.Args.member')
633
634        return bootstrap_action_params
635
636    def _build_step_args(self, step):
637        step_params = {}
638        step_params['ActionOnFailure'] = step.action_on_failure
639        step_params['HadoopJarStep.Jar'] = step.jar()
640
641        main_class = step.main_class()
642        if main_class:
643            step_params['HadoopJarStep.MainClass'] = main_class
644
645        args = step.args()
646        if args:
647            self.build_list_params(step_params, args, 'HadoopJarStep.Args.member')
648
649        step_params['Name'] = step.name
650        return step_params
651
652    def _build_bootstrap_action_list(self, bootstrap_actions):
653        if not isinstance(bootstrap_actions, list):
654            bootstrap_actions = [bootstrap_actions]
655
656        params = {}
657        for i, bootstrap_action in enumerate(bootstrap_actions):
658            for key, value in six.iteritems(bootstrap_action):
659                params['BootstrapActions.member.%s.%s' % (i + 1, key)] = value
660        return params
661
662    def _build_step_list(self, steps):
663        if not isinstance(steps, list):
664            steps = [steps]
665
666        params = {}
667        for i, step in enumerate(steps):
668            for key, value in six.iteritems(step):
669                params['Steps.member.%s.%s' % (i+1, key)] = value
670        return params
671
672    def _build_string_list(self, field, items):
673        if not isinstance(items, list):
674            items = [items]
675
676        params = {}
677        for i, item in enumerate(items):
678            params['%s.member.%s' % (field, i + 1)] = item
679        return params
680
681    def _build_tag_list(self, tags):
682        assert isinstance(tags, dict)
683
684        params = {}
685        for i, key_value in enumerate(sorted(six.iteritems(tags)), start=1):
686            key, value = key_value
687            current_prefix = 'Tags.member.%s' % i
688            params['%s.Key' % current_prefix] = key
689            if value:
690                params['%s.Value' % current_prefix] = value
691        return params
692
693    def _build_instance_common_args(self, ec2_keyname, availability_zone,
694                                    keep_alive, hadoop_version):
695        """
696        Takes a number of parameters used when starting a jobflow (as
697        specified in run_jobflow() above). Returns a comparable dict for
698        use in making a RunJobFlow request.
699        """
700        params = {
701            'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
702        }
703
704        if hadoop_version:
705            params['Instances.HadoopVersion'] = hadoop_version
706        if ec2_keyname:
707            params['Instances.Ec2KeyName'] = ec2_keyname
708        if availability_zone:
709            params['Instances.Placement.AvailabilityZone'] = availability_zone
710
711        return params
712
713    def _build_instance_count_and_type_args(self, master_instance_type,
714                                            slave_instance_type, num_instances):
715        """
716        Takes a master instance type (string), a slave instance type
717        (string), and a number of instances. Returns a comparable dict
718        for use in making a RunJobFlow request.
719        """
720        params = {'Instances.MasterInstanceType': master_instance_type,
721                  'Instances.SlaveInstanceType': slave_instance_type,
722                  'Instances.InstanceCount': num_instances}
723        return params
724
725    def _build_instance_group_args(self, instance_group):
726        """
727        Takes an InstanceGroup; returns a dict that, when its keys are
728        properly prefixed, can be used for describing InstanceGroups in
729        RunJobFlow or AddInstanceGroups requests.
730        """
731        params = {'InstanceCount': instance_group.num_instances,
732                  'InstanceRole': instance_group.role,
733                  'InstanceType': instance_group.type,
734                  'Name': instance_group.name,
735                  'Market': instance_group.market}
736        if instance_group.market == 'SPOT':
737            params['BidPrice'] = instance_group.bidprice
738        return params
739
740    def _build_instance_group_list_args(self, instance_groups):
741        """
742        Takes a list of InstanceGroups, or a single InstanceGroup. Returns
743        a comparable dict for use in making a RunJobFlow or AddInstanceGroups
744        request.
745        """
746        if not isinstance(instance_groups, list):
747            instance_groups = [instance_groups]
748
749        params = {}
750        for i, instance_group in enumerate(instance_groups):
751            ig_dict = self._build_instance_group_args(instance_group)
752            for key, value in six.iteritems(ig_dict):
753                params['InstanceGroups.member.%d.%s' % (i+1, key)] = value
754        return params
755