1 // Copyright 2015 Google Inc. All Rights Reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // multi_thread_gemm.h: Multi-threaded GEMM entry point.
16 // Readers note: To understand this file, it is useful to first
17 // read and understand the much simpler single_thread_gemm.h.
18 
19 #ifndef GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_
20 #define GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_
21 
22 #include <pthread.h>
23 #include <unistd.h>
24 #include <vector>
25 
26 #include "single_thread_gemm.h"
27 
28 namespace gemmlowp {
29 
#ifdef GEMMLOWP_ALLOW_INLINE_ASM

// Inline asm is allowed here, so busy-waiting is implemented by spinning
// on batches of NOP instructions.

// Budget of NOPs that a waiter may burn while spinning; it is the loop
// bound used by WaitForVariableChange() before it stops busy-waiting.
const int kMaxBusyWaitNOPs = 32 * 1000 * 1000;

// The asm text for 256 NOPs is built by quadrupling one "nop" line four
// times (4^4 == 256); adjacent string literals get concatenated.
#define GEMMLOWP_STRING_CONCAT_4(X) X X X X
#define GEMMLOWP_NOP "nop\n"
#define GEMMLOWP_NOP4 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP)
#define GEMMLOWP_NOP16 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP4)
#define GEMMLOWP_NOP64 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP16)
#define GEMMLOWP_NOP256 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP64)

// Executes one batch of 256 NOPs and reports how many were executed, so
// that callers can keep a running total.
inline int Do256NOPs() {
  const int nops_in_batch = 256;
  asm volatile(GEMMLOWP_NOP256);
  return nops_in_batch;
}

// The helper macros are only needed to define Do256NOPs(); drop them all.
#undef GEMMLOWP_NOP
#undef GEMMLOWP_NOP4
#undef GEMMLOWP_NOP16
#undef GEMMLOWP_NOP64
#undef GEMMLOWP_NOP256
#undef GEMMLOWP_STRING_CONCAT_4

#else  // not GEMMLOWP_ALLOW_INLINE_ASM

// Without inline asm there is no good way to busy-wait: NOP instructions
// have the least side effects and the lowest power usage. Busy-waiting is
// only an optimization, and builds that cannot use the inline asm kernels
// are slow anyway, so it is simply disabled: the budget is zero and a
// "batch" executes nothing.

const int kMaxBusyWaitNOPs = 0;

inline int Do256NOPs() {
  return 0;
}

#endif  // not GEMMLOWP_ALLOW_INLINE_ASM
68 
WriteBarrier()69 inline void WriteBarrier() {
70 #ifdef GEMMLOWP_ARM_32
71   MemoryBarrier();
72 #elif defined(GEMMLOWP_ARM_64)
73   asm volatile("dmb ishst" ::: "memory");
74 #elif defined(GEMMLOWP_X86)
75   asm volatile("sfence" ::: "memory");
76 #elif defined(__mips__)
77   MemoryBarrier();
78 #else
79 #error "Unsupported architecture for WriteBarrier."
80 #endif
81 }
82 
ReadBarrier()83 inline void ReadBarrier() {
84 #ifdef GEMMLOWP_ARM_32
85   MemoryBarrier();
86 #elif defined(GEMMLOWP_ARM_64)
87   asm volatile("dmb ishld" ::: "memory");
88 #elif defined(GEMMLOWP_X86)
89   asm volatile("lfence" ::: "memory");
90 #elif defined(__mips__)
91   MemoryBarrier();
92 #else
93 #error "Unsupported architecture for ReadBarrier."
94 #endif
95 }
96 
97 // Waits until *var != initial_value.
98 //
99 // Returns the new value of *var. The guarantee here is that
100 // the return value is different from initial_value, and that that
101 // new value has been taken by *var at some point during the
102 // execution of this function. There is no guarantee that this is
103 // still the value of *var when this function returns, since *var is
104 // not assumed to be guarded by any lock.
105 //
106 // First does some busy-waiting for a fixed number of no-op cycles,
107 // then falls back to passive waiting for the given condvar, guarded
108 // by the given mutex.
109 //
110 // The idea of doing some initial busy-waiting is to help get
111 // better and more consistent multithreading benefits for small GEMM sizes.
112 // Busy-waiting help ensuring that if we need to wake up soon after having
113 // started waiting, then we can wake up quickly (as opposed to, say,
114 // having to wait to be scheduled again by the OS). On the other hand,
115 // we must still eventually revert to passive waiting for longer waits
116 // (e.g. worker threads having finished a GEMM and waiting until the next GEMM)
117 // so as to avoid permanently spinning.
118 //
119 template <typename T>
WaitForVariableChange(volatile T * var,T initial_value,pthread_cond_t * cond,pthread_mutex_t * mutex)120 T WaitForVariableChange(volatile T* var, T initial_value, pthread_cond_t* cond,
121                         pthread_mutex_t* mutex) {
122   int nops = 0;
123   // First, trivial case where the variable already changed value.
124   T new_value = *var;
125   if (new_value != initial_value) {
126     return new_value;
127   }
128   // Then try busy-waiting.
129   while (nops < kMaxBusyWaitNOPs) {
130     nops += Do256NOPs();
131     new_value = *var;
132     if (new_value != initial_value) {
133       return new_value;
134     }
135   }
136   // Finally, do real passive waiting.
137   pthread_mutex_lock(mutex);
138   new_value = *var;
139   if (new_value == initial_value) {
140     pthread_cond_wait(cond, mutex);
141     new_value = *var;
142     assert(new_value != initial_value);
143   }
144   pthread_mutex_unlock(mutex);
145   return new_value;
146 }
147 
148 // A BlockingCounter lets one thread to wait for N events to occur.
149 // This is how the master thread waits for all the worker threads
150 // to have finished working.
151 class BlockingCounter {
152  public:
BlockingCounter()153   BlockingCounter()
154       : cond_(PTHREAD_COND_INITIALIZER),
155         mutex_(PTHREAD_MUTEX_INITIALIZER),
156         count_(0),
157         initial_count_(0) {}
158 
159   // Sets/resets the counter; initial_count is the number of
160   // decrementing events that the Wait() call will be waiting for.
Reset(std::size_t initial_count)161   void Reset(std::size_t initial_count) {
162     pthread_mutex_lock(&mutex_);
163     assert(count_ == 0);
164     initial_count_ = initial_count;
165     count_ = initial_count_;
166     pthread_mutex_unlock(&mutex_);
167   }
168 
169   // Decrements the counter; if the counter hits zero, signals
170   // the thread that was waiting for that, and returns true.
171   // Otherwise (if the decremented count is still nonzero),
172   // returns false.
DecrementCount()173   bool DecrementCount() {
174     pthread_mutex_lock(&mutex_);
175     assert(count_ > 0);
176     count_--;
177     if (count_ == 0) {
178       pthread_cond_signal(&cond_);
179     }
180     bool retval = count_ == 0;
181     pthread_mutex_unlock(&mutex_);
182     return retval;
183   }
184 
185   // Waits for the N other threads (N having been set by Reset())
186   // to hit the BlockingCounter.
Wait()187   void Wait() {
188     ScopedProfilingLabel label("BlockingCounter::Wait");
189     while (count_) {
190       MemoryBarrier();
191       const std::size_t count_value = count_;
192       if (count_value) {
193         WaitForVariableChange(&count_, count_value, &cond_, &mutex_);
194       }
195     }
196   }
197 
198  private:
199   pthread_cond_t cond_;
200   pthread_mutex_t mutex_;
201   std::size_t count_;
202   std::size_t initial_count_;
203 };
204 
205 // A workload for a worker.
206 struct Task {
TaskTask207   Task() : local_allocator(nullptr) {}
~TaskTask208   virtual ~Task() {}
209   virtual void Run() const = 0;
210   Allocator* local_allocator;
211 };
212 
213 // A worker thread.
214 class Worker {
215  public:
216   enum class State {
217     ThreadStartup,  // The initial state before the thread main loop runs.
218     Ready,          // Is not working, has not yet received new work to do.
219     HasWork,        // Has work to do.
220     ExitAsSoonAsPossible  // Should exit at earliest convenience.
221   };
222 
Worker(BlockingCounter * counter_to_decrement_when_ready)223   explicit Worker(BlockingCounter* counter_to_decrement_when_ready)
224       : task_(nullptr),
225         state_cond_(PTHREAD_COND_INITIALIZER),
226         state_mutex_(PTHREAD_MUTEX_INITIALIZER),
227         state_(State::ThreadStartup),
228         counter_to_decrement_when_ready_(counter_to_decrement_when_ready) {
229     pthread_create(&thread_, nullptr, ThreadFunc, this);
230   }
231 
~Worker()232   ~Worker() {
233     ChangeState(State::ExitAsSoonAsPossible);
234     pthread_join(thread_, nullptr);
235   }
236 
237   // Changes State; may be called from either the worker thread
238   // or the master thread; however, not all state transitions are legal,
239   // which is guarded by assertions.
ChangeState(State new_state)240   void ChangeState(State new_state) {
241     ScopedProfilingLabel label("Worker::ChangeState");
242     pthread_mutex_lock(&state_mutex_);
243     assert(new_state != state_);
244     switch (state_) {
245       case State::ThreadStartup:
246         assert(new_state == State::Ready);
247         break;
248       case State::Ready:
249         assert(new_state == State::HasWork ||
250                new_state == State::ExitAsSoonAsPossible);
251         break;
252       case State::HasWork:
253         assert(new_state == State::Ready ||
254                new_state == State::ExitAsSoonAsPossible);
255         break;
256       default:
257         abort();
258     }
259     state_ = new_state;
260     pthread_cond_signal(&state_cond_);
261     if (state_ == State::Ready) {
262       counter_to_decrement_when_ready_->DecrementCount();
263     }
264     pthread_mutex_unlock(&state_mutex_);
265   }
266 
267   // Thread entry point.
ThreadFunc()268   void ThreadFunc() {
269     ScopedProfilingLabel label("Worker::ThreadFunc");
270     RegisterCurrentThreadForProfiling();
271 
272     ChangeState(State::Ready);
273 
274     // Thread main loop
275     while (true) {
276       // Get a state to act on
277       // In the 'Ready' state, we have nothing to do but to wait until
278       // we switch to another state.
279       State state_to_act_upon = WaitForVariableChange(
280           &state_, State::Ready, &state_cond_, &state_mutex_);
281 
282       // We now have a state to act on, so act.
283       switch (state_to_act_upon) {
284         case State::HasWork:
285           // Got work to do! So do it, and then revert to 'Ready' state.
286           ReadBarrier();
287           assert(task_);
288           task_->Run();
289           delete task_;
290           task_ = nullptr;
291           ChangeState(State::Ready);
292           break;
293         case State::ExitAsSoonAsPossible:
294           return;
295         default:
296           abort();
297       }
298     }
299   }
300 
ThreadFunc(void * arg)301   static void* ThreadFunc(void* arg) {
302     static_cast<Worker*>(arg)->ThreadFunc();
303     return nullptr;
304   }
305 
306   // Called by the master thead to give this worker work to do.
307   // It is only legal to call this if the worker
StartWork(Task * task)308   void StartWork(Task* task) {
309     assert(!task_);
310     task->local_allocator = &local_allocator_;
311     task_ = task;
312     WriteBarrier();
313     assert(state_ == State::Ready);
314     ChangeState(State::HasWork);
315   }
316 
317  private:
318   // The underlying thread.
319   pthread_t thread_;
320 
321   // The task to be worked on.
322   const Task* task_;
323 
324   // The condition variable and mutex guarding state changes.
325   pthread_cond_t state_cond_;
326   pthread_mutex_t state_mutex_;
327 
328   // The state enum tells if we're currently working, waiting for work, etc.
329   State state_;
330 
331   // Each thread had a local allocator so they can allocate temporary
332   // buffers without blocking each other.
333   Allocator local_allocator_;
334 
335   // pointer to the master's thread BlockingCounter object, to notify the
336   // master thread of when this worker switches to the 'Ready' state.
337   BlockingCounter* const counter_to_decrement_when_ready_;
338 };
339 
340 // A very simple pool of workers, that only allows the very
341 // specific parallelization pattern that we use here:
342 // a fixed number of workers can be given work, and one then
343 // waits for all of them to finish.
344 class WorkersPool {
345  public:
WorkersPool()346   WorkersPool() {}
347 
~WorkersPool()348   ~WorkersPool() {
349     for (auto w : workers_) {
350       delete w;
351     }
352   }
353 
counter_to_decrement_when_ready()354   BlockingCounter& counter_to_decrement_when_ready() {
355     return counter_to_decrement_when_ready_;
356   }
357 
358   // Give work to a specific worker.
StartWorker(int index,Task * task_)359   void StartWorker(int index, Task* task_) {
360     assert(static_cast<std::size_t>(index) < workers_.size());
361     workers_[index]->StartWork(task_);
362   }
363 
364   // Ensures that the pool has at least the given count of workers.
365   // If any new worker has to be created, this function waits for it to
366   // be ready.
CreateWorkers(std::size_t workers_count)367   void CreateWorkers(std::size_t workers_count) {
368     if (workers_.size() >= workers_count) {
369       return;
370     }
371     counter_to_decrement_when_ready_.Reset(workers_count - workers_.size());
372     while (workers_.size() < workers_count) {
373       workers_.push_back(new Worker(&counter_to_decrement_when_ready_));
374     }
375     counter_to_decrement_when_ready_.Wait();
376   }
377 
378  private:
379   // copy construction disallowed
380   WorkersPool(const WorkersPool&) = delete;
381 
382   // The workers in this pool. They are owned by the pool:
383   // the pool creates workers and destroys them in its destructor.
384   std::vector<Worker*> workers_;
385 
386   // The BlockingCounter used to wait for the workers.
387   BlockingCounter counter_to_decrement_when_ready_;
388 };
389 
390 // The task we use to implement a multi-threaded Gemm: a block of the
391 // RHS has been packed by the master thread; each worker thread
392 // then has to pack a block of the LHS and accumulate the Gemm of these
393 // packed LHS and RHS blocks.
394 template <typename KernelFormat, typename InputScalar, typename OutputScalar,
395           typename BitDepthParams, MapOrder LhsOrder, MapOrder RhsOrder,
396           MapOrder ResultOrder, typename LhsOffset, typename RhsOffset,
397           typename OutputPipelineType>
398 struct GemmWithPackedRhsTask : Task {
399   typedef PackedSideBlock<typename KernelFormat::Lhs> PackedLhs;
400   typedef PackedSideBlock<typename KernelFormat::Rhs> PackedRhs;
GemmWithPackedRhsTaskGemmWithPackedRhsTask401   GemmWithPackedRhsTask(const KernelBase& _kernel,
402                         const MatrixMap<const InputScalar, LhsOrder>& _lhs,
403                         const PackedRhs& _packed_rhs,
404                         MatrixMap<OutputScalar, ResultOrder>* _result,
405                         const LhsOffset& _lhs_offset,
406                         const RhsOffset& _rhs_offset,
407                         const OutputPipelineType& _output_pipeline)
408       : kernel(_kernel),
409         lhs(_lhs),
410         packed_rhs(_packed_rhs),
411         result(*_result),
412         lhs_offset(_lhs_offset),
413         rhs_offset(_rhs_offset),
414         output_pipeline(_output_pipeline) {}
415 
RunGemmWithPackedRhsTask416   void Run() const override {
417     ScopedProfilingLabel label("GemmWithPackedRhsTask");
418 
419     const int rows = result.rows();
420     const int cols = result.cols();
421     const int depth = lhs.cols();
422 
423     BlockParams block_params;
424     block_params.Init<KernelFormat>(rows, cols, depth, 1);
425 
426     PackedLhs packed_lhs(Side::Lhs, local_allocator, block_params);
427 
428     PackedResult packed_result(local_allocator, block_params);
429 
430     local_allocator->Commit();
431 
432     for (int c = 0; c < cols; c += block_params.l2_cols) {
433       int cs = std::min(block_params.l2_cols, cols - c);
434 
435       for (int r = 0; r < rows; r += block_params.l2_rows) {
436         int rs = std::min(block_params.l2_rows, rows - r);
437 
438         PackLhs<BitDepthParams>(&packed_lhs, lhs.block(r, 0, rs, depth));
439 
440         Compute(kernel, block_params, &packed_result, packed_lhs, packed_rhs);
441 
442         auto result_block = result.block(r, c, rs, cs);
443         UnpackResult<BitDepthParams>(&result_block, packed_result, depth,
444                                      packed_lhs.sums_of_each_slice(),
445                                      packed_rhs.sums_of_each_slice(),
446                                      lhs_offset, rhs_offset, output_pipeline);
447       }
448     }
449 
450     local_allocator->Decommit();
451   }
452 
453   const KernelBase& kernel;
454   const MatrixMap<const InputScalar, LhsOrder> lhs;
455   const PackedRhs packed_rhs;
456   MatrixMap<OutputScalar, ResultOrder> result;
457   const LhsOffset& lhs_offset;
458   const RhsOffset& rhs_offset;
459   const OutputPipelineType& output_pipeline;
460 };
461 
462 class MultiThreadGemmContext : public SingleThreadGemmContext {
463  public:
MultiThreadGemmContext()464   MultiThreadGemmContext() : max_num_threads_(0) {}
465 
set_max_num_threads(int n)466   void set_max_num_threads(int n) { max_num_threads_ = n; }
467 
max_num_threads()468   int max_num_threads() const { return max_num_threads_; }
469 
workers_pool()470   WorkersPool* workers_pool() { return &workers_pool_; }
471 
main_thread_task_allocator()472   Allocator* main_thread_task_allocator() {
473     return &main_thread_task_allocator_;
474   }
475 
476  protected:
477   // The workers pool used by MultiThreadGemm. Making
478   // this part of the context allows it to be persistent,
479   // avoiding recreating threads on every Gemm.
480   WorkersPool workers_pool_;
481 
482   // The maximum number of worker threads to use (in addition
483   // to the master thread).
484   // The default value 0 means the default behavior of
485   // detecting the number of hardware threads. Nonzero values mean
486   // skipping and overriding hardware detection.
487   int max_num_threads_;
488 
489   // For N-threaded operations, we will use only N-1 worker threads
490   // while the last task will be run directly on the main thread.
491   // It will then use this main_thread_task_allocator_; having a
492   // dedicated allocator for that (separate from the base allocator_)
493   // allows to use the same code for all tasks regardless of which
494   // thread they run on.
495   Allocator main_thread_task_allocator_;
496 };
497 
498 // Determines how many threads should be used for a given Gemm
499 // operation.
500 template <int KernelRows>
HowManyThreads(MultiThreadGemmContext * context,int rows,int cols,int depth)501 inline int HowManyThreads(MultiThreadGemmContext* context, int rows, int cols,
502                           int depth) {
503   // First check if the user set an explicit maximum number of threads.
504   int max_count = context->max_num_threads();
505   if (!max_count) {
506     // No user-set maximum number of threads, so we need to
507     // do some hardware detection.
508     // This is expensive to query so we do it only once.
509     // Too bad for dynamicness. Also, we dont use the c++11 standard getter
510     // because Google's coding style currently bans #include <thread_>.
511     static const int hardware_threads_count =
512         static_cast<int>(sysconf(_SC_NPROCESSORS_CONF));
513 
514     max_count = hardware_threads_count;
515   }
516 
517   // Basic calculation: take into account max pool size, and
518   // how many rows we have to feed our kernel.
519   // The motivation for an absolute minimum number of rows per thread,
520   // potentially higher than KernelRows, is that very thin thread workload
521   // currently defeat assumptions of the AddMod generator, resulting
522   // in substantial bias in TestWithRealData on 24 threads.
523   // Ideally, the AddMod generator should be aware of global (r,c) coordinates
524   // so as to be independent of the number of threads.
525   static const int AbsoluteMinRowsPerThread = 16;
526   static const int MinRowsPerThread = KernelRows > AbsoluteMinRowsPerThread
527                                           ? KernelRows
528                                           : AbsoluteMinRowsPerThread;
529   int thread_count = std::min(max_count, CeilQuotient(rows, MinRowsPerThread));
530 
531   // At this point for small products we already have thread_count==1 so
532   // we can avoid doing more work; otherwise, we still want to check
533   // that the cubic size (rows*cols*depth) is big enough to keep
534   // workers_ busy.
535   if (thread_count > 1) {
536     // Empirically determined value.
537     static const std::uint64_t min_cubic_size_per_thread = 64 * 1024;
538 
539     // We can only multiply two out of three sizes without risking overflow
540     const std::uint64_t cubic_size =
541         std::uint64_t(rows) * std::uint64_t(cols) * std::uint64_t(depth);
542 
543     thread_count =
544         std::min(thread_count, int(cubic_size / min_cubic_size_per_thread));
545 
546     if (thread_count < 1) {
547       thread_count = 1;
548     }
549   }
550 
551   assert(thread_count > 0 && thread_count <= max_count);
552   return thread_count;
553 }
554 
555 // The main multi-threaded Gemm function.
556 // To understand it, first read the code of SingleThreadedGemm().
557 // The parallelization scheme used here is to have this master function
558 // pack a block of RHS and then start worker threads to pack a block of LHS
559 // each, and accumulate the corresponding products.
560 template <typename KernelFormat, typename InputScalar, typename OutputScalar,
561           typename BitDepthParams, MapOrder LhsOrder, MapOrder RhsOrder,
562           MapOrder ResultOrder, typename LhsOffset, typename RhsOffset,
563           typename OutputPipelineType>
MultiThreadGemm(MultiThreadGemmContext * context,const KernelBase & kernel,const MatrixMap<const InputScalar,LhsOrder> & lhs,const MatrixMap<const InputScalar,RhsOrder> & rhs,MatrixMap<OutputScalar,ResultOrder> * result,const LhsOffset & lhs_offset,const RhsOffset & rhs_offset,const OutputPipelineType & output_pipeline)564 void MultiThreadGemm(MultiThreadGemmContext* context, const KernelBase& kernel,
565                      const MatrixMap<const InputScalar, LhsOrder>& lhs,
566                      const MatrixMap<const InputScalar, RhsOrder>& rhs,
567                      MatrixMap<OutputScalar, ResultOrder>* result,
568                      const LhsOffset& lhs_offset, const RhsOffset& rhs_offset,
569                      const OutputPipelineType& output_pipeline) {
570   ScopedProfilingLabel label("gemmlowp::MultiThreadGemm");
571 
572   assert(lhs.cols() == rhs.rows());
573 
574   int rows = result->rows();
575   int cols = result->cols();
576   int depth = lhs.cols();
577 
578   assert(rows > 0);
579   assert(cols > 0);
580   assert(depth > 0);
581 
582   const int thread_count =
583       HowManyThreads<KernelFormat::kRows>(context, rows, cols, depth);
584   if (thread_count == 1) {
585     return SingleThreadGemm<KernelFormat, InputScalar, OutputScalar,
586                             BitDepthParams>(context, kernel, lhs, rhs, result,
587                                             lhs_offset, rhs_offset,
588                                             output_pipeline);
589   }
590   assert(thread_count > 1);
591 
592   // We choose to use a worker thread for all but one
593   // of the thread workloads. The remaining thread workload will be
594   // executed immediately on the current thread.
595   // In this way, the total number of threads (1 master, N-1 workers)
596   // equals the value returned by HowManyThread. This simple
597   // 1:1 mapping of threads to physical cores, is very important
598   // to getting good multithreaded performance especially for
599   // not-very-large GEMMs, and especially on Android.
600   const int workers_count = thread_count - 1;
601 
602   Allocator* allocator = context->allocator();
603   WorkersPool* workers_pool = context->workers_pool();
604 
605   workers_pool->CreateWorkers(workers_count);
606 
607   BlockParams block_params;
608   block_params.Init<KernelFormat>(rows, cols, depth, workers_count);
609 
610   PackedSideBlock<typename KernelFormat::Rhs> packed_rhs(
611       Side::Rhs, allocator, block_params);
612   allocator->Commit();
613 
614   // We loop over large blocks of the RHS.
615   for (int c = 0; c < cols; c += block_params.l2_cols) {
616     int cs = std::min(block_params.l2_cols, cols - c);
617 
618     // Pack a large block of the RHS.
619     PackRhs<BitDepthParams>(&packed_rhs, rhs.block(0, c, depth, cs));
620 
621     // Give work to each worker.
622     int next_start_row = 0;
623     workers_pool->counter_to_decrement_when_ready().Reset(workers_count);
624     for (int thread = 0; thread < thread_count; thread++) {
625       int start_row = next_start_row;
626       next_start_row = std::min(rows, RoundUp<KernelFormat::kRows>(
627                                           rows * (thread + 1) / thread_count));
628 
629       int block_rows = next_start_row - start_row;
630       auto lhs_block = lhs.block(start_row, 0, block_rows, depth);
631       auto result_block = result->block(start_row, c, block_rows, cs);
632       typedef GemmWithPackedRhsTask<KernelFormat, InputScalar, OutputScalar,
633                                     BitDepthParams, LhsOrder, RhsOrder,
634                                     ResultOrder, LhsOffset, RhsOffset,
635                                     OutputPipelineType>
636           TaskType;
637       auto task = new TaskType(kernel, lhs_block, packed_rhs, &result_block,
638                                lhs_offset, rhs_offset, output_pipeline);
639       if (thread < workers_count) {
640         workers_pool->StartWorker(thread, task);
641       } else {
642         // Execute the remaining workload immediately on the current thread.
643         task->local_allocator = context->main_thread_task_allocator();
644         task->Run();
645         delete task;
646       }
647     }
648     // Wait for the workers.
649     workers_pool->counter_to_decrement_when_ready().Wait();
650   }
651 
652   allocator->Decommit();
653 }
654 
655 }  // namespace gemmlowp
656 
657 #endif  // GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_
658