1 /*-------------------------------------------------------------------------
2  * drawElements Quality Program Execution Server
3  * ---------------------------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Test Execution Server.
22  *//*--------------------------------------------------------------------*/
23 
24 #include "xsExecutionServer.hpp"
25 #include "deClock.h"
26 
27 #include <cstdio>
28 
29 using std::vector;
30 using std::string;
31 
32 #if 1
33 #	define DBG_PRINT(X) printf X
34 #else
35 #	define DBG_PRINT(X)
36 #endif
37 
38 namespace xs
39 {
40 
isComplete(void) const41 inline bool MessageBuilder::isComplete (void) const
42 {
43 	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
44 		return false;
45 	else
46 		return m_buffer.size() == getMessageSize();
47 }
48 
getMessageData(void) const49 const deUint8* MessageBuilder::getMessageData (void) const
50 {
51 	return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
52 }
53 
getMessageDataSize(void) const54 size_t MessageBuilder::getMessageDataSize (void) const
55 {
56 	DE_ASSERT(isComplete());
57 	return m_buffer.size() - MESSAGE_HEADER_SIZE;
58 }
59 
read(ByteBuffer & src)60 void MessageBuilder::read (ByteBuffer& src)
61 {
62 	// Try to get header.
63 	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
64 	{
65 		while (m_buffer.size() < MESSAGE_HEADER_SIZE &&
66 			   src.getNumElements() > 0)
67 			m_buffer.push_back(src.popBack());
68 
69 		DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);
70 
71 		if (m_buffer.size() == MESSAGE_HEADER_SIZE)
72 		{
73 			// Got whole header, parse it.
74 			Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
75 		}
76 	}
77 
78 	if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
79 	{
80 		// We have header.
81 		size_t msgSize			= getMessageSize();
82 		size_t numBytesLeft		= msgSize - m_buffer.size();
83 		size_t numToRead		= (size_t)de::min(src.getNumElements(), (int)numBytesLeft);
84 
85 		if (numToRead > 0)
86 		{
87 			int curBufPos = (int)m_buffer.size();
88 			m_buffer.resize(curBufPos+numToRead);
89 			src.popBack(&m_buffer[curBufPos], (int)numToRead);
90 		}
91 	}
92 }
93 
clear(void)94 void MessageBuilder::clear (void)
95 {
96 	m_buffer.clear();
97 	m_messageType	= MESSAGETYPE_NONE;
98 	m_messageSize	= 0;
99 }
100 
ExecutionServer(xs::TestProcess * testProcess,deSocketFamily family,int port,RunMode runMode)101 ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
102 	: TcpServer		(family, port)
103 	, m_testDriver	(testProcess)
104 	, m_runMode		(runMode)
105 {
106 }
107 
~ExecutionServer(void)108 ExecutionServer::~ExecutionServer (void)
109 {
110 }
111 
acquireTestDriver(void)112 TestDriver* ExecutionServer::acquireTestDriver (void)
113 {
114 	if (!m_testDriverLock.tryLock())
115 		throw Error("Failed to acquire test driver");
116 
117 	return &m_testDriver;
118 }
119 
releaseTestDriver(TestDriver * driver)120 void ExecutionServer::releaseTestDriver (TestDriver* driver)
121 {
122 	DE_ASSERT(&m_testDriver == driver);
123 	DE_UNREF(driver);
124 	m_testDriverLock.unlock();
125 }
126 
createHandler(de::Socket * socket,const de::SocketAddress & clientAddress)127 ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
128 {
129 	printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
130 	return new ExecutionRequestHandler(this, socket);
131 }
132 
connectionDone(ConnectionHandler * handler)133 void ExecutionServer::connectionDone (ConnectionHandler* handler)
134 {
135 	if (m_runMode == RUNMODE_SINGLE_EXEC)
136 		m_socket.close();
137 
138 	TcpServer::connectionDone(handler);
139 }
140 
ExecutionRequestHandler(ExecutionServer * server,de::Socket * socket)141 ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
142 	: ConnectionHandler	(server, socket)
143 	, m_execServer		(server)
144 	, m_testDriver		(DE_NULL)
145 	, m_bufferIn		(RECV_BUFFER_SIZE)
146 	, m_bufferOut		(SEND_BUFFER_SIZE)
147 	, m_run				(false)
148 	, m_sendRecvTmpBuf	(SEND_RECV_TMP_BUFFER_SIZE)
149 {
150 	// Set flags.
151 	m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);
152 
153 	// Init protocol keepalives.
154 	initKeepAlives();
155 }
156 
~ExecutionRequestHandler(void)157 ExecutionRequestHandler::~ExecutionRequestHandler (void)
158 {
159 	if (m_testDriver)
160 		m_execServer->releaseTestDriver(m_testDriver);
161 }
162 
handle(void)163 void ExecutionRequestHandler::handle (void)
164 {
165 	DBG_PRINT(("ExecutionRequestHandler::handle()\n"));
166 
167 	try
168 	{
169 		// Process execution session.
170 		processSession();
171 	}
172 	catch (const std::exception& e)
173 	{
174 		printf("ExecutionRequestHandler::run(): %s\n", e.what());
175 	}
176 
177 	DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));
178 
179 	// Release test driver.
180 	if (m_testDriver)
181 	{
182 		try
183 		{
184 			m_testDriver->reset();
185 		}
186 		catch (...)
187 		{
188 		}
189 		m_execServer->releaseTestDriver(m_testDriver);
190 		m_testDriver = DE_NULL;
191 	}
192 
193 	// Close connection.
194 	if (m_socket->isConnected())
195 		m_socket->shutdown();
196 }
197 
acquireTestDriver(void)198 void ExecutionRequestHandler::acquireTestDriver (void)
199 {
200 	DE_ASSERT(!m_testDriver);
201 
202 	// Try to acquire test driver - may fail.
203 	m_testDriver = m_execServer->acquireTestDriver();
204 	DE_ASSERT(m_testDriver);
205 	m_testDriver->reset();
206 
207 }
208 
processSession(void)209 void ExecutionRequestHandler::processSession (void)
210 {
211 	m_run = true;
212 
213 	deUint64 lastIoTime = deGetMicroseconds();
214 
215 	while (m_run)
216 	{
217 		bool anyIO = false;
218 
219 		// Read from socket to buffer.
220 		anyIO = receive() || anyIO;
221 
222 		// Send bytes in buffer.
223 		anyIO = send() || anyIO;
224 
225 		// Process incoming data.
226 		if (m_bufferIn.getNumElements() > 0)
227 		{
228 			DE_ASSERT(!m_msgBuilder.isComplete());
229 			m_msgBuilder.read(m_bufferIn);
230 		}
231 
232 		if (m_msgBuilder.isComplete())
233 		{
234 			// Process message.
235 			processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());
236 
237 			m_msgBuilder.clear();
238 		}
239 
240 		// Keepalives, anyone?
241 		pollKeepAlives();
242 
243 		// Poll test driver for IO.
244 		if (m_testDriver)
245 			anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
246 
247 		// If no IO happens in a reasonable amount of time, go to sleep.
248 		{
249 			deUint64 curTime = deGetMicroseconds();
250 			if (anyIO)
251 				lastIoTime = curTime;
252 			else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
253 				deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
254 			else
255 				deYield(); // Just give other threads chance to run.
256 		}
257 	}
258 }
259 
processMessage(MessageType type,const deUint8 * data,size_t dataSize)260 void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize)
261 {
262 	switch (type)
263 	{
264 		case MESSAGETYPE_HELLO:
265 		{
266 			HelloMessage msg(data, dataSize);
267 			DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
268 			if (msg.version != PROTOCOL_VERSION)
269 				throw ProtocolError("Unsupported protocol version");
270 			break;
271 		}
272 
273 		case MESSAGETYPE_TEST:
274 		{
275 			TestMessage msg(data, dataSize);
276 			DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
277 			break;
278 		}
279 
280 		case MESSAGETYPE_KEEPALIVE:
281 		{
282 			KeepAliveMessage msg(data, dataSize);
283 			DBG_PRINT(("KeepAliveMessage\n"));
284 			keepAliveReceived();
285 			break;
286 		}
287 
288 		case MESSAGETYPE_EXECUTE_BINARY:
289 		{
290 			ExecuteBinaryMessage msg(data, dataSize);
291 			DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
292 			getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
293 			keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
294 			break;
295 		}
296 
297 		case MESSAGETYPE_STOP_EXECUTION:
298 		{
299 			StopExecutionMessage msg(data, dataSize);
300 			DBG_PRINT(("StopExecutionMessage\n"));
301 			getTestDriver()->stopProcess();
302 			break;
303 		}
304 
305 		default:
306 			throw ProtocolError("Unsupported message");
307 	}
308 }
309 
initKeepAlives(void)310 void ExecutionRequestHandler::initKeepAlives (void)
311 {
312 	deUint64 curTime = deGetMicroseconds();
313 	m_lastKeepAliveSent		= curTime;
314 	m_lastKeepAliveReceived	= curTime;
315 }
316 
keepAliveReceived(void)317 void ExecutionRequestHandler::keepAliveReceived (void)
318 {
319 	m_lastKeepAliveReceived = deGetMicroseconds();
320 }
321 
pollKeepAlives(void)322 void ExecutionRequestHandler::pollKeepAlives (void)
323 {
324 	deUint64 curTime = deGetMicroseconds();
325 
326 	// Check that we've got keepalives in timely fashion.
327 	if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
328 		throw ProtocolError("Keepalive timeout occurred");
329 
330 	// Send some?
331 	if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
332 		m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
333 	{
334 		vector<deUint8> buf;
335 		KeepAliveMessage().write(buf);
336 		m_bufferOut.pushFront(&buf[0], (int)buf.size());
337 
338 		m_lastKeepAliveSent = deGetMicroseconds();
339 	}
340 }
341 
receive(void)342 bool ExecutionRequestHandler::receive (void)
343 {
344 	size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());
345 
346 	if (maxLen > 0)
347 	{
348 		size_t			numRecv;
349 		deSocketResult	result	= m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
350 
351 		if (result == DE_SOCKETRESULT_SUCCESS)
352 		{
353 			DE_ASSERT(numRecv > 0);
354 			m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
355 			return true;
356 		}
357 		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
358 		{
359 			m_run = false;
360 			return true;
361 		}
362 		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
363 			return false;
364 		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
365 			throw ConnectionError("Connection terminated");
366 		else
367 			throw ConnectionError("receive() failed");
368 	}
369 	else
370 		return false;
371 }
372 
send(void)373 bool ExecutionRequestHandler::send (void)
374 {
375 	size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());
376 
377 	if (maxLen > 0)
378 	{
379 		m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);
380 
381 		size_t			numSent;
382 		deSocketResult	result	= m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
383 
384 		if (result == DE_SOCKETRESULT_SUCCESS)
385 		{
386 			DE_ASSERT(numSent > 0);
387 			m_bufferOut.popBack((int)numSent);
388 			return true;
389 		}
390 		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
391 		{
392 			m_run = false;
393 			return true;
394 		}
395 		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
396 			return false;
397 		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
398 			throw ConnectionError("Connection terminated");
399 		else
400 			throw ConnectionError("send() failed");
401 	}
402 	else
403 		return false;
404 }
405 
406 } // xs
407