/* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "ExecutionBurstUtils" #include "ExecutionBurstUtils.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace android::hardware::neuralnetworks::V1_2::utils { namespace { constexpr V1_2::Timing kNoTiming = {std::numeric_limits::max(), std::numeric_limits::max()}; std::chrono::microseconds getPollingTimeWindow(const std::string& property) { constexpr int32_t kDefaultPollingTimeWindow = 0; #ifdef NN_DEBUGGABLE constexpr int32_t kMinPollingTimeWindow = 0; const int32_t selectedPollingTimeWindow = base::GetIntProperty(property, kDefaultPollingTimeWindow, kMinPollingTimeWindow); return std::chrono::microseconds(selectedPollingTimeWindow); #else (void)property; return std::chrono::microseconds(kDefaultPollingTimeWindow); #endif // NN_DEBUGGABLE } } // namespace std::chrono::microseconds getBurstControllerPollingTimeWindow() { return getPollingTimeWindow("debug.nn.burst-controller-polling-window"); } std::chrono::microseconds getBurstServerPollingTimeWindow() { return getPollingTimeWindow("debug.nn.burst-server-polling-window"); } // serialize a request into a packet std::vector serialize(const V1_0::Request& request, V1_2::MeasureTiming measure, const std::vector& slots) { // count how many elements need to be sent for a request size_t count = 2 + request.inputs.size() + request.outputs.size() + slots.size(); for (const auto& input : request.inputs) { count += input.dimensions.size(); } for (const auto& output : request.outputs) { count += output.dimensions.size(); } CHECK_LE(count, std::numeric_limits::max()); // create buffer to temporarily store elements std::vector data; data.reserve(count); // package packetInfo data.emplace_back(); data.back().packetInformation( {.packetSize = static_cast(count), .numberOfInputOperands = static_cast(request.inputs.size()), .numberOfOutputOperands = static_cast(request.outputs.size()), .numberOfPools = static_cast(slots.size())}); // package input data for (const auto& input : request.inputs) { // package operand information data.emplace_back(); data.back().inputOperandInformation( {.hasNoValue = input.hasNoValue, .location = input.location, .numberOfDimensions = static_cast(input.dimensions.size())}); // package operand dimensions for (uint32_t dimension : input.dimensions) { data.emplace_back(); data.back().inputOperandDimensionValue(dimension); } } // package output data for (const auto& output : request.outputs) { // package operand information data.emplace_back(); data.back().outputOperandInformation( {.hasNoValue = output.hasNoValue, .location = output.location, .numberOfDimensions = static_cast(output.dimensions.size())}); // package operand dimensions for (uint32_t dimension : output.dimensions) { data.emplace_back(); data.back().outputOperandDimensionValue(dimension); } } // package pool identifier for (int32_t slot : slots) { data.emplace_back(); data.back().poolIdentifier(slot); } // package measureTiming data.emplace_back(); data.back().measureTiming(measure); CHECK_EQ(data.size(), count); // return packet return data; } // serialize result std::vector serialize(V1_0::ErrorStatus errorStatus, const std::vector& outputShapes, V1_2::Timing timing) { // count how many elements need to be sent for a request size_t count = 2 + outputShapes.size(); for (const auto& outputShape : outputShapes) { count += outputShape.dimensions.size(); } // create buffer to temporarily store elements std::vector data; data.reserve(count); // package packetInfo data.emplace_back(); data.back().packetInformation({.packetSize = static_cast(count), .errorStatus = errorStatus, .numberOfOperands = static_cast(outputShapes.size())}); // package output shape data for (const auto& operand : outputShapes) { // package operand information data.emplace_back(); data.back().operandInformation( {.isSufficient = operand.isSufficient, .numberOfDimensions = static_cast(operand.dimensions.size())}); // package operand dimensions for (uint32_t dimension : operand.dimensions) { data.emplace_back(); data.back().operandDimensionValue(dimension); } } // package executionTiming data.emplace_back(); data.back().executionTiming(timing); CHECK_EQ(data.size(), count); // return result return data; } // deserialize request nn::Result, V1_2::MeasureTiming>> deserialize( const std::vector& data) { using discriminator = FmqRequestDatum::hidl_discriminator; size_t index = 0; // validate packet information if (data.size() == 0 || data[index].getDiscriminator() != discriminator::packetInformation) { return NN_ERROR() << "FMQ Request packet ill-formed"; } // unpackage packet information const FmqRequestDatum::PacketInformation& packetInfo = data[index].packetInformation(); index++; const uint32_t packetSize = packetInfo.packetSize; const uint32_t numberOfInputOperands = packetInfo.numberOfInputOperands; const uint32_t numberOfOutputOperands = packetInfo.numberOfOutputOperands; const uint32_t numberOfPools = packetInfo.numberOfPools; // verify packet size if (data.size() != packetSize) { return NN_ERROR() << "FMQ Request packet ill-formed"; } // unpackage input operands std::vector inputs; inputs.reserve(numberOfInputOperands); for (size_t operand = 0; operand < numberOfInputOperands; ++operand) { // validate input operand information if (data[index].getDiscriminator() != discriminator::inputOperandInformation) { return NN_ERROR() << "FMQ Request packet ill-formed"; } // unpackage operand information const FmqRequestDatum::OperandInformation& operandInfo = data[index].inputOperandInformation(); index++; const bool hasNoValue = operandInfo.hasNoValue; const V1_0::DataLocation location = operandInfo.location; const uint32_t numberOfDimensions = operandInfo.numberOfDimensions; // unpackage operand dimensions std::vector dimensions; dimensions.reserve(numberOfDimensions); for (size_t i = 0; i < numberOfDimensions; ++i) { // validate dimension if (data[index].getDiscriminator() != discriminator::inputOperandDimensionValue) { return NN_ERROR() << "FMQ Request packet ill-formed"; } // unpackage dimension const uint32_t dimension = data[index].inputOperandDimensionValue(); index++; // store result dimensions.push_back(dimension); } // store result inputs.push_back( {.hasNoValue = hasNoValue, .location = location, .dimensions = dimensions}); } // unpackage output operands std::vector outputs; outputs.reserve(numberOfOutputOperands); for (size_t operand = 0; operand < numberOfOutputOperands; ++operand) { // validate output operand information if (data[index].getDiscriminator() != discriminator::outputOperandInformation) { return NN_ERROR() << "FMQ Request packet ill-formed"; } // unpackage operand information const FmqRequestDatum::OperandInformation& operandInfo = data[index].outputOperandInformation(); index++; const bool hasNoValue = operandInfo.hasNoValue; const V1_0::DataLocation location = operandInfo.location; const uint32_t numberOfDimensions = operandInfo.numberOfDimensions; // unpackage operand dimensions std::vector dimensions; dimensions.reserve(numberOfDimensions); for (size_t i = 0; i < numberOfDimensions; ++i) { // validate dimension if (data[index].getDiscriminator() != discriminator::outputOperandDimensionValue) { return NN_ERROR() << "FMQ Request packet ill-formed"; } // unpackage dimension const uint32_t dimension = data[index].outputOperandDimensionValue(); index++; // store result dimensions.push_back(dimension); } // store result outputs.push_back( {.hasNoValue = hasNoValue, .location = location, .dimensions = dimensions}); } // unpackage pools std::vector slots; slots.reserve(numberOfPools); for (size_t pool = 0; pool < numberOfPools; ++pool) { // validate input operand information if (data[index].getDiscriminator() != discriminator::poolIdentifier) { return NN_ERROR() << "FMQ Request packet ill-formed"; } // unpackage operand information const int32_t poolId = data[index].poolIdentifier(); index++; // store result slots.push_back(poolId); } // validate measureTiming if (data[index].getDiscriminator() != discriminator::measureTiming) { return NN_ERROR() << "FMQ Request packet ill-formed"; } // unpackage measureTiming const V1_2::MeasureTiming measure = data[index].measureTiming(); index++; // validate packet information if (index != packetSize) { return NN_ERROR() << "FMQ Result packet ill-formed"; } // return request V1_0::Request request = {.inputs = inputs, .outputs = outputs, .pools = {}}; return std::make_tuple(std::move(request), std::move(slots), measure); } // deserialize a packet into the result nn::Result, V1_2::Timing>> deserialize( const std::vector& data) { using discriminator = FmqResultDatum::hidl_discriminator; size_t index = 0; // validate packet information if (data.size() == 0 || data[index].getDiscriminator() != discriminator::packetInformation) { return NN_ERROR() << "FMQ Result packet ill-formed"; } // unpackage packet information const FmqResultDatum::PacketInformation& packetInfo = data[index].packetInformation(); index++; const uint32_t packetSize = packetInfo.packetSize; const V1_0::ErrorStatus errorStatus = packetInfo.errorStatus; const uint32_t numberOfOperands = packetInfo.numberOfOperands; // verify packet size if (data.size() != packetSize) { return NN_ERROR() << "FMQ Result packet ill-formed"; } // unpackage operands std::vector outputShapes; outputShapes.reserve(numberOfOperands); for (size_t operand = 0; operand < numberOfOperands; ++operand) { // validate operand information if (data[index].getDiscriminator() != discriminator::operandInformation) { return NN_ERROR() << "FMQ Result packet ill-formed"; } // unpackage operand information const FmqResultDatum::OperandInformation& operandInfo = data[index].operandInformation(); index++; const bool isSufficient = operandInfo.isSufficient; const uint32_t numberOfDimensions = operandInfo.numberOfDimensions; // unpackage operand dimensions std::vector dimensions; dimensions.reserve(numberOfDimensions); for (size_t i = 0; i < numberOfDimensions; ++i) { // validate dimension if (data[index].getDiscriminator() != discriminator::operandDimensionValue) { return NN_ERROR() << "FMQ Result packet ill-formed"; } // unpackage dimension const uint32_t dimension = data[index].operandDimensionValue(); index++; // store result dimensions.push_back(dimension); } // store result outputShapes.push_back({.dimensions = dimensions, .isSufficient = isSufficient}); } // validate execution timing if (data[index].getDiscriminator() != discriminator::executionTiming) { return NN_ERROR() << "FMQ Result packet ill-formed"; } // unpackage execution timing const V1_2::Timing timing = data[index].executionTiming(); index++; // validate packet information if (index != packetSize) { return NN_ERROR() << "FMQ Result packet ill-formed"; } // return result return std::make_tuple(errorStatus, std::move(outputShapes), timing); } // RequestChannelSender methods nn::GeneralResult< std::pair, const MQDescriptorSync*>> RequestChannelSender::create(size_t channelLength) { auto requestChannelSender = std::make_unique(PrivateConstructorTag{}, channelLength); if (!requestChannelSender->mFmqRequestChannel.isValid()) { return NN_ERROR() << "Unable to create RequestChannelSender"; } const MQDescriptorSync* descriptor = requestChannelSender->mFmqRequestChannel.getDesc(); return std::make_pair(std::move(requestChannelSender), descriptor); } RequestChannelSender::RequestChannelSender(PrivateConstructorTag /*tag*/, size_t channelLength) : mFmqRequestChannel(channelLength, /*configureEventFlagWord=*/true) {} nn::Result RequestChannelSender::send(const V1_0::Request& request, V1_2::MeasureTiming measure, const std::vector& slots) { const std::vector serialized = serialize(request, measure, slots); return sendPacket(serialized); } nn::Result RequestChannelSender::sendPacket(const std::vector& packet) { if (!mValid) { return NN_ERROR() << "FMQ object is invalid"; } if (packet.size() > mFmqRequestChannel.availableToWrite()) { return NN_ERROR() << "RequestChannelSender::sendPacket -- packet size exceeds size available in FMQ"; } // Always send the packet with "blocking" because this signals the futex and unblocks the // consumer if it is waiting on the futex. const bool success = mFmqRequestChannel.writeBlocking(packet.data(), packet.size()); if (!success) { return NN_ERROR() << "RequestChannelSender::sendPacket -- FMQ's writeBlocking returned an error"; } return {}; } void RequestChannelSender::notifyAsDeadObject() { mValid = false; } // RequestChannelReceiver methods nn::GeneralResult> RequestChannelReceiver::create( const MQDescriptorSync& requestChannel, std::chrono::microseconds pollingTimeWindow) { auto requestChannelReceiver = std::make_unique( PrivateConstructorTag{}, requestChannel, pollingTimeWindow); if (!requestChannelReceiver->mFmqRequestChannel.isValid()) { return NN_ERROR() << "Unable to create RequestChannelReceiver"; } if (requestChannelReceiver->mFmqRequestChannel.getEventFlagWord() == nullptr) { return NN_ERROR() << "RequestChannelReceiver::create was passed an MQDescriptor without an EventFlag"; } return requestChannelReceiver; } RequestChannelReceiver::RequestChannelReceiver( PrivateConstructorTag /*tag*/, const MQDescriptorSync& requestChannel, std::chrono::microseconds pollingTimeWindow) : mFmqRequestChannel(requestChannel), kPollingTimeWindow(pollingTimeWindow) {} nn::Result, V1_2::MeasureTiming>> RequestChannelReceiver::getBlocking() { const auto packet = NN_TRY(getPacketBlocking()); return deserialize(packet); } void RequestChannelReceiver::invalidate() { mTeardown = true; // force unblock // ExecutionBurstServer is by default waiting on a request packet. If the client process // destroys its burst object, the server may still be waiting on the futex. This force unblock // wakes up any thread waiting on the futex. const auto data = serialize(V1_0::Request{}, V1_2::MeasureTiming::NO, {}); mFmqRequestChannel.writeBlocking(data.data(), data.size()); } nn::Result> RequestChannelReceiver::getPacketBlocking() { if (mTeardown) { return NN_ERROR() << "FMQ object is being torn down"; } // First spend time polling if results are available in FMQ instead of waiting on the futex. // Polling is more responsive (yielding lower latencies), but can take up more power, so only // poll for a limited period of time. auto& getCurrentTime = std::chrono::high_resolution_clock::now; const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow; while (getCurrentTime() < timeToStopPolling) { // if class is being torn down, immediately return if (mTeardown.load(std::memory_order_relaxed)) { return NN_ERROR() << "FMQ object is being torn down"; } // Check if data is available. If it is, immediately retrieve it and return. const size_t available = mFmqRequestChannel.availableToRead(); if (available > 0) { std::vector packet(available); const bool success = mFmqRequestChannel.readBlocking(packet.data(), available); if (!success) { return NN_ERROR() << "Error receiving packet"; } return packet; } std::this_thread::yield(); } // If we get to this point, we either stopped polling because it was taking too long or polling // was not allowed. Instead, perform a blocking call which uses a futex to save power. // wait for request packet and read first element of request packet FmqRequestDatum datum; bool success = mFmqRequestChannel.readBlocking(&datum, 1); // retrieve remaining elements // NOTE: all of the data is already available at this point, so there's no need to do a blocking // wait to wait for more data. This is known because in FMQ, all writes are published (made // available) atomically. Currently, the producer always publishes the entire packet in one // function call, so if the first element of the packet is available, the remaining elements are // also available. const size_t count = mFmqRequestChannel.availableToRead(); std::vector packet(count + 1); std::memcpy(&packet.front(), &datum, sizeof(datum)); success &= mFmqRequestChannel.read(packet.data() + 1, count); // terminate loop if (mTeardown) { return NN_ERROR() << "FMQ object is being torn down"; } // ensure packet was successfully received if (!success) { return NN_ERROR() << "Error receiving packet"; } return packet; } // ResultChannelSender methods nn::GeneralResult> ResultChannelSender::create( const MQDescriptorSync& resultChannel) { auto resultChannelSender = std::make_unique(PrivateConstructorTag{}, resultChannel); if (!resultChannelSender->mFmqResultChannel.isValid()) { return NN_ERROR() << "Unable to create RequestChannelSender"; } if (resultChannelSender->mFmqResultChannel.getEventFlagWord() == nullptr) { return NN_ERROR() << "ResultChannelSender::create was passed an MQDescriptor without an EventFlag"; } return resultChannelSender; } ResultChannelSender::ResultChannelSender(PrivateConstructorTag /*tag*/, const MQDescriptorSync& resultChannel) : mFmqResultChannel(resultChannel) {} void ResultChannelSender::send(V1_0::ErrorStatus errorStatus, const std::vector& outputShapes, V1_2::Timing timing) { const std::vector serialized = serialize(errorStatus, outputShapes, timing); sendPacket(serialized); } void ResultChannelSender::sendPacket(const std::vector& packet) { if (packet.size() > mFmqResultChannel.availableToWrite()) { LOG(ERROR) << "ResultChannelSender::sendPacket -- packet size exceeds size available in FMQ"; const std::vector errorPacket = serialize(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming); // Always send the packet with "blocking" because this signals the futex and unblocks the // consumer if it is waiting on the futex. mFmqResultChannel.writeBlocking(errorPacket.data(), errorPacket.size()); } else { // Always send the packet with "blocking" because this signals the futex and unblocks the // consumer if it is waiting on the futex. mFmqResultChannel.writeBlocking(packet.data(), packet.size()); } } // ResultChannelReceiver methods nn::GeneralResult< std::pair, const MQDescriptorSync*>> ResultChannelReceiver::create(size_t channelLength, std::chrono::microseconds pollingTimeWindow) { auto resultChannelReceiver = std::make_unique( PrivateConstructorTag{}, channelLength, pollingTimeWindow); if (!resultChannelReceiver->mFmqResultChannel.isValid()) { return NN_ERROR() << "Unable to create ResultChannelReceiver"; } const MQDescriptorSync* descriptor = resultChannelReceiver->mFmqResultChannel.getDesc(); return std::make_pair(std::move(resultChannelReceiver), descriptor); } ResultChannelReceiver::ResultChannelReceiver(PrivateConstructorTag /*tag*/, size_t channelLength, std::chrono::microseconds pollingTimeWindow) : mFmqResultChannel(channelLength, /*configureEventFlagWord=*/true), kPollingTimeWindow(pollingTimeWindow) {} nn::Result, V1_2::Timing>> ResultChannelReceiver::getBlocking() { const auto packet = NN_TRY(getPacketBlocking()); return deserialize(packet); } void ResultChannelReceiver::notifyAsDeadObject() { mValid = false; // force unblock // ExecutionBurstController waits on a result packet after sending a request. If the driver // containing ExecutionBurstServer crashes, the controller may be waiting on the futex. This // force unblock wakes up any thread waiting on the futex. const auto data = serialize(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming); mFmqResultChannel.writeBlocking(data.data(), data.size()); } nn::Result> ResultChannelReceiver::getPacketBlocking() { if (!mValid) { return NN_ERROR() << "FMQ object is invalid"; } // First spend time polling if results are available in FMQ instead of waiting on the futex. // Polling is more responsive (yielding lower latencies), but can take up more power, so only // poll for a limited period of time. auto& getCurrentTime = std::chrono::high_resolution_clock::now; const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow; while (getCurrentTime() < timeToStopPolling) { // if class is being torn down, immediately return if (!mValid.load(std::memory_order_relaxed)) { return NN_ERROR() << "FMQ object is invalid"; } // Check if data is available. If it is, immediately retrieve it and return. const size_t available = mFmqResultChannel.availableToRead(); if (available > 0) { std::vector packet(available); const bool success = mFmqResultChannel.readBlocking(packet.data(), available); if (!success) { return NN_ERROR() << "Error receiving packet"; } return packet; } std::this_thread::yield(); } // If we get to this point, we either stopped polling because it was taking too long or polling // was not allowed. Instead, perform a blocking call which uses a futex to save power. // wait for result packet and read first element of result packet FmqResultDatum datum; bool success = mFmqResultChannel.readBlocking(&datum, 1); // retrieve remaining elements // NOTE: all of the data is already available at this point, so there's no need to do a blocking // wait to wait for more data. This is known because in FMQ, all writes are published (made // available) atomically. Currently, the producer always publishes the entire packet in one // function call, so if the first element of the packet is available, the remaining elements are // also available. const size_t count = mFmqResultChannel.availableToRead(); std::vector packet(count + 1); std::memcpy(&packet.front(), &datum, sizeof(datum)); success &= mFmqResultChannel.read(packet.data() + 1, count); if (!mValid) { return NN_ERROR() << "FMQ object is invalid"; } // ensure packet was successfully received if (!success) { return NN_ERROR() << "Error receiving packet"; } return packet; } } // namespace android::hardware::neuralnetworks::V1_2::utils