1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MY_HANDLER_H_
18
19 #define MY_HANDLER_H_
20
21 //#define LOG_NDEBUG 0
22
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 #include "NetworkUtils.h"
35
36 #include <ctype.h>
37 #include <cutils/properties.h>
38
39 #include <datasource/HTTPBase.h>
40 #include <media/stagefright/foundation/ABuffer.h>
41 #include <media/stagefright/foundation/ADebug.h>
42 #include <media/stagefright/foundation/ALooper.h>
43 #include <media/stagefright/foundation/AMessage.h>
44 #include <media/stagefright/MediaErrors.h>
45 #include <media/stagefright/Utils.h>
46 #include <media/stagefright/FoundationUtils.h>
47
48 #include <arpa/inet.h>
49 #include <sys/socket.h>
50 #include <netdb.h>
51
52
53 #if LOG_NDEBUG
54 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
55 #else
56 #define UNUSED_UNLESS_VERBOSE(x)
57 #endif
58
59 #ifndef FALLTHROUGH_INTENDED
60 #define FALLTHROUGH_INTENDED [[clang::fallthrough]] // NOLINT
61 #endif
62
63 // If no access units are received within 5 secs, assume that the rtp
64 // stream has ended and signal end of stream.
65 static int64_t kAccessUnitTimeoutUs = 10000000ll;
66
67 // If no access units arrive for the first 10 secs after starting the
68 // stream, assume none ever will and signal EOS or switch transports.
69 static int64_t kStartupTimeoutUs = 10000000ll;
70
71 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
72
73 static int64_t kPauseDelayUs = 3000000ll;
74
75 // The allowed maximum number of stale access units at the beginning of
76 // a new sequence.
77 static int32_t kMaxAllowedStaleAccessUnits = 20;
78
79 static int64_t kTearDownTimeoutUs = 3000000ll;
80
81 namespace android {
82
GetAttribute(const char * s,const char * key,AString * value)83 static bool GetAttribute(const char *s, const char *key, AString *value) {
84 value->clear();
85
86 size_t keyLen = strlen(key);
87
88 for (;;) {
89 while (isspace(*s)) {
90 ++s;
91 }
92
93 const char *colonPos = strchr(s, ';');
94
95 size_t len =
96 (colonPos == NULL) ? strlen(s) : colonPos - s;
97
98 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
99 value->setTo(&s[keyLen + 1], len - keyLen - 1);
100 return true;
101 }
102
103 if (colonPos == NULL) {
104 return false;
105 }
106
107 s = colonPos + 1;
108 }
109 }
110
111 struct MyHandler : public AHandler {
112 enum {
113 kWhatConnected = 'conn',
114 kWhatDisconnected = 'disc',
115 kWhatSeekPaused = 'spau',
116 kWhatSeekDone = 'sdon',
117
118 kWhatAccessUnit = 'accU',
119 kWhatEOS = 'eos!',
120 kWhatSeekDiscontinuity = 'seeD',
121 kWhatNormalPlayTimeMapping = 'nptM',
122 };
123
124 MyHandler(
125 const char *url,
126 const sp<AMessage> ¬ify,
127 bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler128 : mNotify(notify),
129 mUIDValid(uidValid),
130 mUID(uid),
131 mNetLooper(new ALooper),
132 mConn(new ARTSPConnection(mUIDValid, mUID)),
133 mRTPConn(new ARTPConnection),
134 mOriginalSessionURL(url),
135 mSessionURL(url),
136 mSetupTracksSuccessful(false),
137 mSeekPending(false),
138 mFirstAccessUnit(true),
139 mAllTracksHaveTime(false),
140 mNTPAnchorUs(-1),
141 mMediaAnchorUs(-1),
142 mLastMediaTimeUs(0),
143 mNumAccessUnitsReceived(0),
144 mCheckPending(false),
145 mCheckGeneration(0),
146 mCheckTimeoutGeneration(0),
147 mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
148 mTryFakeRTCP(false),
149 mReceivedFirstRTCPPacket(false),
150 mReceivedFirstRTPPacket(false),
151 mSeekable(true),
152 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
153 mKeepAliveGeneration(0),
154 mPausing(false),
155 mPauseGeneration(0),
156 mPlayResponseParsed(false) {
157 mNetLooper->setName("rtsp net");
158 mNetLooper->start(false /* runOnCallingThread */,
159 false /* canCallJava */,
160 PRIORITY_HIGHEST);
161
162 // Strip any authentication info from the session url, we don't
163 // want to transmit user/pass in cleartext.
164 AString host, path, user, pass;
165 unsigned port;
166 CHECK(ARTSPConnection::ParseURL(
167 mSessionURL.c_str(), &host, &port, &path, &user, &pass));
168
169 if (user.size() > 0) {
170 mSessionURL.clear();
171 mSessionURL.append("rtsp://");
172 mSessionURL.append(host);
173 mSessionURL.append(":");
174 mSessionURL.append(AStringPrintf("%u", port));
175 mSessionURL.append(path);
176
177 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
178 }
179
180 mSessionHost = host;
181 }
182
connectMyHandler183 void connect() {
184 looper()->registerHandler(mConn);
185 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
186
187 sp<AMessage> notify = new AMessage('biny', this);
188 mConn->observeBinaryData(notify);
189
190 sp<AMessage> reply = new AMessage('conn', this);
191 mConn->connect(mOriginalSessionURL.c_str(), reply);
192 }
193
loadSDPMyHandler194 void loadSDP(const sp<ASessionDescription>& desc) {
195 looper()->registerHandler(mConn);
196 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
197
198 sp<AMessage> notify = new AMessage('biny', this);
199 mConn->observeBinaryData(notify);
200
201 sp<AMessage> reply = new AMessage('sdpl', this);
202 reply->setObject("description", desc);
203 mConn->connect(mOriginalSessionURL.c_str(), reply);
204 }
205
getControlURLMyHandler206 AString getControlURL() {
207 AString sessionLevelControlURL;
208 if (mSessionDesc->findAttribute(
209 0,
210 "a=control",
211 &sessionLevelControlURL)) {
212 if (sessionLevelControlURL.compare("*") == 0) {
213 return mBaseURL;
214 } else {
215 AString controlURL;
216 CHECK(MakeURL(
217 mBaseURL.c_str(),
218 sessionLevelControlURL.c_str(),
219 &controlURL));
220 return controlURL;
221 }
222 } else {
223 return mSessionURL;
224 }
225 }
226
disconnectMyHandler227 void disconnect() {
228 (new AMessage('abor', this))->post();
229 }
230
seekMyHandler231 void seek(int64_t timeUs) {
232 sp<AMessage> msg = new AMessage('seek', this);
233 msg->setInt64("time", timeUs);
234 mPauseGeneration++;
235 msg->post();
236 }
237
continueSeekAfterPauseMyHandler238 void continueSeekAfterPause(int64_t timeUs) {
239 sp<AMessage> msg = new AMessage('see1', this);
240 msg->setInt64("time", timeUs);
241 msg->post();
242 }
243
isSeekableMyHandler244 bool isSeekable() const {
245 return mSeekable;
246 }
247
pauseMyHandler248 void pause() {
249 sp<AMessage> msg = new AMessage('paus', this);
250 mPauseGeneration++;
251 msg->setInt32("pausecheck", mPauseGeneration);
252 msg->post();
253 }
254
resumeMyHandler255 void resume() {
256 sp<AMessage> msg = new AMessage('resu', this);
257 mPauseGeneration++;
258 msg->post();
259 }
260
getARTSPConnectionMyHandler261 sp<ARTSPConnection> getARTSPConnection() {
262 return mConn;
263 }
264
addRRMyHandler265 static void addRR(const sp<ABuffer> &buf) {
266 uint8_t *ptr = buf->data() + buf->size();
267 ptr[0] = 0x80 | 0;
268 ptr[1] = 201; // RR
269 ptr[2] = 0;
270 ptr[3] = 1;
271 ptr[4] = 0xde; // SSRC
272 ptr[5] = 0xad;
273 ptr[6] = 0xbe;
274 ptr[7] = 0xef;
275
276 buf->setRange(0, buf->size() + 8);
277 }
278
addSDESMyHandler279 static void addSDES(int s, const sp<ABuffer> &buffer) {
280 struct sockaddr_in addr;
281 socklen_t addrSize = sizeof(addr);
282 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
283 inet_aton("0.0.0.0", &(addr.sin_addr));
284 }
285
286 uint8_t *data = buffer->data() + buffer->size();
287 data[0] = 0x80 | 1;
288 data[1] = 202; // SDES
289 data[4] = 0xde; // SSRC
290 data[5] = 0xad;
291 data[6] = 0xbe;
292 data[7] = 0xef;
293
294 size_t offset = 8;
295
296 data[offset++] = 1; // CNAME
297
298 AString cname = "stagefright@";
299 cname.append(inet_ntoa(addr.sin_addr));
300 data[offset++] = cname.size();
301
302 memcpy(&data[offset], cname.c_str(), cname.size());
303 offset += cname.size();
304
305 data[offset++] = 6; // TOOL
306
307 AString tool = MakeUserAgent();
308
309 data[offset++] = tool.size();
310
311 memcpy(&data[offset], tool.c_str(), tool.size());
312 offset += tool.size();
313
314 data[offset++] = 0;
315
316 if ((offset % 4) > 0) {
317 size_t count = 4 - (offset % 4);
318 switch (count) {
319 case 3:
320 data[offset++] = 0;
321 FALLTHROUGH_INTENDED;
322 case 2:
323 data[offset++] = 0;
324 FALLTHROUGH_INTENDED;
325 case 1:
326 data[offset++] = 0;
327 }
328 }
329
330 size_t numWords = (offset / 4) - 1;
331 data[2] = numWords >> 8;
332 data[3] = numWords & 0xff;
333
334 buffer->setRange(buffer->offset(), buffer->size() + offset);
335 }
336
337 // In case we're behind NAT, fire off two UDP packets to the remote
338 // rtp/rtcp ports to poke a hole into the firewall for future incoming
339 // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler340 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
341 struct sockaddr_in addr;
342 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
343 addr.sin_family = AF_INET;
344
345 AString source;
346 AString server_port;
347 if (!GetAttribute(transport.c_str(),
348 "source",
349 &source)) {
350 ALOGW("Missing 'source' field in Transport response. Using "
351 "RTSP endpoint address.");
352
353 struct hostent *ent = gethostbyname(mSessionHost.c_str());
354 if (ent == NULL) {
355 ALOGE("Failed to look up address of session host");
356
357 return false;
358 }
359
360 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
361 } else {
362 addr.sin_addr.s_addr = inet_addr(source.c_str());
363 }
364
365 if (!GetAttribute(transport.c_str(),
366 "server_port",
367 &server_port)) {
368 ALOGI("Missing 'server_port' field in Transport response.");
369 return false;
370 }
371
372 int rtpPort, rtcpPort;
373 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
374 || rtpPort <= 0 || rtpPort > 65535
375 || rtcpPort <=0 || rtcpPort > 65535
376 || rtcpPort != rtpPort + 1) {
377 ALOGE("Server picked invalid RTP/RTCP port pair %s,"
378 " RTP port must be even, RTCP port must be one higher.",
379 server_port.c_str());
380
381 return false;
382 }
383
384 if (rtpPort & 1) {
385 ALOGW("Server picked an odd RTP port, it should've picked an "
386 "even one, we'll let it pass for now, but this may break "
387 "in the future.");
388 }
389
390 if (addr.sin_addr.s_addr == INADDR_NONE) {
391 return true;
392 }
393
394 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
395 // No firewalls to traverse on the loopback interface.
396 return true;
397 }
398
399 // Make up an RR/SDES RTCP packet.
400 sp<ABuffer> buf = new ABuffer(65536);
401 buf->setRange(0, 0);
402 addRR(buf);
403 addSDES(rtpSocket, buf);
404
405 addr.sin_port = htons(rtpPort);
406
407 ssize_t n = sendto(
408 rtpSocket, buf->data(), buf->size(), 0,
409 (const sockaddr *)&addr, sizeof(addr));
410
411 if (n < (ssize_t)buf->size()) {
412 ALOGE("failed to poke a hole for RTP packets");
413 return false;
414 }
415
416 addr.sin_port = htons(rtcpPort);
417
418 n = sendto(
419 rtcpSocket, buf->data(), buf->size(), 0,
420 (const sockaddr *)&addr, sizeof(addr));
421
422 if (n < (ssize_t)buf->size()) {
423 ALOGE("failed to poke a hole for RTCP packets");
424 return false;
425 }
426
427 ALOGV("successfully poked holes.");
428
429 return true;
430 }
431
isLiveStreamMyHandler432 static bool isLiveStream(const sp<ASessionDescription> &desc) {
433 AString attrLiveStream;
434 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
435 ssize_t semicolonPos = attrLiveStream.find(";", 2);
436
437 const char* liveStreamValue;
438 if (semicolonPos < 0) {
439 liveStreamValue = attrLiveStream.c_str();
440 } else {
441 AString valString;
442 valString.setTo(attrLiveStream,
443 semicolonPos + 1,
444 attrLiveStream.size() - semicolonPos - 1);
445 liveStreamValue = valString.c_str();
446 }
447
448 uint32_t value = strtoul(liveStreamValue, NULL, 10);
449 if (value == 1) {
450 ALOGV("found live stream");
451 return true;
452 }
453 } else {
454 // It is a live stream if no duration is returned
455 int64_t durationUs;
456 if (!desc->getDurationUs(&durationUs)) {
457 ALOGV("No duration found, assume live stream");
458 return true;
459 }
460 }
461
462 return false;
463 }
464
onMessageReceivedMyHandler465 virtual void onMessageReceived(const sp<AMessage> &msg) {
466 switch (msg->what()) {
467 case 'conn':
468 {
469 int32_t result;
470 CHECK(msg->findInt32("result", &result));
471
472 ALOGI("connection request completed with result %d (%s)",
473 result, strerror(-result));
474
475 if (result == OK) {
476 AString request;
477 request = "DESCRIBE ";
478 request.append(mSessionURL);
479 request.append(" RTSP/1.0\r\n");
480 request.append("Accept: application/sdp\r\n");
481 request.append("\r\n");
482
483 sp<AMessage> reply = new AMessage('desc', this);
484 mConn->sendRequest(request.c_str(), reply);
485 } else {
486 (new AMessage('disc', this))->post();
487 }
488 break;
489 }
490
491 case 'disc':
492 {
493 ++mKeepAliveGeneration;
494
495 int32_t reconnect;
496 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
497 sp<AMessage> reply = new AMessage('conn', this);
498 mConn->connect(mOriginalSessionURL.c_str(), reply);
499 } else {
500 (new AMessage('quit', this))->post();
501 }
502 break;
503 }
504
505 case 'desc':
506 {
507 int32_t result;
508 CHECK(msg->findInt32("result", &result));
509
510 ALOGI("DESCRIBE completed with result %d (%s)",
511 result, strerror(-result));
512
513 if (result == OK) {
514 sp<RefBase> obj;
515 CHECK(msg->findObject("response", &obj));
516 sp<ARTSPResponse> response =
517 static_cast<ARTSPResponse *>(obj.get());
518
519 if (response->mStatusCode == 301 || response->mStatusCode == 302) {
520 ssize_t i = response->mHeaders.indexOfKey("location");
521 CHECK_GE(i, 0);
522
523 mOriginalSessionURL = response->mHeaders.valueAt(i);
524 mSessionURL = mOriginalSessionURL;
525
526 // Strip any authentication info from the session url, we don't
527 // want to transmit user/pass in cleartext.
528 AString host, path, user, pass;
529 unsigned port;
530 if (ARTSPConnection::ParseURL(
531 mSessionURL.c_str(), &host, &port, &path, &user, &pass)
532 && user.size() > 0) {
533 mSessionURL.clear();
534 mSessionURL.append("rtsp://");
535 mSessionURL.append(host);
536 mSessionURL.append(":");
537 mSessionURL.append(AStringPrintf("%u", port));
538 mSessionURL.append(path);
539
540 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
541 }
542
543 sp<AMessage> reply = new AMessage('conn', this);
544 mConn->connect(mOriginalSessionURL.c_str(), reply);
545 break;
546 }
547
548 if (response->mStatusCode != 200) {
549 result = UNKNOWN_ERROR;
550 } else if (response->mContent == NULL) {
551 result = ERROR_MALFORMED;
552 ALOGE("The response has no content.");
553 } else {
554 mSessionDesc = new ASessionDescription;
555
556 mSessionDesc->setTo(
557 response->mContent->data(),
558 response->mContent->size());
559
560 if (!mSessionDesc->isValid()) {
561 ALOGE("Failed to parse session description.");
562 result = ERROR_MALFORMED;
563 } else {
564 ssize_t i = response->mHeaders.indexOfKey("content-base");
565 if (i >= 0) {
566 mBaseURL = response->mHeaders.valueAt(i);
567 } else {
568 i = response->mHeaders.indexOfKey("content-location");
569 if (i >= 0) {
570 mBaseURL = response->mHeaders.valueAt(i);
571 } else {
572 mBaseURL = mSessionURL;
573 }
574 }
575
576 mSeekable = !isLiveStream(mSessionDesc);
577
578 if (!mBaseURL.startsWith("rtsp://")) {
579 // Some misbehaving servers specify a relative
580 // URL in one of the locations above, combine
581 // it with the absolute session URL to get
582 // something usable...
583
584 ALOGW("Server specified a non-absolute base URL"
585 ", combining it with the session URL to "
586 "get something usable...");
587
588 AString tmp;
589 CHECK(MakeURL(
590 mSessionURL.c_str(),
591 mBaseURL.c_str(),
592 &tmp));
593
594 mBaseURL = tmp;
595 }
596
597 mControlURL = getControlURL();
598
599 if (mSessionDesc->countTracks() < 2) {
600 // There's no actual tracks in this session.
601 // The first "track" is merely session meta
602 // data.
603
604 ALOGW("Session doesn't contain any playable "
605 "tracks. Aborting.");
606 result = ERROR_UNSUPPORTED;
607 } else {
608 setupTrack(1);
609 }
610 }
611 }
612 }
613
614 if (result != OK) {
615 sp<AMessage> reply = new AMessage('disc', this);
616 mConn->disconnect(reply);
617 }
618 break;
619 }
620
621 case 'sdpl':
622 {
623 int32_t result;
624 CHECK(msg->findInt32("result", &result));
625
626 ALOGI("SDP connection request completed with result %d (%s)",
627 result, strerror(-result));
628
629 if (result == OK) {
630 sp<RefBase> obj;
631 CHECK(msg->findObject("description", &obj));
632 mSessionDesc =
633 static_cast<ASessionDescription *>(obj.get());
634
635 if (!mSessionDesc->isValid()) {
636 ALOGE("Failed to parse session description.");
637 result = ERROR_MALFORMED;
638 } else {
639 mBaseURL = mSessionURL;
640
641 mSeekable = !isLiveStream(mSessionDesc);
642
643 mControlURL = getControlURL();
644
645 if (mSessionDesc->countTracks() < 2) {
646 // There's no actual tracks in this session.
647 // The first "track" is merely session meta
648 // data.
649
650 ALOGW("Session doesn't contain any playable "
651 "tracks. Aborting.");
652 result = ERROR_UNSUPPORTED;
653 } else {
654 setupTrack(1);
655 }
656 }
657 }
658
659 if (result != OK) {
660 sp<AMessage> reply = new AMessage('disc', this);
661 mConn->disconnect(reply);
662 }
663 break;
664 }
665
666 case 'setu':
667 {
668 size_t index;
669 CHECK(msg->findSize("index", &index));
670
671 TrackInfo *track = NULL;
672 size_t trackIndex;
673 if (msg->findSize("track-index", &trackIndex)) {
674 track = &mTracks.editItemAt(trackIndex);
675 }
676
677 int32_t result;
678 CHECK(msg->findInt32("result", &result));
679
680 ALOGI("SETUP(%zu) completed with result %d (%s)",
681 index, result, strerror(-result));
682
683 if (result == OK) {
684 CHECK(track != NULL);
685
686 sp<RefBase> obj;
687 CHECK(msg->findObject("response", &obj));
688 sp<ARTSPResponse> response =
689 static_cast<ARTSPResponse *>(obj.get());
690
691 if (response->mStatusCode != 200) {
692 result = UNKNOWN_ERROR;
693 } else {
694 ssize_t i = response->mHeaders.indexOfKey("session");
695 CHECK_GE(i, 0);
696
697 mSessionID = response->mHeaders.valueAt(i);
698
699 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
700 AString timeoutStr;
701 if (GetAttribute(
702 mSessionID.c_str(), "timeout", &timeoutStr)) {
703 char *end;
704 unsigned long timeoutSecs =
705 strtoul(timeoutStr.c_str(), &end, 10);
706
707 if (end == timeoutStr.c_str() || *end != '\0') {
708 ALOGW("server specified malformed timeout '%s'",
709 timeoutStr.c_str());
710
711 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
712 } else if (timeoutSecs < 15) {
713 ALOGW("server specified too short a timeout "
714 "(%lu secs), using default.",
715 timeoutSecs);
716
717 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
718 } else {
719 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
720
721 ALOGI("server specified timeout of %lu secs.",
722 timeoutSecs);
723 }
724 }
725
726 i = mSessionID.find(";");
727 if (i >= 0) {
728 // Remove options, i.e. ";timeout=90"
729 mSessionID.erase(i, mSessionID.size() - i);
730 }
731
732 sp<AMessage> notify = new AMessage('accu', this);
733 notify->setSize("track-index", trackIndex);
734
735 i = response->mHeaders.indexOfKey("transport");
736 CHECK_GE(i, 0);
737
738 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
739 if (!track->mUsingInterleavedTCP) {
740 AString transport = response->mHeaders.valueAt(i);
741
742 // We are going to continue even if we were
743 // unable to poke a hole into the firewall...
744 pokeAHole(
745 track->mRTPSocket,
746 track->mRTCPSocket,
747 transport);
748 }
749
750 mRTPConn->addStream(
751 track->mRTPSocket, track->mRTCPSocket,
752 mSessionDesc, index,
753 notify, track->mUsingInterleavedTCP);
754
755 mSetupTracksSuccessful = true;
756 } else {
757 result = BAD_VALUE;
758 }
759 }
760 }
761
762 if (result != OK) {
763 if (track) {
764 if (!track->mUsingInterleavedTCP) {
765 // Clear the tag
766 if (mUIDValid) {
767 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket);
768 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket);
769 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket);
770 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket);
771 }
772
773 close(track->mRTPSocket);
774 close(track->mRTCPSocket);
775 }
776
777 mTracks.removeItemsAt(trackIndex);
778 }
779 }
780
781 ++index;
782 if (result == OK && index < mSessionDesc->countTracks()) {
783 setupTrack(index);
784 } else if (mSetupTracksSuccessful) {
785 ++mKeepAliveGeneration;
786 postKeepAlive();
787
788 AString request = "PLAY ";
789 request.append(mControlURL);
790 request.append(" RTSP/1.0\r\n");
791
792 request.append("Session: ");
793 request.append(mSessionID);
794 request.append("\r\n");
795
796 request.append("\r\n");
797
798 sp<AMessage> reply = new AMessage('play', this);
799 mConn->sendRequest(request.c_str(), reply);
800 } else {
801 sp<AMessage> reply = new AMessage('disc', this);
802 mConn->disconnect(reply);
803 }
804 break;
805 }
806
807 case 'play':
808 {
809 int32_t result;
810 CHECK(msg->findInt32("result", &result));
811
812 ALOGI("PLAY completed with result %d (%s)",
813 result, strerror(-result));
814
815 if (result == OK) {
816 sp<RefBase> obj;
817 CHECK(msg->findObject("response", &obj));
818 sp<ARTSPResponse> response =
819 static_cast<ARTSPResponse *>(obj.get());
820
821 if (response->mStatusCode != 200) {
822 result = UNKNOWN_ERROR;
823 } else {
824 parsePlayResponse(response);
825 postTimeout();
826 }
827 }
828
829 if (result != OK) {
830 sp<AMessage> reply = new AMessage('disc', this);
831 mConn->disconnect(reply);
832 }
833
834 break;
835 }
836
837 case 'aliv':
838 {
839 int32_t generation;
840 CHECK(msg->findInt32("generation", &generation));
841
842 if (generation != mKeepAliveGeneration) {
843 // obsolete event.
844 break;
845 }
846
847 AString request;
848 request.append("OPTIONS ");
849 request.append(mSessionURL);
850 request.append(" RTSP/1.0\r\n");
851 request.append("Session: ");
852 request.append(mSessionID);
853 request.append("\r\n");
854 request.append("\r\n");
855
856 sp<AMessage> reply = new AMessage('opts', this);
857 reply->setInt32("generation", mKeepAliveGeneration);
858 mConn->sendRequest(request.c_str(), reply);
859 break;
860 }
861
862 case 'opts':
863 {
864 int32_t result;
865 CHECK(msg->findInt32("result", &result));
866
867 ALOGI("OPTIONS completed with result %d (%s)",
868 result, strerror(-result));
869
870 int32_t generation;
871 CHECK(msg->findInt32("generation", &generation));
872
873 if (generation != mKeepAliveGeneration) {
874 // obsolete event.
875 break;
876 }
877
878 postKeepAlive();
879 break;
880 }
881
882 case 'abor':
883 {
884 for (size_t i = 0; i < mTracks.size(); ++i) {
885 TrackInfo *info = &mTracks.editItemAt(i);
886
887 if (!mFirstAccessUnit) {
888 postQueueEOS(i, ERROR_END_OF_STREAM);
889 }
890
891 if (!info->mUsingInterleavedTCP) {
892 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
893
894 // Clear the tag
895 if (mUIDValid) {
896 NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket);
897 NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket);
898 NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket);
899 NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket);
900 }
901
902 close(info->mRTPSocket);
903 close(info->mRTCPSocket);
904 }
905 }
906 mTracks.clear();
907 mSetupTracksSuccessful = false;
908 mSeekPending = false;
909 mFirstAccessUnit = true;
910 mAllTracksHaveTime = false;
911 mNTPAnchorUs = -1;
912 mMediaAnchorUs = -1;
913 mNumAccessUnitsReceived = 0;
914 mReceivedFirstRTCPPacket = false;
915 mReceivedFirstRTPPacket = false;
916 mPausing = false;
917 mSeekable = true;
918
919 sp<AMessage> reply = new AMessage('tear', this);
920
921 int32_t reconnect;
922 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
923 reply->setInt32("reconnect", true);
924 }
925
926 AString request;
927 request = "TEARDOWN ";
928
929 // XXX should use aggregate url from SDP here...
930 request.append(mSessionURL);
931 request.append(" RTSP/1.0\r\n");
932
933 request.append("Session: ");
934 request.append(mSessionID);
935 request.append("\r\n");
936
937 request.append("\r\n");
938
939 mConn->sendRequest(request.c_str(), reply);
940
941 // If the response of teardown hasn't been received in 3 seconds,
942 // post 'tear' message to avoid ANR.
943 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
944 sp<AMessage> teardown = reply->dup();
945 teardown->setInt32("result", -ECONNABORTED);
946 teardown->post(kTearDownTimeoutUs);
947 }
948 break;
949 }
950
951 case 'tear':
952 {
953 int32_t result;
954 CHECK(msg->findInt32("result", &result));
955
956 ALOGI("TEARDOWN completed with result %d (%s)",
957 result, strerror(-result));
958
959 sp<AMessage> reply = new AMessage('disc', this);
960
961 int32_t reconnect;
962 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
963 reply->setInt32("reconnect", true);
964 }
965
966 mConn->disconnect(reply);
967 break;
968 }
969
970 case 'quit':
971 {
972 sp<AMessage> msg = mNotify->dup();
973 msg->setInt32("what", kWhatDisconnected);
974 msg->setInt32("result", UNKNOWN_ERROR);
975 msg->post();
976 break;
977 }
978
979 case 'chek':
980 {
981 int32_t generation;
982 CHECK(msg->findInt32("generation", &generation));
983 if (generation != mCheckGeneration) {
984 // This is an outdated message. Ignore.
985 break;
986 }
987
988 if (mNumAccessUnitsReceived == 0) {
989 #if 1
990 ALOGI("stream ended? aborting.");
991 (new AMessage('abor', this))->post();
992 break;
993 #else
994 ALOGI("haven't seen an AU in a looong time.");
995 #endif
996 }
997
998 mNumAccessUnitsReceived = 0;
999 msg->post(kAccessUnitTimeoutUs);
1000 break;
1001 }
1002
1003 case 'accu':
1004 {
1005 if (mSeekPending) {
1006 ALOGV("Stale access unit.");
1007 break;
1008 }
1009
1010 int32_t timeUpdate;
1011 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
1012 size_t trackIndex;
1013 CHECK(msg->findSize("track-index", &trackIndex));
1014
1015 uint32_t rtpTime;
1016 uint64_t ntpTime;
1017 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
1018 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1019
1020 onTimeUpdate(trackIndex, rtpTime, ntpTime);
1021 break;
1022 }
1023
1024 int32_t first;
1025 if (msg->findInt32("first-rtcp", &first)) {
1026 mReceivedFirstRTCPPacket = true;
1027 break;
1028 }
1029
1030 if (msg->findInt32("first-rtp", &first)) {
1031 mReceivedFirstRTPPacket = true;
1032 break;
1033 }
1034
1035 ++mNumAccessUnitsReceived;
1036 postAccessUnitTimeoutCheck();
1037
1038 size_t trackIndex;
1039 CHECK(msg->findSize("track-index", &trackIndex));
1040
1041 if (trackIndex >= mTracks.size()) {
1042 ALOGV("late packets ignored.");
1043 break;
1044 }
1045
1046 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1047
1048 int32_t eos;
1049 if (msg->findInt32("eos", &eos)) {
1050 ALOGI("received BYE on track index %zu", trackIndex);
1051 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1052 ALOGI("No time established => fake existing data");
1053
1054 track->mEOSReceived = true;
1055 mTryFakeRTCP = true;
1056 mReceivedFirstRTCPPacket = true;
1057 fakeTimestamps();
1058 } else {
1059 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1060 }
1061 return;
1062 }
1063
1064 if (mSeekPending) {
1065 ALOGV("we're seeking, dropping stale packet.");
1066 break;
1067 }
1068
1069 sp<ABuffer> accessUnit;
1070 CHECK(msg->findBuffer("access-unit", &accessUnit));
1071 onAccessUnitComplete(trackIndex, accessUnit);
1072 break;
1073 }
1074
1075 case 'paus':
1076 {
1077 int32_t generation;
1078 CHECK(msg->findInt32("pausecheck", &generation));
1079 if (generation != mPauseGeneration) {
1080 ALOGV("Ignoring outdated pause message.");
1081 break;
1082 }
1083
1084 if (!mSeekable) {
1085 ALOGW("This is a live stream, ignoring pause request.");
1086 break;
1087 }
1088
1089 if (mPausing) {
1090 ALOGV("This stream is already paused.");
1091 break;
1092 }
1093
1094 mCheckPending = true;
1095 ++mCheckGeneration;
1096 mPausing = true;
1097
1098 AString request = "PAUSE ";
1099 request.append(mControlURL);
1100 request.append(" RTSP/1.0\r\n");
1101
1102 request.append("Session: ");
1103 request.append(mSessionID);
1104 request.append("\r\n");
1105
1106 request.append("\r\n");
1107
1108 sp<AMessage> reply = new AMessage('pau2', this);
1109 mConn->sendRequest(request.c_str(), reply);
1110 break;
1111 }
1112
1113 case 'pau2':
1114 {
1115 int32_t result;
1116 CHECK(msg->findInt32("result", &result));
1117 mCheckTimeoutGeneration++;
1118
1119 ALOGI("PAUSE completed with result %d (%s)",
1120 result, strerror(-result));
1121 break;
1122 }
1123
1124 case 'resu':
1125 {
1126 if (mPausing && mSeekPending) {
1127 // If seeking, Play will be sent from see1 instead
1128 break;
1129 }
1130
1131 if (!mPausing) {
1132 // Dont send PLAY if we have not paused
1133 break;
1134 }
1135 AString request = "PLAY ";
1136 request.append(mControlURL);
1137 request.append(" RTSP/1.0\r\n");
1138
1139 request.append("Session: ");
1140 request.append(mSessionID);
1141 request.append("\r\n");
1142
1143 request.append("\r\n");
1144
1145 sp<AMessage> reply = new AMessage('res2', this);
1146 mConn->sendRequest(request.c_str(), reply);
1147 break;
1148 }
1149
1150 case 'res2':
1151 {
1152 int32_t result;
1153 CHECK(msg->findInt32("result", &result));
1154
1155 ALOGI("PLAY (for resume) completed with result %d (%s)",
1156 result, strerror(-result));
1157
1158 mCheckPending = false;
1159 ++mCheckGeneration;
1160 postAccessUnitTimeoutCheck();
1161
1162 if (result == OK) {
1163 sp<RefBase> obj;
1164 CHECK(msg->findObject("response", &obj));
1165 sp<ARTSPResponse> response =
1166 static_cast<ARTSPResponse *>(obj.get());
1167
1168 if (response->mStatusCode != 200) {
1169 result = UNKNOWN_ERROR;
1170 } else {
1171 parsePlayResponse(response);
1172
1173 // Post new timeout in order to make sure to use
1174 // fake timestamps if no new Sender Reports arrive
1175 postTimeout();
1176 }
1177 }
1178
1179 if (result != OK) {
1180 ALOGE("resume failed, aborting.");
1181 (new AMessage('abor', this))->post();
1182 }
1183
1184 mPausing = false;
1185 break;
1186 }
1187
1188 case 'seek':
1189 {
1190 if (!mSeekable) {
1191 ALOGW("This is a live stream, ignoring seek request.");
1192
1193 sp<AMessage> msg = mNotify->dup();
1194 msg->setInt32("what", kWhatSeekDone);
1195 msg->post();
1196 break;
1197 }
1198
1199 int64_t timeUs;
1200 CHECK(msg->findInt64("time", &timeUs));
1201
1202 mSeekPending = true;
1203
1204 // Disable the access unit timeout until we resumed
1205 // playback again.
1206 mCheckPending = true;
1207 ++mCheckGeneration;
1208
1209 sp<AMessage> reply = new AMessage('see0', this);
1210 reply->setInt64("time", timeUs);
1211
1212 if (mPausing) {
1213 // PAUSE already sent
1214 ALOGI("Pause already sent");
1215 reply->post();
1216 break;
1217 }
1218 AString request = "PAUSE ";
1219 request.append(mControlURL);
1220 request.append(" RTSP/1.0\r\n");
1221
1222 request.append("Session: ");
1223 request.append(mSessionID);
1224 request.append("\r\n");
1225
1226 request.append("\r\n");
1227
1228 mConn->sendRequest(request.c_str(), reply);
1229 break;
1230 }
1231
1232 case 'see0':
1233 {
1234 // Session is paused now.
1235 status_t err = OK;
1236 msg->findInt32("result", &err);
1237
1238 int64_t timeUs;
1239 CHECK(msg->findInt64("time", &timeUs));
1240
1241 sp<AMessage> notify = mNotify->dup();
1242 notify->setInt32("what", kWhatSeekPaused);
1243 notify->setInt32("err", err);
1244 notify->setInt64("time", timeUs);
1245 notify->post();
1246 break;
1247
1248 }
1249
1250 case 'see1':
1251 {
1252 for (size_t i = 0; i < mTracks.size(); ++i) {
1253 TrackInfo *info = &mTracks.editItemAt(i);
1254
1255 postQueueSeekDiscontinuity(i);
1256 info->mEOSReceived = false;
1257
1258 info->mRTPAnchor = 0;
1259 info->mNTPAnchorUs = -1;
1260 }
1261
1262 mAllTracksHaveTime = false;
1263 mNTPAnchorUs = -1;
1264
1265 // Start new timeoutgeneration to avoid getting timeout
1266 // before PLAY response arrive
1267 postTimeout();
1268
1269 int64_t timeUs;
1270 CHECK(msg->findInt64("time", &timeUs));
1271
1272 AString request = "PLAY ";
1273 request.append(mControlURL);
1274 request.append(" RTSP/1.0\r\n");
1275
1276 request.append("Session: ");
1277 request.append(mSessionID);
1278 request.append("\r\n");
1279
1280 request.append(
1281 AStringPrintf(
1282 "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1283
1284 request.append("\r\n");
1285
1286 sp<AMessage> reply = new AMessage('see2', this);
1287 mConn->sendRequest(request.c_str(), reply);
1288 break;
1289 }
1290
1291 case 'see2':
1292 {
1293 if (mTracks.size() == 0) {
1294 // We have already hit abor, break
1295 break;
1296 }
1297
1298 int32_t result;
1299 CHECK(msg->findInt32("result", &result));
1300
1301 ALOGI("PLAY (for seek) completed with result %d (%s)",
1302 result, strerror(-result));
1303
1304 mCheckPending = false;
1305 ++mCheckGeneration;
1306 postAccessUnitTimeoutCheck();
1307
1308 if (result == OK) {
1309 sp<RefBase> obj;
1310 CHECK(msg->findObject("response", &obj));
1311 sp<ARTSPResponse> response =
1312 static_cast<ARTSPResponse *>(obj.get());
1313
1314 if (response->mStatusCode != 200) {
1315 result = UNKNOWN_ERROR;
1316 } else {
1317 parsePlayResponse(response);
1318
1319 // Post new timeout in order to make sure to use
1320 // fake timestamps if no new Sender Reports arrive
1321 postTimeout();
1322
1323 ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1324 CHECK_GE(i, 0);
1325
1326 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1327
1328 ALOGI("seek completed.");
1329 }
1330 }
1331
1332 if (result != OK) {
1333 ALOGE("seek failed, aborting.");
1334 (new AMessage('abor', this))->post();
1335 }
1336
1337 mPausing = false;
1338 mSeekPending = false;
1339
1340 // Discard all stale access units.
1341 for (size_t i = 0; i < mTracks.size(); ++i) {
1342 TrackInfo *track = &mTracks.editItemAt(i);
1343 track->mPackets.clear();
1344 }
1345
1346 sp<AMessage> msg = mNotify->dup();
1347 msg->setInt32("what", kWhatSeekDone);
1348 msg->post();
1349 break;
1350 }
1351
1352 case 'biny':
1353 {
1354 sp<ABuffer> buffer;
1355 CHECK(msg->findBuffer("buffer", &buffer));
1356
1357 int32_t index;
1358 CHECK(buffer->meta()->findInt32("index", &index));
1359
1360 mRTPConn->injectPacket(index, buffer);
1361 break;
1362 }
1363
1364 case 'tiou':
1365 {
1366 int32_t timeoutGenerationCheck;
1367 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1368 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1369 // This is an outdated message. Ignore.
1370 // This typically happens if a lot of seeks are
1371 // performed, since new timeout messages now are
1372 // posted at seek as well.
1373 break;
1374 }
1375 if (!mReceivedFirstRTCPPacket) {
1376 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1377 ALOGW("We received RTP packets but no RTCP packets, "
1378 "using fake timestamps.");
1379
1380 mTryFakeRTCP = true;
1381
1382 mReceivedFirstRTCPPacket = true;
1383
1384 fakeTimestamps();
1385 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1386 ALOGW("Never received any data, switching transports.");
1387
1388 mTryTCPInterleaving = true;
1389
1390 sp<AMessage> msg = new AMessage('abor', this);
1391 msg->setInt32("reconnect", true);
1392 msg->post();
1393 } else {
1394 ALOGW("Never received any data, disconnecting.");
1395 (new AMessage('abor', this))->post();
1396 }
1397 } else {
1398 if (!mAllTracksHaveTime) {
1399 ALOGW("We received some RTCP packets, but time "
1400 "could not be established on all tracks, now "
1401 "using fake timestamps");
1402
1403 fakeTimestamps();
1404 }
1405 }
1406 break;
1407 }
1408
1409 default:
1410 TRESPASS();
1411 break;
1412 }
1413 }
1414
postKeepAliveMyHandler1415 void postKeepAlive() {
1416 sp<AMessage> msg = new AMessage('aliv', this);
1417 msg->setInt32("generation", mKeepAliveGeneration);
1418 msg->post((mKeepAliveTimeoutUs * 9) / 10);
1419 }
1420
cancelAccessUnitTimeoutCheckMyHandler1421 void cancelAccessUnitTimeoutCheck() {
1422 ALOGV("cancelAccessUnitTimeoutCheck");
1423 ++mCheckGeneration;
1424 }
1425
postAccessUnitTimeoutCheckMyHandler1426 void postAccessUnitTimeoutCheck() {
1427 if (mCheckPending) {
1428 return;
1429 }
1430
1431 mCheckPending = true;
1432 sp<AMessage> check = new AMessage('chek', this);
1433 check->setInt32("generation", mCheckGeneration);
1434 check->post(kAccessUnitTimeoutUs);
1435 }
1436
SplitStringMyHandler1437 static void SplitString(
1438 const AString &s, const char *separator, List<AString> *items) {
1439 items->clear();
1440 size_t start = 0;
1441 while (start < s.size()) {
1442 ssize_t offset = s.find(separator, start);
1443
1444 if (offset < 0) {
1445 items->push_back(AString(s, start, s.size() - start));
1446 break;
1447 }
1448
1449 items->push_back(AString(s, start, offset - start));
1450 start = offset + strlen(separator);
1451 }
1452 }
1453
parsePlayResponseMyHandler1454 void parsePlayResponse(const sp<ARTSPResponse> &response) {
1455 mPlayResponseParsed = true;
1456 if (mTracks.size() == 0) {
1457 ALOGV("parsePlayResponse: late packets ignored.");
1458 return;
1459 }
1460
1461 ssize_t i = response->mHeaders.indexOfKey("range");
1462 if (i < 0) {
1463 // Server doesn't even tell use what range it is going to
1464 // play, therefore we won't support seeking.
1465 return;
1466 }
1467
1468 AString range = response->mHeaders.valueAt(i);
1469 ALOGV("Range: %s", range.c_str());
1470
1471 AString val;
1472 CHECK(GetAttribute(range.c_str(), "npt", &val));
1473
1474 float npt1, npt2;
1475 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1476 // This is a live stream and therefore not seekable.
1477
1478 ALOGI("This is a live stream");
1479 return;
1480 }
1481
1482 i = response->mHeaders.indexOfKey("rtp-info");
1483 CHECK_GE(i, 0);
1484
1485 AString rtpInfo = response->mHeaders.valueAt(i);
1486 List<AString> streamInfos;
1487 SplitString(rtpInfo, ",", &streamInfos);
1488
1489 int n = 1;
1490 for (List<AString>::iterator it = streamInfos.begin();
1491 it != streamInfos.end(); ++it) {
1492 (*it).trim();
1493 ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1494
1495 CHECK(GetAttribute((*it).c_str(), "url", &val));
1496
1497 size_t trackIndex = 0;
1498 while (trackIndex < mTracks.size()
1499 && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1500 ++trackIndex;
1501 }
1502 CHECK_LT(trackIndex, mTracks.size());
1503
1504 CHECK(GetAttribute((*it).c_str(), "seq", &val));
1505
1506 char *end;
1507 unsigned long seq = strtoul(val.c_str(), &end, 10);
1508
1509 TrackInfo *info = &mTracks.editItemAt(trackIndex);
1510 info->mFirstSeqNumInSegment = seq;
1511 info->mNewSegment = true;
1512 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1513
1514 CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1515
1516 uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1517
1518 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1519
1520 info->mNormalPlayTimeRTP = rtpTime;
1521 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1522
1523 if (!mFirstAccessUnit) {
1524 postNormalPlayTimeMapping(
1525 trackIndex,
1526 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1527 }
1528
1529 ++n;
1530 }
1531 }
1532
getTrackFormatMyHandler1533 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1534 CHECK_GE(index, 0u);
1535 CHECK_LT(index, mTracks.size());
1536
1537 const TrackInfo &info = mTracks.itemAt(index);
1538
1539 *timeScale = info.mTimeScale;
1540
1541 return info.mPacketSource->getFormat();
1542 }
1543
countTracksMyHandler1544 size_t countTracks() const {
1545 return mTracks.size();
1546 }
1547
1548 private:
1549 struct TrackInfo {
1550 AString mURL;
1551 int mRTPSocket;
1552 int mRTCPSocket;
1553 bool mUsingInterleavedTCP;
1554 uint32_t mFirstSeqNumInSegment;
1555 bool mNewSegment;
1556 int32_t mAllowedStaleAccessUnits;
1557
1558 uint32_t mRTPAnchor;
1559 int64_t mNTPAnchorUs;
1560 int32_t mTimeScale;
1561 bool mEOSReceived;
1562
1563 uint32_t mNormalPlayTimeRTP;
1564 int64_t mNormalPlayTimeUs;
1565
1566 sp<APacketSource> mPacketSource;
1567
1568 // Stores packets temporarily while no notion of time
1569 // has been established yet.
1570 List<sp<ABuffer> > mPackets;
1571 };
1572
1573 sp<AMessage> mNotify;
1574 bool mUIDValid;
1575 uid_t mUID;
1576 sp<ALooper> mNetLooper;
1577 sp<ARTSPConnection> mConn;
1578 sp<ARTPConnection> mRTPConn;
1579 sp<ASessionDescription> mSessionDesc;
1580 AString mOriginalSessionURL; // This one still has user:pass@
1581 AString mSessionURL;
1582 AString mSessionHost;
1583 AString mBaseURL;
1584 AString mControlURL;
1585 AString mSessionID;
1586 bool mSetupTracksSuccessful;
1587 bool mSeekPending;
1588 bool mFirstAccessUnit;
1589
1590 bool mAllTracksHaveTime;
1591 int64_t mNTPAnchorUs;
1592 int64_t mMediaAnchorUs;
1593 int64_t mLastMediaTimeUs;
1594
1595 int64_t mNumAccessUnitsReceived;
1596 bool mCheckPending;
1597 int32_t mCheckGeneration;
1598 int32_t mCheckTimeoutGeneration;
1599 bool mTryTCPInterleaving;
1600 bool mTryFakeRTCP;
1601 bool mReceivedFirstRTCPPacket;
1602 bool mReceivedFirstRTPPacket;
1603 bool mSeekable;
1604 int64_t mKeepAliveTimeoutUs;
1605 int32_t mKeepAliveGeneration;
1606 bool mPausing;
1607 int32_t mPauseGeneration;
1608
1609 Vector<TrackInfo> mTracks;
1610
1611 bool mPlayResponseParsed;
1612
setupTrackMyHandler1613 void setupTrack(size_t index) {
1614 sp<APacketSource> source =
1615 new APacketSource(mSessionDesc, index);
1616
1617 if (source->initCheck() != OK) {
1618 ALOGW("Unsupported format. Ignoring track #%zu.", index);
1619
1620 sp<AMessage> reply = new AMessage('setu', this);
1621 reply->setSize("index", index);
1622 reply->setInt32("result", ERROR_UNSUPPORTED);
1623 reply->post();
1624 return;
1625 }
1626
1627 AString url;
1628 CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1629
1630 AString trackURL;
1631 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1632
1633 mTracks.push(TrackInfo());
1634 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1635 info->mURL = trackURL;
1636 info->mPacketSource = source;
1637 info->mUsingInterleavedTCP = false;
1638 info->mFirstSeqNumInSegment = 0;
1639 info->mNewSegment = true;
1640 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1641 info->mRTPSocket = -1;
1642 info->mRTCPSocket = -1;
1643 info->mRTPAnchor = 0;
1644 info->mNTPAnchorUs = -1;
1645 info->mNormalPlayTimeRTP = 0;
1646 info->mNormalPlayTimeUs = 0ll;
1647
1648 unsigned long PT;
1649 AString formatDesc;
1650 AString formatParams;
1651 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1652
1653 int32_t timescale;
1654 int32_t numChannels;
1655 ASessionDescription::ParseFormatDesc(
1656 formatDesc.c_str(), ×cale, &numChannels);
1657
1658 info->mTimeScale = timescale;
1659 info->mEOSReceived = false;
1660
1661 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1662
1663 AString request = "SETUP ";
1664 request.append(trackURL);
1665 request.append(" RTSP/1.0\r\n");
1666
1667 if (mTryTCPInterleaving) {
1668 size_t interleaveIndex = 2 * (mTracks.size() - 1);
1669 info->mUsingInterleavedTCP = true;
1670 info->mRTPSocket = interleaveIndex;
1671 info->mRTCPSocket = interleaveIndex + 1;
1672
1673 request.append("Transport: RTP/AVP/TCP;interleaved=");
1674 request.append(interleaveIndex);
1675 request.append("-");
1676 request.append(interleaveIndex + 1);
1677 } else {
1678 unsigned rtpPort;
1679 ARTPConnection::MakePortPair(
1680 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1681
1682 if (mUIDValid) {
1683 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID,
1684 (uint32_t)*(uint32_t*) "RTP_");
1685 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1686 (uint32_t)*(uint32_t*) "RTP_");
1687 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID);
1688 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1689 }
1690
1691 request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1692 request.append(rtpPort);
1693 request.append("-");
1694 request.append(rtpPort + 1);
1695 }
1696
1697 request.append("\r\n");
1698
1699 if (index > 1) {
1700 request.append("Session: ");
1701 request.append(mSessionID);
1702 request.append("\r\n");
1703 }
1704
1705 request.append("\r\n");
1706
1707 sp<AMessage> reply = new AMessage('setu', this);
1708 reply->setSize("index", index);
1709 reply->setSize("track-index", mTracks.size() - 1);
1710 mConn->sendRequest(request.c_str(), reply);
1711 }
1712
MakeURLMyHandler1713 static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1714 out->clear();
1715
1716 if (strncasecmp("rtsp://", baseURL, 7)) {
1717 // Base URL must be absolute
1718 return false;
1719 }
1720
1721 if (!strncasecmp("rtsp://", url, 7)) {
1722 // "url" is already an absolute URL, ignore base URL.
1723 out->setTo(url);
1724 return true;
1725 }
1726
1727 size_t n = strlen(baseURL);
1728 out->setTo(baseURL);
1729 if (baseURL[n - 1] != '/') {
1730 out->append("/");
1731 }
1732 out->append(url);
1733
1734 return true;
1735 }
1736
fakeTimestampsMyHandler1737 void fakeTimestamps() {
1738 mNTPAnchorUs = -1ll;
1739 for (size_t i = 0; i < mTracks.size(); ++i) {
1740 onTimeUpdate(i, 0, 0ll);
1741 }
1742 }
1743
dataReceivedOnAllChannelsMyHandler1744 bool dataReceivedOnAllChannels() {
1745 TrackInfo *track;
1746 for (size_t i = 0; i < mTracks.size(); ++i) {
1747 track = &mTracks.editItemAt(i);
1748 if (track->mPackets.empty()) {
1749 return false;
1750 }
1751 }
1752 return true;
1753 }
1754
handleFirstAccessUnitMyHandler1755 void handleFirstAccessUnit() {
1756 if (mFirstAccessUnit) {
1757 sp<AMessage> msg = mNotify->dup();
1758 msg->setInt32("what", kWhatConnected);
1759 msg->post();
1760
1761 if (mSeekable) {
1762 for (size_t i = 0; i < mTracks.size(); ++i) {
1763 TrackInfo *info = &mTracks.editItemAt(i);
1764
1765 postNormalPlayTimeMapping(
1766 i,
1767 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1768 }
1769 }
1770
1771 mFirstAccessUnit = false;
1772 }
1773 }
1774
onTimeUpdateMyHandler1775 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1776 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1777 trackIndex, rtpTime, (long long)ntpTime);
1778
1779 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1780
1781 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1782
1783 track->mRTPAnchor = rtpTime;
1784 track->mNTPAnchorUs = ntpTimeUs;
1785
1786 if (mNTPAnchorUs < 0) {
1787 mNTPAnchorUs = ntpTimeUs;
1788 mMediaAnchorUs = mLastMediaTimeUs;
1789 }
1790
1791 if (!mAllTracksHaveTime) {
1792 bool allTracksHaveTime = (mTracks.size() > 0);
1793 for (size_t i = 0; i < mTracks.size(); ++i) {
1794 TrackInfo *track = &mTracks.editItemAt(i);
1795 if (track->mNTPAnchorUs < 0) {
1796 allTracksHaveTime = false;
1797 break;
1798 }
1799 }
1800 if (allTracksHaveTime) {
1801 mAllTracksHaveTime = true;
1802 ALOGI("Time now established for all tracks.");
1803 }
1804 }
1805 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1806 handleFirstAccessUnit();
1807
1808 // Time is now established, lets start timestamping immediately
1809 for (size_t i = 0; i < mTracks.size(); ++i) {
1810 if (OK != processAccessUnitQueue(i)) {
1811 return;
1812 }
1813 }
1814 for (size_t i = 0; i < mTracks.size(); ++i) {
1815 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1816 if (trackInfo->mEOSReceived) {
1817 postQueueEOS(i, ERROR_END_OF_STREAM);
1818 trackInfo->mEOSReceived = false;
1819 }
1820 }
1821 }
1822 }
1823
processAccessUnitQueueMyHandler1824 status_t processAccessUnitQueue(int32_t trackIndex) {
1825 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1826 while (!track->mPackets.empty()) {
1827 sp<ABuffer> accessUnit = *track->mPackets.begin();
1828 track->mPackets.erase(track->mPackets.begin());
1829
1830 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1831 if (track->mNewSegment) {
1832 // The sequence number from RTP packet has only 16 bits and is extended
1833 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1834 // RTSP "PLAY" command should be used to detect the first RTP packet
1835 // after seeking.
1836 if (mSeekable) {
1837 if (track->mAllowedStaleAccessUnits > 0) {
1838 uint32_t seqNum16 = seqNum & 0xffff;
1839 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1840 if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1841 || seqNum16 < firstSeqNumInSegment16) {
1842 // Not the first rtp packet of the stream after seeking, discarding.
1843 track->mAllowedStaleAccessUnits--;
1844 ALOGV("discarding stale access unit (0x%x : 0x%x)",
1845 seqNum, track->mFirstSeqNumInSegment);
1846 continue;
1847 }
1848 ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1849 "Missing the first packet(%u), now take packet(%u) as first one",
1850 track->mFirstSeqNumInSegment, seqNum);
1851 } else { // track->mAllowedStaleAccessUnits <= 0
1852 mNumAccessUnitsReceived = 0;
1853 ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1854 "Still no first rtp packet after %d stale ones",
1855 kMaxAllowedStaleAccessUnits);
1856 track->mAllowedStaleAccessUnits = -1;
1857 return UNKNOWN_ERROR;
1858 }
1859 }
1860
1861 // Now found the first rtp packet of the stream after seeking.
1862 track->mFirstSeqNumInSegment = seqNum;
1863 track->mNewSegment = false;
1864 }
1865
1866 if (seqNum < track->mFirstSeqNumInSegment) {
1867 ALOGV("dropping stale access-unit (%d < %d)",
1868 seqNum, track->mFirstSeqNumInSegment);
1869 continue;
1870 }
1871
1872 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1873 postQueueAccessUnit(trackIndex, accessUnit);
1874 }
1875 }
1876 return OK;
1877 }
1878
onAccessUnitCompleteMyHandler1879 void onAccessUnitComplete(
1880 int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1881 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1882 track->mPackets.push_back(accessUnit);
1883
1884 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1885 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1886
1887 if(!mPlayResponseParsed){
1888 ALOGV("play response is not parsed");
1889 return;
1890 }
1891
1892 handleFirstAccessUnit();
1893
1894 if (!mAllTracksHaveTime) {
1895 ALOGV("storing accessUnit, no time established yet");
1896 return;
1897 }
1898
1899 if (OK != processAccessUnitQueue(trackIndex)) {
1900 return;
1901 }
1902
1903 if (track->mEOSReceived) {
1904 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1905 track->mEOSReceived = false;
1906 }
1907 }
1908
addMediaTimestampMyHandler1909 bool addMediaTimestamp(
1910 int32_t trackIndex, const TrackInfo *track,
1911 const sp<ABuffer> &accessUnit) {
1912 UNUSED_UNLESS_VERBOSE(trackIndex);
1913
1914 uint32_t rtpTime;
1915 CHECK(accessUnit->meta()->findInt32(
1916 "rtp-time", (int32_t *)&rtpTime));
1917
1918 int64_t relRtpTimeUs =
1919 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1920 / track->mTimeScale;
1921
1922 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1923
1924 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1925
1926 if (mediaTimeUs > mLastMediaTimeUs) {
1927 mLastMediaTimeUs = mediaTimeUs;
1928 }
1929
1930 if (mediaTimeUs < 0 && !mSeekable) {
1931 ALOGV("dropping early accessUnit.");
1932 return false;
1933 }
1934
1935 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1936 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1937
1938 accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1939
1940 return true;
1941 }
1942
postQueueAccessUnitMyHandler1943 void postQueueAccessUnit(
1944 size_t trackIndex, const sp<ABuffer> &accessUnit) {
1945 sp<AMessage> msg = mNotify->dup();
1946 msg->setInt32("what", kWhatAccessUnit);
1947 msg->setSize("trackIndex", trackIndex);
1948 msg->setBuffer("accessUnit", accessUnit);
1949 msg->post();
1950 }
1951
postQueueEOSMyHandler1952 void postQueueEOS(size_t trackIndex, status_t finalResult) {
1953 sp<AMessage> msg = mNotify->dup();
1954 msg->setInt32("what", kWhatEOS);
1955 msg->setSize("trackIndex", trackIndex);
1956 msg->setInt32("finalResult", finalResult);
1957 msg->post();
1958 }
1959
postQueueSeekDiscontinuityMyHandler1960 void postQueueSeekDiscontinuity(size_t trackIndex) {
1961 sp<AMessage> msg = mNotify->dup();
1962 msg->setInt32("what", kWhatSeekDiscontinuity);
1963 msg->setSize("trackIndex", trackIndex);
1964 msg->post();
1965 }
1966
postNormalPlayTimeMappingMyHandler1967 void postNormalPlayTimeMapping(
1968 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1969 sp<AMessage> msg = mNotify->dup();
1970 msg->setInt32("what", kWhatNormalPlayTimeMapping);
1971 msg->setSize("trackIndex", trackIndex);
1972 msg->setInt32("rtpTime", rtpTime);
1973 msg->setInt64("nptUs", nptUs);
1974 msg->post();
1975 }
1976
postTimeoutMyHandler1977 void postTimeout() {
1978 sp<AMessage> timeout = new AMessage('tiou', this);
1979 mCheckTimeoutGeneration++;
1980 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1981
1982 int64_t startupTimeoutUs;
1983 startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1984 timeout->post(startupTimeoutUs);
1985 }
1986
1987 DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1988 };
1989
1990 } // namespace android
1991
1992 #endif // MY_HANDLER_H_
1993