#!/usr/bin/env python
"""Per-job config for map jobs."""
from mapreduce import hooks
from mapreduce import input_readers
from mapreduce import output_writers
from mapreduce import parameters
from mapreduce import util
from mapreduce.api.map_job import input_reader
from mapreduce.api.map_job import mapper as mapper_module


# pylint: disable=protected-access
# pylint: disable=invalid-name

_Option = parameters._Option
# Current API version. The API version used to start a map_job will be
# saved to its job states. The framework can use this information to
# handle adding and removing features better.
_API_VERSION = 1


class JobConfig(parameters._Config):
  """Configurations for a map job.

  Names starting with _ are reserved for internal use.

  To create an instance:
  all option names can be used as keys to __init__.
  If an option is required, the key must be provided.
  If an option isn't required and no value is given, the default value
  will be used.
  An illustrative example appears at the end of this module.
  """
  # Job name in str. UI purpose only.
  job_name = _Option(basestring, required=True)

  # Job ID. Must be unique across the app.
  # This is used to store state in datastore.
  # One will be automatically generated if not given.
  job_id = _Option(basestring, default_factory=util._get_descending_key)

  # Reference to your mapper.
  mapper = _Option(mapper_module.Mapper, required=True)

  # The class of input reader to use.
  input_reader_cls = _Option(input_reader.InputReader, required=True)

  # TODO(user): Create a config object for readers instead of a dict.
  # Parameters for the input reader. Varies by input reader class.
  input_reader_params = _Option(dict, default_factory=lambda: {})

  # TODO(user): restrict output writer to new API.
  # The class of output writer to use.
  output_writer_cls = _Option(output_writers.OutputWriter,
                              can_be_none=True)

  # Parameters for the output writer. Varies by output writer class.
  output_writer_params = _Option(dict, default_factory=lambda: {})

  # Number of map shards.
  # This number is only a hint to the map_job framework. This will be
  # updated to the actual number of running shards after the job starts.
  shard_count = _Option(int,
                        default_factory=lambda: parameters.config.SHARD_COUNT)

  # Additional parameters the user can supply and read from their mapper.
  user_params = _Option(dict, default_factory=lambda: {})

  # The queue on which all map tasks should run.
  queue_name = _Option(
      basestring, default_factory=lambda: parameters.config.QUEUE_NAME)

  # Max attempts to run and retry a shard.
  shard_max_attempts = _Option(
      int, default_factory=lambda: parameters.config.SHARD_MAX_ATTEMPTS)

  # The URL to GET after the job finishes, regardless of success.
  # The map_job_id will be provided as a query string key.
  done_callback_url = _Option(basestring, can_be_none=True)

  # Force datastore writes.
  _force_writes = _Option(bool, default_factory=lambda: False)

  _base_path = _Option(basestring,
                       default_factory=lambda: parameters.config.BASE_PATH)

  _task_max_attempts = _Option(
      int, default_factory=lambda: parameters.config.TASK_MAX_ATTEMPTS)

  _task_max_data_processing_attempts = _Option(
      int, default_factory=(
          lambda: parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS))

  _hooks_cls = _Option(hooks.Hooks, can_be_none=True)

  _app = _Option(basestring, can_be_none=True)

  _api_version = _Option(int, default_factory=lambda: _API_VERSION)

  # The following methods convert this config for use by older APIs.

  def _get_mapper_params(self):
    """Converts self to model.MapperSpec.params."""
    reader_params = self.input_reader_cls.params_to_json(
        self.input_reader_params)
    # TODO(user): Do the same for writer params.
    return {"input_reader": reader_params,
            "output_writer": self.output_writer_params}

  def _get_mapper_spec(self):
    """Converts self to model.MapperSpec."""
    # pylint: disable=g-import-not-at-top
    from mapreduce import model

    return model.MapperSpec(
        handler_spec=util._obj_to_path(self.mapper),
        input_reader_spec=util._obj_to_path(self.input_reader_cls),
        params=self._get_mapper_params(),
        shard_count=self.shard_count,
        output_writer_spec=util._obj_to_path(self.output_writer_cls))

  def _get_mr_params(self):
    """Converts self to model.MapreduceSpec.params."""
    return {"force_writes": self._force_writes,
            "done_callback": self.done_callback_url,
            "user_params": self.user_params,
            "shard_max_attempts": self.shard_max_attempts,
            "task_max_attempts": self._task_max_attempts,
            "task_max_data_processing_attempts":
                self._task_max_data_processing_attempts,
            "queue_name": self.queue_name,
            "base_path": self._base_path,
            "app_id": self._app,
            "api_version": self._api_version}

  # TODO(user): Ideally we should replace all the *_spec and *_params
  # in model.py with JobConfig. This not only cleans up the codebase, but
  # may also be necessary for launching the input_reader/output_writer API.
  # We don't want to surface the numerous *_spec and *_params objects in our
  # public API. The cleanup has to be done over several releases so as not
  # to break the runtime.
  @classmethod
  def _get_default_mr_params(cls):
    """Gets default values for the old API."""
    cfg = cls(_lenient=True)
    mr_params = cfg._get_mr_params()
    mr_params["api_version"] = 0
    return mr_params

  @classmethod
  def _to_map_job_config(cls,
                         mr_spec,
                         # TODO(user): Remove this parameter after it can be
                         # read from mr_spec.
                         queue_name):
    """Converts model.MapreduceSpec back to JobConfig.

    This method allows our internal methods to use JobConfig directly.
    It also allows us to expose JobConfig as an API during execution,
    even though it is not saved to the datastore.

    Args:
      mr_spec: model.MapreduceSpec.
      queue_name: queue name.

    Returns:
      The JobConfig object for this job.
    """
    mapper_spec = mr_spec.mapper
    # 0 means all the old APIs before api_version was introduced.
    api_version = mr_spec.params.get("api_version", 0)
    old_api = api_version == 0

    # Deserialize params from json if input_reader/output_writer are new API.
    input_reader_cls = mapper_spec.input_reader_class()
    input_reader_params = input_readers._get_params(mapper_spec)
    if issubclass(input_reader_cls, input_reader.InputReader):
      input_reader_params = input_reader_cls.params_from_json(
          input_reader_params)

    output_writer_cls = mapper_spec.output_writer_class()
    output_writer_params = output_writers._get_params(mapper_spec)
    # TODO(user): Call json (de)serialization for writer.
    # if (output_writer_cls and
    #     issubclass(output_writer_cls, output_writer.OutputWriter)):
    #   output_writer_params = output_writer_cls.params_from_json(
    #       output_writer_params)

    # We cannot always convert a MapreduceSpec generated by an older API
    # to a JobConfig. Thus, the mr framework should use/expose the returned
    # JobConfig object with caution when a job was started with an old API.
    # In that case, this method only tries not to blow up and to assemble
    # a JobConfig object as accurately as possible.
    return cls(_lenient=old_api,
               job_name=mr_spec.name,
               job_id=mr_spec.mapreduce_id,
               # handler_spec from older API may not have map_job.Mapper type.
               mapper=util.for_name(mapper_spec.handler_spec),
               input_reader_cls=input_reader_cls,
               input_reader_params=input_reader_params,
               output_writer_cls=output_writer_cls,
               output_writer_params=output_writer_params,
               shard_count=mapper_spec.shard_count,
               queue_name=queue_name,
               user_params=mr_spec.params.get("user_params"),
               shard_max_attempts=mr_spec.params.get("shard_max_attempts"),
               done_callback_url=mr_spec.params.get("done_callback"),
               _force_writes=mr_spec.params.get("force_writes"),
               _base_path=mr_spec.params["base_path"],
               _task_max_attempts=mr_spec.params.get("task_max_attempts"),
               _task_max_data_processing_attempts=(
                   mr_spec.params.get("task_max_data_processing_attempts")),
               _hooks_cls=util.for_name(mr_spec.hooks_class_name),
               _app=mr_spec.params.get("app_id"),
               _api_version=api_version)
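

# Illustrative sketch only (kept as a comment so the library module stays
# import-safe): how the options above combine when creating a JobConfig, per
# the class docstring. `WordCountMapper` and `TextLineInputReader` are
# hypothetical user-defined classes standing in for any mapper_module.Mapper
# and input_reader.InputReader subclasses; they are not part of this library.
#
#   class WordCountMapper(mapper_module.Mapper):
#     def __call__(self, slice_ctx, val):
#       pass  # process one input value here
#
#   config = JobConfig(
#       job_name="word_count",                 # required; UI purpose only
#       mapper=WordCountMapper,                # required
#       input_reader_cls=TextLineInputReader,  # required
#       input_reader_params={"path": "/some/input"},
#       shard_count=8)                         # optional hint to the framework
#
# Any optional field left unset (job_id, queue_name, ...) is filled in from
# its option's default or default_factory at construction time.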