1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "ExecutionBurstServer"
18 
19 #include "ExecutionBurstServer.h"
20 
21 #include <android-base/logging.h>
22 
23 #include <algorithm>
24 #include <cstring>
25 #include <limits>
26 #include <map>
27 #include <memory>
28 #include <thread>
29 #include <tuple>
30 #include <utility>
31 #include <vector>
32 
33 #include "HalInterfaces.h"
34 #include "Tracing.h"
35 #include "Utils.h"
36 
37 namespace android::nn {
38 namespace {
39 
40 using hardware::MQDescriptorSync;
41 using V1_2::FmqRequestDatum;
42 using V1_2::FmqResultDatum;
43 using V1_2::IBurstCallback;
44 using V1_2::IBurstContext;
45 
46 constexpr V1_2::Timing kNoTiming = {std::numeric_limits<uint64_t>::max(),
47                                     std::numeric_limits<uint64_t>::max()};
48 
49 // DefaultBurstExecutorWithCache adapts an IPreparedModel so that it can be
50 // used as an IBurstExecutorWithCache. Specifically, the cache simply stores the
51 // hidl_memory object, and the execution forwards calls to the provided
52 // IPreparedModel's "executeSynchronously" method. With this class, hidl_memory
53 // must be mapped and unmapped for each execution.
54 class DefaultBurstExecutorWithCache : public ExecutionBurstServer::IBurstExecutorWithCache {
55    public:
DefaultBurstExecutorWithCache(V1_2::IPreparedModel * preparedModel)56     DefaultBurstExecutorWithCache(V1_2::IPreparedModel* preparedModel)
57         : mpPreparedModel(preparedModel) {}
58 
isCacheEntryPresent(int32_t slot) const59     bool isCacheEntryPresent(int32_t slot) const override {
60         const auto it = mMemoryCache.find(slot);
61         return (it != mMemoryCache.end()) && it->second.valid();
62     }
63 
addCacheEntry(const hardware::hidl_memory & memory,int32_t slot)64     void addCacheEntry(const hardware::hidl_memory& memory, int32_t slot) override {
65         mMemoryCache[slot] = memory;
66     }
67 
removeCacheEntry(int32_t slot)68     void removeCacheEntry(int32_t slot) override { mMemoryCache.erase(slot); }
69 
execute(const V1_0::Request & request,const std::vector<int32_t> & slots,V1_2::MeasureTiming measure)70     std::tuple<V1_0::ErrorStatus, hardware::hidl_vec<V1_2::OutputShape>, V1_2::Timing> execute(
71             const V1_0::Request& request, const std::vector<int32_t>& slots,
72             V1_2::MeasureTiming measure) override {
73         // convert slots to pools
74         hardware::hidl_vec<hardware::hidl_memory> pools(slots.size());
75         std::transform(slots.begin(), slots.end(), pools.begin(),
76                        [this](int32_t slot) { return mMemoryCache[slot]; });
77 
78         // create full request
79         V1_0::Request fullRequest = request;
80         fullRequest.pools = std::move(pools);
81 
82         // setup execution
83         V1_0::ErrorStatus returnedStatus = V1_0::ErrorStatus::GENERAL_FAILURE;
84         hardware::hidl_vec<V1_2::OutputShape> returnedOutputShapes;
85         V1_2::Timing returnedTiming;
86         auto cb = [&returnedStatus, &returnedOutputShapes, &returnedTiming](
87                           V1_0::ErrorStatus status,
88                           const hardware::hidl_vec<V1_2::OutputShape>& outputShapes,
89                           const V1_2::Timing& timing) {
90             returnedStatus = status;
91             returnedOutputShapes = outputShapes;
92             returnedTiming = timing;
93         };
94 
95         // execute
96         const hardware::Return<void> ret =
97                 mpPreparedModel->executeSynchronously(fullRequest, measure, cb);
98         if (!ret.isOk() || returnedStatus != V1_0::ErrorStatus::NONE) {
99             LOG(ERROR) << "IPreparedModelAdapter::execute -- Error executing";
100             return {returnedStatus, std::move(returnedOutputShapes), kNoTiming};
101         }
102 
103         return std::make_tuple(returnedStatus, std::move(returnedOutputShapes), returnedTiming);
104     }
105 
106    private:
107     V1_2::IPreparedModel* const mpPreparedModel;
108     std::map<int32_t, hardware::hidl_memory> mMemoryCache;
109 };
110 
111 }  // anonymous namespace
112 
113 // serialize result
serialize(V1_0::ErrorStatus errorStatus,const std::vector<V1_2::OutputShape> & outputShapes,V1_2::Timing timing)114 std::vector<FmqResultDatum> serialize(V1_0::ErrorStatus errorStatus,
115                                       const std::vector<V1_2::OutputShape>& outputShapes,
116                                       V1_2::Timing timing) {
117     // count how many elements need to be sent for a request
118     size_t count = 2 + outputShapes.size();
119     for (const auto& outputShape : outputShapes) {
120         count += outputShape.dimensions.size();
121     }
122 
123     // create buffer to temporarily store elements
124     std::vector<FmqResultDatum> data;
125     data.reserve(count);
126 
127     // package packetInfo
128     {
129         FmqResultDatum datum;
130         datum.packetInformation({/*.packetSize=*/static_cast<uint32_t>(count),
131                                  /*.errorStatus=*/errorStatus,
132                                  /*.numberOfOperands=*/static_cast<uint32_t>(outputShapes.size())});
133         data.push_back(datum);
134     }
135 
136     // package output shape data
137     for (const auto& operand : outputShapes) {
138         // package operand information
139         FmqResultDatum::OperandInformation info{};
140         info.isSufficient = operand.isSufficient;
141         info.numberOfDimensions = static_cast<uint32_t>(operand.dimensions.size());
142 
143         FmqResultDatum datum;
144         datum.operandInformation(info);
145         data.push_back(datum);
146 
147         // package operand dimensions
148         for (uint32_t dimension : operand.dimensions) {
149             FmqResultDatum datum;
150             datum.operandDimensionValue(dimension);
151             data.push_back(datum);
152         }
153     }
154 
155     // package executionTiming
156     {
157         FmqResultDatum datum;
158         datum.executionTiming(timing);
159         data.push_back(datum);
160     }
161 
162     // return result
163     return data;
164 }
165 
166 // deserialize request
deserialize(const std::vector<FmqRequestDatum> & data)167 std::optional<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>> deserialize(
168         const std::vector<FmqRequestDatum>& data) {
169     using discriminator = FmqRequestDatum::hidl_discriminator;
170 
171     size_t index = 0;
172 
173     // validate packet information
174     if (index >= data.size() ||
175         data.at(index).getDiscriminator() != discriminator::packetInformation) {
176         LOG(ERROR) << "FMQ Request packet ill-formed";
177         return std::nullopt;
178     }
179 
180     // unpackage packet information
181     const FmqRequestDatum::PacketInformation& packetInfo = data.at(index).packetInformation();
182     index++;
183     const uint32_t packetSize = packetInfo.packetSize;
184     const uint32_t numberOfInputOperands = packetInfo.numberOfInputOperands;
185     const uint32_t numberOfOutputOperands = packetInfo.numberOfOutputOperands;
186     const uint32_t numberOfPools = packetInfo.numberOfPools;
187 
188     // verify packet size
189     if (data.size() != packetSize) {
190         LOG(ERROR) << "FMQ Request packet ill-formed";
191         return std::nullopt;
192     }
193 
194     // unpackage input operands
195     std::vector<V1_0::RequestArgument> inputs;
196     inputs.reserve(numberOfInputOperands);
197     for (size_t operand = 0; operand < numberOfInputOperands; ++operand) {
198         // validate input operand information
199         if (index >= data.size() ||
200             data.at(index).getDiscriminator() != discriminator::inputOperandInformation) {
201             LOG(ERROR) << "FMQ Request packet ill-formed";
202             return std::nullopt;
203         }
204 
205         // unpackage operand information
206         const FmqRequestDatum::OperandInformation& operandInfo =
207                 data.at(index).inputOperandInformation();
208         index++;
209         const bool hasNoValue = operandInfo.hasNoValue;
210         const V1_0::DataLocation location = operandInfo.location;
211         const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
212 
213         // unpackage operand dimensions
214         std::vector<uint32_t> dimensions;
215         dimensions.reserve(numberOfDimensions);
216         for (size_t i = 0; i < numberOfDimensions; ++i) {
217             // validate dimension
218             if (index >= data.size() ||
219                 data.at(index).getDiscriminator() != discriminator::inputOperandDimensionValue) {
220                 LOG(ERROR) << "FMQ Request packet ill-formed";
221                 return std::nullopt;
222             }
223 
224             // unpackage dimension
225             const uint32_t dimension = data.at(index).inputOperandDimensionValue();
226             index++;
227 
228             // store result
229             dimensions.push_back(dimension);
230         }
231 
232         // store result
233         inputs.push_back(
234                 {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
235     }
236 
237     // unpackage output operands
238     std::vector<V1_0::RequestArgument> outputs;
239     outputs.reserve(numberOfOutputOperands);
240     for (size_t operand = 0; operand < numberOfOutputOperands; ++operand) {
241         // validate output operand information
242         if (index >= data.size() ||
243             data.at(index).getDiscriminator() != discriminator::outputOperandInformation) {
244             LOG(ERROR) << "FMQ Request packet ill-formed";
245             return std::nullopt;
246         }
247 
248         // unpackage operand information
249         const FmqRequestDatum::OperandInformation& operandInfo =
250                 data.at(index).outputOperandInformation();
251         index++;
252         const bool hasNoValue = operandInfo.hasNoValue;
253         const V1_0::DataLocation location = operandInfo.location;
254         const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
255 
256         // unpackage operand dimensions
257         std::vector<uint32_t> dimensions;
258         dimensions.reserve(numberOfDimensions);
259         for (size_t i = 0; i < numberOfDimensions; ++i) {
260             // validate dimension
261             if (index >= data.size() ||
262                 data.at(index).getDiscriminator() != discriminator::outputOperandDimensionValue) {
263                 LOG(ERROR) << "FMQ Request packet ill-formed";
264                 return std::nullopt;
265             }
266 
267             // unpackage dimension
268             const uint32_t dimension = data.at(index).outputOperandDimensionValue();
269             index++;
270 
271             // store result
272             dimensions.push_back(dimension);
273         }
274 
275         // store result
276         outputs.push_back(
277                 {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
278     }
279 
280     // unpackage pools
281     std::vector<int32_t> slots;
282     slots.reserve(numberOfPools);
283     for (size_t pool = 0; pool < numberOfPools; ++pool) {
284         // validate input operand information
285         if (index >= data.size() ||
286             data.at(index).getDiscriminator() != discriminator::poolIdentifier) {
287             LOG(ERROR) << "FMQ Request packet ill-formed";
288             return std::nullopt;
289         }
290 
291         // unpackage operand information
292         const int32_t poolId = data.at(index).poolIdentifier();
293         index++;
294 
295         // store result
296         slots.push_back(poolId);
297     }
298 
299     // validate measureTiming
300     if (index >= data.size() || data.at(index).getDiscriminator() != discriminator::measureTiming) {
301         LOG(ERROR) << "FMQ Request packet ill-formed";
302         return std::nullopt;
303     }
304 
305     // unpackage measureTiming
306     const V1_2::MeasureTiming measure = data.at(index).measureTiming();
307     index++;
308 
309     // validate packet information
310     if (index != packetSize) {
311         LOG(ERROR) << "FMQ Request packet ill-formed";
312         return std::nullopt;
313     }
314 
315     // return request
316     V1_0::Request request = {/*.inputs=*/inputs, /*.outputs=*/outputs, /*.pools=*/{}};
317     return std::make_tuple(std::move(request), std::move(slots), measure);
318 }
319 
320 // RequestChannelReceiver methods
321 
create(const FmqRequestDescriptor & requestChannel,std::chrono::microseconds pollingTimeWindow)322 std::unique_ptr<RequestChannelReceiver> RequestChannelReceiver::create(
323         const FmqRequestDescriptor& requestChannel, std::chrono::microseconds pollingTimeWindow) {
324     std::unique_ptr<FmqRequestChannel> fmqRequestChannel =
325             std::make_unique<FmqRequestChannel>(requestChannel);
326 
327     if (!fmqRequestChannel->isValid()) {
328         LOG(ERROR) << "Unable to create RequestChannelReceiver";
329         return nullptr;
330     }
331     if (fmqRequestChannel->getEventFlagWord() == nullptr) {
332         LOG(ERROR)
333                 << "RequestChannelReceiver::create was passed an MQDescriptor without an EventFlag";
334         return nullptr;
335     }
336 
337     return std::make_unique<RequestChannelReceiver>(std::move(fmqRequestChannel),
338                                                     pollingTimeWindow);
339 }
340 
RequestChannelReceiver(std::unique_ptr<FmqRequestChannel> fmqRequestChannel,std::chrono::microseconds pollingTimeWindow)341 RequestChannelReceiver::RequestChannelReceiver(std::unique_ptr<FmqRequestChannel> fmqRequestChannel,
342                                                std::chrono::microseconds pollingTimeWindow)
343     : mFmqRequestChannel(std::move(fmqRequestChannel)), kPollingTimeWindow(pollingTimeWindow) {}
344 
345 std::optional<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>>
getBlocking()346 RequestChannelReceiver::getBlocking() {
347     const auto packet = getPacketBlocking();
348     if (!packet) {
349         return std::nullopt;
350     }
351 
352     return deserialize(*packet);
353 }
354 
invalidate()355 void RequestChannelReceiver::invalidate() {
356     mTeardown = true;
357 
358     // force unblock
359     // ExecutionBurstServer is by default waiting on a request packet. If the
360     // client process destroys its burst object, the server may still be waiting
361     // on the futex. This force unblock wakes up any thread waiting on the
362     // futex.
363     // TODO: look for a different/better way to signal/notify the futex to wake
364     // up any thread waiting on it
365     FmqRequestDatum datum;
366     datum.packetInformation({/*.packetSize=*/0, /*.numberOfInputOperands=*/0,
367                              /*.numberOfOutputOperands=*/0, /*.numberOfPools=*/0});
368     mFmqRequestChannel->writeBlocking(&datum, 1);
369 }
370 
getPacketBlocking()371 std::optional<std::vector<FmqRequestDatum>> RequestChannelReceiver::getPacketBlocking() {
372 
373     if (mTeardown) {
374         return std::nullopt;
375     }
376 
377     // First spend time polling if results are available in FMQ instead of
378     // waiting on the futex. Polling is more responsive (yielding lower
379     // latencies), but can take up more power, so only poll for a limited period
380     // of time.
381 
382     auto& getCurrentTime = std::chrono::high_resolution_clock::now;
383     const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow;
384 
385     while (getCurrentTime() < timeToStopPolling) {
386         // if class is being torn down, immediately return
387         if (mTeardown.load(std::memory_order_relaxed)) {
388             return std::nullopt;
389         }
390 
391         // Check if data is available. If it is, immediately retrieve it and
392         // return.
393         const size_t available = mFmqRequestChannel->availableToRead();
394         if (available > 0) {
395             // This is the first point when we know an execution is occurring,
396             // so begin to collect systraces. Note that a similar systrace does
397             // not exist at the corresponding point in
398             // ResultChannelReceiver::getPacketBlocking because the execution is
399             // already in flight.
400             NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION,
401                          "ExecutionBurstServer getting packet");
402             std::vector<FmqRequestDatum> packet(available);
403             const bool success = mFmqRequestChannel->read(packet.data(), available);
404             if (!success) {
405                 LOG(ERROR) << "Error receiving packet";
406                 return std::nullopt;
407             }
408             return std::make_optional(std::move(packet));
409         }
410 
411         std::this_thread::yield();
412     }
413 
414     // If we get to this point, we either stopped polling because it was taking
415     // too long or polling was not allowed. Instead, perform a blocking call
416     // which uses a futex to save power.
417 
418     // wait for request packet and read first element of request packet
419     FmqRequestDatum datum;
420     bool success = mFmqRequestChannel->readBlocking(&datum, 1);
421 
422     // This is the first point when we know an execution is occurring, so begin
423     // to collect systraces. Note that a similar systrace does not exist at the
424     // corresponding point in ResultChannelReceiver::getPacketBlocking because
425     // the execution is already in flight.
426     NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION, "ExecutionBurstServer getting packet");
427 
428     // retrieve remaining elements
429     // NOTE: all of the data is already available at this point, so there's no
430     // need to do a blocking wait to wait for more data. This is known because
431     // in FMQ, all writes are published (made available) atomically. Currently,
432     // the producer always publishes the entire packet in one function call, so
433     // if the first element of the packet is available, the remaining elements
434     // are also available.
435     const size_t count = mFmqRequestChannel->availableToRead();
436     std::vector<FmqRequestDatum> packet(count + 1);
437     std::memcpy(&packet.front(), &datum, sizeof(datum));
438     success &= mFmqRequestChannel->read(packet.data() + 1, count);
439 
440     // terminate loop
441     if (mTeardown) {
442         return std::nullopt;
443     }
444 
445     // ensure packet was successfully received
446     if (!success) {
447         LOG(ERROR) << "Error receiving packet";
448         return std::nullopt;
449     }
450 
451     return std::make_optional(std::move(packet));
452 }
453 
454 // ResultChannelSender methods
455 
create(const FmqResultDescriptor & resultChannel)456 std::unique_ptr<ResultChannelSender> ResultChannelSender::create(
457         const FmqResultDescriptor& resultChannel) {
458     std::unique_ptr<FmqResultChannel> fmqResultChannel =
459             std::make_unique<FmqResultChannel>(resultChannel);
460 
461     if (!fmqResultChannel->isValid()) {
462         LOG(ERROR) << "Unable to create RequestChannelSender";
463         return nullptr;
464     }
465     if (fmqResultChannel->getEventFlagWord() == nullptr) {
466         LOG(ERROR) << "ResultChannelSender::create was passed an MQDescriptor without an EventFlag";
467         return nullptr;
468     }
469 
470     return std::make_unique<ResultChannelSender>(std::move(fmqResultChannel));
471 }
472 
ResultChannelSender(std::unique_ptr<FmqResultChannel> fmqResultChannel)473 ResultChannelSender::ResultChannelSender(std::unique_ptr<FmqResultChannel> fmqResultChannel)
474     : mFmqResultChannel(std::move(fmqResultChannel)) {}
475 
send(V1_0::ErrorStatus errorStatus,const std::vector<V1_2::OutputShape> & outputShapes,V1_2::Timing timing)476 bool ResultChannelSender::send(V1_0::ErrorStatus errorStatus,
477                                const std::vector<V1_2::OutputShape>& outputShapes,
478                                V1_2::Timing timing) {
479     const std::vector<FmqResultDatum> serialized = serialize(errorStatus, outputShapes, timing);
480     return sendPacket(serialized);
481 }
482 
sendPacket(const std::vector<FmqResultDatum> & packet)483 bool ResultChannelSender::sendPacket(const std::vector<FmqResultDatum>& packet) {
484     if (packet.size() > mFmqResultChannel->availableToWrite()) {
485         LOG(ERROR)
486                 << "ResultChannelSender::sendPacket -- packet size exceeds size available in FMQ";
487         const std::vector<FmqResultDatum> errorPacket =
488                 serialize(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming);
489 
490         // Always send the packet with "blocking" because this signals the futex
491         // and unblocks the consumer if it is waiting on the futex.
492         return mFmqResultChannel->writeBlocking(errorPacket.data(), errorPacket.size());
493     }
494 
495     // Always send the packet with "blocking" because this signals the futex and
496     // unblocks the consumer if it is waiting on the futex.
497     return mFmqResultChannel->writeBlocking(packet.data(), packet.size());
498 }
499 
500 // ExecutionBurstServer methods
501 
// Creates a burst server that receives requests on requestChannel, runs them
// through executorWithCache and sends the results on resultChannel.
//
// Returns nullptr (after logging) if callback or executorWithCache is null, or
// if either FMQ wrapper cannot be created.
sp<ExecutionBurstServer> ExecutionBurstServer::create(
        const sp<IBurstCallback>& callback, const MQDescriptorSync<FmqRequestDatum>& requestChannel,
        const MQDescriptorSync<FmqResultDatum>& resultChannel,
        std::shared_ptr<IBurstExecutorWithCache> executorWithCache,
        std::chrono::microseconds pollingTimeWindow) {
    // reject missing collaborators
    const bool haveCollaborators = (callback != nullptr) && (executorWithCache != nullptr);
    if (!haveCollaborators) {
        LOG(ERROR) << "ExecutionBurstServer::create passed a nullptr";
        return nullptr;
    }

    // wrap both FMQs before checking either of them
    auto receiver = RequestChannelReceiver::create(requestChannel, pollingTimeWindow);
    auto sender = ResultChannelSender::create(resultChannel);
    if (receiver == nullptr || sender == nullptr) {
        LOG(ERROR) << "ExecutionBurstServer::create failed to create FastMessageQueue";
        return nullptr;
    }

    // make and return context
    return new ExecutionBurstServer(callback, std::move(receiver), std::move(sender),
                                    std::move(executorWithCache));
}
529 
// Convenience overload: wraps preparedModel in a DefaultBurstExecutorWithCache
// and forwards to the overload that takes an IBurstExecutorWithCache. Returns
// nullptr (after logging) if preparedModel is null.
sp<ExecutionBurstServer> ExecutionBurstServer::create(
        const sp<IBurstCallback>& callback, const MQDescriptorSync<FmqRequestDatum>& requestChannel,
        const MQDescriptorSync<FmqResultDatum>& resultChannel, V1_2::IPreparedModel* preparedModel,
        std::chrono::microseconds pollingTimeWindow) {
    // Only preparedModel is checked here; the other arguments are checked by
    // the overload this call forwards to.
    if (preparedModel == nullptr) {
        LOG(ERROR) << "ExecutionBurstServer::create passed a nullptr";
        return nullptr;
    }

    // adapt IPreparedModel to have caching
    auto cachingAdapter = std::make_shared<DefaultBurstExecutorWithCache>(preparedModel);

    // make and return context
    return ExecutionBurstServer::create(callback, requestChannel, resultChannel,
                                        std::move(cachingAdapter), pollingTimeWindow);
}
548 
ExecutionBurstServer(const sp<IBurstCallback> & callback,std::unique_ptr<RequestChannelReceiver> requestChannel,std::unique_ptr<ResultChannelSender> resultChannel,std::shared_ptr<IBurstExecutorWithCache> executorWithCache)549 ExecutionBurstServer::ExecutionBurstServer(
550         const sp<IBurstCallback>& callback, std::unique_ptr<RequestChannelReceiver> requestChannel,
551         std::unique_ptr<ResultChannelSender> resultChannel,
552         std::shared_ptr<IBurstExecutorWithCache> executorWithCache)
553     : mCallback(callback),
554       mRequestChannelReceiver(std::move(requestChannel)),
555       mResultChannelSender(std::move(resultChannel)),
556       mExecutorWithCache(std::move(executorWithCache)) {
557     // TODO: highly document the threading behavior of this class
558     mWorker = std::thread([this] { task(); });
559 }
560 
~ExecutionBurstServer()561 ExecutionBurstServer::~ExecutionBurstServer() {
562     // set teardown flag
563     mTeardown = true;
564     mRequestChannelReceiver->invalidate();
565 
566     // wait for task thread to end
567     mWorker.join();
568 }
569 
freeMemory(int32_t slot)570 hardware::Return<void> ExecutionBurstServer::freeMemory(int32_t slot) {
571     std::lock_guard<std::mutex> hold(mMutex);
572     mExecutorWithCache->removeCacheEntry(slot);
573     return hardware::Void();
574 }
575 
ensureCacheEntriesArePresentLocked(const std::vector<int32_t> & slots)576 void ExecutionBurstServer::ensureCacheEntriesArePresentLocked(const std::vector<int32_t>& slots) {
577     const auto slotIsKnown = [this](int32_t slot) {
578         return mExecutorWithCache->isCacheEntryPresent(slot);
579     };
580 
581     // find unique unknown slots
582     std::vector<int32_t> unknownSlots = slots;
583     auto unknownSlotsEnd = unknownSlots.end();
584     std::sort(unknownSlots.begin(), unknownSlotsEnd);
585     unknownSlotsEnd = std::unique(unknownSlots.begin(), unknownSlotsEnd);
586     unknownSlotsEnd = std::remove_if(unknownSlots.begin(), unknownSlotsEnd, slotIsKnown);
587     unknownSlots.erase(unknownSlotsEnd, unknownSlots.end());
588 
589     // quick-exit if all slots are known
590     if (unknownSlots.empty()) {
591         return;
592     }
593 
594     V1_0::ErrorStatus errorStatus = V1_0::ErrorStatus::GENERAL_FAILURE;
595     std::vector<hardware::hidl_memory> returnedMemories;
596     auto cb = [&errorStatus, &returnedMemories](
597                       V1_0::ErrorStatus status,
598                       const hardware::hidl_vec<hardware::hidl_memory>& memories) {
599         errorStatus = status;
600         returnedMemories = memories;
601     };
602 
603     const hardware::Return<void> ret = mCallback->getMemories(unknownSlots, cb);
604 
605     if (!ret.isOk() || errorStatus != V1_0::ErrorStatus::NONE ||
606         returnedMemories.size() != unknownSlots.size()) {
607         LOG(ERROR) << "Error retrieving memories";
608         return;
609     }
610 
611     // add memories to unknown slots
612     for (size_t i = 0; i < unknownSlots.size(); ++i) {
613         mExecutorWithCache->addCacheEntry(returnedMemories[i], unknownSlots[i]);
614     }
615 }
616 
task()617 void ExecutionBurstServer::task() {
618     // loop until the burst object is being destroyed
619     while (!mTeardown) {
620         // receive request
621         auto arguments = mRequestChannelReceiver->getBlocking();
622 
623         // if the request packet was not properly received, return a generic
624         // error and skip the execution
625         //
626         // if the  burst is being torn down, skip the execution exection so the
627         // "task" function can end
628         if (!arguments) {
629             if (!mTeardown) {
630                 mResultChannelSender->send(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming);
631             }
632             continue;
633         }
634 
635         // otherwise begin tracing execution
636         NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION,
637                      "ExecutionBurstServer getting memory, executing, and returning results");
638 
639         // unpack the arguments; types are Request, std::vector<int32_t>, and
640         // MeasureTiming, respectively
641         const auto [requestWithoutPools, slotsOfPools, measure] = std::move(*arguments);
642 
643         // ensure executor with cache has required memory
644         std::lock_guard<std::mutex> hold(mMutex);
645         ensureCacheEntriesArePresentLocked(slotsOfPools);
646 
647         // perform computation; types are ErrorStatus, hidl_vec<OutputShape>,
648         // and Timing, respectively
649         const auto [errorStatus, outputShapes, returnedTiming] =
650                 mExecutorWithCache->execute(requestWithoutPools, slotsOfPools, measure);
651 
652         // return result
653         mResultChannelSender->send(errorStatus, outputShapes, returnedTiming);
654     }
655 }
656 
657 }  // namespace android::nn
658