1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 // See docs in ../ops/data_flow_ops.cc.
17 
18 #include <algorithm>
19 #include <deque>
20 #include <vector>
21 
22 #include "tensorflow/core/framework/node_def.pb.h"
23 #include "tensorflow/core/framework/tensor.h"
24 #include "tensorflow/core/framework/tensor_shape.h"
25 #include "tensorflow/core/framework/types.h"
26 #include "tensorflow/core/kernels/fifo_queue.h"
27 #include "tensorflow/core/kernels/queue_base.h"
28 #include "tensorflow/core/lib/core/errors.h"
29 #include "tensorflow/core/platform/logging.h"
30 #include "tensorflow/core/platform/mutex.h"
31 #include "tensorflow/core/platform/types.h"
32 #include "tensorflow/core/util/batch_util.h"
33 
34 namespace tensorflow {
35 
FIFOQueue(int capacity,const DataTypeVector & component_dtypes,const std::vector<TensorShape> & component_shapes,const string & name)36 FIFOQueue::FIFOQueue(int capacity, const DataTypeVector& component_dtypes,
37                      const std::vector<TensorShape>& component_shapes,
38                      const string& name)
39     : TypedQueue(capacity, component_dtypes, component_shapes, name) {}
40 
DequeueLocked(OpKernelContext * ctx,Tuple * tuple)41 void FIFOQueue::DequeueLocked(OpKernelContext* ctx, Tuple* tuple) {
42   DCHECK_GT(queues_[0].size(), size_t{0});
43   (*tuple).reserve(num_components());
44   for (int i = 0; i < num_components(); ++i) {
45     (*tuple).push_back(*queues_[i][0].AccessTensor(ctx));
46     queues_[i].pop_front();
47   }
48 }
49 
TryEnqueue(const Tuple & tuple,OpKernelContext * ctx,DoneCallback callback)50 void FIFOQueue::TryEnqueue(const Tuple& tuple, OpKernelContext* ctx,
51                            DoneCallback callback) {
52   CancellationManager* cm = ctx->cancellation_manager();
53   CancellationToken token = cm->get_cancellation_token();
54   bool already_cancelled;
55   {
56     mutex_lock l(mu_);
57     already_cancelled = !cm->RegisterCallback(
58         token, [this, cm, token]() { Cancel(kEnqueue, cm, token); });
59     if (!already_cancelled) {
60       enqueue_attempts_.emplace_back(
61           1, callback, ctx, cm, token,
62           [tuple, this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
63             if (closed_) {
64               attempt->context->SetStatus(
65                   errors::Cancelled("FIFOQueue '", name_, "' is closed."));
66               return kComplete;
67             }
68             if (queues_[0].size() < static_cast<size_t>(capacity_)) {
69               for (int i = 0; i < num_components(); ++i) {
70                 queues_[i].push_back(PersistentTensor(tuple[i]));
71               }
72               return kComplete;
73             } else {
74               return kNoProgress;
75             }
76           });
77     }
78   }
79   if (!already_cancelled) {
80     FlushUnlocked();
81   } else {
82     ctx->SetStatus(errors::Cancelled("Enqueue operation was cancelled"));
83     callback();
84   }
85 }
86 
87 /* static */
GetElementComponentFromBatch(const FIFOQueue::Tuple & tuple,int64 index,int component,OpKernelContext * ctx,PersistentTensor * out_tensor)88 Status FIFOQueue::GetElementComponentFromBatch(const FIFOQueue::Tuple& tuple,
89                                                int64 index, int component,
90                                                OpKernelContext* ctx,
91                                                PersistentTensor* out_tensor) {
92   TensorShape element_shape(tuple[component].shape());
93   element_shape.RemoveDim(0);
94   Tensor* element_access = nullptr;
95   TF_RETURN_IF_ERROR(ctx->allocate_persistent(
96       tuple[component].dtype(), element_shape, out_tensor, &element_access));
97   TF_RETURN_IF_ERROR(
98       batch_util::CopySliceToElement(tuple[component], element_access, index));
99   return Status::OK();
100 }
101 
TryEnqueueMany(const Tuple & tuple,OpKernelContext * ctx,DoneCallback callback)102 void FIFOQueue::TryEnqueueMany(const Tuple& tuple, OpKernelContext* ctx,
103                                DoneCallback callback) {
104   const int64 batch_size = tuple[0].dim_size(0);
105   if (batch_size == 0) {
106     callback();
107     return;
108   }
109 
110   CancellationManager* cm = ctx->cancellation_manager();
111   CancellationToken token = cm->get_cancellation_token();
112   bool already_cancelled;
113   {
114     mutex_lock l(mu_);
115     already_cancelled = !cm->RegisterCallback(
116         token, [this, cm, token]() { Cancel(kEnqueue, cm, token); });
117     if (!already_cancelled) {
118       enqueue_attempts_.emplace_back(
119           batch_size, callback, ctx, cm, token,
120           [tuple, this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
121             if (closed_) {
122               attempt->context->SetStatus(
123                   errors::Cancelled("FIFOQueue '", name_, "' is closed."));
124               return kComplete;
125             }
126             RunResult result = kNoProgress;
127             while (queues_[0].size() < static_cast<size_t>(capacity_)) {
128               result = kProgress;
129               const int64 index =
130                   tuple[0].dim_size(0) - attempt->elements_requested;
131               for (int i = 0; i < num_components(); ++i) {
132                 PersistentTensor element;
133                 attempt->context->SetStatus(GetElementComponentFromBatch(
134                     tuple, index, i, attempt->context, &element));
135                 if (!attempt->context->status().ok()) return kComplete;
136                 queues_[i].push_back(element);
137               }
138               --attempt->elements_requested;
139               if (attempt->elements_requested == 0) {
140                 return kComplete;
141               }
142             }
143             return result;
144           });
145     }
146   }
147   if (!already_cancelled) {
148     FlushUnlocked();
149   } else {
150     ctx->SetStatus(errors::Cancelled("Enqueue operation was cancelled"));
151     callback();
152   }
153 }
154 
TryDequeue(OpKernelContext * ctx,CallbackWithTuple callback)155 void FIFOQueue::TryDequeue(OpKernelContext* ctx, CallbackWithTuple callback) {
156   CancellationManager* cm = ctx->cancellation_manager();
157   CancellationToken token = cm->get_cancellation_token();
158   bool already_cancelled;
159   {
160     mutex_lock l(mu_);
161     already_cancelled = !cm->RegisterCallback(
162         token, [this, cm, token]() { Cancel(kDequeue, cm, token); });
163     if (!already_cancelled) {
164       // TODO(josh11b): This makes two copies of callback, avoid this if possible.
165       dequeue_attempts_.emplace_back(
166           1, [callback]() { callback(Tuple()); }, ctx, cm, token,
167           [callback, this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
168             const int64 queue_size = queues_[0].size();
169             if (closed_ && queue_size == 0) {
170               attempt->context->SetStatus(errors::OutOfRange(
171                   "FIFOQueue '", name_, "' is closed and has ",
172                   "insufficient elements (requested ", 1, ", current size ",
173                   queue_size, ")"));
174               return kComplete;
175             }
176             if (queue_size > 0) {
177               Tuple tuple;
178               DequeueLocked(attempt->context, &tuple);
179               attempt->done_callback = [callback, tuple]() { callback(tuple); };
180               return kComplete;
181             } else {
182               return kNoProgress;
183             }
184           });
185     }
186   }
187   if (!already_cancelled) {
188     FlushUnlocked();
189   } else {
190     ctx->SetStatus(errors::Cancelled("Dequeue operation was cancelled"));
191     callback(Tuple());
192   }
193 }
194 
// Tries to dequeue `num_elements` elements and pass them to `callback` as a
// single tuple containing one batched tensor per component.
//
// Behavior, in order:
//   * If the queue's component shapes are not specified, sets InvalidArgument
//     on `ctx` and invokes `callback` with an empty Tuple.
//   * If `num_elements` is 0, allocates one temporary tensor per component
//     with shape ManyOutShape(i, 0) and invokes `callback` with them, without
//     touching the queue or the lock.
//   * Otherwise registers a cancellation callback. If registration fails
//     (already cancelled), sets Cancelled on `ctx` and invokes `callback`
//     with an empty Tuple. If it succeeds, appends a dequeue attempt to
//     dequeue_attempts_ and calls FlushUnlocked() after releasing mu_.
//
// The attempt accumulates dequeued elements into attempt->tuple across runs.
// attempt->elements_requested counts the elements still to be dequeued, so
// the next write position in the batch is
// (attempt->tuple[0].dim_size(0) - elements_requested).
//
// `allow_small_batch` only matters once the queue is closed with fewer
// elements than requested: if true, the attempt shrinks its request to the
// elements that remain; if false, it completes with OutOfRange.
void FIFOQueue::TryDequeueMany(int num_elements, OpKernelContext* ctx,
                               bool allow_small_batch,
                               CallbackWithTuple callback) {
  if (!specified_shapes()) {
    ctx->SetStatus(errors::InvalidArgument(
        "FIFOQueue's DequeueMany and DequeueUpTo require the "
        "components to have specified shapes."));
    callback(Tuple());
    return;
  }
  // Zero elements requested: answer immediately with empty batches.
  if (num_elements == 0) {
    Tuple tuple;
    tuple.reserve(num_components());
    for (int i = 0; i < num_components(); ++i) {
      // TODO(josh11b,misard): Switch to allocate_output().  Problem is
      // this breaks the abstraction boundary since we don't *really*
      // know if and how the Tensors in the tuple we pass to callback
      // correspond to the outputs of *ctx.  For example, the
      // ReaderRead Op uses TryDequeue() to get a filename out of a
      // queue that is used internally by the reader and is not
      // associated with any output of the ReaderRead.
      // mrry@ adds:
      // Maybe we need to pass a std::function<Tensor*(...)> (or
      // better signature) that calls the appropriate allocator
      // function in addition to ctx?  (Or support a shim Allocator
      // that has an internal OpKernelContext*, and dispatches to the
      // appropriate method?)
      // misard@ adds:
      // I don't see that a std::function would help. The problem is
      // that at this point (allocation time) the system doesn't know
      // what is going to happen to the element read out of the
      // queue. As long as we keep the generality that TensorFlow Ops
      // do their own dynamic allocation in arbitrary C++ code, we
      // need to preserve robustness to allocating output Tensors with
      // the 'wrong' attributes, and fixing up with a copy. The only
      // improvement I can see here in the future would be to support
      // an optimized case where the queue 'knows' what attributes to
      // use, and plumbs them through here.
      Tensor element;
      Status status = ctx->allocate_temp(component_dtypes_[i],
                                         ManyOutShape(i, 0), &element);
      if (!status.ok()) {
        ctx->SetStatus(status);
        callback(Tuple());
        return;
      }
      tuple.emplace_back(element);
    }
    callback(tuple);
    return;
  }

  CancellationManager* cm = ctx->cancellation_manager();
  CancellationToken token = cm->get_cancellation_token();
  bool already_cancelled;
  {
    mutex_lock l(mu_);
    already_cancelled = !cm->RegisterCallback(
        token, [this, cm, token]() { Cancel(kDequeue, cm, token); });
    if (!already_cancelled) {
      // TODO(josh11b): This makes two copies of callback, avoid this if possible.
      // The initial done-callback delivers an empty Tuple; it is replaced
      // below once a full batch has been assembled.
      dequeue_attempts_.emplace_back(
          num_elements, [callback]() { callback(Tuple()); }, ctx, cm, token,
          [callback, allow_small_batch,
           this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
            int64 queue_size = queues_[0].size();

            if (closed_ && queue_size < attempt->elements_requested) {
              // If we don't have enough for a full dequeue, we have
              // to reset the attempt tuple.
              if (!attempt->tuple.empty()) {
                // Restore already-dequeued elements to the front of the
                // queue.
                // Iterate from the most recently dequeued slice down to
                // slice 0 so that repeated push_front() calls restore the
                // original order.
                for (int64 i = attempt->tuple[0].dim_size(0) -
                               attempt->elements_requested - 1;
                     i >= 0; --i) {
                  for (int j = 0; j < num_components(); ++j) {
                    PersistentTensor element;
                    Status s = GetElementComponentFromBatch(
                        attempt->tuple, i, j, attempt->context, &element);
                    if (!s.ok()) {
                      attempt->context->SetStatus(
                          errors::DataLoss("Failed to restore element from "
                                           "partially-dequeued batch "
                                           "to FIFOQueue: ",
                                           s.error_message()));
                    }
                    // NOTE(review): `element` is pushed even when the
                    // restore failed; presumably this keeps all component
                    // sub-queues the same length -- confirm.
                    queues_[j].push_front(element);
                  }
                }
              }
              if (allow_small_batch && !queues_[0].empty()) {
                // Request all remaining elements in the queue.
                // The size is re-read because the restore above may have
                // put elements back; clearing the tuple makes the loop
                // below reallocate it at the smaller batch size.
                queue_size = queues_[0].size();
                attempt->tuple.clear();
                attempt->elements_requested = queue_size;
              } else {
                if (allow_small_batch) {
                  // There may be some other attempts containing
                  // values.  If so, we'll yield and wait for them
                  // to add elements to the queue.
                  if (!enqueue_attempts_.empty()) return kProgress;
                }
                // Do not overwrite an error already recorded above (e.g.
                // the DataLoss from a failed restore).
                if (attempt->context->status().ok()) {
                  attempt->context->SetStatus(errors::OutOfRange(
                      "FIFOQueue '", name_, "' is closed and has ",
                      "insufficient elements (requested ",
                      attempt->elements_requested, ", current size ",
                      queue_size, ")"));
                }
                return kComplete;
              }
            }

            // Move as many queued elements as are available into the batch.
            RunResult result = kNoProgress;
            for (; queue_size > 0; --queue_size) {
              if (attempt->tuple.empty()) {
                // Only allocate tuple when we have something to dequeue
                // so we don't use excessive memory when there are many
                // blocked dequeue attempts waiting.
                attempt->tuple.reserve(num_components());
                for (int i = 0; i < num_components(); ++i) {
                  const TensorShape shape =
                      ManyOutShape(i, attempt->elements_requested);
                  Tensor element;
                  attempt->context->SetStatus(attempt->context->allocate_temp(
                      component_dtypes_[i], shape, &element));
                  if (!attempt->context->status().ok()) return kComplete;
                  attempt->tuple.emplace_back(element);
                }
              }
              result = kProgress;
              Tuple tuple;
              DequeueLocked(attempt->context, &tuple);
              // Next free slot in the batch: batch size minus the number of
              // elements still outstanding.
              const int64 index =
                  attempt->tuple[0].dim_size(0) - attempt->elements_requested;
              for (int i = 0; i < num_components(); ++i) {
                attempt->context->SetStatus(batch_util::CopyElementToSlice(
                    std::move(tuple[i]), &attempt->tuple[i], index));
                if (!attempt->context->status().ok()) return kComplete;
              }
              tuple.clear();
              --attempt->elements_requested;
              if (attempt->elements_requested == 0) {
                // Batch complete: hand the assembled tuple to the caller's
                // callback via the attempt's done-callback.
                tuple = attempt->tuple;
                attempt->done_callback = [callback, tuple]() {
                  callback(tuple);
                };
                return kComplete;
              }
            }
            return result;
          });
    }
  }
  if (!already_cancelled) {
    FlushUnlocked();
  } else {
    ctx->SetStatus(errors::Cancelled("Dequeue operation was cancelled"));
    callback(Tuple());
  }
}
357 
MatchesNodeDef(const NodeDef & node_def)358 Status FIFOQueue::MatchesNodeDef(const NodeDef& node_def) {
359   if (!MatchesNodeDefOp(node_def, "FIFOQueue").ok() &&
360       !MatchesNodeDefOp(node_def, "FIFOQueueV2").ok()) {
361     return errors::InvalidArgument("Expected FIFOQueue, found ", node_def.op());
362   }
363   TF_RETURN_IF_ERROR(MatchesNodeDefCapacity(node_def, capacity_));
364   TF_RETURN_IF_ERROR(MatchesNodeDefTypes(node_def));
365   TF_RETURN_IF_ERROR(MatchesNodeDefShapes(node_def));
366   return Status::OK();
367 }
368 
369 // Defines a FIFOQueueOp, which produces a Queue (specifically, one
370 // backed by FIFOQueue) that persists across different graph
371 // executions, and sessions. Running this op produces a single-element
372 // tensor of handles to Queues in the corresponding device.
FIFOQueueOp(OpKernelConstruction * context)373 FIFOQueueOp::FIFOQueueOp(OpKernelConstruction* context)
374     : TypedQueueOp(context) {
375   OP_REQUIRES_OK(context, context->GetAttr("shapes", &component_shapes_));
376 }
377 
CreateResource(QueueInterface ** ret)378 Status FIFOQueueOp::CreateResource(QueueInterface** ret) {
379   FIFOQueue* queue = new FIFOQueue(capacity_, component_types_,
380                                    component_shapes_, cinfo_.name());
381   return CreateTypedQueue(queue, ret);
382 }
383 
384 }  // namespace tensorflow
385