1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_
17 #define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_
18 
#include <stddef.h>
#include <deque>
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
#include "tensorflow/core/kernels/batching_util/periodic_function.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/byte_order.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
38 
39 namespace tensorflow {
40 namespace serving {
41 namespace internal {
42 template <typename TaskType>
43 class Queue;
44 }  // namespace internal
45 }  // namespace serving
46 }  // namespace tensorflow
47 
48 namespace tensorflow {
49 namespace serving {
50 
51 // A batch scheduler for server instances that service multiple request types
52 // (e.g. multiple machine-learned models, or multiple versions of a model served
53 // concurrently), or even multiple distinct tasks for a given request. The
54 // scheduler multiplexes batches of different kinds of tasks onto a fixed-size
55 // thread pool (each batch contains tasks of a single type), in a carefully
56 // controlled manner. A common configuration is to set the number of threads
57 // equal to the number of hardware accelerator units, in which case the
58 // scheduler takes care of multiplexing the task types onto the shared hardware,
59 // in a manner that is both fair and efficient.
60 //
61 // Semantically, SharedBatchScheduler behaves like having N instances of
62 // BasicBatchScheduler (see basic_batch_scheduler.h), one per task type. The
63 // difference is that under the covers there is a single shared thread pool,
64 // instead of N independent ones, with their sharing deliberately coordinated.
65 //
66 // SharedBatchScheduler does not implement the BatchScheduler API; rather, it
67 // presents an abstraction of "queues", where each queue corresponds to one type
68 // of task. Tasks submitted to a given queue are placed in their own batches,
69 // and cannot be mixed with other tasks. Queues can be added and deleted
70 // dynamically, to accommodate e.g. versions of a model being brought up and
71 // down over the lifetime of a server.
72 //
73 // The batch thread pool round-robins through the queues, running one batch
74 // from a queue and then moving to the next queue. Each queue behaves like a
75 // BasicBatchScheduler instance, in the sense that it has maximum batch size and
76 // timeout parameters, which govern when a batch is eligible to be processed.
77 //
78 // Each queue is independently configured with a maximum size (in terms of the
79 // maximum number of batches worth of enqueued tasks). For online serving, it is
80 // recommended that the queue sizes be configured such that the sum of the sizes
// of the active queues roughly equals the number of batch threads. (The idea is
82 // that if all threads become available at roughly the same time, there will be
83 // enough enqueued work for them to take on, but no more.)
84 //
85 // If queue sizes are configured in the manner suggested above, the maximum time
86 // a task can spend in a queue before being placed in a batch and assigned to a
87 // thread for processing, is the greater of:
88 //  - the maximum time to process one batch of tasks from any active queue
89 //  - the configured timeout parameter for the task's queue (which can be 0)
90 //
91 // For bulk processing jobs and throughput-oriented benchmarks, you may want to
92 // set the maximum queue size to a large value.
93 //
94 // TODO(b/26539183): Support queue servicing policies other than round-robin.
95 // E.g. let each queue specify a "share" (an int >= 1), so e.g. with queues A
96 // and B having shares 1 and 2 respectively, the servicing pattern is ABBABB...
97 //
98 //
99 // PERFORMANCE TUNING: See README.md.
100 //
101 template <typename TaskType>
102 class SharedBatchScheduler
103     : public std::enable_shared_from_this<SharedBatchScheduler<TaskType>> {
104  public:
105   // TODO(b/25089730): Tune defaults based on best practices as they develop.
106   struct Options {
107     // The name to use for the pool of batch threads.
108     string thread_pool_name = {"batch_threads"};
109 
110     // The number of threads to use to process batches.
111     // Must be >= 1, and should be tuned carefully.
112     int num_batch_threads = port::NumSchedulableCPUs();
113 
114     // The environment to use.
115     // (Typically only overridden by test code.)
116     Env* env = Env::Default();
117   };
118   // Ownership is shared between the caller of Create() and any queues created
119   // via AddQueue().
120   static Status Create(
121       const Options& options,
122       std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler);
123 
124   ~SharedBatchScheduler();
125 
126   // Adds a queue to which tasks may be submitted. The returned queue implements
127   // the BatchScheduler API. Each queue has its own set of scheduling options,
128   // and its own callback to process batches of tasks submitted to the queue.
129   //
130   // The returned queue's destructor blocks until all tasks submitted to it have
131   // been processed.
132   struct QueueOptions {
133     // The maximum size of each batch.
134     //
135     // The scheduler may form batches of any size between 1 and this number
136     // (inclusive). If there is a need to quantize the batch sizes, i.e. only
137     // submit batches whose size is in a small set of allowed sizes, that can be
138     // done by adding padding in the process-batch callback.
139     size_t max_batch_size = 1000;
140 
141     // If a task has been enqueued for this amount of time (in microseconds),
142     // and a thread is available, the scheduler will immediately form a batch
143     // from enqueued tasks and assign the batch to the thread for processing,
144     // even if the batch's size is below 'max_batch_size'.
145     //
146     // This parameter offers a way to bound queue latency, so that a task isn't
147     // stuck in the queue indefinitely waiting for enough tasks to arrive to
148     // make a full batch. (The latency bound is given in the class documentation
149     // above.)
150     //
151     // The goal is to smooth out batch sizes under low request rates, and thus
152     // avoid latency spikes.
153     int64 batch_timeout_micros = 0;
154 
155     // The maximum allowable number of enqueued (accepted by Schedule() but
156     // not yet being processed on a batch thread) tasks in terms of batches.
157     // If this limit is reached, Schedule() will return an UNAVAILABLE error.
158     // See the class documentation above for guidelines on how to tune this
159     // parameter.
160     size_t max_enqueued_batches = 10;
161   };
162   Status AddQueue(const QueueOptions& options,
163                   std::function<void(std::unique_ptr<Batch<TaskType>>)>
164                       process_batch_callback,
165                   std::unique_ptr<BatchScheduler<TaskType>>* queue);
166 
167  private:
168   explicit SharedBatchScheduler(const Options& options);
169 
170   // The code executed in 'batch_threads_'. Obtains a batch to process from the
171   // queue pointed to by 'next_queue_to_schedule_', and processes it. If that
172   // queue declines to provide a batch to process, moves onto the next queue. If
173   // no queues provide a batch to process, just sleeps briefly and exits.
174   void ThreadLogic();
175 
176   const Options options_;
177 
178   mutex mu_;
179 
180   // A list of queues. (We use std::list instead of std::vector to ensure that
181   // iterators are not invalidated by adding/removing elements. It also offers
182   // efficient removal of elements from the middle.)
183   using QueueList = std::list<std::unique_ptr<internal::Queue<TaskType>>>;
184 
185   // All "active" queues, i.e. ones that either:
186   //  - have not been removed, or
187   //  - have been removed but are not yet empty.
188   QueueList queues_ GUARDED_BY(mu_);
189 
190   // An iterator over 'queues_', pointing to the queue from which the next
191   // available batch thread should grab work.
192   typename QueueList::iterator next_queue_to_schedule_ GUARDED_BY(mu_);
193 
194   // Used by idle batch threads to wait for work to enter the system. Notified
195   // whenever a batch becomes schedulable.
196   condition_variable schedulable_batch_cv_;
197 
198   // Threads that process batches obtained from the queues.
199   std::vector<std::unique_ptr<PeriodicFunction>> batch_threads_;
200 
201   TF_DISALLOW_COPY_AND_ASSIGN(SharedBatchScheduler);
202 };
203 
204 //////////
205 // Implementation details follow. API users need not read.
206 
207 namespace internal {
208 
209 // A task queue for SharedBatchScheduler. Accepts tasks and accumulates them
210 // into batches, and dispenses those batches to be processed via a "pull"
211 // interface. The queue's behavior is governed by maximum batch size, timeout
212 // and maximum queue length parameters; see their documentation in
213 // SharedBatchScheduler.
214 //
215 // The queue is implemented as a deque of batches, with these invariants:
216 //  - The number of batches is between 1 and 'options_.max_enqueued_batches'.
217 //  - The back-most batch is open; the rest are closed.
218 //
219 // Submitted tasks are added to the open batch. If that batch doesn't have room
220 // but the queue isn't full, then that batch is closed and a new open batch is
221 // started.
222 //
223 // Batch pull requests are handled by dequeuing the front-most batch if it is
224 // closed. If the front-most batch is open (i.e. the queue contains only one
225 // batch) and has reached the timeout, it is immediately closed and returned;
226 // otherwise no batch is returned for the request.
227 template <typename TaskType>
228 class Queue {
229  public:
230   using ProcessBatchCallback =
231       std::function<void(std::unique_ptr<Batch<TaskType>>)>;
232   using SchedulableBatchCallback = std::function<void()>;
233   Queue(const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
234         Env* env, ProcessBatchCallback process_batch_callback,
235         SchedulableBatchCallback schdulable_batch_callback);
236 
237   // Illegal to destruct unless the queue is empty.
238   ~Queue();
239 
240   // Submits a task to the queue, with the same semantics as
241   // BatchScheduler::Schedule().
242   Status Schedule(std::unique_ptr<TaskType>* task);
243 
244   // Returns the number of enqueued tasks, with the same semantics as
245   // BatchScheduler::NumEnqueuedTasks().
246   size_t NumEnqueuedTasks() const;
247 
248   // Returns the queue capacity, with the same semantics as
249   // BatchScheduler::SchedulingCapacity().
250   size_t SchedulingCapacity() const;
251 
252   // Returns the maximum allowed size of tasks submitted to the queue.
max_task_size()253   size_t max_task_size() const { return options_.max_batch_size; }
254 
255   // Called by a thread that is ready to process a batch, to request one from
256   // this queue. Either returns a batch that is ready to be processed, or
257   // nullptr if the queue declines to schedule a batch at this time. If it
258   // returns a batch, the batch is guaranteed to be closed.
259   std::unique_ptr<Batch<TaskType>> ScheduleBatch();
260 
261   // Processes a batch that has been returned earlier by ScheduleBatch().
262   void ProcessBatch(std::unique_ptr<Batch<TaskType>> batch);
263 
264   // Determines whether the queue is empty, i.e. has no tasks waiting or being
265   // processed.
266   bool IsEmpty() const;
267 
268   // Marks the queue closed, and waits until it is empty.
269   void CloseAndWaitUntilEmpty();
270 
closed()271   bool closed() const {
272     mutex_lock l(mu_);
273     return closed_;
274   }
275 
276  private:
277   // Same as IsEmpty(), but assumes the caller already holds a lock on 'mu_'.
278   bool IsEmptyInternal() const EXCLUSIVE_LOCKS_REQUIRED(mu_);
279 
280   // Closes the open batch residing at the back of 'batches_', and inserts a
281   // fresh open batch behind it.
282   void StartNewBatch() EXCLUSIVE_LOCKS_REQUIRED(mu_);
283 
284   // Determines whether the open batch residing at the back of 'batches_' is
285   // currently schedulable.
286   bool IsOpenBatchSchedulable() const EXCLUSIVE_LOCKS_REQUIRED(mu_);
287 
288   const typename SharedBatchScheduler<TaskType>::QueueOptions options_;
289 
290   // The environment to use.
291   Env* env_;
292 
293   // A callback invoked to processes a batch of work units. Always invoked from
294   // a batch thread.
295   ProcessBatchCallback process_batch_callback_;
296 
297   // A callback invoked to notify the scheduler that a new batch has become
298   // schedulable.
299   SchedulableBatchCallback schedulable_batch_callback_;
300 
301   mutable mutex mu_;
302 
303   // Whether this queue can accept new tasks. This variable is monotonic: it
304   // starts as false, and then at some point gets set to true and remains true
305   // for the duration of this object's life.
306   bool closed_ GUARDED_BY(mu_) = false;
307 
308   // The enqueued batches. See the invariants in the class comments above.
309   std::deque<std::unique_ptr<Batch<TaskType>>> batches_ GUARDED_BY(mu_);
310 
311   // The time at which the first task was added to the open (back-most) batch
312   // in 'batches_'. Valid iff that batch contains at least one task.
313   uint64 open_batch_start_time_micros_ GUARDED_BY(mu_);
314 
315   // Whether this queue contains a batch that is eligible to be scheduled. Used
316   // to keep track of when to call 'schedulable_batch_callback_'.
317   bool schedulable_batch_ GUARDED_BY(mu_) = false;
318 
319   // The number of batches currently being processed by batch threads.
320   // Incremented in ScheduleBatch() and decremented in ProcessBatch().
321   int num_batches_being_processed_ GUARDED_BY(mu_) = 0;
322 
323   // Used by CloseAndWaitUntilEmpty() to wait until the queue is empty, for the
324   // case in which the queue is not empty when CloseAndWaitUntilEmpty() starts.
325   // When ProcessBatch() dequeues the last batch and makes the queue empty, if
326   // 'empty_notification_' is non-null it calls 'empty_notification_->Notify()'.
327   Notification* empty_notification_ GUARDED_BY(mu_) = nullptr;
328 
329   TF_DISALLOW_COPY_AND_ASSIGN(Queue);
330 };
331 
332 // A RAII-style object that points to a Queue and implements
333 // the BatchScheduler API. To be handed out to clients who call AddQueue().
334 template <typename TaskType>
335 class QueueHandle : public BatchScheduler<TaskType> {
336  public:
337   QueueHandle(std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
338               Queue<TaskType>* queue);
339   ~QueueHandle() override;
340 
341   Status Schedule(std::unique_ptr<TaskType>* task) override;
342   size_t NumEnqueuedTasks() const override;
343   size_t SchedulingCapacity() const override;
344 
max_task_size()345   size_t max_task_size() const override { return queue_->max_task_size(); }
346 
347  private:
348   // The scheduler that owns 'queue_'.
349   std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler_;
350 
351   // The queue this handle wraps. Owned by 'scheduler_', which keeps it alive at
352   // least until this class's destructor closes it.
353   Queue<TaskType>* queue_;
354 
355   TF_DISALLOW_COPY_AND_ASSIGN(QueueHandle);
356 };
357 
358 }  // namespace internal
359 
360 template <typename TaskType>
Create(const Options & options,std::shared_ptr<SharedBatchScheduler<TaskType>> * scheduler)361 Status SharedBatchScheduler<TaskType>::Create(
362     const Options& options,
363     std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler) {
364   if (options.num_batch_threads < 1) {
365     return errors::InvalidArgument("num_batch_threads must be positive; was ",
366                                    options.num_batch_threads);
367   }
368   scheduler->reset(new SharedBatchScheduler<TaskType>(options));
369   return Status::OK();
370 }
371 
372 template <typename TaskType>
~SharedBatchScheduler()373 SharedBatchScheduler<TaskType>::~SharedBatchScheduler() {
374   // Wait until the batch threads finish clearing out and deleting the closed
375   // queues.
376   for (;;) {
377     {
378       mutex_lock l(mu_);
379       if (queues_.empty()) {
380         break;
381       }
382     }
383     const int64 kSleepTimeMicros = 100;
384     options_.env->SleepForMicroseconds(kSleepTimeMicros);
385   }
386   // Delete the batch threads before allowing state the threads may access (e.g.
387   // 'mu_') to be deleted.
388   batch_threads_.clear();
389 }
390 
391 template <typename TaskType>
AddQueue(const QueueOptions & options,std::function<void (std::unique_ptr<Batch<TaskType>>)> process_batch_callback,std::unique_ptr<BatchScheduler<TaskType>> * queue)392 Status SharedBatchScheduler<TaskType>::AddQueue(
393     const QueueOptions& options,
394     std::function<void(std::unique_ptr<Batch<TaskType>>)>
395         process_batch_callback,
396     std::unique_ptr<BatchScheduler<TaskType>>* queue) {
397   if (options.max_batch_size == 0) {
398     return errors::InvalidArgument("max_batch_size must be positive; was ",
399                                    options.max_batch_size);
400   }
401   if (options.batch_timeout_micros < 0) {
402     return errors::InvalidArgument(
403         "batch_timeout_micros must be non-negative; was ",
404         options.batch_timeout_micros);
405   }
406   if (options.max_enqueued_batches < 0) {
407     return errors::InvalidArgument(
408         "max_enqueued_batches must be non-negative; was ",
409         options.max_enqueued_batches);
410   }
411 
412   auto schedulable_batch_callback = [this] {
413     mutex_lock l(mu_);
414     schedulable_batch_cv_.notify_one();
415   };
416   auto internal_queue =
417       std::unique_ptr<internal::Queue<TaskType>>(new internal::Queue<TaskType>(
418           options, options_.env, process_batch_callback,
419           schedulable_batch_callback));
420   auto handle = std::unique_ptr<BatchScheduler<TaskType>>(
421       new internal::QueueHandle<TaskType>(this->shared_from_this(),
422                                           internal_queue.get()));
423   {
424     mutex_lock l(mu_);
425     queues_.push_back(std::move(internal_queue));
426     if (next_queue_to_schedule_ == queues_.end()) {
427       next_queue_to_schedule_ = queues_.begin();
428     }
429   }
430   *queue = std::move(handle);
431   return Status::OK();
432 }
433 
434 template <typename TaskType>
SharedBatchScheduler(const Options & options)435 SharedBatchScheduler<TaskType>::SharedBatchScheduler(const Options& options)
436     : options_(options), next_queue_to_schedule_(queues_.end()) {
437   // Kick off the batch threads.
438   PeriodicFunction::Options periodic_fn_options;
439   periodic_fn_options.thread_name_prefix =
440       strings::StrCat(options.thread_pool_name, "_");
441   for (int i = 0; i < options.num_batch_threads; ++i) {
442     std::unique_ptr<PeriodicFunction> thread(new PeriodicFunction(
443         [this] { this->ThreadLogic(); },
444         0 /* function invocation interval time */, periodic_fn_options));
445     batch_threads_.push_back(std::move(thread));
446   }
447 }
448 
449 template <typename TaskType>
ThreadLogic()450 void SharedBatchScheduler<TaskType>::ThreadLogic() {
451   // A batch to process next (or nullptr if no work to do).
452   std::unique_ptr<Batch<TaskType>> batch_to_process;
453   // The queue with which 'batch_to_process' is associated.
454   internal::Queue<TaskType>* queue_for_batch = nullptr;
455   {
456     mutex_lock l(mu_);
457 
458     const int num_queues = queues_.size();
459     for (int num_queues_tried = 0;
460          batch_to_process == nullptr && num_queues_tried < num_queues;
461          ++num_queues_tried) {
462       DCHECK(next_queue_to_schedule_ != queues_.end());
463 
464       // If a closed queue responds to ScheduleBatch() with nullptr, the queue
465       // will never yield any further batches so we can drop it. To avoid a
466       // race, we take a snapshot of the queue's closedness state *before*
467       // calling ScheduleBatch().
468       const bool queue_closed = (*next_queue_to_schedule_)->closed();
469 
470       // Ask '*next_queue_to_schedule_' if it wants us to process a batch.
471       batch_to_process = (*next_queue_to_schedule_)->ScheduleBatch();
472       if (batch_to_process != nullptr) {
473         queue_for_batch = next_queue_to_schedule_->get();
474       }
475 
476       // Advance 'next_queue_to_schedule_'.
477       if (queue_closed && (*next_queue_to_schedule_)->IsEmpty() &&
478           batch_to_process == nullptr) {
479         // We've encountered a closed queue with no work to do. Drop it.
480         DCHECK_NE(queue_for_batch, next_queue_to_schedule_->get());
481         next_queue_to_schedule_ = queues_.erase(next_queue_to_schedule_);
482       } else {
483         ++next_queue_to_schedule_;
484       }
485       if (next_queue_to_schedule_ == queues_.end() && !queues_.empty()) {
486         // We've hit the end. Wrap to the first queue.
487         next_queue_to_schedule_ = queues_.begin();
488       }
489     }
490 
491     if (batch_to_process == nullptr) {
492       // We couldn't find any work to do. Wait until a new batch becomes
493       // schedulable, or some time has elapsed, before checking again.
494       const int64 kTimeoutMillis = 1;  // The smallest accepted granule of time.
495       WaitForMilliseconds(&l, &schedulable_batch_cv_, kTimeoutMillis);
496       return;
497     }
498   }
499 
500   queue_for_batch->ProcessBatch(std::move(batch_to_process));
501 }
502 
503 namespace internal {
504 
505 template <typename TaskType>
Queue(const typename SharedBatchScheduler<TaskType>::QueueOptions & options,Env * env,ProcessBatchCallback process_batch_callback,SchedulableBatchCallback schedulable_batch_callback)506 Queue<TaskType>::Queue(
507     const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
508     Env* env, ProcessBatchCallback process_batch_callback,
509     SchedulableBatchCallback schedulable_batch_callback)
510     : options_(options),
511       env_(env),
512       process_batch_callback_(process_batch_callback),
513       schedulable_batch_callback_(schedulable_batch_callback) {
514   // Create an initial, open batch.
515   batches_.emplace_back(new Batch<TaskType>);
516 }
517 
518 template <typename TaskType>
~Queue()519 Queue<TaskType>::~Queue() {
520   mutex_lock l(mu_);
521   DCHECK(IsEmptyInternal());
522 
523   // Close the (empty) open batch, so its destructor doesn't block.
524   batches_.back()->Close();
525 }
526 
527 template <typename TaskType>
Schedule(std::unique_ptr<TaskType> * task)528 Status Queue<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
529   if ((*task)->size() > options_.max_batch_size) {
530     return errors::InvalidArgument("Task size ", (*task)->size(),
531                                    " is larger than maximum batch size ",
532                                    options_.max_batch_size);
533   }
534 
535   bool notify_of_schedulable_batch = false;
536   {
537     mutex_lock l(mu_);
538 
539     DCHECK(!closed_);
540 
541     if (batches_.back()->size() + (*task)->size() > options_.max_batch_size) {
542       if (batches_.size() >= options_.max_enqueued_batches) {
543         return errors::Unavailable(
544             "The batch scheduling queue to which this task was submitted is "
545             "full");
546       }
547       StartNewBatch();
548     }
549     if (batches_.back()->empty()) {
550       open_batch_start_time_micros_ = env_->NowMicros();
551     }
552     batches_.back()->AddTask(std::move(*task));
553 
554     if (!schedulable_batch_) {
555       if (batches_.size() > 1 || IsOpenBatchSchedulable()) {
556         schedulable_batch_ = true;
557         notify_of_schedulable_batch = true;
558       }
559     }
560   }
561 
562   if (notify_of_schedulable_batch) {
563     schedulable_batch_callback_();
564   }
565 
566   return Status::OK();
567 }
568 
569 template <typename TaskType>
NumEnqueuedTasks()570 size_t Queue<TaskType>::NumEnqueuedTasks() const {
571   mutex_lock l(mu_);
572   size_t num_enqueued_tasks = 0;
573   for (const auto& batch : batches_) {
574     num_enqueued_tasks += batch->num_tasks();
575   }
576   return num_enqueued_tasks;
577 }
578 
579 template <typename TaskType>
SchedulingCapacity()580 size_t Queue<TaskType>::SchedulingCapacity() const {
581   mutex_lock l(mu_);
582   const int num_new_batches_schedulable =
583       options_.max_enqueued_batches - batches_.size();
584   const int open_batch_capacity =
585       options_.max_batch_size - batches_.back()->size();
586   return (num_new_batches_schedulable * options_.max_batch_size) +
587          open_batch_capacity;
588 }
589 
590 template <typename TaskType>
ScheduleBatch()591 std::unique_ptr<Batch<TaskType>> Queue<TaskType>::ScheduleBatch() {
592   // The batch to schedule, which we may populate below. (If left as nullptr,
593   // that means we are electing not to schedule a batch at this time.)
594   std::unique_ptr<Batch<TaskType>> batch_to_schedule;
595 
596   {
597     mutex_lock l(mu_);
598 
599     // Consider closing the open batch at this time, to schedule it.
600     if (batches_.size() == 1 && IsOpenBatchSchedulable()) {
601       StartNewBatch();
602     }
603 
604     if (batches_.size() >= 2) {
605       // There is at least one closed batch that is ready to be scheduled.
606       ++num_batches_being_processed_;
607       batch_to_schedule = std::move(batches_.front());
608       batches_.pop_front();
609     } else {
610       schedulable_batch_ = false;
611     }
612   }
613 
614   return batch_to_schedule;
615 }
616 
617 template <typename TaskType>
ProcessBatch(std::unique_ptr<Batch<TaskType>> batch)618 void Queue<TaskType>::ProcessBatch(std::unique_ptr<Batch<TaskType>> batch) {
619   process_batch_callback_(std::move(batch));
620 
621   {
622     mutex_lock l(mu_);
623     --num_batches_being_processed_;
624     if (empty_notification_ != nullptr && IsEmptyInternal()) {
625       empty_notification_->Notify();
626     }
627   }
628 }
629 
630 template <typename TaskType>
IsEmpty()631 bool Queue<TaskType>::IsEmpty() const {
632   mutex_lock l(mu_);
633   return IsEmptyInternal();
634 }
635 
636 template <typename TaskType>
CloseAndWaitUntilEmpty()637 void Queue<TaskType>::CloseAndWaitUntilEmpty() {
638   Notification empty;
639   {
640     mutex_lock l(mu_);
641     closed_ = true;
642     if (IsEmptyInternal()) {
643       empty.Notify();
644     } else {
645       // Arrange for ProcessBatch() to notify when the queue becomes empty.
646       empty_notification_ = &empty;
647     }
648   }
649   empty.WaitForNotification();
650 }
651 
652 template <typename TaskType>
IsEmptyInternal()653 bool Queue<TaskType>::IsEmptyInternal() const {
654   return num_batches_being_processed_ == 0 && batches_.size() == 1 &&
655          batches_.back()->empty();
656 }
657 
658 template <typename TaskType>
StartNewBatch()659 void Queue<TaskType>::StartNewBatch() {
660   batches_.back()->Close();
661   batches_.emplace_back(new Batch<TaskType>);
662 }
663 
664 template <typename TaskType>
IsOpenBatchSchedulable()665 bool Queue<TaskType>::IsOpenBatchSchedulable() const {
666   Batch<TaskType>* open_batch = batches_.back().get();
667   if (open_batch->empty()) {
668     return false;
669   }
670   return closed_ || open_batch->size() >= options_.max_batch_size ||
671          env_->NowMicros() >=
672              open_batch_start_time_micros_ + options_.batch_timeout_micros;
673 }
674 
675 template <typename TaskType>
QueueHandle(std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,Queue<TaskType> * queue)676 QueueHandle<TaskType>::QueueHandle(
677     std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
678     Queue<TaskType>* queue)
679     : scheduler_(scheduler), queue_(queue) {}
680 
681 template <typename TaskType>
~QueueHandle()682 QueueHandle<TaskType>::~QueueHandle() {
683   queue_->CloseAndWaitUntilEmpty();
684 }
685 
686 template <typename TaskType>
Schedule(std::unique_ptr<TaskType> * task)687 Status QueueHandle<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
688   return queue_->Schedule(task);
689 }
690 
691 template <typename TaskType>
NumEnqueuedTasks()692 size_t QueueHandle<TaskType>::NumEnqueuedTasks() const {
693   return queue_->NumEnqueuedTasks();
694 }
695 
696 template <typename TaskType>
SchedulingCapacity()697 size_t QueueHandle<TaskType>::SchedulingCapacity() const {
698   return queue_->SchedulingCapacity();
699 }
700 
701 }  // namespace internal
702 
703 }  // namespace serving
704 }  // namespace tensorflow
705 
706 #endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_
707