1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <inttypes.h>
18 
19 //#define LOG_NDEBUG 0
20 #define LOG_TAG "NuCachedSource2"
21 #include <utils/Log.h>
22 
23 #include "include/NuCachedSource2.h"
24 #include "include/HTTPBase.h"
25 
26 #include <cutils/properties.h>
27 #include <media/stagefright/foundation/ADebug.h>
28 #include <media/stagefright/foundation/AMessage.h>
29 #include <media/stagefright/MediaErrors.h>
30 
31 namespace android {
32 
33 struct PageCache {
34     PageCache(size_t pageSize);
35     ~PageCache();
36 
37     struct Page {
38         void *mData;
39         size_t mSize;
40     };
41 
42     Page *acquirePage();
43     void releasePage(Page *page);
44 
45     void appendPage(Page *page);
46     size_t releaseFromStart(size_t maxBytes);
47 
totalSizeandroid::PageCache48     size_t totalSize() const {
49         return mTotalSize;
50     }
51 
52     void copy(size_t from, void *data, size_t size);
53 
54 private:
55     size_t mPageSize;
56     size_t mTotalSize;
57 
58     List<Page *> mActivePages;
59     List<Page *> mFreePages;
60 
61     void freePages(List<Page *> *list);
62 
63     DISALLOW_EVIL_CONSTRUCTORS(PageCache);
64 };
65 
PageCache(size_t pageSize)66 PageCache::PageCache(size_t pageSize)
67     : mPageSize(pageSize),
68       mTotalSize(0) {
69 }
70 
~PageCache()71 PageCache::~PageCache() {
72     freePages(&mActivePages);
73     freePages(&mFreePages);
74 }
75 
freePages(List<Page * > * list)76 void PageCache::freePages(List<Page *> *list) {
77     List<Page *>::iterator it = list->begin();
78     while (it != list->end()) {
79         Page *page = *it;
80 
81         free(page->mData);
82         delete page;
83         page = NULL;
84 
85         ++it;
86     }
87 }
88 
acquirePage()89 PageCache::Page *PageCache::acquirePage() {
90     if (!mFreePages.empty()) {
91         List<Page *>::iterator it = mFreePages.begin();
92         Page *page = *it;
93         mFreePages.erase(it);
94 
95         return page;
96     }
97 
98     Page *page = new Page;
99     page->mData = malloc(mPageSize);
100     page->mSize = 0;
101 
102     return page;
103 }
104 
releasePage(Page * page)105 void PageCache::releasePage(Page *page) {
106     page->mSize = 0;
107     mFreePages.push_back(page);
108 }
109 
appendPage(Page * page)110 void PageCache::appendPage(Page *page) {
111     mTotalSize += page->mSize;
112     mActivePages.push_back(page);
113 }
114 
releaseFromStart(size_t maxBytes)115 size_t PageCache::releaseFromStart(size_t maxBytes) {
116     size_t bytesReleased = 0;
117 
118     while (maxBytes > 0 && !mActivePages.empty()) {
119         List<Page *>::iterator it = mActivePages.begin();
120 
121         Page *page = *it;
122 
123         if (maxBytes < page->mSize) {
124             break;
125         }
126 
127         mActivePages.erase(it);
128 
129         maxBytes -= page->mSize;
130         bytesReleased += page->mSize;
131 
132         releasePage(page);
133     }
134 
135     mTotalSize -= bytesReleased;
136     return bytesReleased;
137 }
138 
copy(size_t from,void * data,size_t size)139 void PageCache::copy(size_t from, void *data, size_t size) {
140     ALOGV("copy from %zu size %zu", from, size);
141 
142     if (size == 0) {
143         return;
144     }
145 
146     CHECK_LE(from + size, mTotalSize);
147 
148     size_t offset = 0;
149     List<Page *>::iterator it = mActivePages.begin();
150     while (from >= offset + (*it)->mSize) {
151         offset += (*it)->mSize;
152         ++it;
153     }
154 
155     size_t delta = from - offset;
156     size_t avail = (*it)->mSize - delta;
157 
158     if (avail >= size) {
159         memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
160         return;
161     }
162 
163     memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
164     ++it;
165     data = (uint8_t *)data + avail;
166     size -= avail;
167 
168     while (size > 0) {
169         size_t copy = (*it)->mSize;
170         if (copy > size) {
171             copy = size;
172         }
173         memcpy(data, (*it)->mData, copy);
174         data = (uint8_t *)data + copy;
175         size -= copy;
176         ++it;
177     }
178 }
179 
180 ////////////////////////////////////////////////////////////////////////////////
181 
NuCachedSource2(const sp<DataSource> & source,const char * cacheConfig,bool disconnectAtHighwatermark)182 NuCachedSource2::NuCachedSource2(
183         const sp<DataSource> &source,
184         const char *cacheConfig,
185         bool disconnectAtHighwatermark)
186     : mSource(source),
187       mReflector(new AHandlerReflector<NuCachedSource2>(this)),
188       mLooper(new ALooper),
189       mCache(new PageCache(kPageSize)),
190       mCacheOffset(0),
191       mFinalStatus(OK),
192       mLastAccessPos(0),
193       mFetching(true),
194       mDisconnecting(false),
195       mLastFetchTimeUs(-1),
196       mNumRetriesLeft(kMaxNumRetries),
197       mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
198       mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
199       mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
200       mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
201     // We are NOT going to support disconnect-at-highwatermark indefinitely
202     // and we are not guaranteeing support for client-specified cache
203     // parameters. Both of these are temporary measures to solve a specific
204     // problem that will be solved in a better way going forward.
205 
206     updateCacheParamsFromSystemProperty();
207 
208     if (cacheConfig != NULL) {
209         updateCacheParamsFromString(cacheConfig);
210     }
211 
212     if (mDisconnectAtHighwatermark) {
213         // Makes no sense to disconnect and do keep-alives...
214         mKeepAliveIntervalUs = 0;
215     }
216 
217     mLooper->setName("NuCachedSource2");
218     mLooper->registerHandler(mReflector);
219 
220     // Since it may not be obvious why our looper thread needs to be
221     // able to call into java since it doesn't appear to do so at all...
222     // IMediaHTTPConnection may be (and most likely is) implemented in JAVA
223     // and a local JAVA IBinder will call directly into JNI methods.
224     // So whenever we call DataSource::readAt it may end up in a call to
225     // IMediaHTTPConnection::readAt and therefore call back into JAVA.
226     mLooper->start(false /* runOnCallingThread */, true /* canCallJava */);
227 
228     mName = String8::format("NuCachedSource2(%s)", mSource->toString().string());
229 }
230 
// Shuts the caching source down: stops the looper, unregisters the message
// handler, and then destroys the page cache.
NuCachedSource2::~NuCachedSource2() {
    // Stop the looper before the cache is deleted; the message handlers
    // (onFetch/onRead) access mCache.
    mLooper->stop();
    mLooper->unregisterHandler(mReflector->id());

    delete mCache;
    mCache = NULL;
}
238 
239 // static
Create(const sp<DataSource> & source,const char * cacheConfig,bool disconnectAtHighwatermark)240 sp<NuCachedSource2> NuCachedSource2::Create(
241         const sp<DataSource> &source,
242         const char *cacheConfig,
243         bool disconnectAtHighwatermark) {
244     sp<NuCachedSource2> instance = new NuCachedSource2(
245             source, cacheConfig, disconnectAtHighwatermark);
246     Mutex::Autolock autoLock(instance->mLock);
247     (new AMessage(kWhatFetchMore, instance->mReflector))->post();
248     return instance;
249 }
250 
getEstimatedBandwidthKbps(int32_t * kbps)251 status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
252     if (mSource->flags() & kIsHTTPBasedSource) {
253         HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
254         return source->getEstimatedBandwidthKbps(kbps);
255     }
256     return ERROR_UNSUPPORTED;
257 }
258 
disconnect()259 void NuCachedSource2::disconnect() {
260     if (mSource->flags() & kIsHTTPBasedSource) {
261         ALOGV("disconnecting HTTPBasedSource");
262 
263         {
264             Mutex::Autolock autoLock(mLock);
265             // set mDisconnecting to true, if a fetch returns after
266             // this, the source will be marked as EOS.
267             mDisconnecting = true;
268 
269             // explicitly signal mCondition so that the pending readAt()
270             // will immediately return
271             mCondition.signal();
272         }
273 
274         // explicitly disconnect from the source, to allow any
275         // pending reads to return more promptly
276         static_cast<HTTPBase *>(mSource.get())->disconnect();
277     }
278 }
279 
setCacheStatCollectFreq(int32_t freqMs)280 status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
281     if (mSource->flags() & kIsHTTPBasedSource) {
282         HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
283         return source->setBandwidthStatCollectFreq(freqMs);
284     }
285     return ERROR_UNSUPPORTED;
286 }
287 
// Reports the initialization status of the wrapped source; the cache layer
// adds no status of its own.
status_t NuCachedSource2::initCheck() const {
    return mSource->initCheck();
}
291 
// Forwards the size query to the wrapped source.
//
// size: out-parameter filled in by the wrapped source.
// Returns the status reported by the wrapped source.
status_t NuCachedSource2::getSize(off64_t *size) {
    return mSource->getSize(size);
}
295 
flags()296 uint32_t NuCachedSource2::flags() {
297     // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
298     uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
299     return (flags | kIsCachingDataSource);
300 }
301 
onMessageReceived(const sp<AMessage> & msg)302 void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
303     switch (msg->what()) {
304         case kWhatFetchMore:
305         {
306             onFetch();
307             break;
308         }
309 
310         case kWhatRead:
311         {
312             onRead(msg);
313             break;
314         }
315 
316         default:
317             TRESPASS();
318     }
319 }
320 
fetchInternal()321 void NuCachedSource2::fetchInternal() {
322     ALOGV("fetchInternal");
323 
324     bool reconnect = false;
325 
326     {
327         Mutex::Autolock autoLock(mLock);
328         CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
329 
330         if (mFinalStatus != OK) {
331             --mNumRetriesLeft;
332 
333             reconnect = true;
334         }
335     }
336 
337     if (reconnect) {
338         status_t err =
339             mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
340 
341         Mutex::Autolock autoLock(mLock);
342 
343         if (mDisconnecting) {
344             mNumRetriesLeft = 0;
345             mFinalStatus = ERROR_END_OF_STREAM;
346             return;
347         } else if (err == ERROR_UNSUPPORTED || err == -EPIPE) {
348             // These are errors that are not likely to go away even if we
349             // retry, i.e. the server doesn't support range requests or similar.
350             mNumRetriesLeft = 0;
351             return;
352         } else if (err != OK) {
353             ALOGI("The attempt to reconnect failed, %d retries remaining",
354                  mNumRetriesLeft);
355 
356             return;
357         }
358     }
359 
360     PageCache::Page *page = mCache->acquirePage();
361 
362     ssize_t n = mSource->readAt(
363             mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
364 
365     Mutex::Autolock autoLock(mLock);
366 
367     if (n == 0 || mDisconnecting) {
368         ALOGI("caching reached eos.");
369 
370         mNumRetriesLeft = 0;
371         mFinalStatus = ERROR_END_OF_STREAM;
372 
373         mCache->releasePage(page);
374     } else if (n < 0) {
375         mFinalStatus = n;
376         if (n == ERROR_UNSUPPORTED || n == -EPIPE) {
377             // These are errors that are not likely to go away even if we
378             // retry, i.e. the server doesn't support range requests or similar.
379             mNumRetriesLeft = 0;
380         }
381 
382         ALOGE("source returned error %zd, %d retries left", n, mNumRetriesLeft);
383         mCache->releasePage(page);
384     } else {
385         if (mFinalStatus != OK) {
386             ALOGI("retrying a previously failed read succeeded.");
387         }
388         mNumRetriesLeft = kMaxNumRetries;
389         mFinalStatus = OK;
390 
391         page->mSize = n;
392         mCache->appendPage(page);
393     }
394 }
395 
onFetch()396 void NuCachedSource2::onFetch() {
397     ALOGV("onFetch");
398 
399     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
400         ALOGV("EOS reached, done prefetching for now");
401         mFetching = false;
402     }
403 
404     bool keepAlive =
405         !mFetching
406             && mFinalStatus == OK
407             && mKeepAliveIntervalUs > 0
408             && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
409 
410     if (mFetching || keepAlive) {
411         if (keepAlive) {
412             ALOGI("Keep alive");
413         }
414 
415         fetchInternal();
416 
417         mLastFetchTimeUs = ALooper::GetNowUs();
418 
419         if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
420             ALOGI("Cache full, done prefetching for now");
421             mFetching = false;
422 
423             if (mDisconnectAtHighwatermark
424                     && (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
425                 ALOGV("Disconnecting at high watermark");
426                 static_cast<HTTPBase *>(mSource.get())->disconnect();
427                 mFinalStatus = -EAGAIN;
428             }
429         }
430     } else {
431         Mutex::Autolock autoLock(mLock);
432         restartPrefetcherIfNecessary_l();
433     }
434 
435     int64_t delayUs;
436     if (mFetching) {
437         if (mFinalStatus != OK && mNumRetriesLeft > 0) {
438             // We failed this time and will try again in 3 seconds.
439             delayUs = 3000000ll;
440         } else {
441             delayUs = 0;
442         }
443     } else {
444         delayUs = 100000ll;
445     }
446 
447     (new AMessage(kWhatFetchMore, mReflector))->post(delayUs);
448 }
449 
onRead(const sp<AMessage> & msg)450 void NuCachedSource2::onRead(const sp<AMessage> &msg) {
451     ALOGV("onRead");
452 
453     int64_t offset;
454     CHECK(msg->findInt64("offset", &offset));
455 
456     void *data;
457     CHECK(msg->findPointer("data", &data));
458 
459     size_t size;
460     CHECK(msg->findSize("size", &size));
461 
462     ssize_t result = readInternal(offset, data, size);
463 
464     if (result == -EAGAIN) {
465         msg->post(50000);
466         return;
467     }
468 
469     Mutex::Autolock autoLock(mLock);
470     if (mDisconnecting) {
471         mCondition.signal();
472         return;
473     }
474 
475     CHECK(mAsyncResult == NULL);
476 
477     mAsyncResult = new AMessage;
478     mAsyncResult->setInt32("result", result);
479 
480     mCondition.signal();
481 }
482 
restartPrefetcherIfNecessary_l(bool ignoreLowWaterThreshold,bool force)483 void NuCachedSource2::restartPrefetcherIfNecessary_l(
484         bool ignoreLowWaterThreshold, bool force) {
485     static const size_t kGrayArea = 1024 * 1024;
486 
487     if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
488         return;
489     }
490 
491     if (!ignoreLowWaterThreshold && !force
492             && mCacheOffset + mCache->totalSize() - mLastAccessPos
493                 >= mLowwaterThresholdBytes) {
494         return;
495     }
496 
497     size_t maxBytes = mLastAccessPos - mCacheOffset;
498 
499     if (!force) {
500         if (maxBytes < kGrayArea) {
501             return;
502         }
503 
504         maxBytes -= kGrayArea;
505     }
506 
507     size_t actualBytes = mCache->releaseFromStart(maxBytes);
508     mCacheOffset += actualBytes;
509 
510     ALOGI("restarting prefetcher, totalSize = %zu", mCache->totalSize());
511     mFetching = true;
512 }
513 
readAt(off64_t offset,void * data,size_t size)514 ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
515     Mutex::Autolock autoSerializer(mSerializer);
516 
517     ALOGV("readAt offset %lld, size %zu", (long long)offset, size);
518 
519     Mutex::Autolock autoLock(mLock);
520     if (mDisconnecting) {
521         return ERROR_END_OF_STREAM;
522     }
523 
524     // If the request can be completely satisfied from the cache, do so.
525 
526     if (offset >= mCacheOffset
527             && offset + size <= mCacheOffset + mCache->totalSize()) {
528         size_t delta = offset - mCacheOffset;
529         mCache->copy(delta, data, size);
530 
531         mLastAccessPos = offset + size;
532 
533         return size;
534     }
535 
536     sp<AMessage> msg = new AMessage(kWhatRead, mReflector);
537     msg->setInt64("offset", offset);
538     msg->setPointer("data", data);
539     msg->setSize("size", size);
540 
541     CHECK(mAsyncResult == NULL);
542     msg->post();
543 
544     while (mAsyncResult == NULL && !mDisconnecting) {
545         mCondition.wait(mLock);
546     }
547 
548     if (mDisconnecting) {
549         mAsyncResult.clear();
550         return ERROR_END_OF_STREAM;
551     }
552 
553     int32_t result;
554     CHECK(mAsyncResult->findInt32("result", &result));
555 
556     mAsyncResult.clear();
557 
558     if (result > 0) {
559         mLastAccessPos = offset + result;
560     }
561 
562     return (ssize_t)result;
563 }
564 
cachedSize()565 size_t NuCachedSource2::cachedSize() {
566     Mutex::Autolock autoLock(mLock);
567     return mCacheOffset + mCache->totalSize();
568 }
569 
// Locked wrapper around approxDataRemaining_l(): acquires mLock and returns
// its result.
//
// finalStatus: out-parameter, filled in by approxDataRemaining_l().
size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const {
    Mutex::Autolock autoLock(mLock);
    return approxDataRemaining_l(finalStatus);
}
574 
approxDataRemaining_l(status_t * finalStatus) const575 size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const {
576     *finalStatus = mFinalStatus;
577 
578     if (mFinalStatus != OK && mNumRetriesLeft > 0) {
579         // Pretend that everything is fine until we're out of retries.
580         *finalStatus = OK;
581     }
582 
583     off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
584     if (mLastAccessPos < lastBytePosCached) {
585         return lastBytePosCached - mLastAccessPos;
586     }
587     return 0;
588 }
589 
readInternal(off64_t offset,void * data,size_t size)590 ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
591     CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
592 
593     ALOGV("readInternal offset %lld size %zu", (long long)offset, size);
594 
595     Mutex::Autolock autoLock(mLock);
596 
597     // If we're disconnecting, return EOS and don't access *data pointer.
598     // data could be on the stack of the caller to NuCachedSource2::readAt(),
599     // which may have exited already.
600     if (mDisconnecting) {
601         return ERROR_END_OF_STREAM;
602     }
603 
604     if (!mFetching) {
605         mLastAccessPos = offset;
606         restartPrefetcherIfNecessary_l(
607                 false, // ignoreLowWaterThreshold
608                 true); // force
609     }
610 
611     if (offset < mCacheOffset
612             || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
613         static const off64_t kPadding = 256 * 1024;
614 
615         // In the presence of multiple decoded streams, once of them will
616         // trigger this seek request, the other one will request data "nearby"
617         // soon, adjust the seek position so that that subsequent request
618         // does not trigger another seek.
619         off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
620 
621         seekInternal_l(seekOffset);
622     }
623 
624     size_t delta = offset - mCacheOffset;
625 
626     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
627         if (delta >= mCache->totalSize()) {
628             return mFinalStatus;
629         }
630 
631         size_t avail = mCache->totalSize() - delta;
632 
633         if (avail > size) {
634             avail = size;
635         }
636 
637         mCache->copy(delta, data, avail);
638 
639         return avail;
640     }
641 
642     if (offset + size <= mCacheOffset + mCache->totalSize()) {
643         mCache->copy(delta, data, size);
644 
645         return size;
646     }
647 
648     ALOGV("deferring read");
649 
650     return -EAGAIN;
651 }
652 
seekInternal_l(off64_t offset)653 status_t NuCachedSource2::seekInternal_l(off64_t offset) {
654     mLastAccessPos = offset;
655 
656     if (offset >= mCacheOffset
657             && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
658         return OK;
659     }
660 
661     ALOGI("new range: offset= %lld", (long long)offset);
662 
663     mCacheOffset = offset;
664 
665     size_t totalSize = mCache->totalSize();
666     CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
667 
668     mNumRetriesLeft = kMaxNumRetries;
669     mFetching = true;
670 
671     return OK;
672 }
673 
// Acquires mLock and asks the prefetcher to restart, bypassing the
// low-water threshold check.
void NuCachedSource2::resumeFetchingIfNecessary() {
    Mutex::Autolock autoLock(mLock);

    restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
}
679 
// Forwards DRM initialization for |mime| to the wrapped source and returns
// its result.
sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) {
    return mSource->DrmInitialization(mime);
}
683 
// Forwards the DRM info query to the wrapped source; |handle| and |client|
// are passed through unchanged for it to fill in.
void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
    mSource->getDrmInfo(handle, client);
}
687 
// Returns the URI reported by the wrapped source.
String8 NuCachedSource2::getUri() {
    return mSource->getUri();
}
691 
// Returns the MIME type reported by the wrapped source.
String8 NuCachedSource2::getMIMEType() const {
    return mSource->getMIMEType();
}
695 
updateCacheParamsFromSystemProperty()696 void NuCachedSource2::updateCacheParamsFromSystemProperty() {
697     char value[PROPERTY_VALUE_MAX];
698     if (!property_get("media.stagefright.cache-params", value, NULL)) {
699         return;
700     }
701 
702     updateCacheParamsFromString(value);
703 }
704 
updateCacheParamsFromString(const char * s)705 void NuCachedSource2::updateCacheParamsFromString(const char *s) {
706     ssize_t lowwaterMarkKb, highwaterMarkKb;
707     int keepAliveSecs;
708 
709     if (sscanf(s, "%zd/%zd/%d",
710                &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
711         ALOGE("Failed to parse cache parameters from '%s'.", s);
712         return;
713     }
714 
715     if (lowwaterMarkKb >= 0) {
716         mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
717     } else {
718         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
719     }
720 
721     if (highwaterMarkKb >= 0) {
722         mHighwaterThresholdBytes = highwaterMarkKb * 1024;
723     } else {
724         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
725     }
726 
727     if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
728         ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
729 
730         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
731         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
732     }
733 
734     if (keepAliveSecs >= 0) {
735         mKeepAliveIntervalUs = keepAliveSecs * 1000000ll;
736     } else {
737         mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
738     }
739 
740     ALOGV("lowwater = %zu bytes, highwater = %zu bytes, keepalive = %lld us",
741          mLowwaterThresholdBytes,
742          mHighwaterThresholdBytes,
743          (long long)mKeepAliveIntervalUs);
744 }
745 
746 // static
RemoveCacheSpecificHeaders(KeyedVector<String8,String8> * headers,String8 * cacheConfig,bool * disconnectAtHighwatermark)747 void NuCachedSource2::RemoveCacheSpecificHeaders(
748         KeyedVector<String8, String8> *headers,
749         String8 *cacheConfig,
750         bool *disconnectAtHighwatermark) {
751     *cacheConfig = String8();
752     *disconnectAtHighwatermark = false;
753 
754     if (headers == NULL) {
755         return;
756     }
757 
758     ssize_t index;
759     if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
760         *cacheConfig = headers->valueAt(index);
761 
762         headers->removeItemsAt(index);
763 
764         ALOGV("Using special cache config '%s'", cacheConfig->string());
765     }
766 
767     if ((index = headers->indexOfKey(
768                     String8("x-disconnect-at-highwatermark"))) >= 0) {
769         *disconnectAtHighwatermark = true;
770         headers->removeItemsAt(index);
771 
772         ALOGV("Client requested disconnection at highwater mark");
773     }
774 }
775 
776 }  // namespace android
777