1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15cimport cpython
16
17import threading
18import time
19
# Message template used for call errors other than invalid metadata; the
# single %d placeholder receives the call error code.
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
    'Internal gRPC call error %d. '
    'Please report to https://github.com/grpc/grpc/issues')
23
24
cdef str _call_error_metadata(metadata):
  """Builds the error message used when a call's metadata is invalid.

  Args:
    metadata: The offending metadata; any object (including a tuple or None).

  Returns:
    A str of the form 'metadata was invalid: <metadata>'.
  """
  # Wrap in a 1-tuple: with a bare tuple on the right-hand side, the % operator
  # treats each element as a separate format argument, which either raises
  # TypeError or prints only the first element of a one-element tuple.
  return 'metadata was invalid: %s' % (metadata,)
27
28
cdef str _call_error_no_metadata(c_call_error):
  """Formats the internal-error message for the given call error code."""
  message = _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
  return message
31
32
cdef str _call_error(c_call_error, metadata):
  """Returns the message describing a call error.

  Invalid-metadata errors get a message naming the metadata; every other
  error code gets the generic internal-error message.
  """
  if c_call_error != GRPC_CALL_ERROR_INVALID_METADATA:
    return _call_error_no_metadata(c_call_error)
  return _call_error_metadata(metadata)
38
39
cdef _check_call_error_no_metadata(c_call_error):
  """Returns an error message for a failed call error code, else None.

  Returns:
    None when c_call_error is GRPC_CALL_OK; otherwise the internal-error
    message formatted with the error code.
  """
  if c_call_error == GRPC_CALL_OK:
    return None
  return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
45
46
cdef _check_and_raise_call_error_no_metadata(c_call_error):
  """Raises ValueError unless c_call_error is GRPC_CALL_OK.

  Raises:
    ValueError: Carrying the internal-error message for c_call_error.
  """
  message = _check_call_error_no_metadata(c_call_error)
  if message is None:
    return
  raise ValueError(message)
51
52
cdef _check_call_error(c_call_error, metadata):
  """Returns an error message for c_call_error, or None if it is OK.

  Invalid-metadata errors are reported with the metadata itself; all other
  codes are delegated to _check_call_error_no_metadata.
  """
  if c_call_error != GRPC_CALL_ERROR_INVALID_METADATA:
    return _check_call_error_no_metadata(c_call_error)
  return _call_error_metadata(metadata)
58
59
cdef void _raise_call_error_no_metadata(c_call_error) except *:
  """Unconditionally raises ValueError with the internal-error message."""
  message = _call_error_no_metadata(c_call_error)
  raise ValueError(message)
62
63
cdef void _raise_call_error(c_call_error, metadata) except *:
  """Unconditionally raises ValueError describing c_call_error.

  The message names the metadata when the error is an invalid-metadata error
  and is the generic internal-error message otherwise.
  """
  message = _call_error(c_call_error, metadata)
  raise ValueError(message)
66
67
cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue):
  """Shuts down and then destroys the given core completion queue.

  Args:
    c_completion_queue: The grpc_completion_queue to tear down.
  """
  # Shutdown is requested first; the queue is destroyed immediately afterwards.
  grpc_completion_queue_shutdown(c_completion_queue)
  grpc_completion_queue_destroy(c_completion_queue)
71
72
cdef class _CallState:
  """Bookkeeping for a single call.

  The "due" set holds the tags of operations that have been started for the
  call and not yet processed; other code in this file treats an empty set as
  "the call is finished".
  """

  def __cinit__(self):
    # No operations have been started yet.
    self.due = set()
77
78
cdef class _ChannelState:
  """Mutable state shared by a Channel and the calls made on it.

  All fields are read and written by other code in this file while holding
  "condition".
  """

  def __cinit__(self):
    # Lifecycle: the channel starts open and has no reason for being closed.
    self.open = True
    self.closed_reason = None
    # Outstanding work: tag -> _CallState for integrated calls, the set of
    # live segregated _CallStates, and the tags of pending connectivity
    # watches.
    self.integrated_call_states = {}
    self.segregated_call_states = set()
    self.connectivity_due = set()
    # Guards every field above and is notified when outstanding work changes.
    self.condition = threading.Condition()
