1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tv.tuner.exoplayer.buffer;
18 
19 import android.media.MediaCodec;
20 import android.os.ConditionVariable;
21 import android.os.Handler;
22 import android.os.HandlerThread;
23 import android.os.Message;
24 import android.util.Log;
25 import android.util.Pair;
26 
27 import com.google.android.exoplayer.MediaFormat;
28 import com.google.android.exoplayer.SampleHolder;
29 import com.google.android.exoplayer.util.MimeTypes;
30 import com.android.tv.common.SoftPreconditions;
31 import com.android.tv.tuner.exoplayer.buffer.RecordingSampleBuffer.BufferReason;
32 
33 import java.io.IOException;
34 import java.util.List;
35 import java.util.concurrent.ConcurrentLinkedQueue;
36 
37 /**
38  * Handles all {@link SampleChunk} I/O operations.
39  * An I/O dedicated thread handles all I/O operations for synchronization.
40  */
41 public class SampleChunkIoHelper implements Handler.Callback {
42     private static final String TAG = "SampleChunkIoHelper";
43 
44     private static final int MAX_READ_BUFFER_SAMPLES = 3;
45     private static final int READ_RESCHEDULING_DELAY_MS = 10;
46 
47     private static final int MSG_OPEN_READ = 1;
48     private static final int MSG_OPEN_WRITE = 2;
49     private static final int MSG_CLOSE_WRITE = 3;
50     private static final int MSG_READ = 4;
51     private static final int MSG_WRITE = 5;
52     private static final int MSG_RELEASE = 6;
53 
54     private final int mTrackCount;
55     private final List<String> mIds;
56     private final List<MediaFormat> mMediaFormats;
57     private final @BufferReason int mBufferReason;
58     private final BufferManager mBufferManager;
59     private final SamplePool mSamplePool;
60     private final IoCallback mIoCallback;
61 
62     private Handler mIoHandler;
63     private final ConcurrentLinkedQueue<SampleHolder> mReadSampleBuffers[];
64     private final ConcurrentLinkedQueue<SampleHolder> mHandlerReadSampleBuffers[];
65     private final long[] mWriteEndPositionUs;
66     private final SampleChunk.IoState[] mReadIoStates;
67     private final SampleChunk.IoState[] mWriteIoStates;
68     private long mBufferDurationUs = 0;
69     private boolean mWriteEnded;
70     private boolean mErrorNotified;
71     private boolean mFinished;
72 
73     /**
74      * A Callback for I/O events.
75      */
76     public static abstract class IoCallback {
77 
78         /**
79          * Called when there is no sample to read.
80          */
onIoReachedEos()81         public void onIoReachedEos() {
82         }
83 
84         /**
85          * Called when there is an irrecoverable error during I/O.
86          */
onIoError()87         public void onIoError() {
88         }
89     }
90 
91     private class IoParams {
92         private final int index;
93         private final long positionUs;
94         private final SampleHolder sample;
95         private final ConditionVariable conditionVariable;
96         private final ConcurrentLinkedQueue<SampleHolder> readSampleBuffer;
97 
IoParams(int index, long positionUs, SampleHolder sample, ConditionVariable conditionVariable, ConcurrentLinkedQueue<SampleHolder> readSampleBuffer)98         private IoParams(int index, long positionUs, SampleHolder sample,
99                 ConditionVariable conditionVariable,
100                 ConcurrentLinkedQueue<SampleHolder> readSampleBuffer) {
101             this.index = index;
102             this.positionUs = positionUs;
103             this.sample = sample;
104             this.conditionVariable = conditionVariable;
105             this.readSampleBuffer = readSampleBuffer;
106         }
107     }
108 
109     /**
110      * Creates {@link SampleChunk} I/O handler.
111      *
112      * @param ids track names
113      * @param mediaFormats {@link android.media.MediaFormat} for each track
114      * @param bufferReason reason to be buffered
115      * @param bufferManager manager of {@link SampleChunk} collections
116      * @param samplePool allocator for a sample
117      * @param ioCallback listeners for I/O events
118      */
SampleChunkIoHelper(List<String> ids, List<MediaFormat> mediaFormats, @BufferReason int bufferReason, BufferManager bufferManager, SamplePool samplePool, IoCallback ioCallback)119     public SampleChunkIoHelper(List<String> ids, List<MediaFormat> mediaFormats,
120             @BufferReason int bufferReason, BufferManager bufferManager, SamplePool samplePool,
121             IoCallback ioCallback) {
122         mTrackCount = ids.size();
123         mIds = ids;
124         mMediaFormats = mediaFormats;
125         mBufferReason = bufferReason;
126         mBufferManager = bufferManager;
127         mSamplePool = samplePool;
128         mIoCallback = ioCallback;
129 
130         mReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount];
131         mHandlerReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount];
132         mWriteEndPositionUs = new long[mTrackCount];
133         mReadIoStates = new SampleChunk.IoState[mTrackCount];
134         mWriteIoStates = new SampleChunk.IoState[mTrackCount];
135         for (int i = 0; i < mTrackCount; ++i) {
136             mWriteEndPositionUs[i] = RecordingSampleBuffer.CHUNK_DURATION_US;
137             mReadIoStates[i] = new SampleChunk.IoState();
138             mWriteIoStates[i] = new SampleChunk.IoState();
139         }
140     }
141 
142     /**
143      * Prepares and initializes for I/O operations.
144      *
145      * @throws IOException
146      */
init()147     public void init() throws IOException {
148         HandlerThread handlerThread = new HandlerThread(TAG);
149         handlerThread.start();
150         mIoHandler = new Handler(handlerThread.getLooper(), this);
151         if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDED_PLAYBACK) {
152             for (int i = 0; i < mTrackCount; ++i) {
153                 mBufferManager.loadTrackFromStorage(mIds.get(i), mSamplePool);
154             }
155             mWriteEnded = true;
156         } else {
157             for (int i = 0; i < mTrackCount; ++i) {
158                 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_WRITE, i));
159             }
160         }
161     }
162 
163     /**
164      * Reads a sample if it is available.
165      *
166      * @param index track index
167      * @return {@code null} if a sample is not available, otherwise returns a sample
168      */
readSample(int index)169     public SampleHolder readSample(int index) {
170         SampleHolder sample = mReadSampleBuffers[index].poll();
171         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index));
172         return sample;
173     }
174 
175     /**
176      * Writes a sample.
177      *
178      * @param index track index
179      * @param sample to write
180      * @param conditionVariable which will be wait until the write is finished
181      * @throws IOException
182      */
writeSample(int index, SampleHolder sample, ConditionVariable conditionVariable)183     public void writeSample(int index, SampleHolder sample,
184             ConditionVariable conditionVariable) throws IOException {
185         if (mErrorNotified) {
186             throw new IOException("Storage I/O error happened");
187         }
188         conditionVariable.close();
189         IoParams params = new IoParams(index, 0, sample, conditionVariable, null);
190         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_WRITE, params));
191     }
192 
193     /**
194      * Starts read from the specified position.
195      *
196      * @param index track index
197      * @param positionUs the specified position
198      */
openRead(int index, long positionUs)199     public void openRead(int index, long positionUs) {
200         // Old mReadSampleBuffers may have a pending read.
201         mReadSampleBuffers[index] = new ConcurrentLinkedQueue<>();
202         IoParams params = new IoParams(index, positionUs, null, null, mReadSampleBuffers[index]);
203         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_READ, params));
204     }
205 
206     /**
207      * Notifies writes are finished.
208      */
closeWrite()209     public void closeWrite() {
210         mIoHandler.sendEmptyMessage(MSG_CLOSE_WRITE);
211     }
212 
213     /**
214      * Finishes I/O operations and releases all the resources.
215      * @throws IOException
216      */
release()217     public void release() throws IOException {
218         if (mIoHandler == null) {
219             return;
220         }
221         // Finishes all I/O operations.
222         ConditionVariable conditionVariable = new ConditionVariable();
223         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_RELEASE, conditionVariable));
224         conditionVariable.block();
225 
226         for (int i = 0; i < mTrackCount; ++i) {
227             mBufferManager.unregisterChunkEvictedListener(mIds.get(i));
228         }
229         try {
230             if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDING && mTrackCount > 0) {
231                 // Saves meta information for recording.
232                 Pair<String, android.media.MediaFormat> audio = null, video = null;
233                 for (int i = 0; i < mTrackCount; ++i) {
234                     android.media.MediaFormat format =
235                             mMediaFormats.get(i).getFrameworkMediaFormatV16();
236                     format.setLong(android.media.MediaFormat.KEY_DURATION, mBufferDurationUs);
237                     if (audio == null && MimeTypes.isAudio(mMediaFormats.get(i).mimeType)) {
238                         audio = new Pair<>(mIds.get(i), format);
239                     } else if (video == null && MimeTypes.isVideo(mMediaFormats.get(i).mimeType)) {
240                         video = new Pair<>(mIds.get(i), format);
241                     }
242                     if (audio != null && video != null) {
243                         break;
244                     }
245                 }
246                 mBufferManager.writeMetaFiles(audio, video);
247             }
248         } finally {
249             mBufferManager.release();
250             mIoHandler.getLooper().quitSafely();
251         }
252     }
253 
254     @Override
handleMessage(Message message)255     public boolean handleMessage(Message message) {
256         if (mFinished) {
257             return true;
258         }
259         releaseEvictedChunks();
260         try {
261             switch (message.what) {
262                 case MSG_OPEN_READ:
263                     doOpenRead((IoParams) message.obj);
264                     return true;
265                 case MSG_OPEN_WRITE:
266                     doOpenWrite((int) message.obj);
267                     return true;
268                 case MSG_CLOSE_WRITE:
269                     doCloseWrite();
270                     return true;
271                 case MSG_READ:
272                     doRead((int) message.obj);
273                     return true;
274                 case MSG_WRITE:
275                     doWrite((IoParams) message.obj);
276                     // Since only write will increase storage, eviction will be handled here.
277                     return true;
278                 case MSG_RELEASE:
279                     doRelease((ConditionVariable) message.obj);
280                     return true;
281             }
282         } catch (IOException e) {
283             mIoCallback.onIoError();
284             mErrorNotified = true;
285             Log.e(TAG, "IoException happened", e);
286             return true;
287         }
288         return false;
289     }
290 
doOpenRead(IoParams params)291     private void doOpenRead(IoParams params) throws IOException {
292         int index = params.index;
293         mIoHandler.removeMessages(MSG_READ, index);
294         SampleChunk chunk = mBufferManager.getReadFile(mIds.get(index), params.positionUs);
295         if (chunk == null) {
296             String errorMessage = "Chunk ID:" + mIds.get(index) + " pos:" + params.positionUs
297                     + "is not found";
298             SoftPreconditions.checkNotNull(chunk, TAG, errorMessage);
299             throw new IOException(errorMessage);
300         }
301         mReadIoStates[index].openRead(chunk);
302         if (mHandlerReadSampleBuffers[index] != null) {
303             SampleHolder sample;
304             while ((sample = mHandlerReadSampleBuffers[index].poll()) != null) {
305                 mSamplePool.releaseSample(sample);
306             }
307         }
308         mHandlerReadSampleBuffers[index] = params.readSampleBuffer;
309         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index));
310     }
311 
doOpenWrite(int index)312     private void doOpenWrite(int index) throws IOException {
313         SampleChunk chunk = mBufferManager.createNewWriteFile(mIds.get(index), 0, mSamplePool);
314         mWriteIoStates[index].openWrite(chunk);
315     }
316 
doRead(int index)317     private void doRead(int index) throws IOException {
318         mIoHandler.removeMessages(MSG_READ, index);
319         if (mHandlerReadSampleBuffers[index].size() >= MAX_READ_BUFFER_SAMPLES) {
320             // If enough samples are buffered, try again few moments later hoping that
321             // buffered samples are consumed.
322             mIoHandler.sendMessageDelayed(
323                     mIoHandler.obtainMessage(MSG_READ, index), READ_RESCHEDULING_DELAY_MS);
324         } else {
325             if (mReadIoStates[index].isReadFinished()) {
326                 for (int i = 0; i < mTrackCount; ++i) {
327                     if (!mReadIoStates[i].isReadFinished()) {
328                         return;
329                     }
330                 }
331                 mIoCallback.onIoReachedEos();
332                 return;
333             }
334             SampleHolder sample = mReadIoStates[index].read();
335             if (sample != null) {
336                 mHandlerReadSampleBuffers[index].offer(sample);
337             } else {
338                 // Read reached write but write is not finished yet --- wait a few moments to
339                 // see if another sample is written.
340                 mIoHandler.sendMessageDelayed(
341                         mIoHandler.obtainMessage(MSG_READ, index),
342                         READ_RESCHEDULING_DELAY_MS);
343             }
344         }
345     }
346 
doWrite(IoParams params)347     private void doWrite(IoParams params) throws IOException {
348         try {
349             if (mWriteEnded) {
350                 SoftPreconditions.checkState(false);
351                 return;
352             }
353             int index = params.index;
354             SampleHolder sample = params.sample;
355             SampleChunk nextChunk = null;
356             if ((sample.flags & MediaCodec.BUFFER_FLAG_KEY_FRAME) != 0) {
357                 if (sample.timeUs > mBufferDurationUs) {
358                     mBufferDurationUs = sample.timeUs;
359                 }
360 
361                 if (sample.timeUs >= mWriteEndPositionUs[index]) {
362                     nextChunk = mBufferManager.createNewWriteFile(mIds.get(index),
363                             mWriteEndPositionUs[index], mSamplePool);
364                     mWriteEndPositionUs[index] =
365                             ((sample.timeUs / RecordingSampleBuffer.CHUNK_DURATION_US) + 1) *
366                                     RecordingSampleBuffer.CHUNK_DURATION_US;
367                 }
368             }
369             mWriteIoStates[params.index].write(params.sample, nextChunk);
370         } finally {
371             params.conditionVariable.open();
372         }
373     }
374 
doCloseWrite()375     private void doCloseWrite() throws IOException {
376         if (mWriteEnded) {
377             return;
378         }
379         mWriteEnded = true;
380         boolean readFinished = true;
381         for (int i = 0; i < mTrackCount; ++i) {
382             readFinished = readFinished && mReadIoStates[i].isReadFinished();
383             mWriteIoStates[i].closeWrite();
384         }
385         if (readFinished) {
386             mIoCallback.onIoReachedEos();
387         }
388     }
389 
doRelease(ConditionVariable conditionVariable)390     private void doRelease(ConditionVariable conditionVariable) {
391         mIoHandler.removeCallbacksAndMessages(null);
392         mFinished = true;
393         conditionVariable.open();
394     }
395 
releaseEvictedChunks()396     private void releaseEvictedChunks() {
397         if (mBufferReason != RecordingSampleBuffer.BUFFER_REASON_LIVE_PLAYBACK) {
398             return;
399         }
400         for (int i = 0; i < mTrackCount; ++i) {
401             long evictEndPositionUs = Math.min(mBufferManager.getStartPositionUs(mIds.get(i)),
402                     mReadIoStates[i].getStartPositionUs());
403             mBufferManager.evictChunks(mIds.get(i), evictEndPositionUs);
404         }
405     }
406 }