1# Copyright (c) 2010 Spotify AB 2# Copyright (c) 2010-2011 Yelp 3# 4# Permission is hereby granted, free of charge, to any person obtaining a 5# copy of this software and associated documentation files (the 6# "Software"), to deal in the Software without restriction, including 7# without limitation the rights to use, copy, modify, merge, publish, dis- 8# tribute, sublicense, and/or sell copies of the Software, and to permit 9# persons to whom the Software is furnished to do so, subject to the fol- 10# lowing conditions: 11# 12# The above copyright notice and this permission notice shall be included 13# in all copies or substantial portions of the Software. 14# 15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 16# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 17# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 18# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 19# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21# IN THE SOFTWARE. 
from boto.compat import six


class Step(object):
    """
    Jobflow Step base class.

    Subclasses must implement jar(), args() and main_class().
    """
    def jar(self):
        """
        :rtype: str
        :return: URI to the jar
        """
        # BUG FIX: was ``raise NotImplemented()`` -- NotImplemented is a
        # constant, not an exception class, so calling it raised
        # ``TypeError`` instead of the intended NotImplementedError.
        raise NotImplementedError()

    def args(self):
        """
        :rtype: list(str)
        :return: List of arguments for the step
        """
        raise NotImplementedError()

    def main_class(self):
        """
        :rtype: str
        :return: The main class name
        """
        raise NotImplementedError()


class JarStep(Step):
    """
    Custom jar step
    """
    def __init__(self, name, jar, main_class=None,
                 action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
        """
        A elastic mapreduce step that executes a jar

        :type name: str
        :param name: The name of the step
        :type jar: str
        :param jar: S3 URI to the Jar file
        :type main_class: str
        :param main_class: The class to execute in the jar
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs to
            take on failure.
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        """
        self.name = name
        self._jar = jar
        self._main_class = main_class
        self.action_on_failure = action_on_failure

        # Allow a bare string for a single-argument step.
        if isinstance(step_args, six.string_types):
            step_args = [step_args]

        self.step_args = step_args

    def jar(self):
        return self._jar

    def args(self):
        args = []

        if self.step_args:
            args.extend(self.step_args)

        return args

    def main_class(self):
        return self._main_class


class StreamingStep(Step):
    """
    Hadoop streaming step
    """
    def __init__(self, name, mapper, reducer=None, combiner=None,
                 action_on_failure='TERMINATE_JOB_FLOW',
                 cache_files=None, cache_archives=None,
                 step_args=None, input=None, output=None,
                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
        """
        A hadoop streaming elastic mapreduce step

        :type name: str
        :param name: The name of the step
        :type mapper: str
        :param mapper: The mapper URI
        :type reducer: str
        :param reducer: The reducer URI
        :type combiner: str
        :param combiner: The combiner URI. Only works for Hadoop 0.20
            and later!
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs to
            take on failure.
        :type cache_files: list(str)
        :param cache_files: A list of cache files to be bundled with the job
        :type cache_archives: list(str)
        :param cache_archives: A list of jar archives to be bundled with
            the job
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        :type input: str or a list of str
        :param input: The input uri
        :type output: str
        :param output: The output uri
        :type jar: str
        :param jar: The hadoop streaming jar. This can be either a local
            path on the master node, or an s3:// URI.
        """
        self.name = name
        self.mapper = mapper
        self.reducer = reducer
        self.combiner = combiner
        self.action_on_failure = action_on_failure
        self.cache_files = cache_files
        self.cache_archives = cache_archives
        self.input = input
        self.output = output
        self._jar = jar

        # Allow a bare string for a single-argument step.
        if isinstance(step_args, six.string_types):
            step_args = [step_args]

        self.step_args = step_args

    def jar(self):
        return self._jar

    def main_class(self):
        # Streaming jobs are driven entirely by command-line arguments;
        # there is no user-supplied main class.
        return None

    def args(self):
        args = []

        # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
        # will work
        if self.step_args:
            args.extend(self.step_args)

        args.extend(['-mapper', self.mapper])

        if self.combiner:
            args.extend(['-combiner', self.combiner])

        if self.reducer:
            args.extend(['-reducer', self.reducer])
        else:
            # No reducer: tell Hadoop to run a map-only job.
            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])

        if self.input:
            if isinstance(self.input, list):
                for input in self.input:
                    args.extend(('-input', input))
            else:
                args.extend(('-input', self.input))
        if self.output:
            args.extend(('-output', self.output))

        if self.cache_files:
            for cache_file in self.cache_files:
                args.extend(('-cacheFile', cache_file))

        if self.cache_archives:
            for cache_archive in self.cache_archives:
                args.extend(('-cacheArchive', cache_archive))

        return args

    def __repr__(self):
        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
            self.__class__.__module__, self.__class__.__name__,
            self.name, self.mapper, self.reducer, self.action_on_failure,
            self.cache_files, self.cache_archives, self.step_args,
            self.input, self.output, self._jar)


