1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <chrono>
18 #include <ctime>
19 #include <fstream>
20 #include <iostream>
21 #include <memory>
22 #include <string>
23 
24 #include <grpc/grpc.h>
25 #include <grpcpp/ext/proto_server_reflection_plugin.h>
26 #include <grpcpp/health_check_service_interface.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30 #include <grpcpp/server_posix.h>
31 
32 #include "gnss_grpc_proxy.grpc.pb.h"
33 
34 #include <signal.h>
35 
36 #include <chrono>
37 #include <deque>
38 #include <mutex>
39 #include <sstream>
40 #include <thread>
41 #include <vector>
42 
43 #include <android-base/logging.h>
44 #include <android-base/strings.h>
45 #include <gflags/gflags.h>
46 
47 #include <common/libs/fs/shared_buf.h>
48 #include <common/libs/fs/shared_fd.h>
49 #include <common/libs/fs/shared_select.h>
50 #include <host/libs/config/cuttlefish_config.h>
51 #include <host/libs/config/logging.h>
52 #include <queue>
53 
54 using gnss_grpc_proxy::GnssGrpcProxy;
55 using gnss_grpc_proxy::SendGpsReply;
56 using gnss_grpc_proxy::SendGpsRequest;
57 using gnss_grpc_proxy::SendGpsCoordinatesReply;
58 using gnss_grpc_proxy::SendGpsCoordinatesRequest;
59 using grpc::Server;
60 using grpc::ServerBuilder;
61 using grpc::ServerContext;
62 using grpc::Status;
63 
64 DEFINE_int32(gnss_in_fd,
65              -1,
66              "File descriptor for the gnss's input channel");
67 DEFINE_int32(gnss_out_fd,
68              -1,
69              "File descriptor for the gnss's output channel");
70 
71 DEFINE_int32(fixed_location_in_fd, -1,
72              "File descriptor for the fixed location input channel");
73 DEFINE_int32(fixed_location_out_fd, -1,
74              "File descriptor for the fixed location output channel");
75 
76 DEFINE_int32(gnss_grpc_port,
77              -1,
78              "Service port for gnss grpc");
79 DEFINE_string(gnss_grpc_socket, "", "Service socket path for gnss grpc");
80 
81 DEFINE_string(gnss_file_path, "",
82               "gnss raw measurement file path for gnss grpc");
83 DEFINE_string(fixed_location_file_path, "",
84               "fixed location file path for gnss grpc");
85 
// Command token that triggers sending the cached fixed location.
constexpr char CMD_GET_LOCATION[] = "CMD_GET_LOCATION";
// Command token that triggers sending the cached raw measurements.
constexpr char CMD_GET_RAWMEASUREMENT[] = "CMD_GET_RAWMEASUREMENT";
// Terminator appended to every message written to a serial channel.
constexpr char END_OF_MSG_MARK[] = "\n\n\n\n";

// Number of bytes requested per read from a serial channel.
constexpr uint32_t GNSS_SERIAL_BUFFER_SIZE{4096};
91 
/**
 * Builds a "Fix,GPS,..." record around a caller-supplied data point.
 *
 * @param dataPoint Text inserted right after the "Fix,GPS," prefix; callers in
 *                  this file pass "latitude,longitude,elevation".
 * @return The record: prefix, data point, three constant fields, the current
 *         system-clock time in milliseconds since the epoch, and three more
 *         constant fields.
 */
