1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.usbtuner;
18 
19 import android.media.MediaDataSource;
20 import android.util.Log;
21 
22 import com.android.usbtuner.ChannelScanFileParser.ScanChannel;
23 import com.android.usbtuner.data.Channel;
24 import com.android.usbtuner.data.TunerChannel;
25 import com.android.usbtuner.tvinput.EventDetector;
26 import com.android.usbtuner.tvinput.EventDetector.EventListener;
27 import com.android.usbtuner.tvinput.UsbTunerDebug;
28 
29 import java.io.IOException;
30 import java.util.Locale;
31 import java.util.concurrent.atomic.AtomicLong;
32 
33 /**
34  * A {@link MediaDataSource} implementation which provides the mpeg2ts stream from the tuner device
35  * to {@link MediaExtractor}.
36  */
37 public class UsbTunerDataSource extends MediaDataSource implements InputStreamSource {
38     private static final String TAG = "UsbTunerDataSource";
39 
40     private static final int MIN_READ_UNIT = 1500;
41     private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB
42     private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000;  // ~ 30MB
43 
44     private static final int READ_TIMEOUT_MS = 5000; // 5 secs.
45     private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
46 
47     private static final int CACHE_KEY_VERSION = 1;
48 
49     // UTCK stands for USB Tuner Cache Key.
50     private static final String CACHE_KEY_PREFIX = "UTCK";
51 
52     private final Object mCircularBufferMonitor = new Object();
53     private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
54     private long mBytesFetched;
55     private final AtomicLong mLastReadPosition = new AtomicLong();
56     private boolean mEndOfStreamSent;
57     private boolean mStreaming;
58 
59     private final TunerHal mTunerHal;
60     private Thread mStreamingThread;
61     private boolean mDeviceConfigured;
62     private EventDetector mEventDetector;
63 
UsbTunerDataSource(TunerHal tunerHal, EventListener eventListener)64     public UsbTunerDataSource(TunerHal tunerHal, EventListener eventListener) {
65         mTunerHal = tunerHal;
66         mEventDetector = new EventDetector(mTunerHal, eventListener);
67     }
68 
69     /**
70      * Starts the streaming of a configured program. Throws a runtime exception if no channel and
71      * program have successfully been configured yet.
72      */
73     @Override
startStream()74     public void startStream() {
75         if (!mDeviceConfigured) {
76             throw new RuntimeException("Channel and program not configured!");
77         }
78 
79         synchronized (mCircularBufferMonitor) {
80             if (mStreaming) {
81                 Log.w(TAG, "Streaming should be stopped before start streaming");
82                 return;
83             }
84             mStreaming = true;
85             mBytesFetched = 0;
86             mLastReadPosition.set(0L);
87             mEndOfStreamSent = false;
88         }
89 
90         mStreamingThread = new StreamingThread();
91         mStreamingThread.start();
92         Log.i(TAG, "Streaming started");
93     }
94 
95     /**
96      * Sets the channel required to start streaming from this device. Afterwards, prepares the tuner
97      * device for streaming. Package retrieval can be made at any time after invoking this method
98      * and before stopping the stream.
99      *
100      * @param channel a {@link TunerChannel} instance tune to
101      * @return {@code true} if the entire operation was successful; {@code false} otherwise
102      */
103     @Override
tuneToChannel(TunerChannel channel)104     public boolean tuneToChannel(TunerChannel channel) {
105         if (mTunerHal.tune(channel.getFrequency(), channel.getModulation())) {
106             if (channel.hasVideo()) {
107                 mTunerHal.addPidFilter(channel.getVideoPid(),
108                         TunerHal.FILTER_TYPE_VIDEO);
109             }
110             if (channel.hasAudio()) {
111                 mTunerHal.addPidFilter(channel.getAudioPid(),
112                         TunerHal.FILTER_TYPE_AUDIO);
113             }
114             mTunerHal.addPidFilter(channel.getPcrPid(),
115                     TunerHal.FILTER_TYPE_PCR);
116             if (mEventDetector != null) {
117                 mEventDetector.startDetecting(channel.getFrequency(), channel.getModulation());
118             }
119             mDeviceConfigured = true;
120             return true;
121         }
122         return false;
123     }
124 
125     /**
126      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
127      * device is overloaded this can take a while, but usually it returns pretty quickly.
128      */
129     @Override
stopStream()130     public void stopStream() {
131         synchronized (mCircularBufferMonitor) {
132             mStreaming = false;
133             mCircularBufferMonitor.notify();
134         }
135 
136         try {
137             if (mStreamingThread != null) {
138                 mStreamingThread.join();
139             }
140         } catch (InterruptedException e) {
141             Thread.currentThread().interrupt();
142         } finally {
143             mTunerHal.stopTune();
144         }
145     }
146 
147     @Override
getLimit()148     public long getLimit() {
149         synchronized (mCircularBufferMonitor) {
150             return mBytesFetched;
151         }
152     }
153 
154     @Override
getPosition()155     public long getPosition() {
156         return mLastReadPosition.get();
157     }
158 
159     private class StreamingThread extends Thread {
160         @Override
run()161         public void run() {
162             // Buffers for streaming data from the tuner and the internal buffer.
163             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
164 
165             while (true) {
166                 synchronized (mCircularBufferMonitor) {
167                     if (!mStreaming) {
168                         break;
169                     }
170                 }
171 
172                 int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length);
173                 if (bytesWritten <= 0) {
174                     try {
175                         // When buffer is underrun, we sleep for short time to prevent
176                         // unnecessary CPU draining.
177                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
178                     } catch (InterruptedException e) {
179                         Thread.currentThread().interrupt();
180                     }
181                     continue;
182                 }
183 
184                 if (mEventDetector != null) {
185                     mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
186                 }
187                 synchronized (mCircularBufferMonitor) {
188                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
189                     int bytesToCopyInFirstPass = bytesWritten;
190                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
191                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
192                     }
193                     System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
194                             bytesToCopyInFirstPass);
195                     if (bytesToCopyInFirstPass < bytesWritten) {
196                         System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
197                                 bytesWritten - bytesToCopyInFirstPass);
198                     }
199                     mBytesFetched += bytesWritten;
200                     mCircularBufferMonitor.notify();
201                 }
202             }
203 
204             Log.i(TAG, "Streaming stopped");
205         }
206     }
207 
208     @Override
readAt(long pos, byte[] buffer, int offset, int amount)209     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
210         synchronized (mCircularBufferMonitor) {
211             if (mEndOfStreamSent) {
212                 // Nothing was received during READ_TIMEOUT_MS before.
213                 return -1;
214             }
215             if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
216                 // Not available at circular buffer.
217                 Log.w(TAG, "Not available at circular buffer");
218                 return -1;
219             }
220             long initialBytesFetched = mBytesFetched;
221             while (mBytesFetched < pos + amount && mStreaming) {
222                 try {
223                     mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
224                 } catch (InterruptedException e) {
225                     // Wait again.
226                     Thread.currentThread().interrupt();
227                 }
228                 if (initialBytesFetched == mBytesFetched) {
229                     Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");
230 
231                     // Returning -1 will make demux report EOS so that the input service can retry
232                     // the playback.
233                     mEndOfStreamSent = true;
234                     return -1;
235                 }
236             }
237             if (!mStreaming) {
238                 Log.w(TAG, "Stream is already stopped.");
239                 return -1;
240             }
241             if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
242                 Log.e(TAG, "Demux is requesting the data which is already overwritten.");
243                 return -1;
244             }
245             int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
246             int bytesToCopyInFirstPass = amount;
247             if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
248                 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
249             }
250             System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass);
251             if (bytesToCopyInFirstPass < amount) {
252                 System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass,
253                         amount - bytesToCopyInFirstPass);
254             }
255             mLastReadPosition.set(pos + amount);
256             mCircularBufferMonitor.notify();
257 
258             if (UsbTunerDebug.ENABLED) {
259                 UsbTunerDebug.setBytesInQueue((int) (mBytesFetched - mLastReadPosition.get()));
260             }
261 
262             return amount;
263         }
264     }
265 
266     @Override
getSize()267     public long getSize() throws IOException {
268         return -1;
269     }
270 
271     @Override
close()272     public void close() {
273         // Called from system MediaExtractor. All the resource should be closed
274         // in stopStream() already.
275     }
276 
277     @Override
getType()278     public int getType() {
279         return Channel.TYPE_TUNER;
280     }
281 
282     @Override
setScanChannel(ScanChannel channel)283     public boolean setScanChannel(ScanChannel channel) {
284         return false;
285     }
286 
generateCacheKey(TunerChannel channel, long timestampMs)287     public static String generateCacheKey(TunerChannel channel, long timestampMs) {
288         return String.format(Locale.ENGLISH, "%s-%x-%x-%x-%x", CACHE_KEY_PREFIX, CACHE_KEY_VERSION,
289                 channel.getFrequency(), channel.getProgramNumber(), timestampMs);
290     }
291 
292     /**
293      * Parses the timestamp from a cache key generated by {@link #generateCacheKey}.
294      *
295      * @param cacheKey a cache key generated by {@link #generateCacheKey}
296      * @return the timestamp parsed from the given cache key. {@code -1} if unable to parse.
297      */
parseTimestampFromCacheKey(String cacheKey)298     public static long parseTimestampFromCacheKey(String cacheKey) {
299         String[] tokens = cacheKey.split("-");
300         if (tokens.length < 2 || !tokens[0].equals(CACHE_KEY_PREFIX)) {
301             return -1;
302         }
303         int version = Integer.parseInt(tokens[1], 16);
304         if (version == 1) {
305             return Long.parseLong(tokens[4], 16);
306         } else {
307             return -1;
308         }
309     }
310 }
311