1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "AnotherPacketSource"
19
20 #include "AnotherPacketSource.h"
21
22 #include "include/avc_utils.h"
23
24 #include <media/stagefright/foundation/ABuffer.h>
25 #include <media/stagefright/foundation/ADebug.h>
26 #include <media/stagefright/foundation/AMessage.h>
27 #include <media/stagefright/foundation/AString.h>
28 #include <media/stagefright/foundation/hexdump.h>
29 #include <media/stagefright/MediaBuffer.h>
30 #include <media/stagefright/MediaDefs.h>
31 #include <media/stagefright/MetaData.h>
32 #include <media/stagefright/Utils.h>
33 #include <utils/Vector.h>
34
35 #include <inttypes.h>
36
37 namespace android {
38
// Tolerance, in microseconds, used by isFinished(): when the last queued
// timestamp is within this distance of the stream duration the source is
// reported as finished.
const int64_t kNearEOSMarkUs = 2ll * 1000000ll;  // 2 secs
40
AnotherPacketSource(const sp<MetaData> & meta)41 AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
42 : mIsAudio(false),
43 mIsVideo(false),
44 mEnabled(true),
45 mFormat(NULL),
46 mLastQueuedTimeUs(0),
47 mEOSResult(OK),
48 mLatestEnqueuedMeta(NULL),
49 mLatestDequeuedMeta(NULL) {
50 setFormat(meta);
51
52 mDiscontinuitySegments.push_back(DiscontinuitySegment());
53 }
54
setFormat(const sp<MetaData> & meta)55 void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
56 if (mFormat != NULL) {
57 // Only allowed to be set once. Requires explicit clear to reset.
58 return;
59 }
60
61 mIsAudio = false;
62 mIsVideo = false;
63
64 if (meta == NULL) {
65 return;
66 }
67
68 mFormat = meta;
69 const char *mime;
70 CHECK(meta->findCString(kKeyMIMEType, &mime));
71
72 if (!strncasecmp("audio/", mime, 6)) {
73 mIsAudio = true;
74 } else if (!strncasecmp("video/", mime, 6)) {
75 mIsVideo = true;
76 } else {
77 CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12));
78 }
79 }
80
~AnotherPacketSource()81 AnotherPacketSource::~AnotherPacketSource() {
82 }
83
start(MetaData *)84 status_t AnotherPacketSource::start(MetaData * /* params */) {
85 return OK;
86 }
87
stop()88 status_t AnotherPacketSource::stop() {
89 return OK;
90 }
91
getFormat()92 sp<MetaData> AnotherPacketSource::getFormat() {
93 Mutex::Autolock autoLock(mLock);
94 if (mFormat != NULL) {
95 return mFormat;
96 }
97
98 List<sp<ABuffer> >::iterator it = mBuffers.begin();
99 while (it != mBuffers.end()) {
100 sp<ABuffer> buffer = *it;
101 int32_t discontinuity;
102 if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
103 sp<RefBase> object;
104 if (buffer->meta()->findObject("format", &object)) {
105 setFormat(static_cast<MetaData*>(object.get()));
106 return mFormat;
107 }
108 }
109
110 ++it;
111 }
112 return NULL;
113 }
114
dequeueAccessUnit(sp<ABuffer> * buffer)115 status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
116 buffer->clear();
117
118 Mutex::Autolock autoLock(mLock);
119 while (mEOSResult == OK && mBuffers.empty()) {
120 mCondition.wait(mLock);
121 }
122
123 if (!mBuffers.empty()) {
124 *buffer = *mBuffers.begin();
125 mBuffers.erase(mBuffers.begin());
126
127 int32_t discontinuity;
128 if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
129 if (wasFormatChange(discontinuity)) {
130 mFormat.clear();
131 }
132
133 mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
134 // CHECK(!mDiscontinuitySegments.empty());
135 return INFO_DISCONTINUITY;
136 }
137
138 // CHECK(!mDiscontinuitySegments.empty());
139 DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
140
141 int64_t timeUs;
142 mLatestDequeuedMeta = (*buffer)->meta()->dup();
143 CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs));
144 if (timeUs > seg.mMaxDequeTimeUs) {
145 seg.mMaxDequeTimeUs = timeUs;
146 }
147
148 sp<RefBase> object;
149 if ((*buffer)->meta()->findObject("format", &object)) {
150 setFormat(static_cast<MetaData*>(object.get()));
151 }
152
153 return OK;
154 }
155
156 return mEOSResult;
157 }
158
requeueAccessUnit(const sp<ABuffer> & buffer)159 void AnotherPacketSource::requeueAccessUnit(const sp<ABuffer> &buffer) {
160 // TODO: update corresponding book keeping info.
161 Mutex::Autolock autoLock(mLock);
162 mBuffers.push_front(buffer);
163 }
164
read(MediaBuffer ** out,const ReadOptions *)165 status_t AnotherPacketSource::read(
166 MediaBuffer **out, const ReadOptions *) {
167 *out = NULL;
168
169 Mutex::Autolock autoLock(mLock);
170 while (mEOSResult == OK && mBuffers.empty()) {
171 mCondition.wait(mLock);
172 }
173
174 if (!mBuffers.empty()) {
175
176 const sp<ABuffer> buffer = *mBuffers.begin();
177 mBuffers.erase(mBuffers.begin());
178
179 int32_t discontinuity;
180 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
181 if (wasFormatChange(discontinuity)) {
182 mFormat.clear();
183 }
184
185 mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
186 // CHECK(!mDiscontinuitySegments.empty());
187 return INFO_DISCONTINUITY;
188 }
189
190 mLatestDequeuedMeta = buffer->meta()->dup();
191
192 sp<RefBase> object;
193 if (buffer->meta()->findObject("format", &object)) {
194 setFormat(static_cast<MetaData*>(object.get()));
195 }
196
197 int64_t timeUs;
198 CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
199 // CHECK(!mDiscontinuitySegments.empty());
200 DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
201 if (timeUs > seg.mMaxDequeTimeUs) {
202 seg.mMaxDequeTimeUs = timeUs;
203 }
204
205 MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
206
207 mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
208
209 int32_t isSync;
210 if (buffer->meta()->findInt32("isSync", &isSync)) {
211 mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
212 }
213
214 sp<ABuffer> sei;
215 if (buffer->meta()->findBuffer("sei", &sei) && sei != NULL) {
216 mediaBuffer->meta_data()->setData(kKeySEI, 0, sei->data(), sei->size());
217 }
218
219 sp<ABuffer> mpegUserData;
220 if (buffer->meta()->findBuffer("mpegUserData", &mpegUserData) && mpegUserData != NULL) {
221 mediaBuffer->meta_data()->setData(
222 kKeyMpegUserData, 0, mpegUserData->data(), mpegUserData->size());
223 }
224
225 *out = mediaBuffer;
226 return OK;
227 }
228
229 return mEOSResult;
230 }
231
wasFormatChange(int32_t discontinuityType) const232 bool AnotherPacketSource::wasFormatChange(
233 int32_t discontinuityType) const {
234 if (mIsAudio) {
235 return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
236 }
237
238 if (mIsVideo) {
239 return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
240 }
241
242 return false;
243 }
244
queueAccessUnit(const sp<ABuffer> & buffer)245 void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
246 int32_t damaged;
247 if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
248 // LOG(VERBOSE) << "discarding damaged AU";
249 return;
250 }
251
252 Mutex::Autolock autoLock(mLock);
253 mBuffers.push_back(buffer);
254 mCondition.signal();
255
256 int32_t discontinuity;
257 if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
258 ALOGV("queueing a discontinuity with queueAccessUnit");
259
260 mLastQueuedTimeUs = 0ll;
261 mEOSResult = OK;
262 mLatestEnqueuedMeta = NULL;
263
264 mDiscontinuitySegments.push_back(DiscontinuitySegment());
265 return;
266 }
267
268 int64_t lastQueuedTimeUs;
269 CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
270 mLastQueuedTimeUs = lastQueuedTimeUs;
271 ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
272 mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
273
274 // CHECK(!mDiscontinuitySegments.empty());
275 DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
276 if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
277 tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
278 }
279 if (tailSeg.mMaxDequeTimeUs == -1) {
280 tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
281 }
282
283 if (mLatestEnqueuedMeta == NULL) {
284 mLatestEnqueuedMeta = buffer->meta()->dup();
285 } else {
286 int64_t latestTimeUs = 0;
287 int64_t frameDeltaUs = 0;
288 CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
289 if (lastQueuedTimeUs > latestTimeUs) {
290 mLatestEnqueuedMeta = buffer->meta()->dup();
291 frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
292 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
293 } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
294 // For B frames
295 frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
296 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
297 }
298 }
299 }
300
clear()301 void AnotherPacketSource::clear() {
302 Mutex::Autolock autoLock(mLock);
303
304 mBuffers.clear();
305 mEOSResult = OK;
306
307 mDiscontinuitySegments.clear();
308 mDiscontinuitySegments.push_back(DiscontinuitySegment());
309
310 mFormat = NULL;
311 mLatestEnqueuedMeta = NULL;
312 }
313
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra,bool discard)314 void AnotherPacketSource::queueDiscontinuity(
315 ATSParser::DiscontinuityType type,
316 const sp<AMessage> &extra,
317 bool discard) {
318 Mutex::Autolock autoLock(mLock);
319
320 if (discard) {
321 // Leave only discontinuities in the queue.
322 List<sp<ABuffer> >::iterator it = mBuffers.begin();
323 while (it != mBuffers.end()) {
324 sp<ABuffer> oldBuffer = *it;
325
326 int32_t oldDiscontinuityType;
327 if (!oldBuffer->meta()->findInt32(
328 "discontinuity", &oldDiscontinuityType)) {
329 it = mBuffers.erase(it);
330 continue;
331 }
332
333 ++it;
334 }
335
336 for (List<DiscontinuitySegment>::iterator it2 = mDiscontinuitySegments.begin();
337 it2 != mDiscontinuitySegments.end();
338 ++it2) {
339 DiscontinuitySegment &seg = *it2;
340 seg.clear();
341 }
342
343 }
344
345 mEOSResult = OK;
346 mLastQueuedTimeUs = 0;
347 mLatestEnqueuedMeta = NULL;
348
349 if (type == ATSParser::DISCONTINUITY_NONE) {
350 return;
351 }
352
353 mDiscontinuitySegments.push_back(DiscontinuitySegment());
354
355 sp<ABuffer> buffer = new ABuffer(0);
356 buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
357 buffer->meta()->setMessage("extra", extra);
358
359 mBuffers.push_back(buffer);
360 mCondition.signal();
361 }
362
signalEOS(status_t result)363 void AnotherPacketSource::signalEOS(status_t result) {
364 CHECK(result != OK);
365
366 Mutex::Autolock autoLock(mLock);
367 mEOSResult = result;
368 mCondition.signal();
369 }
370
hasBufferAvailable(status_t * finalResult)371 bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
372 Mutex::Autolock autoLock(mLock);
373 *finalResult = OK;
374 if (!mEnabled) {
375 return false;
376 }
377 if (!mBuffers.empty()) {
378 return true;
379 }
380
381 *finalResult = mEOSResult;
382 return false;
383 }
384
hasDataBufferAvailable(status_t * finalResult)385 bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
386 Mutex::Autolock autoLock(mLock);
387 *finalResult = OK;
388 if (!mEnabled) {
389 return false;
390 }
391 List<sp<ABuffer> >::iterator it;
392 for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
393 int32_t discontinuity;
394 if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
395 return true;
396 }
397 }
398
399 *finalResult = mEOSResult;
400 return false;
401 }
402
getAvailableBufferCount(status_t * finalResult)403 size_t AnotherPacketSource::getAvailableBufferCount(status_t *finalResult) {
404 Mutex::Autolock autoLock(mLock);
405
406 *finalResult = OK;
407 if (!mEnabled) {
408 return 0;
409 }
410 if (!mBuffers.empty()) {
411 return mBuffers.size();
412 }
413 *finalResult = mEOSResult;
414 return 0;
415 }
416
getBufferedDurationUs(status_t * finalResult)417 int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
418 Mutex::Autolock autoLock(mLock);
419 *finalResult = mEOSResult;
420
421 int64_t durationUs = 0;
422 for (List<DiscontinuitySegment>::iterator it = mDiscontinuitySegments.begin();
423 it != mDiscontinuitySegments.end();
424 ++it) {
425 const DiscontinuitySegment &seg = *it;
426 // dequeued access units should be a subset of enqueued access units
427 // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs);
428 durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs);
429 }
430
431 return durationUs;
432 }
433
nextBufferTime(int64_t * timeUs)434 status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
435 *timeUs = 0;
436
437 Mutex::Autolock autoLock(mLock);
438
439 if (mBuffers.empty()) {
440 return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
441 }
442
443 sp<ABuffer> buffer = *mBuffers.begin();
444 CHECK(buffer->meta()->findInt64("timeUs", timeUs));
445
446 return OK;
447 }
448
isFinished(int64_t duration) const449 bool AnotherPacketSource::isFinished(int64_t duration) const {
450 if (duration > 0) {
451 int64_t diff = duration - mLastQueuedTimeUs;
452 if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
453 ALOGV("Detecting EOS due to near end");
454 return true;
455 }
456 }
457 return (mEOSResult != OK);
458 }
459
getLatestEnqueuedMeta()460 sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
461 Mutex::Autolock autoLock(mLock);
462 return mLatestEnqueuedMeta;
463 }
464
getLatestDequeuedMeta()465 sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
466 Mutex::Autolock autoLock(mLock);
467 return mLatestDequeuedMeta;
468 }
469
enable(bool enable)470 void AnotherPacketSource::enable(bool enable) {
471 Mutex::Autolock autoLock(mLock);
472 mEnabled = enable;
473 }
474
475 /*
476 * returns the sample meta that's delayUs after queue head
477 * (NULL if such sample is unavailable)
478 */
getMetaAfterLastDequeued(int64_t delayUs)479 sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
480 Mutex::Autolock autoLock(mLock);
481 int64_t firstUs = -1;
482 int64_t lastUs = -1;
483 int64_t durationUs = 0;
484
485 List<sp<ABuffer> >::iterator it;
486 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
487 const sp<ABuffer> &buffer = *it;
488 int32_t discontinuity;
489 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
490 durationUs += lastUs - firstUs;
491 firstUs = -1;
492 lastUs = -1;
493 continue;
494 }
495 int64_t timeUs;
496 if (buffer->meta()->findInt64("timeUs", &timeUs)) {
497 if (firstUs < 0) {
498 firstUs = timeUs;
499 }
500 if (lastUs < 0 || timeUs > lastUs) {
501 lastUs = timeUs;
502 }
503 if (durationUs + (lastUs - firstUs) >= delayUs) {
504 return buffer->meta();
505 }
506 }
507 }
508 return NULL;
509 }
510
511 /*
512 * removes samples with time equal or after meta
513 */
trimBuffersAfterMeta(const sp<AMessage> & meta)514 void AnotherPacketSource::trimBuffersAfterMeta(
515 const sp<AMessage> &meta) {
516 if (meta == NULL) {
517 ALOGW("trimming with NULL meta, ignoring");
518 return;
519 }
520
521 Mutex::Autolock autoLock(mLock);
522 if (mBuffers.empty()) {
523 return;
524 }
525
526 HLSTime stopTime(meta);
527 ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld",
528 stopTime.mSeq, (long long)stopTime.mTimeUs);
529
530 List<sp<ABuffer> >::iterator it;
531 List<DiscontinuitySegment >::iterator it2;
532 sp<AMessage> newLatestEnqueuedMeta = NULL;
533 int64_t newLastQueuedTimeUs = 0;
534 for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) {
535 const sp<ABuffer> &buffer = *it;
536 int32_t discontinuity;
537 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
538 // CHECK(it2 != mDiscontinuitySegments.end());
539 ++it2;
540 continue;
541 }
542
543 HLSTime curTime(buffer->meta());
544 if (!(curTime < stopTime)) {
545 ALOGV("trimming from %lld (inclusive) to end",
546 (long long)curTime.mTimeUs);
547 break;
548 }
549 newLatestEnqueuedMeta = buffer->meta();
550 newLastQueuedTimeUs = curTime.mTimeUs;
551 }
552
553 mBuffers.erase(it, mBuffers.end());
554 mLatestEnqueuedMeta = newLatestEnqueuedMeta;
555 mLastQueuedTimeUs = newLastQueuedTimeUs;
556
557 DiscontinuitySegment &seg = *it2;
558 if (newLatestEnqueuedMeta != NULL) {
559 seg.mMaxEnqueTimeUs = newLastQueuedTimeUs;
560 } else {
561 seg.clear();
562 }
563 mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end());
564 }
565
566 /*
567 * removes samples with time equal or before meta;
568 * returns first sample left in the queue.
569 *
570 * (for AVC, if trim happens, the samples left will always start
571 * at next IDR.)
572 */
trimBuffersBeforeMeta(const sp<AMessage> & meta)573 sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
574 const sp<AMessage> &meta) {
575 HLSTime startTime(meta);
576 ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld",
577 startTime.mSeq, (long long)startTime.mTimeUs);
578
579 sp<AMessage> firstMeta;
580 int64_t firstTimeUs = -1;
581 Mutex::Autolock autoLock(mLock);
582 if (mBuffers.empty()) {
583 return NULL;
584 }
585
586 sp<MetaData> format;
587 bool isAvc = false;
588
589 List<sp<ABuffer> >::iterator it;
590 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
591 const sp<ABuffer> &buffer = *it;
592 int32_t discontinuity;
593 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
594 mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
595 // CHECK(!mDiscontinuitySegments.empty());
596 format = NULL;
597 isAvc = false;
598 continue;
599 }
600 if (format == NULL) {
601 sp<RefBase> object;
602 if (buffer->meta()->findObject("format", &object)) {
603 const char* mime;
604 format = static_cast<MetaData*>(object.get());
605 isAvc = format != NULL
606 && format->findCString(kKeyMIMEType, &mime)
607 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
608 }
609 }
610 if (isAvc && !IsIDR(buffer)) {
611 continue;
612 }
613
614 HLSTime curTime(buffer->meta());
615 if (startTime < curTime) {
616 ALOGV("trimming from beginning to %lld (not inclusive)",
617 (long long)curTime.mTimeUs);
618 firstMeta = buffer->meta();
619 firstTimeUs = curTime.mTimeUs;
620 break;
621 }
622 }
623 mBuffers.erase(mBuffers.begin(), it);
624 mLatestDequeuedMeta = NULL;
625
626 // CHECK(!mDiscontinuitySegments.empty());
627 DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
628 if (firstTimeUs >= 0) {
629 seg.mMaxDequeTimeUs = firstTimeUs;
630 } else {
631 seg.clear();
632 }
633
634 return firstMeta;
635 }
636
637 } // namespace android
638