1 /*
2  * Definition of a `Connection` type.
3  * Used by `socket_connection.c` and `pipe_connection.c`.
4  *
5  * connection.h
6  *
7  * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
8  */
9 
10 #ifndef CONNECTION_H
11 #define CONNECTION_H
12 
13 /*
14  * Read/write flags
15  */
16 
/* Bit flags stored in ConnectionObject.flags; they may be OR-ed together,
   so the valid values of `flags` are 1 (read-only), 2 (write-only) and
   3 (read-write). */
#define READABLE 1
#define WRITABLE 2
19 
/*
 * Make the enclosing function fail with IOError unless `self` has the
 * READABLE flag set.
 *
 * The expansion contains `return NULL`, so it may only be used inside a
 * function that returns a pointer.  It is wrapped in do { } while (0) so
 * that `CHECK_READABLE(x);` behaves as exactly one statement (safe in an
 * unbraced if/else), and the argument is parenthesized so that any
 * pointer-valued expression may be passed.
 */
#define CHECK_READABLE(self) \
    do { \
        if (!((self)->flags & READABLE)) { \
            PyErr_SetString(PyExc_IOError, "connection is write-only"); \
            return NULL; \
        } \
    } while (0)
25 
/*
 * Make the enclosing function fail with IOError unless `self` has the
 * WRITABLE flag set.
 *
 * The expansion contains `return NULL`, so it may only be used inside a
 * function that returns a pointer.  It is wrapped in do { } while (0) so
 * that `CHECK_WRITABLE(x);` behaves as exactly one statement (safe in an
 * unbraced if/else), and the argument is parenthesized so that any
 * pointer-valued expression may be passed.
 */
#define CHECK_WRITABLE(self) \
    do { \
        if (!((self)->flags & WRITABLE)) { \
            PyErr_SetString(PyExc_IOError, "connection is read-only"); \
            return NULL; \
        } \
    } while (0)
