1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/data_pipe_consumer_dispatcher.h"
6 
7 #include <stddef.h>
8 #include <stdint.h>
9 
10 #include <algorithm>
11 #include <limits>
12 #include <utility>
13 
14 #include "base/bind.h"
15 #include "base/logging.h"
16 #include "base/memory/ref_counted.h"
17 #include "mojo/core/core.h"
18 #include "mojo/core/data_pipe_control_message.h"
19 #include "mojo/core/node_controller.h"
20 #include "mojo/core/platform_handle_utils.h"
21 #include "mojo/core/request_context.h"
22 #include "mojo/core/user_message_impl.h"
23 #include "mojo/public/c/system/data_pipe.h"
24 
25 namespace mojo {
26 namespace core {
27 
28 namespace {
29 
30 const uint8_t kFlagPeerClosed = 0x01;
31 
32 #pragma pack(push, 1)
33 
34 struct SerializedState {
35   MojoCreateDataPipeOptions options;
36   uint64_t pipe_id;
37   uint32_t read_offset;
38   uint32_t bytes_available;
39   uint8_t flags;
40   uint64_t buffer_guid_high;
41   uint64_t buffer_guid_low;
42   char padding[7];
43 };
44 
45 static_assert(sizeof(SerializedState) % 8 == 0,
46               "Invalid SerializedState size.");
47 
48 #pragma pack(pop)
49 
50 }  // namespace
51 
52 // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
53 // reference to the dispatcher to ensure it lives as long as the observed port.
54 class DataPipeConsumerDispatcher::PortObserverThunk
55     : public NodeController::PortObserver {
56  public:
PortObserverThunk(scoped_refptr<DataPipeConsumerDispatcher> dispatcher)57   explicit PortObserverThunk(
58       scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
59       : dispatcher_(dispatcher) {}
60 
61  private:
~PortObserverThunk()62   ~PortObserverThunk() override {}
63 
64   // NodeController::PortObserver:
OnPortStatusChanged()65   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
66 
67   scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
68 
69   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
70 };
71 
72 // static
Create(NodeController * node_controller,const ports::PortRef & control_port,base::UnsafeSharedMemoryRegion shared_ring_buffer,const MojoCreateDataPipeOptions & options,uint64_t pipe_id)73 scoped_refptr<DataPipeConsumerDispatcher> DataPipeConsumerDispatcher::Create(
74     NodeController* node_controller,
75     const ports::PortRef& control_port,
76     base::UnsafeSharedMemoryRegion shared_ring_buffer,
77     const MojoCreateDataPipeOptions& options,
78     uint64_t pipe_id) {
79   scoped_refptr<DataPipeConsumerDispatcher> consumer =
80       new DataPipeConsumerDispatcher(node_controller, control_port,
81                                      std::move(shared_ring_buffer), options,
82                                      pipe_id);
83   base::AutoLock lock(consumer->lock_);
84   if (!consumer->InitializeNoLock())
85     return nullptr;
86   return consumer;
87 }
88 
GetType() const89 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
90   return Type::DATA_PIPE_CONSUMER;
91 }
92 
Close()93 MojoResult DataPipeConsumerDispatcher::Close() {
94   base::AutoLock lock(lock_);
95   DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
96   return CloseNoLock();
97 }
98 
ReadData(const MojoReadDataOptions & options,void * elements,uint32_t * num_bytes)99 MojoResult DataPipeConsumerDispatcher::ReadData(
100     const MojoReadDataOptions& options,
101     void* elements,
102     uint32_t* num_bytes) {
103   base::AutoLock lock(lock_);
104 
105   if (!shared_ring_buffer_.IsValid() || in_transit_)
106     return MOJO_RESULT_INVALID_ARGUMENT;
107 
108   if (in_two_phase_read_)
109     return MOJO_RESULT_BUSY;
110 
111   const bool had_new_data = new_data_available_;
112   new_data_available_ = false;
113 
114   if ((options.flags & MOJO_READ_DATA_FLAG_QUERY)) {
115     if ((options.flags & MOJO_READ_DATA_FLAG_PEEK) ||
116         (options.flags & MOJO_READ_DATA_FLAG_DISCARD))
117       return MOJO_RESULT_INVALID_ARGUMENT;
118     DCHECK(!(options.flags & MOJO_READ_DATA_FLAG_DISCARD));  // Handled above.
119     DVLOG_IF(2, elements) << "Query mode: ignoring non-null |elements|";
120     *num_bytes = static_cast<uint32_t>(bytes_available_);
121 
122     if (had_new_data)
123       watchers_.NotifyState(GetHandleSignalsStateNoLock());
124     return MOJO_RESULT_OK;
125   }
126 
127   bool discard = false;
128   if ((options.flags & MOJO_READ_DATA_FLAG_DISCARD)) {
129     // These flags are mutally exclusive.
130     if (options.flags & MOJO_READ_DATA_FLAG_PEEK)
131       return MOJO_RESULT_INVALID_ARGUMENT;
132     DVLOG_IF(2, elements) << "Discard mode: ignoring non-null |elements|";
133     discard = true;
134   }
135 
136   uint32_t max_num_bytes_to_read = *num_bytes;
137   if (max_num_bytes_to_read % options_.element_num_bytes != 0)
138     return MOJO_RESULT_INVALID_ARGUMENT;
139 
140   bool all_or_none = options.flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
141   uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
142 
143   if (min_num_bytes_to_read > bytes_available_) {
144     if (had_new_data)
145       watchers_.NotifyState(GetHandleSignalsStateNoLock());
146     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
147                         : MOJO_RESULT_OUT_OF_RANGE;
148   }
149 
150   uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
151   if (bytes_to_read == 0) {
152     if (had_new_data)
153       watchers_.NotifyState(GetHandleSignalsStateNoLock());
154     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
155                         : MOJO_RESULT_SHOULD_WAIT;
156   }
157 
158   if (!discard) {
159     const uint8_t* data =
160         static_cast<const uint8_t*>(ring_buffer_mapping_.memory());
161     CHECK(data);
162 
163     uint8_t* destination = static_cast<uint8_t*>(elements);
164     CHECK(destination);
165 
166     DCHECK_LE(read_offset_, options_.capacity_num_bytes);
167     uint32_t tail_bytes_to_copy =
168         std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
169     uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
170     if (tail_bytes_to_copy > 0)
171       memcpy(destination, data + read_offset_, tail_bytes_to_copy);
172     if (head_bytes_to_copy > 0)
173       memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
174   }
175   *num_bytes = bytes_to_read;
176 
177   bool peek = !!(options.flags & MOJO_READ_DATA_FLAG_PEEK);
178   if (discard || !peek) {
179     read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
180     bytes_available_ -= bytes_to_read;
181 
182     base::AutoUnlock unlock(lock_);
183     NotifyRead(bytes_to_read);
184   }
185 
186   // We may have just read the last available data and thus changed the signals
187   // state.
188   watchers_.NotifyState(GetHandleSignalsStateNoLock());
189 
190   return MOJO_RESULT_OK;
191 }
192 
BeginReadData(const void ** buffer,uint32_t * buffer_num_bytes)193 MojoResult DataPipeConsumerDispatcher::BeginReadData(
194     const void** buffer,
195     uint32_t* buffer_num_bytes) {
196   base::AutoLock lock(lock_);
197   if (!shared_ring_buffer_.IsValid() || in_transit_)
198     return MOJO_RESULT_INVALID_ARGUMENT;
199 
200   if (in_two_phase_read_)
201     return MOJO_RESULT_BUSY;
202 
203   const bool had_new_data = new_data_available_;
204   new_data_available_ = false;
205 
206   if (bytes_available_ == 0) {
207     if (had_new_data)
208       watchers_.NotifyState(GetHandleSignalsStateNoLock());
209     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
210                         : MOJO_RESULT_SHOULD_WAIT;
211   }
212 
213   DCHECK_LT(read_offset_, options_.capacity_num_bytes);
214   uint32_t bytes_to_read =
215       std::min(bytes_available_, options_.capacity_num_bytes - read_offset_);
216 
217   CHECK(ring_buffer_mapping_.IsValid());
218   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory());
219   CHECK(data);
220 
221   in_two_phase_read_ = true;
222   *buffer = data + read_offset_;
223   *buffer_num_bytes = bytes_to_read;
224   two_phase_max_bytes_read_ = bytes_to_read;
225 
226   if (had_new_data)
227     watchers_.NotifyState(GetHandleSignalsStateNoLock());
228 
229   return MOJO_RESULT_OK;
230 }
231 
EndReadData(uint32_t num_bytes_read)232 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
233   base::AutoLock lock(lock_);
234   if (!in_two_phase_read_)
235     return MOJO_RESULT_FAILED_PRECONDITION;
236 
237   if (in_transit_)
238     return MOJO_RESULT_INVALID_ARGUMENT;
239 
240   CHECK(shared_ring_buffer_.IsValid());
241 
242   MojoResult rv;
243   if (num_bytes_read > two_phase_max_bytes_read_ ||
244       num_bytes_read % options_.element_num_bytes != 0) {
245     rv = MOJO_RESULT_INVALID_ARGUMENT;
246   } else {
247     rv = MOJO_RESULT_OK;
248     read_offset_ =
249         (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
250 
251     DCHECK_GE(bytes_available_, num_bytes_read);
252     bytes_available_ -= num_bytes_read;
253 
254     base::AutoUnlock unlock(lock_);
255     NotifyRead(num_bytes_read);
256   }
257 
258   in_two_phase_read_ = false;
259   two_phase_max_bytes_read_ = 0;
260 
261   watchers_.NotifyState(GetHandleSignalsStateNoLock());
262 
263   return rv;
264 }
265 
GetHandleSignalsState() const266 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
267   base::AutoLock lock(lock_);
268   return GetHandleSignalsStateNoLock();
269 }
270 
AddWatcherRef(const scoped_refptr<WatcherDispatcher> & watcher,uintptr_t context)271 MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
272     const scoped_refptr<WatcherDispatcher>& watcher,
273     uintptr_t context) {
274   base::AutoLock lock(lock_);
275   if (is_closed_ || in_transit_)
276     return MOJO_RESULT_INVALID_ARGUMENT;
277   return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
278 }
279 
RemoveWatcherRef(WatcherDispatcher * watcher,uintptr_t context)280 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
281     WatcherDispatcher* watcher,
282     uintptr_t context) {
283   base::AutoLock lock(lock_);
284   if (is_closed_ || in_transit_)
285     return MOJO_RESULT_INVALID_ARGUMENT;
286   return watchers_.Remove(watcher, context);
287 }
288 
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)289 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
290                                                 uint32_t* num_ports,
291                                                 uint32_t* num_handles) {
292   base::AutoLock lock(lock_);
293   DCHECK(in_transit_);
294   *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
295   *num_ports = 1;
296   *num_handles = 1;
297 }
298 
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * platform_handles)299 bool DataPipeConsumerDispatcher::EndSerialize(
300     void* destination,
301     ports::PortName* ports,
302     PlatformHandle* platform_handles) {
303   SerializedState* state = static_cast<SerializedState*>(destination);
304   memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
305   memset(state->padding, 0, sizeof(state->padding));
306 
307   base::AutoLock lock(lock_);
308   DCHECK(in_transit_);
309   state->pipe_id = pipe_id_;
310   state->read_offset = read_offset_;
311   state->bytes_available = bytes_available_;
312   state->flags = peer_closed_ ? kFlagPeerClosed : 0;
313 
314   auto region_handle =
315       base::UnsafeSharedMemoryRegion::TakeHandleForSerialization(
316           std::move(shared_ring_buffer_));
317   const base::UnguessableToken& guid = region_handle.GetGUID();
318   state->buffer_guid_high = guid.GetHighForSerialization();
319   state->buffer_guid_low = guid.GetLowForSerialization();
320 
321   ports[0] = control_port_.name();
322 
323   PlatformHandle handle;
324   PlatformHandle ignored_handle;
325   ExtractPlatformHandlesFromSharedMemoryRegionHandle(
326       region_handle.PassPlatformHandle(), &handle, &ignored_handle);
327   if (!handle.is_valid() || ignored_handle.is_valid())
328     return false;
329 
330   platform_handles[0] = std::move(handle);
331   return true;
332 }
333 
BeginTransit()334 bool DataPipeConsumerDispatcher::BeginTransit() {
335   base::AutoLock lock(lock_);
336   if (in_transit_)
337     return false;
338   in_transit_ = !in_two_phase_read_;
339   return in_transit_;
340 }
341 
CompleteTransitAndClose()342 void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
343   node_controller_->SetPortObserver(control_port_, nullptr);
344 
345   base::AutoLock lock(lock_);
346   DCHECK(in_transit_);
347   in_transit_ = false;
348   transferred_ = true;
349   CloseNoLock();
350 }
351 
CancelTransit()352 void DataPipeConsumerDispatcher::CancelTransit() {
353   base::AutoLock lock(lock_);
354   DCHECK(in_transit_);
355   in_transit_ = false;
356   UpdateSignalsStateNoLock();
357 }
358 
359 // static
360 scoped_refptr<DataPipeConsumerDispatcher>
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)361 DataPipeConsumerDispatcher::Deserialize(const void* data,
362                                         size_t num_bytes,
363                                         const ports::PortName* ports,
364                                         size_t num_ports,
365                                         PlatformHandle* handles,
366                                         size_t num_handles) {
367   if (num_ports != 1 || num_handles != 1 ||
368       num_bytes != sizeof(SerializedState)) {
369     return nullptr;
370   }
371 
372   const SerializedState* state = static_cast<const SerializedState*>(data);
373   if (!state->options.capacity_num_bytes || !state->options.element_num_bytes ||
374       state->options.capacity_num_bytes < state->options.element_num_bytes) {
375     return nullptr;
376   }
377 
378   NodeController* node_controller = Core::Get()->GetNodeController();
379   ports::PortRef port;
380   if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
381     return nullptr;
382 
383   auto region_handle = CreateSharedMemoryRegionHandleFromPlatformHandles(
384       std::move(handles[0]), PlatformHandle());
385   auto region = base::subtle::PlatformSharedMemoryRegion::Take(
386       std::move(region_handle),
387       base::subtle::PlatformSharedMemoryRegion::Mode::kUnsafe,
388       state->options.capacity_num_bytes,
389       base::UnguessableToken::Deserialize(state->buffer_guid_high,
390                                           state->buffer_guid_low));
391   auto ring_buffer =
392       base::UnsafeSharedMemoryRegion::Deserialize(std::move(region));
393   if (!ring_buffer.IsValid()) {
394     DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
395     return nullptr;
396   }
397 
398   scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
399       new DataPipeConsumerDispatcher(node_controller, port,
400                                      std::move(ring_buffer), state->options,
401                                      state->pipe_id);
402 
403   {
404     base::AutoLock lock(dispatcher->lock_);
405     dispatcher->read_offset_ = state->read_offset;
406     dispatcher->bytes_available_ = state->bytes_available;
407     dispatcher->new_data_available_ = state->bytes_available > 0;
408     dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
409     if (!dispatcher->InitializeNoLock())
410       return nullptr;
411     dispatcher->UpdateSignalsStateNoLock();
412   }
413 
414   return dispatcher;
415 }
416 
DataPipeConsumerDispatcher(NodeController * node_controller,const ports::PortRef & control_port,base::UnsafeSharedMemoryRegion shared_ring_buffer,const MojoCreateDataPipeOptions & options,uint64_t pipe_id)417 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
418     NodeController* node_controller,
419     const ports::PortRef& control_port,
420     base::UnsafeSharedMemoryRegion shared_ring_buffer,
421     const MojoCreateDataPipeOptions& options,
422     uint64_t pipe_id)
423     : options_(options),
424       node_controller_(node_controller),
425       control_port_(control_port),
426       pipe_id_(pipe_id),
427       watchers_(this),
428       shared_ring_buffer_(std::move(shared_ring_buffer)) {}
429 
~DataPipeConsumerDispatcher()430 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
431   DCHECK(is_closed_ && !shared_ring_buffer_.IsValid() &&
432          !ring_buffer_mapping_.IsValid() && !in_transit_);
433 }
434 
InitializeNoLock()435 bool DataPipeConsumerDispatcher::InitializeNoLock() {
436   lock_.AssertAcquired();
437   if (!shared_ring_buffer_.IsValid())
438     return false;
439 
440   DCHECK(!ring_buffer_mapping_.IsValid());
441   ring_buffer_mapping_ = shared_ring_buffer_.Map();
442   if (!ring_buffer_mapping_.IsValid()) {
443     DLOG(ERROR) << "Failed to map shared buffer.";
444     shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
445     return false;
446   }
447 
448   base::AutoUnlock unlock(lock_);
449   node_controller_->SetPortObserver(
450       control_port_, base::MakeRefCounted<PortObserverThunk>(this));
451 
452   return true;
453 }
454 
CloseNoLock()455 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
456   lock_.AssertAcquired();
457   if (is_closed_ || in_transit_)
458     return MOJO_RESULT_INVALID_ARGUMENT;
459   is_closed_ = true;
460   ring_buffer_mapping_ = base::WritableSharedMemoryMapping();
461   shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
462 
463   watchers_.NotifyClosed();
464   if (!transferred_) {
465     base::AutoUnlock unlock(lock_);
466     node_controller_->ClosePort(control_port_);
467   }
468 
469   return MOJO_RESULT_OK;
470 }
471 
GetHandleSignalsStateNoLock() const472 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock()
473     const {
474   lock_.AssertAcquired();
475 
476   HandleSignalsState rv;
477   if (shared_ring_buffer_.IsValid() && bytes_available_) {
478     if (!in_two_phase_read_) {
479       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
480       if (new_data_available_)
481         rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
482     }
483     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
484   } else if (!peer_closed_ && shared_ring_buffer_.IsValid()) {
485     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
486   }
487 
488   if (shared_ring_buffer_.IsValid()) {
489     if (new_data_available_ || !peer_closed_)
490       rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
491   }
492 
493   if (peer_closed_) {
494     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
495   } else {
496     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
497     if (peer_remote_)
498       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
499   }
500   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
501 
502   return rv;
503 }
504 
NotifyRead(uint32_t num_bytes)505 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
506   DVLOG(1) << "Data pipe consumer " << pipe_id_
507            << " notifying peer: " << num_bytes
508            << " bytes read. [control_port=" << control_port_.name() << "]";
509 
510   SendDataPipeControlMessage(node_controller_, control_port_,
511                              DataPipeCommand::DATA_WAS_READ, num_bytes);
512 }
513 
OnPortStatusChanged()514 void DataPipeConsumerDispatcher::OnPortStatusChanged() {
515   DCHECK(RequestContext::current());
516 
517   base::AutoLock lock(lock_);
518 
519   // We stop observing the control port as soon it's transferred, but this can
520   // race with events which are raised right before that happens. This is fine
521   // to ignore.
522   if (transferred_)
523     return;
524 
525   DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
526 
527   UpdateSignalsStateNoLock();
528 }
529 
UpdateSignalsStateNoLock()530 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
531   lock_.AssertAcquired();
532 
533   const bool was_peer_closed = peer_closed_;
534   const bool was_peer_remote = peer_remote_;
535   size_t previous_bytes_available = bytes_available_;
536 
537   ports::PortStatus port_status;
538   int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
539   peer_remote_ = rv == ports::OK && port_status.peer_remote;
540   if (rv != ports::OK || !port_status.receiving_messages) {
541     DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
542              << " [control_port=" << control_port_.name() << "]";
543     peer_closed_ = true;
544   } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
545     std::unique_ptr<ports::UserMessageEvent> message_event;
546     do {
547       int rv = node_controller_->node()->GetMessage(control_port_,
548                                                     &message_event, nullptr);
549       if (rv != ports::OK)
550         peer_closed_ = true;
551       if (message_event) {
552         auto* message = message_event->GetMessage<UserMessageImpl>();
553         if (message->user_payload_size() < sizeof(DataPipeControlMessage)) {
554           peer_closed_ = true;
555           break;
556         }
557 
558         const DataPipeControlMessage* m =
559             static_cast<const DataPipeControlMessage*>(message->user_payload());
560 
561         if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
562           DLOG(ERROR) << "Unexpected control message from producer.";
563           peer_closed_ = true;
564           break;
565         }
566 
567         if (static_cast<size_t>(bytes_available_) + m->num_bytes >
568             options_.capacity_num_bytes) {
569           DLOG(ERROR) << "Producer claims to have written too many bytes.";
570           peer_closed_ = true;
571           break;
572         }
573 
574         DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
575                  << m->num_bytes << " bytes were written. [control_port="
576                  << control_port_.name() << "]";
577 
578         bytes_available_ += m->num_bytes;
579       }
580     } while (message_event);
581   }
582 
583   bool has_new_data = bytes_available_ != previous_bytes_available;
584   if (has_new_data)
585     new_data_available_ = true;
586 
587   if (peer_closed_ != was_peer_closed || has_new_data ||
588       peer_remote_ != was_peer_remote) {
589     watchers_.NotifyState(GetHandleSignalsStateNoLock());
590   }
591 }
592 
593 }  // namespace core
594 }  // namespace mojo
595