1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/kernels/queue_op.h"
17 #include "tensorflow/core/framework/op_kernel.h"
18 #include "tensorflow/core/framework/queue_interface.h"
19 #include "tensorflow/core/framework/tensor.h"
20 #include "tensorflow/core/framework/tensor_shape.h"
21 #include "tensorflow/core/framework/types.h"
22 #include "tensorflow/core/lib/core/errors.h"
23 #include "tensorflow/core/platform/macros.h"
24 #include "tensorflow/core/platform/types.h"
25 
26 namespace tensorflow {
27 
QueueOp(OpKernelConstruction * context)28 QueueOp::QueueOp(OpKernelConstruction* context) : ResourceOpKernel(context) {
29   OP_REQUIRES_OK(context, context->GetAttr("capacity", &capacity_));
30   if (capacity_ < 0) {
31     capacity_ = QueueBase::kUnbounded;
32   }
33   OP_REQUIRES_OK(context,
34                  context->GetAttr("component_types", &component_types_));
35 }
36 
Compute(OpKernelContext * context)37 void QueueOp::Compute(OpKernelContext* context) {
38   ResourceOpKernel<QueueInterface>::Compute(context);
39   mutex_lock l(mu_);
40   if (resource_ && context->track_allocations()) {
41     context->record_persistent_memory_allocation(resource_->MemoryUsed());
42   }
43 }
44 
VerifyResource(QueueInterface * queue)45 Status QueueOp::VerifyResource(QueueInterface* queue) {
46   return queue->MatchesNodeDef(def());
47 }
48 
49 
QueueOpKernel(OpKernelConstruction * context)50 QueueOpKernel::QueueOpKernel(OpKernelConstruction* context)
51     : AsyncOpKernel(context) {}
52 
ComputeAsync(OpKernelContext * ctx,DoneCallback callback)53 void QueueOpKernel::ComputeAsync(OpKernelContext* ctx, DoneCallback callback) {
54   QueueInterface* queue;
55   if (ctx->input_dtype(0) == DT_RESOURCE) {
56     OP_REQUIRES_OK_ASYNC(
57         ctx, LookupResource(ctx, HandleFromInput(ctx, 0), &queue), callback);
58   } else {
59     OP_REQUIRES_OK_ASYNC(ctx, GetResourceFromContext(ctx, "handle", &queue),
60                          callback);
61   }
62   ComputeAsync(ctx, queue, [callback, queue]() {
63     queue->Unref();
64     callback();
65   });
66 }
67 
QueueAccessOpKernel(OpKernelConstruction * context)68 QueueAccessOpKernel::QueueAccessOpKernel(OpKernelConstruction* context)
69     : QueueOpKernel(context) {
70   OP_REQUIRES_OK(context, context->GetAttr("timeout_ms", &timeout_));
71   // TODO(keveman): Enable timeout.
72   OP_REQUIRES(context, timeout_ == -1,
73               errors::InvalidArgument("Timeout not supported yet."));
74 }
75 
76 // Defines an EnqueueOp, the execution of which enqueues a tuple of
77 // tensors in the given Queue.
78 //
79 // The op has 1 + k inputs, where k is the number of components in the
80 // tuples stored in the given Queue:
81 // - Input 0: queue handle.
82 // - Input 1: 0th element of the tuple.
83 // - ...
// - Input k: (k-1)th element of the tuple.
EnqueueOp(OpKernelConstruction * context)85 EnqueueOp::EnqueueOp(OpKernelConstruction* context)
86     : QueueAccessOpKernel(context) {}
87 
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)88 void EnqueueOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
89                              DoneCallback callback) {
90   DataTypeVector expected_inputs;
91   if (ctx->input_dtype(0) == DT_RESOURCE) {
92     expected_inputs.push_back(DT_RESOURCE);
93   } else {
94     expected_inputs.push_back(DT_STRING_REF);
95   }
96   for (DataType dt : queue->component_dtypes()) {
97     expected_inputs.push_back(dt);
98   }
99   OP_REQUIRES_OK_ASYNC(ctx, ctx->MatchSignature(expected_inputs, {}), callback);
100 
101   QueueInterface::Tuple tuple;
102   OpInputList components;
103   OP_REQUIRES_OK_ASYNC(ctx, ctx->input_list("components", &components),
104                        callback);
105   for (const Tensor& Tcomponent : components) {
106     tuple.push_back(Tcomponent);
107   }
108 
109   OP_REQUIRES_OK_ASYNC(ctx, queue->ValidateTuple(tuple), callback);
110   queue->TryEnqueue(tuple, ctx, callback);
111 }
112 
113 // Defines an EnqueueManyOp, the execution of which slices each
114 // component of a tuple of tensors along the 0th dimension, and
115 // enqueues tuples of slices in the given Queue.
116 //
117 // The op has 1 + k inputs, where k is the number of components in the
118 // tuples stored in the given Queue:
119 // - Input 0: queue handle.
120 // - Input 1: 0th element of the tuple.
121 // - ...
// - Input k: (k-1)th element of the tuple.
123 //
124 // N.B. All tuple components must have the same size in the 0th
125 // dimension.
EnqueueManyOp(OpKernelConstruction * context)126 EnqueueManyOp::EnqueueManyOp(OpKernelConstruction* context)
127     : QueueAccessOpKernel(context) {}
128 
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)129 void EnqueueManyOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
130                                  DoneCallback callback) {
131   DataTypeVector expected_inputs;
132   if (ctx->input_dtype(0) == DT_RESOURCE) {
133     expected_inputs.push_back(DT_RESOURCE);
134   } else {
135     expected_inputs.push_back(DT_STRING_REF);
136   }
137   for (DataType dt : queue->component_dtypes()) {
138     expected_inputs.push_back(dt);
139   }
140   OP_REQUIRES_OK_ASYNC(ctx, ctx->MatchSignature(expected_inputs, {}), callback);
141 
142   QueueInterface::Tuple tuple;
143   OpInputList components;
144   OP_REQUIRES_OK_ASYNC(ctx, ctx->input_list("components", &components),
145                        callback);
146   for (const Tensor& Tcomponent : components) {
147     tuple.push_back(Tcomponent);
148   }
149 
150   OP_REQUIRES_OK_ASYNC(ctx, queue->ValidateManyTuple(tuple), callback);
151   queue->TryEnqueueMany(tuple, ctx, callback);
152 }
153 
154 EnqueueManyOp::~EnqueueManyOp() = default;
155 
156 // Defines a DequeueOp, the execution of which dequeues a tuple of
157 // tensors from the given Queue.
158 //
159 // The op has one input, which is the handle of the appropriate
160 // Queue. The op has k outputs, where k is the number of components in
161 // the tuples stored in the given Queue, and output i is the ith
162 // component of the dequeued tuple.
DequeueOp(OpKernelConstruction * context)163 DequeueOp::DequeueOp(OpKernelConstruction* context)
164     : QueueAccessOpKernel(context) {}
165 
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)166 void DequeueOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
167                              DoneCallback callback) {
168   if (ctx->input_dtype(0) == DT_RESOURCE) {
169     OP_REQUIRES_OK_ASYNC(
170         ctx, ctx->MatchSignature({DT_RESOURCE}, queue->component_dtypes()),
171         callback);
172   } else {
173     OP_REQUIRES_OK_ASYNC(
174         ctx, ctx->MatchSignature({DT_STRING_REF}, queue->component_dtypes()),
175         callback);
176   }
177 
178   queue->TryDequeue(ctx, [ctx, callback](const QueueInterface::Tuple& tuple) {
179     if (!ctx->status().ok()) {
180       callback();
181       return;
182     }
183     OpOutputList output_components;
184     OP_REQUIRES_OK_ASYNC(
185         ctx, ctx->output_list("components", &output_components), callback);
186     for (int i = 0; i < ctx->num_outputs(); ++i) {
187       output_components.set(i, tuple[i]);
188     }
189     callback();
190   });
191 }
192 
193 DequeueOp::~DequeueOp() = default;
194 
195 // Defines a DequeueManyOp, the execution of which concatenates the
196 // requested number of elements from the given Queue along the 0th
197 // dimension, and emits the result as a single tuple of tensors.
198 //
199 // The op has two inputs:
200 // - Input 0: the handle to a queue.
201 // - Input 1: the number of elements to dequeue.
202 //
203 // The op has k outputs, where k is the number of components in the
204 // tuples stored in the given Queue, and output i is the ith component
205 // of the dequeued tuple.
DequeueManyOp(OpKernelConstruction * context)206 DequeueManyOp::DequeueManyOp(OpKernelConstruction* context)
207     : QueueAccessOpKernel(context) {}
208 
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)209 void DequeueManyOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
210                                  DoneCallback callback) {
211   const Tensor& Tnum_elements = ctx->input(1);
212   int32 num_elements = Tnum_elements.flat<int32>()(0);
213 
214   OP_REQUIRES_ASYNC(ctx, num_elements >= 0,
215                     errors::InvalidArgument("DequeueManyOp requested ",
216                                             num_elements, " < 0 elements"),
217                     callback);
218 
219   if (ctx->input_dtype(0) == DT_RESOURCE) {
220     OP_REQUIRES_OK_ASYNC(
221         ctx,
222         ctx->MatchSignature({DT_RESOURCE, DT_INT32}, queue->component_dtypes()),
223         callback);
224   } else {
225     OP_REQUIRES_OK_ASYNC(ctx,
226                          ctx->MatchSignature({DT_STRING_REF, DT_INT32},
227                                              queue->component_dtypes()),
228                          callback);
229   }
230 
231   queue->TryDequeueMany(
232       num_elements, ctx, false /* allow_small_batch */,
233       [ctx, callback](const QueueInterface::Tuple& tuple) {
234         if (!ctx->status().ok()) {
235           callback();
236           return;
237         }
238         OpOutputList output_components;
239         OP_REQUIRES_OK_ASYNC(
240             ctx, ctx->output_list("components", &output_components), callback);
241         for (int i = 0; i < ctx->num_outputs(); ++i) {
242           output_components.set(i, tuple[i]);
243         }
244         callback();
245       });
246 }
247 
248 DequeueManyOp::~DequeueManyOp() = default;
249 
250 // Defines a DequeueUpToOp, the execution of which concatenates the
251 // requested number of elements from the given Queue along the 0th
252 // dimension, and emits the result as a single tuple of tensors.
253 //
254 // The difference between this op and DequeueMany is the handling when
// the Queue is closed.  While the DequeueMany op will return an
// error when there are fewer than num_elements elements left in the
// closed queue, this op will return between 1 and
// min(num_elements, elements_remaining_in_queue) elements, and will not block.
259 // If there are no elements left, then the standard DequeueMany error
260 // is returned.
261 //
262 // This op only works if the underlying Queue implementation accepts
263 // the allow_small_batch = true parameter to TryDequeueMany.
264 // If it does not, an errors::Unimplemented exception is returned.
265 //
266 // The op has two inputs:
267 // - Input 0: the handle to a queue.
268 // - Input 1: the number of elements to dequeue.
269 //
270 // The op has k outputs, where k is the number of components in the
271 // tuples stored in the given Queue, and output i is the ith component
272 // of the dequeued tuple.
273 //
// This op always passes allow_small_batch = true to TryDequeueMany.  If
// the Queue supports it, this causes the queue to return smaller
// (possibly zero length) batches when it is closed, up to however
// many elements are available when the op executes.  In this case,
// the Queue does not block when closed.
DequeueUpToOp(OpKernelConstruction * context)279 DequeueUpToOp::DequeueUpToOp(OpKernelConstruction* context)
280     : QueueAccessOpKernel(context) {}
281 
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)282 void DequeueUpToOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
283                                  DoneCallback callback) {
284   const Tensor& Tnum_elements = ctx->input(1);
285   int32 num_elements = Tnum_elements.flat<int32>()(0);
286 
287   OP_REQUIRES_ASYNC(ctx, num_elements >= 0,
288                     errors::InvalidArgument("DequeueUpToOp requested ",
289                                             num_elements, " < 0 elements"),
290                     callback);
291 
292   if (ctx->input_dtype(0) == DT_RESOURCE) {
293     OP_REQUIRES_OK_ASYNC(
294         ctx,
295         ctx->MatchSignature({DT_RESOURCE, DT_INT32}, queue->component_dtypes()),
296         callback);
297   } else {
298     OP_REQUIRES_OK_ASYNC(ctx,
299                          ctx->MatchSignature({DT_STRING_REF, DT_INT32},
300                                              queue->component_dtypes()),
301                          callback);
302   }
303 
304   queue->TryDequeueMany(
305       num_elements, ctx, true /* allow_small_batch */,
306       [ctx, callback](const QueueInterface::Tuple& tuple) {
307         if (!ctx->status().ok()) {
308           callback();
309           return;
310         }
311         OpOutputList output_components;
312         OP_REQUIRES_OK_ASYNC(
313             ctx, ctx->output_list("components", &output_components), callback);
314         for (int i = 0; i < ctx->num_outputs(); ++i) {
315           output_components.set(i, tuple[i]);
316         }
317         callback();
318       });
319 }
320 
321 DequeueUpToOp::~DequeueUpToOp() = default;
322 
323 // Defines a QueueCloseOp, which closes the given Queue. Closing a
324 // Queue signals that no more elements will be enqueued in it.
325 //
326 // The op has one input, which is the handle of the appropriate Queue.
QueueCloseOp(OpKernelConstruction * context)327 QueueCloseOp::QueueCloseOp(OpKernelConstruction* context)
328     : QueueOpKernel(context) {
329   OP_REQUIRES_OK(context, context->GetAttr("cancel_pending_enqueues",
330                                            &cancel_pending_enqueues_));
331 }
332 
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)333 void QueueCloseOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
334                                 DoneCallback callback) {
335   queue->Close(ctx, cancel_pending_enqueues_, callback);
336 }
337 
338 // Defines a QueueSizeOp, which computes the number of elements in the
339 // given Queue, and emits it as an output tensor.
340 //
341 // The op has one input, which is the handle of the appropriate Queue;
342 // and one output, which is a single-element tensor containing the current
343 // size of that Queue.
QueueSizeOp(OpKernelConstruction * context)344 QueueSizeOp::QueueSizeOp(OpKernelConstruction* context)
345     : QueueOpKernel(context) {}
346 
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)347 void QueueSizeOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
348                                DoneCallback callback) {
349   Tensor* Tqueue_size = nullptr;
350   OP_REQUIRES_OK(ctx, ctx->allocate_output(0, TensorShape({}), &Tqueue_size));
351   Tqueue_size->flat<int32>().setConstant(queue->size());
352   callback();
353 }
354 
QueueIsClosedOp(OpKernelConstruction * context)355 QueueIsClosedOp::QueueIsClosedOp(OpKernelConstruction* context)
356     : QueueOpKernel(context) {}
357 
ComputeAsync(OpKernelContext * ctx,QueueInterface * queue,DoneCallback callback)358 void QueueIsClosedOp::ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
359                                    DoneCallback callback) {
360   Tensor* Tqueue_is_closed = nullptr;
361   OP_REQUIRES_OK(ctx,
362                  ctx->allocate_output(0, TensorShape({}), &Tqueue_is_closed));
363   Tqueue_is_closed->flat<bool>().setConstant(queue->is_closed());
364   callback();
365 }
366 
367 }  // namespace tensorflow
368