#!/usr/bin/env python
"""Execution contexts handed to user code while a map job runs.

Three nested scopes are modelled here: the job (JobContext), one shard of
the job (ShardContext) and one slice of a shard (SliceContext).
"""

import logging

# pylint: disable=invalid-name


class JobContext(object):
    """Job-level context; carries the configuration of the map job."""

    def __init__(self, job_config):
        """Init.

        Read only properties:
          job_config: map_job.JobConfig for the job.

        Args:
          job_config: map_job.JobConfig.
        """
        self.job_config = job_config


class ShardContext(object):
    """Shard-level context; exposes shard identity and shard counters."""

    def __init__(self, job_context, shard_state):
        """Init.

        The signature of __init__ is subject to change.

        Read only properties:
          job_context: JobContext object.
          id: str. of format job_id-shard_number.
          number: int. shard number. 0 indexed.
          attempt: int. The current attempt at executing this shard.
            Starting at 1.

        Args:
          job_context: JobContext object of the enclosing job.
          shard_state: model.ShardState.
        """
        # Kept privately: the counter methods below operate on its
        # counters_map.
        self._state = shard_state
        self.job_context = job_context
        self.id = shard_state.shard_id
        self.number = shard_state.shard_number
        # Attempts are 1-based, so the first run (no retries yet) is 1.
        retries_so_far = shard_state.retries
        self.attempt = retries_so_far + 1

    # TODO(user): standardize and document what format counter_name should
    # take.
    def incr(self, counter_name, delta=1):
        """Changes a shard counter by delta.

        Args:
          counter_name: the name of the counter to change. str.
          delta: int. Amount to add to the counter.
        """
        counters = self._state.counters_map
        counters.increment(counter_name, delta)

    def counter(self, counter_name, default=0):
        """Reads the current value of a shard counter.

        Args:
          counter_name: name of the counter in string.
          default: int. Passed to the counters map as the fallback value
            for a counter that doesn't exist.

        Returns:
          Current value of the counter.
        """
        counters = self._state.counters_map
        return counters.get(counter_name, default)


class SliceContext(object):
    """Slice-level context; forwards counters to the shard and emits output."""

    def __init__(self, shard_context, shard_state, tstate):
        """Init.

        The signature of __init__ is subject to change.

        Read only properties:
          job_context: JobContext object.
          shard_context: ShardContext object.
          number: int. slice number. 0 indexed.
          attempt: int. The current attempt at executing this slice.
            starting at 1.

        Args:
          shard_context: ShardContext object of the enclosing shard.
          shard_state: model.ShardState.
          tstate: model.TransientShardState.
        """
        self.shard_context = shard_context
        self.job_context = shard_context.job_context
        self._tstate = tstate
        self.number = shard_state.slice_id
        # Attempts are 1-based, so the first run (no retries yet) is 1.
        slice_retries_so_far = shard_state.slice_retries
        self.attempt = slice_retries_so_far + 1

    def incr(self, counter_name, delta=1):
        """Changes a counter on the enclosing shard; see ShardContext.incr."""
        shard = self.shard_context
        shard.incr(counter_name, delta)

    def counter(self, counter_name, default=0):
        """Reads a counter of the enclosing shard; see ShardContext.counter."""
        shard = self.shard_context
        return shard.counter(counter_name, default)

    def emit(self, value):
        """Emits a value to output writer.

        When no output writer is set, an error is logged and the value is
        dropped; no exception is raised.

        Args:
          value: a value of type expected by the output writer.
        """
        writer = self._tstate.output_writer
        if writer:
            writer.write(value)
        else:
            logging.error("emit is called, but no output writer is set.")