1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# distutils: language=c++
15
16cimport cpython
17from libc cimport string
18from libc.stdlib cimport malloc, free
19import errno
# Handles to the lazily imported gevent modules. They stay None until
# init_grpc_gevent() imports gevent and fills them in.
gevent_g = None
gevent_socket = None
gevent_hub = None
gevent_event = None
# Shared gevent Event: set by the async callbacks, timers and kick_loop(),
# and waited on / cleared by run_loop(). Created in init_grpc_gevent().
g_event = None
# gevent pool Group that _spawn_greenlet() spawns into and destroy_loop()
# joins. Created in init_grpc_gevent().
g_pool = None
26
# Returns the null grpc_error pointer; the callbacks in this file pass it to
# report that an operation completed without error.
cdef grpc_error* grpc_error_none():
  return <grpc_error*>NULL
29
# Builds a grpc socket error whose message reads "<syscall> failed: <err>".
cdef grpc_error* socket_error(str syscall, str err):
  message = str_to_bytes("{} failed: {}".format(syscall, err))
  return grpc_socket_error(message)
34
# Converts a resolved address into a (host, port) tuple. The textual form
# produced by grpc_sockaddr_to_string has its trailing ":<port>" removed (when
# present) and any surrounding square brackets stripped.
cdef resolved_addr_to_tuple(grpc_resolved_address* address):
  cdef char* res_str
  port = grpc_sockaddr_get_port(address)
  str_len = grpc_sockaddr_to_string(&res_str, address, 0)
  # NOTE(review): res_str is never released in this function; confirm whether
  # grpc_sockaddr_to_string transfers ownership of the buffer to the caller.
  host = _decode(<bytes>res_str[:str_len])
  port_suffix = ':' + str(port)
  if host.endswith(port_suffix):
    host = host[:-len(port_suffix)]
  host = host.lstrip('[').rstrip(']')
  return '{}'.format(host), port
46
# Copies a raw sockaddr of `length` bytes into a temporary
# grpc_resolved_address and returns it as a (host, port) tuple.
cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length):
  cdef grpc_resolved_address resolved
  resolved.len = length
  string.memcpy(<void*>resolved.addr, <void*>address, length)
  return resolved_addr_to_tuple(&resolved)
52
# Reports whether the raw sockaddr's URI scheme, as computed by
# grpc_sockaddr_get_uri_scheme, is b'ipv4'.
cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length):
  cdef grpc_resolved_address resolved
  resolved.len = length
  string.memcpy(<void*>resolved.addr, <void*>address, length)
  scheme = grpc_sockaddr_get_uri_scheme(&resolved)
  return scheme == b'ipv4'
58
# Converts getaddrinfo()-style result tuples into a malloc'ed
# grpc_resolved_addresses structure.
#
# Each entry of `tups` is indexed as tup[4][0] (host) and tup[4][1] (port).
# Duplicate (host, port) pairs are collapsed. Unlike a plain set(), the
# de-duplication keeps the first occurrence of each pair in its original
# position, so the resolver's ordering is preserved instead of being replaced
# by hash order.
#
# The returned structure and its `addrs` array are allocated with malloc; this
# function does not free them.
# NOTE(review): malloc results are not checked for NULL, as in the original.
cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups):
  cdef grpc_resolved_addresses* addresses
  seen = set()
  unique_addrs = []
  for tup in tups:
    host_port = (tup[4][0], tup[4][1])
    if host_port not in seen:
      seen.add(host_port)
      unique_addrs.append(host_port)
  addr_count = len(unique_addrs)
  addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses))
  addresses.naddrs = addr_count
  addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * addr_count)
  i = 0
  for host_port in unique_addrs:
    hostname = str_to_bytes(host_port[0])
    grpc_string_to_sockaddr(&addresses.addrs[i], hostname, host_port[1])
    i += 1
  return addresses
71
# Spawns a greenlet in the module-wide g_pool, forwarding all positional
# arguments to g_pool.spawn(). The spawned greenlet is not returned.
def _spawn_greenlet(*args):
  g_pool.spawn(*args)
74
75###############################
76### socket implementation ###
77###############################
78
# Python-side state that this file attaches to a grpc_custom_socket through
# its `impl` pointer.
cdef class SocketWrapper:
  def __cinit__(self):
    # C-level handles start out unset.
    self.c_socket = NULL
    self.c_buffer = NULL
    self.len = 0
    # No Python socket yet; bind/connect/accept assign one later.
    self.socket = None
    self.sockopts = []