class ScriptRunnerStep(JarStep):
    """
    A JarStep that runs an arbitrary script via EMR's script-runner jar.
    """

    ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'

    def __init__(self, name, **kw):
        super(ScriptRunnerStep, self).__init__(name, self.ScriptRunnerJar, **kw)


class PigBase(ScriptRunnerStep):
    """
    Base class holding the shared arguments for the EMR Pig helper script.
    """

    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/pig/']


class InstallPigStep(PigBase):
    """
    Install pig on emr step
    """

    InstallPigName = 'Install Pig'

    def __init__(self, pig_versions='latest'):
        """
        :type pig_versions: str
        :param pig_versions: Pig version(s) to install (default 'latest')
        """
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--install-pig'])
        step_args.extend(['--pig-versions', pig_versions])
        super(InstallPigStep, self).__init__(self.InstallPigName,
                                             step_args=step_args)


class PigStep(PigBase):
    """
    Pig script step
    """

    def __init__(self, name, pig_file, pig_versions='latest', pig_args=None):
        """
        :type name: str
        :param name: The name of the step
        :type pig_file: str
        :param pig_file: URI of the pig script to run
        :type pig_versions: str
        :param pig_versions: Pig version(s) to use (default 'latest')
        :type pig_args: list(str)
        :param pig_args: Extra arguments appended after the script args
        """
        # BUG FIX: pig_args previously defaulted to a mutable list ([]),
        # which is shared across all calls; use the None sentinel pattern
        # (consistent with HiveStep) instead.
        if pig_args is None:
            pig_args = []
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--pig-versions', pig_versions])
        step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
        step_args.extend(pig_args)
        super(PigStep, self).__init__(name, step_args=step_args)


class HiveBase(ScriptRunnerStep):
    """
    Base class holding the shared arguments for the EMR Hive helper script.
    """

    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/hive/']


class InstallHiveStep(HiveBase):
    """
    Install Hive on EMR step
    """
    InstallHiveName = 'Install Hive'

    def __init__(self, hive_versions='latest', hive_site=None):
        """
        :type hive_versions: str
        :param hive_versions: Hive version(s) to install (default 'latest')
        :type hive_site: str
        :param hive_site: Optional hive-site.xml URI to install with Hive
        """
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--install-hive'])
        step_args.extend(['--hive-versions', hive_versions])
        if hive_site is not None:
            step_args.extend(['--hive-site=%s' % hive_site])
        super(InstallHiveStep, self).__init__(self.InstallHiveName,
                                              step_args=step_args)


class HiveStep(HiveBase):
    """
    Hive script step
    """

    def __init__(self, name, hive_file, hive_versions='latest',
                 hive_args=None):
        """
        :type name: str
        :param name: The name of the step
        :type hive_file: str
        :param hive_file: URI of the hive script to run
        :type hive_versions: str
        :param hive_versions: Hive version(s) to use (default 'latest')
        :type hive_args: list(str)
        :param hive_args: Extra arguments appended after the script args
        """
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--hive-versions', hive_versions])
        step_args.extend(['--run-hive-script', '--args', '-f', hive_file])
        if hive_args is not None:
            step_args.extend(hive_args)
        super(HiveStep, self).__init__(name, step_args=step_args)