1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
17 #define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
18 
19 #include <stddef.h>
20 #include <cstddef>
21 #include <functional>
22 #include <memory>
23 #include <string>
24 
25 #include "tensorflow/core/kernels/batching_util/shared_batch_scheduler.h"
26 
27 namespace tensorflow {
28 namespace serving {
29 
30 // A BatchScheduler implementation geared toward handling a single request type
31 // running on a specific set of hardware resources. A typical scenario is one in
32 // which all requests invoke the same machine-learned model on one GPU.
33 //
34 // If there are, say, two GPUs and two models each bound to one of the GPUs, one
35 // could use two BasicBatchScheduler instances to schedule the two model/GPU
36 // combinations independently. If multiple models must share a given GPU or
37 // other hardware resource, consider using SharedBatchScheduler instead.
38 //
39 //
40 // PARAMETERS AND BEHAVIOR:
41 //
// BasicBatchScheduler runs a fixed pool of threads, which it uses to process
// batches of tasks. It enforces a maximum batch size and bounds the number of
// enqueued tasks. When a thread becomes free but too few tasks are enqueued to
// form a full batch, the scheduler still schedules a (partial) batch
// immediately if some task has been in the queue for longer than a given
// timeout parameter. If the timeout parameter is set to 0, then the batch
// threads will always be kept busy (unless there are zero tasks waiting to be
// processed).
49 //
// For online serving, it is recommended to set the maximum number of enqueued
// batches worth of tasks equal to the number of batch threads, which allows
// enqueuing of enough tasks such that if every thread becomes available it can
// be kept busy, but no more. For bulk processing jobs and throughput-oriented
// benchmarks, you may want to set it much higher.
55 //
// When Schedule() is called, if the queue is full the call will fail with an
// UNAVAILABLE error (after which the client may retry later). If the call
58 // succeeds, the maximum time the task will spend in the queue before being
59 // placed in a batch and assigned to a thread for processing, is the greater of:
60 //  - the maximum time to process ceil(max_enqueued_batches/num_batch_threads)
61 //    (1 in the recommended configuration) batches of previously-submitted tasks
62 //  - the configured timeout parameter (which can be 0, as mentioned above)
63 //
64 // Unlike StreamingBatchScheduler, when BasicBatchScheduler assigns a batch to a
65 // thread, it closes the batch. The process-batch callback may assume that every
66 // batch it receives is closed at the outset.
67 //
68 //
69 // RECOMMENDED USE-CASES:
70 //
71 // BasicBatchScheduler is suitable for use-cases that feature a single kind of
72 // request (e.g. a server performing inference with a single machine-learned
73 // model, possibly evolving over time), with loose versioning semantics.
74 // Concretely, the following conditions should hold:
75 //
//  A. All requests batched onto a given resource (e.g. a hardware accelerator,
//     or a pool of accelerators) are of the same type. For example, they all
//     invoke the same machine-learned model.
79 //
80 //     These variations are permitted:
81 //      - The model may reside in a single servable, or it may be spread across
82 //        multiple servables that are used in unison (e.g. a vocabulary lookup
83 //        table servable and a tensorflow session servable).
84 //      - The model's servable(s) may be static, or they may evolve over time
85 //        (successive servable versions).
86 //      - Zero or more of the servables are used in the request thread; the rest
87 //        are used in the batch thread. In our running example, the vocabulary
88 //        lookups and tensorflow runs may both be performed in the batch thread,
89 //        or alternatively the vocabulary lookup may occur in the request thread
90 //        with only the tensorflow run performed in the batch thread.
91 //
//     In contrast, BasicBatchScheduler is not a good fit if the server
//     hosts multiple distinct models running on a pool of accelerators, with
//     each request specifying which model it wants to use. BasicBatchScheduler
95 //     has no facility to time-multiplex the batch threads across multiple
96 //     models in a principled way. More basically, it cannot ensure that a given
97 //     batch doesn't contain a mixture of requests for different models.
98 //
99 //  B. Requests do not specify a particular version of the servable(s) that must
100 //     be used. Instead, each request is content to use the "latest" version.
101 //
102 //     BasicBatchScheduler does not constrain which requests get grouped
103 //     together into a batch, so using this scheduler there is no way to achieve
104 //     cohesion of versioned requests to version-specific batches.
105 //
106 //  C. No servable version coordination needs to be performed between the
107 //     request threads and the batch threads. Often, servables are only used in
108 //     the batch threads, in which case this condition trivially holds. If
109 //     servables are used in both threads, then the use-case must tolerate
110 //     version skew across the servables used in the two kinds of threads.
111 //
112 //
113 // EXAMPLE USE-CASE FLOW:
114 //
115 // For such use-cases, request processing via BasicBatchScheduler generally
116 // follows this flow (given for illustration; variations are possible):
117 //  1. Optionally perform some pre-processing on each request in the request
118 //     threads.
119 //  2. Route the requests to the batch scheduler, as batching::Task objects.
120 //     (Since all requests are of the same type and are not versioned, the
121 //     scheduler is free to group them into batches arbitrarily.)
122 //  3. Merge the requests into a single batched representation B.
123 //  4. Obtain handles to the servable(s) needed to process B. The simplest
124 //     approach is to obtain the latest version of each servable. Alternatively,
125 //     if cross-servable consistency is required (e.g. the vocabulary lookup
126 //     table's version number must match that of the tensorflow session),
127 //     identify an appropriate version number and obtain the servable handles
128 //     accordingly.
129 //  5. Process B using the obtained servable handles, and split the result into
130 //     individual per-request units.
131 //  6. Perform any post-processing in the batch thread and/or request thread.
132 //
133 //
134 // PERFORMANCE TUNING: See README.md.
135 //
136 template <typename TaskType>
137 class BasicBatchScheduler : public BatchScheduler<TaskType> {
138  public:
139   // TODO(b/25089730): Tune defaults based on best practices as they develop.
140   // (Keep them mirrored to the ones in SharedBatchScheduler::QueueOptions and
141   // SharedBatchScheduler::Options.)
142   struct Options {
143     // The maximum size of each batch.
144     //
145     // The scheduler may form batches of any size between 1 and this number
146     // (inclusive). If there is a need to quantize the batch sizes, i.e. only
147     // submit batches whose size is in a small set of allowed sizes, that can be
148     // done by adding padding in the process-batch callback.
149     int max_batch_size = 1000;
150 
151     // If a task has been enqueued for this amount of time (in microseconds),
152     // and a thread is available, the scheduler will immediately form a batch
153     // from enqueued tasks and assign the batch to the thread for processing,
154     // even if the batch's size is below 'max_batch_size'.
155     //
156     // This parameter offers a way to bound queue latency, so that a task isn't
157     // stuck in the queue indefinitely waiting for enough tasks to arrive to
158     // make a full batch. (The latency bound is given in the class documentation
159     // above.)
160     //
161     // The goal is to smooth out batch sizes under low request rates, and thus
162     // avoid latency spikes.
163     int64 batch_timeout_micros = 0;
164 
165     // The name to use for the pool of batch threads.
166     string thread_pool_name = {"batch_threads"};
167 
168     // The number of threads to use to process batches.
169     // Must be >= 1, and should be tuned carefully.
170     int num_batch_threads = port::NumSchedulableCPUs();
171 
172     // The maximum allowable number of enqueued (accepted by Schedule() but
173     // not yet being processed on a batch thread) tasks in terms of batches.
174     // If this limit is reached, Schedule() will return an UNAVAILABLE error.
175     // See the class documentation above for guidelines on how to tune this
176     // parameter.
177     int max_enqueued_batches = 10;
178 
179     // The following options are typically only overridden by test code.
180 
181     // The environment to use.
182     Env* env = Env::Default();
183   };
184   static Status Create(const Options& options,
185                        std::function<void(std::unique_ptr<Batch<TaskType>>)>
186                            process_batch_callback,
187                        std::unique_ptr<BasicBatchScheduler>* scheduler);
188 
189   ~BasicBatchScheduler() override = default;
190 
191   Status Schedule(std::unique_ptr<TaskType>* task) override;
192   size_t NumEnqueuedTasks() const override;
193   size_t SchedulingCapacity() const override;
194 
max_task_size()195   size_t max_task_size() const override {
196     return shared_scheduler_queue_->max_task_size();
197   }
198 
199  private:
200   explicit BasicBatchScheduler(
201       std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue);
202 
203   // This class is merely a thin wrapper around a SharedBatchScheduler with a
204   // single queue.
205   std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue_;
206 
207   TF_DISALLOW_COPY_AND_ASSIGN(BasicBatchScheduler);
208 };
209 
210 //////////
211 // Implementation details follow. API users need not read.
212 
213 template <typename TaskType>
Create(const Options & options,std::function<void (std::unique_ptr<Batch<TaskType>>)> process_batch_callback,std::unique_ptr<BasicBatchScheduler> * scheduler)214 Status BasicBatchScheduler<TaskType>::Create(
215     const Options& options,
216     std::function<void(std::unique_ptr<Batch<TaskType>>)>
217         process_batch_callback,
218     std::unique_ptr<BasicBatchScheduler>* scheduler) {
219   typename SharedBatchScheduler<TaskType>::Options shared_scheduler_options;
220   shared_scheduler_options.thread_pool_name = options.thread_pool_name;
221   shared_scheduler_options.num_batch_threads = options.num_batch_threads;
222   shared_scheduler_options.env = options.env;
223   std::shared_ptr<SharedBatchScheduler<TaskType>> shared_scheduler;
224   TF_RETURN_IF_ERROR(SharedBatchScheduler<TaskType>::Create(
225       shared_scheduler_options, &shared_scheduler));
226 
227   typename SharedBatchScheduler<TaskType>::QueueOptions
228       shared_scheduler_queue_options;
229   shared_scheduler_queue_options.max_batch_size = options.max_batch_size;
230   shared_scheduler_queue_options.batch_timeout_micros =
231       options.batch_timeout_micros;
232   shared_scheduler_queue_options.max_enqueued_batches =
233       options.max_enqueued_batches;
234   std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue;
235   TF_RETURN_IF_ERROR(shared_scheduler->AddQueue(shared_scheduler_queue_options,
236                                                 process_batch_callback,
237                                                 &shared_scheduler_queue));
238 
239   scheduler->reset(
240       new BasicBatchScheduler<TaskType>(std::move(shared_scheduler_queue)));
241   return Status::OK();
242 }
243 
244 template <typename TaskType>
Schedule(std::unique_ptr<TaskType> * task)245 Status BasicBatchScheduler<TaskType>::Schedule(
246     std::unique_ptr<TaskType>* task) {
247   return shared_scheduler_queue_->Schedule(task);
248 }
249 
250 template <typename TaskType>
NumEnqueuedTasks()251 size_t BasicBatchScheduler<TaskType>::NumEnqueuedTasks() const {
252   return shared_scheduler_queue_->NumEnqueuedTasks();
253 }
254 
255 template <typename TaskType>
SchedulingCapacity()256 size_t BasicBatchScheduler<TaskType>::SchedulingCapacity() const {
257   return shared_scheduler_queue_->SchedulingCapacity();
258 }
259 
260 template <typename TaskType>
BasicBatchScheduler(std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue)261 BasicBatchScheduler<TaskType>::BasicBatchScheduler(
262     std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue)
263     : shared_scheduler_queue_(std::move(shared_scheduler_queue)) {}
264 
265 }  // namespace serving
266 }  // namespace tensorflow
267 
268 #endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
269