1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "AnotherPacketSource"
19 
20 #include "AnotherPacketSource.h"
21 
22 #include "include/avc_utils.h"
23 
24 #include <media/stagefright/foundation/ABuffer.h>
25 #include <media/stagefright/foundation/ADebug.h>
26 #include <media/stagefright/foundation/AMessage.h>
27 #include <media/stagefright/foundation/AString.h>
28 #include <media/stagefright/foundation/hexdump.h>
29 #include <media/stagefright/MediaBuffer.h>
30 #include <media/stagefright/MediaDefs.h>
31 #include <media/stagefright/MetaData.h>
32 #include <media/stagefright/Utils.h>
33 #include <utils/Vector.h>
34 
35 #include <inttypes.h>
36 
37 namespace android {
38 
39 const int64_t kNearEOSMarkUs = 2000000ll; // 2 secs
40 
AnotherPacketSource(const sp<MetaData> & meta)41 AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
42     : mIsAudio(false),
43       mIsVideo(false),
44       mEnabled(true),
45       mFormat(NULL),
46       mLastQueuedTimeUs(0),
47       mEOSResult(OK),
48       mLatestEnqueuedMeta(NULL),
49       mLatestDequeuedMeta(NULL) {
50     setFormat(meta);
51 
52     mDiscontinuitySegments.push_back(DiscontinuitySegment());
53 }
54 
setFormat(const sp<MetaData> & meta)55 void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
56     if (mFormat != NULL) {
57         // Only allowed to be set once. Requires explicit clear to reset.
58         return;
59     }
60 
61     mIsAudio = false;
62     mIsVideo = false;
63 
64     if (meta == NULL) {
65         return;
66     }
67 
68     mFormat = meta;
69     const char *mime;
70     CHECK(meta->findCString(kKeyMIMEType, &mime));
71 
72     if (!strncasecmp("audio/", mime, 6)) {
73         mIsAudio = true;
74     } else  if (!strncasecmp("video/", mime, 6)) {
75         mIsVideo = true;
76     } else {
77         CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12));
78     }
79 }
80 
~AnotherPacketSource()81 AnotherPacketSource::~AnotherPacketSource() {
82 }
83 
start(MetaData *)84 status_t AnotherPacketSource::start(MetaData * /* params */) {
85     return OK;
86 }
87 
stop()88 status_t AnotherPacketSource::stop() {
89     return OK;
90 }
91 
getFormat()92 sp<MetaData> AnotherPacketSource::getFormat() {
93     Mutex::Autolock autoLock(mLock);
94     if (mFormat != NULL) {
95         return mFormat;
96     }
97 
98     List<sp<ABuffer> >::iterator it = mBuffers.begin();
99     while (it != mBuffers.end()) {
100         sp<ABuffer> buffer = *it;
101         int32_t discontinuity;
102         if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
103             sp<RefBase> object;
104             if (buffer->meta()->findObject("format", &object)) {
105                 setFormat(static_cast<MetaData*>(object.get()));
106                 return mFormat;
107             }
108         }
109 
110         ++it;
111     }
112     return NULL;
113 }
114 
dequeueAccessUnit(sp<ABuffer> * buffer)115 status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
116     buffer->clear();
117 
118     Mutex::Autolock autoLock(mLock);
119     while (mEOSResult == OK && mBuffers.empty()) {
120         mCondition.wait(mLock);
121     }
122 
123     if (!mBuffers.empty()) {
124         *buffer = *mBuffers.begin();
125         mBuffers.erase(mBuffers.begin());
126 
127         int32_t discontinuity;
128         if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
129             if (wasFormatChange(discontinuity)) {
130                 mFormat.clear();
131             }
132 
133             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
134             // CHECK(!mDiscontinuitySegments.empty());
135             return INFO_DISCONTINUITY;
136         }
137 
138         // CHECK(!mDiscontinuitySegments.empty());
139         DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
140 
141         int64_t timeUs;
142         mLatestDequeuedMeta = (*buffer)->meta()->dup();
143         CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs));
144         if (timeUs > seg.mMaxDequeTimeUs) {
145             seg.mMaxDequeTimeUs = timeUs;
146         }
147 
148         sp<RefBase> object;
149         if ((*buffer)->meta()->findObject("format", &object)) {
150             setFormat(static_cast<MetaData*>(object.get()));
151         }
152 
153         return OK;
154     }
155 
156     return mEOSResult;
157 }
158 
requeueAccessUnit(const sp<ABuffer> & buffer)159 void AnotherPacketSource::requeueAccessUnit(const sp<ABuffer> &buffer) {
160     // TODO: update corresponding book keeping info.
161     Mutex::Autolock autoLock(mLock);
162     mBuffers.push_front(buffer);
163 }
164 
read(MediaBuffer ** out,const ReadOptions *)165 status_t AnotherPacketSource::read(
166         MediaBuffer **out, const ReadOptions *) {
167     *out = NULL;
168 
169     Mutex::Autolock autoLock(mLock);
170     while (mEOSResult == OK && mBuffers.empty()) {
171         mCondition.wait(mLock);
172     }
173 
174     if (!mBuffers.empty()) {
175 
176         const sp<ABuffer> buffer = *mBuffers.begin();
177         mBuffers.erase(mBuffers.begin());
178 
179         int32_t discontinuity;
180         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
181             if (wasFormatChange(discontinuity)) {
182                 mFormat.clear();
183             }
184 
185             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
186             // CHECK(!mDiscontinuitySegments.empty());
187             return INFO_DISCONTINUITY;
188         }
189 
190         mLatestDequeuedMeta = buffer->meta()->dup();
191 
192         sp<RefBase> object;
193         if (buffer->meta()->findObject("format", &object)) {
194             setFormat(static_cast<MetaData*>(object.get()));
195         }
196 
197         int64_t timeUs;
198         CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
199         // CHECK(!mDiscontinuitySegments.empty());
200         DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
201         if (timeUs > seg.mMaxDequeTimeUs) {
202             seg.mMaxDequeTimeUs = timeUs;
203         }
204 
205         MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
206 
207         mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
208 
209         int32_t isSync;
210         if (buffer->meta()->findInt32("isSync", &isSync)) {
211             mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
212         }
213 
214         sp<ABuffer> sei;
215         if (buffer->meta()->findBuffer("sei", &sei) && sei != NULL) {
216             mediaBuffer->meta_data()->setData(kKeySEI, 0, sei->data(), sei->size());
217         }
218 
219         sp<ABuffer> mpegUserData;
220         if (buffer->meta()->findBuffer("mpegUserData", &mpegUserData) && mpegUserData != NULL) {
221             mediaBuffer->meta_data()->setData(
222                     kKeyMpegUserData, 0, mpegUserData->data(), mpegUserData->size());
223         }
224 
225         *out = mediaBuffer;
226         return OK;
227     }
228 
229     return mEOSResult;
230 }
231 
wasFormatChange(int32_t discontinuityType) const232 bool AnotherPacketSource::wasFormatChange(
233         int32_t discontinuityType) const {
234     if (mIsAudio) {
235         return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
236     }
237 
238     if (mIsVideo) {
239         return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
240     }
241 
242     return false;
243 }
244 
queueAccessUnit(const sp<ABuffer> & buffer)245 void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
246     int32_t damaged;
247     if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
248         // LOG(VERBOSE) << "discarding damaged AU";
249         return;
250     }
251 
252     Mutex::Autolock autoLock(mLock);
253     mBuffers.push_back(buffer);
254     mCondition.signal();
255 
256     int32_t discontinuity;
257     if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
258         ALOGV("queueing a discontinuity with queueAccessUnit");
259 
260         mLastQueuedTimeUs = 0ll;
261         mEOSResult = OK;
262         mLatestEnqueuedMeta = NULL;
263 
264         mDiscontinuitySegments.push_back(DiscontinuitySegment());
265         return;
266     }
267 
268     int64_t lastQueuedTimeUs;
269     CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
270     mLastQueuedTimeUs = lastQueuedTimeUs;
271     ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
272             mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
273 
274     // CHECK(!mDiscontinuitySegments.empty());
275     DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
276     if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
277         tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
278     }
279     if (tailSeg.mMaxDequeTimeUs == -1) {
280         tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
281     }
282 
283     if (mLatestEnqueuedMeta == NULL) {
284         mLatestEnqueuedMeta = buffer->meta()->dup();
285     } else {
286         int64_t latestTimeUs = 0;
287         int64_t frameDeltaUs = 0;
288         CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
289         if (lastQueuedTimeUs > latestTimeUs) {
290             mLatestEnqueuedMeta = buffer->meta()->dup();
291             frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
292             mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
293         } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
294             // For B frames
295             frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
296             mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
297         }
298     }
299 }
300 
clear()301 void AnotherPacketSource::clear() {
302     Mutex::Autolock autoLock(mLock);
303 
304     mBuffers.clear();
305     mEOSResult = OK;
306 
307     mDiscontinuitySegments.clear();
308     mDiscontinuitySegments.push_back(DiscontinuitySegment());
309 
310     mFormat = NULL;
311     mLatestEnqueuedMeta = NULL;
312 }
313 
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra,bool discard)314 void AnotherPacketSource::queueDiscontinuity(
315         ATSParser::DiscontinuityType type,
316         const sp<AMessage> &extra,
317         bool discard) {
318     Mutex::Autolock autoLock(mLock);
319 
320     if (discard) {
321         // Leave only discontinuities in the queue.
322         List<sp<ABuffer> >::iterator it = mBuffers.begin();
323         while (it != mBuffers.end()) {
324             sp<ABuffer> oldBuffer = *it;
325 
326             int32_t oldDiscontinuityType;
327             if (!oldBuffer->meta()->findInt32(
328                         "discontinuity", &oldDiscontinuityType)) {
329                 it = mBuffers.erase(it);
330                 continue;
331             }
332 
333             ++it;
334         }
335 
336         for (List<DiscontinuitySegment>::iterator it2 = mDiscontinuitySegments.begin();
337                 it2 != mDiscontinuitySegments.end();
338                 ++it2) {
339             DiscontinuitySegment &seg = *it2;
340             seg.clear();
341         }
342 
343     }
344 
345     mEOSResult = OK;
346     mLastQueuedTimeUs = 0;
347     mLatestEnqueuedMeta = NULL;
348 
349     if (type == ATSParser::DISCONTINUITY_NONE) {
350         return;
351     }
352 
353     mDiscontinuitySegments.push_back(DiscontinuitySegment());
354 
355     sp<ABuffer> buffer = new ABuffer(0);
356     buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
357     buffer->meta()->setMessage("extra", extra);
358 
359     mBuffers.push_back(buffer);
360     mCondition.signal();
361 }
362 
signalEOS(status_t result)363 void AnotherPacketSource::signalEOS(status_t result) {
364     CHECK(result != OK);
365 
366     Mutex::Autolock autoLock(mLock);
367     mEOSResult = result;
368     mCondition.signal();
369 }
370 
hasBufferAvailable(status_t * finalResult)371 bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
372     Mutex::Autolock autoLock(mLock);
373     *finalResult = OK;
374     if (!mEnabled) {
375         return false;
376     }
377     if (!mBuffers.empty()) {
378         return true;
379     }
380 
381     *finalResult = mEOSResult;
382     return false;
383 }
384 
hasDataBufferAvailable(status_t * finalResult)385 bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
386     Mutex::Autolock autoLock(mLock);
387     *finalResult = OK;
388     if (!mEnabled) {
389         return false;
390     }
391     List<sp<ABuffer> >::iterator it;
392     for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
393         int32_t discontinuity;
394         if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
395             return true;
396         }
397     }
398 
399     *finalResult = mEOSResult;
400     return false;
401 }
402 
getAvailableBufferCount(status_t * finalResult)403 size_t AnotherPacketSource::getAvailableBufferCount(status_t *finalResult) {
404     Mutex::Autolock autoLock(mLock);
405 
406     *finalResult = OK;
407     if (!mEnabled) {
408         return 0;
409     }
410     if (!mBuffers.empty()) {
411         return mBuffers.size();
412     }
413     *finalResult = mEOSResult;
414     return 0;
415 }
416 
getBufferedDurationUs(status_t * finalResult)417 int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
418     Mutex::Autolock autoLock(mLock);
419     *finalResult = mEOSResult;
420 
421     int64_t durationUs = 0;
422     for (List<DiscontinuitySegment>::iterator it = mDiscontinuitySegments.begin();
423             it != mDiscontinuitySegments.end();
424             ++it) {
425         const DiscontinuitySegment &seg = *it;
426         // dequeued access units should be a subset of enqueued access units
427         // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs);
428         durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs);
429     }
430 
431     return durationUs;
432 }
433 
nextBufferTime(int64_t * timeUs)434 status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
435     *timeUs = 0;
436 
437     Mutex::Autolock autoLock(mLock);
438 
439     if (mBuffers.empty()) {
440         return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
441     }
442 
443     sp<ABuffer> buffer = *mBuffers.begin();
444     CHECK(buffer->meta()->findInt64("timeUs", timeUs));
445 
446     return OK;
447 }
448 
isFinished(int64_t duration) const449 bool AnotherPacketSource::isFinished(int64_t duration) const {
450     if (duration > 0) {
451         int64_t diff = duration - mLastQueuedTimeUs;
452         if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
453             ALOGV("Detecting EOS due to near end");
454             return true;
455         }
456     }
457     return (mEOSResult != OK);
458 }
459 
getLatestEnqueuedMeta()460 sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
461     Mutex::Autolock autoLock(mLock);
462     return mLatestEnqueuedMeta;
463 }
464 
getLatestDequeuedMeta()465 sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
466     Mutex::Autolock autoLock(mLock);
467     return mLatestDequeuedMeta;
468 }
469 
enable(bool enable)470 void AnotherPacketSource::enable(bool enable) {
471     Mutex::Autolock autoLock(mLock);
472     mEnabled = enable;
473 }
474 
475 /*
476  * returns the sample meta that's delayUs after queue head
477  * (NULL if such sample is unavailable)
478  */
getMetaAfterLastDequeued(int64_t delayUs)479 sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
480     Mutex::Autolock autoLock(mLock);
481     int64_t firstUs = -1;
482     int64_t lastUs = -1;
483     int64_t durationUs = 0;
484 
485     List<sp<ABuffer> >::iterator it;
486     for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
487         const sp<ABuffer> &buffer = *it;
488         int32_t discontinuity;
489         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
490             durationUs += lastUs - firstUs;
491             firstUs = -1;
492             lastUs = -1;
493             continue;
494         }
495         int64_t timeUs;
496         if (buffer->meta()->findInt64("timeUs", &timeUs)) {
497             if (firstUs < 0) {
498                 firstUs = timeUs;
499             }
500             if (lastUs < 0 || timeUs > lastUs) {
501                 lastUs = timeUs;
502             }
503             if (durationUs + (lastUs - firstUs) >= delayUs) {
504                 return buffer->meta();
505             }
506         }
507     }
508     return NULL;
509 }
510 
511 /*
512  * removes samples with time equal or after meta
513  */
trimBuffersAfterMeta(const sp<AMessage> & meta)514 void AnotherPacketSource::trimBuffersAfterMeta(
515         const sp<AMessage> &meta) {
516     if (meta == NULL) {
517         ALOGW("trimming with NULL meta, ignoring");
518         return;
519     }
520 
521     Mutex::Autolock autoLock(mLock);
522     if (mBuffers.empty()) {
523         return;
524     }
525 
526     HLSTime stopTime(meta);
527     ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld",
528             stopTime.mSeq, (long long)stopTime.mTimeUs);
529 
530     List<sp<ABuffer> >::iterator it;
531     List<DiscontinuitySegment >::iterator it2;
532     sp<AMessage> newLatestEnqueuedMeta = NULL;
533     int64_t newLastQueuedTimeUs = 0;
534     for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) {
535         const sp<ABuffer> &buffer = *it;
536         int32_t discontinuity;
537         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
538             // CHECK(it2 != mDiscontinuitySegments.end());
539             ++it2;
540             continue;
541         }
542 
543         HLSTime curTime(buffer->meta());
544         if (!(curTime < stopTime)) {
545             ALOGV("trimming from %lld (inclusive) to end",
546                     (long long)curTime.mTimeUs);
547             break;
548         }
549         newLatestEnqueuedMeta = buffer->meta();
550         newLastQueuedTimeUs = curTime.mTimeUs;
551     }
552 
553     mBuffers.erase(it, mBuffers.end());
554     mLatestEnqueuedMeta = newLatestEnqueuedMeta;
555     mLastQueuedTimeUs = newLastQueuedTimeUs;
556 
557     DiscontinuitySegment &seg = *it2;
558     if (newLatestEnqueuedMeta != NULL) {
559         seg.mMaxEnqueTimeUs = newLastQueuedTimeUs;
560     } else {
561         seg.clear();
562     }
563     mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end());
564 }
565 
566 /*
567  * removes samples with time equal or before meta;
568  * returns first sample left in the queue.
569  *
570  * (for AVC, if trim happens, the samples left will always start
571  * at next IDR.)
572  */
trimBuffersBeforeMeta(const sp<AMessage> & meta)573 sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
574         const sp<AMessage> &meta) {
575     HLSTime startTime(meta);
576     ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld",
577             startTime.mSeq, (long long)startTime.mTimeUs);
578 
579     sp<AMessage> firstMeta;
580     int64_t firstTimeUs = -1;
581     Mutex::Autolock autoLock(mLock);
582     if (mBuffers.empty()) {
583         return NULL;
584     }
585 
586     sp<MetaData> format;
587     bool isAvc = false;
588 
589     List<sp<ABuffer> >::iterator it;
590     for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
591         const sp<ABuffer> &buffer = *it;
592         int32_t discontinuity;
593         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
594             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
595             // CHECK(!mDiscontinuitySegments.empty());
596             format = NULL;
597             isAvc = false;
598             continue;
599         }
600         if (format == NULL) {
601             sp<RefBase> object;
602             if (buffer->meta()->findObject("format", &object)) {
603                 const char* mime;
604                 format = static_cast<MetaData*>(object.get());
605                 isAvc = format != NULL
606                         && format->findCString(kKeyMIMEType, &mime)
607                         && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
608             }
609         }
610         if (isAvc && !IsIDR(buffer)) {
611             continue;
612         }
613 
614         HLSTime curTime(buffer->meta());
615         if (startTime < curTime) {
616             ALOGV("trimming from beginning to %lld (not inclusive)",
617                     (long long)curTime.mTimeUs);
618             firstMeta = buffer->meta();
619             firstTimeUs = curTime.mTimeUs;
620             break;
621         }
622     }
623     mBuffers.erase(mBuffers.begin(), it);
624     mLatestDequeuedMeta = NULL;
625 
626     // CHECK(!mDiscontinuitySegments.empty());
627     DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
628     if (firstTimeUs >= 0) {
629         seg.mMaxDequeTimeUs = firstTimeUs;
630     } else {
631         seg.clear();
632     }
633 
634     return firstMeta;
635 }
636 
637 }  // namespace android
638