1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26 
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29 
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 
35 #include <ctype.h>
36 
37 #include <media/stagefright/foundation/ABuffer.h>
38 #include <media/stagefright/foundation/ADebug.h>
39 #include <media/stagefright/foundation/ALooper.h>
40 #include <media/stagefright/foundation/AMessage.h>
41 #include <media/stagefright/MediaErrors.h>
42 #include <media/stagefright/Utils.h>
43 
44 #include <arpa/inet.h>
45 #include <sys/socket.h>
46 #include <netdb.h>
47 
48 #include "HTTPBase.h"
49 
50 #if LOG_NDEBUG
51 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
52 #else
53 #define UNUSED_UNLESS_VERBOSE(x)
54 #endif
55 
// All *Us values below are durations in microseconds.

// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 10000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static int64_t kStartupTimeoutUs = 10000000ll;

// Keep-alive timeout (60 secs) used until a SETUP response supplies a
// usable "timeout" attribute; also the fallback when the server's value is
// malformed or shorter than 15 secs.
static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;

// 3 secs. NOTE(review): not referenced in the visible part of this file;
// presumably the delay applied before a requested pause is carried out --
// confirm against the 'paus' handling.
static int64_t kPauseDelayUs = 3000000ll;

// The allowed maximum number of stale access units at the beginning of
// a new sequence.
static int32_t kMaxAllowedStaleAccessUnits = 20;
71 
72 namespace android {
73 
GetAttribute(const char * s,const char * key,AString * value)74 static bool GetAttribute(const char *s, const char *key, AString *value) {
75     value->clear();
76 
77     size_t keyLen = strlen(key);
78 
79     for (;;) {
80         while (isspace(*s)) {
81             ++s;
82         }
83 
84         const char *colonPos = strchr(s, ';');
85 
86         size_t len =
87             (colonPos == NULL) ? strlen(s) : colonPos - s;
88 
89         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
90             value->setTo(&s[keyLen + 1], len - keyLen - 1);
91             return true;
92         }
93 
94         if (colonPos == NULL) {
95             return false;
96         }
97 
98         s = colonPos + 1;
99     }
100 }
101 
102 struct MyHandler : public AHandler {
103     enum {
104         kWhatConnected                  = 'conn',
105         kWhatDisconnected               = 'disc',
106         kWhatSeekPaused                 = 'spau',
107         kWhatSeekDone                   = 'sdon',
108 
109         kWhatAccessUnit                 = 'accU',
110         kWhatEOS                        = 'eos!',
111         kWhatSeekDiscontinuity          = 'seeD',
112         kWhatNormalPlayTimeMapping      = 'nptM',
113     };
114 
115     MyHandler(
116             const char *url,
117             const sp<AMessage> &notify,
118             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler119         : mNotify(notify),
120           mUIDValid(uidValid),
121           mUID(uid),
122           mNetLooper(new ALooper),
123           mConn(new ARTSPConnection(mUIDValid, mUID)),
124           mRTPConn(new ARTPConnection),
125           mOriginalSessionURL(url),
126           mSessionURL(url),
127           mSetupTracksSuccessful(false),
128           mSeekPending(false),
129           mFirstAccessUnit(true),
130           mAllTracksHaveTime(false),
131           mNTPAnchorUs(-1),
132           mMediaAnchorUs(-1),
133           mLastMediaTimeUs(0),
134           mNumAccessUnitsReceived(0),
135           mCheckPending(false),
136           mCheckGeneration(0),
137           mCheckTimeoutGeneration(0),
138           mTryTCPInterleaving(false),
139           mTryFakeRTCP(false),
140           mReceivedFirstRTCPPacket(false),
141           mReceivedFirstRTPPacket(false),
142           mSeekable(true),
143           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
144           mKeepAliveGeneration(0),
145           mPausing(false),
146           mPauseGeneration(0),
147           mPlayResponseParsed(false) {
148         mNetLooper->setName("rtsp net");
149         mNetLooper->start(false /* runOnCallingThread */,
150                           false /* canCallJava */,
151                           PRIORITY_HIGHEST);
152 
153         // Strip any authentication info from the session url, we don't
154         // want to transmit user/pass in cleartext.
155         AString host, path, user, pass;
156         unsigned port;
157         CHECK(ARTSPConnection::ParseURL(
158                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
159 
160         if (user.size() > 0) {
161             mSessionURL.clear();
162             mSessionURL.append("rtsp://");
163             mSessionURL.append(host);
164             mSessionURL.append(":");
165             mSessionURL.append(AStringPrintf("%u", port));
166             mSessionURL.append(path);
167 
168             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
169         }
170 
171         mSessionHost = host;
172     }
173 
connectMyHandler174     void connect() {
175         looper()->registerHandler(mConn);
176         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
177 
178         sp<AMessage> notify = new AMessage('biny', this);
179         mConn->observeBinaryData(notify);
180 
181         sp<AMessage> reply = new AMessage('conn', this);
182         mConn->connect(mOriginalSessionURL.c_str(), reply);
183     }
184 
loadSDPMyHandler185     void loadSDP(const sp<ASessionDescription>& desc) {
186         looper()->registerHandler(mConn);
187         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
188 
189         sp<AMessage> notify = new AMessage('biny', this);
190         mConn->observeBinaryData(notify);
191 
192         sp<AMessage> reply = new AMessage('sdpl', this);
193         reply->setObject("description", desc);
194         mConn->connect(mOriginalSessionURL.c_str(), reply);
195     }
196 
getControlURLMyHandler197     AString getControlURL() {
198         AString sessionLevelControlURL;
199         if (mSessionDesc->findAttribute(
200                 0,
201                 "a=control",
202                 &sessionLevelControlURL)) {
203             if (sessionLevelControlURL.compare("*") == 0) {
204                 return mBaseURL;
205             } else {
206                 AString controlURL;
207                 CHECK(MakeURL(
208                         mBaseURL.c_str(),
209                         sessionLevelControlURL.c_str(),
210                         &controlURL));
211                 return controlURL;
212             }
213         } else {
214             return mSessionURL;
215         }
216     }
217 
disconnectMyHandler218     void disconnect() {
219         (new AMessage('abor', this))->post();
220     }
221 
seekMyHandler222     void seek(int64_t timeUs) {
223         sp<AMessage> msg = new AMessage('seek', this);
224         msg->setInt64("time", timeUs);
225         mPauseGeneration++;
226         msg->post();
227     }
228 
continueSeekAfterPauseMyHandler229     void continueSeekAfterPause(int64_t timeUs) {
230         sp<AMessage> msg = new AMessage('see1', this);
231         msg->setInt64("time", timeUs);
232         msg->post();
233     }
234 
isSeekableMyHandler235     bool isSeekable() const {
236         return mSeekable;
237     }
238 
pauseMyHandler239     void pause() {
240         sp<AMessage> msg = new AMessage('paus', this);
241         mPauseGeneration++;
242         msg->setInt32("pausecheck", mPauseGeneration);
243         msg->post();
244     }
245 
resumeMyHandler246     void resume() {
247         sp<AMessage> msg = new AMessage('resu', this);
248         mPauseGeneration++;
249         msg->post();
250     }
251 
addRRMyHandler252     static void addRR(const sp<ABuffer> &buf) {
253         uint8_t *ptr = buf->data() + buf->size();
254         ptr[0] = 0x80 | 0;
255         ptr[1] = 201;  // RR
256         ptr[2] = 0;
257         ptr[3] = 1;
258         ptr[4] = 0xde;  // SSRC
259         ptr[5] = 0xad;
260         ptr[6] = 0xbe;
261         ptr[7] = 0xef;
262 
263         buf->setRange(0, buf->size() + 8);
264     }
265 
addSDESMyHandler266     static void addSDES(int s, const sp<ABuffer> &buffer) {
267         struct sockaddr_in addr;
268         socklen_t addrSize = sizeof(addr);
269         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
270             inet_aton("0.0.0.0", &(addr.sin_addr));
271         }
272 
273         uint8_t *data = buffer->data() + buffer->size();
274         data[0] = 0x80 | 1;
275         data[1] = 202;  // SDES
276         data[4] = 0xde;  // SSRC
277         data[5] = 0xad;
278         data[6] = 0xbe;
279         data[7] = 0xef;
280 
281         size_t offset = 8;
282 
283         data[offset++] = 1;  // CNAME
284 
285         AString cname = "stagefright@";
286         cname.append(inet_ntoa(addr.sin_addr));
287         data[offset++] = cname.size();
288 
289         memcpy(&data[offset], cname.c_str(), cname.size());
290         offset += cname.size();
291 
292         data[offset++] = 6;  // TOOL
293 
294         AString tool = MakeUserAgent();
295 
296         data[offset++] = tool.size();
297 
298         memcpy(&data[offset], tool.c_str(), tool.size());
299         offset += tool.size();
300 
301         data[offset++] = 0;
302 
303         if ((offset % 4) > 0) {
304             size_t count = 4 - (offset % 4);
305             switch (count) {
306                 case 3:
307                     data[offset++] = 0;
308                 case 2:
309                     data[offset++] = 0;
310                 case 1:
311                     data[offset++] = 0;
312             }
313         }
314 
315         size_t numWords = (offset / 4) - 1;
316         data[2] = numWords >> 8;
317         data[3] = numWords & 0xff;
318 
319         buffer->setRange(buffer->offset(), buffer->size() + offset);
320     }
321 
322     // In case we're behind NAT, fire off two UDP packets to the remote
323     // rtp/rtcp ports to poke a hole into the firewall for future incoming
324     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler325     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
326         struct sockaddr_in addr;
327         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
328         addr.sin_family = AF_INET;
329 
330         AString source;
331         AString server_port;
332         if (!GetAttribute(transport.c_str(),
333                           "source",
334                           &source)) {
335             ALOGW("Missing 'source' field in Transport response. Using "
336                  "RTSP endpoint address.");
337 
338             struct hostent *ent = gethostbyname(mSessionHost.c_str());
339             if (ent == NULL) {
340                 ALOGE("Failed to look up address of session host '%s'",
341                      mSessionHost.c_str());
342 
343                 return false;
344             }
345 
346             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
347         } else {
348             addr.sin_addr.s_addr = inet_addr(source.c_str());
349         }
350 
351         if (!GetAttribute(transport.c_str(),
352                                  "server_port",
353                                  &server_port)) {
354             ALOGI("Missing 'server_port' field in Transport response.");
355             return false;
356         }
357 
358         int rtpPort, rtcpPort;
359         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
360                 || rtpPort <= 0 || rtpPort > 65535
361                 || rtcpPort <=0 || rtcpPort > 65535
362                 || rtcpPort != rtpPort + 1) {
363             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
364                  " RTP port must be even, RTCP port must be one higher.",
365                  server_port.c_str());
366 
367             return false;
368         }
369 
370         if (rtpPort & 1) {
371             ALOGW("Server picked an odd RTP port, it should've picked an "
372                  "even one, we'll let it pass for now, but this may break "
373                  "in the future.");
374         }
375 
376         if (addr.sin_addr.s_addr == INADDR_NONE) {
377             return true;
378         }
379 
380         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
381             // No firewalls to traverse on the loopback interface.
382             return true;
383         }
384 
385         // Make up an RR/SDES RTCP packet.
386         sp<ABuffer> buf = new ABuffer(65536);
387         buf->setRange(0, 0);
388         addRR(buf);
389         addSDES(rtpSocket, buf);
390 
391         addr.sin_port = htons(rtpPort);
392 
393         ssize_t n = sendto(
394                 rtpSocket, buf->data(), buf->size(), 0,
395                 (const sockaddr *)&addr, sizeof(addr));
396 
397         if (n < (ssize_t)buf->size()) {
398             ALOGE("failed to poke a hole for RTP packets");
399             return false;
400         }
401 
402         addr.sin_port = htons(rtcpPort);
403 
404         n = sendto(
405                 rtcpSocket, buf->data(), buf->size(), 0,
406                 (const sockaddr *)&addr, sizeof(addr));
407 
408         if (n < (ssize_t)buf->size()) {
409             ALOGE("failed to poke a hole for RTCP packets");
410             return false;
411         }
412 
413         ALOGV("successfully poked holes.");
414 
415         return true;
416     }
417 
isLiveStreamMyHandler418     static bool isLiveStream(const sp<ASessionDescription> &desc) {
419         AString attrLiveStream;
420         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
421             ssize_t semicolonPos = attrLiveStream.find(";", 2);
422 
423             const char* liveStreamValue;
424             if (semicolonPos < 0) {
425                 liveStreamValue = attrLiveStream.c_str();
426             } else {
427                 AString valString;
428                 valString.setTo(attrLiveStream,
429                         semicolonPos + 1,
430                         attrLiveStream.size() - semicolonPos - 1);
431                 liveStreamValue = valString.c_str();
432             }
433 
434             uint32_t value = strtoul(liveStreamValue, NULL, 10);
435             if (value == 1) {
436                 ALOGV("found live stream");
437                 return true;
438             }
439         } else {
440             // It is a live stream if no duration is returned
441             int64_t durationUs;
442             if (!desc->getDurationUs(&durationUs)) {
443                 ALOGV("No duration found, assume live stream");
444                 return true;
445             }
446         }
447 
448         return false;
449     }
450 
onMessageReceivedMyHandler451     virtual void onMessageReceived(const sp<AMessage> &msg) {
452         switch (msg->what()) {
453             case 'conn':
454             {
455                 int32_t result;
456                 CHECK(msg->findInt32("result", &result));
457 
458                 ALOGI("connection request completed with result %d (%s)",
459                      result, strerror(-result));
460 
461                 if (result == OK) {
462                     AString request;
463                     request = "DESCRIBE ";
464                     request.append(mSessionURL);
465                     request.append(" RTSP/1.0\r\n");
466                     request.append("Accept: application/sdp\r\n");
467                     request.append("\r\n");
468 
469                     sp<AMessage> reply = new AMessage('desc', this);
470                     mConn->sendRequest(request.c_str(), reply);
471                 } else {
472                     (new AMessage('disc', this))->post();
473                 }
474                 break;
475             }
476 
477             case 'disc':
478             {
479                 ++mKeepAliveGeneration;
480 
481                 int32_t reconnect;
482                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
483                     sp<AMessage> reply = new AMessage('conn', this);
484                     mConn->connect(mOriginalSessionURL.c_str(), reply);
485                 } else {
486                     (new AMessage('quit', this))->post();
487                 }
488                 break;
489             }
490 
491             case 'desc':
492             {
493                 int32_t result;
494                 CHECK(msg->findInt32("result", &result));
495 
496                 ALOGI("DESCRIBE completed with result %d (%s)",
497                      result, strerror(-result));
498 
499                 if (result == OK) {
500                     sp<RefBase> obj;
501                     CHECK(msg->findObject("response", &obj));
502                     sp<ARTSPResponse> response =
503                         static_cast<ARTSPResponse *>(obj.get());
504 
505                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
506                         ssize_t i = response->mHeaders.indexOfKey("location");
507                         CHECK_GE(i, 0);
508 
509                         mOriginalSessionURL = response->mHeaders.valueAt(i);
510                         mSessionURL = mOriginalSessionURL;
511 
512                         // Strip any authentication info from the session url, we don't
513                         // want to transmit user/pass in cleartext.
514                         AString host, path, user, pass;
515                         unsigned port;
516                         if (ARTSPConnection::ParseURL(
517                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
518                                 && user.size() > 0) {
519                             mSessionURL.clear();
520                             mSessionURL.append("rtsp://");
521                             mSessionURL.append(host);
522                             mSessionURL.append(":");
523                             mSessionURL.append(AStringPrintf("%u", port));
524                             mSessionURL.append(path);
525 
526                             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
527                         }
528 
529                         sp<AMessage> reply = new AMessage('conn', this);
530                         mConn->connect(mOriginalSessionURL.c_str(), reply);
531                         break;
532                     }
533 
534                     if (response->mStatusCode != 200) {
535                         result = UNKNOWN_ERROR;
536                     } else if (response->mContent == NULL) {
537                         result = ERROR_MALFORMED;
538                         ALOGE("The response has no content.");
539                     } else {
540                         mSessionDesc = new ASessionDescription;
541 
542                         mSessionDesc->setTo(
543                                 response->mContent->data(),
544                                 response->mContent->size());
545 
546                         if (!mSessionDesc->isValid()) {
547                             ALOGE("Failed to parse session description.");
548                             result = ERROR_MALFORMED;
549                         } else {
550                             ssize_t i = response->mHeaders.indexOfKey("content-base");
551                             if (i >= 0) {
552                                 mBaseURL = response->mHeaders.valueAt(i);
553                             } else {
554                                 i = response->mHeaders.indexOfKey("content-location");
555                                 if (i >= 0) {
556                                     mBaseURL = response->mHeaders.valueAt(i);
557                                 } else {
558                                     mBaseURL = mSessionURL;
559                                 }
560                             }
561 
562                             mSeekable = !isLiveStream(mSessionDesc);
563 
564                             if (!mBaseURL.startsWith("rtsp://")) {
565                                 // Some misbehaving servers specify a relative
566                                 // URL in one of the locations above, combine
567                                 // it with the absolute session URL to get
568                                 // something usable...
569 
570                                 ALOGW("Server specified a non-absolute base URL"
571                                      ", combining it with the session URL to "
572                                      "get something usable...");
573 
574                                 AString tmp;
575                                 CHECK(MakeURL(
576                                             mSessionURL.c_str(),
577                                             mBaseURL.c_str(),
578                                             &tmp));
579 
580                                 mBaseURL = tmp;
581                             }
582 
583                             mControlURL = getControlURL();
584 
585                             if (mSessionDesc->countTracks() < 2) {
586                                 // There's no actual tracks in this session.
587                                 // The first "track" is merely session meta
588                                 // data.
589 
590                                 ALOGW("Session doesn't contain any playable "
591                                      "tracks. Aborting.");
592                                 result = ERROR_UNSUPPORTED;
593                             } else {
594                                 setupTrack(1);
595                             }
596                         }
597                     }
598                 }
599 
600                 if (result != OK) {
601                     sp<AMessage> reply = new AMessage('disc', this);
602                     mConn->disconnect(reply);
603                 }
604                 break;
605             }
606 
607             case 'sdpl':
608             {
609                 int32_t result;
610                 CHECK(msg->findInt32("result", &result));
611 
612                 ALOGI("SDP connection request completed with result %d (%s)",
613                      result, strerror(-result));
614 
615                 if (result == OK) {
616                     sp<RefBase> obj;
617                     CHECK(msg->findObject("description", &obj));
618                     mSessionDesc =
619                         static_cast<ASessionDescription *>(obj.get());
620 
621                     if (!mSessionDesc->isValid()) {
622                         ALOGE("Failed to parse session description.");
623                         result = ERROR_MALFORMED;
624                     } else {
625                         mBaseURL = mSessionURL;
626 
627                         mSeekable = !isLiveStream(mSessionDesc);
628 
629                         mControlURL = getControlURL();
630 
631                         if (mSessionDesc->countTracks() < 2) {
632                             // There's no actual tracks in this session.
633                             // The first "track" is merely session meta
634                             // data.
635 
636                             ALOGW("Session doesn't contain any playable "
637                                  "tracks. Aborting.");
638                             result = ERROR_UNSUPPORTED;
639                         } else {
640                             setupTrack(1);
641                         }
642                     }
643                 }
644 
645                 if (result != OK) {
646                     sp<AMessage> reply = new AMessage('disc', this);
647                     mConn->disconnect(reply);
648                 }
649                 break;
650             }
651 
652             case 'setu':
653             {
654                 size_t index;
655                 CHECK(msg->findSize("index", &index));
656 
657                 TrackInfo *track = NULL;
658                 size_t trackIndex;
659                 if (msg->findSize("track-index", &trackIndex)) {
660                     track = &mTracks.editItemAt(trackIndex);
661                 }
662 
663                 int32_t result;
664                 CHECK(msg->findInt32("result", &result));
665 
666                 ALOGI("SETUP(%zu) completed with result %d (%s)",
667                      index, result, strerror(-result));
668 
669                 if (result == OK) {
670                     CHECK(track != NULL);
671 
672                     sp<RefBase> obj;
673                     CHECK(msg->findObject("response", &obj));
674                     sp<ARTSPResponse> response =
675                         static_cast<ARTSPResponse *>(obj.get());
676 
677                     if (response->mStatusCode != 200) {
678                         result = UNKNOWN_ERROR;
679                     } else {
680                         ssize_t i = response->mHeaders.indexOfKey("session");
681                         CHECK_GE(i, 0);
682 
683                         mSessionID = response->mHeaders.valueAt(i);
684 
685                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
686                         AString timeoutStr;
687                         if (GetAttribute(
688                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
689                             char *end;
690                             unsigned long timeoutSecs =
691                                 strtoul(timeoutStr.c_str(), &end, 10);
692 
693                             if (end == timeoutStr.c_str() || *end != '\0') {
694                                 ALOGW("server specified malformed timeout '%s'",
695                                      timeoutStr.c_str());
696 
697                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
698                             } else if (timeoutSecs < 15) {
699                                 ALOGW("server specified too short a timeout "
700                                      "(%lu secs), using default.",
701                                      timeoutSecs);
702 
703                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
704                             } else {
705                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
706 
707                                 ALOGI("server specified timeout of %lu secs.",
708                                      timeoutSecs);
709                             }
710                         }
711 
712                         i = mSessionID.find(";");
713                         if (i >= 0) {
714                             // Remove options, i.e. ";timeout=90"
715                             mSessionID.erase(i, mSessionID.size() - i);
716                         }
717 
718                         sp<AMessage> notify = new AMessage('accu', this);
719                         notify->setSize("track-index", trackIndex);
720 
721                         i = response->mHeaders.indexOfKey("transport");
722                         CHECK_GE(i, 0);
723 
724                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
725                             if (!track->mUsingInterleavedTCP) {
726                                 AString transport = response->mHeaders.valueAt(i);
727 
728                                 // We are going to continue even if we were
729                                 // unable to poke a hole into the firewall...
730                                 pokeAHole(
731                                         track->mRTPSocket,
732                                         track->mRTCPSocket,
733                                         transport);
734                             }
735 
736                             mRTPConn->addStream(
737                                     track->mRTPSocket, track->mRTCPSocket,
738                                     mSessionDesc, index,
739                                     notify, track->mUsingInterleavedTCP);
740 
741                             mSetupTracksSuccessful = true;
742                         } else {
743                             result = BAD_VALUE;
744                         }
745                     }
746                 }
747 
748                 if (result != OK) {
749                     if (track) {
750                         if (!track->mUsingInterleavedTCP) {
751                             // Clear the tag
752                             if (mUIDValid) {
753                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
754                                 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
755                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
756                                 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
757                             }
758 
759                             close(track->mRTPSocket);
760                             close(track->mRTCPSocket);
761                         }
762 
763                         mTracks.removeItemsAt(trackIndex);
764                     }
765                 }
766 
767                 ++index;
768                 if (result == OK && index < mSessionDesc->countTracks()) {
769                     setupTrack(index);
770                 } else if (mSetupTracksSuccessful) {
771                     ++mKeepAliveGeneration;
772                     postKeepAlive();
773 
774                     AString request = "PLAY ";
775                     request.append(mControlURL);
776                     request.append(" RTSP/1.0\r\n");
777 
778                     request.append("Session: ");
779                     request.append(mSessionID);
780                     request.append("\r\n");
781 
782                     request.append("\r\n");
783 
784                     sp<AMessage> reply = new AMessage('play', this);
785                     mConn->sendRequest(request.c_str(), reply);
786                 } else {
787                     sp<AMessage> reply = new AMessage('disc', this);
788                     mConn->disconnect(reply);
789                 }
790                 break;
791             }
792 
793             case 'play':
794             {
795                 int32_t result;
796                 CHECK(msg->findInt32("result", &result));
797 
798                 ALOGI("PLAY completed with result %d (%s)",
799                      result, strerror(-result));
800 
801                 if (result == OK) {
802                     sp<RefBase> obj;
803                     CHECK(msg->findObject("response", &obj));
804                     sp<ARTSPResponse> response =
805                         static_cast<ARTSPResponse *>(obj.get());
806 
807                     if (response->mStatusCode != 200) {
808                         result = UNKNOWN_ERROR;
809                     } else {
810                         parsePlayResponse(response);
811                         postTimeout();
812                     }
813                 }
814 
815                 if (result != OK) {
816                     sp<AMessage> reply = new AMessage('disc', this);
817                     mConn->disconnect(reply);
818                 }
819 
820                 break;
821             }
822 
823             case 'aliv':
824             {
825                 int32_t generation;
826                 CHECK(msg->findInt32("generation", &generation));
827 
828                 if (generation != mKeepAliveGeneration) {
829                     // obsolete event.
830                     break;
831                 }
832 
833                 AString request;
834                 request.append("OPTIONS ");
835                 request.append(mSessionURL);
836                 request.append(" RTSP/1.0\r\n");
837                 request.append("Session: ");
838                 request.append(mSessionID);
839                 request.append("\r\n");
840                 request.append("\r\n");
841 
842                 sp<AMessage> reply = new AMessage('opts', this);
843                 reply->setInt32("generation", mKeepAliveGeneration);
844                 mConn->sendRequest(request.c_str(), reply);
845                 break;
846             }
847 
848             case 'opts':
849             {
850                 int32_t result;
851                 CHECK(msg->findInt32("result", &result));
852 
853                 ALOGI("OPTIONS completed with result %d (%s)",
854                      result, strerror(-result));
855 
856                 int32_t generation;
857                 CHECK(msg->findInt32("generation", &generation));
858 
859                 if (generation != mKeepAliveGeneration) {
860                     // obsolete event.
861                     break;
862                 }
863 
864                 postKeepAlive();
865                 break;
866             }
867 
868             case 'abor':
869             {
870                 for (size_t i = 0; i < mTracks.size(); ++i) {
871                     TrackInfo *info = &mTracks.editItemAt(i);
872 
873                     if (!mFirstAccessUnit) {
874                         postQueueEOS(i, ERROR_END_OF_STREAM);
875                     }
876 
877                     if (!info->mUsingInterleavedTCP) {
878                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
879 
880                         // Clear the tag
881                         if (mUIDValid) {
882                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
883                             HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
884                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
885                             HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
886                         }
887 
888                         close(info->mRTPSocket);
889                         close(info->mRTCPSocket);
890                     }
891                 }
892                 mTracks.clear();
893                 mSetupTracksSuccessful = false;
894                 mSeekPending = false;
895                 mFirstAccessUnit = true;
896                 mAllTracksHaveTime = false;
897                 mNTPAnchorUs = -1;
898                 mMediaAnchorUs = -1;
899                 mNumAccessUnitsReceived = 0;
900                 mReceivedFirstRTCPPacket = false;
901                 mReceivedFirstRTPPacket = false;
902                 mPausing = false;
903                 mSeekable = true;
904 
905                 sp<AMessage> reply = new AMessage('tear', this);
906 
907                 int32_t reconnect;
908                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
909                     reply->setInt32("reconnect", true);
910                 }
911 
912                 AString request;
913                 request = "TEARDOWN ";
914 
915                 // XXX should use aggregate url from SDP here...
916                 request.append(mSessionURL);
917                 request.append(" RTSP/1.0\r\n");
918 
919                 request.append("Session: ");
920                 request.append(mSessionID);
921                 request.append("\r\n");
922 
923                 request.append("\r\n");
924 
925                 mConn->sendRequest(request.c_str(), reply);
926                 break;
927             }
928 
929             case 'tear':
930             {
931                 int32_t result;
932                 CHECK(msg->findInt32("result", &result));
933 
934                 ALOGI("TEARDOWN completed with result %d (%s)",
935                      result, strerror(-result));
936 
937                 sp<AMessage> reply = new AMessage('disc', this);
938 
939                 int32_t reconnect;
940                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
941                     reply->setInt32("reconnect", true);
942                 }
943 
944                 mConn->disconnect(reply);
945                 break;
946             }
947 
948             case 'quit':
949             {
950                 sp<AMessage> msg = mNotify->dup();
951                 msg->setInt32("what", kWhatDisconnected);
952                 msg->setInt32("result", UNKNOWN_ERROR);
953                 msg->post();
954                 break;
955             }
956 
957             case 'chek':
958             {
959                 int32_t generation;
960                 CHECK(msg->findInt32("generation", &generation));
961                 if (generation != mCheckGeneration) {
962                     // This is an outdated message. Ignore.
963                     break;
964                 }
965 
966                 if (mNumAccessUnitsReceived == 0) {
967 #if 1
968                     ALOGI("stream ended? aborting.");
969                     (new AMessage('abor', this))->post();
970                     break;
971 #else
972                     ALOGI("haven't seen an AU in a looong time.");
973 #endif
974                 }
975 
976                 mNumAccessUnitsReceived = 0;
977                 msg->post(kAccessUnitTimeoutUs);
978                 break;
979             }
980 
981             case 'accu':
982             {
983                 if (mSeekPending) {
984                     ALOGV("Stale access unit.");
985                     break;
986                 }
987 
988                 int32_t timeUpdate;
989                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
990                     size_t trackIndex;
991                     CHECK(msg->findSize("track-index", &trackIndex));
992 
993                     uint32_t rtpTime;
994                     uint64_t ntpTime;
995                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
996                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
997 
998                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
999                     break;
1000                 }
1001 
1002                 int32_t first;
1003                 if (msg->findInt32("first-rtcp", &first)) {
1004                     mReceivedFirstRTCPPacket = true;
1005                     break;
1006                 }
1007 
1008                 if (msg->findInt32("first-rtp", &first)) {
1009                     mReceivedFirstRTPPacket = true;
1010                     break;
1011                 }
1012 
1013                 ++mNumAccessUnitsReceived;
1014                 postAccessUnitTimeoutCheck();
1015 
1016                 size_t trackIndex;
1017                 CHECK(msg->findSize("track-index", &trackIndex));
1018 
1019                 if (trackIndex >= mTracks.size()) {
1020                     ALOGV("late packets ignored.");
1021                     break;
1022                 }
1023 
1024                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1025 
1026                 int32_t eos;
1027                 if (msg->findInt32("eos", &eos)) {
1028                     ALOGI("received BYE on track index %zu", trackIndex);
1029                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1030                         ALOGI("No time established => fake existing data");
1031 
1032                         track->mEOSReceived = true;
1033                         mTryFakeRTCP = true;
1034                         mReceivedFirstRTCPPacket = true;
1035                         fakeTimestamps();
1036                     } else {
1037                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1038                     }
1039                     return;
1040                 }
1041 
1042                 if (mSeekPending) {
1043                     ALOGV("we're seeking, dropping stale packet.");
1044                     break;
1045                 }
1046 
1047                 sp<ABuffer> accessUnit;
1048                 CHECK(msg->findBuffer("access-unit", &accessUnit));
1049                 onAccessUnitComplete(trackIndex, accessUnit);
1050                 break;
1051             }
1052 
1053             case 'paus':
1054             {
1055                 int32_t generation;
1056                 CHECK(msg->findInt32("pausecheck", &generation));
1057                 if (generation != mPauseGeneration) {
1058                     ALOGV("Ignoring outdated pause message.");
1059                     break;
1060                 }
1061 
1062                 if (!mSeekable) {
1063                     ALOGW("This is a live stream, ignoring pause request.");
1064                     break;
1065                 }
1066 
1067                 if (mPausing) {
1068                     ALOGV("This stream is already paused.");
1069                     break;
1070                 }
1071 
1072                 mCheckPending = true;
1073                 ++mCheckGeneration;
1074                 mPausing = true;
1075 
1076                 AString request = "PAUSE ";
1077                 request.append(mControlURL);
1078                 request.append(" RTSP/1.0\r\n");
1079 
1080                 request.append("Session: ");
1081                 request.append(mSessionID);
1082                 request.append("\r\n");
1083 
1084                 request.append("\r\n");
1085 
1086                 sp<AMessage> reply = new AMessage('pau2', this);
1087                 mConn->sendRequest(request.c_str(), reply);
1088                 break;
1089             }
1090 
1091             case 'pau2':
1092             {
1093                 int32_t result;
1094                 CHECK(msg->findInt32("result", &result));
1095                 mCheckTimeoutGeneration++;
1096 
1097                 ALOGI("PAUSE completed with result %d (%s)",
1098                      result, strerror(-result));
1099                 break;
1100             }
1101 
1102             case 'resu':
1103             {
1104                 if (mPausing && mSeekPending) {
1105                     // If seeking, Play will be sent from see1 instead
1106                     break;
1107                 }
1108 
1109                 if (!mPausing) {
                    // Don't send PLAY if we have not paused.
1111                     break;
1112                 }
1113                 AString request = "PLAY ";
1114                 request.append(mControlURL);
1115                 request.append(" RTSP/1.0\r\n");
1116 
1117                 request.append("Session: ");
1118                 request.append(mSessionID);
1119                 request.append("\r\n");
1120 
1121                 request.append("\r\n");
1122 
1123                 sp<AMessage> reply = new AMessage('res2', this);
1124                 mConn->sendRequest(request.c_str(), reply);
1125                 break;
1126             }
1127 
1128             case 'res2':
1129             {
1130                 int32_t result;
1131                 CHECK(msg->findInt32("result", &result));
1132 
1133                 ALOGI("PLAY (for resume) completed with result %d (%s)",
1134                      result, strerror(-result));
1135 
1136                 mCheckPending = false;
1137                 ++mCheckGeneration;
1138                 postAccessUnitTimeoutCheck();
1139 
1140                 if (result == OK) {
1141                     sp<RefBase> obj;
1142                     CHECK(msg->findObject("response", &obj));
1143                     sp<ARTSPResponse> response =
1144                         static_cast<ARTSPResponse *>(obj.get());
1145 
1146                     if (response->mStatusCode != 200) {
1147                         result = UNKNOWN_ERROR;
1148                     } else {
1149                         parsePlayResponse(response);
1150 
1151                         // Post new timeout in order to make sure to use
1152                         // fake timestamps if no new Sender Reports arrive
1153                         postTimeout();
1154                     }
1155                 }
1156 
1157                 if (result != OK) {
1158                     ALOGE("resume failed, aborting.");
1159                     (new AMessage('abor', this))->post();
1160                 }
1161 
1162                 mPausing = false;
1163                 break;
1164             }
1165 
1166             case 'seek':
1167             {
1168                 if (!mSeekable) {
1169                     ALOGW("This is a live stream, ignoring seek request.");
1170 
1171                     sp<AMessage> msg = mNotify->dup();
1172                     msg->setInt32("what", kWhatSeekDone);
1173                     msg->post();
1174                     break;
1175                 }
1176 
1177                 int64_t timeUs;
1178                 CHECK(msg->findInt64("time", &timeUs));
1179 
1180                 mSeekPending = true;
1181 
1182                 // Disable the access unit timeout until we resumed
1183                 // playback again.
1184                 mCheckPending = true;
1185                 ++mCheckGeneration;
1186 
1187                 sp<AMessage> reply = new AMessage('see0', this);
1188                 reply->setInt64("time", timeUs);
1189 
1190                 if (mPausing) {
1191                     // PAUSE already sent
1192                     ALOGI("Pause already sent");
1193                     reply->post();
1194                     break;
1195                 }
1196                 AString request = "PAUSE ";
1197                 request.append(mControlURL);
1198                 request.append(" RTSP/1.0\r\n");
1199 
1200                 request.append("Session: ");
1201                 request.append(mSessionID);
1202                 request.append("\r\n");
1203 
1204                 request.append("\r\n");
1205 
1206                 mConn->sendRequest(request.c_str(), reply);
1207                 break;
1208             }
1209 
1210             case 'see0':
1211             {
1212                 // Session is paused now.
1213                 status_t err = OK;
1214                 msg->findInt32("result", &err);
1215 
1216                 int64_t timeUs;
1217                 CHECK(msg->findInt64("time", &timeUs));
1218 
1219                 sp<AMessage> notify = mNotify->dup();
1220                 notify->setInt32("what", kWhatSeekPaused);
1221                 notify->setInt32("err", err);
1222                 notify->setInt64("time", timeUs);
1223                 notify->post();
1224                 break;
1225 
1226             }
1227 
1228             case 'see1':
1229             {
1230                 for (size_t i = 0; i < mTracks.size(); ++i) {
1231                     TrackInfo *info = &mTracks.editItemAt(i);
1232 
1233                     postQueueSeekDiscontinuity(i);
1234                     info->mEOSReceived = false;
1235 
1236                     info->mRTPAnchor = 0;
1237                     info->mNTPAnchorUs = -1;
1238                 }
1239 
1240                 mAllTracksHaveTime = false;
1241                 mNTPAnchorUs = -1;
1242 
                // Start a new timeout generation to avoid getting a timeout
                // before the PLAY response arrives.
1245                 postTimeout();
1246 
1247                 int64_t timeUs;
1248                 CHECK(msg->findInt64("time", &timeUs));
1249 
1250                 AString request = "PLAY ";
1251                 request.append(mControlURL);
1252                 request.append(" RTSP/1.0\r\n");
1253 
1254                 request.append("Session: ");
1255                 request.append(mSessionID);
1256                 request.append("\r\n");
1257 
1258                 request.append(
1259                         AStringPrintf(
1260                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1261 
1262                 request.append("\r\n");
1263 
1264                 sp<AMessage> reply = new AMessage('see2', this);
1265                 mConn->sendRequest(request.c_str(), reply);
1266                 break;
1267             }
1268 
1269             case 'see2':
1270             {
1271                 if (mTracks.size() == 0) {
1272                     // We have already hit abor, break
1273                     break;
1274                 }
1275 
1276                 int32_t result;
1277                 CHECK(msg->findInt32("result", &result));
1278 
1279                 ALOGI("PLAY (for seek) completed with result %d (%s)",
1280                      result, strerror(-result));
1281 
1282                 mCheckPending = false;
1283                 ++mCheckGeneration;
1284                 postAccessUnitTimeoutCheck();
1285 
1286                 if (result == OK) {
1287                     sp<RefBase> obj;
1288                     CHECK(msg->findObject("response", &obj));
1289                     sp<ARTSPResponse> response =
1290                         static_cast<ARTSPResponse *>(obj.get());
1291 
1292                     if (response->mStatusCode != 200) {
1293                         result = UNKNOWN_ERROR;
1294                     } else {
1295                         parsePlayResponse(response);
1296 
1297                         // Post new timeout in order to make sure to use
1298                         // fake timestamps if no new Sender Reports arrive
1299                         postTimeout();
1300 
1301                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1302                         CHECK_GE(i, 0);
1303 
1304                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1305 
1306                         ALOGI("seek completed.");
1307                     }
1308                 }
1309 
1310                 if (result != OK) {
1311                     ALOGE("seek failed, aborting.");
1312                     (new AMessage('abor', this))->post();
1313                 }
1314 
1315                 mPausing = false;
1316                 mSeekPending = false;
1317 
1318                 // Discard all stale access units.
1319                 for (size_t i = 0; i < mTracks.size(); ++i) {
1320                     TrackInfo *track = &mTracks.editItemAt(i);
1321                     track->mPackets.clear();
1322                 }
1323 
1324                 sp<AMessage> msg = mNotify->dup();
1325                 msg->setInt32("what", kWhatSeekDone);
1326                 msg->post();
1327                 break;
1328             }
1329 
1330             case 'biny':
1331             {
1332                 sp<ABuffer> buffer;
1333                 CHECK(msg->findBuffer("buffer", &buffer));
1334 
1335                 int32_t index;
1336                 CHECK(buffer->meta()->findInt32("index", &index));
1337 
1338                 mRTPConn->injectPacket(index, buffer);
1339                 break;
1340             }
1341 
1342             case 'tiou':
1343             {
1344                 int32_t timeoutGenerationCheck;
1345                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1346                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1347                     // This is an outdated message. Ignore.
1348                     // This typically happens if a lot of seeks are
1349                     // performed, since new timeout messages now are
1350                     // posted at seek as well.
1351                     break;
1352                 }
1353                 if (!mReceivedFirstRTCPPacket) {
1354                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1355                         ALOGW("We received RTP packets but no RTCP packets, "
1356                              "using fake timestamps.");
1357 
1358                         mTryFakeRTCP = true;
1359 
1360                         mReceivedFirstRTCPPacket = true;
1361 
1362                         fakeTimestamps();
1363                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1364                         ALOGW("Never received any data, switching transports.");
1365 
1366                         mTryTCPInterleaving = true;
1367 
1368                         sp<AMessage> msg = new AMessage('abor', this);
1369                         msg->setInt32("reconnect", true);
1370                         msg->post();
1371                     } else {
1372                         ALOGW("Never received any data, disconnecting.");
1373                         (new AMessage('abor', this))->post();
1374                     }
1375                 } else {
1376                     if (!mAllTracksHaveTime) {
1377                         ALOGW("We received some RTCP packets, but time "
1378                               "could not be established on all tracks, now "
1379                               "using fake timestamps");
1380 
1381                         fakeTimestamps();
1382                     }
1383                 }
1384                 break;
1385             }
1386 
1387             default:
1388                 TRESPASS();
1389                 break;
1390         }
1391     }
1392 
postKeepAliveMyHandler1393     void postKeepAlive() {
1394         sp<AMessage> msg = new AMessage('aliv', this);
1395         msg->setInt32("generation", mKeepAliveGeneration);
1396         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1397     }
1398 
cancelAccessUnitTimeoutCheckMyHandler1399     void cancelAccessUnitTimeoutCheck() {
1400         ALOGV("cancelAccessUnitTimeoutCheck");
1401         ++mCheckGeneration;
1402     }
1403 
postAccessUnitTimeoutCheckMyHandler1404     void postAccessUnitTimeoutCheck() {
1405         if (mCheckPending) {
1406             return;
1407         }
1408 
1409         mCheckPending = true;
1410         sp<AMessage> check = new AMessage('chek', this);
1411         check->setInt32("generation", mCheckGeneration);
1412         check->post(kAccessUnitTimeoutUs);
1413     }
1414 
SplitStringMyHandler1415     static void SplitString(
1416             const AString &s, const char *separator, List<AString> *items) {
1417         items->clear();
1418         size_t start = 0;
1419         while (start < s.size()) {
1420             ssize_t offset = s.find(separator, start);
1421 
1422             if (offset < 0) {
1423                 items->push_back(AString(s, start, s.size() - start));
1424                 break;
1425             }
1426 
1427             items->push_back(AString(s, start, offset - start));
1428             start = offset + strlen(separator);
1429         }
1430     }
1431 
parsePlayResponseMyHandler1432     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1433         mPlayResponseParsed = true;
1434         if (mTracks.size() == 0) {
1435             ALOGV("parsePlayResponse: late packets ignored.");
1436             return;
1437         }
1438 
1439         ssize_t i = response->mHeaders.indexOfKey("range");
1440         if (i < 0) {
1441             // Server doesn't even tell use what range it is going to
1442             // play, therefore we won't support seeking.
1443             return;
1444         }
1445 
1446         AString range = response->mHeaders.valueAt(i);
1447         ALOGV("Range: %s", range.c_str());
1448 
1449         AString val;
1450         CHECK(GetAttribute(range.c_str(), "npt", &val));
1451 
1452         float npt1, npt2;
1453         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1454             // This is a live stream and therefore not seekable.
1455 
1456             ALOGI("This is a live stream");
1457             return;
1458         }
1459 
1460         i = response->mHeaders.indexOfKey("rtp-info");
1461         CHECK_GE(i, 0);
1462 
1463         AString rtpInfo = response->mHeaders.valueAt(i);
1464         List<AString> streamInfos;
1465         SplitString(rtpInfo, ",", &streamInfos);
1466 
1467         int n = 1;
1468         for (List<AString>::iterator it = streamInfos.begin();
1469              it != streamInfos.end(); ++it) {
1470             (*it).trim();
1471             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1472 
1473             CHECK(GetAttribute((*it).c_str(), "url", &val));
1474 
1475             size_t trackIndex = 0;
1476             while (trackIndex < mTracks.size()
1477                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1478                 ++trackIndex;
1479             }
1480             CHECK_LT(trackIndex, mTracks.size());
1481 
1482             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1483 
1484             char *end;
1485             unsigned long seq = strtoul(val.c_str(), &end, 10);
1486 
1487             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1488             info->mFirstSeqNumInSegment = seq;
1489             info->mNewSegment = true;
1490             info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1491 
1492             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1493 
1494             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1495 
1496             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1497 
1498             info->mNormalPlayTimeRTP = rtpTime;
1499             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1500 
1501             if (!mFirstAccessUnit) {
1502                 postNormalPlayTimeMapping(
1503                         trackIndex,
1504                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1505             }
1506 
1507             ++n;
1508         }
1509     }
1510 
getTrackFormatMyHandler1511     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1512         CHECK_GE(index, 0u);
1513         CHECK_LT(index, mTracks.size());
1514 
1515         const TrackInfo &info = mTracks.itemAt(index);
1516 
1517         *timeScale = info.mTimeScale;
1518 
1519         return info.mPacketSource->getFormat();
1520     }
1521 
countTracksMyHandler1522     size_t countTracks() const {
1523         return mTracks.size();
1524     }
1525 
1526 private:
    // Per-track state. One entry is appended to mTracks by setupTrack() for
    // every track a SETUP request is sent for.
    struct TrackInfo {
        // Absolute control URL of the track; also used to match the "url"
        // attribute of rtp-info entries in PLAY responses.
        AString mURL;
        // With UDP transport: the sockets obtained from
        // ARTPConnection::MakePortPair(). With interleaved TCP these hold
        // the interleave channel numbers (2k and 2k+1) instead, and must
        // not be closed as file descriptors.
        int mRTPSocket;
        int mRTCPSocket;
        // True if the track was set up with RTP/AVP/TCP;interleaved.
        bool mUsingInterleavedTCP;
        // "seq" value from the most recent PLAY response's rtp-info entry
        // for this track (0 until one has been parsed).
        uint32_t mFirstSeqNumInSegment;
        // Set at setup and again whenever a PLAY response entry for this
        // track is parsed.
        bool mNewSegment;
        // Reset to kMaxAllowedStaleAccessUnits at the same points as
        // mNewSegment.
        int32_t mAllowedStaleAccessUnits;

        // Per-track time anchors; reset to 0 / -1 at setup and on seek.
        uint32_t mRTPAnchor;
        int64_t mNTPAnchorUs;
        // Time scale parsed from the track's format description.
        int32_t mTimeScale;
        // Set when a BYE arrives for this track before time has been
        // established on all tracks; cleared on seek.
        bool mEOSReceived;

        // Normal-play-time mapping from the last PLAY response: the
        // "rtptime" of this track and the start of the npt range in
        // microseconds.
        uint32_t mNormalPlayTimeRTP;
        int64_t mNormalPlayTimeUs;

        // Packet source created for this track; provides its format.
        sp<APacketSource> mPacketSource;

        // Stores packets temporarily while no notion of time
        // has been established yet.
        List<sp<ABuffer> > mPackets;
    };
