1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #define INET_ECN_NOT_ECT 0x00 /* ECN was not enabled */
20 #define INET_ECN_ECT_1 0x01 /* ECN capable packet */
21 #define INET_ECN_ECT_0 0x02 /* ECN capable packet */
22 #define INET_ECN_CE 0x03 /* ECN congestion */
23 #define INET_ECN_MASK 0x03 /* Mask of ECN bits */
24
25 #include <utils/Log.h>
26
27 #include <media/stagefright/rtsp/ARTPAssembler.h>
28 #include <media/stagefright/rtsp/ARTPConnection.h>
29 #include <media/stagefright/rtsp/ARTPSource.h>
30 #include <media/stagefright/rtsp/ASessionDescription.h>
31
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/foundation/AMessage.h>
35 #include <media/stagefright/foundation/AString.h>
36 #include <media/stagefright/foundation/hexdump.h>
37
38 #include <android/multinetwork.h>
39
40 #include <arpa/inet.h>
41 #include <sys/socket.h>
42
43 namespace android {
44
45 static const size_t kMaxUDPSize = 1500;
46
u16at(const uint8_t * data)47 static uint16_t u16at(const uint8_t *data) {
48 return data[0] << 8 | data[1];
49 }
50
u24at(const uint8_t * data)51 static uint32_t u24at(const uint8_t *data) {
52 return u16at(data) << 16 | data[2];
53 }
54
u32at(const uint8_t * data)55 static uint32_t u32at(const uint8_t *data) {
56 return u16at(data) << 16 | u16at(&data[2]);
57 }
58
u64at(const uint8_t * data)59 static uint64_t u64at(const uint8_t *data) {
60 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
61 }
62
63 // static
64 const int64_t ARTPConnection::kSelectTimeoutUs = 1000LL;
65 const int64_t ARTPConnection::kMinOneSecondNotifyDelayUs = 100000ll;
66
67 struct ARTPConnection::StreamInfo {
68 bool isIPv6;
69 int mRTPSocket;
70 int mRTCPSocket;
71 sp<ASessionDescription> mSessionDesc;
72 size_t mIndex;
73 sp<AMessage> mNotifyMsg;
74 KeyedVector<uint32_t, sp<ARTPSource> > mSources;
75
76 int64_t mNumRTCPPacketsReceived;
77 int64_t mNumRTPPacketsReceived;
78 struct sockaddr_in mRemoteRTCPAddr;
79 struct sockaddr_in6 mRemoteRTCPAddr6;
80
81 bool mIsInjected;
82
83 // A place to save time when it polls
84 int64_t mLastPollTimeUs;
85 // RTCP Extension for CVO
86 int mCVOExtMap; // will be set to 0 if cvo is not negotiated in sdp
87 };
88
ARTPConnection(uint32_t flags)89 ARTPConnection::ARTPConnection(uint32_t flags)
90 : mFlags(flags),
91 mPollEventPending(false),
92 mLastReceiverReportTimeUs(-1),
93 mLastBitrateReportTimeUs(-1),
94 mLastCongestionNotifyTimeUs(-1),
95 mTargetBitrate(-1),
96 mRtpSockOptEcn(0),
97 mIsIPv6(false),
98 mStaticJitterTimeMs(kStaticJitterTimeMs) {
99 }
100
~ARTPConnection()101 ARTPConnection::~ARTPConnection() {
102 }
103
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)104 void ARTPConnection::addStream(
105 int rtpSocket, int rtcpSocket,
106 const sp<ASessionDescription> &sessionDesc,
107 size_t index,
108 const sp<AMessage> ¬ify,
109 bool injected) {
110 sp<AMessage> msg = new AMessage(kWhatAddStream, this);
111 msg->setInt32("rtp-socket", rtpSocket);
112 msg->setInt32("rtcp-socket", rtcpSocket);
113 msg->setObject("session-desc", sessionDesc);
114 msg->setSize("index", index);
115 msg->setMessage("notify", notify);
116 msg->setInt32("injected", injected);
117 msg->post();
118 }
119
seekStream()120 void ARTPConnection::seekStream() {
121 sp<AMessage> msg = new AMessage(kWhatSeekStream, this);
122 msg->post();
123 }
124
removeStream(int rtpSocket,int rtcpSocket)125 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
126 sp<AMessage> msg = new AMessage(kWhatRemoveStream, this);
127 msg->setInt32("rtp-socket", rtpSocket);
128 msg->setInt32("rtcp-socket", rtcpSocket);
129 msg->post();
130 }
131
bumpSocketBufferSize(int s)132 static void bumpSocketBufferSize(int s) {
133 int size = 256 * 1024;
134 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
135 }
136
137 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)138 void ARTPConnection::MakePortPair(
139 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
140 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
141 CHECK_GE(*rtpSocket, 0);
142
143 bumpSocketBufferSize(*rtpSocket);
144
145 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
146 CHECK_GE(*rtcpSocket, 0);
147
148 bumpSocketBufferSize(*rtcpSocket);
149
150 /* rand() * 1000 may overflow int type, use long long */
151 unsigned start = (unsigned)((rand()* 1000LL)/RAND_MAX) + 15550;
152 start &= ~1;
153
154 for (unsigned port = start; port < 65535; port += 2) {
155 struct sockaddr_in addr;
156 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
157 addr.sin_family = AF_INET;
158 addr.sin_addr.s_addr = htonl(INADDR_ANY);
159 addr.sin_port = htons(port);
160
161 if (bind(*rtpSocket,
162 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
163 continue;
164 }
165
166 addr.sin_port = htons(port + 1);
167
168 if (bind(*rtcpSocket,
169 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
170 *rtpPort = port;
171 return;
172 } else {
173 // we should recreate a RTP socket to avoid bind other port in same RTP socket
174 close(*rtpSocket);
175
176 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
177 CHECK_GE(*rtpSocket, 0);
178 bumpSocketBufferSize(*rtpSocket);
179 }
180 }
181
182 TRESPASS();
183 }
184
185 // static
MakeRTPSocketPair(int * rtpSocket,int * rtcpSocket,const char * localIp,const char * remoteIp,unsigned localPort,unsigned remotePort,int64_t socketNetwork,int32_t sockOptEcn)186 void ARTPConnection::MakeRTPSocketPair(
187 int *rtpSocket, int *rtcpSocket, const char *localIp, const char *remoteIp,
188 unsigned localPort, unsigned remotePort, int64_t socketNetwork, int32_t sockOptEcn) {
189 bool isIPv6 = false;
190 if (strchr(localIp, ':') != NULL)
191 isIPv6 = true;
192
193 *rtpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
194 CHECK_GE(*rtpSocket, 0);
195
196 bumpSocketBufferSize(*rtpSocket);
197
198 *rtcpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
199 CHECK_GE(*rtcpSocket, 0);
200
201 if (socketNetwork != 0) {
202 ALOGD("trying to bind rtp socket(%d) to network(%llu).",
203 *rtpSocket, (unsigned long long)socketNetwork);
204
205 int result = android_setsocknetwork((net_handle_t)socketNetwork, *rtpSocket);
206 if (result != 0) {
207 ALOGW("failed(%d) to bind rtp socket(%d) to network(%llu)",
208 result, *rtpSocket, (unsigned long long)socketNetwork);
209 }
210 result = android_setsocknetwork((net_handle_t)socketNetwork, *rtcpSocket);
211 if (result != 0) {
212 ALOGW("failed(%d) to bind rtcp socket(%d) to network(%llu)",
213 result, *rtcpSocket, (unsigned long long)socketNetwork);
214 }
215 }
216
217 if (sockOptEcn != 0) {
218 int sockOptForTOS = 1;
219 if (setsockopt(*rtpSocket, isIPv6 ? IPPROTO_IPV6 : IPPROTO_IP,
220 isIPv6 ? IPV6_RECVTCLASS : IP_RECVTOS,
221 (int *)&sockOptForTOS, sizeof(sockOptForTOS)) < 0) {
222 ALOGE("failed to set recv sockopt TOS on rtpsock(%d). err=%s", *rtpSocket,
223 strerror(errno));
224 } else {
225 ALOGD("successfully set recv sockopt TOS on rtpsock(%d)", *rtpSocket);
226 int result = setsockopt(*rtcpSocket, isIPv6 ? IPPROTO_IPV6 : IPPROTO_IP,
227 isIPv6 ? IPV6_RECVTCLASS : IP_RECVTOS,
228 (int *)&sockOptForTOS, sizeof(sockOptForTOS));
229 if (result >= 0) {
230 ALOGD("successfully set recv sockopt TOS on rtcpsock(%d).", *rtcpSocket);
231 }
232 }
233 }
234
235 bumpSocketBufferSize(*rtcpSocket);
236
237 struct sockaddr *addr;
238 struct sockaddr_in addr4;
239 struct sockaddr_in6 addr6;
240
241 if (isIPv6) {
242 addr = (struct sockaddr *)&addr6;
243 memset(&addr6, 0, sizeof(addr6));
244 addr6.sin6_family = AF_INET6;
245 inet_pton(AF_INET6, localIp, &addr6.sin6_addr);
246 addr6.sin6_port = htons((uint16_t)localPort);
247 } else {
248 addr = (struct sockaddr *)&addr4;
249 memset(&addr4, 0, sizeof(addr4));
250 addr4.sin_family = AF_INET;
251 addr4.sin_addr.s_addr = inet_addr(localIp);
252 addr4.sin_port = htons((uint16_t)localPort);
253 }
254
255 int sockopt = 1;
256 setsockopt(*rtpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
257 setsockopt(*rtcpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
258
259 int sizeSockSt = isIPv6 ? sizeof(addr6) : sizeof(addr4);
260
261 if (bind(*rtpSocket, addr, sizeSockSt) == 0) {
262 ALOGI("rtp socket successfully binded. addr=%s:%d", localIp, localPort);
263 } else {
264 ALOGE("failed to bind rtp socket addr=%s:%d err=%s", localIp, localPort, strerror(errno));
265 return;
266 }
267
268 if (isIPv6)
269 addr6.sin6_port = htons(localPort + 1);
270 else
271 addr4.sin_port = htons(localPort + 1);
272
273 if (bind(*rtcpSocket, addr, sizeSockSt) == 0) {
274 ALOGI("rtcp socket successfully binded. addr=%s:%d", localIp, localPort + 1);
275 } else {
276 ALOGE("failed to bind rtcp socket addr=%s:%d err=%s", localIp,
277 localPort + 1, strerror(errno));
278 }
279
280 // Re uses addr variable as remote addr.
281 if (isIPv6) {
282 memset(&addr6, 0, sizeof(addr6));
283 addr6.sin6_family = AF_INET6;
284 inet_pton(AF_INET6, remoteIp, &addr6.sin6_addr);
285 addr6.sin6_port = htons((uint16_t)remotePort);
286 } else {
287 memset(&addr4, 0, sizeof(addr4));
288 addr4.sin_family = AF_INET;
289 addr4.sin_addr.s_addr = inet_addr(remoteIp);
290 addr4.sin_port = htons((uint16_t)remotePort);
291 }
292 if (connect(*rtpSocket, addr, sizeSockSt) == 0) {
293 ALOGI("rtp socket successfully connected to remote=%s:%d", remoteIp, remotePort);
294 } else {
295 ALOGE("failed to connect rtp socket to remote addr=%s:%d err=%s", remoteIp,
296 remotePort, strerror(errno));
297 return;
298 }
299
300 if (isIPv6)
301 addr6.sin6_port = htons(remotePort + 1);
302 else
303 addr4.sin_port = htons(remotePort + 1);
304
305 if (connect(*rtcpSocket, addr, sizeSockSt) == 0) {
306 ALOGI("rtcp socket successfully connected to remote=%s:%d", remoteIp, remotePort + 1);
307 } else {
308 ALOGE("failed to connect rtcp socket addr=%s:%d err=%s", remoteIp,
309 remotePort + 1, strerror(errno));
310 return;
311 }
312 }
313
onMessageReceived(const sp<AMessage> & msg)314 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
315 switch (msg->what()) {
316 case kWhatAddStream:
317 {
318 onAddStream(msg);
319 break;
320 }
321
322 case kWhatSeekStream:
323 {
324 onSeekStream(msg);
325 break;
326 }
327
328 case kWhatRemoveStream:
329 {
330 onRemoveStream(msg);
331 break;
332 }
333
334 case kWhatPollStreams:
335 {
336 onPollStreams();
337 break;
338 }
339
340 case kWhatAlarmStream:
341 {
342 onAlarmStream(msg);
343 break;
344 }
345
346 case kWhatInjectPacket:
347 {
348 onInjectPacket(msg);
349 break;
350 }
351
352 default:
353 {
354 TRESPASS();
355 break;
356 }
357 }
358 }
359
onAddStream(const sp<AMessage> & msg)360 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
361 mStreams.push_back(StreamInfo());
362 StreamInfo *info = &*--mStreams.end();
363
364 int32_t s;
365 CHECK(msg->findInt32("rtp-socket", &s));
366 info->mRTPSocket = s;
367 CHECK(msg->findInt32("rtcp-socket", &s));
368 info->mRTCPSocket = s;
369
370 int32_t injected;
371 CHECK(msg->findInt32("injected", &injected));
372
373 info->mIsInjected = injected;
374
375 sp<RefBase> obj;
376 CHECK(msg->findObject("session-desc", &obj));
377 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
378
379 CHECK(msg->findSize("index", &info->mIndex));
380 CHECK(msg->findMessage("notify", &info->mNotifyMsg));
381
382 info->mNumRTCPPacketsReceived = 0;
383 info->mNumRTPPacketsReceived = 0;
384 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
385 memset(&info->mRemoteRTCPAddr6, 0, sizeof(info->mRemoteRTCPAddr6));
386
387 sp<ASessionDescription> sessionDesc = info->mSessionDesc;
388 info->mCVOExtMap = 0;
389 for (size_t i = 1; i < sessionDesc->countTracks(); ++i) {
390 int32_t cvoExtMap;
391 if (sessionDesc->getCvoExtMap(i, &cvoExtMap)) {
392 info->mCVOExtMap = cvoExtMap;
393 ALOGI("urn:3gpp:video-orientation(cvo) found as extmap:%d", info->mCVOExtMap);
394 } else {
395 ALOGI("urn:3gpp:video-orientation(cvo) not found :%d", info->mCVOExtMap);
396 }
397 }
398
399 if (!injected) {
400 postPollEvent();
401 }
402 }
403
onSeekStream(const sp<AMessage> & msg)404 void ARTPConnection::onSeekStream(const sp<AMessage> &msg) {
405 (void)msg; // unused param as of now.
406 List<StreamInfo>::iterator it = mStreams.begin();
407 while (it != mStreams.end()) {
408 for (size_t i = 0; i < it->mSources.size(); ++i) {
409 sp<ARTPSource> source = it->mSources.valueAt(i);
410 source->timeReset();
411 }
412 ++it;
413 }
414 }
415
onRemoveStream(const sp<AMessage> & msg)416 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
417 int32_t rtpSocket, rtcpSocket;
418 CHECK(msg->findInt32("rtp-socket", &rtpSocket));
419 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
420
421 List<StreamInfo>::iterator it = mStreams.begin();
422 while (it != mStreams.end()
423 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
424 ++it;
425 }
426
427 if (it == mStreams.end()) {
428 return;
429 }
430
431 mStreams.erase(it);
432 }
433
postPollEvent()434 void ARTPConnection::postPollEvent() {
435 if (mPollEventPending) {
436 return;
437 }
438
439 sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
440 msg->post();
441
442 mPollEventPending = true;
443 }
444
onPollStreams()445 void ARTPConnection::onPollStreams() {
446 mPollEventPending = false;
447
448 if (mStreams.empty()) {
449 return;
450 }
451
452 struct timeval tv;
453 tv.tv_sec = 0;
454 tv.tv_usec = kSelectTimeoutUs;
455
456 fd_set rs;
457 FD_ZERO(&rs);
458
459 int maxSocket = -1;
460 for (List<StreamInfo>::iterator it = mStreams.begin();
461 it != mStreams.end(); ++it) {
462 if ((*it).mIsInjected) {
463 continue;
464 }
465
466 FD_SET(it->mRTPSocket, &rs);
467 FD_SET(it->mRTCPSocket, &rs);
468
469 if (it->mRTPSocket > maxSocket) {
470 maxSocket = it->mRTPSocket;
471 }
472 if (it->mRTCPSocket > maxSocket) {
473 maxSocket = it->mRTCPSocket;
474 }
475 }
476
477 if (maxSocket == -1) {
478 return;
479 }
480
481 int64_t nowUs = ALooper::GetNowUs();
482 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
483
484 if (res > 0) {
485 List<StreamInfo>::iterator it = mStreams.begin();
486 while (it != mStreams.end()) {
487 if ((*it).mIsInjected) {
488 ++it;
489 continue;
490 }
491 it->mLastPollTimeUs = nowUs;
492
493 status_t err = OK;
494 if (FD_ISSET(it->mRTPSocket, &rs)) {
495 err = receive(&*it, true);
496 }
497 if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
498 err = receive(&*it, false);
499 }
500
501 if (err == -ECONNRESET) {
502 // socket failure, this stream is dead, Jim.
503 for (size_t i = 0; i < it->mSources.size(); ++i) {
504 sp<AMessage> notify = it->mNotifyMsg->dup();
505 notify->setInt32("rtcp-event", 1);
506 notify->setInt32("payload-type", 400);
507 notify->setInt32("feedback-type", 1);
508 notify->setInt32("sender", it->mSources.valueAt(i)->getSelfID());
509 notify->post();
510
511 ALOGW("failed to receive RTP/RTCP datagram.");
512 }
513 it = mStreams.erase(it);
514 continue;
515 }
516
517 // add NACK and FIR that needs to be sent immediately.
518 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
519 for (size_t i = 0; i < it->mSources.size(); ++i) {
520 buffer->setRange(0, 0);
521 int cnt = it->mSources.valueAt(i)->addNACK(buffer);
522 if (cnt > 0) {
523 ALOGV("Send NACK for lost %d Packets", cnt);
524 send(&*it, buffer);
525 }
526
527 buffer->setRange(0, 0);
528 it->mSources.valueAt(i)->addFIR(buffer);
529 if (buffer->size() > 0) {
530 ALOGD("Send FIR immediately for lost Packets");
531 send(&*it, buffer);
532 }
533
534 buffer->setRange(0, 0);
535 it->mSources.valueAt(i)->addTMMBR(buffer, mTargetBitrate);
536 mTargetBitrate = -1;
537 if (buffer->size() > 0) {
538 ALOGV("Sending TMMBR...");
539 ssize_t n = send(&*it, buffer);
540
541 if (n != (ssize_t)buffer->size()) {
542 ALOGW("failed to send RTCP TMMBR (%s).",
543 n >= 0 ? "connection gone" : strerror(errno));
544 continue;
545 }
546 }
547 }
548
549 ++it;
550 }
551 }
552
553 checkRxBitrate(nowUs);
554
555 if (mLastReceiverReportTimeUs <= 0
556 || mLastReceiverReportTimeUs + 5000000LL <= nowUs) {
557 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
558 List<StreamInfo>::iterator it = mStreams.begin();
559 while (it != mStreams.end()) {
560 StreamInfo *s = &*it;
561
562 if (s->mIsInjected) {
563 ++it;
564 continue;
565 }
566
567 if (s->mNumRTCPPacketsReceived == 0) {
568 // We have never received any RTCP packets on this stream,
569 // we don't even know where to send a report.
570 ++it;
571 continue;
572 }
573
574 buffer->setRange(0, 0);
575
576 for (size_t i = 0; i < s->mSources.size(); ++i) {
577 sp<ARTPSource> source = s->mSources.valueAt(i);
578
579 source->addReceiverReport(buffer);
580
581 if (mFlags & kRegularlyRequestFIR) {
582 source->addFIR(buffer);
583 }
584 }
585
586 if (buffer->size() > 0) {
587 ALOGV("Sending RR...");
588
589 ssize_t n = send(s, buffer);
590
591 if (n != (ssize_t)buffer->size()) {
592 ALOGW("failed to send RTCP receiver report (%s).",
593 n >= 0 ? "connection gone" : strerror(errno));
594 ++it;
595 continue;
596 }
597
598 mLastReceiverReportTimeUs = nowUs;
599 }
600
601 ++it;
602 }
603 }
604
605 if (!mStreams.empty()) {
606 postPollEvent();
607 }
608 }
609
onAlarmStream(const sp<AMessage> msg)610 void ARTPConnection::onAlarmStream(const sp<AMessage> msg) {
611 sp<ARTPSource> source = nullptr;
612 if (msg->findObject("source", (sp<android::RefBase>*)&source)) {
613 source->processRTPPacket();
614 }
615 }
616
receive(StreamInfo * s,bool receiveRTP)617 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
618 ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
619
620 CHECK(!s->mIsInjected);
621
622 sp<ABuffer> buffer = new ABuffer(65536);
623
624 struct msghdr sMsg = {};
625 struct iovec sIov[1] = {};
626
627 sIov[0].iov_base = (char *) buffer->data();
628 sIov[0].iov_len = buffer->capacity();
629
630 sMsg.msg_iov = sIov;
631 sMsg.msg_iovlen = 1;
632
633 int cMsgSize = sizeof(struct cmsghdr) + sizeof(uint8_t);
634 char buf[CMSG_SPACE(cMsgSize)];
635 sMsg.msg_control = buf;
636 sMsg.msg_controllen = sizeof(buf);
637 sMsg.msg_flags = 0;
638
639 ssize_t nbytes;
640 do {
641 // Used recvmsg to get the TOS header of incoming packet
642 nbytes = recvmsg(receiveRTP ? s->mRTPSocket : s->mRTCPSocket, &sMsg, 0);
643 mCumulativeBytes += nbytes;
644 } while (nbytes < 0 && errno == EINTR);
645
646 if (nbytes <= 0) {
647 ALOGW("failed to recv rtp packet. cause=%s", strerror(errno));
648 // ECONNREFUSED may happen in next recvfrom() calling if one of
649 // outgoing packet can not be delivered to remote by using sendto()
650 if (errno == ECONNREFUSED) {
651 return -ECONNREFUSED;
652 } else {
653 return -ECONNRESET;
654 }
655 }
656
657 if (nbytes > 0) {
658 handleIpHeadersIfReceived(s, sMsg);
659 }
660
661 buffer->setRange(0, nbytes);
662
663 // ALOGI("received %d bytes.", buffer->size());
664
665 status_t err;
666 if (receiveRTP) {
667 err = parseRTP(s, buffer);
668 } else {
669 err = parseRTCP(s, buffer);
670 }
671
672 return err;
673 }
674
675 /* This function will check if TOS is present or not in received IP packet.
676 * After that if it is present then it will notify about congestion to upper
677 * layer if CE bit is set in TOS header.
678 **/
handleIpHeadersIfReceived(StreamInfo * s,struct msghdr sMsg)679 void ARTPConnection::handleIpHeadersIfReceived(StreamInfo *s, struct msghdr sMsg) {
680 struct cmsghdr *cMsg;
681 cMsg = CMSG_FIRSTHDR(&sMsg);
682
683 if (cMsg == NULL) {
684 ALOGV("cmsg is null");
685 }
686
687 for (; cMsg != NULL; cMsg = CMSG_NXTHDR(&sMsg, cMsg)) {
688 bool isTOSHeader = ((cMsg->cmsg_level == (mIsIPv6 ? IPPROTO_IPV6 : IPPROTO_IP))
689 && (cMsg->cmsg_type == (mIsIPv6 ? IPV6_TCLASS : IP_TOS))
690 && (cMsg->cmsg_len));
691 if (isTOSHeader) {
692 uint8_t receivedTOS;
693 receivedTOS = *((uint8_t *) CMSG_DATA(cMsg));
694 // checking CE bit is set
695 bool isCEBitMarked = ((receivedTOS & INET_ECN_MASK) == INET_ECN_CE);
696
697 ALOGV("receivedTos(value -> %d)", receivedTOS);
698
699 if (isCEBitMarked) {
700 ALOGD("receivedTos(value -> %d), is ECN CE marked = %d",
701 receivedTOS, isCEBitMarked);
702 notifyCongestionToUpperLayerIfNeeded(s);
703 }
704 break;
705 }
706 }
707 }
708
709 /* this function will be use to notify congestion in video call to upper layer */
notifyCongestionToUpperLayerIfNeeded(StreamInfo * s)710 void ARTPConnection::notifyCongestionToUpperLayerIfNeeded(StreamInfo *s) {
711 int64_t nowUs = ALooper::GetNowUs();
712
713 if (mLastCongestionNotifyTimeUs <= 0) {
714 mLastCongestionNotifyTimeUs = nowUs;
715 }
716
717 bool isNeedToUpdate = (mLastCongestionNotifyTimeUs + kMinOneSecondNotifyDelayUs <= nowUs);
718 ALOGD("ECN info set by upper layer=%d, isNeedToUpdate=%d", mRtpSockOptEcn, isNeedToUpdate);
719
720 if ((mRtpSockOptEcn != 0) && (isNeedToUpdate)) {
721 sp<AMessage> notify = s->mNotifyMsg->dup();
722 notify->setInt32("rtcp-event", 1);
723 notify->setInt32("payload-type", ARTPSource::RTP_QUALITY_CD);
724 notify->post();
725 mLastCongestionNotifyTimeUs = nowUs;
726 ALOGD("Congestion detected in n/w, Notify upper layer");
727 }
728 }
729
send(const StreamInfo * info,const sp<ABuffer> buffer)730 ssize_t ARTPConnection::send(const StreamInfo *info, const sp<ABuffer> buffer) {
731 struct sockaddr* pRemoteRTCPAddr;
732 int sizeSockSt;
733
734 /* It seems this isIPv6 variable is useless.
735 * We should remove it to prevent confusion */
736 if (mIsIPv6) {
737 pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr6;
738 sizeSockSt = sizeof(struct sockaddr_in6);
739 } else {
740 pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr;
741 sizeSockSt = sizeof(struct sockaddr_in);
742 }
743
744 if (mFlags & kViLTEConnection) {
745 ALOGV("ViLTE RTCP");
746 pRemoteRTCPAddr = NULL;
747 sizeSockSt = 0;
748 }
749
750 ssize_t n;
751 do {
752 n = sendto(
753 info->mRTCPSocket, buffer->data(), buffer->size(), 0,
754 pRemoteRTCPAddr, sizeSockSt);
755 } while (n < 0 && errno == EINTR);
756
757 if (n < 0) {
758 ALOGW("failed to send rtcp packet. cause=%s", strerror(errno));
759 }
760
761 return n;
762 }
763
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)764 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
765 size_t size = buffer->size();
766
767 if (size < 12) {
768 // Too short to be a valid RTP header.
769 return -1;
770 }
771
772 const uint8_t *data = buffer->data();
773
774 if ((data[0] >> 6) != 2) {
775 // Unsupported version.
776 return -1;
777 }
778
779 if ((data[1] & 0x7f) == 20 /* decimal */) {
780 // Unassigned payload type
781 return -1;
782 }
783
784 if (data[0] & 0x20) {
785 // Padding present.
786
787 size_t paddingLength = data[size - 1];
788
789 if (paddingLength + 12 > size) {
790 // If we removed this much padding we'd end up with something
791 // that's too short to be a valid RTP header.
792 return -1;
793 }
794
795 size -= paddingLength;
796 }
797
798 int numCSRCs = data[0] & 0x0f;
799
800 size_t payloadOffset = 12 + 4 * numCSRCs;
801
802 if (size < payloadOffset) {
803 // Not enough data to fit the basic header and all the CSRC entries.
804 return -1;
805 }
806
807 int32_t cvoDegrees = -1;
808 if (data[0] & 0x10) {
809 // Header eXtension present.
810
811 if (size < payloadOffset + 4) {
812 // Not enough data to fit the basic header, all CSRC entries
813 // and the first 4 bytes of the extension header.
814
815 return -1;
816 }
817
818 const uint8_t *extensionData = &data[payloadOffset];
819
820 size_t extensionLength =
821 (4 * (extensionData[2] << 8 | extensionData[3])) + 4;
822
823 if (size < payloadOffset + extensionLength) {
824 return -1;
825 }
826
827 parseRTPExt(s, (const uint8_t *)extensionData, extensionLength, &cvoDegrees);
828 payloadOffset += extensionLength;
829 }
830
831 uint32_t srcId = u32at(&data[8]);
832
833 sp<ARTPSource> source = findSource(s, srcId);
834
835 uint32_t rtpTime = u32at(&data[4]);
836
837 sp<AMessage> meta = buffer->meta();
838 meta->setInt32("ssrc", srcId);
839 meta->setInt32("rtp-time", rtpTime);
840 meta->setInt32("PT", data[1] & 0x7f);
841 meta->setInt32("M", data[1] >> 7);
842 if (cvoDegrees >= 0) {
843 meta->setInt32("cvo", cvoDegrees);
844 }
845
846 int32_t seq = u16at(&data[2]);
847 buffer->setInt32Data(seq);
848 buffer->setRange(payloadOffset, size - payloadOffset);
849
850 if (s->mNumRTPPacketsReceived++ == 0) {
851 sp<AMessage> notify = s->mNotifyMsg->dup();
852 notify->setInt32("first-rtp", true);
853 notify->setInt32("rtcp-event", 1);
854 notify->setInt32("payload-type", ARTPSource::RTP_FIRST_PACKET);
855 notify->setInt32("rtp-time", (int32_t)rtpTime);
856 notify->setInt32("rtp-seq-num", seq);
857 notify->setInt64("recv-time-us", ALooper::GetNowUs());
858 notify->post();
859
860 ALOGD("send first-rtp event to upper layer");
861 }
862
863 source->processRTPPacket(buffer);
864
865 return OK;
866 }
867
parseRTPExt(StreamInfo * s,const uint8_t * extHeader,size_t extLen,int32_t * cvoDegrees)868 status_t ARTPConnection::parseRTPExt(StreamInfo *s,
869 const uint8_t *extHeader, size_t extLen, int32_t *cvoDegrees) {
870 if (extLen < 4)
871 return -1;
872
873 uint16_t header = (extHeader[0] << 8) | (extHeader[1]);
874 bool isOnebyteHeader = false;
875
876 if (header == 0xBEDE) {
877 isOnebyteHeader = true;
878 } else if (header == 0x1000) {
879 ALOGW("parseRTPExt: two-byte header is not implemented yet");
880 return -1;
881 } else {
882 ALOGW("parseRTPExt: can not recognize header");
883 return -1;
884 }
885
886 const uint8_t *extPayload = extHeader + 4;
887 extLen -= 4;
888 size_t offset = 0; //start from first payload of rtp extension.
889 // one-byte header parser
890 while (isOnebyteHeader && offset < extLen) {
891 uint8_t extmapId = extPayload[offset] >> 4;
892 uint8_t length = (extPayload[offset] & 0xF) + 1;
893 offset++;
894
895 // padding case
896 if (extmapId == 0)
897 continue;
898
899 uint8_t data[16]; // maximum length value
900 for (uint8_t j = 0; offset + j <= extLen && j < length; j++) {
901 data[j] = extPayload[offset + j];
902 }
903
904 offset += length;
905
906 if (extmapId == s->mCVOExtMap) {
907 *cvoDegrees = (int32_t)data[0];
908 return OK;
909 }
910 }
911
912 return BAD_VALUE;
913 }
914
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)915 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
916 if (s->mNumRTCPPacketsReceived++ == 0) {
917 sp<AMessage> notify = s->mNotifyMsg->dup();
918 notify->setInt32("first-rtcp", true);
919 notify->setInt32("rtcp-event", 1);
920 notify->setInt32("payload-type", ARTPSource::RTCP_FIRST_PACKET);
921 notify->setInt64("recv-time-us", ALooper::GetNowUs());
922 notify->post();
923
924 ALOGD("send first-rtcp event to upper layer");
925 }
926
927 const uint8_t *data = buffer->data();
928 size_t size = buffer->size();
929
930 while (size > 0) {
931 if (size < 8) {
932 // Too short to be a valid RTCP header
933 return -1;
934 }
935
936 if ((data[0] >> 6) != 2) {
937 // Unsupported version.
938 return -1;
939 }
940
941 if (data[0] & 0x20) {
942 // Padding present.
943
944 size_t paddingLength = data[size - 1];
945
946 if (paddingLength + 12 > size) {
947 // If we removed this much padding we'd end up with something
948 // that's too short to be a valid RTP header.
949 return -1;
950 }
951
952 size -= paddingLength;
953 }
954
955 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
956
957 if (size < headerLength) {
958 // Only received a partial packet?
959 return -1;
960 }
961
962 switch (data[1]) {
963 case 200:
964 {
965 parseSenderReport(s, data, headerLength);
966 break;
967 }
968
969 case 201: // RR
970 {
971 parseReceiverReport(s, data, headerLength);
972 break;
973 }
974 case 202: // SDES
975 case 204: // APP
976 break;
977
978 case 205: // TSFB (transport layer specific feedback)
979 parseTSFB(s, data, headerLength);
980 break;
981 case 206: // PSFB (payload specific feedback)
982 // hexdump(data, headerLength);
983 parsePSFB(s, data, headerLength);
984 ALOGI("RTCP packet type %u of size %zu", (unsigned)data[1], headerLength);
985 break;
986
987 case 203:
988 {
989 parseBYE(s, data, headerLength);
990 break;
991 }
992
993 default:
994 {
995 ALOGW("Unknown RTCP packet type %u of size %zu",
996 (unsigned)data[1], headerLength);
997 break;
998 }
999 }
1000
1001 data += headerLength;
1002 size -= headerLength;
1003 }
1004
1005 return OK;
1006 }
1007
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)1008 status_t ARTPConnection::parseBYE(
1009 StreamInfo *s, const uint8_t *data, size_t size) {
1010 size_t SC = data[0] & 0x3f;
1011
1012 if (SC == 0 || size < (4 + SC * 4)) {
1013 // Packet too short for the minimal BYE header.
1014 return -1;
1015 }
1016
1017 uint32_t id = u32at(&data[4]);
1018
1019 sp<ARTPSource> source = findSource(s, id);
1020
1021 // Report a final stastics to be used for rtp data usage.
1022 int64_t nowUs = ALooper::GetNowUs();
1023 int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1024 int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1025 source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
1026
1027 source->byeReceived();
1028
1029 return OK;
1030 }
1031
parseSenderReport(StreamInfo * s,const uint8_t * data,size_t size)1032 status_t ARTPConnection::parseSenderReport(
1033 StreamInfo *s, const uint8_t *data, size_t size) {
1034 ALOG_ASSERT(size >= 1, "parseSenderReport: invalid packet size.");
1035 size_t receptionReportCount = data[0] & 0x1f;
1036 if (size < (7 + (receptionReportCount * 6)) * 4) {
1037 // Packet too short for the minimal sender report header.
1038 return -1;
1039 }
1040
1041 int64_t recvTimeUs = ALooper::GetNowUs();
1042 uint32_t senderId = u32at(&data[4]);
1043 uint64_t ntpTime = u64at(&data[8]);
1044 uint32_t rtpTime = u32at(&data[16]);
1045 uint32_t pktCount = u32at(&data[20]);
1046 uint32_t octCount = u32at(&data[24]);
1047
1048 ALOGD("SR received: ssrc=0x%08x, rtpTime%u == ntpTime %llu, pkt=%u, oct=%u",
1049 senderId, rtpTime, (unsigned long long)ntpTime, pktCount, octCount);
1050
1051 sp<ARTPSource> source = findSource(s, senderId);
1052 source->timeUpdate(recvTimeUs, rtpTime, ntpTime);
1053
1054 for (int32_t i = 0; i < receptionReportCount; i++) {
1055 int32_t offset = 28 + (i * 24);
1056 parseReceptionReportBlock(s, recvTimeUs, senderId, data + offset, size - offset);
1057 }
1058
1059 return 0;
1060 }
1061
parseReceiverReport(StreamInfo * s,const uint8_t * data,size_t size)1062 status_t ARTPConnection::parseReceiverReport(
1063 StreamInfo *s, const uint8_t *data, size_t size) {
1064 ALOG_ASSERT(size >= 1, "parseReceiverReport: invalid packet size.");
1065 size_t receptionReportCount = data[0] & 0x1f;
1066 if (size < (2 + (receptionReportCount * 6)) * 4) {
1067 // Packet too short for the minimal receiver report header.
1068 return -1;
1069 }
1070
1071 #if 0
1072 ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
1073 id,
1074 rtpTime,
1075 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
1076 #endif
1077 int64_t recvTimeUs = ALooper::GetNowUs();
1078 uint32_t senderId = u32at(&data[4]);
1079
1080 for (int i = 0; i < receptionReportCount; i++) {
1081 int32_t offset = 8 + (i * 24);
1082 parseReceptionReportBlock(s, recvTimeUs, senderId, data + offset, size - offset);
1083 }
1084
1085 return 0;
1086 }
1087
parseReceptionReportBlock(StreamInfo * s,int64_t recvTimeUs,uint32_t senderId,const uint8_t * data,size_t size)1088 status_t ARTPConnection::parseReceptionReportBlock(
1089 StreamInfo *s, int64_t recvTimeUs, uint32_t senderId, const uint8_t *data, size_t size) {
1090 ALOG_ASSERT(size >= 24, "parseReceptionReportBlock: invalid packet size.");
1091 if (size < 24) {
1092 // remaining size is smaller than reception report block size.
1093 return -1;
1094 }
1095
1096 uint32_t rbId = u32at(&data[0]);
1097 uint32_t fLost = data[4];
1098 int32_t cumLost = u24at(&data[5]);
1099 uint32_t ehSeq = u32at(&data[8]);
1100 uint32_t jitter = u32at(&data[12]);
1101 uint32_t lsr = u32at(&data[16]);
1102 uint32_t dlsr = u32at(&data[20]);
1103
1104 ALOGD("Reception Report Block: t:%llu sid:%u rid:%u fl:%u cl:%u hs:%u jt:%u lsr:%u dlsr:%u",
1105 (unsigned long long)recvTimeUs, senderId, rbId, fLost, cumLost,
1106 ehSeq, jitter, lsr, dlsr);
1107 sp<ARTPSource> source = findSource(s, senderId);
1108 sp<ReceptionReportBlock> rrb = new ReceptionReportBlock(
1109 rbId, fLost, cumLost, ehSeq, jitter, lsr, dlsr);
1110 source->processReceptionReportBlock(recvTimeUs, senderId, rrb);
1111
1112 return 0;
1113 }
1114
parseTSFB(StreamInfo * s,const uint8_t * data,size_t size)1115 status_t ARTPConnection::parseTSFB(
1116 StreamInfo *s, const uint8_t *data, size_t size) {
1117 if (size < 12) {
1118 // broken packet
1119 return -1;
1120 }
1121
1122 uint8_t msgType = data[0] & 0x1f;
1123 uint32_t id = u32at(&data[4]);
1124
1125 const uint8_t *ptr = &data[12];
1126 size -= 12;
1127
1128 using namespace std;
1129 size_t FCISize;
1130 switch(msgType) {
1131 case 1: // Generic NACK
1132 {
1133 FCISize = 4;
1134 while (size >= FCISize) {
1135 uint16_t PID = u16at(&ptr[0]); // lost packet RTP number
1136 uint16_t BLP = u16at(&ptr[2]); // Bitmask of following Lost Packets
1137
1138 size -= FCISize;
1139 ptr += FCISize;
1140
1141 AString list_of_losts;
1142 list_of_losts.append(PID);
1143 for (int i=0 ; i<16 ; i++) {
1144 bool is_lost = BLP & (0x1 << i);
1145 if (is_lost) {
1146 list_of_losts.append(", ");
1147 list_of_losts.append(PID + i);
1148 }
1149 }
1150 ALOGI("Opponent losts packet of RTP %s", list_of_losts.c_str());
1151 }
1152 break;
1153 }
1154 case 3: // TMMBR
1155 case 4: // TMMBN
1156 {
1157 FCISize = 8;
1158 while (size >= FCISize) {
1159 uint32_t MxTBR = u32at(&ptr[4]);
1160 uint32_t MxTBRExp = MxTBR >> 26;
1161 uint32_t MxTBRMantissa = (MxTBR >> 9) & 0x01FFFF;
1162 uint32_t overhead = MxTBR & 0x01FF;
1163
1164 size -= FCISize;
1165 ptr += FCISize;
1166
1167 uint32_t bitRate = (1 << MxTBRExp) * MxTBRMantissa;
1168
1169 if (msgType == 3)
1170 ALOGI("Op -> UE Req Tx bitrate : %d X 2^%d = %d",
1171 MxTBRMantissa, MxTBRExp, bitRate);
1172 else if (msgType == 4)
1173 ALOGI("OP -> UE Noti Rx bitrate : %d X 2^%d = %d",
1174 MxTBRMantissa, MxTBRExp, bitRate);
1175
1176 sp<AMessage> notify = s->mNotifyMsg->dup();
1177 notify->setInt32("rtcp-event", 1);
1178 notify->setInt32("payload-type", 205);
1179 notify->setInt32("feedback-type", msgType);
1180 notify->setInt32("sender", id);
1181 notify->setInt32("bit-rate", bitRate);
1182 notify->post();
1183 ALOGI("overhead : %d", overhead);
1184 }
1185 break;
1186 }
1187 default:
1188 {
1189 ALOGI("Not supported TSFB type %d", msgType);
1190 break;
1191 }
1192 }
1193
1194 return 0;
1195 }
1196
parsePSFB(StreamInfo * s,const uint8_t * data,size_t size)1197 status_t ARTPConnection::parsePSFB(
1198 StreamInfo *s, const uint8_t *data, size_t size) {
1199 if (size < 12) {
1200 // broken packet
1201 return -1;
1202 }
1203
1204 uint8_t msgType = data[0] & 0x1f;
1205 uint32_t id = u32at(&data[4]);
1206
1207 const uint8_t *ptr = &data[12];
1208 size -= 12;
1209
1210 using namespace std;
1211 switch(msgType) {
1212 case 1: // Picture Loss Indication (PLI)
1213 {
1214 if (size > 0) {
1215 // PLI does not need parameters
1216 break;
1217 };
1218 sp<AMessage> notify = s->mNotifyMsg->dup();
1219 notify->setInt32("rtcp-event", 1);
1220 notify->setInt32("payload-type", 206);
1221 notify->setInt32("feedback-type", msgType);
1222 notify->setInt32("sender", id);
1223 notify->post();
1224 ALOGI("PLI detected.");
1225 break;
1226 }
1227 case 4: // Full Intra Request (FIR)
1228 {
1229 if (size < 4) {
1230 break;
1231 }
1232 uint32_t requestedId = u32at(&ptr[0]);
1233 if (requestedId == (uint32_t)mSelfID) {
1234 sp<AMessage> notify = s->mNotifyMsg->dup();
1235 notify->setInt32("rtcp-event", 1);
1236 notify->setInt32("payload-type", 206);
1237 notify->setInt32("feedback-type", msgType);
1238 notify->setInt32("sender", id);
1239 notify->post();
1240 ALOGI("FIR detected.");
1241 }
1242 break;
1243 }
1244 default:
1245 {
1246 ALOGI("Not supported PSFB type %d", msgType);
1247 break;
1248 }
1249 }
1250
1251 return 0;
1252 }
findSource(StreamInfo * info,uint32_t srcId)1253 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
1254 sp<ARTPSource> source;
1255 ssize_t index = info->mSources.indexOfKey(srcId);
1256 if (index < 0) {
1257 index = info->mSources.size();
1258
1259 source = new ARTPSource(
1260 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
1261
1262 if (mFlags & kViLTEConnection) {
1263 setStaticJitterTimeMs(50);
1264 source->setPeriodicFIR(false);
1265 }
1266
1267 source->setSelfID(mSelfID);
1268 source->setStaticJitterTimeMs(mStaticJitterTimeMs);
1269 sp<AMessage> timer = new AMessage(kWhatAlarmStream, this);
1270 source->setJbTimer(timer);
1271 info->mSources.add(srcId, source);
1272 } else {
1273 source = info->mSources.valueAt(index);
1274 }
1275
1276 return source;
1277 }
1278
injectPacket(int index,const sp<ABuffer> & buffer)1279 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
1280 sp<AMessage> msg = new AMessage(kWhatInjectPacket, this);
1281 msg->setInt32("index", index);
1282 msg->setBuffer("buffer", buffer);
1283 msg->post();
1284 }
1285
setSelfID(const uint32_t selfID)1286 void ARTPConnection::setSelfID(const uint32_t selfID) {
1287 mSelfID = selfID;
1288 }
1289
setStaticJitterTimeMs(const uint32_t jbTimeMs)1290 void ARTPConnection::setStaticJitterTimeMs(const uint32_t jbTimeMs) {
1291 mStaticJitterTimeMs = jbTimeMs;
1292 }
1293
setTargetBitrate(int32_t targetBitrate)1294 void ARTPConnection::setTargetBitrate(int32_t targetBitrate) {
1295 mTargetBitrate = targetBitrate;
1296 }
1297
setRtpSockOptEcn(int32_t sockOptEcn)1298 void ARTPConnection::setRtpSockOptEcn(int32_t sockOptEcn) {
1299 mRtpSockOptEcn = sockOptEcn;
1300 }
1301
setIsIPv6(const char * localIp)1302 void ARTPConnection::setIsIPv6(const char *localIp) {
1303 mIsIPv6 = (strchr(localIp, ':') != nullptr);
1304 }
1305
checkRxBitrate(int64_t nowUs)1306 void ARTPConnection::checkRxBitrate(int64_t nowUs) {
1307 if (mLastBitrateReportTimeUs <= 0) {
1308 mCumulativeBytes = 0;
1309 mLastBitrateReportTimeUs = nowUs;
1310 }
1311 else if (mLastEarlyNotifyTimeUs + kMinOneSecondNotifyDelayUs <= nowUs) {
1312 int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1313 int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1314 mLastEarlyNotifyTimeUs = nowUs;
1315
1316 List<StreamInfo>::iterator it = mStreams.begin();
1317 while (it != mStreams.end()) {
1318 StreamInfo *s = &*it;
1319 if (s->mIsInjected) {
1320 ++it;
1321 continue;
1322 }
1323 for (size_t i = 0; i < s->mSources.size(); ++i) {
1324 sp<ARTPSource> source = s->mSources.valueAt(i);
1325 if (source->isNeedToEarlyNotify()) {
1326 source->notifyPktInfo(bitrate, nowUs, false /* isRegular */);
1327 mLastEarlyNotifyTimeUs = nowUs + (1000000ll * 3600 * 24); // after 1 day
1328 }
1329 }
1330 ++it;
1331 }
1332 }
1333 else if (mLastBitrateReportTimeUs + 1000000ll <= nowUs) {
1334 int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1335 int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1336 ALOGI("Actual Rx bitrate : %d bits/sec", bitrate);
1337
1338 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
1339 List<StreamInfo>::iterator it = mStreams.begin();
1340 while (it != mStreams.end()) {
1341 StreamInfo *s = &*it;
1342 if (s->mIsInjected) {
1343 ++it;
1344 continue;
1345 }
1346
1347 if (s->mNumRTCPPacketsReceived == 0) {
1348 // We have never received any RTCP packets on this stream,
1349 // we don't even know where to send a report.
1350 ++it;
1351 continue;
1352 }
1353
1354 buffer->setRange(0, 0);
1355 for (size_t i = 0; i < s->mSources.size(); ++i) {
1356 sp<ARTPSource> source = s->mSources.valueAt(i);
1357 source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
1358 }
1359 ++it;
1360 }
1361 mCumulativeBytes = 0;
1362 mLastBitrateReportTimeUs = nowUs;
1363 mLastEarlyNotifyTimeUs = nowUs;
1364 }
1365 }
onInjectPacket(const sp<AMessage> & msg)1366 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
1367 int32_t index;
1368 CHECK(msg->findInt32("index", &index));
1369
1370 sp<ABuffer> buffer;
1371 CHECK(msg->findBuffer("buffer", &buffer));
1372
1373 List<StreamInfo>::iterator it = mStreams.begin();
1374 while (it != mStreams.end()
1375 && it->mRTPSocket != index && it->mRTCPSocket != index) {
1376 ++it;
1377 }
1378
1379 if (it == mStreams.end()) {
1380 TRESPASS();
1381 }
1382
1383 StreamInfo *s = &*it;
1384
1385 if (it->mRTPSocket == index) {
1386 parseRTP(s, buffer);
1387 } else {
1388 parseRTCP(s, buffer);
1389 }
1390 }
1391
1392 } // namespace android
1393