// Copyright 2019 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "platform/impl/socket_handle_waiter.h" #include #include #include "absl/algorithm/container.h" #include "util/osp_logging.h" namespace openscreen { SocketHandleWaiter::SocketHandleWaiter(ClockNowFunctionPtr now_function) : now_function_(now_function) {} void SocketHandleWaiter::Subscribe(Subscriber* subscriber, SocketHandleRef handle) { std::lock_guard lock(mutex_); if (handle_mappings_.find(handle) == handle_mappings_.end()) { handle_mappings_.emplace(handle, SocketSubscription{subscriber}); } } void SocketHandleWaiter::Unsubscribe(Subscriber* subscriber, SocketHandleRef handle) { std::lock_guard lock(mutex_); auto iterator = handle_mappings_.find(handle); if (handle_mappings_.find(handle) != handle_mappings_.end()) { handle_mappings_.erase(iterator); } } void SocketHandleWaiter::UnsubscribeAll(Subscriber* subscriber) { std::lock_guard lock(mutex_); for (auto it = handle_mappings_.begin(); it != handle_mappings_.end();) { if (it->second.subscriber == subscriber) { it = handle_mappings_.erase(it); } else { it++; } } } void SocketHandleWaiter::OnHandleDeletion(Subscriber* subscriber, SocketHandleRef handle, bool disable_locking_for_testing) { std::unique_lock lock(mutex_); auto it = handle_mappings_.find(handle); if (it != handle_mappings_.end()) { handle_mappings_.erase(it); if (!disable_locking_for_testing) { handles_being_deleted_.push_back(handle); OSP_DVLOG << "Starting to block for handle deletion"; // This code will allow us to block completion of the socket destructor // (and subsequent invalidation of pointers to this socket) until we no // longer are waiting on a SELECT(...) call to it, since we only signal // this condition variable's wait(...) to proceed outside of SELECT(...). handle_deletion_block_.wait(lock, [this, handle]() { return std::find(handles_being_deleted_.begin(), handles_being_deleted_.end(), handle) == handles_being_deleted_.end(); }); OSP_DVLOG << "\tDone blocking for handle deletion!"; } } } void SocketHandleWaiter::ProcessReadyHandles( std::vector* handles, Clock::duration timeout) { if (handles->empty()) { return; } Clock::time_point start_time = now_function_(); // Process the stalest handles one by one until we hit our timeout. do { Clock::time_point oldest_time = Clock::time_point::max(); HandleWithSubscription& oldest_handle = handles->at(0); for (HandleWithSubscription& handle : *handles) { // Skip already processed handles. if (handle.subscription->last_updated >= start_time) { continue; } // Select the oldest handle. if (handle.subscription->last_updated < oldest_time) { oldest_time = handle.subscription->last_updated; oldest_handle = handle; } } // Already processed all handles. if (oldest_time == Clock::time_point::max()) { return; } // Process the oldest handle. oldest_handle.subscription->last_updated = now_function_(); oldest_handle.subscription->subscriber->ProcessReadyHandle( oldest_handle.ready_handle.handle, oldest_handle.ready_handle.flags); } while (now_function_() - start_time <= timeout); } Error SocketHandleWaiter::ProcessHandles(Clock::duration timeout) { Clock::time_point start_time = now_function_(); std::vector handles; { std::lock_guard lock(mutex_); handles_being_deleted_.clear(); handle_deletion_block_.notify_all(); handles.reserve(handle_mappings_.size()); for (const auto& pair : handle_mappings_) { handles.push_back(pair.first); } } Clock::time_point current_time = now_function_(); Clock::duration remaining_timeout = timeout - (current_time - start_time); ErrorOr> changed_handles = AwaitSocketsReadable(handles, remaining_timeout); std::vector ready_handles; { std::lock_guard lock(mutex_); handles_being_deleted_.clear(); handle_deletion_block_.notify_all(); if (changed_handles) { auto& ch = changed_handles.value(); ready_handles.reserve(ch.size()); for (const auto& handle : ch) { auto mapping_it = handle_mappings_.find(handle.handle); if (mapping_it != handle_mappings_.end()) { ready_handles.push_back( HandleWithSubscription{handle, &(mapping_it->second)}); } } } if (changed_handles.is_error()) { return changed_handles.error(); } current_time = now_function_(); remaining_timeout = timeout - (current_time - start_time); ProcessReadyHandles(&ready_handles, remaining_timeout); } return Error::None(); } } // namespace openscreen