1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <inttypes.h>
18 
19 //#define LOG_NDEBUG 0
20 #define LOG_TAG "NuCachedSource2"
21 #include <utils/Log.h>
22 
23 #include <datasource/NuCachedSource2.h>
24 #include <datasource/HTTPBase.h>
25 
26 #include <cutils/properties.h>
27 #include <media/stagefright/foundation/ADebug.h>
28 #include <media/stagefright/foundation/AMessage.h>
29 #include <media/stagefright/MediaErrors.h>
30 
31 namespace android {
32 
33 struct PageCache {
34     explicit PageCache(size_t pageSize);
35     ~PageCache();
36 
37     struct Page {
38         void *mData;
39         size_t mSize;
40     };
41 
42     Page *acquirePage();
43     void releasePage(Page *page);
44 
45     void appendPage(Page *page);
46     size_t releaseFromStart(size_t maxBytes);
47 
totalSizeandroid::PageCache48     size_t totalSize() const {
49         return mTotalSize;
50     }
51 
52     void copy(size_t from, void *data, size_t size);
53 
54 private:
55     size_t mPageSize;
56     size_t mTotalSize;
57 
58     List<Page *> mActivePages;
59     List<Page *> mFreePages;
60 
61     void freePages(List<Page *> *list);
62 
63     DISALLOW_EVIL_CONSTRUCTORS(PageCache);
64 };
65 
PageCache(size_t pageSize)66 PageCache::PageCache(size_t pageSize)
67     : mPageSize(pageSize),
68       mTotalSize(0) {
69 }
70 
~PageCache()71 PageCache::~PageCache() {
72     freePages(&mActivePages);
73     freePages(&mFreePages);
74 }
75 
freePages(List<Page * > * list)76 void PageCache::freePages(List<Page *> *list) {
77     List<Page *>::iterator it = list->begin();
78     while (it != list->end()) {
79         Page *page = *it;
80 
81         free(page->mData);
82         delete page;
83         page = NULL;
84 
85         ++it;
86     }
87 }
88 
acquirePage()89 PageCache::Page *PageCache::acquirePage() {
90     if (!mFreePages.empty()) {
91         List<Page *>::iterator it = mFreePages.begin();
92         Page *page = *it;
93         mFreePages.erase(it);
94 
95         return page;
96     }
97 
98     Page *page = new Page;
99     page->mData = malloc(mPageSize);
100     page->mSize = 0;
101 
102     return page;
103 }
104 
releasePage(Page * page)105 void PageCache::releasePage(Page *page) {
106     page->mSize = 0;
107     mFreePages.push_back(page);
108 }
109 
appendPage(Page * page)110 void PageCache::appendPage(Page *page) {
111     mTotalSize += page->mSize;
112     mActivePages.push_back(page);
113 }
114 
releaseFromStart(size_t maxBytes)115 size_t PageCache::releaseFromStart(size_t maxBytes) {
116     size_t bytesReleased = 0;
117 
118     while (maxBytes > 0 && !mActivePages.empty()) {
119         List<Page *>::iterator it = mActivePages.begin();
120 
121         Page *page = *it;
122 
123         if (maxBytes < page->mSize) {
124             break;
125         }
126 
127         mActivePages.erase(it);
128 
129         maxBytes -= page->mSize;
130         bytesReleased += page->mSize;
131 
132         releasePage(page);
133     }
134 
135     mTotalSize -= bytesReleased;
136     return bytesReleased;
137 }
138 
copy(size_t from,void * data,size_t size)139 void PageCache::copy(size_t from, void *data, size_t size) {
140     ALOGV("copy from %zu size %zu", from, size);
141 
142     if (size == 0) {
143         return;
144     }
145 
146     CHECK_LE(from + size, mTotalSize);
147 
148     size_t offset = 0;
149     List<Page *>::iterator it = mActivePages.begin();
150     while (from >= offset + (*it)->mSize) {
151         offset += (*it)->mSize;
152         ++it;
153     }
154 
155     size_t delta = from - offset;
156     size_t avail = (*it)->mSize - delta;
157 
158     if (avail >= size) {
159         memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
160         return;
161     }
162 
163     memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
164     ++it;
165     data = (uint8_t *)data + avail;
166     size -= avail;
167 
168     while (size > 0) {
169         size_t copy = (*it)->mSize;
170         if (copy > size) {
171             copy = size;
172         }
173         memcpy(data, (*it)->mData, copy);
174         data = (uint8_t *)data + copy;
175         size -= copy;
176         ++it;
177     }
178 }
179 
180 ////////////////////////////////////////////////////////////////////////////////
181 
NuCachedSource2(const sp<DataSource> & source,const char * cacheConfig,bool disconnectAtHighwatermark)182 NuCachedSource2::NuCachedSource2(
183         const sp<DataSource> &source,
184         const char *cacheConfig,
185         bool disconnectAtHighwatermark)
186     : mSource(source),
187       mReflector(new AHandlerReflector<NuCachedSource2>(this)),
188       mLooper(new ALooper),
189       mCache(new PageCache(kPageSize)),
190       mCacheOffset(0),
191       mFinalStatus(OK),
192       mLastAccessPos(0),
193       mFetching(true),
194       mDisconnecting(false),
195       mLastFetchTimeUs(-1),
196       mNumRetriesLeft(kMaxNumRetries),
197       mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
198       mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
199       mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
200       mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
201     // We are NOT going to support disconnect-at-highwatermark indefinitely
202     // and we are not guaranteeing support for client-specified cache
203     // parameters. Both of these are temporary measures to solve a specific
204     // problem that will be solved in a better way going forward.
205 
206     updateCacheParamsFromSystemProperty();
207 
208     if (cacheConfig != NULL) {
209         updateCacheParamsFromString(cacheConfig);
210     }
211 
212     if (mDisconnectAtHighwatermark) {
213         // Makes no sense to disconnect and do keep-alives...
214         mKeepAliveIntervalUs = 0;
215     }
216 
217     mLooper->setName("NuCachedSource2");
218     mLooper->registerHandler(mReflector);
219 
220     // Since it may not be obvious why our looper thread needs to be
221     // able to call into java since it doesn't appear to do so at all...
222     // IMediaHTTPConnection may be (and most likely is) implemented in JAVA
223     // and a local JAVA IBinder will call directly into JNI methods.
224     // So whenever we call DataSource::readAt it may end up in a call to
225     // IMediaHTTPConnection::readAt and therefore call back into JAVA.
226     mLooper->start(false /* runOnCallingThread */, true /* canCallJava */);
227 
228     mName = String8::format("NuCachedSource2(%s)", mSource->toString().c_str());
229 }
230 
~NuCachedSource2()231 NuCachedSource2::~NuCachedSource2() {
232     mLooper->stop();
233     mLooper->unregisterHandler(mReflector->id());
234 
235     delete mCache;
236     mCache = NULL;
237 }
238 
239 // static
// Builds a NuCachedSource2 around "source" and posts the first kWhatFetchMore
// message while holding the new instance's lock.
sp<NuCachedSource2> NuCachedSource2::Create(
        const sp<DataSource> &source,
        const char *cacheConfig,
        bool disconnectAtHighwatermark) {
    sp<NuCachedSource2> instance =
        new NuCachedSource2(source, cacheConfig, disconnectAtHighwatermark);

    Mutex::Autolock autoLock(instance->mLock);

    sp<AMessage> firstFetch = new AMessage(kWhatFetchMore, instance->mReflector);
    firstFetch->post();

    return instance;
}
250 
getEstimatedBandwidthKbps(int32_t * kbps)251 status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
252     if (mSource->flags() & kIsHTTPBasedSource) {
253         HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
254         return source->getEstimatedBandwidthKbps(kbps);
255     }
256     return ERROR_UNSUPPORTED;
257 }
258 
close()259 void NuCachedSource2::close() {
260     disconnect();
261 }
262 
disconnect()263 void NuCachedSource2::disconnect() {
264     if (mSource->flags() & kIsHTTPBasedSource) {
265         ALOGV("disconnecting HTTPBasedSource");
266 
267         {
268             Mutex::Autolock autoLock(mLock);
269             // set mDisconnecting to true, if a fetch returns after
270             // this, the source will be marked as EOS.
271             mDisconnecting = true;
272 
273             // explicitly signal mCondition so that the pending readAt()
274             // will immediately return
275             mCondition.signal();
276         }
277 
278         // explicitly disconnect from the source, to allow any
279         // pending reads to return more promptly
280         static_cast<HTTPBase *>(mSource.get())->disconnect();
281     }
282 }
283 
setCacheStatCollectFreq(int32_t freqMs)284 status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
285     if (mSource->flags() & kIsHTTPBasedSource) {
286         HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
287         return source->setBandwidthStatCollectFreq(freqMs);
288     }
289     return ERROR_UNSUPPORTED;
290 }
291 
initCheck() const292 status_t NuCachedSource2::initCheck() const {
293     return mSource->initCheck();
294 }
295 
getSize(off64_t * size)296 status_t NuCachedSource2::getSize(off64_t *size) {
297     return mSource->getSize(size);
298 }
299 
flags()300 uint32_t NuCachedSource2::flags() {
301     // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
302     uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
303     return (flags | kIsCachingDataSource);
304 }
305 
onMessageReceived(const sp<AMessage> & msg)306 void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
307     switch (msg->what()) {
308         case kWhatFetchMore:
309         {
310             onFetch();
311             break;
312         }
313 
314         case kWhatRead:
315         {
316             onRead(msg);
317             break;
318         }
319 
320         default:
321             TRESPASS();
322     }
323 }
324 
fetchInternal()325 void NuCachedSource2::fetchInternal() {
326     ALOGV("fetchInternal");
327 
328     bool reconnect = false;
329 
330     {
331         Mutex::Autolock autoLock(mLock);
332         CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
333 
334         if (mFinalStatus != OK) {
335             --mNumRetriesLeft;
336 
337             reconnect = true;
338         }
339     }
340 
341     if (reconnect) {
342         status_t err =
343             mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
344 
345         Mutex::Autolock autoLock(mLock);
346 
347         if (mDisconnecting) {
348             mNumRetriesLeft = 0;
349             mFinalStatus = ERROR_END_OF_STREAM;
350             return;
351         } else if (err == ERROR_UNSUPPORTED || err == -EPIPE) {
352             // These are errors that are not likely to go away even if we
353             // retry, i.e. the server doesn't support range requests or similar.
354             mNumRetriesLeft = 0;
355             return;
356         } else if (err != OK) {
357             ALOGI("The attempt to reconnect failed, %d retries remaining",
358                  mNumRetriesLeft);
359 
360             return;
361         }
362     }
363 
364     PageCache::Page *page = mCache->acquirePage();
365 
366     ssize_t n = mSource->readAt(
367             mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
368 
369     Mutex::Autolock autoLock(mLock);
370 
371     if (n == 0 || mDisconnecting) {
372         ALOGI("caching reached eos.");
373 
374         mNumRetriesLeft = 0;
375         mFinalStatus = ERROR_END_OF_STREAM;
376 
377         mCache->releasePage(page);
378     } else if (n < 0) {
379         mFinalStatus = n;
380         if (n == ERROR_UNSUPPORTED || n == -EPIPE) {
381             // These are errors that are not likely to go away even if we
382             // retry, i.e. the server doesn't support range requests or similar.
383             mNumRetriesLeft = 0;
384         }
385 
386         ALOGE("source returned error %zd, %d retries left", n, mNumRetriesLeft);
387         mCache->releasePage(page);
388     } else {
389         if (mFinalStatus != OK) {
390             ALOGI("retrying a previously failed read succeeded.");
391         }
392         mNumRetriesLeft = kMaxNumRetries;
393         mFinalStatus = OK;
394 
395         page->mSize = n;
396         mCache->appendPage(page);
397     }
398 }
399 
onFetch()400 void NuCachedSource2::onFetch() {
401     ALOGV("onFetch");
402 
403     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
404         ALOGV("EOS reached, done prefetching for now");
405         mFetching = false;
406     }
407 
408     bool keepAlive =
409         !mFetching
410             && mFinalStatus == OK
411             && mKeepAliveIntervalUs > 0
412             && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
413 
414     if (mFetching || keepAlive) {
415         if (keepAlive) {
416             ALOGI("Keep alive");
417         }
418 
419         fetchInternal();
420 
421         mLastFetchTimeUs = ALooper::GetNowUs();
422 
423         if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
424             ALOGI("Cache full, done prefetching for now");
425             mFetching = false;
426 
427             if (mDisconnectAtHighwatermark
428                     && (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
429                 ALOGV("Disconnecting at high watermark");
430                 static_cast<HTTPBase *>(mSource.get())->disconnect();
431                 mFinalStatus = -EAGAIN;
432             }
433         }
434     } else {
435         Mutex::Autolock autoLock(mLock);
436         restartPrefetcherIfNecessary_l();
437     }
438 
439     int64_t delayUs;
440     if (mFetching) {
441         if (mFinalStatus != OK && mNumRetriesLeft > 0) {
442             // We failed this time and will try again in 3 seconds.
443             delayUs = 3000000LL;
444         } else {
445             delayUs = 0;
446         }
447     } else {
448         delayUs = 100000LL;
449     }
450 
451     (new AMessage(kWhatFetchMore, mReflector))->post(delayUs);
452 }
453 
onRead(const sp<AMessage> & msg)454 void NuCachedSource2::onRead(const sp<AMessage> &msg) {
455     ALOGV("onRead");
456 
457     int64_t offset;
458     CHECK(msg->findInt64("offset", &offset));
459 
460     void *data;
461     CHECK(msg->findPointer("data", &data));
462 
463     size_t size;
464     CHECK(msg->findSize("size", &size));
465 
466     ssize_t result = readInternal(offset, data, size);
467 
468     if (result == -EAGAIN) {
469         msg->post(50000);
470         return;
471     }
472 
473     Mutex::Autolock autoLock(mLock);
474     if (mDisconnecting) {
475         mCondition.signal();
476         return;
477     }
478 
479     CHECK(mAsyncResult == NULL);
480 
481     mAsyncResult = new AMessage;
482     mAsyncResult->setInt32("result", result);
483 
484     mCondition.signal();
485 }
486 
restartPrefetcherIfNecessary_l(bool ignoreLowWaterThreshold,bool force)487 void NuCachedSource2::restartPrefetcherIfNecessary_l(
488         bool ignoreLowWaterThreshold, bool force) {
489     static const size_t kGrayArea = 1024 * 1024;
490 
491     if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
492         return;
493     }
494 
495     if (!ignoreLowWaterThreshold && !force
496             && mCacheOffset + mCache->totalSize() - mLastAccessPos
497                 >= mLowwaterThresholdBytes) {
498         return;
499     }
500 
501     size_t maxBytes = mLastAccessPos - mCacheOffset;
502 
503     if (!force) {
504         if (maxBytes < kGrayArea) {
505             return;
506         }
507 
508         maxBytes -= kGrayArea;
509     }
510 
511     size_t actualBytes = mCache->releaseFromStart(maxBytes);
512     mCacheOffset += actualBytes;
513 
514     ALOGI("restarting prefetcher, totalSize = %zu", mCache->totalSize());
515     mFetching = true;
516 }
517 
readAt(off64_t offset,void * data,size_t size)518 ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
519     Mutex::Autolock autoSerializer(mSerializer);
520 
521     ALOGV("readAt offset %lld, size %zu", (long long)offset, size);
522 
523     Mutex::Autolock autoLock(mLock);
524     if (mDisconnecting) {
525         return ERROR_END_OF_STREAM;
526     }
527 
528     // If the request can be completely satisfied from the cache, do so.
529 
530     if (offset >= mCacheOffset
531             && offset + size <= mCacheOffset + mCache->totalSize()) {
532         size_t delta = offset - mCacheOffset;
533         mCache->copy(delta, data, size);
534 
535         mLastAccessPos = offset + size;
536 
537         return size;
538     }
539 
540     sp<AMessage> msg = new AMessage(kWhatRead, mReflector);
541     msg->setInt64("offset", offset);
542     msg->setPointer("data", data);
543     msg->setSize("size", size);
544 
545     CHECK(mAsyncResult == NULL);
546     msg->post();
547 
548     while (mAsyncResult == NULL && !mDisconnecting) {
549         mCondition.wait(mLock);
550     }
551 
552     if (mDisconnecting) {
553         mAsyncResult.clear();
554         return ERROR_END_OF_STREAM;
555     }
556 
557     int32_t result;
558     CHECK(mAsyncResult->findInt32("result", &result));
559 
560     mAsyncResult.clear();
561 
562     if (result > 0) {
563         mLastAccessPos = offset + result;
564     }
565 
566     return (ssize_t)result;
567 }
568 
cachedSize()569 size_t NuCachedSource2::cachedSize() {
570     Mutex::Autolock autoLock(mLock);
571     return mCacheOffset + mCache->totalSize();
572 }
573 
getAvailableSize(off64_t offset,off64_t * size)574 status_t NuCachedSource2::getAvailableSize(off64_t offset, off64_t *size) {
575     Mutex::Autolock autoLock(mLock);
576     status_t finalStatus = UNKNOWN_ERROR;
577     *size = approxDataRemaining_l(offset, &finalStatus);
578     return finalStatus;
579 }
580 
approxDataRemaining(status_t * finalStatus) const581 size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const {
582     Mutex::Autolock autoLock(mLock);
583     return approxDataRemaining_l(mLastAccessPos, finalStatus);
584 }
585 
approxDataRemaining_l(off64_t offset,status_t * finalStatus) const586 size_t NuCachedSource2::approxDataRemaining_l(off64_t offset, status_t *finalStatus) const {
587     *finalStatus = mFinalStatus;
588 
589     if (mFinalStatus != OK && mNumRetriesLeft > 0) {
590         // Pretend that everything is fine until we're out of retries.
591         *finalStatus = OK;
592     }
593 
594     offset = offset >= 0 ? offset : mLastAccessPos;
595     off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
596     if (offset < lastBytePosCached) {
597         return lastBytePosCached - offset;
598     }
599     return 0;
600 }
601 
readInternal(off64_t offset,void * data,size_t size)602 ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
603     CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
604 
605     ALOGV("readInternal offset %lld size %zu", (long long)offset, size);
606 
607     Mutex::Autolock autoLock(mLock);
608 
609     // If we're disconnecting, return EOS and don't access *data pointer.
610     // data could be on the stack of the caller to NuCachedSource2::readAt(),
611     // which may have exited already.
612     if (mDisconnecting) {
613         return ERROR_END_OF_STREAM;
614     }
615 
616     if (!mFetching) {
617         mLastAccessPos = offset;
618         restartPrefetcherIfNecessary_l(
619                 false, // ignoreLowWaterThreshold
620                 true); // force
621     }
622 
623     if (offset < mCacheOffset
624             || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
625         static const off64_t kPadding = 256 * 1024;
626 
627         // In the presence of multiple decoded streams, once of them will
628         // trigger this seek request, the other one will request data "nearby"
629         // soon, adjust the seek position so that that subsequent request
630         // does not trigger another seek.
631         off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
632 
633         seekInternal_l(seekOffset);
634     }
635 
636     size_t delta = offset - mCacheOffset;
637 
638     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
639         if (delta >= mCache->totalSize()) {
640             return mFinalStatus;
641         }
642 
643         size_t avail = mCache->totalSize() - delta;
644 
645         if (avail > size) {
646             avail = size;
647         }
648 
649         mCache->copy(delta, data, avail);
650 
651         return avail;
652     }
653 
654     if (offset + size <= mCacheOffset + mCache->totalSize()) {
655         mCache->copy(delta, data, size);
656 
657         return size;
658     }
659 
660     ALOGV("deferring read");
661 
662     return -EAGAIN;
663 }
664 
seekInternal_l(off64_t offset)665 status_t NuCachedSource2::seekInternal_l(off64_t offset) {
666     mLastAccessPos = offset;
667 
668     if (offset >= mCacheOffset
669             && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
670         return OK;
671     }
672 
673     ALOGI("new range: offset= %lld", (long long)offset);
674 
675     mCacheOffset = offset;
676 
677     size_t totalSize = mCache->totalSize();
678     CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
679 
680     mNumRetriesLeft = kMaxNumRetries;
681     mFetching = true;
682 
683     return OK;
684 }
685 
resumeFetchingIfNecessary()686 void NuCachedSource2::resumeFetchingIfNecessary() {
687     Mutex::Autolock autoLock(mLock);
688 
689     restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
690 }
691 
getUri()692 String8 NuCachedSource2::getUri() {
693     return mSource->getUri();
694 }
695 
getMIMEType() const696 String8 NuCachedSource2::getMIMEType() const {
697     return mSource->getMIMEType();
698 }
699 
updateCacheParamsFromSystemProperty()700 void NuCachedSource2::updateCacheParamsFromSystemProperty() {
701     char value[PROPERTY_VALUE_MAX];
702     if (!property_get("media.stagefright.cache-params", value, NULL)) {
703         return;
704     }
705 
706     updateCacheParamsFromString(value);
707 }
708 
updateCacheParamsFromString(const char * s)709 void NuCachedSource2::updateCacheParamsFromString(const char *s) {
710     ssize_t lowwaterMarkKb, highwaterMarkKb;
711     int keepAliveSecs;
712 
713     if (sscanf(s, "%zd/%zd/%d",
714                &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
715         ALOGE("Failed to parse cache parameters from '%s'.", s);
716         return;
717     }
718 
719     if (lowwaterMarkKb >= 0) {
720         mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
721     } else {
722         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
723     }
724 
725     if (highwaterMarkKb >= 0) {
726         mHighwaterThresholdBytes = highwaterMarkKb * 1024;
727     } else {
728         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
729     }
730 
731     if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
732         ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
733 
734         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
735         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
736     }
737 
738     if (keepAliveSecs >= 0) {
739         mKeepAliveIntervalUs = keepAliveSecs * 1000000LL;
740     } else {
741         mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
742     }
743 
744     ALOGV("lowwater = %zu bytes, highwater = %zu bytes, keepalive = %lld us",
745          mLowwaterThresholdBytes,
746          mHighwaterThresholdBytes,
747          (long long)mKeepAliveIntervalUs);
748 }
749 
750 // static
RemoveCacheSpecificHeaders(KeyedVector<String8,String8> * headers,String8 * cacheConfig,bool * disconnectAtHighwatermark)751 void NuCachedSource2::RemoveCacheSpecificHeaders(
752         KeyedVector<String8, String8> *headers,
753         String8 *cacheConfig,
754         bool *disconnectAtHighwatermark) {
755     *cacheConfig = String8();
756     *disconnectAtHighwatermark = false;
757 
758     if (headers == NULL) {
759         return;
760     }
761 
762     ssize_t index;
763     if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
764         *cacheConfig = headers->valueAt(index);
765 
766         headers->removeItemsAt(index);
767 
768         ALOGV("Using special cache config '%s'", cacheConfig->c_str());
769     }
770 
771     if ((index = headers->indexOfKey(
772                     String8("x-disconnect-at-highwatermark"))) >= 0) {
773         *disconnectAtHighwatermark = true;
774         headers->removeItemsAt(index);
775 
776         ALOGV("Client requested disconnection at highwater mark");
777     }
778 }
779 
780 }  // namespace android
781