1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "LiveDataSource"
19 #include <utils/Log.h>
20 
21 #include "LiveDataSource.h"
22 
23 #include <media/stagefright/foundation/ABuffer.h>
24 #include <media/stagefright/foundation/ADebug.h>
25 
26 #define SAVE_BACKUP     0
27 
28 namespace android {
29 
LiveDataSource()30 LiveDataSource::LiveDataSource()
31     : mOffset(0),
32       mFinalResult(OK),
33       mBackupFile(NULL) {
34 #if SAVE_BACKUP
35     mBackupFile = fopen("/data/misc/backup.ts", "wb");
36     CHECK(mBackupFile != NULL);
37 #endif
38 }
39 
~LiveDataSource()40 LiveDataSource::~LiveDataSource() {
41     if (mBackupFile != NULL) {
42         fclose(mBackupFile);
43         mBackupFile = NULL;
44     }
45 }
46 
initCheck() const47 status_t LiveDataSource::initCheck() const {
48     return OK;
49 }
50 
countQueuedBuffers()51 size_t LiveDataSource::countQueuedBuffers() {
52     Mutex::Autolock autoLock(mLock);
53 
54     return mBufferQueue.size();
55 }
56 
readAtNonBlocking(off64_t offset,void * data,size_t size)57 ssize_t LiveDataSource::readAtNonBlocking(
58         off64_t offset, void *data, size_t size) {
59     Mutex::Autolock autoLock(mLock);
60 
61     if (offset != mOffset) {
62         ALOGE("Attempt at reading non-sequentially from LiveDataSource.");
63         return -EPIPE;
64     }
65 
66     size_t totalAvailable = 0;
67     for (List<sp<ABuffer> >::iterator it = mBufferQueue.begin();
68          it != mBufferQueue.end(); ++it) {
69         sp<ABuffer> buffer = *it;
70 
71         totalAvailable += buffer->size();
72 
73         if (totalAvailable >= size) {
74             break;
75         }
76     }
77 
78     if (totalAvailable < size) {
79         return mFinalResult == OK ? -EWOULDBLOCK : mFinalResult;
80     }
81 
82     return readAt_l(offset, data, size);
83 }
84 
readAt(off64_t offset,void * data,size_t size)85 ssize_t LiveDataSource::readAt(off64_t offset, void *data, size_t size) {
86     Mutex::Autolock autoLock(mLock);
87     return readAt_l(offset, data, size);
88 }
89 
readAt_l(off64_t offset,void * data,size_t size)90 ssize_t LiveDataSource::readAt_l(off64_t offset, void *data, size_t size) {
91     if (offset != mOffset) {
92         ALOGE("Attempt at reading non-sequentially from LiveDataSource.");
93         return -EPIPE;
94     }
95 
96     size_t sizeDone = 0;
97 
98     while (sizeDone < size) {
99         while (mBufferQueue.empty() && mFinalResult == OK) {
100             mCondition.wait(mLock);
101         }
102 
103         if (mBufferQueue.empty()) {
104             if (sizeDone > 0) {
105                 mOffset += sizeDone;
106                 return sizeDone;
107             }
108 
109             return mFinalResult;
110         }
111 
112         sp<ABuffer> buffer = *mBufferQueue.begin();
113 
114         size_t copy = size - sizeDone;
115 
116         if (copy > buffer->size()) {
117             copy = buffer->size();
118         }
119 
120         memcpy((uint8_t *)data + sizeDone, buffer->data(), copy);
121 
122         sizeDone += copy;
123 
124         buffer->setRange(buffer->offset() + copy, buffer->size() - copy);
125 
126         if (buffer->size() == 0) {
127             mBufferQueue.erase(mBufferQueue.begin());
128         }
129     }
130 
131     mOffset += sizeDone;
132 
133     return sizeDone;
134 }
135 
queueBuffer(const sp<ABuffer> & buffer)136 void LiveDataSource::queueBuffer(const sp<ABuffer> &buffer) {
137     Mutex::Autolock autoLock(mLock);
138 
139     if (mFinalResult != OK) {
140         return;
141     }
142 
143 #if SAVE_BACKUP
144     if (mBackupFile != NULL) {
145         CHECK_EQ(fwrite(buffer->data(), 1, buffer->size(), mBackupFile),
146                  buffer->size());
147     }
148 #endif
149 
150     mBufferQueue.push_back(buffer);
151     mCondition.broadcast();
152 }
153 
queueEOS(status_t finalResult)154 void LiveDataSource::queueEOS(status_t finalResult) {
155     CHECK_NE(finalResult, (status_t)OK);
156 
157     Mutex::Autolock autoLock(mLock);
158 
159     mFinalResult = finalResult;
160     mCondition.broadcast();
161 }
162 
reset()163 void LiveDataSource::reset() {
164     Mutex::Autolock autoLock(mLock);
165 
166     // XXX FIXME: If we've done a partial read and waiting for more buffers,
167     // we'll mix old and new data...
168 
169     mFinalResult = OK;
170     mBufferQueue.clear();
171 }
172 
173 }  // namespace android
174