1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "AnotherPacketSource"
19
20 #include "AnotherPacketSource.h"
21
22 #include "include/avc_utils.h"
23
24 #include <media/stagefright/foundation/ABuffer.h>
25 #include <media/stagefright/foundation/ADebug.h>
26 #include <media/stagefright/foundation/AMessage.h>
27 #include <media/stagefright/foundation/AString.h>
28 #include <media/stagefright/foundation/hexdump.h>
29 #include <media/stagefright/MediaBuffer.h>
30 #include <media/stagefright/MediaDefs.h>
31 #include <media/stagefright/MetaData.h>
32 #include <media/stagefright/Utils.h>
33 #include <utils/Vector.h>
34
35 #include <inttypes.h>
36
37 namespace android {
38
// Window, in microseconds (2 seconds), around the nominal duration inside
// which isFinished() considers the stream to have reached its end.
const int64_t kNearEOSMarkUs = 2 * 1000000ll;
40
AnotherPacketSource(const sp<MetaData> & meta)41 AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
42 : mIsAudio(false),
43 mIsVideo(false),
44 mEnabled(true),
45 mFormat(NULL),
46 mLastQueuedTimeUs(0),
47 mEOSResult(OK),
48 mLatestEnqueuedMeta(NULL),
49 mLatestDequeuedMeta(NULL) {
50 setFormat(meta);
51
52 mDiscontinuitySegments.push_back(DiscontinuitySegment());
53 }
54
setFormat(const sp<MetaData> & meta)55 void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
56 if (mFormat != NULL) {
57 // Only allowed to be set once. Requires explicit clear to reset.
58 return;
59 }
60
61 mIsAudio = false;
62 mIsVideo = false;
63
64 if (meta == NULL) {
65 return;
66 }
67
68 mFormat = meta;
69 const char *mime;
70 CHECK(meta->findCString(kKeyMIMEType, &mime));
71
72 if (!strncasecmp("audio/", mime, 6)) {
73 mIsAudio = true;
74 } else if (!strncasecmp("video/", mime, 6)) {
75 mIsVideo = true;
76 } else {
77 CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12));
78 }
79 }
80
~AnotherPacketSource()81 AnotherPacketSource::~AnotherPacketSource() {
82 }
83
start(MetaData *)84 status_t AnotherPacketSource::start(MetaData * /* params */) {
85 return OK;
86 }
87
stop()88 status_t AnotherPacketSource::stop() {
89 return OK;
90 }
91
getFormat()92 sp<MetaData> AnotherPacketSource::getFormat() {
93 Mutex::Autolock autoLock(mLock);
94 if (mFormat != NULL) {
95 return mFormat;
96 }
97
98 List<sp<ABuffer> >::iterator it = mBuffers.begin();
99 while (it != mBuffers.end()) {
100 sp<ABuffer> buffer = *it;
101 int32_t discontinuity;
102 if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
103 sp<RefBase> object;
104 if (buffer->meta()->findObject("format", &object)) {
105 setFormat(static_cast<MetaData*>(object.get()));
106 return mFormat;
107 }
108 }
109
110 ++it;
111 }
112 return NULL;
113 }
114
dequeueAccessUnit(sp<ABuffer> * buffer)115 status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
116 buffer->clear();
117
118 Mutex::Autolock autoLock(mLock);
119 while (mEOSResult == OK && mBuffers.empty()) {
120 mCondition.wait(mLock);
121 }
122
123 if (!mBuffers.empty()) {
124 *buffer = *mBuffers.begin();
125 mBuffers.erase(mBuffers.begin());
126
127 int32_t discontinuity;
128 if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
129 if (wasFormatChange(discontinuity)) {
130 mFormat.clear();
131 }
132
133 mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
134 // CHECK(!mDiscontinuitySegments.empty());
135 return INFO_DISCONTINUITY;
136 }
137
138 // CHECK(!mDiscontinuitySegments.empty());
139 DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
140
141 int64_t timeUs;
142 mLatestDequeuedMeta = (*buffer)->meta()->dup();
143 CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs));
144 if (timeUs > seg.mMaxDequeTimeUs) {
145 seg.mMaxDequeTimeUs = timeUs;
146 }
147
148 sp<RefBase> object;
149 if ((*buffer)->meta()->findObject("format", &object)) {
150 setFormat(static_cast<MetaData*>(object.get()));
151 }
152
153 return OK;
154 }
155
156 return mEOSResult;
157 }
158
requeueAccessUnit(const sp<ABuffer> & buffer)159 void AnotherPacketSource::requeueAccessUnit(const sp<ABuffer> &buffer) {
160 // TODO: update corresponding book keeping info.
161 Mutex::Autolock autoLock(mLock);
162 mBuffers.push_front(buffer);
163 }
164
read(MediaBuffer ** out,const ReadOptions *)165 status_t AnotherPacketSource::read(
166 MediaBuffer **out, const ReadOptions *) {
167 *out = NULL;
168
169 Mutex::Autolock autoLock(mLock);
170 while (mEOSResult == OK && mBuffers.empty()) {
171 mCondition.wait(mLock);
172 }
173
174 if (!mBuffers.empty()) {
175
176 const sp<ABuffer> buffer = *mBuffers.begin();
177 mBuffers.erase(mBuffers.begin());
178
179 int32_t discontinuity;
180 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
181 if (wasFormatChange(discontinuity)) {
182 mFormat.clear();
183 }
184
185 mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
186 // CHECK(!mDiscontinuitySegments.empty());
187 return INFO_DISCONTINUITY;
188 }
189
190 mLatestDequeuedMeta = buffer->meta()->dup();
191
192 sp<RefBase> object;
193 if (buffer->meta()->findObject("format", &object)) {
194 setFormat(static_cast<MetaData*>(object.get()));
195 }
196
197 int64_t timeUs;
198 CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
199 // CHECK(!mDiscontinuitySegments.empty());
200 DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
201 if (timeUs > seg.mMaxDequeTimeUs) {
202 seg.mMaxDequeTimeUs = timeUs;
203 }
204
205 MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
206
207 mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
208
209 int32_t isSync;
210 if (buffer->meta()->findInt32("isSync", &isSync)) {
211 mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
212 }
213
214 sp<ABuffer> sei;
215 if (buffer->meta()->findBuffer("sei", &sei) && sei != NULL) {
216 mediaBuffer->meta_data()->setData(kKeySEI, 0, sei->data(), sei->size());
217 }
218
219 *out = mediaBuffer;
220 return OK;
221 }
222
223 return mEOSResult;
224 }
225
wasFormatChange(int32_t discontinuityType) const226 bool AnotherPacketSource::wasFormatChange(
227 int32_t discontinuityType) const {
228 if (mIsAudio) {
229 return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
230 }
231
232 if (mIsVideo) {
233 return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
234 }
235
236 return false;
237 }
238
queueAccessUnit(const sp<ABuffer> & buffer)239 void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
240 int32_t damaged;
241 if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
242 // LOG(VERBOSE) << "discarding damaged AU";
243 return;
244 }
245
246 Mutex::Autolock autoLock(mLock);
247 mBuffers.push_back(buffer);
248 mCondition.signal();
249
250 int32_t discontinuity;
251 if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
252 ALOGV("queueing a discontinuity with queueAccessUnit");
253
254 mLastQueuedTimeUs = 0ll;
255 mEOSResult = OK;
256 mLatestEnqueuedMeta = NULL;
257
258 mDiscontinuitySegments.push_back(DiscontinuitySegment());
259 return;
260 }
261
262 int64_t lastQueuedTimeUs;
263 CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
264 mLastQueuedTimeUs = lastQueuedTimeUs;
265 ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
266 mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
267
268 // CHECK(!mDiscontinuitySegments.empty());
269 DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
270 if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
271 tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
272 }
273 if (tailSeg.mMaxDequeTimeUs == -1) {
274 tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
275 }
276
277 if (mLatestEnqueuedMeta == NULL) {
278 mLatestEnqueuedMeta = buffer->meta()->dup();
279 } else {
280 int64_t latestTimeUs = 0;
281 int64_t frameDeltaUs = 0;
282 CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
283 if (lastQueuedTimeUs > latestTimeUs) {
284 mLatestEnqueuedMeta = buffer->meta()->dup();
285 frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
286 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
287 } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
288 // For B frames
289 frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
290 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
291 }
292 }
293 }
294
clear()295 void AnotherPacketSource::clear() {
296 Mutex::Autolock autoLock(mLock);
297
298 mBuffers.clear();
299 mEOSResult = OK;
300
301 mDiscontinuitySegments.clear();
302 mDiscontinuitySegments.push_back(DiscontinuitySegment());
303
304 mFormat = NULL;
305 mLatestEnqueuedMeta = NULL;
306 }
307
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra,bool discard)308 void AnotherPacketSource::queueDiscontinuity(
309 ATSParser::DiscontinuityType type,
310 const sp<AMessage> &extra,
311 bool discard) {
312 Mutex::Autolock autoLock(mLock);
313
314 if (discard) {
315 // Leave only discontinuities in the queue.
316 List<sp<ABuffer> >::iterator it = mBuffers.begin();
317 while (it != mBuffers.end()) {
318 sp<ABuffer> oldBuffer = *it;
319
320 int32_t oldDiscontinuityType;
321 if (!oldBuffer->meta()->findInt32(
322 "discontinuity", &oldDiscontinuityType)) {
323 it = mBuffers.erase(it);
324 continue;
325 }
326
327 ++it;
328 }
329
330 for (List<DiscontinuitySegment>::iterator it2 = mDiscontinuitySegments.begin();
331 it2 != mDiscontinuitySegments.end();
332 ++it2) {
333 DiscontinuitySegment &seg = *it2;
334 seg.clear();
335 }
336
337 }
338
339 mEOSResult = OK;
340 mLastQueuedTimeUs = 0;
341 mLatestEnqueuedMeta = NULL;
342
343 if (type == ATSParser::DISCONTINUITY_NONE) {
344 return;
345 }
346
347 mDiscontinuitySegments.push_back(DiscontinuitySegment());
348
349 sp<ABuffer> buffer = new ABuffer(0);
350 buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
351 buffer->meta()->setMessage("extra", extra);
352
353 mBuffers.push_back(buffer);
354 mCondition.signal();
355 }
356
signalEOS(status_t result)357 void AnotherPacketSource::signalEOS(status_t result) {
358 CHECK(result != OK);
359
360 Mutex::Autolock autoLock(mLock);
361 mEOSResult = result;
362 mCondition.signal();
363 }
364
hasBufferAvailable(status_t * finalResult)365 bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
366 Mutex::Autolock autoLock(mLock);
367 *finalResult = OK;
368 if (!mEnabled) {
369 return false;
370 }
371 if (!mBuffers.empty()) {
372 return true;
373 }
374
375 *finalResult = mEOSResult;
376 return false;
377 }
378
hasDataBufferAvailable(status_t * finalResult)379 bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
380 Mutex::Autolock autoLock(mLock);
381 *finalResult = OK;
382 if (!mEnabled) {
383 return false;
384 }
385 List<sp<ABuffer> >::iterator it;
386 for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
387 int32_t discontinuity;
388 if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
389 return true;
390 }
391 }
392
393 *finalResult = mEOSResult;
394 return false;
395 }
396
getAvailableBufferCount(status_t * finalResult)397 size_t AnotherPacketSource::getAvailableBufferCount(status_t *finalResult) {
398 Mutex::Autolock autoLock(mLock);
399
400 *finalResult = OK;
401 if (!mEnabled) {
402 return 0;
403 }
404 if (!mBuffers.empty()) {
405 return mBuffers.size();
406 }
407 *finalResult = mEOSResult;
408 return 0;
409 }
410
getBufferedDurationUs(status_t * finalResult)411 int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
412 Mutex::Autolock autoLock(mLock);
413 *finalResult = mEOSResult;
414
415 int64_t durationUs = 0;
416 for (List<DiscontinuitySegment>::iterator it = mDiscontinuitySegments.begin();
417 it != mDiscontinuitySegments.end();
418 ++it) {
419 const DiscontinuitySegment &seg = *it;
420 // dequeued access units should be a subset of enqueued access units
421 // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs);
422 durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs);
423 }
424
425 return durationUs;
426 }
427
nextBufferTime(int64_t * timeUs)428 status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
429 *timeUs = 0;
430
431 Mutex::Autolock autoLock(mLock);
432
433 if (mBuffers.empty()) {
434 return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
435 }
436
437 sp<ABuffer> buffer = *mBuffers.begin();
438 CHECK(buffer->meta()->findInt64("timeUs", timeUs));
439
440 return OK;
441 }
442
isFinished(int64_t duration) const443 bool AnotherPacketSource::isFinished(int64_t duration) const {
444 if (duration > 0) {
445 int64_t diff = duration - mLastQueuedTimeUs;
446 if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
447 ALOGV("Detecting EOS due to near end");
448 return true;
449 }
450 }
451 return (mEOSResult != OK);
452 }
453
getLatestEnqueuedMeta()454 sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
455 Mutex::Autolock autoLock(mLock);
456 return mLatestEnqueuedMeta;
457 }
458
getLatestDequeuedMeta()459 sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
460 Mutex::Autolock autoLock(mLock);
461 return mLatestDequeuedMeta;
462 }
463
enable(bool enable)464 void AnotherPacketSource::enable(bool enable) {
465 Mutex::Autolock autoLock(mLock);
466 mEnabled = enable;
467 }
468
469 /*
470 * returns the sample meta that's delayUs after queue head
471 * (NULL if such sample is unavailable)
472 */
getMetaAfterLastDequeued(int64_t delayUs)473 sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
474 Mutex::Autolock autoLock(mLock);
475 int64_t firstUs = -1;
476 int64_t lastUs = -1;
477 int64_t durationUs = 0;
478
479 List<sp<ABuffer> >::iterator it;
480 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
481 const sp<ABuffer> &buffer = *it;
482 int32_t discontinuity;
483 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
484 durationUs += lastUs - firstUs;
485 firstUs = -1;
486 lastUs = -1;
487 continue;
488 }
489 int64_t timeUs;
490 if (buffer->meta()->findInt64("timeUs", &timeUs)) {
491 if (firstUs < 0) {
492 firstUs = timeUs;
493 }
494 if (lastUs < 0 || timeUs > lastUs) {
495 lastUs = timeUs;
496 }
497 if (durationUs + (lastUs - firstUs) >= delayUs) {
498 return buffer->meta();
499 }
500 }
501 }
502 return NULL;
503 }
504
505 /*
506 * removes samples with time equal or after meta
507 */
trimBuffersAfterMeta(const sp<AMessage> & meta)508 void AnotherPacketSource::trimBuffersAfterMeta(
509 const sp<AMessage> &meta) {
510 if (meta == NULL) {
511 ALOGW("trimming with NULL meta, ignoring");
512 return;
513 }
514
515 Mutex::Autolock autoLock(mLock);
516 if (mBuffers.empty()) {
517 return;
518 }
519
520 HLSTime stopTime(meta);
521 ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld",
522 stopTime.mSeq, (long long)stopTime.mTimeUs);
523
524 List<sp<ABuffer> >::iterator it;
525 List<DiscontinuitySegment >::iterator it2;
526 sp<AMessage> newLatestEnqueuedMeta = NULL;
527 int64_t newLastQueuedTimeUs = 0;
528 for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) {
529 const sp<ABuffer> &buffer = *it;
530 int32_t discontinuity;
531 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
532 // CHECK(it2 != mDiscontinuitySegments.end());
533 ++it2;
534 continue;
535 }
536
537 HLSTime curTime(buffer->meta());
538 if (!(curTime < stopTime)) {
539 ALOGV("trimming from %lld (inclusive) to end",
540 (long long)curTime.mTimeUs);
541 break;
542 }
543 newLatestEnqueuedMeta = buffer->meta();
544 newLastQueuedTimeUs = curTime.mTimeUs;
545 }
546
547 mBuffers.erase(it, mBuffers.end());
548 mLatestEnqueuedMeta = newLatestEnqueuedMeta;
549 mLastQueuedTimeUs = newLastQueuedTimeUs;
550
551 DiscontinuitySegment &seg = *it2;
552 if (newLatestEnqueuedMeta != NULL) {
553 seg.mMaxEnqueTimeUs = newLastQueuedTimeUs;
554 } else {
555 seg.clear();
556 }
557 mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end());
558 }
559
560 /*
561 * removes samples with time equal or before meta;
562 * returns first sample left in the queue.
563 *
564 * (for AVC, if trim happens, the samples left will always start
565 * at next IDR.)
566 */
trimBuffersBeforeMeta(const sp<AMessage> & meta)567 sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
568 const sp<AMessage> &meta) {
569 HLSTime startTime(meta);
570 ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld",
571 startTime.mSeq, (long long)startTime.mTimeUs);
572
573 sp<AMessage> firstMeta;
574 int64_t firstTimeUs = -1;
575 Mutex::Autolock autoLock(mLock);
576 if (mBuffers.empty()) {
577 return NULL;
578 }
579
580 sp<MetaData> format;
581 bool isAvc = false;
582
583 List<sp<ABuffer> >::iterator it;
584 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
585 const sp<ABuffer> &buffer = *it;
586 int32_t discontinuity;
587 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
588 mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
589 // CHECK(!mDiscontinuitySegments.empty());
590 format = NULL;
591 isAvc = false;
592 continue;
593 }
594 if (format == NULL) {
595 sp<RefBase> object;
596 if (buffer->meta()->findObject("format", &object)) {
597 const char* mime;
598 format = static_cast<MetaData*>(object.get());
599 isAvc = format != NULL
600 && format->findCString(kKeyMIMEType, &mime)
601 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
602 }
603 }
604 if (isAvc && !IsIDR(buffer)) {
605 continue;
606 }
607
608 HLSTime curTime(buffer->meta());
609 if (startTime < curTime) {
610 ALOGV("trimming from beginning to %lld (not inclusive)",
611 (long long)curTime.mTimeUs);
612 firstMeta = buffer->meta();
613 firstTimeUs = curTime.mTimeUs;
614 break;
615 }
616 }
617 mBuffers.erase(mBuffers.begin(), it);
618 mLatestDequeuedMeta = NULL;
619
620 // CHECK(!mDiscontinuitySegments.empty());
621 DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
622 if (firstTimeUs >= 0) {
623 seg.mMaxDequeTimeUs = firstTimeUs;
624 } else {
625 seg.clear();
626 }
627
628 return firstMeta;
629 }
630
631 } // namespace android
632