88
89
cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
  """Starts a batch of operations on a call.

  Args:
    c_call: The grpc_call on which to start the batch.
    operations: The operations making up the batch.
    user_tag: The object to associate with the batch's completion.

  Returns:
    A (grpc_call_error, _BatchOperationTag) pair. The tag is only awaited by
    the completion queue when the error is GRPC_CALL_OK.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None)
  tag.prepare()
  # Core is handed a raw pointer to the tag, so take a reference on its behalf.
  # NOTE(review): presumably released by the code that dequeues the completed
  # event - confirm against _latent_event.
  cpython.Py_INCREF(tag)
  with nogil:
    c_call_error = grpc_call_start_batch(
        c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL)
  if c_call_error != GRPC_CALL_OK:
    # A failed start never produces a completion for this tag, so nothing
    # would ever release the reference taken above; drop it here to avoid
    # leaking the tag (and the operations and user tag it holds).
    cpython.Py_DECREF(tag)
  return c_call_error, tag
99
100
cdef object _operate_from_integrated_call(
    _ChannelState channel_state, _CallState call_state, object operations,
    object user_tag):
  """Starts more operations on an integrated call that is still active.

  Returns:
    True if the operations were started; False if the call has no operations
    due and so nothing was started.

  Raises:
    ValueError: If starting the operations fails.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag
  with channel_state.condition:
    if not call_state.due:
      return False
    c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
    if c_call_error != GRPC_CALL_OK:
      # Always raises, so the bookkeeping below is skipped on failure.
      _raise_call_error_no_metadata(c_call_error)
    call_state.due.add(tag)
    channel_state.integrated_call_states[tag] = call_state
    return True
