1 /*
2  *  Copyright 2020 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "pc/sctp_data_channel.h"
12 
13 #include <memory>
14 #include <string>
15 #include <utility>
16 
17 #include "api/proxy.h"
18 #include "media/sctp/sctp_transport_internal.h"
19 #include "pc/sctp_utils.h"
20 #include "rtc_base/checks.h"
21 #include "rtc_base/location.h"
22 #include "rtc_base/logging.h"
23 #include "rtc_base/ref_counted_object.h"
24 #include "rtc_base/thread.h"
25 
26 namespace webrtc {
27 
28 namespace {
29 
30 static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
31 static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024;
32 
33 static std::atomic<int> g_unique_id{0};
34 
GenerateUniqueId()35 int GenerateUniqueId() {
36   return ++g_unique_id;
37 }
38 
39 // Define proxy for DataChannelInterface.
40 BEGIN_SIGNALING_PROXY_MAP(DataChannel)
41 PROXY_SIGNALING_THREAD_DESTRUCTOR()
42 PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
43 PROXY_METHOD0(void, UnregisterObserver)
44 BYPASS_PROXY_CONSTMETHOD0(std::string, label)
45 BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
46 BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
47 BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime)
48 BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits)
49 BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxRetransmitsOpt)
50 BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxPacketLifeTime)
51 BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
52 BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
53 // Can't bypass the proxy since the id may change.
54 PROXY_CONSTMETHOD0(int, id)
55 BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
56 PROXY_CONSTMETHOD0(DataState, state)
57 PROXY_CONSTMETHOD0(RTCError, error)
58 PROXY_CONSTMETHOD0(uint32_t, messages_sent)
59 PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
60 PROXY_CONSTMETHOD0(uint32_t, messages_received)
61 PROXY_CONSTMETHOD0(uint64_t, bytes_received)
62 PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
63 PROXY_METHOD0(void, Close)
64 // TODO(bugs.webrtc.org/11547): Change to run on the network thread.
65 PROXY_METHOD1(bool, Send, const DataBuffer&)
66 END_PROXY_MAP()
67 
68 }  // namespace
69 
InternalDataChannelInit(const DataChannelInit & base)70 InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
71     : DataChannelInit(base), open_handshake_role(kOpener) {
72   // If the channel is externally negotiated, do not send the OPEN message.
73   if (base.negotiated) {
74     open_handshake_role = kNone;
75   } else {
76     // Datachannel is externally negotiated. Ignore the id value.
77     // Specified in createDataChannel, WebRTC spec section 6.1 bullet 13.
78     id = -1;
79   }
80   // Backwards compatibility: If base.maxRetransmits or base.maxRetransmitTime
81   // have been set to -1, unset them.
82   if (maxRetransmits && *maxRetransmits == -1) {
83     RTC_LOG(LS_ERROR)
84         << "Accepting maxRetransmits = -1 for backwards compatibility";
85     maxRetransmits = absl::nullopt;
86   }
87   if (maxRetransmitTime && *maxRetransmitTime == -1) {
88     RTC_LOG(LS_ERROR)
89         << "Accepting maxRetransmitTime = -1 for backwards compatibility";
90     maxRetransmitTime = absl::nullopt;
91   }
92 }
93 
AllocateSid(rtc::SSLRole role,int * sid)94 bool SctpSidAllocator::AllocateSid(rtc::SSLRole role, int* sid) {
95   int potential_sid = (role == rtc::SSL_CLIENT) ? 0 : 1;
96   while (!IsSidAvailable(potential_sid)) {
97     potential_sid += 2;
98     if (potential_sid > static_cast<int>(cricket::kMaxSctpSid)) {
99       return false;
100     }
101   }
102 
103   *sid = potential_sid;
104   used_sids_.insert(potential_sid);
105   return true;
106 }
107 
ReserveSid(int sid)108 bool SctpSidAllocator::ReserveSid(int sid) {
109   if (!IsSidAvailable(sid)) {
110     return false;
111   }
112   used_sids_.insert(sid);
113   return true;
114 }
115 
ReleaseSid(int sid)116 void SctpSidAllocator::ReleaseSid(int sid) {
117   auto it = used_sids_.find(sid);
118   if (it != used_sids_.end()) {
119     used_sids_.erase(it);
120   }
121 }
122 
IsSidAvailable(int sid) const123 bool SctpSidAllocator::IsSidAvailable(int sid) const {
124   if (sid < static_cast<int>(cricket::kMinSctpSid) ||
125       sid > static_cast<int>(cricket::kMaxSctpSid)) {
126     return false;
127   }
128   return used_sids_.find(sid) == used_sids_.end();
129 }
130 
Create(SctpDataChannelProviderInterface * provider,const std::string & label,const InternalDataChannelInit & config,rtc::Thread * signaling_thread,rtc::Thread * network_thread)131 rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
132     SctpDataChannelProviderInterface* provider,
133     const std::string& label,
134     const InternalDataChannelInit& config,
135     rtc::Thread* signaling_thread,
136     rtc::Thread* network_thread) {
137   rtc::scoped_refptr<SctpDataChannel> channel(
138       new rtc::RefCountedObject<SctpDataChannel>(
139           config, provider, label, signaling_thread, network_thread));
140   if (!channel->Init()) {
141     return nullptr;
142   }
143   return channel;
144 }
145 
146 // static
CreateProxy(rtc::scoped_refptr<SctpDataChannel> channel)147 rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
148     rtc::scoped_refptr<SctpDataChannel> channel) {
149   // TODO(bugs.webrtc.org/11547): incorporate the network thread in the proxy.
150   // Also, consider allowing the proxy object to own the reference (std::move).
151   // As is, the proxy has a raw pointer and no reference to the channel object
152   // and trusting that the lifetime management aligns with the
153   // sctp_data_channels_ array in SctpDataChannelController.
154   return DataChannelProxy::Create(channel->signaling_thread_, channel.get());
155 }
156 
SctpDataChannel(const InternalDataChannelInit & config,SctpDataChannelProviderInterface * provider,const std::string & label,rtc::Thread * signaling_thread,rtc::Thread * network_thread)157 SctpDataChannel::SctpDataChannel(const InternalDataChannelInit& config,
158                                  SctpDataChannelProviderInterface* provider,
159                                  const std::string& label,
160                                  rtc::Thread* signaling_thread,
161                                  rtc::Thread* network_thread)
162     : signaling_thread_(signaling_thread),
163       network_thread_(network_thread),
164       internal_id_(GenerateUniqueId()),
165       label_(label),
166       config_(config),
167       observer_(nullptr),
168       provider_(provider) {
169   RTC_DCHECK_RUN_ON(signaling_thread_);
170 }
171 
Init()172 bool SctpDataChannel::Init() {
173   RTC_DCHECK_RUN_ON(signaling_thread_);
174   if (config_.id < -1 ||
175       (config_.maxRetransmits && *config_.maxRetransmits < 0) ||
176       (config_.maxRetransmitTime && *config_.maxRetransmitTime < 0)) {
177     RTC_LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
178                          "invalid DataChannelInit.";
179     return false;
180   }
181   if (config_.maxRetransmits && config_.maxRetransmitTime) {
182     RTC_LOG(LS_ERROR)
183         << "maxRetransmits and maxRetransmitTime should not be both set.";
184     return false;
185   }
186 
187   switch (config_.open_handshake_role) {
188     case webrtc::InternalDataChannelInit::kNone:  // pre-negotiated
189       handshake_state_ = kHandshakeReady;
190       break;
191     case webrtc::InternalDataChannelInit::kOpener:
192       handshake_state_ = kHandshakeShouldSendOpen;
193       break;
194     case webrtc::InternalDataChannelInit::kAcker:
195       handshake_state_ = kHandshakeShouldSendAck;
196       break;
197   }
198 
199   // Try to connect to the transport in case the transport channel already
200   // exists.
201   OnTransportChannelCreated();
202 
203   // Checks if the transport is ready to send because the initial channel
204   // ready signal may have been sent before the DataChannel creation.
205   // This has to be done async because the upper layer objects (e.g.
206   // Chrome glue and WebKit) are not wired up properly until after this
207   // function returns.
208   if (provider_->ReadyToSendData()) {
209     invoker_.AsyncInvoke<void>(RTC_FROM_HERE, rtc::Thread::Current(),
210                                [this] { OnTransportReady(true); });
211   }
212 
213   return true;
214 }
215 
~SctpDataChannel()216 SctpDataChannel::~SctpDataChannel() {
217   RTC_DCHECK_RUN_ON(signaling_thread_);
218 }
219 
RegisterObserver(DataChannelObserver * observer)220 void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
221   RTC_DCHECK_RUN_ON(signaling_thread_);
222   observer_ = observer;
223   DeliverQueuedReceivedData();
224 }
225 
UnregisterObserver()226 void SctpDataChannel::UnregisterObserver() {
227   RTC_DCHECK_RUN_ON(signaling_thread_);
228   observer_ = nullptr;
229 }
230 
reliable() const231 bool SctpDataChannel::reliable() const {
232   // May be called on any thread.
233   return !config_.maxRetransmits && !config_.maxRetransmitTime;
234 }
235 
buffered_amount() const236 uint64_t SctpDataChannel::buffered_amount() const {
237   RTC_DCHECK_RUN_ON(signaling_thread_);
238   return buffered_amount_;
239 }
240 
Close()241 void SctpDataChannel::Close() {
242   RTC_DCHECK_RUN_ON(signaling_thread_);
243   if (state_ == kClosed)
244     return;
245   SetState(kClosing);
246   // Will send queued data before beginning the underlying closing procedure.
247   UpdateState();
248 }
249 
state() const250 SctpDataChannel::DataState SctpDataChannel::state() const {
251   RTC_DCHECK_RUN_ON(signaling_thread_);
252   return state_;
253 }
254 
error() const255 RTCError SctpDataChannel::error() const {
256   RTC_DCHECK_RUN_ON(signaling_thread_);
257   return error_;
258 }
259 
messages_sent() const260 uint32_t SctpDataChannel::messages_sent() const {
261   RTC_DCHECK_RUN_ON(signaling_thread_);
262   return messages_sent_;
263 }
264 
bytes_sent() const265 uint64_t SctpDataChannel::bytes_sent() const {
266   RTC_DCHECK_RUN_ON(signaling_thread_);
267   return bytes_sent_;
268 }
269 
messages_received() const270 uint32_t SctpDataChannel::messages_received() const {
271   RTC_DCHECK_RUN_ON(signaling_thread_);
272   return messages_received_;
273 }
274 
bytes_received() const275 uint64_t SctpDataChannel::bytes_received() const {
276   RTC_DCHECK_RUN_ON(signaling_thread_);
277   return bytes_received_;
278 }
279 
Send(const DataBuffer & buffer)280 bool SctpDataChannel::Send(const DataBuffer& buffer) {
281   RTC_DCHECK_RUN_ON(signaling_thread_);
282   // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
283   // thread. Bring buffer management etc to the network thread and keep the
284   // operational state management on the signaling thread.
285 
286   if (state_ != kOpen) {
287     return false;
288   }
289 
290   // TODO(jiayl): the spec is unclear about if the remote side should get the
291   // onmessage event. We need to figure out the expected behavior and change the
292   // code accordingly.
293   if (buffer.size() == 0) {
294     return true;
295   }
296 
297   buffered_amount_ += buffer.size();
298 
299   // If the queue is non-empty, we're waiting for SignalReadyToSend,
300   // so just add to the end of the queue and keep waiting.
301   if (!queued_send_data_.Empty()) {
302     if (!QueueSendDataMessage(buffer)) {
303       RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to queue "
304                            "additional data.";
305       // https://w3c.github.io/webrtc-pc/#dom-rtcdatachannel-send step 5
306       // Note that the spec doesn't explicitly say to close in this situation.
307       CloseAbruptlyWithError(RTCError(RTCErrorType::RESOURCE_EXHAUSTED,
308                                       "Unable to queue data for sending"));
309     }
310     return true;
311   }
312 
313   SendDataMessage(buffer, true);
314 
315   // Always return true for SCTP DataChannel per the spec.
316   return true;
317 }
318 
SetSctpSid(int sid)319 void SctpDataChannel::SetSctpSid(int sid) {
320   RTC_DCHECK_RUN_ON(signaling_thread_);
321   RTC_DCHECK_LT(config_.id, 0);
322   RTC_DCHECK_GE(sid, 0);
323   RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
324   RTC_DCHECK_EQ(state_, kConnecting);
325 
326   if (config_.id == sid) {
327     return;
328   }
329 
330   const_cast<InternalDataChannelInit&>(config_).id = sid;
331   provider_->AddSctpDataStream(sid);
332 }
333 
OnClosingProcedureStartedRemotely(int sid)334 void SctpDataChannel::OnClosingProcedureStartedRemotely(int sid) {
335   RTC_DCHECK_RUN_ON(signaling_thread_);
336   if (sid == config_.id && state_ != kClosing && state_ != kClosed) {
337     // Don't bother sending queued data since the side that initiated the
338     // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
339     // discussion about this.
340     queued_send_data_.Clear();
341     queued_control_data_.Clear();
342     // Just need to change state to kClosing, SctpTransport will handle the
343     // rest of the closing procedure and OnClosingProcedureComplete will be
344     // called later.
345     started_closing_procedure_ = true;
346     SetState(kClosing);
347   }
348 }
349 
OnClosingProcedureComplete(int sid)350 void SctpDataChannel::OnClosingProcedureComplete(int sid) {
351   RTC_DCHECK_RUN_ON(signaling_thread_);
352   if (sid == config_.id) {
353     // If the closing procedure is complete, we should have finished sending
354     // all pending data and transitioned to kClosing already.
355     RTC_DCHECK_EQ(state_, kClosing);
356     RTC_DCHECK(queued_send_data_.Empty());
357     DisconnectFromProvider();
358     SetState(kClosed);
359   }
360 }
361 
OnTransportChannelCreated()362 void SctpDataChannel::OnTransportChannelCreated() {
363   RTC_DCHECK_RUN_ON(signaling_thread_);
364   if (!connected_to_provider_) {
365     connected_to_provider_ = provider_->ConnectDataChannel(this);
366   }
367   // The sid may have been unassigned when provider_->ConnectDataChannel was
368   // done. So always add the streams even if connected_to_provider_ is true.
369   if (config_.id >= 0) {
370     provider_->AddSctpDataStream(config_.id);
371   }
372 }
373 
OnTransportChannelClosed()374 void SctpDataChannel::OnTransportChannelClosed() {
375   // The SctpTransport is unusable (for example, because the SCTP m= section
376   // was rejected, or because the DTLS transport closed), so we need to close
377   // abruptly.
378   RTCError error = RTCError(RTCErrorType::OPERATION_ERROR_WITH_DATA,
379                             "Transport channel closed");
380   error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
381   CloseAbruptlyWithError(std::move(error));
382 }
383 
GetStats() const384 DataChannelStats SctpDataChannel::GetStats() const {
385   RTC_DCHECK_RUN_ON(signaling_thread_);
386   DataChannelStats stats{internal_id_,        id(),         label(),
387                          protocol(),          state(),      messages_sent(),
388                          messages_received(), bytes_sent(), bytes_received()};
389   return stats;
390 }
391 
OnDataReceived(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & payload)392 void SctpDataChannel::OnDataReceived(const cricket::ReceiveDataParams& params,
393                                      const rtc::CopyOnWriteBuffer& payload) {
394   RTC_DCHECK_RUN_ON(signaling_thread_);
395   if (params.sid != config_.id) {
396     return;
397   }
398 
399   if (params.type == cricket::DMT_CONTROL) {
400     if (handshake_state_ != kHandshakeWaitingForAck) {
401       // Ignore it if we are not expecting an ACK message.
402       RTC_LOG(LS_WARNING)
403           << "DataChannel received unexpected CONTROL message, sid = "
404           << params.sid;
405       return;
406     }
407     if (ParseDataChannelOpenAckMessage(payload)) {
408       // We can send unordered as soon as we receive the ACK message.
409       handshake_state_ = kHandshakeReady;
410       RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
411                        << params.sid;
412     } else {
413       RTC_LOG(LS_WARNING)
414           << "DataChannel failed to parse OPEN_ACK message, sid = "
415           << params.sid;
416     }
417     return;
418   }
419 
420   RTC_DCHECK(params.type == cricket::DMT_BINARY ||
421              params.type == cricket::DMT_TEXT);
422 
423   RTC_LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
424                       << params.sid;
425   // We can send unordered as soon as we receive any DATA message since the
426   // remote side must have received the OPEN (and old clients do not send
427   // OPEN_ACK).
428   if (handshake_state_ == kHandshakeWaitingForAck) {
429     handshake_state_ = kHandshakeReady;
430   }
431 
432   bool binary = (params.type == cricket::DMT_BINARY);
433   auto buffer = std::make_unique<DataBuffer>(payload, binary);
434   if (state_ == kOpen && observer_) {
435     ++messages_received_;
436     bytes_received_ += buffer->size();
437     observer_->OnMessage(*buffer.get());
438   } else {
439     if (queued_received_data_.byte_count() + payload.size() >
440         kMaxQueuedReceivedDataBytes) {
441       RTC_LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
442 
443       queued_received_data_.Clear();
444       CloseAbruptlyWithError(
445           RTCError(RTCErrorType::RESOURCE_EXHAUSTED,
446                    "Queued received data exceeds the max buffer size."));
447 
448       return;
449     }
450     queued_received_data_.PushBack(std::move(buffer));
451   }
452 }
453 
OnTransportReady(bool writable)454 void SctpDataChannel::OnTransportReady(bool writable) {
455   RTC_DCHECK_RUN_ON(signaling_thread_);
456 
457   writable_ = writable;
458   if (!writable) {
459     return;
460   }
461 
462   SendQueuedControlMessages();
463   SendQueuedDataMessages();
464 
465   UpdateState();
466 }
467 
CloseAbruptlyWithError(RTCError error)468 void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
469   RTC_DCHECK_RUN_ON(signaling_thread_);
470 
471   if (state_ == kClosed) {
472     return;
473   }
474 
475   if (connected_to_provider_) {
476     DisconnectFromProvider();
477   }
478 
479   // Closing abruptly means any queued data gets thrown away.
480   buffered_amount_ = 0;
481 
482   queued_send_data_.Clear();
483   queued_control_data_.Clear();
484 
485   // Still go to "kClosing" before "kClosed", since observers may be expecting
486   // that.
487   SetState(kClosing);
488   error_ = std::move(error);
489   SetState(kClosed);
490 }
491 
CloseAbruptlyWithDataChannelFailure(const std::string & message)492 void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
493     const std::string& message) {
494   RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
495   error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
496   CloseAbruptlyWithError(std::move(error));
497 }
498 
UpdateState()499 void SctpDataChannel::UpdateState() {
500   RTC_DCHECK_RUN_ON(signaling_thread_);
501   // UpdateState determines what to do from a few state variables. Include
502   // all conditions required for each state transition here for
503   // clarity. OnTransportReady(true) will send any queued data and then invoke
504   // UpdateState().
505 
506   switch (state_) {
507     case kConnecting: {
508       if (connected_to_provider_) {
509         if (handshake_state_ == kHandshakeShouldSendOpen) {
510           rtc::CopyOnWriteBuffer payload;
511           WriteDataChannelOpenMessage(label_, config_, &payload);
512           SendControlMessage(payload);
513         } else if (handshake_state_ == kHandshakeShouldSendAck) {
514           rtc::CopyOnWriteBuffer payload;
515           WriteDataChannelOpenAckMessage(&payload);
516           SendControlMessage(payload);
517         }
518         if (writable_ && (handshake_state_ == kHandshakeReady ||
519                           handshake_state_ == kHandshakeWaitingForAck)) {
520           SetState(kOpen);
521           // If we have received buffers before the channel got writable.
522           // Deliver them now.
523           DeliverQueuedReceivedData();
524         }
525       }
526       break;
527     }
528     case kOpen: {
529       break;
530     }
531     case kClosing: {
532       // Wait for all queued data to be sent before beginning the closing
533       // procedure.
534       if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
535         // For SCTP data channels, we need to wait for the closing procedure
536         // to complete; after calling RemoveSctpDataStream,
537         // OnClosingProcedureComplete will end up called asynchronously
538         // afterwards.
539         if (connected_to_provider_ && !started_closing_procedure_ &&
540             config_.id >= 0) {
541           started_closing_procedure_ = true;
542           provider_->RemoveSctpDataStream(config_.id);
543         }
544       }
545       break;
546     }
547     case kClosed:
548       break;
549   }
550 }
551 
SetState(DataState state)552 void SctpDataChannel::SetState(DataState state) {
553   RTC_DCHECK_RUN_ON(signaling_thread_);
554   if (state_ == state) {
555     return;
556   }
557 
558   state_ = state;
559   if (observer_) {
560     observer_->OnStateChange();
561   }
562   if (state_ == kOpen) {
563     SignalOpened(this);
564   } else if (state_ == kClosed) {
565     SignalClosed(this);
566   }
567 }
568 
DisconnectFromProvider()569 void SctpDataChannel::DisconnectFromProvider() {
570   RTC_DCHECK_RUN_ON(signaling_thread_);
571   if (!connected_to_provider_)
572     return;
573 
574   provider_->DisconnectDataChannel(this);
575   connected_to_provider_ = false;
576 }
577 
DeliverQueuedReceivedData()578 void SctpDataChannel::DeliverQueuedReceivedData() {
579   RTC_DCHECK_RUN_ON(signaling_thread_);
580   if (!observer_) {
581     return;
582   }
583 
584   while (!queued_received_data_.Empty()) {
585     std::unique_ptr<DataBuffer> buffer = queued_received_data_.PopFront();
586     ++messages_received_;
587     bytes_received_ += buffer->size();
588     observer_->OnMessage(*buffer);
589   }
590 }
591 
SendQueuedDataMessages()592 void SctpDataChannel::SendQueuedDataMessages() {
593   RTC_DCHECK_RUN_ON(signaling_thread_);
594   if (queued_send_data_.Empty()) {
595     return;
596   }
597 
598   RTC_DCHECK(state_ == kOpen || state_ == kClosing);
599 
600   while (!queued_send_data_.Empty()) {
601     std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront();
602     if (!SendDataMessage(*buffer, false)) {
603       // Return the message to the front of the queue if sending is aborted.
604       queued_send_data_.PushFront(std::move(buffer));
605       break;
606     }
607   }
608 }
609 
SendDataMessage(const DataBuffer & buffer,bool queue_if_blocked)610 bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
611                                       bool queue_if_blocked) {
612   RTC_DCHECK_RUN_ON(signaling_thread_);
613   cricket::SendDataParams send_params;
614 
615   send_params.ordered = config_.ordered;
616   // Send as ordered if it is still going through OPEN/ACK signaling.
617   if (handshake_state_ != kHandshakeReady && !config_.ordered) {
618     send_params.ordered = true;
619     RTC_LOG(LS_VERBOSE)
620         << "Sending data as ordered for unordered DataChannel "
621            "because the OPEN_ACK message has not been received.";
622   }
623 
624   send_params.max_rtx_count =
625       config_.maxRetransmits ? *config_.maxRetransmits : -1;
626   send_params.max_rtx_ms =
627       config_.maxRetransmitTime ? *config_.maxRetransmitTime : -1;
628   send_params.sid = config_.id;
629   send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
630 
631   cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
632   bool success = provider_->SendData(send_params, buffer.data, &send_result);
633 
634   if (success) {
635     ++messages_sent_;
636     bytes_sent_ += buffer.size();
637 
638     RTC_DCHECK(buffered_amount_ >= buffer.size());
639     buffered_amount_ -= buffer.size();
640     if (observer_ && buffer.size() > 0) {
641       observer_->OnBufferedAmountChange(buffer.size());
642     }
643     return true;
644   }
645 
646   if (send_result == cricket::SDR_BLOCK) {
647     if (!queue_if_blocked || QueueSendDataMessage(buffer)) {
648       return false;
649     }
650   }
651   // Close the channel if the error is not SDR_BLOCK, or if queuing the
652   // message failed.
653   RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
654                        "send_result = "
655                     << send_result;
656   CloseAbruptlyWithError(
657       RTCError(RTCErrorType::NETWORK_ERROR, "Failure to send data"));
658 
659   return false;
660 }
661 
QueueSendDataMessage(const DataBuffer & buffer)662 bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
663   RTC_DCHECK_RUN_ON(signaling_thread_);
664   size_t start_buffered_amount = queued_send_data_.byte_count();
665   if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) {
666     RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
667     return false;
668   }
669   queued_send_data_.PushBack(std::make_unique<DataBuffer>(buffer));
670   return true;
671 }
672 
SendQueuedControlMessages()673 void SctpDataChannel::SendQueuedControlMessages() {
674   RTC_DCHECK_RUN_ON(signaling_thread_);
675   PacketQueue control_packets;
676   control_packets.Swap(&queued_control_data_);
677 
678   while (!control_packets.Empty()) {
679     std::unique_ptr<DataBuffer> buf = control_packets.PopFront();
680     SendControlMessage(buf->data);
681   }
682 }
683 
QueueControlMessage(const rtc::CopyOnWriteBuffer & buffer)684 void SctpDataChannel::QueueControlMessage(
685     const rtc::CopyOnWriteBuffer& buffer) {
686   RTC_DCHECK_RUN_ON(signaling_thread_);
687   queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
688 }
689 
SendControlMessage(const rtc::CopyOnWriteBuffer & buffer)690 bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
691   RTC_DCHECK_RUN_ON(signaling_thread_);
692   RTC_DCHECK(writable_);
693   RTC_DCHECK_GE(config_.id, 0);
694 
695   bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
696   RTC_DCHECK(!is_open_message || !config_.negotiated);
697 
698   cricket::SendDataParams send_params;
699   send_params.sid = config_.id;
700   // Send data as ordered before we receive any message from the remote peer to
701   // make sure the remote peer will not receive any data before it receives the
702   // OPEN message.
703   send_params.ordered = config_.ordered || is_open_message;
704   send_params.type = cricket::DMT_CONTROL;
705 
706   cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
707   bool retval = provider_->SendData(send_params, buffer, &send_result);
708   if (retval) {
709     RTC_LOG(LS_VERBOSE) << "Sent CONTROL message on channel " << config_.id;
710 
711     if (handshake_state_ == kHandshakeShouldSendAck) {
712       handshake_state_ = kHandshakeReady;
713     } else if (handshake_state_ == kHandshakeShouldSendOpen) {
714       handshake_state_ = kHandshakeWaitingForAck;
715     }
716   } else if (send_result == cricket::SDR_BLOCK) {
717     QueueControlMessage(buffer);
718   } else {
719     RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
720                          " the CONTROL message, send_result = "
721                       << send_result;
722     CloseAbruptlyWithError(RTCError(RTCErrorType::NETWORK_ERROR,
723                                     "Failed to send a CONTROL message"));
724   }
725   return retval;
726 }
727 
728 // static
ResetInternalIdAllocatorForTesting(int new_value)729 void SctpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) {
730   g_unique_id = new_value;
731 }
732 
733 }  // namespace webrtc
734