1 /* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.tv.tuner.exoplayer.buffer; 18 19 import android.media.MediaCodec; 20 import android.os.ConditionVariable; 21 import android.os.Handler; 22 import android.os.HandlerThread; 23 import android.os.Message; 24 import android.util.Log; 25 import android.util.Pair; 26 27 import com.google.android.exoplayer.MediaFormat; 28 import com.google.android.exoplayer.SampleHolder; 29 import com.google.android.exoplayer.util.MimeTypes; 30 import com.android.tv.common.SoftPreconditions; 31 import com.android.tv.tuner.exoplayer.buffer.RecordingSampleBuffer.BufferReason; 32 33 import java.io.IOException; 34 import java.util.List; 35 import java.util.concurrent.ConcurrentLinkedQueue; 36 37 /** 38 * Handles all {@link SampleChunk} I/O operations. 39 * An I/O dedicated thread handles all I/O operations for synchronization. 40 */ 41 public class SampleChunkIoHelper implements Handler.Callback { 42 private static final String TAG = "SampleChunkIoHelper"; 43 44 private static final int MAX_READ_BUFFER_SAMPLES = 3; 45 private static final int READ_RESCHEDULING_DELAY_MS = 10; 46 47 private static final int MSG_OPEN_READ = 1; 48 private static final int MSG_OPEN_WRITE = 2; 49 private static final int MSG_CLOSE_WRITE = 3; 50 private static final int MSG_READ = 4; 51 private static final int MSG_WRITE = 5; 52 private static final int MSG_RELEASE = 6; 53 54 private final int mTrackCount; 55 private final List<String> mIds; 56 private final List<MediaFormat> mMediaFormats; 57 private final @BufferReason int mBufferReason; 58 private final BufferManager mBufferManager; 59 private final SamplePool mSamplePool; 60 private final IoCallback mIoCallback; 61 62 private Handler mIoHandler; 63 private final ConcurrentLinkedQueue<SampleHolder> mReadSampleBuffers[]; 64 private final ConcurrentLinkedQueue<SampleHolder> mHandlerReadSampleBuffers[]; 65 private final long[] mWriteEndPositionUs; 66 private final SampleChunk.IoState[] mReadIoStates; 67 private final SampleChunk.IoState[] mWriteIoStates; 68 private long mBufferDurationUs = 0; 69 private boolean mWriteEnded; 70 private boolean mErrorNotified; 71 private boolean mFinished; 72 73 /** 74 * A Callback for I/O events. 75 */ 76 public static abstract class IoCallback { 77 78 /** 79 * Called when there is no sample to read. 80 */ onIoReachedEos()81 public void onIoReachedEos() { 82 } 83 84 /** 85 * Called when there is an irrecoverable error during I/O. 86 */ onIoError()87 public void onIoError() { 88 } 89 } 90 91 private class IoParams { 92 private final int index; 93 private final long positionUs; 94 private final SampleHolder sample; 95 private final ConditionVariable conditionVariable; 96 private final ConcurrentLinkedQueue<SampleHolder> readSampleBuffer; 97 IoParams(int index, long positionUs, SampleHolder sample, ConditionVariable conditionVariable, ConcurrentLinkedQueue<SampleHolder> readSampleBuffer)98 private IoParams(int index, long positionUs, SampleHolder sample, 99 ConditionVariable conditionVariable, 100 ConcurrentLinkedQueue<SampleHolder> readSampleBuffer) { 101 this.index = index; 102 this.positionUs = positionUs; 103 this.sample = sample; 104 this.conditionVariable = conditionVariable; 105 this.readSampleBuffer = readSampleBuffer; 106 } 107 } 108 109 /** 110 * Creates {@link SampleChunk} I/O handler. 111 * 112 * @param ids track names 113 * @param mediaFormats {@link android.media.MediaFormat} for each track 114 * @param bufferReason reason to be buffered 115 * @param bufferManager manager of {@link SampleChunk} collections 116 * @param samplePool allocator for a sample 117 * @param ioCallback listeners for I/O events 118 */ SampleChunkIoHelper(List<String> ids, List<MediaFormat> mediaFormats, @BufferReason int bufferReason, BufferManager bufferManager, SamplePool samplePool, IoCallback ioCallback)119 public SampleChunkIoHelper(List<String> ids, List<MediaFormat> mediaFormats, 120 @BufferReason int bufferReason, BufferManager bufferManager, SamplePool samplePool, 121 IoCallback ioCallback) { 122 mTrackCount = ids.size(); 123 mIds = ids; 124 mMediaFormats = mediaFormats; 125 mBufferReason = bufferReason; 126 mBufferManager = bufferManager; 127 mSamplePool = samplePool; 128 mIoCallback = ioCallback; 129 130 mReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount]; 131 mHandlerReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount]; 132 mWriteEndPositionUs = new long[mTrackCount]; 133 mReadIoStates = new SampleChunk.IoState[mTrackCount]; 134 mWriteIoStates = new SampleChunk.IoState[mTrackCount]; 135 for (int i = 0; i < mTrackCount; ++i) { 136 mWriteEndPositionUs[i] = RecordingSampleBuffer.CHUNK_DURATION_US; 137 mReadIoStates[i] = new SampleChunk.IoState(); 138 mWriteIoStates[i] = new SampleChunk.IoState(); 139 } 140 } 141 142 /** 143 * Prepares and initializes for I/O operations. 144 * 145 * @throws IOException 146 */ init()147 public void init() throws IOException { 148 HandlerThread handlerThread = new HandlerThread(TAG); 149 handlerThread.start(); 150 mIoHandler = new Handler(handlerThread.getLooper(), this); 151 if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDED_PLAYBACK) { 152 for (int i = 0; i < mTrackCount; ++i) { 153 mBufferManager.loadTrackFromStorage(mIds.get(i), mSamplePool); 154 } 155 mWriteEnded = true; 156 } else { 157 for (int i = 0; i < mTrackCount; ++i) { 158 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_WRITE, i)); 159 } 160 } 161 } 162 163 /** 164 * Reads a sample if it is available. 165 * 166 * @param index track index 167 * @return {@code null} if a sample is not available, otherwise returns a sample 168 */ readSample(int index)169 public SampleHolder readSample(int index) { 170 SampleHolder sample = mReadSampleBuffers[index].poll(); 171 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index)); 172 return sample; 173 } 174 175 /** 176 * Writes a sample. 177 * 178 * @param index track index 179 * @param sample to write 180 * @param conditionVariable which will be wait until the write is finished 181 * @throws IOException 182 */ writeSample(int index, SampleHolder sample, ConditionVariable conditionVariable)183 public void writeSample(int index, SampleHolder sample, 184 ConditionVariable conditionVariable) throws IOException { 185 if (mErrorNotified) { 186 throw new IOException("Storage I/O error happened"); 187 } 188 conditionVariable.close(); 189 IoParams params = new IoParams(index, 0, sample, conditionVariable, null); 190 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_WRITE, params)); 191 } 192 193 /** 194 * Starts read from the specified position. 195 * 196 * @param index track index 197 * @param positionUs the specified position 198 */ openRead(int index, long positionUs)199 public void openRead(int index, long positionUs) { 200 // Old mReadSampleBuffers may have a pending read. 201 mReadSampleBuffers[index] = new ConcurrentLinkedQueue<>(); 202 IoParams params = new IoParams(index, positionUs, null, null, mReadSampleBuffers[index]); 203 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_READ, params)); 204 } 205 206 /** 207 * Notifies writes are finished. 208 */ closeWrite()209 public void closeWrite() { 210 mIoHandler.sendEmptyMessage(MSG_CLOSE_WRITE); 211 } 212 213 /** 214 * Finishes I/O operations and releases all the resources. 215 * @throws IOException 216 */ release()217 public void release() throws IOException { 218 if (mIoHandler == null) { 219 return; 220 } 221 // Finishes all I/O operations. 222 ConditionVariable conditionVariable = new ConditionVariable(); 223 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_RELEASE, conditionVariable)); 224 conditionVariable.block(); 225 226 for (int i = 0; i < mTrackCount; ++i) { 227 mBufferManager.unregisterChunkEvictedListener(mIds.get(i)); 228 } 229 try { 230 if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDING && mTrackCount > 0) { 231 // Saves meta information for recording. 232 Pair<String, android.media.MediaFormat> audio = null, video = null; 233 for (int i = 0; i < mTrackCount; ++i) { 234 android.media.MediaFormat format = 235 mMediaFormats.get(i).getFrameworkMediaFormatV16(); 236 format.setLong(android.media.MediaFormat.KEY_DURATION, mBufferDurationUs); 237 if (audio == null && MimeTypes.isAudio(mMediaFormats.get(i).mimeType)) { 238 audio = new Pair<>(mIds.get(i), format); 239 } else if (video == null && MimeTypes.isVideo(mMediaFormats.get(i).mimeType)) { 240 video = new Pair<>(mIds.get(i), format); 241 } 242 if (audio != null && video != null) { 243 break; 244 } 245 } 246 mBufferManager.writeMetaFiles(audio, video); 247 } 248 } finally { 249 mBufferManager.release(); 250 mIoHandler.getLooper().quitSafely(); 251 } 252 } 253 254 @Override handleMessage(Message message)255 public boolean handleMessage(Message message) { 256 if (mFinished) { 257 return true; 258 } 259 releaseEvictedChunks(); 260 try { 261 switch (message.what) { 262 case MSG_OPEN_READ: 263 doOpenRead((IoParams) message.obj); 264 return true; 265 case MSG_OPEN_WRITE: 266 doOpenWrite((int) message.obj); 267 return true; 268 case MSG_CLOSE_WRITE: 269 doCloseWrite(); 270 return true; 271 case MSG_READ: 272 doRead((int) message.obj); 273 return true; 274 case MSG_WRITE: 275 doWrite((IoParams) message.obj); 276 // Since only write will increase storage, eviction will be handled here. 277 return true; 278 case MSG_RELEASE: 279 doRelease((ConditionVariable) message.obj); 280 return true; 281 } 282 } catch (IOException e) { 283 mIoCallback.onIoError(); 284 mErrorNotified = true; 285 Log.e(TAG, "IoException happened", e); 286 return true; 287 } 288 return false; 289 } 290 doOpenRead(IoParams params)291 private void doOpenRead(IoParams params) throws IOException { 292 int index = params.index; 293 mIoHandler.removeMessages(MSG_READ, index); 294 SampleChunk chunk = mBufferManager.getReadFile(mIds.get(index), params.positionUs); 295 if (chunk == null) { 296 String errorMessage = "Chunk ID:" + mIds.get(index) + " pos:" + params.positionUs 297 + "is not found"; 298 SoftPreconditions.checkNotNull(chunk, TAG, errorMessage); 299 throw new IOException(errorMessage); 300 } 301 mReadIoStates[index].openRead(chunk); 302 if (mHandlerReadSampleBuffers[index] != null) { 303 SampleHolder sample; 304 while ((sample = mHandlerReadSampleBuffers[index].poll()) != null) { 305 mSamplePool.releaseSample(sample); 306 } 307 } 308 mHandlerReadSampleBuffers[index] = params.readSampleBuffer; 309 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index)); 310 } 311 doOpenWrite(int index)312 private void doOpenWrite(int index) throws IOException { 313 SampleChunk chunk = mBufferManager.createNewWriteFile(mIds.get(index), 0, mSamplePool); 314 mWriteIoStates[index].openWrite(chunk); 315 } 316 doRead(int index)317 private void doRead(int index) throws IOException { 318 mIoHandler.removeMessages(MSG_READ, index); 319 if (mHandlerReadSampleBuffers[index].size() >= MAX_READ_BUFFER_SAMPLES) { 320 // If enough samples are buffered, try again few moments later hoping that 321 // buffered samples are consumed. 322 mIoHandler.sendMessageDelayed( 323 mIoHandler.obtainMessage(MSG_READ, index), READ_RESCHEDULING_DELAY_MS); 324 } else { 325 if (mReadIoStates[index].isReadFinished()) { 326 for (int i = 0; i < mTrackCount; ++i) { 327 if (!mReadIoStates[i].isReadFinished()) { 328 return; 329 } 330 } 331 mIoCallback.onIoReachedEos(); 332 return; 333 } 334 SampleHolder sample = mReadIoStates[index].read(); 335 if (sample != null) { 336 mHandlerReadSampleBuffers[index].offer(sample); 337 } else { 338 // Read reached write but write is not finished yet --- wait a few moments to 339 // see if another sample is written. 340 mIoHandler.sendMessageDelayed( 341 mIoHandler.obtainMessage(MSG_READ, index), 342 READ_RESCHEDULING_DELAY_MS); 343 } 344 } 345 } 346 doWrite(IoParams params)347 private void doWrite(IoParams params) throws IOException { 348 try { 349 if (mWriteEnded) { 350 SoftPreconditions.checkState(false); 351 return; 352 } 353 int index = params.index; 354 SampleHolder sample = params.sample; 355 SampleChunk nextChunk = null; 356 if ((sample.flags & MediaCodec.BUFFER_FLAG_KEY_FRAME) != 0) { 357 if (sample.timeUs > mBufferDurationUs) { 358 mBufferDurationUs = sample.timeUs; 359 } 360 361 if (sample.timeUs >= mWriteEndPositionUs[index]) { 362 nextChunk = mBufferManager.createNewWriteFile(mIds.get(index), 363 mWriteEndPositionUs[index], mSamplePool); 364 mWriteEndPositionUs[index] = 365 ((sample.timeUs / RecordingSampleBuffer.CHUNK_DURATION_US) + 1) * 366 RecordingSampleBuffer.CHUNK_DURATION_US; 367 } 368 } 369 mWriteIoStates[params.index].write(params.sample, nextChunk); 370 } finally { 371 params.conditionVariable.open(); 372 } 373 } 374 doCloseWrite()375 private void doCloseWrite() throws IOException { 376 if (mWriteEnded) { 377 return; 378 } 379 mWriteEnded = true; 380 boolean readFinished = true; 381 for (int i = 0; i < mTrackCount; ++i) { 382 readFinished = readFinished && mReadIoStates[i].isReadFinished(); 383 mWriteIoStates[i].closeWrite(); 384 } 385 if (readFinished) { 386 mIoCallback.onIoReachedEos(); 387 } 388 } 389 doRelease(ConditionVariable conditionVariable)390 private void doRelease(ConditionVariable conditionVariable) { 391 mIoHandler.removeCallbacksAndMessages(null); 392 mFinished = true; 393 conditionVariable.open(); 394 } 395 releaseEvictedChunks()396 private void releaseEvictedChunks() { 397 if (mBufferReason != RecordingSampleBuffer.BUFFER_REASON_LIVE_PLAYBACK) { 398 return; 399 } 400 for (int i = 0; i < mTrackCount; ++i) { 401 long evictEndPositionUs = Math.min(mBufferManager.getStartPositionUs(mIds.get(i)), 402 mReadIoStates[i].getStartPositionUs()); 403 mBufferManager.evictChunks(mIds.get(i), evictEndPositionUs); 404 } 405 } 406 }