# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example gRPC Python-using server-side application."""

import grpc

# requests_pb2 is a semantic dependency of this module.
from tests.testing import _application_common
from tests.testing.proto import requests_pb2  # pylint: disable=unused-import
from tests.testing.proto import services_pb2
from tests.testing.proto import services_pb2_grpc

# Details text attached to every RPC rejected for carrying an unexpected
# request message.
_INVALID_REQUEST_DETAILS = 'Something is wrong with your request!'


def _reject_request(context):
    """Flag the RPC on `context` as INVALID_ARGUMENT with the shared details.

    Args:
      context: The servicer context of the RPC being rejected; its
        `set_code` and `set_details` methods are called, in that order.
    """
    context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    context.set_details(_INVALID_REQUEST_DETAILS)


class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
    """Implements the FirstService RPC handlers for the example application.

    Each handler compares what it receives against the matching constant in
    `_application_common` and rejects anything else with INVALID_ARGUMENT.
    """

    def UnUn(self, request, context):
        """Handles the unary-unary RPC.

        Args:
          request: The single request message.
          context: The servicer context for this RPC.

        Returns:
          `_application_common.UNARY_UNARY_RESPONSE` when the request equals
          `_application_common.UNARY_UNARY_REQUEST`; otherwise a
          default-constructed `services_pb2.Down`, after the RPC has been
          flagged INVALID_ARGUMENT.
        """
        if _application_common.UNARY_UNARY_REQUEST == request:
            return _application_common.UNARY_UNARY_RESPONSE
        _reject_request(context)
        return services_pb2.Down()

    def UnStre(self, request, context):
        """Handles the unary-stream RPC; the response stream is always empty.

        Args:
          request: The single request message.
          context: The servicer context for this RPC.

        Yields:
          Nothing. An unexpected request only causes the RPC to be flagged
          INVALID_ARGUMENT before the stream ends.
        """
        if _application_common.UNARY_STREAM_REQUEST != request:
            _reject_request(context)
        return
        # Never executed: the statement exists solely so that Python compiles
        # this method as a generator function.
        yield services_pb2.Strange()  # pylint: disable=unreachable

    def StreUn(self, request_iterator, context):
        """Handles the stream-unary RPC.

        Initial metadata is sent before any request is read.

        Args:
          request_iterator: An iterator over the incoming request messages.
          context: The servicer context for this RPC.

        Returns:
          `_application_common.STREAM_UNARY_RESPONSE` once the request
          iterator is exhausted with every message equal to
          `_application_common.STREAM_UNARY_REQUEST`. A default-constructed
          `services_pb2.Strange` is returned early if a different message
          arrives (the RPC is then flagged INVALID_ARGUMENT) or if the
          context reports that it is no longer active.
        """
        initial_metadata = ((
            'server_application_metadata_key',
            'Hi there!',
        ),)
        context.send_initial_metadata(initial_metadata)
        for incoming in request_iterator:
            if incoming != _application_common.STREAM_UNARY_REQUEST:
                _reject_request(context)
                return services_pb2.Strange()
            if not context.is_active():
                return services_pb2.Strange()
        # Reached only when the loop ran to completion without returning.
        return _application_common.STREAM_UNARY_RESPONSE

    def StreStre(self, request_iterator, context):
        """Handles the stream-stream RPC.

        Args:
          request_iterator: An iterator over the incoming request messages.
          context: The servicer context for this RPC.

        Yields:
          `_application_common.STREAM_STREAM_RESPONSE` twice for every
          request equal to `_application_common.STREAM_STREAM_REQUEST`. The
          stream ends early if a different message arrives (the RPC is then
          flagged INVALID_ARGUMENT) or if the context reports that it is no
          longer active.
        """
        for incoming in request_iterator:
            if incoming != _application_common.STREAM_STREAM_REQUEST:
                _reject_request(context)
                return
            if not context.is_active():
                return
            # Two responses are emitted per accepted request.
            yield _application_common.STREAM_STREAM_RESPONSE
            yield _application_common.STREAM_STREAM_RESPONSE