1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MY_HANDLER_H_
18
19 #define MY_HANDLER_H_
20
21 //#define LOG_NDEBUG 0
22
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34
35 #include <ctype.h>
36 #include <cutils/properties.h>
37
38 #include <media/stagefright/foundation/ABuffer.h>
39 #include <media/stagefright/foundation/ADebug.h>
40 #include <media/stagefright/foundation/ALooper.h>
41 #include <media/stagefright/foundation/AMessage.h>
42 #include <media/stagefright/MediaErrors.h>
43 #include <media/stagefright/Utils.h>
44
45 #include <arpa/inet.h>
46 #include <sys/socket.h>
47 #include <netdb.h>
48
49 #include "HTTPBase.h"
50
51 #if LOG_NDEBUG
52 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
53 #else
54 #define UNUSED_UNLESS_VERBOSE(x)
55 #endif
56
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
59 static int64_t kAccessUnitTimeoutUs = 10000000ll;
60
61 // If no access units arrive for the first 10 secs after starting the
62 // stream, assume none ever will and signal EOS or switch transports.
63 static int64_t kStartupTimeoutUs = 10000000ll;
64
65 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
66
67 static int64_t kPauseDelayUs = 3000000ll;
68
69 // The allowed maximum number of stale access units at the beginning of
70 // a new sequence.
71 static int32_t kMaxAllowedStaleAccessUnits = 20;
72
73 namespace android {
74
GetAttribute(const char * s,const char * key,AString * value)75 static bool GetAttribute(const char *s, const char *key, AString *value) {
76 value->clear();
77
78 size_t keyLen = strlen(key);
79
80 for (;;) {
81 while (isspace(*s)) {
82 ++s;
83 }
84
85 const char *colonPos = strchr(s, ';');
86
87 size_t len =
88 (colonPos == NULL) ? strlen(s) : colonPos - s;
89
90 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
91 value->setTo(&s[keyLen + 1], len - keyLen - 1);
92 return true;
93 }
94
95 if (colonPos == NULL) {
96 return false;
97 }
98
99 s = colonPos + 1;
100 }
101 }
102
103 struct MyHandler : public AHandler {
// Values stored in the "what" field of messages posted on a duplicate of
// the notify message supplied to the constructor.
enum {
    // Not posted in the portion of the file reviewed here.
    kWhatConnected                  = 'conn',
    // Posted from the 'quit' handler together with "result".
    kWhatDisconnected               = 'disc',
    // Posted from the 'see0' handler together with "err" and "time".
    kWhatSeekPaused                 = 'spau',
    // Posted when a seek finishes ('see2') or is ignored because the
    // stream is not seekable ('seek').
    kWhatSeekDone                   = 'sdon',

    // NOTE(review): the codes below are presumably posted by the
    // postQueue*/time-mapping helpers defined further down -- confirm.
    kWhatAccessUnit                 = 'accU',
    kWhatEOS                        = 'eos!',
    kWhatSeekDiscontinuity          = 'seeD',
    kWhatNormalPlayTimeMapping      = 'nptM',
};
115
116 MyHandler(
117 const char *url,
118 const sp<AMessage> ¬ify,
119 bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler120 : mNotify(notify),
121 mUIDValid(uidValid),
122 mUID(uid),
123 mNetLooper(new ALooper),
124 mConn(new ARTSPConnection(mUIDValid, mUID)),
125 mRTPConn(new ARTPConnection),
126 mOriginalSessionURL(url),
127 mSessionURL(url),
128 mSetupTracksSuccessful(false),
129 mSeekPending(false),
130 mFirstAccessUnit(true),
131 mAllTracksHaveTime(false),
132 mNTPAnchorUs(-1),
133 mMediaAnchorUs(-1),
134 mLastMediaTimeUs(0),
135 mNumAccessUnitsReceived(0),
136 mCheckPending(false),
137 mCheckGeneration(0),
138 mCheckTimeoutGeneration(0),
139 mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
140 mTryFakeRTCP(false),
141 mReceivedFirstRTCPPacket(false),
142 mReceivedFirstRTPPacket(false),
143 mSeekable(true),
144 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
145 mKeepAliveGeneration(0),
146 mPausing(false),
147 mPauseGeneration(0),
148 mPlayResponseParsed(false) {
149 mNetLooper->setName("rtsp net");
150 mNetLooper->start(false /* runOnCallingThread */,
151 false /* canCallJava */,
152 PRIORITY_HIGHEST);
153
154 // Strip any authentication info from the session url, we don't
155 // want to transmit user/pass in cleartext.
156 AString host, path, user, pass;
157 unsigned port;
158 CHECK(ARTSPConnection::ParseURL(
159 mSessionURL.c_str(), &host, &port, &path, &user, &pass));
160
161 if (user.size() > 0) {
162 mSessionURL.clear();
163 mSessionURL.append("rtsp://");
164 mSessionURL.append(host);
165 mSessionURL.append(":");
166 mSessionURL.append(AStringPrintf("%u", port));
167 mSessionURL.append(path);
168
169 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
170 }
171
172 mSessionHost = host;
173 }
174
connectMyHandler175 void connect() {
176 looper()->registerHandler(mConn);
177 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
178
179 sp<AMessage> notify = new AMessage('biny', this);
180 mConn->observeBinaryData(notify);
181
182 sp<AMessage> reply = new AMessage('conn', this);
183 mConn->connect(mOriginalSessionURL.c_str(), reply);
184 }
185
loadSDPMyHandler186 void loadSDP(const sp<ASessionDescription>& desc) {
187 looper()->registerHandler(mConn);
188 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
189
190 sp<AMessage> notify = new AMessage('biny', this);
191 mConn->observeBinaryData(notify);
192
193 sp<AMessage> reply = new AMessage('sdpl', this);
194 reply->setObject("description", desc);
195 mConn->connect(mOriginalSessionURL.c_str(), reply);
196 }
197
getControlURLMyHandler198 AString getControlURL() {
199 AString sessionLevelControlURL;
200 if (mSessionDesc->findAttribute(
201 0,
202 "a=control",
203 &sessionLevelControlURL)) {
204 if (sessionLevelControlURL.compare("*") == 0) {
205 return mBaseURL;
206 } else {
207 AString controlURL;
208 CHECK(MakeURL(
209 mBaseURL.c_str(),
210 sessionLevelControlURL.c_str(),
211 &controlURL));
212 return controlURL;
213 }
214 } else {
215 return mSessionURL;
216 }
217 }
218
disconnectMyHandler219 void disconnect() {
220 (new AMessage('abor', this))->post();
221 }
222
seekMyHandler223 void seek(int64_t timeUs) {
224 sp<AMessage> msg = new AMessage('seek', this);
225 msg->setInt64("time", timeUs);
226 mPauseGeneration++;
227 msg->post();
228 }
229
continueSeekAfterPauseMyHandler230 void continueSeekAfterPause(int64_t timeUs) {
231 sp<AMessage> msg = new AMessage('see1', this);
232 msg->setInt64("time", timeUs);
233 msg->post();
234 }
235
isSeekableMyHandler236 bool isSeekable() const {
237 return mSeekable;
238 }
239
pauseMyHandler240 void pause() {
241 sp<AMessage> msg = new AMessage('paus', this);
242 mPauseGeneration++;
243 msg->setInt32("pausecheck", mPauseGeneration);
244 msg->post();
245 }
246
resumeMyHandler247 void resume() {
248 sp<AMessage> msg = new AMessage('resu', this);
249 mPauseGeneration++;
250 msg->post();
251 }
252
addRRMyHandler253 static void addRR(const sp<ABuffer> &buf) {
254 uint8_t *ptr = buf->data() + buf->size();
255 ptr[0] = 0x80 | 0;
256 ptr[1] = 201; // RR
257 ptr[2] = 0;
258 ptr[3] = 1;
259 ptr[4] = 0xde; // SSRC
260 ptr[5] = 0xad;
261 ptr[6] = 0xbe;
262 ptr[7] = 0xef;
263
264 buf->setRange(0, buf->size() + 8);
265 }
266
addSDESMyHandler267 static void addSDES(int s, const sp<ABuffer> &buffer) {
268 struct sockaddr_in addr;
269 socklen_t addrSize = sizeof(addr);
270 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
271 inet_aton("0.0.0.0", &(addr.sin_addr));
272 }
273
274 uint8_t *data = buffer->data() + buffer->size();
275 data[0] = 0x80 | 1;
276 data[1] = 202; // SDES
277 data[4] = 0xde; // SSRC
278 data[5] = 0xad;
279 data[6] = 0xbe;
280 data[7] = 0xef;
281
282 size_t offset = 8;
283
284 data[offset++] = 1; // CNAME
285
286 AString cname = "stagefright@";
287 cname.append(inet_ntoa(addr.sin_addr));
288 data[offset++] = cname.size();
289
290 memcpy(&data[offset], cname.c_str(), cname.size());
291 offset += cname.size();
292
293 data[offset++] = 6; // TOOL
294
295 AString tool = MakeUserAgent();
296
297 data[offset++] = tool.size();
298
299 memcpy(&data[offset], tool.c_str(), tool.size());
300 offset += tool.size();
301
302 data[offset++] = 0;
303
304 if ((offset % 4) > 0) {
305 size_t count = 4 - (offset % 4);
306 switch (count) {
307 case 3:
308 data[offset++] = 0;
309 case 2:
310 data[offset++] = 0;
311 case 1:
312 data[offset++] = 0;
313 }
314 }
315
316 size_t numWords = (offset / 4) - 1;
317 data[2] = numWords >> 8;
318 data[3] = numWords & 0xff;
319
320 buffer->setRange(buffer->offset(), buffer->size() + offset);
321 }
322
323 // In case we're behind NAT, fire off two UDP packets to the remote
324 // rtp/rtcp ports to poke a hole into the firewall for future incoming
325 // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler326 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
327 struct sockaddr_in addr;
328 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
329 addr.sin_family = AF_INET;
330
331 AString source;
332 AString server_port;
333 if (!GetAttribute(transport.c_str(),
334 "source",
335 &source)) {
336 ALOGW("Missing 'source' field in Transport response. Using "
337 "RTSP endpoint address.");
338
339 struct hostent *ent = gethostbyname(mSessionHost.c_str());
340 if (ent == NULL) {
341 ALOGE("Failed to look up address of session host '%s'",
342 mSessionHost.c_str());
343
344 return false;
345 }
346
347 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
348 } else {
349 addr.sin_addr.s_addr = inet_addr(source.c_str());
350 }
351
352 if (!GetAttribute(transport.c_str(),
353 "server_port",
354 &server_port)) {
355 ALOGI("Missing 'server_port' field in Transport response.");
356 return false;
357 }
358
359 int rtpPort, rtcpPort;
360 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
361 || rtpPort <= 0 || rtpPort > 65535
362 || rtcpPort <=0 || rtcpPort > 65535
363 || rtcpPort != rtpPort + 1) {
364 ALOGE("Server picked invalid RTP/RTCP port pair %s,"
365 " RTP port must be even, RTCP port must be one higher.",
366 server_port.c_str());
367
368 return false;
369 }
370
371 if (rtpPort & 1) {
372 ALOGW("Server picked an odd RTP port, it should've picked an "
373 "even one, we'll let it pass for now, but this may break "
374 "in the future.");
375 }
376
377 if (addr.sin_addr.s_addr == INADDR_NONE) {
378 return true;
379 }
380
381 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
382 // No firewalls to traverse on the loopback interface.
383 return true;
384 }
385
386 // Make up an RR/SDES RTCP packet.
387 sp<ABuffer> buf = new ABuffer(65536);
388 buf->setRange(0, 0);
389 addRR(buf);
390 addSDES(rtpSocket, buf);
391
392 addr.sin_port = htons(rtpPort);
393
394 ssize_t n = sendto(
395 rtpSocket, buf->data(), buf->size(), 0,
396 (const sockaddr *)&addr, sizeof(addr));
397
398 if (n < (ssize_t)buf->size()) {
399 ALOGE("failed to poke a hole for RTP packets");
400 return false;
401 }
402
403 addr.sin_port = htons(rtcpPort);
404
405 n = sendto(
406 rtcpSocket, buf->data(), buf->size(), 0,
407 (const sockaddr *)&addr, sizeof(addr));
408
409 if (n < (ssize_t)buf->size()) {
410 ALOGE("failed to poke a hole for RTCP packets");
411 return false;
412 }
413
414 ALOGV("successfully poked holes.");
415
416 return true;
417 }
418
isLiveStreamMyHandler419 static bool isLiveStream(const sp<ASessionDescription> &desc) {
420 AString attrLiveStream;
421 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
422 ssize_t semicolonPos = attrLiveStream.find(";", 2);
423
424 const char* liveStreamValue;
425 if (semicolonPos < 0) {
426 liveStreamValue = attrLiveStream.c_str();
427 } else {
428 AString valString;
429 valString.setTo(attrLiveStream,
430 semicolonPos + 1,
431 attrLiveStream.size() - semicolonPos - 1);
432 liveStreamValue = valString.c_str();
433 }
434
435 uint32_t value = strtoul(liveStreamValue, NULL, 10);
436 if (value == 1) {
437 ALOGV("found live stream");
438 return true;
439 }
440 } else {
441 // It is a live stream if no duration is returned
442 int64_t durationUs;
443 if (!desc->getDurationUs(&durationUs)) {
444 ALOGV("No duration found, assume live stream");
445 return true;
446 }
447 }
448
449 return false;
450 }
451
onMessageReceivedMyHandler452 virtual void onMessageReceived(const sp<AMessage> &msg) {
453 switch (msg->what()) {
454 case 'conn':
455 {
456 int32_t result;
457 CHECK(msg->findInt32("result", &result));
458
459 ALOGI("connection request completed with result %d (%s)",
460 result, strerror(-result));
461
462 if (result == OK) {
463 AString request;
464 request = "DESCRIBE ";
465 request.append(mSessionURL);
466 request.append(" RTSP/1.0\r\n");
467 request.append("Accept: application/sdp\r\n");
468 request.append("\r\n");
469
470 sp<AMessage> reply = new AMessage('desc', this);
471 mConn->sendRequest(request.c_str(), reply);
472 } else {
473 (new AMessage('disc', this))->post();
474 }
475 break;
476 }
477
478 case 'disc':
479 {
480 ++mKeepAliveGeneration;
481
482 int32_t reconnect;
483 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
484 sp<AMessage> reply = new AMessage('conn', this);
485 mConn->connect(mOriginalSessionURL.c_str(), reply);
486 } else {
487 (new AMessage('quit', this))->post();
488 }
489 break;
490 }
491
492 case 'desc':
493 {
494 int32_t result;
495 CHECK(msg->findInt32("result", &result));
496
497 ALOGI("DESCRIBE completed with result %d (%s)",
498 result, strerror(-result));
499
500 if (result == OK) {
501 sp<RefBase> obj;
502 CHECK(msg->findObject("response", &obj));
503 sp<ARTSPResponse> response =
504 static_cast<ARTSPResponse *>(obj.get());
505
506 if (response->mStatusCode == 301 || response->mStatusCode == 302) {
507 ssize_t i = response->mHeaders.indexOfKey("location");
508 CHECK_GE(i, 0);
509
510 mOriginalSessionURL = response->mHeaders.valueAt(i);
511 mSessionURL = mOriginalSessionURL;
512
513 // Strip any authentication info from the session url, we don't
514 // want to transmit user/pass in cleartext.
515 AString host, path, user, pass;
516 unsigned port;
517 if (ARTSPConnection::ParseURL(
518 mSessionURL.c_str(), &host, &port, &path, &user, &pass)
519 && user.size() > 0) {
520 mSessionURL.clear();
521 mSessionURL.append("rtsp://");
522 mSessionURL.append(host);
523 mSessionURL.append(":");
524 mSessionURL.append(AStringPrintf("%u", port));
525 mSessionURL.append(path);
526
527 ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
528 }
529
530 sp<AMessage> reply = new AMessage('conn', this);
531 mConn->connect(mOriginalSessionURL.c_str(), reply);
532 break;
533 }
534
535 if (response->mStatusCode != 200) {
536 result = UNKNOWN_ERROR;
537 } else if (response->mContent == NULL) {
538 result = ERROR_MALFORMED;
539 ALOGE("The response has no content.");
540 } else {
541 mSessionDesc = new ASessionDescription;
542
543 mSessionDesc->setTo(
544 response->mContent->data(),
545 response->mContent->size());
546
547 if (!mSessionDesc->isValid()) {
548 ALOGE("Failed to parse session description.");
549 result = ERROR_MALFORMED;
550 } else {
551 ssize_t i = response->mHeaders.indexOfKey("content-base");
552 if (i >= 0) {
553 mBaseURL = response->mHeaders.valueAt(i);
554 } else {
555 i = response->mHeaders.indexOfKey("content-location");
556 if (i >= 0) {
557 mBaseURL = response->mHeaders.valueAt(i);
558 } else {
559 mBaseURL = mSessionURL;
560 }
561 }
562
563 mSeekable = !isLiveStream(mSessionDesc);
564
565 if (!mBaseURL.startsWith("rtsp://")) {
566 // Some misbehaving servers specify a relative
567 // URL in one of the locations above, combine
568 // it with the absolute session URL to get
569 // something usable...
570
571 ALOGW("Server specified a non-absolute base URL"
572 ", combining it with the session URL to "
573 "get something usable...");
574
575 AString tmp;
576 CHECK(MakeURL(
577 mSessionURL.c_str(),
578 mBaseURL.c_str(),
579 &tmp));
580
581 mBaseURL = tmp;
582 }
583
584 mControlURL = getControlURL();
585
586 if (mSessionDesc->countTracks() < 2) {
587 // There's no actual tracks in this session.
588 // The first "track" is merely session meta
589 // data.
590
591 ALOGW("Session doesn't contain any playable "
592 "tracks. Aborting.");
593 result = ERROR_UNSUPPORTED;
594 } else {
595 setupTrack(1);
596 }
597 }
598 }
599 }
600
601 if (result != OK) {
602 sp<AMessage> reply = new AMessage('disc', this);
603 mConn->disconnect(reply);
604 }
605 break;
606 }
607
608 case 'sdpl':
609 {
610 int32_t result;
611 CHECK(msg->findInt32("result", &result));
612
613 ALOGI("SDP connection request completed with result %d (%s)",
614 result, strerror(-result));
615
616 if (result == OK) {
617 sp<RefBase> obj;
618 CHECK(msg->findObject("description", &obj));
619 mSessionDesc =
620 static_cast<ASessionDescription *>(obj.get());
621
622 if (!mSessionDesc->isValid()) {
623 ALOGE("Failed to parse session description.");
624 result = ERROR_MALFORMED;
625 } else {
626 mBaseURL = mSessionURL;
627
628 mSeekable = !isLiveStream(mSessionDesc);
629
630 mControlURL = getControlURL();
631
632 if (mSessionDesc->countTracks() < 2) {
633 // There's no actual tracks in this session.
634 // The first "track" is merely session meta
635 // data.
636
637 ALOGW("Session doesn't contain any playable "
638 "tracks. Aborting.");
639 result = ERROR_UNSUPPORTED;
640 } else {
641 setupTrack(1);
642 }
643 }
644 }
645
646 if (result != OK) {
647 sp<AMessage> reply = new AMessage('disc', this);
648 mConn->disconnect(reply);
649 }
650 break;
651 }
652
653 case 'setu':
654 {
655 size_t index;
656 CHECK(msg->findSize("index", &index));
657
658 TrackInfo *track = NULL;
659 size_t trackIndex;
660 if (msg->findSize("track-index", &trackIndex)) {
661 track = &mTracks.editItemAt(trackIndex);
662 }
663
664 int32_t result;
665 CHECK(msg->findInt32("result", &result));
666
667 ALOGI("SETUP(%zu) completed with result %d (%s)",
668 index, result, strerror(-result));
669
670 if (result == OK) {
671 CHECK(track != NULL);
672
673 sp<RefBase> obj;
674 CHECK(msg->findObject("response", &obj));
675 sp<ARTSPResponse> response =
676 static_cast<ARTSPResponse *>(obj.get());
677
678 if (response->mStatusCode != 200) {
679 result = UNKNOWN_ERROR;
680 } else {
681 ssize_t i = response->mHeaders.indexOfKey("session");
682 CHECK_GE(i, 0);
683
684 mSessionID = response->mHeaders.valueAt(i);
685
686 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
687 AString timeoutStr;
688 if (GetAttribute(
689 mSessionID.c_str(), "timeout", &timeoutStr)) {
690 char *end;
691 unsigned long timeoutSecs =
692 strtoul(timeoutStr.c_str(), &end, 10);
693
694 if (end == timeoutStr.c_str() || *end != '\0') {
695 ALOGW("server specified malformed timeout '%s'",
696 timeoutStr.c_str());
697
698 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
699 } else if (timeoutSecs < 15) {
700 ALOGW("server specified too short a timeout "
701 "(%lu secs), using default.",
702 timeoutSecs);
703
704 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
705 } else {
706 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
707
708 ALOGI("server specified timeout of %lu secs.",
709 timeoutSecs);
710 }
711 }
712
713 i = mSessionID.find(";");
714 if (i >= 0) {
715 // Remove options, i.e. ";timeout=90"
716 mSessionID.erase(i, mSessionID.size() - i);
717 }
718
719 sp<AMessage> notify = new AMessage('accu', this);
720 notify->setSize("track-index", trackIndex);
721
722 i = response->mHeaders.indexOfKey("transport");
723 CHECK_GE(i, 0);
724
725 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
726 if (!track->mUsingInterleavedTCP) {
727 AString transport = response->mHeaders.valueAt(i);
728
729 // We are going to continue even if we were
730 // unable to poke a hole into the firewall...
731 pokeAHole(
732 track->mRTPSocket,
733 track->mRTCPSocket,
734 transport);
735 }
736
737 mRTPConn->addStream(
738 track->mRTPSocket, track->mRTCPSocket,
739 mSessionDesc, index,
740 notify, track->mUsingInterleavedTCP);
741
742 mSetupTracksSuccessful = true;
743 } else {
744 result = BAD_VALUE;
745 }
746 }
747 }
748
749 if (result != OK) {
750 if (track) {
751 if (!track->mUsingInterleavedTCP) {
752 // Clear the tag
753 if (mUIDValid) {
754 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
755 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
756 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
757 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
758 }
759
760 close(track->mRTPSocket);
761 close(track->mRTCPSocket);
762 }
763
764 mTracks.removeItemsAt(trackIndex);
765 }
766 }
767
768 ++index;
769 if (result == OK && index < mSessionDesc->countTracks()) {
770 setupTrack(index);
771 } else if (mSetupTracksSuccessful) {
772 ++mKeepAliveGeneration;
773 postKeepAlive();
774
775 AString request = "PLAY ";
776 request.append(mControlURL);
777 request.append(" RTSP/1.0\r\n");
778
779 request.append("Session: ");
780 request.append(mSessionID);
781 request.append("\r\n");
782
783 request.append("\r\n");
784
785 sp<AMessage> reply = new AMessage('play', this);
786 mConn->sendRequest(request.c_str(), reply);
787 } else {
788 sp<AMessage> reply = new AMessage('disc', this);
789 mConn->disconnect(reply);
790 }
791 break;
792 }
793
794 case 'play':
795 {
796 int32_t result;
797 CHECK(msg->findInt32("result", &result));
798
799 ALOGI("PLAY completed with result %d (%s)",
800 result, strerror(-result));
801
802 if (result == OK) {
803 sp<RefBase> obj;
804 CHECK(msg->findObject("response", &obj));
805 sp<ARTSPResponse> response =
806 static_cast<ARTSPResponse *>(obj.get());
807
808 if (response->mStatusCode != 200) {
809 result = UNKNOWN_ERROR;
810 } else {
811 parsePlayResponse(response);
812 postTimeout();
813 }
814 }
815
816 if (result != OK) {
817 sp<AMessage> reply = new AMessage('disc', this);
818 mConn->disconnect(reply);
819 }
820
821 break;
822 }
823
824 case 'aliv':
825 {
826 int32_t generation;
827 CHECK(msg->findInt32("generation", &generation));
828
829 if (generation != mKeepAliveGeneration) {
830 // obsolete event.
831 break;
832 }
833
834 AString request;
835 request.append("OPTIONS ");
836 request.append(mSessionURL);
837 request.append(" RTSP/1.0\r\n");
838 request.append("Session: ");
839 request.append(mSessionID);
840 request.append("\r\n");
841 request.append("\r\n");
842
843 sp<AMessage> reply = new AMessage('opts', this);
844 reply->setInt32("generation", mKeepAliveGeneration);
845 mConn->sendRequest(request.c_str(), reply);
846 break;
847 }
848
849 case 'opts':
850 {
851 int32_t result;
852 CHECK(msg->findInt32("result", &result));
853
854 ALOGI("OPTIONS completed with result %d (%s)",
855 result, strerror(-result));
856
857 int32_t generation;
858 CHECK(msg->findInt32("generation", &generation));
859
860 if (generation != mKeepAliveGeneration) {
861 // obsolete event.
862 break;
863 }
864
865 postKeepAlive();
866 break;
867 }
868
869 case 'abor':
870 {
871 for (size_t i = 0; i < mTracks.size(); ++i) {
872 TrackInfo *info = &mTracks.editItemAt(i);
873
874 if (!mFirstAccessUnit) {
875 postQueueEOS(i, ERROR_END_OF_STREAM);
876 }
877
878 if (!info->mUsingInterleavedTCP) {
879 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
880
881 // Clear the tag
882 if (mUIDValid) {
883 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
884 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
885 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
886 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
887 }
888
889 close(info->mRTPSocket);
890 close(info->mRTCPSocket);
891 }
892 }
893 mTracks.clear();
894 mSetupTracksSuccessful = false;
895 mSeekPending = false;
896 mFirstAccessUnit = true;
897 mAllTracksHaveTime = false;
898 mNTPAnchorUs = -1;
899 mMediaAnchorUs = -1;
900 mNumAccessUnitsReceived = 0;
901 mReceivedFirstRTCPPacket = false;
902 mReceivedFirstRTPPacket = false;
903 mPausing = false;
904 mSeekable = true;
905
906 sp<AMessage> reply = new AMessage('tear', this);
907
908 int32_t reconnect;
909 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
910 reply->setInt32("reconnect", true);
911 }
912
913 AString request;
914 request = "TEARDOWN ";
915
916 // XXX should use aggregate url from SDP here...
917 request.append(mSessionURL);
918 request.append(" RTSP/1.0\r\n");
919
920 request.append("Session: ");
921 request.append(mSessionID);
922 request.append("\r\n");
923
924 request.append("\r\n");
925
926 mConn->sendRequest(request.c_str(), reply);
927 break;
928 }
929
930 case 'tear':
931 {
932 int32_t result;
933 CHECK(msg->findInt32("result", &result));
934
935 ALOGI("TEARDOWN completed with result %d (%s)",
936 result, strerror(-result));
937
938 sp<AMessage> reply = new AMessage('disc', this);
939
940 int32_t reconnect;
941 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
942 reply->setInt32("reconnect", true);
943 }
944
945 mConn->disconnect(reply);
946 break;
947 }
948
949 case 'quit':
950 {
951 sp<AMessage> msg = mNotify->dup();
952 msg->setInt32("what", kWhatDisconnected);
953 msg->setInt32("result", UNKNOWN_ERROR);
954 msg->post();
955 break;
956 }
957
958 case 'chek':
959 {
960 int32_t generation;
961 CHECK(msg->findInt32("generation", &generation));
962 if (generation != mCheckGeneration) {
963 // This is an outdated message. Ignore.
964 break;
965 }
966
967 if (mNumAccessUnitsReceived == 0) {
968 #if 1
969 ALOGI("stream ended? aborting.");
970 (new AMessage('abor', this))->post();
971 break;
972 #else
973 ALOGI("haven't seen an AU in a looong time.");
974 #endif
975 }
976
977 mNumAccessUnitsReceived = 0;
978 msg->post(kAccessUnitTimeoutUs);
979 break;
980 }
981
982 case 'accu':
983 {
984 if (mSeekPending) {
985 ALOGV("Stale access unit.");
986 break;
987 }
988
989 int32_t timeUpdate;
990 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
991 size_t trackIndex;
992 CHECK(msg->findSize("track-index", &trackIndex));
993
994 uint32_t rtpTime;
995 uint64_t ntpTime;
996 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
997 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
998
999 onTimeUpdate(trackIndex, rtpTime, ntpTime);
1000 break;
1001 }
1002
1003 int32_t first;
1004 if (msg->findInt32("first-rtcp", &first)) {
1005 mReceivedFirstRTCPPacket = true;
1006 break;
1007 }
1008
1009 if (msg->findInt32("first-rtp", &first)) {
1010 mReceivedFirstRTPPacket = true;
1011 break;
1012 }
1013
1014 ++mNumAccessUnitsReceived;
1015 postAccessUnitTimeoutCheck();
1016
1017 size_t trackIndex;
1018 CHECK(msg->findSize("track-index", &trackIndex));
1019
1020 if (trackIndex >= mTracks.size()) {
1021 ALOGV("late packets ignored.");
1022 break;
1023 }
1024
1025 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1026
1027 int32_t eos;
1028 if (msg->findInt32("eos", &eos)) {
1029 ALOGI("received BYE on track index %zu", trackIndex);
1030 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1031 ALOGI("No time established => fake existing data");
1032
1033 track->mEOSReceived = true;
1034 mTryFakeRTCP = true;
1035 mReceivedFirstRTCPPacket = true;
1036 fakeTimestamps();
1037 } else {
1038 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1039 }
1040 return;
1041 }
1042
1043 if (mSeekPending) {
1044 ALOGV("we're seeking, dropping stale packet.");
1045 break;
1046 }
1047
1048 sp<ABuffer> accessUnit;
1049 CHECK(msg->findBuffer("access-unit", &accessUnit));
1050 onAccessUnitComplete(trackIndex, accessUnit);
1051 break;
1052 }
1053
1054 case 'paus':
1055 {
1056 int32_t generation;
1057 CHECK(msg->findInt32("pausecheck", &generation));
1058 if (generation != mPauseGeneration) {
1059 ALOGV("Ignoring outdated pause message.");
1060 break;
1061 }
1062
1063 if (!mSeekable) {
1064 ALOGW("This is a live stream, ignoring pause request.");
1065 break;
1066 }
1067
1068 if (mPausing) {
1069 ALOGV("This stream is already paused.");
1070 break;
1071 }
1072
1073 mCheckPending = true;
1074 ++mCheckGeneration;
1075 mPausing = true;
1076
1077 AString request = "PAUSE ";
1078 request.append(mControlURL);
1079 request.append(" RTSP/1.0\r\n");
1080
1081 request.append("Session: ");
1082 request.append(mSessionID);
1083 request.append("\r\n");
1084
1085 request.append("\r\n");
1086
1087 sp<AMessage> reply = new AMessage('pau2', this);
1088 mConn->sendRequest(request.c_str(), reply);
1089 break;
1090 }
1091
1092 case 'pau2':
1093 {
1094 int32_t result;
1095 CHECK(msg->findInt32("result", &result));
1096 mCheckTimeoutGeneration++;
1097
1098 ALOGI("PAUSE completed with result %d (%s)",
1099 result, strerror(-result));
1100 break;
1101 }
1102
1103 case 'resu':
1104 {
1105 if (mPausing && mSeekPending) {
1106 // If seeking, Play will be sent from see1 instead
1107 break;
1108 }
1109
1110 if (!mPausing) {
1111 // Dont send PLAY if we have not paused
1112 break;
1113 }
1114 AString request = "PLAY ";
1115 request.append(mControlURL);
1116 request.append(" RTSP/1.0\r\n");
1117
1118 request.append("Session: ");
1119 request.append(mSessionID);
1120 request.append("\r\n");
1121
1122 request.append("\r\n");
1123
1124 sp<AMessage> reply = new AMessage('res2', this);
1125 mConn->sendRequest(request.c_str(), reply);
1126 break;
1127 }
1128
1129 case 'res2':
1130 {
1131 int32_t result;
1132 CHECK(msg->findInt32("result", &result));
1133
1134 ALOGI("PLAY (for resume) completed with result %d (%s)",
1135 result, strerror(-result));
1136
1137 mCheckPending = false;
1138 ++mCheckGeneration;
1139 postAccessUnitTimeoutCheck();
1140
1141 if (result == OK) {
1142 sp<RefBase> obj;
1143 CHECK(msg->findObject("response", &obj));
1144 sp<ARTSPResponse> response =
1145 static_cast<ARTSPResponse *>(obj.get());
1146
1147 if (response->mStatusCode != 200) {
1148 result = UNKNOWN_ERROR;
1149 } else {
1150 parsePlayResponse(response);
1151
1152 // Post new timeout in order to make sure to use
1153 // fake timestamps if no new Sender Reports arrive
1154 postTimeout();
1155 }
1156 }
1157
1158 if (result != OK) {
1159 ALOGE("resume failed, aborting.");
1160 (new AMessage('abor', this))->post();
1161 }
1162
1163 mPausing = false;
1164 break;
1165 }
1166
1167 case 'seek':
1168 {
1169 if (!mSeekable) {
1170 ALOGW("This is a live stream, ignoring seek request.");
1171
1172 sp<AMessage> msg = mNotify->dup();
1173 msg->setInt32("what", kWhatSeekDone);
1174 msg->post();
1175 break;
1176 }
1177
1178 int64_t timeUs;
1179 CHECK(msg->findInt64("time", &timeUs));
1180
1181 mSeekPending = true;
1182
1183 // Disable the access unit timeout until we resumed
1184 // playback again.
1185 mCheckPending = true;
1186 ++mCheckGeneration;
1187
1188 sp<AMessage> reply = new AMessage('see0', this);
1189 reply->setInt64("time", timeUs);
1190
1191 if (mPausing) {
1192 // PAUSE already sent
1193 ALOGI("Pause already sent");
1194 reply->post();
1195 break;
1196 }
1197 AString request = "PAUSE ";
1198 request.append(mControlURL);
1199 request.append(" RTSP/1.0\r\n");
1200
1201 request.append("Session: ");
1202 request.append(mSessionID);
1203 request.append("\r\n");
1204
1205 request.append("\r\n");
1206
1207 mConn->sendRequest(request.c_str(), reply);
1208 break;
1209 }
1210
1211 case 'see0':
1212 {
1213 // Session is paused now.
1214 status_t err = OK;
1215 msg->findInt32("result", &err);
1216
1217 int64_t timeUs;
1218 CHECK(msg->findInt64("time", &timeUs));
1219
1220 sp<AMessage> notify = mNotify->dup();
1221 notify->setInt32("what", kWhatSeekPaused);
1222 notify->setInt32("err", err);
1223 notify->setInt64("time", timeUs);
1224 notify->post();
1225 break;
1226
1227 }
1228
1229 case 'see1':
1230 {
1231 for (size_t i = 0; i < mTracks.size(); ++i) {
1232 TrackInfo *info = &mTracks.editItemAt(i);
1233
1234 postQueueSeekDiscontinuity(i);
1235 info->mEOSReceived = false;
1236
1237 info->mRTPAnchor = 0;
1238 info->mNTPAnchorUs = -1;
1239 }
1240
1241 mAllTracksHaveTime = false;
1242 mNTPAnchorUs = -1;
1243
1244 // Start new timeoutgeneration to avoid getting timeout
1245 // before PLAY response arrive
1246 postTimeout();
1247
1248 int64_t timeUs;
1249 CHECK(msg->findInt64("time", &timeUs));
1250
1251 AString request = "PLAY ";
1252 request.append(mControlURL);
1253 request.append(" RTSP/1.0\r\n");
1254
1255 request.append("Session: ");
1256 request.append(mSessionID);
1257 request.append("\r\n");
1258
1259 request.append(
1260 AStringPrintf(
1261 "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1262
1263 request.append("\r\n");
1264
1265 sp<AMessage> reply = new AMessage('see2', this);
1266 mConn->sendRequest(request.c_str(), reply);
1267 break;
1268 }
1269
1270 case 'see2':
1271 {
1272 if (mTracks.size() == 0) {
1273 // We have already hit abor, break
1274 break;
1275 }
1276
1277 int32_t result;
1278 CHECK(msg->findInt32("result", &result));
1279
1280 ALOGI("PLAY (for seek) completed with result %d (%s)",
1281 result, strerror(-result));
1282
1283 mCheckPending = false;
1284 ++mCheckGeneration;
1285 postAccessUnitTimeoutCheck();
1286
1287 if (result == OK) {
1288 sp<RefBase> obj;
1289 CHECK(msg->findObject("response", &obj));
1290 sp<ARTSPResponse> response =
1291 static_cast<ARTSPResponse *>(obj.get());
1292
1293 if (response->mStatusCode != 200) {
1294 result = UNKNOWN_ERROR;
1295 } else {
1296 parsePlayResponse(response);
1297
1298 // Post new timeout in order to make sure to use
1299 // fake timestamps if no new Sender Reports arrive
1300 postTimeout();
1301
1302 ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1303 CHECK_GE(i, 0);
1304
1305 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1306
1307 ALOGI("seek completed.");
1308 }
1309 }
1310
1311 if (result != OK) {
1312 ALOGE("seek failed, aborting.");
1313 (new AMessage('abor', this))->post();
1314 }
1315
1316 mPausing = false;
1317 mSeekPending = false;
1318
1319 // Discard all stale access units.
1320 for (size_t i = 0; i < mTracks.size(); ++i) {
1321 TrackInfo *track = &mTracks.editItemAt(i);
1322 track->mPackets.clear();
1323 }
1324
1325 sp<AMessage> msg = mNotify->dup();
1326 msg->setInt32("what", kWhatSeekDone);
1327 msg->post();
1328 break;
1329 }
1330
1331 case 'biny':
1332 {
1333 sp<ABuffer> buffer;
1334 CHECK(msg->findBuffer("buffer", &buffer));
1335
1336 int32_t index;
1337 CHECK(buffer->meta()->findInt32("index", &index));
1338
1339 mRTPConn->injectPacket(index, buffer);
1340 break;
1341 }
1342
1343 case 'tiou':
1344 {
1345 int32_t timeoutGenerationCheck;
1346 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1347 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1348 // This is an outdated message. Ignore.
1349 // This typically happens if a lot of seeks are
1350 // performed, since new timeout messages now are
1351 // posted at seek as well.
1352 break;
1353 }
1354 if (!mReceivedFirstRTCPPacket) {
1355 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1356 ALOGW("We received RTP packets but no RTCP packets, "
1357 "using fake timestamps.");
1358
1359 mTryFakeRTCP = true;
1360
1361 mReceivedFirstRTCPPacket = true;
1362
1363 fakeTimestamps();
1364 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1365 ALOGW("Never received any data, switching transports.");
1366
1367 mTryTCPInterleaving = true;
1368
1369 sp<AMessage> msg = new AMessage('abor', this);
1370 msg->setInt32("reconnect", true);
1371 msg->post();
1372 } else {
1373 ALOGW("Never received any data, disconnecting.");
1374 (new AMessage('abor', this))->post();
1375 }
1376 } else {
1377 if (!mAllTracksHaveTime) {
1378 ALOGW("We received some RTCP packets, but time "
1379 "could not be established on all tracks, now "
1380 "using fake timestamps");
1381
1382 fakeTimestamps();
1383 }
1384 }
1385 break;
1386 }
1387
1388 default:
1389 TRESPASS();
1390 break;
1391 }
1392 }
1393
postKeepAliveMyHandler1394 void postKeepAlive() {
1395 sp<AMessage> msg = new AMessage('aliv', this);
1396 msg->setInt32("generation", mKeepAliveGeneration);
1397 msg->post((mKeepAliveTimeoutUs * 9) / 10);
1398 }
1399
cancelAccessUnitTimeoutCheckMyHandler1400 void cancelAccessUnitTimeoutCheck() {
1401 ALOGV("cancelAccessUnitTimeoutCheck");
1402 ++mCheckGeneration;
1403 }
1404
postAccessUnitTimeoutCheckMyHandler1405 void postAccessUnitTimeoutCheck() {
1406 if (mCheckPending) {
1407 return;
1408 }
1409
1410 mCheckPending = true;
1411 sp<AMessage> check = new AMessage('chek', this);
1412 check->setInt32("generation", mCheckGeneration);
1413 check->post(kAccessUnitTimeoutUs);
1414 }
1415
SplitStringMyHandler1416 static void SplitString(
1417 const AString &s, const char *separator, List<AString> *items) {
1418 items->clear();
1419 size_t start = 0;
1420 while (start < s.size()) {
1421 ssize_t offset = s.find(separator, start);
1422
1423 if (offset < 0) {
1424 items->push_back(AString(s, start, s.size() - start));
1425 break;
1426 }
1427
1428 items->push_back(AString(s, start, offset - start));
1429 start = offset + strlen(separator);
1430 }
1431 }
1432
// Parses the headers of a PLAY response and updates per-track segment state.
//
// Always marks the play response as parsed. Returns early, leaving the
// tracks untouched, when there are no tracks, when the response has no
// "range" header, or when the npt range cannot be parsed (live stream).
//
// Otherwise, for each comma-separated entry of the "rtp-info" header, the
// matching track (by url) gets its first sequence number, its stale access
// unit budget and its RTP-time <=> normal-play-time mapping refreshed.
//
// NOTE(review): a missing "npt" attribute, a missing "rtp-info" header, a
// missing url/seq/rtptime attribute or an unknown url all trip a CHECK.
// These values come from the server's response; confirm that failing a
// CHECK is the intended reaction to a malformed response.
void parsePlayResponse(const sp<ARTSPResponse> &response) {
    mPlayResponseParsed = true;
    if (mTracks.size() == 0) {
        ALOGV("parsePlayResponse: late packets ignored.");
        return;
    }

    ssize_t i = response->mHeaders.indexOfKey("range");
    if (i < 0) {
        // Server doesn't even tell us what range it is going to
        // play, therefore we won't support seeking.
        return;
    }

    AString range = response->mHeaders.valueAt(i);
    ALOGV("Range: %s", range.c_str());

    AString val;
    CHECK(GetAttribute(range.c_str(), "npt", &val));

    float npt1, npt2;
    if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
        // This is a live stream and therefore not seekable.

        ALOGI("This is a live stream");
        return;
    }

    i = response->mHeaders.indexOfKey("rtp-info");
    CHECK_GE(i, 0);

    AString rtpInfo = response->mHeaders.valueAt(i);
    List<AString> streamInfos;
    SplitString(rtpInfo, ",", &streamInfos);

    // n is a 1-based counter used only in the log output below.
    int n = 1;
    for (List<AString>::iterator it = streamInfos.begin();
         it != streamInfos.end(); ++it) {
        (*it).trim();
        ALOGV("streamInfo[%d] = %s", n, (*it).c_str());

        CHECK(GetAttribute((*it).c_str(), "url", &val));

        // Find the track whose URL equals the url attribute of this entry.
        size_t trackIndex = 0;
        while (trackIndex < mTracks.size()
                && !(val == mTracks.editItemAt(trackIndex).mURL)) {
            ++trackIndex;
        }
        CHECK_LT(trackIndex, mTracks.size());

        CHECK(GetAttribute((*it).c_str(), "seq", &val));

        char *end;
        unsigned long seq = strtoul(val.c_str(), &end, 10);

        // Start a new segment: processAccessUnitQueue() uses these fields
        // to find the first access unit belonging to it.
        TrackInfo *info = &mTracks.editItemAt(trackIndex);
        info->mFirstSeqNumInSegment = seq;
        info->mNewSegment = true;
        info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;

        CHECK(GetAttribute((*it).c_str(), "rtptime", &val));

        uint32_t rtpTime = strtoul(val.c_str(), &end, 10);

        ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);

        // npt1 (start of the range) is converted to microseconds.
        info->mNormalPlayTimeRTP = rtpTime;
        info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);

        // While mFirstAccessUnit is still set, handleFirstAccessUnit()
        // posts the mapping instead (for seekable streams).
        if (!mFirstAccessUnit) {
            postNormalPlayTimeMapping(
                    trackIndex,
                    info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
        }

        ++n;
    }
}
1511
getTrackFormatMyHandler1512 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1513 CHECK_GE(index, 0u);
1514 CHECK_LT(index, mTracks.size());
1515
1516 const TrackInfo &info = mTracks.itemAt(index);
1517
1518 *timeScale = info.mTimeScale;
1519
1520 return info.mPacketSource->getFormat();
1521 }
1522
countTracksMyHandler1523 size_t countTracks() const {
1524 return mTracks.size();
1525 }
1526
1527 private:
// Per-track state; one entry is appended to mTracks by setupTrack().
struct TrackInfo {
    // Track URL built by MakeURL() from the base URL and the track's
    // "a=control" attribute; matched against "url" in the RTP-Info header.
    AString mURL;
    // Values filled in by ARTPConnection::MakePortPair(), or, when
    // mUsingInterleavedTCP is set, the interleaved channel numbers.
    int mRTPSocket;
    int mRTCPSocket;
    bool mUsingInterleavedTCP;
    // "seq" from the RTP-Info header; overwritten with the sequence number
    // of the first access unit accepted for the segment.
    uint32_t mFirstSeqNumInSegment;
    // True until the first access unit of the current segment was accepted.
    bool mNewSegment;
    // Remaining number of stale access units that may be discarded while
    // looking for the first one of a segment; -1 once that was given up.
    int32_t mAllowedStaleAccessUnits;

