1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.tv.tuner.source; 18 19 import android.content.Context; 20 import android.net.Uri; 21 import android.support.annotation.Nullable; 22 import android.util.Log; 23 import android.util.Pair; 24 25 import com.android.tv.common.SoftPreconditions; 26 import com.android.tv.tuner.api.ScanChannel; 27 import com.android.tv.tuner.api.Tuner; 28 import com.android.tv.tuner.data.TunerChannel; 29 import com.android.tv.tuner.prefs.TunerPreferences; 30 import com.android.tv.tuner.ts.EventDetector; 31 import com.android.tv.tuner.ts.EventDetector.EventListener; 32 33 import com.google.android.exoplayer2.C; 34 import com.google.android.exoplayer2.upstream.DataSpec; 35 import com.google.android.exoplayer2.upstream.TransferListener; 36 import java.io.IOException; 37 import java.util.ArrayList; 38 import java.util.List; 39 import java.util.concurrent.atomic.AtomicLong; 40 41 /** Provides MPEG-2 TS stream sources for channel playing from an underlying tuner device. */ 42 public class TunerTsStreamer implements TsStreamer { 43 private static final String TAG = "TunerTsStreamer"; 44 45 private static final int MIN_READ_UNIT = 1500; 46 private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB 47 private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000; // ~ 30MB 48 private static final int TS_PACKET_SIZE = 188; 49 50 private static final int READ_TIMEOUT_MS = 5000; // 5 secs. 51 private static final int BUFFER_UNDERRUN_SLEEP_MS = 10; 52 private static final int READ_ERROR_STREAMING_ENDED = -1; 53 private static final int READ_ERROR_BUFFER_OVERWRITTEN = -2; 54 55 private final Object mCircularBufferMonitor = new Object(); 56 private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE]; 57 private long mBytesFetched; 58 private final AtomicLong mLastReadPosition = new AtomicLong(); 59 private boolean mStreaming; 60 61 private final Tuner mTunerHal; 62 private TunerChannel mChannel; 63 private Thread mStreamingThread; 64 private final EventDetector mEventDetector; 65 private final List<Pair<EventListener, Boolean>> mEventListenerActions = new ArrayList<>(); 66 67 private final TsStreamWriter mTsStreamWriter; 68 private String mChannelNumber; 69 70 public static class TunerDataSource extends TsDataSource { 71 private final TunerTsStreamer mTsStreamer; 72 private final AtomicLong mLastReadPosition = new AtomicLong(0); 73 private long mStartBufferedPosition; 74 private Uri mUri; 75 TunerDataSource(TunerTsStreamer tsStreamer)76 private TunerDataSource(TunerTsStreamer tsStreamer) { 77 mTsStreamer = tsStreamer; 78 mStartBufferedPosition = tsStreamer.getBufferedPosition(); 79 } 80 81 @Override getBufferedPosition()82 public long getBufferedPosition() { 83 return mTsStreamer.getBufferedPosition() - mStartBufferedPosition; 84 } 85 86 @Override getLastReadPosition()87 public long getLastReadPosition() { 88 return mLastReadPosition.get(); 89 } 90 91 @Override shiftStartPosition(long offset)92 public void shiftStartPosition(long offset) { 93 SoftPreconditions.checkState(mLastReadPosition.get() == 0); 94 SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition()); 95 mStartBufferedPosition += offset; 96 } 97 98 @Override open(DataSpec dataSpec)99 public long open(DataSpec dataSpec) { 100 mUri = dataSpec.uri; 101 mLastReadPosition.set(0); 102 return C.LENGTH_UNSET; 103 } 104 105 @Override close()106 public void close() { 107 mUri = null; 108 } 109 110 @Override read(byte[] buffer, int offset, int readLength)111 public int read(byte[] buffer, int offset, int readLength) throws IOException { 112 int ret = 113 mTsStreamer.readAt( 114 mStartBufferedPosition + mLastReadPosition.get(), 115 buffer, 116 offset, 117 readLength); 118 if (ret > 0) { 119 mLastReadPosition.addAndGet(ret); 120 } else if (ret == READ_ERROR_BUFFER_OVERWRITTEN) { 121 long currentPosition = mStartBufferedPosition + mLastReadPosition.get(); 122 long endPosition = mTsStreamer.getBufferedPosition(); 123 long diff = 124 ((endPosition - currentPosition + TS_PACKET_SIZE - 1) / TS_PACKET_SIZE) 125 * TS_PACKET_SIZE; 126 Log.w(TAG, "Demux position jump by overwritten buffer: " + diff); 127 mStartBufferedPosition = currentPosition + diff; 128 mLastReadPosition.set(0); 129 return 0; 130 } 131 return ret; 132 } 133 134 @Override getSignalStrength()135 public int getSignalStrength() { 136 return mTsStreamer.getSignalStrength(); 137 } 138 139 @Override addTransferListener(TransferListener transferListener)140 public void addTransferListener(TransferListener transferListener) { 141 // TODO: Implement to support metrics collection. 142 } 143 144 @Nullable 145 @Override getUri()146 public Uri getUri() { 147 return mUri; 148 } 149 150 } 151 /** 152 * Creates {@link TsStreamer} for playing or recording the specified channel. 153 * 154 * @param tunerHal the HAL for tuner device 155 * @param eventListener the listener for channel & program information 156 */ TunerTsStreamer(Tuner tunerHal, EventListener eventListener, Context context)157 public TunerTsStreamer(Tuner tunerHal, EventListener eventListener, Context context) { 158 mTunerHal = tunerHal; 159 mEventDetector = new EventDetector(mTunerHal); 160 if (eventListener != null) { 161 mEventDetector.registerListener(eventListener); 162 } 163 mTsStreamWriter = 164 context != null && TunerPreferences.getStoreTsStream(context) 165 ? new TsStreamWriter(context) 166 : null; 167 } 168 TunerTsStreamer(Tuner tunerHal, EventListener eventListener)169 public TunerTsStreamer(Tuner tunerHal, EventListener eventListener) { 170 this(tunerHal, eventListener, null); 171 } 172 173 @Override startStream(TunerChannel channel)174 public boolean startStream(TunerChannel channel) { 175 if (mTunerHal.tune( 176 channel.getDeliverySystemType().getNumber(), channel.getFrequency(), 177 channel.getModulation(), channel.getDisplayNumber(false))) { 178 if (channel.hasVideo()) { 179 mTunerHal.addPidFilter(channel.getVideoPid(), Tuner.FILTER_TYPE_VIDEO); 180 } 181 boolean audioFilterSet = false; 182 for (Integer audioPid : channel.getAudioPids()) { 183 if (!audioFilterSet) { 184 mTunerHal.addPidFilter(audioPid, Tuner.FILTER_TYPE_AUDIO); 185 audioFilterSet = true; 186 } else { 187 // FILTER_TYPE_AUDIO overrides the previous filter for audio. We use 188 // FILTER_TYPE_OTHER from the secondary one to get the all audio tracks. 189 mTunerHal.addPidFilter(audioPid, Tuner.FILTER_TYPE_OTHER); 190 } 191 } 192 mTunerHal.addPidFilter(channel.getPcrPid(), Tuner.FILTER_TYPE_PCR); 193 if (mEventDetector != null) { 194 mEventDetector.startDetecting( 195 channel.getDeliverySystemType(), 196 channel.getFrequency(), 197 channel.getModulation(), 198 channel.getProgramNumber()); 199 } 200 mChannel = channel; 201 mChannelNumber = channel.getDisplayNumber(); 202 synchronized (mCircularBufferMonitor) { 203 if (mStreaming) { 204 Log.w(TAG, "Streaming should be stopped before start streaming"); 205 return true; 206 } 207 mStreaming = true; 208 mBytesFetched = 0; 209 mLastReadPosition.set(0L); 210 } 211 if (mTsStreamWriter != null) { 212 mTsStreamWriter.setChannel(mChannel); 213 mTsStreamWriter.openFile(); 214 } 215 mStreamingThread = new StreamingThread(); 216 mStreamingThread.start(); 217 Log.i(TAG, "Streaming started"); 218 return true; 219 } 220 return false; 221 } 222 223 @Override startStream(ScanChannel channel)224 public boolean startStream(ScanChannel channel) { 225 if (mTunerHal.tune(channel.deliverySystemType.getNumber(), channel.frequency, 226 channel.modulation, null)) { 227 mEventDetector.startDetecting( 228 channel.deliverySystemType, channel.frequency, channel.modulation, 229 EventDetector.ALL_PROGRAM_NUMBERS); 230 synchronized (mCircularBufferMonitor) { 231 if (mStreaming) { 232 Log.w(TAG, "Streaming should be stopped before start streaming"); 233 return true; 234 } 235 mStreaming = true; 236 mBytesFetched = 0; 237 mLastReadPosition.set(0L); 238 } 239 mStreamingThread = new StreamingThread(); 240 mStreamingThread.start(); 241 Log.i(TAG, "Streaming started"); 242 return true; 243 } 244 return false; 245 } 246 247 /** 248 * Blocks the current thread until the streaming thread stops. In rare cases when the tuner 249 * device is overloaded this can take a while, but usually it returns pretty quickly. 250 */ 251 @Override stopStream()252 public void stopStream() { 253 mChannel = null; 254 synchronized (mCircularBufferMonitor) { 255 mStreaming = false; 256 mCircularBufferMonitor.notifyAll(); 257 } 258 259 try { 260 if (mStreamingThread != null) { 261 mStreamingThread.join(); 262 } 263 } catch (InterruptedException e) { 264 Thread.currentThread().interrupt(); 265 } 266 if (mTsStreamWriter != null) { 267 mTsStreamWriter.closeFile(true); 268 mTsStreamWriter.setChannel(null); 269 } 270 } 271 272 @Override createDataSource()273 public TsDataSource createDataSource() { 274 return new TunerDataSource(this); 275 } 276 277 /** 278 * Returns incomplete channel lists which was scanned so far. Incomplete channel means the 279 * channel whose channel information is not complete or is not well-formed. 280 * 281 * @return {@link List} of {@link TunerChannel} 282 */ getMalFormedChannels()283 public List<TunerChannel> getMalFormedChannels() { 284 return mEventDetector.getMalFormedChannels(); 285 } 286 287 /** 288 * Returns the current {@link Tuner} which provides MPEG-TS stream for TunerTsStreamer. 289 * 290 * @return {@link Tuner} 291 */ getTunerHal()292 public Tuner getTunerHal() { 293 return mTunerHal; 294 } 295 296 /** 297 * Returns the current tuned channel for TunerTsStreamer. 298 * 299 * @return {@link TunerChannel} 300 */ getChannel()301 public TunerChannel getChannel() { 302 return mChannel; 303 } 304 305 /** 306 * Returns the current buffered position from tuner. 307 * 308 * @return the current buffered position 309 */ getBufferedPosition()310 public long getBufferedPosition() { 311 synchronized (mCircularBufferMonitor) { 312 return mBytesFetched; 313 } 314 } 315 getStreamerInfo()316 public String getStreamerInfo() { 317 return "Channel: " + mChannelNumber + ", Streaming: " + mStreaming; 318 } 319 registerListener(EventListener listener)320 public void registerListener(EventListener listener) { 321 if (mEventDetector != null && listener != null) { 322 synchronized (mEventListenerActions) { 323 mEventListenerActions.add(Pair.create(listener, true)); 324 } 325 } 326 } 327 unregisterListener(EventListener listener)328 public void unregisterListener(EventListener listener) { 329 if (mEventDetector != null) { 330 synchronized (mEventListenerActions) { 331 mEventListenerActions.add(new Pair(listener, false)); 332 } 333 } 334 } 335 getSignalStrength()336 public int getSignalStrength() { 337 return mTunerHal.getSignalStrength(); 338 } 339 340 private class StreamingThread extends Thread { 341 @Override run()342 public void run() { 343 // Buffers for streaming data from the tuner and the internal buffer. 344 byte[] dataBuffer = new byte[READ_BUFFER_SIZE]; 345 346 while (true) { 347 synchronized (mCircularBufferMonitor) { 348 if (!mStreaming) { 349 break; 350 } 351 } 352 353 if (mEventDetector != null) { 354 synchronized (mEventListenerActions) { 355 for (Pair listenerAction : mEventListenerActions) { 356 EventListener listener = (EventListener) listenerAction.first; 357 if ((boolean) listenerAction.second) { 358 mEventDetector.registerListener(listener); 359 } else { 360 mEventDetector.unregisterListener(listener); 361 } 362 } 363 mEventListenerActions.clear(); 364 } 365 } 366 367 int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length); 368 if (bytesWritten <= 0) { 369 try { 370 // When buffer is underrun, we sleep for short time to prevent 371 // unnecessary CPU draining. 372 sleep(BUFFER_UNDERRUN_SLEEP_MS); 373 } catch (InterruptedException e) { 374 Thread.currentThread().interrupt(); 375 } 376 continue; 377 } 378 379 if (mTsStreamWriter != null) { 380 mTsStreamWriter.writeToFile(dataBuffer, bytesWritten); 381 } 382 383 if (mEventDetector != null) { 384 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten); 385 } 386 synchronized (mCircularBufferMonitor) { 387 int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE); 388 int bytesToCopyInFirstPass = bytesWritten; 389 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 390 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 391 } 392 System.arraycopy( 393 dataBuffer, 0, mCircularBuffer, posInBuffer, bytesToCopyInFirstPass); 394 if (bytesToCopyInFirstPass < bytesWritten) { 395 System.arraycopy( 396 dataBuffer, 397 bytesToCopyInFirstPass, 398 mCircularBuffer, 399 0, 400 bytesWritten - bytesToCopyInFirstPass); 401 } 402 mBytesFetched += bytesWritten; 403 mCircularBufferMonitor.notifyAll(); 404 } 405 } 406 407 Log.i(TAG, "Streaming stopped"); 408 } 409 } 410 411 /** 412 * Reads data from internal buffer. 413 * 414 * @param pos the position to read from 415 * @param buffer to read 416 * @param offset start position of the read buffer 417 * @param amount number of bytes to read 418 * @return number of read bytes when successful, {@code -1} otherwise 419 * @throws IOException 420 */ readAt(long pos, byte[] buffer, int offset, int amount)421 public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException { 422 while (true) { 423 synchronized (mCircularBufferMonitor) { 424 if (!mStreaming) { 425 return READ_ERROR_STREAMING_ENDED; 426 } 427 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 428 Log.w(TAG, "Demux is requesting the data which is already overwritten."); 429 return READ_ERROR_BUFFER_OVERWRITTEN; 430 } 431 if (mBytesFetched < pos + amount) { 432 try { 433 mCircularBufferMonitor.wait(READ_TIMEOUT_MS); 434 } catch (InterruptedException e) { 435 Thread.currentThread().interrupt(); 436 } 437 // Try again to prevent starvation. 438 // Give chances to read from other threads. 439 continue; 440 } 441 int startPos = (int) (pos % CIRCULAR_BUFFER_SIZE); 442 int endPos = (int) ((pos + amount) % CIRCULAR_BUFFER_SIZE); 443 int firstLength = (startPos > endPos ? CIRCULAR_BUFFER_SIZE : endPos) - startPos; 444 System.arraycopy(mCircularBuffer, startPos, buffer, offset, firstLength); 445 if (firstLength < amount) { 446 System.arraycopy( 447 mCircularBuffer, 0, buffer, offset + firstLength, amount - firstLength); 448 } 449 mCircularBufferMonitor.notifyAll(); 450 return amount; 451 } 452 } 453 } 454 } 455