1# Copyright 2017 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Job leasing utilities
6
7Jobs are leased to processes to own and run.  A process owning a job
8obtain a job lease.  Ongoing ownership of the lease is established using
9an exclusive fcntl lock on the lease file.
10
11If a lease file is older than a few seconds and is not locked, then its
12owning process should be considered crashed.
13"""
14
15from __future__ import absolute_import
16from __future__ import division
17from __future__ import print_function
18
19import contextlib
20import errno
21import fcntl
22import logging
23import os
24import socket
25import time
26
27from scandir import scandir
28
29logger = logging.getLogger(__name__)
30
31
32@contextlib.contextmanager
33def obtain_lease(path):
34    """Return a context manager owning a lease file.
35
36    The process that obtains the lease will maintain an exclusive,
37    unlimited fcntl lock on the lock file.
38    """
39    with open(path, 'w') as f:
40        fcntl.lockf(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
41        try:
42            yield path
43        finally:
44            os.unlink(path)
45
46
47def leases_iter(jobdir):
48    """Yield Lease instances from jobdir.
49
50    @param jobdir: job lease file directory
51    @returns: iterator of Leases
52    """
53    for entry in scandir(jobdir):
54        if _is_lease_entry(entry):
55            yield Lease(entry)
56
57
58class Lease(object):
59    "Represents a job lease."
60
61    # Seconds after a lease file's mtime where its owning process is not
62    # considered dead.
63    _FRESH_LIMIT = 5
64
65    def __init__(self, entry):
66        """Initialize instance.
67
68        @param entry: scandir.DirEntry instance
69        """
70        self._entry = entry
71
72    @property
73    def id(self):
74        """Return id of leased job."""
75        return int(self._entry.name)
76
77    def expired(self):
78        """Return True if the lease is expired.
79
80        A lease is considered expired if there is no fcntl lock on it
81        and the grace period for the owning process to obtain the lock
82        has passed.  The lease is not considered expired if the owning
83        process removed the lock file normally, as an expired lease
84        indicates that some error has occurred and clean up operations
85        are needed.
86        """
87        try:
88            stat_result = self._entry.stat()
89        except OSError as e:  # pragma: no cover
90            if e.errno == errno.ENOENT:
91                return False
92            raise
93        mtime = stat_result.st_mtime_ns / (10 ** 9)
94        if time.time() - mtime < self._FRESH_LIMIT:
95            return False
96        return not _fcntl_locked(self._entry.path)
97
98    def cleanup(self):
99        """Remove the lease file.
100
101        This does not need to be called normally, as the owning process
102        should clean up its files.
103        """
104        try:
105            os.unlink(self._entry.path)
106        except OSError as e:
107            logger.warning('Error removing %s: %s', self._entry.path, e)
108        try:
109            os.unlink(self._sock_path)
110        except OSError as e:
111            # This is fine; it means that job_reporter crashed, but
112            # lucifer was able to run its cleanup.
113            logger.debug('Error removing %s: %s', self._sock_path, e)
114
115    def abort(self):
116        """Abort the job.
117
118        This sends a datagram to the abort socket associated with the
119        lease.
120
121        If the socket is closed, either the connect() call or the send()
122        call will raise socket.error with ECONNREFUSED.
123        """
124        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
125        sock.setblocking(0)
126        logger.debug('Connecting to abort socket %s', self._sock_path)
127        sock.connect(self._sock_path)
128        logger.debug('Sending abort to %s', self._sock_path)
129        # The value sent does not matter.
130        sent = sock.send('abort')
131        # TODO(ayatane): I don't know if it is possible for sent to be 0
132        assert sent > 0
133
134    def maybe_abort(self):
135        """Abort the job, ignoring errors."""
136        try:
137            self.abort()
138        except socket.error as e:
139            logger.debug('Error aborting socket: %s', e)
140
141    @property
142    def _sock_path(self):
143        """Return the path of the abort socket corresponding to the lease."""
144        return self._entry.path + ".sock"
145
146
147def _is_lease_entry(entry):
148    """Return True if the DirEntry is for a lease."""
149    return entry.name.isdigit()
150
151
152def _fcntl_locked(path):
153    """Return True if a file is fcntl locked.
154
155    @param path: path to file
156    """
157    try:
158        fd = os.open(path, os.O_WRONLY)
159    except (IOError, OSError):
160        return False
161    try:
162        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
163    except IOError:
164        return True
165    else:
166        return False
167    finally:
168        os.close(fd)
169