1# Copyright 2010 Google Inc. All Rights Reserved.
2#
3
4import logging
5import os
6import re
7import threading
8
9from automation.common import job
10from automation.common import logger
11from automation.server.job_executer import JobExecuter
12
13
class IdProducerPolicy(object):
  """Produces series of unique integer IDs.

  IDs are handed out sequentially, starting at 1 unless Initialize() moved the
  counter past IDs that are already present on disk.

  Example:
      id_producer = IdProducerPolicy()
      id_a = id_producer.GetNextId()
      id_b = id_producer.GetNextId()
      assert id_a != id_b
  """

  def __init__(self):
    # The ID that the next GetNextId() call will return.
    self._counter = 1

  def Initialize(self, home_prefix, home_pattern):
    r"""Find first available ID based on a directory listing.

    Only sub-directories of home_prefix are considered; plain files are
    skipped. The counter is set to one more than the largest ID found, or to 1
    when home_prefix is not a directory or holds no matching sub-directory.

    Args:
      home_prefix: A directory to be traversed.
      home_pattern: A regexp describing all files/directories that will be
        considered. The regexp must contain exactly one match group with name
        "id", which must match an integer number. It is applied with
        re.match, i.e. it is anchored at the start of the name only.

    Example:
      id_producer.Initialize(JOBDIR_PREFIX, r'job-(?P<id>\d+)')
    """
    harvested_ids = []

    if os.path.isdir(home_prefix):
      for filename in os.listdir(home_prefix):
        if not os.path.isdir(os.path.join(home_prefix, filename)):
          # Only directories count as already claimed IDs.
          continue

        match = re.match(home_pattern, filename)
        if match:
          harvested_ids.append(int(match.group('id')))

    # "or [0]" keeps max() from failing when nothing was harvested.
    self._counter = max(harvested_ids or [0]) + 1

  def GetNextId(self):
    """Calculates another ID considered to be unique.

    Returns:
      The current counter value; the counter is advanced by one afterwards.
    """
    new_id = self._counter
    self._counter += 1
    return new_id
