1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26 
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29 
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 #include "NetworkUtils.h"
35 
36 #include <ctype.h>
37 #include <cutils/properties.h>
38 
39 #include <datasource/HTTPBase.h>
40 #include <media/stagefright/foundation/ABuffer.h>
41 #include <media/stagefright/foundation/ADebug.h>
42 #include <media/stagefright/foundation/ALooper.h>
43 #include <media/stagefright/foundation/AMessage.h>
44 #include <media/stagefright/MediaErrors.h>
45 #include <media/stagefright/Utils.h>
46 #include <media/stagefright/FoundationUtils.h>
47 
48 #include <arpa/inet.h>
49 #include <sys/socket.h>
50 #include <netdb.h>
51 
52 
53 #if LOG_NDEBUG
54 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
55 #else
56 #define UNUSED_UNLESS_VERBOSE(x)
57 #endif
58 
59 #ifndef FALLTHROUGH_INTENDED
60 #define FALLTHROUGH_INTENDED [[clang::fallthrough]]  // NOLINT
61 #endif
62 
63 // If no access units are received within 5 secs, assume that the rtp
64 // stream has ended and signal end of stream.
65 static int64_t kAccessUnitTimeoutUs = 10000000ll;
66 
67 // If no access units arrive for the first 10 secs after starting the
68 // stream, assume none ever will and signal EOS or switch transports.
69 static int64_t kStartupTimeoutUs = 10000000ll;
70 
71 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
72 
73 static int64_t kPauseDelayUs = 3000000ll;
74 
75 // The allowed maximum number of stale access units at the beginning of
76 // a new sequence.
77 static int32_t kMaxAllowedStaleAccessUnits = 20;
78 
79 static int64_t kTearDownTimeoutUs = 3000000ll;
80 
81 namespace android {
82 
GetAttribute(const char * s,const char * key,AString * value)83 static bool GetAttribute(const char *s, const char *key, AString *value) {
84     value->clear();
85 
86     size_t keyLen = strlen(key);
87 
88     for (;;) {
89         while (isspace(*s)) {
90             ++s;
91         }
92 
93         const char *colonPos = strchr(s, ';');
94 
95         size_t len =
96             (colonPos == NULL) ? strlen(s) : colonPos - s;
97 
98         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
99             value->setTo(&s[keyLen + 1], len - keyLen - 1);
100             return true;
101         }
102 
103         if (colonPos == NULL) {
104             return false;
105         }
106 
107         s = colonPos + 1;
108     }
109 }
110 
111 struct MyHandler : public AHandler {
112     enum {
113         kWhatConnected                  = 'conn',
114         kWhatDisconnected               = 'disc',
115         kWhatSeekPaused                 = 'spau',
116         kWhatSeekDone                   = 'sdon',
117 
118         kWhatAccessUnit                 = 'accU',
119         kWhatEOS                        = 'eos!',
120         kWhatSeekDiscontinuity          = 'seeD',
121         kWhatNormalPlayTimeMapping      = 'nptM',
122     };
123 
124     MyHandler(
125             const char *url,
126             const sp<AMessage> &notify,
127             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler128         : mNotify(notify),
129           mUIDValid(uidValid),
130           mUID(uid),
131           mNetLooper(new ALooper),
132           mConn(new ARTSPConnection(mUIDValid, mUID)),
133           mRTPConn(new ARTPConnection),
134           mOriginalSessionURL(url),
135           mSessionURL(url),
136           mSetupTracksSuccessful(false),
137           mSeekPending(false),
138           mFirstAccessUnit(true),
139           mAllTracksHaveTime(false),
140           mNTPAnchorUs(-1),
141           mMediaAnchorUs(-1),
142           mLastMediaTimeUs(0),
143           mNumAccessUnitsReceived(0),
144           mCheckPending(false),
145           mCheckGeneration(0),
146           mCheckTimeoutGeneration(0),
147           mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
148           mTryFakeRTCP(false),
149           mReceivedFirstRTCPPacket(false),
150           mReceivedFirstRTPPacket(false),
151           mSeekable(true),
152           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
153           mKeepAliveGeneration(0),
154           mPausing(false),
155           mPauseGeneration(0),
156           mPlayResponseParsed(false) {
157         mNetLooper->setName("rtsp net");
158         mNetLooper->start(false /* runOnCallingThread */,
159                           false /* canCallJava */,
160                           PRIORITY_HIGHEST);
161 
162         // Strip any authentication info from the session url, we don't
163         // want to transmit user/pass in cleartext.
164         AString host, path, user, pass;
165         unsigned port;
166         CHECK(ARTSPConnection::ParseURL(
167                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
168 
169         if (user.size() > 0) {
170             mSessionURL.clear();
171             mSessionURL.append("rtsp://");
172             mSessionURL.append(host);
173             mSessionURL.append(":");
174             mSessionURL.append(AStringPrintf("%u", port));
175             mSessionURL.append(path);
176 
177             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
178         }
179 
180         mSessionHost = host;
181     }
182 
connectMyHandler183     void connect() {
184         looper()->registerHandler(mConn);
185         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
186 
187         sp<AMessage> notify = new AMessage('biny', this);
188         mConn->observeBinaryData(notify);
189 
190         sp<AMessage> reply = new AMessage('conn', this);
191         mConn->connect(mOriginalSessionURL.c_str(), reply);
192     }
193 
loadSDPMyHandler194     void loadSDP(const sp<ASessionDescription>& desc) {
195         looper()->registerHandler(mConn);
196         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
197 
198         sp<AMessage> notify = new AMessage('biny', this);
199         mConn->observeBinaryData(notify);
200 
201         sp<AMessage> reply = new AMessage('sdpl', this);
202         reply->setObject("description", desc);
203         mConn->connect(mOriginalSessionURL.c_str(), reply);
204     }
205 
getControlURLMyHandler206     AString getControlURL() {
207         AString sessionLevelControlURL;
208         if (mSessionDesc->findAttribute(
209                 0,
210                 "a=control",
211                 &sessionLevelControlURL)) {
212             if (sessionLevelControlURL.compare("*") == 0) {
213                 return mBaseURL;
214             } else {
215                 AString controlURL;
216                 CHECK(MakeURL(
217                         mBaseURL.c_str(),
218                         sessionLevelControlURL.c_str(),
219                         &controlURL));
220                 return controlURL;
221             }
222         } else {
223             return mSessionURL;
224         }
225     }
226 
disconnectMyHandler227     void disconnect() {
228         (new AMessage('abor', this))->post();
229     }
230 
seekMyHandler231     void seek(int64_t timeUs) {
232         sp<AMessage> msg = new AMessage('seek', this);
233         msg->setInt64("time", timeUs);
234         mPauseGeneration++;
235         msg->post();
236     }
237 
continueSeekAfterPauseMyHandler238     void continueSeekAfterPause(int64_t timeUs) {
239         sp<AMessage> msg = new AMessage('see1', this);
240         msg->setInt64("time", timeUs);
241         msg->post();
242     }
243 
isSeekableMyHandler244     bool isSeekable() const {
245         return mSeekable;
246     }
247 
pauseMyHandler248     void pause() {
249         sp<AMessage> msg = new AMessage('paus', this);
250         mPauseGeneration++;
251         msg->setInt32("pausecheck", mPauseGeneration);
252         msg->post();
253     }
254 
resumeMyHandler255     void resume() {
256         sp<AMessage> msg = new AMessage('resu', this);
257         mPauseGeneration++;
258         msg->post();
259     }
260 
getARTSPConnectionMyHandler261     sp<ARTSPConnection> getARTSPConnection() {
262       return mConn;
263     }
264 
addRRMyHandler265     static void addRR(const sp<ABuffer> &buf) {
266         uint8_t *ptr = buf->data() + buf->size();
267         ptr[0] = 0x80 | 0;
268         ptr[1] = 201;  // RR
269         ptr[2] = 0;
270         ptr[3] = 1;
271         ptr[4] = 0xde;  // SSRC
272         ptr[5] = 0xad;
273         ptr[6] = 0xbe;
274         ptr[7] = 0xef;
275 
276         buf->setRange(0, buf->size() + 8);
277     }
278 
addSDESMyHandler279     static void addSDES(int s, const sp<ABuffer> &buffer) {
280         struct sockaddr_in addr;
281         socklen_t addrSize = sizeof(addr);
282         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
283             inet_aton("0.0.0.0", &(addr.sin_addr));
284         }
285 
286         uint8_t *data = buffer->data() + buffer->size();
287         data[0] = 0x80 | 1;
288         data[1] = 202;  // SDES
289         data[4] = 0xde;  // SSRC
290         data[5] = 0xad;
291         data[6] = 0xbe;
292         data[7] = 0xef;
293 
294         size_t offset = 8;
295 
296         data[offset++] = 1;  // CNAME
297 
298         AString cname = "stagefright@";
299         cname.append(inet_ntoa(addr.sin_addr));
300         data[offset++] = cname.size();
301 
302         memcpy(&data[offset], cname.c_str(), cname.size());
303         offset += cname.size();
304 
305         data[offset++] = 6;  // TOOL
306 
307         AString tool = MakeUserAgent();
308 
309         data[offset++] = tool.size();
310 
311         memcpy(&data[offset], tool.c_str(), tool.size());
312         offset += tool.size();
313 
314         data[offset++] = 0;
315 
316         if ((offset % 4) > 0) {
317             size_t count = 4 - (offset % 4);
318             switch (count) {
319                 case 3:
320                     data[offset++] = 0;
321                     FALLTHROUGH_INTENDED;
322                 case 2:
323                     data[offset++] = 0;
324                     FALLTHROUGH_INTENDED;
325                 case 1:
326                     data[offset++] = 0;
327             }
328         }
329 
330         size_t numWords = (offset / 4) - 1;
331         data[2] = numWords >> 8;
332         data[3] = numWords & 0xff;
333 
334         buffer->setRange(buffer->offset(), buffer->size() + offset);
335     }
336 
337     // In case we're behind NAT, fire off two UDP packets to the remote
338     // rtp/rtcp ports to poke a hole into the firewall for future incoming
339     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler340     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
341         struct sockaddr_in addr;
342         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
343         addr.sin_family = AF_INET;
344 
345         AString source;
346         AString server_port;
347         if (!GetAttribute(transport.c_str(),
348                           "source",
349                           &source)) {
350             ALOGW("Missing 'source' field in Transport response. Using "
351                  "RTSP endpoint address.");
352 
353             struct hostent *ent = gethostbyname(mSessionHost.c_str());
354             if (ent == NULL) {
355                 ALOGE("Failed to look up address of session host");
356 
357                 return false;
358             }
359 
360             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
361         } else {
362             addr.sin_addr.s_addr = inet_addr(source.c_str());
363         }
364 
365         if (!GetAttribute(transport.c_str(),
366                                  "server_port",
367                                  &server_port)) {
368             ALOGI("Missing 'server_port' field in Transport response.");
369             return false;
370         }
371 
372         int rtpPort, rtcpPort;
373         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
374                 || rtpPort <= 0 || rtpPort > 65535
375                 || rtcpPort <=0 || rtcpPort > 65535
376                 || rtcpPort != rtpPort + 1) {
377             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
378                  " RTP port must be even, RTCP port must be one higher.",
379                  server_port.c_str());
380 
381             return false;
382         }
383 
384         if (rtpPort & 1) {
385             ALOGW("Server picked an odd RTP port, it should've picked an "
386                  "even one, we'll let it pass for now, but this may break "
387                  "in the future.");
388         }
389 
390         if (addr.sin_addr.s_addr == INADDR_NONE) {
391             return true;
392         }
393 
394         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
395             // No firewalls to traverse on the loopback interface.
396             return true;
397         }
398 
399         // Make up an RR/SDES RTCP packet.
400         sp<ABuffer> buf = new ABuffer(65536);
401         buf->setRange(0, 0);
402         addRR(buf);
403         addSDES(rtpSocket, buf);
404 
405         addr.sin_port = htons(rtpPort);
406 
407         ssize_t n = sendto(
408                 rtpSocket, buf->data(), buf->size(), 0,
409                 (const sockaddr *)&addr, sizeof(addr));
410 
411         if (n < (ssize_t)buf->size()) {
412             ALOGE("failed to poke a hole for RTP packets");
413             return false;
414         }
415 
416         addr.sin_port = htons(rtcpPort);
417 
418         n = sendto(
419                 rtcpSocket, buf->data(), buf->size(), 0,
420                 (const sockaddr *)&addr, sizeof(addr));
421 
422         if (n < (ssize_t)buf->size()) {
423             ALOGE("failed to poke a hole for RTCP packets");
424             return false;
425         }
426 
427         ALOGV("successfully poked holes.");
428 
429         return true;
430     }
431 
isLiveStreamMyHandler432     static bool isLiveStream(const sp<ASessionDescription> &desc) {
433         AString attrLiveStream;
434         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
435             ssize_t semicolonPos = attrLiveStream.find(";", 2);
436 
437             const char* liveStreamValue;
438             if (semicolonPos < 0) {
439                 liveStreamValue = attrLiveStream.c_str();
440             } else {
441                 AString valString;
442                 valString.setTo(attrLiveStream,
443                         semicolonPos + 1,
444                         attrLiveStream.size() - semicolonPos - 1);
445                 liveStreamValue = valString.c_str();
446             }
447 
448             uint32_t value = strtoul(liveStreamValue, NULL, 10);
449             if (value == 1) {
450                 ALOGV("found live stream");
451                 return true;
452             }
453         } else {
454             // It is a live stream if no duration is returned
455             int64_t durationUs;
456             if (!desc->getDurationUs(&durationUs)) {
457                 ALOGV("No duration found, assume live stream");
458                 return true;
459             }
460         }
461 
462         return false;
463     }
464 
onMessageReceivedMyHandler465     virtual void onMessageReceived(const sp<AMessage> &msg) {
466         switch (msg->what()) {
467             case 'conn':
468             {
469                 int32_t result;
470                 CHECK(msg->findInt32("result", &result));
471 
472                 ALOGI("connection request completed with result %d (%s)",
473                      result, strerror(-result));
474 
475                 if (result == OK) {
476                     AString request;
477                     request = "DESCRIBE ";
478                     request.append(mSessionURL);
479                     request.append(" RTSP/1.0\r\n");
480                     request.append("Accept: application/sdp\r\n");
481                     request.append("\r\n");
482 
483                     sp<AMessage> reply = new AMessage('desc', this);
484                     mConn->sendRequest(request.c_str(), reply);
485                 } else {
486                     (new AMessage('disc', this))->post();
487                 }
488                 break;
489             }
490 
491             case 'disc':
492             {
493                 ++mKeepAliveGeneration;
494 
495                 int32_t reconnect;
496                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
497                     sp<AMessage> reply = new AMessage('conn', this);
498                     mConn->connect(mOriginalSessionURL.c_str(), reply);
499                 } else {
500                     (new AMessage('quit', this))->post();
501                 }
502                 break;
503             }
504 
505             case 'desc':
506             {
507                 int32_t result;
508                 CHECK(msg->findInt32("result", &result));
509 
510                 ALOGI("DESCRIBE completed with result %d (%s)",
511                      result, strerror(-result));
512 
513                 if (result == OK) {
514                     sp<RefBase> obj;
515                     CHECK(msg->findObject("response", &obj));
516                     sp<ARTSPResponse> response =
517                         static_cast<ARTSPResponse *>(obj.get());
518 
519                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
520                         ssize_t i = response->mHeaders.indexOfKey("location");
521                         CHECK_GE(i, 0);
522 
523                         mOriginalSessionURL = response->mHeaders.valueAt(i);
524                         mSessionURL = mOriginalSessionURL;
525 
526                         // Strip any authentication info from the session url, we don't
527                         // want to transmit user/pass in cleartext.
528                         AString host, path, user, pass;
529                         unsigned port;
530                         if (ARTSPConnection::ParseURL(
531                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
532                                 && user.size() > 0) {
533                             mSessionURL.clear();
534                             mSessionURL.append("rtsp://");
535                             mSessionURL.append(host);
536                             mSessionURL.append(":");
537                             mSessionURL.append(AStringPrintf("%u", port));
538                             mSessionURL.append(path);
539 
540                             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
541                         }
542 
543                         sp<AMessage> reply = new AMessage('conn', this);
544                         mConn->connect(mOriginalSessionURL.c_str(), reply);
545                         break;
546                     }
547 
548                     if (response->mStatusCode != 200) {
549                         result = UNKNOWN_ERROR;
550                     } else if (response->mContent == NULL) {
551                         result = ERROR_MALFORMED;
552                         ALOGE("The response has no content.");
553                     } else {
554                         mSessionDesc = new ASessionDescription;
555 
556                         mSessionDesc->setTo(
557                                 response->mContent->data(),
558                                 response->mContent->size());
559 
560                         if (!mSessionDesc->isValid()) {
561                             ALOGE("Failed to parse session description.");
562                             result = ERROR_MALFORMED;
563                         } else {
564                             ssize_t i = response->mHeaders.indexOfKey("content-base");
565                             if (i >= 0) {
566                                 mBaseURL = response->mHeaders.valueAt(i);
567                             } else {
568                                 i = response->mHeaders.indexOfKey("content-location");
569                                 if (i >= 0) {
570                                     mBaseURL = response->mHeaders.valueAt(i);
571                                 } else {
572                                     mBaseURL = mSessionURL;
573                                 }
574                             }
575 
576                             mSeekable = !isLiveStream(mSessionDesc);
577 
578                             if (!mBaseURL.startsWith("rtsp://")) {
579                                 // Some misbehaving servers specify a relative
580                                 // URL in one of the locations above, combine
581                                 // it with the absolute session URL to get
582                                 // something usable...
583 
584                                 ALOGW("Server specified a non-absolute base URL"
585                                      ", combining it with the session URL to "
586                                      "get something usable...");
587 
588                                 AString tmp;
589                                 CHECK(MakeURL(
590                                             mSessionURL.c_str(),
591                                             mBaseURL.c_str(),
592                                             &tmp));
593 
594                                 mBaseURL = tmp;
595                             }
596 
597                             mControlURL = getControlURL();
598 
599                             if (mSessionDesc->countTracks() < 2) {
600                                 // There's no actual tracks in this session.
601                                 // The first "track" is merely session meta
602                                 // data.
603 
604                                 ALOGW("Session doesn't contain any playable "
605                                      "tracks. Aborting.");
606                                 result = ERROR_UNSUPPORTED;
607                             } else {
608                                 setupTrack(1);
609                             }
610                         }
611                     }
612                 }
613 
614                 if (result != OK) {
615                     sp<AMessage> reply = new AMessage('disc', this);
616                     mConn->disconnect(reply);
617                 }
618                 break;
619             }
620 
621             case 'sdpl':
622             {
623                 int32_t result;
624                 CHECK(msg->findInt32("result", &result));
625 
626                 ALOGI("SDP connection request completed with result %d (%s)",
627                      result, strerror(-result));
628 
629                 if (result == OK) {
630                     sp<RefBase> obj;
631                     CHECK(msg->findObject("description", &obj));
632                     mSessionDesc =
633                         static_cast<ASessionDescription *>(obj.get());
634 
635                     if (!mSessionDesc->isValid()) {
636                         ALOGE("Failed to parse session description.");
637                         result = ERROR_MALFORMED;
638                     } else {
639                         mBaseURL = mSessionURL;
640 
641                         mSeekable = !isLiveStream(mSessionDesc);
642 
643                         mControlURL = getControlURL();
644 
645                         if (mSessionDesc->countTracks() < 2) {
646                             // There's no actual tracks in this session.
647                             // The first "track" is merely session meta
648                             // data.
649 
650                             ALOGW("Session doesn't contain any playable "
651                                  "tracks. Aborting.");
652                             result = ERROR_UNSUPPORTED;
653                         } else {
654                             setupTrack(1);
655                         }
656                     }
657                 }
658 
659                 if (result != OK) {
660                     sp<AMessage> reply = new AMessage('disc', this);
661                     mConn->disconnect(reply);
662                 }
663                 break;
664             }
665 
666             case 'setu':
667             {
668                 size_t index;
669                 CHECK(msg->findSize("index", &index));
670 
671                 TrackInfo *track = NULL;
672                 size_t trackIndex;
673                 if (msg->findSize("track-index", &trackIndex)) {
674                     track = &mTracks.editItemAt(trackIndex);
675                 }
676 
677                 int32_t result;
678                 CHECK(msg->findInt32("result", &result));
679 
680                 ALOGI("SETUP(%zu) completed with result %d (%s)",
681                      index, result, strerror(-result));
682 
683                 if (result == OK) {
684                     CHECK(track != NULL);
685 
686                     sp<RefBase> obj;
687                     CHECK(msg->findObject("response", &obj));
688                     sp<ARTSPResponse> response =
689                         static_cast<ARTSPResponse *>(obj.get());
690 
691                     if (response->mStatusCode != 200) {
692                         result = UNKNOWN_ERROR;
693                     } else {
694                         ssize_t i = response->mHeaders.indexOfKey("session");
695                         CHECK_GE(i, 0);
696 
697                         mSessionID = response->mHeaders.valueAt(i);
698 
699                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
700                         AString timeoutStr;
701                         if (GetAttribute(
702                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
703                             char *end;
704                             unsigned long timeoutSecs =
705                                 strtoul(timeoutStr.c_str(), &end, 10);
706 
707                             if (end == timeoutStr.c_str() || *end != '\0') {
708                                 ALOGW("server specified malformed timeout '%s'",
709                                      timeoutStr.c_str());
710 
711                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
712                             } else if (timeoutSecs < 15) {
713                                 ALOGW("server specified too short a timeout "
714                                      "(%lu secs), using default.",
715                                      timeoutSecs);
716 
717                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
718                             } else {
719                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
720 
721                                 ALOGI("server specified timeout of %lu secs.",
722                                      timeoutSecs);
723                             }
724                         }
725 
726                         i = mSessionID.find(";");
727                         if (i >= 0) {
728                             // Remove options, i.e. ";timeout=90"
729                             mSessionID.erase(i, mSessionID.size() - i);
730                         }
731 
732                         sp<AMessage> notify = new AMessage('accu', this);
733                         notify->setSize("track-index", trackIndex);
734 
735                         i = response->mHeaders.indexOfKey("transport");
736                         CHECK_GE(i, 0);
737 
738                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
739                             if (!track->mUsingInterleavedTCP) {
740                                 AString transport = response->mHeaders.valueAt(i);
741 
742                                 // We are going to continue even if we were
743                                 // unable to poke a hole into the firewall...
744                                 pokeAHole(
745                                         track->mRTPSocket,
746                                         track->mRTCPSocket,
747                                         transport);
748                             }
749 
750                             mRTPConn->addStream(
751                                     track->mRTPSocket, track->mRTCPSocket,
752                                     mSessionDesc, index,
753                                     notify, track->mUsingInterleavedTCP);
754 
755                             mSetupTracksSuccessful = true;
756                         } else {
757                             result = BAD_VALUE;
758                         }
759                     }
760                 }
761 
762                 if (result != OK) {
763                     if (track) {
764                         if (!track->mUsingInterleavedTCP) {
765                             // Clear the tag
766                             if (mUIDValid) {
767                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket);
768                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket);
769                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket);
770                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket);
771                             }
772 
773                             close(track->mRTPSocket);
774                             close(track->mRTCPSocket);
775                         }
776 
777                         mTracks.removeItemsAt(trackIndex);
778                     }
779                 }
780 
781                 ++index;
782                 if (result == OK && index < mSessionDesc->countTracks()) {
783                     setupTrack(index);
784                 } else if (mSetupTracksSuccessful) {
785                     ++mKeepAliveGeneration;
786                     postKeepAlive();
787 
788                     AString request = "PLAY ";
789                     request.append(mControlURL);
790                     request.append(" RTSP/1.0\r\n");
791 
792                     request.append("Session: ");
793                     request.append(mSessionID);
794                     request.append("\r\n");
795 
796                     request.append("\r\n");
797 
798                     sp<AMessage> reply = new AMessage('play', this);
799                     mConn->sendRequest(request.c_str(), reply);
800                 } else {
801                     sp<AMessage> reply = new AMessage('disc', this);
802                     mConn->disconnect(reply);
803                 }
804                 break;
805             }
806 
807             case 'play':
808             {
809                 int32_t result;
810                 CHECK(msg->findInt32("result", &result));
811 
812                 ALOGI("PLAY completed with result %d (%s)",
813                      result, strerror(-result));
814 
815                 if (result == OK) {
816                     sp<RefBase> obj;
817                     CHECK(msg->findObject("response", &obj));
818                     sp<ARTSPResponse> response =
819                         static_cast<ARTSPResponse *>(obj.get());
820 
821                     if (response->mStatusCode != 200) {
822                         result = UNKNOWN_ERROR;
823                     } else {
824                         parsePlayResponse(response);
825                         postTimeout();
826                     }
827                 }
828 
829                 if (result != OK) {
830                     sp<AMessage> reply = new AMessage('disc', this);
831                     mConn->disconnect(reply);
832                 }
833 
834                 break;
835             }
836 
837             case 'aliv':
838             {
839                 int32_t generation;
840                 CHECK(msg->findInt32("generation", &generation));
841 
842                 if (generation != mKeepAliveGeneration) {
843                     // obsolete event.
844                     break;
845                 }
846 
847                 AString request;
848                 request.append("OPTIONS ");
849                 request.append(mSessionURL);
850                 request.append(" RTSP/1.0\r\n");
851                 request.append("Session: ");
852                 request.append(mSessionID);
853                 request.append("\r\n");
854                 request.append("\r\n");
855 
856                 sp<AMessage> reply = new AMessage('opts', this);
857                 reply->setInt32("generation", mKeepAliveGeneration);
858                 mConn->sendRequest(request.c_str(), reply);
859                 break;
860             }
861 
862             case 'opts':
863             {
864                 int32_t result;
865                 CHECK(msg->findInt32("result", &result));
866 
867                 ALOGI("OPTIONS completed with result %d (%s)",
868                      result, strerror(-result));
869 
870                 int32_t generation;
871                 CHECK(msg->findInt32("generation", &generation));
872 
873                 if (generation != mKeepAliveGeneration) {
874                     // obsolete event.
875                     break;
876                 }
877 
878                 postKeepAlive();
879                 break;
880             }
881 
882             case 'abor':
883             {
884                 for (size_t i = 0; i < mTracks.size(); ++i) {
885                     TrackInfo *info = &mTracks.editItemAt(i);
886 
887                     if (!mFirstAccessUnit) {
888                         postQueueEOS(i, ERROR_END_OF_STREAM);
889                     }
890 
891                     if (!info->mUsingInterleavedTCP) {
892                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
893 
894                         // Clear the tag
895                         if (mUIDValid) {
896                             NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket);
897                             NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket);
898                             NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket);
899                             NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket);
900                         }
901 
902                         close(info->mRTPSocket);
903                         close(info->mRTCPSocket);
904                     }
905                 }
906                 mTracks.clear();
907                 mSetupTracksSuccessful = false;
908                 mSeekPending = false;
909                 mFirstAccessUnit = true;
910                 mAllTracksHaveTime = false;
911                 mNTPAnchorUs = -1;
912                 mMediaAnchorUs = -1;
913                 mNumAccessUnitsReceived = 0;
914                 mReceivedFirstRTCPPacket = false;
915                 mReceivedFirstRTPPacket = false;
916                 mPausing = false;
917                 mSeekable = true;
918 
919                 sp<AMessage> reply = new AMessage('tear', this);
920 
921                 int32_t reconnect;
922                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
923                     reply->setInt32("reconnect", true);
924                 }
925 
926                 AString request;
927                 request = "TEARDOWN ";
928 
929                 // XXX should use aggregate url from SDP here...
930                 request.append(mSessionURL);
931                 request.append(" RTSP/1.0\r\n");
932 
933                 request.append("Session: ");
934                 request.append(mSessionID);
935                 request.append("\r\n");
936 
937                 request.append("\r\n");
938 
939                 mConn->sendRequest(request.c_str(), reply);
940 
941                 // If the response of teardown hasn't been received in 3 seconds,
942                 // post 'tear' message to avoid ANR.
943                 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
944                     sp<AMessage> teardown = reply->dup();
945                     teardown->setInt32("result", -ECONNABORTED);
946                     teardown->post(kTearDownTimeoutUs);
947                 }
948                 break;
949             }
950 
951             case 'tear':
952             {
953                 int32_t result;
954                 CHECK(msg->findInt32("result", &result));
955 
956                 ALOGI("TEARDOWN completed with result %d (%s)",
957                      result, strerror(-result));
958 
959                 sp<AMessage> reply = new AMessage('disc', this);
960 
961                 int32_t reconnect;
962                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
963                     reply->setInt32("reconnect", true);
964                 }
965 
966                 mConn->disconnect(reply);
967                 break;
968             }
969 
970             case 'quit':
971             {
972                 sp<AMessage> msg = mNotify->dup();
973                 msg->setInt32("what", kWhatDisconnected);
974                 msg->setInt32("result", UNKNOWN_ERROR);
975                 msg->post();
976                 break;
977             }
978 
979             case 'chek':
980             {
981                 int32_t generation;
982                 CHECK(msg->findInt32("generation", &generation));
983                 if (generation != mCheckGeneration) {
984                     // This is an outdated message. Ignore.
985                     break;
986                 }
987 
988                 if (mNumAccessUnitsReceived == 0) {
989 #if 1
990                     ALOGI("stream ended? aborting.");
991                     (new AMessage('abor', this))->post();
992                     break;
993 #else
994                     ALOGI("haven't seen an AU in a looong time.");
995 #endif
996                 }
997 
998                 mNumAccessUnitsReceived = 0;
999                 msg->post(kAccessUnitTimeoutUs);
1000                 break;
1001             }
1002 
1003             case 'accu':
1004             {
1005                 if (mSeekPending) {
1006                     ALOGV("Stale access unit.");
1007                     break;
1008                 }
1009 
1010                 int32_t timeUpdate;
1011                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
1012                     size_t trackIndex;
1013                     CHECK(msg->findSize("track-index", &trackIndex));
1014 
1015                     uint32_t rtpTime;
1016                     uint64_t ntpTime;
1017                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
1018                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1019 
1020                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
1021                     break;
1022                 }
1023 
1024                 int32_t first;
1025                 if (msg->findInt32("first-rtcp", &first)) {
1026                     mReceivedFirstRTCPPacket = true;
1027                     break;
1028                 }
1029 
1030                 if (msg->findInt32("first-rtp", &first)) {
1031                     mReceivedFirstRTPPacket = true;
1032                     break;
1033                 }
1034 
1035                 ++mNumAccessUnitsReceived;
1036                 postAccessUnitTimeoutCheck();
1037 
1038                 size_t trackIndex;
1039                 CHECK(msg->findSize("track-index", &trackIndex));
1040 
1041                 if (trackIndex >= mTracks.size()) {
1042                     ALOGV("late packets ignored.");
1043                     break;
1044                 }
1045 
1046                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1047 
1048                 int32_t eos;
1049                 if (msg->findInt32("eos", &eos)) {
1050                     ALOGI("received BYE on track index %zu", trackIndex);
1051                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1052                         ALOGI("No time established => fake existing data");
1053 
1054                         track->mEOSReceived = true;
1055                         mTryFakeRTCP = true;
1056                         mReceivedFirstRTCPPacket = true;
1057                         fakeTimestamps();
1058                     } else {
1059                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1060                     }
1061                     return;
1062                 }
1063 
1064                 if (mSeekPending) {
1065                     ALOGV("we're seeking, dropping stale packet.");
1066                     break;
1067                 }
1068 
1069                 sp<ABuffer> accessUnit;
1070                 CHECK(msg->findBuffer("access-unit", &accessUnit));
1071                 onAccessUnitComplete(trackIndex, accessUnit);
1072                 break;
1073             }
1074 
1075             case 'paus':
1076             {
1077                 int32_t generation;
1078                 CHECK(msg->findInt32("pausecheck", &generation));
1079                 if (generation != mPauseGeneration) {
1080                     ALOGV("Ignoring outdated pause message.");
1081                     break;
1082                 }
1083 
1084                 if (!mSeekable) {
1085                     ALOGW("This is a live stream, ignoring pause request.");
1086                     break;
1087                 }
1088 
1089                 if (mPausing) {
1090                     ALOGV("This stream is already paused.");
1091                     break;
1092                 }
1093 
1094                 mCheckPending = true;
1095                 ++mCheckGeneration;
1096                 mPausing = true;
1097 
1098                 AString request = "PAUSE ";
1099                 request.append(mControlURL);
1100                 request.append(" RTSP/1.0\r\n");
1101 
1102                 request.append("Session: ");
1103                 request.append(mSessionID);
1104                 request.append("\r\n");
1105 
1106                 request.append("\r\n");
1107 
1108                 sp<AMessage> reply = new AMessage('pau2', this);
1109                 mConn->sendRequest(request.c_str(), reply);
1110                 break;
1111             }
1112 
1113             case 'pau2':
1114             {
1115                 int32_t result;
1116                 CHECK(msg->findInt32("result", &result));
1117                 mCheckTimeoutGeneration++;
1118 
1119                 ALOGI("PAUSE completed with result %d (%s)",
1120                      result, strerror(-result));
1121                 break;
1122             }
1123 
1124             case 'resu':
1125             {
1126                 if (mPausing && mSeekPending) {
1127                     // If seeking, Play will be sent from see1 instead
1128                     break;
1129                 }
1130 
1131                 if (!mPausing) {
1132                     // Dont send PLAY if we have not paused
1133                     break;
1134                 }
1135                 AString request = "PLAY ";
1136                 request.append(mControlURL);
1137                 request.append(" RTSP/1.0\r\n");
1138 
1139                 request.append("Session: ");
1140                 request.append(mSessionID);
1141                 request.append("\r\n");
1142 
1143                 request.append("\r\n");
1144 
1145                 sp<AMessage> reply = new AMessage('res2', this);
1146                 mConn->sendRequest(request.c_str(), reply);
1147                 break;
1148             }
1149 
1150             case 'res2':
1151             {
1152                 int32_t result;
1153                 CHECK(msg->findInt32("result", &result));
1154 
1155                 ALOGI("PLAY (for resume) completed with result %d (%s)",
1156                      result, strerror(-result));
1157 
1158                 mCheckPending = false;
1159                 ++mCheckGeneration;
1160                 postAccessUnitTimeoutCheck();
1161 
1162                 if (result == OK) {
1163                     sp<RefBase> obj;
1164                     CHECK(msg->findObject("response", &obj));
1165                     sp<ARTSPResponse> response =
1166                         static_cast<ARTSPResponse *>(obj.get());
1167 
1168                     if (response->mStatusCode != 200) {
1169                         result = UNKNOWN_ERROR;
1170                     } else {
1171                         parsePlayResponse(response);
1172 
1173                         // Post new timeout in order to make sure to use
1174                         // fake timestamps if no new Sender Reports arrive
1175                         postTimeout();
1176                     }
1177                 }
1178 
1179                 if (result != OK) {
1180                     ALOGE("resume failed, aborting.");
1181                     (new AMessage('abor', this))->post();
1182                 }
1183 
1184                 mPausing = false;
1185                 break;
1186             }
1187 
1188             case 'seek':
1189             {
1190                 if (!mSeekable) {
1191                     ALOGW("This is a live stream, ignoring seek request.");
1192 
1193                     sp<AMessage> msg = mNotify->dup();
1194                     msg->setInt32("what", kWhatSeekDone);
1195                     msg->post();
1196                     break;
1197                 }
1198 
1199                 int64_t timeUs;
1200                 CHECK(msg->findInt64("time", &timeUs));
1201 
1202                 mSeekPending = true;
1203 
1204                 // Disable the access unit timeout until we resumed
1205                 // playback again.
1206                 mCheckPending = true;
1207                 ++mCheckGeneration;
1208 
1209                 sp<AMessage> reply = new AMessage('see0', this);
1210                 reply->setInt64("time", timeUs);
1211 
1212                 if (mPausing) {
1213                     // PAUSE already sent
1214                     ALOGI("Pause already sent");
1215                     reply->post();
1216                     break;
1217                 }
1218                 AString request = "PAUSE ";
1219                 request.append(mControlURL);
1220                 request.append(" RTSP/1.0\r\n");
1221 
1222                 request.append("Session: ");
1223                 request.append(mSessionID);
1224                 request.append("\r\n");
1225 
1226                 request.append("\r\n");
1227 
1228                 mConn->sendRequest(request.c_str(), reply);
1229                 break;
1230             }
1231 
1232             case 'see0':
1233             {
1234                 // Session is paused now.
1235                 status_t err = OK;
1236                 msg->findInt32("result", &err);
1237 
1238                 int64_t timeUs;
1239                 CHECK(msg->findInt64("time", &timeUs));
1240 
1241                 sp<AMessage> notify = mNotify->dup();
1242                 notify->setInt32("what", kWhatSeekPaused);
1243                 notify->setInt32("err", err);
1244                 notify->setInt64("time", timeUs);
1245                 notify->post();
1246                 break;
1247 
1248             }
1249 
1250             case 'see1':
1251             {
1252                 for (size_t i = 0; i < mTracks.size(); ++i) {
1253                     TrackInfo *info = &mTracks.editItemAt(i);
1254 
1255                     postQueueSeekDiscontinuity(i);
1256                     info->mEOSReceived = false;
1257 
1258                     info->mRTPAnchor = 0;
1259                     info->mNTPAnchorUs = -1;
1260                 }
1261 
1262                 mAllTracksHaveTime = false;
1263                 mNTPAnchorUs = -1;
1264 
1265                 // Start new timeoutgeneration to avoid getting timeout
1266                 // before PLAY response arrive
1267                 postTimeout();
1268 
1269                 int64_t timeUs;
1270                 CHECK(msg->findInt64("time", &timeUs));
1271 
1272                 AString request = "PLAY ";
1273                 request.append(mControlURL);
1274                 request.append(" RTSP/1.0\r\n");
1275 
1276                 request.append("Session: ");
1277                 request.append(mSessionID);
1278                 request.append("\r\n");
1279 
1280                 request.append(
1281                         AStringPrintf(
1282                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1283 
1284                 request.append("\r\n");
1285 
1286                 sp<AMessage> reply = new AMessage('see2', this);
1287                 mConn->sendRequest(request.c_str(), reply);
1288                 break;
1289             }
1290 
1291             case 'see2':
1292             {
1293                 if (mTracks.size() == 0) {
1294                     // We have already hit abor, break
1295                     break;
1296                 }
1297 
1298                 int32_t result;
1299                 CHECK(msg->findInt32("result", &result));
1300 
1301                 ALOGI("PLAY (for seek) completed with result %d (%s)",
1302                      result, strerror(-result));
1303 
1304                 mCheckPending = false;
1305                 ++mCheckGeneration;
1306                 postAccessUnitTimeoutCheck();
1307 
1308                 if (result == OK) {
1309                     sp<RefBase> obj;
1310                     CHECK(msg->findObject("response", &obj));
1311                     sp<ARTSPResponse> response =
1312                         static_cast<ARTSPResponse *>(obj.get());
1313 
1314                     if (response->mStatusCode != 200) {
1315                         result = UNKNOWN_ERROR;
1316                     } else {
1317                         parsePlayResponse(response);
1318 
1319                         // Post new timeout in order to make sure to use
1320                         // fake timestamps if no new Sender Reports arrive
1321                         postTimeout();
1322 
1323                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1324                         CHECK_GE(i, 0);
1325 
1326                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1327 
1328                         ALOGI("seek completed.");
1329                     }
1330                 }
1331 
1332                 if (result != OK) {
1333                     ALOGE("seek failed, aborting.");
1334                     (new AMessage('abor', this))->post();
1335                 }
1336 
1337                 mPausing = false;
1338                 mSeekPending = false;
1339 
1340                 // Discard all stale access units.
1341                 for (size_t i = 0; i < mTracks.size(); ++i) {
1342                     TrackInfo *track = &mTracks.editItemAt(i);
1343                     track->mPackets.clear();
1344                 }
1345 
1346                 sp<AMessage> msg = mNotify->dup();
1347                 msg->setInt32("what", kWhatSeekDone);
1348                 msg->post();
1349                 break;
1350             }
1351 
1352             case 'biny':
1353             {
1354                 sp<ABuffer> buffer;
1355                 CHECK(msg->findBuffer("buffer", &buffer));
1356 
1357                 int32_t index;
1358                 CHECK(buffer->meta()->findInt32("index", &index));
1359 
1360                 mRTPConn->injectPacket(index, buffer);
1361                 break;
1362             }
1363 
1364             case 'tiou':
1365             {
1366                 int32_t timeoutGenerationCheck;
1367                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1368                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1369                     // This is an outdated message. Ignore.
1370                     // This typically happens if a lot of seeks are
1371                     // performed, since new timeout messages now are
1372                     // posted at seek as well.
1373                     break;
1374                 }
1375                 if (!mReceivedFirstRTCPPacket) {
1376                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1377                         ALOGW("We received RTP packets but no RTCP packets, "
1378                              "using fake timestamps.");
1379 
1380                         mTryFakeRTCP = true;
1381 
1382                         mReceivedFirstRTCPPacket = true;
1383 
1384                         fakeTimestamps();
1385                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1386                         ALOGW("Never received any data, switching transports.");
1387 
1388                         mTryTCPInterleaving = true;
1389 
1390                         sp<AMessage> msg = new AMessage('abor', this);
1391                         msg->setInt32("reconnect", true);
1392                         msg->post();
1393                     } else {
1394                         ALOGW("Never received any data, disconnecting.");
1395                         (new AMessage('abor', this))->post();
1396                     }
1397                 } else {
1398                     if (!mAllTracksHaveTime) {
1399                         ALOGW("We received some RTCP packets, but time "
1400                               "could not be established on all tracks, now "
1401                               "using fake timestamps");
1402 
1403                         fakeTimestamps();
1404                     }
1405                 }
1406                 break;
1407             }
1408 
1409             default:
1410                 TRESPASS();
1411                 break;
1412         }
1413     }
1414 
postKeepAliveMyHandler1415     void postKeepAlive() {
1416         sp<AMessage> msg = new AMessage('aliv', this);
1417         msg->setInt32("generation", mKeepAliveGeneration);
1418         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1419     }
1420 
cancelAccessUnitTimeoutCheckMyHandler1421     void cancelAccessUnitTimeoutCheck() {
1422         ALOGV("cancelAccessUnitTimeoutCheck");
1423         ++mCheckGeneration;
1424     }
1425 
postAccessUnitTimeoutCheckMyHandler1426     void postAccessUnitTimeoutCheck() {
1427         if (mCheckPending) {
1428             return;
1429         }
1430 
1431         mCheckPending = true;
1432         sp<AMessage> check = new AMessage('chek', this);
1433         check->setInt32("generation", mCheckGeneration);
1434         check->post(kAccessUnitTimeoutUs);
1435     }
1436 
SplitStringMyHandler1437     static void SplitString(
1438             const AString &s, const char *separator, List<AString> *items) {
1439         items->clear();
1440         size_t start = 0;
1441         while (start < s.size()) {
1442             ssize_t offset = s.find(separator, start);
1443 
1444             if (offset < 0) {
1445                 items->push_back(AString(s, start, s.size() - start));
1446                 break;
1447             }
1448 
1449             items->push_back(AString(s, start, offset - start));
1450             start = offset + strlen(separator);
1451         }
1452     }
1453 
parsePlayResponseMyHandler1454     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1455         mPlayResponseParsed = true;
1456         if (mTracks.size() == 0) {
1457             ALOGV("parsePlayResponse: late packets ignored.");
1458             return;
1459         }
1460 
1461         ssize_t i = response->mHeaders.indexOfKey("range");
1462         if (i < 0) {
1463             // Server doesn't even tell use what range it is going to
1464             // play, therefore we won't support seeking.
1465             return;
1466         }
1467 
1468         AString range = response->mHeaders.valueAt(i);
1469         ALOGV("Range: %s", range.c_str());
1470 
1471         AString val;
1472         CHECK(GetAttribute(range.c_str(), "npt", &val));
1473 
1474         float npt1, npt2;
1475         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1476             // This is a live stream and therefore not seekable.
1477 
1478             ALOGI("This is a live stream");
1479             return;
1480         }
1481 
1482         i = response->mHeaders.indexOfKey("rtp-info");
1483         CHECK_GE(i, 0);
1484 
1485         AString rtpInfo = response->mHeaders.valueAt(i);
1486         List<AString> streamInfos;
1487         SplitString(rtpInfo, ",", &streamInfos);
1488 
1489         int n = 1;
1490         for (List<AString>::iterator it = streamInfos.begin();
1491              it != streamInfos.end(); ++it) {
1492             (*it).trim();
1493             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1494 
1495             CHECK(GetAttribute((*it).c_str(), "url", &val));
1496 
1497             size_t trackIndex = 0;
1498             while (trackIndex < mTracks.size()
1499                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1500                 ++trackIndex;
1501             }
1502             CHECK_LT(trackIndex, mTracks.size());
1503 
1504             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1505 
1506             char *end;
1507             unsigned long seq = strtoul(val.c_str(), &end, 10);
1508 
1509             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1510             info->mFirstSeqNumInSegment = seq;
1511             info->mNewSegment = true;
1512             info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1513 
1514             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1515 
1516             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1517 
1518             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1519 
1520             info->mNormalPlayTimeRTP = rtpTime;
1521             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1522 
1523             if (!mFirstAccessUnit) {
1524                 postNormalPlayTimeMapping(
1525                         trackIndex,
1526                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1527             }
1528 
1529             ++n;
1530         }
1531     }
1532 
getTrackFormatMyHandler1533     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1534         CHECK_GE(index, 0u);
1535         CHECK_LT(index, mTracks.size());
1536 
1537         const TrackInfo &info = mTracks.itemAt(index);
1538 
1539         *timeScale = info.mTimeScale;
1540 
1541         return info.mPacketSource->getFormat();
1542     }
1543 
countTracksMyHandler1544     size_t countTracks() const {
1545         return mTracks.size();
1546     }
1547 
1548 private:
1549     struct TrackInfo {
1550         AString mURL;
1551         int mRTPSocket;
1552         int mRTCPSocket;
1553         bool mUsingInterleavedTCP;
1554         uint32_t mFirstSeqNumInSegment;
1555         bool mNewSegment;
1556         int32_t mAllowedStaleAccessUnits;
1557 
1558         uint32_t mRTPAnchor;
1559         int64_t mNTPAnchorUs;
1560         int32_t mTimeScale;
1561         bool mEOSReceived;
1562 
1563         uint32_t mNormalPlayTimeRTP;
1564         int64_t mNormalPlayTimeUs;
1565 
1566         sp<APacketSource> mPacketSource;
1567 
1568         // Stores packets temporarily while no notion of time
1569         // has been established yet.
1570         List<sp<ABuffer> > mPackets;
1571     };
1572 
1573     sp<AMessage> mNotify;
1574     bool mUIDValid;
1575     uid_t mUID;
1576     sp<ALooper> mNetLooper;
1577     sp<ARTSPConnection> mConn;
1578     sp<ARTPConnection> mRTPConn;
1579     sp<ASessionDescription> mSessionDesc;
1580     AString mOriginalSessionURL;  // This one still has user:pass@
1581     AString mSessionURL;
1582     AString mSessionHost;
1583     AString mBaseURL;
1584     AString mControlURL;
1585     AString mSessionID;
1586     bool mSetupTracksSuccessful;
1587     bool mSeekPending;
1588     bool mFirstAccessUnit;
1589 
1590     bool mAllTracksHaveTime;
1591     int64_t mNTPAnchorUs;
1592     int64_t mMediaAnchorUs;
1593     int64_t mLastMediaTimeUs;
1594 
1595     int64_t mNumAccessUnitsReceived;
1596     bool mCheckPending;
1597     int32_t mCheckGeneration;
1598     int32_t mCheckTimeoutGeneration;
1599     bool mTryTCPInterleaving;
1600     bool mTryFakeRTCP;
1601     bool mReceivedFirstRTCPPacket;
1602     bool mReceivedFirstRTPPacket;
1603     bool mSeekable;
1604     int64_t mKeepAliveTimeoutUs;
1605     int32_t mKeepAliveGeneration;
1606     bool mPausing;
1607     int32_t mPauseGeneration;
1608 
1609     Vector<TrackInfo> mTracks;
1610 
1611     bool mPlayResponseParsed;
1612 
setupTrackMyHandler1613     void setupTrack(size_t index) {
1614         sp<APacketSource> source =
1615             new APacketSource(mSessionDesc, index);
1616 
1617         if (source->initCheck() != OK) {
1618             ALOGW("Unsupported format. Ignoring track #%zu.", index);
1619 
1620             sp<AMessage> reply = new AMessage('setu', this);
1621             reply->setSize("index", index);
1622             reply->setInt32("result", ERROR_UNSUPPORTED);
1623             reply->post();
1624             return;
1625         }
1626 
1627         AString url;
1628         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1629 
1630         AString trackURL;
1631         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1632 
1633         mTracks.push(TrackInfo());
1634         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1635         info->mURL = trackURL;
1636         info->mPacketSource = source;
1637         info->mUsingInterleavedTCP = false;
1638         info->mFirstSeqNumInSegment = 0;
1639         info->mNewSegment = true;
1640         info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1641         info->mRTPSocket = -1;
1642         info->mRTCPSocket = -1;
1643         info->mRTPAnchor = 0;
1644         info->mNTPAnchorUs = -1;
1645         info->mNormalPlayTimeRTP = 0;
1646         info->mNormalPlayTimeUs = 0ll;
1647 
1648         unsigned long PT;
1649         AString formatDesc;
1650         AString formatParams;
1651         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1652 
1653         int32_t timescale;
1654         int32_t numChannels;
1655         ASessionDescription::ParseFormatDesc(
1656                 formatDesc.c_str(), &timescale, &numChannels);
1657 
1658         info->mTimeScale = timescale;
1659         info->mEOSReceived = false;
1660 
1661         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1662 
1663         AString request = "SETUP ";
1664         request.append(trackURL);
1665         request.append(" RTSP/1.0\r\n");
1666 
1667         if (mTryTCPInterleaving) {
1668             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1669             info->mUsingInterleavedTCP = true;
1670             info->mRTPSocket = interleaveIndex;
1671             info->mRTCPSocket = interleaveIndex + 1;
1672 
1673             request.append("Transport: RTP/AVP/TCP;interleaved=");
1674             request.append(interleaveIndex);
1675             request.append("-");
1676             request.append(interleaveIndex + 1);
1677         } else {
1678             unsigned rtpPort;
1679             ARTPConnection::MakePortPair(
1680                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1681 
1682             if (mUIDValid) {
1683                 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID,
1684                         (uint32_t)*(uint32_t*) "RTP_");
1685                 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1686                         (uint32_t)*(uint32_t*) "RTP_");
1687                 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID);
1688                 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1689             }
1690 
1691             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1692             request.append(rtpPort);
1693             request.append("-");
1694             request.append(rtpPort + 1);
1695         }
1696 
1697         request.append("\r\n");
1698 
1699         if (index > 1) {
1700             request.append("Session: ");
1701             request.append(mSessionID);
1702             request.append("\r\n");
1703         }
1704 
1705         request.append("\r\n");
1706 
1707         sp<AMessage> reply = new AMessage('setu', this);
1708         reply->setSize("index", index);
1709         reply->setSize("track-index", mTracks.size() - 1);
1710         mConn->sendRequest(request.c_str(), reply);
1711     }
1712 
MakeURLMyHandler1713     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1714         out->clear();
1715 
1716         if (strncasecmp("rtsp://", baseURL, 7)) {
1717             // Base URL must be absolute
1718             return false;
1719         }
1720 
1721         if (!strncasecmp("rtsp://", url, 7)) {
1722             // "url" is already an absolute URL, ignore base URL.
1723             out->setTo(url);
1724             return true;
1725         }
1726 
1727         size_t n = strlen(baseURL);
1728         out->setTo(baseURL);
1729         if (baseURL[n - 1] != '/') {
1730             out->append("/");
1731         }
1732         out->append(url);
1733 
1734         return true;
1735     }
1736 
fakeTimestampsMyHandler1737     void fakeTimestamps() {
1738         mNTPAnchorUs = -1ll;
1739         for (size_t i = 0; i < mTracks.size(); ++i) {
1740             onTimeUpdate(i, 0, 0ll);
1741         }
1742     }
1743 
dataReceivedOnAllChannelsMyHandler1744     bool dataReceivedOnAllChannels() {
1745         TrackInfo *track;
1746         for (size_t i = 0; i < mTracks.size(); ++i) {
1747             track = &mTracks.editItemAt(i);
1748             if (track->mPackets.empty()) {
1749                 return false;
1750             }
1751         }
1752         return true;
1753     }
1754 
handleFirstAccessUnitMyHandler1755     void handleFirstAccessUnit() {
1756         if (mFirstAccessUnit) {
1757             sp<AMessage> msg = mNotify->dup();
1758             msg->setInt32("what", kWhatConnected);
1759             msg->post();
1760 
1761             if (mSeekable) {
1762                 for (size_t i = 0; i < mTracks.size(); ++i) {
1763                     TrackInfo *info = &mTracks.editItemAt(i);
1764 
1765                     postNormalPlayTimeMapping(
1766                             i,
1767                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1768                 }
1769             }
1770 
1771             mFirstAccessUnit = false;
1772         }
1773     }
1774 
onTimeUpdateMyHandler1775     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1776         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1777              trackIndex, rtpTime, (long long)ntpTime);
1778 
1779         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1780 
1781         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1782 
1783         track->mRTPAnchor = rtpTime;
1784         track->mNTPAnchorUs = ntpTimeUs;
1785 
1786         if (mNTPAnchorUs < 0) {
1787             mNTPAnchorUs = ntpTimeUs;
1788             mMediaAnchorUs = mLastMediaTimeUs;
1789         }
1790 
1791         if (!mAllTracksHaveTime) {
1792             bool allTracksHaveTime = (mTracks.size() > 0);
1793             for (size_t i = 0; i < mTracks.size(); ++i) {
1794                 TrackInfo *track = &mTracks.editItemAt(i);
1795                 if (track->mNTPAnchorUs < 0) {
1796                     allTracksHaveTime = false;
1797                     break;
1798                 }
1799             }
1800             if (allTracksHaveTime) {
1801                 mAllTracksHaveTime = true;
1802                 ALOGI("Time now established for all tracks.");
1803             }
1804         }
1805         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1806             handleFirstAccessUnit();
1807 
1808             // Time is now established, lets start timestamping immediately
1809             for (size_t i = 0; i < mTracks.size(); ++i) {
1810                 if (OK != processAccessUnitQueue(i)) {
1811                     return;
1812                 }
1813             }
1814             for (size_t i = 0; i < mTracks.size(); ++i) {
1815                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1816                 if (trackInfo->mEOSReceived) {
1817                     postQueueEOS(i, ERROR_END_OF_STREAM);
1818                     trackInfo->mEOSReceived = false;
1819                 }
1820             }
1821         }
1822     }
1823 
processAccessUnitQueueMyHandler1824     status_t processAccessUnitQueue(int32_t trackIndex) {
1825         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1826         while (!track->mPackets.empty()) {
1827             sp<ABuffer> accessUnit = *track->mPackets.begin();
1828             track->mPackets.erase(track->mPackets.begin());
1829 
1830             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1831             if (track->mNewSegment) {
1832                 // The sequence number from RTP packet has only 16 bits and is extended
1833                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1834                 // RTSP "PLAY" command should be used to detect the first RTP packet
1835                 // after seeking.
1836                 if (mSeekable) {
1837                     if (track->mAllowedStaleAccessUnits > 0) {
1838                         uint32_t seqNum16 = seqNum & 0xffff;
1839                         uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1840                         if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1841                                 || seqNum16 < firstSeqNumInSegment16) {
1842                             // Not the first rtp packet of the stream after seeking, discarding.
1843                             track->mAllowedStaleAccessUnits--;
1844                             ALOGV("discarding stale access unit (0x%x : 0x%x)",
1845                                  seqNum, track->mFirstSeqNumInSegment);
1846                             continue;
1847                         }
1848                         ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1849                                 "Missing the first packet(%u), now take packet(%u) as first one",
1850                                 track->mFirstSeqNumInSegment, seqNum);
1851                     } else { // track->mAllowedStaleAccessUnits <= 0
1852                         mNumAccessUnitsReceived = 0;
1853                         ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1854                              "Still no first rtp packet after %d stale ones",
1855                              kMaxAllowedStaleAccessUnits);
1856                         track->mAllowedStaleAccessUnits = -1;
1857                         return UNKNOWN_ERROR;
1858                     }
1859                 }
1860 
1861                 // Now found the first rtp packet of the stream after seeking.
1862                 track->mFirstSeqNumInSegment = seqNum;
1863                 track->mNewSegment = false;
1864             }
1865 
1866             if (seqNum < track->mFirstSeqNumInSegment) {
1867                 ALOGV("dropping stale access-unit (%d < %d)",
1868                      seqNum, track->mFirstSeqNumInSegment);
1869                 continue;
1870             }
1871 
1872             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1873                 postQueueAccessUnit(trackIndex, accessUnit);
1874             }
1875         }
1876         return OK;
1877     }
1878 
onAccessUnitCompleteMyHandler1879     void onAccessUnitComplete(
1880             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1881         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1882         track->mPackets.push_back(accessUnit);
1883 
1884         uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1885         ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1886 
1887         if(!mPlayResponseParsed){
1888             ALOGV("play response is not parsed");
1889             return;
1890         }
1891 
1892         handleFirstAccessUnit();
1893 
1894         if (!mAllTracksHaveTime) {
1895             ALOGV("storing accessUnit, no time established yet");
1896             return;
1897         }
1898 
1899         if (OK != processAccessUnitQueue(trackIndex)) {
1900             return;
1901         }
1902 
1903         if (track->mEOSReceived) {
1904             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1905             track->mEOSReceived = false;
1906         }
1907     }
1908 
addMediaTimestampMyHandler1909     bool addMediaTimestamp(
1910             int32_t trackIndex, const TrackInfo *track,
1911             const sp<ABuffer> &accessUnit) {
1912         UNUSED_UNLESS_VERBOSE(trackIndex);
1913 
1914         uint32_t rtpTime;
1915         CHECK(accessUnit->meta()->findInt32(
1916                     "rtp-time", (int32_t *)&rtpTime));
1917 
1918         int64_t relRtpTimeUs =
1919             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1920                 / track->mTimeScale;
1921 
1922         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1923 
1924         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1925 
1926         if (mediaTimeUs > mLastMediaTimeUs) {
1927             mLastMediaTimeUs = mediaTimeUs;
1928         }
1929 
1930         if (mediaTimeUs < 0 && !mSeekable) {
1931             ALOGV("dropping early accessUnit.");
1932             return false;
1933         }
1934 
1935         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1936              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1937 
1938         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1939 
1940         return true;
1941     }
1942 
postQueueAccessUnitMyHandler1943     void postQueueAccessUnit(
1944             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1945         sp<AMessage> msg = mNotify->dup();
1946         msg->setInt32("what", kWhatAccessUnit);
1947         msg->setSize("trackIndex", trackIndex);
1948         msg->setBuffer("accessUnit", accessUnit);
1949         msg->post();
1950     }
1951 
postQueueEOSMyHandler1952     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1953         sp<AMessage> msg = mNotify->dup();
1954         msg->setInt32("what", kWhatEOS);
1955         msg->setSize("trackIndex", trackIndex);
1956         msg->setInt32("finalResult", finalResult);
1957         msg->post();
1958     }
1959 
postQueueSeekDiscontinuityMyHandler1960     void postQueueSeekDiscontinuity(size_t trackIndex) {
1961         sp<AMessage> msg = mNotify->dup();
1962         msg->setInt32("what", kWhatSeekDiscontinuity);
1963         msg->setSize("trackIndex", trackIndex);
1964         msg->post();
1965     }
1966 
postNormalPlayTimeMappingMyHandler1967     void postNormalPlayTimeMapping(
1968             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1969         sp<AMessage> msg = mNotify->dup();
1970         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1971         msg->setSize("trackIndex", trackIndex);
1972         msg->setInt32("rtpTime", rtpTime);
1973         msg->setInt64("nptUs", nptUs);
1974         msg->post();
1975     }
1976 
postTimeoutMyHandler1977     void postTimeout() {
1978         sp<AMessage> timeout = new AMessage('tiou', this);
1979         mCheckTimeoutGeneration++;
1980         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1981 
1982         int64_t startupTimeoutUs;
1983         startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1984         timeout->post(startupTimeoutUs);
1985     }
1986 
1987     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1988 };
1989 
1990 }  // namespace android
1991 
1992 #endif  // MY_HANDLER_H_
1993