# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
#       copyright notice, this list of conditions and the following
#       disclaimer in the documentation and/or other materials provided
#       with the distribution.
#     * Neither the name of Google Inc. nor the names of its
#       contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


import os
import socket
import subprocess
import threading
import time

from . import distro
from ..local import execution
from ..local import perfdata
from ..objects import peer
from ..objects import workpacket
from ..server import compression
from ..server import constants
from ..server import local_handler
from ..server import signatures


def GetPeers():
    """Return the peers reported by the local server.

    Sends a REQUEST_PEERS query through local_handler.LocalQuery and unpacks
    every entry of the reply with peer.Peer.Unpack.

    Returns:
        A list of unpacked peers; an empty list when the query yields no data.
    """
    data = local_handler.LocalQuery([constants.REQUEST_PEERS])
    if not data:
        return []
    return [peer.Peer.Unpack(p) for p in data]


class NetworkedRunner(execution.Runner):
    """Runner that hands tests to network peers and collects their results.

    Run() assigns the suites' tests to the peers via distro.Assign and starts
    one thread per peer. Each thread ships a work packet to its peer and
    merges the results it receives. Tests for which no result arrives are
    queued in self.tests and executed locally at the end of Run().
    """

    def __init__(self, suites, progress_indicator, context, peers, workspace):
        """Prepare perf data, revision info and the local server connection.

        Args:
            suites: Test suites; each must provide `tests` and `shell()`.
            progress_indicator: Forwarded to _CommonInit.
            context: Forwarded to _CommonInit; its `arch` and `mode` select
                the perf data store.
            peers: Peers that tests will be distributed to.
            workspace: Directory of the git checkout that the base revision
                and the patch are computed from.

        Raises:
            RuntimeError: If the local server cannot be reached or does not
                deliver a public key fingerprint.
            subprocess.CalledProcessError: If one of the git shell commands
                exits with a non-zero status.
        """
        self.suites = suites
        num_tests = 0
        datapath = os.path.join("out", "testrunner_data")
        # TODO(machenbach): These fields should exist now in the superclass.
        # But there is no super constructor call. Check if this is a problem.
        self.perf_data_manager = perfdata.PerfDataManager(datapath)
        self.perfdata = self.perf_data_manager.GetStore(context.arch,
                                                        context.mode)
        for s in suites:
            for t in s.tests:
                # Tests without recorded perf data get a default duration.
                t.duration = self.perfdata.FetchPerfData(t) or 1.0
            num_tests += len(s.tests)
        self._CommonInit(num_tests, progress_indicator, context)
        self.tests = []  # Only used if we need to fall back to local execution.
        self.tests_lock = threading.Lock()
        self.peers = peers
        self.pubkey_fingerprint = None  # Fetched later.
        self.base_rev = subprocess.check_output(
            "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
            shell=True).strip()
        self.base_svn_rev = subprocess.check_output(
            "cd %s; git log -1 %s"           # Get commit description.
            " | grep -e '^\\s*git-svn-id:'"  # Extract "git-svn-id" line.
            " | awk '{print $2}'"            # Extract "repository@revision" part.
            " | sed -e 's/.*@//'" %          # Strip away "repository@".
            (workspace, self.base_rev), shell=True).strip()
        self.patch = subprocess.check_output(
            "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
        self.binaries = {}
        self.initialization_lock = threading.Lock()
        self.initialization_lock.acquire()  # Released when init is done.
        self._OpenLocalConnection()
        self.local_receiver_thread = threading.Thread(
            target=self._ListenLocalConnection)
        self.local_receiver_thread.daemon = True
        self.local_receiver_thread.start()
        # Block until the listener thread signals that initialization ended.
        self.initialization_lock.acquire()
        self.initialization_lock.release()
        if self.pubkey_fingerprint is None:
            # The listener gave up (error or closed stream) before a
            # fingerprint arrived; fail loudly instead of continuing without.
            raise RuntimeError(
                "Failed to receive public key fingerprint from local server")

    def _OpenLocalConnection(self):
        """Connect to the local server and request the pubkey fingerprint.

        Raises:
            RuntimeError: If the connection to localhost:CLIENT_PORT fails.
        """
        self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        code = self.local_socket.connect_ex(
            ("localhost", constants.CLIENT_PORT))
        if code != 0:
            # Do not leak the descriptor of the unusable socket.
            self.local_socket.close()
            raise RuntimeError("Failed to connect to local server")
        compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT],
                         self.local_socket)

    def _ListenLocalConnection(self):
        """Receive messages from the local server (runs in its own thread).

        Stores the public key fingerprint once it arrives and releases
        self.initialization_lock exactly once: right after the fingerprint
        was stored, or when this method exits without having received one,
        so that the constructor can never block forever.

        Raises:
            RuntimeError: If the server sends an empty public key.
        """
        init_signalled = False
        try:
            self.local_receiver = compression.Receiver(self.local_socket)
            while not self.local_receiver.IsDone():
                data = self.local_receiver.Current()
                if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
                    pubkey = data[1]
                    if not pubkey:
                        raise RuntimeError("Received empty public key")
                    self.pubkey_fingerprint = pubkey
                    if not init_signalled:
                        self.initialization_lock.release()
                        init_signalled = True  # Prevent repeated triggering.
                self.local_receiver.Advance()
        finally:
            if not init_signalled:
                self.initialization_lock.release()

    def Run(self, jobs):
        """Distribute the tests to the peers and collect the results.

        Args:
            jobs: Forwarded to _RunInternal when tests must run locally.

        Returns:
            True if no test failed, False otherwise. If creating the
            signature of a shell binary fails, the non-zero error value
            reported by signatures.ReadFileAndSignature is returned instead.
        """
        self.indicator.Starting()
        need_libv8 = False
        libv8 = None
        for s in self.suites:
            shell = s.shell()
            if shell in self.binaries:
                continue
            path = os.path.join(self.context.shell_dir, shell)
            # Check if this is a shared library build.
            try:
                ldd = subprocess.check_output(
                    "ldd %s | grep libv8\\.so" % (path), shell=True)
                ldd = ldd.strip().split(" ")
                if ldd[0] != "libv8.so" or ldd[1] != "=>":
                    raise ValueError("Unexpected ldd output")
                # Read the library before touching the flags, so a failure
                # here cannot leave need_libv8 set without a library.
                libv8 = signatures.ReadFileAndSignature(ldd[2])
                need_libv8 = True
                binary_needs_libv8 = True
            except Exception:
                # Best effort: any failure means "not a shared library build".
                binary_needs_libv8 = False
            binary = signatures.ReadFileAndSignature(path)
            if binary[0] is None:
                print("Error: Failed to create signature.")
                assert binary[1] != 0
                return binary[1]
            binary.append(binary_needs_libv8)
            self.binaries[shell] = binary
        if need_libv8:
            self.binaries["libv8.so"] = libv8
        distro.Assign(self.suites, self.peers)
        # Spawn one thread for each peer.
        threads = []
        for p in self.peers:
            thread = threading.Thread(target=self._TalkToPeer, args=[p])
            threads.append(thread)
            thread.start()
        try:
            for thread in threads:
                # Use a timeout so that signals (Ctrl+C) will be processed.
                thread.join(timeout=10000000)
            self._AnalyzePeerRuntimes()
        except (KeyboardInterrupt, Exception):
            # Schedule an interruption for any remaining threads and then
            # reraise the exception to bail out.
            self.terminate = True
            raise
        compression.Send(constants.END_OF_STREAM, self.local_socket)
        self.local_socket.close()
        if self.tests:
            self._RunInternal(jobs)
        self.indicator.Done()
        return not self.failed

    def _PackWorkFor(self, peer):
        """Build the work packet for |peer|; returns Pack()'s (data, test_map)."""
        packet = workpacket.WorkPacket(peer=peer, context=self.context,
                                       base_revision=self.base_svn_rev,
                                       patch=self.patch,
                                       pubkey=self.pubkey_fingerprint)
        return packet.Pack(self.binaries)

    def _RecordResult(self, test, data):
        """Merge one result received from a peer into |test| and the counters."""
        test.MergeResult(data)
        try:
            self.perfdata.UpdatePerfData(test)
        except Exception as e:
            print("UpdatePerfData exception: %s" % e)
            # Just keep working.
        with self.lock:
            perf_key = self.perfdata.GetKey(test)
            compression.Send(
                [constants.INFORM_DURATION, perf_key, test.duration,
                 self.context.arch, self.context.mode],
                self.local_socket)
            self.indicator.AboutToRun(test)
            has_unexpected_output = test.suite.HasUnexpectedOutput(test)
            if has_unexpected_output:
                self.failed.append(test)
                if test.output.HasCrashed():
                    self.crashed += 1
            else:
                self.succeeded += 1
            self.remaining -= 1
            self.indicator.HasRun(test, has_unexpected_output)

    def _TalkToPeer(self, peer):
        """Send |peer| its work packet and process results (thread target).

        Tests that end up without a result are enqueued for local execution.
        This covers an unresponsive peer, an error while talking to the peer,
        and a requested termination.

        Args:
            peer: The peer to talk to; its `address` is used for connecting
                and its `runtime` attribute is set by this method.
        """
        # Bound up front so the fallback check at the end is always valid.
        test_map = {}
        # Reset for every peer, connected or not, because
        # _AnalyzePeerRuntimes reads this attribute for all peers.
        peer.runtime = None
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.context.timeout + 10)
        code = sock.connect_ex((peer.address, constants.PEER_PORT))
        if code == 0:
            try:
                start_time = time.time()
                data, test_map = self._PackWorkFor(peer)
                compression.Send(data, sock)
                compression.Send(constants.END_OF_STREAM, sock)
                rec = compression.Receiver(sock)
                while not rec.IsDone() and not self.terminate:
                    for data in rec.Current():
                        test_id = data[0]
                        if test_id < 0:
                            # The peer is reporting an error.
                            with self.lock:
                                print("\nPeer %s reports error: %s" %
                                      (peer.address, data[1]))
                            continue
                        self._RecordResult(test_map.pop(test_id), data)
                    rec.Advance()
                peer.runtime = time.time() - start_time
            except KeyboardInterrupt:
                sock.close()
                raise
            except Exception as e:
                print("Got exception: %s" % e)
                # Fall back to local execution.
        else:
            compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
                             self.local_socket)
            # Packing is the only visible way to learn which tests were meant
            # for this peer, so pack anyway to run them locally.
            # NOTE(review): assumes Pack() only builds data and has no other
            # side effects -- confirm against workpacket.WorkPacket.
            try:
                test_map = self._PackWorkFor(peer)[1]
            except Exception as e:
                print("Got exception: %s" % e)
        sock.close()
        if len(test_map) > 0:
            # Some tests have not received any results. Run them locally.
            print("\nNo results for %d tests, running them locally." %
                  len(test_map))
            self._EnqueueLocally(test_map)

    def _EnqueueLocally(self, test_map):
        """Queue all tests in |test_map| (id -> test) for local execution."""
        with self.tests_lock:
            self.tests.extend(test_map.values())

    def _AnalyzePeerRuntimes(self):
        """Update each peer's relative performance from the measured runtimes.

        Does nothing unless every peer has a measured runtime. Otherwise
        assigned work and runtime are normalized in place to fractions of
        their totals, the ratio of the two is averaged with the previous
        relative performance, and the result is sent to the local server.
        """
        total_runtime = 0.0
        total_work = 0.0
        for p in self.peers:
            if p.runtime is None:
                return
            total_runtime += p.runtime
            total_work += p.assigned_work
        if total_runtime <= 0 or total_work <= 0:
            # Nothing measurable (also the case without peers); normalizing
            # would divide by zero.
            return
        for p in self.peers:
            p.assigned_work /= total_work
            p.runtime /= total_runtime
            if p.runtime <= 0:
                # No usable measurement for this peer; keep its old value.
                continue
            perf_correction = p.assigned_work / p.runtime
            old_perf = p.relative_performance
            p.relative_performance = (old_perf + perf_correction) / 2.0
            compression.Send([constants.UPDATE_PERF, p.address,
                              p.relative_performance],
                             self.local_socket)