58
59
class JobManager(threading.Thread):
  """Scheduler thread that starts ready jobs once machines are available.

  Jobs are registered with AddJob(). The thread body, run(), takes entries
  from ready_jobs, asks the machine manager for the machines each job requires
  and starts a JobExecuter for every job whose machines could be obtained.

  Updates to the job lists and to the executer mapping are made while holding
  self._lock. Methods with a leading underscore expect the caller to hold it.
  """

  def __init__(self, machine_manager):
    """Sets up an idle manager; StartJobManager() starts the thread.

    Args:
      machine_manager: Object used to obtain (GetMachines) and give back
        (ReturnMachines) the machines that jobs run on.
    """
    threading.Thread.__init__(self, name=self.__class__.__name__)
    self.all_jobs = []
    self.ready_jobs = []
    # Maps job id -> JobExecuter for every job started by run().
    self.job_executer_mapping = {}

    self.machine_manager = machine_manager

    self._lock = threading.Lock()
    self._jobs_available = threading.Condition(self._lock)
    self._exit_request = False
    # True while a wake-up has been signalled that run() has not acted upon
    # yet. Only read and written with self._lock held.
    self._schedule_pending = False

    # This list is handed to every JobExecuter. The manager registers itself
    # first, presumably so that its NotifyJobComplete() gets called back.
    self.listeners = [self]

    self._id_producer = IdProducerPolicy()
    self._id_producer.Initialize(job.Job.WORKDIR_PREFIX, r'job-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

  def StartJobManager(self):
    """Starts the scheduler thread and requests an initial scheduling pass."""
    self._logger.info('Starting...')

    with self._lock:
      self.start()
      self._WakeScheduler()

  def StopJobManager(self):
    """Kills all known jobs, tells run() to exit and joins the executers."""
    self._logger.info('Shutdown request received.')

    with self._lock:
      for job_ in self.all_jobs:
        self._KillJob(job_.id)

      # Signal to die
      self._exit_request = True
      self._jobs_available.notify_all()

      # Take the snapshot under the lock: CleanUpJob() may delete entries of
      # the mapping concurrently, which would break a live iteration.
      executers = list(self.job_executer_mapping.values())

    # Wait for all job threads to finish. This is done without the lock so
    # that callbacks which need the lock can still run while we wait.
    for executer in executers:
      executer.join()

  def KillJob(self, job_id):
    """Kill a job by id.

    Does not block until the job is completed.
    """
    with self._lock:
      self._KillJob(job_id)

  def GetJob(self, job_id):
    """Returns the registered job with the given id, or None if unknown.

    Note that the lookup is done without taking self._lock.
    """
    for job_ in self.all_jobs:
      if job_.id == job_id:
        return job_
    return None

  def _KillJob(self, job_id):
    """Kills the executer of a job and drops the job from the ready queue.

    The caller must hold self._lock. Unknown ids are ignored.
    """
    self._logger.info('Killing [Job: %d].', job_id)

    if job_id in self.job_executer_mapping:
      self.job_executer_mapping[job_id].Kill()
    for job_ in self.ready_jobs:
      if job_.id == job_id:
        # Safe although we are iterating: we leave the loop right away.
        self.ready_jobs.remove(job_)
        break

  def _WakeScheduler(self):
    """Requests a scheduling pass from run(). Caller must hold self._lock.

    The request is recorded in a flag in addition to notifying the condition,
    so it is not lost when run() is not blocked in wait() at this moment.
    """
    self._schedule_pending = True
    self._jobs_available.notify_all()

  def AddJob(self, job_):
    """Assigns a fresh id to job_ and registers it.

    Returns:
      The id assigned to the job.
    """
    with self._lock:
      job_.id = self._id_producer.GetNextId()

      self.all_jobs.append(job_)
      # Only queue a job as ready if it has no dependencies
      if job_.is_ready:
        self.ready_jobs.append(job_)

      self._WakeScheduler()

    return job_.id

  def CleanUpJob(self, job_):
    """Cleans up the work dir of a started job and forgets its executer.

    Jobs without an entry in job_executer_mapping are left untouched.
    """
    with self._lock:
      if job_.id in self.job_executer_mapping:
        self.job_executer_mapping[job_.id].CleanUpWorkDir()
        del self.job_executer_mapping[job_.id]
      # TODO(raymes): remove job from self.all_jobs

  def NotifyJobComplete(self, job_):
    """Gives the job's machines back and queues successors that became ready.

    Successors are only queued when the job succeeded. The scheduler is woken
    up in either case, since machines have been returned.
    """
    self.machine_manager.ReturnMachines(job_.machines)

    with self._lock:
      self._logger.debug('Handling %r completion event.', job_)

      if job_.status == job.STATUS_SUCCEEDED:
        for succ in job_.successors:
          if succ.is_ready and succ not in self.ready_jobs:
            self.ready_jobs.append(succ)

      self._WakeScheduler()

  def AddListener(self, listener):
    """Adds a listener to the list that is passed to new JobExecuters."""
    self.listeners.append(listener)

  def _StartReadyJobs(self):
    """Starts ready jobs until none is left or machines are unavailable.

    The caller must hold self._lock.
    """
    while self.ready_jobs:
      ready_job = self.ready_jobs.pop()

      required_machines = ready_job.machine_dependencies
      # Hint the first machine requirement towards the primary machine of
      # each predecessor.
      for pred in ready_job.predecessors:
        required_machines[0].AddPreferredMachine(
            pred.primary_machine.hostname)

      machines = self.machine_manager.GetMachines(required_machines)
      if not machines:
        # If we can't get the necessary machines right now, simply wait
        # for some jobs to complete
        self.ready_jobs.insert(0, ready_job)
        break

      # Mark as executing
      executer = JobExecuter(ready_job, machines, self.listeners)
      executer.start()
      self.job_executer_mapping[ready_job.id] = executer

  @logger.HandleUncaughtExceptions
  def run(self):
    """Thread body: schedules ready jobs until an exit is requested."""
    self._logger.info('Started.')

    while True:
      with self._lock:
        # Block only if nothing was signalled since the last pass. Checking
        # the flags under the lock closes the window in which a notification
        # sent while this thread was not waiting would be lost. A spurious
        # wake-up merely causes a harmless extra scheduling pass.
        if not (self._exit_request or self._schedule_pending):
          self._jobs_available.wait()

        # Do not start new jobs once a shutdown has been requested.
        if self._exit_request:
          break

        self._schedule_pending = False
        self._StartReadyJobs()

    self._logger.info('Stopped.')
195