# Copyright (c) 2010 Spotify AB
# Copyright (c) 2010-2011 Yelp
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

from boto.compat import six


class Step(object):
    """
    Jobflow Step base class
    """
    def jar(self):
        """
        :rtype: str
        :return: URI to the jar
        """
        raise NotImplementedError()

    def args(self):
        """
        :rtype: list(str)
        :return: List of arguments for the step
        """
        raise NotImplementedError()

    def main_class(self):
        """
        :rtype: str
        :return: The main class name
        """
        raise NotImplementedError()


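# The Step contract: a concrete step supplies the jar to run, the argument
# list to pass, and (optionally) a main class. A minimal hypothetical
# subclass might look like the sketch below; JarStep that follows is the
# real general-purpose implementation:
#
#     class SleepStep(Step):
#         def jar(self):
#             return 's3n://my-bucket/jars/sleep.jar'  # hypothetical URI
#
#         def args(self):
#             return ['--seconds', '60']
#
#         def main_class(self):
#             return None  # use the jar's manifest main class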
class JarStep(Step):
    """
    Custom jar step
    """
    def __init__(self, name, jar, main_class=None,
                 action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
        """
        An Elastic MapReduce step that executes a jar

        :type name: str
        :param name: The name of the step
        :type jar: str
        :param jar: S3 URI to the jar file
        :type main_class: str
        :param main_class: The class to execute in the jar
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs, to
            take on failure.
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        """
        self.name = name
        self._jar = jar
        self._main_class = main_class
        self.action_on_failure = action_on_failure

        # Accept a single string for convenience and normalize it to a list.
        if isinstance(step_args, six.string_types):
            step_args = [step_args]

        self.step_args = step_args

    def jar(self):
        return self._jar

    def args(self):
        args = []

        if self.step_args:
            args.extend(self.step_args)

        return args

    def main_class(self):
        return self._main_class


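# Example: submitting a custom jar with JarStep (a minimal sketch, kept in
# comments so importing this module has no side effects; the bucket, jar,
# and class names are hypothetical):
#
#     from boto.emr.connection import EmrConnection
#     from boto.emr.step import JarStep
#
#     step = JarStep(name='Wordcount',
#                    jar='s3n://my-bucket/jars/wordcount.jar',
#                    main_class='com.example.WordCount',
#                    step_args=['s3n://my-bucket/input/',
#                               's3n://my-bucket/output/'])
#     conn = EmrConnection()
#     jobflow_id = conn.run_jobflow(name='Wordcount', steps=[step])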
class StreamingStep(Step):
    """
    Hadoop streaming step
    """
    def __init__(self, name, mapper, reducer=None, combiner=None,
                 action_on_failure='TERMINATE_JOB_FLOW',
                 cache_files=None, cache_archives=None,
                 step_args=None, input=None, output=None,
                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
        """
        A Hadoop Streaming Elastic MapReduce step

        :type name: str
        :param name: The name of the step
        :type mapper: str
        :param mapper: The mapper URI
        :type reducer: str
        :param reducer: The reducer URI
        :type combiner: str
        :param combiner: The combiner URI. Only works for Hadoop 0.20
            and later!
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs, to
            take on failure.
        :type cache_files: list(str)
        :param cache_files: A list of cache files to be bundled with the job
        :type cache_archives: list(str)
        :param cache_archives: A list of jar archives to be bundled with
            the job
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        :type input: str or a list of str
        :param input: The input URI (or list of input URIs)
        :type output: str
        :param output: The output URI
        :type jar: str
        :param jar: The Hadoop streaming jar. This can be either a local
            path on the master node, or an s3:// URI.
        """
        self.name = name
        self.mapper = mapper
        self.reducer = reducer
        self.combiner = combiner
        self.action_on_failure = action_on_failure
        self.cache_files = cache_files
        self.cache_archives = cache_archives
        self.input = input
        self.output = output
        self._jar = jar

        # Accept a single string for convenience and normalize it to a list.
        if isinstance(step_args, six.string_types):
            step_args = [step_args]

        self.step_args = step_args

    def jar(self):
        return self._jar

    def main_class(self):
        return None

    def args(self):
        args = []

        # put extra args BEFORE -mapper and -reducer so that e.g. -libjars
        # will work
        if self.step_args:
            args.extend(self.step_args)

        args.extend(['-mapper', self.mapper])

        if self.combiner:
            args.extend(['-combiner', self.combiner])

        if self.reducer:
            args.extend(['-reducer', self.reducer])
        else:
            # No reducer given: run the job map-only.
            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])

        if self.input:
            if isinstance(self.input, list):
                for input_uri in self.input:
                    args.extend(('-input', input_uri))
            else:
                args.extend(('-input', self.input))
        if self.output:
            args.extend(('-output', self.output))

        if self.cache_files:
            for cache_file in self.cache_files:
                args.extend(('-cacheFile', cache_file))

        if self.cache_archives:
            for cache_archive in self.cache_archives:
                args.extend(('-cacheArchive', cache_archive))

        return args

    def __repr__(self):
        return ('%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, '
                'cache_files=%r, cache_archives=%r, step_args=%r, input=%r, '
                'output=%r, jar=%r)' % (
                    self.__class__.__module__, self.__class__.__name__,
                    self.name, self.mapper, self.reducer,
                    self.action_on_failure, self.cache_files,
                    self.cache_archives, self.step_args,
                    self.input, self.output, self._jar))


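# Example: a Hadoop Streaming job with StreamingStep (a minimal sketch; the
# S3 URIs are hypothetical). Omitting reducer would make the job map-only,
# per args() above:
#
#     from boto.emr.step import StreamingStep
#
#     step = StreamingStep(name='My streaming job',
#                          mapper='s3n://my-bucket/scripts/mapper.py',
#                          reducer='s3n://my-bucket/scripts/reducer.py',
#                          input='s3n://my-bucket/input/',
#                          output='s3n://my-bucket/output/')
#     step.args()
#     # ['-mapper', 's3n://my-bucket/scripts/mapper.py',
#     #  '-reducer', 's3n://my-bucket/scripts/reducer.py',
#     #  '-input', 's3n://my-bucket/input/',
#     #  '-output', 's3n://my-bucket/output/']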
class ScriptRunnerStep(JarStep):

    ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'

    def __init__(self, name, **kw):
        super(ScriptRunnerStep, self).__init__(name, self.ScriptRunnerJar, **kw)


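# Example: running an arbitrary script via script-runner.jar, which takes
# the script location as its first argument and passes any remaining
# arguments on to the script (a sketch; the URI and flag are hypothetical):
#
#     from boto.emr.step import ScriptRunnerStep
#
#     step = ScriptRunnerStep(name='Setup',
#                             step_args=['s3://my-bucket/scripts/setup.sh',
#                                        '--verbose'])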
class PigBase(ScriptRunnerStep):

    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/pig/']


class InstallPigStep(PigBase):
    """
    Install Pig on EMR step
    """

    InstallPigName = 'Install Pig'

    def __init__(self, pig_versions='latest'):
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--install-pig'])
        step_args.extend(['--pig-versions', pig_versions])
        super(InstallPigStep, self).__init__(self.InstallPigName,
                                             step_args=step_args)


class PigStep(PigBase):
    """
    Pig script step
    """

    def __init__(self, name, pig_file, pig_versions='latest', pig_args=None):
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--pig-versions', pig_versions])
        step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
        # Avoid a mutable default argument; treat None as "no extra args".
        if pig_args:
            step_args.extend(pig_args)
        super(PigStep, self).__init__(name, step_args=step_args)


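# Example: installing Pig and then running a Pig script on the same jobflow
# (a sketch; the script URI is hypothetical):
#
#     from boto.emr.connection import EmrConnection
#     from boto.emr.step import InstallPigStep, PigStep
#
#     steps = [InstallPigStep(),
#              PigStep(name='My Pig script',
#                      pig_file='s3n://my-bucket/scripts/report.pig')]
#     conn = EmrConnection()
#     jobflow_id = conn.run_jobflow(name='Pig jobflow', steps=steps)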
class HiveBase(ScriptRunnerStep):

    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/hive/']


class InstallHiveStep(HiveBase):
    """
    Install Hive on EMR step
    """
    InstallHiveName = 'Install Hive'

    def __init__(self, hive_versions='latest', hive_site=None):
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--install-hive'])
        step_args.extend(['--hive-versions', hive_versions])
        if hive_site is not None:
            step_args.extend(['--hive-site=%s' % hive_site])
        super(InstallHiveStep, self).__init__(self.InstallHiveName,
                                              step_args=step_args)


class HiveStep(HiveBase):
    """
    Hive script step
    """

    def __init__(self, name, hive_file, hive_versions='latest',
                 hive_args=None):
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--hive-versions', hive_versions])
        step_args.extend(['--run-hive-script', '--args', '-f', hive_file])
        if hive_args is not None:
            step_args.extend(hive_args)
        super(HiveStep, self).__init__(name, step_args=step_args)
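

# Example: installing Hive and then running a Hive script (a sketch; the
# script URI and the extra -d variable definition are hypothetical):
#
#     from boto.emr.connection import EmrConnection
#     from boto.emr.step import InstallHiveStep, HiveStep
#
#     steps = [InstallHiveStep(),
#              HiveStep(name='My Hive script',
#                       hive_file='s3n://my-bucket/scripts/report.q',
#                       hive_args=['-d', 'DAY=2011-11-11'])]
#     conn = EmrConnection()
#     jobflow_id = conn.run_jobflow(name='Hive jobflow', steps=steps)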