1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "AnotherPacketSource"
19 
20 #include "AnotherPacketSource.h"
21 
22 #include "include/avc_utils.h"
23 
24 #include <media/stagefright/foundation/ABuffer.h>
25 #include <media/stagefright/foundation/ADebug.h>
26 #include <media/stagefright/foundation/AMessage.h>
27 #include <media/stagefright/foundation/AString.h>
28 #include <media/stagefright/foundation/hexdump.h>
29 #include <media/stagefright/MediaBuffer.h>
30 #include <media/stagefright/MediaDefs.h>
31 #include <media/stagefright/MetaData.h>
32 #include <media/stagefright/Utils.h>
33 #include <utils/Vector.h>
34 
35 #include <inttypes.h>
36 
37 namespace android {
38 
// Window, in microseconds, used by isFinished(): when the last queued
// timestamp is closer than this to the reported duration, the source is
// treated as having reached end of stream.
const int64_t kNearEOSMarkUs = 2 * 1000 * 1000ll;  // 2 secs
40 
AnotherPacketSource(const sp<MetaData> & meta)41 AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
42     : mIsAudio(false),
43       mIsVideo(false),
44       mEnabled(true),
45       mFormat(NULL),
46       mLastQueuedTimeUs(0),
47       mEOSResult(OK),
48       mLatestEnqueuedMeta(NULL),
49       mLatestDequeuedMeta(NULL) {
50     setFormat(meta);
51 
52     mDiscontinuitySegments.push_back(DiscontinuitySegment());
53 }
54 
setFormat(const sp<MetaData> & meta)55 void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
56     if (mFormat != NULL) {
57         // Only allowed to be set once. Requires explicit clear to reset.
58         return;
59     }
60 
61     mIsAudio = false;
62     mIsVideo = false;
63 
64     if (meta == NULL) {
65         return;
66     }
67 
68     mFormat = meta;
69     const char *mime;
70     CHECK(meta->findCString(kKeyMIMEType, &mime));
71 
72     if (!strncasecmp("audio/", mime, 6)) {
73         mIsAudio = true;
74     } else  if (!strncasecmp("video/", mime, 6)) {
75         mIsVideo = true;
76     } else {
77         CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12));
78     }
79 }
80 
~AnotherPacketSource()81 AnotherPacketSource::~AnotherPacketSource() {
82 }
83 
start(MetaData *)84 status_t AnotherPacketSource::start(MetaData * /* params */) {
85     return OK;
86 }
87 
stop()88 status_t AnotherPacketSource::stop() {
89     return OK;
90 }
91 
getFormat()92 sp<MetaData> AnotherPacketSource::getFormat() {
93     Mutex::Autolock autoLock(mLock);
94     if (mFormat != NULL) {
95         return mFormat;
96     }
97 
98     List<sp<ABuffer> >::iterator it = mBuffers.begin();
99     while (it != mBuffers.end()) {
100         sp<ABuffer> buffer = *it;
101         int32_t discontinuity;
102         if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
103             sp<RefBase> object;
104             if (buffer->meta()->findObject("format", &object)) {
105                 setFormat(static_cast<MetaData*>(object.get()));
106                 return mFormat;
107             }
108         }
109 
110         ++it;
111     }
112     return NULL;
113 }
114 
dequeueAccessUnit(sp<ABuffer> * buffer)115 status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
116     buffer->clear();
117 
118     Mutex::Autolock autoLock(mLock);
119     while (mEOSResult == OK && mBuffers.empty()) {
120         mCondition.wait(mLock);
121     }
122 
123     if (!mBuffers.empty()) {
124         *buffer = *mBuffers.begin();
125         mBuffers.erase(mBuffers.begin());
126 
127         int32_t discontinuity;
128         if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
129             if (wasFormatChange(discontinuity)) {
130                 mFormat.clear();
131             }
132 
133             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
134             // CHECK(!mDiscontinuitySegments.empty());
135             return INFO_DISCONTINUITY;
136         }
137 
138         // CHECK(!mDiscontinuitySegments.empty());
139         DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
140 
141         int64_t timeUs;
142         mLatestDequeuedMeta = (*buffer)->meta()->dup();
143         CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs));
144         if (timeUs > seg.mMaxDequeTimeUs) {
145             seg.mMaxDequeTimeUs = timeUs;
146         }
147 
148         sp<RefBase> object;
149         if ((*buffer)->meta()->findObject("format", &object)) {
150             setFormat(static_cast<MetaData*>(object.get()));
151         }
152 
153         return OK;
154     }
155 
156     return mEOSResult;
157 }
158 
requeueAccessUnit(const sp<ABuffer> & buffer)159 void AnotherPacketSource::requeueAccessUnit(const sp<ABuffer> &buffer) {
160     // TODO: update corresponding book keeping info.
161     Mutex::Autolock autoLock(mLock);
162     mBuffers.push_front(buffer);
163 }
164 
read(MediaBuffer ** out,const ReadOptions *)165 status_t AnotherPacketSource::read(
166         MediaBuffer **out, const ReadOptions *) {
167     *out = NULL;
168 
169     Mutex::Autolock autoLock(mLock);
170     while (mEOSResult == OK && mBuffers.empty()) {
171         mCondition.wait(mLock);
172     }
173 
174     if (!mBuffers.empty()) {
175 
176         const sp<ABuffer> buffer = *mBuffers.begin();
177         mBuffers.erase(mBuffers.begin());
178 
179         int32_t discontinuity;
180         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
181             if (wasFormatChange(discontinuity)) {
182                 mFormat.clear();
183             }
184 
185             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
186             // CHECK(!mDiscontinuitySegments.empty());
187             return INFO_DISCONTINUITY;
188         }
189 
190         mLatestDequeuedMeta = buffer->meta()->dup();
191 
192         sp<RefBase> object;
193         if (buffer->meta()->findObject("format", &object)) {
194             setFormat(static_cast<MetaData*>(object.get()));
195         }
196 
197         int64_t timeUs;
198         CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
199         // CHECK(!mDiscontinuitySegments.empty());
200         DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
201         if (timeUs > seg.mMaxDequeTimeUs) {
202             seg.mMaxDequeTimeUs = timeUs;
203         }
204 
205         MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
206 
207         mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
208 
209         int32_t isSync;
210         if (buffer->meta()->findInt32("isSync", &isSync)) {
211             mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
212         }
213 
214         sp<ABuffer> sei;
215         if (buffer->meta()->findBuffer("sei", &sei) && sei != NULL) {
216             mediaBuffer->meta_data()->setData(kKeySEI, 0, sei->data(), sei->size());
217         }
218 
219         *out = mediaBuffer;
220         return OK;
221     }
222 
223     return mEOSResult;
224 }
225 
wasFormatChange(int32_t discontinuityType) const226 bool AnotherPacketSource::wasFormatChange(
227         int32_t discontinuityType) const {
228     if (mIsAudio) {
229         return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
230     }
231 
232     if (mIsVideo) {
233         return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
234     }
235 
236     return false;
237 }
238 
queueAccessUnit(const sp<ABuffer> & buffer)239 void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
240     int32_t damaged;
241     if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
242         // LOG(VERBOSE) << "discarding damaged AU";
243         return;
244     }
245 
246     Mutex::Autolock autoLock(mLock);
247     mBuffers.push_back(buffer);
248     mCondition.signal();
249 
250     int32_t discontinuity;
251     if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
252         ALOGV("queueing a discontinuity with queueAccessUnit");
253 
254         mLastQueuedTimeUs = 0ll;
255         mEOSResult = OK;
256         mLatestEnqueuedMeta = NULL;
257 
258         mDiscontinuitySegments.push_back(DiscontinuitySegment());
259         return;
260     }
261 
262     int64_t lastQueuedTimeUs;
263     CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
264     mLastQueuedTimeUs = lastQueuedTimeUs;
265     ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
266             mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
267 
268     // CHECK(!mDiscontinuitySegments.empty());
269     DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
270     if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
271         tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
272     }
273     if (tailSeg.mMaxDequeTimeUs == -1) {
274         tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
275     }
276 
277     if (mLatestEnqueuedMeta == NULL) {
278         mLatestEnqueuedMeta = buffer->meta()->dup();
279     } else {
280         int64_t latestTimeUs = 0;
281         int64_t frameDeltaUs = 0;
282         CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
283         if (lastQueuedTimeUs > latestTimeUs) {
284             mLatestEnqueuedMeta = buffer->meta()->dup();
285             frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
286             mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
287         } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
288             // For B frames
289             frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
290             mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
291         }
292     }
293 }
294 
clear()295 void AnotherPacketSource::clear() {
296     Mutex::Autolock autoLock(mLock);
297 
298     mBuffers.clear();
299     mEOSResult = OK;
300 
301     mDiscontinuitySegments.clear();
302     mDiscontinuitySegments.push_back(DiscontinuitySegment());
303 
304     mFormat = NULL;
305     mLatestEnqueuedMeta = NULL;
306 }
307 
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra,bool discard)308 void AnotherPacketSource::queueDiscontinuity(
309         ATSParser::DiscontinuityType type,
310         const sp<AMessage> &extra,
311         bool discard) {
312     Mutex::Autolock autoLock(mLock);
313 
314     if (discard) {
315         // Leave only discontinuities in the queue.
316         List<sp<ABuffer> >::iterator it = mBuffers.begin();
317         while (it != mBuffers.end()) {
318             sp<ABuffer> oldBuffer = *it;
319 
320             int32_t oldDiscontinuityType;
321             if (!oldBuffer->meta()->findInt32(
322                         "discontinuity", &oldDiscontinuityType)) {
323                 it = mBuffers.erase(it);
324                 continue;
325             }
326 
327             ++it;
328         }
329 
330         for (List<DiscontinuitySegment>::iterator it2 = mDiscontinuitySegments.begin();
331                 it2 != mDiscontinuitySegments.end();
332                 ++it2) {
333             DiscontinuitySegment &seg = *it2;
334             seg.clear();
335         }
336 
337     }
338 
339     mEOSResult = OK;
340     mLastQueuedTimeUs = 0;
341     mLatestEnqueuedMeta = NULL;
342 
343     if (type == ATSParser::DISCONTINUITY_NONE) {
344         return;
345     }
346 
347     mDiscontinuitySegments.push_back(DiscontinuitySegment());
348 
349     sp<ABuffer> buffer = new ABuffer(0);
350     buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
351     buffer->meta()->setMessage("extra", extra);
352 
353     mBuffers.push_back(buffer);
354     mCondition.signal();
355 }
356 
signalEOS(status_t result)357 void AnotherPacketSource::signalEOS(status_t result) {
358     CHECK(result != OK);
359 
360     Mutex::Autolock autoLock(mLock);
361     mEOSResult = result;
362     mCondition.signal();
363 }
364 
hasBufferAvailable(status_t * finalResult)365 bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
366     Mutex::Autolock autoLock(mLock);
367     *finalResult = OK;
368     if (!mEnabled) {
369         return false;
370     }
371     if (!mBuffers.empty()) {
372         return true;
373     }
374 
375     *finalResult = mEOSResult;
376     return false;
377 }
378 
hasDataBufferAvailable(status_t * finalResult)379 bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
380     Mutex::Autolock autoLock(mLock);
381     *finalResult = OK;
382     if (!mEnabled) {
383         return false;
384     }
385     List<sp<ABuffer> >::iterator it;
386     for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
387         int32_t discontinuity;
388         if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
389             return true;
390         }
391     }
392 
393     *finalResult = mEOSResult;
394     return false;
395 }
396 
getAvailableBufferCount(status_t * finalResult)397 size_t AnotherPacketSource::getAvailableBufferCount(status_t *finalResult) {
398     Mutex::Autolock autoLock(mLock);
399 
400     *finalResult = OK;
401     if (!mEnabled) {
402         return 0;
403     }
404     if (!mBuffers.empty()) {
405         return mBuffers.size();
406     }
407     *finalResult = mEOSResult;
408     return 0;
409 }
410 
getBufferedDurationUs(status_t * finalResult)411 int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
412     Mutex::Autolock autoLock(mLock);
413     *finalResult = mEOSResult;
414 
415     int64_t durationUs = 0;
416     for (List<DiscontinuitySegment>::iterator it = mDiscontinuitySegments.begin();
417             it != mDiscontinuitySegments.end();
418             ++it) {
419         const DiscontinuitySegment &seg = *it;
420         // dequeued access units should be a subset of enqueued access units
421         // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs);
422         durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs);
423     }
424 
425     return durationUs;
426 }
427 
nextBufferTime(int64_t * timeUs)428 status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
429     *timeUs = 0;
430 
431     Mutex::Autolock autoLock(mLock);
432 
433     if (mBuffers.empty()) {
434         return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
435     }
436 
437     sp<ABuffer> buffer = *mBuffers.begin();
438     CHECK(buffer->meta()->findInt64("timeUs", timeUs));
439 
440     return OK;
441 }
442 
isFinished(int64_t duration) const443 bool AnotherPacketSource::isFinished(int64_t duration) const {
444     if (duration > 0) {
445         int64_t diff = duration - mLastQueuedTimeUs;
446         if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
447             ALOGV("Detecting EOS due to near end");
448             return true;
449         }
450     }
451     return (mEOSResult != OK);
452 }
453 
getLatestEnqueuedMeta()454 sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
455     Mutex::Autolock autoLock(mLock);
456     return mLatestEnqueuedMeta;
457 }
458 
getLatestDequeuedMeta()459 sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
460     Mutex::Autolock autoLock(mLock);
461     return mLatestDequeuedMeta;
462 }
463 
enable(bool enable)464 void AnotherPacketSource::enable(bool enable) {
465     Mutex::Autolock autoLock(mLock);
466     mEnabled = enable;
467 }
468 
469 /*
470  * returns the sample meta that's delayUs after queue head
471  * (NULL if such sample is unavailable)
472  */
getMetaAfterLastDequeued(int64_t delayUs)473 sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
474     Mutex::Autolock autoLock(mLock);
475     int64_t firstUs = -1;
476     int64_t lastUs = -1;
477     int64_t durationUs = 0;
478 
479     List<sp<ABuffer> >::iterator it;
480     for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
481         const sp<ABuffer> &buffer = *it;
482         int32_t discontinuity;
483         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
484             durationUs += lastUs - firstUs;
485             firstUs = -1;
486             lastUs = -1;
487             continue;
488         }
489         int64_t timeUs;
490         if (buffer->meta()->findInt64("timeUs", &timeUs)) {
491             if (firstUs < 0) {
492                 firstUs = timeUs;
493             }
494             if (lastUs < 0 || timeUs > lastUs) {
495                 lastUs = timeUs;
496             }
497             if (durationUs + (lastUs - firstUs) >= delayUs) {
498                 return buffer->meta();
499             }
500         }
501     }
502     return NULL;
503 }
504 
505 /*
506  * removes samples with time equal or after meta
507  */
trimBuffersAfterMeta(const sp<AMessage> & meta)508 void AnotherPacketSource::trimBuffersAfterMeta(
509         const sp<AMessage> &meta) {
510     if (meta == NULL) {
511         ALOGW("trimming with NULL meta, ignoring");
512         return;
513     }
514 
515     Mutex::Autolock autoLock(mLock);
516     if (mBuffers.empty()) {
517         return;
518     }
519 
520     HLSTime stopTime(meta);
521     ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld",
522             stopTime.mSeq, (long long)stopTime.mTimeUs);
523 
524     List<sp<ABuffer> >::iterator it;
525     List<DiscontinuitySegment >::iterator it2;
526     sp<AMessage> newLatestEnqueuedMeta = NULL;
527     int64_t newLastQueuedTimeUs = 0;
528     for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) {
529         const sp<ABuffer> &buffer = *it;
530         int32_t discontinuity;
531         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
532             // CHECK(it2 != mDiscontinuitySegments.end());
533             ++it2;
534             continue;
535         }
536 
537         HLSTime curTime(buffer->meta());
538         if (!(curTime < stopTime)) {
539             ALOGV("trimming from %lld (inclusive) to end",
540                     (long long)curTime.mTimeUs);
541             break;
542         }
543         newLatestEnqueuedMeta = buffer->meta();
544         newLastQueuedTimeUs = curTime.mTimeUs;
545     }
546 
547     mBuffers.erase(it, mBuffers.end());
548     mLatestEnqueuedMeta = newLatestEnqueuedMeta;
549     mLastQueuedTimeUs = newLastQueuedTimeUs;
550 
551     DiscontinuitySegment &seg = *it2;
552     if (newLatestEnqueuedMeta != NULL) {
553         seg.mMaxEnqueTimeUs = newLastQueuedTimeUs;
554     } else {
555         seg.clear();
556     }
557     mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end());
558 }
559 
560 /*
561  * removes samples with time equal or before meta;
562  * returns first sample left in the queue.
563  *
564  * (for AVC, if trim happens, the samples left will always start
565  * at next IDR.)
566  */
trimBuffersBeforeMeta(const sp<AMessage> & meta)567 sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
568         const sp<AMessage> &meta) {
569     HLSTime startTime(meta);
570     ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld",
571             startTime.mSeq, (long long)startTime.mTimeUs);
572 
573     sp<AMessage> firstMeta;
574     int64_t firstTimeUs = -1;
575     Mutex::Autolock autoLock(mLock);
576     if (mBuffers.empty()) {
577         return NULL;
578     }
579 
580     sp<MetaData> format;
581     bool isAvc = false;
582 
583     List<sp<ABuffer> >::iterator it;
584     for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
585         const sp<ABuffer> &buffer = *it;
586         int32_t discontinuity;
587         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
588             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
589             // CHECK(!mDiscontinuitySegments.empty());
590             format = NULL;
591             isAvc = false;
592             continue;
593         }
594         if (format == NULL) {
595             sp<RefBase> object;
596             if (buffer->meta()->findObject("format", &object)) {
597                 const char* mime;
598                 format = static_cast<MetaData*>(object.get());
599                 isAvc = format != NULL
600                         && format->findCString(kKeyMIMEType, &mime)
601                         && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
602             }
603         }
604         if (isAvc && !IsIDR(buffer)) {
605             continue;
606         }
607 
608         HLSTime curTime(buffer->meta());
609         if (startTime < curTime) {
610             ALOGV("trimming from beginning to %lld (not inclusive)",
611                     (long long)curTime.mTimeUs);
612             firstMeta = buffer->meta();
613             firstTimeUs = curTime.mTimeUs;
614             break;
615         }
616     }
617     mBuffers.erase(mBuffers.begin(), it);
618     mLatestDequeuedMeta = NULL;
619 
620     // CHECK(!mDiscontinuitySegments.empty());
621     DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
622     if (firstTimeUs >= 0) {
623         seg.mMaxDequeTimeUs = firstTimeUs;
624     } else {
625         seg.clear();
626     }
627 
628     return firstMeta;
629 }
630 
631 }  // namespace android
632