1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #include <utils/Log.h>
20
21 #include "ARTPAssembler.h"
22 #include "ARTPConnection.h"
23
24 #include "ARTPSource.h"
25 #include "ASessionDescription.h"
26
27 #include <media/stagefright/foundation/ABuffer.h>
28 #include <media/stagefright/foundation/ADebug.h>
29 #include <media/stagefright/foundation/AMessage.h>
30 #include <media/stagefright/foundation/AString.h>
31 #include <media/stagefright/foundation/hexdump.h>
32
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
35
36 namespace android {
37
// Size passed to the ABuffer in which onPollStreams() assembles outgoing
// RTCP receiver reports before handing them to sendto().
static const size_t kMaxUDPSize = 1500;
39
// Reads two bytes starting at |data| as a big-endian (network byte order)
// unsigned 16-bit value.
static uint16_t u16at(const uint8_t *data) {
    const unsigned high = data[0];
    const unsigned low = data[1];
    return static_cast<uint16_t>((high << 8) | low);
}
43
// Reads four bytes starting at |data| as a big-endian (network byte order)
// unsigned 32-bit value.
//
// Every byte is widened to uint32_t before it is shifted, so the arithmetic
// never goes through a promoted signed int (shifting a 16-bit quantity with
// its top bit set left by 16 would otherwise move a one into the sign bit).
static uint32_t u32at(const uint8_t *data) {
    return (static_cast<uint32_t>(data[0]) << 24)
            | (static_cast<uint32_t>(data[1]) << 16)
            | (static_cast<uint32_t>(data[2]) << 8)
            | static_cast<uint32_t>(data[3]);
}
47
u64at(const uint8_t * data)48 static uint64_t u64at(const uint8_t *data) {
49 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
50 }
51
// static
// Value onPollStreams() stores in timeval::tv_usec (with tv_sec == 0) for
// its select() call, i.e. the longest a single poll waits for socket
// activity, expressed in microseconds.
const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
54
// Bookkeeping for one stream registered through addStream(); instances live
// in mStreams.
struct ARTPConnection::StreamInfo {
    // Descriptor receive() reads RTP datagrams from. Also used, together
    // with mRTCPSocket, to look the stream up in onRemoveStream() and
    // onInjectPacket().
    int mRTPSocket;
    // Descriptor receive() reads RTCP datagrams from and onPollStreams()
    // sends receiver reports on.
    int mRTCPSocket;
    // The next three members are handed unchanged to every ARTPSource that
    // findSource() creates for this stream.
    sp<ASessionDescription> mSessionDesc;
    size_t mIndex;
    sp<AMessage> mNotifyMsg;
    // Sources created so far, keyed by the source id read from the packets.
    KeyedVector<uint32_t, sp<ARTPSource> > mSources;

    // Incremented on entry to parseRTCP() / parseRTP() respectively.
    int64_t mNumRTCPPacketsReceived;
    int64_t mNumRTPPacketsReceived;
    // Filled in by recvfrom() while no RTCP packet has been counted yet;
    // destination address for the receiver reports sent by onPollStreams().
    struct sockaddr_in mRemoteRTCPAddr;

    // When true, onPollStreams() skips this stream entirely (its sockets are
    // neither polled nor written to).
    bool mIsInjected;
};
69
// Constructs a connection with |flags| stored in mFlags (onPollStreams()
// tests it against kRegularlyRequestFIR). Initially no poll event is pending
// and the time of the last receiver report is -1.
ARTPConnection::ARTPConnection(uint32_t flags)
    : mFlags(flags),
      mPollEventPending(false),
      mLastReceiverReportTimeUs(-1) {
}
75
// Performs no explicit cleanup; in particular it does not close any socket
// descriptor that was passed to addStream().
ARTPConnection::~ARTPConnection() {
}
78
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)79 void ARTPConnection::addStream(
80 int rtpSocket, int rtcpSocket,
81 const sp<ASessionDescription> &sessionDesc,
82 size_t index,
83 const sp<AMessage> ¬ify,
84 bool injected) {
85 sp<AMessage> msg = new AMessage(kWhatAddStream, this);
86 msg->setInt32("rtp-socket", rtpSocket);
87 msg->setInt32("rtcp-socket", rtcpSocket);
88 msg->setObject("session-desc", sessionDesc);
89 msg->setSize("index", index);
90 msg->setMessage("notify", notify);
91 msg->setInt32("injected", injected);
92 msg->post();
93 }
94
removeStream(int rtpSocket,int rtcpSocket)95 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
96 sp<AMessage> msg = new AMessage(kWhatRemoveStream, this);
97 msg->setInt32("rtp-socket", rtpSocket);
98 msg->setInt32("rtcp-socket", rtcpSocket);
99 msg->post();
100 }
101
// Asks for a 256 KiB receive buffer (SO_RCVBUF) on socket |s|. The
// setsockopt() result is wrapped in CHECK_EQ, so a failure trips that check.
static void bumpSocketBufferSize(int s) {
    int size = 256 * 1024;
    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
}
106
107 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)108 void ARTPConnection::MakePortPair(
109 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
110 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
111 CHECK_GE(*rtpSocket, 0);
112
113 bumpSocketBufferSize(*rtpSocket);
114
115 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
116 CHECK_GE(*rtcpSocket, 0);
117
118 bumpSocketBufferSize(*rtcpSocket);
119
120 /* rand() * 1000 may overflow int type, use long long */
121 unsigned start = (unsigned)((rand()* 1000ll)/RAND_MAX) + 15550;
122 start &= ~1;
123
124 for (unsigned port = start; port < 65536; port += 2) {
125 struct sockaddr_in addr;
126 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
127 addr.sin_family = AF_INET;
128 addr.sin_addr.s_addr = htonl(INADDR_ANY);
129 addr.sin_port = htons(port);
130
131 if (bind(*rtpSocket,
132 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
133 continue;
134 }
135
136 addr.sin_port = htons(port + 1);
137
138 if (bind(*rtcpSocket,
139 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
140 *rtpPort = port;
141 return;
142 }
143 }
144
145 TRESPASS();
146 }
147
onMessageReceived(const sp<AMessage> & msg)148 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
149 switch (msg->what()) {
150 case kWhatAddStream:
151 {
152 onAddStream(msg);
153 break;
154 }
155
156 case kWhatRemoveStream:
157 {
158 onRemoveStream(msg);
159 break;
160 }
161
162 case kWhatPollStreams:
163 {
164 onPollStreams();
165 break;
166 }
167
168 case kWhatInjectPacket:
169 {
170 onInjectPacket(msg);
171 break;
172 }
173
174 default:
175 {
176 TRESPASS();
177 break;
178 }
179 }
180 }
181
onAddStream(const sp<AMessage> & msg)182 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
183 mStreams.push_back(StreamInfo());
184 StreamInfo *info = &*--mStreams.end();
185
186 int32_t s;
187 CHECK(msg->findInt32("rtp-socket", &s));
188 info->mRTPSocket = s;
189 CHECK(msg->findInt32("rtcp-socket", &s));
190 info->mRTCPSocket = s;
191
192 int32_t injected;
193 CHECK(msg->findInt32("injected", &injected));
194
195 info->mIsInjected = injected;
196
197 sp<RefBase> obj;
198 CHECK(msg->findObject("session-desc", &obj));
199 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
200
201 CHECK(msg->findSize("index", &info->mIndex));
202 CHECK(msg->findMessage("notify", &info->mNotifyMsg));
203
204 info->mNumRTCPPacketsReceived = 0;
205 info->mNumRTPPacketsReceived = 0;
206 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
207
208 if (!injected) {
209 postPollEvent();
210 }
211 }
212
onRemoveStream(const sp<AMessage> & msg)213 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
214 int32_t rtpSocket, rtcpSocket;
215 CHECK(msg->findInt32("rtp-socket", &rtpSocket));
216 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
217
218 List<StreamInfo>::iterator it = mStreams.begin();
219 while (it != mStreams.end()
220 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
221 ++it;
222 }
223
224 if (it == mStreams.end()) {
225 return;
226 }
227
228 mStreams.erase(it);
229 }
230
postPollEvent()231 void ARTPConnection::postPollEvent() {
232 if (mPollEventPending) {
233 return;
234 }
235
236 sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
237 msg->post();
238
239 mPollEventPending = true;
240 }
241
onPollStreams()242 void ARTPConnection::onPollStreams() {
243 mPollEventPending = false;
244
245 if (mStreams.empty()) {
246 return;
247 }
248
249 struct timeval tv;
250 tv.tv_sec = 0;
251 tv.tv_usec = kSelectTimeoutUs;
252
253 fd_set rs;
254 FD_ZERO(&rs);
255
256 int maxSocket = -1;
257 for (List<StreamInfo>::iterator it = mStreams.begin();
258 it != mStreams.end(); ++it) {
259 if ((*it).mIsInjected) {
260 continue;
261 }
262
263 FD_SET(it->mRTPSocket, &rs);
264 FD_SET(it->mRTCPSocket, &rs);
265
266 if (it->mRTPSocket > maxSocket) {
267 maxSocket = it->mRTPSocket;
268 }
269 if (it->mRTCPSocket > maxSocket) {
270 maxSocket = it->mRTCPSocket;
271 }
272 }
273
274 if (maxSocket == -1) {
275 return;
276 }
277
278 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
279
280 if (res > 0) {
281 List<StreamInfo>::iterator it = mStreams.begin();
282 while (it != mStreams.end()) {
283 if ((*it).mIsInjected) {
284 ++it;
285 continue;
286 }
287
288 status_t err = OK;
289 if (FD_ISSET(it->mRTPSocket, &rs)) {
290 err = receive(&*it, true);
291 }
292 if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
293 err = receive(&*it, false);
294 }
295
296 if (err == -ECONNRESET) {
297 // socket failure, this stream is dead, Jim.
298
299 ALOGW("failed to receive RTP/RTCP datagram.");
300 it = mStreams.erase(it);
301 continue;
302 }
303
304 ++it;
305 }
306 }
307
308 int64_t nowUs = ALooper::GetNowUs();
309 if (mLastReceiverReportTimeUs <= 0
310 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
311 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
312 List<StreamInfo>::iterator it = mStreams.begin();
313 while (it != mStreams.end()) {
314 StreamInfo *s = &*it;
315
316 if (s->mIsInjected) {
317 ++it;
318 continue;
319 }
320
321 if (s->mNumRTCPPacketsReceived == 0) {
322 // We have never received any RTCP packets on this stream,
323 // we don't even know where to send a report.
324 ++it;
325 continue;
326 }
327
328 buffer->setRange(0, 0);
329
330 for (size_t i = 0; i < s->mSources.size(); ++i) {
331 sp<ARTPSource> source = s->mSources.valueAt(i);
332
333 source->addReceiverReport(buffer);
334
335 if (mFlags & kRegularlyRequestFIR) {
336 source->addFIR(buffer);
337 }
338 }
339
340 if (buffer->size() > 0) {
341 ALOGV("Sending RR...");
342
343 ssize_t n;
344 do {
345 n = sendto(
346 s->mRTCPSocket, buffer->data(), buffer->size(), 0,
347 (const struct sockaddr *)&s->mRemoteRTCPAddr,
348 sizeof(s->mRemoteRTCPAddr));
349 } while (n < 0 && errno == EINTR);
350
351 if (n <= 0) {
352 ALOGW("failed to send RTCP receiver report (%s).",
353 n == 0 ? "connection gone" : strerror(errno));
354
355 it = mStreams.erase(it);
356 continue;
357 }
358
359 CHECK_EQ(n, (ssize_t)buffer->size());
360
361 mLastReceiverReportTimeUs = nowUs;
362 }
363
364 ++it;
365 }
366 }
367
368 if (!mStreams.empty()) {
369 postPollEvent();
370 }
371 }
372
receive(StreamInfo * s,bool receiveRTP)373 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
374 ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
375
376 CHECK(!s->mIsInjected);
377
378 sp<ABuffer> buffer = new ABuffer(65536);
379
380 socklen_t remoteAddrLen =
381 (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
382 ? sizeof(s->mRemoteRTCPAddr) : 0;
383
384 ssize_t nbytes;
385 do {
386 nbytes = recvfrom(
387 receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
388 buffer->data(),
389 buffer->capacity(),
390 0,
391 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
392 remoteAddrLen > 0 ? &remoteAddrLen : NULL);
393 } while (nbytes < 0 && errno == EINTR);
394
395 if (nbytes <= 0) {
396 return -ECONNRESET;
397 }
398
399 buffer->setRange(0, nbytes);
400
401 // ALOGI("received %d bytes.", buffer->size());
402
403 status_t err;
404 if (receiveRTP) {
405 err = parseRTP(s, buffer);
406 } else {
407 err = parseRTCP(s, buffer);
408 }
409
410 return err;
411 }
412
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)413 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
414 if (s->mNumRTPPacketsReceived++ == 0) {
415 sp<AMessage> notify = s->mNotifyMsg->dup();
416 notify->setInt32("first-rtp", true);
417 notify->post();
418 }
419
420 size_t size = buffer->size();
421
422 if (size < 12) {
423 // Too short to be a valid RTP header.
424 return -1;
425 }
426
427 const uint8_t *data = buffer->data();
428
429 if ((data[0] >> 6) != 2) {
430 // Unsupported version.
431 return -1;
432 }
433
434 if (data[0] & 0x20) {
435 // Padding present.
436
437 size_t paddingLength = data[size - 1];
438
439 if (paddingLength + 12 > size) {
440 // If we removed this much padding we'd end up with something
441 // that's too short to be a valid RTP header.
442 return -1;
443 }
444
445 size -= paddingLength;
446 }
447
448 int numCSRCs = data[0] & 0x0f;
449
450 size_t payloadOffset = 12 + 4 * numCSRCs;
451
452 if (size < payloadOffset) {
453 // Not enough data to fit the basic header and all the CSRC entries.
454 return -1;
455 }
456
457 if (data[0] & 0x10) {
458 // Header eXtension present.
459
460 if (size < payloadOffset + 4) {
461 // Not enough data to fit the basic header, all CSRC entries
462 // and the first 4 bytes of the extension header.
463
464 return -1;
465 }
466
467 const uint8_t *extensionData = &data[payloadOffset];
468
469 size_t extensionLength =
470 4 * (extensionData[2] << 8 | extensionData[3]);
471
472 if (size < payloadOffset + 4 + extensionLength) {
473 return -1;
474 }
475
476 payloadOffset += 4 + extensionLength;
477 }
478
479 uint32_t srcId = u32at(&data[8]);
480
481 sp<ARTPSource> source = findSource(s, srcId);
482
483 uint32_t rtpTime = u32at(&data[4]);
484
485 sp<AMessage> meta = buffer->meta();
486 meta->setInt32("ssrc", srcId);
487 meta->setInt32("rtp-time", rtpTime);
488 meta->setInt32("PT", data[1] & 0x7f);
489 meta->setInt32("M", data[1] >> 7);
490
491 buffer->setInt32Data(u16at(&data[2]));
492 buffer->setRange(payloadOffset, size - payloadOffset);
493
494 source->processRTPPacket(buffer);
495
496 return OK;
497 }
498
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)499 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
500 if (s->mNumRTCPPacketsReceived++ == 0) {
501 sp<AMessage> notify = s->mNotifyMsg->dup();
502 notify->setInt32("first-rtcp", true);
503 notify->post();
504 }
505
506 const uint8_t *data = buffer->data();
507 size_t size = buffer->size();
508
509 while (size > 0) {
510 if (size < 8) {
511 // Too short to be a valid RTCP header
512 return -1;
513 }
514
515 if ((data[0] >> 6) != 2) {
516 // Unsupported version.
517 return -1;
518 }
519
520 if (data[0] & 0x20) {
521 // Padding present.
522
523 size_t paddingLength = data[size - 1];
524
525 if (paddingLength + 12 > size) {
526 // If we removed this much padding we'd end up with something
527 // that's too short to be a valid RTP header.
528 return -1;
529 }
530
531 size -= paddingLength;
532 }
533
534 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
535
536 if (size < headerLength) {
537 // Only received a partial packet?
538 return -1;
539 }
540
541 switch (data[1]) {
542 case 200:
543 {
544 parseSR(s, data, headerLength);
545 break;
546 }
547
548 case 201: // RR
549 case 202: // SDES
550 case 204: // APP
551 break;
552
553 case 205: // TSFB (transport layer specific feedback)
554 case 206: // PSFB (payload specific feedback)
555 // hexdump(data, headerLength);
556 break;
557
558 case 203:
559 {
560 parseBYE(s, data, headerLength);
561 break;
562 }
563
564 default:
565 {
566 ALOGW("Unknown RTCP packet type %u of size %zu",
567 (unsigned)data[1], headerLength);
568 break;
569 }
570 }
571
572 data += headerLength;
573 size -= headerLength;
574 }
575
576 return OK;
577 }
578
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)579 status_t ARTPConnection::parseBYE(
580 StreamInfo *s, const uint8_t *data, size_t size) {
581 size_t SC = data[0] & 0x3f;
582
583 if (SC == 0 || size < (4 + SC * 4)) {
584 // Packet too short for the minimal BYE header.
585 return -1;
586 }
587
588 uint32_t id = u32at(&data[4]);
589
590 sp<ARTPSource> source = findSource(s, id);
591
592 source->byeReceived();
593
594 return OK;
595 }
596
parseSR(StreamInfo * s,const uint8_t * data,size_t size)597 status_t ARTPConnection::parseSR(
598 StreamInfo *s, const uint8_t *data, size_t size) {
599 size_t RC = data[0] & 0x1f;
600
601 if (size < (7 + RC * 6) * 4) {
602 // Packet too short for the minimal SR header.
603 return -1;
604 }
605
606 uint32_t id = u32at(&data[4]);
607 uint64_t ntpTime = u64at(&data[8]);
608 uint32_t rtpTime = u32at(&data[16]);
609
610 #if 0
611 ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
612 id,
613 rtpTime,
614 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
615 #endif
616
617 sp<ARTPSource> source = findSource(s, id);
618
619 source->timeUpdate(rtpTime, ntpTime);
620
621 return 0;
622 }
623
findSource(StreamInfo * info,uint32_t srcId)624 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
625 sp<ARTPSource> source;
626 ssize_t index = info->mSources.indexOfKey(srcId);
627 if (index < 0) {
628 index = info->mSources.size();
629
630 source = new ARTPSource(
631 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
632
633 info->mSources.add(srcId, source);
634 } else {
635 source = info->mSources.valueAt(index);
636 }
637
638 return source;
639 }
640
injectPacket(int index,const sp<ABuffer> & buffer)641 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
642 sp<AMessage> msg = new AMessage(kWhatInjectPacket, this);
643 msg->setInt32("index", index);
644 msg->setBuffer("buffer", buffer);
645 msg->post();
646 }
647
onInjectPacket(const sp<AMessage> & msg)648 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
649 int32_t index;
650 CHECK(msg->findInt32("index", &index));
651
652 sp<ABuffer> buffer;
653 CHECK(msg->findBuffer("buffer", &buffer));
654
655 List<StreamInfo>::iterator it = mStreams.begin();
656 while (it != mStreams.end()
657 && it->mRTPSocket != index && it->mRTCPSocket != index) {
658 ++it;
659 }
660
661 if (it == mStreams.end()) {
662 TRESPASS();
663 }
664
665 StreamInfo *s = &*it;
666
667 if (it->mRTPSocket == index) {
668 parseRTP(s, buffer);
669 } else {
670 parseRTCP(s, buffer);
671 }
672 }
673
674 } // namespace android
675
676