1 /*
2 * Copyright (c) 2011-2015, Intel Corporation
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without modification,
6 * are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation and/or
13 * other materials provided with the distribution.
14 *
15 * 3. Neither the name of the copyright holder nor the names of its contributors
16 * may be used to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
23 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30 #include "RemoteProcessorServer.h"
31 #include "ListeningSocket.h"
32 #include "FullIo.hpp"
33 #include <iostream>
34 #include <memory>
35 #include <assert.h>
36 #include <poll.h>
37 #include <unistd.h>
38 #include <string.h>
39 #include <errno.h>
40 #include "RequestMessage.h"
41 #include "AnswerMessage.h"
42 #include "RemoteCommandHandler.h"
43
44 using std::string;
45
CRemoteProcessorServer(uint16_t uiPort,IRemoteCommandHandler * pCommandHandler)46 CRemoteProcessorServer::CRemoteProcessorServer(uint16_t uiPort, IRemoteCommandHandler* pCommandHandler) :
47 _uiPort(uiPort), _pCommandHandler(pCommandHandler), _bIsStarted(false), _pListeningSocket(NULL), _ulThreadId(0)
48 {
49 }
50
~CRemoteProcessorServer()51 CRemoteProcessorServer::~CRemoteProcessorServer()
52 {
53 stop();
54 }
55
56 // State
start(string & error)57 bool CRemoteProcessorServer::start(string &error)
58 {
59 assert(!_bIsStarted);
60
61 if (pipe(_aiInbandPipe) == -1) {
62 error = "Could not create a pipe for remote processor communication: ";
63 error += strerror(errno);
64 return false;
65 }
66
67 // Create server socket
68 std::auto_ptr<CListeningSocket> pListeningSocket(new CListeningSocket);
69
70 if (!pListeningSocket->listen(_uiPort, error)) {
71
72 return false;
73 }
74
75 // Thread needs to access to the listning socket.
76 _pListeningSocket = pListeningSocket.get();
77 // Create thread
78 errno = pthread_create(&_ulThreadId, NULL, thread_func, this);
79 if (errno != 0) {
80
81 error = "Could not create a remote processor thread: ";
82 error += strerror(errno);
83 return false;
84 }
85
86 // State
87 _bIsStarted = true;
88 pListeningSocket.release();
89
90 return true;
91 }
92
stop()93 void CRemoteProcessorServer::stop()
94 {
95 // Check state
96 if (!_bIsStarted) {
97
98 return;
99 }
100
101 // Cause exiting of the thread
102 uint8_t ucData = 0;
103 if (not utility::fullWrite(_aiInbandPipe[1], &ucData, sizeof(ucData))) {
104 std::cerr << "Could not query command processor thread to terminate: "
105 "fail to write on inband pipe: "
106 << strerror(errno) << std::endl;
107 assert(false);
108 }
109
110 // Join thread
111 errno = pthread_join(_ulThreadId, NULL);
112 if (errno != 0) {
113 std::cout << "Could not join with remote processor thread: "
114 << strerror(errno) << std::endl;
115 assert(false);
116 }
117
118 _bIsStarted = false;
119
120 // Remove listening socket
121 delete _pListeningSocket;
122 _pListeningSocket = NULL;
123 }
124
isStarted() const125 bool CRemoteProcessorServer::isStarted() const
126 {
127 return _bIsStarted;
128 }
129
130 // Thread
thread_func(void * pData)131 void* CRemoteProcessorServer::thread_func(void* pData)
132 {
133 reinterpret_cast<CRemoteProcessorServer*>(pData)->run();
134
135 return NULL;
136 }
137
run()138 void CRemoteProcessorServer::run()
139 {
140 struct pollfd _aPollFds[2];
141
142 bzero(_aPollFds, sizeof(_aPollFds));
143
144 // Build poll elements
145 _aPollFds[0].fd = _pListeningSocket->getFd();
146 _aPollFds[1].fd = _aiInbandPipe[0];
147 _aPollFds[0].events = POLLIN;
148 _aPollFds[1].events = POLLIN;
149
150 while (true) {
151
152 poll(_aPollFds, 2, -1);
153
154 if (_aPollFds[0].revents & POLLIN) {
155
156 // New incoming connection
157 handleNewConnection();
158 }
159 if (_aPollFds[1].revents & POLLIN) {
160
161 // Consume exit request
162 uint8_t ucData;
163 if (not utility::fullRead(_aiInbandPipe[0], &ucData, sizeof(ucData))) {
164 std::cerr << "Remote processor could not receive exit request"
165 << strerror(errno) << std::endl;
166 assert(false);
167 }
168
169 // Exit
170 return;
171 }
172 }
173 }
174
175 // New connection
handleNewConnection()176 void CRemoteProcessorServer::handleNewConnection()
177 {
178 const std::auto_ptr<CSocket> clientSocket(_pListeningSocket->accept());
179
180 if (clientSocket.get() == NULL) {
181
182 return;
183 }
184
185 // Process all incoming requests from the client
186 while (true) {
187
188 // Process requests
189 // Create command message
190 CRequestMessage requestMessage;
191
192 string strError;
193 ///// Receive command
194 CRequestMessage::Result res;
195 res = requestMessage.serialize(clientSocket.get(), false, strError);
196
197 switch (res) {
198 case CRequestMessage::error:
199 std::cout << "Error while receiving message: " << strError << std::endl;
200 // fall through
201 case CRequestMessage::peerDisconnected:
202 // Consider peer disconnection as normal, no log
203 return; // Bail out
204 case CRequestMessage::success:
205 break; // No error, continue
206 }
207
208 // Actually process the request
209 bool bSuccess;
210
211 string strResult;
212
213 if (_pCommandHandler) {
214
215 bSuccess = _pCommandHandler->remoteCommandProcess(requestMessage, strResult);
216
217 } else {
218
219 strResult = "No handler!";
220
221 bSuccess = false;
222 }
223
224 // Send back answer
225 // Create answer message
226 CAnswerMessage answerMessage(strResult, bSuccess);
227
228 ///// Send answer
229 res = answerMessage.serialize(clientSocket.get(), true, strError);
230
231 switch (res) {
232 case CRequestMessage::peerDisconnected:
233 // Peer should not disconnect while waiting for an answer
234 // Fall through to log the error and bail out
235 case CRequestMessage::error:
236 std::cout << "Error while receiving message: " << strError << std::endl;
237 return; // Bail out
238 case CRequestMessage::success:
239 break; // No error, continue
240 }
241 }
242 }
243