86
# vtable `init`: creates a SocketWrapper for `socket` and stores it in
# socket.impl. `domain` is not used. Always returns the no-error value.
cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil:
  wrapper = SocketWrapper()
  wrapper.c_socket = socket
  wrapper.accepting_socket = NULL
  wrapper.closed = False
  wrapper.sockopts = []
  # Python doesn't support AF_UNSPEC sockets, so we defer creation until
  # bind/connect when we know what type of socket we need
  wrapper.socket = None
  # socket.impl holds a raw pointer, so take an explicit reference to keep
  # the wrapper alive after this function returns; socket_destroy() drops it.
  cpython.Py_INCREF(wrapper)
  socket.impl = <void*>wrapper
  return grpc_error_none()
99
# Greenlet body for connect: connects the wrapper's Python socket to
# `addr_tuple`, reports the outcome through connect_cb, then sets g_event.
# An IOError from connect() is converted into a grpc "connect" socket error;
# any other exception propagates without invoking the callback.
cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple):
  cdef grpc_error* error
  try:
    socket_wrapper.socket.connect(addr_tuple)
    error = grpc_error_none()
  except IOError as io_error:
    error = socket_error("connect", str(io_error))
  socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                            error)
  g_event.set()
109
# Python-callable trampoline around socket_connect_async_cython; this is the
# function socket_connect() hands to _spawn_greenlet().
def socket_connect_async(socket_wrapper, addr_tuple):
  socket_connect_async_cython(socket_wrapper, addr_tuple)
112
# vtable `connect`: creates a gevent socket of the family matching `addr`
# (AF_INET for ipv4 addresses, AF_INET6 otherwise), applies the standard
# socket options, and spawns a greenlet that performs the connect and later
# invokes `cb`.
cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr,
                         size_t addr_len,
                         grpc_custom_connect_callback cb) with gil:
  socket_wrapper = <SocketWrapper>socket.impl
  socket_wrapper.connect_cb = cb
  addr_tuple = sockaddr_to_tuple(addr, addr_len)
  family = (gevent_socket.AF_INET if sockaddr_is_ipv4(addr, addr_len)
            else gevent_socket.AF_INET6)
  py_socket = gevent_socket.socket(family)
  applysockopts(py_socket)
  socket_wrapper.socket = py_socket
  _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple)
127
# vtable `destroy`: drops one reference to the SocketWrapper stored in
# socket.impl, balancing the Py_INCREF taken when the wrapper was attached
# (see socket_init / accept_callback_cython).
cdef void socket_destroy(grpc_custom_socket* socket) with gil:
  cpython.Py_DECREF(<SocketWrapper>socket.impl)
130
# vtable `shutdown`: shuts down both directions of the wrapped Python socket.
# An IOError whose errno is ENOTCONN is ignored; any other IOError is
# re-raised.
cdef void socket_shutdown(grpc_custom_socket* socket) with gil:
  socket_wrapper = <SocketWrapper>socket.impl
  try:
    socket_wrapper.socket.shutdown(gevent_socket.SHUT_RDWR)
  except IOError as io_error:
    if io_error.errno == errno.ENOTCONN:
      return
    raise io_error
137
# vtable `close`: closes the wrapped Python socket (if one was ever created)
# and reports completion through `cb`.
#
# If an accept() is still outstanding on this socket (accepting_socket is not
# NULL), the callback is not invoked here; accept_callback_cython() calls
# close_cb once the pending accept fails on the closed socket.
cdef void socket_close(grpc_custom_socket* socket,
                       grpc_custom_close_callback cb) with gil:
  socket_wrapper = (<SocketWrapper>socket.impl)
  # Record the callback unconditionally. Previously it was only stored when a
  # Python socket existed, so closing a wrapper whose socket was still None
  # (e.g. never bound or connected) invoked a close_cb that was never set.
  socket_wrapper.close_cb = cb
  if socket_wrapper.socket is not None:
    socket_wrapper.socket.close()
    socket_wrapper.closed = True
    # Delay the close callback until the accept() call has picked it up
    if socket_wrapper.accepting_socket != NULL:
      return
  socket_wrapper.close_cb(socket)
