1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_FRAMEWORK_QUEUE_INTERFACE_H_
17 #define TENSORFLOW_CORE_FRAMEWORK_QUEUE_INTERFACE_H_
18 
19 #include <string>
20 #include <vector>
21 
22 #include "tensorflow/core/framework/op_kernel.h"
23 #include "tensorflow/core/framework/resource_mgr.h"
24 #include "tensorflow/core/framework/tensor.h"
25 #include "tensorflow/core/framework/types.h"
26 #include "tensorflow/core/platform/types.h"
27 
28 namespace tensorflow {
29 
30 // All implementations must be thread-safe.
31 class QueueInterface : public ResourceBase {
32  public:
33   typedef std::vector<Tensor> Tuple;
34   typedef AsyncOpKernel::DoneCallback DoneCallback;
35   typedef std::function<void(const Tuple&)> CallbackWithTuple;
36 
37   virtual Status ValidateTuple(const Tuple& tuple) = 0;
38   virtual Status ValidateManyTuple(const Tuple& tuple) = 0;
39 
40   // Stashes a function object for future execution, that will eventually
41   // enqueue the tuple of tensors into the queue, and returns immediately. The
42   // function object is guaranteed to call 'callback'.
43   virtual void TryEnqueue(const Tuple& tuple, OpKernelContext* ctx,
44                           DoneCallback callback) = 0;
45 
46   // Same as above, but the component tensors are sliced along the 0th dimension
47   // to make multiple queue-element components.
48   virtual void TryEnqueueMany(const Tuple& tuple, OpKernelContext* ctx,
49                               DoneCallback callback) = 0;
50 
51   // Stashes a function object for future execution, that will eventually
52   // dequeue an element from the queue and call 'callback' with that tuple
53   // element as argument.
54   virtual void TryDequeue(OpKernelContext* ctx, CallbackWithTuple callback) = 0;
55 
56   // Same as above, but the stashed function object will attempt to dequeue
57   // num_elements items.  If allow_small_batch is true, and the Queue is
58   // closed but at least 1 element is available, there is no blocking
59   // and between 1 and num_elements items are immediately returned.
60   // If the queue does not support the allow_small_batch flag will
61   // return an Unimplemented error.
62   virtual void TryDequeueMany(int num_elements, OpKernelContext* ctx,
63                               bool allow_small_batch,
64                               CallbackWithTuple callback) = 0;
65 
66   // Signals that no more elements will be enqueued, and optionally
67   // cancels pending Enqueue(Many) operations.
68   //
69   // After calling this function, subsequent calls to Enqueue(Many)
70   // will fail. If `cancel_pending_enqueues` is true, all pending
71   // calls to Enqueue(Many) will fail as well.
72   //
73   // After calling this function, all current and subsequent calls to
74   // Dequeue(Many) will fail instead of blocking (though they may
75   // succeed if they can be satisfied by the elements in the queue at
76   // the time it was closed).
77   virtual void Close(OpKernelContext* ctx, bool cancel_pending_enqueues,
78                      DoneCallback callback) = 0;
79 
80   // Returns true if a given queue is closed and false if it is open.
81   virtual bool is_closed() const = 0;
82 
83   // Assuming *this represents a shared queue, verify that it matches
84   // another instantiation indicated by node_def.
85   virtual Status MatchesNodeDef(const NodeDef& node_def) = 0;
86 
87   // Returns the number of elements in the queue.
88   virtual int32 size() const = 0;
89 
90   virtual const DataTypeVector& component_dtypes() const = 0;
91 
DebugString()92   string DebugString() const override {
93     return strings::StrCat("A Queue of size: ", size());
94   }
95 
96  protected:
~QueueInterface()97   virtual ~QueueInterface() {}
98 };
99 
100 }  // namespace tensorflow
101 
102 #endif  // TENSORFLOW_CORE_FRAMEWORK_QUEUE_INTERFACE_H_
103