1550 
    // Notification template: dup()'ed, given a "what" (e.g.
    // kWhatDisconnected, kWhatSeekDone, kWhatSeekPaused) and posted.
    sp<AMessage> mNotify;
    // When mUIDValid, UDP RTP/RTCP sockets are tagged and marked with mUID
    // through HTTPBase.
    bool mUIDValid;
    uid_t mUID;
    sp<ALooper> mNetLooper;
    // RTSP control connection: all requests are sent through it.
    sp<ARTSPConnection> mConn;
    // RTP side: streams are removed from it on abort and interleaved
    // binary data is injected into it.
    sp<ARTPConnection> mRTPConn;
    // Session description the tracks are set up from.
    sp<ASessionDescription> mSessionDesc;
    AString mOriginalSessionURL;  // This one still has user:pass@
    // Request URL used for OPTIONS (keep-alive) and TEARDOWN.
    AString mSessionURL;
    AString mSessionHost;
    // Base against which per-track control URLs are resolved.
    AString mBaseURL;
    // Request URL used for PLAY and PAUSE.
    AString mControlURL;
    // Value sent in the "Session:" request header.
    AString mSessionID;
    // Decides, once SETUP is finished, whether PLAY is sent or the
    // connection is dropped; cleared on abort.
    bool mSetupTracksSuccessful;
    // True from 'seek' until the PLAY response for the seek ('see2');
    // access units arriving in between are dropped.
    bool mSeekPending;
    // Reset to true on abort. NOTE(review): presumably cleared once the
    // first access unit has been delivered - confirm where it is cleared.
    bool mFirstAccessUnit;

    // Session-wide time state; reset on abort and (partly) on seek.
    bool mAllTracksHaveTime;
    int64_t mNTPAnchorUs;
    int64_t mMediaAnchorUs;
    int64_t mLastMediaTimeUs;

    // Access units seen since the last 'chek'; zero at check time is
    // treated as end of stream.
    int64_t mNumAccessUnitsReceived;
    // Access-unit timeout bookkeeping: a 'chek' is only posted while
    // mCheckPending is false, and 'chek' messages with a stale
    // mCheckGeneration are ignored.
    bool mCheckPending;
    int32_t mCheckGeneration;
    // Generation for 'tiou' messages ("tioucheck"); stale ones are ignored.
    int32_t mCheckTimeoutGeneration;
    // Set when no data was ever received, so that the reconnect sets up
    // tracks with interleaved TCP instead of UDP.
    bool mTryTCPInterleaving;
    // Set once fake timestamps have been used because RTCP never arrived.
    bool mTryFakeRTCP;
    bool mReceivedFirstRTCPPacket;
    bool mReceivedFirstRTPPacket;
    // When false, pause and seek requests are ignored (live stream).
    bool mSeekable;
    // Keep-alive (OPTIONS) is posted after 90% of this timeout; 'aliv' and
    // 'opts' messages with a stale mKeepAliveGeneration are ignored.
    int64_t mKeepAliveTimeoutUs;
    int32_t mKeepAliveGeneration;
    // True from the moment PAUSE is sent until the PLAY response for
    // resume or seek has been handled.
    bool mPausing;
    // Generation for 'paus' messages ("pausecheck"); stale ones are ignored.
    int32_t mPauseGeneration;

    // Tracks for which SETUP has been issued; cleared on abort.
    Vector<TrackInfo> mTracks;

    // Set as soon as parsePlayResponse() has been entered.
    bool mPlayResponseParsed;
