1 /*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <chrono>
18 #include <ctime>
19 #include <fstream>
20 #include <iostream>
21 #include <memory>
22 #include <string>
23
24 #include <grpc/grpc.h>
25 #include <grpcpp/ext/proto_server_reflection_plugin.h>
26 #include <grpcpp/health_check_service_interface.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30 #include <grpcpp/server_posix.h>
31
32 #include "gnss_grpc_proxy.grpc.pb.h"
33
34 #include <signal.h>
35
36 #include <chrono>
37 #include <deque>
38 #include <mutex>
39 #include <sstream>
40 #include <thread>
41 #include <vector>
42
43 #include <android-base/logging.h>
44 #include <android-base/strings.h>
45 #include <gflags/gflags.h>
46
47 #include <common/libs/fs/shared_buf.h>
48 #include <common/libs/fs/shared_fd.h>
49 #include <common/libs/fs/shared_select.h>
50 #include <host/libs/config/cuttlefish_config.h>
51 #include <host/libs/config/logging.h>
52 #include <queue>
53
54 using gnss_grpc_proxy::GnssGrpcProxy;
55 using gnss_grpc_proxy::SendGpsReply;
56 using gnss_grpc_proxy::SendGpsRequest;
57 using gnss_grpc_proxy::SendGpsCoordinatesReply;
58 using gnss_grpc_proxy::SendGpsCoordinatesRequest;
59 using grpc::Server;
60 using grpc::ServerBuilder;
61 using grpc::ServerContext;
62 using grpc::Status;
63
64 DEFINE_int32(gnss_in_fd,
65 -1,
66 "File descriptor for the gnss's input channel");
67 DEFINE_int32(gnss_out_fd,
68 -1,
69 "File descriptor for the gnss's output channel");
70
71 DEFINE_int32(fixed_location_in_fd, -1,
72 "File descriptor for the fixed location input channel");
73 DEFINE_int32(fixed_location_out_fd, -1,
74 "File descriptor for the fixed location output channel");
75
76 DEFINE_int32(gnss_grpc_port,
77 -1,
78 "Service port for gnss grpc");
79 DEFINE_string(gnss_grpc_socket, "", "Service socket path for gnss grpc");
80
81 DEFINE_string(gnss_file_path, "",
82 "gnss raw measurement file path for gnss grpc");
83 DEFINE_string(fixed_location_file_path, "",
84 "fixed location file path for gnss grpc");
85
86 constexpr char CMD_GET_LOCATION[] = "CMD_GET_LOCATION";
87 constexpr char CMD_GET_RAWMEASUREMENT[] = "CMD_GET_RAWMEASUREMENT";
88 constexpr char END_OF_MSG_MARK[] = "\n\n\n\n";
89
90 constexpr uint32_t GNSS_SERIAL_BUFFER_SIZE = 4096;
91
GenerateGpsLine(const std::string & dataPoint)92 std::string GenerateGpsLine(const std::string& dataPoint) {
93 std::string unix_time_millis =
94 std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
95 std::chrono::system_clock::now().time_since_epoch())
96 .count());
97 std::string formatted_location =
98 std::string("Fix,GPS,") + dataPoint + "," +
99 std::string("0.000000,3.790092,0.000000,") + unix_time_millis + "," +
100 std::string("0.086023256,0.0,11529389988248");
101
102 return formatted_location;
103 }
104 // Logic and data behind the server's behavior.
105 class GnssGrpcProxyServiceImpl final : public GnssGrpcProxy::Service {
106 public:
GnssGrpcProxyServiceImpl(cuttlefish::SharedFD gnss_in,cuttlefish::SharedFD gnss_out,cuttlefish::SharedFD fixed_location_in,cuttlefish::SharedFD fixed_location_out)107 GnssGrpcProxyServiceImpl(cuttlefish::SharedFD gnss_in,
108 cuttlefish::SharedFD gnss_out,
109 cuttlefish::SharedFD fixed_location_in,
110 cuttlefish::SharedFD fixed_location_out)
111 : gnss_in_(gnss_in),
112 gnss_out_(gnss_out),
113 fixed_location_in_(fixed_location_in),
114 fixed_location_out_(fixed_location_out) {
115 //Set the default GPS delay to 1 second
116 fixed_locations_delay_=1000;
117 }
118
119
SendGps(ServerContext * context,const SendGpsRequest * request,SendGpsReply * reply)120 Status SendGps(ServerContext* context, const SendGpsRequest* request,
121 SendGpsReply* reply) override {
122 reply->set_reply("Received gps record");
123 std::lock_guard<std::mutex> lock(cached_fixed_location_mutex);
124 cached_fixed_location = request->gps();
125 return Status::OK;
126 }
127
128
ConvertCoordinate(gnss_grpc_proxy::GpsCoordinates coordinate)129 std::string ConvertCoordinate(gnss_grpc_proxy::GpsCoordinates coordinate){
130 std::string latitude = std::to_string(coordinate.latitude());
131 std::string longitude = std::to_string(coordinate.longitude());
132 std::string elevation = std::to_string(coordinate.elevation());
133 std::string result = latitude + "," + longitude + "," + elevation;
134 return result;
135 }
136
SendGpsVector(ServerContext * context,const SendGpsCoordinatesRequest * request,SendGpsCoordinatesReply * reply)137 Status SendGpsVector(ServerContext* context,
138 const SendGpsCoordinatesRequest* request,
139 SendGpsCoordinatesReply* reply) override {
140 reply->set_status(SendGpsCoordinatesReply::OK);//update protobuf reply
141 {
142 std::lock_guard<std::mutex> lock(fixed_locations_queue_mutex_);
143 // Reset local buffers
144 fixed_locations_queue_ = {};
145 // Make a local copy of the input buffers
146 for (auto loc : request->coordinates()) {
147 fixed_locations_queue_.push(ConvertCoordinate(loc));
148 }
149 fixed_locations_delay_ = request->delay();
150 }
151
152 return Status::OK;
153 }
154
sendToSerial()155 void sendToSerial() {
156 std::lock_guard<std::mutex> lock(cached_fixed_location_mutex);
157 ssize_t bytes_written = cuttlefish::WriteAll(
158 fixed_location_in_, cached_fixed_location + END_OF_MSG_MARK);
159 if (bytes_written < 0) {
160 LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
161 }
162 }
163
sendGnssRawToSerial()164 void sendGnssRawToSerial() {
165 std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
166 if (!isGnssRawMeasurement(cached_gnss_raw)) {
167 return;
168 }
169 if (previous_cached_gnss_raw == cached_gnss_raw) {
170 // Skip for same record
171 return;
172 } else {
173 // Update cached data
174 LOG(DEBUG) << "Skip same record";
175 previous_cached_gnss_raw = cached_gnss_raw;
176 }
177 ssize_t bytes_written =
178 cuttlefish::WriteAll(gnss_in_, cached_gnss_raw + END_OF_MSG_MARK);
179 LOG(DEBUG) << "Send Gnss Raw to serial: bytes_written: " << bytes_written;
180 if (bytes_written < 0) {
181 LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
182 }
183 }
184
StartServer()185 void StartServer() {
186 // Create a new thread to handle writes to the gnss and to the any client
187 // connected to the socket.
188 fixed_location_write_thread_ =
189 std::thread([this]() { WriteFixedLocationFromQueue(); });
190 measurement_read_thread_ =
191 std::thread([this]() { ReadMeasurementLoop(); });
192 fixed_location_read_thread_ =
193 std::thread([this]() { ReadFixedLocLoop(); });
194 }
195
StartReadFixedLocationFileThread()196 void StartReadFixedLocationFileThread() {
197 // Create a new thread to read fixed_location data.
198 fixed_location_file_read_thread_ =
199 std::thread([this]() { ReadFixedLocationFromLocalFile(); });
200 }
201
StartReadGnssRawMeasurementFileThread()202 void StartReadGnssRawMeasurementFileThread() {
203 // Create a new thread to read raw measurement data.
204 measurement_file_read_thread_ =
205 std::thread([this]() { ReadGnssRawMeasurement(); });
206 }
207
ReadFixedLocationFromLocalFile()208 void ReadFixedLocationFromLocalFile() {
209 std::ifstream file(FLAGS_fixed_location_file_path);
210 if (file.is_open()) {
211 std::string line;
212 while (std::getline(file, line)) {
213 /* Only support fix location format to make it simple.
214 * Records will only contains 'Fix' prefix.
215 * Sample line:
216 * Fix,GPS,37.7925002,-122.3979132,13.462797,0.000000,48.000000,0.000000,1593029872254,0.581968,0.000000
217 * Sending at 1Hz, currently user should provide a fixed location
218 * file that has one location per second. need some extra work to
219 * make it more generic, i.e. align with the timestamp in the file.
220 */
221 {
222 std::lock_guard<std::mutex> lock(cached_fixed_location_mutex);
223 cached_fixed_location = line;
224 }
225 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
226 }
227 file.close();
228 } else {
229 LOG(ERROR) << "Can not open fixed location file: "
230 << FLAGS_gnss_file_path;
231 return;
232 }
233 }
234
ReadGnssRawMeasurement()235 void ReadGnssRawMeasurement() {
236 std::ifstream file(FLAGS_gnss_file_path);
237
238 if (file.is_open()) {
239 std::string line;
240 std::string cached_line = "";
241 std::string header = "";
242
243 while (!cached_line.empty() || std::getline(file, line)) {
244 if (!cached_line.empty()) {
245 line = cached_line;
246 cached_line = "";
247 }
248
249 // Get data header.
250 if (header.empty() && android::base::StartsWith(line, "# Raw")) {
251 header = line;
252 LOG(DEBUG) << "Header: " << header;
253 continue;
254 }
255
256 // Ignore not raw measurement data.
257 if (!android::base::StartsWith(line, "Raw")) {
258 continue;
259 }
260
261 {
262 std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
263 cached_gnss_raw = header + "\n" + line;
264
265 std::string new_line = "";
266 while (std::getline(file, new_line)) {
267 // Group raw data by TimeNanos.
268 if (getTimeNanosFromLine(new_line) ==
269 getTimeNanosFromLine(line)) {
270 cached_gnss_raw += "\n" + new_line;
271 } else {
272 cached_line = new_line;
273 break;
274 }
275 }
276 }
277 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
278 }
279 file.close();
280 } else {
281 LOG(ERROR) << "Can not open GNSS Raw file: " << FLAGS_gnss_file_path;
282 return;
283 }
284 }
285
~GnssGrpcProxyServiceImpl()286 ~GnssGrpcProxyServiceImpl() {
287 if (fixed_location_file_read_thread_.joinable()) {
288 fixed_location_file_read_thread_.join();
289 }
290 if (fixed_location_write_thread_.joinable()) {
291 fixed_location_write_thread_.join();
292 }
293 if (measurement_file_read_thread_.joinable()) {
294 measurement_file_read_thread_.join();
295 }
296 if (measurement_read_thread_.joinable()) {
297 measurement_read_thread_.join();
298 }
299 if (fixed_location_read_thread_.joinable()) {
300 fixed_location_read_thread_.join();
301 }
302 }
303
304 private:
SendCommand(std::string command,cuttlefish::SharedFD source_out,int out_fd)305 void SendCommand(std::string command, cuttlefish::SharedFD source_out,
306 int out_fd) {
307 std::vector<char> buffer(GNSS_SERIAL_BUFFER_SIZE);
308 std::string cmd_str;
309 auto bytes_read = source_out->Read(buffer.data(), buffer.size());
310 if (bytes_read > 0) {
311 std::string s(buffer.data(), bytes_read);
312 cmd_str += s;
313 // In case random string sent though /dev/gnss1, cmd_str will
314 // auto resize, to get rid of first page.
315 if (cmd_str.size() > GNSS_SERIAL_BUFFER_SIZE * 2) {
316 cmd_str = cmd_str.substr(cmd_str.size() - GNSS_SERIAL_BUFFER_SIZE);
317 }
318 if (cmd_str.find(command) != std::string::npos) {
319 if (command == CMD_GET_RAWMEASUREMENT) {
320 sendGnssRawToSerial();
321 } else if (command == CMD_GET_LOCATION) {
322 sendToSerial();
323 }
324 cmd_str = "";
325 }
326 } else {
327 if (source_out->GetErrno() == EAGAIN ||
328 source_out->GetErrno() == EWOULDBLOCK) {
329 std::this_thread::sleep_for(std::chrono::milliseconds(100));
330 } else {
331 LOG(ERROR) << "Error reading fd " << out_fd << ": "
332 << " Error code: " << source_out->GetErrno()
333 << " Error sg:" << source_out->StrError();
334 }
335 }
336 }
337
ReadMeasurementLoop()338 [[noreturn]] void ReadMeasurementLoop() {
339 int flags = gnss_out_->Fcntl(F_GETFL, 0);
340 gnss_out_->Fcntl(F_SETFL, flags | O_NONBLOCK);
341
342 while (true) {
343 SendCommand(CMD_GET_RAWMEASUREMENT, gnss_out_, FLAGS_gnss_out_fd);
344 }
345 }
346
ReadFixedLocLoop()347 [[noreturn]] void ReadFixedLocLoop() {
348 int flags2 = fixed_location_out_->Fcntl(F_GETFL, 0);
349 fixed_location_out_->Fcntl(F_SETFL, flags2 | O_NONBLOCK);
350 while (true) {
351 SendCommand(CMD_GET_LOCATION, fixed_location_out_,
352 FLAGS_fixed_location_out_fd);
353 }
354 }
355
WriteFixedLocationFromQueue()356 [[noreturn]] void WriteFixedLocationFromQueue() {
357 while (true) {
358 if (!fixed_locations_queue_.empty()) {
359 std::string dataPoint = fixed_locations_queue_.front();
360 std::string line = GenerateGpsLine(dataPoint);
361 std::lock_guard<std::mutex> lock(cached_fixed_location_mutex);
362 cached_fixed_location = line;
363 {
364 std::lock_guard<std::mutex> lock(fixed_locations_queue_mutex_);
365 fixed_locations_queue_.pop();
366 }
367 }
368 std::this_thread::sleep_for(std::chrono::milliseconds(fixed_locations_delay_));
369 }
370 }
371
getTimeNanosFromLine(const std::string & line)372 std::string getTimeNanosFromLine(const std::string& line) {
373 // TimeNanos is in column #3.
374 std::vector<std::string> vals = android::base::Split(line, ",");
375 return vals.size() >= 3 ? vals[2] : "-1";
376 }
377
isGnssRawMeasurement(const std::string & inputStr)378 bool isGnssRawMeasurement(const std::string& inputStr) {
379 // TODO: add more logic check to by pass invalid data.
380 return !inputStr.empty() && android::base::StartsWith(inputStr, "# Raw");
381 }
382
383 cuttlefish::SharedFD gnss_in_;
384 cuttlefish::SharedFD gnss_out_;
385 cuttlefish::SharedFD fixed_location_in_;
386 cuttlefish::SharedFD fixed_location_out_;
387
388 std::thread measurement_read_thread_;
389 std::thread fixed_location_read_thread_;
390 std::thread fixed_location_file_read_thread_;
391 std::thread fixed_location_write_thread_;
392 std::thread measurement_file_read_thread_;
393
394 std::string cached_fixed_location;
395 std::mutex cached_fixed_location_mutex;
396
397 std::string cached_gnss_raw;
398 std::string previous_cached_gnss_raw;
399 std::mutex cached_gnss_raw_mutex;
400
401 std::queue<std::string> fixed_locations_queue_;
402 std::mutex fixed_locations_queue_mutex_;
403 int fixed_locations_delay_;
404 };
405
RunServer()406 void RunServer() {
407 grpc::EnableDefaultHealthCheckService(true);
408 grpc::reflection::InitProtoReflectionServerBuilderPlugin();
409 auto gnss_in = cuttlefish::SharedFD::Dup(FLAGS_gnss_in_fd);
410 close(FLAGS_gnss_in_fd);
411 if (!gnss_in->IsOpen()) {
412 LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_in_fd << ": "
413 << gnss_in->StrError();
414 return;
415 }
416 close(FLAGS_gnss_in_fd);
417
418 auto gnss_out = cuttlefish::SharedFD::Dup(FLAGS_gnss_out_fd);
419 close(FLAGS_gnss_out_fd);
420 if (!gnss_out->IsOpen()) {
421 LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_out_fd << ": "
422 << gnss_out->StrError();
423 return;
424 }
425
426 auto fixed_location_in =
427 cuttlefish::SharedFD::Dup(FLAGS_fixed_location_in_fd);
428 close(FLAGS_fixed_location_in_fd);
429 if (!fixed_location_in->IsOpen()) {
430 LOG(ERROR) << "Error dupping fd " << FLAGS_fixed_location_in_fd << ": "
431 << fixed_location_in->StrError();
432 return;
433 }
434 close(FLAGS_fixed_location_in_fd);
435
436 auto fixed_location_out =
437 cuttlefish::SharedFD::Dup(FLAGS_fixed_location_out_fd);
438 close(FLAGS_fixed_location_out_fd);
439 if (!fixed_location_out->IsOpen()) {
440 LOG(ERROR) << "Error dupping fd " << FLAGS_fixed_location_out_fd << ": "
441 << fixed_location_out->StrError();
442 return;
443 }
444
445 auto server_address("0.0.0.0:" + std::to_string(FLAGS_gnss_grpc_port));
446 GnssGrpcProxyServiceImpl service(gnss_in, gnss_out, fixed_location_in,
447 fixed_location_out);
448 service.StartServer();
449 if (!FLAGS_gnss_file_path.empty()) {
450 // TODO: On-demand start the read file threads according to data type.
451 service.StartReadFixedLocationFileThread();
452 service.StartReadGnssRawMeasurementFileThread();
453
454 // In the local mode, we are not start a grpc server, use a infinite loop instead
455 while(true) {
456 std::this_thread::sleep_for(std::chrono::milliseconds(2000));
457 }
458 } else {
459 ServerBuilder builder;
460 // Listen on the given address without any authentication mechanism.
461 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
462 if (!FLAGS_gnss_grpc_socket.empty()) {
463 builder.AddListeningPort("unix:" + FLAGS_gnss_grpc_socket,
464 grpc::InsecureServerCredentials());
465 }
466 // Register "service" as the instance through which we'll communicate with
467 // clients. In this case it corresponds to an *synchronous* service.
468 builder.RegisterService(&service);
469 // Finally assemble the server.
470 std::unique_ptr<Server> server(builder.BuildAndStart());
471 std::cout << "Server listening on " << server_address << std::endl;
472
473 // Wait for the server to shutdown. Note that some other thread must be
474 // responsible for shutting down the server for this call to ever return.
475 server->Wait();
476 }
477
478 }
479
480
main(int argc,char ** argv)481 int main(int argc, char** argv) {
482 cuttlefish::DefaultSubprocessLogging(argv);
483 ::gflags::ParseCommandLineFlags(&argc, &argv, true);
484
485 LOG(DEBUG) << "Starting gnss grpc proxy server...";
486 RunServer();
487
488 return 0;
489 }
490