1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tv.tuner.source;
18 
19 import android.content.Context;
20 import android.util.Log;
21 
22 import com.google.android.exoplayer.C;
23 import com.google.android.exoplayer.upstream.DataSpec;
24 import com.android.tv.common.SoftPreconditions;
25 import com.android.tv.tuner.ChannelScanFileParser;
26 import com.android.tv.tuner.TunerHal;
27 import com.android.tv.tuner.TunerPreferences;
28 import com.android.tv.tuner.data.TunerChannel;
29 import com.android.tv.tuner.tvinput.EventDetector;
30 import com.android.tv.tuner.tvinput.EventDetector.EventListener;
31 
32 import java.io.IOException;
33 import java.util.List;
34 import java.util.concurrent.atomic.AtomicLong;
35 
36 /**
37  * Provides MPEG-2 TS stream sources for channel playing from an underlying tuner device.
38  */
39 public class TunerTsStreamer implements TsStreamer {
40     private static final String TAG = "TunerTsStreamer";
41 
42     private static final int MIN_READ_UNIT = 1500;
43     private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB
44     private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000;  // ~ 30MB
45 
46     private static final int READ_TIMEOUT_MS = 5000; // 5 secs.
47     private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
48 
49     private final Object mCircularBufferMonitor = new Object();
50     private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
51     private long mBytesFetched;
52     private final AtomicLong mLastReadPosition = new AtomicLong();
53     private boolean mEndOfStreamSent;
54     private boolean mStreaming;
55 
56     private final TunerHal mTunerHal;
57     private TunerChannel mChannel;
58     private Thread mStreamingThread;
59     private final EventDetector mEventDetector;
60 
61     private final TsStreamWriter mTsStreamWriter;
62 
63     public static class TunerDataSource extends TsDataSource {
64         private final TunerTsStreamer mTsStreamer;
65         private final AtomicLong mLastReadPosition = new AtomicLong(0);
66         private long mStartBufferedPosition;
67 
TunerDataSource(TunerTsStreamer tsStreamer)68         private TunerDataSource(TunerTsStreamer tsStreamer) {
69             mTsStreamer = tsStreamer;
70             mStartBufferedPosition = tsStreamer.getBufferedPosition();
71         }
72 
73         @Override
getBufferedPosition()74         public long getBufferedPosition() {
75             return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
76         }
77 
78         @Override
getLastReadPosition()79         public long getLastReadPosition() {
80             return mLastReadPosition.get();
81         }
82 
83         @Override
shiftStartPosition(long offset)84         public void shiftStartPosition(long offset) {
85             SoftPreconditions.checkState(mLastReadPosition.get() == 0);
86             SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
87             mStartBufferedPosition += offset;
88         }
89 
90         @Override
open(DataSpec dataSpec)91         public long open(DataSpec dataSpec) throws IOException {
92             mLastReadPosition.set(0);
93             return C.LENGTH_UNBOUNDED;
94         }
95 
96         @Override
close()97         public void close() {
98         }
99 
100         @Override
read(byte[] buffer, int offset, int readLength)101         public int read(byte[] buffer, int offset, int readLength) throws IOException {
102             int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer,
103                     offset, readLength);
104             if (ret > 0) {
105                 mLastReadPosition.addAndGet(ret);
106             }
107             return ret;
108         }
109     }
110     /**
111      * Creates {@link TsStreamer} for playing or recording the specified channel.
112      * @param tunerHal the HAL for tuner device
113      * @param eventListener the listener for channel & program information
114      */
TunerTsStreamer(TunerHal tunerHal, EventListener eventListener, Context context)115     public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener, Context context) {
116         mTunerHal = tunerHal;
117         mEventDetector = new EventDetector(mTunerHal, eventListener);
118         mTsStreamWriter = context != null && TunerPreferences.getStoreTsStream(context) ?
119                 new TsStreamWriter(context) : null;
120     }
121 
TunerTsStreamer(TunerHal tunerHal, EventListener eventListener)122     public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener) {
123         this(tunerHal, eventListener, null);
124     }
125 
126     @Override
startStream(TunerChannel channel)127     public boolean startStream(TunerChannel channel) {
128         if (mTunerHal.tune(channel.getFrequency(), channel.getModulation())) {
129             if (channel.hasVideo()) {
130                 mTunerHal.addPidFilter(channel.getVideoPid(),
131                         TunerHal.FILTER_TYPE_VIDEO);
132             }
133             boolean audioFilterSet = false;
134             for (Integer audioPid : channel.getAudioPids()) {
135                 if (!audioFilterSet) {
136                     mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_AUDIO);
137                     audioFilterSet = true;
138                 } else {
139                     // FILTER_TYPE_AUDIO overrides the previous filter for audio. We use
140                     // FILTER_TYPE_OTHER from the secondary one to get the all audio tracks.
141                     mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_OTHER);
142                 }
143             }
144             mTunerHal.addPidFilter(channel.getPcrPid(),
145                     TunerHal.FILTER_TYPE_PCR);
146             if (mEventDetector != null) {
147                 mEventDetector.startDetecting(channel.getFrequency(), channel.getModulation(),
148                         channel.getProgramNumber());
149             }
150             mChannel = channel;
151             synchronized (mCircularBufferMonitor) {
152                 if (mStreaming) {
153                     Log.w(TAG, "Streaming should be stopped before start streaming");
154                     return true;
155                 }
156                 mStreaming = true;
157                 mBytesFetched = 0;
158                 mLastReadPosition.set(0L);
159                 mEndOfStreamSent = false;
160             }
161             if (mTsStreamWriter != null) {
162                 mTsStreamWriter.setChannel(mChannel);
163                 mTsStreamWriter.openFile();
164             }
165             mStreamingThread = new StreamingThread();
166             mStreamingThread.start();
167             Log.i(TAG, "Streaming started");
168             return true;
169         }
170         return false;
171     }
172 
173     @Override
startStream(ChannelScanFileParser.ScanChannel channel)174     public boolean startStream(ChannelScanFileParser.ScanChannel channel) {
175         if (mTunerHal.tune(channel.frequency, channel.modulation)) {
176             mEventDetector.startDetecting(
177                     channel.frequency, channel.modulation, EventDetector.ALL_PROGRAM_NUMBERS);
178             synchronized (mCircularBufferMonitor) {
179                 if (mStreaming) {
180                     Log.w(TAG, "Streaming should be stopped before start streaming");
181                     return true;
182                 }
183                 mStreaming = true;
184                 mBytesFetched = 0;
185                 mLastReadPosition.set(0L);
186                 mEndOfStreamSent = false;
187             }
188             mStreamingThread = new StreamingThread();
189             mStreamingThread.start();
190             Log.i(TAG, "Streaming started");
191             return true;
192         }
193         return false;
194     }
195 
196     /**
197      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
198      * device is overloaded this can take a while, but usually it returns pretty quickly.
199      */
200     @Override
stopStream()201     public void stopStream() {
202         mChannel = null;
203         synchronized (mCircularBufferMonitor) {
204             mStreaming = false;
205             mCircularBufferMonitor.notifyAll();
206         }
207 
208         try {
209             if (mStreamingThread != null) {
210                 mStreamingThread.join();
211             }
212         } catch (InterruptedException e) {
213             Thread.currentThread().interrupt();
214         }
215         if (mTsStreamWriter != null) {
216             mTsStreamWriter.closeFile(true);
217             mTsStreamWriter.setChannel(null);
218         }
219     }
220 
221     @Override
createDataSource()222     public TsDataSource createDataSource() {
223         return new TunerDataSource(this);
224     }
225 
226     /**
227      * Returns incomplete channel lists which was scanned so far. Incomplete channel means
228      * the channel whose channel information is not complete or is not well-formed.
229      * @return {@link List} of {@link TunerChannel}
230      */
getMalFormedChannels()231     public List<TunerChannel> getMalFormedChannels() {
232         return mEventDetector.getMalFormedChannels();
233     }
234 
235     /**
236      * Returns the current {@link TunerHal} which provides MPEG-TS stream for TunerTsStreamer.
237      * @return {@link TunerHal}
238      */
getTunerHal()239     public TunerHal getTunerHal() {
240         return mTunerHal;
241     }
242 
243     /**
244      * Returns the current tuned channel for TunerTsStreamer.
245      * @return {@link TunerChannel}
246      */
getChannel()247     public TunerChannel getChannel() {
248         return mChannel;
249     }
250 
251     /**
252      * Returns the current buffered position from tuner.
253      * @return the current buffered position
254      */
getBufferedPosition()255     public long getBufferedPosition() {
256         synchronized (mCircularBufferMonitor) {
257             return mBytesFetched;
258         }
259     }
260 
261     private class StreamingThread extends Thread {
262         @Override
run()263         public void run() {
264             // Buffers for streaming data from the tuner and the internal buffer.
265             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
266 
267             while (true) {
268                 synchronized (mCircularBufferMonitor) {
269                     if (!mStreaming) {
270                         break;
271                     }
272                 }
273 
274                 int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length);
275                 if (bytesWritten <= 0) {
276                     try {
277                         // When buffer is underrun, we sleep for short time to prevent
278                         // unnecessary CPU draining.
279                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
280                     } catch (InterruptedException e) {
281                         Thread.currentThread().interrupt();
282                     }
283                     continue;
284                 }
285 
286                 if (mTsStreamWriter != null) {
287                     mTsStreamWriter.writeToFile(dataBuffer, bytesWritten);
288                 }
289 
290                 if (mEventDetector != null) {
291                     mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
292                 }
293                 synchronized (mCircularBufferMonitor) {
294                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
295                     int bytesToCopyInFirstPass = bytesWritten;
296                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
297                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
298                     }
299                     System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
300                             bytesToCopyInFirstPass);
301                     if (bytesToCopyInFirstPass < bytesWritten) {
302                         System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
303                                 bytesWritten - bytesToCopyInFirstPass);
304                     }
305                     mBytesFetched += bytesWritten;
306                     mCircularBufferMonitor.notifyAll();
307                 }
308             }
309 
310             Log.i(TAG, "Streaming stopped");
311         }
312     }
313 
314     /**
315      * Reads data from internal buffer.
316      * @param pos the position to read from
317      * @param buffer to read
318      * @param offset start position of the read buffer
319      * @param amount number of bytes to read
320      * @return number of read bytes when successful, {@code -1} otherwise
321      * @throws IOException
322      */
readAt(long pos, byte[] buffer, int offset, int amount)323     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
324         long readStartTime = System.currentTimeMillis();
325         while (true) {
326             synchronized (mCircularBufferMonitor) {
327                 if (mEndOfStreamSent || !mStreaming) {
328                     return -1;
329                 }
330                 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
331                     Log.e(TAG, "Demux is requesting the data which is already overwritten.");
332                     return -1;
333                 }
334                 if (System.currentTimeMillis() - readStartTime > READ_TIMEOUT_MS) {
335                     // Nothing was received during READ_TIMEOUT_MS before.
336                     mEndOfStreamSent = true;
337                     mCircularBufferMonitor.notifyAll();
338                     return -1;
339                 }
340                 if (mBytesFetched < pos + amount) {
341                     try {
342                         mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
343                     } catch (InterruptedException e) {
344                         Thread.currentThread().interrupt();
345                     }
346                     // Try again to prevent starvation.
347                     // Give chances to read from other threads.
348                     continue;
349                 }
350                 int startPos = (int) (pos % CIRCULAR_BUFFER_SIZE);
351                 int endPos = (int) ((pos + amount) % CIRCULAR_BUFFER_SIZE);
352                 int firstLength = (startPos > endPos ? CIRCULAR_BUFFER_SIZE : endPos) - startPos;
353                 System.arraycopy(mCircularBuffer, startPos, buffer, offset, firstLength);
354                 if (firstLength < amount) {
355                     System.arraycopy(mCircularBuffer, 0, buffer, offset + firstLength,
356                             amount - firstLength);
357                 }
358                 mCircularBufferMonitor.notifyAll();
359                 return amount;
360             }
361         }
362     }
363 }
364