1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.usbtuner; 18 19 import android.media.MediaDataSource; 20 import android.os.Environment; 21 import android.util.Log; 22 import android.util.SparseBooleanArray; 23 24 import com.android.usbtuner.ChannelScanFileParser.ScanChannel; 25 import com.android.usbtuner.data.Channel; 26 import com.android.usbtuner.data.TunerChannel; 27 import com.android.usbtuner.ts.TsParser; 28 import com.android.usbtuner.tvinput.EventDetector; 29 import com.android.usbtuner.tvinput.FileSourceEventDetector; 30 31 import java.io.BufferedInputStream; 32 import java.io.File; 33 import java.io.FileInputStream; 34 import java.io.IOException; 35 import java.util.List; 36 37 /** 38 * A {@link DataSource} implementation which provides the MPEG-2 TS stream from a local file 39 * generated by capturing TV signal. 40 */ 41 public class FileDataSource extends MediaDataSource implements InputStreamSource { 42 private static final String TAG = "FileDataSource"; 43 44 private static final int TS_PACKET_SIZE = 188; 45 private static final int TS_SYNC_BYTE = 0x47; 46 private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10; 47 private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB 48 private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB 49 private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB 50 private static final int READ_TIMEOUT_MS = 10000; // 10 secs. 51 private static final int BUFFER_UNDERRUN_SLEEP_MS = 10; 52 private static final String FILE_DIR = 53 new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath(); 54 55 // Virtual frequency base used for file-based source 56 public static final int FREQ_BASE = 100; 57 58 private final Object mCircularBufferMonitor = new Object(); 59 private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE]; 60 private final FileSourceEventDetector mEventDetector; 61 62 private long mBytesFetched; 63 private long mLastReadPosition; 64 private boolean mStreaming; 65 66 private Thread mStreamingThread; 67 private StreamProvider mSource; 68 FileDataSource(EventDetector.EventListener eventListener)69 public FileDataSource(EventDetector.EventListener eventListener) { 70 mEventDetector = new FileSourceEventDetector(eventListener); 71 } 72 73 @Override setScanChannel(ScanChannel channel)74 public boolean setScanChannel(ScanChannel channel) { 75 String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath(); 76 mSource = new StreamProvider(filepath); 77 if (mSource.isReady()) { 78 mEventDetector.start(mSource); 79 return true; 80 } 81 return false; 82 } 83 84 /** 85 * Sets the channel required to start streaming from this device. Afterwards, prepares 86 * the tuner device for streaming. Package retrieval can be made at any time after invoking 87 * this method and before stopping the stream. 88 * 89 * @param channel a {@link TunerChannel} instance tune to 90 * @return {@code true} if the entire operation was successful; {@code false} otherwise 91 */ 92 @Override tuneToChannel(TunerChannel channel)93 public boolean tuneToChannel(TunerChannel channel) { 94 Log.i(TAG, "tuneToChannel with: " + channel.getFilepath()); 95 mSource = new StreamProvider(channel.getFilepath()); 96 if (!mSource.isReady()) { 97 return false; 98 } 99 mEventDetector.start(mSource); 100 mSource.addPidFilter(channel.getVideoPid()); 101 mSource.addPidFilter(channel.getAudioPid()); 102 mSource.addPidFilter(channel.getPcrPid()); 103 return true; 104 } 105 106 /** 107 * Starts streaming data. 108 */ 109 @Override startStream()110 public void startStream() { 111 mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID); 112 mSource.addPidFilter(TsParser.PAT_PID); 113 synchronized (mCircularBufferMonitor) { 114 if (mStreaming) { 115 return; 116 } 117 mStreaming = true; 118 } 119 120 mStreamingThread = new StreamingThread(); 121 mStreamingThread.start(); 122 Log.i(TAG, "Streaming started"); 123 } 124 125 /** 126 * Blocks the current thread until the streaming thread stops. In rare cases when the tuner 127 * device is overloaded this can take a while, but usually it returns pretty quickly. 128 */ 129 @Override stopStream()130 public void stopStream() { 131 synchronized (mCircularBufferMonitor) { 132 mStreaming = false; 133 mCircularBufferMonitor.notify(); 134 } 135 136 try { 137 if (mStreamingThread != null) { 138 mStreamingThread.join(); 139 } 140 } catch (InterruptedException e) { 141 Thread.currentThread().interrupt(); 142 } 143 } 144 145 @Override getLimit()146 public long getLimit() { 147 synchronized (mCircularBufferMonitor) { 148 return mBytesFetched; 149 } 150 } 151 152 @Override getPosition()153 public long getPosition() { 154 synchronized (mCircularBufferMonitor) { 155 return mLastReadPosition; 156 } 157 } 158 159 /** 160 * Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID. 161 */ 162 public static class StreamProvider { 163 private final String mFilepath; 164 private final SparseBooleanArray mPids = new SparseBooleanArray(); 165 private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE]; 166 167 private BufferedInputStream mInputStream; 168 StreamProvider(String filepath)169 private StreamProvider(String filepath) { 170 mFilepath = filepath; 171 open(filepath); 172 } 173 open(String filepath)174 private void open(String filepath) { 175 try { 176 mInputStream = new BufferedInputStream(new FileInputStream(filepath)); 177 } catch (IOException e) { 178 Log.e(TAG, "Error opening input stream", e); 179 mInputStream = null; 180 } 181 } 182 isReady()183 private boolean isReady() { 184 return mInputStream != null; 185 } 186 getFilepath()187 public String getFilepath() { 188 return mFilepath; 189 } 190 addPidFilter(int pid)191 public void addPidFilter(int pid) { 192 mPids.put(pid, true); 193 } 194 isFilterEmpty()195 public boolean isFilterEmpty() { 196 return mPids.size() > 0; 197 } 198 clearPidFilter()199 public void clearPidFilter() { 200 mPids.clear(); 201 } 202 isInFilter(int pid)203 public boolean isInFilter(int pid) { 204 return mPids.get(pid); 205 } 206 read(byte[] inputBuffer)207 private int read(byte[] inputBuffer) { 208 int readSize = readInternal(); 209 if (readSize <= 0) { 210 // Reached the end of stream. Restart from the beginning. 211 close(); 212 open(mFilepath); 213 if (mInputStream == null) { 214 return -1; 215 } 216 readSize = readInternal(); 217 } 218 219 if (mPreBuffer[0] != TS_SYNC_BYTE) { 220 Log.e(TAG, "Error reading input stream - no TS sync found"); 221 return -1; 222 } 223 int filteredSize = 0; 224 for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) { 225 if (mPreBuffer[i] == TS_SYNC_BYTE) { 226 int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff); 227 if (mPids.get(pid)) { 228 System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE); 229 destPos += TS_PACKET_SIZE; 230 filteredSize += TS_PACKET_SIZE; 231 } 232 } 233 } 234 return filteredSize; 235 } 236 readInternal()237 private int readInternal() { 238 int readSize; 239 try { 240 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length); 241 } catch (IOException e) { 242 Log.e(TAG, "Error reading input stream", e); 243 return -1; 244 } 245 return readSize; 246 } 247 close()248 private void close() { 249 try { 250 mInputStream.close(); 251 } catch (IOException e) { 252 Log.e(TAG, "Error closing input stream:", e); 253 } 254 mInputStream = null; 255 } 256 } 257 258 @Override readAt(long pos, byte[] buffer, int offset, int amount)259 public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException { 260 synchronized (mCircularBufferMonitor) { 261 long initialBytesFetched = mBytesFetched; 262 while (mBytesFetched < pos + amount && mStreaming) { 263 try { 264 mCircularBufferMonitor.wait(READ_TIMEOUT_MS); 265 } catch (InterruptedException e) { 266 // Wait again. 267 Thread.currentThread().interrupt(); 268 } 269 if (initialBytesFetched == mBytesFetched) { 270 Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1."); 271 272 // Returning -1 will make demux report EOS so that the input service can retry 273 // the playback. 274 return -1; 275 } 276 } 277 if (!mStreaming) { 278 Log.w(TAG, "Stream is already stopped."); 279 return -1; 280 } 281 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 282 Log.e(TAG, "Demux is requesting the data which is already overwritten."); 283 return -1; 284 } 285 int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE); 286 int bytesToCopyInFirstPass = amount; 287 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 288 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 289 } 290 System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass); 291 if (bytesToCopyInFirstPass < amount) { 292 System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass, 293 amount - bytesToCopyInFirstPass); 294 } 295 mLastReadPosition = pos + amount; 296 mCircularBufferMonitor.notify(); 297 return amount; 298 } 299 } 300 301 @Override getSize()302 public long getSize() throws IOException { 303 return -1; 304 } 305 306 @Override close()307 public void close() {} 308 309 @Override getType()310 public int getType() { 311 return Channel.TYPE_FILE; 312 } 313 314 /** 315 * Adds {@link ScanChannel} instance for local files. 316 * 317 * @param output a list of channels where the results will be placed in 318 */ addLocalStreamFiles(List<ScanChannel> output)319 public static void addLocalStreamFiles(List<ScanChannel> output) { 320 File dir = new File(FILE_DIR); 321 if (!dir.exists()) return; 322 323 File[] tsFiles = dir.listFiles(); 324 if (tsFiles == null) return; 325 int freq = FileDataSource.FREQ_BASE; 326 for (File file : tsFiles) { 327 if (!file.isFile()) continue; 328 output.add(ScanChannel.forFile(freq, file.getName())); 329 freq += 100; 330 } 331 } 332 333 /** 334 * A thread managing a circular buffer that holds stream data to be consumed by player. 335 * Keeps reading data in from a {@link StreamProvider} to hold enough amount for buffering. 336 * Started and stopped by {@link #startStream()} and {@link #stopStream()}, respectively. 337 */ 338 private class StreamingThread extends Thread { 339 @Override run()340 public void run() { 341 byte[] dataBuffer = new byte[READ_BUFFER_SIZE]; 342 343 synchronized (mCircularBufferMonitor) { 344 mBytesFetched = 0; 345 mLastReadPosition = 0; 346 } 347 348 while (true) { 349 synchronized (mCircularBufferMonitor) { 350 while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE 351 && mStreaming) { 352 try { 353 mCircularBufferMonitor.wait(); 354 } catch (InterruptedException e) { 355 // Wait again. 356 Thread.currentThread().interrupt(); 357 } 358 } 359 if (!mStreaming) { 360 break; 361 } 362 } 363 364 int bytesWritten = mSource.read(dataBuffer); 365 if (bytesWritten <= 0) { 366 try { 367 // When buffer is underrun, we sleep for short time to prevent 368 // unnecessary CPU draining. 369 sleep(BUFFER_UNDERRUN_SLEEP_MS); 370 } catch (InterruptedException e) { 371 Thread.currentThread().interrupt(); 372 } 373 continue; 374 } 375 376 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten); 377 378 synchronized (mCircularBufferMonitor) { 379 int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE); 380 int bytesToCopyInFirstPass = bytesWritten; 381 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 382 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 383 } 384 System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer, 385 bytesToCopyInFirstPass); 386 if (bytesToCopyInFirstPass < bytesWritten) { 387 System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0, 388 bytesWritten - bytesToCopyInFirstPass); 389 } 390 mBytesFetched += bytesWritten; 391 mCircularBufferMonitor.notify(); 392 } 393 } 394 395 Log.i(TAG, "Streaming stopped"); 396 mSource.close(); 397 } 398 } 399 } 400