149
# Sends the list of byte chunks over `socket` and returns whatever the
# underlying call returns. Uses socket.sendmsg() when available; on an
# AttributeError it falls back to a single send() of the joined chunks.
def socket_sendmsg(socket, write_bytes):
  try:
    sent = socket.sendmsg(write_bytes)
  except AttributeError:
    # sendmsg not available on all Pythons/Platforms
    sent = socket.send(b''.join(write_bytes))
  return sent
156
# Greenlet body for write: sends every chunk in `write_bytes` over the
# wrapper's Python socket, handling partial sends, then reports the outcome
# through write_cb and sets g_event.
#
# An IOError from the send is converted into a grpc "send" socket error; any
# other exception propagates without invoking the callback.
cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes):
  # Drop zero-length chunks up front. With an empty chunk at the head of the
  # list the send reports 0 bytes, nothing is consumed, and the loop below
  # would never terminate. Working on a copy also leaves the caller's list
  # untouched.
  pending = [chunk for chunk in write_bytes if chunk]
  try:
    while pending:
      sent_byte_count = socket_sendmsg(socket_wrapper.socket, pending)
      # Consume `sent_byte_count` bytes from the front of the pending list.
      while sent_byte_count > 0:
        head_len = len(pending[0])
        if sent_byte_count < head_len:
          # Partial send of the first chunk: keep only its unsent tail.
          pending[0] = pending[0][sent_byte_count:]
          sent_byte_count = 0
        else:
          sent_byte_count -= head_len
          pending = pending[1:]
    socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                            grpc_error_none())
  except IOError as io_error:
    socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                            socket_error("send", str(io_error)))
  g_event.set()
174
# Python-callable trampoline around socket_write_async_cython; this is the
# function socket_write() hands to _spawn_greenlet().
def socket_write_async(socket_wrapper, write_bytes):
  socket_write_async_cython(socket_wrapper, write_bytes)
177
# vtable `write`: copies every slice of `buffer` into a Python bytes object
# and spawns a greenlet that sends them and later invokes `cb`.
cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer,
                       grpc_custom_write_callback cb) with gil:
  cdef char* start
  sw = <SocketWrapper>socket.impl
  sw.write_cb = cb
  chunks = []
  for index in range(buffer.count):
    start = grpc_slice_buffer_start(buffer, index)
    chunk_len = grpc_slice_buffer_length(buffer, index)
    # Slicing the char* copies the bytes into a new Python object.
    chunks.append(<bytes>start[:chunk_len])
  _spawn_greenlet(socket_write_async, sw, chunks)
189
# Greenlet body for read: receives up to socket_wrapper.len bytes, copies them
# into socket_wrapper.c_buffer and reports the byte count through read_cb,
# then sets g_event. An IOError from recv() is reported as a byte count of -1
# together with a grpc "recv" socket error.
cdef socket_read_async_cython(SocketWrapper socket_wrapper):
  cdef char* raw_bytes
  try:
    data = socket_wrapper.socket.recv(socket_wrapper.len)
    received = len(data)
    raw_bytes = data
    string.memcpy(<void*>socket_wrapper.c_buffer, raw_bytes, received)
    socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                           received, grpc_error_none())
  except IOError as io_error:
    socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
                           -1, socket_error("recv", str(io_error)))
  g_event.set()
202
# Python-callable trampoline around socket_read_async_cython; this is the
# function socket_read() hands to _spawn_greenlet().
def socket_read_async(socket_wrapper):
  socket_read_async_cython(socket_wrapper)
205
# vtable `read`: records the destination buffer, its capacity and the
# completion callback on the wrapper, then spawns a greenlet that performs
# the recv.
cdef void socket_read(grpc_custom_socket* socket, char* buffer,
                      size_t length, grpc_custom_read_callback cb) with gil:
  socket_wrapper = <SocketWrapper>socket.impl
  socket_wrapper.c_buffer = buffer
  socket_wrapper.len = length
  socket_wrapper.read_cb = cb
  _spawn_greenlet(socket_read_async, socket_wrapper)
213
# vtable `getpeername`: asks the wrapped Python socket for its peer address,
# converts it to a sockaddr, and writes the sockaddr bytes into `addr` and
# their length into `length[0]`. Always returns the no-error value.
cdef grpc_error* socket_getpeername(grpc_custom_socket* socket,
                                    const grpc_sockaddr* addr,
                                    int* length) with gil:
  cdef grpc_resolved_address c_addr
  socket_wrapper = <SocketWrapper>socket.impl
  peer = socket_wrapper.socket.getpeername()
  hostname = str_to_bytes(peer[0])
  grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
  # `addr` is declared const but is the output buffer, hence the cast.
  string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
  length[0] = c_addr.len
  return grpc_error_none()
