1// 2// detail/impl/task_io_service.ipp 3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 4// 5// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com) 6// 7// Distributed under the Boost Software License, Version 1.0. (See accompanying 8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9// 10 11#ifndef ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP 12#define ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP 13 14 15#include "asio/detail/config.hpp" 16 17 18#include "asio/detail/event.hpp" 19#include "asio/detail/limits.hpp" 20#include "asio/detail/reactor.hpp" 21#include "asio/detail/task_io_service.hpp" 22#include "asio/detail/task_io_service_thread_info.hpp" 23 24#include "asio/detail/push_options.hpp" 25 26namespace asio { 27namespace detail { 28 29struct task_io_service::task_cleanup 30{ 31 ~task_cleanup() 32 { 33 if (this_thread_->private_outstanding_work > 0) 34 { 35 asio::detail::increment( 36 task_io_service_->outstanding_work_, 37 this_thread_->private_outstanding_work); 38 } 39 this_thread_->private_outstanding_work = 0; 40 41 // Enqueue the completed operations and reinsert the task at the end of 42 // the operation queue. 43 lock_->lock(); 44 task_io_service_->task_interrupted_ = true; 45 task_io_service_->op_queue_.push(this_thread_->private_op_queue); 46 task_io_service_->op_queue_.push(&task_io_service_->task_operation_); 47 } 48 49 task_io_service* task_io_service_; 50 mutex::scoped_lock* lock_; 51 thread_info* this_thread_; 52}; 53 54struct task_io_service::work_cleanup 55{ 56 ~work_cleanup() 57 { 58 if (this_thread_->private_outstanding_work > 1) 59 { 60 asio::detail::increment( 61 task_io_service_->outstanding_work_, 62 this_thread_->private_outstanding_work - 1); 63 } 64 else if (this_thread_->private_outstanding_work < 1) 65 { 66 task_io_service_->work_finished(); 67 } 68 this_thread_->private_outstanding_work = 0; 69 70 if (!this_thread_->private_op_queue.empty()) 71 { 72 lock_->lock(); 73 task_io_service_->op_queue_.push(this_thread_->private_op_queue); 74 } 75 } 76 77 task_io_service* task_io_service_; 78 mutex::scoped_lock* lock_; 79 thread_info* this_thread_; 80}; 81 82task_io_service::task_io_service( 83 asio::io_service& io_service, std::size_t concurrency_hint) 84 : asio::detail::service_base<task_io_service>(io_service), 85 one_thread_(concurrency_hint == 1), 86 mutex_(), 87 task_(0), 88 task_interrupted_(true), 89 outstanding_work_(0), 90 stopped_(false), 91 shutdown_(false) 92{ 93 ASIO_HANDLER_TRACKING_INIT; 94} 95 96void task_io_service::shutdown_service() 97{ 98 mutex::scoped_lock lock(mutex_); 99 shutdown_ = true; 100 lock.unlock(); 101 102 // Destroy handler objects. 103 while (!op_queue_.empty()) 104 { 105 operation* o = op_queue_.front(); 106 op_queue_.pop(); 107 if (o != &task_operation_) 108 o->destroy(); 109 } 110 111 // Reset to initial state. 112 task_ = 0; 113} 114 115void task_io_service::init_task() 116{ 117 mutex::scoped_lock lock(mutex_); 118 if (!shutdown_ && !task_) 119 { 120 task_ = &use_service<reactor>(this->get_io_service()); 121 op_queue_.push(&task_operation_); 122 wake_one_thread_and_unlock(lock); 123 } 124} 125 126std::size_t task_io_service::run(asio::error_code& ec) 127{ 128 ec = asio::error_code(); 129 if (outstanding_work_ == 0) 130 { 131 stop(); 132 return 0; 133 } 134 135 thread_info this_thread; 136 this_thread.private_outstanding_work = 0; 137 thread_call_stack::context ctx(this, this_thread); 138 139 mutex::scoped_lock lock(mutex_); 140 141 std::size_t n = 0; 142 for (; do_run_one(lock, this_thread, ec); lock.lock()) 143 if (n != (std::numeric_limits<std::size_t>::max)()) 144 ++n; 145 return n; 146} 147 148std::size_t task_io_service::run_one(asio::error_code& ec) 149{ 150 ec = asio::error_code(); 151 if (outstanding_work_ == 0) 152 { 153 stop(); 154 return 0; 155 } 156 157 thread_info this_thread; 158 this_thread.private_outstanding_work = 0; 159 thread_call_stack::context ctx(this, this_thread); 160 161 mutex::scoped_lock lock(mutex_); 162 163 return do_run_one(lock, this_thread, ec); 164} 165 166std::size_t task_io_service::poll(asio::error_code& ec) 167{ 168 ec = asio::error_code(); 169 if (outstanding_work_ == 0) 170 { 171 stop(); 172 return 0; 173 } 174 175 thread_info this_thread; 176 this_thread.private_outstanding_work = 0; 177 thread_call_stack::context ctx(this, this_thread); 178 179 mutex::scoped_lock lock(mutex_); 180 181 // We want to support nested calls to poll() and poll_one(), so any handlers 182 // that are already on a thread-private queue need to be put on to the main 183 // queue now. 184 if (one_thread_) 185 if (thread_info* outer_thread_info = ctx.next_by_key()) 186 op_queue_.push(outer_thread_info->private_op_queue); 187 188 std::size_t n = 0; 189 for (; do_poll_one(lock, this_thread, ec); lock.lock()) 190 if (n != (std::numeric_limits<std::size_t>::max)()) 191 ++n; 192 return n; 193} 194 195std::size_t task_io_service::poll_one(asio::error_code& ec) 196{ 197 ec = asio::error_code(); 198 if (outstanding_work_ == 0) 199 { 200 stop(); 201 return 0; 202 } 203 204 thread_info this_thread; 205 this_thread.private_outstanding_work = 0; 206 thread_call_stack::context ctx(this, this_thread); 207 208 mutex::scoped_lock lock(mutex_); 209 210 // We want to support nested calls to poll() and poll_one(), so any handlers 211 // that are already on a thread-private queue need to be put on to the main 212 // queue now. 213 if (one_thread_) 214 if (thread_info* outer_thread_info = ctx.next_by_key()) 215 op_queue_.push(outer_thread_info->private_op_queue); 216 217 return do_poll_one(lock, this_thread, ec); 218} 219 220void task_io_service::stop() 221{ 222 mutex::scoped_lock lock(mutex_); 223 stop_all_threads(lock); 224} 225 226bool task_io_service::stopped() const 227{ 228 mutex::scoped_lock lock(mutex_); 229 return stopped_; 230} 231 232void task_io_service::reset() 233{ 234 mutex::scoped_lock lock(mutex_); 235 stopped_ = false; 236} 237 238void task_io_service::post_immediate_completion( 239 task_io_service::operation* op, bool is_continuation) 240{ 241 if (one_thread_ || is_continuation) 242 { 243 if (thread_info* this_thread = thread_call_stack::contains(this)) 244 { 245 ++this_thread->private_outstanding_work; 246 this_thread->private_op_queue.push(op); 247 return; 248 } 249 } 250 251 work_started(); 252 mutex::scoped_lock lock(mutex_); 253 op_queue_.push(op); 254 wake_one_thread_and_unlock(lock); 255} 256 257void task_io_service::post_deferred_completion(task_io_service::operation* op) 258{ 259 if (one_thread_) 260 { 261 if (thread_info* this_thread = thread_call_stack::contains(this)) 262 { 263 this_thread->private_op_queue.push(op); 264 return; 265 } 266 } 267 268 mutex::scoped_lock lock(mutex_); 269 op_queue_.push(op); 270 wake_one_thread_and_unlock(lock); 271} 272 273void task_io_service::post_deferred_completions( 274 op_queue<task_io_service::operation>& ops) 275{ 276 if (!ops.empty()) 277 { 278 if (one_thread_) 279 { 280 if (thread_info* this_thread = thread_call_stack::contains(this)) 281 { 282 this_thread->private_op_queue.push(ops); 283 return; 284 } 285 } 286 287 mutex::scoped_lock lock(mutex_); 288 op_queue_.push(ops); 289 wake_one_thread_and_unlock(lock); 290 } 291} 292 293void task_io_service::do_dispatch( 294 task_io_service::operation* op) 295{ 296 work_started(); 297 mutex::scoped_lock lock(mutex_); 298 op_queue_.push(op); 299 wake_one_thread_and_unlock(lock); 300} 301 302void task_io_service::abandon_operations( 303 op_queue<task_io_service::operation>& ops) 304{ 305 op_queue<task_io_service::operation> ops2; 306 ops2.push(ops); 307} 308 309std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, 310 task_io_service::thread_info& this_thread, 311 const asio::error_code& ec) 312{ 313 while (!stopped_) 314 { 315 if (!op_queue_.empty()) 316 { 317 // Prepare to execute first handler from queue. 318 operation* o = op_queue_.front(); 319 op_queue_.pop(); 320 bool more_handlers = (!op_queue_.empty()); 321 322 if (o == &task_operation_) 323 { 324 task_interrupted_ = more_handlers; 325 326 if (more_handlers && !one_thread_) 327 wakeup_event_.unlock_and_signal_one(lock); 328 else 329 lock.unlock(); 330 331 task_cleanup on_exit = { this, &lock, &this_thread }; 332 (void)on_exit; 333 334 // Run the task. May throw an exception. Only block if the operation 335 // queue is empty and we're not polling, otherwise we want to return 336 // as soon as possible. 337 task_->run(!more_handlers, this_thread.private_op_queue); 338 } 339 else 340 { 341 std::size_t task_result = o->task_result_; 342 343 if (more_handlers && !one_thread_) 344 wake_one_thread_and_unlock(lock); 345 else 346 lock.unlock(); 347 348 // Ensure the count of outstanding work is decremented on block exit. 349 work_cleanup on_exit = { this, &lock, &this_thread }; 350 (void)on_exit; 351 352 // Complete the operation. May throw an exception. Deletes the object. 353 o->complete(*this, ec, task_result); 354 355 return 1; 356 } 357 } 358 else 359 { 360 wakeup_event_.clear(lock); 361 wakeup_event_.wait(lock); 362 } 363 } 364 365 return 0; 366} 367 368std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock, 369 task_io_service::thread_info& this_thread, 370 const asio::error_code& ec) 371{ 372 if (stopped_) 373 return 0; 374 375 operation* o = op_queue_.front(); 376 if (o == &task_operation_) 377 { 378 op_queue_.pop(); 379 lock.unlock(); 380 381 { 382 task_cleanup c = { this, &lock, &this_thread }; 383 (void)c; 384 385 // Run the task. May throw an exception. Only block if the operation 386 // queue is empty and we're not polling, otherwise we want to return 387 // as soon as possible. 388 task_->run(false, this_thread.private_op_queue); 389 } 390 391 o = op_queue_.front(); 392 if (o == &task_operation_) 393 { 394 wakeup_event_.maybe_unlock_and_signal_one(lock); 395 return 0; 396 } 397 } 398 399 if (o == 0) 400 return 0; 401 402 op_queue_.pop(); 403 bool more_handlers = (!op_queue_.empty()); 404 405 std::size_t task_result = o->task_result_; 406 407 if (more_handlers && !one_thread_) 408 wake_one_thread_and_unlock(lock); 409 else 410 lock.unlock(); 411 412 // Ensure the count of outstanding work is decremented on block exit. 413 work_cleanup on_exit = { this, &lock, &this_thread }; 414 (void)on_exit; 415 416 // Complete the operation. May throw an exception. Deletes the object. 417 o->complete(*this, ec, task_result); 418 419 return 1; 420} 421 422void task_io_service::stop_all_threads( 423 mutex::scoped_lock& lock) 424{ 425 stopped_ = true; 426 wakeup_event_.signal_all(lock); 427 428 if (!task_interrupted_ && task_) 429 { 430 task_interrupted_ = true; 431 task_->interrupt(); 432 } 433} 434 435void task_io_service::wake_one_thread_and_unlock( 436 mutex::scoped_lock& lock) 437{ 438 if (!wakeup_event_.maybe_unlock_and_signal_one(lock)) 439 { 440 if (!task_interrupted_ && task_) 441 { 442 task_interrupted_ = true; 443 task_->interrupt(); 444 } 445 lock.unlock(); 446 } 447} 448 449} // namespace detail 450} // namespace asio 451 452#include "asio/detail/pop_options.hpp" 453 454 455#endif // ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP 456