#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""API for controlling MapReduce execution outside of MapReduce framework."""



__all__ = ["start_map"]

# pylint: disable=g-bad-name
# pylint: disable=protected-access


import logging

from google.appengine.ext import db
from mapreduce import handlers
from mapreduce import model
from mapreduce import parameters
from mapreduce import util
from mapreduce.api import map_job


def start_map(name,
              handler_spec,
              reader_spec,
              mapper_parameters,
              shard_count=None,
              output_writer_spec=None,
              mapreduce_parameters=None,
              base_path=None,
              queue_name=None,
              eta=None,
              countdown=None,
              hooks_class_name=None,
              _app=None,
              in_xg_transaction=False):
  """Start a new, mapper-only mapreduce.

  Deprecated! Use map_job.start instead.

  When the same setting can come both from an explicit argument and from a
  dictionary, the explicit argument takes precedence.

  Args:
    name: mapreduce name. Used only for display purposes.
    handler_spec: fully qualified name of the mapper handler function/class
      to call.
    reader_spec: fully qualified name of the mapper reader to use.
    mapper_parameters: dictionary of parameters to pass to the mapper. These
      are mapper-specific and are also used for reader/writer initialization.
      Expected format is {"input_reader": {}, "output_writer": {}}; the old,
      deprecated style has no sub dictionaries. A non-empty value is
      shallow-copied before being handed to the mapper spec.
    shard_count: number of shards to create. When None,
      parameters.config.SHARD_COUNT is used.
    output_writer_spec: forwarded unchanged to model.MapperSpec as its
      output_writer_spec argument.
    mapreduce_parameters: dictionary of mapreduce parameters relevant to the
      whole job. Merged on top of the JobConfig default parameters.
    base_path: base path of the mapreduce library handler specified in
      app.yaml. "/mapreduce" by default.
    queue_name: taskqueue queue name to be used for mapreduce tasks.
      See util.get_queue_name. The resolved name always replaces any
      "queue_name" entry supplied through mapreduce_parameters.
    eta: absolute time when the MR should execute. May not be specified
      if 'countdown' is also supplied. This may be timezone-aware or
      timezone-naive.
    countdown: time in seconds into the future that this MR should execute.
      Defaults to zero.
    hooks_class_name: fully qualified name of a hooks.Hooks subclass.
    _app: forwarded unchanged to handlers.StartJobHandler._start_map.
    in_xg_transaction: controls what transaction scope to use to start this
      MR job. If True, there has to be an already opened cross-group
      transaction scope; MR will use one entity group from it. If False, MR
      will create an independent transaction to start the job regardless of
      any existing transaction scopes.

  Returns:
    mapreduce id as string.
  """
  effective_shard_count = (
      parameters.config.SHARD_COUNT if shard_count is None else shard_count)

  # Only a truthy mapping is copied; None or an empty value is passed through
  # to the mapper spec exactly as received.
  mapper_params = mapper_parameters
  if mapper_params:
    mapper_params = dict(mapper_params)

  # This old API fills every job-level parameter with its default value first,
  # then layers the caller-supplied dictionary on top.
  job_params = map_job.JobConfig._get_default_mr_params()
  if mapreduce_parameters:
    job_params.update(mapreduce_parameters)

  # Explicit arguments override whatever the dictionaries provided.
  if base_path:
    job_params["base_path"] = base_path
  job_params["queue_name"] = util.get_queue_name(queue_name)

  mapper_spec = model.MapperSpec(
      handler_spec,
      reader_spec,
      mapper_params,
      effective_shard_count,
      output_writer_spec=output_writer_spec)

  # Warn only (do not fail) when the caller asked to join an xg transaction
  # but no transaction is currently open.
  if in_xg_transaction:
    if not db.is_in_transaction():
      logging.warning("Expects an opened xg transaction to start mapreduce "
                      "when transactional is True.")

  start_kwargs = {
      # TODO(user): Now that "queue_name" is part of mr_params.
      # Remove all the other ways to get queue_name after one release.
      "queue_name": job_params["queue_name"],
      "eta": eta,
      "countdown": countdown,
      "hooks_class_name": hooks_class_name,
      "_app": _app,
      "in_xg_transaction": in_xg_transaction,
  }
  return handlers.StartJobHandler._start_map(
      name, mapper_spec, job_params, **start_kwargs)