1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.usbtuner;
18 
19 import android.media.MediaDataSource;
20 import android.os.Environment;
21 import android.util.Log;
22 import android.util.SparseBooleanArray;
23 
24 import com.android.usbtuner.ChannelScanFileParser.ScanChannel;
25 import com.android.usbtuner.data.Channel;
26 import com.android.usbtuner.data.TunerChannel;
27 import com.android.usbtuner.ts.TsParser;
28 import com.android.usbtuner.tvinput.EventDetector;
29 import com.android.usbtuner.tvinput.FileSourceEventDetector;
30 
31 import java.io.BufferedInputStream;
32 import java.io.File;
33 import java.io.FileInputStream;
34 import java.io.IOException;
35 import java.util.List;
36 
37 /**
38  * A {@link DataSource} implementation which provides the MPEG-2 TS stream from a local file
39  * generated by capturing TV signal.
40  */
41 public class FileDataSource extends MediaDataSource implements InputStreamSource {
42     private static final String TAG = "FileDataSource";
43 
44     private static final int TS_PACKET_SIZE = 188;
45     private static final int TS_SYNC_BYTE = 0x47;
46     private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10;
47     private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB
48     private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB
49     private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB
50     private static final int READ_TIMEOUT_MS = 10000; // 10 secs.
51     private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
52     private static final String FILE_DIR =
53             new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath();
54 
55     // Virtual frequency base used for file-based source
56     public static final int FREQ_BASE = 100;
57 
58     private final Object mCircularBufferMonitor = new Object();
59     private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
60     private final FileSourceEventDetector mEventDetector;
61 
62     private long mBytesFetched;
63     private long mLastReadPosition;
64     private boolean mStreaming;
65 
66     private Thread mStreamingThread;
67     private StreamProvider mSource;
68 
FileDataSource(EventDetector.EventListener eventListener)69     public FileDataSource(EventDetector.EventListener eventListener) {
70         mEventDetector = new FileSourceEventDetector(eventListener);
71     }
72 
73     @Override
setScanChannel(ScanChannel channel)74     public boolean setScanChannel(ScanChannel channel) {
75         String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath();
76         mSource = new StreamProvider(filepath);
77         if (mSource.isReady()) {
78             mEventDetector.start(mSource);
79             return true;
80         }
81         return false;
82     }
83 
84     /**
85      * Sets the channel required to start streaming from this device. Afterwards, prepares
86      * the tuner device for streaming. Package retrieval can be made at any time after invoking
87      * this method and before stopping the stream.
88      *
89      * @param channel a {@link TunerChannel} instance tune to
90      * @return {@code true} if the entire operation was successful; {@code false} otherwise
91      */
92     @Override
tuneToChannel(TunerChannel channel)93     public boolean tuneToChannel(TunerChannel channel) {
94         Log.i(TAG, "tuneToChannel with: " + channel.getFilepath());
95         mSource = new StreamProvider(channel.getFilepath());
96         if (!mSource.isReady()) {
97             return false;
98         }
99         mEventDetector.start(mSource);
100         mSource.addPidFilter(channel.getVideoPid());
101         mSource.addPidFilter(channel.getAudioPid());
102         mSource.addPidFilter(channel.getPcrPid());
103         return true;
104     }
105 
106     /**
107      * Starts streaming data.
108      */
109     @Override
startStream()110     public void startStream() {
111         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
112         mSource.addPidFilter(TsParser.PAT_PID);
113         synchronized (mCircularBufferMonitor) {
114             if (mStreaming) {
115                 return;
116             }
117             mStreaming = true;
118         }
119 
120         mStreamingThread = new StreamingThread();
121         mStreamingThread.start();
122         Log.i(TAG, "Streaming started");
123     }
124 
125     /**
126      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
127      * device is overloaded this can take a while, but usually it returns pretty quickly.
128      */
129     @Override
stopStream()130     public void stopStream() {
131         synchronized (mCircularBufferMonitor) {
132             mStreaming = false;
133             mCircularBufferMonitor.notify();
134         }
135 
136         try {
137             if (mStreamingThread != null) {
138                 mStreamingThread.join();
139             }
140         } catch (InterruptedException e) {
141             Thread.currentThread().interrupt();
142         }
143     }
144 
145     @Override
getLimit()146     public long getLimit() {
147         synchronized (mCircularBufferMonitor) {
148             return mBytesFetched;
149         }
150     }
151 
152     @Override
getPosition()153     public long getPosition() {
154         synchronized (mCircularBufferMonitor) {
155             return mLastReadPosition;
156         }
157     }
158 
159     /**
160      * Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID.
161      */
162     public static class StreamProvider {
163         private final String mFilepath;
164         private final SparseBooleanArray mPids = new SparseBooleanArray();
165         private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE];
166 
167         private BufferedInputStream mInputStream;
168 
StreamProvider(String filepath)169         private StreamProvider(String filepath) {
170             mFilepath = filepath;
171             open(filepath);
172         }
173 
open(String filepath)174         private void open(String filepath) {
175             try {
176                 mInputStream = new BufferedInputStream(new FileInputStream(filepath));
177             } catch (IOException e) {
178                 Log.e(TAG, "Error opening input stream", e);
179                 mInputStream = null;
180             }
181         }
182 
isReady()183         private boolean isReady() {
184             return mInputStream != null;
185         }
186 
getFilepath()187         public String getFilepath() {
188             return mFilepath;
189         }
190 
addPidFilter(int pid)191         public void addPidFilter(int pid) {
192             mPids.put(pid, true);
193         }
194 
isFilterEmpty()195         public boolean isFilterEmpty() {
196             return mPids.size() > 0;
197         }
198 
clearPidFilter()199         public void clearPidFilter() {
200             mPids.clear();
201         }
202 
isInFilter(int pid)203         public boolean isInFilter(int pid) {
204             return mPids.get(pid);
205         }
206 
read(byte[] inputBuffer)207         private int read(byte[] inputBuffer) {
208             int readSize = readInternal();
209             if (readSize <= 0) {
210                 // Reached the end of stream. Restart from the beginning.
211                 close();
212                 open(mFilepath);
213                 if (mInputStream == null) {
214                     return -1;
215                 }
216                 readSize = readInternal();
217             }
218 
219             if (mPreBuffer[0] != TS_SYNC_BYTE) {
220                 Log.e(TAG, "Error reading input stream - no TS sync found");
221                 return -1;
222             }
223             int filteredSize = 0;
224             for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) {
225                 if (mPreBuffer[i] == TS_SYNC_BYTE) {
226                     int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff);
227                     if (mPids.get(pid)) {
228                         System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE);
229                         destPos += TS_PACKET_SIZE;
230                         filteredSize += TS_PACKET_SIZE;
231                     }
232                 }
233             }
234             return filteredSize;
235         }
236 
readInternal()237         private int readInternal() {
238             int readSize;
239             try {
240                 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length);
241             } catch (IOException e) {
242                 Log.e(TAG, "Error reading input stream", e);
243                 return -1;
244             }
245             return readSize;
246         }
247 
close()248         private void close() {
249             try {
250                 mInputStream.close();
251             } catch (IOException e) {
252                 Log.e(TAG, "Error closing input stream:", e);
253             }
254             mInputStream = null;
255         }
256     }
257 
258     @Override
readAt(long pos, byte[] buffer, int offset, int amount)259     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
260         synchronized (mCircularBufferMonitor) {
261             long initialBytesFetched = mBytesFetched;
262             while (mBytesFetched < pos + amount && mStreaming) {
263                 try {
264                     mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
265                 } catch (InterruptedException e) {
266                     // Wait again.
267                     Thread.currentThread().interrupt();
268                 }
269                 if (initialBytesFetched == mBytesFetched) {
270                     Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");
271 
272                     // Returning -1 will make demux report EOS so that the input service can retry
273                     // the playback.
274                     return -1;
275                 }
276             }
277             if (!mStreaming) {
278                 Log.w(TAG, "Stream is already stopped.");
279                 return -1;
280             }
281             if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
282                 Log.e(TAG, "Demux is requesting the data which is already overwritten.");
283                 return -1;
284             }
285             int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
286             int bytesToCopyInFirstPass = amount;
287             if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
288                 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
289             }
290             System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass);
291             if (bytesToCopyInFirstPass < amount) {
292                 System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass,
293                         amount - bytesToCopyInFirstPass);
294             }
295             mLastReadPosition = pos + amount;
296             mCircularBufferMonitor.notify();
297             return amount;
298         }
299     }
300 
301     @Override
getSize()302     public long getSize() throws IOException {
303         return -1;
304     }
305 
306     @Override
close()307     public void close() {}
308 
309     @Override
getType()310     public int getType() {
311         return Channel.TYPE_FILE;
312     }
313 
314     /**
315      * Adds {@link ScanChannel} instance for local files.
316      *
317      * @param output a list of channels where the results will be placed in
318      */
addLocalStreamFiles(List<ScanChannel> output)319     public static void addLocalStreamFiles(List<ScanChannel> output) {
320         File dir = new File(FILE_DIR);
321         if (!dir.exists()) return;
322 
323         File[] tsFiles = dir.listFiles();
324         if (tsFiles == null) return;
325         int freq = FileDataSource.FREQ_BASE;
326         for (File file : tsFiles) {
327             if (!file.isFile()) continue;
328             output.add(ScanChannel.forFile(freq, file.getName()));
329             freq += 100;
330         }
331     }
332 
333     /**
334      * A thread managing a circular buffer that holds stream data to be consumed by player.
335      * Keeps reading data in from a {@link StreamProvider} to hold enough amount for buffering.
336      * Started and stopped by {@link #startStream()} and {@link #stopStream()}, respectively.
337      */
338     private class StreamingThread extends Thread {
339         @Override
run()340         public void run() {
341             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
342 
343             synchronized (mCircularBufferMonitor) {
344                 mBytesFetched = 0;
345                 mLastReadPosition = 0;
346             }
347 
348             while (true) {
349                 synchronized (mCircularBufferMonitor) {
350                     while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE
351                             && mStreaming) {
352                         try {
353                             mCircularBufferMonitor.wait();
354                         } catch (InterruptedException e) {
355                             // Wait again.
356                             Thread.currentThread().interrupt();
357                         }
358                     }
359                     if (!mStreaming) {
360                         break;
361                     }
362                 }
363 
364                 int bytesWritten = mSource.read(dataBuffer);
365                 if (bytesWritten <= 0) {
366                     try {
367                         // When buffer is underrun, we sleep for short time to prevent
368                         // unnecessary CPU draining.
369                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
370                     } catch (InterruptedException e) {
371                         Thread.currentThread().interrupt();
372                     }
373                     continue;
374                 }
375 
376                 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
377 
378                 synchronized (mCircularBufferMonitor) {
379                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
380                     int bytesToCopyInFirstPass = bytesWritten;
381                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
382                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
383                     }
384                     System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
385                             bytesToCopyInFirstPass);
386                     if (bytesToCopyInFirstPass < bytesWritten) {
387                         System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
388                                 bytesWritten - bytesToCopyInFirstPass);
389                     }
390                     mBytesFetched += bytesWritten;
391                     mCircularBufferMonitor.notify();
392                 }
393             }
394 
395             Log.i(TAG, "Streaming stopped");
396             mSource.close();
397         }
398     }
399 }
400