226
# vtable `getsockname`: writes the local address of the wrapped Python socket
# into `addr` and its length into `length[0]`. When no Python socket has been
# created yet, ('0.0.0.0', 0) is reported instead. Always returns the
# no-error value.
cdef grpc_error* socket_getsockname(grpc_custom_socket* socket,
                                    const grpc_sockaddr* addr,
                                    int* length) with gil:
  cdef grpc_resolved_address c_addr
  py_socket = (<SocketWrapper>socket.impl).socket
  if py_socket is not None:
    sock_name = py_socket.getsockname()
  else:
    sock_name = ('0.0.0.0', 0)
  hostname = str_to_bytes(sock_name[0])
  grpc_string_to_sockaddr(&c_addr, hostname, sock_name[1])
  # `addr` is declared const but is the output buffer, hence the cast.
  string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
  length[0] = c_addr.len
  return grpc_error_none()
241
# Applies the socket options used for every socket created in this file:
# SO_REUSEADDR at the socket level and TCP_NODELAY at the TCP level.
def applysockopts(s):
  net = gevent_socket
  s.setsockopt(net.SOL_SOCKET, net.SO_REUSEADDR, 1)
  s.setsockopt(net.IPPROTO_TCP, net.TCP_NODELAY, True)
245
# vtable `bind`: creates a gevent socket, applies the standard socket options
# and binds it to `addr`. An AF_INET socket is tried first; if that attempt
# raises gaierror, an AF_INET6 socket is tried instead. On success the socket
# is stored on the wrapper. `flags` is not used.
#
# Returns the no-error value on success, or a grpc "bind" socket error when an
# IOError is raised. Sockets created for a failed attempt are closed
# explicitly instead of being left for garbage collection.
cdef grpc_error* socket_bind(grpc_custom_socket* socket,
                             const grpc_sockaddr* addr,
                             size_t len, int flags) with gil:
  addr_tuple = sockaddr_to_tuple(addr, len)
  py_socket = None
  try:
    try:
      py_socket = gevent_socket.socket(gevent_socket.AF_INET)
      applysockopts(py_socket)
      py_socket.bind(addr_tuple)
    except gevent_socket.gaierror:
      # Release the IPv4 socket before retrying with IPv6.
      if py_socket is not None:
        py_socket.close()
        py_socket = None
      py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
      applysockopts(py_socket)
      py_socket.bind(addr_tuple)
    (<SocketWrapper>socket.impl).socket = py_socket
  except IOError as io_error:
    # The socket was never handed to the wrapper, so nobody else will close it.
    if py_socket is not None:
      py_socket.close()
    return socket_error("bind", str(io_error))
  else:
    return grpc_error_none()
264
# vtable `listen`: puts the wrapped Python socket into listening mode with a
# backlog of 50. Always returns the no-error value.
cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil:
  socket_wrapper = <SocketWrapper>socket.impl
  socket_wrapper.socket.listen(50)
  return grpc_error_none()
268
# Greenlet body for accept: waits for a connection on the listening wrapper
# `s`, attaches a new SocketWrapper to the pending client grpc_custom_socket
# (s.accepting_socket), reports the outcome through s.accept_cb and sets
# g_event.
cdef void accept_callback_cython(SocketWrapper s):
   try:
     conn, address = s.socket.accept()
     # Build the wrapper for the accepted connection and hook it up to the
     # client grpc_custom_socket that socket_accept() stored on `s`.
     sw = SocketWrapper()
     sw.closed = False
     sw.c_socket = s.accepting_socket
     sw.sockopts = []
     sw.socket = conn
     sw.c_socket.impl = <void*>sw
     sw.accepting_socket = NULL
     # impl holds a raw pointer, so take an explicit reference to keep the
     # new wrapper alive; socket_destroy() drops it.
     cpython.Py_INCREF(sw)
     # Clear the pending-accept marker before notifying the C side.
     s.accepting_socket = NULL
     s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none())
   except IOError as io_error:
      #TODO actual error
      # NOTE(review): accepting_socket is cleared before being passed below,
      # so accept_cb always receives NULL as the client on this path; confirm
      # that this is what the C side expects.
      s.accepting_socket = NULL
      s.accept_cb(<grpc_custom_socket*>s.c_socket, s.accepting_socket,
                  socket_error("accept", str(io_error)))
      # socket_close() defers its callback while an accept is outstanding;
      # deliver it now that the accept has finished.
      if s.closed:
        s.close_cb(<grpc_custom_socket*>s.c_socket)
   g_event.set()