std::string GenerateGpsLine(const std::string& dataPoint) {
  using std::chrono::duration_cast;
  using std::chrono::milliseconds;
  using std::chrono::system_clock;

  // Timestamp the record with "now", expressed as Unix time in milliseconds.
  const auto since_epoch = system_clock::now().time_since_epoch();
  const auto now_millis = duration_cast<milliseconds>(since_epoch).count();

  std::string gps_line("Fix,GPS,");
  gps_line += dataPoint;
  gps_line += ",0.000000,3.790092,0.000000,";
  gps_line += std::to_string(now_millis);
  gps_line += ",0.086023256,0.0,11529389988248";
  return gps_line;
}
104 // Logic and data behind the server's behavior.
105 class GnssGrpcProxyServiceImpl final : public GnssGrpcProxy::Service {
106   public:
GnssGrpcProxyServiceImpl(cuttlefish::SharedFD gnss_in,cuttlefish::SharedFD gnss_out,cuttlefish::SharedFD fixed_location_in,cuttlefish::SharedFD fixed_location_out)107    GnssGrpcProxyServiceImpl(cuttlefish::SharedFD gnss_in,
108                             cuttlefish::SharedFD gnss_out,
109                             cuttlefish::SharedFD fixed_location_in,
110                             cuttlefish::SharedFD fixed_location_out)
111        : gnss_in_(gnss_in),
112          gnss_out_(gnss_out),
113          fixed_location_in_(fixed_location_in),
114          fixed_location_out_(fixed_location_out) {
115           //Set the default GPS delay to 1 second
116           fixed_locations_delay_=1000;
117          }
118 
119 
SendGps(ServerContext * context,const SendGpsRequest * request,SendGpsReply * reply)120    Status SendGps(ServerContext* context, const SendGpsRequest* request,
121                   SendGpsReply* reply) override {
122      reply->set_reply("Received gps record");
123      std::lock_guard<std::mutex> lock(cached_fixed_location_mutex);
124      cached_fixed_location = request->gps();
125      return Status::OK;
126    }
127 
128 
ConvertCoordinate(gnss_grpc_proxy::GpsCoordinates coordinate)129   std::string ConvertCoordinate(gnss_grpc_proxy::GpsCoordinates coordinate){
130     std::string latitude = std::to_string(coordinate.latitude());
131     std::string longitude = std::to_string(coordinate.longitude());
132     std::string elevation = std::to_string(coordinate.elevation());
133     std::string result = latitude + "," + longitude + "," + elevation;
134     return result;
135   }
136 
SendGpsVector(ServerContext * context,const SendGpsCoordinatesRequest * request,SendGpsCoordinatesReply * reply)137    Status SendGpsVector(ServerContext* context,
138                         const SendGpsCoordinatesRequest* request,
139                         SendGpsCoordinatesReply* reply) override {
140      reply->set_status(SendGpsCoordinatesReply::OK);//update protobuf reply
141      {
142        std::lock_guard<std::mutex> lock(fixed_locations_queue_mutex_);
143        // Reset local buffers
144        fixed_locations_queue_ = {};
145        // Make a local copy of the input buffers
146        for (auto loc : request->coordinates()) {
147          fixed_locations_queue_.push(ConvertCoordinate(loc));
148        }
149        fixed_locations_delay_ = request->delay();
150      }
151 
152      return Status::OK;
153    }
154 
sendToSerial()155     void sendToSerial() {
156       std::lock_guard<std::mutex> lock(cached_fixed_location_mutex);
157       ssize_t bytes_written = cuttlefish::WriteAll(
158           fixed_location_in_, cached_fixed_location + END_OF_MSG_MARK);
159       if (bytes_written < 0) {
160           LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
161       }
162     }
163 
sendGnssRawToSerial()164     void sendGnssRawToSerial() {
165       std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
166       if (!isGnssRawMeasurement(cached_gnss_raw)) {
167         return;
168       }
169       if (previous_cached_gnss_raw == cached_gnss_raw) {
170         // Skip for same record
171         return;
172       } else {
173         // Update cached data
174         LOG(DEBUG) << "Skip same record";
175         previous_cached_gnss_raw = cached_gnss_raw;
176       }
177       ssize_t bytes_written =
178           cuttlefish::WriteAll(gnss_in_, cached_gnss_raw + END_OF_MSG_MARK);
179       LOG(DEBUG) << "Send Gnss Raw to serial: bytes_written: " << bytes_written;
180       if (bytes_written < 0) {
181         LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
182       }
183     }
184 
StartServer()185     void StartServer() {
186       // Create a new thread to handle writes to the gnss and to the any client
187       // connected to the socket.
188       fixed_location_write_thread_ =
189           std::thread([this]() { WriteFixedLocationFromQueue(); });
190       measurement_read_thread_ =
191           std::thread([this]() { ReadMeasurementLoop(); });
192       fixed_location_read_thread_ =
193           std::thread([this]() { ReadFixedLocLoop(); });
194     }
195 
StartReadFixedLocationFileThread()196     void StartReadFixedLocationFileThread() {
197       // Create a new thread to read fixed_location data.
198       fixed_location_file_read_thread_ =
199           std::thread([this]() { ReadFixedLocationFromLocalFile(); });
200     }
201 
StartReadGnssRawMeasurementFileThread()202     void StartReadGnssRawMeasurementFileThread() {
203       // Create a new thread to read raw measurement data.
204       measurement_file_read_thread_ =
205           std::thread([this]() { ReadGnssRawMeasurement(); });
206     }
207 
ReadFixedLocationFromLocalFile()208     void ReadFixedLocationFromLocalFile() {
209       std::ifstream file(FLAGS_fixed_location_file_path);
210       if (file.is_open()) {
211         std::string line;
212         while (std::getline(file, line)) {
213           /* Only support fix location format to make it simple.
214            * Records will only contains 'Fix' prefix.
215            * Sample line:
216            * Fix,GPS,37.7925002,-122.3979132,13.462797,0.000000,48.000000,0.000000,1593029872254,0.581968,0.000000
217            * Sending at 1Hz, currently user should provide a fixed location
218            * file that has one location per second. need some extra work to
219            * make it more generic, i.e. align with the timestamp in the file.
220            */
221           {
222             std::lock_guard<std::mutex> lock(cached_fixed_location_mutex);
223             cached_fixed_location = line;
224           }
225           std::this_thread::sleep_for(std::chrono::milliseconds(1000));
226         }
227           file.close();
228       } else {
229         LOG(ERROR) << "Can not open fixed location file: "
230                    << FLAGS_gnss_file_path;
231         return;
232       }
233     }
234 
ReadGnssRawMeasurement()235     void ReadGnssRawMeasurement() {
236       std::ifstream file(FLAGS_gnss_file_path);
237 
238       if (file.is_open()) {
239         std::string line;
240         std::string cached_line = "";
241         std::string header = "";
242 
243         while (!cached_line.empty() || std::getline(file, line)) {
244           if (!cached_line.empty()) {
245             line = cached_line;
246             cached_line = "";
247           }
248 
249           // Get data header.
250           if (header.empty() && android::base::StartsWith(line, "# Raw")) {
251             header = line;
252             LOG(DEBUG) << "Header: " << header;
253             continue;
254           }
255 
256           // Ignore not raw measurement data.
257           if (!android::base::StartsWith(line, "Raw")) {
258             continue;
259           }
260 
261           {
262             std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
263             cached_gnss_raw = header + "\n" + line;
264 
265             std::string new_line = "";
266             while (std::getline(file, new_line)) {
267               // Group raw data by TimeNanos.
268               if (getTimeNanosFromLine(new_line) ==
269                   getTimeNanosFromLine(line)) {
270                 cached_gnss_raw += "\n" + new_line;
271               } else {
272                 cached_line = new_line;
273                 break;
274               }
275             }
276           }
277           std::this_thread::sleep_for(std::chrono::milliseconds(1000));
278         }
279         file.close();
280       } else {
281         LOG(ERROR) << "Can not open GNSS Raw file: " << FLAGS_gnss_file_path;
282         return;
283       }
284     }
285 
~GnssGrpcProxyServiceImpl()286     ~GnssGrpcProxyServiceImpl() {
287       if (fixed_location_file_read_thread_.joinable()) {
288         fixed_location_file_read_thread_.join();
289       }
290       if (fixed_location_write_thread_.joinable()) {
291         fixed_location_write_thread_.join();
292       }
293       if (measurement_file_read_thread_.joinable()) {
294         measurement_file_read_thread_.join();
295       }
296       if (measurement_read_thread_.joinable()) {
297         measurement_read_thread_.join();
298       }
299       if (fixed_location_read_thread_.joinable()) {
300         fixed_location_read_thread_.join();
301       }
302     }
303 
304   private:
SendCommand(std::string command,cuttlefish::SharedFD source_out,int out_fd)305    void SendCommand(std::string command, cuttlefish::SharedFD source_out,
306                     int out_fd) {
307      std::vector<char> buffer(GNSS_SERIAL_BUFFER_SIZE);
308      std::string cmd_str;
309      auto bytes_read = source_out->Read(buffer.data(), buffer.size());
310      if (bytes_read > 0) {
311        std::string s(buffer.data(), bytes_read);
312        cmd_str += s;
313        // In case random string sent though /dev/gnss1, cmd_str will
314        // auto resize, to get rid of first page.
315        if (cmd_str.size() > GNSS_SERIAL_BUFFER_SIZE * 2) {
316          cmd_str = cmd_str.substr(cmd_str.size() - GNSS_SERIAL_BUFFER_SIZE);
317        }
318        if (cmd_str.find(command) != std::string::npos) {
319          if (command == CMD_GET_RAWMEASUREMENT) {
320            sendGnssRawToSerial();
321          } else if (command == CMD_GET_LOCATION) {
322            sendToSerial();
323          }
324          cmd_str = "";
325        }
326      } else {
327        if (source_out->GetErrno() == EAGAIN ||
328            source_out->GetErrno() == EWOULDBLOCK) {
329          std::this_thread::sleep_for(std::chrono::milliseconds(100));
330        } else {
331          LOG(ERROR) << "Error reading fd " << out_fd << ": "
332                     << " Error code: " << source_out->GetErrno()
333                     << " Error sg:" << source_out->StrError();
334        }
335      }
336    }
337 
ReadMeasurementLoop()338    [[noreturn]] void ReadMeasurementLoop() {
339      int flags = gnss_out_->Fcntl(F_GETFL, 0);
340      gnss_out_->Fcntl(F_SETFL, flags | O_NONBLOCK);
341 
342      while (true) {
343        SendCommand(CMD_GET_RAWMEASUREMENT, gnss_out_, FLAGS_gnss_out_fd);
344      }
345    }
346 
ReadFixedLocLoop()347    [[noreturn]] void ReadFixedLocLoop() {
348      int flags2 = fixed_location_out_->Fcntl(F_GETFL, 0);
349      fixed_location_out_->Fcntl(F_SETFL, flags2 | O_NONBLOCK);
350      while (true) {
351        SendCommand(CMD_GET_LOCATION, fixed_location_out_,
352                    FLAGS_fixed_location_out_fd);
353      }
354    }
355 
WriteFixedLocationFromQueue()356    [[noreturn]] void WriteFixedLocationFromQueue() {
357       while (true) {
358          if (!fixed_locations_queue_.empty()) {
359          std::string dataPoint = fixed_locations_queue_.front();
360          std::string line = GenerateGpsLine(dataPoint);
361          std::lock_guard<std::mutex> lock(cached_fixed_location_mutex);
362          cached_fixed_location = line;
363          {
364            std::lock_guard<std::mutex> lock(fixed_locations_queue_mutex_);
365            fixed_locations_queue_.pop();
366          }
367        }
368        std::this_thread::sleep_for(std::chrono::milliseconds(fixed_locations_delay_));
369      }
370    }
371 
getTimeNanosFromLine(const std::string & line)372     std::string getTimeNanosFromLine(const std::string& line) {
373       // TimeNanos is in column #3.
374       std::vector<std::string> vals = android::base::Split(line, ",");
375       return vals.size() >= 3 ? vals[2] : "-1";
376     }
377 
isGnssRawMeasurement(const std::string & inputStr)378     bool isGnssRawMeasurement(const std::string& inputStr) {
379       // TODO: add more logic check to by pass invalid data.
380       return !inputStr.empty() && android::base::StartsWith(inputStr, "# Raw");
381     }
382 
383     cuttlefish::SharedFD gnss_in_;
384     cuttlefish::SharedFD gnss_out_;
385     cuttlefish::SharedFD fixed_location_in_;
386     cuttlefish::SharedFD fixed_location_out_;
387 
388     std::thread measurement_read_thread_;
389     std::thread fixed_location_read_thread_;
390     std::thread fixed_location_file_read_thread_;
391     std::thread fixed_location_write_thread_;
392     std::thread measurement_file_read_thread_;
393 
394     std::string cached_fixed_location;
395     std::mutex cached_fixed_location_mutex;
396 
397     std::string cached_gnss_raw;
398     std::string previous_cached_gnss_raw;
399     std::mutex cached_gnss_raw_mutex;
400 
401     std::queue<std::string> fixed_locations_queue_;
402     std::mutex fixed_locations_queue_mutex_;
403     int fixed_locations_delay_;
404 };
405 
RunServer()406 void RunServer() {
407   grpc::EnableDefaultHealthCheckService(true);
408   grpc::reflection::InitProtoReflectionServerBuilderPlugin();
409   auto gnss_in = cuttlefish::SharedFD::Dup(FLAGS_gnss_in_fd);
410   close(FLAGS_gnss_in_fd);
411   if (!gnss_in->IsOpen()) {
412     LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_in_fd << ": "
413                << gnss_in->StrError();
414     return;
415   }
416   close(FLAGS_gnss_in_fd);
417 
418   auto gnss_out = cuttlefish::SharedFD::Dup(FLAGS_gnss_out_fd);
419   close(FLAGS_gnss_out_fd);
420   if (!gnss_out->IsOpen()) {
421     LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_out_fd << ": "
422                << gnss_out->StrError();
423     return;
424   }
425 
426   auto fixed_location_in =
427       cuttlefish::SharedFD::Dup(FLAGS_fixed_location_in_fd);
428   close(FLAGS_fixed_location_in_fd);
429   if (!fixed_location_in->IsOpen()) {
430     LOG(ERROR) << "Error dupping fd " << FLAGS_fixed_location_in_fd << ": "
431                << fixed_location_in->StrError();
432     return;
433   }
434   close(FLAGS_fixed_location_in_fd);
435 
436   auto fixed_location_out =
437       cuttlefish::SharedFD::Dup(FLAGS_fixed_location_out_fd);
438   close(FLAGS_fixed_location_out_fd);
439   if (!fixed_location_out->IsOpen()) {
440     LOG(ERROR) << "Error dupping fd " << FLAGS_fixed_location_out_fd << ": "
441                << fixed_location_out->StrError();
442     return;
443   }
444 
445   auto server_address("0.0.0.0:" + std::to_string(FLAGS_gnss_grpc_port));
446   GnssGrpcProxyServiceImpl service(gnss_in, gnss_out, fixed_location_in,
447                                    fixed_location_out);
448   service.StartServer();
449   if (!FLAGS_gnss_file_path.empty()) {
450     // TODO: On-demand start the read file threads according to data type.
451     service.StartReadFixedLocationFileThread();
452     service.StartReadGnssRawMeasurementFileThread();
453 
454     // In the local mode, we are not start a grpc server, use a infinite loop instead
455     while(true) {
456       std::this_thread::sleep_for(std::chrono::milliseconds(2000));
457     }
458   } else {
459     ServerBuilder builder;
460     // Listen on the given address without any authentication mechanism.
461     builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
462     if (!FLAGS_gnss_grpc_socket.empty()) {
463       builder.AddListeningPort("unix:" + FLAGS_gnss_grpc_socket,
464                                grpc::InsecureServerCredentials());
465     }
466     // Register "service" as the instance through which we'll communicate with
467     // clients. In this case it corresponds to an *synchronous* service.
468     builder.RegisterService(&service);
469     // Finally assemble the server.
470     std::unique_ptr<Server> server(builder.BuildAndStart());
471     std::cout << "Server listening on " << server_address << std::endl;
472 
473     // Wait for the server to shutdown. Note that some other thread must be
474     // responsible for shutting down the server for this call to ever return.
475     server->Wait();
476   }
477 
478 }
479 
480 
main(int argc,char ** argv)481 int main(int argc, char** argv) {
482   cuttlefish::DefaultSubprocessLogging(argv);
483   ::gflags::ParseCommandLineFlags(&argc, &argv, true);
484 
485   LOG(DEBUG) << "Starting gnss grpc proxy server...";
486   RunServer();
487 
488   return 0;
489 }
490