1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Objects for use in testing gRPC Python-using application code."""
15
16import abc
17import six
18
19from google.protobuf import descriptor
20
21import grpc
22
23
24class UnaryUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
25    """Fixture for a unary-unary RPC invoked by a system under test.
26
27    Enables users to "play server" for the RPC.
28    """
29
30    @abc.abstractmethod
31    def send_initial_metadata(self, initial_metadata):
32        """Sends the RPC's initial metadata to the system under test.
33
34        Args:
35          initial_metadata: The RPC's initial metadata to be "sent" to
36            the system under test.
37        """
38        raise NotImplementedError()
39
40    @abc.abstractmethod
41    def cancelled(self):
42        """Blocks until the system under test has cancelled the RPC."""
43        raise NotImplementedError()
44
45    @abc.abstractmethod
46    def terminate(self, response, trailing_metadata, code, details):
47        """Terminates the RPC.
48
49        Args:
50          response: The response for the RPC.
51          trailing_metadata: The RPC's trailing metadata.
52          code: The RPC's status code.
53          details: The RPC's status details.
54        """
55        raise NotImplementedError()
56
57
58class UnaryStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
59    """Fixture for a unary-stream RPC invoked by a system under test.
60
61    Enables users to "play server" for the RPC.
62    """
63
64    @abc.abstractmethod
65    def send_initial_metadata(self, initial_metadata):
66        """Sends the RPC's initial metadata to the system under test.
67
68        Args:
69          initial_metadata: The RPC's initial metadata to be "sent" to
70            the system under test.
71        """
72        raise NotImplementedError()
73
74    @abc.abstractmethod
75    def send_response(self, response):
76        """Sends a response to the system under test.
77
78        Args:
79          response: A response message to be "sent" to the system under test.
80        """
81        raise NotImplementedError()
82
83    @abc.abstractmethod
84    def cancelled(self):
85        """Blocks until the system under test has cancelled the RPC."""
86        raise NotImplementedError()
87
88    @abc.abstractmethod
89    def terminate(self, trailing_metadata, code, details):
90        """Terminates the RPC.
91
92        Args:
93          trailing_metadata: The RPC's trailing metadata.
94          code: The RPC's status code.
95          details: The RPC's status details.
96        """
97        raise NotImplementedError()
98
99
100class StreamUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
101    """Fixture for a stream-unary RPC invoked by a system under test.
102
103    Enables users to "play server" for the RPC.
104    """
105
106    @abc.abstractmethod
107    def send_initial_metadata(self, initial_metadata):
108        """Sends the RPC's initial metadata to the system under test.
109
110        Args:
111          initial_metadata: The RPC's initial metadata to be "sent" to
112            the system under test.
113        """
114        raise NotImplementedError()
115
116    @abc.abstractmethod
117    def take_request(self):
118        """Draws one of the requests added to the RPC by the system under test.
119
120        This method blocks until the system under test has added to the RPC
121        the request to be returned.
122
123        Successive calls to this method return requests in the same order in
124        which the system under test added them to the RPC.
125
126        Returns:
127          A request message added to the RPC by the system under test.
128        """
129        raise NotImplementedError()
130
131    @abc.abstractmethod
132    def requests_closed(self):
133        """Blocks until the system under test has closed the request stream."""
134        raise NotImplementedError()
135
136    @abc.abstractmethod
137    def cancelled(self):
138        """Blocks until the system under test has cancelled the RPC."""
139        raise NotImplementedError()
140
141    @abc.abstractmethod
142    def terminate(self, response, trailing_metadata, code, details):
143        """Terminates the RPC.
144
145        Args:
146          response: The response for the RPC.
147          trailing_metadata: The RPC's trailing metadata.
148          code: The RPC's status code.
149          details: The RPC's status details.
150        """
151        raise NotImplementedError()
152
153
154class StreamStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
155    """Fixture for a stream-stream RPC invoked by a system under test.
156
157    Enables users to "play server" for the RPC.
158    """
159
160    @abc.abstractmethod
161    def send_initial_metadata(self, initial_metadata):
162        """Sends the RPC's initial metadata to the system under test.
163
164        Args:
165          initial_metadata: The RPC's initial metadata to be "sent" to the
166            system under test.
167        """
168        raise NotImplementedError()
169
170    @abc.abstractmethod
171    def take_request(self):
172        """Draws one of the requests added to the RPC by the system under test.
173
174        This method blocks until the system under test has added to the RPC
175        the request to be returned.
176
177        Successive calls to this method return requests in the same order in
178        which the system under test added them to the RPC.
179
180        Returns:
181          A request message added to the RPC by the system under test.
182        """
183        raise NotImplementedError()
184
185    @abc.abstractmethod
186    def send_response(self, response):
187        """Sends a response to the system under test.
188
189        Args:
190          response: A response messages to be "sent" to the system under test.
191        """
192        raise NotImplementedError()
193
194    @abc.abstractmethod
195    def requests_closed(self):
196        """Blocks until the system under test has closed the request stream."""
197        raise NotImplementedError()
198
199    @abc.abstractmethod
200    def cancelled(self):
201        """Blocks until the system under test has cancelled the RPC."""
202        raise NotImplementedError()
203
204    @abc.abstractmethod
205    def terminate(self, trailing_metadata, code, details):
206        """Terminates the RPC.
207
208        Args:
209          trailing_metadata: The RPC's trailing metadata.
210          code: The RPC's status code.
211          details: The RPC's status details.
212        """
213        raise NotImplementedError()
214
215
216class Channel(six.with_metaclass(abc.ABCMeta, grpc.Channel)):
217    """A grpc.Channel double with which to test a system that invokes RPCs."""
218
219    @abc.abstractmethod
220    def take_unary_unary(self, method_descriptor):
221        """Draws an RPC currently being made by the system under test.
222
223        If the given descriptor does not identify any RPC currently being made
224        by the system under test, this method blocks until the system under
225        test invokes such an RPC.
226
227        Args:
228          method_descriptor: A descriptor.MethodDescriptor describing a
229            unary-unary RPC method.
230
231        Returns:
232          A (invocation_metadata, request, unary_unary_channel_rpc) tuple of
233            the RPC's invocation metadata, its request, and a
234            UnaryUnaryChannelRpc with which to "play server" for the RPC.
235        """
236        raise NotImplementedError()
237
238    @abc.abstractmethod
239    def take_unary_stream(self, method_descriptor):
240        """Draws an RPC currently being made by the system under test.
241
242        If the given descriptor does not identify any RPC currently being made
243        by the system under test, this method blocks until the system under
244        test invokes such an RPC.
245
246        Args:
247          method_descriptor: A descriptor.MethodDescriptor describing a
248            unary-stream RPC method.
249
250        Returns:
251          A (invocation_metadata, request, unary_stream_channel_rpc) tuple of
252            the RPC's invocation metadata, its request, and a
253            UnaryStreamChannelRpc with which to "play server" for the RPC.
254        """
255        raise NotImplementedError()
256
257    @abc.abstractmethod
258    def take_stream_unary(self, method_descriptor):
259        """Draws an RPC currently being made by the system under test.
260
261        If the given descriptor does not identify any RPC currently being made
262        by the system under test, this method blocks until the system under
263        test invokes such an RPC.
264
265        Args:
266          method_descriptor: A descriptor.MethodDescriptor describing a
267            stream-unary RPC method.
268
269        Returns:
270          A (invocation_metadata, stream_unary_channel_rpc) tuple of the RPC's
271            invocation metadata and a StreamUnaryChannelRpc with which to "play
272            server" for the RPC.
273        """
274        raise NotImplementedError()
275
276    @abc.abstractmethod
277    def take_stream_stream(self, method_descriptor):
278        """Draws an RPC currently being made by the system under test.
279
280        If the given descriptor does not identify any RPC currently being made
281        by the system under test, this method blocks until the system under
282        test invokes such an RPC.
283
284        Args:
285          method_descriptor: A descriptor.MethodDescriptor describing a
286            stream-stream RPC method.
287
288        Returns:
289          A (invocation_metadata, stream_stream_channel_rpc) tuple of the RPC's
290            invocation metadata and a StreamStreamChannelRpc with which to
291            "play server" for the RPC.
292        """
293        raise NotImplementedError()
294
295
296class UnaryUnaryServerRpc(six.with_metaclass(abc.ABCMeta)):
297    """Fixture for a unary-unary RPC serviced by a system under test.
298
299    Enables users to "play client" for the RPC.
300    """
301
302    @abc.abstractmethod
303    def initial_metadata(self):
304        """Accesses the initial metadata emitted by the system under test.
305
306        This method blocks until the system under test has added initial
307        metadata to the RPC (or has provided one or more response messages or
308        has terminated the RPC, either of which will cause gRPC Python to
309        synthesize initial metadata for the RPC).
310
311        Returns:
312          The initial metadata for the RPC.
313        """
314        raise NotImplementedError()
315
316    @abc.abstractmethod
317    def cancel(self):
318        """Cancels the RPC."""
319        raise NotImplementedError()
320
321    @abc.abstractmethod
322    def termination(self):
323        """Blocks until the system under test has terminated the RPC.
324
325        Returns:
326          A (response, trailing_metadata, code, details) sequence with the RPC's
327            response, trailing metadata, code, and details.
328        """
329        raise NotImplementedError()
330
331
332class UnaryStreamServerRpc(six.with_metaclass(abc.ABCMeta)):
333    """Fixture for a unary-stream RPC serviced by a system under test.
334
335    Enables users to "play client" for the RPC.
336    """
337
338    @abc.abstractmethod
339    def initial_metadata(self):
340        """Accesses the initial metadata emitted by the system under test.
341
342        This method blocks until the system under test has added initial
343        metadata to the RPC (or has provided one or more response messages or
344        has terminated the RPC, either of which will cause gRPC Python to
345        synthesize initial metadata for the RPC).
346
347        Returns:
348          The initial metadata for the RPC.
349        """
350        raise NotImplementedError()
351
352    @abc.abstractmethod
353    def take_response(self):
354        """Draws one of the responses added to the RPC by the system under test.
355
356        Successive calls to this method return responses in the same order in
357        which the system under test added them to the RPC.
358
359        Returns:
360          A response message added to the RPC by the system under test.
361        """
362        raise NotImplementedError()
363
364    @abc.abstractmethod
365    def cancel(self):
366        """Cancels the RPC."""
367        raise NotImplementedError()
368
369    @abc.abstractmethod
370    def termination(self):
371        """Blocks until the system under test has terminated the RPC.
372
373        Returns:
374          A (trailing_metadata, code, details) sequence with the RPC's trailing
375            metadata, code, and details.
376        """
377        raise NotImplementedError()
378
379
380class StreamUnaryServerRpc(six.with_metaclass(abc.ABCMeta)):
381    """Fixture for a stream-unary RPC serviced by a system under test.
382
383    Enables users to "play client" for the RPC.
384    """
385
386    @abc.abstractmethod
387    def initial_metadata(self):
388        """Accesses the initial metadata emitted by the system under test.
389
390        This method blocks until the system under test has added initial
391        metadata to the RPC (or has provided one or more response messages or
392        has terminated the RPC, either of which will cause gRPC Python to
393        synthesize initial metadata for the RPC).
394
395        Returns:
396          The initial metadata for the RPC.
397        """
398        raise NotImplementedError()
399
400    @abc.abstractmethod
401    def send_request(self, request):
402        """Sends a request to the system under test.
403
404        Args:
405          request: A request message for the RPC to be "sent" to the system
406            under test.
407        """
408        raise NotImplementedError()
409
410    @abc.abstractmethod
411    def requests_closed(self):
412        """Indicates the end of the RPC's request stream."""
413        raise NotImplementedError()
414
415    @abc.abstractmethod
416    def cancel(self):
417        """Cancels the RPC."""
418        raise NotImplementedError()
419
420    @abc.abstractmethod
421    def termination(self):
422        """Blocks until the system under test has terminated the RPC.
423
424        Returns:
425          A (response, trailing_metadata, code, details) sequence with the RPC's
426            response, trailing metadata, code, and details.
427        """
428        raise NotImplementedError()
429
430
431class StreamStreamServerRpc(six.with_metaclass(abc.ABCMeta)):
432    """Fixture for a stream-stream RPC serviced by a system under test.
433
434    Enables users to "play client" for the RPC.
435    """
436
437    @abc.abstractmethod
438    def initial_metadata(self):
439        """Accesses the initial metadata emitted by the system under test.
440
441        This method blocks until the system under test has added initial
442        metadata to the RPC (or has provided one or more response messages or
443        has terminated the RPC, either of which will cause gRPC Python to
444        synthesize initial metadata for the RPC).
445
446        Returns:
447          The initial metadata for the RPC.
448        """
449        raise NotImplementedError()
450
451    @abc.abstractmethod
452    def send_request(self, request):
453        """Sends a request to the system under test.
454
455        Args:
456          request: A request message for the RPC to be "sent" to the system
457            under test.
458        """
459        raise NotImplementedError()
460
461    @abc.abstractmethod
462    def requests_closed(self):
463        """Indicates the end of the RPC's request stream."""
464        raise NotImplementedError()
465
466    @abc.abstractmethod
467    def take_response(self):
468        """Draws one of the responses added to the RPC by the system under test.
469
470        Successive calls to this method return responses in the same order in
471        which the system under test added them to the RPC.
472
473        Returns:
474          A response message added to the RPC by the system under test.
475        """
476        raise NotImplementedError()
477
478    @abc.abstractmethod
479    def cancel(self):
480        """Cancels the RPC."""
481        raise NotImplementedError()
482
483    @abc.abstractmethod
484    def termination(self):
485        """Blocks until the system under test has terminated the RPC.
486
487        Returns:
488          A (trailing_metadata, code, details) sequence with the RPC's trailing
489            metadata, code, and details.
490        """
491        raise NotImplementedError()
492
493
494class Server(six.with_metaclass(abc.ABCMeta)):
495    """A server with which to test a system that services RPCs."""
496
497    @abc.abstractmethod
498    def invoke_unary_unary(self, method_descriptor, invocation_metadata,
499                           request, timeout):
500        """Invokes an RPC to be serviced by the system under test.
501
502        Args:
503          method_descriptor: A descriptor.MethodDescriptor describing a unary-unary
504            RPC method.
505          invocation_metadata: The RPC's invocation metadata.
506          request: The RPC's request.
507          timeout: A duration of time in seconds for the RPC or None to
508            indicate that the RPC has no time limit.
509
510        Returns:
511          A UnaryUnaryServerRpc with which to "play client" for the RPC.
512        """
513        raise NotImplementedError()
514
515    @abc.abstractmethod
516    def invoke_unary_stream(self, method_descriptor, invocation_metadata,
517                            request, timeout):
518        """Invokes an RPC to be serviced by the system under test.
519
520        Args:
521          method_descriptor: A descriptor.MethodDescriptor describing a unary-stream
522            RPC method.
523          invocation_metadata: The RPC's invocation metadata.
524          request: The RPC's request.
525          timeout: A duration of time in seconds for the RPC or None to
526            indicate that the RPC has no time limit.
527
528        Returns:
529          A UnaryStreamServerRpc with which to "play client" for the RPC.
530        """
531        raise NotImplementedError()
532
533    @abc.abstractmethod
534    def invoke_stream_unary(self, method_descriptor, invocation_metadata,
535                            timeout):
536        """Invokes an RPC to be serviced by the system under test.
537
538        Args:
539          method_descriptor: A descriptor.MethodDescriptor describing a stream-unary
540            RPC method.
541          invocation_metadata: The RPC's invocation metadata.
542          timeout: A duration of time in seconds for the RPC or None to
543            indicate that the RPC has no time limit.
544
545        Returns:
546          A StreamUnaryServerRpc with which to "play client" for the RPC.
547        """
548        raise NotImplementedError()
549
550    @abc.abstractmethod
551    def invoke_stream_stream(self, method_descriptor, invocation_metadata,
552                             timeout):
553        """Invokes an RPC to be serviced by the system under test.
554
555        Args:
556          method_descriptor: A descriptor.MethodDescriptor describing a stream-stream
557            RPC method.
558          invocation_metadata: The RPC's invocation metadata.
559          timeout: A duration of time in seconds for the RPC or None to
560            indicate that the RPC has no time limit.
561
562        Returns:
563          A StreamStreamServerRpc with which to "play client" for the RPC.
564        """
565        raise NotImplementedError()
566
567
568class Time(six.with_metaclass(abc.ABCMeta)):
569    """A simulation of time.
570
571    Implementations needn't be connected with real time as provided by the
572    Python interpreter, but as long as systems under test use
573    RpcContext.is_active and RpcContext.time_remaining for querying RPC liveness
574    implementations may be used to change passage of time in tests.
575    """
576
577    @abc.abstractmethod
578    def time(self):
579        """Accesses the current test time.
580
581        Returns:
582          The current test time (over which this object has authority).
583        """
584        raise NotImplementedError()
585
586    @abc.abstractmethod
587    def call_in(self, behavior, delay):
588        """Adds a behavior to be called after some time.
589
590        Args:
591          behavior: A behavior to be called with no arguments.
592          delay: A duration of time in seconds after which to call the behavior.
593
594        Returns:
595          A grpc.Future with which the call of the behavior may be cancelled
596            before it is executed.
597        """
598        raise NotImplementedError()
599
600    @abc.abstractmethod
601    def call_at(self, behavior, time):
602        """Adds a behavior to be called at a specific time.
603
604        Args:
605          behavior: A behavior to be called with no arguments.
606          time: The test time at which to call the behavior.
607
608        Returns:
609          A grpc.Future with which the call of the behavior may be cancelled
610            before it is executed.
611        """
612        raise NotImplementedError()
613
614    @abc.abstractmethod
615    def sleep_for(self, duration):
616        """Blocks for some length of test time.
617
618        Args:
619          duration: A duration of test time in seconds for which to block.
620        """
621        raise NotImplementedError()
622
623    @abc.abstractmethod
624    def sleep_until(self, time):
625        """Blocks until some test time.
626
627        Args:
628          time: The test time until which to block.
629        """
630        raise NotImplementedError()
631
632
633def strict_real_time():
634    """Creates a Time backed by the Python interpreter's time.
635
636    The returned instance will be "strict" with respect to callbacks
637    submitted to it: it will ensure that all callbacks registered to
638    be called at time t have been called before it describes the time
639    as having advanced beyond t.
640
641    Returns:
642      A Time backed by the "system" (Python interpreter's) time.
643    """
644    from grpc_testing import _time
645    return _time.StrictRealTime()
646
647
648def strict_fake_time(now):
649    """Creates a Time that can be manipulated by test code.
650
651    The returned instance maintains an internal representation of time
652    independent of real time. This internal representation only advances
653    when user code calls the instance's sleep_for and sleep_until methods.
654
655    The returned instance will be "strict" with respect to callbacks
656    submitted to it: it will ensure that all callbacks registered to
657    be called at time t have been called before it describes the time
658    as having advanced beyond t.
659
660    Returns:
661      A Time that simulates the passage of time.
662    """
663    from grpc_testing import _time
664    return _time.StrictFakeTime(now)
665
666
667def channel(service_descriptors, time):
668    """Creates a Channel for use in tests of a gRPC Python-using system.
669
670    Args:
671      service_descriptors: An iterable of descriptor.ServiceDescriptors
672        describing the RPCs that will be made on the returned Channel by the
673        system under test.
674      time: A Time to be used for tests.
675
676    Returns:
677      A Channel for use in tests.
678    """
679    from grpc_testing import _channel
680    return _channel.testing_channel(service_descriptors, time)
681
682
683def server_from_dictionary(descriptors_to_servicers, time):
684    """Creates a Server for use in tests of a gRPC Python-using system.
685
686    Args:
687      descriptors_to_servicers: A dictionary from descriptor.ServiceDescriptors
688        defining RPC services to servicer objects (usually instances of classes
689        that implement "Servicer" interfaces defined in generated "_pb2_grpc"
690        modules) implementing those services.
691      time: A Time to be used for tests.
692
693    Returns:
694      A Server for use in tests.
695    """
696    from grpc_testing import _server
697    return _server.server_from_dictionary(descriptors_to_servicers, time)
698