1590 
setupTrackMyHandler1591     void setupTrack(size_t index) {
1592         sp<APacketSource> source =
1593             new APacketSource(mSessionDesc, index);
1594 
1595         if (source->initCheck() != OK) {
1596             ALOGW("Unsupported format. Ignoring track #%zu.", index);
1597 
1598             sp<AMessage> reply = new AMessage('setu', this);
1599             reply->setSize("index", index);
1600             reply->setInt32("result", ERROR_UNSUPPORTED);
1601             reply->post();
1602             return;
1603         }
1604 
1605         AString url;
1606         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1607 
1608         AString trackURL;
1609         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1610 
1611         mTracks.push(TrackInfo());
1612         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1613         info->mURL = trackURL;
1614         info->mPacketSource = source;
1615         info->mUsingInterleavedTCP = false;
1616         info->mFirstSeqNumInSegment = 0;
1617         info->mNewSegment = true;
1618         info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1619         info->mRTPSocket = -1;
1620         info->mRTCPSocket = -1;
1621         info->mRTPAnchor = 0;
1622         info->mNTPAnchorUs = -1;
1623         info->mNormalPlayTimeRTP = 0;
1624         info->mNormalPlayTimeUs = 0ll;
1625 
1626         unsigned long PT;
1627         AString formatDesc;
1628         AString formatParams;
1629         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1630 
1631         int32_t timescale;
1632         int32_t numChannels;
1633         ASessionDescription::ParseFormatDesc(
1634                 formatDesc.c_str(), &timescale, &numChannels);
1635 
1636         info->mTimeScale = timescale;
1637         info->mEOSReceived = false;
1638 
1639         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1640 
1641         AString request = "SETUP ";
1642         request.append(trackURL);
1643         request.append(" RTSP/1.0\r\n");
1644 
1645         if (mTryTCPInterleaving) {
1646             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1647             info->mUsingInterleavedTCP = true;
1648             info->mRTPSocket = interleaveIndex;
1649             info->mRTCPSocket = interleaveIndex + 1;
1650 
1651             request.append("Transport: RTP/AVP/TCP;interleaved=");
1652             request.append(interleaveIndex);
1653             request.append("-");
1654             request.append(interleaveIndex + 1);
1655         } else {
1656             unsigned rtpPort;
1657             ARTPConnection::MakePortPair(
1658                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1659 
1660             if (mUIDValid) {
1661                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1662                                                 (uint32_t)*(uint32_t*) "RTP_");
1663                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1664                                                 (uint32_t)*(uint32_t*) "RTP_");
1665                 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1666                 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1667             }
1668 
1669             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1670             request.append(rtpPort);
1671             request.append("-");
1672             request.append(rtpPort + 1);
1673         }
1674 
1675         request.append("\r\n");
1676 
1677         if (index > 1) {
1678             request.append("Session: ");
1679             request.append(mSessionID);
1680             request.append("\r\n");
1681         }
1682 
1683         request.append("\r\n");
1684 
1685         sp<AMessage> reply = new AMessage('setu', this);
1686         reply->setSize("index", index);
1687         reply->setSize("track-index", mTracks.size() - 1);
1688         mConn->sendRequest(request.c_str(), reply);
1689     }
1690 
MakeURLMyHandler1691     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1692         out->clear();
1693 
1694         if (strncasecmp("rtsp://", baseURL, 7)) {
1695             // Base URL must be absolute
1696             return false;
1697         }
1698 
1699         if (!strncasecmp("rtsp://", url, 7)) {
1700             // "url" is already an absolute URL, ignore base URL.
1701             out->setTo(url);
1702             return true;
1703         }
1704 
1705         size_t n = strlen(baseURL);
1706         out->setTo(baseURL);
1707         if (baseURL[n - 1] != '/') {
1708             out->append("/");
1709         }
1710         out->append(url);
1711 
1712         return true;
1713     }
1714 
fakeTimestampsMyHandler1715     void fakeTimestamps() {
1716         mNTPAnchorUs = -1ll;
1717         for (size_t i = 0; i < mTracks.size(); ++i) {
1718             onTimeUpdate(i, 0, 0ll);
1719         }
1720     }
1721 
dataReceivedOnAllChannelsMyHandler1722     bool dataReceivedOnAllChannels() {
1723         TrackInfo *track;
1724         for (size_t i = 0; i < mTracks.size(); ++i) {
1725             track = &mTracks.editItemAt(i);
1726             if (track->mPackets.empty()) {
1727                 return false;
1728             }
1729         }
1730         return true;
1731     }
1732 
handleFirstAccessUnitMyHandler1733     void handleFirstAccessUnit() {
1734         if (mFirstAccessUnit) {
1735             sp<AMessage> msg = mNotify->dup();
1736             msg->setInt32("what", kWhatConnected);
1737             msg->post();
1738 
1739             if (mSeekable) {
1740                 for (size_t i = 0; i < mTracks.size(); ++i) {
1741                     TrackInfo *info = &mTracks.editItemAt(i);
1742 
1743                     postNormalPlayTimeMapping(
1744                             i,
1745                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1746                 }
1747             }
1748 
1749             mFirstAccessUnit = false;
1750         }
1751     }
1752 
onTimeUpdateMyHandler1753     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1754         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1755              trackIndex, rtpTime, (long long)ntpTime);
1756 
1757         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1758 
1759         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1760 
1761         track->mRTPAnchor = rtpTime;
1762         track->mNTPAnchorUs = ntpTimeUs;
1763 
1764         if (mNTPAnchorUs < 0) {
1765             mNTPAnchorUs = ntpTimeUs;
1766             mMediaAnchorUs = mLastMediaTimeUs;
1767         }
1768 
1769         if (!mAllTracksHaveTime) {
1770             bool allTracksHaveTime = (mTracks.size() > 0);
1771             for (size_t i = 0; i < mTracks.size(); ++i) {
1772                 TrackInfo *track = &mTracks.editItemAt(i);
1773                 if (track->mNTPAnchorUs < 0) {
1774                     allTracksHaveTime = false;
1775                     break;
1776                 }
1777             }
1778             if (allTracksHaveTime) {
1779                 mAllTracksHaveTime = true;
1780                 ALOGI("Time now established for all tracks.");
1781             }
1782         }
1783         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1784             handleFirstAccessUnit();
1785 
1786             // Time is now established, lets start timestamping immediately
1787             for (size_t i = 0; i < mTracks.size(); ++i) {
1788                 if (OK != processAccessUnitQueue(i)) {
1789                     return;
1790                 }
1791             }
1792             for (size_t i = 0; i < mTracks.size(); ++i) {
1793                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1794                 if (trackInfo->mEOSReceived) {
1795                     postQueueEOS(i, ERROR_END_OF_STREAM);
1796                     trackInfo->mEOSReceived = false;
1797                 }
1798             }
1799         }
1800     }
1801 
processAccessUnitQueueMyHandler1802     status_t processAccessUnitQueue(int32_t trackIndex) {
1803         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1804         while (!track->mPackets.empty()) {
1805             sp<ABuffer> accessUnit = *track->mPackets.begin();
1806             track->mPackets.erase(track->mPackets.begin());
1807 
1808             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1809             if (track->mNewSegment) {
1810                 // The sequence number from RTP packet has only 16 bits and is extended
1811                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1812                 // RTSP "PLAY" command should be used to detect the first RTP packet
1813                 // after seeking.
1814                 if (mSeekable) {
1815                     if (track->mAllowedStaleAccessUnits > 0) {
1816                         uint32_t seqNum16 = seqNum & 0xffff;
1817                         uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1818                         if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1819                                 || seqNum16 < firstSeqNumInSegment16) {
1820                             // Not the first rtp packet of the stream after seeking, discarding.
1821                             track->mAllowedStaleAccessUnits--;
1822                             ALOGV("discarding stale access unit (0x%x : 0x%x)",
1823                                  seqNum, track->mFirstSeqNumInSegment);
1824                             continue;
1825                         }
1826                         ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1827                                 "Missing the first packet(%u), now take packet(%u) as first one",
1828                                 track->mFirstSeqNumInSegment, seqNum);
1829                     } else { // track->mAllowedStaleAccessUnits <= 0
1830                         mNumAccessUnitsReceived = 0;
1831                         ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1832                              "Still no first rtp packet after %d stale ones",
1833                              kMaxAllowedStaleAccessUnits);
1834                         track->mAllowedStaleAccessUnits = -1;
1835                         return UNKNOWN_ERROR;
1836                     }
1837                 }
1838 
1839                 // Now found the first rtp packet of the stream after seeking.
1840                 track->mFirstSeqNumInSegment = seqNum;
1841                 track->mNewSegment = false;
1842             }
1843 
1844             if (seqNum < track->mFirstSeqNumInSegment) {
1845                 ALOGV("dropping stale access-unit (%d < %d)",
1846                      seqNum, track->mFirstSeqNumInSegment);
1847                 continue;
1848             }
1849 
1850             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1851                 postQueueAccessUnit(trackIndex, accessUnit);
1852             }
1853         }
1854         return OK;
1855     }
1856 
onAccessUnitCompleteMyHandler1857     void onAccessUnitComplete(
1858             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1859         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1860         track->mPackets.push_back(accessUnit);
1861 
1862         uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1863         ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1864 
1865         if(!mPlayResponseParsed){
1866             ALOGV("play response is not parsed");
1867             return;
1868         }
1869 
1870         handleFirstAccessUnit();
1871 
1872         if (!mAllTracksHaveTime) {
1873             ALOGV("storing accessUnit, no time established yet");
1874             return;
1875         }
1876 
1877         if (OK != processAccessUnitQueue(trackIndex)) {
1878             return;
1879         }
1880 
1881         if (track->mEOSReceived) {
1882             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1883             track->mEOSReceived = false;
1884         }
1885     }
1886 
addMediaTimestampMyHandler1887     bool addMediaTimestamp(
1888             int32_t trackIndex, const TrackInfo *track,
1889             const sp<ABuffer> &accessUnit) {
1890         UNUSED_UNLESS_VERBOSE(trackIndex);
1891 
1892         uint32_t rtpTime;
1893         CHECK(accessUnit->meta()->findInt32(
1894                     "rtp-time", (int32_t *)&rtpTime));
1895 
1896         int64_t relRtpTimeUs =
1897             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1898                 / track->mTimeScale;
1899 
1900         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1901 
1902         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1903 
1904         if (mediaTimeUs > mLastMediaTimeUs) {
1905             mLastMediaTimeUs = mediaTimeUs;
1906         }
1907 
1908         if (mediaTimeUs < 0) {
1909             ALOGV("dropping early accessUnit.");
1910             return false;
1911         }
1912 
1913         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1914              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1915 
1916         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1917 
1918         return true;
1919     }
1920 
postQueueAccessUnitMyHandler1921     void postQueueAccessUnit(
1922             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1923         sp<AMessage> msg = mNotify->dup();
1924         msg->setInt32("what", kWhatAccessUnit);
1925         msg->setSize("trackIndex", trackIndex);
1926         msg->setBuffer("accessUnit", accessUnit);
1927         msg->post();
1928     }
1929 
postQueueEOSMyHandler1930     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1931         sp<AMessage> msg = mNotify->dup();
1932         msg->setInt32("what", kWhatEOS);
1933         msg->setSize("trackIndex", trackIndex);
1934         msg->setInt32("finalResult", finalResult);
1935         msg->post();
1936     }
1937 
postQueueSeekDiscontinuityMyHandler1938     void postQueueSeekDiscontinuity(size_t trackIndex) {
1939         sp<AMessage> msg = mNotify->dup();
1940         msg->setInt32("what", kWhatSeekDiscontinuity);
1941         msg->setSize("trackIndex", trackIndex);
1942         msg->post();
1943     }
1944 
postNormalPlayTimeMappingMyHandler1945     void postNormalPlayTimeMapping(
1946             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1947         sp<AMessage> msg = mNotify->dup();
1948         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1949         msg->setSize("trackIndex", trackIndex);
1950         msg->setInt32("rtpTime", rtpTime);
1951         msg->setInt64("nptUs", nptUs);
1952         msg->post();
1953     }
1954 
postTimeoutMyHandler1955     void postTimeout() {
1956         sp<AMessage> timeout = new AMessage('tiou', this);
1957         mCheckTimeoutGeneration++;
1958         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1959 
1960         int64_t startupTimeoutUs;
1961         startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1962         timeout->post(startupTimeoutUs);
1963     }
1964 
1965     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1966 };
1967 
1968 }  // namespace android
1969 
1970 #endif  // MY_HANDLER_H_
1971