1//
2// detail/impl/task_io_service.ipp
3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4//
5// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#ifndef ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
12#define ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
13
14
15#include "asio/detail/config.hpp"
16
17
18#include "asio/detail/event.hpp"
19#include "asio/detail/limits.hpp"
20#include "asio/detail/reactor.hpp"
21#include "asio/detail/task_io_service.hpp"
22#include "asio/detail/task_io_service_thread_info.hpp"
23
24#include "asio/detail/push_options.hpp"
25
26namespace asio {
27namespace detail {
28
29struct task_io_service::task_cleanup
30{
31  ~task_cleanup()
32  {
33    if (this_thread_->private_outstanding_work > 0)
34    {
35      asio::detail::increment(
36          task_io_service_->outstanding_work_,
37          this_thread_->private_outstanding_work);
38    }
39    this_thread_->private_outstanding_work = 0;
40
41    // Enqueue the completed operations and reinsert the task at the end of
42    // the operation queue.
43    lock_->lock();
44    task_io_service_->task_interrupted_ = true;
45    task_io_service_->op_queue_.push(this_thread_->private_op_queue);
46    task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
47  }
48
49  task_io_service* task_io_service_;
50  mutex::scoped_lock* lock_;
51  thread_info* this_thread_;
52};
53
54struct task_io_service::work_cleanup
55{
56  ~work_cleanup()
57  {
58    if (this_thread_->private_outstanding_work > 1)
59    {
60      asio::detail::increment(
61          task_io_service_->outstanding_work_,
62          this_thread_->private_outstanding_work - 1);
63    }
64    else if (this_thread_->private_outstanding_work < 1)
65    {
66      task_io_service_->work_finished();
67    }
68    this_thread_->private_outstanding_work = 0;
69
70    if (!this_thread_->private_op_queue.empty())
71    {
72      lock_->lock();
73      task_io_service_->op_queue_.push(this_thread_->private_op_queue);
74    }
75  }
76
77  task_io_service* task_io_service_;
78  mutex::scoped_lock* lock_;
79  thread_info* this_thread_;
80};
81
82task_io_service::task_io_service(
83    asio::io_service& io_service, std::size_t concurrency_hint)
84  : asio::detail::service_base<task_io_service>(io_service),
85    one_thread_(concurrency_hint == 1),
86    mutex_(),
87    task_(0),
88    task_interrupted_(true),
89    outstanding_work_(0),
90    stopped_(false),
91    shutdown_(false)
92{
93  ASIO_HANDLER_TRACKING_INIT;
94}
95
96void task_io_service::shutdown_service()
97{
98  mutex::scoped_lock lock(mutex_);
99  shutdown_ = true;
100  lock.unlock();
101
102  // Destroy handler objects.
103  while (!op_queue_.empty())
104  {
105    operation* o = op_queue_.front();
106    op_queue_.pop();
107    if (o != &task_operation_)
108      o->destroy();
109  }
110
111  // Reset to initial state.
112  task_ = 0;
113}
114
115void task_io_service::init_task()
116{
117  mutex::scoped_lock lock(mutex_);
118  if (!shutdown_ && !task_)
119  {
120    task_ = &use_service<reactor>(this->get_io_service());
121    op_queue_.push(&task_operation_);
122    wake_one_thread_and_unlock(lock);
123  }
124}
125
126std::size_t task_io_service::run(asio::error_code& ec)
127{
128  ec = asio::error_code();
129  if (outstanding_work_ == 0)
130  {
131    stop();
132    return 0;
133  }
134
135  thread_info this_thread;
136  this_thread.private_outstanding_work = 0;
137  thread_call_stack::context ctx(this, this_thread);
138
139  mutex::scoped_lock lock(mutex_);
140
141  std::size_t n = 0;
142  for (; do_run_one(lock, this_thread, ec); lock.lock())
143    if (n != (std::numeric_limits<std::size_t>::max)())
144      ++n;
145  return n;
146}
147
148std::size_t task_io_service::run_one(asio::error_code& ec)
149{
150  ec = asio::error_code();
151  if (outstanding_work_ == 0)
152  {
153    stop();
154    return 0;
155  }
156
157  thread_info this_thread;
158  this_thread.private_outstanding_work = 0;
159  thread_call_stack::context ctx(this, this_thread);
160
161  mutex::scoped_lock lock(mutex_);
162
163  return do_run_one(lock, this_thread, ec);
164}
165
166std::size_t task_io_service::poll(asio::error_code& ec)
167{
168  ec = asio::error_code();
169  if (outstanding_work_ == 0)
170  {
171    stop();
172    return 0;
173  }
174
175  thread_info this_thread;
176  this_thread.private_outstanding_work = 0;
177  thread_call_stack::context ctx(this, this_thread);
178
179  mutex::scoped_lock lock(mutex_);
180
181  // We want to support nested calls to poll() and poll_one(), so any handlers
182  // that are already on a thread-private queue need to be put on to the main
183  // queue now.
184  if (one_thread_)
185    if (thread_info* outer_thread_info = ctx.next_by_key())
186      op_queue_.push(outer_thread_info->private_op_queue);
187
188  std::size_t n = 0;
189  for (; do_poll_one(lock, this_thread, ec); lock.lock())
190    if (n != (std::numeric_limits<std::size_t>::max)())
191      ++n;
192  return n;
193}
194
195std::size_t task_io_service::poll_one(asio::error_code& ec)
196{
197  ec = asio::error_code();
198  if (outstanding_work_ == 0)
199  {
200    stop();
201    return 0;
202  }
203
204  thread_info this_thread;
205  this_thread.private_outstanding_work = 0;
206  thread_call_stack::context ctx(this, this_thread);
207
208  mutex::scoped_lock lock(mutex_);
209
210  // We want to support nested calls to poll() and poll_one(), so any handlers
211  // that are already on a thread-private queue need to be put on to the main
212  // queue now.
213  if (one_thread_)
214    if (thread_info* outer_thread_info = ctx.next_by_key())
215      op_queue_.push(outer_thread_info->private_op_queue);
216
217  return do_poll_one(lock, this_thread, ec);
218}
219
220void task_io_service::stop()
221{
222  mutex::scoped_lock lock(mutex_);
223  stop_all_threads(lock);
224}
225
226bool task_io_service::stopped() const
227{
228  mutex::scoped_lock lock(mutex_);
229  return stopped_;
230}
231
232void task_io_service::reset()
233{
234  mutex::scoped_lock lock(mutex_);
235  stopped_ = false;
236}
237
238void task_io_service::post_immediate_completion(
239    task_io_service::operation* op, bool is_continuation)
240{
241  if (one_thread_ || is_continuation)
242  {
243    if (thread_info* this_thread = thread_call_stack::contains(this))
244    {
245      ++this_thread->private_outstanding_work;
246      this_thread->private_op_queue.push(op);
247      return;
248    }
249  }
250
251  work_started();
252  mutex::scoped_lock lock(mutex_);
253  op_queue_.push(op);
254  wake_one_thread_and_unlock(lock);
255}
256
257void task_io_service::post_deferred_completion(task_io_service::operation* op)
258{
259  if (one_thread_)
260  {
261    if (thread_info* this_thread = thread_call_stack::contains(this))
262    {
263      this_thread->private_op_queue.push(op);
264      return;
265    }
266  }
267
268  mutex::scoped_lock lock(mutex_);
269  op_queue_.push(op);
270  wake_one_thread_and_unlock(lock);
271}
272
273void task_io_service::post_deferred_completions(
274    op_queue<task_io_service::operation>& ops)
275{
276  if (!ops.empty())
277  {
278    if (one_thread_)
279    {
280      if (thread_info* this_thread = thread_call_stack::contains(this))
281      {
282        this_thread->private_op_queue.push(ops);
283        return;
284      }
285    }
286
287    mutex::scoped_lock lock(mutex_);
288    op_queue_.push(ops);
289    wake_one_thread_and_unlock(lock);
290  }
291}
292
293void task_io_service::do_dispatch(
294    task_io_service::operation* op)
295{
296  work_started();
297  mutex::scoped_lock lock(mutex_);
298  op_queue_.push(op);
299  wake_one_thread_and_unlock(lock);
300}
301
302void task_io_service::abandon_operations(
303    op_queue<task_io_service::operation>& ops)
304{
305  op_queue<task_io_service::operation> ops2;
306  ops2.push(ops);
307}
308
309std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
310    task_io_service::thread_info& this_thread,
311    const asio::error_code& ec)
312{
313  while (!stopped_)
314  {
315    if (!op_queue_.empty())
316    {
317      // Prepare to execute first handler from queue.
318      operation* o = op_queue_.front();
319      op_queue_.pop();
320      bool more_handlers = (!op_queue_.empty());
321
322      if (o == &task_operation_)
323      {
324        task_interrupted_ = more_handlers;
325
326        if (more_handlers && !one_thread_)
327          wakeup_event_.unlock_and_signal_one(lock);
328        else
329          lock.unlock();
330
331        task_cleanup on_exit = { this, &lock, &this_thread };
332        (void)on_exit;
333
334        // Run the task. May throw an exception. Only block if the operation
335        // queue is empty and we're not polling, otherwise we want to return
336        // as soon as possible.
337        task_->run(!more_handlers, this_thread.private_op_queue);
338      }
339      else
340      {
341        std::size_t task_result = o->task_result_;
342
343        if (more_handlers && !one_thread_)
344          wake_one_thread_and_unlock(lock);
345        else
346          lock.unlock();
347
348        // Ensure the count of outstanding work is decremented on block exit.
349        work_cleanup on_exit = { this, &lock, &this_thread };
350        (void)on_exit;
351
352        // Complete the operation. May throw an exception. Deletes the object.
353        o->complete(*this, ec, task_result);
354
355        return 1;
356      }
357    }
358    else
359    {
360      wakeup_event_.clear(lock);
361      wakeup_event_.wait(lock);
362    }
363  }
364
365  return 0;
366}
367
368std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
369    task_io_service::thread_info& this_thread,
370    const asio::error_code& ec)
371{
372  if (stopped_)
373    return 0;
374
375  operation* o = op_queue_.front();
376  if (o == &task_operation_)
377  {
378    op_queue_.pop();
379    lock.unlock();
380
381    {
382      task_cleanup c = { this, &lock, &this_thread };
383      (void)c;
384
385      // Run the task. May throw an exception. Only block if the operation
386      // queue is empty and we're not polling, otherwise we want to return
387      // as soon as possible.
388      task_->run(false, this_thread.private_op_queue);
389    }
390
391    o = op_queue_.front();
392    if (o == &task_operation_)
393    {
394      wakeup_event_.maybe_unlock_and_signal_one(lock);
395      return 0;
396    }
397  }
398
399  if (o == 0)
400    return 0;
401
402  op_queue_.pop();
403  bool more_handlers = (!op_queue_.empty());
404
405  std::size_t task_result = o->task_result_;
406
407  if (more_handlers && !one_thread_)
408    wake_one_thread_and_unlock(lock);
409  else
410    lock.unlock();
411
412  // Ensure the count of outstanding work is decremented on block exit.
413  work_cleanup on_exit = { this, &lock, &this_thread };
414  (void)on_exit;
415
416  // Complete the operation. May throw an exception. Deletes the object.
417  o->complete(*this, ec, task_result);
418
419  return 1;
420}
421
422void task_io_service::stop_all_threads(
423    mutex::scoped_lock& lock)
424{
425  stopped_ = true;
426  wakeup_event_.signal_all(lock);
427
428  if (!task_interrupted_ && task_)
429  {
430    task_interrupted_ = true;
431    task_->interrupt();
432  }
433}
434
435void task_io_service::wake_one_thread_and_unlock(
436    mutex::scoped_lock& lock)
437{
438  if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
439  {
440    if (!task_interrupted_ && task_)
441    {
442      task_interrupted_ = true;
443      task_->interrupt();
444    }
445    lock.unlock();
446  }
447}
448
449} // namespace detail
450} // namespace asio
451
452#include "asio/detail/pop_options.hpp"
453
454
455#endif // ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
456