31 
32 /*
33  * Allocation and deallocation
34  */
35 
36 static PyObject *
connection_new(PyTypeObject * type,PyObject * args,PyObject * kwds)37 connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
38 {
39     ConnectionObject *self;
40     HANDLE handle;
41     BOOL readable = TRUE, writable = TRUE;
42 
43     static char *kwlist[] = {"handle", "readable", "writable", NULL};
44 
45     if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist,
46                                      &handle, &readable, &writable))
47         return NULL;
48 
49     if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) {
50         PyErr_Format(PyExc_IOError, "invalid handle %zd",
51                      (Py_ssize_t)handle);
52         return NULL;
53     }
54 
55     if (!readable && !writable) {
56         PyErr_SetString(PyExc_ValueError,
57                         "either readable or writable must be true");
58         return NULL;
59     }
60 
61     self = PyObject_New(ConnectionObject, type);
62     if (self == NULL)
63         return NULL;
64 
65     self->weakreflist = NULL;
66     self->handle = handle;
67     self->flags = 0;
68 
69     if (readable)
70         self->flags |= READABLE;
71     if (writable)
72         self->flags |= WRITABLE;
73     assert(self->flags >= 1 && self->flags <= 3);
74 
75     return (PyObject*)self;
76 }
77 
78 static void
connection_dealloc(ConnectionObject * self)79 connection_dealloc(ConnectionObject* self)
80 {
81     if (self->weakreflist != NULL)
82         PyObject_ClearWeakRefs((PyObject*)self);
83 
84     if (self->handle != INVALID_HANDLE_VALUE) {
85         Py_BEGIN_ALLOW_THREADS
86         CLOSE(self->handle);
87         Py_END_ALLOW_THREADS
88     }
89     PyObject_Del(self);
90 }
91 
92 /*
93  * Functions for transferring buffers
94  */
95 
96 static PyObject *
connection_sendbytes(ConnectionObject * self,PyObject * args)97 connection_sendbytes(ConnectionObject *self, PyObject *args)
98 {
99     char *buffer;
100     Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN;
101     int res;
102 
103     if (!PyArg_ParseTuple(args, F_RBUFFER "#|" F_PY_SSIZE_T F_PY_SSIZE_T,
104                           &buffer, &length, &offset, &size))
105         return NULL;
106 
107     CHECK_WRITABLE(self);
108 
109     if (offset < 0) {
110         PyErr_SetString(PyExc_ValueError, "offset is negative");
111         return NULL;
112     }
113     if (length < offset) {
114         PyErr_SetString(PyExc_ValueError, "buffer length < offset");
115         return NULL;
116     }
117 
118     if (size == PY_SSIZE_T_MIN) {
119         size = length - offset;
120     } else {
121         if (size < 0) {
122             PyErr_SetString(PyExc_ValueError, "size is negative");
123             return NULL;
124         }
125         if (offset + size > length) {
126             PyErr_SetString(PyExc_ValueError,
127                             "buffer length < offset + size");
128             return NULL;
129         }
130     }
131 
132     res = conn_send_string(self, buffer + offset, size);
133 
134     if (res < 0) {
135         if (PyErr_Occurred())
136             return NULL;
137         else
138             return mp_SetError(PyExc_IOError, res);
139     }
140 
141     Py_RETURN_NONE;
142 }
143 
144 static PyObject *
connection_recvbytes(ConnectionObject * self,PyObject * args)145 connection_recvbytes(ConnectionObject *self, PyObject *args)
146 {
147     char *freeme = NULL;
148     Py_ssize_t res, maxlength = PY_SSIZE_T_MAX;
149     PyObject *result = NULL;
150 
151     if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength))
152         return NULL;
153 
154     CHECK_READABLE(self);
155 
156     if (maxlength < 0) {
157         PyErr_SetString(PyExc_ValueError, "maxlength < 0");
158         return NULL;
159     }
160 
161     res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
162                            &freeme, maxlength);
163 
164     if (res < 0) {
165         if (res == MP_BAD_MESSAGE_LENGTH) {
166             if ((self->flags & WRITABLE) == 0) {
167                 Py_BEGIN_ALLOW_THREADS
168                 CLOSE(self->handle);
169                 Py_END_ALLOW_THREADS
170                 self->handle = INVALID_HANDLE_VALUE;
171             } else {
172                 self->flags = WRITABLE;
173             }
174         }
175         mp_SetError(PyExc_IOError, res);
176     } else {
177         if (freeme == NULL) {
178             result = PyString_FromStringAndSize(self->buffer, res);
179         } else {
180             result = PyString_FromStringAndSize(freeme, res);
181             PyMem_Free(freeme);
182         }
183     }
184 
185     return result;
186 }
187 
188 static PyObject *
connection_recvbytes_into(ConnectionObject * self,PyObject * args)189 connection_recvbytes_into(ConnectionObject *self, PyObject *args)
190 {
191     char *freeme = NULL, *buffer = NULL;
192     Py_ssize_t res, length, offset = 0;
193     PyObject *result = NULL;
194     Py_buffer pbuf;
195 
196     CHECK_READABLE(self);
197 
198     if (!PyArg_ParseTuple(args, "w*|" F_PY_SSIZE_T,
199                           &pbuf, &offset))
200         return NULL;
201 
202     buffer = pbuf.buf;
203     length = pbuf.len;
204 
205     if (offset < 0) {
206         PyErr_SetString(PyExc_ValueError, "negative offset");
207         goto _error;
208     }
209 
210     if (offset > length) {
211         PyErr_SetString(PyExc_ValueError, "offset too large");
212         goto _error;
213     }
214 
215     res = conn_recv_string(self, buffer+offset, length-offset,
216                            &freeme, PY_SSIZE_T_MAX);
217 
218     if (res < 0) {
219         if (res == MP_BAD_MESSAGE_LENGTH) {
220             if ((self->flags & WRITABLE) == 0) {
221                 Py_BEGIN_ALLOW_THREADS
222                 CLOSE(self->handle);
223                 Py_END_ALLOW_THREADS
224                 self->handle = INVALID_HANDLE_VALUE;
225             } else {
226                 self->flags = WRITABLE;
227             }
228         }
229         mp_SetError(PyExc_IOError, res);
230     } else {
231         if (freeme == NULL) {
232             result = PyInt_FromSsize_t(res);
233         } else {
234             result = PyObject_CallFunction(BufferTooShort,
235                                            F_RBUFFER "#",
236                                            freeme, res);
237             PyMem_Free(freeme);
238             if (result) {
239                 PyErr_SetObject(BufferTooShort, result);
240                 Py_DECREF(result);
241             }
242             goto _error;
243         }
244     }
245 
246 _cleanup:
247     PyBuffer_Release(&pbuf);
248     return result;
249 
250 _error:
251     result = NULL;
252     goto _cleanup;
253 }
254 
255 /*
256  * Functions for transferring objects
257  */
258 
259 static PyObject *
connection_send_obj(ConnectionObject * self,PyObject * obj)260 connection_send_obj(ConnectionObject *self, PyObject *obj)
261 {
262     char *buffer;
263     int res;
264     Py_ssize_t length;
265     PyObject *pickled_string = NULL;
266 
267     CHECK_WRITABLE(self);
268 
269     pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj,
270                                                   pickle_protocol, NULL);
271     if (!pickled_string)
272         goto failure;
273 
274     if (PyString_AsStringAndSize(pickled_string, &buffer, &length) < 0)
275         goto failure;
276 
277     res = conn_send_string(self, buffer, (int)length);
278 
279     if (res < 0) {
280         mp_SetError(PyExc_IOError, res);
281         goto failure;
282     }
283 
284     Py_XDECREF(pickled_string);
285     Py_RETURN_NONE;
286 
287   failure:
288     Py_XDECREF(pickled_string);
289     return NULL;
290 }
291 
292 static PyObject *
connection_recv_obj(ConnectionObject * self)293 connection_recv_obj(ConnectionObject *self)
294 {
295     char *freeme = NULL;
296     Py_ssize_t res;
297     PyObject *temp = NULL, *result = NULL;
298 
299     CHECK_READABLE(self);
300 
301     res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
302                            &freeme, PY_SSIZE_T_MAX);
303 
304     if (res < 0) {
305         if (res == MP_BAD_MESSAGE_LENGTH) {
306             if ((self->flags & WRITABLE) == 0) {
307                 Py_BEGIN_ALLOW_THREADS
308                 CLOSE(self->handle);
309                 Py_END_ALLOW_THREADS
310                 self->handle = INVALID_HANDLE_VALUE;
311             } else {
312                 self->flags = WRITABLE;
313             }
314         }
315         mp_SetError(PyExc_IOError, res);
316     } else {
317         if (freeme == NULL) {
318             temp = PyString_FromStringAndSize(self->buffer, res);
319         } else {
320             temp = PyString_FromStringAndSize(freeme, res);
321             PyMem_Free(freeme);
322         }
323     }
324 
325     if (temp)
326         result = PyObject_CallFunctionObjArgs(pickle_loads,
327                                               temp, NULL);
328     Py_XDECREF(temp);
329     return result;
330 }
331 
332 /*
333  * Other functions
334  */
335 
336 static PyObject *
connection_poll(ConnectionObject * self,PyObject * args)337 connection_poll(ConnectionObject *self, PyObject *args)
338 {
339     PyObject *timeout_obj = NULL;
340     double timeout = 0.0;
341     int res;
342 
343     CHECK_READABLE(self);
344 
345     if (!PyArg_ParseTuple(args, "|O", &timeout_obj))
346         return NULL;
347 
348     if (timeout_obj == NULL) {
349         timeout = 0.0;
350     } else if (timeout_obj == Py_None) {
351         timeout = -1.0;                                 /* block forever */
352     } else {
353         timeout = PyFloat_AsDouble(timeout_obj);
354         if (PyErr_Occurred())
355             return NULL;
356         if (timeout < 0.0)
357             timeout = 0.0;
358     }
359 
360     Py_BEGIN_ALLOW_THREADS
361     res = conn_poll(self, timeout, _save);
362     Py_END_ALLOW_THREADS
363 
364     switch (res) {
365     case TRUE:
366         Py_RETURN_TRUE;
367     case FALSE:
368         Py_RETURN_FALSE;
369     default:
370         return mp_SetError(PyExc_IOError, res);
371     }
372 }
373 
374 static PyObject *
connection_fileno(ConnectionObject * self)375 connection_fileno(ConnectionObject* self)
376 {
377     if (self->handle == INVALID_HANDLE_VALUE) {
378         PyErr_SetString(PyExc_IOError, "handle is invalid");
379         return NULL;
380     }
381     return PyInt_FromLong((long)self->handle);
382 }
383 
384 static PyObject *
connection_close(ConnectionObject * self)385 connection_close(ConnectionObject *self)
386 {
387     if (self->handle != INVALID_HANDLE_VALUE) {
388         Py_BEGIN_ALLOW_THREADS
389         CLOSE(self->handle);
390         Py_END_ALLOW_THREADS
391         self->handle = INVALID_HANDLE_VALUE;
392     }
393 
394     Py_RETURN_NONE;
395 }
396 
397 static PyObject *
connection_repr(ConnectionObject * self)398 connection_repr(ConnectionObject *self)
399 {
400     static char *conn_type[] = {"read-only", "write-only", "read-write"};
401 
402     assert(self->flags >= 1 && self->flags <= 3);
403     return FROM_FORMAT("<%s %s, handle %zd>",
404                        conn_type[self->flags - 1],
405                        CONNECTION_NAME, (Py_ssize_t)self->handle);
406 }
407 
408 /*
409  * Getters and setters
410  */
411 
412 static PyObject *
connection_closed(ConnectionObject * self,void * closure)413 connection_closed(ConnectionObject *self, void *closure)
414 {
415     return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE));
416 }
417 
418 static PyObject *
connection_readable(ConnectionObject * self,void * closure)419 connection_readable(ConnectionObject *self, void *closure)
420 {
421     return PyBool_FromLong((long)(self->flags & READABLE));
422 }
423 
424 static PyObject *
connection_writable(ConnectionObject * self,void * closure)425 connection_writable(ConnectionObject *self, void *closure)
426 {
427     return PyBool_FromLong((long)(self->flags & WRITABLE));
428 }
429 
430 /*
431  * Tables
432  */
433 
/* Method table for the connection type (installed as tp_methods).
   Each entry is {Python name, C implementation, calling convention,
   docstring}. */
