1 // Copyright 2016 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef V8_HEAP_PAGE_PARALLEL_JOB_
6 #define V8_HEAP_PAGE_PARALLEL_JOB_
7 
8 #include "src/allocation.h"
9 #include "src/cancelable-task.h"
10 #include "src/utils.h"
11 #include "src/v8.h"
12 
13 namespace v8 {
14 namespace internal {
15 
16 class Heap;
17 class Isolate;
18 
// This class manages background tasks that process a set of pages in parallel.
// The JobTraits class needs to define:
// - PerPageData type - state associated with each page.
// - PerTaskData type - state associated with each task.
// - static bool ProcessPageInParallel(Heap* heap,
//                                     PerTaskData task_data,
//                                     MemoryChunk* page,
//                                     PerPageData page_data)
//   The function should return true iff processing succeeded.
// - static const bool NeedSequentialFinalization
// - static void FinalizePageSequentially(Heap* heap,
//                                        MemoryChunk* page,
//                                        bool processing_succeeded,
//                                        PerPageData page_data)
33 template <typename JobTraits>
34 class PageParallelJob {
35  public:
36   // PageParallelJob cannot dynamically create a semaphore because of a bug in
37   // glibc. See http://crbug.com/609249 and
38   // https://sourceware.org/bugzilla/show_bug.cgi?id=12674.
39   // The caller must provide a semaphore with value 0 and ensure that
40   // the lifetime of the semaphore is the same as the lifetime of the Isolate.
41   // It is guaranteed that the semaphore value will be 0 after Run() call.
PageParallelJob(Heap * heap,CancelableTaskManager * cancelable_task_manager,base::Semaphore * semaphore)42   PageParallelJob(Heap* heap, CancelableTaskManager* cancelable_task_manager,
43                   base::Semaphore* semaphore)
44       : heap_(heap),
45         cancelable_task_manager_(cancelable_task_manager),
46         items_(nullptr),
47         num_items_(0),
48         num_tasks_(0),
49         pending_tasks_(semaphore) {}
50 
~PageParallelJob()51   ~PageParallelJob() {
52     Item* item = items_;
53     while (item != nullptr) {
54       Item* next = item->next;
55       delete item;
56       item = next;
57     }
58   }
59 
AddPage(MemoryChunk * chunk,typename JobTraits::PerPageData data)60   void AddPage(MemoryChunk* chunk, typename JobTraits::PerPageData data) {
61     Item* item = new Item(chunk, data, items_);
62     items_ = item;
63     ++num_items_;
64   }
65 
NumberOfPages()66   int NumberOfPages() { return num_items_; }
67 
68   // Returns the number of tasks that were spawned when running the job.
NumberOfTasks()69   int NumberOfTasks() { return num_tasks_; }
70 
71   // Runs the given number of tasks in parallel and processes the previously
72   // added pages. This function blocks until all tasks finish.
73   // The callback takes the index of a task and returns data for that task.
74   template <typename Callback>
Run(int num_tasks,Callback per_task_data_callback)75   void Run(int num_tasks, Callback per_task_data_callback) {
76     if (num_items_ == 0) return;
77     DCHECK_GE(num_tasks, 1);
78     uint32_t task_ids[kMaxNumberOfTasks];
79     const int max_num_tasks = Min(
80         kMaxNumberOfTasks,
81         static_cast<int>(
82             V8::GetCurrentPlatform()->NumberOfAvailableBackgroundThreads()));
83     num_tasks_ = Max(1, Min(num_tasks, max_num_tasks));
84     int items_per_task = (num_items_ + num_tasks_ - 1) / num_tasks_;
85     int start_index = 0;
86     Task* main_task = nullptr;
87     for (int i = 0; i < num_tasks_; i++, start_index += items_per_task) {
88       if (start_index >= num_items_) {
89         start_index -= num_items_;
90       }
91       Task* task = new Task(heap_, items_, num_items_, start_index,
92                             pending_tasks_, per_task_data_callback(i));
93       task_ids[i] = task->id();
94       if (i > 0) {
95         V8::GetCurrentPlatform()->CallOnBackgroundThread(
96             task, v8::Platform::kShortRunningTask);
97       } else {
98         main_task = task;
99       }
100     }
101     // Contribute on main thread.
102     main_task->Run();
103     delete main_task;
104     // Wait for background tasks.
105     for (int i = 0; i < num_tasks_; i++) {
106       if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
107           CancelableTaskManager::kTaskAborted) {
108         pending_tasks_->Wait();
109       }
110     }
111     if (JobTraits::NeedSequentialFinalization) {
112       Item* item = items_;
113       while (item != nullptr) {
114         bool success = (item->state.Value() == kFinished);
115         JobTraits::FinalizePageSequentially(heap_, item->chunk, success,
116                                             item->data);
117         item = item->next;
118       }
119     }
120   }
121 
122  private:
123   static const int kMaxNumberOfTasks = 10;
124 
125   enum ProcessingState { kAvailable, kProcessing, kFinished, kFailed };
126 
127   struct Item : public Malloced {
ItemItem128     Item(MemoryChunk* chunk, typename JobTraits::PerPageData data, Item* next)
129         : chunk(chunk), state(kAvailable), data(data), next(next) {}
130     MemoryChunk* chunk;
131     base::AtomicValue<ProcessingState> state;
132     typename JobTraits::PerPageData data;
133     Item* next;
134   };
135 
136   class Task : public CancelableTask {
137    public:
Task(Heap * heap,Item * items,int num_items,int start_index,base::Semaphore * on_finish,typename JobTraits::PerTaskData data)138     Task(Heap* heap, Item* items, int num_items, int start_index,
139          base::Semaphore* on_finish, typename JobTraits::PerTaskData data)
140         : CancelableTask(heap->isolate()),
141           heap_(heap),
142           items_(items),
143           num_items_(num_items),
144           start_index_(start_index),
145           on_finish_(on_finish),
146           data_(data) {}
147 
~Task()148     virtual ~Task() {}
149 
150    private:
151     // v8::internal::CancelableTask overrides.
RunInternal()152     void RunInternal() override {
153       // Each task starts at a different index to improve parallelization.
154       Item* current = items_;
155       int skip = start_index_;
156       while (skip-- > 0) {
157         current = current->next;
158       }
159       for (int i = 0; i < num_items_; i++) {
160         if (current->state.TrySetValue(kAvailable, kProcessing)) {
161           bool success = JobTraits::ProcessPageInParallel(
162               heap_, data_, current->chunk, current->data);
163           current->state.SetValue(success ? kFinished : kFailed);
164         }
165         current = current->next;
166         // Wrap around if needed.
167         if (current == nullptr) {
168           current = items_;
169         }
170       }
171       on_finish_->Signal();
172     }
173 
174     Heap* heap_;
175     Item* items_;
176     int num_items_;
177     int start_index_;
178     base::Semaphore* on_finish_;
179     typename JobTraits::PerTaskData data_;
180     DISALLOW_COPY_AND_ASSIGN(Task);
181   };
182 
183   Heap* heap_;
184   CancelableTaskManager* cancelable_task_manager_;
185   Item* items_;
186   int num_items_;
187   int num_tasks_;
188   base::Semaphore* pending_tasks_;
189   DISALLOW_COPY_AND_ASSIGN(PageParallelJob);
190 };
191 
192 }  // namespace internal
193 }  // namespace v8
194 
195 #endif  // V8_HEAP_PAGE_PARALLEL_JOB_
196