    // Anchor pair set by onTimeUpdate(); mNTPAnchorUs < 0 means that no
    // time has been established for this track.
    uint32_t mRTPAnchor;
    int64_t mNTPAnchorUs;
    // Divisor used to turn RTP time differences into microseconds.
    int32_t mTimeScale;
    // When set, EOS is posted for the track after its queue was processed.
    bool mEOSReceived;

    // RTP time <=> normal play time mapping taken from the PLAY response.
    uint32_t mNormalPlayTimeRTP;
    int64_t mNormalPlayTimeUs;

    sp<APacketSource> mPacketSource;

    // Stores packets temporarily while no notion of time
    // has been established yet.
    List<sp<ABuffer> > mPackets;
};
1551
// Template message; dup()'ed for every notification that gets posted.
sp<AMessage> mNotify;
// When mUIDValid is set, the UDP sockets created in setupTrack() are
// tagged and marked with mUID.
bool mUIDValid;
uid_t mUID;
sp<ALooper> mNetLooper;
// Connection the RTSP requests are sent on (sendRequest()).
sp<ARTSPConnection> mConn;
// Receives the packets injected by the 'biny' message.
sp<ARTPConnection> mRTPConn;
sp<ASessionDescription> mSessionDesc;
AString mOriginalSessionURL;  // This one still has user:pass@
AString mSessionURL;
AString mSessionHost;
// Base for resolving the tracks' "a=control" attributes (see MakeURL()).
AString mBaseURL;
// Request URL of the PAUSE and PLAY requests.
AString mControlURL;
// Value sent in the "Session:" request header.
AString mSessionID;
bool mSetupTracksSuccessful;
// Set by 'seek', cleared again when the PLAY reply ('see2') was handled.
bool mSeekPending;
// True until handleFirstAccessUnit() has posted kWhatConnected.
bool mFirstAccessUnit;

