# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
#       copyright notice, this list of conditions and the following
#       disclaimer in the documentation and/or other materials provided
#       with the distribution.
#     * Neither the name of Google Inc. nor the names of its
#       contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


import os
import socket
import subprocess
import threading
import time

from . import distro
from ..local import execution
from ..local import perfdata
from ..objects import peer
from ..objects import workpacket
from ..server import compression
from ..server import constants
from ..server import local_handler
from ..server import signatures


def GetPeers():
  """Asks the local server for its peers.

  Returns:
    A list with one peer.Peer.Unpack() result per entry of the reply, or an
    empty list when the query yields nothing.
  """
  data = local_handler.LocalQuery([constants.REQUEST_PEERS])
  if not data: return []
  return [ peer.Peer.Unpack(p) for p in data ]


class NetworkedRunner(execution.Runner):
  """Runner that hands tests to network peers, one thread per peer.

  Tests for which no result came back from a peer are collected in
  |self.tests| and executed locally at the end of Run().
  """

  def __init__(self, suites, progress_indicator, context, peers, workspace):
    """Collects perf data, the patch to ship and the signing fingerprint.

    Args:
      suites: test suites; every test in them gets a |duration| estimate.
      progress_indicator: forwarded to _CommonInit().
      context: run context; arch, mode, timeout and shell_dir are read here
          or in the other methods of this class.
      peers: the peers that Run() will talk to.
      workspace: directory of the git checkout used to compute the base
          revision and the patch.

    Raises:
      RuntimeError: if the local server cannot be reached or no public key
          fingerprint could be obtained from it.
      subprocess.CalledProcessError: if one of the git commands fails.
    """
    self.suites = suites
    datapath = os.path.join("out", "testrunner_data")
    # TODO(machenbach): These fields should exist now in the superclass.
    # But there is no super constructor call. Check if this is a problem.
    self.perf_data_manager = perfdata.PerfDataManager(datapath)
    self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
    for s in suites:
      for t in s.tests:
        # Tests without recorded perf data get a default estimate of 1.0.
        t.duration = self.perfdata.FetchPerfData(t) or 1.0
    self._CommonInit(suites, progress_indicator, context)
    self.tests = []  # Only used if we need to fall back to local execution.
    self.tests_lock = threading.Lock()
    self.peers = peers
    self.pubkey_fingerprint = None  # Fetched later.
    # NOTE(review): |workspace| is interpolated unquoted into shell commands
    # below; a path containing spaces or shell metacharacters will break
    # them. Confirm that callers only pass plain paths.
    self.base_rev = subprocess.check_output(
        "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
        shell=True).strip()
    self.base_svn_rev = subprocess.check_output(
        "cd %s; git log -1 %s"            # Get commit description.
        " | grep -e '^\\s*git-svn-id:'"   # Extract "git-svn-id" line.
        " | awk '{print $2}'"             # Extract "repository@revision" part.
        " | sed -e 's/.*@//'" %           # Strip away "repository@".
        (workspace, self.base_rev), shell=True).strip()
    self.patch = subprocess.check_output(
        "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
    self.binaries = {}
    # The lock serves as a one-shot barrier: it is taken here, and the
    # listener thread releases it once the fingerprint has arrived (or once
    # the listener gives up), which unblocks the second acquire() below.
    self.initialization_lock = threading.Lock()
    self.initialization_lock.acquire()  # Released when init is done.
    self._OpenLocalConnection()
    self.local_receiver_thread = threading.Thread(
        target=self._ListenLocalConnection)
    self.local_receiver_thread.daemon = True
    self.local_receiver_thread.start()
    self.initialization_lock.acquire()
    self.initialization_lock.release()
    if self.pubkey_fingerprint is None:
      # The listener stopped before delivering a fingerprint; bail out
      # instead of carrying on without one.
      self.local_socket.close()
      raise RuntimeError("Failed to fetch public key fingerprint")

  def _OpenLocalConnection(self):
    """Connects to the local server and requests the pubkey fingerprint.

    Raises:
      RuntimeError: if the connection cannot be established.
    """
    self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
    if code != 0:
      self.local_socket.close()
      raise RuntimeError("Failed to connect to local server")
    compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)

  def _ListenLocalConnection(self):
    """Thread body: consumes messages arriving from the local server.

    Stores the public key fingerprint when it arrives and then releases
    |initialization_lock|. The lock is also released if this thread stops
    before that point, so the constructor can never block forever.

    Raises:
      RuntimeError: if the server sends an empty public key.
    """
    release_lock_countdown = 1  # Pubkey.
    try:
      self.local_receiver = compression.Receiver(self.local_socket)
      while not self.local_receiver.IsDone():
        data = self.local_receiver.Current()
        if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
          pubkey = data[1]
          if not pubkey: raise RuntimeError("Received empty public key")
          self.pubkey_fingerprint = pubkey
          release_lock_countdown -= 1
        if release_lock_countdown == 0:
          self.initialization_lock.release()
          release_lock_countdown -= 1  # Prevent repeated triggering.
        self.local_receiver.Advance()
    finally:
      # The countdown only drops below zero after the release above, so a
      # non-negative value means the constructor is still waiting.
      if release_lock_countdown >= 0:
        self.initialization_lock.release()

  def Run(self, jobs):
    """Distributes the tests to the peers and waits for the results.

    Args:
      jobs: forwarded to _RunInternal() for tests that run locally.

    Returns:
      The code reported by signatures.ReadFileAndSignature() if signing a
      shell binary failed; otherwise |not self.failed|.
    """
    self.indicator.Starting()
    need_libv8 = False
    libv8 = None
    for s in self.suites:
      shell = s.shell()
      if shell in self.binaries:
        continue
      path = os.path.join(self.context.shell_dir, shell)
      # Check if this is a shared library build.
      binary_needs_libv8 = False
      try:
        ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
                                      shell=True)
        ldd = ldd.strip().split(" ")
        if ldd[0] == "libv8.so" and ldd[1] == "=>":
          libv8 = signatures.ReadFileAndSignature(ldd[2])
          # Only flag the dependency once the library has really been read,
          # so that |libv8| is always bound when |need_libv8| is set.
          need_libv8 = True
          binary_needs_libv8 = True
      except Exception:
        # Best-effort probe: any failure (e.g. a non-zero exit status of the
        # pipeline) means the binary is treated as not needing libv8.so.
        binary_needs_libv8 = False
      binary = signatures.ReadFileAndSignature(path)
      if binary[0] is None:
        print("Error: Failed to create signature.")
        assert binary[1] != 0
        return binary[1]
      binary.append(binary_needs_libv8)
      self.binaries[shell] = binary
    if need_libv8:
      self.binaries["libv8.so"] = libv8
    distro.Assign(self.suites, self.peers)
    # Spawn one thread for each peer.
    threads = []
    for p in self.peers:
      thread = threading.Thread(target=self._TalkToPeer, args=[p])
      threads.append(thread)
      thread.start()
    try:
      for thread in threads:
        # Use a timeout so that signals (Ctrl+C) will be processed.
        thread.join(timeout=10000000)
      self._AnalyzePeerRuntimes()
    except KeyboardInterrupt:
      self.terminate = True
      raise
    except Exception:
      # If there's an exception we schedule an interruption for any
      # remaining threads...
      self.terminate = True
      # ...and then reraise the exception to bail out.
      raise
    compression.Send(constants.END_OF_STREAM, self.local_socket)
    self.local_socket.close()
    if self.tests:
      self._RunInternal(jobs)
    self.indicator.Done()
    return not self.failed

  def _PackWorkForPeer(self, peer):
    """Builds the work packet for |peer| and returns what its Pack() yields.

    Returns:
      The (data, test_map) pair returned by WorkPacket.Pack().
    """
    packet = workpacket.WorkPacket(peer=peer, context=self.context,
                                   base_revision=self.base_svn_rev,
                                   patch=self.patch,
                                   pubkey=self.pubkey_fingerprint)
    return packet.Pack(self.binaries)

  def _TalkToPeer(self, peer):
    """Thread body: sends |peer| its work and merges the results it returns.

    Tests that are still without a result when the conversation ends, for
    whatever reason, are queued for local execution.

    Args:
      peer: the peer to talk to; its |runtime| is set to the elapsed time
          when the conversation completes and to None otherwise.
    """
    # Bound up front so that the fallback at the end works on every path.
    test_map = {}
    # No runtime has been measured for this run yet.
    peer.runtime = None
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      sock.settimeout(self.context.timeout + 10)
      code = sock.connect_ex((peer.address, constants.PEER_PORT))
      if code == 0:
        try:
          start_time = time.time()
          data, test_map = self._PackWorkForPeer(peer)
          compression.Send(data, sock)
          compression.Send(constants.END_OF_STREAM, sock)
          rec = compression.Receiver(sock)
          while not rec.IsDone() and not self.terminate:
            data_list = rec.Current()
            for data in data_list:
              test_id = data[0]
              if test_id < 0:
                # The peer is reporting an error.
                with self.lock:
                  print("\nPeer %s reports error: %s" % (peer.address, data[1]))
                continue
              # Tests that got a result leave the map; the rest runs locally.
              test = test_map.pop(test_id)
              test.MergeResult(data)
              try:
                self.perfdata.UpdatePerfData(test)
              except Exception as e:
                # Just keep working.
                print("UpdatePerfData exception: %s" % e)
              with self.lock:
                perf_key = self.perfdata.GetKey(test)
                compression.Send(
                    [constants.INFORM_DURATION, perf_key, test.duration,
                     self.context.arch, self.context.mode],
                    self.local_socket)
                has_unexpected_output = test.suite.HasUnexpectedOutput(test)
                if has_unexpected_output:
                  self.failed.append(test)
                  if test.output.HasCrashed():
                    self.crashed += 1
                else:
                  self.succeeded += 1
                self.remaining -= 1
                self.indicator.HasRun(test, has_unexpected_output)
            rec.Advance()
          peer.runtime = time.time() - start_time
        except Exception as e:
          # Fall back to local execution. KeyboardInterrupt is not an
          # Exception subclass, so it still propagates (after the socket is
          # closed by the finally clause below).
          print("Got exception: %s" % e)
      else:
        # NOTE(review): unlike the INFORM_DURATION messages above, this send
        # on the shared local socket is not guarded by self.lock - confirm
        # whether that is safe.
        compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
                         self.local_socket)
        # The peer never received its work. Packing does not involve the
        # peer socket, so it is used here only to recover the test map for
        # the local fallback below.
        try:
          test_map = self._PackWorkForPeer(peer)[1]
        except Exception as e:
          print("Got exception: %s" % e)
    finally:
      sock.close()
    if len(test_map) > 0:
      # Some tests have not received any results. Run them locally.
      print("\nNo results for %d tests, running them locally." % len(test_map))
      self._EnqueueLocally(test_map)

  def _EnqueueLocally(self, test_map):
    """Appends all tests in |test_map| (its values) to the local queue."""
    with self.tests_lock:
      self.tests.extend(test_map.values())

  def _AnalyzePeerRuntimes(self):
    """Updates each peer's relative performance from the measured runtimes.

    Does nothing unless every peer has a runtime. Otherwise each peer's
    |assigned_work| and |runtime| are normalized in place to fractions of
    the totals, the ratio of the two is averaged with the previous
    |relative_performance|, and the new value is sent to the local server.
    """
    total_runtime = 0.0
    total_work = 0.0
    for p in self.peers:
      if p.runtime is None:
        return
      total_runtime += p.runtime
      total_work += p.assigned_work
    if not total_runtime or not total_work:
      # Nothing measurable; normalizing would divide by zero.
      return
    for p in self.peers:
      p.assigned_work /= total_work
      p.runtime /= total_runtime
      if not p.runtime:
        # No correction can be derived from a zero runtime.
        continue
      perf_correction = p.assigned_work / p.runtime
      old_perf = p.relative_performance
      p.relative_performance = (old_perf + perf_correction) / 2.0
      compression.Send([constants.UPDATE_PERF, p.address,
                        p.relative_performance],
                       self.local_socket)