1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15cimport cpython
16
17import logging
18import time
19import grpc
20
21logging.basicConfig()
22_LOGGER = logging.getLogger(__name__)
23
24cdef class Server:
25
26  def __cinit__(self, object arguments):
27    fork_handlers_and_grpc_init()
28    self.references = []
29    self.registered_completion_queues = []
30    self._vtable.copy = &_copy_pointer
31    self._vtable.destroy = &_destroy_pointer
32    self._vtable.cmp = &_compare_pointer
33    cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor(
34        arguments)
35    cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable)
36    self.c_server = grpc_server_create(c_arguments, NULL)
37    arguments_processor.un_c()
38    self.references.append(arguments)
39    self.is_started = False
40    self.is_shutting_down = False
41    self.is_shutdown = False
42
43  def request_call(
44      self, CompletionQueue call_queue not None,
45      CompletionQueue server_queue not None, tag):
46    if not self.is_started or self.is_shutting_down:
47      raise ValueError("server must be started and not shutting down")
48    if server_queue not in self.registered_completion_queues:
49      raise ValueError("server_queue must be a registered completion queue")
50    cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
51    request_call_tag.prepare()
52    cpython.Py_INCREF(request_call_tag)
53    return grpc_server_request_call(
54        self.c_server, &request_call_tag.call.c_call,
55        &request_call_tag.call_details.c_details,
56        &request_call_tag.c_invocation_metadata,
57        call_queue.c_completion_queue, server_queue.c_completion_queue,
58        <cpython.PyObject *>request_call_tag)
59
60  def register_completion_queue(
61      self, CompletionQueue queue not None):
62    if self.is_started:
63      raise ValueError("cannot register completion queues after start")
64    with nogil:
65      grpc_server_register_completion_queue(
66          self.c_server, queue.c_completion_queue, NULL)
67    self.registered_completion_queues.append(queue)
68
69  def start(self):
70    if self.is_started:
71      raise ValueError("the server has already started")
72    self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True)
73    self.register_completion_queue(self.backup_shutdown_queue)
74    self.is_started = True
75    with nogil:
76      grpc_server_start(self.c_server)
77    # Ensure the core has gotten a chance to do the start-up work
78    self.backup_shutdown_queue.poll(deadline=time.time())
79
80  def add_http2_port(self, bytes address,
81                     ServerCredentials server_credentials=None):
82    address = str_to_bytes(address)
83    self.references.append(address)
84    cdef int result
85    cdef char *address_c_string = address
86    if server_credentials is not None:
87      self.references.append(server_credentials)
88      with nogil:
89        result = grpc_server_add_secure_http2_port(
90            self.c_server, address_c_string, server_credentials.c_credentials)
91    else:
92      with nogil:
93        result = grpc_server_add_insecure_http2_port(self.c_server,
94                                                     address_c_string)
95    return result
96
97  cdef _c_shutdown(self, CompletionQueue queue, tag):
98    self.is_shutting_down = True
99    cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
100    cpython.Py_INCREF(server_shutdown_tag)
101    with nogil:
102      grpc_server_shutdown_and_notify(
103          self.c_server, queue.c_completion_queue,
104          <cpython.PyObject *>server_shutdown_tag)
105
106  def shutdown(self, CompletionQueue queue not None, tag):
107    if queue.is_shutting_down:
108      raise ValueError("queue must be live")
109    elif not self.is_started:
110      raise ValueError("the server hasn't started yet")
111    elif self.is_shutting_down:
112      return
113    elif queue not in self.registered_completion_queues:
114      raise ValueError("expected registered completion queue")
115    else:
116      self._c_shutdown(queue, tag)
117
118  cdef notify_shutdown_complete(self):
119    # called only after our server shutdown tag has emerged from a completion
120    # queue.
121    self.is_shutdown = True
122
123  def cancel_all_calls(self):
124    if not self.is_shutting_down:
125      raise RuntimeError("the server must be shutting down to cancel all calls")
126    elif self.is_shutdown:
127      return
128    else:
129      with nogil:
130        grpc_server_cancel_all_calls(self.c_server)
131
132  def __dealloc__(self):
133    if self.c_server != NULL:
134      if not self.is_started:
135        pass
136      elif self.is_shutdown:
137        pass
138      elif not self.is_shutting_down:
139        # the user didn't call shutdown - use our backup queue
140        self._c_shutdown(self.backup_shutdown_queue, None)
141        # and now we wait
142        while not self.is_shutdown:
143          self.backup_shutdown_queue.poll()
144      else:
145        # We're in the process of shutting down, but have not shutdown; can't do
146        # much but repeatedly release the GIL and wait
147        while not self.is_shutdown:
148          time.sleep(0)
149      grpc_server_destroy(self.c_server)
150    grpc_shutdown()
151