1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26 
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29 
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 #include "NetworkUtils.h"
35 
36 #include <ctype.h>
37 #include <cutils/properties.h>
38 
39 #include <media/stagefright/foundation/ABuffer.h>
40 #include <media/stagefright/foundation/ADebug.h>
41 #include <media/stagefright/foundation/ALooper.h>
42 #include <media/stagefright/foundation/AMessage.h>
43 #include <media/stagefright/MediaErrors.h>
44 #include <media/stagefright/Utils.h>
45 
46 #include <arpa/inet.h>
47 #include <sys/socket.h>
48 #include <netdb.h>
49 
50 #include "HTTPBase.h"
51 
52 #if LOG_NDEBUG
53 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
54 #else
55 #define UNUSED_UNLESS_VERBOSE(x)
56 #endif
57 
58 #ifndef FALLTHROUGH_INTENDED
59 #define FALLTHROUGH_INTENDED [[clang::fallthrough]]  // NOLINT
60 #endif
61 
62 // If no access units are received within 5 secs, assume that the rtp
63 // stream has ended and signal end of stream.
64 static int64_t kAccessUnitTimeoutUs = 10000000ll;
65 
66 // If no access units arrive for the first 10 secs after starting the
67 // stream, assume none ever will and signal EOS or switch transports.
68 static int64_t kStartupTimeoutUs = 10000000ll;
69 
70 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
71 
72 static int64_t kPauseDelayUs = 3000000ll;
73 
74 // The allowed maximum number of stale access units at the beginning of
75 // a new sequence.
76 static int32_t kMaxAllowedStaleAccessUnits = 20;
77 
78 static int64_t kTearDownTimeoutUs = 3000000ll;
79 
80 namespace android {
81 
GetAttribute(const char * s,const char * key,AString * value)82 static bool GetAttribute(const char *s, const char *key, AString *value) {
83     value->clear();
84 
85     size_t keyLen = strlen(key);
86 
87     for (;;) {
88         while (isspace(*s)) {
89             ++s;
90         }
91 
92         const char *colonPos = strchr(s, ';');
93 
94         size_t len =
95             (colonPos == NULL) ? strlen(s) : colonPos - s;
96 
97         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
98             value->setTo(&s[keyLen + 1], len - keyLen - 1);
99             return true;
100         }
101 
102         if (colonPos == NULL) {
103             return false;
104         }
105 
106         s = colonPos + 1;
107     }
108 }
109 
110 struct MyHandler : public AHandler {
111     enum {
112         kWhatConnected                  = 'conn',
113         kWhatDisconnected               = 'disc',
114         kWhatSeekPaused                 = 'spau',
115         kWhatSeekDone                   = 'sdon',
116 
117         kWhatAccessUnit                 = 'accU',
118         kWhatEOS                        = 'eos!',
119         kWhatSeekDiscontinuity          = 'seeD',
120         kWhatNormalPlayTimeMapping      = 'nptM',
121     };
122 
123     MyHandler(
124             const char *url,
125             const sp<AMessage> &notify,
126             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler127         : mNotify(notify),
128           mUIDValid(uidValid),
129           mUID(uid),
130           mNetLooper(new ALooper),
131           mConn(new ARTSPConnection(mUIDValid, mUID)),
132           mRTPConn(new ARTPConnection),
133           mOriginalSessionURL(url),
134           mSessionURL(url),
135           mSetupTracksSuccessful(false),
136           mSeekPending(false),
137           mFirstAccessUnit(true),
138           mAllTracksHaveTime(false),
139           mNTPAnchorUs(-1),
140           mMediaAnchorUs(-1),
141           mLastMediaTimeUs(0),
142           mNumAccessUnitsReceived(0),
143           mCheckPending(false),
144           mCheckGeneration(0),
145           mCheckTimeoutGeneration(0),
146           mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
147           mTryFakeRTCP(false),
148           mReceivedFirstRTCPPacket(false),
149           mReceivedFirstRTPPacket(false),
150           mSeekable(true),
151           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
152           mKeepAliveGeneration(0),
153           mPausing(false),
154           mPauseGeneration(0),
155           mPlayResponseParsed(false) {
156         mNetLooper->setName("rtsp net");
157         mNetLooper->start(false /* runOnCallingThread */,
158                           false /* canCallJava */,
159                           PRIORITY_HIGHEST);
160 
161         // Strip any authentication info from the session url, we don't
162         // want to transmit user/pass in cleartext.
163         AString host, path, user, pass;
164         unsigned port;
165         CHECK(ARTSPConnection::ParseURL(
166                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
167 
168         if (user.size() > 0) {
169             mSessionURL.clear();
170             mSessionURL.append("rtsp://");
171             mSessionURL.append(host);
172             mSessionURL.append(":");
173             mSessionURL.append(AStringPrintf("%u", port));
174             mSessionURL.append(path);
175 
176             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
177         }
178 
179         mSessionHost = host;
180     }
181 
connectMyHandler182     void connect() {
183         looper()->registerHandler(mConn);
184         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
185 
186         sp<AMessage> notify = new AMessage('biny', this);
187         mConn->observeBinaryData(notify);
188 
189         sp<AMessage> reply = new AMessage('conn', this);
190         mConn->connect(mOriginalSessionURL.c_str(), reply);
191     }
192 
loadSDPMyHandler193     void loadSDP(const sp<ASessionDescription>& desc) {
194         looper()->registerHandler(mConn);
195         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
196 
197         sp<AMessage> notify = new AMessage('biny', this);
198         mConn->observeBinaryData(notify);
199 
200         sp<AMessage> reply = new AMessage('sdpl', this);
201         reply->setObject("description", desc);
202         mConn->connect(mOriginalSessionURL.c_str(), reply);
203     }
204 
getControlURLMyHandler205     AString getControlURL() {
206         AString sessionLevelControlURL;
207         if (mSessionDesc->findAttribute(
208                 0,
209                 "a=control",
210                 &sessionLevelControlURL)) {
211             if (sessionLevelControlURL.compare("*") == 0) {
212                 return mBaseURL;
213             } else {
214                 AString controlURL;
215                 CHECK(MakeURL(
216                         mBaseURL.c_str(),
217                         sessionLevelControlURL.c_str(),
218                         &controlURL));
219                 return controlURL;
220             }
221         } else {
222             return mSessionURL;
223         }
224     }
225 
disconnectMyHandler226     void disconnect() {
227         (new AMessage('abor', this))->post();
228     }
229 
seekMyHandler230     void seek(int64_t timeUs) {
231         sp<AMessage> msg = new AMessage('seek', this);
232         msg->setInt64("time", timeUs);
233         mPauseGeneration++;
234         msg->post();
235     }
236 
continueSeekAfterPauseMyHandler237     void continueSeekAfterPause(int64_t timeUs) {
238         sp<AMessage> msg = new AMessage('see1', this);
239         msg->setInt64("time", timeUs);
240         msg->post();
241     }
242 
isSeekableMyHandler243     bool isSeekable() const {
244         return mSeekable;
245     }
246 
pauseMyHandler247     void pause() {
248         sp<AMessage> msg = new AMessage('paus', this);
249         mPauseGeneration++;
250         msg->setInt32("pausecheck", mPauseGeneration);
251         msg->post();
252     }
253 
resumeMyHandler254     void resume() {
255         sp<AMessage> msg = new AMessage('resu', this);
256         mPauseGeneration++;
257         msg->post();
258     }
259 
addRRMyHandler260     static void addRR(const sp<ABuffer> &buf) {
261         uint8_t *ptr = buf->data() + buf->size();
262         ptr[0] = 0x80 | 0;
263         ptr[1] = 201;  // RR
264         ptr[2] = 0;
265         ptr[3] = 1;
266         ptr[4] = 0xde;  // SSRC
267         ptr[5] = 0xad;
268         ptr[6] = 0xbe;
269         ptr[7] = 0xef;
270 
271         buf->setRange(0, buf->size() + 8);
272     }
273 
addSDESMyHandler274     static void addSDES(int s, const sp<ABuffer> &buffer) {
275         struct sockaddr_in addr;
276         socklen_t addrSize = sizeof(addr);
277         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
278             inet_aton("0.0.0.0", &(addr.sin_addr));
279         }
280 
281         uint8_t *data = buffer->data() + buffer->size();
282         data[0] = 0x80 | 1;
283         data[1] = 202;  // SDES
284         data[4] = 0xde;  // SSRC
285         data[5] = 0xad;
286         data[6] = 0xbe;
287         data[7] = 0xef;
288 
289         size_t offset = 8;
290 
291         data[offset++] = 1;  // CNAME
292 
293         AString cname = "stagefright@";
294         cname.append(inet_ntoa(addr.sin_addr));
295         data[offset++] = cname.size();
296 
297         memcpy(&data[offset], cname.c_str(), cname.size());
298         offset += cname.size();
299 
300         data[offset++] = 6;  // TOOL
301 
302         AString tool = MakeUserAgent();
303 
304         data[offset++] = tool.size();
305 
306         memcpy(&data[offset], tool.c_str(), tool.size());
307         offset += tool.size();
308 
309         data[offset++] = 0;
310 
311         if ((offset % 4) > 0) {
312             size_t count = 4 - (offset % 4);
313             switch (count) {
314                 case 3:
315                     data[offset++] = 0;
316                     FALLTHROUGH_INTENDED;
317                 case 2:
318                     data[offset++] = 0;
319                     FALLTHROUGH_INTENDED;
320                 case 1:
321                     data[offset++] = 0;
322             }
323         }
324 
325         size_t numWords = (offset / 4) - 1;
326         data[2] = numWords >> 8;
327         data[3] = numWords & 0xff;
328 
329         buffer->setRange(buffer->offset(), buffer->size() + offset);
330     }
331 
332     // In case we're behind NAT, fire off two UDP packets to the remote
333     // rtp/rtcp ports to poke a hole into the firewall for future incoming
334     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler335     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
336         struct sockaddr_in addr;
337         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
338         addr.sin_family = AF_INET;
339 
340         AString source;
341         AString server_port;
342         if (!GetAttribute(transport.c_str(),
343                           "source",
344                           &source)) {
345             ALOGW("Missing 'source' field in Transport response. Using "
346                  "RTSP endpoint address.");
347 
348             struct hostent *ent = gethostbyname(mSessionHost.c_str());
349             if (ent == NULL) {
350                 ALOGE("Failed to look up address of session host");
351 
352                 return false;
353             }
354 
355             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
356         } else {
357             addr.sin_addr.s_addr = inet_addr(source.c_str());
358         }
359 
360         if (!GetAttribute(transport.c_str(),
361                                  "server_port",
362                                  &server_port)) {
363             ALOGI("Missing 'server_port' field in Transport response.");
364             return false;
365         }
366 
367         int rtpPort, rtcpPort;
368         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
369                 || rtpPort <= 0 || rtpPort > 65535
370                 || rtcpPort <=0 || rtcpPort > 65535
371                 || rtcpPort != rtpPort + 1) {
372             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
373                  " RTP port must be even, RTCP port must be one higher.",
374                  server_port.c_str());
375 
376             return false;
377         }
378 
379         if (rtpPort & 1) {
380             ALOGW("Server picked an odd RTP port, it should've picked an "
381                  "even one, we'll let it pass for now, but this may break "
382                  "in the future.");
383         }
384 
385         if (addr.sin_addr.s_addr == INADDR_NONE) {
386             return true;
387         }
388 
389         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
390             // No firewalls to traverse on the loopback interface.
391             return true;
392         }
393 
394         // Make up an RR/SDES RTCP packet.
395         sp<ABuffer> buf = new ABuffer(65536);
396         buf->setRange(0, 0);
397         addRR(buf);
398         addSDES(rtpSocket, buf);
399 
400         addr.sin_port = htons(rtpPort);
401 
402         ssize_t n = sendto(
403                 rtpSocket, buf->data(), buf->size(), 0,
404                 (const sockaddr *)&addr, sizeof(addr));
405 
406         if (n < (ssize_t)buf->size()) {
407             ALOGE("failed to poke a hole for RTP packets");
408             return false;
409         }
410 
411         addr.sin_port = htons(rtcpPort);
412 
413         n = sendto(
414                 rtcpSocket, buf->data(), buf->size(), 0,
415                 (const sockaddr *)&addr, sizeof(addr));
416 
417         if (n < (ssize_t)buf->size()) {
418             ALOGE("failed to poke a hole for RTCP packets");
419             return false;
420         }
421 
422         ALOGV("successfully poked holes.");
423 
424         return true;
425     }
426 
isLiveStreamMyHandler427     static bool isLiveStream(const sp<ASessionDescription> &desc) {
428         AString attrLiveStream;
429         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
430             ssize_t semicolonPos = attrLiveStream.find(";", 2);
431 
432             const char* liveStreamValue;
433             if (semicolonPos < 0) {
434                 liveStreamValue = attrLiveStream.c_str();
435             } else {
436                 AString valString;
437                 valString.setTo(attrLiveStream,
438                         semicolonPos + 1,
439                         attrLiveStream.size() - semicolonPos - 1);
440                 liveStreamValue = valString.c_str();
441             }
442 
443             uint32_t value = strtoul(liveStreamValue, NULL, 10);
444             if (value == 1) {
445                 ALOGV("found live stream");
446                 return true;
447             }
448         } else {
449             // It is a live stream if no duration is returned
450             int64_t durationUs;
451             if (!desc->getDurationUs(&durationUs)) {
452                 ALOGV("No duration found, assume live stream");
453                 return true;
454             }
455         }
456 
457         return false;
458     }
459 
onMessageReceivedMyHandler460     virtual void onMessageReceived(const sp<AMessage> &msg) {
461         switch (msg->what()) {
462             case 'conn':
463             {
464                 int32_t result;
465                 CHECK(msg->findInt32("result", &result));
466 
467                 ALOGI("connection request completed with result %d (%s)",
468                      result, strerror(-result));
469 
470                 if (result == OK) {
471                     AString request;
472                     request = "DESCRIBE ";
473                     request.append(mSessionURL);
474                     request.append(" RTSP/1.0\r\n");
475                     request.append("Accept: application/sdp\r\n");
476                     request.append("\r\n");
477 
478                     sp<AMessage> reply = new AMessage('desc', this);
479                     mConn->sendRequest(request.c_str(), reply);
480                 } else {
481                     (new AMessage('disc', this))->post();
482                 }
483                 break;
484             }
485 
486             case 'disc':
487             {
488                 ++mKeepAliveGeneration;
489 
490                 int32_t reconnect;
491                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
492                     sp<AMessage> reply = new AMessage('conn', this);
493                     mConn->connect(mOriginalSessionURL.c_str(), reply);
494                 } else {
495                     (new AMessage('quit', this))->post();
496                 }
497                 break;
498             }
499 
500             case 'desc':
501             {
502                 int32_t result;
503                 CHECK(msg->findInt32("result", &result));
504 
505                 ALOGI("DESCRIBE completed with result %d (%s)",
506                      result, strerror(-result));
507 
508                 if (result == OK) {
509                     sp<RefBase> obj;
510                     CHECK(msg->findObject("response", &obj));
511                     sp<ARTSPResponse> response =
512                         static_cast<ARTSPResponse *>(obj.get());
513 
514                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
515                         ssize_t i = response->mHeaders.indexOfKey("location");
516                         CHECK_GE(i, 0);
517 
518                         mOriginalSessionURL = response->mHeaders.valueAt(i);
519                         mSessionURL = mOriginalSessionURL;
520 
521                         // Strip any authentication info from the session url, we don't
522                         // want to transmit user/pass in cleartext.
523                         AString host, path, user, pass;
524                         unsigned port;
525                         if (ARTSPConnection::ParseURL(
526                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
527                                 && user.size() > 0) {
528                             mSessionURL.clear();
529                             mSessionURL.append("rtsp://");
530                             mSessionURL.append(host);
531                             mSessionURL.append(":");
532                             mSessionURL.append(AStringPrintf("%u", port));
533                             mSessionURL.append(path);
534 
535                             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
536                         }
537 
538                         sp<AMessage> reply = new AMessage('conn', this);
539                         mConn->connect(mOriginalSessionURL.c_str(), reply);
540                         break;
541                     }
542 
543                     if (response->mStatusCode != 200) {
544                         result = UNKNOWN_ERROR;
545                     } else if (response->mContent == NULL) {
546                         result = ERROR_MALFORMED;
547                         ALOGE("The response has no content.");
548                     } else {
549                         mSessionDesc = new ASessionDescription;
550 
551                         mSessionDesc->setTo(
552                                 response->mContent->data(),
553                                 response->mContent->size());
554 
555                         if (!mSessionDesc->isValid()) {
556                             ALOGE("Failed to parse session description.");
557                             result = ERROR_MALFORMED;
558                         } else {
559                             ssize_t i = response->mHeaders.indexOfKey("content-base");
560                             if (i >= 0) {
561                                 mBaseURL = response->mHeaders.valueAt(i);
562                             } else {
563                                 i = response->mHeaders.indexOfKey("content-location");
564                                 if (i >= 0) {
565                                     mBaseURL = response->mHeaders.valueAt(i);
566                                 } else {
567                                     mBaseURL = mSessionURL;
568                                 }
569                             }
570 
571                             mSeekable = !isLiveStream(mSessionDesc);
572 
573                             if (!mBaseURL.startsWith("rtsp://")) {
574                                 // Some misbehaving servers specify a relative
575                                 // URL in one of the locations above, combine
576                                 // it with the absolute session URL to get
577                                 // something usable...
578 
579                                 ALOGW("Server specified a non-absolute base URL"
580                                      ", combining it with the session URL to "
581                                      "get something usable...");
582 
583                                 AString tmp;
584                                 CHECK(MakeURL(
585                                             mSessionURL.c_str(),
586                                             mBaseURL.c_str(),
587                                             &tmp));
588 
589                                 mBaseURL = tmp;
590                             }
591 
592                             mControlURL = getControlURL();
593 
594                             if (mSessionDesc->countTracks() < 2) {
595                                 // There's no actual tracks in this session.
596                                 // The first "track" is merely session meta
597                                 // data.
598 
599                                 ALOGW("Session doesn't contain any playable "
600                                      "tracks. Aborting.");
601                                 result = ERROR_UNSUPPORTED;
602                             } else {
603                                 setupTrack(1);
604                             }
605                         }
606                     }
607                 }
608 
609                 if (result != OK) {
610                     sp<AMessage> reply = new AMessage('disc', this);
611                     mConn->disconnect(reply);
612                 }
613                 break;
614             }
615 
616             case 'sdpl':
617             {
618                 int32_t result;
619                 CHECK(msg->findInt32("result", &result));
620 
621                 ALOGI("SDP connection request completed with result %d (%s)",
622                      result, strerror(-result));
623 
624                 if (result == OK) {
625                     sp<RefBase> obj;
626                     CHECK(msg->findObject("description", &obj));
627                     mSessionDesc =
628                         static_cast<ASessionDescription *>(obj.get());
629 
630                     if (!mSessionDesc->isValid()) {
631                         ALOGE("Failed to parse session description.");
632                         result = ERROR_MALFORMED;
633                     } else {
634                         mBaseURL = mSessionURL;
635 
636                         mSeekable = !isLiveStream(mSessionDesc);
637 
638                         mControlURL = getControlURL();
639 
640                         if (mSessionDesc->countTracks() < 2) {
641                             // There's no actual tracks in this session.
642                             // The first "track" is merely session meta
643                             // data.
644 
645                             ALOGW("Session doesn't contain any playable "
646                                  "tracks. Aborting.");
647                             result = ERROR_UNSUPPORTED;
648                         } else {
649                             setupTrack(1);
650                         }
651                     }
652                 }
653 
654                 if (result != OK) {
655                     sp<AMessage> reply = new AMessage('disc', this);
656                     mConn->disconnect(reply);
657                 }
658                 break;
659             }
660 
661             case 'setu':
662             {
663                 size_t index;
664                 CHECK(msg->findSize("index", &index));
665 
666                 TrackInfo *track = NULL;
667                 size_t trackIndex;
668                 if (msg->findSize("track-index", &trackIndex)) {
669                     track = &mTracks.editItemAt(trackIndex);
670                 }
671 
672                 int32_t result;
673                 CHECK(msg->findInt32("result", &result));
674 
675                 ALOGI("SETUP(%zu) completed with result %d (%s)",
676                      index, result, strerror(-result));
677 
678                 if (result == OK) {
679                     CHECK(track != NULL);
680 
681                     sp<RefBase> obj;
682                     CHECK(msg->findObject("response", &obj));
683                     sp<ARTSPResponse> response =
684                         static_cast<ARTSPResponse *>(obj.get());
685 
686                     if (response->mStatusCode != 200) {
687                         result = UNKNOWN_ERROR;
688                     } else {
689                         ssize_t i = response->mHeaders.indexOfKey("session");
690                         CHECK_GE(i, 0);
691 
692                         mSessionID = response->mHeaders.valueAt(i);
693 
694                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
695                         AString timeoutStr;
696                         if (GetAttribute(
697                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
698                             char *end;
699                             unsigned long timeoutSecs =
700                                 strtoul(timeoutStr.c_str(), &end, 10);
701 
702                             if (end == timeoutStr.c_str() || *end != '\0') {
703                                 ALOGW("server specified malformed timeout '%s'",
704                                      timeoutStr.c_str());
705 
706                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
707                             } else if (timeoutSecs < 15) {
708                                 ALOGW("server specified too short a timeout "
709                                      "(%lu secs), using default.",
710                                      timeoutSecs);
711 
712                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
713                             } else {
714                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
715 
716                                 ALOGI("server specified timeout of %lu secs.",
717                                      timeoutSecs);
718                             }
719                         }
720 
721                         i = mSessionID.find(";");
722                         if (i >= 0) {
723                             // Remove options, i.e. ";timeout=90"
724                             mSessionID.erase(i, mSessionID.size() - i);
725                         }
726 
727                         sp<AMessage> notify = new AMessage('accu', this);
728                         notify->setSize("track-index", trackIndex);
729 
730                         i = response->mHeaders.indexOfKey("transport");
731                         CHECK_GE(i, 0);
732 
733                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
734                             if (!track->mUsingInterleavedTCP) {
735                                 AString transport = response->mHeaders.valueAt(i);
736 
737                                 // We are going to continue even if we were
738                                 // unable to poke a hole into the firewall...
739                                 pokeAHole(
740                                         track->mRTPSocket,
741                                         track->mRTCPSocket,
742                                         transport);
743                             }
744 
745                             mRTPConn->addStream(
746                                     track->mRTPSocket, track->mRTCPSocket,
747                                     mSessionDesc, index,
748                                     notify, track->mUsingInterleavedTCP);
749 
750                             mSetupTracksSuccessful = true;
751                         } else {
752                             result = BAD_VALUE;
753                         }
754                     }
755                 }
756 
757                 if (result != OK) {
758                     if (track) {
759                         if (!track->mUsingInterleavedTCP) {
760                             // Clear the tag
761                             if (mUIDValid) {
762                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket);
763                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket);
764                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket);
765                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket);
766                             }
767 
768                             close(track->mRTPSocket);
769                             close(track->mRTCPSocket);
770                         }
771 
772                         mTracks.removeItemsAt(trackIndex);
773                     }
774                 }
775 
776                 ++index;
777                 if (result == OK && index < mSessionDesc->countTracks()) {
778                     setupTrack(index);
779                 } else if (mSetupTracksSuccessful) {
780                     ++mKeepAliveGeneration;
781                     postKeepAlive();
782 
783                     AString request = "PLAY ";
784                     request.append(mControlURL);
785                     request.append(" RTSP/1.0\r\n");
786 
787                     request.append("Session: ");
788                     request.append(mSessionID);
789                     request.append("\r\n");
790 
791                     request.append("\r\n");
792 
793                     sp<AMessage> reply = new AMessage('play', this);
794                     mConn->sendRequest(request.c_str(), reply);
795                 } else {
796                     sp<AMessage> reply = new AMessage('disc', this);
797                     mConn->disconnect(reply);
798                 }
799                 break;
800             }
801 
802             case 'play':
803             {
804                 int32_t result;
805                 CHECK(msg->findInt32("result", &result));
806 
807                 ALOGI("PLAY completed with result %d (%s)",
808                      result, strerror(-result));
809 
810                 if (result == OK) {
811                     sp<RefBase> obj;
812                     CHECK(msg->findObject("response", &obj));
813                     sp<ARTSPResponse> response =
814                         static_cast<ARTSPResponse *>(obj.get());
815 
816                     if (response->mStatusCode != 200) {
817                         result = UNKNOWN_ERROR;
818                     } else {
819                         parsePlayResponse(response);
820                         postTimeout();
821                     }
822                 }
823 
824                 if (result != OK) {
825                     sp<AMessage> reply = new AMessage('disc', this);
826                     mConn->disconnect(reply);
827                 }
828 
829                 break;
830             }
831 
832             case 'aliv':
833             {
834                 int32_t generation;
835                 CHECK(msg->findInt32("generation", &generation));
836 
837                 if (generation != mKeepAliveGeneration) {
838                     // obsolete event.
839                     break;
840                 }
841 
842                 AString request;
843                 request.append("OPTIONS ");
844                 request.append(mSessionURL);
845                 request.append(" RTSP/1.0\r\n");
846                 request.append("Session: ");
847                 request.append(mSessionID);
848                 request.append("\r\n");
849                 request.append("\r\n");
850 
851                 sp<AMessage> reply = new AMessage('opts', this);
852                 reply->setInt32("generation", mKeepAliveGeneration);
853                 mConn->sendRequest(request.c_str(), reply);
854                 break;
855             }
856 
857             case 'opts':
858             {
859                 int32_t result;
860                 CHECK(msg->findInt32("result", &result));
861 
862                 ALOGI("OPTIONS completed with result %d (%s)",
863                      result, strerror(-result));
864 
865                 int32_t generation;
866                 CHECK(msg->findInt32("generation", &generation));
867 
868                 if (generation != mKeepAliveGeneration) {
869                     // obsolete event.
870                     break;
871                 }
872 
873                 postKeepAlive();
874                 break;
875             }
876 
877             case 'abor':
878             {
879                 for (size_t i = 0; i < mTracks.size(); ++i) {
880                     TrackInfo *info = &mTracks.editItemAt(i);
881 
882                     if (!mFirstAccessUnit) {
883                         postQueueEOS(i, ERROR_END_OF_STREAM);
884                     }
885 
886                     if (!info->mUsingInterleavedTCP) {
887                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
888 
889                         // Clear the tag
890                         if (mUIDValid) {
891                             NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket);
892                             NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket);
893                             NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket);
894                             NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket);
895                         }
896 
897                         close(info->mRTPSocket);
898                         close(info->mRTCPSocket);
899                     }
900                 }
901                 mTracks.clear();
902                 mSetupTracksSuccessful = false;
903                 mSeekPending = false;
904                 mFirstAccessUnit = true;
905                 mAllTracksHaveTime = false;
906                 mNTPAnchorUs = -1;
907                 mMediaAnchorUs = -1;
908                 mNumAccessUnitsReceived = 0;
909                 mReceivedFirstRTCPPacket = false;
910                 mReceivedFirstRTPPacket = false;
911                 mPausing = false;
912                 mSeekable = true;
913 
914                 sp<AMessage> reply = new AMessage('tear', this);
915 
916                 int32_t reconnect;
917                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
918                     reply->setInt32("reconnect", true);
919                 }
920 
921                 AString request;
922                 request = "TEARDOWN ";
923 
924                 // XXX should use aggregate url from SDP here...
925                 request.append(mSessionURL);
926                 request.append(" RTSP/1.0\r\n");
927 
928                 request.append("Session: ");
929                 request.append(mSessionID);
930                 request.append("\r\n");
931 
932                 request.append("\r\n");
933 
934                 mConn->sendRequest(request.c_str(), reply);
935 
936                 // If the response of teardown hasn't been received in 3 seconds,
937                 // post 'tear' message to avoid ANR.
938                 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
939                     sp<AMessage> teardown = reply->dup();
940                     teardown->setInt32("result", -ECONNABORTED);
941                     teardown->post(kTearDownTimeoutUs);
942                 }
943                 break;
944             }
945 
946             case 'tear':
947             {
948                 int32_t result;
949                 CHECK(msg->findInt32("result", &result));
950 
951                 ALOGI("TEARDOWN completed with result %d (%s)",
952                      result, strerror(-result));
953 
954                 sp<AMessage> reply = new AMessage('disc', this);
955 
956                 int32_t reconnect;
957                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
958                     reply->setInt32("reconnect", true);
959                 }
960 
961                 mConn->disconnect(reply);
962                 break;
963             }
964 
965             case 'quit':
966             {
967                 sp<AMessage> msg = mNotify->dup();
968                 msg->setInt32("what", kWhatDisconnected);
969                 msg->setInt32("result", UNKNOWN_ERROR);
970                 msg->post();
971                 break;
972             }
973 
974             case 'chek':
975             {
976                 int32_t generation;
977                 CHECK(msg->findInt32("generation", &generation));
978                 if (generation != mCheckGeneration) {
979                     // This is an outdated message. Ignore.
980                     break;
981                 }
982 
983                 if (mNumAccessUnitsReceived == 0) {
984 #if 1
985                     ALOGI("stream ended? aborting.");
986                     (new AMessage('abor', this))->post();
987                     break;
988 #else
989                     ALOGI("haven't seen an AU in a looong time.");
990 #endif
991                 }
992 
993                 mNumAccessUnitsReceived = 0;
994                 msg->post(kAccessUnitTimeoutUs);
995                 break;
996             }
997 
998             case 'accu':
999             {
1000                 if (mSeekPending) {
1001                     ALOGV("Stale access unit.");
1002                     break;
1003                 }
1004 
1005                 int32_t timeUpdate;
1006                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
1007                     size_t trackIndex;
1008                     CHECK(msg->findSize("track-index", &trackIndex));
1009 
1010                     uint32_t rtpTime;
1011                     uint64_t ntpTime;
1012                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
1013                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1014 
1015                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
1016                     break;
1017                 }
1018 
1019                 int32_t first;
1020                 if (msg->findInt32("first-rtcp", &first)) {
1021                     mReceivedFirstRTCPPacket = true;
1022                     break;
1023                 }
1024 
1025                 if (msg->findInt32("first-rtp", &first)) {
1026                     mReceivedFirstRTPPacket = true;
1027                     break;
1028                 }
1029 
1030                 ++mNumAccessUnitsReceived;
1031                 postAccessUnitTimeoutCheck();
1032 
1033                 size_t trackIndex;
1034                 CHECK(msg->findSize("track-index", &trackIndex));
1035 
1036                 if (trackIndex >= mTracks.size()) {
1037                     ALOGV("late packets ignored.");
1038                     break;
1039                 }
1040 
1041                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1042 
1043                 int32_t eos;
1044                 if (msg->findInt32("eos", &eos)) {
1045                     ALOGI("received BYE on track index %zu", trackIndex);
1046                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1047                         ALOGI("No time established => fake existing data");
1048 
1049                         track->mEOSReceived = true;
1050                         mTryFakeRTCP = true;
1051                         mReceivedFirstRTCPPacket = true;
1052                         fakeTimestamps();
1053                     } else {
1054                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1055                     }
1056                     return;
1057                 }
1058 
1059                 if (mSeekPending) {
1060                     ALOGV("we're seeking, dropping stale packet.");
1061                     break;
1062                 }
1063 
1064                 sp<ABuffer> accessUnit;
1065                 CHECK(msg->findBuffer("access-unit", &accessUnit));
1066                 onAccessUnitComplete(trackIndex, accessUnit);
1067                 break;
1068             }
1069 
1070             case 'paus':
1071             {
1072                 int32_t generation;
1073                 CHECK(msg->findInt32("pausecheck", &generation));
1074                 if (generation != mPauseGeneration) {
1075                     ALOGV("Ignoring outdated pause message.");
1076                     break;
1077                 }
1078 
1079                 if (!mSeekable) {
1080                     ALOGW("This is a live stream, ignoring pause request.");
1081                     break;
1082                 }
1083 
1084                 if (mPausing) {
1085                     ALOGV("This stream is already paused.");
1086                     break;
1087                 }
1088 
1089                 mCheckPending = true;
1090                 ++mCheckGeneration;
1091                 mPausing = true;
1092 
1093                 AString request = "PAUSE ";
1094                 request.append(mControlURL);
1095                 request.append(" RTSP/1.0\r\n");
1096 
1097                 request.append("Session: ");
1098                 request.append(mSessionID);
1099                 request.append("\r\n");
1100 
1101                 request.append("\r\n");
1102 
1103                 sp<AMessage> reply = new AMessage('pau2', this);
1104                 mConn->sendRequest(request.c_str(), reply);
1105                 break;
1106             }
1107 
1108             case 'pau2':
1109             {
1110                 int32_t result;
1111                 CHECK(msg->findInt32("result", &result));
1112                 mCheckTimeoutGeneration++;
1113 
1114                 ALOGI("PAUSE completed with result %d (%s)",
1115                      result, strerror(-result));
1116                 break;
1117             }
1118 
1119             case 'resu':
1120             {
1121                 if (mPausing && mSeekPending) {
1122                     // If seeking, Play will be sent from see1 instead
1123                     break;
1124                 }
1125 
1126                 if (!mPausing) {
1127                     // Dont send PLAY if we have not paused
1128                     break;
1129                 }
1130                 AString request = "PLAY ";
1131                 request.append(mControlURL);
1132                 request.append(" RTSP/1.0\r\n");
1133 
1134                 request.append("Session: ");
1135                 request.append(mSessionID);
1136                 request.append("\r\n");
1137 
1138                 request.append("\r\n");
1139 
1140                 sp<AMessage> reply = new AMessage('res2', this);
1141                 mConn->sendRequest(request.c_str(), reply);
1142                 break;
1143             }
1144 
1145             case 'res2':
1146             {
1147                 int32_t result;
1148                 CHECK(msg->findInt32("result", &result));
1149 
1150                 ALOGI("PLAY (for resume) completed with result %d (%s)",
1151                      result, strerror(-result));
1152 
1153                 mCheckPending = false;
1154                 ++mCheckGeneration;
1155                 postAccessUnitTimeoutCheck();
1156 
1157                 if (result == OK) {
1158                     sp<RefBase> obj;
1159                     CHECK(msg->findObject("response", &obj));
1160                     sp<ARTSPResponse> response =
1161                         static_cast<ARTSPResponse *>(obj.get());
1162 
1163                     if (response->mStatusCode != 200) {
1164                         result = UNKNOWN_ERROR;
1165                     } else {
1166                         parsePlayResponse(response);
1167 
1168                         // Post new timeout in order to make sure to use
1169                         // fake timestamps if no new Sender Reports arrive
1170                         postTimeout();
1171                     }
1172                 }
1173 
1174                 if (result != OK) {
1175                     ALOGE("resume failed, aborting.");
1176                     (new AMessage('abor', this))->post();
1177                 }
1178 
1179                 mPausing = false;
1180                 break;
1181             }
1182 
1183             case 'seek':
1184             {
1185                 if (!mSeekable) {
1186                     ALOGW("This is a live stream, ignoring seek request.");
1187 
1188                     sp<AMessage> msg = mNotify->dup();
1189                     msg->setInt32("what", kWhatSeekDone);
1190                     msg->post();
1191                     break;
1192                 }
1193 
1194                 int64_t timeUs;
1195                 CHECK(msg->findInt64("time", &timeUs));
1196 
1197                 mSeekPending = true;
1198 
1199                 // Disable the access unit timeout until we resumed
1200                 // playback again.
1201                 mCheckPending = true;
1202                 ++mCheckGeneration;
1203 
1204                 sp<AMessage> reply = new AMessage('see0', this);
1205                 reply->setInt64("time", timeUs);
1206 
1207                 if (mPausing) {
1208                     // PAUSE already sent
1209                     ALOGI("Pause already sent");
1210                     reply->post();
1211                     break;
1212                 }
1213                 AString request = "PAUSE ";
1214                 request.append(mControlURL);
1215                 request.append(" RTSP/1.0\r\n");
1216 
1217                 request.append("Session: ");
1218                 request.append(mSessionID);
1219                 request.append("\r\n");
1220 
1221                 request.append("\r\n");
1222 
1223                 mConn->sendRequest(request.c_str(), reply);
1224                 break;
1225             }
1226 
1227             case 'see0':
1228             {
1229                 // Session is paused now.
1230                 status_t err = OK;
1231                 msg->findInt32("result", &err);
1232 
1233                 int64_t timeUs;
1234                 CHECK(msg->findInt64("time", &timeUs));
1235 
1236                 sp<AMessage> notify = mNotify->dup();
1237                 notify->setInt32("what", kWhatSeekPaused);
1238                 notify->setInt32("err", err);
1239                 notify->setInt64("time", timeUs);
1240                 notify->post();
1241                 break;
1242 
1243             }
1244 
1245             case 'see1':
1246             {
1247                 for (size_t i = 0; i < mTracks.size(); ++i) {
1248                     TrackInfo *info = &mTracks.editItemAt(i);
1249 
1250                     postQueueSeekDiscontinuity(i);
1251                     info->mEOSReceived = false;
1252 
1253                     info->mRTPAnchor = 0;
1254                     info->mNTPAnchorUs = -1;
1255                 }
1256 
1257                 mAllTracksHaveTime = false;
1258                 mNTPAnchorUs = -1;
1259 
1260                 // Start new timeoutgeneration to avoid getting timeout
1261                 // before PLAY response arrive
1262                 postTimeout();
1263 
1264                 int64_t timeUs;
1265                 CHECK(msg->findInt64("time", &timeUs));
1266 
1267                 AString request = "PLAY ";
1268                 request.append(mControlURL);
1269                 request.append(" RTSP/1.0\r\n");
1270 
1271                 request.append("Session: ");
1272                 request.append(mSessionID);
1273                 request.append("\r\n");
1274 
1275                 request.append(
1276                         AStringPrintf(
1277                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1278 
1279                 request.append("\r\n");
1280 
1281                 sp<AMessage> reply = new AMessage('see2', this);
1282                 mConn->sendRequest(request.c_str(), reply);
1283                 break;
1284             }
1285 
1286             case 'see2':
1287             {
1288                 if (mTracks.size() == 0) {
1289                     // We have already hit abor, break
1290                     break;
1291                 }
1292 
1293                 int32_t result;
1294                 CHECK(msg->findInt32("result", &result));
1295 
1296                 ALOGI("PLAY (for seek) completed with result %d (%s)",
1297                      result, strerror(-result));
1298 
1299                 mCheckPending = false;
1300                 ++mCheckGeneration;
1301                 postAccessUnitTimeoutCheck();
1302 
1303                 if (result == OK) {
1304                     sp<RefBase> obj;
1305                     CHECK(msg->findObject("response", &obj));
1306                     sp<ARTSPResponse> response =
1307                         static_cast<ARTSPResponse *>(obj.get());
1308 
1309                     if (response->mStatusCode != 200) {
1310                         result = UNKNOWN_ERROR;
1311                     } else {
1312                         parsePlayResponse(response);
1313 
1314                         // Post new timeout in order to make sure to use
1315                         // fake timestamps if no new Sender Reports arrive
1316                         postTimeout();
1317 
1318                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1319                         CHECK_GE(i, 0);
1320 
1321                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1322 
1323                         ALOGI("seek completed.");
1324                     }
1325                 }
1326 
1327                 if (result != OK) {
1328                     ALOGE("seek failed, aborting.");
1329                     (new AMessage('abor', this))->post();
1330                 }
1331 
1332                 mPausing = false;
1333                 mSeekPending = false;
1334 
1335                 // Discard all stale access units.
1336                 for (size_t i = 0; i < mTracks.size(); ++i) {
1337                     TrackInfo *track = &mTracks.editItemAt(i);
1338                     track->mPackets.clear();
1339                 }
1340 
1341                 sp<AMessage> msg = mNotify->dup();
1342                 msg->setInt32("what", kWhatSeekDone);
1343                 msg->post();
1344                 break;
1345             }
1346 
1347             case 'biny':
1348             {
1349                 sp<ABuffer> buffer;
1350                 CHECK(msg->findBuffer("buffer", &buffer));
1351 
1352                 int32_t index;
1353                 CHECK(buffer->meta()->findInt32("index", &index));
1354 
1355                 mRTPConn->injectPacket(index, buffer);
1356                 break;
1357             }
1358 
1359             case 'tiou':
1360             {
1361                 int32_t timeoutGenerationCheck;
1362                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1363                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1364                     // This is an outdated message. Ignore.
1365                     // This typically happens if a lot of seeks are
1366                     // performed, since new timeout messages now are
1367                     // posted at seek as well.
1368                     break;
1369                 }
1370                 if (!mReceivedFirstRTCPPacket) {
1371                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1372                         ALOGW("We received RTP packets but no RTCP packets, "
1373                              "using fake timestamps.");
1374 
1375                         mTryFakeRTCP = true;
1376 
1377                         mReceivedFirstRTCPPacket = true;
1378 
1379                         fakeTimestamps();
1380                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1381                         ALOGW("Never received any data, switching transports.");
1382 
1383                         mTryTCPInterleaving = true;
1384 
1385                         sp<AMessage> msg = new AMessage('abor', this);
1386                         msg->setInt32("reconnect", true);
1387                         msg->post();
1388                     } else {
1389                         ALOGW("Never received any data, disconnecting.");
1390                         (new AMessage('abor', this))->post();
1391                     }
1392                 } else {
1393                     if (!mAllTracksHaveTime) {
1394                         ALOGW("We received some RTCP packets, but time "
1395                               "could not be established on all tracks, now "
1396                               "using fake timestamps");
1397 
1398                         fakeTimestamps();
1399                     }
1400                 }
1401                 break;
1402             }
1403 
1404             default:
1405                 TRESPASS();
1406                 break;
1407         }
1408     }
1409 
postKeepAliveMyHandler1410     void postKeepAlive() {
1411         sp<AMessage> msg = new AMessage('aliv', this);
1412         msg->setInt32("generation", mKeepAliveGeneration);
1413         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1414     }
1415 
cancelAccessUnitTimeoutCheckMyHandler1416     void cancelAccessUnitTimeoutCheck() {
1417         ALOGV("cancelAccessUnitTimeoutCheck");
1418         ++mCheckGeneration;
1419     }
1420 
postAccessUnitTimeoutCheckMyHandler1421     void postAccessUnitTimeoutCheck() {
1422         if (mCheckPending) {
1423             return;
1424         }
1425 
1426         mCheckPending = true;
1427         sp<AMessage> check = new AMessage('chek', this);
1428         check->setInt32("generation", mCheckGeneration);
1429         check->post(kAccessUnitTimeoutUs);
1430     }
1431 
SplitStringMyHandler1432     static void SplitString(
1433             const AString &s, const char *separator, List<AString> *items) {
1434         items->clear();
1435         size_t start = 0;
1436         while (start < s.size()) {
1437             ssize_t offset = s.find(separator, start);
1438 
1439             if (offset < 0) {
1440                 items->push_back(AString(s, start, s.size() - start));
1441                 break;
1442             }
1443 
1444             items->push_back(AString(s, start, offset - start));
1445             start = offset + strlen(separator);
1446         }
1447     }
1448 
parsePlayResponseMyHandler1449     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1450         mPlayResponseParsed = true;
1451         if (mTracks.size() == 0) {
1452             ALOGV("parsePlayResponse: late packets ignored.");
1453             return;
1454         }
1455 
1456         ssize_t i = response->mHeaders.indexOfKey("range");
1457         if (i < 0) {
1458             // Server doesn't even tell use what range it is going to
1459             // play, therefore we won't support seeking.
1460             return;
1461         }
1462 
1463         AString range = response->mHeaders.valueAt(i);
1464         ALOGV("Range: %s", range.c_str());
1465 
1466         AString val;
1467         CHECK(GetAttribute(range.c_str(), "npt", &val));
1468 
1469         float npt1, npt2;
1470         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1471             // This is a live stream and therefore not seekable.
1472 
1473             ALOGI("This is a live stream");
1474             return;
1475         }
1476 
1477         i = response->mHeaders.indexOfKey("rtp-info");
1478         CHECK_GE(i, 0);
1479 
1480         AString rtpInfo = response->mHeaders.valueAt(i);
1481         List<AString> streamInfos;
1482         SplitString(rtpInfo, ",", &streamInfos);
1483 
1484         int n = 1;
1485         for (List<AString>::iterator it = streamInfos.begin();
1486              it != streamInfos.end(); ++it) {
1487             (*it).trim();
1488             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1489 
1490             CHECK(GetAttribute((*it).c_str(), "url", &val));
1491 
1492             size_t trackIndex = 0;
1493             while (trackIndex < mTracks.size()
1494                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1495                 ++trackIndex;
1496             }
1497             CHECK_LT(trackIndex, mTracks.size());
1498 
1499             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1500 
1501             char *end;
1502             unsigned long seq = strtoul(val.c_str(), &end, 10);
1503 
1504             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1505             info->mFirstSeqNumInSegment = seq;
1506             info->mNewSegment = true;
1507             info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1508 
1509             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1510 
1511             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1512 
1513             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1514 
1515             info->mNormalPlayTimeRTP = rtpTime;
1516             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1517 
1518             if (!mFirstAccessUnit) {
1519                 postNormalPlayTimeMapping(
1520                         trackIndex,
1521                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1522             }
1523 
1524             ++n;
1525         }
1526     }
1527 
getTrackFormatMyHandler1528     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1529         CHECK_GE(index, 0u);
1530         CHECK_LT(index, mTracks.size());
1531 
1532         const TrackInfo &info = mTracks.itemAt(index);
1533 
1534         *timeScale = info.mTimeScale;
1535 
1536         return info.mPacketSource->getFormat();
1537     }
1538 
countTracksMyHandler1539     size_t countTracks() const {
1540         return mTracks.size();
1541     }
1542 
1543 private:
1544     struct TrackInfo {
1545         AString mURL;
1546         int mRTPSocket;
1547         int mRTCPSocket;
1548         bool mUsingInterleavedTCP;
1549         uint32_t mFirstSeqNumInSegment;
1550         bool mNewSegment;
1551         int32_t mAllowedStaleAccessUnits;
1552 
1553         uint32_t mRTPAnchor;
1554         int64_t mNTPAnchorUs;
1555         int32_t mTimeScale;
1556         bool mEOSReceived;
1557 
1558         uint32_t mNormalPlayTimeRTP;
1559         int64_t mNormalPlayTimeUs;
1560 
1561         sp<APacketSource> mPacketSource;
1562 
1563         // Stores packets temporarily while no notion of time
1564         // has been established yet.
1565         List<sp<ABuffer> > mPackets;
1566     };
1567 
1568     sp<AMessage> mNotify;
1569     bool mUIDValid;
1570     uid_t mUID;
1571     sp<ALooper> mNetLooper;
1572     sp<ARTSPConnection> mConn;
1573     sp<ARTPConnection> mRTPConn;
1574     sp<ASessionDescription> mSessionDesc;
1575     AString mOriginalSessionURL;  // This one still has user:pass@
1576     AString mSessionURL;
1577     AString mSessionHost;
1578     AString mBaseURL;
1579     AString mControlURL;
1580     AString mSessionID;
1581     bool mSetupTracksSuccessful;
1582     bool mSeekPending;
1583     bool mFirstAccessUnit;
1584 
1585     bool mAllTracksHaveTime;
1586     int64_t mNTPAnchorUs;
1587     int64_t mMediaAnchorUs;
1588     int64_t mLastMediaTimeUs;
1589 
1590     int64_t mNumAccessUnitsReceived;
1591     bool mCheckPending;
1592     int32_t mCheckGeneration;
1593     int32_t mCheckTimeoutGeneration;
1594     bool mTryTCPInterleaving;
1595     bool mTryFakeRTCP;
1596     bool mReceivedFirstRTCPPacket;
1597     bool mReceivedFirstRTPPacket;
1598     bool mSeekable;
1599     int64_t mKeepAliveTimeoutUs;
1600     int32_t mKeepAliveGeneration;
1601     bool mPausing;
1602     int32_t mPauseGeneration;
1603 
1604     Vector<TrackInfo> mTracks;
1605 
1606     bool mPlayResponseParsed;
1607 
setupTrackMyHandler1608     void setupTrack(size_t index) {
1609         sp<APacketSource> source =
1610             new APacketSource(mSessionDesc, index);
1611 
1612         if (source->initCheck() != OK) {
1613             ALOGW("Unsupported format. Ignoring track #%zu.", index);
1614 
1615             sp<AMessage> reply = new AMessage('setu', this);
1616             reply->setSize("index", index);
1617             reply->setInt32("result", ERROR_UNSUPPORTED);
1618             reply->post();
1619             return;
1620         }
1621 
1622         AString url;
1623         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1624 
1625         AString trackURL;
1626         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1627 
1628         mTracks.push(TrackInfo());
1629         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1630         info->mURL = trackURL;
1631         info->mPacketSource = source;
1632         info->mUsingInterleavedTCP = false;
1633         info->mFirstSeqNumInSegment = 0;
1634         info->mNewSegment = true;
1635         info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1636         info->mRTPSocket = -1;
1637         info->mRTCPSocket = -1;
1638         info->mRTPAnchor = 0;
1639         info->mNTPAnchorUs = -1;
1640         info->mNormalPlayTimeRTP = 0;
1641         info->mNormalPlayTimeUs = 0ll;
1642 
1643         unsigned long PT;
1644         AString formatDesc;
1645         AString formatParams;
1646         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1647 
1648         int32_t timescale;
1649         int32_t numChannels;
1650         ASessionDescription::ParseFormatDesc(
1651                 formatDesc.c_str(), &timescale, &numChannels);
1652 
1653         info->mTimeScale = timescale;
1654         info->mEOSReceived = false;
1655 
1656         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1657 
1658         AString request = "SETUP ";
1659         request.append(trackURL);
1660         request.append(" RTSP/1.0\r\n");
1661 
1662         if (mTryTCPInterleaving) {
1663             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1664             info->mUsingInterleavedTCP = true;
1665             info->mRTPSocket = interleaveIndex;
1666             info->mRTCPSocket = interleaveIndex + 1;
1667 
1668             request.append("Transport: RTP/AVP/TCP;interleaved=");
1669             request.append(interleaveIndex);
1670             request.append("-");
1671             request.append(interleaveIndex + 1);
1672         } else {
1673             unsigned rtpPort;
1674             ARTPConnection::MakePortPair(
1675                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1676 
1677             if (mUIDValid) {
1678                 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID,
1679                         (uint32_t)*(uint32_t*) "RTP_");
1680                 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1681                         (uint32_t)*(uint32_t*) "RTP_");
1682                 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID);
1683                 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1684             }
1685 
1686             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1687             request.append(rtpPort);
1688             request.append("-");
1689             request.append(rtpPort + 1);
1690         }
1691 
1692         request.append("\r\n");
1693 
1694         if (index > 1) {
1695             request.append("Session: ");
1696             request.append(mSessionID);
1697             request.append("\r\n");
1698         }
1699 
1700         request.append("\r\n");
1701 
1702         sp<AMessage> reply = new AMessage('setu', this);
1703         reply->setSize("index", index);
1704         reply->setSize("track-index", mTracks.size() - 1);
1705         mConn->sendRequest(request.c_str(), reply);
1706     }
1707 
MakeURLMyHandler1708     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1709         out->clear();
1710 
1711         if (strncasecmp("rtsp://", baseURL, 7)) {
1712             // Base URL must be absolute
1713             return false;
1714         }
1715 
1716         if (!strncasecmp("rtsp://", url, 7)) {
1717             // "url" is already an absolute URL, ignore base URL.
1718             out->setTo(url);
1719             return true;
1720         }
1721 
1722         size_t n = strlen(baseURL);
1723         out->setTo(baseURL);
1724         if (baseURL[n - 1] != '/') {
1725             out->append("/");
1726         }
1727         out->append(url);
1728 
1729         return true;
1730     }
1731 
fakeTimestampsMyHandler1732     void fakeTimestamps() {
1733         mNTPAnchorUs = -1ll;
1734         for (size_t i = 0; i < mTracks.size(); ++i) {
1735             onTimeUpdate(i, 0, 0ll);
1736         }
1737     }
1738 
dataReceivedOnAllChannelsMyHandler1739     bool dataReceivedOnAllChannels() {
1740         TrackInfo *track;
1741         for (size_t i = 0; i < mTracks.size(); ++i) {
1742             track = &mTracks.editItemAt(i);
1743             if (track->mPackets.empty()) {
1744                 return false;
1745             }
1746         }
1747         return true;
1748     }
1749 
handleFirstAccessUnitMyHandler1750     void handleFirstAccessUnit() {
1751         if (mFirstAccessUnit) {
1752             sp<AMessage> msg = mNotify->dup();
1753             msg->setInt32("what", kWhatConnected);
1754             msg->post();
1755 
1756             if (mSeekable) {
1757                 for (size_t i = 0; i < mTracks.size(); ++i) {
1758                     TrackInfo *info = &mTracks.editItemAt(i);
1759 
1760                     postNormalPlayTimeMapping(
1761                             i,
1762                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1763                 }
1764             }
1765 
1766             mFirstAccessUnit = false;
1767         }
1768     }
1769 
onTimeUpdateMyHandler1770     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1771         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1772              trackIndex, rtpTime, (long long)ntpTime);
1773 
1774         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1775 
1776         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1777 
1778         track->mRTPAnchor = rtpTime;
1779         track->mNTPAnchorUs = ntpTimeUs;
1780 
1781         if (mNTPAnchorUs < 0) {
1782             mNTPAnchorUs = ntpTimeUs;
1783             mMediaAnchorUs = mLastMediaTimeUs;
1784         }
1785 
1786         if (!mAllTracksHaveTime) {
1787             bool allTracksHaveTime = (mTracks.size() > 0);
1788             for (size_t i = 0; i < mTracks.size(); ++i) {
1789                 TrackInfo *track = &mTracks.editItemAt(i);
1790                 if (track->mNTPAnchorUs < 0) {
1791                     allTracksHaveTime = false;
1792                     break;
1793                 }
1794             }
1795             if (allTracksHaveTime) {
1796                 mAllTracksHaveTime = true;
1797                 ALOGI("Time now established for all tracks.");
1798             }
1799         }
1800         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1801             handleFirstAccessUnit();
1802 
1803             // Time is now established, lets start timestamping immediately
1804             for (size_t i = 0; i < mTracks.size(); ++i) {
1805                 if (OK != processAccessUnitQueue(i)) {
1806                     return;
1807                 }
1808             }
1809             for (size_t i = 0; i < mTracks.size(); ++i) {
1810                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1811                 if (trackInfo->mEOSReceived) {
1812                     postQueueEOS(i, ERROR_END_OF_STREAM);
1813                     trackInfo->mEOSReceived = false;
1814                 }
1815             }
1816         }
1817     }
1818 
processAccessUnitQueueMyHandler1819     status_t processAccessUnitQueue(int32_t trackIndex) {
1820         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1821         while (!track->mPackets.empty()) {
1822             sp<ABuffer> accessUnit = *track->mPackets.begin();
1823             track->mPackets.erase(track->mPackets.begin());
1824 
1825             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1826             if (track->mNewSegment) {
1827                 // The sequence number from RTP packet has only 16 bits and is extended
1828                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1829                 // RTSP "PLAY" command should be used to detect the first RTP packet
1830                 // after seeking.
1831                 if (mSeekable) {
1832                     if (track->mAllowedStaleAccessUnits > 0) {
1833                         uint32_t seqNum16 = seqNum & 0xffff;
1834                         uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1835                         if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1836                                 || seqNum16 < firstSeqNumInSegment16) {
1837                             // Not the first rtp packet of the stream after seeking, discarding.
1838                             track->mAllowedStaleAccessUnits--;
1839                             ALOGV("discarding stale access unit (0x%x : 0x%x)",
1840                                  seqNum, track->mFirstSeqNumInSegment);
1841                             continue;
1842                         }
1843                         ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1844                                 "Missing the first packet(%u), now take packet(%u) as first one",
1845                                 track->mFirstSeqNumInSegment, seqNum);
1846                     } else { // track->mAllowedStaleAccessUnits <= 0
1847                         mNumAccessUnitsReceived = 0;
1848                         ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1849                              "Still no first rtp packet after %d stale ones",
1850                              kMaxAllowedStaleAccessUnits);
1851                         track->mAllowedStaleAccessUnits = -1;
1852                         return UNKNOWN_ERROR;
1853                     }
1854                 }
1855 
1856                 // Now found the first rtp packet of the stream after seeking.
1857                 track->mFirstSeqNumInSegment = seqNum;
1858                 track->mNewSegment = false;
1859             }
1860 
1861             if (seqNum < track->mFirstSeqNumInSegment) {
1862                 ALOGV("dropping stale access-unit (%d < %d)",
1863                      seqNum, track->mFirstSeqNumInSegment);
1864                 continue;
1865             }
1866 
1867             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1868                 postQueueAccessUnit(trackIndex, accessUnit);
1869             }
1870         }
1871         return OK;
1872     }
1873 
onAccessUnitCompleteMyHandler1874     void onAccessUnitComplete(
1875             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1876         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1877         track->mPackets.push_back(accessUnit);
1878 
1879         uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1880         ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1881 
1882         if(!mPlayResponseParsed){
1883             ALOGV("play response is not parsed");
1884             return;
1885         }
1886 
1887         handleFirstAccessUnit();
1888 
1889         if (!mAllTracksHaveTime) {
1890             ALOGV("storing accessUnit, no time established yet");
1891             return;
1892         }
1893 
1894         if (OK != processAccessUnitQueue(trackIndex)) {
1895             return;
1896         }
1897 
1898         if (track->mEOSReceived) {
1899             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1900             track->mEOSReceived = false;
1901         }
1902     }
1903 
addMediaTimestampMyHandler1904     bool addMediaTimestamp(
1905             int32_t trackIndex, const TrackInfo *track,
1906             const sp<ABuffer> &accessUnit) {
1907         UNUSED_UNLESS_VERBOSE(trackIndex);
1908 
1909         uint32_t rtpTime;
1910         CHECK(accessUnit->meta()->findInt32(
1911                     "rtp-time", (int32_t *)&rtpTime));
1912 
1913         int64_t relRtpTimeUs =
1914             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1915                 / track->mTimeScale;
1916 
1917         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1918 
1919         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1920 
1921         if (mediaTimeUs > mLastMediaTimeUs) {
1922             mLastMediaTimeUs = mediaTimeUs;
1923         }
1924 
1925         if (mediaTimeUs < 0 && !mSeekable) {
1926             ALOGV("dropping early accessUnit.");
1927             return false;
1928         }
1929 
1930         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1931              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1932 
1933         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1934 
1935         return true;
1936     }
1937 
postQueueAccessUnitMyHandler1938     void postQueueAccessUnit(
1939             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1940         sp<AMessage> msg = mNotify->dup();
1941         msg->setInt32("what", kWhatAccessUnit);
1942         msg->setSize("trackIndex", trackIndex);
1943         msg->setBuffer("accessUnit", accessUnit);
1944         msg->post();
1945     }
1946 
postQueueEOSMyHandler1947     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1948         sp<AMessage> msg = mNotify->dup();
1949         msg->setInt32("what", kWhatEOS);
1950         msg->setSize("trackIndex", trackIndex);
1951         msg->setInt32("finalResult", finalResult);
1952         msg->post();
1953     }
1954 
postQueueSeekDiscontinuityMyHandler1955     void postQueueSeekDiscontinuity(size_t trackIndex) {
1956         sp<AMessage> msg = mNotify->dup();
1957         msg->setInt32("what", kWhatSeekDiscontinuity);
1958         msg->setSize("trackIndex", trackIndex);
1959         msg->post();
1960     }
1961 
postNormalPlayTimeMappingMyHandler1962     void postNormalPlayTimeMapping(
1963             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1964         sp<AMessage> msg = mNotify->dup();
1965         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1966         msg->setSize("trackIndex", trackIndex);
1967         msg->setInt32("rtpTime", rtpTime);
1968         msg->setInt64("nptUs", nptUs);
1969         msg->post();
1970     }
1971 
postTimeoutMyHandler1972     void postTimeout() {
1973         sp<AMessage> timeout = new AMessage('tiou', this);
1974         mCheckTimeoutGeneration++;
1975         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1976 
1977         int64_t startupTimeoutUs;
1978         startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1979         timeout->post(startupTimeoutUs);
1980     }
1981 
1982     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1983 };
1984 
1985 }  // namespace android
1986 
1987 #endif  // MY_HANDLER_H_
1988