1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #include <utils/Log.h>
20 
21 #include "ARTPAssembler.h"
22 #include "ARTPConnection.h"
23 
24 #include "ARTPSource.h"
25 #include "ASessionDescription.h"
26 
27 #include <media/stagefright/foundation/ABuffer.h>
28 #include <media/stagefright/foundation/ADebug.h>
29 #include <media/stagefright/foundation/AMessage.h>
30 #include <media/stagefright/foundation/AString.h>
31 #include <media/stagefright/foundation/hexdump.h>
32 
33 #include <arpa/inet.h>
34 #include <sys/socket.h>
35 
36 namespace android {
37 
38 static const size_t kMaxUDPSize = 1500;
39 
u16at(const uint8_t * data)40 static uint16_t u16at(const uint8_t *data) {
41     return data[0] << 8 | data[1];
42 }
43 
u32at(const uint8_t * data)44 static uint32_t u32at(const uint8_t *data) {
45     return u16at(data) << 16 | u16at(&data[2]);
46 }
47 
u64at(const uint8_t * data)48 static uint64_t u64at(const uint8_t *data) {
49     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
50 }
51 
52 // static
53 const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
54 
55 struct ARTPConnection::StreamInfo {
56     int mRTPSocket;
57     int mRTCPSocket;
58     sp<ASessionDescription> mSessionDesc;
59     size_t mIndex;
60     sp<AMessage> mNotifyMsg;
61     KeyedVector<uint32_t, sp<ARTPSource> > mSources;
62 
63     int64_t mNumRTCPPacketsReceived;
64     int64_t mNumRTPPacketsReceived;
65     struct sockaddr_in mRemoteRTCPAddr;
66 
67     bool mIsInjected;
68 };
69 
ARTPConnection(uint32_t flags)70 ARTPConnection::ARTPConnection(uint32_t flags)
71     : mFlags(flags),
72       mPollEventPending(false),
73       mLastReceiverReportTimeUs(-1) {
74 }
75 
~ARTPConnection()76 ARTPConnection::~ARTPConnection() {
77 }
78 
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)79 void ARTPConnection::addStream(
80         int rtpSocket, int rtcpSocket,
81         const sp<ASessionDescription> &sessionDesc,
82         size_t index,
83         const sp<AMessage> &notify,
84         bool injected) {
85     sp<AMessage> msg = new AMessage(kWhatAddStream, this);
86     msg->setInt32("rtp-socket", rtpSocket);
87     msg->setInt32("rtcp-socket", rtcpSocket);
88     msg->setObject("session-desc", sessionDesc);
89     msg->setSize("index", index);
90     msg->setMessage("notify", notify);
91     msg->setInt32("injected", injected);
92     msg->post();
93 }
94 
removeStream(int rtpSocket,int rtcpSocket)95 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
96     sp<AMessage> msg = new AMessage(kWhatRemoveStream, this);
97     msg->setInt32("rtp-socket", rtpSocket);
98     msg->setInt32("rtcp-socket", rtcpSocket);
99     msg->post();
100 }
101 
bumpSocketBufferSize(int s)102 static void bumpSocketBufferSize(int s) {
103     int size = 256 * 1024;
104     CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
105 }
106 
107 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)108 void ARTPConnection::MakePortPair(
109         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
110     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
111     CHECK_GE(*rtpSocket, 0);
112 
113     bumpSocketBufferSize(*rtpSocket);
114 
115     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
116     CHECK_GE(*rtcpSocket, 0);
117 
118     bumpSocketBufferSize(*rtcpSocket);
119 
120     /* rand() * 1000 may overflow int type, use long long */
121     unsigned start = (unsigned)((rand()* 1000ll)/RAND_MAX) + 15550;
122     start &= ~1;
123 
124     for (unsigned port = start; port < 65536; port += 2) {
125         struct sockaddr_in addr;
126         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
127         addr.sin_family = AF_INET;
128         addr.sin_addr.s_addr = htonl(INADDR_ANY);
129         addr.sin_port = htons(port);
130 
131         if (bind(*rtpSocket,
132                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
133             continue;
134         }
135 
136         addr.sin_port = htons(port + 1);
137 
138         if (bind(*rtcpSocket,
139                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
140             *rtpPort = port;
141             return;
142         }
143     }
144 
145     TRESPASS();
146 }
147 
onMessageReceived(const sp<AMessage> & msg)148 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
149     switch (msg->what()) {
150         case kWhatAddStream:
151         {
152             onAddStream(msg);
153             break;
154         }
155 
156         case kWhatRemoveStream:
157         {
158             onRemoveStream(msg);
159             break;
160         }
161 
162         case kWhatPollStreams:
163         {
164             onPollStreams();
165             break;
166         }
167 
168         case kWhatInjectPacket:
169         {
170             onInjectPacket(msg);
171             break;
172         }
173 
174         default:
175         {
176             TRESPASS();
177             break;
178         }
179     }
180 }
181 
onAddStream(const sp<AMessage> & msg)182 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
183     mStreams.push_back(StreamInfo());
184     StreamInfo *info = &*--mStreams.end();
185 
186     int32_t s;
187     CHECK(msg->findInt32("rtp-socket", &s));
188     info->mRTPSocket = s;
189     CHECK(msg->findInt32("rtcp-socket", &s));
190     info->mRTCPSocket = s;
191 
192     int32_t injected;
193     CHECK(msg->findInt32("injected", &injected));
194 
195     info->mIsInjected = injected;
196 
197     sp<RefBase> obj;
198     CHECK(msg->findObject("session-desc", &obj));
199     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
200 
201     CHECK(msg->findSize("index", &info->mIndex));
202     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
203 
204     info->mNumRTCPPacketsReceived = 0;
205     info->mNumRTPPacketsReceived = 0;
206     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
207 
208     if (!injected) {
209         postPollEvent();
210     }
211 }
212 
onRemoveStream(const sp<AMessage> & msg)213 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
214     int32_t rtpSocket, rtcpSocket;
215     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
216     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
217 
218     List<StreamInfo>::iterator it = mStreams.begin();
219     while (it != mStreams.end()
220            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
221         ++it;
222     }
223 
224     if (it == mStreams.end()) {
225         return;
226     }
227 
228     mStreams.erase(it);
229 }
230 
postPollEvent()231 void ARTPConnection::postPollEvent() {
232     if (mPollEventPending) {
233         return;
234     }
235 
236     sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
237     msg->post();
238 
239     mPollEventPending = true;
240 }
241 
onPollStreams()242 void ARTPConnection::onPollStreams() {
243     mPollEventPending = false;
244 
245     if (mStreams.empty()) {
246         return;
247     }
248 
249     struct timeval tv;
250     tv.tv_sec = 0;
251     tv.tv_usec = kSelectTimeoutUs;
252 
253     fd_set rs;
254     FD_ZERO(&rs);
255 
256     int maxSocket = -1;
257     for (List<StreamInfo>::iterator it = mStreams.begin();
258          it != mStreams.end(); ++it) {
259         if ((*it).mIsInjected) {
260             continue;
261         }
262 
263         FD_SET(it->mRTPSocket, &rs);
264         FD_SET(it->mRTCPSocket, &rs);
265 
266         if (it->mRTPSocket > maxSocket) {
267             maxSocket = it->mRTPSocket;
268         }
269         if (it->mRTCPSocket > maxSocket) {
270             maxSocket = it->mRTCPSocket;
271         }
272     }
273 
274     if (maxSocket == -1) {
275         return;
276     }
277 
278     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
279 
280     if (res > 0) {
281         List<StreamInfo>::iterator it = mStreams.begin();
282         while (it != mStreams.end()) {
283             if ((*it).mIsInjected) {
284                 ++it;
285                 continue;
286             }
287 
288             status_t err = OK;
289             if (FD_ISSET(it->mRTPSocket, &rs)) {
290                 err = receive(&*it, true);
291             }
292             if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
293                 err = receive(&*it, false);
294             }
295 
296             if (err == -ECONNRESET) {
297                 // socket failure, this stream is dead, Jim.
298 
299                 ALOGW("failed to receive RTP/RTCP datagram.");
300                 it = mStreams.erase(it);
301                 continue;
302             }
303 
304             ++it;
305         }
306     }
307 
308     int64_t nowUs = ALooper::GetNowUs();
309     if (mLastReceiverReportTimeUs <= 0
310             || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
311         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
312         List<StreamInfo>::iterator it = mStreams.begin();
313         while (it != mStreams.end()) {
314             StreamInfo *s = &*it;
315 
316             if (s->mIsInjected) {
317                 ++it;
318                 continue;
319             }
320 
321             if (s->mNumRTCPPacketsReceived == 0) {
322                 // We have never received any RTCP packets on this stream,
323                 // we don't even know where to send a report.
324                 ++it;
325                 continue;
326             }
327 
328             buffer->setRange(0, 0);
329 
330             for (size_t i = 0; i < s->mSources.size(); ++i) {
331                 sp<ARTPSource> source = s->mSources.valueAt(i);
332 
333                 source->addReceiverReport(buffer);
334 
335                 if (mFlags & kRegularlyRequestFIR) {
336                     source->addFIR(buffer);
337                 }
338             }
339 
340             if (buffer->size() > 0) {
341                 ALOGV("Sending RR...");
342 
343                 ssize_t n;
344                 do {
345                     n = sendto(
346                         s->mRTCPSocket, buffer->data(), buffer->size(), 0,
347                         (const struct sockaddr *)&s->mRemoteRTCPAddr,
348                         sizeof(s->mRemoteRTCPAddr));
349                 } while (n < 0 && errno == EINTR);
350 
351                 if (n <= 0) {
352                     ALOGW("failed to send RTCP receiver report (%s).",
353                          n == 0 ? "connection gone" : strerror(errno));
354 
355                     it = mStreams.erase(it);
356                     continue;
357                 }
358 
359                 CHECK_EQ(n, (ssize_t)buffer->size());
360 
361                 mLastReceiverReportTimeUs = nowUs;
362             }
363 
364             ++it;
365         }
366     }
367 
368     if (!mStreams.empty()) {
369         postPollEvent();
370     }
371 }
372 
receive(StreamInfo * s,bool receiveRTP)373 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
374     ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
375 
376     CHECK(!s->mIsInjected);
377 
378     sp<ABuffer> buffer = new ABuffer(65536);
379 
380     socklen_t remoteAddrLen =
381         (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
382             ? sizeof(s->mRemoteRTCPAddr) : 0;
383 
384     ssize_t nbytes;
385     do {
386         nbytes = recvfrom(
387             receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
388             buffer->data(),
389             buffer->capacity(),
390             0,
391             remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
392             remoteAddrLen > 0 ? &remoteAddrLen : NULL);
393     } while (nbytes < 0 && errno == EINTR);
394 
395     if (nbytes <= 0) {
396         return -ECONNRESET;
397     }
398 
399     buffer->setRange(0, nbytes);
400 
401     // ALOGI("received %d bytes.", buffer->size());
402 
403     status_t err;
404     if (receiveRTP) {
405         err = parseRTP(s, buffer);
406     } else {
407         err = parseRTCP(s, buffer);
408     }
409 
410     return err;
411 }
412 
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)413 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
414     if (s->mNumRTPPacketsReceived++ == 0) {
415         sp<AMessage> notify = s->mNotifyMsg->dup();
416         notify->setInt32("first-rtp", true);
417         notify->post();
418     }
419 
420     size_t size = buffer->size();
421 
422     if (size < 12) {
423         // Too short to be a valid RTP header.
424         return -1;
425     }
426 
427     const uint8_t *data = buffer->data();
428 
429     if ((data[0] >> 6) != 2) {
430         // Unsupported version.
431         return -1;
432     }
433 
434     if (data[0] & 0x20) {
435         // Padding present.
436 
437         size_t paddingLength = data[size - 1];
438 
439         if (paddingLength + 12 > size) {
440             // If we removed this much padding we'd end up with something
441             // that's too short to be a valid RTP header.
442             return -1;
443         }
444 
445         size -= paddingLength;
446     }
447 
448     int numCSRCs = data[0] & 0x0f;
449 
450     size_t payloadOffset = 12 + 4 * numCSRCs;
451 
452     if (size < payloadOffset) {
453         // Not enough data to fit the basic header and all the CSRC entries.
454         return -1;
455     }
456 
457     if (data[0] & 0x10) {
458         // Header eXtension present.
459 
460         if (size < payloadOffset + 4) {
461             // Not enough data to fit the basic header, all CSRC entries
462             // and the first 4 bytes of the extension header.
463 
464             return -1;
465         }
466 
467         const uint8_t *extensionData = &data[payloadOffset];
468 
469         size_t extensionLength =
470             4 * (extensionData[2] << 8 | extensionData[3]);
471 
472         if (size < payloadOffset + 4 + extensionLength) {
473             return -1;
474         }
475 
476         payloadOffset += 4 + extensionLength;
477     }
478 
479     uint32_t srcId = u32at(&data[8]);
480 
481     sp<ARTPSource> source = findSource(s, srcId);
482 
483     uint32_t rtpTime = u32at(&data[4]);
484 
485     sp<AMessage> meta = buffer->meta();
486     meta->setInt32("ssrc", srcId);
487     meta->setInt32("rtp-time", rtpTime);
488     meta->setInt32("PT", data[1] & 0x7f);
489     meta->setInt32("M", data[1] >> 7);
490 
491     buffer->setInt32Data(u16at(&data[2]));
492     buffer->setRange(payloadOffset, size - payloadOffset);
493 
494     source->processRTPPacket(buffer);
495 
496     return OK;
497 }
498 
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)499 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
500     if (s->mNumRTCPPacketsReceived++ == 0) {
501         sp<AMessage> notify = s->mNotifyMsg->dup();
502         notify->setInt32("first-rtcp", true);
503         notify->post();
504     }
505 
506     const uint8_t *data = buffer->data();
507     size_t size = buffer->size();
508 
509     while (size > 0) {
510         if (size < 8) {
511             // Too short to be a valid RTCP header
512             return -1;
513         }
514 
515         if ((data[0] >> 6) != 2) {
516             // Unsupported version.
517             return -1;
518         }
519 
520         if (data[0] & 0x20) {
521             // Padding present.
522 
523             size_t paddingLength = data[size - 1];
524 
525             if (paddingLength + 12 > size) {
526                 // If we removed this much padding we'd end up with something
527                 // that's too short to be a valid RTP header.
528                 return -1;
529             }
530 
531             size -= paddingLength;
532         }
533 
534         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
535 
536         if (size < headerLength) {
537             // Only received a partial packet?
538             return -1;
539         }
540 
541         switch (data[1]) {
542             case 200:
543             {
544                 parseSR(s, data, headerLength);
545                 break;
546             }
547 
548             case 201:  // RR
549             case 202:  // SDES
550             case 204:  // APP
551                 break;
552 
553             case 205:  // TSFB (transport layer specific feedback)
554             case 206:  // PSFB (payload specific feedback)
555                 // hexdump(data, headerLength);
556                 break;
557 
558             case 203:
559             {
560                 parseBYE(s, data, headerLength);
561                 break;
562             }
563 
564             default:
565             {
566                 ALOGW("Unknown RTCP packet type %u of size %zu",
567                      (unsigned)data[1], headerLength);
568                 break;
569             }
570         }
571 
572         data += headerLength;
573         size -= headerLength;
574     }
575 
576     return OK;
577 }
578 
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)579 status_t ARTPConnection::parseBYE(
580         StreamInfo *s, const uint8_t *data, size_t size) {
581     size_t SC = data[0] & 0x3f;
582 
583     if (SC == 0 || size < (4 + SC * 4)) {
584         // Packet too short for the minimal BYE header.
585         return -1;
586     }
587 
588     uint32_t id = u32at(&data[4]);
589 
590     sp<ARTPSource> source = findSource(s, id);
591 
592     source->byeReceived();
593 
594     return OK;
595 }
596 
parseSR(StreamInfo * s,const uint8_t * data,size_t size)597 status_t ARTPConnection::parseSR(
598         StreamInfo *s, const uint8_t *data, size_t size) {
599     size_t RC = data[0] & 0x1f;
600 
601     if (size < (7 + RC * 6) * 4) {
602         // Packet too short for the minimal SR header.
603         return -1;
604     }
605 
606     uint32_t id = u32at(&data[4]);
607     uint64_t ntpTime = u64at(&data[8]);
608     uint32_t rtpTime = u32at(&data[16]);
609 
610 #if 0
611     ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
612          id,
613          rtpTime,
614          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
615 #endif
616 
617     sp<ARTPSource> source = findSource(s, id);
618 
619     source->timeUpdate(rtpTime, ntpTime);
620 
621     return 0;
622 }
623 
findSource(StreamInfo * info,uint32_t srcId)624 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
625     sp<ARTPSource> source;
626     ssize_t index = info->mSources.indexOfKey(srcId);
627     if (index < 0) {
628         index = info->mSources.size();
629 
630         source = new ARTPSource(
631                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
632 
633         info->mSources.add(srcId, source);
634     } else {
635         source = info->mSources.valueAt(index);
636     }
637 
638     return source;
639 }
640 
injectPacket(int index,const sp<ABuffer> & buffer)641 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
642     sp<AMessage> msg = new AMessage(kWhatInjectPacket, this);
643     msg->setInt32("index", index);
644     msg->setBuffer("buffer", buffer);
645     msg->post();
646 }
647 
onInjectPacket(const sp<AMessage> & msg)648 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
649     int32_t index;
650     CHECK(msg->findInt32("index", &index));
651 
652     sp<ABuffer> buffer;
653     CHECK(msg->findBuffer("buffer", &buffer));
654 
655     List<StreamInfo>::iterator it = mStreams.begin();
656     while (it != mStreams.end()
657            && it->mRTPSocket != index && it->mRTCPSocket != index) {
658         ++it;
659     }
660 
661     if (it == mStreams.end()) {
662         TRESPASS();
663     }
664 
665     StreamInfo *s = &*it;
666 
667     if (it->mRTPSocket == index) {
668         parseRTP(s, buffer);
669     } else {
670         parseRTCP(s, buffer);
671     }
672 }
673 
674 }  // namespace android
675 
676