1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.tv.tuner.source; 18 19 import android.content.Context; 20 import android.util.Log; 21 22 import com.google.android.exoplayer.C; 23 import com.google.android.exoplayer.upstream.DataSpec; 24 import com.android.tv.common.SoftPreconditions; 25 import com.android.tv.tuner.ChannelScanFileParser; 26 import com.android.tv.tuner.TunerHal; 27 import com.android.tv.tuner.TunerPreferences; 28 import com.android.tv.tuner.data.TunerChannel; 29 import com.android.tv.tuner.tvinput.EventDetector; 30 import com.android.tv.tuner.tvinput.EventDetector.EventListener; 31 32 import java.io.IOException; 33 import java.util.List; 34 import java.util.concurrent.atomic.AtomicLong; 35 36 /** 37 * Provides MPEG-2 TS stream sources for channel playing from an underlying tuner device. 38 */ 39 public class TunerTsStreamer implements TsStreamer { 40 private static final String TAG = "TunerTsStreamer"; 41 42 private static final int MIN_READ_UNIT = 1500; 43 private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB 44 private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000; // ~ 30MB 45 46 private static final int READ_TIMEOUT_MS = 5000; // 5 secs. 47 private static final int BUFFER_UNDERRUN_SLEEP_MS = 10; 48 49 private final Object mCircularBufferMonitor = new Object(); 50 private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE]; 51 private long mBytesFetched; 52 private final AtomicLong mLastReadPosition = new AtomicLong(); 53 private boolean mEndOfStreamSent; 54 private boolean mStreaming; 55 56 private final TunerHal mTunerHal; 57 private TunerChannel mChannel; 58 private Thread mStreamingThread; 59 private final EventDetector mEventDetector; 60 61 private final TsStreamWriter mTsStreamWriter; 62 63 public static class TunerDataSource extends TsDataSource { 64 private final TunerTsStreamer mTsStreamer; 65 private final AtomicLong mLastReadPosition = new AtomicLong(0); 66 private long mStartBufferedPosition; 67 TunerDataSource(TunerTsStreamer tsStreamer)68 private TunerDataSource(TunerTsStreamer tsStreamer) { 69 mTsStreamer = tsStreamer; 70 mStartBufferedPosition = tsStreamer.getBufferedPosition(); 71 } 72 73 @Override getBufferedPosition()74 public long getBufferedPosition() { 75 return mTsStreamer.getBufferedPosition() - mStartBufferedPosition; 76 } 77 78 @Override getLastReadPosition()79 public long getLastReadPosition() { 80 return mLastReadPosition.get(); 81 } 82 83 @Override shiftStartPosition(long offset)84 public void shiftStartPosition(long offset) { 85 SoftPreconditions.checkState(mLastReadPosition.get() == 0); 86 SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition()); 87 mStartBufferedPosition += offset; 88 } 89 90 @Override open(DataSpec dataSpec)91 public long open(DataSpec dataSpec) throws IOException { 92 mLastReadPosition.set(0); 93 return C.LENGTH_UNBOUNDED; 94 } 95 96 @Override close()97 public void close() { 98 } 99 100 @Override read(byte[] buffer, int offset, int readLength)101 public int read(byte[] buffer, int offset, int readLength) throws IOException { 102 int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer, 103 offset, readLength); 104 if (ret > 0) { 105 mLastReadPosition.addAndGet(ret); 106 } 107 return ret; 108 } 109 } 110 /** 111 * Creates {@link TsStreamer} for playing or recording the specified channel. 112 * @param tunerHal the HAL for tuner device 113 * @param eventListener the listener for channel & program information 114 */ TunerTsStreamer(TunerHal tunerHal, EventListener eventListener, Context context)115 public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener, Context context) { 116 mTunerHal = tunerHal; 117 mEventDetector = new EventDetector(mTunerHal, eventListener); 118 mTsStreamWriter = context != null && TunerPreferences.getStoreTsStream(context) ? 119 new TsStreamWriter(context) : null; 120 } 121 TunerTsStreamer(TunerHal tunerHal, EventListener eventListener)122 public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener) { 123 this(tunerHal, eventListener, null); 124 } 125 126 @Override startStream(TunerChannel channel)127 public boolean startStream(TunerChannel channel) { 128 if (mTunerHal.tune(channel.getFrequency(), channel.getModulation())) { 129 if (channel.hasVideo()) { 130 mTunerHal.addPidFilter(channel.getVideoPid(), 131 TunerHal.FILTER_TYPE_VIDEO); 132 } 133 boolean audioFilterSet = false; 134 for (Integer audioPid : channel.getAudioPids()) { 135 if (!audioFilterSet) { 136 mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_AUDIO); 137 audioFilterSet = true; 138 } else { 139 // FILTER_TYPE_AUDIO overrides the previous filter for audio. We use 140 // FILTER_TYPE_OTHER from the secondary one to get the all audio tracks. 141 mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_OTHER); 142 } 143 } 144 mTunerHal.addPidFilter(channel.getPcrPid(), 145 TunerHal.FILTER_TYPE_PCR); 146 if (mEventDetector != null) { 147 mEventDetector.startDetecting(channel.getFrequency(), channel.getModulation(), 148 channel.getProgramNumber()); 149 } 150 mChannel = channel; 151 synchronized (mCircularBufferMonitor) { 152 if (mStreaming) { 153 Log.w(TAG, "Streaming should be stopped before start streaming"); 154 return true; 155 } 156 mStreaming = true; 157 mBytesFetched = 0; 158 mLastReadPosition.set(0L); 159 mEndOfStreamSent = false; 160 } 161 if (mTsStreamWriter != null) { 162 mTsStreamWriter.setChannel(mChannel); 163 mTsStreamWriter.openFile(); 164 } 165 mStreamingThread = new StreamingThread(); 166 mStreamingThread.start(); 167 Log.i(TAG, "Streaming started"); 168 return true; 169 } 170 return false; 171 } 172 173 @Override startStream(ChannelScanFileParser.ScanChannel channel)174 public boolean startStream(ChannelScanFileParser.ScanChannel channel) { 175 if (mTunerHal.tune(channel.frequency, channel.modulation)) { 176 mEventDetector.startDetecting( 177 channel.frequency, channel.modulation, EventDetector.ALL_PROGRAM_NUMBERS); 178 synchronized (mCircularBufferMonitor) { 179 if (mStreaming) { 180 Log.w(TAG, "Streaming should be stopped before start streaming"); 181 return true; 182 } 183 mStreaming = true; 184 mBytesFetched = 0; 185 mLastReadPosition.set(0L); 186 mEndOfStreamSent = false; 187 } 188 mStreamingThread = new StreamingThread(); 189 mStreamingThread.start(); 190 Log.i(TAG, "Streaming started"); 191 return true; 192 } 193 return false; 194 } 195 196 /** 197 * Blocks the current thread until the streaming thread stops. In rare cases when the tuner 198 * device is overloaded this can take a while, but usually it returns pretty quickly. 199 */ 200 @Override stopStream()201 public void stopStream() { 202 mChannel = null; 203 synchronized (mCircularBufferMonitor) { 204 mStreaming = false; 205 mCircularBufferMonitor.notifyAll(); 206 } 207 208 try { 209 if (mStreamingThread != null) { 210 mStreamingThread.join(); 211 } 212 } catch (InterruptedException e) { 213 Thread.currentThread().interrupt(); 214 } 215 if (mTsStreamWriter != null) { 216 mTsStreamWriter.closeFile(true); 217 mTsStreamWriter.setChannel(null); 218 } 219 } 220 221 @Override createDataSource()222 public TsDataSource createDataSource() { 223 return new TunerDataSource(this); 224 } 225 226 /** 227 * Returns incomplete channel lists which was scanned so far. Incomplete channel means 228 * the channel whose channel information is not complete or is not well-formed. 229 * @return {@link List} of {@link TunerChannel} 230 */ getMalFormedChannels()231 public List<TunerChannel> getMalFormedChannels() { 232 return mEventDetector.getMalFormedChannels(); 233 } 234 235 /** 236 * Returns the current {@link TunerHal} which provides MPEG-TS stream for TunerTsStreamer. 237 * @return {@link TunerHal} 238 */ getTunerHal()239 public TunerHal getTunerHal() { 240 return mTunerHal; 241 } 242 243 /** 244 * Returns the current tuned channel for TunerTsStreamer. 245 * @return {@link TunerChannel} 246 */ getChannel()247 public TunerChannel getChannel() { 248 return mChannel; 249 } 250 251 /** 252 * Returns the current buffered position from tuner. 253 * @return the current buffered position 254 */ getBufferedPosition()255 public long getBufferedPosition() { 256 synchronized (mCircularBufferMonitor) { 257 return mBytesFetched; 258 } 259 } 260 261 private class StreamingThread extends Thread { 262 @Override run()263 public void run() { 264 // Buffers for streaming data from the tuner and the internal buffer. 265 byte[] dataBuffer = new byte[READ_BUFFER_SIZE]; 266 267 while (true) { 268 synchronized (mCircularBufferMonitor) { 269 if (!mStreaming) { 270 break; 271 } 272 } 273 274 int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length); 275 if (bytesWritten <= 0) { 276 try { 277 // When buffer is underrun, we sleep for short time to prevent 278 // unnecessary CPU draining. 279 sleep(BUFFER_UNDERRUN_SLEEP_MS); 280 } catch (InterruptedException e) { 281 Thread.currentThread().interrupt(); 282 } 283 continue; 284 } 285 286 if (mTsStreamWriter != null) { 287 mTsStreamWriter.writeToFile(dataBuffer, bytesWritten); 288 } 289 290 if (mEventDetector != null) { 291 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten); 292 } 293 synchronized (mCircularBufferMonitor) { 294 int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE); 295 int bytesToCopyInFirstPass = bytesWritten; 296 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 297 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 298 } 299 System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer, 300 bytesToCopyInFirstPass); 301 if (bytesToCopyInFirstPass < bytesWritten) { 302 System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0, 303 bytesWritten - bytesToCopyInFirstPass); 304 } 305 mBytesFetched += bytesWritten; 306 mCircularBufferMonitor.notifyAll(); 307 } 308 } 309 310 Log.i(TAG, "Streaming stopped"); 311 } 312 } 313 314 /** 315 * Reads data from internal buffer. 316 * @param pos the position to read from 317 * @param buffer to read 318 * @param offset start position of the read buffer 319 * @param amount number of bytes to read 320 * @return number of read bytes when successful, {@code -1} otherwise 321 * @throws IOException 322 */ readAt(long pos, byte[] buffer, int offset, int amount)323 public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException { 324 long readStartTime = System.currentTimeMillis(); 325 while (true) { 326 synchronized (mCircularBufferMonitor) { 327 if (mEndOfStreamSent || !mStreaming) { 328 return -1; 329 } 330 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 331 Log.e(TAG, "Demux is requesting the data which is already overwritten."); 332 return -1; 333 } 334 if (System.currentTimeMillis() - readStartTime > READ_TIMEOUT_MS) { 335 // Nothing was received during READ_TIMEOUT_MS before. 336 mEndOfStreamSent = true; 337 mCircularBufferMonitor.notifyAll(); 338 return -1; 339 } 340 if (mBytesFetched < pos + amount) { 341 try { 342 mCircularBufferMonitor.wait(READ_TIMEOUT_MS); 343 } catch (InterruptedException e) { 344 Thread.currentThread().interrupt(); 345 } 346 // Try again to prevent starvation. 347 // Give chances to read from other threads. 348 continue; 349 } 350 int startPos = (int) (pos % CIRCULAR_BUFFER_SIZE); 351 int endPos = (int) ((pos + amount) % CIRCULAR_BUFFER_SIZE); 352 int firstLength = (startPos > endPos ? CIRCULAR_BUFFER_SIZE : endPos) - startPos; 353 System.arraycopy(mCircularBuffer, startPos, buffer, offset, firstLength); 354 if (firstLength < amount) { 355 System.arraycopy(mCircularBuffer, 0, buffer, offset + firstLength, 356 amount - firstLength); 357 } 358 mCircularBufferMonitor.notifyAll(); 359 return amount; 360 } 361 } 362 } 363 } 364