290
# Python-callable trampoline around accept_callback_cython; this is the
# function socket_accept() hands to _spawn_greenlet().
def socket_accept_async(s):
  accept_callback_cython(s)
293
# vtable `accept`: records the client grpc_custom_socket that will receive the
# next connection together with the completion callback, then spawns a
# greenlet that performs the accept.
cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
                        grpc_custom_accept_callback cb) with gil:
  listener = <SocketWrapper>socket.impl
  listener.accept_cb = cb
  listener.accepting_socket = client
  _spawn_greenlet(socket_accept_async, listener)
300
301#####################################
302######Resolver implementation #######
303#####################################
304
# Carries the arguments of one asynchronous resolve request (resolver handle,
# host and port) into the greenlet that performs the lookup.
cdef class ResolveWrapper:
  def __cinit__(self):
    # All fields are filled in by socket_resolve_async() after construction.
    self.c_host = NULL
    self.c_port = NULL
    self.c_resolver = NULL
310
# Greenlet body for asynchronous resolution: runs gevent's getaddrinfo() on
# the wrapper's host/port, hands the converted addresses (or a null address
# list plus a grpc "getaddrinfo" socket error on IOError) to
# grpc_custom_resolve_callback, then sets g_event.
cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
  cdef grpc_resolved_addresses* addresses
  cdef grpc_error* error
  try:
    res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port)
    addresses = tuples_to_resolvaddr(res)
    error = grpc_error_none()
  except IOError as io_error:
    addresses = <grpc_resolved_addresses*>0
    error = socket_error("getaddrinfo", str(io_error))
  grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
                               addresses, error)
  g_event.set()
321
# Python-callable trampoline around socket_resolve_async_cython; this is the
# function socket_resolve_async() hands to _spawn_greenlet().
def socket_resolve_async_python(resolve_wrapper):
  socket_resolve_async_cython(resolve_wrapper)
324
# Resolver vtable `resolve_async`: packages the resolver handle, host and port
# into a ResolveWrapper and spawns a greenlet that performs the lookup.
# The raw `host` and `port` pointers are stored as-is, not copied.
cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port) with gil:
  request = ResolveWrapper()
  request.c_host = host
  request.c_port = port
  request.c_resolver = r
  _spawn_greenlet(socket_resolve_async_python, request)
331
# Resolver vtable `resolve`: synchronously resolves host/port with gevent's
# getaddrinfo() and stores the converted address list in res[0].
# Returns the no-error value on success, or a grpc "getaddrinfo" socket error
# when an IOError is raised (res[0] is then left untouched).
cdef grpc_error* socket_resolve(char* host, char* port,
                                grpc_resolved_addresses** res) with gil:
  try:
    addr_infos = gevent_socket.getaddrinfo(host, port)
    res[0] = tuples_to_resolvaddr(addr_infos)
  except IOError as io_error:
    return socket_error("getaddrinfo", str(io_error))
  else:
    return grpc_error_none()
340
341###############################
342### timer implementation ######
343###############################
344
# Wraps a gevent loop timer on behalf of a grpc_custom_timer. The c_timer
# field is assigned by timer_start() after construction.
cdef class TimerWrapper:
  def __cinit__(self, deadline):
    # `deadline` is passed straight to the gevent loop's timer(); timer_start()
    # supplies the timeout converted from milliseconds to seconds.
    self.timer = gevent_hub.get_hub().loop.timer(deadline)
    self.event = None

  def start(self):
    # Arm the gevent timer; on_finish runs when it fires.
    # NOTE(review): self.event is created here and set in stop(), but nothing
    # visible in this file waits on it.
    self.event = gevent_event.Event()
    self.timer.start(self.on_finish)

  def on_finish(self):
    # Timer fired: notify the C side with no error, stop the gevent timer and
    # set g_event.
    grpc_custom_timer_callback(self.c_timer, grpc_error_none())
    self.timer.stop()
    g_event.set()

  def stop(self):
    # Stops the gevent timer without invoking the C timer callback.
    self.event.set()
    self.timer.stop()