117
118
cdef object _operate_from_segregated_call(
    _ChannelState channel_state, _CallState call_state, object operations,
    object user_tag):
  """Starts more operations on a segregated call that is still active.

  Returns:
    True if the operations were started; False if the call has no operations
    due and so nothing was started.

  Raises:
    ValueError: If starting the operations fails.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag
  with channel_state.condition:
    if not call_state.due:
      return False
    c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
    if c_call_error != GRPC_CALL_OK:
      # Always raises, so the tag is not recorded on failure.
      _raise_call_error_no_metadata(c_call_error)
    call_state.due.add(tag)
    return True
134
135
cdef _cancel(
    _ChannelState channel_state, _CallState call_state, grpc_status_code code,
    str details):
  """Cancels a call with the given status if it still has operations due.

  Does nothing when the call has no operations due.

  Raises:
    ValueError: If core reports an error for the cancellation.
  """
  cdef grpc_call_error c_call_error
  with channel_state.condition:
    if not call_state.due:
      return
    c_call_error = grpc_call_cancel_with_status(
        call_state.c_call, code, _encode(details), NULL)
    _check_and_raise_call_error_no_metadata(c_call_error)
145
146
cdef _next_call_event(
    _ChannelState channel_state, grpc_completion_queue *c_completion_queue,
    on_success, deadline):
  """Fetches the next event from a completion queue and processes its tag.

  Args:
    channel_state: The _ChannelState whose condition is held while the tag is
      processed and is notified afterwards.
    c_completion_queue: The completion queue to read from.
    on_success: A callable invoked with the event's tag while the condition is
      held.
    deadline: Passed through to _latent_event.

  Returns:
    The event obtained from _latent_event.
  """
  completed_tag, completed_event = _latent_event(c_completion_queue, deadline)
  condition = channel_state.condition
  with condition:
    on_success(completed_tag)
    condition.notify_all()
  return completed_event
155
156
# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler.
cdef void _call(
    _ChannelState channel_state, _CallState call_state,
    grpc_completion_queue *c_completion_queue, on_success, int flags, method,
    host, object deadline, CallCredentials credentials,
    object operationses_and_user_tags, object metadata) except *:
  """Invokes an RPC.

  Args:
    channel_state: A _ChannelState with its "open" attribute set to True. RPCs
      may not be invoked on a closed channel.
    call_state: An empty _CallState to be altered (specifically assigned a
      c_call and having its due set populated) if the RPC invocation is
      successful.
    c_completion_queue: A grpc_completion_queue to be used for the call's
      operations.
    on_success: A behavior to be called if attempting to start operations for
      the call succeeds. If called the behavior will be called while holding the
      channel_state condition and passed the tags associated with operations
      that were successfully started for the call.
    flags: Flags to be passed to gRPC Core as part of call creation.
    method: The fully-qualified name of the RPC method being invoked.
    host: A "host" string to be passed to gRPC Core as part of call creation,
      or None to pass no host.
    deadline: A float for the deadline of the RPC, or None if the RPC is to have
      no deadline.
    credentials: A CallCredentials for the RPC or None.
    operationses_and_user_tags: A sequence of length-two sequences the first
      element of which is a sequence of Operations and the second element of
      which is an object to be used as a tag. A SendInitialMetadataOperation
      must be present in the first element of this value.
    metadata: The metadata for this call; used here only to build the error
      message when starting operations fails.

  Raises:
    ValueError: If the channel is closed, if the credentials cannot be set on
      the call, or if any batch of operations fails to start.
  """
  cdef grpc_slice method_slice
  cdef grpc_slice host_slice
  cdef grpc_slice *host_slice_ptr
  cdef grpc_call_credentials *c_call_credentials
  cdef grpc_call_error c_call_error
  cdef tuple error_and_wrapper_tag
  cdef _BatchOperationTag wrapper_tag
  with channel_state.condition:
    if channel_state.open:
      method_slice = _slice_from_bytes(method)
      if host is None:
        host_slice_ptr = NULL
      else:
        host_slice = _slice_from_bytes(host)
        host_slice_ptr = &host_slice
      call_state.c_call = grpc_channel_create_call(
          channel_state.c_channel, NULL, flags,
          c_completion_queue, method_slice, host_slice_ptr,
          _timespec_from_time(deadline), NULL)
      # The slices were only needed for call creation; release them now.
      grpc_slice_unref(method_slice)
      if host_slice_ptr:
        grpc_slice_unref(host_slice)
      if credentials is not None:
        c_call_credentials = credentials.c()
        c_call_error = grpc_call_set_credentials(
            call_state.c_call, c_call_credentials)
        grpc_call_credentials_release(c_call_credentials)
        if c_call_error != GRPC_CALL_OK:
          # Nothing has been started on the call yet, so just drop it.
          grpc_call_unref(call_state.c_call)
          call_state.c_call = NULL
          _raise_call_error_no_metadata(c_call_error)
      started_tags = set()
      for operations, user_tag in operationses_and_user_tags:
        c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
        if c_call_error == GRPC_CALL_OK:
          started_tags.add(tag)
        else:
          # A batch failed to start: cancel and drop the call, then raise.
          grpc_call_cancel(call_state.c_call, NULL)
          grpc_call_unref(call_state.c_call)
          call_state.c_call = NULL
          _raise_call_error(c_call_error, metadata)
      else:
        # for/else: reached only when every batch was started successfully.
        call_state.due.update(started_tags)
        on_success(started_tags)
    else:
      raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)
cdef void _process_integrated_call_tag(
    _ChannelState state, _BatchOperationTag tag) except *:
  """Records that an integrated call's tag has completed.

  The tag is removed from the channel's tag-to-call mapping and from the
  call's due set; once the due set is empty the core call is released.
  """
  cdef _CallState call_state = state.integrated_call_states.pop(tag)
  call_state.due.remove(tag)
  if call_state.due:
    # Other operations are still outstanding; keep the core call alive.
    return
  grpc_call_unref(call_state.c_call)
  call_state.c_call = NULL
242
243
cdef class IntegratedCall:
  """Handle to a call whose events arrive on the channel's shared call queue."""

  def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    self._channel_state = channel_state
    self._call_state = call_state

  def operate(self, operations, tag):
    """Starts further operations on the call.

    Returns:
      True if the operations were started, False if the call no longer has
      any operations due.

    Raises:
      ValueError: If starting the operations fails.
    """
    return _operate_from_integrated_call(
        self._channel_state, self._call_state, operations, tag)

  def cancel(self, code, details):
    """Cancels the call with the given status code and details.

    Has no effect if the call no longer has any operations due.
    """
    _cancel(self._channel_state, self._call_state, code, details)
