1 /*
2 * Copyright 2020 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "pc/sctp_data_channel.h"
12
13 #include <memory>
14 #include <string>
15 #include <utility>
16
17 #include "api/proxy.h"
18 #include "media/sctp/sctp_transport_internal.h"
19 #include "pc/sctp_utils.h"
20 #include "rtc_base/checks.h"
21 #include "rtc_base/location.h"
22 #include "rtc_base/logging.h"
23 #include "rtc_base/ref_counted_object.h"
24 #include "rtc_base/thread.h"
25
26 namespace webrtc {
27
28 namespace {
29
30 static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
31 static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024;
32
33 static std::atomic<int> g_unique_id{0};
34
GenerateUniqueId()35 int GenerateUniqueId() {
36 return ++g_unique_id;
37 }
38
39 // Define proxy for DataChannelInterface.
40 BEGIN_SIGNALING_PROXY_MAP(DataChannel)
41 PROXY_SIGNALING_THREAD_DESTRUCTOR()
42 PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
43 PROXY_METHOD0(void, UnregisterObserver)
44 BYPASS_PROXY_CONSTMETHOD0(std::string, label)
45 BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
46 BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
47 BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime)
48 BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits)
49 BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxRetransmitsOpt)
50 BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxPacketLifeTime)
51 BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
52 BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
53 // Can't bypass the proxy since the id may change.
54 PROXY_CONSTMETHOD0(int, id)
55 BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
56 PROXY_CONSTMETHOD0(DataState, state)
57 PROXY_CONSTMETHOD0(RTCError, error)
58 PROXY_CONSTMETHOD0(uint32_t, messages_sent)
59 PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
60 PROXY_CONSTMETHOD0(uint32_t, messages_received)
61 PROXY_CONSTMETHOD0(uint64_t, bytes_received)
62 PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
63 PROXY_METHOD0(void, Close)
64 // TODO(bugs.webrtc.org/11547): Change to run on the network thread.
65 PROXY_METHOD1(bool, Send, const DataBuffer&)
66 END_PROXY_MAP()
67
68 } // namespace
69
InternalDataChannelInit(const DataChannelInit & base)70 InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
71 : DataChannelInit(base), open_handshake_role(kOpener) {
72 // If the channel is externally negotiated, do not send the OPEN message.
73 if (base.negotiated) {
74 open_handshake_role = kNone;
75 } else {
76 // Datachannel is externally negotiated. Ignore the id value.
77 // Specified in createDataChannel, WebRTC spec section 6.1 bullet 13.
78 id = -1;
79 }
80 // Backwards compatibility: If base.maxRetransmits or base.maxRetransmitTime
81 // have been set to -1, unset them.
82 if (maxRetransmits && *maxRetransmits == -1) {
83 RTC_LOG(LS_ERROR)
84 << "Accepting maxRetransmits = -1 for backwards compatibility";
85 maxRetransmits = absl::nullopt;
86 }
87 if (maxRetransmitTime && *maxRetransmitTime == -1) {
88 RTC_LOG(LS_ERROR)
89 << "Accepting maxRetransmitTime = -1 for backwards compatibility";
90 maxRetransmitTime = absl::nullopt;
91 }
92 }
93
AllocateSid(rtc::SSLRole role,int * sid)94 bool SctpSidAllocator::AllocateSid(rtc::SSLRole role, int* sid) {
95 int potential_sid = (role == rtc::SSL_CLIENT) ? 0 : 1;
96 while (!IsSidAvailable(potential_sid)) {
97 potential_sid += 2;
98 if (potential_sid > static_cast<int>(cricket::kMaxSctpSid)) {
99 return false;
100 }
101 }
102
103 *sid = potential_sid;
104 used_sids_.insert(potential_sid);
105 return true;
106 }
107
ReserveSid(int sid)108 bool SctpSidAllocator::ReserveSid(int sid) {
109 if (!IsSidAvailable(sid)) {
110 return false;
111 }
112 used_sids_.insert(sid);
113 return true;
114 }
115
ReleaseSid(int sid)116 void SctpSidAllocator::ReleaseSid(int sid) {
117 auto it = used_sids_.find(sid);
118 if (it != used_sids_.end()) {
119 used_sids_.erase(it);
120 }
121 }
122
IsSidAvailable(int sid) const123 bool SctpSidAllocator::IsSidAvailable(int sid) const {
124 if (sid < static_cast<int>(cricket::kMinSctpSid) ||
125 sid > static_cast<int>(cricket::kMaxSctpSid)) {
126 return false;
127 }
128 return used_sids_.find(sid) == used_sids_.end();
129 }
130
Create(SctpDataChannelProviderInterface * provider,const std::string & label,const InternalDataChannelInit & config,rtc::Thread * signaling_thread,rtc::Thread * network_thread)131 rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
132 SctpDataChannelProviderInterface* provider,
133 const std::string& label,
134 const InternalDataChannelInit& config,
135 rtc::Thread* signaling_thread,
136 rtc::Thread* network_thread) {
137 rtc::scoped_refptr<SctpDataChannel> channel(
138 new rtc::RefCountedObject<SctpDataChannel>(
139 config, provider, label, signaling_thread, network_thread));
140 if (!channel->Init()) {
141 return nullptr;
142 }
143 return channel;
144 }
145
146 // static
CreateProxy(rtc::scoped_refptr<SctpDataChannel> channel)147 rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
148 rtc::scoped_refptr<SctpDataChannel> channel) {
149 // TODO(bugs.webrtc.org/11547): incorporate the network thread in the proxy.
150 // Also, consider allowing the proxy object to own the reference (std::move).
151 // As is, the proxy has a raw pointer and no reference to the channel object
152 // and trusting that the lifetime management aligns with the
153 // sctp_data_channels_ array in SctpDataChannelController.
154 return DataChannelProxy::Create(channel->signaling_thread_, channel.get());
155 }
156
SctpDataChannel(const InternalDataChannelInit & config,SctpDataChannelProviderInterface * provider,const std::string & label,rtc::Thread * signaling_thread,rtc::Thread * network_thread)157 SctpDataChannel::SctpDataChannel(const InternalDataChannelInit& config,
158 SctpDataChannelProviderInterface* provider,
159 const std::string& label,
160 rtc::Thread* signaling_thread,
161 rtc::Thread* network_thread)
162 : signaling_thread_(signaling_thread),
163 network_thread_(network_thread),
164 internal_id_(GenerateUniqueId()),
165 label_(label),
166 config_(config),
167 observer_(nullptr),
168 provider_(provider) {
169 RTC_DCHECK_RUN_ON(signaling_thread_);
170 }
171
Init()172 bool SctpDataChannel::Init() {
173 RTC_DCHECK_RUN_ON(signaling_thread_);
174 if (config_.id < -1 ||
175 (config_.maxRetransmits && *config_.maxRetransmits < 0) ||
176 (config_.maxRetransmitTime && *config_.maxRetransmitTime < 0)) {
177 RTC_LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
178 "invalid DataChannelInit.";
179 return false;
180 }
181 if (config_.maxRetransmits && config_.maxRetransmitTime) {
182 RTC_LOG(LS_ERROR)
183 << "maxRetransmits and maxRetransmitTime should not be both set.";
184 return false;
185 }
186
187 switch (config_.open_handshake_role) {
188 case webrtc::InternalDataChannelInit::kNone: // pre-negotiated
189 handshake_state_ = kHandshakeReady;
190 break;
191 case webrtc::InternalDataChannelInit::kOpener:
192 handshake_state_ = kHandshakeShouldSendOpen;
193 break;
194 case webrtc::InternalDataChannelInit::kAcker:
195 handshake_state_ = kHandshakeShouldSendAck;
196 break;
197 }
198
199 // Try to connect to the transport in case the transport channel already
200 // exists.
201 OnTransportChannelCreated();
202
203 // Checks if the transport is ready to send because the initial channel
204 // ready signal may have been sent before the DataChannel creation.
205 // This has to be done async because the upper layer objects (e.g.
206 // Chrome glue and WebKit) are not wired up properly until after this
207 // function returns.
208 if (provider_->ReadyToSendData()) {
209 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, rtc::Thread::Current(),
210 [this] { OnTransportReady(true); });
211 }
212
213 return true;
214 }
215
~SctpDataChannel()216 SctpDataChannel::~SctpDataChannel() {
217 RTC_DCHECK_RUN_ON(signaling_thread_);
218 }
219
RegisterObserver(DataChannelObserver * observer)220 void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
221 RTC_DCHECK_RUN_ON(signaling_thread_);
222 observer_ = observer;
223 DeliverQueuedReceivedData();
224 }
225
UnregisterObserver()226 void SctpDataChannel::UnregisterObserver() {
227 RTC_DCHECK_RUN_ON(signaling_thread_);
228 observer_ = nullptr;
229 }
230
reliable() const231 bool SctpDataChannel::reliable() const {
232 // May be called on any thread.
233 return !config_.maxRetransmits && !config_.maxRetransmitTime;
234 }
235
buffered_amount() const236 uint64_t SctpDataChannel::buffered_amount() const {
237 RTC_DCHECK_RUN_ON(signaling_thread_);
238 return buffered_amount_;
239 }
240
Close()241 void SctpDataChannel::Close() {
242 RTC_DCHECK_RUN_ON(signaling_thread_);
243 if (state_ == kClosed)
244 return;
245 SetState(kClosing);
246 // Will send queued data before beginning the underlying closing procedure.
247 UpdateState();
248 }
249
state() const250 SctpDataChannel::DataState SctpDataChannel::state() const {
251 RTC_DCHECK_RUN_ON(signaling_thread_);
252 return state_;
253 }
254
error() const255 RTCError SctpDataChannel::error() const {
256 RTC_DCHECK_RUN_ON(signaling_thread_);
257 return error_;
258 }
259
messages_sent() const260 uint32_t SctpDataChannel::messages_sent() const {
261 RTC_DCHECK_RUN_ON(signaling_thread_);
262 return messages_sent_;
263 }
264
bytes_sent() const265 uint64_t SctpDataChannel::bytes_sent() const {
266 RTC_DCHECK_RUN_ON(signaling_thread_);
267 return bytes_sent_;
268 }
269
messages_received() const270 uint32_t SctpDataChannel::messages_received() const {
271 RTC_DCHECK_RUN_ON(signaling_thread_);
272 return messages_received_;
273 }
274
bytes_received() const275 uint64_t SctpDataChannel::bytes_received() const {
276 RTC_DCHECK_RUN_ON(signaling_thread_);
277 return bytes_received_;
278 }
279
Send(const DataBuffer & buffer)280 bool SctpDataChannel::Send(const DataBuffer& buffer) {
281 RTC_DCHECK_RUN_ON(signaling_thread_);
282 // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
283 // thread. Bring buffer management etc to the network thread and keep the
284 // operational state management on the signaling thread.
285
286 if (state_ != kOpen) {
287 return false;
288 }
289
290 // TODO(jiayl): the spec is unclear about if the remote side should get the
291 // onmessage event. We need to figure out the expected behavior and change the
292 // code accordingly.
293 if (buffer.size() == 0) {
294 return true;
295 }
296
297 buffered_amount_ += buffer.size();
298
299 // If the queue is non-empty, we're waiting for SignalReadyToSend,
300 // so just add to the end of the queue and keep waiting.
301 if (!queued_send_data_.Empty()) {
302 if (!QueueSendDataMessage(buffer)) {
303 RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to queue "
304 "additional data.";
305 // https://w3c.github.io/webrtc-pc/#dom-rtcdatachannel-send step 5
306 // Note that the spec doesn't explicitly say to close in this situation.
307 CloseAbruptlyWithError(RTCError(RTCErrorType::RESOURCE_EXHAUSTED,
308 "Unable to queue data for sending"));
309 }
310 return true;
311 }
312
313 SendDataMessage(buffer, true);
314
315 // Always return true for SCTP DataChannel per the spec.
316 return true;
317 }
318
SetSctpSid(int sid)319 void SctpDataChannel::SetSctpSid(int sid) {
320 RTC_DCHECK_RUN_ON(signaling_thread_);
321 RTC_DCHECK_LT(config_.id, 0);
322 RTC_DCHECK_GE(sid, 0);
323 RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
324 RTC_DCHECK_EQ(state_, kConnecting);
325
326 if (config_.id == sid) {
327 return;
328 }
329
330 const_cast<InternalDataChannelInit&>(config_).id = sid;
331 provider_->AddSctpDataStream(sid);
332 }
333
OnClosingProcedureStartedRemotely(int sid)334 void SctpDataChannel::OnClosingProcedureStartedRemotely(int sid) {
335 RTC_DCHECK_RUN_ON(signaling_thread_);
336 if (sid == config_.id && state_ != kClosing && state_ != kClosed) {
337 // Don't bother sending queued data since the side that initiated the
338 // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
339 // discussion about this.
340 queued_send_data_.Clear();
341 queued_control_data_.Clear();
342 // Just need to change state to kClosing, SctpTransport will handle the
343 // rest of the closing procedure and OnClosingProcedureComplete will be
344 // called later.
345 started_closing_procedure_ = true;
346 SetState(kClosing);
347 }
348 }
349
OnClosingProcedureComplete(int sid)350 void SctpDataChannel::OnClosingProcedureComplete(int sid) {
351 RTC_DCHECK_RUN_ON(signaling_thread_);
352 if (sid == config_.id) {
353 // If the closing procedure is complete, we should have finished sending
354 // all pending data and transitioned to kClosing already.
355 RTC_DCHECK_EQ(state_, kClosing);
356 RTC_DCHECK(queued_send_data_.Empty());
357 DisconnectFromProvider();
358 SetState(kClosed);
359 }
360 }
361
OnTransportChannelCreated()362 void SctpDataChannel::OnTransportChannelCreated() {
363 RTC_DCHECK_RUN_ON(signaling_thread_);
364 if (!connected_to_provider_) {
365 connected_to_provider_ = provider_->ConnectDataChannel(this);
366 }
367 // The sid may have been unassigned when provider_->ConnectDataChannel was
368 // done. So always add the streams even if connected_to_provider_ is true.
369 if (config_.id >= 0) {
370 provider_->AddSctpDataStream(config_.id);
371 }
372 }
373
OnTransportChannelClosed()374 void SctpDataChannel::OnTransportChannelClosed() {
375 // The SctpTransport is unusable (for example, because the SCTP m= section
376 // was rejected, or because the DTLS transport closed), so we need to close
377 // abruptly.
378 RTCError error = RTCError(RTCErrorType::OPERATION_ERROR_WITH_DATA,
379 "Transport channel closed");
380 error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
381 CloseAbruptlyWithError(std::move(error));
382 }
383
GetStats() const384 DataChannelStats SctpDataChannel::GetStats() const {
385 RTC_DCHECK_RUN_ON(signaling_thread_);
386 DataChannelStats stats{internal_id_, id(), label(),
387 protocol(), state(), messages_sent(),
388 messages_received(), bytes_sent(), bytes_received()};
389 return stats;
390 }
391
OnDataReceived(const cricket::ReceiveDataParams & params,const rtc::CopyOnWriteBuffer & payload)392 void SctpDataChannel::OnDataReceived(const cricket::ReceiveDataParams& params,
393 const rtc::CopyOnWriteBuffer& payload) {
394 RTC_DCHECK_RUN_ON(signaling_thread_);
395 if (params.sid != config_.id) {
396 return;
397 }
398
399 if (params.type == cricket::DMT_CONTROL) {
400 if (handshake_state_ != kHandshakeWaitingForAck) {
401 // Ignore it if we are not expecting an ACK message.
402 RTC_LOG(LS_WARNING)
403 << "DataChannel received unexpected CONTROL message, sid = "
404 << params.sid;
405 return;
406 }
407 if (ParseDataChannelOpenAckMessage(payload)) {
408 // We can send unordered as soon as we receive the ACK message.
409 handshake_state_ = kHandshakeReady;
410 RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
411 << params.sid;
412 } else {
413 RTC_LOG(LS_WARNING)
414 << "DataChannel failed to parse OPEN_ACK message, sid = "
415 << params.sid;
416 }
417 return;
418 }
419
420 RTC_DCHECK(params.type == cricket::DMT_BINARY ||
421 params.type == cricket::DMT_TEXT);
422
423 RTC_LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
424 << params.sid;
425 // We can send unordered as soon as we receive any DATA message since the
426 // remote side must have received the OPEN (and old clients do not send
427 // OPEN_ACK).
428 if (handshake_state_ == kHandshakeWaitingForAck) {
429 handshake_state_ = kHandshakeReady;
430 }
431
432 bool binary = (params.type == cricket::DMT_BINARY);
433 auto buffer = std::make_unique<DataBuffer>(payload, binary);
434 if (state_ == kOpen && observer_) {
435 ++messages_received_;
436 bytes_received_ += buffer->size();
437 observer_->OnMessage(*buffer.get());
438 } else {
439 if (queued_received_data_.byte_count() + payload.size() >
440 kMaxQueuedReceivedDataBytes) {
441 RTC_LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
442
443 queued_received_data_.Clear();
444 CloseAbruptlyWithError(
445 RTCError(RTCErrorType::RESOURCE_EXHAUSTED,
446 "Queued received data exceeds the max buffer size."));
447
448 return;
449 }
450 queued_received_data_.PushBack(std::move(buffer));
451 }
452 }
453
OnTransportReady(bool writable)454 void SctpDataChannel::OnTransportReady(bool writable) {
455 RTC_DCHECK_RUN_ON(signaling_thread_);
456
457 writable_ = writable;
458 if (!writable) {
459 return;
460 }
461
462 SendQueuedControlMessages();
463 SendQueuedDataMessages();
464
465 UpdateState();
466 }
467
CloseAbruptlyWithError(RTCError error)468 void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
469 RTC_DCHECK_RUN_ON(signaling_thread_);
470
471 if (state_ == kClosed) {
472 return;
473 }
474
475 if (connected_to_provider_) {
476 DisconnectFromProvider();
477 }
478
479 // Closing abruptly means any queued data gets thrown away.
480 buffered_amount_ = 0;
481
482 queued_send_data_.Clear();
483 queued_control_data_.Clear();
484
485 // Still go to "kClosing" before "kClosed", since observers may be expecting
486 // that.
487 SetState(kClosing);
488 error_ = std::move(error);
489 SetState(kClosed);
490 }
491
CloseAbruptlyWithDataChannelFailure(const std::string & message)492 void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
493 const std::string& message) {
494 RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
495 error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
496 CloseAbruptlyWithError(std::move(error));
497 }
498
UpdateState()499 void SctpDataChannel::UpdateState() {
500 RTC_DCHECK_RUN_ON(signaling_thread_);
501 // UpdateState determines what to do from a few state variables. Include
502 // all conditions required for each state transition here for
503 // clarity. OnTransportReady(true) will send any queued data and then invoke
504 // UpdateState().
505
506 switch (state_) {
507 case kConnecting: {
508 if (connected_to_provider_) {
509 if (handshake_state_ == kHandshakeShouldSendOpen) {
510 rtc::CopyOnWriteBuffer payload;
511 WriteDataChannelOpenMessage(label_, config_, &payload);
512 SendControlMessage(payload);
513 } else if (handshake_state_ == kHandshakeShouldSendAck) {
514 rtc::CopyOnWriteBuffer payload;
515 WriteDataChannelOpenAckMessage(&payload);
516 SendControlMessage(payload);
517 }
518 if (writable_ && (handshake_state_ == kHandshakeReady ||
519 handshake_state_ == kHandshakeWaitingForAck)) {
520 SetState(kOpen);
521 // If we have received buffers before the channel got writable.
522 // Deliver them now.
523 DeliverQueuedReceivedData();
524 }
525 }
526 break;
527 }
528 case kOpen: {
529 break;
530 }
531 case kClosing: {
532 // Wait for all queued data to be sent before beginning the closing
533 // procedure.
534 if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
535 // For SCTP data channels, we need to wait for the closing procedure
536 // to complete; after calling RemoveSctpDataStream,
537 // OnClosingProcedureComplete will end up called asynchronously
538 // afterwards.
539 if (connected_to_provider_ && !started_closing_procedure_ &&
540 config_.id >= 0) {
541 started_closing_procedure_ = true;
542 provider_->RemoveSctpDataStream(config_.id);
543 }
544 }
545 break;
546 }
547 case kClosed:
548 break;
549 }
550 }
551
SetState(DataState state)552 void SctpDataChannel::SetState(DataState state) {
553 RTC_DCHECK_RUN_ON(signaling_thread_);
554 if (state_ == state) {
555 return;
556 }
557
558 state_ = state;
559 if (observer_) {
560 observer_->OnStateChange();
561 }
562 if (state_ == kOpen) {
563 SignalOpened(this);
564 } else if (state_ == kClosed) {
565 SignalClosed(this);
566 }
567 }
568
DisconnectFromProvider()569 void SctpDataChannel::DisconnectFromProvider() {
570 RTC_DCHECK_RUN_ON(signaling_thread_);
571 if (!connected_to_provider_)
572 return;
573
574 provider_->DisconnectDataChannel(this);
575 connected_to_provider_ = false;
576 }
577
DeliverQueuedReceivedData()578 void SctpDataChannel::DeliverQueuedReceivedData() {
579 RTC_DCHECK_RUN_ON(signaling_thread_);
580 if (!observer_) {
581 return;
582 }
583
584 while (!queued_received_data_.Empty()) {
585 std::unique_ptr<DataBuffer> buffer = queued_received_data_.PopFront();
586 ++messages_received_;
587 bytes_received_ += buffer->size();
588 observer_->OnMessage(*buffer);
589 }
590 }
591
SendQueuedDataMessages()592 void SctpDataChannel::SendQueuedDataMessages() {
593 RTC_DCHECK_RUN_ON(signaling_thread_);
594 if (queued_send_data_.Empty()) {
595 return;
596 }
597
598 RTC_DCHECK(state_ == kOpen || state_ == kClosing);
599
600 while (!queued_send_data_.Empty()) {
601 std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront();
602 if (!SendDataMessage(*buffer, false)) {
603 // Return the message to the front of the queue if sending is aborted.
604 queued_send_data_.PushFront(std::move(buffer));
605 break;
606 }
607 }
608 }
609
SendDataMessage(const DataBuffer & buffer,bool queue_if_blocked)610 bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
611 bool queue_if_blocked) {
612 RTC_DCHECK_RUN_ON(signaling_thread_);
613 cricket::SendDataParams send_params;
614
615 send_params.ordered = config_.ordered;
616 // Send as ordered if it is still going through OPEN/ACK signaling.
617 if (handshake_state_ != kHandshakeReady && !config_.ordered) {
618 send_params.ordered = true;
619 RTC_LOG(LS_VERBOSE)
620 << "Sending data as ordered for unordered DataChannel "
621 "because the OPEN_ACK message has not been received.";
622 }
623
624 send_params.max_rtx_count =
625 config_.maxRetransmits ? *config_.maxRetransmits : -1;
626 send_params.max_rtx_ms =
627 config_.maxRetransmitTime ? *config_.maxRetransmitTime : -1;
628 send_params.sid = config_.id;
629 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
630
631 cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
632 bool success = provider_->SendData(send_params, buffer.data, &send_result);
633
634 if (success) {
635 ++messages_sent_;
636 bytes_sent_ += buffer.size();
637
638 RTC_DCHECK(buffered_amount_ >= buffer.size());
639 buffered_amount_ -= buffer.size();
640 if (observer_ && buffer.size() > 0) {
641 observer_->OnBufferedAmountChange(buffer.size());
642 }
643 return true;
644 }
645
646 if (send_result == cricket::SDR_BLOCK) {
647 if (!queue_if_blocked || QueueSendDataMessage(buffer)) {
648 return false;
649 }
650 }
651 // Close the channel if the error is not SDR_BLOCK, or if queuing the
652 // message failed.
653 RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
654 "send_result = "
655 << send_result;
656 CloseAbruptlyWithError(
657 RTCError(RTCErrorType::NETWORK_ERROR, "Failure to send data"));
658
659 return false;
660 }
661
QueueSendDataMessage(const DataBuffer & buffer)662 bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
663 RTC_DCHECK_RUN_ON(signaling_thread_);
664 size_t start_buffered_amount = queued_send_data_.byte_count();
665 if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) {
666 RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
667 return false;
668 }
669 queued_send_data_.PushBack(std::make_unique<DataBuffer>(buffer));
670 return true;
671 }
672
SendQueuedControlMessages()673 void SctpDataChannel::SendQueuedControlMessages() {
674 RTC_DCHECK_RUN_ON(signaling_thread_);
675 PacketQueue control_packets;
676 control_packets.Swap(&queued_control_data_);
677
678 while (!control_packets.Empty()) {
679 std::unique_ptr<DataBuffer> buf = control_packets.PopFront();
680 SendControlMessage(buf->data);
681 }
682 }
683
QueueControlMessage(const rtc::CopyOnWriteBuffer & buffer)684 void SctpDataChannel::QueueControlMessage(
685 const rtc::CopyOnWriteBuffer& buffer) {
686 RTC_DCHECK_RUN_ON(signaling_thread_);
687 queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
688 }
689
SendControlMessage(const rtc::CopyOnWriteBuffer & buffer)690 bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
691 RTC_DCHECK_RUN_ON(signaling_thread_);
692 RTC_DCHECK(writable_);
693 RTC_DCHECK_GE(config_.id, 0);
694
695 bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
696 RTC_DCHECK(!is_open_message || !config_.negotiated);
697
698 cricket::SendDataParams send_params;
699 send_params.sid = config_.id;
700 // Send data as ordered before we receive any message from the remote peer to
701 // make sure the remote peer will not receive any data before it receives the
702 // OPEN message.
703 send_params.ordered = config_.ordered || is_open_message;
704 send_params.type = cricket::DMT_CONTROL;
705
706 cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
707 bool retval = provider_->SendData(send_params, buffer, &send_result);
708 if (retval) {
709 RTC_LOG(LS_VERBOSE) << "Sent CONTROL message on channel " << config_.id;
710
711 if (handshake_state_ == kHandshakeShouldSendAck) {
712 handshake_state_ = kHandshakeReady;
713 } else if (handshake_state_ == kHandshakeShouldSendOpen) {
714 handshake_state_ = kHandshakeWaitingForAck;
715 }
716 } else if (send_result == cricket::SDR_BLOCK) {
717 QueueControlMessage(buffer);
718 } else {
719 RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
720 " the CONTROL message, send_result = "
721 << send_result;
722 CloseAbruptlyWithError(RTCError(RTCErrorType::NETWORK_ERROR,
723 "Failed to send a CONTROL message"));
724 }
725 return retval;
726 }
727
728 // static
ResetInternalIdAllocatorForTesting(int new_value)729 void SctpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) {
730 g_unique_id = new_value;
731 }
732
733 } // namespace webrtc
734