1#!/usr/bin/env python
2"""Map job execution context."""
3
4import logging
5
6# pylint: disable=invalid-name
7
8
class JobContext(object):
  """Job-level context: carries the configuration of a map job."""

  def __init__(self, job_config):
    """Creates the context for a single job.

    Read only properties:
      job_config: the map_job.JobConfig this context was built with.

    Args:
      job_config: map_job.JobConfig for the job.
    """
    self.job_config = job_config
22
23
class ShardContext(object):
  """Shard-level context: shard identity plus access to its counters."""

  def __init__(self, job_context, shard_state):
    """Builds the context from a job context and a shard state.

    The signature of __init__ is subject to change.

    Read only properties:
      job_context: JobContext object.
      id: str. of format job_id-shard_number.
      number: int. shard number. 0 indexed.
      attempt: int. The current attempt at executing this shard.
        Starting at 1.

    Args:
      job_context: JobContext of the job this shard belongs to.
      shard_state: model.ShardState. Supplies shard_id, shard_number,
        retries and the counters_map used by incr() and counter().
    """
    self.job_context = job_context
    self.id = shard_state.shard_id
    self.number = shard_state.shard_number
    # Attempts are reported 1-based, so shift the retry count by one.
    retry_count = shard_state.retries
    self.attempt = retry_count + 1
    self._state = shard_state

  # TODO(user): standardize and document what format counter_name should take.
  def incr(self, counter_name, delta=1):
    """Changes counter by delta.

    Delegates to the increment() method of the shard state's counters_map.

    Args:
      counter_name: the name of the counter to change. str.
      delta: int. Amount to change the counter by. Defaults to 1.
    """
    counters = self._state.counters_map
    counters.increment(counter_name, delta)

  def counter(self, counter_name, default=0):
    """Get the current counter value.

    Delegates to the get() method of the shard state's counters_map.

    Args:
      counter_name: name of the counter in string.
      default: default value in int if one doesn't exist.

    Returns:
      Current value of the counter.
    """
    counters = self._state.counters_map
    return counters.get(counter_name, default)
70
71
class SliceContext(object):
  """Slice-level context: slice identity, shard counters and output."""

  def __init__(self, shard_context, shard_state, tstate):
    """Builds the context for one slice of a shard.

    The signature of __init__ is subject to change.

    Read only properties:
      job_context: JobContext object.
      shard_context: ShardContext object.
      number: int. slice number. 0 indexed.
      attempt: int. The current attempt at executing this slice.
        starting at 1.

    Args:
      shard_context: ShardContext of the shard this slice belongs to.
      shard_state: model.ShardState. Supplies slice_id and slice_retries.
      tstate: model.TransientShardstate. Its output_writer is used by emit().
    """
    self.shard_context = shard_context
    self.job_context = shard_context.job_context
    self.number = shard_state.slice_id
    # Attempts are reported 1-based, so shift the retry count by one.
    slice_retry_count = shard_state.slice_retries
    self.attempt = slice_retry_count + 1
    self._tstate = tstate

  def incr(self, counter_name, delta=1):
    """Changes counter by delta; forwards to shard_context.incr."""
    self.shard_context.incr(counter_name, delta)

  def counter(self, counter_name, default=0):
    """Gets the current counter value; forwards to shard_context.counter."""
    return self.shard_context.counter(counter_name, default)

  def emit(self, value):
    """Emits a value to output writer.

    When the transient state has no (truthy) output writer, an error is
    logged and the value is dropped.

    Args:
      value: a value of type expected by the output writer.
    """
    writer = self._tstate.output_writer
    if writer:
      writer.write(value)
      return
    logging.error("emit is called, but no output writer is set.")
116