256
257
cdef IntegratedCall _integrated_call(
    _ChannelState state, int flags, method, host, object deadline,
    object metadata, CallCredentials credentials, operationses_and_user_tags):
  """Starts an RPC on the channel's shared call completion queue.

  Returns:
    An IntegratedCall wrapping the newly started call.

  Raises:
    ValueError: If _call fails to invoke the RPC.
  """
  call_state = _CallState()

  def on_success(started_tags):
    # Map every started tag back to this call so that completed events can be
    # routed to it.
    state.integrated_call_states.update(
        dict.fromkeys(started_tags, call_state))

  _call(
      state, call_state, state.c_call_completion_queue, on_success, flags,
      method, host, deadline, credentials, operationses_and_user_tags, metadata)

  return IntegratedCall(state, call_state)
272
273
cdef object _process_segregated_call_tag(
    _ChannelState state, _CallState call_state,
    grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
  """Records that a segregated call's tag has completed.

  Returns:
    True if that was the call's last due tag, in which case the core call is
    released, the call is forgotten by the channel and its completion queue is
    destroyed; False if other tags are still due.
  """
  call_state.due.remove(tag)
  if call_state.due:
    return False
  grpc_call_unref(call_state.c_call)
  call_state.c_call = NULL
  state.segregated_call_states.remove(call_state)
  _destroy_c_completion_queue(c_completion_queue)
  return True
286
287
cdef class SegregatedCall:
  """Handle to a call that has its own, dedicated completion queue."""

  def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    self._channel_state = channel_state
    self._call_state = call_state

  def operate(self, operations, tag):
    """Starts further operations on the call.

    Returns:
      True if the operations were started, False if the call no longer has
      any operations due.

    Raises:
      ValueError: If starting the operations fails.
    """
    return _operate_from_segregated_call(
        self._channel_state, self._call_state, operations, tag)

  def cancel(self, code, details):
    """Cancels the call with the given status code and details.

    Has no effect if the call no longer has any operations due.
    """
    _cancel(self._channel_state, self._call_state, code, details)

  def next_event(self):
    """Returns the next event from this call's own completion queue.

    No deadline is passed for the wait. The event's tag is processed (and the
    call torn down if it was the last one due) before the event is returned.
    """
    def on_success(tag):
      _process_segregated_call_tag(
        self._channel_state, self._call_state, self._c_completion_queue, tag)
    return _next_call_event(
        self._channel_state, self._c_completion_queue, on_success, None)
307
308
cdef SegregatedCall _segregated_call(
    _ChannelState state, int flags, method, host, object deadline,
    object metadata, CallCredentials credentials, operationses_and_user_tags):
  """Starts an RPC that uses its own, dedicated completion queue.

  Returns:
    A SegregatedCall owning the newly created completion queue.

  Raises:
    ValueError: If the channel is closed or _call fails to invoke the RPC.
  """
  cdef _CallState call_state = _CallState()
  cdef SegregatedCall segregated_call
  cdef grpc_completion_queue *c_completion_queue

  def on_success(started_tags):
    state.segregated_call_states.add(call_state)

  with state.condition:
    if state.open:
      c_completion_queue = (grpc_completion_queue_create_for_next(NULL))
    else:
      # Report the reason the channel was closed, as the other closed-channel
      # errors in this file do.
      raise ValueError('Cannot invoke RPC: %s' % state.closed_reason)

  try:
    _call(
        state, call_state, c_completion_queue, on_success, flags, method, host,
        deadline, credentials, operationses_and_user_tags, metadata)
  except:
    # The queue was created solely for this call; do not leak it if the call
    # could not be started. The exception is re-raised unchanged.
    _destroy_c_completion_queue(c_completion_queue)
    raise

  segregated_call = SegregatedCall(state, call_state)
  segregated_call._c_completion_queue = c_completion_queue
  return segregated_call
336
337
cdef object _watch_connectivity_state(
    _ChannelState state, grpc_connectivity_state last_observed_state,
    object deadline):
  """Registers a connectivity watch and waits for a connectivity event.

  Args:
    state: The _ChannelState of the channel to watch.
    last_observed_state: The connectivity state passed to core as the last
      observed state.
    deadline: The deadline for the watch, converted with _timespec_from_time.

  Returns:
    The next event taken from the channel's connectivity completion queue.

  Raises:
    ValueError: If the channel is closed.
  """
  cdef _ConnectivityTag tag = _ConnectivityTag(object())
  with state.condition:
    if state.open:
      # Core is handed a raw pointer to the tag, so take a reference for it.
      cpython.Py_INCREF(tag)
      grpc_channel_watch_connectivity_state(
          state.c_channel, last_observed_state, _timespec_from_time(deadline),
          state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
      state.connectivity_due.add(tag)
    else:
      raise ValueError('Cannot invoke RPC: %s' % state.closed_reason)
  # Wait outside the lock, with no deadline on the wait itself.
  completed_tag, event = _latent_event(
      state.c_connectivity_completion_queue, None)
  with state.condition:
    # Remove whichever tag completed; it is taken from the event rather than
    # assumed to be the tag registered above.
    state.connectivity_due.remove(completed_tag)
    state.condition.notify_all()
  return event
357
358
cdef _close(Channel channel, grpc_status_code code, object details,
    drain_calls):
  """Closes a channel, cancelling its calls and releasing core resources.

  Args:
    channel: The Channel to close.
    code: The status code with which outstanding calls are cancelled.
    details: The status details for the cancellation; also recorded as the
      channel's closed_reason.
    drain_calls: If true, this function itself pulls events from the channel's
      call queue until all outstanding work is gone; otherwise it waits on the
      channel condition for that work to be cleared elsewhere.
  """
  cdef _ChannelState state = channel._state
  cdef _CallState call_state
  encoded_details = _encode(details)
  with state.condition:
    if state.open:
      state.open = False
      state.closed_reason = details
      # One _CallState may be registered under several tags; set() makes sure
      # each call is cancelled only once.
      for call_state in set(state.integrated_call_states.values()):
        grpc_call_cancel_with_status(
            call_state.c_call, code, encoded_details, NULL)
      for call_state in state.segregated_call_states:
        grpc_call_cancel_with_status(
            call_state.c_call, code, encoded_details, NULL)
      # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
      # watching.

      if drain_calls:
        while not _calls_drained(state):
          event = channel.next_call_event()
          # A queue timeout carries nothing to dispatch; poll again.
          if event.completion_type == CompletionType.queue_timeout:
              continue
          event.tag(event)
      else:
        # Wait to be notified until each kind of outstanding work is empty.
        while state.integrated_call_states:
          state.condition.wait()
        while state.segregated_call_states:
          state.condition.wait()
        while state.connectivity_due:
          state.condition.wait()

      _destroy_c_completion_queue(state.c_call_completion_queue)
      _destroy_c_completion_queue(state.c_connectivity_completion_queue)
      grpc_channel_destroy(state.c_channel)
      # A NULL c_channel is what concurrent closers wait for below.
      state.c_channel = NULL
      grpc_shutdown()
      state.condition.notify_all()
    else:
      # Another call to close already completed in the past or is currently
      # being executed in another thread.
      while state.c_channel != NULL:
        state.condition.wait()
402
403
cdef _calls_drained(_ChannelState state):
  """Returns True when the channel has no outstanding work.

  Outstanding work means integrated call tags, segregated calls, or pending
  connectivity watches.
  """
  return (not state.integrated_call_states and
          not state.segregated_call_states and
          not state.connectivity_due)
407
cdef class Channel:
  """A gRPC Core channel plus its call and connectivity completion queues."""

  def __cinit__(
      self, bytes target, object arguments,
      ChannelCredentials channel_credentials):
    """Creates the underlying core channel and its two completion queues.

    Args:
      target: The target of the channel, as bytes.
      arguments: An iterable of channel arguments, or None for no arguments.
      channel_credentials: A ChannelCredentials for a secure channel, or None
        to create an insecure channel.
    """
    arguments = () if arguments is None else tuple(arguments)
    fork_handlers_and_grpc_init()
    self._state = _ChannelState()
    self._vtable.copy = &_copy_pointer
    self._vtable.destroy = &_destroy_pointer
    self._vtable.cmp = &_compare_pointer
    cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor(
        arguments)
    cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable)
    if channel_credentials is None:
      self._state.c_channel = grpc_insecure_channel_create(
          <char *>target, c_arguments, NULL)
    else:
      c_channel_credentials = channel_credentials.c()
      self._state.c_channel = grpc_secure_channel_create(
          c_channel_credentials, <char *>target, c_arguments, NULL)
      grpc_channel_credentials_release(c_channel_credentials)
    self._state.c_call_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    self._state.c_connectivity_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    # NOTE(review): presumably kept so that objects referenced by the core
    # channel arguments stay alive with the Channel - confirm.
    self._arguments = arguments

  def target(self):
    """Returns the channel's target as bytes.

    Raises:
      ValueError: If the channel has been closed and its core channel
        destroyed.
    """
    cdef char *c_target
    with self._state.condition:
      # After close the core channel pointer is NULL; handing that to core
      # would crash the process, so fail with an exception instead.
      if self._state.c_channel == NULL:
        raise ValueError(
            'Cannot get target: %s' % self._state.closed_reason)
      c_target = grpc_channel_get_target(self._state.c_channel)
      # Copy into a Python bytes object before freeing the core-owned string.
      target = <bytes>c_target
      gpr_free(c_target)
      return target

  def integrated_call(
      self, int flags, method, host, object deadline, object metadata,
      CallCredentials credentials, operationses_and_tags):
    """Starts an RPC whose events arrive via next_call_event.

    Returns:
      An IntegratedCall.

    Raises:
      ValueError: If the channel is closed or the RPC cannot be started.
    """
    return _integrated_call(
        self._state, flags, method, host, deadline, metadata, credentials,
        operationses_and_tags)

  def next_call_event(self):
    """Returns the next event from the channel's shared call queue.

    With fork support enabled the wait is bounded to one second from now;
    otherwise no deadline is passed.
    """
    def on_success(tag):
      # No bookkeeping is done when the event carries no tag.
      if tag is not None:
        _process_integrated_call_tag(self._state, tag)
    if is_fork_support_enabled():
      queue_deadline = time.time() + 1.0
    else:
      queue_deadline = None
    return _next_call_event(self._state, self._state.c_call_completion_queue,
                            on_success, queue_deadline)

  def segregated_call(
      self, int flags, method, host, object deadline, object metadata,
      CallCredentials credentials, operationses_and_tags):
    """Starts an RPC that has its own, dedicated completion queue.

    Returns:
      A SegregatedCall.

    Raises:
      ValueError: If the channel is closed or the RPC cannot be started.
    """
    return _segregated_call(
        self._state, flags, method, host, deadline, metadata, credentials,
        operationses_and_tags)

  def check_connectivity_state(self, bint try_to_connect):
    """Returns the channel's current connectivity state as reported by core.

    Args:
      try_to_connect: Passed through to core with the state query.

    Raises:
      ValueError: If the channel is closed.
    """
    with self._state.condition:
      if self._state.open:
        return grpc_channel_check_connectivity_state(
            self._state.c_channel, try_to_connect)
      else:
        raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason)

  def watch_connectivity_state(
      self, grpc_connectivity_state last_observed_state, object deadline):
    """Registers a connectivity watch and waits for a connectivity event.

    Raises:
      ValueError: If the channel is closed.
    """
    return _watch_connectivity_state(self._state, last_observed_state, deadline)

  def close(self, code, details):
    """Closes the channel, waiting for outstanding work to be cleared."""
    _close(self, code, details, False)

  def close_on_fork(self, code, details):
    """Closes the channel, draining outstanding call events itself."""
    _close(self, code, details, True)
486