"""Blueberry gRPC device controller.

This is a server to act as a mock device for testing the Blueberry gRPC
interface.
"""

from concurrent import futures
from absl import app
from absl import flags

import grpc

# Internal import
from blueberry.grpc import blueberry_device_controller_service
from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc


_HOST = '[::]'

FLAGS = flags.FLAGS
flags.DEFINE_integer('port', 10000, 'port to listen on')
flags.DEFINE_integer('threads', 10, 'number of worker threads in thread pool')


def main(unused_argv):
  """Runs the mock Blueberry device controller until the server terminates.

  Builds a gRPC server backed by a thread pool of `--threads` workers,
  registers a `BlueberryDeviceControllerServicer` on it, binds a secure port
  at `_HOST:--port`, starts serving and then blocks in
  `wait_for_termination()`.

  Args:
    unused_argv: Positional arguments handed over by `app.run`; not used.
  """
  worker_pool = futures.ThreadPoolExecutor(max_workers=FLAGS.threads)
  grpc_server = grpc.server(
      worker_pool,
      ports=(FLAGS.port,))  # pytype: disable=wrong-keyword-args

  # Attach the device-controller implementation to the server.
  controller = (
      blueberry_device_controller_service.BlueberryDeviceControllerServicer())
  register_servicer = (
      blueberry_device_controller_pb2_grpc
      .add_BlueberryDeviceControllerServicer_to_server)
  register_servicer(controller, grpc_server)

  # NOTE(review): `loas2` is not imported anywhere in the visible file --
  # presumably it is supplied by the elided internal import above; confirm.
  credentials = loas2.loas2_server_credentials()
  listen_address = f'{_HOST}:{FLAGS.port}'
  grpc_server.add_secure_port(listen_address, credentials)

  grpc_server.start()
  grpc_server.wait_for_termination()


if __name__ == '__main__':
  app.run(main)