1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18# A tool that can feed data into GNSS HAL over virtio-console
19# This tool can be invoked as gnss_replay.py -s <socket> -i <input>
20# where <socket> is the host-side of a UNIX domain socket that backs
21# virtio-console GNSS vport and <input> is a log file generated by
22# GnssLogger or equivalent tooling
23
24import argparse
25import sys
26import socket
27import time
28
class DataStore(object):
    """Cyclic store of the input lines that begin with a given prefix.

    Matching lines are kept in their original order with newline
    characters stripped from both ends. ``next()`` hands them out one at a
    time and wraps around to the first line after the last one.
    """

    def __init__(self, prefix, lines):
        # Index of the line the next call to next() will return.
        self.idx = 0
        self.lines = [
            entry.strip('\n') for entry in lines if entry.startswith(prefix)
        ]

    def next(self):
        """Return the current line and advance; return '' if the store is empty."""
        if not self.lines:
            return ''
        current = self.lines[self.idx]
        # Wrap around so the stored lines are replayed indefinitely.
        self.idx = (self.idx + 1) % len(self.lines)
        return current

    def __str__(self):
        return str(self.lines)
46
def load(input):
    """Read the file at `input` and index its 'Fix' and 'Raw' lines.

    Returns a dict with two DataStore values: 'locations' holds the lines
    starting with 'Fix' and 'measurements' holds the lines starting with
    'Raw'.
    """
    with open(input) as log_file:
        entries = list(log_file)

    # The lines are fully in memory, so the stores are built after the
    # file has been closed.
    stores = {}
    stores['locations'] = DataStore('Fix', entries)
    stores['measurements'] = DataStore('Raw', entries)
    return stores
52
class CommandLoop(object):
    """Request/response loop that answers commands received on a socket.

    Every received message is looked up verbatim in the table of
    registered commands. A known command is answered with the string its
    handler returns followed by four newlines; an unknown command is
    reported on stdout and answered with the four newlines alone.
    """

    def __init__(self, socket):
        # Connected socket-like object providing recv() and sendall().
        self.socket = socket
        # Maps a command (bytes, as received) to a zero-argument callable
        # that returns the reply text.
        self.commands = {}

    def command(self, action, reaction):
        """Register `reaction` as the handler for the message `action`.

        `action` must equal the received bytes exactly; `reaction` is
        called with no arguments and must return a str.
        """
        self.commands[action] = reaction

    def loop(self):
        """Serve commands until the peer closes the connection."""
        while True:
            inp = self.socket.recv(128)
            if not inp:
                # recv() yields b'' once the peer has closed its end.
                # Stop instead of treating end-of-stream as an unknown
                # command and replying on a dead connection.
                return

            reaction = self.commands.get(inp)
            if reaction:
                response = reaction() + '\n\n\n\n'
            else:
                print("Unknown command '%s'" % inp)
                response = '\n\n\n\n'
            # sendall() keeps writing until the whole reply is out, so a
            # short write cannot truncate the terminator.
            self.socket.sendall(response.encode())
71
def main():
    """Parse the command line, connect to the GNSS socket and replay data.

    Connects to the UNIX socket given by --socket, loads the file given
    by --input, and answers CMD_GET_LOCATION and CMD_GET_RAWMEASUREMENT
    requests with successive lines from it.
    """
    parser = argparse.ArgumentParser(description="GNSS mock host agent")
    parser.add_argument('--socket', '-s', action='store', required=True,
                        help='The UNIX socket to interact with')
    parser.add_argument('--input', '-i', action='store', required=True,
                        help='The file with fix information to send over')

    args = parser.parse_args()

    # The context manager closes the socket on every exit path, including
    # a failed connect(), an unreadable input file, or an error raised
    # while serving commands.
    with socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) as sk:
        sk.connect(args.socket)

        data = load(args.input)

        loop = CommandLoop(sk)
        loop.command(b'CMD_GET_LOCATION',
                     lambda: data['locations'].next())
        loop.command(b'CMD_GET_RAWMEASUREMENT',
                     lambda: data['measurements'].next())

        loop.loop()


if __name__ == "__main__":
    main()
92