1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MY_HANDLER_H_
18
19 #define MY_HANDLER_H_
20
21 //#define LOG_NDEBUG 0
22
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34
35 #include <ctype.h>
36
37 #include <media/stagefright/foundation/ABuffer.h>
38 #include <media/stagefright/foundation/ADebug.h>
39 #include <media/stagefright/foundation/ALooper.h>
40 #include <media/stagefright/foundation/AMessage.h>
41 #include <media/stagefright/MediaErrors.h>
42 #include <media/stagefright/Utils.h>
43
44 #include <arpa/inet.h>
45 #include <sys/socket.h>
46 #include <netdb.h>
47
48 #include "HTTPBase.h"
49
// UNUSED_UNLESS_VERBOSE(x): when LOG_NDEBUG is non-zero this expands to a
// cast-to-void reference of x; when LOG_NDEBUG is zero it expands to nothing.
// (Presumably for variables that are only read by verbose log statements.)
#if !LOG_NDEBUG
#define UNUSED_UNLESS_VERBOSE(x)
#else
#define UNUSED_UNLESS_VERBOSE(x) (void)(x)
#endif
55
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 10000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static int64_t kStartupTimeoutUs = 10000000ll;

// Keep-alive interval (60 secs) used when the server's "Session" header
// carries no "timeout" attribute, or one that is malformed or too short.
static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;

// 3 secs, in microseconds.
// NOTE(review): presumably the delay applied before a pause request takes
// effect; its use is not visible in this part of the file -- confirm.
static int64_t kPauseDelayUs = 3000000ll;

