1#!/usr/bin/env python
2#
3# Copyright 2010 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""API for controlling MapReduce execution outside of MapReduce framework."""
18
19
20
21__all__ = ["start_map"]
22
23# pylint: disable=g-bad-name
24# pylint: disable=protected-access
25
26
27import logging
28
29from google.appengine.ext import db
30from mapreduce import handlers
31from mapreduce import model
32from mapreduce import parameters
33from mapreduce import util
34from mapreduce.api import map_job
35
36
def start_map(name,
              handler_spec,
              reader_spec,
              mapper_parameters,
              shard_count=None,
              output_writer_spec=None,
              mapreduce_parameters=None,
              base_path=None,
              queue_name=None,
              eta=None,
              countdown=None,
              hooks_class_name=None,
              _app=None,
              in_xg_transaction=False):
  """Start a new, mapper-only mapreduce.

  Deprecated! Use map_job.start instead.

  If a value can be specified both from an explicit argument and from
  a dictionary, the value from the explicit argument wins.

  Args:
    name: mapreduce name. Used only for display purposes.
    handler_spec: fully qualified name of mapper handler function/class to call.
    reader_spec: fully qualified name of mapper reader to use
    mapper_parameters: dictionary of parameters to pass to mapper. These are
      mapper-specific and also used for reader/writer initialization.
      Should have format {"input_reader": {}, "output_writer":{}}. Old
      deprecated style does not have sub dictionaries. A non-empty value is
      shallow-copied before use, so the caller's top-level dictionary is not
      shared with the mapper spec.
    shard_count: number of shards to create. Defaults to
      parameters.config.SHARD_COUNT when None.
    output_writer_spec: passed to model.MapperSpec as the output writer spec;
      presumably the fully qualified name of the output writer to use
      (confirm against model.MapperSpec). None by default.
    mapreduce_parameters: dictionary of mapreduce parameters relevant to the
      whole job. Applied on top of the defaults obtained from
      map_job.JobConfig._get_default_mr_params().
    base_path: base path of mapreduce library handler specified in app.yaml.
      "/mapreduce" by default. When given, overrides any "base_path" entry
      in mapreduce_parameters.
    queue_name: taskqueue queue name to be used for mapreduce tasks.
      see util.get_queue_name. The resolved name always overrides any
      "queue_name" entry in mapreduce_parameters.
    eta: absolute time when the MR should execute. May not be specified
      if 'countdown' is also supplied. This may be timezone-aware or
      timezone-naive.
    countdown: time in seconds into the future that this MR should execute.
      Defaults to zero.
    hooks_class_name: fully qualified name of a hooks.Hooks subclass.
    _app: forwarded unchanged to handlers.StartJobHandler._start_map.
      NOTE(review): presumably an application id override intended for
      internal/testing use -- confirm against the handler.
    in_xg_transaction: controls what transaction scope to use to start this MR
      job. If True, there has to be an already opened cross-group transaction
      scope. MR will use one entity group from it.
      If False, MR will create an independent transaction to start the job
      regardless of any existing transaction scopes.

  Returns:
    mapreduce id as string.
  """
  if shard_count is None:
    shard_count = parameters.config.SHARD_COUNT

  # Work on a shallow copy so the caller's dictionary object is not the one
  # stored in the mapper spec.
  if mapper_parameters:
    mapper_parameters = dict(mapper_parameters)

  # Make sure this old API fills all parameters with default values.
  mr_params = map_job.JobConfig._get_default_mr_params()
  if mapreduce_parameters:
    mr_params.update(mapreduce_parameters)

  # Override default values if user specified them as arguments.
  if base_path:
    mr_params["base_path"] = base_path
  mr_params["queue_name"] = util.get_queue_name(queue_name)

  mapper_spec = model.MapperSpec(handler_spec,
                                 reader_spec,
                                 mapper_parameters,
                                 shard_count,
                                 output_writer_spec=output_writer_spec)

  # The caller asked to join an existing cross-group transaction but none is
  # open. Only warn here; the job start below is still attempted.
  if in_xg_transaction and not db.is_in_transaction():
    logging.warning("Expects an opened xg transaction to start mapreduce "
                    "when in_xg_transaction is True.")

  return handlers.StartJobHandler._start_map(
      name,
      mapper_spec,
      mr_params,
      # TODO(user): Now that "queue_name" is part of mr_params.
      # Remove all the other ways to get queue_name after one release.
      queue_name=mr_params["queue_name"],
      eta=eta,
      countdown=countdown,
      hooks_class_name=hooks_class_name,
      _app=_app,
      in_xg_transaction=in_xg_transaction)
126