1 /*
2  * Copyright (c) 2011-2015, Intel Corporation
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without modification,
6  * are permitted provided that the following conditions are met:
7  *
8  * 1. Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * 2. Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation and/or
13  * other materials provided with the distribution.
14  *
15  * 3. Neither the name of the copyright holder nor the names of its contributors
16  * may be used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
23  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26  * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 #include "RemoteProcessorServer.h"
31 #include "ListeningSocket.h"
32 #include "FullIo.hpp"
33 #include <iostream>
34 #include <memory>
35 #include <assert.h>
36 #include <poll.h>
37 #include <unistd.h>
38 #include <string.h>
39 #include <errno.h>
40 #include "RequestMessage.h"
41 #include "AnswerMessage.h"
42 #include "RemoteCommandHandler.h"
43 
44 using std::string;
45 
CRemoteProcessorServer(uint16_t uiPort,IRemoteCommandHandler * pCommandHandler)46 CRemoteProcessorServer::CRemoteProcessorServer(uint16_t uiPort, IRemoteCommandHandler* pCommandHandler) :
47     _uiPort(uiPort), _pCommandHandler(pCommandHandler), _bIsStarted(false), _pListeningSocket(NULL), _ulThreadId(0)
48 {
49 }
50 
~CRemoteProcessorServer()51 CRemoteProcessorServer::~CRemoteProcessorServer()
52 {
53     stop();
54 }
55 
56 // State
start(string & error)57 bool CRemoteProcessorServer::start(string &error)
58 {
59     assert(!_bIsStarted);
60 
61     if (pipe(_aiInbandPipe) == -1) {
62         error = "Could not create a pipe for remote processor communication: ";
63         error += strerror(errno);
64         return false;
65     }
66 
67     // Create server socket
68     std::auto_ptr<CListeningSocket> pListeningSocket(new CListeningSocket);
69 
70     if (!pListeningSocket->listen(_uiPort, error)) {
71 
72         return false;
73     }
74 
75     // Thread needs to access to the listning socket.
76     _pListeningSocket = pListeningSocket.get();
77     // Create thread
78     errno = pthread_create(&_ulThreadId, NULL, thread_func, this);
79     if (errno != 0) {
80 
81         error = "Could not create a remote processor thread: ";
82         error += strerror(errno);
83         return false;
84     }
85 
86     // State
87     _bIsStarted = true;
88     pListeningSocket.release();
89 
90     return true;
91 }
92 
stop()93 void CRemoteProcessorServer::stop()
94 {
95     // Check state
96     if (!_bIsStarted) {
97 
98         return;
99     }
100 
101     // Cause exiting of the thread
102     uint8_t ucData = 0;
103     if (not utility::fullWrite(_aiInbandPipe[1], &ucData, sizeof(ucData))) {
104         std::cerr << "Could not query command processor thread to terminate: "
105                      "fail to write on inband pipe: "
106                   << strerror(errno) << std::endl;
107         assert(false);
108     }
109 
110     // Join thread
111     errno = pthread_join(_ulThreadId, NULL);
112     if (errno != 0) {
113         std::cout << "Could not join with remote processor thread: "
114                   << strerror(errno) << std::endl;
115         assert(false);
116     }
117 
118     _bIsStarted = false;
119 
120     // Remove listening socket
121     delete _pListeningSocket;
122     _pListeningSocket = NULL;
123 }
124 
isStarted() const125 bool CRemoteProcessorServer::isStarted() const
126 {
127     return _bIsStarted;
128 }
129 
130 // Thread
thread_func(void * pData)131 void* CRemoteProcessorServer::thread_func(void* pData)
132 {
133     reinterpret_cast<CRemoteProcessorServer*>(pData)->run();
134 
135     return NULL;
136 }
137 
run()138 void CRemoteProcessorServer::run()
139 {
140     struct pollfd _aPollFds[2];
141 
142     bzero(_aPollFds, sizeof(_aPollFds));
143 
144     // Build poll elements
145     _aPollFds[0].fd = _pListeningSocket->getFd();
146     _aPollFds[1].fd = _aiInbandPipe[0];
147     _aPollFds[0].events = POLLIN;
148     _aPollFds[1].events = POLLIN;
149 
150     while (true) {
151 
152         poll(_aPollFds, 2, -1);
153 
154         if (_aPollFds[0].revents & POLLIN) {
155 
156             // New incoming connection
157             handleNewConnection();
158         }
159         if (_aPollFds[1].revents & POLLIN) {
160 
161             // Consume exit request
162             uint8_t ucData;
163             if (not utility::fullRead(_aiInbandPipe[0], &ucData, sizeof(ucData))) {
164                     std::cerr << "Remote processor could not receive exit request"
165                               << strerror(errno) << std::endl;
166                     assert(false);
167             }
168 
169             // Exit
170             return;
171         }
172     }
173 }
174 
175 // New connection
handleNewConnection()176 void CRemoteProcessorServer::handleNewConnection()
177 {
178     const std::auto_ptr<CSocket> clientSocket(_pListeningSocket->accept());
179 
180     if (clientSocket.get() == NULL) {
181 
182         return;
183     }
184 
185     // Process all incoming requests from the client
186     while (true) {
187 
188         // Process requests
189         // Create command message
190         CRequestMessage requestMessage;
191 
192         string strError;
193         ///// Receive command
194         CRequestMessage::Result res;
195         res = requestMessage.serialize(clientSocket.get(), false, strError);
196 
197         switch (res) {
198         case CRequestMessage::error:
199             std::cout << "Error while receiving message: " << strError << std::endl;
200             // fall through
201         case CRequestMessage::peerDisconnected:
202             // Consider peer disconnection as normal, no log
203             return; // Bail out
204         case CRequestMessage::success:
205             break; // No error, continue
206         }
207 
208         // Actually process the request
209         bool bSuccess;
210 
211         string strResult;
212 
213         if (_pCommandHandler) {
214 
215             bSuccess = _pCommandHandler->remoteCommandProcess(requestMessage, strResult);
216 
217         } else {
218 
219             strResult = "No handler!";
220 
221             bSuccess = false;
222         }
223 
224         // Send back answer
225         // Create answer message
226         CAnswerMessage answerMessage(strResult, bSuccess);
227 
228         ///// Send answer
229         res = answerMessage.serialize(clientSocket.get(), true, strError);
230 
231         switch (res) {
232         case CRequestMessage::peerDisconnected:
233             // Peer should not disconnect while waiting for an answer
234             // Fall through to log the error and bail out
235         case CRequestMessage::error:
236             std::cout << "Error while receiving message: " << strError << std::endl;
237             return; // Bail out
238         case CRequestMessage::success:
239             break; // No error, continue
240         }
241     }
242 }
243