#!/usr/bin/env python
"""Per-job config for map jobs."""
from mapreduce import hooks
from mapreduce import input_readers
from mapreduce import output_writers
from mapreduce import parameters
from mapreduce import util
from mapreduce.api.map_job import input_reader
from mapreduce.api.map_job import mapper as mapper_module


# pylint: disable=protected-access
# pylint: disable=invalid-name

_Option = parameters._Option
# Current API version. The API version used to start a map_job will be
# saved to its job states. The framework can use this information to
# better handle the addition and removal of features.
_API_VERSION = 1


class JobConfig(parameters._Config):
  """Configuration for a map job.

  Names starting with _ are reserved for internal use.

  To create an instance, pass option names as keyword arguments to
  __init__. If an option is required, its keyword must be provided.
  If an option isn't required and no value is given, the default value
  will be used.
32  """
  # Job name as a str. For UI display purposes only.
  job_name = _Option(basestring, required=True)

  # Job ID. Must be unique across the app.
  # This is used to store state in datastore.
  # One will be automatically generated if not given.
  job_id = _Option(basestring, default_factory=util._get_descending_key)

  # Reference to your mapper.
  mapper = _Option(mapper_module.Mapper, required=True)

  # The class of input reader to use.
  input_reader_cls = _Option(input_reader.InputReader, required=True)

  # TODO(user): Create a config object for readers instead of a dict.
  # Parameters for input reader. Varies by input reader class.
  input_reader_params = _Option(dict, default_factory=lambda: {})

  # TODO(user): restrict output writer to new API.
  # The class of output writer to use.
  output_writer_cls = _Option(output_writers.OutputWriter,
                              can_be_none=True)

  # Parameters for output writers. Varies by output writer class.
  output_writer_params = _Option(dict, default_factory=lambda: {})

  # Number of map shards.
  # This number is only a hint to the map_job framework. It will be
  # updated to the actual number of running shards after the job starts.
  shard_count = _Option(int,
                        default_factory=lambda: parameters.config.SHARD_COUNT)

  # Additional parameters the user can supply and read from their mapper.
  user_params = _Option(dict, default_factory=lambda: {})
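
  # Example of supplying user params (hypothetical values; the mapper can
  # read them back from the job's config while the job runs):
  #   config = JobConfig(..., user_params={"threshold": 10})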

  # The queue on which all map tasks should run.
  queue_name = _Option(
      basestring, default_factory=lambda: parameters.config.QUEUE_NAME)

  # Maximum number of attempts to run and retry a shard.
  shard_max_attempts = _Option(
      int, default_factory=lambda: parameters.config.SHARD_MAX_ATTEMPTS)

  # The URL to GET after the job finishes, regardless of success.
  # The map_job_id will be provided as a query string key.
  done_callback_url = _Option(basestring, can_be_none=True)

  # Force datastore writes.
  _force_writes = _Option(bool, default_factory=lambda: False)

  _base_path = _Option(basestring,
                       default_factory=lambda: parameters.config.BASE_PATH)

  _task_max_attempts = _Option(
      int, default_factory=lambda: parameters.config.TASK_MAX_ATTEMPTS)

  _task_max_data_processing_attempts = _Option(
      int, default_factory=(
          lambda: parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS))

  _hooks_cls = _Option(hooks.Hooks, can_be_none=True)

  _app = _Option(basestring, can_be_none=True)

  _api_version = _Option(int, default_factory=lambda: _API_VERSION)

  # The following methods convert this config into the structures expected
  # by older APIs.

  def _get_mapper_params(self):
    """Converts self to model.MapperSpec.params."""
    reader_params = self.input_reader_cls.params_to_json(
        self.input_reader_params)
    # TODO(user): Do the same for writer params.
    return {"input_reader": reader_params,
            "output_writer": self.output_writer_params}

  def _get_mapper_spec(self):
    """Converts self to model.MapperSpec."""
    # pylint: disable=g-import-not-at-top
    from mapreduce import model

    return model.MapperSpec(
        handler_spec=util._obj_to_path(self.mapper),
        input_reader_spec=util._obj_to_path(self.input_reader_cls),
        params=self._get_mapper_params(),
        shard_count=self.shard_count,
        output_writer_spec=util._obj_to_path(self.output_writer_cls))

  def _get_mr_params(self):
    """Converts self to model.MapreduceSpec.params."""
    return {"force_writes": self._force_writes,
            "done_callback": self.done_callback_url,
            "user_params": self.user_params,
            "shard_max_attempts": self.shard_max_attempts,
            "task_max_attempts": self._task_max_attempts,
            "task_max_data_processing_attempts":
                self._task_max_data_processing_attempts,
            "queue_name": self.queue_name,
            "base_path": self._base_path,
            "app_id": self._app,
            "api_version": self._api_version}

  # TODO(user): Ideally we should replace all the *_spec and *_params
  # in model.py with JobConfig. This not only cleans up the codebase, but may
  # also be necessary for launching the input_reader/output_writer API. We
  # don't want to surface the numerous *_spec and *_params objects in our
  # public API. The cleanup has to be done over several releases so as not
  # to break the runtime.
  @classmethod
  def _get_default_mr_params(cls):
    """Gets default values for old API."""
    cfg = cls(_lenient=True)
    mr_params = cfg._get_mr_params()
    mr_params["api_version"] = 0
    return mr_params

  @classmethod
  def _to_map_job_config(cls,
                         mr_spec,
                         # TODO(user): Remove this parameter after it can be
                         # read from mr_spec.
                         queue_name):
    """Converts model.MapreduceSpec back to JobConfig.

    This method allows our internal methods to use JobConfig directly.
    It also allows us to expose JobConfig as an API during execution,
    even though it is not saved to datastore.

    Args:
      mr_spec: model.MapreduceSpec.
      queue_name: queue name.

    Returns:
      The JobConfig object for this job.
    """
    mapper_spec = mr_spec.mapper
    # 0 means the job was started by an old API, before api_version was
    # introduced.
    api_version = mr_spec.params.get("api_version", 0)
    old_api = api_version == 0

    # Deserialize params from json if input_reader/output_writer are from
    # the new API.
    input_reader_cls = mapper_spec.input_reader_class()
    input_reader_params = input_readers._get_params(mapper_spec)
    if issubclass(input_reader_cls, input_reader.InputReader):
      input_reader_params = input_reader_cls.params_from_json(
          input_reader_params)

    output_writer_cls = mapper_spec.output_writer_class()
    output_writer_params = output_writers._get_params(mapper_spec)
    # TODO(user): Call json (de)serialization for writer.
    # if (output_writer_cls and
    #     issubclass(output_writer_cls, output_writer.OutputWriter)):
    #   output_writer_params = output_writer_cls.params_from_json(
    #       output_writer_params)

    # We cannot always convert a MapreduceSpec generated by an older API
    # to a JobConfig. Thus, the mr framework should use/expose the returned
    # JobConfig object with caution when a job was started with an old API.
    # In that case, this method only tries not to blow up and to assemble a
    # JobConfig object as accurately as possible.
    return cls(_lenient=old_api,
               job_name=mr_spec.name,
               job_id=mr_spec.mapreduce_id,
               # handler_spec from older API may not have map_job.Mapper type.
               mapper=util.for_name(mapper_spec.handler_spec),
               input_reader_cls=input_reader_cls,
               input_reader_params=input_reader_params,
               output_writer_cls=output_writer_cls,
               output_writer_params=output_writer_params,
               shard_count=mapper_spec.shard_count,
               queue_name=queue_name,
               user_params=mr_spec.params.get("user_params"),
               shard_max_attempts=mr_spec.params.get("shard_max_attempts"),
               done_callback_url=mr_spec.params.get("done_callback"),
               _force_writes=mr_spec.params.get("force_writes"),
               _base_path=mr_spec.params["base_path"],
               _task_max_attempts=mr_spec.params.get("task_max_attempts"),
               _task_max_data_processing_attempts=(
                   mr_spec.params.get("task_max_data_processing_attempts")),
               _hooks_cls=util.for_name(mr_spec.hooks_class_name),
               _app=mr_spec.params.get("app_id"),
               _api_version=api_version)
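
# A minimal end-to-end sketch (illustrative; assumes this package's
# map_job_control module exposes Job.submit, and a JobConfig built as in
# the class docstring example):
#
#   from mapreduce.api.map_job import map_job_control
#
#   job = map_job_control.Job.submit(config)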