1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/python/lib/core/py_func.h"
17 
18 #include <array>
19 
20 #include <Python.h>
21 
22 #include "numpy/arrayobject.h"
23 #include "tensorflow/c/eager/c_api.h"
24 #include "tensorflow/c/eager/c_api_internal.h"
25 #include "tensorflow/c/tf_status_helper.h"
26 #include "tensorflow/core/framework/allocation_description.pb.h"
27 #include "tensorflow/core/framework/op_kernel.h"
28 #include "tensorflow/core/lib/core/errors.h"
29 #include "tensorflow/core/lib/core/threadpool.h"
30 #include "tensorflow/core/platform/macros.h"
31 #include "tensorflow/core/platform/mutex.h"
32 #include "tensorflow/core/platform/types.h"
33 #include "tensorflow/python/eager/pywrap_tfe.h"
34 #include "tensorflow/python/lib/core/ndarray_tensor_bridge.h"
35 #include "tensorflow/python/lib/core/py_util.h"
36 #include "tensorflow/python/lib/core/safe_ptr.h"
37 
38 namespace tensorflow {
39 namespace {
40 
// Mutex protecting `py_trampoline` (see the GUARDED_BY annotation below).
static mutex mu(LINKER_INITIALIZED);
// Python callable used to dispatch calls into the Python runtime. Starts out
// as nullptr; InitializePyTrampoline() stores the callable here and
// GetPyTrampoline() reads it, both while holding `mu`.
static PyObject* py_trampoline GUARDED_BY(mu) = nullptr;
43 
44 // Returns the py_trampoline that is used to pass the control to the
45 // python runtime.
GetPyTrampoline()46 PyObject* GetPyTrampoline() {
47   mutex_lock l(mu);
48   return py_trampoline;
49 }
50 
51 // A call to the registered python function.
52 struct PyCall {
53   // Passed to python runtime to call the python function registered
54   // with this "token".
55   string token;
56 
57   // The device on which Tensors are stored; only used for EagerPyFunc.
58   Device* device = nullptr;
59 
60   // True if the call is associated with an EagerPyFunc.
61   bool eager = false;
62 
63   // Inputs and outputs of this function invocation.
64   std::vector<Tensor> ins;
65   std::vector<Tensor> out;
66 };
67 
IsCPUDevice(const Device * d)68 bool IsCPUDevice(const Device* d) {
69   return d == nullptr || d->tensorflow_gpu_device_info() == nullptr;
70 }
71 
72 // Givens the 'call', prepares the token and inputs as a python tuple
73 // that is appropriate for calling the trampoline.
MakeArgTuple(const PyCall * call,PyObject ** tuple)74 Status MakeArgTuple(const PyCall* call, PyObject** tuple) {
75   int64 n = call->ins.size();
76   PyObject* lst = PyList_New(n);
77   CHECK(lst);
78   // TFE_TensorHandle assumes that CPU is identified by nullptr.
79   Device* device = IsCPUDevice(call->device) ? nullptr : call->device;
80   for (int64 i = 0; i < n; ++i) {
81     PyObject* arg = nullptr;
82     const Tensor& t = call->ins[i];
83     if (call->eager) {
84       arg = EagerTensorFromHandle(new TFE_TensorHandle(t, device, device));
85       if (arg == nullptr) {
86         Py_DECREF(lst);
87         return errors::Internal("Unable to procure EagerTensor from Tensor.");
88       }
89     } else {
90       Status s = ConvertTensorToNdarray(t, &arg);
91       if (!s.ok()) {
92         Py_DECREF(lst);
93         return s;
94       }
95     }
96     PyList_SetItem(lst, i, arg);
97   }
98   const char* device_name =
99       device == nullptr ? nullptr : device->attributes().name().c_str();
100   *tuple = Py_BuildValue("(ssN)", call->token.c_str(), device_name, lst);
101   CHECK(*tuple);
102   return Status::OK();
103 }
104 
105 // Returns the corresponding tf dtype in 'tf' for numpy data type
106 // 'np'.  Returns an error if the type is not supported by this
107 // module.
NumericNpDTypeToTfDType(const int np,DataType * tf)108 Status NumericNpDTypeToTfDType(const int np, DataType* tf) {
109   switch (np) {
110     case NPY_FLOAT16:
111       *tf = DT_HALF;
112       break;
113     case NPY_FLOAT32:
114       *tf = DT_FLOAT;
115       break;
116     case NPY_FLOAT64:
117       *tf = DT_DOUBLE;
118       break;
119     case NPY_INT32:
120       *tf = DT_INT32;
121       break;
122     case NPY_UINT8:
123       *tf = DT_UINT8;
124       break;
125     case NPY_INT8:
126       *tf = DT_INT8;
127       break;
128     case NPY_UINT16:
129       *tf = DT_UINT16;
130       break;
131     case NPY_INT16:
132       *tf = DT_INT16;
133       break;
134     case NPY_INT64:
135       *tf = DT_INT64;
136       break;
137     case NPY_BOOL:
138       *tf = DT_BOOL;
139       break;
140     case NPY_COMPLEX64:
141       *tf = DT_COMPLEX64;
142       break;
143     case NPY_COMPLEX128:
144       *tf = DT_COMPLEX128;
145       break;
146     default:
147       return errors::Unimplemented("Unsupported numpy type ", np);
148   }
149   return Status::OK();
150 }
151 
IsSingleNone(PyObject * obj)152 bool IsSingleNone(PyObject* obj) {
153   if (!PyArray_Check(obj)) {
154     return false;
155   }
156   PyArrayObject* array_obj = reinterpret_cast<PyArrayObject*>(obj);
157   if (PyArray_NDIM(array_obj) != 0 || PyArray_SIZE(array_obj) != 1) {
158     return false;
159   }
160   std::array<npy_intp, 0> indices;
161   char* item_ptr =
162       static_cast<char*>(PyArray_GetPtr(array_obj, indices.data()));
163   PyObject* item = PyArray_GETITEM(array_obj, item_ptr);
164   CHECK(item);
165   return item == Py_None;
166 }
167 
168 // Retrieves a Tensor from `eager_tensor` and stores it in `output_tensor`.
169 // Validates that `output_tensor` is backed by memory in `expected_device`
170 // (which is assumed to be a local device, one on which the kernel was
171 // executed.)
172 //
173 // It may be nice to copy the tensor to the right device instead of failing if
174 // it isn't already there. This is left as a future exercise.  The required
175 // device-copying logic is implemented in Python at the moment.
ExtractTensorFromEagerTensor(const PyObject * eager_tensor,const Device * expected_device,const Tensor ** output_tensor)176 tensorflow::Status ExtractTensorFromEagerTensor(const PyObject* eager_tensor,
177                                                 const Device* expected_device,
178                                                 const Tensor** output_tensor) {
179   auto handle = EagerTensor_Handle(eager_tensor)->handle;
180   Device* actual_device = handle->device();
181   TF_RETURN_IF_ERROR(handle->Tensor(output_tensor));
182   // actual_device may be nullptr, which implies local CPU.
183   if (expected_device == actual_device) return Status::OK();
184   const string& expected_device_name = expected_device->attributes().name();
185   if (actual_device == nullptr) {
186     if (!IsCPUDevice(expected_device)) {
187       return errors::Internal(
188           "Expected the py_func to return a Tensor backed by memory in ",
189           expected_device_name,
190           ", but is actually backed by local host memory. This is a bug.");
191     }
192     return Status::OK();
193   }
194   // NOTE(ebrevdo): Here we could try comparing "actual_device_name"
195   // (actual_device->attributes()->name()) to expected_device_name and ensure
196   // they're the same.  However, this comparison fails if we create a ClusterDef
197   // on localhost, mainly because the Device created by Eager code doesn't match
198   // the device created by a session.  In this case, expected_device_name may
199   // contain "worker" but the Eager device name contains "localhost".  Since we
200   // can't easily access the true underlying device of "worker" here, we are not
201   // able to perform a proper comparison.  Furthermore, we can't check
202   // IsCPUDevice(actual_device) because the kernel's device may indeed be a
203   // GPU device (the python interpreter doesn't use it, however).
204   return Status::OK();
205 }
206 
207 // Calls the registered py function through the trampoline.
DoCallPyFunc(PyCall * call,bool * out_log_on_error)208 Status DoCallPyFunc(PyCall* call, bool* out_log_on_error) {
209   *out_log_on_error = true;
210   PyObject* trampoline = GetPyTrampoline();
211   if (trampoline == nullptr) {
212     return errors::InvalidArgument(
213         "Missing py trampoline. Most likely, it is a link error.");
214   }
215   // Prepare the argument.
216   PyObject* args = nullptr;
217   TF_RETURN_IF_ERROR(MakeArgTuple(call, &args));
218   CHECK(args);
219 
220   // Invokes the trampoline.
221   PyObject* result = PyEval_CallObject(trampoline, args);
222   Py_DECREF(args);
223   if (result == nullptr) {
224     if (PyErr_Occurred()) {
225       if (PyErr_ExceptionMatches(PyExc_ValueError) ||
226           PyErr_ExceptionMatches(PyExc_TypeError)) {
227         return errors::InvalidArgument(PyExceptionFetch());
228       } else if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
229         *out_log_on_error = false;
230         return errors::OutOfRange(PyExceptionFetch());
231       } else if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
232         return errors::ResourceExhausted(PyExceptionFetch());
233       } else if (PyErr_ExceptionMatches(PyExc_NotImplementedError)) {
234         return errors::Unimplemented(PyExceptionFetch());
235       } else {
236         // TODO(ebrevdo): Check if exception is an OpError and use the
237         // OpError.error_code property to map it back in the Status.
238         return errors::Unknown(PyExceptionFetch());
239       }
240     } else {
241       return errors::Internal("Failed to run py callback ", call->token,
242                               ": see error log.");
243     }
244   }
245 
246   // Process the return values and convert them to TF Tensors.
247   Status s = Status::OK();
248   if (PyList_Check(result)) {
249     // `result` is a Python list; if this operation is an `EagerPyFunc`, then
250     // every item in the list must be an `EagerTensor`; otherwise, every element
251     // must be a NumPy array.
252     call->out.clear();
253     for (int i = 0; i < PyList_Size(result); ++i) {
254       Tensor t;
255       if (call->eager) {
256         const PyObject* item = PyList_GetItem(result, i);
257         if (EagerTensor_CheckExact(item)) {
258           const Tensor* tensor = nullptr;
259           s = ExtractTensorFromEagerTensor(item, call->device, &tensor);
260           if (s.ok()) t = *tensor;
261         } else {
262           s = errors::FailedPrecondition(
263               "Expected EagerTensor, found PyObject of type: ",
264               Py_TYPE(item)->tp_name);
265         }
266       } else {
267         s = ConvertNdarrayToTensor(PyList_GetItem(result, i), &t);
268       }
269 
270       if (!s.ok()) {
271         break;
272       }
273       call->out.push_back(t);
274     }
275   } else if (EagerTensor_CheckExact(result) || result == Py_None) {
276     // result is an `EagerTensor` or `None`.
277     DCHECK(call->eager);
278     if (result != Py_None) {
279       const Tensor* t = nullptr;
280       s = ExtractTensorFromEagerTensor(result, call->device, &t);
281       if (s.ok()) call->out.push_back(*t);
282     }
283   } else if (PyArray_Check(result)) {
284     // `result` is a NumPy array.
285     DCHECK(!call->eager);
286     if (!IsSingleNone(result)) {
287       Tensor t;
288       s = ConvertNdarrayToTensor(result, &t);
289       if (s.ok()) {
290         call->out.push_back(t);
291       }
292     }
293   } else {
294     s = errors::Internal("Unexpected PyObject was returned: ",
295                          Py_TYPE(result)->tp_name);
296   }
297   Py_DECREF(result);
298   return s;
299 }
300 
301 }  // end namespace
302 
303 // Outside anonymous namespace just to make the friend declaration in
304 // tensorflow::Tensor apply.
305 class NumpyTensorBuffer : public TensorBuffer {
306  public:
NumpyTensorBuffer(PyArrayObject * array,size_t len,void * data)307   NumpyTensorBuffer(PyArrayObject* array, size_t len, void* data)
308       : TensorBuffer(data), array_(array), len_(len) {}
309 
~NumpyTensorBuffer()310   ~NumpyTensorBuffer() override {
311     // Note: The session::run wrapper is responsible for freeing this while
312     // holding the GIL.
313     DelayedNumpyDecref(data(), len_, array_);
314   }
315 
size() const316   size_t size() const override { return len_; }
root_buffer()317   TensorBuffer* root_buffer() override { return this; }
FillAllocationDescription(AllocationDescription * proto) const318   void FillAllocationDescription(AllocationDescription* proto) const override {
319     tensorflow::int64 rb = size();
320     proto->set_requested_bytes(rb);
321     proto->set_allocator_name(tensorflow::cpu_allocator()->Name());
322   }
MakeTensor(DataType dtype,const TensorShape & shape)323   Tensor MakeTensor(DataType dtype, const TensorShape& shape) {
324     CHECK_EQ(len_, shape.num_elements() * DataTypeSize(dtype));
325     return Tensor(dtype, shape, this);
326   }
327 
328   // Prevents input forwarding from overwriting this buffer.
OwnsMemory() const329   bool OwnsMemory() const override { return false; }
330 
331  private:
332   PyArrayObject* array_;
333   size_t len_;
334 };
335 
PyObjectToString(PyObject * obj,string * str)336 Status PyObjectToString(PyObject* obj, string* str) {
337   char* py_bytes;
338   Py_ssize_t size;
339   if (PyBytes_AsStringAndSize(obj, &py_bytes, &size) != -1) {
340     str->assign(py_bytes, size);
341     return Status::OK();
342   }
343 #if PY_MAJOR_VERSION >= 3
344   const char* ptr = PyUnicode_AsUTF8AndSize(obj, &size);
345   if (ptr != nullptr) {
346     str->assign(ptr, size);
347     return Status::OK();
348   }
349 #else
350   if (PyUnicode_Check(obj)) {
351     PyObject* unicode = PyUnicode_AsUTF8String(obj);
352     char* ptr;
353     if (unicode && PyString_AsStringAndSize(unicode, &ptr, &size) != -1) {
354       str->assign(ptr, size);
355       Py_DECREF(unicode);
356       return Status::OK();
357     }
358     Py_XDECREF(unicode);
359   }
360 #endif
361   return errors::Unimplemented("Unsupported object type ",
362                                obj->ob_type->tp_name);
363 }
364 
ConvertNdarrayToTensor(PyObject * obj,Tensor * ret)365 Status ConvertNdarrayToTensor(PyObject* obj, Tensor* ret) {
366   PyArrayObject* input = reinterpret_cast<PyArrayObject*>(obj);
367   DataType dtype = DT_INVALID;
368   TensorShape shape;
369   for (int i = 0; i < PyArray_NDIM(input); ++i) {
370     shape.AddDim(PyArray_SHAPE(input)[i]);
371   }
372   const int np_type = PyArray_TYPE(input);
373   switch (np_type) {
374     case NPY_OBJECT: {
375       dtype = DT_STRING;
376       Tensor t(dtype, shape);
377       auto tflat = t.flat<string>();
378       PyObject** input_data = reinterpret_cast<PyObject**>(PyArray_DATA(input));
379       for (int i = 0; i < tflat.dimension(0); ++i) {
380         TF_RETURN_IF_ERROR(PyObjectToString(input_data[i], &tflat(i)));
381       }
382       *ret = t;
383       break;
384     }
385     case NPY_STRING: {
386       dtype = DT_STRING;
387       Tensor t(dtype, shape);
388       auto tflat = t.flat<string>();
389       char* input_data = PyArray_BYTES(input);
390       Py_ssize_t el_size = PyArray_ITEMSIZE(input);
391       for (int i = 0; i < tflat.dimension(0); ++i) {
392         tflat(i) = string(input_data + i * el_size, el_size);
393       }
394       *ret = t;
395       break;
396     }
397     default: {
398       TF_RETURN_IF_ERROR(NumericNpDTypeToTfDType(PyArray_TYPE(input), &dtype));
399       CHECK(DataTypeCanUseMemcpy(dtype));
400       if (reinterpret_cast<intptr_t>(PyArray_DATA(input)) %
401               std::max(1, EIGEN_MAX_ALIGN_BYTES) !=
402           0) {
403         Tensor t(dtype, shape);
404         StringPiece p = t.tensor_data();
405         memcpy(const_cast<char*>(p.data()), PyArray_DATA(input), p.size());
406         *ret = t;
407       } else {
408         // Incref the array as the calling context will decref it when we
409         // return and we want to keep a handle to this memory.
410         Py_INCREF(input);
411         NumpyTensorBuffer* buf = new NumpyTensorBuffer(
412             input, shape.num_elements() * DataTypeSize(dtype),
413             PyArray_DATA(input));
414         *ret = buf->MakeTensor(dtype, shape);
415         buf->Unref();
416       }
417     }
418   }
419   return Status::OK();
420 }
421 
422 // Creates a numpy array in 'ret' which either aliases the content of 't' or has
423 // a copy.
ConvertTensorToNdarray(const Tensor & t,PyObject ** ret)424 Status ConvertTensorToNdarray(const Tensor& t, PyObject** ret) {
425   int typenum = -1;
426   TF_RETURN_IF_ERROR(TF_DataType_to_PyArray_TYPE(
427       static_cast<TF_DataType>(t.dtype()), &typenum));
428   PyArray_Descr* descr = PyArray_DescrFromType(typenum);
429   CHECK(descr);
430   std::vector<npy_intp> dims;
431   dims.reserve(t.dims());
432   for (int i = 0; i < t.dims(); ++i) {
433     dims.push_back(t.dim_size(i));
434   }
435   Tensor* copy = new Tensor(t);
436   if (ArrayFromMemory(dims.size(), dims.data(),
437                       const_cast<char*>(copy->tensor_data().data()), t.dtype(),
438                       [copy]() { delete copy; }, ret)
439           .ok()) {
440     return Status::OK();
441   }
442   delete copy;
443 
444   PyObject* obj = PyArray_Empty(dims.size(), dims.data(), descr, 0);
445   if (obj == nullptr) {
446     return errors::Internal("Failed to allocate np array: ",
447                             t.shape().DebugString());
448   }
449   PyArrayObject* np_array = reinterpret_cast<PyArrayObject*>(obj);
450   if (typenum == NPY_OBJECT) {
451     CHECK_EQ(DT_STRING, t.dtype());
452     auto tflat = t.flat<string>();
453     PyObject** out = reinterpret_cast<PyObject**>(PyArray_DATA(np_array));
454     for (int i = 0; i < tflat.dimension(0); ++i) {
455       const string& el = tflat(i);
456       out[i] = PyBytes_FromStringAndSize(el.data(), el.size());
457       if (out[i] == nullptr) {
458         for (int j = 0; j < i; ++j) {
459           Py_DECREF(out[j]);
460         }
461         Py_DECREF(obj);
462         return errors::Internal("Failed to allocate a copy of string ", i);
463       }
464     }
465   } else {
466     CHECK(DataTypeCanUseMemcpy(t.dtype()));
467     StringPiece p = t.tensor_data();
468     memcpy(PyArray_DATA(np_array), p.data(), p.size());
469   }
470   *ret = PyArray_Return(np_array);
471   return Status::OK();
472 }
473 
InitializePyTrampoline(PyObject * trampoline)474 void InitializePyTrampoline(PyObject* trampoline) {
475   mutex_lock l(mu);
476   if (py_trampoline == nullptr) {
477     py_trampoline = trampoline;
478     Py_INCREF(py_trampoline);
479   } else {
480     LOG(WARNING) << "InitializeCallback should only be called once";
481   }
482 }
483 
484 class PyFuncOp : public OpKernel {
485  public:
PyFuncOp(OpKernelConstruction * ctx)486   explicit PyFuncOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
487     OP_REQUIRES_OK(ctx, ctx->GetAttr("token", &token_));
488     eager_ = type_string() == "EagerPyFunc";
489   }
490 
IsExpensive()491   bool IsExpensive() override { return true; }
492 
Compute(OpKernelContext * ctx)493   void Compute(OpKernelContext* ctx) override {
494     PyCall call;
495     call.token = token_;
496     call.eager = eager_;
497     if (call.eager) {
498       // Eager's C API uses `Device`, whereas `OpKernelContext` stores a
499       // `DeviceBase`; attempt to downcast.
500       call.device = dynamic_cast<Device*>(ctx->device());
501       if (call.device == nullptr) {
502         ctx->CtxFailureWithWarning(errors::Internal(
503             "Unrecognized device class: ", ctx->device()->name()));
504         return;
505       }
506     }
507 
508     for (int i = 0; i < ctx->num_inputs(); ++i) {
509       call.ins.push_back(ctx->input(i));
510     }
511 
512     // NOTE(mrry): There is a potential time-of-check-to-time-of-use race here.
513     // because it is possible that `Py_Finalize()` could be called in another
514     // thread between this check and the  call to `PyGILState_Ensure()`, which
515     // will abort the process if `Py_Finalize()` has been called. A more robust
516     // solution would be welcome, but it is not obvious how to make this work
517     // using the current Python C API.
518     OP_REQUIRES(ctx, Py_IsInitialized(),
519                 errors::FailedPrecondition(
520                     "Python interpreter state is not initialized. "
521                     "The process may be terminated."));
522 
523     PyGILState_STATE py_threadstate;
524     py_threadstate = PyGILState_Ensure();
525     bool log_on_error;
526     Status s = DoCallPyFunc(&call, &log_on_error);
527     // Sometimes py_funcs can be called without a session and leak memory. This
528     // ensures we clear the decref cache so this doesn't happen.
529     ClearDecrefCache();
530     PyGILState_Release(py_threadstate);
531 
532     // Ensures that GIL is released even when !s.ok().
533     if (!s.ok()) {
534       if (log_on_error) {
535         ctx->CtxFailureWithWarning(s);
536       } else {
537         ctx->CtxFailure(s);
538       }
539       return;
540     }
541 
542     OP_REQUIRES(ctx, static_cast<int32>(call.out.size()) == ctx->num_outputs(),
543                 errors::InvalidArgument(token_, " returns ", call.out.size(),
544                                         " values, but expects to see ",
545                                         ctx->num_outputs(), " values."));
546     for (size_t i = 0; i < call.out.size(); ++i) {
547       const auto& t = call.out[i];
548       OP_REQUIRES(
549           ctx, t.dtype() == output_type(i),
550           errors::InvalidArgument(i, "-th value returned by ", token_, " is ",
551                                   DataTypeString(t.dtype()), ", but expects ",
552                                   DataTypeString(output_type(i))));
553       ctx->set_output(i, t);
554     }
555   }
556 
557  private:
558   string token_;
559 
560   // True if and only if this op should execute the python function eagerly,
561   // i.e., if and only if the eager attribute is set.
562   bool eager_;
563 
564   TF_DISALLOW_COPY_AND_ASSIGN(PyFuncOp);
565 };
566 
// All python-function ops share the PyFuncOp kernel. "PyFunc" and
// "PyFuncStateless" are registered for CPU only; "EagerPyFunc" is registered
// for both CPU and GPU.
REGISTER_KERNEL_BUILDER(Name("PyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("PyFuncStateless").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_GPU), PyFuncOp);
571 
572 }  // end namespace tensorflow
573