1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15cimport cpython
16
17import threading
18import time
19
# Length, in milliseconds, of each individual wait that _next() performs on
# the completion queue. After a wait of this length times out (and the
# caller's deadline has not been reached), _next() calls PyErr_CheckSignals()
# before waiting again.
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
21
22
cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline):
  # Blocks until c_completion_queue yields an event or the deadline passes,
  # and returns that raw grpc_event.
  #
  # deadline: None to wait without a time limit (gpr_inf_future is used);
  #   otherwise a value converted with _timespec_from_time().
  #
  # The wait is cut into slices of _INTERRUPT_CHECK_PERIOD_MS so that the GIL
  # can be re-acquired between slices to let Python process pending signals.
  #
  # NOTE(review): this function declares no exception return value; confirm
  # how an exception raised out of PyErr_CheckSignals() (e.g. by a Python
  # signal handler) is reported to the caller.
  cdef gpr_timespec c_poll_period = gpr_time_from_millis(
      _INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
  cdef gpr_timespec c_wake_at
  cdef gpr_timespec c_final_deadline
  if deadline is not None:
    c_final_deadline = _timespec_from_time(deadline)
  else:
    c_final_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)

  with nogil:
    while True:
      # Wait for one poll period, but never beyond the caller's deadline.
      c_wake_at = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_poll_period)
      if gpr_time_cmp(c_wake_at, c_final_deadline) > 0:
        c_wake_at = c_final_deadline
      c_event = grpc_completion_queue_next(c_completion_queue, c_wake_at, NULL)
      if (c_event.type == GRPC_QUEUE_TIMEOUT and
          gpr_time_cmp(c_wake_at, c_final_deadline) != 0):
        # Only an intermediate slice timed out: handle any signals, then
        # go around again.
        with gil:
          cpython.PyErr_CheckSignals()
      else:
        # Either a real event arrived or the caller's deadline was reached.
        break
  return c_event
47
48
cdef _interpret_event(grpc_event c_event):
  # Translates a raw grpc_event into a (tag, event) pair.
  #
  # Timeout and shutdown events carry no tag: the first element is None and
  # the second is a ConnectivityEvent built from the completion type. For any
  # other event the tag object stored in c_event.tag is recovered and asked
  # to build the event via its event() method.
  cdef _Tag completed_tag

  # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
  if c_event.type == GRPC_QUEUE_TIMEOUT:
    return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
  if c_event.type == GRPC_QUEUE_SHUTDOWN:
    return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None)

  completed_tag = <_Tag>c_event.tag
  # We receive event tags only after they've been inc-ref'd elsewhere in
  # the code, so release that extra reference now that the tag is held by a
  # local variable.
  cpython.Py_DECREF(completed_tag)
  event = completed_tag.event(c_event)
  return completed_tag, event
63
64
cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
  # Waits for the next event on c_completion_queue (see _next) and returns it
  # as the (tag, event) pair produced by _interpret_event.
  return _interpret_event(_next(c_completion_queue, deadline))
68
69
cdef class CompletionQueue:
  """Owns a core grpc_completion_queue and exposes polling and shutdown.

  The instance tracks two flags: is_shutting_down (set once shutdown() has
  been called) and is_shutdown (set once a GRPC_QUEUE_SHUTDOWN event has been
  read from the queue).
  """

  def __cinit__(self, shutdown_cq=False):
    # Initializes gRPC and creates the underlying completion queue.
    #
    # shutdown_cq: when true, the queue is created from explicit attributes
    #   (GRPC_CQ_NEXT completion type, GRPC_CQ_NON_LISTENING polling type);
    #   otherwise the default "for next" queue is created.
    cdef grpc_completion_queue_attributes c_attrs
    fork_handlers_and_grpc_init()
    if shutdown_cq:
      c_attrs.version = 1
      c_attrs.cq_completion_type = GRPC_CQ_NEXT
      c_attrs.cq_polling_type = GRPC_CQ_NON_LISTENING
      self.c_completion_queue = grpc_completion_queue_create(
          grpc_completion_queue_factory_lookup(&c_attrs), &c_attrs, NULL)
    else:
      self.c_completion_queue = grpc_completion_queue_create_for_next(NULL)
    self.is_shutting_down = False
    self.is_shutdown = False

  cdef _interpret_event(self, grpc_event c_event):
    # Converts a raw event into its Python event object (the tag is dropped)
    # and records on this instance when the queue reports shutdown.
    unused_tag, event = _interpret_event(c_event)
    if event.completion_type == GRPC_QUEUE_SHUTDOWN:
      self.is_shutdown = True
    return event

  # We name this 'poll' to avoid problems with CPython's expectations for
  # 'special' methods (like next and __next__).
  def poll(self, deadline=None):
    """Waits for the next event on this queue and returns it.

    Args:
      deadline: None to wait without a time limit; otherwise a value
        forwarded to _next(), which converts it with _timespec_from_time().

    Returns:
      The Python event object built by _interpret_event for the next event.
    """
    return self._interpret_event(_next(self.c_completion_queue, deadline))

  def shutdown(self):
    """Begins shutdown of the underlying queue and marks it shutting down."""
    with nogil:
      grpc_completion_queue_shutdown(self.c_completion_queue)
    self.is_shutting_down = True

  def clear(self):
    """Drains the queue until its shutdown event has been read.

    Raises:
      ValueError: if shutdown() has not been called on this queue.
    """
    if not self.is_shutting_down:
      raise ValueError('queue must be shutting down to be cleared')
    # The events returned by poll() expose their completion type as
    # `completion_type` (the attribute _interpret_event reads above), so the
    # loop must test that attribute.
    while self.poll().completion_type != GRPC_QUEUE_SHUTDOWN:
      pass

  def __dealloc__(self):
    # Shuts down (if needed), drains and destroys the underlying queue, then
    # releases the gRPC initialization taken in __cinit__.
    cdef gpr_timespec c_deadline
    c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
    if self.c_completion_queue != NULL:
      # Ensure shutdown
      if not self.is_shutting_down:
        grpc_completion_queue_shutdown(self.c_completion_queue)
      # Pump the queue (All outstanding calls should have been cancelled)
      while not self.is_shutdown:
        event = grpc_completion_queue_next(
            self.c_completion_queue, c_deadline, NULL)
        # Sets self.is_shutdown once the shutdown event is seen, which ends
        # this loop.
        self._interpret_event(event)
      grpc_completion_queue_destroy(self.c_completion_queue)
    grpc_shutdown()
122