static PyMethodDef connection_methods[] = {
    /* raw byte transfer */
    {"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS,
     "send the byte data from a readable buffer-like object"},
    {"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS,
     "receive byte data as a string"},
    {"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS,
     "receive byte data into a writeable buffer-like object\n"
     "returns the number of bytes read"},

    /* pickled object transfer */
    {"send", (PyCFunction)connection_send_obj, METH_O,
     "send a (picklable) object"},
    {"recv", (PyCFunction)connection_recv_obj, METH_NOARGS,
     "receive a (picklable) object"},

    /* status and lifetime */
    {"poll", (PyCFunction)connection_poll, METH_VARARGS,
     "whether there is any input available to be read"},
    {"fileno", (PyCFunction)connection_fileno, METH_NOARGS,
     "file descriptor or handle of the connection"},
    {"close", (PyCFunction)connection_close, METH_NOARGS,
     "close the connection"},

    {NULL}  /* Sentinel */
};
457 
/* Attribute table for the connection type (installed as tp_getset).
   Every entry has a getter and a NULL setter, i.e. no setter is
   provided for `closed`, `readable` or `writable`. */
static PyGetSetDef connection_getset[] = {
    {"closed", (getter)connection_closed, NULL,
     "True if the connection is closed", NULL},
    {"readable", (getter)connection_readable, NULL,
     "True if the connection is readable", NULL},
    {"writable", (getter)connection_writable, NULL,
     "True if the connection is writable", NULL},
    {NULL}  /* Sentinel */
};
467 
468 /*
469  * Connection type
470  */
471 
/* Docstring of the type (installed as tp_doc below). */
PyDoc_STRVAR(connection_doc,
             "Connection type whose constructor signature is\n\n"
             "    Connection(handle, readable=True, writable=True).\n\n"
             "The constructor does *not* duplicate the handle.");