// Set by onTimeUpdate() once every track has an NTP anchor.
bool mAllTracksHaveTime;
// Session-wide anchors used by addMediaTimestamp();
// mNTPAnchorUs < 0 means "not set".
int64_t mNTPAnchorUs;
int64_t mMediaAnchorUs;
// Largest media time computed so far by addMediaTimestamp().
int64_t mLastMediaTimeUs;

int64_t mNumAccessUnitsReceived;
// State of the 'chek' message, see postAccessUnitTimeoutCheck().
bool mCheckPending;
int32_t mCheckGeneration;
// Incremented by postTimeout(); 'tiou' messages carrying another value
// are ignored.
int32_t mCheckTimeoutGeneration;
// When set, setupTrack() requests interleaved TCP transport.
bool mTryTCPInterleaving;
bool mTryFakeRTCP;
bool mReceivedFirstRTCPPacket;
bool mReceivedFirstRTPPacket;
// Seek requests are ignored while this is false.
bool mSeekable;
// postKeepAlive() delays its message by 90% of this timeout.
int64_t mKeepAliveTimeoutUs;
int32_t mKeepAliveGeneration;
// When set, a seek does not send another PAUSE request.
bool mPausing;
int32_t mPauseGeneration;

Vector<TrackInfo> mTracks;

