1 /*-------------------------------------------------------------------------
2  * drawElements Quality Program Test Executor
3  * ------------------------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Tcp/Ip communication link.
22  *//*--------------------------------------------------------------------*/
23 
24 #include "xeTcpIpLink.hpp"
25 #include "xsProtocol.hpp"
26 #include "deClock.h"
27 #include "deInt32.h"
28 
29 namespace xe
30 {
31 
32 enum
33 {
34 	SEND_BUFFER_BLOCK_SIZE		= 1024,
35 	SEND_BUFFER_NUM_BLOCKS		= 64
36 };
37 
38 // Utilities for writing messages out.
39 
writeMessageHeader(de::BlockBuffer<deUint8> & dst,xs::MessageType type,int messageSize)40 static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
41 {
42 	deUint8 hdr[xs::MESSAGE_HEADER_SIZE];
43 	xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44 	dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45 }
46 
writeKeepalive(de::BlockBuffer<deUint8> & dst)47 static void writeKeepalive (de::BlockBuffer<deUint8>& dst)
48 {
49 	writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50 	dst.flush();
51 }
52 
writeExecuteBinary(de::BlockBuffer<deUint8> & dst,const char * name,const char * params,const char * workDir,const char * caseList)53 static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
54 {
55 	int		nameSize			= (int)strlen(name)		+ 1;
56 	int		paramsSize			= (int)strlen(params)	+ 1;
57 	int		workDirSize			= (int)strlen(workDir)	+ 1;
58 	int		caseListSize		= (int)strlen(caseList)	+ 1;
59 	int		totalSize			= xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
60 
61 	writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
62 	dst.write(nameSize,		(const deUint8*)name);
63 	dst.write(paramsSize,	(const deUint8*)params);
64 	dst.write(workDirSize,	(const deUint8*)workDir);
65 	dst.write(caseListSize,	(const deUint8*)caseList);
66 	dst.flush();
67 }
68 
writeStopExecution(de::BlockBuffer<deUint8> & dst)69 static void writeStopExecution (de::BlockBuffer<deUint8>& dst)
70 {
71 	writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
72 	dst.flush();
73 }
74 
75 // TcpIpLinkState
76 
TcpIpLinkState(CommLinkState initialState,const char * initialErr)77 TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
78 	: m_state					(initialState)
79 	, m_error					(initialErr)
80 	, m_lastKeepaliveReceived	(0)
81 	, m_stateChangedCallback	(DE_NULL)
82 	, m_testLogDataCallback		(DE_NULL)
83 	, m_infoLogDataCallback		(DE_NULL)
84 	, m_userPtr					(DE_NULL)
85 {
86 }
87 
~TcpIpLinkState(void)88 TcpIpLinkState::~TcpIpLinkState (void)
89 {
90 }
91 
getState(void) const92 CommLinkState TcpIpLinkState::getState (void) const
93 {
94 	de::ScopedLock lock(m_lock);
95 
96 	return m_state;
97 }
98 
getState(std::string & error) const99 CommLinkState TcpIpLinkState::getState (std::string& error) const
100 {
101 	de::ScopedLock lock(m_lock);
102 
103 	error = m_error;
104 	return m_state;
105 }
106 
setCallbacks(CommLink::StateChangedFunc stateChangedCallback,CommLink::LogDataFunc testLogDataCallback,CommLink::LogDataFunc infoLogDataCallback,void * userPtr)107 void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
108 {
109 	de::ScopedLock lock(m_lock);
110 
111 	m_stateChangedCallback		= stateChangedCallback;
112 	m_testLogDataCallback		= testLogDataCallback;
113 	m_infoLogDataCallback		= infoLogDataCallback;
114 	m_userPtr					= userPtr;
115 }
116 
setState(CommLinkState state,const char * error)117 void TcpIpLinkState::setState (CommLinkState state, const char* error)
118 {
119 	CommLink::StateChangedFunc	callback	= DE_NULL;
120 	void*						userPtr		= DE_NULL;
121 
122 	{
123 		de::ScopedLock lock(m_lock);
124 
125 		m_state = state;
126 		m_error	= error;
127 
128 		callback	= m_stateChangedCallback;
129 		userPtr		= m_userPtr;
130 	}
131 
132 	if (callback)
133 		callback(userPtr, state, error);
134 }
135 
onTestLogData(const deUint8 * bytes,size_t numBytes) const136 void TcpIpLinkState::onTestLogData (const deUint8* bytes, size_t numBytes) const
137 {
138 	CommLink::LogDataFunc	callback	= DE_NULL;
139 	void*					userPtr		= DE_NULL;
140 
141 	m_lock.lock();
142 	callback	= m_testLogDataCallback;
143 	userPtr		= m_userPtr;
144 	m_lock.unlock();
145 
146 	if (callback)
147 		callback(userPtr, bytes, numBytes);
148 }
149 
onInfoLogData(const deUint8 * bytes,size_t numBytes) const150 void TcpIpLinkState::onInfoLogData (const deUint8* bytes, size_t numBytes) const
151 {
152 	CommLink::LogDataFunc	callback	= DE_NULL;
153 	void*					userPtr		= DE_NULL;
154 
155 	m_lock.lock();
156 	callback	= m_infoLogDataCallback;
157 	userPtr		= m_userPtr;
158 	m_lock.unlock();
159 
160 	if (callback)
161 		callback(userPtr, bytes, numBytes);
162 }
163 
onKeepaliveReceived(void)164 void TcpIpLinkState::onKeepaliveReceived (void)
165 {
166 	de::ScopedLock lock(m_lock);
167 	m_lastKeepaliveReceived = deGetMicroseconds();
168 }
169 
getLastKeepaliveRecevied(void) const170 deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
171 {
172 	de::ScopedLock lock(m_lock);
173 	return m_lastKeepaliveReceived;
174 }
175 
176 // TcpIpSendThread
177 
TcpIpSendThread(de::Socket & socket,TcpIpLinkState & state)178 TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
179 	: m_socket		(socket)
180 	, m_state		(state)
181 	, m_buffer		(SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
182 	, m_isRunning	(false)
183 {
184 }
185 
~TcpIpSendThread(void)186 TcpIpSendThread::~TcpIpSendThread (void)
187 {
188 }
189 
start(void)190 void TcpIpSendThread::start (void)
191 {
192 	DE_ASSERT(!m_isRunning);
193 
194 	// Reset state.
195 	m_buffer.clear();
196 	m_isRunning = true;
197 
198 	de::Thread::start();
199 }
200 
run(void)201 void TcpIpSendThread::run (void)
202 {
203 	try
204 	{
205 		deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
206 
207 		while (!m_buffer.isCanceled())
208 		{
209 			size_t			numToSend	= 0;
210 			size_t			numSent		= 0;
211 			deSocketResult	result		= DE_SOCKETRESULT_LAST;
212 
213 			try
214 			{
215 				// Wait for single byte and then try to read more.
216 				m_buffer.read(1, &buf[0]);
217 				numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
218 			}
219 			catch (const de::BlockBuffer<deUint8>::CanceledException&)
220 			{
221 				// Handled in loop condition.
222 			}
223 
224 			while (numSent < numToSend)
225 			{
226 				result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent);
227 
228 				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
229 					XE_FAIL("Connection closed");
230 				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
231 					XE_FAIL("Connection terminated");
232 				else if (result == DE_SOCKETRESULT_ERROR)
233 					XE_FAIL("Socket error");
234 				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
235 				{
236 					// \note Socket should not be in non-blocking mode.
237 					DE_ASSERT(numSent == 0);
238 					deYield();
239 				}
240 				else
241 					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
242 			}
243 		}
244 	}
245 	catch (const std::exception& e)
246 	{
247 		m_state.setState(COMMLINKSTATE_ERROR, e.what());
248 	}
249 }
250 
stop(void)251 void TcpIpSendThread::stop (void)
252 {
253 	if (m_isRunning)
254 	{
255 		m_buffer.cancel();
256 		join();
257 		m_isRunning = false;
258 	}
259 }
260 
261 // TcpIpRecvThread
262 
TcpIpRecvThread(de::Socket & socket,TcpIpLinkState & state)263 TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
264 	: m_socket		(socket)
265 	, m_state		(state)
266 	, m_curMsgPos	(0)
267 	, m_isRunning	(false)
268 {
269 }
270 
~TcpIpRecvThread(void)271 TcpIpRecvThread::~TcpIpRecvThread (void)
272 {
273 }
274 
start(void)275 void TcpIpRecvThread::start (void)
276 {
277 	DE_ASSERT(!m_isRunning);
278 
279 	// Reset state.
280 	m_curMsgPos = 0;
281 	m_isRunning = true;
282 
283 	de::Thread::start();
284 }
285 
run(void)286 void TcpIpRecvThread::run (void)
287 {
288 	try
289 	{
290 		for (;;)
291 		{
292 			bool				hasHeader		= m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
293 			bool				hasPayload		= false;
294 			size_t				messageSize		= 0;
295 			xs::MessageType		messageType		= (xs::MessageType)0;
296 
297 			if (hasHeader)
298 			{
299 				xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
300 				hasPayload = m_curMsgPos >= messageSize;
301 			}
302 
303 			if (hasPayload)
304 			{
305 				// Process message.
306 				handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
307 				m_curMsgPos = 0;
308 			}
309 			else
310 			{
311 				// Try to receive missing bytes.
312 				size_t				curSize			= hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
313 				size_t				bytesToRecv		= curSize-m_curMsgPos;
314 				size_t				numRecv			= 0;
315 				deSocketResult		result			= DE_SOCKETRESULT_LAST;
316 
317 				if (m_curMsgBuf.size() < curSize)
318 					m_curMsgBuf.resize(curSize);
319 
320 				result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
321 
322 				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
323 					XE_FAIL("Connection closed");
324 				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
325 					XE_FAIL("Connection terminated");
326 				else if (result == DE_SOCKETRESULT_ERROR)
327 					XE_FAIL("Socket error");
328 				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
329 				{
330 					// \note Socket should not be in non-blocking mode.
331 					DE_ASSERT(numRecv == 0);
332 					deYield();
333 				}
334 				else
335 				{
336 					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
337 					DE_ASSERT(numRecv <= bytesToRecv);
338 					m_curMsgPos += numRecv;
339 					// Continue receiving bytes / handle message in next iter.
340 				}
341 			}
342 		}
343 	}
344 	catch (const std::exception& e)
345 	{
346 		m_state.setState(COMMLINKSTATE_ERROR, e.what());
347 	}
348 }
349 
stop(void)350 void TcpIpRecvThread::stop (void)
351 {
352 	if (m_isRunning)
353 	{
354 		// \note Socket must be closed before terminating receive thread.
355 		XE_CHECK(!m_socket.isReceiveOpen());
356 
357 		join();
358 		m_isRunning = false;
359 	}
360 }
361 
handleMessage(xs::MessageType messageType,const deUint8 * data,size_t dataSize)362 void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, size_t dataSize)
363 {
364 	switch (messageType)
365 	{
366 		case xs::MESSAGETYPE_KEEPALIVE:
367 			m_state.onKeepaliveReceived();
368 			break;
369 
370 		case xs::MESSAGETYPE_PROCESS_STARTED:
371 			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
372 			m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
373 			break;
374 
375 		case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
376 		{
377 			xs::ProcessLaunchFailedMessage msg(data, dataSize);
378 			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
379 			m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
380 			break;
381 		}
382 
383 		case xs::MESSAGETYPE_PROCESS_FINISHED:
384 		{
385 			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
386 			xs::ProcessFinishedMessage msg(data, dataSize);
387 			m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
388 			DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
389 			break;
390 		}
391 
392 		case xs::MESSAGETYPE_PROCESS_LOG_DATA:
393 		case xs::MESSAGETYPE_INFO:
394 			// Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
395 			if (data[dataSize-1] == 0)
396 				dataSize -= 1;
397 
398 			if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
399 			{
400 				XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
401 				m_state.onTestLogData(&data[0], dataSize);
402 			}
403 			else
404 				m_state.onInfoLogData(&data[0], dataSize);
405 			break;
406 
407 		default:
408 			XE_FAIL("Unknown message");
409 	}
410 }
411 
412 // TcpIpLink
413 
TcpIpLink(void)414 TcpIpLink::TcpIpLink (void)
415 	: m_state			(COMMLINKSTATE_ERROR, "Not connected")
416 	, m_sendThread		(m_socket, m_state)
417 	, m_recvThread		(m_socket, m_state)
418 	, m_keepaliveTimer	(DE_NULL)
419 {
420 	m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
421 	XE_CHECK(m_keepaliveTimer);
422 }
423 
~TcpIpLink(void)424 TcpIpLink::~TcpIpLink (void)
425 {
426 	try
427 	{
428 		closeConnection();
429 	}
430 	catch (...)
431 	{
432 		// Can't do much except to ignore error.
433 	}
434 	deTimer_destroy(m_keepaliveTimer);
435 }
436 
closeConnection(void)437 void TcpIpLink::closeConnection (void)
438 {
439 	{
440 		deSocketState state = m_socket.getState();
441 		if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
442 			m_socket.shutdown();
443 	}
444 
445 	if (deTimer_isActive(m_keepaliveTimer))
446 		deTimer_disable(m_keepaliveTimer);
447 
448 	if (m_sendThread.isRunning())
449 		m_sendThread.stop();
450 
451 	if (m_recvThread.isRunning())
452 		m_recvThread.stop();
453 
454 	if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
455 		m_socket.close();
456 }
457 
connect(const de::SocketAddress & address)458 void TcpIpLink::connect (const de::SocketAddress& address)
459 {
460 	XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
461 	XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
462 	XE_CHECK(!m_sendThread.isRunning());
463 	XE_CHECK(!m_recvThread.isRunning());
464 
465 	m_socket.connect(address);
466 
467 	try
468 	{
469 		// Clear error and set state to ready.
470 		m_state.setState(COMMLINKSTATE_READY, "");
471 		m_state.onKeepaliveReceived();
472 
473 		// Launch threads.
474 		m_sendThread.start();
475 		m_recvThread.start();
476 
477 		XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
478 	}
479 	catch (const std::exception& e)
480 	{
481 		closeConnection();
482 		m_state.setState(COMMLINKSTATE_ERROR, e.what());
483 		throw;
484 	}
485 }
486 
disconnect(void)487 void TcpIpLink::disconnect (void)
488 {
489 	try
490 	{
491 		closeConnection();
492 		m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
493 	}
494 	catch (const std::exception& e)
495 	{
496 		m_state.setState(COMMLINKSTATE_ERROR, e.what());
497 	}
498 }
499 
reset(void)500 void TcpIpLink::reset (void)
501 {
502 	// \note Just clears error state if we are connected.
503 	if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
504 	{
505 		m_state.setState(COMMLINKSTATE_READY, "");
506 
507 		// \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
508 	}
509 	else
510 		disconnect(); // Abnormal state/usage. Disconnect socket.
511 }
512 
keepaliveTimerCallback(void * ptr)513 void TcpIpLink::keepaliveTimerCallback (void* ptr)
514 {
515 	TcpIpLink*	link			= static_cast<TcpIpLink*>(ptr);
516 	deUint64	lastKeepalive	= link->m_state.getLastKeepaliveRecevied();
517 	deUint64	curTime			= deGetMicroseconds();
518 
519 	// Check for timeout.
520 	if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000)
521 		link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
522 
523 	// Enqueue new keepalive.
524 	try
525 	{
526 		writeKeepalive(link->m_sendThread.getBuffer());
527 	}
528 	catch (const de::BlockBuffer<deUint8>::CanceledException&)
529 	{
530 		// Ignore. Can happen in connection teardown.
531 	}
532 }
533 
getState(void) const534 CommLinkState TcpIpLink::getState (void) const
535 {
536 	return m_state.getState();
537 }
538 
getState(std::string & message) const539 CommLinkState TcpIpLink::getState (std::string& message) const
540 {
541 	return m_state.getState(message);
542 }
543 
setCallbacks(StateChangedFunc stateChangedCallback,LogDataFunc testLogDataCallback,LogDataFunc infoLogDataCallback,void * userPtr)544 void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
545 {
546 	m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
547 }
548 
startTestProcess(const char * name,const char * params,const char * workingDir,const char * caseList)549 void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
550 {
551 	XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
552 
553 	m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
554 	writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
555 }
556 
stopTestProcess(void)557 void TcpIpLink::stopTestProcess (void)
558 {
559 	XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
560 	writeStopExecution(m_sendThread.getBuffer());
561 }
562 
563 } // xe
564