1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 // #define LOG_NDEBUG 0
18 #define LOG_TAG "MediaSampleWriter"
19 
20 #include <android-base/logging.h>
21 #include <media/MediaSampleWriter.h>
22 #include <media/NdkCommon.h>
23 #include <media/NdkMediaMuxer.h>
24 #include <sys/prctl.h>
25 #include <utils/AndroidThreads.h>
26 
27 namespace android {
28 
29 class DefaultMuxer : public MediaSampleWriterMuxerInterface {
30 public:
31     // MediaSampleWriterMuxerInterface
addTrack(AMediaFormat * trackFormat)32     ssize_t addTrack(AMediaFormat* trackFormat) override {
33         // If the track format has rotation, need to call AMediaMuxer_setOrientationHint
34         // to set the rotation. Muxer doesn't take rotation specified on the track.
35         const char* mime;
36         if (AMediaFormat_getString(trackFormat, AMEDIAFORMAT_KEY_MIME, &mime) &&
37             strncmp(mime, "video/", 6) == 0) {
38             int32_t rotation;
39             if (AMediaFormat_getInt32(trackFormat, AMEDIAFORMAT_KEY_ROTATION, &rotation) &&
40                 (rotation != 0)) {
41                 AMediaMuxer_setOrientationHint(mMuxer, rotation);
42             }
43         }
44 
45         return AMediaMuxer_addTrack(mMuxer, trackFormat);
46     }
start()47     media_status_t start() override { return AMediaMuxer_start(mMuxer); }
writeSampleData(size_t trackIndex,const uint8_t * data,const AMediaCodecBufferInfo * info)48     media_status_t writeSampleData(size_t trackIndex, const uint8_t* data,
49                                    const AMediaCodecBufferInfo* info) override {
50         return AMediaMuxer_writeSampleData(mMuxer, trackIndex, data, info);
51     }
stop()52     media_status_t stop() override { return AMediaMuxer_stop(mMuxer); }
53     // ~MediaSampleWriterMuxerInterface
54 
create(int fd)55     static std::shared_ptr<DefaultMuxer> create(int fd) {
56         AMediaMuxer* ndkMuxer = AMediaMuxer_new(fd, AMEDIAMUXER_OUTPUT_FORMAT_MPEG_4);
57         if (ndkMuxer == nullptr) {
58             LOG(ERROR) << "Unable to create AMediaMuxer";
59             return nullptr;
60         }
61 
62         return std::make_shared<DefaultMuxer>(ndkMuxer);
63     }
64 
~DefaultMuxer()65     ~DefaultMuxer() {
66         if (mMuxer != nullptr) {
67             AMediaMuxer_delete(mMuxer);
68         }
69     }
70 
DefaultMuxer(AMediaMuxer * muxer)71     DefaultMuxer(AMediaMuxer* muxer) : mMuxer(muxer){};
72     DefaultMuxer() = delete;
73 
74 private:
75     AMediaMuxer* mMuxer;
76 };
77 
78 // static
Create()79 std::shared_ptr<MediaSampleWriter> MediaSampleWriter::Create() {
80     return std::shared_ptr<MediaSampleWriter>(new MediaSampleWriter());
81 }
82 
~MediaSampleWriter()83 MediaSampleWriter::~MediaSampleWriter() {
84     if (mState == STARTED) {
85         stop();
86     }
87 }
88 
init(int fd,const std::weak_ptr<CallbackInterface> & callbacks,int64_t heartBeatIntervalUs)89 bool MediaSampleWriter::init(int fd, const std::weak_ptr<CallbackInterface>& callbacks,
90                              int64_t heartBeatIntervalUs) {
91     return init(DefaultMuxer::create(fd), callbacks, heartBeatIntervalUs);
92 }
93 
init(const std::shared_ptr<MediaSampleWriterMuxerInterface> & muxer,const std::weak_ptr<CallbackInterface> & callbacks,int64_t heartBeatIntervalUs)94 bool MediaSampleWriter::init(const std::shared_ptr<MediaSampleWriterMuxerInterface>& muxer,
95                              const std::weak_ptr<CallbackInterface>& callbacks,
96                              int64_t heartBeatIntervalUs) {
97     if (callbacks.lock() == nullptr) {
98         LOG(ERROR) << "Callback object cannot be null";
99         return false;
100     } else if (muxer == nullptr) {
101         LOG(ERROR) << "Muxer cannot be null";
102         return false;
103     }
104 
105     std::scoped_lock lock(mMutex);
106     if (mState != UNINITIALIZED) {
107         LOG(ERROR) << "Sample writer is already initialized";
108         return false;
109     }
110 
111     mState = INITIALIZED;
112     mMuxer = muxer;
113     mCallbacks = callbacks;
114     mHeartBeatIntervalUs = heartBeatIntervalUs;
115     return true;
116 }
117 
addTrack(const std::shared_ptr<AMediaFormat> & trackFormat)118 MediaSampleWriter::MediaSampleConsumerFunction MediaSampleWriter::addTrack(
119         const std::shared_ptr<AMediaFormat>& trackFormat) {
120     if (trackFormat == nullptr) {
121         LOG(ERROR) << "Track format must be non-null";
122         return nullptr;
123     }
124 
125     std::scoped_lock lock(mMutex);
126     if (mState != INITIALIZED) {
127         LOG(ERROR) << "Muxer needs to be initialized when adding tracks.";
128         return nullptr;
129     }
130 
131     AMediaFormat* trackFormatCopy = AMediaFormat_new();
132     AMediaFormat_copy(trackFormatCopy, trackFormat.get());
133     // Request muxer to use background priorities by default.
134     AMediaFormatUtils::SetDefaultFormatValueInt32(TBD_AMEDIACODEC_PARAMETER_KEY_BACKGROUND_MODE,
135                                                   trackFormatCopy, 1 /* true */);
136 
137     ssize_t trackIndexOrError = mMuxer->addTrack(trackFormatCopy);
138     AMediaFormat_delete(trackFormatCopy);
139     if (trackIndexOrError < 0) {
140         LOG(ERROR) << "Failed to add media track to muxer: " << trackIndexOrError;
141         return nullptr;
142     }
143     const size_t trackIndex = static_cast<size_t>(trackIndexOrError);
144 
145     int64_t durationUs;
146     if (!AMediaFormat_getInt64(trackFormat.get(), AMEDIAFORMAT_KEY_DURATION, &durationUs)) {
147         durationUs = 0;
148     }
149 
150     mTracks.emplace(trackIndex, durationUs);
151     std::shared_ptr<MediaSampleWriter> thisWriter = shared_from_this();
152 
153     return [self = shared_from_this(), trackIndex](const std::shared_ptr<MediaSample>& sample) {
154         self->addSampleToTrack(trackIndex, sample);
155     };
156 }
157 
addSampleToTrack(size_t trackIndex,const std::shared_ptr<MediaSample> & sample)158 void MediaSampleWriter::addSampleToTrack(size_t trackIndex,
159                                          const std::shared_ptr<MediaSample>& sample) {
160     if (sample == nullptr) return;
161 
162     bool wasEmpty;
163     {
164         std::scoped_lock lock(mMutex);
165         wasEmpty = mSampleQueue.empty();
166         mSampleQueue.push(std::make_pair(trackIndex, sample));
167     }
168 
169     if (wasEmpty) {
170         mSampleSignal.notify_one();
171     }
172 }
173 
start()174 bool MediaSampleWriter::start() {
175     std::scoped_lock lock(mMutex);
176 
177     if (mTracks.size() == 0) {
178         LOG(ERROR) << "No tracks to write.";
179         return false;
180     } else if (mState != INITIALIZED) {
181         LOG(ERROR) << "Sample writer is not initialized";
182         return false;
183     }
184 
185     mState = STARTED;
186     std::thread([this] {
187         androidSetThreadPriority(0 /* tid (0 = current) */, ANDROID_PRIORITY_BACKGROUND);
188         prctl(PR_SET_NAME, (unsigned long)"SampleWriterTrd", 0, 0, 0);
189 
190         bool wasStopped = false;
191         media_status_t status = writeSamples(&wasStopped);
192         if (auto callbacks = mCallbacks.lock()) {
193             if (wasStopped && status == AMEDIA_OK) {
194                 callbacks->onStopped(this);
195             } else {
196                 callbacks->onFinished(this, status);
197             }
198         }
199     }).detach();
200     return true;
201 }
202 
stop()203 void MediaSampleWriter::stop() {
204     {
205         std::scoped_lock lock(mMutex);
206         if (mState != STARTED) {
207             LOG(ERROR) << "Sample writer is not started.";
208             return;
209         }
210         mState = STOPPED;
211     }
212 
213     mSampleSignal.notify_all();
214 }
215 
writeSamples(bool * wasStopped)216 media_status_t MediaSampleWriter::writeSamples(bool* wasStopped) {
217     media_status_t muxerStatus = mMuxer->start();
218     if (muxerStatus != AMEDIA_OK) {
219         LOG(ERROR) << "Error starting muxer: " << muxerStatus;
220         return muxerStatus;
221     }
222 
223     media_status_t writeStatus = runWriterLoop(wasStopped);
224     if (writeStatus != AMEDIA_OK) {
225         LOG(ERROR) << "Error writing samples: " << writeStatus;
226     }
227 
228     muxerStatus = mMuxer->stop();
229     if (muxerStatus != AMEDIA_OK) {
230         LOG(ERROR) << "Error stopping muxer: " << muxerStatus;
231     }
232 
233     return writeStatus != AMEDIA_OK ? writeStatus : muxerStatus;
234 }
235 
runWriterLoop(bool * wasStopped)236 media_status_t MediaSampleWriter::runWriterLoop(bool* wasStopped) NO_THREAD_SAFETY_ANALYSIS {
237     AMediaCodecBufferInfo bufferInfo;
238     int32_t lastProgressUpdate = 0;
239     bool progressSinceLastReport = false;
240     int trackEosCount = 0;
241 
242     // Set the "primary" track that will be used to determine progress to the track with longest
243     // duration.
244     int primaryTrackIndex = -1;
245     int64_t longestDurationUs = 0;
246     for (auto it = mTracks.begin(); it != mTracks.end(); ++it) {
247         if (it->second.mDurationUs > longestDurationUs) {
248             primaryTrackIndex = it->first;
249             longestDurationUs = it->second.mDurationUs;
250         }
251     }
252 
253     std::chrono::microseconds updateInterval(mHeartBeatIntervalUs);
254     std::chrono::steady_clock::time_point nextUpdateTime =
255             std::chrono::steady_clock::now() + updateInterval;
256 
257     while (true) {
258         if (trackEosCount >= mTracks.size()) {
259             break;
260         }
261 
262         size_t trackIndex;
263         std::shared_ptr<MediaSample> sample;
264         {
265             std::unique_lock lock(mMutex);
266             while (mSampleQueue.empty() && mState == STARTED) {
267                 if (mHeartBeatIntervalUs <= 0) {
268                     mSampleSignal.wait(lock);
269                     continue;
270                 }
271 
272                 if (mSampleSignal.wait_until(lock, nextUpdateTime) == std::cv_status::timeout) {
273                     // Send heart-beat if there is any progress since last update time.
274                     if (progressSinceLastReport) {
275                         if (auto callbacks = mCallbacks.lock()) {
276                             callbacks->onHeartBeat(this);
277                         }
278                         progressSinceLastReport = false;
279                     }
280                     nextUpdateTime += updateInterval;
281                 }
282             }
283 
284             if (mState == STOPPED) {
285                 *wasStopped = true;
286                 return AMEDIA_OK;
287             }
288 
289             auto& topEntry = mSampleQueue.top();
290             trackIndex = topEntry.first;
291             sample = topEntry.second;
292             mSampleQueue.pop();
293         }
294 
295         TrackRecord& track = mTracks[trackIndex];
296 
297         if (sample->info.flags & SAMPLE_FLAG_END_OF_STREAM) {
298             if (track.mReachedEos) {
299                 continue;
300             }
301 
302             // Track reached end of stream.
303             track.mReachedEos = true;
304             trackEosCount++;
305 
306             // Preserve source track duration by setting the appropriate timestamp on the
307             // empty End-Of-Stream sample.
308             if (track.mDurationUs > 0 && track.mFirstSampleTimeSet) {
309                 sample->info.presentationTimeUs = track.mDurationUs + track.mFirstSampleTimeUs;
310             }
311         }
312 
313         track.mPrevSampleTimeUs = sample->info.presentationTimeUs;
314         if (!track.mFirstSampleTimeSet) {
315             // Record the first sample's timestamp in order to translate duration to EOS
316             // time for tracks that does not start at 0.
317             track.mFirstSampleTimeUs = sample->info.presentationTimeUs;
318             track.mFirstSampleTimeSet = true;
319         }
320 
321         bufferInfo.offset = sample->dataOffset;
322         bufferInfo.size = sample->info.size;
323         bufferInfo.flags = sample->info.flags;
324         bufferInfo.presentationTimeUs = sample->info.presentationTimeUs;
325 
326         media_status_t status = mMuxer->writeSampleData(trackIndex, sample->buffer, &bufferInfo);
327         if (status != AMEDIA_OK) {
328             LOG(ERROR) << "writeSampleData returned " << status;
329             return status;
330         }
331         sample.reset();
332 
333         // TODO(lnilsson): Add option to toggle progress reporting on/off.
334         if (trackIndex == primaryTrackIndex) {
335             const int64_t elapsed = track.mPrevSampleTimeUs - track.mFirstSampleTimeUs;
336             int32_t progress = (elapsed * 100) / track.mDurationUs;
337             progress = std::clamp(progress, 0, 100);
338 
339             if (progress > lastProgressUpdate) {
340                 if (auto callbacks = mCallbacks.lock()) {
341                     callbacks->onProgressUpdate(this, progress);
342                 }
343                 lastProgressUpdate = progress;
344             }
345         }
346         progressSinceLastReport = true;
347     }
348 
349     return AMEDIA_OK;
350 }
351 }  // namespace android
352