• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1  // Copyright 2016 the V8 project authors. All rights reserved.
2  // Use of this source code is governed by a BSD-style license that can be
3  // found in the LICENSE file.
4  
5  #ifndef V8_HEAP_PAGE_PARALLEL_JOB_
6  #define V8_HEAP_PAGE_PARALLEL_JOB_
7  
8  #include "src/allocation.h"
9  #include "src/cancelable-task.h"
10  #include "src/utils.h"
11  #include "src/v8.h"
12  
13  namespace v8 {
14  namespace internal {
15  
16  class Heap;
17  class Isolate;
18  
19  // This class manages background tasks that process set of pages in parallel.
20  // The JobTraits class needs to define:
21  // - PerPageData type - state associated with each page.
22  // - PerTaskData type - state associated with each task.
23  // - static bool ProcessPageInParallel(Heap* heap,
24  //                                     PerTaskData task_data,
25  //                                     MemoryChunk* page,
26  //                                     PerPageData page_data)
27  //   The function should return true iff processing succeeded.
28  // - static const bool NeedSequentialFinalization
29  // - static void FinalizePageSequentially(Heap* heap,
30  //                                        bool processing_succeeded,
31  //                                        MemoryChunk* page,
32  //                                        PerPageData page_data)
33  template <typename JobTraits>
34  class PageParallelJob {
35   public:
36    // PageParallelJob cannot dynamically create a semaphore because of a bug in
37    // glibc. See http://crbug.com/609249 and
38    // https://sourceware.org/bugzilla/show_bug.cgi?id=12674.
39    // The caller must provide a semaphore with value 0 and ensure that
40    // the lifetime of the semaphore is the same as the lifetime of the Isolate.
41    // It is guaranteed that the semaphore value will be 0 after Run() call.
PageParallelJob(Heap * heap,CancelableTaskManager * cancelable_task_manager,base::Semaphore * semaphore)42    PageParallelJob(Heap* heap, CancelableTaskManager* cancelable_task_manager,
43                    base::Semaphore* semaphore)
44        : heap_(heap),
45          cancelable_task_manager_(cancelable_task_manager),
46          items_(nullptr),
47          num_items_(0),
48          num_tasks_(0),
49          pending_tasks_(semaphore) {}
50  
~PageParallelJob()51    ~PageParallelJob() {
52      Item* item = items_;
53      while (item != nullptr) {
54        Item* next = item->next;
55        delete item;
56        item = next;
57      }
58    }
59  
AddPage(MemoryChunk * chunk,typename JobTraits::PerPageData data)60    void AddPage(MemoryChunk* chunk, typename JobTraits::PerPageData data) {
61      Item* item = new Item(chunk, data, items_);
62      items_ = item;
63      ++num_items_;
64    }
65  
NumberOfPages()66    int NumberOfPages() { return num_items_; }
67  
68    // Returns the number of tasks that were spawned when running the job.
NumberOfTasks()69    int NumberOfTasks() { return num_tasks_; }
70  
71    // Runs the given number of tasks in parallel and processes the previously
72    // added pages. This function blocks until all tasks finish.
73    // The callback takes the index of a task and returns data for that task.
74    template <typename Callback>
Run(int num_tasks,Callback per_task_data_callback)75    void Run(int num_tasks, Callback per_task_data_callback) {
76      if (num_items_ == 0) return;
77      DCHECK_GE(num_tasks, 1);
78      uint32_t task_ids[kMaxNumberOfTasks];
79      const int max_num_tasks = Min(
80          kMaxNumberOfTasks,
81          static_cast<int>(
82              V8::GetCurrentPlatform()->NumberOfAvailableBackgroundThreads()));
83      num_tasks_ = Max(1, Min(num_tasks, max_num_tasks));
84      int items_per_task = (num_items_ + num_tasks_ - 1) / num_tasks_;
85      int start_index = 0;
86      Task* main_task = nullptr;
87      for (int i = 0; i < num_tasks_; i++, start_index += items_per_task) {
88        if (start_index >= num_items_) {
89          start_index -= num_items_;
90        }
91        Task* task = new Task(heap_, items_, num_items_, start_index,
92                              pending_tasks_, per_task_data_callback(i));
93        task_ids[i] = task->id();
94        if (i > 0) {
95          V8::GetCurrentPlatform()->CallOnBackgroundThread(
96              task, v8::Platform::kShortRunningTask);
97        } else {
98          main_task = task;
99        }
100      }
101      // Contribute on main thread.
102      main_task->Run();
103      delete main_task;
104      // Wait for background tasks.
105      for (int i = 0; i < num_tasks_; i++) {
106        if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
107            CancelableTaskManager::kTaskAborted) {
108          pending_tasks_->Wait();
109        }
110      }
111      if (JobTraits::NeedSequentialFinalization) {
112        Item* item = items_;
113        while (item != nullptr) {
114          bool success = (item->state.Value() == kFinished);
115          JobTraits::FinalizePageSequentially(heap_, item->chunk, success,
116                                              item->data);
117          item = item->next;
118        }
119      }
120    }
121  
122   private:
123    static const int kMaxNumberOfTasks = 10;
124  
125    enum ProcessingState { kAvailable, kProcessing, kFinished, kFailed };
126  
127    struct Item : public Malloced {
ItemItem128      Item(MemoryChunk* chunk, typename JobTraits::PerPageData data, Item* next)
129          : chunk(chunk), state(kAvailable), data(data), next(next) {}
130      MemoryChunk* chunk;
131      base::AtomicValue<ProcessingState> state;
132      typename JobTraits::PerPageData data;
133      Item* next;
134    };
135  
136    class Task : public CancelableTask {
137     public:
Task(Heap * heap,Item * items,int num_items,int start_index,base::Semaphore * on_finish,typename JobTraits::PerTaskData data)138      Task(Heap* heap, Item* items, int num_items, int start_index,
139           base::Semaphore* on_finish, typename JobTraits::PerTaskData data)
140          : CancelableTask(heap->isolate()),
141            heap_(heap),
142            items_(items),
143            num_items_(num_items),
144            start_index_(start_index),
145            on_finish_(on_finish),
146            data_(data) {}
147  
~Task()148      virtual ~Task() {}
149  
150     private:
151      // v8::internal::CancelableTask overrides.
RunInternal()152      void RunInternal() override {
153        // Each task starts at a different index to improve parallelization.
154        Item* current = items_;
155        int skip = start_index_;
156        while (skip-- > 0) {
157          current = current->next;
158        }
159        for (int i = 0; i < num_items_; i++) {
160          if (current->state.TrySetValue(kAvailable, kProcessing)) {
161            bool success = JobTraits::ProcessPageInParallel(
162                heap_, data_, current->chunk, current->data);
163            current->state.SetValue(success ? kFinished : kFailed);
164          }
165          current = current->next;
166          // Wrap around if needed.
167          if (current == nullptr) {
168            current = items_;
169          }
170        }
171        on_finish_->Signal();
172      }
173  
174      Heap* heap_;
175      Item* items_;
176      int num_items_;
177      int start_index_;
178      base::Semaphore* on_finish_;
179      typename JobTraits::PerTaskData data_;
180      DISALLOW_COPY_AND_ASSIGN(Task);
181    };
182  
183    Heap* heap_;
184    CancelableTaskManager* cancelable_task_manager_;
185    Item* items_;
186    int num_items_;
187    int num_tasks_;
188    base::Semaphore* pending_tasks_;
189    DISALLOW_COPY_AND_ASSIGN(PageParallelJob);
190  };
191  
192  }  // namespace internal
193  }  // namespace v8
194  
195  #endif  // V8_HEAP_PAGE_PARALLEL_JOB_
196