# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cimport cpython

import threading
import time

# Upper bound, in milliseconds, on a single blocking wait inside _next();
# between waits the GIL is re-acquired so pending signals can be checked.
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200


cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline):
  """Waits for the next event on a completion queue, checking signals.

  The wait is split into slices of at most _INTERRUPT_CHECK_PERIOD_MS so
  that cpython.PyErr_CheckSignals() runs between slices.

  Args:
    c_completion_queue: The completion queue to poll.
    deadline: None to wait indefinitely, otherwise a value accepted by
      _timespec_from_time giving the overall deadline.

  Returns:
    The first event that is not a GRPC_QUEUE_TIMEOUT, or the
    GRPC_QUEUE_TIMEOUT event produced by the slice whose timeout equals the
    overall deadline.
  """
  cdef gpr_timespec c_increment
  cdef gpr_timespec c_timeout
  cdef gpr_timespec c_deadline
  c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
  if deadline is None:
    c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
  else:
    c_deadline = _timespec_from_time(deadline)

  with nogil:
    while True:
      # Wait for one slice, clamped so it never overshoots the deadline.
      c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
      if gpr_time_cmp(c_timeout, c_deadline) > 0:
        c_timeout = c_deadline
      c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)
      # Stop on a real event, or on a timeout of the final (clamped) slice.
      if (c_event.type != GRPC_QUEUE_TIMEOUT or
          gpr_time_cmp(c_timeout, c_deadline) == 0):
        break

      # Handle any signals
      # NOTE(review): this function declares no exception return value, so
      # whether an exception raised by the signal check reaches the caller
      # presumably depends on the Cython version's defaults -- confirm.
      with gil:
        cpython.PyErr_CheckSignals()
  return c_event


cdef _interpret_event(grpc_event c_event):
  """Converts a C completion-queue event into a (tag, event) pair.

  Args:
    c_event: The event returned by the completion queue.

  Returns:
    (None, ConnectivityEvent) for GRPC_QUEUE_TIMEOUT and GRPC_QUEUE_SHUTDOWN
    events; otherwise (tag, tag.event(c_event)) where tag is the _Tag object
    carried by the event.
  """
  cdef _Tag tag
  if c_event.type == GRPC_QUEUE_TIMEOUT:
    # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
    return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
  elif c_event.type == GRPC_QUEUE_SHUTDOWN:
    # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
    return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None)
  else:
    tag = <_Tag>c_event.tag
    # We receive event tags only after they've been inc-ref'd elsewhere in
    # the code.
    cpython.Py_DECREF(tag)
    return tag, tag.event(c_event)


cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
  """Waits for the next event via _next() and interprets it.

  Args:
    c_completion_queue: The completion queue to poll.
    deadline: Passed through unchanged to _next().

  Returns:
    The (tag, event) pair produced by _interpret_event().
  """
  cdef grpc_event c_event = _next(c_completion_queue, deadline)
  return _interpret_event(c_event)


cdef class CompletionQueue:
  """Python-visible wrapper around a grpc_completion_queue.

  Tracks two flags: is_shutting_down (set once shutdown() has been called)
  and is_shutdown (set once a GRPC_QUEUE_SHUTDOWN event has been observed).
  """

  def __cinit__(self, shutdown_cq=False):
    # When shutdown_cq is true the queue is created from explicit attributes
    # (GRPC_CQ_NEXT completion type, GRPC_CQ_NON_LISTENING polling type);
    # otherwise the default "next" queue is created.
    cdef grpc_completion_queue_attributes c_attrs
    fork_handlers_and_grpc_init()
    if shutdown_cq:
      c_attrs.version = 1
      c_attrs.cq_completion_type = GRPC_CQ_NEXT
      c_attrs.cq_polling_type = GRPC_CQ_NON_LISTENING
      self.c_completion_queue = grpc_completion_queue_create(
          grpc_completion_queue_factory_lookup(&c_attrs), &c_attrs, NULL)
    else:
      self.c_completion_queue = grpc_completion_queue_create_for_next(NULL)
    self.is_shutting_down = False
    self.is_shutdown = False

  cdef _interpret_event(self, grpc_event c_event):
    """Interprets c_event and records whether the queue has shut down.

    Returns:
      The event half of the module-level _interpret_event() result; the tag
      half is discarded.
    """
    unused_tag, event = _interpret_event(c_event)
    if event.completion_type == GRPC_QUEUE_SHUTDOWN:
      self.is_shutdown = True
    return event

  # We name this 'poll' to avoid problems with CPython's expectations for
  # 'special' methods (like next and __next__).
  def poll(self, deadline=None):
    """Waits for and returns the next event on this queue.

    Args:
      deadline: None to wait indefinitely, otherwise the deadline handed to
        _next().

    Returns:
      The interpreted event object (see _interpret_event).
    """
    return self._interpret_event(_next(self.c_completion_queue, deadline))

  def shutdown(self):
    """Begins shutting down the underlying queue and marks it as such."""
    with nogil:
      grpc_completion_queue_shutdown(self.c_completion_queue)
    self.is_shutting_down = True

  def clear(self):
    """Drains the queue until its shutdown event is observed.

    Raises:
      ValueError: If shutdown() has not been called on this queue.
    """
    if not self.is_shutting_down:
      raise ValueError('queue must be shutting down to be cleared')
    # poll() returns the interpreted Python event, whose kind is exposed as
    # `completion_type` (the same attribute _interpret_event reads above);
    # `type` is a field of the C grpc_event struct, not of that object.
    while self.poll().completion_type != GRPC_QUEUE_SHUTDOWN:
      pass

  def __dealloc__(self):
    # Shuts the queue down if needed, drains it until the shutdown event has
    # been seen, destroys it, and then calls grpc_shutdown().
    cdef gpr_timespec c_deadline
    c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
    if self.c_completion_queue != NULL:
      # Ensure shutdown
      if not self.is_shutting_down:
        grpc_completion_queue_shutdown(self.c_completion_queue)
      # Pump the queue (All outstanding calls should have been cancelled)
      while not self.is_shutdown:
        event = grpc_completion_queue_next(
            self.c_completion_queue, c_deadline, NULL)
        self._interpret_event(event)
      grpc_completion_queue_destroy(self.c_completion_queue)
    # NOTE(review): presumably balances the initialization performed by
    # fork_handlers_and_grpc_init() in __cinit__ -- confirm.
    grpc_shutdown()