1 /*-------------------------------------------------------------------------
2 * drawElements Quality Program Execution Server
3 * ---------------------------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Test Execution Server.
22 *//*--------------------------------------------------------------------*/
23
24 #include "xsExecutionServer.hpp"
25 #include "deClock.h"
26
27 #include <cstdio>
28
29 using std::vector;
30 using std::string;
31
32 #if 1
33 # define DBG_PRINT(X) printf X
34 #else
35 # define DBG_PRINT(X)
36 #endif
37
38 namespace xs
39 {
40
isComplete(void) const41 inline bool MessageBuilder::isComplete (void) const
42 {
43 if (m_buffer.size() < MESSAGE_HEADER_SIZE)
44 return false;
45 else
46 return (int)m_buffer.size() == getMessageSize();
47 }
48
getMessageData(void) const49 const deUint8* MessageBuilder::getMessageData (void) const
50 {
51 return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
52 }
53
getMessageDataSize(void) const54 int MessageBuilder::getMessageDataSize (void) const
55 {
56 DE_ASSERT(isComplete());
57 return (int)m_buffer.size() - MESSAGE_HEADER_SIZE;
58 }
59
read(ByteBuffer & src)60 void MessageBuilder::read (ByteBuffer& src)
61 {
62 // Try to get header.
63 if (m_buffer.size() < MESSAGE_HEADER_SIZE)
64 {
65 while (m_buffer.size() < MESSAGE_HEADER_SIZE &&
66 src.getNumElements() > 0)
67 m_buffer.push_back(src.popBack());
68
69 DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);
70
71 if (m_buffer.size() == MESSAGE_HEADER_SIZE)
72 {
73 // Got whole header, parse it.
74 Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
75 }
76 }
77
78 if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
79 {
80 // We have header.
81 int msgSize = getMessageSize();
82 int numBytesLeft = msgSize - (int)m_buffer.size();
83 int numToRead = de::min(src.getNumElements(), numBytesLeft);
84
85 if (numToRead > 0)
86 {
87 int curBufPos = (int)m_buffer.size();
88 m_buffer.resize(curBufPos+numToRead);
89 src.popBack(&m_buffer[curBufPos], numToRead);
90 }
91 }
92 }
93
clear(void)94 void MessageBuilder::clear (void)
95 {
96 m_buffer.clear();
97 m_messageType = MESSAGETYPE_NONE;
98 m_messageSize = 0;
99 }
100
ExecutionServer(xs::TestProcess * testProcess,deSocketFamily family,int port,RunMode runMode)101 ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
102 : TcpServer (family, port)
103 , m_testDriver (testProcess)
104 , m_runMode (runMode)
105 {
106 }
107
~ExecutionServer(void)108 ExecutionServer::~ExecutionServer (void)
109 {
110 }
111
acquireTestDriver(void)112 TestDriver* ExecutionServer::acquireTestDriver (void)
113 {
114 if (!m_testDriverLock.tryLock())
115 throw Error("Failed to acquire test driver");
116
117 return &m_testDriver;
118 }
119
releaseTestDriver(TestDriver * driver)120 void ExecutionServer::releaseTestDriver (TestDriver* driver)
121 {
122 DE_ASSERT(&m_testDriver == driver);
123 DE_UNREF(driver);
124 m_testDriverLock.unlock();
125 }
126
createHandler(de::Socket * socket,const de::SocketAddress & clientAddress)127 ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
128 {
129 printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
130 return new ExecutionRequestHandler(this, socket);
131 }
132
connectionDone(ConnectionHandler * handler)133 void ExecutionServer::connectionDone (ConnectionHandler* handler)
134 {
135 if (m_runMode == RUNMODE_SINGLE_EXEC)
136 m_socket.close();
137
138 TcpServer::connectionDone(handler);
139 }
140
ExecutionRequestHandler(ExecutionServer * server,de::Socket * socket)141 ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
142 : ConnectionHandler (server, socket)
143 , m_execServer (server)
144 , m_testDriver (DE_NULL)
145 , m_bufferIn (RECV_BUFFER_SIZE)
146 , m_bufferOut (SEND_BUFFER_SIZE)
147 , m_run (false)
148 , m_sendRecvTmpBuf (SEND_RECV_TMP_BUFFER_SIZE)
149 {
150 // Set flags.
151 m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);
152
153 // Init protocol keepalives.
154 initKeepAlives();
155 }
156
~ExecutionRequestHandler(void)157 ExecutionRequestHandler::~ExecutionRequestHandler (void)
158 {
159 if (m_testDriver)
160 m_execServer->releaseTestDriver(m_testDriver);
161 }
162
handle(void)163 void ExecutionRequestHandler::handle (void)
164 {
165 DBG_PRINT(("ExecutionRequestHandler::handle()\n"));
166
167 try
168 {
169 // Process execution session.
170 processSession();
171 }
172 catch (const std::exception& e)
173 {
174 printf("ExecutionRequestHandler::run(): %s\n", e.what());
175 }
176
177 DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));
178
179 // Release test driver.
180 if (m_testDriver)
181 {
182 try
183 {
184 m_testDriver->reset();
185 }
186 catch (...)
187 {
188 }
189 m_execServer->releaseTestDriver(m_testDriver);
190 m_testDriver = DE_NULL;
191 }
192
193 // Close connection.
194 if (m_socket->isConnected())
195 m_socket->shutdown();
196 }
197
acquireTestDriver(void)198 void ExecutionRequestHandler::acquireTestDriver (void)
199 {
200 DE_ASSERT(!m_testDriver);
201
202 // Try to acquire test driver - may fail.
203 m_testDriver = m_execServer->acquireTestDriver();
204 DE_ASSERT(m_testDriver);
205 m_testDriver->reset();
206
207 }
208
processSession(void)209 void ExecutionRequestHandler::processSession (void)
210 {
211 m_run = true;
212
213 deUint64 lastIoTime = deGetMicroseconds();
214
215 while (m_run)
216 {
217 bool anyIO = false;
218
219 // Read from socket to buffer.
220 anyIO = receive() || anyIO;
221
222 // Send bytes in buffer.
223 anyIO = send() || anyIO;
224
225 // Process incoming data.
226 if (m_bufferIn.getNumElements() > 0)
227 {
228 DE_ASSERT(!m_msgBuilder.isComplete());
229 m_msgBuilder.read(m_bufferIn);
230 }
231
232 if (m_msgBuilder.isComplete())
233 {
234 // Process message.
235 processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());
236
237 m_msgBuilder.clear();
238 }
239
240 // Keepalives, anyone?
241 pollKeepAlives();
242
243 // Poll test driver for IO.
244 if (m_testDriver)
245 anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
246
247 // If no IO happens in a reasonable amount of time, go to sleep.
248 {
249 deUint64 curTime = deGetMicroseconds();
250 if (anyIO)
251 lastIoTime = curTime;
252 else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
253 deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
254 else
255 deYield(); // Just give other threads chance to run.
256 }
257 }
258 }
259
processMessage(MessageType type,const deUint8 * data,int dataSize)260 void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, int dataSize)
261 {
262 switch (type)
263 {
264 case MESSAGETYPE_HELLO:
265 {
266 HelloMessage msg(data, dataSize);
267 DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
268 if (msg.version != PROTOCOL_VERSION)
269 throw ProtocolError("Unsupported protocol version");
270 break;
271 }
272
273 case MESSAGETYPE_TEST:
274 {
275 TestMessage msg(data, dataSize);
276 DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
277 break;
278 }
279
280 case MESSAGETYPE_KEEPALIVE:
281 {
282 KeepAliveMessage msg(data, dataSize);
283 DBG_PRINT(("KeepAliveMessage\n"));
284 keepAliveReceived();
285 break;
286 }
287
288 case MESSAGETYPE_EXECUTE_BINARY:
289 {
290 ExecuteBinaryMessage msg(data, dataSize);
291 DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
292 getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
293 keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
294 break;
295 }
296
297 case MESSAGETYPE_STOP_EXECUTION:
298 {
299 StopExecutionMessage msg(data, dataSize);
300 DBG_PRINT(("StopExecutionMessage\n"));
301 getTestDriver()->stopProcess();
302 break;
303 }
304
305 default:
306 throw ProtocolError("Unsupported message");
307 }
308 }
309
initKeepAlives(void)310 void ExecutionRequestHandler::initKeepAlives (void)
311 {
312 deUint64 curTime = deGetMicroseconds();
313 m_lastKeepAliveSent = curTime;
314 m_lastKeepAliveReceived = curTime;
315 }
316
keepAliveReceived(void)317 void ExecutionRequestHandler::keepAliveReceived (void)
318 {
319 m_lastKeepAliveReceived = deGetMicroseconds();
320 }
321
pollKeepAlives(void)322 void ExecutionRequestHandler::pollKeepAlives (void)
323 {
324 deUint64 curTime = deGetMicroseconds();
325
326 // Check that we've got keepalives in timely fashion.
327 if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
328 throw ProtocolError("Keepalive timeout occurred");
329
330 // Send some?
331 if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
332 m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
333 {
334 vector<deUint8> buf;
335 KeepAliveMessage().write(buf);
336 m_bufferOut.pushFront(&buf[0], (int)buf.size());
337
338 m_lastKeepAliveSent = deGetMicroseconds();
339 }
340 }
341
receive(void)342 bool ExecutionRequestHandler::receive (void)
343 {
344 int maxLen = de::min<int>((int)m_sendRecvTmpBuf.size(), m_bufferIn.getNumFree());
345
346 if (maxLen > 0)
347 {
348 int numRecv;
349 deSocketResult result = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
350
351 if (result == DE_SOCKETRESULT_SUCCESS)
352 {
353 DE_ASSERT(numRecv > 0);
354 m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], numRecv);
355 return true;
356 }
357 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
358 {
359 m_run = false;
360 return true;
361 }
362 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
363 return false;
364 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
365 throw ConnectionError("Connection terminated");
366 else
367 throw ConnectionError("receive() failed");
368 }
369 else
370 return false;
371 }
372
send(void)373 bool ExecutionRequestHandler::send (void)
374 {
375 int maxLen = de::min<int>((int)m_sendRecvTmpBuf.size(), m_bufferOut.getNumElements());
376
377 if (maxLen > 0)
378 {
379 m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], maxLen);
380
381 int numSent;
382 deSocketResult result = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
383
384 if (result == DE_SOCKETRESULT_SUCCESS)
385 {
386 DE_ASSERT(numSent > 0);
387 m_bufferOut.popBack(numSent);
388 return true;
389 }
390 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
391 {
392 m_run = false;
393 return true;
394 }
395 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
396 return false;
397 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
398 throw ConnectionError("Connection terminated");
399 else
400 throw ConnectionError("send() failed");
401 }
402 else
403 return false;
404 }
405
406 } // xs
407