1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tv.tuner.source;
18 
19 import android.content.Context;
20 import android.net.Uri;
21 import android.support.annotation.Nullable;
22 import android.util.Log;
23 import android.util.Pair;
24 
25 import com.android.tv.common.SoftPreconditions;
26 import com.android.tv.tuner.api.ScanChannel;
27 import com.android.tv.tuner.api.Tuner;
28 import com.android.tv.tuner.data.TunerChannel;
29 import com.android.tv.tuner.prefs.TunerPreferences;
30 import com.android.tv.tuner.ts.EventDetector;
31 import com.android.tv.tuner.ts.EventDetector.EventListener;
32 
33 import com.google.android.exoplayer2.C;
34 import com.google.android.exoplayer2.upstream.DataSpec;
35 import com.google.android.exoplayer2.upstream.TransferListener;
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.concurrent.atomic.AtomicLong;
40 
41 /** Provides MPEG-2 TS stream sources for channel playing from an underlying tuner device. */
42 public class TunerTsStreamer implements TsStreamer {
43     private static final String TAG = "TunerTsStreamer";
44 
45     private static final int MIN_READ_UNIT = 1500;
46     private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB
47     private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000; // ~ 30MB
48     private static final int TS_PACKET_SIZE = 188;
49 
50     private static final int READ_TIMEOUT_MS = 5000; // 5 secs.
51     private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
52     private static final int READ_ERROR_STREAMING_ENDED = -1;
53     private static final int READ_ERROR_BUFFER_OVERWRITTEN = -2;
54 
55     private final Object mCircularBufferMonitor = new Object();
56     private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
57     private long mBytesFetched;
58     private final AtomicLong mLastReadPosition = new AtomicLong();
59     private boolean mStreaming;
60 
61     private final Tuner mTunerHal;
62     private TunerChannel mChannel;
63     private Thread mStreamingThread;
64     private final EventDetector mEventDetector;
65     private final List<Pair<EventListener, Boolean>> mEventListenerActions = new ArrayList<>();
66 
67     private final TsStreamWriter mTsStreamWriter;
68     private String mChannelNumber;
69 
70     public static class TunerDataSource extends TsDataSource {
71         private final TunerTsStreamer mTsStreamer;
72         private final AtomicLong mLastReadPosition = new AtomicLong(0);
73         private long mStartBufferedPosition;
74         private Uri mUri;
75 
TunerDataSource(TunerTsStreamer tsStreamer)76         private TunerDataSource(TunerTsStreamer tsStreamer) {
77             mTsStreamer = tsStreamer;
78             mStartBufferedPosition = tsStreamer.getBufferedPosition();
79         }
80 
81         @Override
getBufferedPosition()82         public long getBufferedPosition() {
83             return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
84         }
85 
86         @Override
getLastReadPosition()87         public long getLastReadPosition() {
88             return mLastReadPosition.get();
89         }
90 
91         @Override
shiftStartPosition(long offset)92         public void shiftStartPosition(long offset) {
93             SoftPreconditions.checkState(mLastReadPosition.get() == 0);
94             SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
95             mStartBufferedPosition += offset;
96         }
97 
98         @Override
open(DataSpec dataSpec)99         public long open(DataSpec dataSpec) {
100             mUri = dataSpec.uri;
101             mLastReadPosition.set(0);
102             return C.LENGTH_UNSET;
103         }
104 
105         @Override
close()106         public void close() {
107             mUri = null;
108         }
109 
110         @Override
read(byte[] buffer, int offset, int readLength)111         public int read(byte[] buffer, int offset, int readLength) throws IOException {
112             int ret =
113                     mTsStreamer.readAt(
114                             mStartBufferedPosition + mLastReadPosition.get(),
115                             buffer,
116                             offset,
117                             readLength);
118             if (ret > 0) {
119                 mLastReadPosition.addAndGet(ret);
120             } else if (ret == READ_ERROR_BUFFER_OVERWRITTEN) {
121                 long currentPosition = mStartBufferedPosition + mLastReadPosition.get();
122                 long endPosition = mTsStreamer.getBufferedPosition();
123                 long diff =
124                         ((endPosition - currentPosition + TS_PACKET_SIZE - 1) / TS_PACKET_SIZE)
125                                 * TS_PACKET_SIZE;
126                 Log.w(TAG, "Demux position jump by overwritten buffer: " + diff);
127                 mStartBufferedPosition = currentPosition + diff;
128                 mLastReadPosition.set(0);
129                 return 0;
130             }
131             return ret;
132         }
133 
134         @Override
getSignalStrength()135         public int getSignalStrength() {
136             return mTsStreamer.getSignalStrength();
137         }
138 
139         @Override
addTransferListener(TransferListener transferListener)140         public void addTransferListener(TransferListener transferListener) {
141             // TODO: Implement to support metrics collection.
142         }
143 
144         @Nullable
145         @Override
getUri()146         public Uri getUri() {
147             return mUri;
148         }
149 
150     }
151     /**
152      * Creates {@link TsStreamer} for playing or recording the specified channel.
153      *
154      * @param tunerHal the HAL for tuner device
155      * @param eventListener the listener for channel & program information
156      */
TunerTsStreamer(Tuner tunerHal, EventListener eventListener, Context context)157     public TunerTsStreamer(Tuner tunerHal, EventListener eventListener, Context context) {
158         mTunerHal = tunerHal;
159         mEventDetector = new EventDetector(mTunerHal);
160         if (eventListener != null) {
161             mEventDetector.registerListener(eventListener);
162         }
163         mTsStreamWriter =
164                 context != null && TunerPreferences.getStoreTsStream(context)
165                         ? new TsStreamWriter(context)
166                         : null;
167     }
168 
TunerTsStreamer(Tuner tunerHal, EventListener eventListener)169     public TunerTsStreamer(Tuner tunerHal, EventListener eventListener) {
170         this(tunerHal, eventListener, null);
171     }
172 
173     @Override
startStream(TunerChannel channel)174     public boolean startStream(TunerChannel channel) {
175         if (mTunerHal.tune(
176                 channel.getDeliverySystemType().getNumber(), channel.getFrequency(),
177                 channel.getModulation(), channel.getDisplayNumber(false))) {
178             if (channel.hasVideo()) {
179                 mTunerHal.addPidFilter(channel.getVideoPid(), Tuner.FILTER_TYPE_VIDEO);
180             }
181             boolean audioFilterSet = false;
182             for (Integer audioPid : channel.getAudioPids()) {
183                 if (!audioFilterSet) {
184                     mTunerHal.addPidFilter(audioPid, Tuner.FILTER_TYPE_AUDIO);
185                     audioFilterSet = true;
186                 } else {
187                     // FILTER_TYPE_AUDIO overrides the previous filter for audio. We use
188                     // FILTER_TYPE_OTHER from the secondary one to get the all audio tracks.
189                     mTunerHal.addPidFilter(audioPid, Tuner.FILTER_TYPE_OTHER);
190                 }
191             }
192             mTunerHal.addPidFilter(channel.getPcrPid(), Tuner.FILTER_TYPE_PCR);
193             if (mEventDetector != null) {
194                 mEventDetector.startDetecting(
195                         channel.getDeliverySystemType(),
196                         channel.getFrequency(),
197                         channel.getModulation(),
198                         channel.getProgramNumber());
199             }
200             mChannel = channel;
201             mChannelNumber = channel.getDisplayNumber();
202             synchronized (mCircularBufferMonitor) {
203                 if (mStreaming) {
204                     Log.w(TAG, "Streaming should be stopped before start streaming");
205                     return true;
206                 }
207                 mStreaming = true;
208                 mBytesFetched = 0;
209                 mLastReadPosition.set(0L);
210             }
211             if (mTsStreamWriter != null) {
212                 mTsStreamWriter.setChannel(mChannel);
213                 mTsStreamWriter.openFile();
214             }
215             mStreamingThread = new StreamingThread();
216             mStreamingThread.start();
217             Log.i(TAG, "Streaming started");
218             return true;
219         }
220         return false;
221     }
222 
223     @Override
startStream(ScanChannel channel)224     public boolean startStream(ScanChannel channel) {
225         if (mTunerHal.tune(channel.deliverySystemType.getNumber(), channel.frequency,
226                 channel.modulation, null)) {
227             mEventDetector.startDetecting(
228                     channel.deliverySystemType, channel.frequency, channel.modulation,
229                     EventDetector.ALL_PROGRAM_NUMBERS);
230             synchronized (mCircularBufferMonitor) {
231                 if (mStreaming) {
232                     Log.w(TAG, "Streaming should be stopped before start streaming");
233                     return true;
234                 }
235                 mStreaming = true;
236                 mBytesFetched = 0;
237                 mLastReadPosition.set(0L);
238             }
239             mStreamingThread = new StreamingThread();
240             mStreamingThread.start();
241             Log.i(TAG, "Streaming started");
242             return true;
243         }
244         return false;
245     }
246 
247     /**
248      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
249      * device is overloaded this can take a while, but usually it returns pretty quickly.
250      */
251     @Override
stopStream()252     public void stopStream() {
253         mChannel = null;
254         synchronized (mCircularBufferMonitor) {
255             mStreaming = false;
256             mCircularBufferMonitor.notifyAll();
257         }
258 
259         try {
260             if (mStreamingThread != null) {
261                 mStreamingThread.join();
262             }
263         } catch (InterruptedException e) {
264             Thread.currentThread().interrupt();
265         }
266         if (mTsStreamWriter != null) {
267             mTsStreamWriter.closeFile(true);
268             mTsStreamWriter.setChannel(null);
269         }
270     }
271 
272     @Override
createDataSource()273     public TsDataSource createDataSource() {
274         return new TunerDataSource(this);
275     }
276 
277     /**
278      * Returns incomplete channel lists which was scanned so far. Incomplete channel means the
279      * channel whose channel information is not complete or is not well-formed.
280      *
281      * @return {@link List} of {@link TunerChannel}
282      */
getMalFormedChannels()283     public List<TunerChannel> getMalFormedChannels() {
284         return mEventDetector.getMalFormedChannels();
285     }
286 
287     /**
288      * Returns the current {@link Tuner} which provides MPEG-TS stream for TunerTsStreamer.
289      *
290      * @return {@link Tuner}
291      */
getTunerHal()292     public Tuner getTunerHal() {
293         return mTunerHal;
294     }
295 
296     /**
297      * Returns the current tuned channel for TunerTsStreamer.
298      *
299      * @return {@link TunerChannel}
300      */
getChannel()301     public TunerChannel getChannel() {
302         return mChannel;
303     }
304 
305     /**
306      * Returns the current buffered position from tuner.
307      *
308      * @return the current buffered position
309      */
getBufferedPosition()310     public long getBufferedPosition() {
311         synchronized (mCircularBufferMonitor) {
312             return mBytesFetched;
313         }
314     }
315 
getStreamerInfo()316     public String getStreamerInfo() {
317         return "Channel: " + mChannelNumber + ", Streaming: " + mStreaming;
318     }
319 
registerListener(EventListener listener)320     public void registerListener(EventListener listener) {
321         if (mEventDetector != null && listener != null) {
322             synchronized (mEventListenerActions) {
323                 mEventListenerActions.add(Pair.create(listener, true));
324             }
325         }
326     }
327 
unregisterListener(EventListener listener)328     public void unregisterListener(EventListener listener) {
329         if (mEventDetector != null) {
330             synchronized (mEventListenerActions) {
331                 mEventListenerActions.add(new Pair(listener, false));
332             }
333         }
334     }
335 
getSignalStrength()336     public int getSignalStrength() {
337         return mTunerHal.getSignalStrength();
338     }
339 
340     private class StreamingThread extends Thread {
341         @Override
run()342         public void run() {
343             // Buffers for streaming data from the tuner and the internal buffer.
344             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
345 
346             while (true) {
347                 synchronized (mCircularBufferMonitor) {
348                     if (!mStreaming) {
349                         break;
350                     }
351                 }
352 
353                 if (mEventDetector != null) {
354                     synchronized (mEventListenerActions) {
355                         for (Pair listenerAction : mEventListenerActions) {
356                             EventListener listener = (EventListener) listenerAction.first;
357                             if ((boolean) listenerAction.second) {
358                                 mEventDetector.registerListener(listener);
359                             } else {
360                                 mEventDetector.unregisterListener(listener);
361                             }
362                         }
363                         mEventListenerActions.clear();
364                     }
365                 }
366 
367                 int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length);
368                 if (bytesWritten <= 0) {
369                     try {
370                         // When buffer is underrun, we sleep for short time to prevent
371                         // unnecessary CPU draining.
372                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
373                     } catch (InterruptedException e) {
374                         Thread.currentThread().interrupt();
375                     }
376                     continue;
377                 }
378 
379                 if (mTsStreamWriter != null) {
380                     mTsStreamWriter.writeToFile(dataBuffer, bytesWritten);
381                 }
382 
383                 if (mEventDetector != null) {
384                     mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
385                 }
386                 synchronized (mCircularBufferMonitor) {
387                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
388                     int bytesToCopyInFirstPass = bytesWritten;
389                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
390                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
391                     }
392                     System.arraycopy(
393                             dataBuffer, 0, mCircularBuffer, posInBuffer, bytesToCopyInFirstPass);
394                     if (bytesToCopyInFirstPass < bytesWritten) {
395                         System.arraycopy(
396                                 dataBuffer,
397                                 bytesToCopyInFirstPass,
398                                 mCircularBuffer,
399                                 0,
400                                 bytesWritten - bytesToCopyInFirstPass);
401                     }
402                     mBytesFetched += bytesWritten;
403                     mCircularBufferMonitor.notifyAll();
404                 }
405             }
406 
407             Log.i(TAG, "Streaming stopped");
408         }
409     }
410 
411     /**
412      * Reads data from internal buffer.
413      *
414      * @param pos the position to read from
415      * @param buffer to read
416      * @param offset start position of the read buffer
417      * @param amount number of bytes to read
418      * @return number of read bytes when successful, {@code -1} otherwise
419      * @throws IOException
420      */
readAt(long pos, byte[] buffer, int offset, int amount)421     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
422         while (true) {
423             synchronized (mCircularBufferMonitor) {
424                 if (!mStreaming) {
425                     return READ_ERROR_STREAMING_ENDED;
426                 }
427                 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
428                     Log.w(TAG, "Demux is requesting the data which is already overwritten.");
429                     return READ_ERROR_BUFFER_OVERWRITTEN;
430                 }
431                 if (mBytesFetched < pos + amount) {
432                     try {
433                         mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
434                     } catch (InterruptedException e) {
435                         Thread.currentThread().interrupt();
436                     }
437                     // Try again to prevent starvation.
438                     // Give chances to read from other threads.
439                     continue;
440                 }
441                 int startPos = (int) (pos % CIRCULAR_BUFFER_SIZE);
442                 int endPos = (int) ((pos + amount) % CIRCULAR_BUFFER_SIZE);
443                 int firstLength = (startPos > endPos ? CIRCULAR_BUFFER_SIZE : endPos) - startPos;
444                 System.arraycopy(mCircularBuffer, startPos, buffer, offset, firstLength);
445                 if (firstLength < amount) {
446                     System.arraycopy(
447                             mCircularBuffer, 0, buffer, offset + firstLength, amount - firstLength);
448                 }
449                 mCircularBufferMonitor.notifyAll();
450                 return amount;
451             }
452         }
453     }
454 }
455