1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26 
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29 
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 
35 #include <ctype.h>
36 #include <cutils/properties.h>
37 
38 #include <media/stagefright/foundation/ABuffer.h>
39 #include <media/stagefright/foundation/ADebug.h>
40 #include <media/stagefright/foundation/ALooper.h>
41 #include <media/stagefright/foundation/AMessage.h>
42 #include <media/stagefright/MediaErrors.h>
43 #include <media/stagefright/Utils.h>
44 
45 #include <arpa/inet.h>
46 #include <sys/socket.h>
47 #include <netdb.h>
48 
49 #include "HTTPBase.h"
50 
51 #if LOG_NDEBUG
52 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
53 #else
54 #define UNUSED_UNLESS_VERBOSE(x)
55 #endif
56 
57 // If no access units are received within 10 secs, assume that the rtp
58 // stream has ended and signal end of stream.
59 static int64_t kAccessUnitTimeoutUs = 10000000ll;
60 
61 // If no access units arrive for the first 10 secs after starting the
62 // stream, assume none ever will and signal EOS or switch transports.
63 static int64_t kStartupTimeoutUs = 10000000ll;
64 
65 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
66 
67 static int64_t kPauseDelayUs = 3000000ll;
68 
69 // The allowed maximum number of stale access units at the beginning of
70 // a new sequence.
71 static int32_t kMaxAllowedStaleAccessUnits = 20;
72 
73 namespace android {
74 
GetAttribute(const char * s,const char * key,AString * value)75 static bool GetAttribute(const char *s, const char *key, AString *value) {
76     value->clear();
77 
78     size_t keyLen = strlen(key);
79 
80     for (;;) {
81         while (isspace(*s)) {
82             ++s;
83         }
84 
85         const char *colonPos = strchr(s, ';');
86 
87         size_t len =
88             (colonPos == NULL) ? strlen(s) : colonPos - s;
89 
90         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
91             value->setTo(&s[keyLen + 1], len - keyLen - 1);
92             return true;
93         }
94 
95         if (colonPos == NULL) {
96             return false;
97         }
98 
99         s = colonPos + 1;
100     }
101 }
102 
103 struct MyHandler : public AHandler {
    // Four-character message codes exposed by this handler.
    // NOTE(review): presumably these are the 'what' values set on the
    // |notify| message handed to the constructor -- the code that posts
    // them is outside this section; confirm against the rest of the file.
    // kWhatConnected and kWhatDisconnected have the same values ('conn',
    // 'disc') as messages this handler also posts to itself.
    enum {
        kWhatConnected                  = 'conn',
        kWhatDisconnected               = 'disc',
        kWhatSeekPaused                 = 'spau',
        kWhatSeekDone                   = 'sdon',

        kWhatAccessUnit                 = 'accU',
        kWhatEOS                        = 'eos!',
        kWhatSeekDiscontinuity          = 'seeD',
        kWhatNormalPlayTimeMapping      = 'nptM',
    };
115 
116     MyHandler(
117             const char *url,
118             const sp<AMessage> &notify,
119             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler120         : mNotify(notify),
121           mUIDValid(uidValid),
122           mUID(uid),
123           mNetLooper(new ALooper),
124           mConn(new ARTSPConnection(mUIDValid, mUID)),
125           mRTPConn(new ARTPConnection),
126           mOriginalSessionURL(url),
127           mSessionURL(url),
128           mSetupTracksSuccessful(false),
129           mSeekPending(false),
130           mFirstAccessUnit(true),
131           mAllTracksHaveTime(false),
132           mNTPAnchorUs(-1),
133           mMediaAnchorUs(-1),
134           mLastMediaTimeUs(0),
135           mNumAccessUnitsReceived(0),
136           mCheckPending(false),
137           mCheckGeneration(0),
138           mCheckTimeoutGeneration(0),
139           mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
140           mTryFakeRTCP(false),
141           mReceivedFirstRTCPPacket(false),
142           mReceivedFirstRTPPacket(false),
143           mSeekable(true),
144           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
145           mKeepAliveGeneration(0),
146           mPausing(false),
147           mPauseGeneration(0),
148           mPlayResponseParsed(false) {
149         mNetLooper->setName("rtsp net");
150         mNetLooper->start(false /* runOnCallingThread */,
151                           false /* canCallJava */,
152                           PRIORITY_HIGHEST);
153 
154         // Strip any authentication info from the session url, we don't
155         // want to transmit user/pass in cleartext.
156         AString host, path, user, pass;
157         unsigned port;
158         CHECK(ARTSPConnection::ParseURL(
159                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
160 
161         if (user.size() > 0) {
162             mSessionURL.clear();
163             mSessionURL.append("rtsp://");
164             mSessionURL.append(host);
165             mSessionURL.append(":");
166             mSessionURL.append(AStringPrintf("%u", port));
167             mSessionURL.append(path);
168 
169             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
170         }
171 
172         mSessionHost = host;
173     }
174 
connectMyHandler175     void connect() {
176         looper()->registerHandler(mConn);
177         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
178 
179         sp<AMessage> notify = new AMessage('biny', this);
180         mConn->observeBinaryData(notify);
181 
182         sp<AMessage> reply = new AMessage('conn', this);
183         mConn->connect(mOriginalSessionURL.c_str(), reply);
184     }
185 
loadSDPMyHandler186     void loadSDP(const sp<ASessionDescription>& desc) {
187         looper()->registerHandler(mConn);
188         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
189 
190         sp<AMessage> notify = new AMessage('biny', this);
191         mConn->observeBinaryData(notify);
192 
193         sp<AMessage> reply = new AMessage('sdpl', this);
194         reply->setObject("description", desc);
195         mConn->connect(mOriginalSessionURL.c_str(), reply);
196     }
197 
getControlURLMyHandler198     AString getControlURL() {
199         AString sessionLevelControlURL;
200         if (mSessionDesc->findAttribute(
201                 0,
202                 "a=control",
203                 &sessionLevelControlURL)) {
204             if (sessionLevelControlURL.compare("*") == 0) {
205                 return mBaseURL;
206             } else {
207                 AString controlURL;
208                 CHECK(MakeURL(
209                         mBaseURL.c_str(),
210                         sessionLevelControlURL.c_str(),
211                         &controlURL));
212                 return controlURL;
213             }
214         } else {
215             return mSessionURL;
216         }
217     }
218 
disconnectMyHandler219     void disconnect() {
220         (new AMessage('abor', this))->post();
221     }
222 
seekMyHandler223     void seek(int64_t timeUs) {
224         sp<AMessage> msg = new AMessage('seek', this);
225         msg->setInt64("time", timeUs);
226         mPauseGeneration++;
227         msg->post();
228     }
229 
continueSeekAfterPauseMyHandler230     void continueSeekAfterPause(int64_t timeUs) {
231         sp<AMessage> msg = new AMessage('see1', this);
232         msg->setInt64("time", timeUs);
233         msg->post();
234     }
235 
    // Returns whether the session is considered seekable. The flag starts
    // out true and is set to !isLiveStream(mSessionDesc) once a session
    // description has been accepted.
    bool isSeekable() const {
        return mSeekable;
    }
239 
pauseMyHandler240     void pause() {
241         sp<AMessage> msg = new AMessage('paus', this);
242         mPauseGeneration++;
243         msg->setInt32("pausecheck", mPauseGeneration);
244         msg->post();
245     }
246 
resumeMyHandler247     void resume() {
248         sp<AMessage> msg = new AMessage('resu', this);
249         mPauseGeneration++;
250         msg->post();
251     }
252 
addRRMyHandler253     static void addRR(const sp<ABuffer> &buf) {
254         uint8_t *ptr = buf->data() + buf->size();
255         ptr[0] = 0x80 | 0;
256         ptr[1] = 201;  // RR
257         ptr[2] = 0;
258         ptr[3] = 1;
259         ptr[4] = 0xde;  // SSRC
260         ptr[5] = 0xad;
261         ptr[6] = 0xbe;
262         ptr[7] = 0xef;
263 
264         buf->setRange(0, buf->size() + 8);
265     }
266 
addSDESMyHandler267     static void addSDES(int s, const sp<ABuffer> &buffer) {
268         struct sockaddr_in addr;
269         socklen_t addrSize = sizeof(addr);
270         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
271             inet_aton("0.0.0.0", &(addr.sin_addr));
272         }
273 
274         uint8_t *data = buffer->data() + buffer->size();
275         data[0] = 0x80 | 1;
276         data[1] = 202;  // SDES
277         data[4] = 0xde;  // SSRC
278         data[5] = 0xad;
279         data[6] = 0xbe;
280         data[7] = 0xef;
281 
282         size_t offset = 8;
283 
284         data[offset++] = 1;  // CNAME
285 
286         AString cname = "stagefright@";
287         cname.append(inet_ntoa(addr.sin_addr));
288         data[offset++] = cname.size();
289 
290         memcpy(&data[offset], cname.c_str(), cname.size());
291         offset += cname.size();
292 
293         data[offset++] = 6;  // TOOL
294 
295         AString tool = MakeUserAgent();
296 
297         data[offset++] = tool.size();
298 
299         memcpy(&data[offset], tool.c_str(), tool.size());
300         offset += tool.size();
301 
302         data[offset++] = 0;
303 
304         if ((offset % 4) > 0) {
305             size_t count = 4 - (offset % 4);
306             switch (count) {
307                 case 3:
308                     data[offset++] = 0;
309                 case 2:
310                     data[offset++] = 0;
311                 case 1:
312                     data[offset++] = 0;
313             }
314         }
315 
316         size_t numWords = (offset / 4) - 1;
317         data[2] = numWords >> 8;
318         data[3] = numWords & 0xff;
319 
320         buffer->setRange(buffer->offset(), buffer->size() + offset);
321     }
322 
323     // In case we're behind NAT, fire off two UDP packets to the remote
324     // rtp/rtcp ports to poke a hole into the firewall for future incoming
325     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler326     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
327         struct sockaddr_in addr;
328         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
329         addr.sin_family = AF_INET;
330 
331         AString source;
332         AString server_port;
333         if (!GetAttribute(transport.c_str(),
334                           "source",
335                           &source)) {
336             ALOGW("Missing 'source' field in Transport response. Using "
337                  "RTSP endpoint address.");
338 
339             struct hostent *ent = gethostbyname(mSessionHost.c_str());
340             if (ent == NULL) {
341                 ALOGE("Failed to look up address of session host '%s'",
342                      mSessionHost.c_str());
343 
344                 return false;
345             }
346 
347             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
348         } else {
349             addr.sin_addr.s_addr = inet_addr(source.c_str());
350         }
351 
352         if (!GetAttribute(transport.c_str(),
353                                  "server_port",
354                                  &server_port)) {
355             ALOGI("Missing 'server_port' field in Transport response.");
356             return false;
357         }
358 
359         int rtpPort, rtcpPort;
360         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
361                 || rtpPort <= 0 || rtpPort > 65535
362                 || rtcpPort <=0 || rtcpPort > 65535
363                 || rtcpPort != rtpPort + 1) {
364             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
365                  " RTP port must be even, RTCP port must be one higher.",
366                  server_port.c_str());
367 
368             return false;
369         }
370 
371         if (rtpPort & 1) {
372             ALOGW("Server picked an odd RTP port, it should've picked an "
373                  "even one, we'll let it pass for now, but this may break "
374                  "in the future.");
375         }
376 
377         if (addr.sin_addr.s_addr == INADDR_NONE) {
378             return true;
379         }
380 
381         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
382             // No firewalls to traverse on the loopback interface.
383             return true;
384         }
385 
386         // Make up an RR/SDES RTCP packet.
387         sp<ABuffer> buf = new ABuffer(65536);
388         buf->setRange(0, 0);
389         addRR(buf);
390         addSDES(rtpSocket, buf);
391 
392         addr.sin_port = htons(rtpPort);
393 
394         ssize_t n = sendto(
395                 rtpSocket, buf->data(), buf->size(), 0,
396                 (const sockaddr *)&addr, sizeof(addr));
397 
398         if (n < (ssize_t)buf->size()) {
399             ALOGE("failed to poke a hole for RTP packets");
400             return false;
401         }
402 
403         addr.sin_port = htons(rtcpPort);
404 
405         n = sendto(
406                 rtcpSocket, buf->data(), buf->size(), 0,
407                 (const sockaddr *)&addr, sizeof(addr));
408 
409         if (n < (ssize_t)buf->size()) {
410             ALOGE("failed to poke a hole for RTCP packets");
411             return false;
412         }
413 
414         ALOGV("successfully poked holes.");
415 
416         return true;
417     }
418 
isLiveStreamMyHandler419     static bool isLiveStream(const sp<ASessionDescription> &desc) {
420         AString attrLiveStream;
421         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
422             ssize_t semicolonPos = attrLiveStream.find(";", 2);
423 
424             const char* liveStreamValue;
425             if (semicolonPos < 0) {
426                 liveStreamValue = attrLiveStream.c_str();
427             } else {
428                 AString valString;
429                 valString.setTo(attrLiveStream,
430                         semicolonPos + 1,
431                         attrLiveStream.size() - semicolonPos - 1);
432                 liveStreamValue = valString.c_str();
433             }
434 
435             uint32_t value = strtoul(liveStreamValue, NULL, 10);
436             if (value == 1) {
437                 ALOGV("found live stream");
438                 return true;
439             }
440         } else {
441             // It is a live stream if no duration is returned
442             int64_t durationUs;
443             if (!desc->getDurationUs(&durationUs)) {
444                 ALOGV("No duration found, assume live stream");
445                 return true;
446             }
447         }
448 
449         return false;
450     }
451 
onMessageReceivedMyHandler452     virtual void onMessageReceived(const sp<AMessage> &msg) {
453         switch (msg->what()) {
454             case 'conn':
455             {
456                 int32_t result;
457                 CHECK(msg->findInt32("result", &result));
458 
459                 ALOGI("connection request completed with result %d (%s)",
460                      result, strerror(-result));
461 
462                 if (result == OK) {
463                     AString request;
464                     request = "DESCRIBE ";
465                     request.append(mSessionURL);
466                     request.append(" RTSP/1.0\r\n");
467                     request.append("Accept: application/sdp\r\n");
468                     request.append("\r\n");
469 
470                     sp<AMessage> reply = new AMessage('desc', this);
471                     mConn->sendRequest(request.c_str(), reply);
472                 } else {
473                     (new AMessage('disc', this))->post();
474                 }
475                 break;
476             }
477 
478             case 'disc':
479             {
480                 ++mKeepAliveGeneration;
481 
482                 int32_t reconnect;
483                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
484                     sp<AMessage> reply = new AMessage('conn', this);
485                     mConn->connect(mOriginalSessionURL.c_str(), reply);
486                 } else {
487                     (new AMessage('quit', this))->post();
488                 }
489                 break;
490             }
491 
492             case 'desc':
493             {
494                 int32_t result;
495                 CHECK(msg->findInt32("result", &result));
496 
497                 ALOGI("DESCRIBE completed with result %d (%s)",
498                      result, strerror(-result));
499 
500                 if (result == OK) {
501                     sp<RefBase> obj;
502                     CHECK(msg->findObject("response", &obj));
503                     sp<ARTSPResponse> response =
504                         static_cast<ARTSPResponse *>(obj.get());
505 
506                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
507                         ssize_t i = response->mHeaders.indexOfKey("location");
508                         CHECK_GE(i, 0);
509 
510                         mOriginalSessionURL = response->mHeaders.valueAt(i);
511                         mSessionURL = mOriginalSessionURL;
512 
513                         // Strip any authentication info from the session url, we don't
514                         // want to transmit user/pass in cleartext.
515                         AString host, path, user, pass;
516                         unsigned port;
517                         if (ARTSPConnection::ParseURL(
518                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
519                                 && user.size() > 0) {
520                             mSessionURL.clear();
521                             mSessionURL.append("rtsp://");
522                             mSessionURL.append(host);
523                             mSessionURL.append(":");
524                             mSessionURL.append(AStringPrintf("%u", port));
525                             mSessionURL.append(path);
526 
527                             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
528                         }
529 
530                         sp<AMessage> reply = new AMessage('conn', this);
531                         mConn->connect(mOriginalSessionURL.c_str(), reply);
532                         break;
533                     }
534 
535                     if (response->mStatusCode != 200) {
536                         result = UNKNOWN_ERROR;
537                     } else if (response->mContent == NULL) {
538                         result = ERROR_MALFORMED;
539                         ALOGE("The response has no content.");
540                     } else {
541                         mSessionDesc = new ASessionDescription;
542 
543                         mSessionDesc->setTo(
544                                 response->mContent->data(),
545                                 response->mContent->size());
546 
547                         if (!mSessionDesc->isValid()) {
548                             ALOGE("Failed to parse session description.");
549                             result = ERROR_MALFORMED;
550                         } else {
551                             ssize_t i = response->mHeaders.indexOfKey("content-base");
552                             if (i >= 0) {
553                                 mBaseURL = response->mHeaders.valueAt(i);
554                             } else {
555                                 i = response->mHeaders.indexOfKey("content-location");
556                                 if (i >= 0) {
557                                     mBaseURL = response->mHeaders.valueAt(i);
558                                 } else {
559                                     mBaseURL = mSessionURL;
560                                 }
561                             }
562 
563                             mSeekable = !isLiveStream(mSessionDesc);
564 
565                             if (!mBaseURL.startsWith("rtsp://")) {
566                                 // Some misbehaving servers specify a relative
567                                 // URL in one of the locations above, combine
568                                 // it with the absolute session URL to get
569                                 // something usable...
570 
571                                 ALOGW("Server specified a non-absolute base URL"
572                                      ", combining it with the session URL to "
573                                      "get something usable...");
574 
575                                 AString tmp;
576                                 CHECK(MakeURL(
577                                             mSessionURL.c_str(),
578                                             mBaseURL.c_str(),
579                                             &tmp));
580 
581                                 mBaseURL = tmp;
582                             }
583 
584                             mControlURL = getControlURL();
585 
586                             if (mSessionDesc->countTracks() < 2) {
587                                 // There's no actual tracks in this session.
588                                 // The first "track" is merely session meta
589                                 // data.
590 
591                                 ALOGW("Session doesn't contain any playable "
592                                      "tracks. Aborting.");
593                                 result = ERROR_UNSUPPORTED;
594                             } else {
595                                 setupTrack(1);
596                             }
597                         }
598                     }
599                 }
600 
601                 if (result != OK) {
602                     sp<AMessage> reply = new AMessage('disc', this);
603                     mConn->disconnect(reply);
604                 }
605                 break;
606             }
607 
608             case 'sdpl':
609             {
610                 int32_t result;
611                 CHECK(msg->findInt32("result", &result));
612 
613                 ALOGI("SDP connection request completed with result %d (%s)",
614                      result, strerror(-result));
615 
616                 if (result == OK) {
617                     sp<RefBase> obj;
618                     CHECK(msg->findObject("description", &obj));
619                     mSessionDesc =
620                         static_cast<ASessionDescription *>(obj.get());
621 
622                     if (!mSessionDesc->isValid()) {
623                         ALOGE("Failed to parse session description.");
624                         result = ERROR_MALFORMED;
625                     } else {
626                         mBaseURL = mSessionURL;
627 
628                         mSeekable = !isLiveStream(mSessionDesc);
629 
630                         mControlURL = getControlURL();
631 
632                         if (mSessionDesc->countTracks() < 2) {
633                             // There's no actual tracks in this session.
634                             // The first "track" is merely session meta
635                             // data.
636 
637                             ALOGW("Session doesn't contain any playable "
638                                  "tracks. Aborting.");
639                             result = ERROR_UNSUPPORTED;
640                         } else {
641                             setupTrack(1);
642                         }
643                     }
644                 }
645 
646                 if (result != OK) {
647                     sp<AMessage> reply = new AMessage('disc', this);
648                     mConn->disconnect(reply);
649                 }
650                 break;
651             }
652 
653             case 'setu':
654             {
655                 size_t index;
656                 CHECK(msg->findSize("index", &index));
657 
658                 TrackInfo *track = NULL;
659                 size_t trackIndex;
660                 if (msg->findSize("track-index", &trackIndex)) {
661                     track = &mTracks.editItemAt(trackIndex);
662                 }
663 
664                 int32_t result;
665                 CHECK(msg->findInt32("result", &result));
666 
667                 ALOGI("SETUP(%zu) completed with result %d (%s)",
668                      index, result, strerror(-result));
669 
670                 if (result == OK) {
671                     CHECK(track != NULL);
672 
673                     sp<RefBase> obj;
674                     CHECK(msg->findObject("response", &obj));
675                     sp<ARTSPResponse> response =
676                         static_cast<ARTSPResponse *>(obj.get());
677 
678                     if (response->mStatusCode != 200) {
679                         result = UNKNOWN_ERROR;
680                     } else {
681                         ssize_t i = response->mHeaders.indexOfKey("session");
682                         CHECK_GE(i, 0);
683 
684                         mSessionID = response->mHeaders.valueAt(i);
685 
686                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
687                         AString timeoutStr;
688                         if (GetAttribute(
689                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
690                             char *end;
691                             unsigned long timeoutSecs =
692                                 strtoul(timeoutStr.c_str(), &end, 10);
693 
694                             if (end == timeoutStr.c_str() || *end != '\0') {
695                                 ALOGW("server specified malformed timeout '%s'",
696                                      timeoutStr.c_str());
697 
698                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
699                             } else if (timeoutSecs < 15) {
700                                 ALOGW("server specified too short a timeout "
701                                      "(%lu secs), using default.",
702                                      timeoutSecs);
703 
704                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
705                             } else {
706                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
707 
708                                 ALOGI("server specified timeout of %lu secs.",
709                                      timeoutSecs);
710                             }
711                         }
712 
713                         i = mSessionID.find(";");
714                         if (i >= 0) {
715                             // Remove options, i.e. ";timeout=90"
716                             mSessionID.erase(i, mSessionID.size() - i);
717                         }
718 
719                         sp<AMessage> notify = new AMessage('accu', this);
720                         notify->setSize("track-index", trackIndex);
721 
722                         i = response->mHeaders.indexOfKey("transport");
723                         CHECK_GE(i, 0);
724 
725                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
726                             if (!track->mUsingInterleavedTCP) {
727                                 AString transport = response->mHeaders.valueAt(i);
728 
729                                 // We are going to continue even if we were
730                                 // unable to poke a hole into the firewall...
731                                 pokeAHole(
732                                         track->mRTPSocket,
733                                         track->mRTCPSocket,
734                                         transport);
735                             }
736 
737                             mRTPConn->addStream(
738                                     track->mRTPSocket, track->mRTCPSocket,
739                                     mSessionDesc, index,
740                                     notify, track->mUsingInterleavedTCP);
741 
742                             mSetupTracksSuccessful = true;
743                         } else {
744                             result = BAD_VALUE;
745                         }
746                     }
747                 }
748 
749                 if (result != OK) {
750                     if (track) {
751                         if (!track->mUsingInterleavedTCP) {
752                             // Clear the tag
753                             if (mUIDValid) {
754                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
755                                 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
756                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
757                                 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
758                             }
759 
760                             close(track->mRTPSocket);
761                             close(track->mRTCPSocket);
762                         }
763 
764                         mTracks.removeItemsAt(trackIndex);
765                     }
766                 }
767 
768                 ++index;
769                 if (result == OK && index < mSessionDesc->countTracks()) {
770                     setupTrack(index);
771                 } else if (mSetupTracksSuccessful) {
772                     ++mKeepAliveGeneration;
773                     postKeepAlive();
774 
775                     AString request = "PLAY ";
776                     request.append(mControlURL);
777                     request.append(" RTSP/1.0\r\n");
778 
779                     request.append("Session: ");
780                     request.append(mSessionID);
781                     request.append("\r\n");
782 
783                     request.append("\r\n");
784 
785                     sp<AMessage> reply = new AMessage('play', this);
786                     mConn->sendRequest(request.c_str(), reply);
787                 } else {
788                     sp<AMessage> reply = new AMessage('disc', this);
789                     mConn->disconnect(reply);
790                 }
791                 break;
792             }
793 
794             case 'play':
795             {
796                 int32_t result;
797                 CHECK(msg->findInt32("result", &result));
798 
799                 ALOGI("PLAY completed with result %d (%s)",
800                      result, strerror(-result));
801 
802                 if (result == OK) {
803                     sp<RefBase> obj;
804                     CHECK(msg->findObject("response", &obj));
805                     sp<ARTSPResponse> response =
806                         static_cast<ARTSPResponse *>(obj.get());
807 
808                     if (response->mStatusCode != 200) {
809                         result = UNKNOWN_ERROR;
810                     } else {
811                         parsePlayResponse(response);
812                         postTimeout();
813                     }
814                 }
815 
816                 if (result != OK) {
817                     sp<AMessage> reply = new AMessage('disc', this);
818                     mConn->disconnect(reply);
819                 }
820 
821                 break;
822             }
823 
824             case 'aliv':
825             {
826                 int32_t generation;
827                 CHECK(msg->findInt32("generation", &generation));
828 
829                 if (generation != mKeepAliveGeneration) {
830                     // obsolete event.
831                     break;
832                 }
833 
834                 AString request;
835                 request.append("OPTIONS ");
836                 request.append(mSessionURL);
837                 request.append(" RTSP/1.0\r\n");
838                 request.append("Session: ");
839                 request.append(mSessionID);
840                 request.append("\r\n");
841                 request.append("\r\n");
842 
843                 sp<AMessage> reply = new AMessage('opts', this);
844                 reply->setInt32("generation", mKeepAliveGeneration);
845                 mConn->sendRequest(request.c_str(), reply);
846                 break;
847             }
848 
849             case 'opts':
850             {
851                 int32_t result;
852                 CHECK(msg->findInt32("result", &result));
853 
854                 ALOGI("OPTIONS completed with result %d (%s)",
855                      result, strerror(-result));
856 
857                 int32_t generation;
858                 CHECK(msg->findInt32("generation", &generation));
859 
860                 if (generation != mKeepAliveGeneration) {
861                     // obsolete event.
862                     break;
863                 }
864 
865                 postKeepAlive();
866                 break;
867             }
868 
869             case 'abor':
870             {
871                 for (size_t i = 0; i < mTracks.size(); ++i) {
872                     TrackInfo *info = &mTracks.editItemAt(i);
873 
874                     if (!mFirstAccessUnit) {
875                         postQueueEOS(i, ERROR_END_OF_STREAM);
876                     }
877 
878                     if (!info->mUsingInterleavedTCP) {
879                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
880 
881                         // Clear the tag
882                         if (mUIDValid) {
883                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
884                             HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
885                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
886                             HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
887                         }
888 
889                         close(info->mRTPSocket);
890                         close(info->mRTCPSocket);
891                     }
892                 }
893                 mTracks.clear();
894                 mSetupTracksSuccessful = false;
895                 mSeekPending = false;
896                 mFirstAccessUnit = true;
897                 mAllTracksHaveTime = false;
898                 mNTPAnchorUs = -1;
899                 mMediaAnchorUs = -1;
900                 mNumAccessUnitsReceived = 0;
901                 mReceivedFirstRTCPPacket = false;
902                 mReceivedFirstRTPPacket = false;
903                 mPausing = false;
904                 mSeekable = true;
905 
906                 sp<AMessage> reply = new AMessage('tear', this);
907 
908                 int32_t reconnect;
909                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
910                     reply->setInt32("reconnect", true);
911                 }
912 
913                 AString request;
914                 request = "TEARDOWN ";
915 
916                 // XXX should use aggregate url from SDP here...
917                 request.append(mSessionURL);
918                 request.append(" RTSP/1.0\r\n");
919 
920                 request.append("Session: ");
921                 request.append(mSessionID);
922                 request.append("\r\n");
923 
924                 request.append("\r\n");
925 
926                 mConn->sendRequest(request.c_str(), reply);
927                 break;
928             }
929 
930             case 'tear':
931             {
932                 int32_t result;
933                 CHECK(msg->findInt32("result", &result));
934 
935                 ALOGI("TEARDOWN completed with result %d (%s)",
936                      result, strerror(-result));
937 
938                 sp<AMessage> reply = new AMessage('disc', this);
939 
940                 int32_t reconnect;
941                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
942                     reply->setInt32("reconnect", true);
943                 }
944 
945                 mConn->disconnect(reply);
946                 break;
947             }
948 
949             case 'quit':
950             {
951                 sp<AMessage> msg = mNotify->dup();
952                 msg->setInt32("what", kWhatDisconnected);
953                 msg->setInt32("result", UNKNOWN_ERROR);
954                 msg->post();
955                 break;
956             }
957 
958             case 'chek':
959             {
960                 int32_t generation;
961                 CHECK(msg->findInt32("generation", &generation));
962                 if (generation != mCheckGeneration) {
963                     // This is an outdated message. Ignore.
964                     break;
965                 }
966 
967                 if (mNumAccessUnitsReceived == 0) {
968 #if 1
969                     ALOGI("stream ended? aborting.");
970                     (new AMessage('abor', this))->post();
971                     break;
972 #else
973                     ALOGI("haven't seen an AU in a looong time.");
974 #endif
975                 }
976 
977                 mNumAccessUnitsReceived = 0;
978                 msg->post(kAccessUnitTimeoutUs);
979                 break;
980             }
981 
982             case 'accu':
983             {
984                 if (mSeekPending) {
985                     ALOGV("Stale access unit.");
986                     break;
987                 }
988 
989                 int32_t timeUpdate;
990                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
991                     size_t trackIndex;
992                     CHECK(msg->findSize("track-index", &trackIndex));
993 
994                     uint32_t rtpTime;
995                     uint64_t ntpTime;
996                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
997                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
998 
999                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
1000                     break;
1001                 }
1002 
1003                 int32_t first;
1004                 if (msg->findInt32("first-rtcp", &first)) {
1005                     mReceivedFirstRTCPPacket = true;
1006                     break;
1007                 }
1008 
1009                 if (msg->findInt32("first-rtp", &first)) {
1010                     mReceivedFirstRTPPacket = true;
1011                     break;
1012                 }
1013 
1014                 ++mNumAccessUnitsReceived;
1015                 postAccessUnitTimeoutCheck();
1016 
1017                 size_t trackIndex;
1018                 CHECK(msg->findSize("track-index", &trackIndex));
1019 
1020                 if (trackIndex >= mTracks.size()) {
1021                     ALOGV("late packets ignored.");
1022                     break;
1023                 }
1024 
1025                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1026 
1027                 int32_t eos;
1028                 if (msg->findInt32("eos", &eos)) {
1029                     ALOGI("received BYE on track index %zu", trackIndex);
1030                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1031                         ALOGI("No time established => fake existing data");
1032 
1033                         track->mEOSReceived = true;
1034                         mTryFakeRTCP = true;
1035                         mReceivedFirstRTCPPacket = true;
1036                         fakeTimestamps();
1037                     } else {
1038                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1039                     }
1040                     return;
1041                 }
1042 
1043                 if (mSeekPending) {
1044                     ALOGV("we're seeking, dropping stale packet.");
1045                     break;
1046                 }
1047 
1048                 sp<ABuffer> accessUnit;
1049                 CHECK(msg->findBuffer("access-unit", &accessUnit));
1050                 onAccessUnitComplete(trackIndex, accessUnit);
1051                 break;
1052             }
1053 
1054             case 'paus':
1055             {
1056                 int32_t generation;
1057                 CHECK(msg->findInt32("pausecheck", &generation));
1058                 if (generation != mPauseGeneration) {
1059                     ALOGV("Ignoring outdated pause message.");
1060                     break;
1061                 }
1062 
1063                 if (!mSeekable) {
1064                     ALOGW("This is a live stream, ignoring pause request.");
1065                     break;
1066                 }
1067 
1068                 if (mPausing) {
1069                     ALOGV("This stream is already paused.");
1070                     break;
1071                 }
1072 
1073                 mCheckPending = true;
1074                 ++mCheckGeneration;
1075                 mPausing = true;
1076 
1077                 AString request = "PAUSE ";
1078                 request.append(mControlURL);
1079                 request.append(" RTSP/1.0\r\n");
1080 
1081                 request.append("Session: ");
1082                 request.append(mSessionID);
1083                 request.append("\r\n");
1084 
1085                 request.append("\r\n");
1086 
1087                 sp<AMessage> reply = new AMessage('pau2', this);
1088                 mConn->sendRequest(request.c_str(), reply);
1089                 break;
1090             }
1091 
1092             case 'pau2':
1093             {
1094                 int32_t result;
1095                 CHECK(msg->findInt32("result", &result));
1096                 mCheckTimeoutGeneration++;
1097 
1098                 ALOGI("PAUSE completed with result %d (%s)",
1099                      result, strerror(-result));
1100                 break;
1101             }
1102 
1103             case 'resu':
1104             {
1105                 if (mPausing && mSeekPending) {
1106                     // If seeking, Play will be sent from see1 instead
1107                     break;
1108                 }
1109 
1110                 if (!mPausing) {
1111                     // Dont send PLAY if we have not paused
1112                     break;
1113                 }
1114                 AString request = "PLAY ";
1115                 request.append(mControlURL);
1116                 request.append(" RTSP/1.0\r\n");
1117 
1118                 request.append("Session: ");
1119                 request.append(mSessionID);
1120                 request.append("\r\n");
1121 
1122                 request.append("\r\n");
1123 
1124                 sp<AMessage> reply = new AMessage('res2', this);
1125                 mConn->sendRequest(request.c_str(), reply);
1126                 break;
1127             }
1128 
1129             case 'res2':
1130             {
1131                 int32_t result;
1132                 CHECK(msg->findInt32("result", &result));
1133 
1134                 ALOGI("PLAY (for resume) completed with result %d (%s)",
1135                      result, strerror(-result));
1136 
1137                 mCheckPending = false;
1138                 ++mCheckGeneration;
1139                 postAccessUnitTimeoutCheck();
1140 
1141                 if (result == OK) {
1142                     sp<RefBase> obj;
1143                     CHECK(msg->findObject("response", &obj));
1144                     sp<ARTSPResponse> response =
1145                         static_cast<ARTSPResponse *>(obj.get());
1146 
1147                     if (response->mStatusCode != 200) {
1148                         result = UNKNOWN_ERROR;
1149                     } else {
1150                         parsePlayResponse(response);
1151 
1152                         // Post new timeout in order to make sure to use
1153                         // fake timestamps if no new Sender Reports arrive
1154                         postTimeout();
1155                     }
1156                 }
1157 
1158                 if (result != OK) {
1159                     ALOGE("resume failed, aborting.");
1160                     (new AMessage('abor', this))->post();
1161                 }
1162 
1163                 mPausing = false;
1164                 break;
1165             }
1166 
1167             case 'seek':
1168             {
1169                 if (!mSeekable) {
1170                     ALOGW("This is a live stream, ignoring seek request.");
1171 
1172                     sp<AMessage> msg = mNotify->dup();
1173                     msg->setInt32("what", kWhatSeekDone);
1174                     msg->post();
1175                     break;
1176                 }
1177 
1178                 int64_t timeUs;
1179                 CHECK(msg->findInt64("time", &timeUs));
1180 
1181                 mSeekPending = true;
1182 
1183                 // Disable the access unit timeout until we resumed
1184                 // playback again.
1185                 mCheckPending = true;
1186                 ++mCheckGeneration;
1187 
1188                 sp<AMessage> reply = new AMessage('see0', this);
1189                 reply->setInt64("time", timeUs);
1190 
1191                 if (mPausing) {
1192                     // PAUSE already sent
1193                     ALOGI("Pause already sent");
1194                     reply->post();
1195                     break;
1196                 }
1197                 AString request = "PAUSE ";
1198                 request.append(mControlURL);
1199                 request.append(" RTSP/1.0\r\n");
1200 
1201                 request.append("Session: ");
1202                 request.append(mSessionID);
1203                 request.append("\r\n");
1204 
1205                 request.append("\r\n");
1206 
1207                 mConn->sendRequest(request.c_str(), reply);
1208                 break;
1209             }
1210 
1211             case 'see0':
1212             {
1213                 // Session is paused now.
1214                 status_t err = OK;
1215                 msg->findInt32("result", &err);
1216 
1217                 int64_t timeUs;
1218                 CHECK(msg->findInt64("time", &timeUs));
1219 
1220                 sp<AMessage> notify = mNotify->dup();
1221                 notify->setInt32("what", kWhatSeekPaused);
1222                 notify->setInt32("err", err);
1223                 notify->setInt64("time", timeUs);
1224                 notify->post();
1225                 break;
1226 
1227             }
1228 
1229             case 'see1':
1230             {
1231                 for (size_t i = 0; i < mTracks.size(); ++i) {
1232                     TrackInfo *info = &mTracks.editItemAt(i);
1233 
1234                     postQueueSeekDiscontinuity(i);
1235                     info->mEOSReceived = false;
1236 
1237                     info->mRTPAnchor = 0;
1238                     info->mNTPAnchorUs = -1;
1239                 }
1240 
1241                 mAllTracksHaveTime = false;
1242                 mNTPAnchorUs = -1;
1243 
1244                 // Start new timeoutgeneration to avoid getting timeout
1245                 // before PLAY response arrive
1246                 postTimeout();
1247 
1248                 int64_t timeUs;
1249                 CHECK(msg->findInt64("time", &timeUs));
1250 
1251                 AString request = "PLAY ";
1252                 request.append(mControlURL);
1253                 request.append(" RTSP/1.0\r\n");
1254 
1255                 request.append("Session: ");
1256                 request.append(mSessionID);
1257                 request.append("\r\n");
1258 
1259                 request.append(
1260                         AStringPrintf(
1261                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1262 
1263                 request.append("\r\n");
1264 
1265                 sp<AMessage> reply = new AMessage('see2', this);
1266                 mConn->sendRequest(request.c_str(), reply);
1267                 break;
1268             }
1269 
1270             case 'see2':
1271             {
1272                 if (mTracks.size() == 0) {
1273                     // We have already hit abor, break
1274                     break;
1275                 }
1276 
1277                 int32_t result;
1278                 CHECK(msg->findInt32("result", &result));
1279 
1280                 ALOGI("PLAY (for seek) completed with result %d (%s)",
1281                      result, strerror(-result));
1282 
1283                 mCheckPending = false;
1284                 ++mCheckGeneration;
1285                 postAccessUnitTimeoutCheck();
1286 
1287                 if (result == OK) {
1288                     sp<RefBase> obj;
1289                     CHECK(msg->findObject("response", &obj));
1290                     sp<ARTSPResponse> response =
1291                         static_cast<ARTSPResponse *>(obj.get());
1292 
1293                     if (response->mStatusCode != 200) {
1294                         result = UNKNOWN_ERROR;
1295                     } else {
1296                         parsePlayResponse(response);
1297 
1298                         // Post new timeout in order to make sure to use
1299                         // fake timestamps if no new Sender Reports arrive
1300                         postTimeout();
1301 
1302                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1303                         CHECK_GE(i, 0);
1304 
1305                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1306 
1307                         ALOGI("seek completed.");
1308                     }
1309                 }
1310 
1311                 if (result != OK) {
1312                     ALOGE("seek failed, aborting.");
1313                     (new AMessage('abor', this))->post();
1314                 }
1315 
1316                 mPausing = false;
1317                 mSeekPending = false;
1318 
1319                 // Discard all stale access units.
1320                 for (size_t i = 0; i < mTracks.size(); ++i) {
1321                     TrackInfo *track = &mTracks.editItemAt(i);
1322                     track->mPackets.clear();
1323                 }
1324 
1325                 sp<AMessage> msg = mNotify->dup();
1326                 msg->setInt32("what", kWhatSeekDone);
1327                 msg->post();
1328                 break;
1329             }
1330 
1331             case 'biny':
1332             {
1333                 sp<ABuffer> buffer;
1334                 CHECK(msg->findBuffer("buffer", &buffer));
1335 
1336                 int32_t index;
1337                 CHECK(buffer->meta()->findInt32("index", &index));
1338 
1339                 mRTPConn->injectPacket(index, buffer);
1340                 break;
1341             }
1342 
1343             case 'tiou':
1344             {
1345                 int32_t timeoutGenerationCheck;
1346                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1347                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1348                     // This is an outdated message. Ignore.
1349                     // This typically happens if a lot of seeks are
1350                     // performed, since new timeout messages now are
1351                     // posted at seek as well.
1352                     break;
1353                 }
1354                 if (!mReceivedFirstRTCPPacket) {
1355                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1356                         ALOGW("We received RTP packets but no RTCP packets, "
1357                              "using fake timestamps.");
1358 
1359                         mTryFakeRTCP = true;
1360 
1361                         mReceivedFirstRTCPPacket = true;
1362 
1363                         fakeTimestamps();
1364                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1365                         ALOGW("Never received any data, switching transports.");
1366 
1367                         mTryTCPInterleaving = true;
1368 
1369                         sp<AMessage> msg = new AMessage('abor', this);
1370                         msg->setInt32("reconnect", true);
1371                         msg->post();
1372                     } else {
1373                         ALOGW("Never received any data, disconnecting.");
1374                         (new AMessage('abor', this))->post();
1375                     }
1376                 } else {
1377                     if (!mAllTracksHaveTime) {
1378                         ALOGW("We received some RTCP packets, but time "
1379                               "could not be established on all tracks, now "
1380                               "using fake timestamps");
1381 
1382                         fakeTimestamps();
1383                     }
1384                 }
1385                 break;
1386             }
1387 
1388             default:
1389                 TRESPASS();
1390                 break;
1391         }
1392     }
1393 
postKeepAliveMyHandler1394     void postKeepAlive() {
1395         sp<AMessage> msg = new AMessage('aliv', this);
1396         msg->setInt32("generation", mKeepAliveGeneration);
1397         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1398     }
1399 
cancelAccessUnitTimeoutCheckMyHandler1400     void cancelAccessUnitTimeoutCheck() {
1401         ALOGV("cancelAccessUnitTimeoutCheck");
1402         ++mCheckGeneration;
1403     }
1404 
postAccessUnitTimeoutCheckMyHandler1405     void postAccessUnitTimeoutCheck() {
1406         if (mCheckPending) {
1407             return;
1408         }
1409 
1410         mCheckPending = true;
1411         sp<AMessage> check = new AMessage('chek', this);
1412         check->setInt32("generation", mCheckGeneration);
1413         check->post(kAccessUnitTimeoutUs);
1414     }
1415 
SplitStringMyHandler1416     static void SplitString(
1417             const AString &s, const char *separator, List<AString> *items) {
1418         items->clear();
1419         size_t start = 0;
1420         while (start < s.size()) {
1421             ssize_t offset = s.find(separator, start);
1422 
1423             if (offset < 0) {
1424                 items->push_back(AString(s, start, s.size() - start));
1425                 break;
1426             }
1427 
1428             items->push_back(AString(s, start, offset - start));
1429             start = offset + strlen(separator);
1430         }
1431     }
1432 
parsePlayResponseMyHandler1433     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1434         mPlayResponseParsed = true;
1435         if (mTracks.size() == 0) {
1436             ALOGV("parsePlayResponse: late packets ignored.");
1437             return;
1438         }
1439 
1440         ssize_t i = response->mHeaders.indexOfKey("range");
1441         if (i < 0) {
1442             // Server doesn't even tell use what range it is going to
1443             // play, therefore we won't support seeking.
1444             return;
1445         }
1446 
1447         AString range = response->mHeaders.valueAt(i);
1448         ALOGV("Range: %s", range.c_str());
1449 
1450         AString val;
1451         CHECK(GetAttribute(range.c_str(), "npt", &val));
1452 
1453         float npt1, npt2;
1454         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1455             // This is a live stream and therefore not seekable.
1456 
1457             ALOGI("This is a live stream");
1458             return;
1459         }
1460 
1461         i = response->mHeaders.indexOfKey("rtp-info");
1462         CHECK_GE(i, 0);
1463 
1464         AString rtpInfo = response->mHeaders.valueAt(i);
1465         List<AString> streamInfos;
1466         SplitString(rtpInfo, ",", &streamInfos);
1467 
1468         int n = 1;
1469         for (List<AString>::iterator it = streamInfos.begin();
1470              it != streamInfos.end(); ++it) {
1471             (*it).trim();
1472             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1473 
1474             CHECK(GetAttribute((*it).c_str(), "url", &val));
1475 
1476             size_t trackIndex = 0;
1477             while (trackIndex < mTracks.size()
1478                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1479                 ++trackIndex;
1480             }
1481             CHECK_LT(trackIndex, mTracks.size());
1482 
1483             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1484 
1485             char *end;
1486             unsigned long seq = strtoul(val.c_str(), &end, 10);
1487 
1488             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1489             info->mFirstSeqNumInSegment = seq;
1490             info->mNewSegment = true;
1491             info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1492 
1493             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1494 
1495             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1496 
1497             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1498 
1499             info->mNormalPlayTimeRTP = rtpTime;
1500             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1501 
1502             if (!mFirstAccessUnit) {
1503                 postNormalPlayTimeMapping(
1504                         trackIndex,
1505                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1506             }
1507 
1508             ++n;
1509         }
1510     }
1511 
getTrackFormatMyHandler1512     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1513         CHECK_GE(index, 0u);
1514         CHECK_LT(index, mTracks.size());
1515 
1516         const TrackInfo &info = mTracks.itemAt(index);
1517 
1518         *timeScale = info.mTimeScale;
1519 
1520         return info.mPacketSource->getFormat();
1521     }
1522 
countTracksMyHandler1523     size_t countTracks() const {
1524         return mTracks.size();
1525     }
1526 
1527 private:
1528     struct TrackInfo {  // Per-track state; setupTrack() pushes one entry onto mTracks per SETUP it issues.
1529         AString mURL;  // Absolute track URL used for SETUP; matched against the "url" attribute of RTP-Info.
1530         int mRTPSocket;  // RTP socket from MakePortPair(), or the interleaved channel index when mUsingInterleavedTCP.
1531         int mRTCPSocket;  // RTCP socket from MakePortPair(), or interleaved channel index + 1 when mUsingInterleavedTCP.
1532         bool mUsingInterleavedTCP;  // True when the track was set up with RTP/AVP/TCP;interleaved (no sockets to close).
1533         uint32_t mFirstSeqNumInSegment;  // "seq" value from RTP-Info of the last parsed PLAY response; 0 after setup.
1534         bool mNewSegment;  // Set to true by setupTrack() and by parsePlayResponse().
1535         int32_t mAllowedStaleAccessUnits;  // Reset to kMaxAllowedStaleAccessUnits together with mNewSegment.
1536 
1537         uint32_t mRTPAnchor;  // Reset to 0 by setupTrack() and on seek ('see1'). NOTE(review): presumably set by onTimeUpdate() - confirm.
1538         int64_t mNTPAnchorUs;  // Reset to -1 by setupTrack() and on seek ('see1').
1539         int32_t mTimeScale;  // Time scale parsed from the SDP format description; returned by getTrackFormat().
1540         bool mEOSReceived;  // Set when a BYE ("eos") arrives before time was established; cleared on seek.
1541 
1542         uint32_t mNormalPlayTimeRTP;  // "rtptime" from RTP-Info: the RTP timestamp that corresponds to mNormalPlayTimeUs.
1543         int64_t mNormalPlayTimeUs;  // Start of the npt range of the last PLAY response, in microseconds.
1544 
1545         sp<APacketSource> mPacketSource;  // Created from the session description for this track; supplies the track format.
1546 
1547         // Stores packets temporarily while no notion of time
1548         // has been established yet.
1549         List<sp<ABuffer> > mPackets;
1550     };
1551 
1552     sp<AMessage> mNotify;  // Template message; dup()'ed to report kWhatDisconnected, kWhatSeekPaused and kWhatSeekDone.
1553     bool mUIDValid;  // When true, RTP/RTCP sockets are tagged and marked with mUID through HTTPBase.
1554     uid_t mUID;  // UID passed to HTTPBase::RegisterSocketUserTag/Mark.
1555     sp<ALooper> mNetLooper;
1556     sp<ARTSPConnection> mConn;  // RTSP control connection: sendRequest() and disconnect().
1557     sp<ARTPConnection> mRTPConn;  // RTP side: streams are removed from it and interleaved ('biny') packets injected into it.
1558     sp<ASessionDescription> mSessionDesc;  // Session description: track count, a=control attributes, format types.
1559     AString mOriginalSessionURL;  // This one still has user:pass@
1560     AString mSessionURL;  // Request URL used for OPTIONS (keep-alive) and TEARDOWN.
1561     AString mSessionHost;
1562     AString mBaseURL;  // Base against which per-track a=control URLs are resolved (see MakeURL()).
1563     AString mControlURL;  // Request URL used for PLAY and PAUSE.
1564     AString mSessionID;  // Value sent in the "Session:" request header.
1565     bool mSetupTracksSuccessful;  // After the last SETUP reply: true => send PLAY, false => disconnect. Cleared on 'abor'.
1566     bool mSeekPending;  // True from 'seek' until 'see2'; access units arriving meanwhile are dropped.
1567     bool mFirstAccessUnit;  // Reset to true on 'abor'; while true, 'abor' queues no EOS and parsePlayResponse() posts no NPT mapping.
1568 
1569     bool mAllTracksHaveTime;  // Reset to false on 'abor' and 'see1'; when still false at 'tiou', fake timestamps are used.
1570     int64_t mNTPAnchorUs;  // Reset to -1 on 'abor', on 'see1' and by fakeTimestamps().
1571     int64_t mMediaAnchorUs;  // Reset to -1 on 'abor'.
1572     int64_t mLastMediaTimeUs;
1573 
1574     int64_t mNumAccessUnitsReceived;  // Counted per 'accu', zeroed by 'chek'; zero at 'chek' time triggers 'abor'.
1575     bool mCheckPending;  // True while a 'chek' is scheduled, or while the timeout is disabled for pause/seek.
1576     int32_t mCheckGeneration;  // Bumped to invalidate outstanding 'chek' messages.
1577     int32_t mCheckTimeoutGeneration;  // Generation a 'tiou' message must match ("tioucheck") to be processed.
1578     bool mTryTCPInterleaving;  // Makes setupTrack() request RTP/AVP/TCP; set by 'tiou' when no data ever arrived.
1579     bool mTryFakeRTCP;  // Set once fake timestamps were used because no RTCP was received.
1580     bool mReceivedFirstRTCPPacket;  // Set by the "first-rtcp" 'accu' notification, or when timestamps are faked.
1581     bool mReceivedFirstRTPPacket;  // Set by the "first-rtp" 'accu' notification.
1582     bool mSeekable;  // When false (live stream), 'seek' and 'paus' requests are ignored. Set to true on 'abor'.
1583     int64_t mKeepAliveTimeoutUs;  // postKeepAlive() schedules 'aliv' after 90% of this interval.
1584     int32_t mKeepAliveGeneration;  // Bumped to invalidate outstanding 'aliv'/'opts' messages.
1585     bool mPausing;  // True from the moment PAUSE is sent ('paus') until PLAY completes ('res2'/'see2').
1586     int32_t mPauseGeneration;  // Generation a 'paus' message must match ("pausecheck") to be processed.
1587 
1588     Vector<TrackInfo> mTracks;  // One entry per track for which setupTrack() issued a SETUP; cleared on 'abor'.
1589 
1590     bool mPlayResponseParsed;  // Set to true at the start of parsePlayResponse().
1591 
setupTrackMyHandler1592     void setupTrack(size_t index) {
1593         sp<APacketSource> source =
1594             new APacketSource(mSessionDesc, index);
1595 
1596         if (source->initCheck() != OK) {
1597             ALOGW("Unsupported format. Ignoring track #%zu.", index);
1598 
1599             sp<AMessage> reply = new AMessage('setu', this);
1600             reply->setSize("index", index);
1601             reply->setInt32("result", ERROR_UNSUPPORTED);
1602             reply->post();
1603             return;
1604         }
1605 
1606         AString url;
1607         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1608 
1609         AString trackURL;
1610         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1611 
1612         mTracks.push(TrackInfo());
1613         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1614         info->mURL = trackURL;
1615         info->mPacketSource = source;
1616         info->mUsingInterleavedTCP = false;
1617         info->mFirstSeqNumInSegment = 0;
1618         info->mNewSegment = true;
1619         info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1620         info->mRTPSocket = -1;
1621         info->mRTCPSocket = -1;
1622         info->mRTPAnchor = 0;
1623         info->mNTPAnchorUs = -1;
1624         info->mNormalPlayTimeRTP = 0;
1625         info->mNormalPlayTimeUs = 0ll;
1626 
1627         unsigned long PT;
1628         AString formatDesc;
1629         AString formatParams;
1630         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1631 
1632         int32_t timescale;
1633         int32_t numChannels;
1634         ASessionDescription::ParseFormatDesc(
1635                 formatDesc.c_str(), &timescale, &numChannels);
1636 
1637         info->mTimeScale = timescale;
1638         info->mEOSReceived = false;
1639 
1640         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1641 
1642         AString request = "SETUP ";
1643         request.append(trackURL);
1644         request.append(" RTSP/1.0\r\n");
1645 
1646         if (mTryTCPInterleaving) {
1647             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1648             info->mUsingInterleavedTCP = true;
1649             info->mRTPSocket = interleaveIndex;
1650             info->mRTCPSocket = interleaveIndex + 1;
1651 
1652             request.append("Transport: RTP/AVP/TCP;interleaved=");
1653             request.append(interleaveIndex);
1654             request.append("-");
1655             request.append(interleaveIndex + 1);
1656         } else {
1657             unsigned rtpPort;
1658             ARTPConnection::MakePortPair(
1659                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1660 
1661             if (mUIDValid) {
1662                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1663                                                 (uint32_t)*(uint32_t*) "RTP_");
1664                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1665                                                 (uint32_t)*(uint32_t*) "RTP_");
1666                 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1667                 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1668             }
1669 
1670             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1671             request.append(rtpPort);
1672             request.append("-");
1673             request.append(rtpPort + 1);
1674         }
1675 
1676         request.append("\r\n");
1677 
1678         if (index > 1) {
1679             request.append("Session: ");
1680             request.append(mSessionID);
1681             request.append("\r\n");
1682         }
1683 
1684         request.append("\r\n");
1685 
1686         sp<AMessage> reply = new AMessage('setu', this);
1687         reply->setSize("index", index);
1688         reply->setSize("track-index", mTracks.size() - 1);
1689         mConn->sendRequest(request.c_str(), reply);
1690     }
1691 
MakeURLMyHandler1692     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1693         out->clear();
1694 
1695         if (strncasecmp("rtsp://", baseURL, 7)) {
1696             // Base URL must be absolute
1697             return false;
1698         }
1699 
1700         if (!strncasecmp("rtsp://", url, 7)) {
1701             // "url" is already an absolute URL, ignore base URL.
1702             out->setTo(url);
1703             return true;
1704         }
1705 
1706         size_t n = strlen(baseURL);
1707         out->setTo(baseURL);
1708         if (baseURL[n - 1] != '/') {
1709             out->append("/");
1710         }
1711         out->append(url);
1712 
1713         return true;
1714     }
1715 
fakeTimestampsMyHandler1716     void fakeTimestamps() {
1717         mNTPAnchorUs = -1ll;
1718         for (size_t i = 0; i < mTracks.size(); ++i) {
1719             onTimeUpdate(i, 0, 0ll);
1720         }
1721     }
1722 
dataReceivedOnAllChannelsMyHandler1723     bool dataReceivedOnAllChannels() {
1724         TrackInfo *track;
1725         for (size_t i = 0; i < mTracks.size(); ++i) {
1726             track = &mTracks.editItemAt(i);
1727             if (track->mPackets.empty()) {
1728                 return false;
1729             }
1730         }
1731         return true;
1732     }
1733 
handleFirstAccessUnitMyHandler1734     void handleFirstAccessUnit() {
1735         if (mFirstAccessUnit) {
1736             sp<AMessage> msg = mNotify->dup();
1737             msg->setInt32("what", kWhatConnected);
1738             msg->post();
1739 
1740             if (mSeekable) {
1741                 for (size_t i = 0; i < mTracks.size(); ++i) {
1742                     TrackInfo *info = &mTracks.editItemAt(i);
1743 
1744                     postNormalPlayTimeMapping(
1745                             i,
1746                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1747                 }
1748             }
1749 
1750             mFirstAccessUnit = false;
1751         }
1752     }
1753 
onTimeUpdateMyHandler1754     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1755         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1756              trackIndex, rtpTime, (long long)ntpTime);
1757 
1758         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1759 
1760         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1761 
1762         track->mRTPAnchor = rtpTime;
1763         track->mNTPAnchorUs = ntpTimeUs;
1764 
1765         if (mNTPAnchorUs < 0) {
1766             mNTPAnchorUs = ntpTimeUs;
1767             mMediaAnchorUs = mLastMediaTimeUs;
1768         }
1769 
1770         if (!mAllTracksHaveTime) {
1771             bool allTracksHaveTime = (mTracks.size() > 0);
1772             for (size_t i = 0; i < mTracks.size(); ++i) {
1773                 TrackInfo *track = &mTracks.editItemAt(i);
1774                 if (track->mNTPAnchorUs < 0) {
1775                     allTracksHaveTime = false;
1776                     break;
1777                 }
1778             }
1779             if (allTracksHaveTime) {
1780                 mAllTracksHaveTime = true;
1781                 ALOGI("Time now established for all tracks.");
1782             }
1783         }
1784         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1785             handleFirstAccessUnit();
1786 
1787             // Time is now established, lets start timestamping immediately
1788             for (size_t i = 0; i < mTracks.size(); ++i) {
1789                 if (OK != processAccessUnitQueue(i)) {
1790                     return;
1791                 }
1792             }
1793             for (size_t i = 0; i < mTracks.size(); ++i) {
1794                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1795                 if (trackInfo->mEOSReceived) {
1796                     postQueueEOS(i, ERROR_END_OF_STREAM);
1797                     trackInfo->mEOSReceived = false;
1798                 }
1799             }
1800         }
1801     }
1802 
processAccessUnitQueueMyHandler1803     status_t processAccessUnitQueue(int32_t trackIndex) {
1804         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1805         while (!track->mPackets.empty()) {
1806             sp<ABuffer> accessUnit = *track->mPackets.begin();
1807             track->mPackets.erase(track->mPackets.begin());
1808 
1809             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1810             if (track->mNewSegment) {
1811                 // The sequence number from RTP packet has only 16 bits and is extended
1812                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1813                 // RTSP "PLAY" command should be used to detect the first RTP packet
1814                 // after seeking.
1815                 if (mSeekable) {
1816                     if (track->mAllowedStaleAccessUnits > 0) {
1817                         uint32_t seqNum16 = seqNum & 0xffff;
1818                         uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1819                         if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1820                                 || seqNum16 < firstSeqNumInSegment16) {
1821                             // Not the first rtp packet of the stream after seeking, discarding.
1822                             track->mAllowedStaleAccessUnits--;
1823                             ALOGV("discarding stale access unit (0x%x : 0x%x)",
1824                                  seqNum, track->mFirstSeqNumInSegment);
1825                             continue;
1826                         }
1827                         ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1828                                 "Missing the first packet(%u), now take packet(%u) as first one",
1829                                 track->mFirstSeqNumInSegment, seqNum);
1830                     } else { // track->mAllowedStaleAccessUnits <= 0
1831                         mNumAccessUnitsReceived = 0;
1832                         ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1833                              "Still no first rtp packet after %d stale ones",
1834                              kMaxAllowedStaleAccessUnits);
1835                         track->mAllowedStaleAccessUnits = -1;
1836                         return UNKNOWN_ERROR;
1837                     }
1838                 }
1839 
1840                 // Now found the first rtp packet of the stream after seeking.
1841                 track->mFirstSeqNumInSegment = seqNum;
1842                 track->mNewSegment = false;
1843             }
1844 
1845             if (seqNum < track->mFirstSeqNumInSegment) {
1846                 ALOGV("dropping stale access-unit (%d < %d)",
1847                      seqNum, track->mFirstSeqNumInSegment);
1848                 continue;
1849             }
1850 
1851             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1852                 postQueueAccessUnit(trackIndex, accessUnit);
1853             }
1854         }
1855         return OK;
1856     }
1857 
onAccessUnitCompleteMyHandler1858     void onAccessUnitComplete(
1859             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1860         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1861         track->mPackets.push_back(accessUnit);
1862 
1863         uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1864         ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1865 
1866         if(!mPlayResponseParsed){
1867             ALOGV("play response is not parsed");
1868             return;
1869         }
1870 
1871         handleFirstAccessUnit();
1872 
1873         if (!mAllTracksHaveTime) {
1874             ALOGV("storing accessUnit, no time established yet");
1875             return;
1876         }
1877 
1878         if (OK != processAccessUnitQueue(trackIndex)) {
1879             return;
1880         }
1881 
1882         if (track->mEOSReceived) {
1883             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1884             track->mEOSReceived = false;
1885         }
1886     }
1887 
addMediaTimestampMyHandler1888     bool addMediaTimestamp(
1889             int32_t trackIndex, const TrackInfo *track,
1890             const sp<ABuffer> &accessUnit) {
1891         UNUSED_UNLESS_VERBOSE(trackIndex);
1892 
1893         uint32_t rtpTime;
1894         CHECK(accessUnit->meta()->findInt32(
1895                     "rtp-time", (int32_t *)&rtpTime));
1896 
1897         int64_t relRtpTimeUs =
1898             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1899                 / track->mTimeScale;
1900 
1901         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1902 
1903         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1904 
1905         if (mediaTimeUs > mLastMediaTimeUs) {
1906             mLastMediaTimeUs = mediaTimeUs;
1907         }
1908 
1909         if (mediaTimeUs < 0) {
1910             ALOGV("dropping early accessUnit.");
1911             return false;
1912         }
1913 
1914         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1915              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1916 
1917         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1918 
1919         return true;
1920     }
1921 
postQueueAccessUnitMyHandler1922     void postQueueAccessUnit(
1923             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1924         sp<AMessage> msg = mNotify->dup();
1925         msg->setInt32("what", kWhatAccessUnit);
1926         msg->setSize("trackIndex", trackIndex);
1927         msg->setBuffer("accessUnit", accessUnit);
1928         msg->post();
1929     }
1930 
postQueueEOSMyHandler1931     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1932         sp<AMessage> msg = mNotify->dup();
1933         msg->setInt32("what", kWhatEOS);
1934         msg->setSize("trackIndex", trackIndex);
1935         msg->setInt32("finalResult", finalResult);
1936         msg->post();
1937     }
1938 
postQueueSeekDiscontinuityMyHandler1939     void postQueueSeekDiscontinuity(size_t trackIndex) {
1940         sp<AMessage> msg = mNotify->dup();
1941         msg->setInt32("what", kWhatSeekDiscontinuity);
1942         msg->setSize("trackIndex", trackIndex);
1943         msg->post();
1944     }
1945 
postNormalPlayTimeMappingMyHandler1946     void postNormalPlayTimeMapping(
1947             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1948         sp<AMessage> msg = mNotify->dup();
1949         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1950         msg->setSize("trackIndex", trackIndex);
1951         msg->setInt32("rtpTime", rtpTime);
1952         msg->setInt64("nptUs", nptUs);
1953         msg->post();
1954     }
1955 
postTimeoutMyHandler1956     void postTimeout() {
1957         sp<AMessage> timeout = new AMessage('tiou', this);
1958         mCheckTimeoutGeneration++;
1959         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1960 
1961         int64_t startupTimeoutUs;
1962         startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1963         timeout->post(startupTimeoutUs);
1964     }
1965 
1966     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1967 };
1968 
1969 }  // namespace android
1970 
1971 #endif  // MY_HANDLER_H_
1972