1"""Blueberry gRPC device controller.
2
3This is a server to act as a mock device for testing the Blueberry gRPC
4interface.
5"""
6
7from concurrent import futures
8from absl import app
9from absl import flags
10
11import grpc
12
13# Internal import
14from blueberry.grpc import blueberry_device_controller_service
15from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc
16
17
18_HOST = '[::]'
19
20FLAGS = flags.FLAGS
21flags.DEFINE_integer('port', 10000, 'port to listen on')
22flags.DEFINE_integer('threads', 10, 'number of worker threads in thread pool')
23
24
25def main(unused_argv):
26  server = grpc.server(
27      futures.ThreadPoolExecutor(max_workers=FLAGS.threads),
28      ports=(FLAGS.port,))  # pytype: disable=wrong-keyword-args
29  servicer = (
30      blueberry_device_controller_service.BlueberryDeviceControllerServicer())
31  blueberry_device_controller_pb2_grpc.add_BlueberryDeviceControllerServicer_to_server(
32      servicer, server)
33  server_creds = loas2.loas2_server_credentials()
34  server.add_secure_port(f'{_HOST}:{FLAGS.port}', server_creds)
35  server.start()
36  server.wait_for_termination()
37
38
39if __name__ == '__main__':
40  app.run(main)
41