1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_
17 #define TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_
18 
19 #include <deque>
20 
21 #include "tensorflow/core/framework/op_kernel.h"
22 #include "tensorflow/core/framework/queue_interface.h"
23 #include "tensorflow/core/framework/resource_op_kernel.h"
24 #include "tensorflow/core/framework/tensor.h"
25 #include "tensorflow/core/framework/types.h"
26 #include "tensorflow/core/kernels/queue_base.h"
27 #include "tensorflow/core/lib/core/errors.h"
28 #include "tensorflow/core/platform/macros.h"
29 #include "tensorflow/core/platform/types.h"
30 
31 namespace tensorflow {
32 
33 // Defines a QueueOp, an abstract class for Queue construction ops.
34 class QueueOp : public ResourceOpKernel<QueueInterface> {
35  public:
36   QueueOp(OpKernelConstruction* context);
37 
38   void Compute(OpKernelContext* context) override;
39 
40  protected:
41   // Variables accessible by subclasses
42   int32 capacity_;
43   DataTypeVector component_types_;
44 
45  private:
46   Status VerifyResource(QueueInterface* queue) override;
47 };
48 
49 class TypedQueueOp : public QueueOp {
50  public:
51   using QueueOp::QueueOp;
52 
53  protected:
54   template <typename TypedQueue>
CreateTypedQueue(TypedQueue * queue,QueueInterface ** ret)55   Status CreateTypedQueue(TypedQueue* queue, QueueInterface** ret) {
56     if (queue == nullptr) {
57       return errors::ResourceExhausted("Failed to allocate queue.");
58     }
59     *ret = queue;
60     return queue->Initialize();
61   }
62 };
63 
64 // Queue manipulator kernels
65 
66 class QueueOpKernel : public AsyncOpKernel {
67  public:
68   explicit QueueOpKernel(OpKernelConstruction* context);
69 
70   void ComputeAsync(OpKernelContext* ctx, DoneCallback callback) final;
71 
72  protected:
73   virtual void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
74                             DoneCallback callback) = 0;
75 };
76 
77 class QueueAccessOpKernel : public QueueOpKernel {
78  public:
79   explicit QueueAccessOpKernel(OpKernelConstruction* context);
80 
81  protected:
82   int64 timeout_;
83 };
84 
85 // Defines an EnqueueOp, the execution of which enqueues a tuple of
86 // tensors in the given Queue.
87 //
88 // The op has 1 + k inputs, where k is the number of components in the
89 // tuples stored in the given Queue:
90 // - Input 0: queue handle.
91 // - Input 1: 0th element of the tuple.
92 // - ...
93 // - Input (1+k): kth element of the tuple.
94 class EnqueueOp : public QueueAccessOpKernel {
95  public:
96   explicit EnqueueOp(OpKernelConstruction* context);
97 
98  protected:
99   void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
100                     DoneCallback callback) override;
101 
102  private:
103   TF_DISALLOW_COPY_AND_ASSIGN(EnqueueOp);
104 };
105 
106 // Defines an EnqueueManyOp, the execution of which slices each
107 // component of a tuple of tensors along the 0th dimension, and
108 // enqueues tuples of slices in the given Queue.
109 //
110 // The op has 1 + k inputs, where k is the number of components in the
111 // tuples stored in the given Queue:
112 // - Input 0: queue handle.
113 // - Input 1: 0th element of the tuple.
114 // - ...
115 // - Input (1+k): kth element of the tuple.
116 //
117 // N.B. All tuple components must have the same size in the 0th
118 // dimension.
119 class EnqueueManyOp : public QueueAccessOpKernel {
120  public:
121   explicit EnqueueManyOp(OpKernelConstruction* context);
122 
123  protected:
124   void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
125                     DoneCallback callback) override;
126 
127   ~EnqueueManyOp() override;
128 
129  private:
130   TF_DISALLOW_COPY_AND_ASSIGN(EnqueueManyOp);
131 };
132 
133 // Defines a DequeueOp, the execution of which dequeues a tuple of
134 // tensors from the given Queue.
135 //
136 // The op has one input, which is the handle of the appropriate
137 // Queue. The op has k outputs, where k is the number of components in
138 // the tuples stored in the given Queue, and output i is the ith
139 // component of the dequeued tuple.
140 class DequeueOp : public QueueAccessOpKernel {
141  public:
142   explicit DequeueOp(OpKernelConstruction* context);
143 
144  protected:
145   void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
146                     DoneCallback callback) override;
147 
148   ~DequeueOp() override;
149 
150  private:
151   TF_DISALLOW_COPY_AND_ASSIGN(DequeueOp);
152 };
153 
154 // Defines a DequeueManyOp, the execution of which concatenates the
155 // requested number of elements from the given Queue along the 0th
156 // dimension, and emits the result as a single tuple of tensors.
157 //
158 // The op has two inputs:
159 // - Input 0: the handle to a queue.
160 // - Input 1: the number of elements to dequeue.
161 //
162 // The op has k outputs, where k is the number of components in the
163 // tuples stored in the given Queue, and output i is the ith component
164 // of the dequeued tuple.
165 class DequeueManyOp : public QueueAccessOpKernel {
166  public:
167   explicit DequeueManyOp(OpKernelConstruction* context);
168 
169  protected:
170   void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
171                     DoneCallback callback) override;
172 
173   ~DequeueManyOp() override;
174 
175  private:
176   TF_DISALLOW_COPY_AND_ASSIGN(DequeueManyOp);
177 };
178 
179 // Defines a DequeueUpToOp, the execution of which concatenates the
180 // requested number of elements from the given Queue along the 0th
181 // dimension, and emits the result as a single tuple of tensors.
182 //
183 // The difference between this op and DequeueMany is the handling when
184 // the Queue is closed.  While the DequeueMany op will return if there
185 // an error when there are less than num_elements elements left in the
186 // closed queue, this op will return between 1 and
187 // min(num_elements, elements_remaining_in_queue), and will not block.
188 // If there are no elements left, then the standard DequeueMany error
189 // is returned.
190 //
191 // This op only works if the underlying Queue implementation accepts
192 // the allow_small_batch = true parameter to TryDequeueMany.
193 // If it does not, an errors::Unimplemented exception is returned.
194 //
195 // The op has two inputs:
196 // - Input 0: the handle to a queue.
197 // - Input 1: the number of elements to dequeue.
198 //
199 // The op has k outputs, where k is the number of components in the
200 // tuples stored in the given Queue, and output i is the ith component
201 // of the dequeued tuple.
202 //
203 // The op has one attribute: allow_small_batch.  If the Queue supports
204 // it, setting this to true causes the queue to return smaller
205 // (possibly zero length) batches when it is closed, up to however
206 // many elements are available when the op executes.  In this case,
207 // the Queue does not block when closed.
208 class DequeueUpToOp : public QueueAccessOpKernel {
209  public:
210   explicit DequeueUpToOp(OpKernelConstruction* context);
211 
212  protected:
213   void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
214                     DoneCallback callback) override;
215 
216   ~DequeueUpToOp() override;
217 
218  private:
219   TF_DISALLOW_COPY_AND_ASSIGN(DequeueUpToOp);
220 };
221 
222 // Defines a QueueCloseOp, which closes the given Queue. Closing a
223 // Queue signals that no more elements will be enqueued in it.
224 //
225 // The op has one input, which is the handle of the appropriate Queue.
226 class QueueCloseOp : public QueueOpKernel {
227  public:
228   explicit QueueCloseOp(OpKernelConstruction* context);
229 
230  protected:
231   void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
232                     DoneCallback callback) override;
233 
234  private:
235   bool cancel_pending_enqueues_;
236   TF_DISALLOW_COPY_AND_ASSIGN(QueueCloseOp);
237 };
238 
239 // Defines a QueueSizeOp, which computes the number of elements in the
240 // given Queue, and emits it as an output tensor.
241 //
242 // The op has one input, which is the handle of the appropriate Queue;
243 // and one output, which is a single-element tensor containing the current
244 // size of that Queue.
245 class QueueSizeOp : public QueueOpKernel {
246  public:
247   explicit QueueSizeOp(OpKernelConstruction* context);
248 
249  protected:
250   void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
251                     DoneCallback callback) override;
252 
253  private:
254   TF_DISALLOW_COPY_AND_ASSIGN(QueueSizeOp);
255 };
256 
257 class QueueIsClosedOp : public QueueOpKernel {
258  public:
259   explicit QueueIsClosedOp(OpKernelConstruction* context);
260 
261  protected:
262   void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
263                     DoneCallback callback) override;
264 
265  private:
266   TF_DISALLOW_COPY_AND_ASSIGN(QueueIsClosedOp);
267 };
268 
269 }  // namespace tensorflow
270 
271 #endif  // TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_
272