// The allowed maximum number of stale access units at the beginning of
// a new sequence.
static int32_t kMaxAllowedStaleAccessUnits = 20;
71
72 namespace android {
73
GetAttribute(const char * s,const char * key,AString * value)74 static bool GetAttribute(const char *s, const char *key, AString *value) {
75 value->clear();
76
77 size_t keyLen = strlen(key);
78
79 for (;;) {
80 while (isspace(*s)) {
81 ++s;
82 }
83
84 const char *colonPos = strchr(s, ';');
85
86 size_t len =
87 (colonPos == NULL) ? strlen(s) : colonPos - s;
88
89 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
90 value->setTo(&s[keyLen + 1], len - keyLen - 1);
91 return true;
92 }
93
94 if (colonPos == NULL) {
95 return false;
96 }
97
98 s = colonPos + 1;
99 }
100 }
101
102 struct MyHandler : public AHandler {
// Four-character codes written to the "what" field of notifications that
// are posted on copies of mNotify (e.g. kWhatDisconnected when the handler
// quits, kWhatSeekPaused / kWhatSeekDone during a seek).
enum {
    // Connection and seek life cycle.
    kWhatConnected             = 'conn',
    kWhatDisconnected          = 'disc',
    kWhatSeekPaused            = 'spau',
    kWhatSeekDone              = 'sdon',

    // Per-track data events.
    kWhatAccessUnit            = 'accU',
    kWhatEOS                   = 'eos!',
    kWhatSeekDiscontinuity     = 'seeD',
    kWhatNormalPlayTimeMapping = 'nptM',
};
114
115 MyHandler(
116 const char *url,
117 const sp<AMessage> ¬ify,
118 bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler119 : mNotify(notify),
120 mUIDValid(uidValid),
121 mUID(uid),
122 mNetLooper(new ALooper),
123 mConn(new ARTSPConnection(mUIDValid, mUID)),
124 mRTPConn(new ARTPConnection),
125 mOriginalSessionURL(url),
126 mSessionURL(url),
127 mSetupTracksSuccessful(false),
128 mSeekPending(false),
129 mFirstAccessUnit(true),
130 mAllTracksHaveTime(false),
131 mNTPAnchorUs(-1),
132 mMediaAnchorUs(-1),
133 mLastMediaTimeUs(0),
134 mNumAccessUnitsReceived(0),
135 mCheckPending(false),
136 mCheckGeneration(0),
137 mCheckTimeoutGeneration(0),
138 mTryTCPInterleaving(false),
139 mTryFakeRTCP(false),
140 mReceivedFirstRTCPPacket(false),
141 mReceivedFirstRTPPacket(false),
142 mSeekable(true),
143 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
144 mKeepAliveGeneration(0),
145 mPausing(false),
146 mPauseGeneration(0),
147 mPlayResponseParsed(false) {
148 mNetLooper->setName("rtsp net");
149 mNetLooper->start(false /* runOnCallingThread */,
150 false /* canCallJava */,
151 PRIORITY_HIGHEST);
152
153 // Strip any authentication info from the session url, we don't
154 // want to transmit user/pass in cleartext.
155 AString host, path, user, pass;
156 unsigned port;
157 CHECK(ARTSPConnection::ParseURL(
158 mSessionURL.c_str(), &host, &port, &path, &user, &pass));
159
160 if (user.size() > 0) {
161 mSessionURL.clear();
162 mSessionURL.append("rtsp://");
163 mSessionURL.append(host);
164 mSessionURL.append(":");
165 mSessionURL.append(AStringPrintf("%u", port));
166 mSessionURL.append(path);
167
168 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
169 }
170
171 mSessionHost = host;
172 }
173
connectMyHandler174 void connect() {
175 looper()->registerHandler(mConn);
176 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
177
178 sp<AMessage> notify = new AMessage('biny', this);
179 mConn->observeBinaryData(notify);
180
181 sp<AMessage> reply = new AMessage('conn', this);
182 mConn->connect(mOriginalSessionURL.c_str(), reply);
183 }
184
loadSDPMyHandler185 void loadSDP(const sp<ASessionDescription>& desc) {
186 looper()->registerHandler(mConn);
187 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
188
189 sp<AMessage> notify = new AMessage('biny', this);
190 mConn->observeBinaryData(notify);
191
192 sp<AMessage> reply = new AMessage('sdpl', this);
193 reply->setObject("description", desc);
194 mConn->connect(mOriginalSessionURL.c_str(), reply);
195 }
196
getControlURLMyHandler197 AString getControlURL() {
198 AString sessionLevelControlURL;
199 if (mSessionDesc->findAttribute(
200 0,
201 "a=control",
202 &sessionLevelControlURL)) {
203 if (sessionLevelControlURL.compare("*") == 0) {
204 return mBaseURL;
205 } else {
206 AString controlURL;
207 CHECK(MakeURL(
208 mBaseURL.c_str(),
209 sessionLevelControlURL.c_str(),
210 &controlURL));
211 return controlURL;
212 }
213 } else {
214 return mSessionURL;
215 }
216 }
217
disconnectMyHandler218 void disconnect() {
219 (new AMessage('abor', this))->post();
220 }
221
seekMyHandler222 void seek(int64_t timeUs) {
223 sp<AMessage> msg = new AMessage('seek', this);
224 msg->setInt64("time", timeUs);
225 mPauseGeneration++;
226 msg->post();
227 }
228
continueSeekAfterPauseMyHandler229 void continueSeekAfterPause(int64_t timeUs) {
230 sp<AMessage> msg = new AMessage('see1', this);
231 msg->setInt64("time", timeUs);
232 msg->post();
233 }
234
isSeekableMyHandler235 bool isSeekable() const {
236 return mSeekable;
237 }
238
pauseMyHandler239 void pause() {
240 sp<AMessage> msg = new AMessage('paus', this);
241 mPauseGeneration++;
242 msg->setInt32("pausecheck", mPauseGeneration);
243 msg->post();
244 }
245
resumeMyHandler246 void resume() {
247 sp<AMessage> msg = new AMessage('resu', this);
248 mPauseGeneration++;
249 msg->post();
250 }
251
addRRMyHandler252 static void addRR(const sp<ABuffer> &buf) {
253 uint8_t *ptr = buf->data() + buf->size();
254 ptr[0] = 0x80 | 0;
255 ptr[1] = 201; // RR
256 ptr[2] = 0;
257 ptr[3] = 1;
258 ptr[4] = 0xde; // SSRC
259 ptr[5] = 0xad;
260 ptr[6] = 0xbe;
261 ptr[7] = 0xef;
262
263 buf->setRange(0, buf->size() + 8);
264 }
265
addSDESMyHandler266 static void addSDES(int s, const sp<ABuffer> &buffer) {
267 struct sockaddr_in addr;
268 socklen_t addrSize = sizeof(addr);
269 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
270 inet_aton("0.0.0.0", &(addr.sin_addr));
271 }
272
273 uint8_t *data = buffer->data() + buffer->size();
274 data[0] = 0x80 | 1;
275 data[1] = 202; // SDES
276 data[4] = 0xde; // SSRC
277 data[5] = 0xad;
278 data[6] = 0xbe;
279 data[7] = 0xef;
280
281 size_t offset = 8;
282
283 data[offset++] = 1; // CNAME
284
285 AString cname = "stagefright@";
286 cname.append(inet_ntoa(addr.sin_addr));
287 data[offset++] = cname.size();
288
289 memcpy(&data[offset], cname.c_str(), cname.size());
290 offset += cname.size();
291
292 data[offset++] = 6; // TOOL
293
294 AString tool = MakeUserAgent();
295
296 data[offset++] = tool.size();
297
298 memcpy(&data[offset], tool.c_str(), tool.size());
299 offset += tool.size();
300
301 data[offset++] = 0;
302
303 if ((offset % 4) > 0) {
304 size_t count = 4 - (offset % 4);
305 switch (count) {
306 case 3:
307 data[offset++] = 0;
308 case 2:
309 data[offset++] = 0;
310 case 1:
311 data[offset++] = 0;
312 }
313 }
314
315 size_t numWords = (offset / 4) - 1;
316 data[2] = numWords >> 8;
317 data[3] = numWords & 0xff;
318
319 buffer->setRange(buffer->offset(), buffer->size() + offset);
320 }
321
322 // In case we're behind NAT, fire off two UDP packets to the remote
323 // rtp/rtcp ports to poke a hole into the firewall for future incoming
324 // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler325 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
326 struct sockaddr_in addr;
327 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
328 addr.sin_family = AF_INET;
329
330 AString source;
331 AString server_port;
332 if (!GetAttribute(transport.c_str(),
333 "source",
334 &source)) {
335 ALOGW("Missing 'source' field in Transport response. Using "
336 "RTSP endpoint address.");
337
338 struct hostent *ent = gethostbyname(mSessionHost.c_str());
339 if (ent == NULL) {
340 ALOGE("Failed to look up address of session host '%s'",
341 mSessionHost.c_str());
342
343 return false;
344 }
345
346 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
347 } else {
348 addr.sin_addr.s_addr = inet_addr(source.c_str());
349 }
350
351 if (!GetAttribute(transport.c_str(),
352 "server_port",
353 &server_port)) {
354 ALOGI("Missing 'server_port' field in Transport response.");
355 return false;
356 }
357
358 int rtpPort, rtcpPort;
359 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
360 || rtpPort <= 0 || rtpPort > 65535
361 || rtcpPort <=0 || rtcpPort > 65535
362 || rtcpPort != rtpPort + 1) {
363 ALOGE("Server picked invalid RTP/RTCP port pair %s,"
364 " RTP port must be even, RTCP port must be one higher.",
365 server_port.c_str());
366
367 return false;
368 }
369
370 if (rtpPort & 1) {
371 ALOGW("Server picked an odd RTP port, it should've picked an "
372 "even one, we'll let it pass for now, but this may break "
373 "in the future.");
374 }
375
376 if (addr.sin_addr.s_addr == INADDR_NONE) {
377 return true;
378 }
379
380 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
381 // No firewalls to traverse on the loopback interface.
382 return true;
383 }
384
385 // Make up an RR/SDES RTCP packet.
386 sp<ABuffer> buf = new ABuffer(65536);
387 buf->setRange(0, 0);
388 addRR(buf);
389 addSDES(rtpSocket, buf);
390
391 addr.sin_port = htons(rtpPort);
392
393 ssize_t n = sendto(
394 rtpSocket, buf->data(), buf->size(), 0,
395 (const sockaddr *)&addr, sizeof(addr));
396
397 if (n < (ssize_t)buf->size()) {
398 ALOGE("failed to poke a hole for RTP packets");
399 return false;
400 }
401
402 addr.sin_port = htons(rtcpPort);
403
404 n = sendto(
405 rtcpSocket, buf->data(), buf->size(), 0,
406 (const sockaddr *)&addr, sizeof(addr));
407
408 if (n < (ssize_t)buf->size()) {
409 ALOGE("failed to poke a hole for RTCP packets");
410 return false;
411 }
412
413 ALOGV("successfully poked holes.");
414
415 return true;
416 }
417
isLiveStreamMyHandler418 static bool isLiveStream(const sp<ASessionDescription> &desc) {
419 AString attrLiveStream;
420 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
421 ssize_t semicolonPos = attrLiveStream.find(";", 2);
422
423 const char* liveStreamValue;
424 if (semicolonPos < 0) {
425 liveStreamValue = attrLiveStream.c_str();
426 } else {
427 AString valString;
428 valString.setTo(attrLiveStream,
429 semicolonPos + 1,
430 attrLiveStream.size() - semicolonPos - 1);
431 liveStreamValue = valString.c_str();
432 }
433
434 uint32_t value = strtoul(liveStreamValue, NULL, 10);
435 if (value == 1) {
436 ALOGV("found live stream");
437 return true;
438 }
439 } else {
440 // It is a live stream if no duration is returned
441 int64_t durationUs;
442 if (!desc->getDurationUs(&durationUs)) {
443 ALOGV("No duration found, assume live stream");
444 return true;
445 }
446 }
447
448 return false;
449 }
450
onMessageReceivedMyHandler451 virtual void onMessageReceived(const sp<AMessage> &msg) {
452 switch (msg->what()) {
453 case 'conn':
454 {
455 int32_t result;
456 CHECK(msg->findInt32("result", &result));
457
458 ALOGI("connection request completed with result %d (%s)",
459 result, strerror(-result));
460
461 if (result == OK) {
462 AString request;
463 request = "DESCRIBE ";
464 request.append(mSessionURL);
465 request.append(" RTSP/1.0\r\n");
466 request.append("Accept: application/sdp\r\n");
467 request.append("\r\n");
468
469 sp<AMessage> reply = new AMessage('desc', this);
470 mConn->sendRequest(request.c_str(), reply);
471 } else {
472 (new AMessage('disc', this))->post();
473 }
474 break;
475 }
476
477 case 'disc':
478 {
479 ++mKeepAliveGeneration;
480
481 int32_t reconnect;
482 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
483 sp<AMessage> reply = new AMessage('conn', this);
484 mConn->connect(mOriginalSessionURL.c_str(), reply);
485 } else {
486 (new AMessage('quit', this))->post();
487 }
488 break;
489 }
490
491 case 'desc':
492 {
493 int32_t result;
494 CHECK(msg->findInt32("result", &result));
495
496 ALOGI("DESCRIBE completed with result %d (%s)",
497 result, strerror(-result));
498
499 if (result == OK) {
500 sp<RefBase> obj;
501 CHECK(msg->findObject("response", &obj));
502 sp<ARTSPResponse> response =
503 static_cast<ARTSPResponse *>(obj.get());
504
505 if (response->mStatusCode == 301 || response->mStatusCode == 302) {
506 ssize_t i = response->mHeaders.indexOfKey("location");
507 CHECK_GE(i, 0);
508
509 mOriginalSessionURL = response->mHeaders.valueAt(i);
510 mSessionURL = mOriginalSessionURL;
511
512 // Strip any authentication info from the session url, we don't
513 // want to transmit user/pass in cleartext.
514 AString host, path, user, pass;
515 unsigned port;
516 if (ARTSPConnection::ParseURL(
517 mSessionURL.c_str(), &host, &port, &path, &user, &pass)
518 && user.size() > 0) {
519 mSessionURL.clear();
520 mSessionURL.append("rtsp://");
521 mSessionURL.append(host);
522 mSessionURL.append(":");
523 mSessionURL.append(AStringPrintf("%u", port));
524 mSessionURL.append(path);
525
526 ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
527 }
528
529 sp<AMessage> reply = new AMessage('conn', this);
530 mConn->connect(mOriginalSessionURL.c_str(), reply);
531 break;
532 }
533
534 if (response->mStatusCode != 200) {
535 result = UNKNOWN_ERROR;
536 } else if (response->mContent == NULL) {
537 result = ERROR_MALFORMED;
538 ALOGE("The response has no content.");
539 } else {
540 mSessionDesc = new ASessionDescription;
541
542 mSessionDesc->setTo(
543 response->mContent->data(),
544 response->mContent->size());
545
546 if (!mSessionDesc->isValid()) {
547 ALOGE("Failed to parse session description.");
548 result = ERROR_MALFORMED;
549 } else {
550 ssize_t i = response->mHeaders.indexOfKey("content-base");
551 if (i >= 0) {
552 mBaseURL = response->mHeaders.valueAt(i);
553 } else {
554 i = response->mHeaders.indexOfKey("content-location");
555 if (i >= 0) {
556 mBaseURL = response->mHeaders.valueAt(i);
557 } else {
558 mBaseURL = mSessionURL;
559 }
560 }
561
562 mSeekable = !isLiveStream(mSessionDesc);
563
564 if (!mBaseURL.startsWith("rtsp://")) {
565 // Some misbehaving servers specify a relative
566 // URL in one of the locations above, combine
567 // it with the absolute session URL to get
568 // something usable...
569
570 ALOGW("Server specified a non-absolute base URL"
571 ", combining it with the session URL to "
572 "get something usable...");
573
574 AString tmp;
575 CHECK(MakeURL(
576 mSessionURL.c_str(),
577 mBaseURL.c_str(),
578 &tmp));
579
580 mBaseURL = tmp;
581 }
582
583 mControlURL = getControlURL();
584
585 if (mSessionDesc->countTracks() < 2) {
586 // There's no actual tracks in this session.
587 // The first "track" is merely session meta
588 // data.
589
590 ALOGW("Session doesn't contain any playable "
591 "tracks. Aborting.");
592 result = ERROR_UNSUPPORTED;
593 } else {
594 setupTrack(1);
595 }
596 }
597 }
598 }
599
600 if (result != OK) {
601 sp<AMessage> reply = new AMessage('disc', this);
602 mConn->disconnect(reply);
603 }
604 break;
605 }
606
607 case 'sdpl':
608 {
609 int32_t result;
610 CHECK(msg->findInt32("result", &result));
611
612 ALOGI("SDP connection request completed with result %d (%s)",
613 result, strerror(-result));
614
615 if (result == OK) {
616 sp<RefBase> obj;
617 CHECK(msg->findObject("description", &obj));
618 mSessionDesc =
619 static_cast<ASessionDescription *>(obj.get());
620
621 if (!mSessionDesc->isValid()) {
622 ALOGE("Failed to parse session description.");
623 result = ERROR_MALFORMED;
624 } else {
625 mBaseURL = mSessionURL;
626
627 mSeekable = !isLiveStream(mSessionDesc);
628
629 mControlURL = getControlURL();
630
631 if (mSessionDesc->countTracks() < 2) {
632 // There's no actual tracks in this session.
633 // The first "track" is merely session meta
634 // data.
635
636 ALOGW("Session doesn't contain any playable "
637 "tracks. Aborting.");
638 result = ERROR_UNSUPPORTED;
639 } else {
640 setupTrack(1);
641 }
642 }
643 }
644
645 if (result != OK) {
646 sp<AMessage> reply = new AMessage('disc', this);
647 mConn->disconnect(reply);
648 }
649 break;
650 }
651
652 case 'setu':
653 {
654 size_t index;
655 CHECK(msg->findSize("index", &index));
656
657 TrackInfo *track = NULL;
658 size_t trackIndex;
659 if (msg->findSize("track-index", &trackIndex)) {
660 track = &mTracks.editItemAt(trackIndex);
661 }
662
663 int32_t result;
664 CHECK(msg->findInt32("result", &result));
665
666 ALOGI("SETUP(%zu) completed with result %d (%s)",
667 index, result, strerror(-result));
668
669 if (result == OK) {
670 CHECK(track != NULL);
671
672 sp<RefBase> obj;
673 CHECK(msg->findObject("response", &obj));
674 sp<ARTSPResponse> response =
675 static_cast<ARTSPResponse *>(obj.get());
676
677 if (response->mStatusCode != 200) {
678 result = UNKNOWN_ERROR;
679 } else {
680 ssize_t i = response->mHeaders.indexOfKey("session");
681 CHECK_GE(i, 0);
682
683 mSessionID = response->mHeaders.valueAt(i);
684
685 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
686 AString timeoutStr;
687 if (GetAttribute(
688 mSessionID.c_str(), "timeout", &timeoutStr)) {
689 char *end;
690 unsigned long timeoutSecs =
691 strtoul(timeoutStr.c_str(), &end, 10);
692
693 if (end == timeoutStr.c_str() || *end != '\0') {
694 ALOGW("server specified malformed timeout '%s'",
695 timeoutStr.c_str());
696
697 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
698 } else if (timeoutSecs < 15) {
699 ALOGW("server specified too short a timeout "
700 "(%lu secs), using default.",
701 timeoutSecs);
702
703 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
704 } else {
705 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
706
707 ALOGI("server specified timeout of %lu secs.",
708 timeoutSecs);
709 }
710 }
711
712 i = mSessionID.find(";");
713 if (i >= 0) {
714 // Remove options, i.e. ";timeout=90"
715 mSessionID.erase(i, mSessionID.size() - i);
716 }
717
718 sp<AMessage> notify = new AMessage('accu', this);
719 notify->setSize("track-index", trackIndex);
720
721 i = response->mHeaders.indexOfKey("transport");
722 CHECK_GE(i, 0);
723
724 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
725 if (!track->mUsingInterleavedTCP) {
726 AString transport = response->mHeaders.valueAt(i);
727
728 // We are going to continue even if we were
729 // unable to poke a hole into the firewall...
730 pokeAHole(
731 track->mRTPSocket,
732 track->mRTCPSocket,
733 transport);
734 }
735
736 mRTPConn->addStream(
737 track->mRTPSocket, track->mRTCPSocket,
738 mSessionDesc, index,
739 notify, track->mUsingInterleavedTCP);
740
741 mSetupTracksSuccessful = true;
742 } else {
743 result = BAD_VALUE;
744 }
745 }
746 }
747
748 if (result != OK) {
749 if (track) {
750 if (!track->mUsingInterleavedTCP) {
751 // Clear the tag
752 if (mUIDValid) {
753 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
754 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
755 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
756 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
757 }
758
759 close(track->mRTPSocket);
760 close(track->mRTCPSocket);
761 }
762
763 mTracks.removeItemsAt(trackIndex);
764 }
765 }
766
767 ++index;
768 if (result == OK && index < mSessionDesc->countTracks()) {
769 setupTrack(index);
770 } else if (mSetupTracksSuccessful) {
771 ++mKeepAliveGeneration;
772 postKeepAlive();
773
774 AString request = "PLAY ";
775 request.append(mControlURL);
776 request.append(" RTSP/1.0\r\n");
777
778 request.append("Session: ");
779 request.append(mSessionID);
780 request.append("\r\n");
781
782 request.append("\r\n");
783
784 sp<AMessage> reply = new AMessage('play', this);
785 mConn->sendRequest(request.c_str(), reply);
786 } else {
787 sp<AMessage> reply = new AMessage('disc', this);
788 mConn->disconnect(reply);
789 }
790 break;
791 }
792
793 case 'play':
794 {
795 int32_t result;
796 CHECK(msg->findInt32("result", &result));
797
798 ALOGI("PLAY completed with result %d (%s)",
799 result, strerror(-result));
800
801 if (result == OK) {
802 sp<RefBase> obj;
803 CHECK(msg->findObject("response", &obj));
804 sp<ARTSPResponse> response =
805 static_cast<ARTSPResponse *>(obj.get());
806
807 if (response->mStatusCode != 200) {
808 result = UNKNOWN_ERROR;
809 } else {
810 parsePlayResponse(response);
811 postTimeout();
812 }
813 }
814
815 if (result != OK) {
816 sp<AMessage> reply = new AMessage('disc', this);
817 mConn->disconnect(reply);
818 }
819
820 break;
821 }
822
823 case 'aliv':
824 {
825 int32_t generation;
826 CHECK(msg->findInt32("generation", &generation));
827
828 if (generation != mKeepAliveGeneration) {
829 // obsolete event.
830 break;
831 }
832
833 AString request;
834 request.append("OPTIONS ");
835 request.append(mSessionURL);
836 request.append(" RTSP/1.0\r\n");
837 request.append("Session: ");
838 request.append(mSessionID);
839 request.append("\r\n");
840 request.append("\r\n");
841
842 sp<AMessage> reply = new AMessage('opts', this);
843 reply->setInt32("generation", mKeepAliveGeneration);
844 mConn->sendRequest(request.c_str(), reply);
845 break;
846 }
847
848 case 'opts':
849 {
850 int32_t result;
851 CHECK(msg->findInt32("result", &result));
852
853 ALOGI("OPTIONS completed with result %d (%s)",
854 result, strerror(-result));
855
856 int32_t generation;
857 CHECK(msg->findInt32("generation", &generation));
858
859 if (generation != mKeepAliveGeneration) {
860 // obsolete event.
861 break;
862 }
863
864 postKeepAlive();
865 break;
866 }
867
868 case 'abor':
869 {
870 for (size_t i = 0; i < mTracks.size(); ++i) {
871 TrackInfo *info = &mTracks.editItemAt(i);
872
873 if (!mFirstAccessUnit) {
874 postQueueEOS(i, ERROR_END_OF_STREAM);
875 }
876
877 if (!info->mUsingInterleavedTCP) {
878 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
879
880 // Clear the tag
881 if (mUIDValid) {
882 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
883 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
884 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
885 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
886 }
887
888 close(info->mRTPSocket);
889 close(info->mRTCPSocket);
890 }
891 }
892 mTracks.clear();
893 mSetupTracksSuccessful = false;
894 mSeekPending = false;
895 mFirstAccessUnit = true;
896 mAllTracksHaveTime = false;
897 mNTPAnchorUs = -1;
898 mMediaAnchorUs = -1;
899 mNumAccessUnitsReceived = 0;
900 mReceivedFirstRTCPPacket = false;
901 mReceivedFirstRTPPacket = false;
902 mPausing = false;
903 mSeekable = true;
904
905 sp<AMessage> reply = new AMessage('tear', this);
906
907 int32_t reconnect;
908 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
909 reply->setInt32("reconnect", true);
910 }
911
912 AString request;
913 request = "TEARDOWN ";
914
915 // XXX should use aggregate url from SDP here...
916 request.append(mSessionURL);
917 request.append(" RTSP/1.0\r\n");
918
919 request.append("Session: ");
920 request.append(mSessionID);
921 request.append("\r\n");
922
923 request.append("\r\n");
924
925 mConn->sendRequest(request.c_str(), reply);
926 break;
927 }
928
929 case 'tear':
930 {
931 int32_t result;
932 CHECK(msg->findInt32("result", &result));
933
934 ALOGI("TEARDOWN completed with result %d (%s)",
935 result, strerror(-result));
936
937 sp<AMessage> reply = new AMessage('disc', this);
938
939 int32_t reconnect;
940 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
941 reply->setInt32("reconnect", true);
942 }
943
944 mConn->disconnect(reply);
945 break;
946 }
947
948 case 'quit':
949 {
950 sp<AMessage> msg = mNotify->dup();
951 msg->setInt32("what", kWhatDisconnected);
952 msg->setInt32("result", UNKNOWN_ERROR);
953 msg->post();
954 break;
955 }
956
957 case 'chek':
958 {
959 int32_t generation;
960 CHECK(msg->findInt32("generation", &generation));
961 if (generation != mCheckGeneration) {
962 // This is an outdated message. Ignore.
963 break;
964 }
965
966 if (mNumAccessUnitsReceived == 0) {
967 #if 1
968 ALOGI("stream ended? aborting.");
969 (new AMessage('abor', this))->post();
970 break;
971 #else
972 ALOGI("haven't seen an AU in a looong time.");
973 #endif
974 }
975
976 mNumAccessUnitsReceived = 0;
977 msg->post(kAccessUnitTimeoutUs);
978 break;
979 }
980
981 case 'accu':
982 {
983 if (mSeekPending) {
984 ALOGV("Stale access unit.");
985 break;
986 }
987
988 int32_t timeUpdate;
989 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
990 size_t trackIndex;
991 CHECK(msg->findSize("track-index", &trackIndex));
992
993 uint32_t rtpTime;
994 uint64_t ntpTime;
995 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
996 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
997
998 onTimeUpdate(trackIndex, rtpTime, ntpTime);
999 break;
1000 }
1001
1002 int32_t first;
1003 if (msg->findInt32("first-rtcp", &first)) {
1004 mReceivedFirstRTCPPacket = true;
1005 break;
1006 }
1007
1008 if (msg->findInt32("first-rtp", &first)) {
1009 mReceivedFirstRTPPacket = true;
1010 break;
1011 }
1012
1013 ++mNumAccessUnitsReceived;
1014 postAccessUnitTimeoutCheck();
1015
1016 size_t trackIndex;
1017 CHECK(msg->findSize("track-index", &trackIndex));
1018
1019 if (trackIndex >= mTracks.size()) {
1020 ALOGV("late packets ignored.");
1021 break;
1022 }
1023
1024 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1025
1026 int32_t eos;
1027 if (msg->findInt32("eos", &eos)) {
1028 ALOGI("received BYE on track index %zu", trackIndex);
1029 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1030 ALOGI("No time established => fake existing data");
1031
1032 track->mEOSReceived = true;
1033 mTryFakeRTCP = true;
1034 mReceivedFirstRTCPPacket = true;
1035 fakeTimestamps();
1036 } else {
1037 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1038 }
1039 return;
1040 }
1041
1042 if (mSeekPending) {
1043 ALOGV("we're seeking, dropping stale packet.");
1044 break;
1045 }
1046
1047 sp<ABuffer> accessUnit;
1048 CHECK(msg->findBuffer("access-unit", &accessUnit));
1049 onAccessUnitComplete(trackIndex, accessUnit);
1050 break;
1051 }
1052
1053 case 'paus':
1054 {
1055 int32_t generation;
1056 CHECK(msg->findInt32("pausecheck", &generation));
1057 if (generation != mPauseGeneration) {
1058 ALOGV("Ignoring outdated pause message.");
1059 break;
1060 }
1061
1062 if (!mSeekable) {
1063 ALOGW("This is a live stream, ignoring pause request.");
1064 break;
1065 }
1066
1067 if (mPausing) {
1068 ALOGV("This stream is already paused.");
1069 break;
1070 }
1071
1072 mCheckPending = true;
1073 ++mCheckGeneration;
1074 mPausing = true;
1075
1076 AString request = "PAUSE ";
1077 request.append(mControlURL);
1078 request.append(" RTSP/1.0\r\n");
1079
1080 request.append("Session: ");
1081 request.append(mSessionID);
1082 request.append("\r\n");
1083
1084 request.append("\r\n");
1085
1086 sp<AMessage> reply = new AMessage('pau2', this);
1087 mConn->sendRequest(request.c_str(), reply);
1088 break;
1089 }
1090
1091 case 'pau2':
1092 {
1093 int32_t result;
1094 CHECK(msg->findInt32("result", &result));
1095 mCheckTimeoutGeneration++;
1096
1097 ALOGI("PAUSE completed with result %d (%s)",
1098 result, strerror(-result));
1099 break;
1100 }
1101
1102 case 'resu':
1103 {
1104 if (mPausing && mSeekPending) {
1105 // If seeking, Play will be sent from see1 instead
1106 break;
1107 }
1108
1109 if (!mPausing) {
1110 // Dont send PLAY if we have not paused
1111 break;
1112 }
1113 AString request = "PLAY ";
1114 request.append(mControlURL);
1115 request.append(" RTSP/1.0\r\n");
1116
1117 request.append("Session: ");
1118 request.append(mSessionID);
1119 request.append("\r\n");
1120
1121 request.append("\r\n");
1122
1123 sp<AMessage> reply = new AMessage('res2', this);
1124 mConn->sendRequest(request.c_str(), reply);
1125 break;
1126 }
1127
1128 case 'res2':
1129 {
1130 int32_t result;
1131 CHECK(msg->findInt32("result", &result));
1132
1133 ALOGI("PLAY (for resume) completed with result %d (%s)",
1134 result, strerror(-result));
1135
1136 mCheckPending = false;
1137 ++mCheckGeneration;
1138 postAccessUnitTimeoutCheck();
1139
1140 if (result == OK) {
1141 sp<RefBase> obj;
1142 CHECK(msg->findObject("response", &obj));
1143 sp<ARTSPResponse> response =
1144 static_cast<ARTSPResponse *>(obj.get());
1145
1146 if (response->mStatusCode != 200) {
1147 result = UNKNOWN_ERROR;
1148 } else {
1149 parsePlayResponse(response);
1150
1151 // Post new timeout in order to make sure to use
1152 // fake timestamps if no new Sender Reports arrive
1153 postTimeout();
1154 }
1155 }
1156
1157 if (result != OK) {
1158 ALOGE("resume failed, aborting.");
1159 (new AMessage('abor', this))->post();
1160 }
1161
1162 mPausing = false;
1163 break;
1164 }
1165
1166 case 'seek':
1167 {
1168 if (!mSeekable) {
1169 ALOGW("This is a live stream, ignoring seek request.");
1170
1171 sp<AMessage> msg = mNotify->dup();
1172 msg->setInt32("what", kWhatSeekDone);
1173 msg->post();
1174 break;
1175 }
1176
1177 int64_t timeUs;
1178 CHECK(msg->findInt64("time", &timeUs));
1179
1180 mSeekPending = true;
1181
1182 // Disable the access unit timeout until we resumed
1183 // playback again.
1184 mCheckPending = true;
1185 ++mCheckGeneration;
1186
1187 sp<AMessage> reply = new AMessage('see0', this);
1188 reply->setInt64("time", timeUs);
1189
1190 if (mPausing) {
1191 // PAUSE already sent
1192 ALOGI("Pause already sent");
1193 reply->post();
1194 break;
1195 }
1196 AString request = "PAUSE ";
1197 request.append(mControlURL);
1198 request.append(" RTSP/1.0\r\n");
1199
1200 request.append("Session: ");
1201 request.append(mSessionID);
1202 request.append("\r\n");
1203
1204 request.append("\r\n");
1205
1206 mConn->sendRequest(request.c_str(), reply);
1207 break;
1208 }
1209
1210 case 'see0':
1211 {
1212 // Session is paused now.
1213 status_t err = OK;
1214 msg->findInt32("result", &err);
1215
1216 int64_t timeUs;
1217 CHECK(msg->findInt64("time", &timeUs));
1218
1219 sp<AMessage> notify = mNotify->dup();
1220 notify->setInt32("what", kWhatSeekPaused);
1221 notify->setInt32("err", err);
1222 notify->setInt64("time", timeUs);
1223 notify->post();
1224 break;
1225
1226 }
1227
1228 case 'see1':
1229 {
1230 for (size_t i = 0; i < mTracks.size(); ++i) {
1231 TrackInfo *info = &mTracks.editItemAt(i);
1232
1233 postQueueSeekDiscontinuity(i);
1234 info->mEOSReceived = false;
1235
1236 info->mRTPAnchor = 0;
1237 info->mNTPAnchorUs = -1;
1238 }
1239
1240 mAllTracksHaveTime = false;
1241 mNTPAnchorUs = -1;
1242
1243 // Start new timeoutgeneration to avoid getting timeout
1244 // before PLAY response arrive
1245 postTimeout();
1246
1247 int64_t timeUs;
1248 CHECK(msg->findInt64("time", &timeUs));
1249
1250 AString request = "PLAY ";
1251 request.append(mControlURL);
1252 request.append(" RTSP/1.0\r\n");
1253
1254 request.append("Session: ");
1255 request.append(mSessionID);
1256 request.append("\r\n");
1257
1258 request.append(
1259 AStringPrintf(
1260 "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1261
1262 request.append("\r\n");
1263
1264 sp<AMessage> reply = new AMessage('see2', this);
1265 mConn->sendRequest(request.c_str(), reply);
1266 break;
1267 }
1268
1269 case 'see2':
1270 {
1271 if (mTracks.size() == 0) {
1272 // We have already hit abor, break
1273 break;
1274 }
1275
1276 int32_t result;
1277 CHECK(msg->findInt32("result", &result));
1278
1279 ALOGI("PLAY (for seek) completed with result %d (%s)",
1280 result, strerror(-result));
1281
1282 mCheckPending = false;
1283 ++mCheckGeneration;
1284 postAccessUnitTimeoutCheck();
1285
1286 if (result == OK) {
1287 sp<RefBase> obj;
1288 CHECK(msg->findObject("response", &obj));
1289 sp<ARTSPResponse> response =
1290 static_cast<ARTSPResponse *>(obj.get());
1291
1292 if (response->mStatusCode != 200) {
1293 result = UNKNOWN_ERROR;
1294 } else {
1295 parsePlayResponse(response);
1296
1297 // Post new timeout in order to make sure to use
1298 // fake timestamps if no new Sender Reports arrive
1299 postTimeout();
1300
1301 ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1302 CHECK_GE(i, 0);
1303
1304 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1305
1306 ALOGI("seek completed.");
1307 }
1308 }
1309
1310 if (result != OK) {
1311 ALOGE("seek failed, aborting.");
1312 (new AMessage('abor', this))->post();
1313 }
1314
1315 mPausing = false;
1316 mSeekPending = false;
1317
1318 // Discard all stale access units.
1319 for (size_t i = 0; i < mTracks.size(); ++i) {
1320 TrackInfo *track = &mTracks.editItemAt(i);
1321 track->mPackets.clear();
1322 }
1323
1324 sp<AMessage> msg = mNotify->dup();
1325 msg->setInt32("what", kWhatSeekDone);
1326 msg->post();
1327 break;
1328 }
1329
1330 case 'biny':
1331 {
1332 sp<ABuffer> buffer;
1333 CHECK(msg->findBuffer("buffer", &buffer));
1334
1335 int32_t index;
1336 CHECK(buffer->meta()->findInt32("index", &index));
1337
1338 mRTPConn->injectPacket(index, buffer);
1339 break;
1340 }
1341
1342 case 'tiou':
1343 {
1344 int32_t timeoutGenerationCheck;
1345 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1346 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1347 // This is an outdated message. Ignore.
1348 // This typically happens if a lot of seeks are
1349 // performed, since new timeout messages now are
1350 // posted at seek as well.
1351 break;
1352 }
1353 if (!mReceivedFirstRTCPPacket) {
1354 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1355 ALOGW("We received RTP packets but no RTCP packets, "
1356 "using fake timestamps.");
1357
1358 mTryFakeRTCP = true;
1359
1360 mReceivedFirstRTCPPacket = true;
1361
1362 fakeTimestamps();
1363 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1364 ALOGW("Never received any data, switching transports.");
1365
1366 mTryTCPInterleaving = true;
1367
1368 sp<AMessage> msg = new AMessage('abor', this);
1369 msg->setInt32("reconnect", true);
1370 msg->post();
1371 } else {
1372 ALOGW("Never received any data, disconnecting.");
1373 (new AMessage('abor', this))->post();
1374 }
1375 } else {
1376 if (!mAllTracksHaveTime) {
1377 ALOGW("We received some RTCP packets, but time "
1378 "could not be established on all tracks, now "
1379 "using fake timestamps");
1380
1381 fakeTimestamps();
1382 }
1383 }
1384 break;
1385 }
1386
1387 default:
1388 TRESPASS();
1389 break;
1390 }
1391 }
1392
postKeepAliveMyHandler1393 void postKeepAlive() {
1394 sp<AMessage> msg = new AMessage('aliv', this);
1395 msg->setInt32("generation", mKeepAliveGeneration);
1396 msg->post((mKeepAliveTimeoutUs * 9) / 10);
1397 }
1398
cancelAccessUnitTimeoutCheckMyHandler1399 void cancelAccessUnitTimeoutCheck() {
1400 ALOGV("cancelAccessUnitTimeoutCheck");
1401 ++mCheckGeneration;
1402 }
1403
postAccessUnitTimeoutCheckMyHandler1404 void postAccessUnitTimeoutCheck() {
1405 if (mCheckPending) {
1406 return;
1407 }
1408
1409 mCheckPending = true;
1410 sp<AMessage> check = new AMessage('chek', this);
1411 check->setInt32("generation", mCheckGeneration);
1412 check->post(kAccessUnitTimeoutUs);
1413 }
1414
SplitStringMyHandler1415 static void SplitString(
1416 const AString &s, const char *separator, List<AString> *items) {
1417 items->clear();
1418 size_t start = 0;
1419 while (start < s.size()) {
1420 ssize_t offset = s.find(separator, start);
1421
1422 if (offset < 0) {
1423 items->push_back(AString(s, start, s.size() - start));
1424 break;
1425 }
1426
1427 items->push_back(AString(s, start, offset - start));
1428 start = offset + strlen(separator);
1429 }
1430 }
1431
parsePlayResponseMyHandler1432 void parsePlayResponse(const sp<ARTSPResponse> &response) {
1433 mPlayResponseParsed = true;
1434 if (mTracks.size() == 0) {
1435 ALOGV("parsePlayResponse: late packets ignored.");
1436 return;
1437 }
1438
1439 ssize_t i = response->mHeaders.indexOfKey("range");
1440 if (i < 0) {
1441 // Server doesn't even tell use what range it is going to
1442 // play, therefore we won't support seeking.
1443 return;
1444 }
1445
1446 AString range = response->mHeaders.valueAt(i);
1447 ALOGV("Range: %s", range.c_str());
1448
1449 AString val;
1450 CHECK(GetAttribute(range.c_str(), "npt", &val));
1451
1452 float npt1, npt2;
1453 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1454 // This is a live stream and therefore not seekable.
1455
1456 ALOGI("This is a live stream");
1457 return;
1458 }
1459
1460 i = response->mHeaders.indexOfKey("rtp-info");
1461 CHECK_GE(i, 0);
1462
1463 AString rtpInfo = response->mHeaders.valueAt(i);
1464 List<AString> streamInfos;
1465 SplitString(rtpInfo, ",", &streamInfos);
1466
1467 int n = 1;
1468 for (List<AString>::iterator it = streamInfos.begin();
1469 it != streamInfos.end(); ++it) {
1470 (*it).trim();
1471 ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1472
1473 CHECK(GetAttribute((*it).c_str(), "url", &val));
1474
1475 size_t trackIndex = 0;
1476 while (trackIndex < mTracks.size()
1477 && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1478 ++trackIndex;
1479 }
1480 CHECK_LT(trackIndex, mTracks.size());
1481
1482 CHECK(GetAttribute((*it).c_str(), "seq", &val));
1483
1484 char *end;
1485 unsigned long seq = strtoul(val.c_str(), &end, 10);
1486
1487 TrackInfo *info = &mTracks.editItemAt(trackIndex);
1488 info->mFirstSeqNumInSegment = seq;
1489 info->mNewSegment = true;
1490 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1491
1492 CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1493
1494 uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1495
1496 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1497
1498 info->mNormalPlayTimeRTP = rtpTime;
1499 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1500
1501 if (!mFirstAccessUnit) {
1502 postNormalPlayTimeMapping(
1503 trackIndex,
1504 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1505 }
1506
1507 ++n;
1508 }
1509 }
1510
getTrackFormatMyHandler1511 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1512 CHECK_GE(index, 0u);
1513 CHECK_LT(index, mTracks.size());
1514
1515 const TrackInfo &info = mTracks.itemAt(index);
1516
1517 *timeScale = info.mTimeScale;
1518
1519 return info.mPacketSource->getFormat();
1520 }
1521
countTracksMyHandler1522 size_t countTracks() const {
1523 return mTracks.size();
1524 }
1525
1526 private:
1527 struct TrackInfo {
1528 AString mURL;
1529 int mRTPSocket;
1530 int mRTCPSocket;
1531 bool mUsingInterleavedTCP;
1532 uint32_t mFirstSeqNumInSegment;
1533 bool mNewSegment;
1534 int32_t mAllowedStaleAccessUnits;
1535
1536 uint32_t mRTPAnchor;
1537 int64_t mNTPAnchorUs;
1538 int32_t mTimeScale;
1539 bool mEOSReceived;
1540
1541 uint32_t mNormalPlayTimeRTP;
1542 int64_t mNormalPlayTimeUs;
1543
1544 sp<APacketSource> mPacketSource;
1545
1546 // Stores packets temporarily while no notion of time
1547 // has been established yet.
1548 List<sp<ABuffer> > mPackets;
1549 };
1550
1551 sp<AMessage> mNotify;
1552 bool mUIDValid;
1553 uid_t mUID;
1554 sp<ALooper> mNetLooper;
1555 sp<ARTSPConnection> mConn;
1556 sp<ARTPConnection> mRTPConn;
1557 sp<ASessionDescription> mSessionDesc;
1558 AString mOriginalSessionURL; // This one still has user:pass@
1559 AString mSessionURL;
1560 AString mSessionHost;
1561 AString mBaseURL;
1562 AString mControlURL;
1563 AString mSessionID;
1564 bool mSetupTracksSuccessful;
1565 bool mSeekPending;
1566 bool mFirstAccessUnit;
1567
1568 bool mAllTracksHaveTime;
1569 int64_t mNTPAnchorUs;
1570 int64_t mMediaAnchorUs;
1571 int64_t mLastMediaTimeUs;
1572
1573 int64_t mNumAccessUnitsReceived;
1574 bool mCheckPending;
1575 int32_t mCheckGeneration;
1576 int32_t mCheckTimeoutGeneration;
1577 bool mTryTCPInterleaving;
1578 bool mTryFakeRTCP;
1579 bool mReceivedFirstRTCPPacket;
1580 bool mReceivedFirstRTPPacket;
1581 bool mSeekable;
1582 int64_t mKeepAliveTimeoutUs;
1583 int32_t mKeepAliveGeneration;
1584 bool mPausing;
1585 int32_t mPauseGeneration;
1586
1587 Vector<TrackInfo> mTracks;
1588
1589 bool mPlayResponseParsed;
1590
setupTrackMyHandler1591 void setupTrack(size_t index) {
1592 sp<APacketSource> source =
1593 new APacketSource(mSessionDesc, index);
1594
1595 if (source->initCheck() != OK) {
1596 ALOGW("Unsupported format. Ignoring track #%zu.", index);
1597
1598 sp<AMessage> reply = new AMessage('setu', this);
1599 reply->setSize("index", index);
1600 reply->setInt32("result", ERROR_UNSUPPORTED);
1601 reply->post();
1602 return;
1603 }
1604
1605 AString url;
1606 CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1607
1608 AString trackURL;
1609 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1610
1611 mTracks.push(TrackInfo());
1612 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1613 info->mURL = trackURL;
1614 info->mPacketSource = source;
1615 info->mUsingInterleavedTCP = false;
1616 info->mFirstSeqNumInSegment = 0;
1617 info->mNewSegment = true;
1618 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1619 info->mRTPSocket = -1;
1620 info->mRTCPSocket = -1;
1621 info->mRTPAnchor = 0;
1622 info->mNTPAnchorUs = -1;
1623 info->mNormalPlayTimeRTP = 0;
1624 info->mNormalPlayTimeUs = 0ll;
1625
1626 unsigned long PT;
1627 AString formatDesc;
1628 AString formatParams;
1629 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1630
1631 int32_t timescale;
1632 int32_t numChannels;
1633 ASessionDescription::ParseFormatDesc(
1634 formatDesc.c_str(), ×cale, &numChannels);
1635
1636 info->mTimeScale = timescale;
1637 info->mEOSReceived = false;
1638
1639 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1640
1641 AString request = "SETUP ";
1642 request.append(trackURL);
1643 request.append(" RTSP/1.0\r\n");
1644
1645 if (mTryTCPInterleaving) {
1646 size_t interleaveIndex = 2 * (mTracks.size() - 1);
1647 info->mUsingInterleavedTCP = true;
1648 info->mRTPSocket = interleaveIndex;
1649 info->mRTCPSocket = interleaveIndex + 1;
1650
1651 request.append("Transport: RTP/AVP/TCP;interleaved=");
1652 request.append(interleaveIndex);
1653 request.append("-");
1654 request.append(interleaveIndex + 1);
1655 } else {
1656 unsigned rtpPort;
1657 ARTPConnection::MakePortPair(
1658 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1659
1660 if (mUIDValid) {
1661 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1662 (uint32_t)*(uint32_t*) "RTP_");
1663 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1664 (uint32_t)*(uint32_t*) "RTP_");
1665 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1666 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1667 }
1668
1669 request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1670 request.append(rtpPort);
1671 request.append("-");
1672 request.append(rtpPort + 1);
1673 }
1674
1675 request.append("\r\n");
1676
1677 if (index > 1) {
1678 request.append("Session: ");
1679 request.append(mSessionID);
1680 request.append("\r\n");
1681 }
1682
1683 request.append("\r\n");
1684
1685 sp<AMessage> reply = new AMessage('setu', this);
1686 reply->setSize("index", index);
1687 reply->setSize("track-index", mTracks.size() - 1);
1688 mConn->sendRequest(request.c_str(), reply);
1689 }
1690
MakeURLMyHandler1691 static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1692 out->clear();
1693
1694 if (strncasecmp("rtsp://", baseURL, 7)) {
1695 // Base URL must be absolute
1696 return false;
1697 }
1698
1699 if (!strncasecmp("rtsp://", url, 7)) {
1700 // "url" is already an absolute URL, ignore base URL.
1701 out->setTo(url);
1702 return true;
1703 }
1704
1705 size_t n = strlen(baseURL);
1706 out->setTo(baseURL);
1707 if (baseURL[n - 1] != '/') {
1708 out->append("/");
1709 }
1710 out->append(url);
1711
1712 return true;
1713 }
1714
fakeTimestampsMyHandler1715 void fakeTimestamps() {
1716 mNTPAnchorUs = -1ll;
1717 for (size_t i = 0; i < mTracks.size(); ++i) {
1718 onTimeUpdate(i, 0, 0ll);
1719 }
1720 }
1721
dataReceivedOnAllChannelsMyHandler1722 bool dataReceivedOnAllChannels() {
1723 TrackInfo *track;
1724 for (size_t i = 0; i < mTracks.size(); ++i) {
1725 track = &mTracks.editItemAt(i);
1726 if (track->mPackets.empty()) {
1727 return false;
1728 }
1729 }
1730 return true;
1731 }
1732
handleFirstAccessUnitMyHandler1733 void handleFirstAccessUnit() {
1734 if (mFirstAccessUnit) {
1735 sp<AMessage> msg = mNotify->dup();
1736 msg->setInt32("what", kWhatConnected);
1737 msg->post();
1738
1739 if (mSeekable) {
1740 for (size_t i = 0; i < mTracks.size(); ++i) {
1741 TrackInfo *info = &mTracks.editItemAt(i);
1742
1743 postNormalPlayTimeMapping(
1744 i,
1745 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1746 }
1747 }
1748
1749 mFirstAccessUnit = false;
1750 }
1751 }
1752
onTimeUpdateMyHandler1753 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1754 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1755 trackIndex, rtpTime, (long long)ntpTime);
1756
1757 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1758
1759 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1760
1761 track->mRTPAnchor = rtpTime;
1762 track->mNTPAnchorUs = ntpTimeUs;
1763
1764 if (mNTPAnchorUs < 0) {
1765 mNTPAnchorUs = ntpTimeUs;
1766 mMediaAnchorUs = mLastMediaTimeUs;
1767 }
1768
1769 if (!mAllTracksHaveTime) {
1770 bool allTracksHaveTime = (mTracks.size() > 0);
1771 for (size_t i = 0; i < mTracks.size(); ++i) {
1772 TrackInfo *track = &mTracks.editItemAt(i);
1773 if (track->mNTPAnchorUs < 0) {
1774 allTracksHaveTime = false;
1775 break;
1776 }
1777 }
1778 if (allTracksHaveTime) {
1779 mAllTracksHaveTime = true;
1780 ALOGI("Time now established for all tracks.");
1781 }
1782 }
1783 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1784 handleFirstAccessUnit();
1785
1786 // Time is now established, lets start timestamping immediately
1787 for (size_t i = 0; i < mTracks.size(); ++i) {
1788 if (OK != processAccessUnitQueue(i)) {
1789 return;
1790 }
1791 }
1792 for (size_t i = 0; i < mTracks.size(); ++i) {
1793 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1794 if (trackInfo->mEOSReceived) {
1795 postQueueEOS(i, ERROR_END_OF_STREAM);
1796 trackInfo->mEOSReceived = false;
1797 }
1798 }
1799 }
1800 }
1801
processAccessUnitQueueMyHandler1802 status_t processAccessUnitQueue(int32_t trackIndex) {
1803 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1804 while (!track->mPackets.empty()) {
1805 sp<ABuffer> accessUnit = *track->mPackets.begin();
1806 track->mPackets.erase(track->mPackets.begin());
1807
1808 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1809 if (track->mNewSegment) {
1810 // The sequence number from RTP packet has only 16 bits and is extended
1811 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1812 // RTSP "PLAY" command should be used to detect the first RTP packet
1813 // after seeking.
1814 if (mSeekable) {
1815 if (track->mAllowedStaleAccessUnits > 0) {
1816 uint32_t seqNum16 = seqNum & 0xffff;
1817 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1818 if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1819 || seqNum16 < firstSeqNumInSegment16) {
1820 // Not the first rtp packet of the stream after seeking, discarding.
1821 track->mAllowedStaleAccessUnits--;
1822 ALOGV("discarding stale access unit (0x%x : 0x%x)",
1823 seqNum, track->mFirstSeqNumInSegment);
1824 continue;
1825 }
1826 ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1827 "Missing the first packet(%u), now take packet(%u) as first one",
1828 track->mFirstSeqNumInSegment, seqNum);
1829 } else { // track->mAllowedStaleAccessUnits <= 0
1830 mNumAccessUnitsReceived = 0;
1831 ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1832 "Still no first rtp packet after %d stale ones",
1833 kMaxAllowedStaleAccessUnits);
1834 track->mAllowedStaleAccessUnits = -1;
1835 return UNKNOWN_ERROR;
1836 }
1837 }
1838
1839 // Now found the first rtp packet of the stream after seeking.
1840 track->mFirstSeqNumInSegment = seqNum;
1841 track->mNewSegment = false;
1842 }
1843
1844 if (seqNum < track->mFirstSeqNumInSegment) {
1845 ALOGV("dropping stale access-unit (%d < %d)",
1846 seqNum, track->mFirstSeqNumInSegment);
1847 continue;
1848 }
1849
1850 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1851 postQueueAccessUnit(trackIndex, accessUnit);
1852 }
1853 }
1854 return OK;
1855 }
1856
onAccessUnitCompleteMyHandler1857 void onAccessUnitComplete(
1858 int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1859 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1860 track->mPackets.push_back(accessUnit);
1861
1862 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1863 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1864
1865 if(!mPlayResponseParsed){
1866 ALOGV("play response is not parsed");
1867 return;
1868 }
1869
1870 handleFirstAccessUnit();
1871
1872 if (!mAllTracksHaveTime) {
1873 ALOGV("storing accessUnit, no time established yet");
1874 return;
1875 }
1876
1877 if (OK != processAccessUnitQueue(trackIndex)) {
1878 return;
1879 }
1880
1881 if (track->mEOSReceived) {
1882 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1883 track->mEOSReceived = false;
1884 }
1885 }
1886
addMediaTimestampMyHandler1887 bool addMediaTimestamp(
1888 int32_t trackIndex, const TrackInfo *track,
1889 const sp<ABuffer> &accessUnit) {
1890 UNUSED_UNLESS_VERBOSE(trackIndex);
1891
1892 uint32_t rtpTime;
1893 CHECK(accessUnit->meta()->findInt32(
1894 "rtp-time", (int32_t *)&rtpTime));
1895
1896 int64_t relRtpTimeUs =
1897 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1898 / track->mTimeScale;
1899
1900 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1901
1902 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1903
1904 if (mediaTimeUs > mLastMediaTimeUs) {
1905 mLastMediaTimeUs = mediaTimeUs;
1906 }
1907
1908 if (mediaTimeUs < 0) {
1909 ALOGV("dropping early accessUnit.");
1910 return false;
1911 }
1912
1913 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1914 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1915
1916 accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1917
1918 return true;
1919 }
1920
postQueueAccessUnitMyHandler1921 void postQueueAccessUnit(
1922 size_t trackIndex, const sp<ABuffer> &accessUnit) {
1923 sp<AMessage> msg = mNotify->dup();
1924 msg->setInt32("what", kWhatAccessUnit);
1925 msg->setSize("trackIndex", trackIndex);
1926 msg->setBuffer("accessUnit", accessUnit);
1927 msg->post();
1928 }
1929
postQueueEOSMyHandler1930 void postQueueEOS(size_t trackIndex, status_t finalResult) {
1931 sp<AMessage> msg = mNotify->dup();
1932 msg->setInt32("what", kWhatEOS);
1933 msg->setSize("trackIndex", trackIndex);
1934 msg->setInt32("finalResult", finalResult);
1935 msg->post();
1936 }
1937
postQueueSeekDiscontinuityMyHandler1938 void postQueueSeekDiscontinuity(size_t trackIndex) {
1939 sp<AMessage> msg = mNotify->dup();
1940 msg->setInt32("what", kWhatSeekDiscontinuity);
1941 msg->setSize("trackIndex", trackIndex);
1942 msg->post();
1943 }
1944
postNormalPlayTimeMappingMyHandler1945 void postNormalPlayTimeMapping(
1946 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1947 sp<AMessage> msg = mNotify->dup();
1948 msg->setInt32("what", kWhatNormalPlayTimeMapping);
1949 msg->setSize("trackIndex", trackIndex);
1950 msg->setInt32("rtpTime", rtpTime);
1951 msg->setInt64("nptUs", nptUs);
1952 msg->post();
1953 }
1954
postTimeoutMyHandler1955 void postTimeout() {
1956 sp<AMessage> timeout = new AMessage('tiou', this);
1957 mCheckTimeoutGeneration++;
1958 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1959
1960 int64_t startupTimeoutUs;
1961 startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1962 timeout->post(startupTimeoutUs);
1963 }
1964
1965 DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1966 };
1967
1968 } // namespace android
1969
1970 #endif // MY_HANDLER_H_
1971