1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MY_HANDLER_H_
18
19 #define MY_HANDLER_H_
20
21 //#define LOG_NDEBUG 0
22
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 #include "NetworkUtils.h"
35
36 #include <ctype.h>
37 #include <cutils/properties.h>
38
39 #include <media/stagefright/foundation/ABuffer.h>
40 #include <media/stagefright/foundation/ADebug.h>
41 #include <media/stagefright/foundation/ALooper.h>
42 #include <media/stagefright/foundation/AMessage.h>
43 #include <media/stagefright/MediaErrors.h>
44 #include <media/stagefright/Utils.h>
45
46 #include <arpa/inet.h>
47 #include <sys/socket.h>
48 #include <netdb.h>
49
50 #include "HTTPBase.h"
51
52 #if LOG_NDEBUG
53 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
54 #else
55 #define UNUSED_UNLESS_VERBOSE(x)
56 #endif
57
58 #ifndef FALLTHROUGH_INTENDED
59 #define FALLTHROUGH_INTENDED [[clang::fallthrough]] // NOLINT
60 #endif
61
62 // If no access units are received within 5 secs, assume that the rtp
63 // stream has ended and signal end of stream.
64 static int64_t kAccessUnitTimeoutUs = 10000000ll;
65
66 // If no access units arrive for the first 10 secs after starting the
67 // stream, assume none ever will and signal EOS or switch transports.
68 static int64_t kStartupTimeoutUs = 10000000ll;
69
70 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
71
72 static int64_t kPauseDelayUs = 3000000ll;
73
74 // The allowed maximum number of stale access units at the beginning of
75 // a new sequence.
76 static int32_t kMaxAllowedStaleAccessUnits = 20;
77
78 static int64_t kTearDownTimeoutUs = 3000000ll;
79
80 namespace android {
81
GetAttribute(const char * s,const char * key,AString * value)82 static bool GetAttribute(const char *s, const char *key, AString *value) {
83 value->clear();
84
85 size_t keyLen = strlen(key);
86
87 for (;;) {
88 while (isspace(*s)) {
89 ++s;
90 }
91
92 const char *colonPos = strchr(s, ';');
93
94 size_t len =
95 (colonPos == NULL) ? strlen(s) : colonPos - s;
96
97 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
98 value->setTo(&s[keyLen + 1], len - keyLen - 1);
99 return true;
100 }
101
102 if (colonPos == NULL) {
103 return false;
104 }
105
106 s = colonPos + 1;
107 }
108 }
109
110 struct MyHandler : public AHandler {
111 enum {
112 kWhatConnected = 'conn',
113 kWhatDisconnected = 'disc',
114 kWhatSeekPaused = 'spau',
115 kWhatSeekDone = 'sdon',
116
117 kWhatAccessUnit = 'accU',
118 kWhatEOS = 'eos!',
119 kWhatSeekDiscontinuity = 'seeD',
120 kWhatNormalPlayTimeMapping = 'nptM',
121 };
122
123 MyHandler(
124 const char *url,
125 const sp<AMessage> ¬ify,
126 bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler127 : mNotify(notify),
128 mUIDValid(uidValid),
129 mUID(uid),
130 mNetLooper(new ALooper),
131 mConn(new ARTSPConnection(mUIDValid, mUID)),
132 mRTPConn(new ARTPConnection),
133 mOriginalSessionURL(url),
134 mSessionURL(url),
135 mSetupTracksSuccessful(false),
136 mSeekPending(false),
137 mFirstAccessUnit(true),
138 mAllTracksHaveTime(false),
139 mNTPAnchorUs(-1),
140 mMediaAnchorUs(-1),
141 mLastMediaTimeUs(0),
142 mNumAccessUnitsReceived(0),
143 mCheckPending(false),
144 mCheckGeneration(0),
145 mCheckTimeoutGeneration(0),
146 mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
147 mTryFakeRTCP(false),
148 mReceivedFirstRTCPPacket(false),
149 mReceivedFirstRTPPacket(false),
150 mSeekable(true),
151 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
152 mKeepAliveGeneration(0),
153 mPausing(false),
154 mPauseGeneration(0),
155 mPlayResponseParsed(false) {
156 mNetLooper->setName("rtsp net");
157 mNetLooper->start(false /* runOnCallingThread */,
158 false /* canCallJava */,
159 PRIORITY_HIGHEST);
160
161 // Strip any authentication info from the session url, we don't
162 // want to transmit user/pass in cleartext.
163 AString host, path, user, pass;
164 unsigned port;
165 CHECK(ARTSPConnection::ParseURL(
166 mSessionURL.c_str(), &host, &port, &path, &user, &pass));
167
168 if (user.size() > 0) {
169 mSessionURL.clear();
170 mSessionURL.append("rtsp://");
171 mSessionURL.append(host);
172 mSessionURL.append(":");
173 mSessionURL.append(AStringPrintf("%u", port));
174 mSessionURL.append(path);
175
176 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
177 }
178
179 mSessionHost = host;
180 }
181
connectMyHandler182 void connect() {
183 looper()->registerHandler(mConn);
184 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
185
186 sp<AMessage> notify = new AMessage('biny', this);
187 mConn->observeBinaryData(notify);
188
189 sp<AMessage> reply = new AMessage('conn', this);
190 mConn->connect(mOriginalSessionURL.c_str(), reply);
191 }
192
loadSDPMyHandler193 void loadSDP(const sp<ASessionDescription>& desc) {
194 looper()->registerHandler(mConn);
195 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
196
197 sp<AMessage> notify = new AMessage('biny', this);
198 mConn->observeBinaryData(notify);
199
200 sp<AMessage> reply = new AMessage('sdpl', this);
201 reply->setObject("description", desc);
202 mConn->connect(mOriginalSessionURL.c_str(), reply);
203 }
204
getControlURLMyHandler205 AString getControlURL() {
206 AString sessionLevelControlURL;
207 if (mSessionDesc->findAttribute(
208 0,
209 "a=control",
210 &sessionLevelControlURL)) {
211 if (sessionLevelControlURL.compare("*") == 0) {
212 return mBaseURL;
213 } else {
214 AString controlURL;
215 CHECK(MakeURL(
216 mBaseURL.c_str(),
217 sessionLevelControlURL.c_str(),
218 &controlURL));
219 return controlURL;
220 }
221 } else {
222 return mSessionURL;
223 }
224 }
225
disconnectMyHandler226 void disconnect() {
227 (new AMessage('abor', this))->post();
228 }
229
seekMyHandler230 void seek(int64_t timeUs) {
231 sp<AMessage> msg = new AMessage('seek', this);
232 msg->setInt64("time", timeUs);
233 mPauseGeneration++;
234 msg->post();
235 }
236
continueSeekAfterPauseMyHandler237 void continueSeekAfterPause(int64_t timeUs) {
238 sp<AMessage> msg = new AMessage('see1', this);
239 msg->setInt64("time", timeUs);
240 msg->post();
241 }
242
isSeekableMyHandler243 bool isSeekable() const {
244 return mSeekable;
245 }
246
pauseMyHandler247 void pause() {
248 sp<AMessage> msg = new AMessage('paus', this);
249 mPauseGeneration++;
250 msg->setInt32("pausecheck", mPauseGeneration);
251 msg->post();
252 }
253
resumeMyHandler254 void resume() {
255 sp<AMessage> msg = new AMessage('resu', this);
256 mPauseGeneration++;
257 msg->post();
258 }
259
addRRMyHandler260 static void addRR(const sp<ABuffer> &buf) {
261 uint8_t *ptr = buf->data() + buf->size();
262 ptr[0] = 0x80 | 0;
263 ptr[1] = 201; // RR
264 ptr[2] = 0;
265 ptr[3] = 1;
266 ptr[4] = 0xde; // SSRC
267 ptr[5] = 0xad;
268 ptr[6] = 0xbe;
269 ptr[7] = 0xef;
270
271 buf->setRange(0, buf->size() + 8);
272 }
273
addSDESMyHandler274 static void addSDES(int s, const sp<ABuffer> &buffer) {
275 struct sockaddr_in addr;
276 socklen_t addrSize = sizeof(addr);
277 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
278 inet_aton("0.0.0.0", &(addr.sin_addr));
279 }
280
281 uint8_t *data = buffer->data() + buffer->size();
282 data[0] = 0x80 | 1;
283 data[1] = 202; // SDES
284 data[4] = 0xde; // SSRC
285 data[5] = 0xad;
286 data[6] = 0xbe;
287 data[7] = 0xef;
288
289 size_t offset = 8;
290
291 data[offset++] = 1; // CNAME
292
293 AString cname = "stagefright@";
294 cname.append(inet_ntoa(addr.sin_addr));
295 data[offset++] = cname.size();
296
297 memcpy(&data[offset], cname.c_str(), cname.size());
298 offset += cname.size();
299
300 data[offset++] = 6; // TOOL
301
302 AString tool = MakeUserAgent();
303
304 data[offset++] = tool.size();
305
306 memcpy(&data[offset], tool.c_str(), tool.size());
307 offset += tool.size();
308
309 data[offset++] = 0;
310
311 if ((offset % 4) > 0) {
312 size_t count = 4 - (offset % 4);
313 switch (count) {
314 case 3:
315 data[offset++] = 0;
316 FALLTHROUGH_INTENDED;
317 case 2:
318 data[offset++] = 0;
319 FALLTHROUGH_INTENDED;
320 case 1:
321 data[offset++] = 0;
322 }
323 }
324
325 size_t numWords = (offset / 4) - 1;
326 data[2] = numWords >> 8;
327 data[3] = numWords & 0xff;
328
329 buffer->setRange(buffer->offset(), buffer->size() + offset);
330 }
331
332 // In case we're behind NAT, fire off two UDP packets to the remote
333 // rtp/rtcp ports to poke a hole into the firewall for future incoming
334 // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler335 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
336 struct sockaddr_in addr;
337 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
338 addr.sin_family = AF_INET;
339
340 AString source;
341 AString server_port;
342 if (!GetAttribute(transport.c_str(),
343 "source",
344 &source)) {
345 ALOGW("Missing 'source' field in Transport response. Using "
346 "RTSP endpoint address.");
347
348 struct hostent *ent = gethostbyname(mSessionHost.c_str());
349 if (ent == NULL) {
350 ALOGE("Failed to look up address of session host");
351
352 return false;
353 }
354
355 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
356 } else {
357 addr.sin_addr.s_addr = inet_addr(source.c_str());
358 }
359
360 if (!GetAttribute(transport.c_str(),
361 "server_port",
362 &server_port)) {
363 ALOGI("Missing 'server_port' field in Transport response.");
364 return false;
365 }
366
367 int rtpPort, rtcpPort;
368 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
369 || rtpPort <= 0 || rtpPort > 65535
370 || rtcpPort <=0 || rtcpPort > 65535
371 || rtcpPort != rtpPort + 1) {
372 ALOGE("Server picked invalid RTP/RTCP port pair %s,"
373 " RTP port must be even, RTCP port must be one higher.",
374 server_port.c_str());
375
376 return false;
377 }
378
379 if (rtpPort & 1) {
380 ALOGW("Server picked an odd RTP port, it should've picked an "
381 "even one, we'll let it pass for now, but this may break "
382 "in the future.");
383 }
384
385 if (addr.sin_addr.s_addr == INADDR_NONE) {
386 return true;
387 }
388
389 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
390 // No firewalls to traverse on the loopback interface.
391 return true;
392 }
393
394 // Make up an RR/SDES RTCP packet.
395 sp<ABuffer> buf = new ABuffer(65536);
396 buf->setRange(0, 0);
397 addRR(buf);
398 addSDES(rtpSocket, buf);
399
400 addr.sin_port = htons(rtpPort);
401
402 ssize_t n = sendto(
403 rtpSocket, buf->data(), buf->size(), 0,
404 (const sockaddr *)&addr, sizeof(addr));
405
406 if (n < (ssize_t)buf->size()) {
407 ALOGE("failed to poke a hole for RTP packets");
408 return false;
409 }
410
411 addr.sin_port = htons(rtcpPort);
412
413 n = sendto(
414 rtcpSocket, buf->data(), buf->size(), 0,
415 (const sockaddr *)&addr, sizeof(addr));
416
417 if (n < (ssize_t)buf->size()) {
418 ALOGE("failed to poke a hole for RTCP packets");
419 return false;
420 }
421
422 ALOGV("successfully poked holes.");
423
424 return true;
425 }
426
isLiveStreamMyHandler427 static bool isLiveStream(const sp<ASessionDescription> &desc) {
428 AString attrLiveStream;
429 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
430 ssize_t semicolonPos = attrLiveStream.find(";", 2);
431
432 const char* liveStreamValue;
433 if (semicolonPos < 0) {
434 liveStreamValue = attrLiveStream.c_str();
435 } else {
436 AString valString;
437 valString.setTo(attrLiveStream,
438 semicolonPos + 1,
439 attrLiveStream.size() - semicolonPos - 1);
440 liveStreamValue = valString.c_str();
441 }
442
443 uint32_t value = strtoul(liveStreamValue, NULL, 10);
444 if (value == 1) {
445 ALOGV("found live stream");
446 return true;
447 }
448 } else {
449 // It is a live stream if no duration is returned
450 int64_t durationUs;
451 if (!desc->getDurationUs(&durationUs)) {
452 ALOGV("No duration found, assume live stream");
453 return true;
454 }
455 }
456
457 return false;
458 }
459
onMessageReceivedMyHandler460 virtual void onMessageReceived(const sp<AMessage> &msg) {
461 switch (msg->what()) {
462 case 'conn':
463 {
464 int32_t result;
465 CHECK(msg->findInt32("result", &result));
466
467 ALOGI("connection request completed with result %d (%s)",
468 result, strerror(-result));
469
470 if (result == OK) {
471 AString request;
472 request = "DESCRIBE ";
473 request.append(mSessionURL);
474 request.append(" RTSP/1.0\r\n");
475 request.append("Accept: application/sdp\r\n");
476 request.append("\r\n");
477
478 sp<AMessage> reply = new AMessage('desc', this);
479 mConn->sendRequest(request.c_str(), reply);
480 } else {
481 (new AMessage('disc', this))->post();
482 }
483 break;
484 }
485
486 case 'disc':
487 {
488 ++mKeepAliveGeneration;
489
490 int32_t reconnect;
491 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
492 sp<AMessage> reply = new AMessage('conn', this);
493 mConn->connect(mOriginalSessionURL.c_str(), reply);
494 } else {
495 (new AMessage('quit', this))->post();
496 }
497 break;
498 }
499
500 case 'desc':
501 {
502 int32_t result;
503 CHECK(msg->findInt32("result", &result));
504
505 ALOGI("DESCRIBE completed with result %d (%s)",
506 result, strerror(-result));
507
508 if (result == OK) {
509 sp<RefBase> obj;
510 CHECK(msg->findObject("response", &obj));
511 sp<ARTSPResponse> response =
512 static_cast<ARTSPResponse *>(obj.get());
513
514 if (response->mStatusCode == 301 || response->mStatusCode == 302) {
515 ssize_t i = response->mHeaders.indexOfKey("location");
516 CHECK_GE(i, 0);
517
518 mOriginalSessionURL = response->mHeaders.valueAt(i);
519 mSessionURL = mOriginalSessionURL;
520
521 // Strip any authentication info from the session url, we don't
522 // want to transmit user/pass in cleartext.
523 AString host, path, user, pass;
524 unsigned port;
525 if (ARTSPConnection::ParseURL(
526 mSessionURL.c_str(), &host, &port, &path, &user, &pass)
527 && user.size() > 0) {
528 mSessionURL.clear();
529 mSessionURL.append("rtsp://");
530 mSessionURL.append(host);
531 mSessionURL.append(":");
532 mSessionURL.append(AStringPrintf("%u", port));
533 mSessionURL.append(path);
534
535 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
536 }
537
538 sp<AMessage> reply = new AMessage('conn', this);
539 mConn->connect(mOriginalSessionURL.c_str(), reply);
540 break;
541 }
542
543 if (response->mStatusCode != 200) {
544 result = UNKNOWN_ERROR;
545 } else if (response->mContent == NULL) {
546 result = ERROR_MALFORMED;
547 ALOGE("The response has no content.");
548 } else {
549 mSessionDesc = new ASessionDescription;
550
551 mSessionDesc->setTo(
552 response->mContent->data(),
553 response->mContent->size());
554
555 if (!mSessionDesc->isValid()) {
556 ALOGE("Failed to parse session description.");
557 result = ERROR_MALFORMED;
558 } else {
559 ssize_t i = response->mHeaders.indexOfKey("content-base");
560 if (i >= 0) {
561 mBaseURL = response->mHeaders.valueAt(i);
562 } else {
563 i = response->mHeaders.indexOfKey("content-location");
564 if (i >= 0) {
565 mBaseURL = response->mHeaders.valueAt(i);
566 } else {
567 mBaseURL = mSessionURL;
568 }
569 }
570
571 mSeekable = !isLiveStream(mSessionDesc);
572
573 if (!mBaseURL.startsWith("rtsp://")) {
574 // Some misbehaving servers specify a relative
575 // URL in one of the locations above, combine
576 // it with the absolute session URL to get
577 // something usable...
578
579 ALOGW("Server specified a non-absolute base URL"
580 ", combining it with the session URL to "
581 "get something usable...");
582
583 AString tmp;
584 CHECK(MakeURL(
585 mSessionURL.c_str(),
586 mBaseURL.c_str(),
587 &tmp));
588
589 mBaseURL = tmp;
590 }
591
592 mControlURL = getControlURL();
593
594 if (mSessionDesc->countTracks() < 2) {
595 // There's no actual tracks in this session.
596 // The first "track" is merely session meta
597 // data.
598
599 ALOGW("Session doesn't contain any playable "
600 "tracks. Aborting.");
601 result = ERROR_UNSUPPORTED;
602 } else {
603 setupTrack(1);
604 }
605 }
606 }
607 }
608
609 if (result != OK) {
610 sp<AMessage> reply = new AMessage('disc', this);
611 mConn->disconnect(reply);
612 }
613 break;
614 }
615
616 case 'sdpl':
617 {
618 int32_t result;
619 CHECK(msg->findInt32("result", &result));
620
621 ALOGI("SDP connection request completed with result %d (%s)",
622 result, strerror(-result));
623
624 if (result == OK) {
625 sp<RefBase> obj;
626 CHECK(msg->findObject("description", &obj));
627 mSessionDesc =
628 static_cast<ASessionDescription *>(obj.get());
629
630 if (!mSessionDesc->isValid()) {
631 ALOGE("Failed to parse session description.");
632 result = ERROR_MALFORMED;
633 } else {
634 mBaseURL = mSessionURL;
635
636 mSeekable = !isLiveStream(mSessionDesc);
637
638 mControlURL = getControlURL();
639
640 if (mSessionDesc->countTracks() < 2) {
641 // There's no actual tracks in this session.
642 // The first "track" is merely session meta
643 // data.
644
645 ALOGW("Session doesn't contain any playable "
646 "tracks. Aborting.");
647 result = ERROR_UNSUPPORTED;
648 } else {
649 setupTrack(1);
650 }
651 }
652 }
653
654 if (result != OK) {
655 sp<AMessage> reply = new AMessage('disc', this);
656 mConn->disconnect(reply);
657 }
658 break;
659 }
660
661 case 'setu':
662 {
663 size_t index;
664 CHECK(msg->findSize("index", &index));
665
666 TrackInfo *track = NULL;
667 size_t trackIndex;
668 if (msg->findSize("track-index", &trackIndex)) {
669 track = &mTracks.editItemAt(trackIndex);
670 }
671
672 int32_t result;
673 CHECK(msg->findInt32("result", &result));
674
675 ALOGI("SETUP(%zu) completed with result %d (%s)",
676 index, result, strerror(-result));
677
678 if (result == OK) {
679 CHECK(track != NULL);
680
681 sp<RefBase> obj;
682 CHECK(msg->findObject("response", &obj));
683 sp<ARTSPResponse> response =
684 static_cast<ARTSPResponse *>(obj.get());
685
686 if (response->mStatusCode != 200) {
687 result = UNKNOWN_ERROR;
688 } else {
689 ssize_t i = response->mHeaders.indexOfKey("session");
690 CHECK_GE(i, 0);
691
692 mSessionID = response->mHeaders.valueAt(i);
693
694 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
695 AString timeoutStr;
696 if (GetAttribute(
697 mSessionID.c_str(), "timeout", &timeoutStr)) {
698 char *end;
699 unsigned long timeoutSecs =
700 strtoul(timeoutStr.c_str(), &end, 10);
701
702 if (end == timeoutStr.c_str() || *end != '\0') {
703 ALOGW("server specified malformed timeout '%s'",
704 timeoutStr.c_str());
705
706 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
707 } else if (timeoutSecs < 15) {
708 ALOGW("server specified too short a timeout "
709 "(%lu secs), using default.",
710 timeoutSecs);
711
712 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
713 } else {
714 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
715
716 ALOGI("server specified timeout of %lu secs.",
717 timeoutSecs);
718 }
719 }
720
721 i = mSessionID.find(";");
722 if (i >= 0) {
723 // Remove options, i.e. ";timeout=90"
724 mSessionID.erase(i, mSessionID.size() - i);
725 }
726
727 sp<AMessage> notify = new AMessage('accu', this);
728 notify->setSize("track-index", trackIndex);
729
730 i = response->mHeaders.indexOfKey("transport");
731 CHECK_GE(i, 0);
732
733 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
734 if (!track->mUsingInterleavedTCP) {
735 AString transport = response->mHeaders.valueAt(i);
736
737 // We are going to continue even if we were
738 // unable to poke a hole into the firewall...
739 pokeAHole(
740 track->mRTPSocket,
741 track->mRTCPSocket,
742 transport);
743 }
744
745 mRTPConn->addStream(
746 track->mRTPSocket, track->mRTCPSocket,
747 mSessionDesc, index,
748 notify, track->mUsingInterleavedTCP);
749
750 mSetupTracksSuccessful = true;
751 } else {
752 result = BAD_VALUE;
753 }
754 }
755 }
756
757 if (result != OK) {
758 if (track) {
759 if (!track->mUsingInterleavedTCP) {
760 // Clear the tag
761 if (mUIDValid) {
762 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket);
763 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket);
764 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket);
765 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket);
766 }
767
768 close(track->mRTPSocket);
769 close(track->mRTCPSocket);
770 }
771
772 mTracks.removeItemsAt(trackIndex);
773 }
774 }
775
776 ++index;
777 if (result == OK && index < mSessionDesc->countTracks()) {
778 setupTrack(index);
779 } else if (mSetupTracksSuccessful) {
780 ++mKeepAliveGeneration;
781 postKeepAlive();
782
783 AString request = "PLAY ";
784 request.append(mControlURL);
785 request.append(" RTSP/1.0\r\n");
786
787 request.append("Session: ");
788 request.append(mSessionID);
789 request.append("\r\n");
790
791 request.append("\r\n");
792
793 sp<AMessage> reply = new AMessage('play', this);
794 mConn->sendRequest(request.c_str(), reply);
795 } else {
796 sp<AMessage> reply = new AMessage('disc', this);
797 mConn->disconnect(reply);
798 }
799 break;
800 }
801
802 case 'play':
803 {
804 int32_t result;
805 CHECK(msg->findInt32("result", &result));
806
807 ALOGI("PLAY completed with result %d (%s)",
808 result, strerror(-result));
809
810 if (result == OK) {
811 sp<RefBase> obj;
812 CHECK(msg->findObject("response", &obj));
813 sp<ARTSPResponse> response =
814 static_cast<ARTSPResponse *>(obj.get());
815
816 if (response->mStatusCode != 200) {
817 result = UNKNOWN_ERROR;
818 } else {
819 parsePlayResponse(response);
820 postTimeout();
821 }
822 }
823
824 if (result != OK) {
825 sp<AMessage> reply = new AMessage('disc', this);
826 mConn->disconnect(reply);
827 }
828
829 break;
830 }
831
832 case 'aliv':
833 {
834 int32_t generation;
835 CHECK(msg->findInt32("generation", &generation));
836
837 if (generation != mKeepAliveGeneration) {
838 // obsolete event.
839 break;
840 }
841
842 AString request;
843 request.append("OPTIONS ");
844 request.append(mSessionURL);
845 request.append(" RTSP/1.0\r\n");
846 request.append("Session: ");
847 request.append(mSessionID);
848 request.append("\r\n");
849 request.append("\r\n");
850
851 sp<AMessage> reply = new AMessage('opts', this);
852 reply->setInt32("generation", mKeepAliveGeneration);
853 mConn->sendRequest(request.c_str(), reply);
854 break;
855 }
856
857 case 'opts':
858 {
859 int32_t result;
860 CHECK(msg->findInt32("result", &result));
861
862 ALOGI("OPTIONS completed with result %d (%s)",
863 result, strerror(-result));
864
865 int32_t generation;
866 CHECK(msg->findInt32("generation", &generation));
867
868 if (generation != mKeepAliveGeneration) {
869 // obsolete event.
870 break;
871 }
872
873 postKeepAlive();
874 break;
875 }
876
877 case 'abor':
878 {
879 for (size_t i = 0; i < mTracks.size(); ++i) {
880 TrackInfo *info = &mTracks.editItemAt(i);
881
882 if (!mFirstAccessUnit) {
883 postQueueEOS(i, ERROR_END_OF_STREAM);
884 }
885
886 if (!info->mUsingInterleavedTCP) {
887 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
888
889 // Clear the tag
890 if (mUIDValid) {
891 NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket);
892 NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket);
893 NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket);
894 NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket);
895 }
896
897 close(info->mRTPSocket);
898 close(info->mRTCPSocket);
899 }
900 }
901 mTracks.clear();
902 mSetupTracksSuccessful = false;
903 mSeekPending = false;
904 mFirstAccessUnit = true;
905 mAllTracksHaveTime = false;
906 mNTPAnchorUs = -1;
907 mMediaAnchorUs = -1;
908 mNumAccessUnitsReceived = 0;
909 mReceivedFirstRTCPPacket = false;
910 mReceivedFirstRTPPacket = false;
911 mPausing = false;
912 mSeekable = true;
913
914 sp<AMessage> reply = new AMessage('tear', this);
915
916 int32_t reconnect;
917 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
918 reply->setInt32("reconnect", true);
919 }
920
921 AString request;
922 request = "TEARDOWN ";
923
924 // XXX should use aggregate url from SDP here...
925 request.append(mSessionURL);
926 request.append(" RTSP/1.0\r\n");
927
928 request.append("Session: ");
929 request.append(mSessionID);
930 request.append("\r\n");
931
932 request.append("\r\n");
933
934 mConn->sendRequest(request.c_str(), reply);
935
936 // If the response of teardown hasn't been received in 3 seconds,
937 // post 'tear' message to avoid ANR.
938 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
939 sp<AMessage> teardown = reply->dup();
940 teardown->setInt32("result", -ECONNABORTED);
941 teardown->post(kTearDownTimeoutUs);
942 }
943 break;
944 }
945
946 case 'tear':
947 {
948 int32_t result;
949 CHECK(msg->findInt32("result", &result));
950
951 ALOGI("TEARDOWN completed with result %d (%s)",
952 result, strerror(-result));
953
954 sp<AMessage> reply = new AMessage('disc', this);
955
956 int32_t reconnect;
957 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
958 reply->setInt32("reconnect", true);
959 }
960
961 mConn->disconnect(reply);
962 break;
963 }
964
965 case 'quit':
966 {
967 sp<AMessage> msg = mNotify->dup();
968 msg->setInt32("what", kWhatDisconnected);
969 msg->setInt32("result", UNKNOWN_ERROR);
970 msg->post();
971 break;
972 }
973
974 case 'chek':
975 {
976 int32_t generation;
977 CHECK(msg->findInt32("generation", &generation));
978 if (generation != mCheckGeneration) {
979 // This is an outdated message. Ignore.
980 break;
981 }
982
983 if (mNumAccessUnitsReceived == 0) {
984 #if 1
985 ALOGI("stream ended? aborting.");
986 (new AMessage('abor', this))->post();
987 break;
988 #else
989 ALOGI("haven't seen an AU in a looong time.");
990 #endif
991 }
992
993 mNumAccessUnitsReceived = 0;
994 msg->post(kAccessUnitTimeoutUs);
995 break;
996 }
997
998 case 'accu':
999 {
1000 if (mSeekPending) {
1001 ALOGV("Stale access unit.");
1002 break;
1003 }
1004
1005 int32_t timeUpdate;
1006 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
1007 size_t trackIndex;
1008 CHECK(msg->findSize("track-index", &trackIndex));
1009
1010 uint32_t rtpTime;
1011 uint64_t ntpTime;
1012 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
1013 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1014
1015 onTimeUpdate(trackIndex, rtpTime, ntpTime);
1016 break;
1017 }
1018
1019 int32_t first;
1020 if (msg->findInt32("first-rtcp", &first)) {
1021 mReceivedFirstRTCPPacket = true;
1022 break;
1023 }
1024
1025 if (msg->findInt32("first-rtp", &first)) {
1026 mReceivedFirstRTPPacket = true;
1027 break;
1028 }
1029
1030 ++mNumAccessUnitsReceived;
1031 postAccessUnitTimeoutCheck();
1032
1033 size_t trackIndex;
1034 CHECK(msg->findSize("track-index", &trackIndex));
1035
1036 if (trackIndex >= mTracks.size()) {
1037 ALOGV("late packets ignored.");
1038 break;
1039 }
1040
1041 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1042
1043 int32_t eos;
1044 if (msg->findInt32("eos", &eos)) {
1045 ALOGI("received BYE on track index %zu", trackIndex);
1046 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1047 ALOGI("No time established => fake existing data");
1048
1049 track->mEOSReceived = true;
1050 mTryFakeRTCP = true;
1051 mReceivedFirstRTCPPacket = true;
1052 fakeTimestamps();
1053 } else {
1054 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1055 }
1056 return;
1057 }
1058
1059 if (mSeekPending) {
1060 ALOGV("we're seeking, dropping stale packet.");
1061 break;
1062 }
1063
1064 sp<ABuffer> accessUnit;
1065 CHECK(msg->findBuffer("access-unit", &accessUnit));
1066 onAccessUnitComplete(trackIndex, accessUnit);
1067 break;
1068 }
1069
1070 case 'paus':
1071 {
1072 int32_t generation;
1073 CHECK(msg->findInt32("pausecheck", &generation));
1074 if (generation != mPauseGeneration) {
1075 ALOGV("Ignoring outdated pause message.");
1076 break;
1077 }
1078
1079 if (!mSeekable) {
1080 ALOGW("This is a live stream, ignoring pause request.");
1081 break;
1082 }
1083
1084 if (mPausing) {
1085 ALOGV("This stream is already paused.");
1086 break;
1087 }
1088
1089 mCheckPending = true;
1090 ++mCheckGeneration;
1091 mPausing = true;
1092
1093 AString request = "PAUSE ";
1094 request.append(mControlURL);
1095 request.append(" RTSP/1.0\r\n");
1096
1097 request.append("Session: ");
1098 request.append(mSessionID);
1099 request.append("\r\n");
1100
1101 request.append("\r\n");
1102
1103 sp<AMessage> reply = new AMessage('pau2', this);
1104 mConn->sendRequest(request.c_str(), reply);
1105 break;
1106 }
1107
1108 case 'pau2':
1109 {
1110 int32_t result;
1111 CHECK(msg->findInt32("result", &result));
1112 mCheckTimeoutGeneration++;
1113
1114 ALOGI("PAUSE completed with result %d (%s)",
1115 result, strerror(-result));
1116 break;
1117 }
1118
1119 case 'resu':
1120 {
1121 if (mPausing && mSeekPending) {
1122 // If seeking, Play will be sent from see1 instead
1123 break;
1124 }
1125
1126 if (!mPausing) {
1127 // Dont send PLAY if we have not paused
1128 break;
1129 }
1130 AString request = "PLAY ";
1131 request.append(mControlURL);
1132 request.append(" RTSP/1.0\r\n");
1133
1134 request.append("Session: ");
1135 request.append(mSessionID);
1136 request.append("\r\n");
1137
1138 request.append("\r\n");
1139
1140 sp<AMessage> reply = new AMessage('res2', this);
1141 mConn->sendRequest(request.c_str(), reply);
1142 break;
1143 }
1144
1145 case 'res2':
1146 {
1147 int32_t result;
1148 CHECK(msg->findInt32("result", &result));
1149
1150 ALOGI("PLAY (for resume) completed with result %d (%s)",
1151 result, strerror(-result));
1152
1153 mCheckPending = false;
1154 ++mCheckGeneration;
1155 postAccessUnitTimeoutCheck();
1156
1157 if (result == OK) {
1158 sp<RefBase> obj;
1159 CHECK(msg->findObject("response", &obj));
1160 sp<ARTSPResponse> response =
1161 static_cast<ARTSPResponse *>(obj.get());
1162
1163 if (response->mStatusCode != 200) {
1164 result = UNKNOWN_ERROR;
1165 } else {
1166 parsePlayResponse(response);
1167
1168 // Post new timeout in order to make sure to use
1169 // fake timestamps if no new Sender Reports arrive
1170 postTimeout();
1171 }
1172 }
1173
1174 if (result != OK) {
1175 ALOGE("resume failed, aborting.");
1176 (new AMessage('abor', this))->post();
1177 }
1178
1179 mPausing = false;
1180 break;
1181 }
1182
1183 case 'seek':
1184 {
1185 if (!mSeekable) {
1186 ALOGW("This is a live stream, ignoring seek request.");
1187
1188 sp<AMessage> msg = mNotify->dup();
1189 msg->setInt32("what", kWhatSeekDone);
1190 msg->post();
1191 break;
1192 }
1193
1194 int64_t timeUs;
1195 CHECK(msg->findInt64("time", &timeUs));
1196
1197 mSeekPending = true;
1198
1199 // Disable the access unit timeout until we resumed
1200 // playback again.
1201 mCheckPending = true;
1202 ++mCheckGeneration;
1203
1204 sp<AMessage> reply = new AMessage('see0', this);
1205 reply->setInt64("time", timeUs);
1206
1207 if (mPausing) {
1208 // PAUSE already sent
1209 ALOGI("Pause already sent");
1210 reply->post();
1211 break;
1212 }
1213 AString request = "PAUSE ";
1214 request.append(mControlURL);
1215 request.append(" RTSP/1.0\r\n");
1216
1217 request.append("Session: ");
1218 request.append(mSessionID);
1219 request.append("\r\n");
1220
1221 request.append("\r\n");
1222
1223 mConn->sendRequest(request.c_str(), reply);
1224 break;
1225 }
1226
1227 case 'see0':
1228 {
1229 // Session is paused now.
1230 status_t err = OK;
1231 msg->findInt32("result", &err);
1232
1233 int64_t timeUs;
1234 CHECK(msg->findInt64("time", &timeUs));
1235
1236 sp<AMessage> notify = mNotify->dup();
1237 notify->setInt32("what", kWhatSeekPaused);
1238 notify->setInt32("err", err);
1239 notify->setInt64("time", timeUs);
1240 notify->post();
1241 break;
1242
1243 }
1244
1245 case 'see1':
1246 {
1247 for (size_t i = 0; i < mTracks.size(); ++i) {
1248 TrackInfo *info = &mTracks.editItemAt(i);
1249
1250 postQueueSeekDiscontinuity(i);
1251 info->mEOSReceived = false;
1252
1253 info->mRTPAnchor = 0;
1254 info->mNTPAnchorUs = -1;
1255 }
1256
1257 mAllTracksHaveTime = false;
1258 mNTPAnchorUs = -1;
1259
1260 // Start new timeoutgeneration to avoid getting timeout
1261 // before PLAY response arrive
1262 postTimeout();
1263
1264 int64_t timeUs;
1265 CHECK(msg->findInt64("time", &timeUs));
1266
1267 AString request = "PLAY ";
1268 request.append(mControlURL);
1269 request.append(" RTSP/1.0\r\n");
1270
1271 request.append("Session: ");
1272 request.append(mSessionID);
1273 request.append("\r\n");
1274
1275 request.append(
1276 AStringPrintf(
1277 "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1278
1279 request.append("\r\n");
1280
1281 sp<AMessage> reply = new AMessage('see2', this);
1282 mConn->sendRequest(request.c_str(), reply);
1283 break;
1284 }
1285
1286 case 'see2':
1287 {
1288 if (mTracks.size() == 0) {
1289 // We have already hit abor, break
1290 break;
1291 }
1292
1293 int32_t result;
1294 CHECK(msg->findInt32("result", &result));
1295
1296 ALOGI("PLAY (for seek) completed with result %d (%s)",
1297 result, strerror(-result));
1298
1299 mCheckPending = false;
1300 ++mCheckGeneration;
1301 postAccessUnitTimeoutCheck();
1302
1303 if (result == OK) {
1304 sp<RefBase> obj;
1305 CHECK(msg->findObject("response", &obj));
1306 sp<ARTSPResponse> response =
1307 static_cast<ARTSPResponse *>(obj.get());
1308
1309 if (response->mStatusCode != 200) {
1310 result = UNKNOWN_ERROR;
1311 } else {
1312 parsePlayResponse(response);
1313
1314 // Post new timeout in order to make sure to use
1315 // fake timestamps if no new Sender Reports arrive
1316 postTimeout();
1317
1318 ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1319 CHECK_GE(i, 0);
1320
1321 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1322
1323 ALOGI("seek completed.");
1324 }
1325 }
1326
1327 if (result != OK) {
1328 ALOGE("seek failed, aborting.");
1329 (new AMessage('abor', this))->post();
1330 }
1331
1332 mPausing = false;
1333 mSeekPending = false;
1334
1335 // Discard all stale access units.
1336 for (size_t i = 0; i < mTracks.size(); ++i) {
1337 TrackInfo *track = &mTracks.editItemAt(i);
1338 track->mPackets.clear();
1339 }
1340
1341 sp<AMessage> msg = mNotify->dup();
1342 msg->setInt32("what", kWhatSeekDone);
1343 msg->post();
1344 break;
1345 }
1346
1347 case 'biny':
1348 {
1349 sp<ABuffer> buffer;
1350 CHECK(msg->findBuffer("buffer", &buffer));
1351
1352 int32_t index;
1353 CHECK(buffer->meta()->findInt32("index", &index));
1354
1355 mRTPConn->injectPacket(index, buffer);
1356 break;
1357 }
1358
1359 case 'tiou':
1360 {
1361 int32_t timeoutGenerationCheck;
1362 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1363 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1364 // This is an outdated message. Ignore.
1365 // This typically happens if a lot of seeks are
1366 // performed, since new timeout messages now are
1367 // posted at seek as well.
1368 break;
1369 }
1370 if (!mReceivedFirstRTCPPacket) {
1371 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1372 ALOGW("We received RTP packets but no RTCP packets, "
1373 "using fake timestamps.");
1374
1375 mTryFakeRTCP = true;
1376
1377 mReceivedFirstRTCPPacket = true;
1378
1379 fakeTimestamps();
1380 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1381 ALOGW("Never received any data, switching transports.");
1382
1383 mTryTCPInterleaving = true;
1384
1385 sp<AMessage> msg = new AMessage('abor', this);
1386 msg->setInt32("reconnect", true);
1387 msg->post();
1388 } else {
1389 ALOGW("Never received any data, disconnecting.");
1390 (new AMessage('abor', this))->post();
1391 }
1392 } else {
1393 if (!mAllTracksHaveTime) {
1394 ALOGW("We received some RTCP packets, but time "
1395 "could not be established on all tracks, now "
1396 "using fake timestamps");
1397
1398 fakeTimestamps();
1399 }
1400 }
1401 break;
1402 }
1403
1404 default:
1405 TRESPASS();
1406 break;
1407 }
1408 }
1409
postKeepAliveMyHandler1410 void postKeepAlive() {
1411 sp<AMessage> msg = new AMessage('aliv', this);
1412 msg->setInt32("generation", mKeepAliveGeneration);
1413 msg->post((mKeepAliveTimeoutUs * 9) / 10);
1414 }
1415
cancelAccessUnitTimeoutCheckMyHandler1416 void cancelAccessUnitTimeoutCheck() {
1417 ALOGV("cancelAccessUnitTimeoutCheck");
1418 ++mCheckGeneration;
1419 }
1420
postAccessUnitTimeoutCheckMyHandler1421 void postAccessUnitTimeoutCheck() {
1422 if (mCheckPending) {
1423 return;
1424 }
1425
1426 mCheckPending = true;
1427 sp<AMessage> check = new AMessage('chek', this);
1428 check->setInt32("generation", mCheckGeneration);
1429 check->post(kAccessUnitTimeoutUs);
1430 }
1431
SplitStringMyHandler1432 static void SplitString(
1433 const AString &s, const char *separator, List<AString> *items) {
1434 items->clear();
1435 size_t start = 0;
1436 while (start < s.size()) {
1437 ssize_t offset = s.find(separator, start);
1438
1439 if (offset < 0) {
1440 items->push_back(AString(s, start, s.size() - start));
1441 break;
1442 }
1443
1444 items->push_back(AString(s, start, offset - start));
1445 start = offset + strlen(separator);
1446 }
1447 }
1448
parsePlayResponseMyHandler1449 void parsePlayResponse(const sp<ARTSPResponse> &response) {
1450 mPlayResponseParsed = true;
1451 if (mTracks.size() == 0) {
1452 ALOGV("parsePlayResponse: late packets ignored.");
1453 return;
1454 }
1455
1456 ssize_t i = response->mHeaders.indexOfKey("range");
1457 if (i < 0) {
1458 // Server doesn't even tell use what range it is going to
1459 // play, therefore we won't support seeking.
1460 return;
1461 }
1462
1463 AString range = response->mHeaders.valueAt(i);
1464 ALOGV("Range: %s", range.c_str());
1465
1466 AString val;
1467 CHECK(GetAttribute(range.c_str(), "npt", &val));
1468
1469 float npt1, npt2;
1470 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1471 // This is a live stream and therefore not seekable.
1472
1473 ALOGI("This is a live stream");
1474 return;
1475 }
1476
1477 i = response->mHeaders.indexOfKey("rtp-info");
1478 CHECK_GE(i, 0);
1479
1480 AString rtpInfo = response->mHeaders.valueAt(i);
1481 List<AString> streamInfos;
1482 SplitString(rtpInfo, ",", &streamInfos);
1483
1484 int n = 1;
1485 for (List<AString>::iterator it = streamInfos.begin();
1486 it != streamInfos.end(); ++it) {
1487 (*it).trim();
1488 ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1489
1490 CHECK(GetAttribute((*it).c_str(), "url", &val));
1491
1492 size_t trackIndex = 0;
1493 while (trackIndex < mTracks.size()
1494 && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1495 ++trackIndex;
1496 }
1497 CHECK_LT(trackIndex, mTracks.size());
1498
1499 CHECK(GetAttribute((*it).c_str(), "seq", &val));
1500
1501 char *end;
1502 unsigned long seq = strtoul(val.c_str(), &end, 10);
1503
1504 TrackInfo *info = &mTracks.editItemAt(trackIndex);
1505 info->mFirstSeqNumInSegment = seq;
1506 info->mNewSegment = true;
1507 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1508
1509 CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1510
1511 uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1512
1513 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1514
1515 info->mNormalPlayTimeRTP = rtpTime;
1516 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1517
1518 if (!mFirstAccessUnit) {
1519 postNormalPlayTimeMapping(
1520 trackIndex,
1521 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1522 }
1523
1524 ++n;
1525 }
1526 }
1527
getTrackFormatMyHandler1528 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1529 CHECK_GE(index, 0u);
1530 CHECK_LT(index, mTracks.size());
1531
1532 const TrackInfo &info = mTracks.itemAt(index);
1533
1534 *timeScale = info.mTimeScale;
1535
1536 return info.mPacketSource->getFormat();
1537 }
1538
countTracksMyHandler1539 size_t countTracks() const {
1540 return mTracks.size();
1541 }
1542
1543 private:
1544 struct TrackInfo {
1545 AString mURL;
1546 int mRTPSocket;
1547 int mRTCPSocket;
1548 bool mUsingInterleavedTCP;
1549 uint32_t mFirstSeqNumInSegment;
1550 bool mNewSegment;
1551 int32_t mAllowedStaleAccessUnits;
1552
1553 uint32_t mRTPAnchor;
1554 int64_t mNTPAnchorUs;
1555 int32_t mTimeScale;
1556 bool mEOSReceived;
1557
1558 uint32_t mNormalPlayTimeRTP;
1559 int64_t mNormalPlayTimeUs;
1560
1561 sp<APacketSource> mPacketSource;
1562
1563 // Stores packets temporarily while no notion of time
1564 // has been established yet.
1565 List<sp<ABuffer> > mPackets;
1566 };
1567
1568 sp<AMessage> mNotify;
1569 bool mUIDValid;
1570 uid_t mUID;
1571 sp<ALooper> mNetLooper;
1572 sp<ARTSPConnection> mConn;
1573 sp<ARTPConnection> mRTPConn;
1574 sp<ASessionDescription> mSessionDesc;
1575 AString mOriginalSessionURL; // This one still has user:pass@
1576 AString mSessionURL;
1577 AString mSessionHost;
1578 AString mBaseURL;
1579 AString mControlURL;
1580 AString mSessionID;
1581 bool mSetupTracksSuccessful;
1582 bool mSeekPending;
1583 bool mFirstAccessUnit;
1584
1585 bool mAllTracksHaveTime;
1586 int64_t mNTPAnchorUs;
1587 int64_t mMediaAnchorUs;
1588 int64_t mLastMediaTimeUs;
1589
1590 int64_t mNumAccessUnitsReceived;
1591 bool mCheckPending;
1592 int32_t mCheckGeneration;
1593 int32_t mCheckTimeoutGeneration;
1594 bool mTryTCPInterleaving;
1595 bool mTryFakeRTCP;
1596 bool mReceivedFirstRTCPPacket;
1597 bool mReceivedFirstRTPPacket;
1598 bool mSeekable;
1599 int64_t mKeepAliveTimeoutUs;
1600 int32_t mKeepAliveGeneration;
1601 bool mPausing;
1602 int32_t mPauseGeneration;
1603
1604 Vector<TrackInfo> mTracks;
1605
1606 bool mPlayResponseParsed;
1607
setupTrackMyHandler1608 void setupTrack(size_t index) {
1609 sp<APacketSource> source =
1610 new APacketSource(mSessionDesc, index);
1611
1612 if (source->initCheck() != OK) {
1613 ALOGW("Unsupported format. Ignoring track #%zu.", index);
1614
1615 sp<AMessage> reply = new AMessage('setu', this);
1616 reply->setSize("index", index);
1617 reply->setInt32("result", ERROR_UNSUPPORTED);
1618 reply->post();
1619 return;
1620 }
1621
1622 AString url;
1623 CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1624
1625 AString trackURL;
1626 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1627
1628 mTracks.push(TrackInfo());
1629 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1630 info->mURL = trackURL;
1631 info->mPacketSource = source;
1632 info->mUsingInterleavedTCP = false;
1633 info->mFirstSeqNumInSegment = 0;
1634 info->mNewSegment = true;
1635 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1636 info->mRTPSocket = -1;
1637 info->mRTCPSocket = -1;
1638 info->mRTPAnchor = 0;
1639 info->mNTPAnchorUs = -1;
1640 info->mNormalPlayTimeRTP = 0;
1641 info->mNormalPlayTimeUs = 0ll;
1642
1643 unsigned long PT;
1644 AString formatDesc;
1645 AString formatParams;
1646 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1647
1648 int32_t timescale;
1649 int32_t numChannels;
1650 ASessionDescription::ParseFormatDesc(
1651 formatDesc.c_str(), ×cale, &numChannels);
1652
1653 info->mTimeScale = timescale;
1654 info->mEOSReceived = false;
1655
1656 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1657
1658 AString request = "SETUP ";
1659 request.append(trackURL);
1660 request.append(" RTSP/1.0\r\n");
1661
1662 if (mTryTCPInterleaving) {
1663 size_t interleaveIndex = 2 * (mTracks.size() - 1);
1664 info->mUsingInterleavedTCP = true;
1665 info->mRTPSocket = interleaveIndex;
1666 info->mRTCPSocket = interleaveIndex + 1;
1667
1668 request.append("Transport: RTP/AVP/TCP;interleaved=");
1669 request.append(interleaveIndex);
1670 request.append("-");
1671 request.append(interleaveIndex + 1);
1672 } else {
1673 unsigned rtpPort;
1674 ARTPConnection::MakePortPair(
1675 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1676
1677 if (mUIDValid) {
1678 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID,
1679 (uint32_t)*(uint32_t*) "RTP_");
1680 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1681 (uint32_t)*(uint32_t*) "RTP_");
1682 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID);
1683 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1684 }
1685
1686 request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1687 request.append(rtpPort);
1688 request.append("-");
1689 request.append(rtpPort + 1);
1690 }
1691
1692 request.append("\r\n");
1693
1694 if (index > 1) {
1695 request.append("Session: ");
1696 request.append(mSessionID);
1697 request.append("\r\n");
1698 }
1699
1700 request.append("\r\n");
1701
1702 sp<AMessage> reply = new AMessage('setu', this);
1703 reply->setSize("index", index);
1704 reply->setSize("track-index", mTracks.size() - 1);
1705 mConn->sendRequest(request.c_str(), reply);
1706 }
1707
MakeURLMyHandler1708 static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1709 out->clear();
1710
1711 if (strncasecmp("rtsp://", baseURL, 7)) {
1712 // Base URL must be absolute
1713 return false;
1714 }
1715
1716 if (!strncasecmp("rtsp://", url, 7)) {
1717 // "url" is already an absolute URL, ignore base URL.
1718 out->setTo(url);
1719 return true;
1720 }
1721
1722 size_t n = strlen(baseURL);
1723 out->setTo(baseURL);
1724 if (baseURL[n - 1] != '/') {
1725 out->append("/");
1726 }
1727 out->append(url);
1728
1729 return true;
1730 }
1731
fakeTimestampsMyHandler1732 void fakeTimestamps() {
1733 mNTPAnchorUs = -1ll;
1734 for (size_t i = 0; i < mTracks.size(); ++i) {
1735 onTimeUpdate(i, 0, 0ll);
1736 }
1737 }
1738
dataReceivedOnAllChannelsMyHandler1739 bool dataReceivedOnAllChannels() {
1740 TrackInfo *track;
1741 for (size_t i = 0; i < mTracks.size(); ++i) {
1742 track = &mTracks.editItemAt(i);
1743 if (track->mPackets.empty()) {
1744 return false;
1745 }
1746 }
1747 return true;
1748 }
1749
handleFirstAccessUnitMyHandler1750 void handleFirstAccessUnit() {
1751 if (mFirstAccessUnit) {
1752 sp<AMessage> msg = mNotify->dup();
1753 msg->setInt32("what", kWhatConnected);
1754 msg->post();
1755
1756 if (mSeekable) {
1757 for (size_t i = 0; i < mTracks.size(); ++i) {
1758 TrackInfo *info = &mTracks.editItemAt(i);
1759
1760 postNormalPlayTimeMapping(
1761 i,
1762 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1763 }
1764 }
1765
1766 mFirstAccessUnit = false;
1767 }
1768 }
1769
onTimeUpdateMyHandler1770 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1771 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1772 trackIndex, rtpTime, (long long)ntpTime);
1773
1774 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1775
1776 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1777
1778 track->mRTPAnchor = rtpTime;
1779 track->mNTPAnchorUs = ntpTimeUs;
1780
1781 if (mNTPAnchorUs < 0) {
1782 mNTPAnchorUs = ntpTimeUs;
1783 mMediaAnchorUs = mLastMediaTimeUs;
1784 }
1785
1786 if (!mAllTracksHaveTime) {
1787 bool allTracksHaveTime = (mTracks.size() > 0);
1788 for (size_t i = 0; i < mTracks.size(); ++i) {
1789 TrackInfo *track = &mTracks.editItemAt(i);
1790 if (track->mNTPAnchorUs < 0) {
1791 allTracksHaveTime = false;
1792 break;
1793 }
1794 }
1795 if (allTracksHaveTime) {
1796 mAllTracksHaveTime = true;
1797 ALOGI("Time now established for all tracks.");
1798 }
1799 }
1800 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1801 handleFirstAccessUnit();
1802
1803 // Time is now established, lets start timestamping immediately
1804 for (size_t i = 0; i < mTracks.size(); ++i) {
1805 if (OK != processAccessUnitQueue(i)) {
1806 return;
1807 }
1808 }
1809 for (size_t i = 0; i < mTracks.size(); ++i) {
1810 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1811 if (trackInfo->mEOSReceived) {
1812 postQueueEOS(i, ERROR_END_OF_STREAM);
1813 trackInfo->mEOSReceived = false;
1814 }
1815 }
1816 }
1817 }
1818
processAccessUnitQueueMyHandler1819 status_t processAccessUnitQueue(int32_t trackIndex) {
1820 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1821 while (!track->mPackets.empty()) {
1822 sp<ABuffer> accessUnit = *track->mPackets.begin();
1823 track->mPackets.erase(track->mPackets.begin());
1824
1825 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1826 if (track->mNewSegment) {
1827 // The sequence number from RTP packet has only 16 bits and is extended
1828 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1829 // RTSP "PLAY" command should be used to detect the first RTP packet
1830 // after seeking.
1831 if (mSeekable) {
1832 if (track->mAllowedStaleAccessUnits > 0) {
1833 uint32_t seqNum16 = seqNum & 0xffff;
1834 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1835 if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1836 || seqNum16 < firstSeqNumInSegment16) {
1837 // Not the first rtp packet of the stream after seeking, discarding.
1838 track->mAllowedStaleAccessUnits--;
1839 ALOGV("discarding stale access unit (0x%x : 0x%x)",
1840 seqNum, track->mFirstSeqNumInSegment);
1841 continue;
1842 }
1843 ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1844 "Missing the first packet(%u), now take packet(%u) as first one",
1845 track->mFirstSeqNumInSegment, seqNum);
1846 } else { // track->mAllowedStaleAccessUnits <= 0
1847 mNumAccessUnitsReceived = 0;
1848 ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1849 "Still no first rtp packet after %d stale ones",
1850 kMaxAllowedStaleAccessUnits);
1851 track->mAllowedStaleAccessUnits = -1;
1852 return UNKNOWN_ERROR;
1853 }
1854 }
1855
1856 // Now found the first rtp packet of the stream after seeking.
1857 track->mFirstSeqNumInSegment = seqNum;
1858 track->mNewSegment = false;
1859 }
1860
1861 if (seqNum < track->mFirstSeqNumInSegment) {
1862 ALOGV("dropping stale access-unit (%d < %d)",
1863 seqNum, track->mFirstSeqNumInSegment);
1864 continue;
1865 }
1866
1867 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1868 postQueueAccessUnit(trackIndex, accessUnit);
1869 }
1870 }
1871 return OK;
1872 }
1873
onAccessUnitCompleteMyHandler1874 void onAccessUnitComplete(
1875 int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1876 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1877 track->mPackets.push_back(accessUnit);
1878
1879 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1880 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1881
1882 if(!mPlayResponseParsed){
1883 ALOGV("play response is not parsed");
1884 return;
1885 }
1886
1887 handleFirstAccessUnit();
1888
1889 if (!mAllTracksHaveTime) {
1890 ALOGV("storing accessUnit, no time established yet");
1891 return;
1892 }
1893
1894 if (OK != processAccessUnitQueue(trackIndex)) {
1895 return;
1896 }
1897
1898 if (track->mEOSReceived) {
1899 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1900 track->mEOSReceived = false;
1901 }
1902 }
1903
addMediaTimestampMyHandler1904 bool addMediaTimestamp(
1905 int32_t trackIndex, const TrackInfo *track,
1906 const sp<ABuffer> &accessUnit) {
1907 UNUSED_UNLESS_VERBOSE(trackIndex);
1908
1909 uint32_t rtpTime;
1910 CHECK(accessUnit->meta()->findInt32(
1911 "rtp-time", (int32_t *)&rtpTime));
1912
1913 int64_t relRtpTimeUs =
1914 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1915 / track->mTimeScale;
1916
1917 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1918
1919 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1920
1921 if (mediaTimeUs > mLastMediaTimeUs) {
1922 mLastMediaTimeUs = mediaTimeUs;
1923 }
1924
1925 if (mediaTimeUs < 0 && !mSeekable) {
1926 ALOGV("dropping early accessUnit.");
1927 return false;
1928 }
1929
1930 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1931 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1932
1933 accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1934
1935 return true;
1936 }
1937
postQueueAccessUnitMyHandler1938 void postQueueAccessUnit(
1939 size_t trackIndex, const sp<ABuffer> &accessUnit) {
1940 sp<AMessage> msg = mNotify->dup();
1941 msg->setInt32("what", kWhatAccessUnit);
1942 msg->setSize("trackIndex", trackIndex);
1943 msg->setBuffer("accessUnit", accessUnit);
1944 msg->post();
1945 }
1946
postQueueEOSMyHandler1947 void postQueueEOS(size_t trackIndex, status_t finalResult) {
1948 sp<AMessage> msg = mNotify->dup();
1949 msg->setInt32("what", kWhatEOS);
1950 msg->setSize("trackIndex", trackIndex);
1951 msg->setInt32("finalResult", finalResult);
1952 msg->post();
1953 }
1954
postQueueSeekDiscontinuityMyHandler1955 void postQueueSeekDiscontinuity(size_t trackIndex) {
1956 sp<AMessage> msg = mNotify->dup();
1957 msg->setInt32("what", kWhatSeekDiscontinuity);
1958 msg->setSize("trackIndex", trackIndex);
1959 msg->post();
1960 }
1961
postNormalPlayTimeMappingMyHandler1962 void postNormalPlayTimeMapping(
1963 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1964 sp<AMessage> msg = mNotify->dup();
1965 msg->setInt32("what", kWhatNormalPlayTimeMapping);
1966 msg->setSize("trackIndex", trackIndex);
1967 msg->setInt32("rtpTime", rtpTime);
1968 msg->setInt64("nptUs", nptUs);
1969 msg->post();
1970 }
1971
postTimeoutMyHandler1972 void postTimeout() {
1973 sp<AMessage> timeout = new AMessage('tiou', this);
1974 mCheckTimeoutGeneration++;
1975 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1976
1977 int64_t startupTimeoutUs;
1978 startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1979 timeout->post(startupTimeoutUs);
1980 }
1981
1982 DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1983 };
1984
1985 } // namespace android
1986
1987 #endif // MY_HANDLER_H_
1988