// Set by parsePlayResponse(); until then onAccessUnitComplete() only
// stores the access units it receives.
bool mPlayResponseParsed;
1591
setupTrackMyHandler1592 void setupTrack(size_t index) {
1593 sp<APacketSource> source =
1594 new APacketSource(mSessionDesc, index);
1595
1596 if (source->initCheck() != OK) {
1597 ALOGW("Unsupported format. Ignoring track #%zu.", index);
1598
1599 sp<AMessage> reply = new AMessage('setu', this);
1600 reply->setSize("index", index);
1601 reply->setInt32("result", ERROR_UNSUPPORTED);
1602 reply->post();
1603 return;
1604 }
1605
1606 AString url;
1607 CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1608
1609 AString trackURL;
1610 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1611
1612 mTracks.push(TrackInfo());
1613 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1614 info->mURL = trackURL;
1615 info->mPacketSource = source;
1616 info->mUsingInterleavedTCP = false;
1617 info->mFirstSeqNumInSegment = 0;
1618 info->mNewSegment = true;
1619 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1620 info->mRTPSocket = -1;
1621 info->mRTCPSocket = -1;
1622 info->mRTPAnchor = 0;
1623 info->mNTPAnchorUs = -1;
1624 info->mNormalPlayTimeRTP = 0;
1625 info->mNormalPlayTimeUs = 0ll;
1626
1627 unsigned long PT;
1628 AString formatDesc;
1629 AString formatParams;
1630 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1631
1632 int32_t timescale;
1633 int32_t numChannels;
1634 ASessionDescription::ParseFormatDesc(
1635 formatDesc.c_str(), ×cale, &numChannels);
1636
1637 info->mTimeScale = timescale;
1638 info->mEOSReceived = false;
1639
1640 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1641
1642 AString request = "SETUP ";
1643 request.append(trackURL);
1644 request.append(" RTSP/1.0\r\n");
1645
1646 if (mTryTCPInterleaving) {
1647 size_t interleaveIndex = 2 * (mTracks.size() - 1);
1648 info->mUsingInterleavedTCP = true;
1649 info->mRTPSocket = interleaveIndex;
1650 info->mRTCPSocket = interleaveIndex + 1;
1651
1652 request.append("Transport: RTP/AVP/TCP;interleaved=");
1653 request.append(interleaveIndex);
1654 request.append("-");
1655 request.append(interleaveIndex + 1);
1656 } else {
1657 unsigned rtpPort;
1658 ARTPConnection::MakePortPair(
1659 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1660
1661 if (mUIDValid) {
1662 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1663 (uint32_t)*(uint32_t*) "RTP_");
1664 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1665 (uint32_t)*(uint32_t*) "RTP_");
1666 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1667 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1668 }
1669
1670 request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1671 request.append(rtpPort);
1672 request.append("-");
1673 request.append(rtpPort + 1);
1674 }
1675
1676 request.append("\r\n");
1677
1678 if (index > 1) {
1679 request.append("Session: ");
1680 request.append(mSessionID);
1681 request.append("\r\n");
1682 }
1683
1684 request.append("\r\n");
1685
1686 sp<AMessage> reply = new AMessage('setu', this);
1687 reply->setSize("index", index);
1688 reply->setSize("track-index", mTracks.size() - 1);
1689 mConn->sendRequest(request.c_str(), reply);
1690 }
1691
MakeURLMyHandler1692 static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1693 out->clear();
1694
1695 if (strncasecmp("rtsp://", baseURL, 7)) {
1696 // Base URL must be absolute
1697 return false;
1698 }
1699
1700 if (!strncasecmp("rtsp://", url, 7)) {
1701 // "url" is already an absolute URL, ignore base URL.
1702 out->setTo(url);
1703 return true;
1704 }
1705
1706 size_t n = strlen(baseURL);
1707 out->setTo(baseURL);
1708 if (baseURL[n - 1] != '/') {
1709 out->append("/");
1710 }
1711 out->append(url);
1712
1713 return true;
1714 }
1715
fakeTimestampsMyHandler1716 void fakeTimestamps() {
1717 mNTPAnchorUs = -1ll;
1718 for (size_t i = 0; i < mTracks.size(); ++i) {
1719 onTimeUpdate(i, 0, 0ll);
1720 }
1721 }
1722
dataReceivedOnAllChannelsMyHandler1723 bool dataReceivedOnAllChannels() {
1724 TrackInfo *track;
1725 for (size_t i = 0; i < mTracks.size(); ++i) {
1726 track = &mTracks.editItemAt(i);
1727 if (track->mPackets.empty()) {
1728 return false;
1729 }
1730 }
1731 return true;
1732 }
1733
handleFirstAccessUnitMyHandler1734 void handleFirstAccessUnit() {
1735 if (mFirstAccessUnit) {
1736 sp<AMessage> msg = mNotify->dup();
1737 msg->setInt32("what", kWhatConnected);
1738 msg->post();
1739
1740 if (mSeekable) {
1741 for (size_t i = 0; i < mTracks.size(); ++i) {
1742 TrackInfo *info = &mTracks.editItemAt(i);
1743
1744 postNormalPlayTimeMapping(
1745 i,
1746 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1747 }
1748 }
1749
1750 mFirstAccessUnit = false;
1751 }
1752 }
1753
// Records a new RTP time <=> NTP time anchor for track |trackIndex|.
//
// |ntpTime| is scaled by 1E6 / 2^32 to obtain microseconds (i.e. it is
// treated as seconds in 32.32 fixed point). The first anchor received while
// the session anchor is unset also becomes the session anchor. Once all
// tracks have an anchor and all of them have stored packets, the stored
// access units are processed and any deferred EOS is posted.
void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
    ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
         trackIndex, rtpTime, (long long)ntpTime);

    int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));

    TrackInfo *track = &mTracks.editItemAt(trackIndex);

    track->mRTPAnchor = rtpTime;
    track->mNTPAnchorUs = ntpTimeUs;

    // Media time continues from the last media time seen so far.
    if (mNTPAnchorUs < 0) {
        mNTPAnchorUs = ntpTimeUs;
        mMediaAnchorUs = mLastMediaTimeUs;
    }

    if (!mAllTracksHaveTime) {
        bool allTracksHaveTime = (mTracks.size() > 0);
        for (size_t i = 0; i < mTracks.size(); ++i) {
            // Note: this local shadows the outer |track|.
            TrackInfo *track = &mTracks.editItemAt(i);
            if (track->mNTPAnchorUs < 0) {
                allTracksHaveTime = false;
                break;
            }
        }
        if (allTracksHaveTime) {
            mAllTracksHaveTime = true;
            ALOGI("Time now established for all tracks.");
        }
    }
    if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
        handleFirstAccessUnit();

        // Time is now established, lets start timestamping immediately
        for (size_t i = 0; i < mTracks.size(); ++i) {
            // A failure stops everything, including the EOS pass below.
            if (OK != processAccessUnitQueue(i)) {
                return;
            }
        }
        for (size_t i = 0; i < mTracks.size(); ++i) {
            TrackInfo *trackInfo = &mTracks.editItemAt(i);
            if (trackInfo->mEOSReceived) {
                postQueueEOS(i, ERROR_END_OF_STREAM);
                trackInfo->mEOSReceived = false;
            }
        }
    }
}
1802
// Drains the stored packets of track |trackIndex|: each access unit is
// removed from the list and then either discarded as stale or timestamped
// and posted via postQueueAccessUnit().
//
// Returns OK once the list is empty. Returns UNKNOWN_ERROR, leaving the
// remaining packets in the list, when a seekable stream is still at the
// start of a new segment and the budget of stale access units is used up.
status_t processAccessUnitQueue(int32_t trackIndex) {
    TrackInfo *track = &mTracks.editItemAt(trackIndex);
    while (!track->mPackets.empty()) {
        sp<ABuffer> accessUnit = *track->mPackets.begin();
        track->mPackets.erase(track->mPackets.begin());

        uint32_t seqNum = (uint32_t)accessUnit->int32Data();
        if (track->mNewSegment) {
            // The sequence number from RTP packet has only 16 bits and is extended
            // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
            // RTSP "PLAY" command should be used to detect the first RTP packet
            // after seeking.
            if (mSeekable) {
                if (track->mAllowedStaleAccessUnits > 0) {
                    uint32_t seqNum16 = seqNum & 0xffff;
                    uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
                    // NOTE(review): the window test is done on plain
                    // integers; a first sequence number close to 0xffff
                    // followed by wrapped-around packets looks like it
                    // would be treated as stale -- confirm.
                    if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
                            || seqNum16 < firstSeqNumInSegment16) {
                        // Not the first rtp packet of the stream after seeking, discarding.
                        track->mAllowedStaleAccessUnits--;
                        ALOGV("discarding stale access unit (0x%x : 0x%x)",
                             seqNum, track->mFirstSeqNumInSegment);
                        continue;
                    }
                    ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
                            "Missing the first packet(%u), now take packet(%u) as first one",
                            track->mFirstSeqNumInSegment, seqNum);
                } else { // track->mAllowedStaleAccessUnits <= 0
                    mNumAccessUnitsReceived = 0;
                    // Warn only once: the budget is set to -1 right after.
                    ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
                         "Still no first rtp packet after %d stale ones",
                         kMaxAllowedStaleAccessUnits);
                    track->mAllowedStaleAccessUnits = -1;
                    return UNKNOWN_ERROR;
                }
            }

            // Now found the first rtp packet of the stream after seeking.
            track->mFirstSeqNumInSegment = seqNum;
            track->mNewSegment = false;
        }

        // Compared on the full (extended) sequence numbers.
        if (seqNum < track->mFirstSeqNumInSegment) {
            ALOGV("dropping stale access-unit (%d < %d)",
                 seqNum, track->mFirstSeqNumInSegment);
            continue;
        }

        // addMediaTimestamp() returns false for access units whose media
        // time would be negative; those are dropped.
        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
            postQueueAccessUnit(trackIndex, accessUnit);
        }
    }
    return OK;
}
1857
onAccessUnitCompleteMyHandler1858 void onAccessUnitComplete(
1859 int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1860 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1861 track->mPackets.push_back(accessUnit);
1862
1863 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1864 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1865
1866 if(!mPlayResponseParsed){
1867 ALOGV("play response is not parsed");
1868 return;
1869 }
1870
1871 handleFirstAccessUnit();
1872
1873 if (!mAllTracksHaveTime) {
1874 ALOGV("storing accessUnit, no time established yet");
1875 return;
1876 }
1877
1878 if (OK != processAccessUnitQueue(trackIndex)) {
1879 return;
1880 }
1881
1882 if (track->mEOSReceived) {
1883 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1884 track->mEOSReceived = false;
1885 }
1886 }
1887
addMediaTimestampMyHandler1888 bool addMediaTimestamp(
1889 int32_t trackIndex, const TrackInfo *track,
1890 const sp<ABuffer> &accessUnit) {
1891 UNUSED_UNLESS_VERBOSE(trackIndex);
1892
1893 uint32_t rtpTime;
1894 CHECK(accessUnit->meta()->findInt32(
1895 "rtp-time", (int32_t *)&rtpTime));
1896
1897 int64_t relRtpTimeUs =
1898 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1899 / track->mTimeScale;
1900
1901 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1902
1903 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1904
1905 if (mediaTimeUs > mLastMediaTimeUs) {
1906 mLastMediaTimeUs = mediaTimeUs;
1907 }
1908
1909 if (mediaTimeUs < 0) {
1910 ALOGV("dropping early accessUnit.");
1911 return false;
1912 }
1913
1914 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1915 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1916
1917 accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1918
1919 return true;
1920 }
1921
postQueueAccessUnitMyHandler1922 void postQueueAccessUnit(
1923 size_t trackIndex, const sp<ABuffer> &accessUnit) {
1924 sp<AMessage> msg = mNotify->dup();
1925 msg->setInt32("what", kWhatAccessUnit);
1926 msg->setSize("trackIndex", trackIndex);
1927 msg->setBuffer("accessUnit", accessUnit);
1928 msg->post();
1929 }
1930
postQueueEOSMyHandler1931 void postQueueEOS(size_t trackIndex, status_t finalResult) {
1932 sp<AMessage> msg = mNotify->dup();
1933 msg->setInt32("what", kWhatEOS);
1934 msg->setSize("trackIndex", trackIndex);
1935 msg->setInt32("finalResult", finalResult);
1936 msg->post();
1937 }
1938
postQueueSeekDiscontinuityMyHandler1939 void postQueueSeekDiscontinuity(size_t trackIndex) {
1940 sp<AMessage> msg = mNotify->dup();
1941 msg->setInt32("what", kWhatSeekDiscontinuity);
1942 msg->setSize("trackIndex", trackIndex);
1943 msg->post();
1944 }
1945
postNormalPlayTimeMappingMyHandler1946 void postNormalPlayTimeMapping(
1947 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1948 sp<AMessage> msg = mNotify->dup();
1949 msg->setInt32("what", kWhatNormalPlayTimeMapping);
1950 msg->setSize("trackIndex", trackIndex);
1951 msg->setInt32("rtpTime", rtpTime);
1952 msg->setInt64("nptUs", nptUs);
1953 msg->post();
1954 }
1955
postTimeoutMyHandler1956 void postTimeout() {
1957 sp<AMessage> timeout = new AMessage('tiou', this);
1958 mCheckTimeoutGeneration++;
1959 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1960
1961 int64_t startupTimeoutUs;
1962 startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1963 timeout->post(startupTimeoutUs);
1964 }
1965
1966 DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1967 };
1968
1969 } // namespace android
1970
1971 #endif // MY_HANDLER_H_
1972