#!/usr/bin/env python3
#
# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A tool that can feed data into GNSS HAL over virtio-console.
# This tool can be invoked as gnss_replay.py -s <socket> -i <input>
# where <socket> is the host side of a UNIX domain socket that backs the
# virtio-console GNSS vport and <input> is a log file generated by
# GnssLogger or equivalent tooling.

import argparse
import sys
import socket
import time


class DataStore(object):
    """Cyclic store of the input lines that start with a given prefix.

    Lines are kept in input order with their line terminator removed.
    """

    def __init__(self, prefix, lines):
        """Keep every entry of `lines` that starts with `prefix`.

        Args:
            prefix: String a line must start with to be retained.
            lines: Iterable of strings, typically the result of readlines().
        """
        self.idx = 0
        # Strip '\r' as well as '\n' so logs saved with CRLF line endings do
        # not leak a stray carriage return into the replayed payload.
        self.lines = [
            line.rstrip('\r\n') for line in lines if line.startswith(prefix)
        ]

    def next(self):
        """Return the next stored line, wrapping around at the end.

        Returns:
            The next line, or '' when no line matched the prefix.
        """
        if not self.lines:
            return ''
        line = self.lines[self.idx]
        self.idx = (self.idx + 1) % len(self.lines)
        return line

    def __str__(self):
        return str(self.lines)


def load(input):
    """Read the log file at path `input` and split it into data stores.

    Returns:
        A dict with a 'locations' DataStore (lines starting with 'Fix') and
        a 'measurements' DataStore (lines starting with 'Raw').
    """
    with open(input) as f:
        lines = f.readlines()
    return {'locations': DataStore('Fix', lines),
            'measurements': DataStore('Raw', lines)}


class CommandLoop(object):
    """Serves registered commands received over a connected socket."""

    # Every reply, including the reply to an unknown command, ends with
    # this terminator.
    _TERMINATOR = b'\n\n\n\n'

    def __init__(self, socket):
        """Wrap an already-connected socket object.

        Args:
            socket: Connected socket used to receive commands and send
                replies. The caller keeps ownership and must close it.
        """
        self.socket = socket
        self.commands = {}

    def command(self, action, reaction):
        """Register `reaction` as the handler for `action`.

        Args:
            action: Exact bytes of the command as received from the socket.
            reaction: Zero-argument callable returning the reply as a str.
        """
        self.commands[action] = reaction

    def loop(self):
        """Answer commands until the peer closes the connection.

        Each message of up to 128 bytes is looked up verbatim in the
        registered commands. A known command is answered with the handler's
        result followed by the terminator; an unknown one is reported on
        stdout and answered with the terminator alone.
        """
        while True:
            inp = self.socket.recv(128)
            if not inp:
                # recv() returns b'' once the peer has closed its end. Without
                # this check the loop would treat b'' as an unknown command
                # forever, spinning and then blocking or failing in send().
                return
            reaction = self.commands.get(inp)
            if reaction:
                self.socket.send(reaction().encode() + self._TERMINATOR)
            else:
                print("Unknown command '%s'" % inp)
                self.socket.send(self._TERMINATOR)


def main():
    """Parse the command line, connect to the socket and replay the log."""
    parser = argparse.ArgumentParser(description="GNSS mock host agent")
    parser.add_argument('--socket', '-s', action='store', required=True,
                        help='The UNIX socket to interact with')
    parser.add_argument('--input', '-i', action='store', required=True,
                        help='The file with fix information to send over')

    args = parser.parse_args()

    # Load first so an unreadable input file fails before the socket is
    # touched.
    data = load(args.input)

    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) as sk:
        sk.connect(args.socket)

        loop = CommandLoop(sk)
        loop.command(b'CMD_GET_LOCATION', lambda: data['locations'].next())
        loop.command(b'CMD_GET_RAWMEASUREMENT',
                     lambda: data['measurements'].next())

        loop.loop()


if __name__ == "__main__":
    main()