1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #define INET_ECN_NOT_ECT    0x00    /* ECN was not enabled */
20 #define INET_ECN_ECT_1      0x01    /* ECN capable packet */
21 #define INET_ECN_ECT_0      0x02    /* ECN capable packet */
22 #define INET_ECN_CE         0x03    /* ECN congestion */
23 #define INET_ECN_MASK       0x03    /* Mask of ECN bits */
24 
25 #include <utils/Log.h>
26 
27 #include <media/stagefright/rtsp/ARTPAssembler.h>
28 #include <media/stagefright/rtsp/ARTPConnection.h>
29 #include <media/stagefright/rtsp/ARTPSource.h>
30 #include <media/stagefright/rtsp/ASessionDescription.h>
31 
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/foundation/AMessage.h>
35 #include <media/stagefright/foundation/AString.h>
36 #include <media/stagefright/foundation/hexdump.h>
37 
38 #include <android/multinetwork.h>
39 
40 #include <arpa/inet.h>
41 #include <sys/socket.h>
42 
43 namespace android {
44 
45 static const size_t kMaxUDPSize = 1500;
46 
u16at(const uint8_t * data)47 static uint16_t u16at(const uint8_t *data) {
48     return data[0] << 8 | data[1];
49 }
50 
u24at(const uint8_t * data)51 static uint32_t u24at(const uint8_t *data) {
52     return u16at(data) << 16 | data[2];
53 }
54 
u32at(const uint8_t * data)55 static uint32_t u32at(const uint8_t *data) {
56     return u16at(data) << 16 | u16at(&data[2]);
57 }
58 
u64at(const uint8_t * data)59 static uint64_t u64at(const uint8_t *data) {
60     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
61 }
62 
63 // static
64 const int64_t ARTPConnection::kSelectTimeoutUs = 1000LL;
65 const int64_t ARTPConnection::kMinOneSecondNotifyDelayUs = 100000ll;
66 
67 struct ARTPConnection::StreamInfo {
68     bool isIPv6;
69     int mRTPSocket;
70     int mRTCPSocket;
71     sp<ASessionDescription> mSessionDesc;
72     size_t mIndex;
73     sp<AMessage> mNotifyMsg;
74     KeyedVector<uint32_t, sp<ARTPSource> > mSources;
75 
76     int64_t mNumRTCPPacketsReceived;
77     int64_t mNumRTPPacketsReceived;
78     struct sockaddr_in mRemoteRTCPAddr;
79     struct sockaddr_in6 mRemoteRTCPAddr6;
80 
81     bool mIsInjected;
82 
83     // A place to save time when it polls
84     int64_t mLastPollTimeUs;
85     // RTCP Extension for CVO
86     int mCVOExtMap; // will be set to 0 if cvo is not negotiated in sdp
87 };
88 
ARTPConnection(uint32_t flags)89 ARTPConnection::ARTPConnection(uint32_t flags)
90     : mFlags(flags),
91       mPollEventPending(false),
92       mLastReceiverReportTimeUs(-1),
93       mLastBitrateReportTimeUs(-1),
94       mLastCongestionNotifyTimeUs(-1),
95       mTargetBitrate(-1),
96       mRtpSockOptEcn(0),
97       mIsIPv6(false),
98       mStaticJitterTimeMs(kStaticJitterTimeMs) {
99 }
100 
~ARTPConnection()101 ARTPConnection::~ARTPConnection() {
102 }
103 
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)104 void ARTPConnection::addStream(
105         int rtpSocket, int rtcpSocket,
106         const sp<ASessionDescription> &sessionDesc,
107         size_t index,
108         const sp<AMessage> &notify,
109         bool injected) {
110     sp<AMessage> msg = new AMessage(kWhatAddStream, this);
111     msg->setInt32("rtp-socket", rtpSocket);
112     msg->setInt32("rtcp-socket", rtcpSocket);
113     msg->setObject("session-desc", sessionDesc);
114     msg->setSize("index", index);
115     msg->setMessage("notify", notify);
116     msg->setInt32("injected", injected);
117     msg->post();
118 }
119 
seekStream()120 void ARTPConnection::seekStream() {
121     sp<AMessage> msg = new AMessage(kWhatSeekStream, this);
122     msg->post();
123 }
124 
removeStream(int rtpSocket,int rtcpSocket)125 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
126     sp<AMessage> msg = new AMessage(kWhatRemoveStream, this);
127     msg->setInt32("rtp-socket", rtpSocket);
128     msg->setInt32("rtcp-socket", rtcpSocket);
129     msg->post();
130 }
131 
bumpSocketBufferSize(int s)132 static void bumpSocketBufferSize(int s) {
133     int size = 256 * 1024;
134     CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
135 }
136 
137 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)138 void ARTPConnection::MakePortPair(
139         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
140     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
141     CHECK_GE(*rtpSocket, 0);
142 
143     bumpSocketBufferSize(*rtpSocket);
144 
145     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
146     CHECK_GE(*rtcpSocket, 0);
147 
148     bumpSocketBufferSize(*rtcpSocket);
149 
150     /* rand() * 1000 may overflow int type, use long long */
151     unsigned start = (unsigned)((rand()* 1000LL)/RAND_MAX) + 15550;
152     start &= ~1;
153 
154     for (unsigned port = start; port < 65535; port += 2) {
155         struct sockaddr_in addr;
156         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
157         addr.sin_family = AF_INET;
158         addr.sin_addr.s_addr = htonl(INADDR_ANY);
159         addr.sin_port = htons(port);
160 
161         if (bind(*rtpSocket,
162                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
163             continue;
164         }
165 
166         addr.sin_port = htons(port + 1);
167 
168         if (bind(*rtcpSocket,
169                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
170             *rtpPort = port;
171             return;
172         } else {
173             // we should recreate a RTP socket to avoid bind other port in same RTP socket
174             close(*rtpSocket);
175 
176             *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
177             CHECK_GE(*rtpSocket, 0);
178             bumpSocketBufferSize(*rtpSocket);
179         }
180     }
181 
182     TRESPASS();
183 }
184 
185 // static
MakeRTPSocketPair(int * rtpSocket,int * rtcpSocket,const char * localIp,const char * remoteIp,unsigned localPort,unsigned remotePort,int64_t socketNetwork,int32_t sockOptEcn)186 void ARTPConnection::MakeRTPSocketPair(
187         int *rtpSocket, int *rtcpSocket, const char *localIp, const char *remoteIp,
188         unsigned localPort, unsigned remotePort, int64_t socketNetwork, int32_t sockOptEcn) {
189     bool isIPv6 = false;
190     if (strchr(localIp, ':') != NULL)
191         isIPv6 = true;
192 
193     *rtpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
194     CHECK_GE(*rtpSocket, 0);
195 
196     bumpSocketBufferSize(*rtpSocket);
197 
198     *rtcpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
199     CHECK_GE(*rtcpSocket, 0);
200 
201     if (socketNetwork != 0) {
202         ALOGD("trying to bind rtp socket(%d) to network(%llu).",
203                 *rtpSocket, (unsigned long long)socketNetwork);
204 
205         int result = android_setsocknetwork((net_handle_t)socketNetwork, *rtpSocket);
206         if (result != 0) {
207             ALOGW("failed(%d) to bind rtp socket(%d) to network(%llu)",
208                     result, *rtpSocket, (unsigned long long)socketNetwork);
209         }
210         result = android_setsocknetwork((net_handle_t)socketNetwork, *rtcpSocket);
211         if (result != 0) {
212             ALOGW("failed(%d) to bind rtcp socket(%d) to network(%llu)",
213                     result, *rtcpSocket, (unsigned long long)socketNetwork);
214         }
215     }
216 
217     if (sockOptEcn != 0) {
218         int sockOptForTOS = 1;
219         if (setsockopt(*rtpSocket, isIPv6 ? IPPROTO_IPV6 : IPPROTO_IP,
220                isIPv6 ? IPV6_RECVTCLASS : IP_RECVTOS,
221                (int *)&sockOptForTOS, sizeof(sockOptForTOS)) < 0) {
222             ALOGE("failed to set recv sockopt TOS on rtpsock(%d). err=%s", *rtpSocket,
223                 strerror(errno));
224         } else {
225             ALOGD("successfully set recv sockopt TOS on rtpsock(%d)", *rtpSocket);
226             int result = setsockopt(*rtcpSocket, isIPv6 ? IPPROTO_IPV6 : IPPROTO_IP,
227                 isIPv6 ? IPV6_RECVTCLASS : IP_RECVTOS,
228                 (int *)&sockOptForTOS, sizeof(sockOptForTOS));
229             if (result >= 0) {
230                 ALOGD("successfully set recv sockopt TOS on rtcpsock(%d).", *rtcpSocket);
231             }
232         }
233     }
234 
235     bumpSocketBufferSize(*rtcpSocket);
236 
237     struct sockaddr *addr;
238     struct sockaddr_in addr4;
239     struct sockaddr_in6 addr6;
240 
241     if (isIPv6) {
242         addr = (struct sockaddr *)&addr6;
243         memset(&addr6, 0, sizeof(addr6));
244         addr6.sin6_family = AF_INET6;
245         inet_pton(AF_INET6, localIp, &addr6.sin6_addr);
246         addr6.sin6_port = htons((uint16_t)localPort);
247     } else {
248         addr = (struct sockaddr *)&addr4;
249         memset(&addr4, 0, sizeof(addr4));
250         addr4.sin_family = AF_INET;
251         addr4.sin_addr.s_addr = inet_addr(localIp);
252         addr4.sin_port = htons((uint16_t)localPort);
253     }
254 
255     int sockopt = 1;
256     setsockopt(*rtpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
257     setsockopt(*rtcpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
258 
259     int sizeSockSt = isIPv6 ? sizeof(addr6) : sizeof(addr4);
260 
261     if (bind(*rtpSocket, addr, sizeSockSt) == 0) {
262         ALOGI("rtp socket successfully binded. addr=%s:%d", localIp, localPort);
263     } else {
264         ALOGE("failed to bind rtp socket addr=%s:%d err=%s", localIp, localPort, strerror(errno));
265         return;
266     }
267 
268     if (isIPv6)
269         addr6.sin6_port = htons(localPort + 1);
270     else
271         addr4.sin_port = htons(localPort + 1);
272 
273     if (bind(*rtcpSocket, addr, sizeSockSt) == 0) {
274         ALOGI("rtcp socket successfully binded. addr=%s:%d", localIp, localPort + 1);
275     } else {
276         ALOGE("failed to bind rtcp socket addr=%s:%d err=%s", localIp,
277                 localPort + 1, strerror(errno));
278     }
279 
280     // Re uses addr variable as remote addr.
281     if (isIPv6) {
282         memset(&addr6, 0, sizeof(addr6));
283         addr6.sin6_family = AF_INET6;
284         inet_pton(AF_INET6, remoteIp, &addr6.sin6_addr);
285         addr6.sin6_port = htons((uint16_t)remotePort);
286     } else {
287         memset(&addr4, 0, sizeof(addr4));
288         addr4.sin_family = AF_INET;
289         addr4.sin_addr.s_addr = inet_addr(remoteIp);
290         addr4.sin_port = htons((uint16_t)remotePort);
291     }
292     if (connect(*rtpSocket, addr, sizeSockSt) == 0) {
293         ALOGI("rtp socket successfully connected to remote=%s:%d", remoteIp, remotePort);
294     } else {
295         ALOGE("failed to connect rtp socket to remote addr=%s:%d err=%s", remoteIp,
296                 remotePort, strerror(errno));
297         return;
298     }
299 
300     if (isIPv6)
301         addr6.sin6_port = htons(remotePort + 1);
302     else
303         addr4.sin_port = htons(remotePort + 1);
304 
305     if (connect(*rtcpSocket, addr, sizeSockSt) == 0) {
306         ALOGI("rtcp socket successfully connected to remote=%s:%d", remoteIp, remotePort + 1);
307     } else {
308         ALOGE("failed to connect rtcp socket addr=%s:%d err=%s", remoteIp,
309                 remotePort + 1, strerror(errno));
310         return;
311     }
312 }
313 
onMessageReceived(const sp<AMessage> & msg)314 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
315     switch (msg->what()) {
316         case kWhatAddStream:
317         {
318             onAddStream(msg);
319             break;
320         }
321 
322         case kWhatSeekStream:
323         {
324             onSeekStream(msg);
325             break;
326         }
327 
328         case kWhatRemoveStream:
329         {
330             onRemoveStream(msg);
331             break;
332         }
333 
334         case kWhatPollStreams:
335         {
336             onPollStreams();
337             break;
338         }
339 
340         case kWhatAlarmStream:
341         {
342             onAlarmStream(msg);
343             break;
344         }
345 
346         case kWhatInjectPacket:
347         {
348             onInjectPacket(msg);
349             break;
350         }
351 
352         default:
353         {
354             TRESPASS();
355             break;
356         }
357     }
358 }
359 
onAddStream(const sp<AMessage> & msg)360 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
361     mStreams.push_back(StreamInfo());
362     StreamInfo *info = &*--mStreams.end();
363 
364     int32_t s;
365     CHECK(msg->findInt32("rtp-socket", &s));
366     info->mRTPSocket = s;
367     CHECK(msg->findInt32("rtcp-socket", &s));
368     info->mRTCPSocket = s;
369 
370     int32_t injected;
371     CHECK(msg->findInt32("injected", &injected));
372 
373     info->mIsInjected = injected;
374 
375     sp<RefBase> obj;
376     CHECK(msg->findObject("session-desc", &obj));
377     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
378 
379     CHECK(msg->findSize("index", &info->mIndex));
380     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
381 
382     info->mNumRTCPPacketsReceived = 0;
383     info->mNumRTPPacketsReceived = 0;
384     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
385     memset(&info->mRemoteRTCPAddr6, 0, sizeof(info->mRemoteRTCPAddr6));
386 
387     sp<ASessionDescription> sessionDesc = info->mSessionDesc;
388     info->mCVOExtMap = 0;
389     for (size_t i = 1; i < sessionDesc->countTracks(); ++i) {
390         int32_t cvoExtMap;
391         if (sessionDesc->getCvoExtMap(i, &cvoExtMap)) {
392             info->mCVOExtMap = cvoExtMap;
393             ALOGI("urn:3gpp:video-orientation(cvo) found as extmap:%d", info->mCVOExtMap);
394         } else {
395             ALOGI("urn:3gpp:video-orientation(cvo) not found :%d", info->mCVOExtMap);
396         }
397     }
398 
399     if (!injected) {
400         postPollEvent();
401     }
402 }
403 
onSeekStream(const sp<AMessage> & msg)404 void ARTPConnection::onSeekStream(const sp<AMessage> &msg) {
405     (void)msg; // unused param as of now.
406     List<StreamInfo>::iterator it = mStreams.begin();
407     while (it != mStreams.end()) {
408         for (size_t i = 0; i < it->mSources.size(); ++i) {
409             sp<ARTPSource> source = it->mSources.valueAt(i);
410             source->timeReset();
411         }
412         ++it;
413     }
414 }
415 
onRemoveStream(const sp<AMessage> & msg)416 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
417     int32_t rtpSocket, rtcpSocket;
418     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
419     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
420 
421     List<StreamInfo>::iterator it = mStreams.begin();
422     while (it != mStreams.end()
423            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
424         ++it;
425     }
426 
427     if (it == mStreams.end()) {
428         return;
429     }
430 
431     mStreams.erase(it);
432 }
433 
postPollEvent()434 void ARTPConnection::postPollEvent() {
435     if (mPollEventPending) {
436         return;
437     }
438 
439     sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
440     msg->post();
441 
442     mPollEventPending = true;
443 }
444 
onPollStreams()445 void ARTPConnection::onPollStreams() {
446     mPollEventPending = false;
447 
448     if (mStreams.empty()) {
449         return;
450     }
451 
452     struct timeval tv;
453     tv.tv_sec = 0;
454     tv.tv_usec = kSelectTimeoutUs;
455 
456     fd_set rs;
457     FD_ZERO(&rs);
458 
459     int maxSocket = -1;
460     for (List<StreamInfo>::iterator it = mStreams.begin();
461          it != mStreams.end(); ++it) {
462         if ((*it).mIsInjected) {
463             continue;
464         }
465 
466         FD_SET(it->mRTPSocket, &rs);
467         FD_SET(it->mRTCPSocket, &rs);
468 
469         if (it->mRTPSocket > maxSocket) {
470             maxSocket = it->mRTPSocket;
471         }
472         if (it->mRTCPSocket > maxSocket) {
473             maxSocket = it->mRTCPSocket;
474         }
475     }
476 
477     if (maxSocket == -1) {
478         return;
479     }
480 
481     int64_t nowUs = ALooper::GetNowUs();
482     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
483 
484     if (res > 0) {
485         List<StreamInfo>::iterator it = mStreams.begin();
486         while (it != mStreams.end()) {
487             if ((*it).mIsInjected) {
488                 ++it;
489                 continue;
490             }
491             it->mLastPollTimeUs = nowUs;
492 
493             status_t err = OK;
494             if (FD_ISSET(it->mRTPSocket, &rs)) {
495                 err = receive(&*it, true);
496             }
497             if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
498                 err = receive(&*it, false);
499             }
500 
501             if (err == -ECONNRESET) {
502                 // socket failure, this stream is dead, Jim.
503                 for (size_t i = 0; i < it->mSources.size(); ++i) {
504                     sp<AMessage> notify = it->mNotifyMsg->dup();
505                     notify->setInt32("rtcp-event", 1);
506                     notify->setInt32("payload-type", 400);
507                     notify->setInt32("feedback-type", 1);
508                     notify->setInt32("sender", it->mSources.valueAt(i)->getSelfID());
509                     notify->post();
510 
511                     ALOGW("failed to receive RTP/RTCP datagram.");
512                 }
513                 it = mStreams.erase(it);
514                 continue;
515             }
516 
517             // add NACK and FIR that needs to be sent immediately.
518             sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
519             for (size_t i = 0; i < it->mSources.size(); ++i) {
520                 buffer->setRange(0, 0);
521                 int cnt = it->mSources.valueAt(i)->addNACK(buffer);
522                 if (cnt > 0) {
523                     ALOGV("Send NACK for lost %d Packets", cnt);
524                     send(&*it, buffer);
525                 }
526 
527                 buffer->setRange(0, 0);
528                 it->mSources.valueAt(i)->addFIR(buffer);
529                 if (buffer->size() > 0) {
530                     ALOGD("Send FIR immediately for lost Packets");
531                     send(&*it, buffer);
532                 }
533 
534                 buffer->setRange(0, 0);
535                 it->mSources.valueAt(i)->addTMMBR(buffer, mTargetBitrate);
536                 mTargetBitrate = -1;
537                 if (buffer->size() > 0) {
538                     ALOGV("Sending TMMBR...");
539                     ssize_t n = send(&*it, buffer);
540 
541                     if (n != (ssize_t)buffer->size()) {
542                         ALOGW("failed to send RTCP TMMBR (%s).",
543                                 n >= 0 ? "connection gone" : strerror(errno));
544                         continue;
545                     }
546                 }
547             }
548 
549             ++it;
550         }
551     }
552 
553     checkRxBitrate(nowUs);
554 
555     if (mLastReceiverReportTimeUs <= 0
556             || mLastReceiverReportTimeUs + 5000000LL <= nowUs) {
557         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
558         List<StreamInfo>::iterator it = mStreams.begin();
559         while (it != mStreams.end()) {
560             StreamInfo *s = &*it;
561 
562             if (s->mIsInjected) {
563                 ++it;
564                 continue;
565             }
566 
567             if (s->mNumRTCPPacketsReceived == 0) {
568                 // We have never received any RTCP packets on this stream,
569                 // we don't even know where to send a report.
570                 ++it;
571                 continue;
572             }
573 
574             buffer->setRange(0, 0);
575 
576             for (size_t i = 0; i < s->mSources.size(); ++i) {
577                 sp<ARTPSource> source = s->mSources.valueAt(i);
578 
579                 source->addReceiverReport(buffer);
580 
581                 if (mFlags & kRegularlyRequestFIR) {
582                     source->addFIR(buffer);
583                 }
584             }
585 
586             if (buffer->size() > 0) {
587                 ALOGV("Sending RR...");
588 
589                 ssize_t n = send(s, buffer);
590 
591                 if (n != (ssize_t)buffer->size()) {
592                     ALOGW("failed to send RTCP receiver report (%s).",
593                             n >= 0 ? "connection gone" : strerror(errno));
594                     ++it;
595                     continue;
596                 }
597 
598                 mLastReceiverReportTimeUs = nowUs;
599             }
600 
601             ++it;
602         }
603     }
604 
605     if (!mStreams.empty()) {
606         postPollEvent();
607     }
608 }
609 
onAlarmStream(const sp<AMessage> msg)610 void ARTPConnection::onAlarmStream(const sp<AMessage> msg) {
611     sp<ARTPSource> source = nullptr;
612     if (msg->findObject("source", (sp<android::RefBase>*)&source)) {
613         source->processRTPPacket();
614     }
615 }
616 
receive(StreamInfo * s,bool receiveRTP)617 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
618     ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
619 
620     CHECK(!s->mIsInjected);
621 
622     sp<ABuffer> buffer = new ABuffer(65536);
623 
624     struct msghdr sMsg = {};
625     struct iovec sIov[1] = {};
626 
627     sIov[0].iov_base = (char *) buffer->data();
628     sIov[0].iov_len = buffer->capacity();
629 
630     sMsg.msg_iov = sIov;
631     sMsg.msg_iovlen = 1;
632 
633     int cMsgSize = sizeof(struct cmsghdr) + sizeof(uint8_t);
634     char buf[CMSG_SPACE(cMsgSize)];
635     sMsg.msg_control = buf;
636     sMsg.msg_controllen = sizeof(buf);
637     sMsg.msg_flags = 0;
638 
639     ssize_t nbytes;
640     do {
641         // Used recvmsg to get the TOS header of incoming packet
642         nbytes = recvmsg(receiveRTP ? s->mRTPSocket : s->mRTCPSocket, &sMsg, 0);
643         mCumulativeBytes += nbytes;
644     } while (nbytes < 0 && errno == EINTR);
645 
646     if (nbytes <= 0) {
647         ALOGW("failed to recv rtp packet. cause=%s", strerror(errno));
648         // ECONNREFUSED may happen in next recvfrom() calling if one of
649         // outgoing packet can not be delivered to remote by using sendto()
650         if (errno == ECONNREFUSED) {
651             return -ECONNREFUSED;
652         } else {
653             return -ECONNRESET;
654         }
655     }
656 
657     if (nbytes > 0) {
658         handleIpHeadersIfReceived(s, sMsg);
659     }
660 
661     buffer->setRange(0, nbytes);
662 
663     // ALOGI("received %d bytes.", buffer->size());
664 
665     status_t err;
666     if (receiveRTP) {
667         err = parseRTP(s, buffer);
668     } else {
669         err = parseRTCP(s, buffer);
670     }
671 
672     return err;
673 }
674 
675 /* This function will check if TOS is present or not in received IP packet.
676  * After that if it is present then it will notify about congestion to upper
677  * layer if CE bit is set in TOS header.
678  **/
handleIpHeadersIfReceived(StreamInfo * s,struct msghdr sMsg)679 void ARTPConnection::handleIpHeadersIfReceived(StreamInfo *s, struct msghdr sMsg) {
680     struct cmsghdr *cMsg;
681     cMsg = CMSG_FIRSTHDR(&sMsg);
682 
683     if (cMsg == NULL) {
684         ALOGV("cmsg is null");
685     }
686 
687     for (; cMsg != NULL; cMsg = CMSG_NXTHDR(&sMsg, cMsg)) {
688         bool isTOSHeader = ((cMsg->cmsg_level == (mIsIPv6 ? IPPROTO_IPV6 : IPPROTO_IP))
689                               && (cMsg->cmsg_type == (mIsIPv6 ? IPV6_TCLASS : IP_TOS))
690                               && (cMsg->cmsg_len));
691         if (isTOSHeader) {
692             uint8_t receivedTOS;
693             receivedTOS = *((uint8_t *) CMSG_DATA(cMsg));
694             // checking CE bit is set
695             bool isCEBitMarked = ((receivedTOS & INET_ECN_MASK) == INET_ECN_CE);
696 
697             ALOGV("receivedTos(value -> %d)", receivedTOS);
698 
699             if (isCEBitMarked) {
700                 ALOGD("receivedTos(value -> %d), is ECN CE marked = %d",
701                     receivedTOS, isCEBitMarked);
702                 notifyCongestionToUpperLayerIfNeeded(s);
703             }
704             break;
705         }
706     }
707 }
708 
709 /* this function will be use to notify congestion in video call to upper layer */
notifyCongestionToUpperLayerIfNeeded(StreamInfo * s)710 void ARTPConnection::notifyCongestionToUpperLayerIfNeeded(StreamInfo *s) {
711     int64_t nowUs = ALooper::GetNowUs();
712 
713     if (mLastCongestionNotifyTimeUs <= 0) {
714         mLastCongestionNotifyTimeUs = nowUs;
715     }
716 
717     bool isNeedToUpdate = (mLastCongestionNotifyTimeUs + kMinOneSecondNotifyDelayUs <= nowUs);
718     ALOGD("ECN info set by upper layer=%d, isNeedToUpdate=%d", mRtpSockOptEcn, isNeedToUpdate);
719 
720     if ((mRtpSockOptEcn != 0) && (isNeedToUpdate)) {
721         sp<AMessage> notify = s->mNotifyMsg->dup();
722         notify->setInt32("rtcp-event", 1);
723         notify->setInt32("payload-type", ARTPSource::RTP_QUALITY_CD);
724         notify->post();
725         mLastCongestionNotifyTimeUs = nowUs;
726         ALOGD("Congestion detected in n/w, Notify upper layer");
727     }
728 }
729 
send(const StreamInfo * info,const sp<ABuffer> buffer)730 ssize_t ARTPConnection::send(const StreamInfo *info, const sp<ABuffer> buffer) {
731         struct sockaddr* pRemoteRTCPAddr;
732         int sizeSockSt;
733 
734         /* It seems this isIPv6 variable is useless.
735          * We should remove it to prevent confusion */
736         if (mIsIPv6) {
737             pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr6;
738             sizeSockSt = sizeof(struct sockaddr_in6);
739         } else {
740             pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr;
741             sizeSockSt = sizeof(struct sockaddr_in);
742         }
743 
744         if (mFlags & kViLTEConnection) {
745             ALOGV("ViLTE RTCP");
746             pRemoteRTCPAddr = NULL;
747             sizeSockSt = 0;
748         }
749 
750         ssize_t n;
751         do {
752             n = sendto(
753                     info->mRTCPSocket, buffer->data(), buffer->size(), 0,
754                     pRemoteRTCPAddr, sizeSockSt);
755         } while (n < 0 && errno == EINTR);
756 
757         if (n < 0) {
758             ALOGW("failed to send rtcp packet. cause=%s", strerror(errno));
759         }
760 
761         return n;
762 }
763 
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)764 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
765     size_t size = buffer->size();
766 
767     if (size < 12) {
768         // Too short to be a valid RTP header.
769         return -1;
770     }
771 
772     const uint8_t *data = buffer->data();
773 
774     if ((data[0] >> 6) != 2) {
775         // Unsupported version.
776         return -1;
777     }
778 
779     if ((data[1] & 0x7f) == 20 /* decimal */) {
780         // Unassigned payload type
781         return -1;
782     }
783 
784     if (data[0] & 0x20) {
785         // Padding present.
786 
787         size_t paddingLength = data[size - 1];
788 
789         if (paddingLength + 12 > size) {
790             // If we removed this much padding we'd end up with something
791             // that's too short to be a valid RTP header.
792             return -1;
793         }
794 
795         size -= paddingLength;
796     }
797 
798     int numCSRCs = data[0] & 0x0f;
799 
800     size_t payloadOffset = 12 + 4 * numCSRCs;
801 
802     if (size < payloadOffset) {
803         // Not enough data to fit the basic header and all the CSRC entries.
804         return -1;
805     }
806 
807     int32_t cvoDegrees = -1;
808     if (data[0] & 0x10) {
809         // Header eXtension present.
810 
811         if (size < payloadOffset + 4) {
812             // Not enough data to fit the basic header, all CSRC entries
813             // and the first 4 bytes of the extension header.
814 
815             return -1;
816         }
817 
818         const uint8_t *extensionData = &data[payloadOffset];
819 
820         size_t extensionLength =
821             (4 * (extensionData[2] << 8 | extensionData[3])) + 4;
822 
823         if (size < payloadOffset + extensionLength) {
824             return -1;
825         }
826 
827         parseRTPExt(s, (const uint8_t *)extensionData, extensionLength, &cvoDegrees);
828         payloadOffset += extensionLength;
829     }
830 
831     uint32_t srcId = u32at(&data[8]);
832 
833     sp<ARTPSource> source = findSource(s, srcId);
834 
835     uint32_t rtpTime = u32at(&data[4]);
836 
837     sp<AMessage> meta = buffer->meta();
838     meta->setInt32("ssrc", srcId);
839     meta->setInt32("rtp-time", rtpTime);
840     meta->setInt32("PT", data[1] & 0x7f);
841     meta->setInt32("M", data[1] >> 7);
842     if (cvoDegrees >= 0) {
843         meta->setInt32("cvo", cvoDegrees);
844     }
845 
846     int32_t seq = u16at(&data[2]);
847     buffer->setInt32Data(seq);
848     buffer->setRange(payloadOffset, size - payloadOffset);
849 
850     if (s->mNumRTPPacketsReceived++ == 0) {
851         sp<AMessage> notify = s->mNotifyMsg->dup();
852         notify->setInt32("first-rtp", true);
853         notify->setInt32("rtcp-event", 1);
854         notify->setInt32("payload-type", ARTPSource::RTP_FIRST_PACKET);
855         notify->setInt32("rtp-time", (int32_t)rtpTime);
856         notify->setInt32("rtp-seq-num", seq);
857         notify->setInt64("recv-time-us", ALooper::GetNowUs());
858         notify->post();
859 
860         ALOGD("send first-rtp event to upper layer");
861     }
862 
863     source->processRTPPacket(buffer);
864 
865     return OK;
866 }
867 
parseRTPExt(StreamInfo * s,const uint8_t * extHeader,size_t extLen,int32_t * cvoDegrees)868 status_t ARTPConnection::parseRTPExt(StreamInfo *s,
869         const uint8_t *extHeader, size_t extLen, int32_t *cvoDegrees) {
870     if (extLen < 4)
871         return -1;
872 
873     uint16_t header = (extHeader[0] << 8) | (extHeader[1]);
874     bool isOnebyteHeader = false;
875 
876     if (header == 0xBEDE) {
877         isOnebyteHeader = true;
878     } else if (header == 0x1000) {
879         ALOGW("parseRTPExt: two-byte header is not implemented yet");
880         return -1;
881     } else {
882         ALOGW("parseRTPExt: can not recognize header");
883         return -1;
884     }
885 
886     const uint8_t *extPayload = extHeader + 4;
887     extLen -= 4;
888     size_t offset = 0; //start from first payload of rtp extension.
889     // one-byte header parser
890     while (isOnebyteHeader && offset < extLen) {
891         uint8_t extmapId = extPayload[offset] >> 4;
892         uint8_t length = (extPayload[offset] & 0xF) + 1;
893         offset++;
894 
895         // padding case
896         if (extmapId == 0)
897             continue;
898 
899         uint8_t data[16]; // maximum length value
900         for (uint8_t j = 0; offset + j <= extLen && j < length; j++) {
901             data[j] = extPayload[offset + j];
902         }
903 
904         offset += length;
905 
906         if (extmapId == s->mCVOExtMap) {
907             *cvoDegrees = (int32_t)data[0];
908             return OK;
909         }
910     }
911 
912     return BAD_VALUE;
913 }
914 
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)915 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
916     if (s->mNumRTCPPacketsReceived++ == 0) {
917         sp<AMessage> notify = s->mNotifyMsg->dup();
918         notify->setInt32("first-rtcp", true);
919         notify->setInt32("rtcp-event", 1);
920         notify->setInt32("payload-type", ARTPSource::RTCP_FIRST_PACKET);
921         notify->setInt64("recv-time-us", ALooper::GetNowUs());
922         notify->post();
923 
924         ALOGD("send first-rtcp event to upper layer");
925     }
926 
927     const uint8_t *data = buffer->data();
928     size_t size = buffer->size();
929 
930     while (size > 0) {
931         if (size < 8) {
932             // Too short to be a valid RTCP header
933             return -1;
934         }
935 
936         if ((data[0] >> 6) != 2) {
937             // Unsupported version.
938             return -1;
939         }
940 
941         if (data[0] & 0x20) {
942             // Padding present.
943 
944             size_t paddingLength = data[size - 1];
945 
946             if (paddingLength + 12 > size) {
947                 // If we removed this much padding we'd end up with something
948                 // that's too short to be a valid RTP header.
949                 return -1;
950             }
951 
952             size -= paddingLength;
953         }
954 
955         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
956 
957         if (size < headerLength) {
958             // Only received a partial packet?
959             return -1;
960         }
961 
962         switch (data[1]) {
963             case 200:
964             {
965                 parseSenderReport(s, data, headerLength);
966                 break;
967             }
968 
969             case 201:  // RR
970             {
971                 parseReceiverReport(s, data, headerLength);
972                 break;
973             }
974             case 202:  // SDES
975             case 204:  // APP
976                 break;
977 
978             case 205:  // TSFB (transport layer specific feedback)
979                 parseTSFB(s, data, headerLength);
980                 break;
981             case 206:  // PSFB (payload specific feedback)
982                 // hexdump(data, headerLength);
983                 parsePSFB(s, data, headerLength);
984                 ALOGI("RTCP packet type %u of size %zu", (unsigned)data[1], headerLength);
985                 break;
986 
987             case 203:
988             {
989                 parseBYE(s, data, headerLength);
990                 break;
991             }
992 
993             default:
994             {
995                 ALOGW("Unknown RTCP packet type %u of size %zu",
996                      (unsigned)data[1], headerLength);
997                 break;
998             }
999         }
1000 
1001         data += headerLength;
1002         size -= headerLength;
1003     }
1004 
1005     return OK;
1006 }
1007 
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)1008 status_t ARTPConnection::parseBYE(
1009         StreamInfo *s, const uint8_t *data, size_t size) {
1010     size_t SC = data[0] & 0x3f;
1011 
1012     if (SC == 0 || size < (4 + SC * 4)) {
1013         // Packet too short for the minimal BYE header.
1014         return -1;
1015     }
1016 
1017     uint32_t id = u32at(&data[4]);
1018 
1019     sp<ARTPSource> source = findSource(s, id);
1020 
1021     // Report a final stastics to be used for rtp data usage.
1022     int64_t nowUs = ALooper::GetNowUs();
1023     int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1024     int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1025     source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
1026 
1027     source->byeReceived();
1028 
1029     return OK;
1030 }
1031 
parseSenderReport(StreamInfo * s,const uint8_t * data,size_t size)1032 status_t ARTPConnection::parseSenderReport(
1033         StreamInfo *s, const uint8_t *data, size_t size) {
1034     ALOG_ASSERT(size >= 1, "parseSenderReport: invalid packet size.");
1035     size_t receptionReportCount = data[0] & 0x1f;
1036     if (size < (7 + (receptionReportCount * 6)) * 4) {
1037         // Packet too short for the minimal sender report header.
1038         return -1;
1039     }
1040 
1041     int64_t recvTimeUs = ALooper::GetNowUs();
1042     uint32_t senderId = u32at(&data[4]);
1043     uint64_t ntpTime = u64at(&data[8]);
1044     uint32_t rtpTime = u32at(&data[16]);
1045     uint32_t pktCount = u32at(&data[20]);
1046     uint32_t octCount = u32at(&data[24]);
1047 
1048     ALOGD("SR received: ssrc=0x%08x, rtpTime%u == ntpTime %llu, pkt=%u, oct=%u",
1049             senderId, rtpTime, (unsigned long long)ntpTime, pktCount, octCount);
1050 
1051     sp<ARTPSource> source = findSource(s, senderId);
1052     source->timeUpdate(recvTimeUs, rtpTime, ntpTime);
1053 
1054     for (int32_t i = 0; i < receptionReportCount; i++) {
1055         int32_t offset = 28 + (i * 24);
1056         parseReceptionReportBlock(s, recvTimeUs, senderId, data + offset, size - offset);
1057     }
1058 
1059     return 0;
1060 }
1061 
parseReceiverReport(StreamInfo * s,const uint8_t * data,size_t size)1062 status_t ARTPConnection::parseReceiverReport(
1063         StreamInfo *s, const uint8_t *data, size_t size) {
1064     ALOG_ASSERT(size >= 1, "parseReceiverReport: invalid packet size.");
1065     size_t receptionReportCount = data[0] & 0x1f;
1066     if (size < (2 + (receptionReportCount * 6)) * 4) {
1067         // Packet too short for the minimal receiver report header.
1068         return -1;
1069     }
1070 
1071 #if 0
1072     ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
1073          id,
1074          rtpTime,
1075          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
1076 #endif
1077     int64_t recvTimeUs = ALooper::GetNowUs();
1078     uint32_t senderId = u32at(&data[4]);
1079 
1080     for (int i = 0; i < receptionReportCount; i++) {
1081         int32_t offset = 8 + (i * 24);
1082         parseReceptionReportBlock(s, recvTimeUs, senderId, data + offset, size - offset);
1083     }
1084 
1085     return 0;
1086 }
1087 
parseReceptionReportBlock(StreamInfo * s,int64_t recvTimeUs,uint32_t senderId,const uint8_t * data,size_t size)1088 status_t ARTPConnection::parseReceptionReportBlock(
1089         StreamInfo *s, int64_t recvTimeUs, uint32_t senderId, const uint8_t *data, size_t size) {
1090     ALOG_ASSERT(size >= 24, "parseReceptionReportBlock: invalid packet size.");
1091     if (size < 24) {
1092         // remaining size is smaller than reception report block size.
1093         return -1;
1094     }
1095 
1096     uint32_t rbId = u32at(&data[0]);
1097     uint32_t fLost = data[4];
1098     int32_t cumLost = u24at(&data[5]);
1099     uint32_t ehSeq = u32at(&data[8]);
1100     uint32_t jitter = u32at(&data[12]);
1101     uint32_t lsr = u32at(&data[16]);
1102     uint32_t dlsr = u32at(&data[20]);
1103 
1104     ALOGD("Reception Report Block: t:%llu sid:%u rid:%u fl:%u cl:%u hs:%u jt:%u lsr:%u dlsr:%u",
1105             (unsigned long long)recvTimeUs, senderId, rbId, fLost, cumLost,
1106             ehSeq, jitter, lsr, dlsr);
1107     sp<ARTPSource> source = findSource(s, senderId);
1108     sp<ReceptionReportBlock> rrb = new ReceptionReportBlock(
1109             rbId, fLost, cumLost, ehSeq, jitter, lsr, dlsr);
1110     source->processReceptionReportBlock(recvTimeUs, senderId, rrb);
1111 
1112     return 0;
1113 }
1114 
parseTSFB(StreamInfo * s,const uint8_t * data,size_t size)1115 status_t ARTPConnection::parseTSFB(
1116         StreamInfo *s, const uint8_t *data, size_t size) {
1117     if (size < 12) {
1118         // broken packet
1119         return -1;
1120     }
1121 
1122     uint8_t msgType = data[0] & 0x1f;
1123     uint32_t id = u32at(&data[4]);
1124 
1125     const uint8_t *ptr = &data[12];
1126     size -= 12;
1127 
1128     using namespace std;
1129     size_t FCISize;
1130     switch(msgType) {
1131         case 1:     // Generic NACK
1132         {
1133             FCISize = 4;
1134             while (size >= FCISize) {
1135                 uint16_t PID = u16at(&ptr[0]);  // lost packet RTP number
1136                 uint16_t BLP = u16at(&ptr[2]);  // Bitmask of following Lost Packets
1137 
1138                 size -= FCISize;
1139                 ptr += FCISize;
1140 
1141                 AString list_of_losts;
1142                 list_of_losts.append(PID);
1143                 for (int i=0 ; i<16 ; i++) {
1144                     bool is_lost = BLP & (0x1 << i);
1145                     if (is_lost) {
1146                         list_of_losts.append(", ");
1147                         list_of_losts.append(PID + i);
1148                     }
1149                 }
1150                 ALOGI("Opponent losts packet of RTP %s", list_of_losts.c_str());
1151             }
1152             break;
1153         }
1154         case 3:     // TMMBR
1155         case 4:     // TMMBN
1156         {
1157             FCISize = 8;
1158             while (size >= FCISize) {
1159                 uint32_t MxTBR = u32at(&ptr[4]);
1160                 uint32_t MxTBRExp = MxTBR >> 26;
1161                 uint32_t MxTBRMantissa = (MxTBR >> 9) & 0x01FFFF;
1162                 uint32_t overhead = MxTBR & 0x01FF;
1163 
1164                 size -= FCISize;
1165                 ptr += FCISize;
1166 
1167                 uint32_t bitRate = (1 << MxTBRExp) * MxTBRMantissa;
1168 
1169                 if (msgType == 3)
1170                     ALOGI("Op -> UE Req Tx bitrate : %d X 2^%d = %d",
1171                         MxTBRMantissa, MxTBRExp, bitRate);
1172                 else if (msgType == 4)
1173                     ALOGI("OP -> UE Noti Rx bitrate : %d X 2^%d = %d",
1174                         MxTBRMantissa, MxTBRExp, bitRate);
1175 
1176                 sp<AMessage> notify = s->mNotifyMsg->dup();
1177                 notify->setInt32("rtcp-event", 1);
1178                 notify->setInt32("payload-type", 205);
1179                 notify->setInt32("feedback-type", msgType);
1180                 notify->setInt32("sender", id);
1181                 notify->setInt32("bit-rate", bitRate);
1182                 notify->post();
1183                 ALOGI("overhead : %d", overhead);
1184             }
1185             break;
1186         }
1187         default:
1188         {
1189             ALOGI("Not supported TSFB type %d", msgType);
1190             break;
1191         }
1192     }
1193 
1194     return 0;
1195 }
1196 
parsePSFB(StreamInfo * s,const uint8_t * data,size_t size)1197 status_t ARTPConnection::parsePSFB(
1198         StreamInfo *s, const uint8_t *data, size_t size) {
1199     if (size < 12) {
1200         // broken packet
1201         return -1;
1202     }
1203 
1204     uint8_t msgType = data[0] & 0x1f;
1205     uint32_t id = u32at(&data[4]);
1206 
1207     const uint8_t *ptr = &data[12];
1208     size -= 12;
1209 
1210     using namespace std;
1211     switch(msgType) {
1212         case 1:     // Picture Loss Indication (PLI)
1213         {
1214             if (size > 0) {
1215                 // PLI does not need parameters
1216                 break;
1217             };
1218             sp<AMessage> notify = s->mNotifyMsg->dup();
1219             notify->setInt32("rtcp-event", 1);
1220             notify->setInt32("payload-type", 206);
1221             notify->setInt32("feedback-type", msgType);
1222             notify->setInt32("sender", id);
1223             notify->post();
1224             ALOGI("PLI detected.");
1225             break;
1226         }
1227         case 4:     // Full Intra Request (FIR)
1228         {
1229             if (size < 4) {
1230                 break;
1231             }
1232             uint32_t requestedId = u32at(&ptr[0]);
1233             if (requestedId == (uint32_t)mSelfID) {
1234                 sp<AMessage> notify = s->mNotifyMsg->dup();
1235                 notify->setInt32("rtcp-event", 1);
1236                 notify->setInt32("payload-type", 206);
1237                 notify->setInt32("feedback-type", msgType);
1238                 notify->setInt32("sender", id);
1239                 notify->post();
1240                 ALOGI("FIR detected.");
1241             }
1242             break;
1243         }
1244         default:
1245         {
1246             ALOGI("Not supported PSFB type %d", msgType);
1247             break;
1248         }
1249     }
1250 
1251     return 0;
1252 }
findSource(StreamInfo * info,uint32_t srcId)1253 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
1254     sp<ARTPSource> source;
1255     ssize_t index = info->mSources.indexOfKey(srcId);
1256     if (index < 0) {
1257         index = info->mSources.size();
1258 
1259         source = new ARTPSource(
1260                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
1261 
1262         if (mFlags & kViLTEConnection) {
1263             setStaticJitterTimeMs(50);
1264             source->setPeriodicFIR(false);
1265         }
1266 
1267         source->setSelfID(mSelfID);
1268         source->setStaticJitterTimeMs(mStaticJitterTimeMs);
1269         sp<AMessage> timer = new AMessage(kWhatAlarmStream, this);
1270         source->setJbTimer(timer);
1271         info->mSources.add(srcId, source);
1272     } else {
1273         source = info->mSources.valueAt(index);
1274     }
1275 
1276     return source;
1277 }
1278 
injectPacket(int index,const sp<ABuffer> & buffer)1279 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
1280     sp<AMessage> msg = new AMessage(kWhatInjectPacket, this);
1281     msg->setInt32("index", index);
1282     msg->setBuffer("buffer", buffer);
1283     msg->post();
1284 }
1285 
setSelfID(const uint32_t selfID)1286 void ARTPConnection::setSelfID(const uint32_t selfID) {
1287     mSelfID = selfID;
1288 }
1289 
setStaticJitterTimeMs(const uint32_t jbTimeMs)1290 void ARTPConnection::setStaticJitterTimeMs(const uint32_t jbTimeMs) {
1291     mStaticJitterTimeMs = jbTimeMs;
1292 }
1293 
setTargetBitrate(int32_t targetBitrate)1294 void ARTPConnection::setTargetBitrate(int32_t targetBitrate) {
1295     mTargetBitrate = targetBitrate;
1296 }
1297 
setRtpSockOptEcn(int32_t sockOptEcn)1298 void ARTPConnection::setRtpSockOptEcn(int32_t sockOptEcn) {
1299     mRtpSockOptEcn = sockOptEcn;
1300 }
1301 
setIsIPv6(const char * localIp)1302 void ARTPConnection::setIsIPv6(const char *localIp) {
1303     mIsIPv6 = (strchr(localIp, ':') != nullptr);
1304 }
1305 
checkRxBitrate(int64_t nowUs)1306 void ARTPConnection::checkRxBitrate(int64_t nowUs) {
1307     if (mLastBitrateReportTimeUs <= 0) {
1308         mCumulativeBytes = 0;
1309         mLastBitrateReportTimeUs = nowUs;
1310     }
1311     else if (mLastEarlyNotifyTimeUs + kMinOneSecondNotifyDelayUs <= nowUs) {
1312         int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1313         int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1314         mLastEarlyNotifyTimeUs = nowUs;
1315 
1316         List<StreamInfo>::iterator it = mStreams.begin();
1317         while (it != mStreams.end()) {
1318             StreamInfo *s = &*it;
1319             if (s->mIsInjected) {
1320                 ++it;
1321                 continue;
1322             }
1323             for (size_t i = 0; i < s->mSources.size(); ++i) {
1324                 sp<ARTPSource> source = s->mSources.valueAt(i);
1325                 if (source->isNeedToEarlyNotify()) {
1326                     source->notifyPktInfo(bitrate, nowUs, false /* isRegular */);
1327                     mLastEarlyNotifyTimeUs = nowUs + (1000000ll * 3600 * 24); // after 1 day
1328                 }
1329             }
1330             ++it;
1331         }
1332     }
1333     else if (mLastBitrateReportTimeUs + 1000000ll <= nowUs) {
1334         int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1335         int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1336         ALOGI("Actual Rx bitrate : %d bits/sec", bitrate);
1337 
1338         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
1339         List<StreamInfo>::iterator it = mStreams.begin();
1340         while (it != mStreams.end()) {
1341             StreamInfo *s = &*it;
1342             if (s->mIsInjected) {
1343                 ++it;
1344                 continue;
1345             }
1346 
1347             if (s->mNumRTCPPacketsReceived == 0) {
1348                 // We have never received any RTCP packets on this stream,
1349                 // we don't even know where to send a report.
1350                 ++it;
1351                 continue;
1352             }
1353 
1354             buffer->setRange(0, 0);
1355             for (size_t i = 0; i < s->mSources.size(); ++i) {
1356                 sp<ARTPSource> source = s->mSources.valueAt(i);
1357                 source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
1358             }
1359             ++it;
1360         }
1361         mCumulativeBytes = 0;
1362         mLastBitrateReportTimeUs = nowUs;
1363         mLastEarlyNotifyTimeUs = nowUs;
1364     }
1365 }
onInjectPacket(const sp<AMessage> & msg)1366 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
1367     int32_t index;
1368     CHECK(msg->findInt32("index", &index));
1369 
1370     sp<ABuffer> buffer;
1371     CHECK(msg->findBuffer("buffer", &buffer));
1372 
1373     List<StreamInfo>::iterator it = mStreams.begin();
1374     while (it != mStreams.end()
1375            && it->mRTPSocket != index && it->mRTCPSocket != index) {
1376         ++it;
1377     }
1378 
1379     if (it == mStreams.end()) {
1380         TRESPASS();
1381     }
1382 
1383     StreamInfo *s = &*it;
1384 
1385     if (it->mRTPSocket == index) {
1386         parseRTP(s, buffer);
1387     } else {
1388         parseRTCP(s, buffer);
1389     }
1390 }
1391 
1392 }  // namespace android
1393