1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.usbtuner; 18 19 import android.media.MediaDataSource; 20 import android.util.Log; 21 22 import com.android.usbtuner.ChannelScanFileParser.ScanChannel; 23 import com.android.usbtuner.data.Channel; 24 import com.android.usbtuner.data.TunerChannel; 25 import com.android.usbtuner.tvinput.EventDetector; 26 import com.android.usbtuner.tvinput.EventDetector.EventListener; 27 import com.android.usbtuner.tvinput.UsbTunerDebug; 28 29 import java.io.IOException; 30 import java.util.Locale; 31 import java.util.concurrent.atomic.AtomicLong; 32 33 /** 34 * A {@link MediaDataSource} implementation which provides the mpeg2ts stream from the tuner device 35 * to {@link MediaExtractor}. 36 */ 37 public class UsbTunerDataSource extends MediaDataSource implements InputStreamSource { 38 private static final String TAG = "UsbTunerDataSource"; 39 40 private static final int MIN_READ_UNIT = 1500; 41 private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB 42 private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000; // ~ 30MB 43 44 private static final int READ_TIMEOUT_MS = 5000; // 5 secs. 45 private static final int BUFFER_UNDERRUN_SLEEP_MS = 10; 46 47 private static final int CACHE_KEY_VERSION = 1; 48 49 // UTCK stands for USB Tuner Cache Key. 50 private static final String CACHE_KEY_PREFIX = "UTCK"; 51 52 private final Object mCircularBufferMonitor = new Object(); 53 private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE]; 54 private long mBytesFetched; 55 private final AtomicLong mLastReadPosition = new AtomicLong(); 56 private boolean mEndOfStreamSent; 57 private boolean mStreaming; 58 59 private final TunerHal mTunerHal; 60 private Thread mStreamingThread; 61 private boolean mDeviceConfigured; 62 private EventDetector mEventDetector; 63 UsbTunerDataSource(TunerHal tunerHal, EventListener eventListener)64 public UsbTunerDataSource(TunerHal tunerHal, EventListener eventListener) { 65 mTunerHal = tunerHal; 66 mEventDetector = new EventDetector(mTunerHal, eventListener); 67 } 68 69 /** 70 * Starts the streaming of a configured program. Throws a runtime exception if no channel and 71 * program have successfully been configured yet. 72 */ 73 @Override startStream()74 public void startStream() { 75 if (!mDeviceConfigured) { 76 throw new RuntimeException("Channel and program not configured!"); 77 } 78 79 synchronized (mCircularBufferMonitor) { 80 if (mStreaming) { 81 Log.w(TAG, "Streaming should be stopped before start streaming"); 82 return; 83 } 84 mStreaming = true; 85 mBytesFetched = 0; 86 mLastReadPosition.set(0L); 87 mEndOfStreamSent = false; 88 } 89 90 mStreamingThread = new StreamingThread(); 91 mStreamingThread.start(); 92 Log.i(TAG, "Streaming started"); 93 } 94 95 /** 96 * Sets the channel required to start streaming from this device. Afterwards, prepares the tuner 97 * device for streaming. Package retrieval can be made at any time after invoking this method 98 * and before stopping the stream. 99 * 100 * @param channel a {@link TunerChannel} instance tune to 101 * @return {@code true} if the entire operation was successful; {@code false} otherwise 102 */ 103 @Override tuneToChannel(TunerChannel channel)104 public boolean tuneToChannel(TunerChannel channel) { 105 if (mTunerHal.tune(channel.getFrequency(), channel.getModulation())) { 106 if (channel.hasVideo()) { 107 mTunerHal.addPidFilter(channel.getVideoPid(), 108 TunerHal.FILTER_TYPE_VIDEO); 109 } 110 if (channel.hasAudio()) { 111 mTunerHal.addPidFilter(channel.getAudioPid(), 112 TunerHal.FILTER_TYPE_AUDIO); 113 } 114 mTunerHal.addPidFilter(channel.getPcrPid(), 115 TunerHal.FILTER_TYPE_PCR); 116 if (mEventDetector != null) { 117 mEventDetector.startDetecting(channel.getFrequency(), channel.getModulation()); 118 } 119 mDeviceConfigured = true; 120 return true; 121 } 122 return false; 123 } 124 125 /** 126 * Blocks the current thread until the streaming thread stops. In rare cases when the tuner 127 * device is overloaded this can take a while, but usually it returns pretty quickly. 128 */ 129 @Override stopStream()130 public void stopStream() { 131 synchronized (mCircularBufferMonitor) { 132 mStreaming = false; 133 mCircularBufferMonitor.notify(); 134 } 135 136 try { 137 if (mStreamingThread != null) { 138 mStreamingThread.join(); 139 } 140 } catch (InterruptedException e) { 141 Thread.currentThread().interrupt(); 142 } finally { 143 mTunerHal.stopTune(); 144 } 145 } 146 147 @Override getLimit()148 public long getLimit() { 149 synchronized (mCircularBufferMonitor) { 150 return mBytesFetched; 151 } 152 } 153 154 @Override getPosition()155 public long getPosition() { 156 return mLastReadPosition.get(); 157 } 158 159 private class StreamingThread extends Thread { 160 @Override run()161 public void run() { 162 // Buffers for streaming data from the tuner and the internal buffer. 163 byte[] dataBuffer = new byte[READ_BUFFER_SIZE]; 164 165 while (true) { 166 synchronized (mCircularBufferMonitor) { 167 if (!mStreaming) { 168 break; 169 } 170 } 171 172 int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length); 173 if (bytesWritten <= 0) { 174 try { 175 // When buffer is underrun, we sleep for short time to prevent 176 // unnecessary CPU draining. 177 sleep(BUFFER_UNDERRUN_SLEEP_MS); 178 } catch (InterruptedException e) { 179 Thread.currentThread().interrupt(); 180 } 181 continue; 182 } 183 184 if (mEventDetector != null) { 185 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten); 186 } 187 synchronized (mCircularBufferMonitor) { 188 int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE); 189 int bytesToCopyInFirstPass = bytesWritten; 190 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 191 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 192 } 193 System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer, 194 bytesToCopyInFirstPass); 195 if (bytesToCopyInFirstPass < bytesWritten) { 196 System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0, 197 bytesWritten - bytesToCopyInFirstPass); 198 } 199 mBytesFetched += bytesWritten; 200 mCircularBufferMonitor.notify(); 201 } 202 } 203 204 Log.i(TAG, "Streaming stopped"); 205 } 206 } 207 208 @Override readAt(long pos, byte[] buffer, int offset, int amount)209 public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException { 210 synchronized (mCircularBufferMonitor) { 211 if (mEndOfStreamSent) { 212 // Nothing was received during READ_TIMEOUT_MS before. 213 return -1; 214 } 215 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 216 // Not available at circular buffer. 217 Log.w(TAG, "Not available at circular buffer"); 218 return -1; 219 } 220 long initialBytesFetched = mBytesFetched; 221 while (mBytesFetched < pos + amount && mStreaming) { 222 try { 223 mCircularBufferMonitor.wait(READ_TIMEOUT_MS); 224 } catch (InterruptedException e) { 225 // Wait again. 226 Thread.currentThread().interrupt(); 227 } 228 if (initialBytesFetched == mBytesFetched) { 229 Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1."); 230 231 // Returning -1 will make demux report EOS so that the input service can retry 232 // the playback. 233 mEndOfStreamSent = true; 234 return -1; 235 } 236 } 237 if (!mStreaming) { 238 Log.w(TAG, "Stream is already stopped."); 239 return -1; 240 } 241 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 242 Log.e(TAG, "Demux is requesting the data which is already overwritten."); 243 return -1; 244 } 245 int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE); 246 int bytesToCopyInFirstPass = amount; 247 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 248 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 249 } 250 System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass); 251 if (bytesToCopyInFirstPass < amount) { 252 System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass, 253 amount - bytesToCopyInFirstPass); 254 } 255 mLastReadPosition.set(pos + amount); 256 mCircularBufferMonitor.notify(); 257 258 if (UsbTunerDebug.ENABLED) { 259 UsbTunerDebug.setBytesInQueue((int) (mBytesFetched - mLastReadPosition.get())); 260 } 261 262 return amount; 263 } 264 } 265 266 @Override getSize()267 public long getSize() throws IOException { 268 return -1; 269 } 270 271 @Override close()272 public void close() { 273 // Called from system MediaExtractor. All the resource should be closed 274 // in stopStream() already. 275 } 276 277 @Override getType()278 public int getType() { 279 return Channel.TYPE_TUNER; 280 } 281 282 @Override setScanChannel(ScanChannel channel)283 public boolean setScanChannel(ScanChannel channel) { 284 return false; 285 } 286 generateCacheKey(TunerChannel channel, long timestampMs)287 public static String generateCacheKey(TunerChannel channel, long timestampMs) { 288 return String.format(Locale.ENGLISH, "%s-%x-%x-%x-%x", CACHE_KEY_PREFIX, CACHE_KEY_VERSION, 289 channel.getFrequency(), channel.getProgramNumber(), timestampMs); 290 } 291 292 /** 293 * Parses the timestamp from a cache key generated by {@link #generateCacheKey}. 294 * 295 * @param cacheKey a cache key generated by {@link #generateCacheKey} 296 * @return the timestamp parsed from the given cache key. {@code -1} if unable to parse. 297 */ parseTimestampFromCacheKey(String cacheKey)298 public static long parseTimestampFromCacheKey(String cacheKey) { 299 String[] tokens = cacheKey.split("-"); 300 if (tokens.length < 2 || !tokens[0].equals(CACHE_KEY_PREFIX)) { 301 return -1; 302 } 303 int version = Integer.parseInt(tokens[1], 16); 304 if (version == 1) { 305 return Long.parseLong(tokens[4], 16); 306 } else { 307 return -1; 308 } 309 } 310 } 311