476 
/*
 * The type object itself.  CONNECTION_TYPE and CONNECTION_NAME are macros
 * expected to be defined by the file that includes this header.
 *
 * The initializer is positional, so the order of the slots below must
 * match the layout of PyTypeObject; unused slots are left as 0.  The type
 * may be subclassed (Py_TPFLAGS_BASETYPE) and supports weak references
 * through the `weakreflist` member of ConnectionObject.
 */
PyTypeObject CONNECTION_TYPE = {
    PyVarObject_HEAD_INIT(NULL, 0)
    /* tp_name           */ "_multiprocessing." CONNECTION_NAME,
    /* tp_basicsize      */ sizeof(ConnectionObject),
    /* tp_itemsize       */ 0,
    /* tp_dealloc        */ (destructor)connection_dealloc,
    /* tp_print          */ 0,
    /* tp_getattr        */ 0,
    /* tp_setattr        */ 0,
    /* tp_compare        */ 0,
    /* tp_repr           */ (reprfunc)connection_repr,
    /* tp_as_number      */ 0,
    /* tp_as_sequence    */ 0,
    /* tp_as_mapping     */ 0,
    /* tp_hash           */ 0,
    /* tp_call           */ 0,
    /* tp_str            */ 0,
    /* tp_getattro       */ 0,
    /* tp_setattro       */ 0,
    /* tp_as_buffer      */ 0,
    /* tp_flags          */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
                            Py_TPFLAGS_HAVE_WEAKREFS,
    /* tp_doc            */ connection_doc,
    /* tp_traverse       */ 0,
    /* tp_clear          */ 0,
    /* tp_richcompare    */ 0,
    /* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist),
    /* tp_iter           */ 0,
    /* tp_iternext       */ 0,
    /* tp_methods        */ connection_methods,
    /* tp_members        */ 0,
    /* tp_getset         */ connection_getset,
    /* tp_base           */ 0,
    /* tp_dict           */ 0,
    /* tp_descr_get      */ 0,
    /* tp_descr_set      */ 0,
    /* tp_dictoffset     */ 0,
    /* tp_init           */ 0,
    /* tp_alloc          */ 0,
    /* tp_new            */ connection_new,
};
518 
519 #endif /* CONNECTION_H */
520