362
# Timer vtable `start`: creates a TimerWrapper for `t` (timeout converted from
# milliseconds to seconds), links the two objects to each other and arms the
# timer.
cdef void timer_start(grpc_custom_timer* t) with gil:
  timeout_s = t.timeout_ms / 1000.0
  wrapper = TimerWrapper(timeout_s)
  wrapper.c_timer = t
  # NOTE(review): t.timer stores a raw pointer and no Py_INCREF is taken here;
  # presumably the armed gevent timer keeps the wrapper alive through its
  # on_finish callback - confirm.
  t.timer = <void*>wrapper
  wrapper.start()
368
# Timer vtable `stop`: stops the wrapper object that timer_start() stored in
# t.timer.
cdef void timer_stop(grpc_custom_timer* t) with gil:
  (<object>t.timer).stop()
372
373###############################
374### pollset implementation ###
375###############################
376
# Poller vtable `init`: intentionally a no-op; the shared event and greenlet
# pool are created in init_grpc_gevent().
cdef void init_loop() with gil:
  pass
379
# Poller vtable `shutdown`: joins g_pool, the group every greenlet in this
# file is spawned into.
cdef void destroy_loop() with gil:
  g_pool.join()
382
# Poller vtable `kick`: sets g_event, the event run_loop() waits on.
cdef void kick_loop() with gil:
  g_event.set()
385
# Poller vtable `poll`: waits on g_event for at most `timeout_ms`
# milliseconds, then clears it. A timeout of zero returns immediately without
# waiting or clearing the event.
cdef void run_loop(size_t timeout_ms) with gil:
  if timeout_ms == 0:
    return
  # gevent's Event.wait() is given the timeout in seconds.
  g_event.wait(timeout_ms / 1000.0)
  g_event.clear()
391
392###############################
393### Initializer ###############
394###############################
395
# Module-level vtables; init_grpc_gevent() fills in their function pointers
# and passes their addresses to grpc_custom_iomgr_init().
cdef grpc_socket_vtable gevent_socket_vtable
cdef grpc_custom_resolver_vtable gevent_resolver_vtable
cdef grpc_custom_timer_vtable gevent_timer_vtable
cdef grpc_custom_poller_vtable gevent_pollset_vtable
400
# Installs the gevent-based I/O manager: imports gevent, creates the shared
# event and greenlet pool, registers the async callback function, fills in
# the four vtables and hands them to grpc_custom_iomgr_init().
def init_grpc_gevent():
  global gevent_g, gevent_socket, gevent_hub, gevent_event, g_event, g_pool

  # Lazily import gevent
  import gevent
  gevent_g = gevent
  import gevent.socket
  gevent_socket = gevent.socket
  import gevent.hub
  gevent_hub = gevent.hub
  import gevent.event
  gevent_event = gevent.event
  import gevent.pool

  # Shared wake-up event and the group all greenlets are spawned into.
  g_event = gevent.event.Event()
  g_pool = gevent.pool.Group()

  # Run callbacks registered through set_async_callback_func in a greenlet.
  def cb_func(cb, args):
    _spawn_greenlet(cb, *args)
  set_async_callback_func(cb_func)

  # Resolver vtable.
  gevent_resolver_vtable.resolve = socket_resolve
  gevent_resolver_vtable.resolve_async = socket_resolve_async

  # Socket vtable.
  gevent_socket_vtable.init = socket_init
  gevent_socket_vtable.connect = socket_connect
  gevent_socket_vtable.destroy = socket_destroy
  gevent_socket_vtable.shutdown = socket_shutdown
  gevent_socket_vtable.close = socket_close
  gevent_socket_vtable.write = socket_write
  gevent_socket_vtable.read = socket_read
  gevent_socket_vtable.getpeername = socket_getpeername
  gevent_socket_vtable.getsockname = socket_getsockname
  gevent_socket_vtable.bind = socket_bind
  gevent_socket_vtable.listen = socket_listen
  gevent_socket_vtable.accept = socket_accept

  # Timer vtable.
  gevent_timer_vtable.start = timer_start
  gevent_timer_vtable.stop = timer_stop

  # Poller vtable.
  gevent_pollset_vtable.init = init_loop
  gevent_pollset_vtable.poll = run_loop
  gevent_pollset_vtable.kick = kick_loop
  gevent_pollset_vtable.shutdown = destroy_loop

  grpc_custom_iomgr_init(&gevent_socket_vtable,
                         &gevent_resolver_vtable,
                         &gevent_timer_vtable,
                         &gevent_pollset_vtable)
454