1# Copyright 2012 the V8 project authors. All rights reserved.
2# Redistribution and use in source and binary forms, with or without
3# modification, are permitted provided that the following conditions are
4# met:
5#
6#     * Redistributions of source code must retain the above copyright
7#       notice, this list of conditions and the following disclaimer.
8#     * Redistributions in binary form must reproduce the above
9#       copyright notice, this list of conditions and the following
10#       disclaimer in the documentation and/or other materials provided
11#       with the distribution.
12#     * Neither the name of Google Inc. nor the names of its
13#       contributors may be used to endorse or promote products derived
14#       from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
28
29import os
30import socket
31import subprocess
32import threading
33import time
34
35from . import distro
36from ..local import execution
37from ..local import perfdata
38from ..objects import peer
39from ..objects import workpacket
40from ..server import compression
41from ..server import constants
42from ..server import local_handler
43from ..server import signatures
44
45
46def GetPeers():
47  data = local_handler.LocalQuery([constants.REQUEST_PEERS])
48  if not data: return []
49  return [ peer.Peer.Unpack(p) for p in data ]
50
51
52class NetworkedRunner(execution.Runner):
53  def __init__(self, suites, progress_indicator, context, peers, workspace):
54    self.suites = suites
55    num_tests = 0
56    datapath = os.path.join("out", "testrunner_data")
57    # TODO(machenbach): These fields should exist now in the superclass.
58    # But there is no super constructor call. Check if this is a problem.
59    self.perf_data_manager = perfdata.PerfDataManager(datapath)
60    self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
61    for s in suites:
62      for t in s.tests:
63        t.duration = self.perfdata.FetchPerfData(t) or 1.0
64      num_tests += len(s.tests)
65    self._CommonInit(num_tests, progress_indicator, context)
66    self.tests = []  # Only used if we need to fall back to local execution.
67    self.tests_lock = threading.Lock()
68    self.peers = peers
69    self.pubkey_fingerprint = None  # Fetched later.
70    self.base_rev = subprocess.check_output(
71        "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
72        shell=True).strip()
73    self.base_svn_rev = subprocess.check_output(
74        "cd %s; git log -1 %s"          # Get commit description.
75        " | grep -e '^\s*git-svn-id:'"  # Extract "git-svn-id" line.
76        " | awk '{print $2}'"           # Extract "repository@revision" part.
77        " | sed -e 's/.*@//'" %         # Strip away "repository@".
78        (workspace, self.base_rev), shell=True).strip()
79    self.patch = subprocess.check_output(
80        "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
81    self.binaries = {}
82    self.initialization_lock = threading.Lock()
83    self.initialization_lock.acquire()  # Released when init is done.
84    self._OpenLocalConnection()
85    self.local_receiver_thread = threading.Thread(
86        target=self._ListenLocalConnection)
87    self.local_receiver_thread.daemon = True
88    self.local_receiver_thread.start()
89    self.initialization_lock.acquire()
90    self.initialization_lock.release()
91
92  def _OpenLocalConnection(self):
93    self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
94    code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
95    if code != 0:
96      raise RuntimeError("Failed to connect to local server")
97    compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)
98
99  def _ListenLocalConnection(self):
100    release_lock_countdown = 1  # Pubkey.
101    self.local_receiver = compression.Receiver(self.local_socket)
102    while not self.local_receiver.IsDone():
103      data = self.local_receiver.Current()
104      if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
105        pubkey = data[1]
106        if not pubkey: raise RuntimeError("Received empty public key")
107        self.pubkey_fingerprint = pubkey
108        release_lock_countdown -= 1
109      if release_lock_countdown == 0:
110        self.initialization_lock.release()
111        release_lock_countdown -= 1  # Prevent repeated triggering.
112      self.local_receiver.Advance()
113
114  def Run(self, jobs):
115    self.indicator.Starting()
116    need_libv8 = False
117    for s in self.suites:
118      shell = s.shell()
119      if shell not in self.binaries:
120        path = os.path.join(self.context.shell_dir, shell)
121        # Check if this is a shared library build.
122        try:
123          ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
124                                        shell=True)
125          ldd = ldd.strip().split(" ")
126          assert ldd[0] == "libv8.so"
127          assert ldd[1] == "=>"
128          need_libv8 = True
129          binary_needs_libv8 = True
130          libv8 = signatures.ReadFileAndSignature(ldd[2])
131        except:
132          binary_needs_libv8 = False
133        binary = signatures.ReadFileAndSignature(path)
134        if binary[0] is None:
135          print("Error: Failed to create signature.")
136          assert binary[1] != 0
137          return binary[1]
138        binary.append(binary_needs_libv8)
139        self.binaries[shell] = binary
140    if need_libv8:
141      self.binaries["libv8.so"] = libv8
142    distro.Assign(self.suites, self.peers)
143    # Spawn one thread for each peer.
144    threads = []
145    for p in self.peers:
146      thread = threading.Thread(target=self._TalkToPeer, args=[p])
147      threads.append(thread)
148      thread.start()
149    try:
150      for thread in threads:
151        # Use a timeout so that signals (Ctrl+C) will be processed.
152        thread.join(timeout=10000000)
153      self._AnalyzePeerRuntimes()
154    except KeyboardInterrupt:
155      self.terminate = True
156      raise
157    except Exception, _e:
158      # If there's an exception we schedule an interruption for any
159      # remaining threads...
160      self.terminate = True
161      # ...and then reraise the exception to bail out.
162      raise
163    compression.Send(constants.END_OF_STREAM, self.local_socket)
164    self.local_socket.close()
165    if self.tests:
166      self._RunInternal(jobs)
167    self.indicator.Done()
168    return not self.failed
169
170  def _TalkToPeer(self, peer):
171    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
172    sock.settimeout(self.context.timeout + 10)
173    code = sock.connect_ex((peer.address, constants.PEER_PORT))
174    if code == 0:
175      try:
176        peer.runtime = None
177        start_time = time.time()
178        packet = workpacket.WorkPacket(peer=peer, context=self.context,
179                                       base_revision=self.base_svn_rev,
180                                       patch=self.patch,
181                                       pubkey=self.pubkey_fingerprint)
182        data, test_map = packet.Pack(self.binaries)
183        compression.Send(data, sock)
184        compression.Send(constants.END_OF_STREAM, sock)
185        rec = compression.Receiver(sock)
186        while not rec.IsDone() and not self.terminate:
187          data_list = rec.Current()
188          for data in data_list:
189            test_id = data[0]
190            if test_id < 0:
191              # The peer is reporting an error.
192              with self.lock:
193                print("\nPeer %s reports error: %s" % (peer.address, data[1]))
194              continue
195            test = test_map.pop(test_id)
196            test.MergeResult(data)
197            try:
198              self.perfdata.UpdatePerfData(test)
199            except Exception, e:
200              print("UpdatePerfData exception: %s" % e)
201              pass  # Just keep working.
202            with self.lock:
203              perf_key = self.perfdata.GetKey(test)
204              compression.Send(
205                  [constants.INFORM_DURATION, perf_key, test.duration,
206                   self.context.arch, self.context.mode],
207                  self.local_socket)
208              self.indicator.AboutToRun(test)
209              has_unexpected_output = test.suite.HasUnexpectedOutput(test)
210              if has_unexpected_output:
211                self.failed.append(test)
212                if test.output.HasCrashed():
213                  self.crashed += 1
214              else:
215                self.succeeded += 1
216              self.remaining -= 1
217              self.indicator.HasRun(test, has_unexpected_output)
218          rec.Advance()
219        peer.runtime = time.time() - start_time
220      except KeyboardInterrupt:
221        sock.close()
222        raise
223      except Exception, e:
224        print("Got exception: %s" % e)
225        pass  # Fall back to local execution.
226    else:
227      compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
228                       self.local_socket)
229    sock.close()
230    if len(test_map) > 0:
231      # Some tests have not received any results. Run them locally.
232      print("\nNo results for %d tests, running them locally." % len(test_map))
233      self._EnqueueLocally(test_map)
234
235  def _EnqueueLocally(self, test_map):
236    with self.tests_lock:
237      for test in test_map:
238        self.tests.append(test_map[test])
239
240  def _AnalyzePeerRuntimes(self):
241    total_runtime = 0.0
242    total_work = 0.0
243    for p in self.peers:
244      if p.runtime is None:
245        return
246      total_runtime += p.runtime
247      total_work += p.assigned_work
248    for p in self.peers:
249      p.assigned_work /= total_work
250      p.runtime /= total_runtime
251      perf_correction = p.assigned_work / p.runtime
252      old_perf = p.relative_performance
253      p.relative_performance = (old_perf + perf_correction) / 2.0
254      compression.Send([constants.UPDATE_PERF, p.address,
255                        p.relative_performance],
256                       self.local_socket)
257