1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26 
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29 
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 #include "NetworkUtils.h"
35 
36 #include <ctype.h>
37 #include <cutils/properties.h>
38 
39 #include <datasource/HTTPBase.h>
40 #include <media/stagefright/foundation/ABuffer.h>
41 #include <media/stagefright/foundation/ADebug.h>
42 #include <media/stagefright/foundation/ALooper.h>
43 #include <media/stagefright/foundation/AMessage.h>
44 #include <media/stagefright/MediaErrors.h>
45 #include <media/stagefright/Utils.h>
46 #include <media/stagefright/FoundationUtils.h>
47 
48 #include <arpa/inet.h>
49 #include <sys/socket.h>
50 #include <netdb.h>
51 
52 
53 #if LOG_NDEBUG
54 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
55 #else
56 #define UNUSED_UNLESS_VERBOSE(x)
57 #endif
58 
59 #ifndef FALLTHROUGH_INTENDED
60 #define FALLTHROUGH_INTENDED [[clang::fallthrough]]  // NOLINT
61 #endif
62 
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
65 static int64_t kAccessUnitTimeoutUs = 10000000ll;
66 
67 // If no access units arrive for the first 10 secs after starting the
68 // stream, assume none ever will and signal EOS or switch transports.
69 static int64_t kStartupTimeoutUs = 10000000ll;
70 
71 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
72 
73 static int64_t kPauseDelayUs = 3000000ll;
74 
75 // The allowed maximum number of stale access units at the beginning of
76 // a new sequence.
77 static int32_t kMaxAllowedStaleAudioAccessUnits = 20;
78 static int32_t kMaxAllowedStaleVideoAccessUnits = 400;
79 
80 static int64_t kTearDownTimeoutUs = 3000000ll;
81 
82 namespace android {
83 
GetAttribute(const char * s,const char * key,AString * value)84 static bool GetAttribute(const char *s, const char *key, AString *value) {
85     value->clear();
86 
87     size_t keyLen = strlen(key);
88 
89     for (;;) {
90         while (isspace(*s)) {
91             ++s;
92         }
93 
94         const char *colonPos = strchr(s, ';');
95 
96         size_t len =
97             (colonPos == NULL) ? strlen(s) : colonPos - s;
98 
99         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
100             value->setTo(&s[keyLen + 1], len - keyLen - 1);
101             return true;
102         }
103 
104         if (colonPos == NULL) {
105             return false;
106         }
107 
108         s = colonPos + 1;
109     }
110 }
111 
GetMaxAllowedStaleCount(bool isVideo)112 static int32_t GetMaxAllowedStaleCount(bool isVideo) {
113     return isVideo ? kMaxAllowedStaleVideoAccessUnits : kMaxAllowedStaleAudioAccessUnits;
114 }
115 
116 struct MyHandler : public AHandler {
117     enum {
118         kWhatConnected                  = 'conn',
119         kWhatDisconnected               = 'disc',
120         kWhatSeekPaused                 = 'spau',
121         kWhatSeekDone                   = 'sdon',
122 
123         kWhatAccessUnit                 = 'accU',
124         kWhatEOS                        = 'eos!',
125         kWhatSeekDiscontinuity          = 'seeD',
126         kWhatNormalPlayTimeMapping      = 'nptM',
127     };
128 
129     MyHandler(
130             const char *url,
131             const sp<AMessage> &notify,
132             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler133         : mNotify(notify),
134           mUIDValid(uidValid),
135           mUID(uid),
136           mNetLooper(new ALooper),
137           mConn(new ARTSPConnection(mUIDValid, mUID)),
138           mRTPConn(new ARTPConnection),
139           mOriginalSessionURL(url),
140           mSessionURL(url),
141           mSetupTracksSuccessful(false),
142           mSeekPending(false),
143           mFirstAccessUnit(true),
144           mAllTracksHaveTime(false),
145           mNTPAnchorUs(-1),
146           mMediaAnchorUs(-1),
147           mLastMediaTimeUs(0),
148           mNumAccessUnitsReceived(0),
149           mCheckPending(false),
150           mCheckGeneration(0),
151           mCheckTimeoutGeneration(0),
152           mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
153           mTryFakeRTCP(false),
154           mReceivedFirstRTCPPacket(false),
155           mReceivedFirstRTPPacket(false),
156           mSeekable(true),
157           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
158           mKeepAliveGeneration(0),
159           mPausing(false),
160           mPauseGeneration(0),
161           mPlayResponseParsed(false) {
162         mNetLooper->setName("rtsp net");
163         mNetLooper->start(false /* runOnCallingThread */,
164                           false /* canCallJava */,
165                           PRIORITY_HIGHEST);
166 
167         // Strip any authentication info from the session url, we don't
168         // want to transmit user/pass in cleartext.
169         AString host, path, user, pass;
170         unsigned port;
171         CHECK(ARTSPConnection::ParseURL(
172                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
173 
174         if (user.size() > 0) {
175             mSessionURL.clear();
176             mSessionURL.append("rtsp://");
177             mSessionURL.append(host);
178             mSessionURL.append(":");
179             mSessionURL.append(AStringPrintf("%u", port));
180             mSessionURL.append(path);
181 
182             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
183         }
184 
185         mSessionHost = host;
186     }
187 
connectMyHandler188     void connect() {
189         looper()->registerHandler(mConn);
190         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
191 
192         sp<AMessage> notify = new AMessage('biny', this);
193         mConn->observeBinaryData(notify);
194 
195         sp<AMessage> reply = new AMessage('conn', this);
196         mConn->connect(mOriginalSessionURL.c_str(), reply);
197     }
198 
loadSDPMyHandler199     void loadSDP(const sp<ASessionDescription>& desc) {
200         looper()->registerHandler(mConn);
201         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
202 
203         sp<AMessage> notify = new AMessage('biny', this);
204         mConn->observeBinaryData(notify);
205 
206         sp<AMessage> reply = new AMessage('sdpl', this);
207         reply->setObject("description", desc);
208         mConn->connect(mOriginalSessionURL.c_str(), reply);
209     }
210 
getControlURLMyHandler211     AString getControlURL() {
212         AString sessionLevelControlURL;
213         if (mSessionDesc->findAttribute(
214                 0,
215                 "a=control",
216                 &sessionLevelControlURL)) {
217             if (sessionLevelControlURL.compare("*") == 0) {
218                 return mBaseURL;
219             } else {
220                 AString controlURL;
221                 CHECK(MakeURL(
222                         mBaseURL.c_str(),
223                         sessionLevelControlURL.c_str(),
224                         &controlURL));
225                 return controlURL;
226             }
227         } else {
228             return mSessionURL;
229         }
230     }
231 
disconnectMyHandler232     void disconnect() {
233         (new AMessage('abor', this))->post();
234     }
235 
seekMyHandler236     void seek(int64_t timeUs) {
237         sp<AMessage> msg = new AMessage('seek', this);
238         msg->setInt64("time", timeUs);
239         mPauseGeneration++;
240         msg->post();
241     }
242 
continueSeekAfterPauseMyHandler243     void continueSeekAfterPause(int64_t timeUs) {
244         sp<AMessage> msg = new AMessage('see1', this);
245         msg->setInt64("time", timeUs);
246         msg->post();
247     }
248 
isSeekableMyHandler249     bool isSeekable() const {
250         return mSeekable;
251     }
252 
pauseMyHandler253     void pause() {
254         sp<AMessage> msg = new AMessage('paus', this);
255         mPauseGeneration++;
256         msg->setInt32("pausecheck", mPauseGeneration);
257         msg->post();
258     }
259 
resumeMyHandler260     void resume() {
261         sp<AMessage> msg = new AMessage('resu', this);
262         mPauseGeneration++;
263         msg->post();
264     }
265 
getARTSPConnectionMyHandler266     sp<ARTSPConnection> getARTSPConnection() {
267       return mConn;
268     }
269 
addRRMyHandler270     static void addRR(const sp<ABuffer> &buf) {
271         uint8_t *ptr = buf->data() + buf->size();
272         ptr[0] = 0x80 | 0;
273         ptr[1] = 201;  // RR
274         ptr[2] = 0;
275         ptr[3] = 1;
276         ptr[4] = 0xde;  // SSRC
277         ptr[5] = 0xad;
278         ptr[6] = 0xbe;
279         ptr[7] = 0xef;
280 
281         buf->setRange(0, buf->size() + 8);
282     }
283 
addSDESMyHandler284     static void addSDES(int s, const sp<ABuffer> &buffer) {
285         struct sockaddr_in addr;
286         socklen_t addrSize = sizeof(addr);
287         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
288             inet_aton("0.0.0.0", &(addr.sin_addr));
289         }
290 
291         uint8_t *data = buffer->data() + buffer->size();
292         data[0] = 0x80 | 1;
293         data[1] = 202;  // SDES
294         data[4] = 0xde;  // SSRC
295         data[5] = 0xad;
296         data[6] = 0xbe;
297         data[7] = 0xef;
298 
299         size_t offset = 8;
300 
301         data[offset++] = 1;  // CNAME
302 
303         AString cname = "stagefright@";
304         cname.append(inet_ntoa(addr.sin_addr));
305         data[offset++] = cname.size();
306 
307         memcpy(&data[offset], cname.c_str(), cname.size());
308         offset += cname.size();
309 
310         data[offset++] = 6;  // TOOL
311 
312         AString tool = MakeUserAgent();
313 
314         data[offset++] = tool.size();
315 
316         memcpy(&data[offset], tool.c_str(), tool.size());
317         offset += tool.size();
318 
319         data[offset++] = 0;
320 
321         if ((offset % 4) > 0) {
322             size_t count = 4 - (offset % 4);
323             switch (count) {
324                 case 3:
325                     data[offset++] = 0;
326                     FALLTHROUGH_INTENDED;
327                 case 2:
328                     data[offset++] = 0;
329                     FALLTHROUGH_INTENDED;
330                 case 1:
331                     data[offset++] = 0;
332             }
333         }
334 
335         size_t numWords = (offset / 4) - 1;
336         data[2] = numWords >> 8;
337         data[3] = numWords & 0xff;
338 
339         buffer->setRange(buffer->offset(), buffer->size() + offset);
340     }
341 
342     // In case we're behind NAT, fire off two UDP packets to the remote
343     // rtp/rtcp ports to poke a hole into the firewall for future incoming
344     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler345     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
346         struct sockaddr_in addr;
347         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
348         addr.sin_family = AF_INET;
349 
350         AString source;
351         AString server_port;
352         if (!GetAttribute(transport.c_str(),
353                           "source",
354                           &source)) {
355             ALOGW("Missing 'source' field in Transport response. Using "
356                  "RTSP endpoint address.");
357 
358             struct hostent *ent = gethostbyname(mSessionHost.c_str());
359             if (ent == NULL) {
360                 ALOGE("Failed to look up address of session host");
361 
362                 return false;
363             }
364 
365             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
366         } else {
367             addr.sin_addr.s_addr = inet_addr(source.c_str());
368         }
369 
370         if (!GetAttribute(transport.c_str(),
371                                  "server_port",
372                                  &server_port)) {
373             ALOGI("Missing 'server_port' field in Transport response.");
374             return false;
375         }
376 
377         int rtpPort, rtcpPort;
378         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
379                 || rtpPort <= 0 || rtpPort > 65535
380                 || rtcpPort <=0 || rtcpPort > 65535
381                 || rtcpPort != rtpPort + 1) {
382             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
383                  " RTP port must be even, RTCP port must be one higher.",
384                  server_port.c_str());
385 
386             return false;
387         }
388 
389         if (rtpPort & 1) {
390             ALOGW("Server picked an odd RTP port, it should've picked an "
391                  "even one, we'll let it pass for now, but this may break "
392                  "in the future.");
393         }
394 
395         if (addr.sin_addr.s_addr == INADDR_NONE) {
396             return true;
397         }
398 
399         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
400             // No firewalls to traverse on the loopback interface.
401             return true;
402         }
403 
404         // Make up an RR/SDES RTCP packet.
405         sp<ABuffer> buf = new ABuffer(65536);
406         buf->setRange(0, 0);
407         addRR(buf);
408         addSDES(rtpSocket, buf);
409 
410         addr.sin_port = htons(rtpPort);
411 
412         ssize_t n = sendto(
413                 rtpSocket, buf->data(), buf->size(), 0,
414                 (const sockaddr *)&addr, sizeof(addr));
415 
416         if (n < (ssize_t)buf->size()) {
417             ALOGE("failed to poke a hole for RTP packets");
418             return false;
419         }
420 
421         addr.sin_port = htons(rtcpPort);
422 
423         n = sendto(
424                 rtcpSocket, buf->data(), buf->size(), 0,
425                 (const sockaddr *)&addr, sizeof(addr));
426 
427         if (n < (ssize_t)buf->size()) {
428             ALOGE("failed to poke a hole for RTCP packets");
429             return false;
430         }
431 
432         ALOGV("successfully poked holes.");
433 
434         return true;
435     }
436 
isLiveStreamMyHandler437     static bool isLiveStream(const sp<ASessionDescription> &desc) {
438         AString attrLiveStream;
439         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
440             ssize_t semicolonPos = attrLiveStream.find(";", 2);
441 
442             const char* liveStreamValue;
443             if (semicolonPos < 0) {
444                 liveStreamValue = attrLiveStream.c_str();
445             } else {
446                 AString valString;
447                 valString.setTo(attrLiveStream,
448                         semicolonPos + 1,
449                         attrLiveStream.size() - semicolonPos - 1);
450                 liveStreamValue = valString.c_str();
451             }
452 
453             uint32_t value = strtoul(liveStreamValue, NULL, 10);
454             if (value == 1) {
455                 ALOGV("found live stream");
456                 return true;
457             }
458         } else {
459             // It is a live stream if no duration is returned
460             int64_t durationUs;
461             if (!desc->getDurationUs(&durationUs)) {
462                 ALOGV("No duration found, assume live stream");
463                 return true;
464             }
465         }
466 
467         return false;
468     }
469 
onMessageReceivedMyHandler470     virtual void onMessageReceived(const sp<AMessage> &msg) {
471         switch (msg->what()) {
472             case 'conn':
473             {
474                 int32_t result;
475                 CHECK(msg->findInt32("result", &result));
476 
477                 ALOGI("connection request completed with result %d (%s)",
478                      result, strerror(-result));
479 
480                 if (result == OK) {
481                     AString request;
482                     request = "DESCRIBE ";
483                     request.append(mSessionURL);
484                     request.append(" RTSP/1.0\r\n");
485                     request.append("Accept: application/sdp\r\n");
486                     request.append("\r\n");
487 
488                     sp<AMessage> reply = new AMessage('desc', this);
489                     mConn->sendRequest(request.c_str(), reply);
490                 } else {
491                     (new AMessage('disc', this))->post();
492                 }
493                 break;
494             }
495 
496             case 'disc':
497             {
498                 ++mKeepAliveGeneration;
499 
500                 int32_t reconnect;
501                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
502                     sp<AMessage> reply = new AMessage('conn', this);
503                     mConn->connect(mOriginalSessionURL.c_str(), reply);
504                 } else {
505                     (new AMessage('quit', this))->post();
506                 }
507                 break;
508             }
509 
510             case 'desc':
511             {
512                 int32_t result;
513                 CHECK(msg->findInt32("result", &result));
514 
515                 ALOGI("DESCRIBE completed with result %d (%s)",
516                      result, strerror(-result));
517 
518                 if (result == OK) {
519                     sp<RefBase> obj;
520                     CHECK(msg->findObject("response", &obj));
521                     sp<ARTSPResponse> response =
522                         static_cast<ARTSPResponse *>(obj.get());
523 
524                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
525                         ssize_t i = response->mHeaders.indexOfKey("location");
526                         CHECK_GE(i, 0);
527 
528                         mOriginalSessionURL = response->mHeaders.valueAt(i);
529                         mSessionURL = mOriginalSessionURL;
530 
531                         // Strip any authentication info from the session url, we don't
532                         // want to transmit user/pass in cleartext.
533                         AString host, path, user, pass;
534                         unsigned port;
535                         if (ARTSPConnection::ParseURL(
536                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
537                                 && user.size() > 0) {
538                             mSessionURL.clear();
539                             mSessionURL.append("rtsp://");
540                             mSessionURL.append(host);
541                             mSessionURL.append(":");
542                             mSessionURL.append(AStringPrintf("%u", port));
543                             mSessionURL.append(path);
544 
545                             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
546                         }
547 
548                         sp<AMessage> reply = new AMessage('conn', this);
549                         mConn->connect(mOriginalSessionURL.c_str(), reply);
550                         break;
551                     }
552 
553                     if (response->mStatusCode != 200) {
554                         result = UNKNOWN_ERROR;
555                     } else if (response->mContent == NULL) {
556                         result = ERROR_MALFORMED;
557                         ALOGE("The response has no content.");
558                     } else {
559                         mSessionDesc = new ASessionDescription;
560 
561                         mSessionDesc->setTo(
562                                 response->mContent->data(),
563                                 response->mContent->size());
564 
565                         if (!mSessionDesc->isValid()) {
566                             ALOGE("Failed to parse session description.");
567                             result = ERROR_MALFORMED;
568                         } else {
569                             ssize_t i = response->mHeaders.indexOfKey("content-base");
570                             if (i >= 0) {
571                                 mBaseURL = response->mHeaders.valueAt(i);
572                             } else {
573                                 i = response->mHeaders.indexOfKey("content-location");
574                                 if (i >= 0) {
575                                     mBaseURL = response->mHeaders.valueAt(i);
576                                 } else {
577                                     mBaseURL = mSessionURL;
578                                 }
579                             }
580 
581                             mSeekable = !isLiveStream(mSessionDesc);
582 
583                             if (!mBaseURL.startsWith("rtsp://")) {
584                                 // Some misbehaving servers specify a relative
585                                 // URL in one of the locations above, combine
586                                 // it with the absolute session URL to get
587                                 // something usable...
588 
589                                 ALOGW("Server specified a non-absolute base URL"
590                                      ", combining it with the session URL to "
591                                      "get something usable...");
592 
593                                 AString tmp;
594                                 CHECK(MakeURL(
595                                             mSessionURL.c_str(),
596                                             mBaseURL.c_str(),
597                                             &tmp));
598 
599                                 mBaseURL = tmp;
600                             }
601 
602                             mControlURL = getControlURL();
603 
604                             if (mSessionDesc->countTracks() < 2) {
605                                 // There's no actual tracks in this session.
606                                 // The first "track" is merely session meta
607                                 // data.
608 
609                                 ALOGW("Session doesn't contain any playable "
610                                      "tracks. Aborting.");
611                                 result = ERROR_UNSUPPORTED;
612                             } else {
613                                 setupTrack(1);
614                             }
615                         }
616                     }
617                 }
618 
619                 if (result != OK) {
620                     sp<AMessage> reply = new AMessage('disc', this);
621                     mConn->disconnect(reply);
622                 }
623                 break;
624             }
625 
626             case 'sdpl':
627             {
628                 int32_t result;
629                 CHECK(msg->findInt32("result", &result));
630 
631                 ALOGI("SDP connection request completed with result %d (%s)",
632                      result, strerror(-result));
633 
634                 if (result == OK) {
635                     sp<RefBase> obj;
636                     CHECK(msg->findObject("description", &obj));
637                     mSessionDesc =
638                         static_cast<ASessionDescription *>(obj.get());
639 
640                     if (!mSessionDesc->isValid()) {
641                         ALOGE("Failed to parse session description.");
642                         result = ERROR_MALFORMED;
643                     } else {
644                         mBaseURL = mSessionURL;
645 
646                         mSeekable = !isLiveStream(mSessionDesc);
647 
648                         mControlURL = getControlURL();
649 
650                         if (mSessionDesc->countTracks() < 2) {
651                             // There's no actual tracks in this session.
652                             // The first "track" is merely session meta
653                             // data.
654 
655                             ALOGW("Session doesn't contain any playable "
656                                  "tracks. Aborting.");
657                             result = ERROR_UNSUPPORTED;
658                         } else {
659                             setupTrack(1);
660                         }
661                     }
662                 }
663 
664                 if (result != OK) {
665                     sp<AMessage> reply = new AMessage('disc', this);
666                     mConn->disconnect(reply);
667                 }
668                 break;
669             }
670 
671             case 'setu':
672             {
673                 size_t index;
674                 CHECK(msg->findSize("index", &index));
675 
676                 TrackInfo *track = NULL;
677                 size_t trackIndex;
678                 if (msg->findSize("track-index", &trackIndex)) {
679                     track = &mTracks.editItemAt(trackIndex);
680                 }
681 
682                 int32_t result;
683                 CHECK(msg->findInt32("result", &result));
684 
685                 ALOGI("SETUP(%zu) completed with result %d (%s)",
686                      index, result, strerror(-result));
687 
688                 if (result == OK) {
689                     CHECK(track != NULL);
690 
691                     sp<RefBase> obj;
692                     CHECK(msg->findObject("response", &obj));
693                     sp<ARTSPResponse> response =
694                         static_cast<ARTSPResponse *>(obj.get());
695 
696                     if (response->mStatusCode != 200) {
697                         result = UNKNOWN_ERROR;
698                     } else {
699                         ssize_t i = response->mHeaders.indexOfKey("session");
700                         CHECK_GE(i, 0);
701 
702                         mSessionID = response->mHeaders.valueAt(i);
703 
704                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
705                         AString timeoutStr;
706                         if (GetAttribute(
707                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
708                             char *end;
709                             unsigned long timeoutSecs =
710                                 strtoul(timeoutStr.c_str(), &end, 10);
711 
712                             if (end == timeoutStr.c_str() || *end != '\0') {
713                                 ALOGW("server specified malformed timeout '%s'",
714                                      timeoutStr.c_str());
715 
716                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
717                             } else if (timeoutSecs < 15) {
718                                 ALOGW("server specified too short a timeout "
719                                      "(%lu secs), using default.",
720                                      timeoutSecs);
721 
722                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
723                             } else {
724                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
725 
726                                 ALOGI("server specified timeout of %lu secs.",
727                                      timeoutSecs);
728                             }
729                         }
730 
731                         i = mSessionID.find(";");
732                         if (i >= 0) {
733                             // Remove options, i.e. ";timeout=90"
734                             mSessionID.erase(i, mSessionID.size() - i);
735                         }
736 
737                         sp<AMessage> notify = new AMessage('accu', this);
738                         notify->setSize("track-index", trackIndex);
739 
740                         i = response->mHeaders.indexOfKey("transport");
741                         CHECK_GE(i, 0);
742 
743                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
744                             if (!track->mUsingInterleavedTCP) {
745                                 AString transport = response->mHeaders.valueAt(i);
746 
747                                 // We are going to continue even if we were
748                                 // unable to poke a hole into the firewall...
749                                 pokeAHole(
750                                         track->mRTPSocket,
751                                         track->mRTCPSocket,
752                                         transport);
753                             }
754 
755                             mRTPConn->addStream(
756                                     track->mRTPSocket, track->mRTCPSocket,
757                                     mSessionDesc, index,
758                                     notify, track->mUsingInterleavedTCP);
759 
760                             mSetupTracksSuccessful = true;
761                         } else {
762                             result = BAD_VALUE;
763                         }
764                     }
765                 }
766 
767                 if (result != OK) {
768                     if (track) {
769                         if (!track->mUsingInterleavedTCP) {
770                             // Clear the tag
771                             if (mUIDValid) {
772                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket);
773                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket);
774                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket);
775                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket);
776                             }
777 
778                             close(track->mRTPSocket);
779                             close(track->mRTCPSocket);
780                         }
781 
782                         mTracks.removeItemsAt(trackIndex);
783                     }
784                 }
785 
786                 ++index;
787                 if (result == OK && index < mSessionDesc->countTracks()) {
788                     setupTrack(index);
789                 } else if (mSetupTracksSuccessful) {
790                     ++mKeepAliveGeneration;
791                     postKeepAlive();
792 
793                     AString request = "PLAY ";
794                     request.append(mControlURL);
795                     request.append(" RTSP/1.0\r\n");
796 
797                     request.append("Session: ");
798                     request.append(mSessionID);
799                     request.append("\r\n");
800 
801                     request.append("\r\n");
802 
803                     sp<AMessage> reply = new AMessage('play', this);
804                     mConn->sendRequest(request.c_str(), reply);
805                 } else {
806                     sp<AMessage> reply = new AMessage('disc', this);
807                     mConn->disconnect(reply);
808                 }
809                 break;
810             }
811 
812             case 'play':
813             {
814                 int32_t result;
815                 CHECK(msg->findInt32("result", &result));
816 
817                 ALOGI("PLAY completed with result %d (%s)",
818                      result, strerror(-result));
819 
820                 if (result == OK) {
821                     sp<RefBase> obj;
822                     CHECK(msg->findObject("response", &obj));
823                     sp<ARTSPResponse> response =
824                         static_cast<ARTSPResponse *>(obj.get());
825 
826                     if (response->mStatusCode != 200) {
827                         result = UNKNOWN_ERROR;
828                     } else {
829                         parsePlayResponse(response);
830                         postTimeout();
831                     }
832                 }
833 
834                 if (result != OK) {
835                     sp<AMessage> reply = new AMessage('disc', this);
836                     mConn->disconnect(reply);
837                 }
838 
839                 break;
840             }
841 
842             case 'aliv':
843             {
844                 int32_t generation;
845                 CHECK(msg->findInt32("generation", &generation));
846 
847                 if (generation != mKeepAliveGeneration) {
848                     // obsolete event.
849                     break;
850                 }
851 
852                 AString request;
853                 request.append("OPTIONS ");
854                 request.append(mSessionURL);
855                 request.append(" RTSP/1.0\r\n");
856                 request.append("Session: ");
857                 request.append(mSessionID);
858                 request.append("\r\n");
859                 request.append("\r\n");
860 
861                 sp<AMessage> reply = new AMessage('opts', this);
862                 reply->setInt32("generation", mKeepAliveGeneration);
863                 mConn->sendRequest(request.c_str(), reply);
864                 break;
865             }
866 
867             case 'opts':
868             {
869                 int32_t result;
870                 CHECK(msg->findInt32("result", &result));
871 
872                 ALOGI("OPTIONS completed with result %d (%s)",
873                      result, strerror(-result));
874 
875                 int32_t generation;
876                 CHECK(msg->findInt32("generation", &generation));
877 
878                 if (generation != mKeepAliveGeneration) {
879                     // obsolete event.
880                     break;
881                 }
882 
883                 postKeepAlive();
884                 break;
885             }
886 
887             case 'abor':
888             {
889                 for (size_t i = 0; i < mTracks.size(); ++i) {
890                     TrackInfo *info = &mTracks.editItemAt(i);
891 
892                     if (!mFirstAccessUnit) {
893                         postQueueEOS(i, ERROR_END_OF_STREAM);
894                     }
895 
896                     if (!info->mUsingInterleavedTCP) {
897                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
898 
899                         // Clear the tag
900                         if (mUIDValid) {
901                             NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket);
902                             NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket);
903                             NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket);
904                             NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket);
905                         }
906 
907                         close(info->mRTPSocket);
908                         close(info->mRTCPSocket);
909                     }
910                 }
911                 mTracks.clear();
912                 mSetupTracksSuccessful = false;
913                 mSeekPending = false;
914                 mFirstAccessUnit = true;
915                 mAllTracksHaveTime = false;
916                 mNTPAnchorUs = -1;
917                 mMediaAnchorUs = -1;
918                 mNumAccessUnitsReceived = 0;
919                 mReceivedFirstRTCPPacket = false;
920                 mReceivedFirstRTPPacket = false;
921                 mPausing = false;
922                 mSeekable = true;
923 
924                 sp<AMessage> reply = new AMessage('tear', this);
925 
926                 int32_t reconnect;
927                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
928                     reply->setInt32("reconnect", true);
929                 }
930 
931                 AString request;
932                 request = "TEARDOWN ";
933 
934                 // XXX should use aggregate url from SDP here...
935                 request.append(mSessionURL);
936                 request.append(" RTSP/1.0\r\n");
937 
938                 request.append("Session: ");
939                 request.append(mSessionID);
940                 request.append("\r\n");
941 
942                 request.append("\r\n");
943 
944                 mConn->sendRequest(request.c_str(), reply);
945 
946                 // If the response of teardown hasn't been received in 3 seconds,
947                 // post 'tear' message to avoid ANR.
948                 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
949                     sp<AMessage> teardown = reply->dup();
950                     teardown->setInt32("result", -ECONNABORTED);
951                     teardown->post(kTearDownTimeoutUs);
952                 }
953                 break;
954             }
955 
956             case 'tear':
957             {
958                 int32_t result;
959                 CHECK(msg->findInt32("result", &result));
960 
961                 ALOGI("TEARDOWN completed with result %d (%s)",
962                      result, strerror(-result));
963 
964                 sp<AMessage> reply = new AMessage('disc', this);
965 
966                 int32_t reconnect;
967                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
968                     reply->setInt32("reconnect", true);
969                 }
970 
971                 mConn->disconnect(reply);
972                 break;
973             }
974 
975             case 'quit':
976             {
977                 sp<AMessage> msg = mNotify->dup();
978                 msg->setInt32("what", kWhatDisconnected);
979                 msg->setInt32("result", UNKNOWN_ERROR);
980                 msg->post();
981                 break;
982             }
983 
984             case 'chek':
985             {
986                 int32_t generation;
987                 CHECK(msg->findInt32("generation", &generation));
988                 if (generation != mCheckGeneration) {
989                     // This is an outdated message. Ignore.
990                     break;
991                 }
992 
993                 if (mNumAccessUnitsReceived == 0) {
994 #if 1
995                     ALOGI("stream ended? aborting.");
996                     (new AMessage('abor', this))->post();
997                     break;
998 #else
999                     ALOGI("haven't seen an AU in a looong time.");
1000 #endif
1001                 }
1002 
1003                 mNumAccessUnitsReceived = 0;
1004                 msg->post(kAccessUnitTimeoutUs);
1005                 break;
1006             }
1007 
1008             case 'accu':
1009             {
1010                 if (mSeekPending) {
1011                     ALOGV("Stale access unit.");
1012                     break;
1013                 }
1014 
1015                 int32_t timeUpdate;
1016                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
1017                     size_t trackIndex;
1018                     CHECK(msg->findSize("track-index", &trackIndex));
1019 
1020                     uint32_t rtpTime;
1021                     uint64_t ntpTime;
1022                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
1023                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1024 
1025                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
1026                     break;
1027                 }
1028 
1029                 int32_t first;
1030                 if (msg->findInt32("first-rtcp", &first)) {
1031                     mReceivedFirstRTCPPacket = true;
1032                     break;
1033                 }
1034 
1035                 if (msg->findInt32("first-rtp", &first)) {
1036                     mReceivedFirstRTPPacket = true;
1037                     break;
1038                 }
1039 
1040                 int32_t rtcpEvent;
1041                 if (msg->findInt32("rtcp-event", &rtcpEvent)) {
1042                     break;
1043                 }
1044 
1045                 ++mNumAccessUnitsReceived;
1046                 postAccessUnitTimeoutCheck();
1047 
1048                 size_t trackIndex;
1049                 CHECK(msg->findSize("track-index", &trackIndex));
1050 
1051                 if (trackIndex >= mTracks.size()) {
1052                     ALOGV("late packets ignored.");
1053                     break;
1054                 }
1055 
1056                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1057 
1058                 int32_t eos;
1059                 if (msg->findInt32("eos", &eos)) {
1060                     ALOGI("received BYE on track index %zu", trackIndex);
1061                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1062                         ALOGI("No time established => fake existing data");
1063 
1064                         track->mEOSReceived = true;
1065                         mTryFakeRTCP = true;
1066                         mReceivedFirstRTCPPacket = true;
1067                         fakeTimestamps();
1068                     } else {
1069                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1070                     }
1071                     return;
1072                 }
1073 
1074                 if (mSeekPending) {
1075                     ALOGV("we're seeking, dropping stale packet.");
1076                     break;
1077                 }
1078 
1079                 sp<ABuffer> accessUnit;
1080                 CHECK(msg->findBuffer("access-unit", &accessUnit));
1081                 onAccessUnitComplete(trackIndex, accessUnit);
1082                 break;
1083             }
1084 
1085             case 'paus':
1086             {
1087                 int32_t generation;
1088                 CHECK(msg->findInt32("pausecheck", &generation));
1089                 if (generation != mPauseGeneration) {
1090                     ALOGV("Ignoring outdated pause message.");
1091                     break;
1092                 }
1093 
1094                 if (!mSeekable) {
1095                     ALOGW("This is a live stream, ignoring pause request.");
1096                     break;
1097                 }
1098 
1099                 if (mPausing) {
1100                     ALOGV("This stream is already paused.");
1101                     break;
1102                 }
1103 
1104                 mCheckPending = true;
1105                 ++mCheckGeneration;
1106                 mPausing = true;
1107 
1108                 AString request = "PAUSE ";
1109                 request.append(mControlURL);
1110                 request.append(" RTSP/1.0\r\n");
1111 
1112                 request.append("Session: ");
1113                 request.append(mSessionID);
1114                 request.append("\r\n");
1115 
1116                 request.append("\r\n");
1117 
1118                 sp<AMessage> reply = new AMessage('pau2', this);
1119                 mConn->sendRequest(request.c_str(), reply);
1120                 break;
1121             }
1122 
1123             case 'pau2':
1124             {
1125                 int32_t result;
1126                 CHECK(msg->findInt32("result", &result));
1127                 mCheckTimeoutGeneration++;
1128 
1129                 ALOGI("PAUSE completed with result %d (%s)",
1130                      result, strerror(-result));
1131                 break;
1132             }
1133 
1134             case 'resu':
1135             {
1136                 if (mPausing && mSeekPending) {
1137                     // If seeking, Play will be sent from see1 instead
1138                     break;
1139                 }
1140 
1141                 if (!mPausing) {
1142                     // Dont send PLAY if we have not paused
1143                     break;
1144                 }
1145                 AString request = "PLAY ";
1146                 request.append(mControlURL);
1147                 request.append(" RTSP/1.0\r\n");
1148 
1149                 request.append("Session: ");
1150                 request.append(mSessionID);
1151                 request.append("\r\n");
1152 
1153                 request.append("\r\n");
1154 
1155                 sp<AMessage> reply = new AMessage('res2', this);
1156                 mConn->sendRequest(request.c_str(), reply);
1157                 break;
1158             }
1159 
1160             case 'res2':
1161             {
1162                 int32_t result;
1163                 CHECK(msg->findInt32("result", &result));
1164 
1165                 ALOGI("PLAY (for resume) completed with result %d (%s)",
1166                      result, strerror(-result));
1167 
1168                 mCheckPending = false;
1169                 ++mCheckGeneration;
1170                 postAccessUnitTimeoutCheck();
1171 
1172                 if (result == OK) {
1173                     sp<RefBase> obj;
1174                     CHECK(msg->findObject("response", &obj));
1175                     sp<ARTSPResponse> response =
1176                         static_cast<ARTSPResponse *>(obj.get());
1177 
1178                     if (response->mStatusCode != 200) {
1179                         result = UNKNOWN_ERROR;
1180                     } else {
1181                         parsePlayResponse(response);
1182 
1183                         // Post new timeout in order to make sure to use
1184                         // fake timestamps if no new Sender Reports arrive
1185                         postTimeout();
1186                     }
1187                 }
1188 
1189                 if (result != OK) {
1190                     ALOGE("resume failed, aborting.");
1191                     (new AMessage('abor', this))->post();
1192                 }
1193 
1194                 mPausing = false;
1195                 break;
1196             }
1197 
1198             case 'seek':
1199             {
1200                 if (!mSeekable) {
1201                     ALOGW("This is a live stream, ignoring seek request.");
1202 
1203                     sp<AMessage> msg = mNotify->dup();
1204                     msg->setInt32("what", kWhatSeekDone);
1205                     msg->post();
1206                     break;
1207                 }
1208 
1209                 int64_t timeUs;
1210                 CHECK(msg->findInt64("time", &timeUs));
1211 
1212                 mSeekPending = true;
1213 
1214                 // Disable the access unit timeout until we resumed
1215                 // playback again.
1216                 mCheckPending = true;
1217                 ++mCheckGeneration;
1218 
1219                 sp<AMessage> reply = new AMessage('see0', this);
1220                 reply->setInt64("time", timeUs);
1221 
1222                 if (mPausing) {
1223                     // PAUSE already sent
1224                     ALOGI("Pause already sent");
1225                     reply->post();
1226                     break;
1227                 }
1228                 AString request = "PAUSE ";
1229                 request.append(mControlURL);
1230                 request.append(" RTSP/1.0\r\n");
1231 
1232                 request.append("Session: ");
1233                 request.append(mSessionID);
1234                 request.append("\r\n");
1235 
1236                 request.append("\r\n");
1237 
1238                 mConn->sendRequest(request.c_str(), reply);
1239                 break;
1240             }
1241 
1242             case 'see0':
1243             {
1244                 // Session is paused now.
1245                 status_t err = OK;
1246                 msg->findInt32("result", &err);
1247 
1248                 int64_t timeUs;
1249                 CHECK(msg->findInt64("time", &timeUs));
1250 
1251                 sp<AMessage> notify = mNotify->dup();
1252                 notify->setInt32("what", kWhatSeekPaused);
1253                 notify->setInt32("err", err);
1254                 notify->setInt64("time", timeUs);
1255                 notify->post();
1256                 break;
1257 
1258             }
1259 
1260             case 'see1':
1261             {
1262                 for (size_t i = 0; i < mTracks.size(); ++i) {
1263                     TrackInfo *info = &mTracks.editItemAt(i);
1264 
1265                     postQueueSeekDiscontinuity(i);
1266                     info->mEOSReceived = false;
1267 
1268                     info->mRTPAnchor = 0;
1269                     info->mNTPAnchorUs = -1;
1270                 }
1271 
1272                 mAllTracksHaveTime = false;
1273                 mNTPAnchorUs = -1;
1274 
1275                 // Start new timeoutgeneration to avoid getting timeout
1276                 // before PLAY response arrive
1277                 postTimeout();
1278 
1279                 int64_t timeUs;
1280                 CHECK(msg->findInt64("time", &timeUs));
1281 
1282                 AString request = "PLAY ";
1283                 request.append(mControlURL);
1284                 request.append(" RTSP/1.0\r\n");
1285 
1286                 request.append("Session: ");
1287                 request.append(mSessionID);
1288                 request.append("\r\n");
1289 
1290                 request.append(
1291                         AStringPrintf(
1292                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1293 
1294                 request.append("\r\n");
1295 
1296                 sp<AMessage> reply = new AMessage('see2', this);
1297                 mConn->sendRequest(request.c_str(), reply);
1298                 break;
1299             }
1300 
1301             case 'see2':
1302             {
1303                 if (mTracks.size() == 0) {
1304                     // We have already hit abor, break
1305                     break;
1306                 }
1307 
1308                 int32_t result;
1309                 CHECK(msg->findInt32("result", &result));
1310 
1311                 ALOGI("PLAY (for seek) completed with result %d (%s)",
1312                      result, strerror(-result));
1313 
1314                 mCheckPending = false;
1315                 ++mCheckGeneration;
1316                 postAccessUnitTimeoutCheck();
1317 
1318                 if (result == OK) {
1319                     sp<RefBase> obj;
1320                     CHECK(msg->findObject("response", &obj));
1321                     sp<ARTSPResponse> response =
1322                         static_cast<ARTSPResponse *>(obj.get());
1323 
1324                     if (response->mStatusCode != 200) {
1325                         result = UNKNOWN_ERROR;
1326                     } else {
1327                         parsePlayResponse(response);
1328 
1329                         // Post new timeout in order to make sure to use
1330                         // fake timestamps if no new Sender Reports arrive
1331                         postTimeout();
1332 
1333                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1334                         CHECK_GE(i, 0);
1335 
1336                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1337 
1338                         mRTPConn->seekStream();
1339 
1340                         ALOGI("seek completed.");
1341                     }
1342                 }
1343 
1344                 if (result != OK) {
1345                     ALOGE("seek failed, aborting.");
1346                     (new AMessage('abor', this))->post();
1347                 }
1348 
1349                 mPausing = false;
1350                 mSeekPending = false;
1351 
1352                 // Discard all stale access units.
1353                 for (size_t i = 0; i < mTracks.size(); ++i) {
1354                     TrackInfo *track = &mTracks.editItemAt(i);
1355                     track->mPackets.clear();
1356                 }
1357 
1358                 sp<AMessage> msg = mNotify->dup();
1359                 msg->setInt32("what", kWhatSeekDone);
1360                 msg->post();
1361                 break;
1362             }
1363 
1364             case 'biny':
1365             {
1366                 sp<ABuffer> buffer;
1367                 CHECK(msg->findBuffer("buffer", &buffer));
1368 
1369                 int32_t index;
1370                 CHECK(buffer->meta()->findInt32("index", &index));
1371 
1372                 mRTPConn->injectPacket(index, buffer);
1373                 break;
1374             }
1375 
1376             case 'tiou':
1377             {
1378                 int32_t timeoutGenerationCheck;
1379                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1380                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1381                     // This is an outdated message. Ignore.
1382                     // This typically happens if a lot of seeks are
1383                     // performed, since new timeout messages now are
1384                     // posted at seek as well.
1385                     break;
1386                 }
1387                 if (!mReceivedFirstRTCPPacket) {
1388                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1389                         ALOGW("We received RTP packets but no RTCP packets, "
1390                              "using fake timestamps.");
1391 
1392                         mTryFakeRTCP = true;
1393 
1394                         mReceivedFirstRTCPPacket = true;
1395 
1396                         fakeTimestamps();
1397                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1398                         ALOGW("Never received any data, switching transports.");
1399 
1400                         mTryTCPInterleaving = true;
1401 
1402                         sp<AMessage> msg = new AMessage('abor', this);
1403                         msg->setInt32("reconnect", true);
1404                         msg->post();
1405                     } else {
1406                         ALOGW("Never received any data, disconnecting.");
1407                         (new AMessage('abor', this))->post();
1408                     }
1409                 } else {
1410                     if (!mAllTracksHaveTime) {
1411                         ALOGW("We received some RTCP packets, but time "
1412                               "could not be established on all tracks, now "
1413                               "using fake timestamps");
1414 
1415                         fakeTimestamps();
1416                     }
1417                 }
1418                 break;
1419             }
1420 
1421             default:
1422                 TRESPASS();
1423                 break;
1424         }
1425     }
1426 
postKeepAliveMyHandler1427     void postKeepAlive() {
1428         sp<AMessage> msg = new AMessage('aliv', this);
1429         msg->setInt32("generation", mKeepAliveGeneration);
1430         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1431     }
1432 
cancelAccessUnitTimeoutCheckMyHandler1433     void cancelAccessUnitTimeoutCheck() {
1434         ALOGV("cancelAccessUnitTimeoutCheck");
1435         ++mCheckGeneration;
1436     }
1437 
postAccessUnitTimeoutCheckMyHandler1438     void postAccessUnitTimeoutCheck() {
1439         if (mCheckPending) {
1440             return;
1441         }
1442 
1443         mCheckPending = true;
1444         sp<AMessage> check = new AMessage('chek', this);
1445         check->setInt32("generation", mCheckGeneration);
1446         check->post(kAccessUnitTimeoutUs);
1447     }
1448 
SplitStringMyHandler1449     static void SplitString(
1450             const AString &s, const char *separator, List<AString> *items) {
1451         items->clear();
1452         size_t start = 0;
1453         while (start < s.size()) {
1454             ssize_t offset = s.find(separator, start);
1455 
1456             if (offset < 0) {
1457                 items->push_back(AString(s, start, s.size() - start));
1458                 break;
1459             }
1460 
1461             items->push_back(AString(s, start, offset - start));
1462             start = offset + strlen(separator);
1463         }
1464     }
1465 
parsePlayResponseMyHandler1466     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1467         mPlayResponseParsed = true;
1468         if (mTracks.size() == 0) {
1469             ALOGV("parsePlayResponse: late packets ignored.");
1470             return;
1471         }
1472 
1473         ssize_t i = response->mHeaders.indexOfKey("range");
1474         if (i < 0) {
1475             // Server doesn't even tell use what range it is going to
1476             // play, therefore we won't support seeking.
1477             return;
1478         }
1479 
1480         AString range = response->mHeaders.valueAt(i);
1481         ALOGV("Range: %s", range.c_str());
1482 
1483         AString val;
1484         CHECK(GetAttribute(range.c_str(), "npt", &val));
1485 
1486         float npt1, npt2;
1487         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1488             // This is a live stream and therefore not seekable.
1489 
1490             ALOGI("This is a live stream");
1491             return;
1492         }
1493 
1494         i = response->mHeaders.indexOfKey("rtp-info");
1495         CHECK_GE(i, 0);
1496 
1497         AString rtpInfo = response->mHeaders.valueAt(i);
1498         List<AString> streamInfos;
1499         SplitString(rtpInfo, ",", &streamInfos);
1500 
1501         int n = 1;
1502         for (List<AString>::iterator it = streamInfos.begin();
1503              it != streamInfos.end(); ++it) {
1504             (*it).trim();
1505             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1506 
1507             CHECK(GetAttribute((*it).c_str(), "url", &val));
1508 
1509             size_t trackIndex = 0;
1510             while (trackIndex < mTracks.size()
1511                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1512                 ++trackIndex;
1513             }
1514             CHECK_LT(trackIndex, mTracks.size());
1515 
1516             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1517 
1518             char *end;
1519             unsigned long seq = strtoul(val.c_str(), &end, 10);
1520 
1521             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1522             info->mFirstSeqNumInSegment = seq;
1523             info->mNewSegment = true;
1524             info->mAllowedStaleAccessUnits = GetMaxAllowedStaleCount(info->mIsVideo);
1525 
1526             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1527 
1528             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1529 
1530             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1531 
1532             info->mNormalPlayTimeRTP = rtpTime;
1533             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1534 
1535             if (!mFirstAccessUnit) {
1536                 postNormalPlayTimeMapping(
1537                         trackIndex,
1538                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1539             }
1540 
1541             ++n;
1542         }
1543     }
1544 
getTrackFormatMyHandler1545     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1546         CHECK_GE(index, 0u);
1547         CHECK_LT(index, mTracks.size());
1548 
1549         const TrackInfo &info = mTracks.itemAt(index);
1550 
1551         *timeScale = info.mTimeScale;
1552 
1553         return info.mPacketSource->getFormat();
1554     }
1555 
countTracksMyHandler1556     size_t countTracks() const {
1557         return mTracks.size();
1558     }
1559 
1560 private:
1561     struct TrackInfo {  // Per-track state; one entry is pushed onto mTracks by setupTrack().
1562         AString mURL;  // Built by MakeURL() from mBaseURL and the track's "a=control" attribute; SETUP target and key matched against RTP-Info "url".
1563         int mRTPSocket;  // RTP socket from ARTPConnection::MakePortPair(), or the interleaved channel number when mUsingInterleavedTCP.
1564         int mRTCPSocket;  // RTCP socket from MakePortPair(), or the interleaved channel number + 1 when mUsingInterleavedTCP.
1565         bool mUsingInterleavedTCP;  // True when SETUP asked for RTP/AVP/TCP interleaving; the two socket fields are then never close()d.
1566         bool mIsVideo;  // isVideo() of the track's packet source.
1567         uint32_t mFirstSeqNumInSegment;  // "seq" of the last parsed RTP-Info entry for this track (0 after setup).
1568         bool mNewSegment;  // Set at setup and each time a PLAY response is parsed for this track.
1569         int32_t mAllowedStaleAccessUnits;  // Initialised from GetMaxAllowedStaleCount(mIsVideo).
1570
1571         uint32_t mRTPAnchor;  // Reset to 0 at setup and on seek ('see1').
1572         int64_t mNTPAnchorUs;  // Reset to -1 at setup and on seek ('see1').
1573         int32_t mTimeScale;  // Parsed from the SDP format description; reported by getTrackFormat().
1574         bool mEOSReceived;  // Set when a BYE arrives before time was established; cleared on seek ('see1').
1575
1576         uint32_t mNormalPlayTimeRTP;  // "rtptime" of the last parsed RTP-Info entry.
1577         int64_t mNormalPlayTimeUs;  // npt start of the last parsed Range header, scaled by 1E6.
1578
1579         sp<APacketSource> mPacketSource;  // Created in setupTrack(); supplies the track format.
1580
1581         // Stores packets temporarily while no notion of time
1582         // has been established yet.
1583         List<sp<ABuffer> > mPackets;  // Cleared when a seek completes ('see2').
1584     };
1585 
1586     sp<AMessage> mNotify;  // Template message: dup()'ed and posted with "what" = kWhatDisconnected / kWhatSeekPaused / kWhatSeekDone.
1587     bool mUIDValid;  // When true, UDP sockets are tagged and marked for mUID through NetworkUtils.
1588     uid_t mUID;  // UID used for the socket tag/mark calls above.
1589     sp<ALooper> mNetLooper;
1590     sp<ARTSPConnection> mConn;  // RTSP connection: all requests are sent, and disconnects issued, through it.
1591     sp<ARTPConnection> mRTPConn;  // RTP connection: streams removed on abort, 'biny' packets injected, seekStream() after a seek.
1592     sp<ASessionDescription> mSessionDesc;  // Session description queried for track count and per-track attributes.
1593     AString mOriginalSessionURL;  // This one still has user:pass@
1594     AString mSessionURL;  // Request URL of the OPTIONS keep-alive and of TEARDOWN.
1595     AString mSessionHost;
1596     AString mBaseURL;  // Base passed to MakeURL() when building track URLs.
1597     AString mControlURL;  // Request URL of PLAY and PAUSE.
1598     AString mSessionID;  // Value sent in the "Session:" header.
1599     bool mSetupTracksSuccessful;  // After the last SETUP reply: send PLAY if true, otherwise disconnect.
1600     bool mSeekPending;  // True from 'seek' until 'see2'; access units are dropped while set.
1601     bool mFirstAccessUnit;  // Reset to true on abort; while true, no EOS is queued on abort and no npt mapping is posted.
1602
1603     bool mAllTracksHaveTime;  // Cleared on abort and on seek; while false, BYE/timeout handling may fall back to fakeTimestamps().
1604     int64_t mNTPAnchorUs;  // Reset to -1 on abort and on seek.
1605     int64_t mMediaAnchorUs;  // Reset to -1 on abort.
1606     int64_t mLastMediaTimeUs;
1607
1608     int64_t mNumAccessUnitsReceived;  // Access units counted since the last 'chek'; zero at check time aborts the session.
1609     bool mCheckPending;  // Set while a 'chek' is queued, and during pause/seek so that no new check is posted.
1610     int32_t mCheckGeneration;  // 'chek' messages carrying a different value are ignored.
1611     int32_t mCheckTimeoutGeneration;  // 'tiou' messages carrying a different value are ignored.
1612     bool mTryTCPInterleaving;  // When true, SETUP requests RTP/AVP/TCP interleaving instead of a UDP port pair.
1613     bool mTryFakeRTCP;  // Set once fake timestamps were used because RTCP did not establish time.
1614     bool mReceivedFirstRTCPPacket;  // Set on the "first-rtcp" notification, or when timestamps are faked.
1615     bool mReceivedFirstRTPPacket;  // Set on the "first-rtp" notification.
1616     bool mSeekable;  // When false the stream is treated as live: pause and seek requests are ignored.
1617     int64_t mKeepAliveTimeoutUs;  // Keep-alives are posted after 9/10 of this interval.
1618     int32_t mKeepAliveGeneration;  // 'aliv' and 'opts' messages carrying a different value are ignored.
1619     bool mPausing;  // Set when 'paus' sends PAUSE; cleared when a resume or seek completes, and on abort.
1620     int32_t mPauseGeneration;  // 'paus' messages whose "pausecheck" differs are ignored.
1621
1622     Vector<TrackInfo> mTracks;  // Tracks added by setupTrack(); a track is removed when its SETUP fails, all are cleared on abort.
1623
1624     bool mPlayResponseParsed;  // Set by parsePlayResponse().
1625 
setupTrackMyHandler1626     void setupTrack(size_t index) {
1627         sp<APacketSource> source =
1628             new APacketSource(mSessionDesc, index);
1629 
1630         if (source->initCheck() != OK) {
1631             ALOGW("Unsupported format. Ignoring track #%zu.", index);
1632 
1633             sp<AMessage> reply = new AMessage('setu', this);
1634             reply->setSize("index", index);
1635             reply->setInt32("result", ERROR_UNSUPPORTED);
1636             reply->post();
1637             return;
1638         }
1639 
1640         AString url;
1641         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1642 
1643         AString trackURL;
1644         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1645 
1646         mTracks.push(TrackInfo());
1647         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1648         info->mURL = trackURL;
1649         info->mPacketSource = source;
1650         info->mUsingInterleavedTCP = false;
1651         info->mIsVideo = source->isVideo();
1652         info->mFirstSeqNumInSegment = 0;
1653         info->mNewSegment = true;
1654         info->mAllowedStaleAccessUnits = GetMaxAllowedStaleCount(info->mIsVideo);
1655         info->mRTPSocket = -1;
1656         info->mRTCPSocket = -1;
1657         info->mRTPAnchor = 0;
1658         info->mNTPAnchorUs = -1;
1659         info->mNormalPlayTimeRTP = 0;
1660         info->mNormalPlayTimeUs = 0ll;
1661 
1662         unsigned long PT;
1663         AString formatDesc;
1664         AString formatParams;
1665         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1666 
1667         int32_t timescale;
1668         int32_t numChannels;
1669         ASessionDescription::ParseFormatDesc(
1670                 formatDesc.c_str(), &timescale, &numChannels);
1671 
1672         info->mTimeScale = timescale;
1673         info->mEOSReceived = false;
1674 
1675         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1676 
1677         AString request = "SETUP ";
1678         request.append(trackURL);
1679         request.append(" RTSP/1.0\r\n");
1680 
1681         if (mTryTCPInterleaving) {
1682             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1683             info->mUsingInterleavedTCP = true;
1684             info->mRTPSocket = interleaveIndex;
1685             info->mRTCPSocket = interleaveIndex + 1;
1686 
1687             request.append("Transport: RTP/AVP/TCP;interleaved=");
1688             request.append(interleaveIndex);
1689             request.append("-");
1690             request.append(interleaveIndex + 1);
1691         } else {
1692             unsigned rtpPort;
1693             ARTPConnection::MakePortPair(
1694                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1695 
1696             if (mUIDValid) {
1697                 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID,
1698                         (uint32_t)*(uint32_t*) "RTP_");
1699                 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1700                         (uint32_t)*(uint32_t*) "RTP_");
1701                 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID);
1702                 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1703             }
1704 
1705             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1706             request.append(rtpPort);
1707             request.append("-");
1708             request.append(rtpPort + 1);
1709         }
1710 
1711         request.append("\r\n");
1712 
1713         if (index > 1) {
1714             request.append("Session: ");
1715             request.append(mSessionID);
1716             request.append("\r\n");
1717         }
1718 
1719         request.append("\r\n");
1720 
1721         sp<AMessage> reply = new AMessage('setu', this);
1722         reply->setSize("index", index);
1723         reply->setSize("track-index", mTracks.size() - 1);
1724         mConn->sendRequest(request.c_str(), reply);
1725     }
1726 
MakeURLMyHandler1727     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1728         out->clear();
1729 
1730         if (strncasecmp("rtsp://", baseURL, 7)) {
1731             // Base URL must be absolute
1732             return false;
1733         }
1734 
1735         if (!strncasecmp("rtsp://", url, 7)) {
1736             // "url" is already an absolute URL, ignore base URL.
1737             out->setTo(url);
1738             return true;
1739         }
1740 
1741         size_t n = strlen(baseURL);
1742         out->setTo(baseURL);
1743         if (baseURL[n - 1] != '/') {
1744             out->append("/");
1745         }
1746         out->append(url);
1747 
1748         return true;
1749     }
1750 
fakeTimestampsMyHandler1751     void fakeTimestamps() {
1752         mNTPAnchorUs = -1ll;
1753         for (size_t i = 0; i < mTracks.size(); ++i) {
1754             onTimeUpdate(i, 0, 0ll);
1755         }
1756     }
1757 
dataReceivedOnAllChannelsMyHandler1758     bool dataReceivedOnAllChannels() {
1759         TrackInfo *track;
1760         for (size_t i = 0; i < mTracks.size(); ++i) {
1761             track = &mTracks.editItemAt(i);
1762             if (track->mPackets.empty()) {
1763                 return false;
1764             }
1765         }
1766         return true;
1767     }
1768 
handleFirstAccessUnitMyHandler1769     void handleFirstAccessUnit() {
1770         if (mFirstAccessUnit) {
1771             sp<AMessage> msg = mNotify->dup();
1772             msg->setInt32("what", kWhatConnected);
1773             msg->post();
1774 
1775             if (mSeekable) {
1776                 for (size_t i = 0; i < mTracks.size(); ++i) {
1777                     TrackInfo *info = &mTracks.editItemAt(i);
1778 
1779                     postNormalPlayTimeMapping(
1780                             i,
1781                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1782                 }
1783             }
1784 
1785             mFirstAccessUnit = false;
1786         }
1787     }
1788 
onTimeUpdateMyHandler1789     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1790         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1791              trackIndex, rtpTime, (long long)ntpTime);
1792 
1793         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1794 
1795         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1796 
1797         track->mRTPAnchor = rtpTime;
1798         track->mNTPAnchorUs = ntpTimeUs;
1799 
1800         if (mNTPAnchorUs < 0) {
1801             mNTPAnchorUs = ntpTimeUs;
1802             mMediaAnchorUs = mLastMediaTimeUs;
1803         }
1804 
1805         if (!mAllTracksHaveTime) {
1806             bool allTracksHaveTime = (mTracks.size() > 0);
1807             for (size_t i = 0; i < mTracks.size(); ++i) {
1808                 TrackInfo *track = &mTracks.editItemAt(i);
1809                 if (track->mNTPAnchorUs < 0) {
1810                     allTracksHaveTime = false;
1811                     break;
1812                 }
1813             }
1814             if (allTracksHaveTime) {
1815                 mAllTracksHaveTime = true;
1816                 ALOGI("Time now established for all tracks.");
1817             }
1818         }
1819         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1820             handleFirstAccessUnit();
1821 
1822             // Time is now established, lets start timestamping immediately
1823             for (size_t i = 0; i < mTracks.size(); ++i) {
1824                 if (OK != processAccessUnitQueue(i)) {
1825                     return;
1826                 }
1827             }
1828             for (size_t i = 0; i < mTracks.size(); ++i) {
1829                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1830                 if (trackInfo->mEOSReceived) {
1831                     postQueueEOS(i, ERROR_END_OF_STREAM);
1832                     trackInfo->mEOSReceived = false;
1833                 }
1834             }
1835         }
1836     }
1837 
processAccessUnitQueueMyHandler1838     status_t processAccessUnitQueue(int32_t trackIndex) {
1839         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1840         while (!track->mPackets.empty()) {
1841             sp<ABuffer> accessUnit = *track->mPackets.begin();
1842             track->mPackets.erase(track->mPackets.begin());
1843 
1844             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1845             if (track->mNewSegment) {
1846                 // The sequence number from RTP packet has only 16 bits and is extended
1847                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1848                 // RTSP "PLAY" command should be used to detect the first RTP packet
1849                 // after seeking.
1850                 int32_t maxAllowedStaleAccessUnits = GetMaxAllowedStaleCount(track->mIsVideo);
1851                 if (mSeekable) {
1852                     if (track->mAllowedStaleAccessUnits > 0) {
1853                         uint32_t seqNum16 = seqNum & 0xffff;
1854                         uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1855                         if (seqNum16 > firstSeqNumInSegment16 + maxAllowedStaleAccessUnits
1856                                 || seqNum16 < firstSeqNumInSegment16) {
1857                             // Not the first rtp packet of the stream after seeking, discarding.
1858                             track->mAllowedStaleAccessUnits--;
1859                             ALOGV("discarding stale access unit (0x%x : 0x%x)",
1860                                  seqNum, track->mFirstSeqNumInSegment);
1861                             continue;
1862                         }
1863                         ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1864                                 "Missing the first packet(%u), now take packet(%u) as first one",
1865                                 track->mFirstSeqNumInSegment, seqNum);
1866                     } else { // track->mAllowedStaleAccessUnits <= 0
1867                         mNumAccessUnitsReceived = 0;
1868                         ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1869                              "Still no first rtp packet after %d stale ones",
1870                              maxAllowedStaleAccessUnits);
1871                         track->mAllowedStaleAccessUnits = -1;
1872                         return UNKNOWN_ERROR;
1873                     }
1874                 }
1875 
1876                 // Now found the first rtp packet of the stream after seeking.
1877                 track->mFirstSeqNumInSegment = seqNum;
1878                 track->mNewSegment = false;
1879             }
1880 
1881             if (seqNum < track->mFirstSeqNumInSegment) {
1882                 ALOGV("dropping stale access-unit (%d < %d)",
1883                      seqNum, track->mFirstSeqNumInSegment);
1884                 continue;
1885             }
1886 
1887             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1888                 postQueueAccessUnit(trackIndex, accessUnit);
1889             }
1890         }
1891         return OK;
1892     }
1893 
onAccessUnitCompleteMyHandler1894     void onAccessUnitComplete(
1895             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1896         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1897         track->mPackets.push_back(accessUnit);
1898 
1899         uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1900         ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1901 
1902         if(!mPlayResponseParsed){
1903             ALOGV("play response is not parsed");
1904             return;
1905         }
1906 
1907         handleFirstAccessUnit();
1908 
1909         if (!mAllTracksHaveTime) {
1910             ALOGV("storing accessUnit, no time established yet");
1911             return;
1912         }
1913 
1914         if (OK != processAccessUnitQueue(trackIndex)) {
1915             return;
1916         }
1917 
1918         if (track->mEOSReceived) {
1919             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1920             track->mEOSReceived = false;
1921         }
1922     }
1923 
addMediaTimestampMyHandler1924     bool addMediaTimestamp(
1925             int32_t trackIndex, const TrackInfo *track,
1926             const sp<ABuffer> &accessUnit) {
1927         UNUSED_UNLESS_VERBOSE(trackIndex);
1928 
1929         uint32_t rtpTime;
1930         CHECK(accessUnit->meta()->findInt32(
1931                     "rtp-time", (int32_t *)&rtpTime));
1932 
1933         int64_t relRtpTimeUs =
1934             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1935                 / track->mTimeScale;
1936 
1937         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1938 
1939         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1940 
1941         if (mediaTimeUs > mLastMediaTimeUs) {
1942             mLastMediaTimeUs = mediaTimeUs;
1943         }
1944 
1945         if (mediaTimeUs < 0 && !mSeekable) {
1946             ALOGV("dropping early accessUnit.");
1947             return false;
1948         }
1949 
1950         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1951              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1952 
1953         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1954 
1955         return true;
1956     }
1957 
postQueueAccessUnitMyHandler1958     void postQueueAccessUnit(
1959             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1960         sp<AMessage> msg = mNotify->dup();
1961         msg->setInt32("what", kWhatAccessUnit);
1962         msg->setSize("trackIndex", trackIndex);
1963         msg->setBuffer("accessUnit", accessUnit);
1964         msg->post();
1965     }
1966 
postQueueEOSMyHandler1967     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1968         sp<AMessage> msg = mNotify->dup();
1969         msg->setInt32("what", kWhatEOS);
1970         msg->setSize("trackIndex", trackIndex);
1971         msg->setInt32("finalResult", finalResult);
1972         msg->post();
1973     }
1974 
postQueueSeekDiscontinuityMyHandler1975     void postQueueSeekDiscontinuity(size_t trackIndex) {
1976         sp<AMessage> msg = mNotify->dup();
1977         msg->setInt32("what", kWhatSeekDiscontinuity);
1978         msg->setSize("trackIndex", trackIndex);
1979         msg->post();
1980     }
1981 
postNormalPlayTimeMappingMyHandler1982     void postNormalPlayTimeMapping(
1983             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1984         sp<AMessage> msg = mNotify->dup();
1985         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1986         msg->setSize("trackIndex", trackIndex);
1987         msg->setInt32("rtpTime", rtpTime);
1988         msg->setInt64("nptUs", nptUs);
1989         msg->post();
1990     }
1991 
postTimeoutMyHandler1992     void postTimeout() {
1993         sp<AMessage> timeout = new AMessage('tiou', this);
1994         mCheckTimeoutGeneration++;
1995         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1996 
1997         int64_t startupTimeoutUs;
1998         startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1999         timeout->post(startupTimeoutUs);
2000     }
2001 
2002     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
2003 };
2004 
2005 }  // namespace android
2006 
2007 #endif  // MY_HANDLER_H_
2008