1 //===----- RPCUTils.h - Basic tilities for building RPC APIs ----*- C++ -*-===//
2 //
3 //                     The LLVM Compiler Infrastructure
4 //
5 // This file is distributed under the University of Illinois Open Source
6 // License. See LICENSE.TXT for details.
7 //
8 //===----------------------------------------------------------------------===//
9 //
10 // Basic utilities for building RPC APIs.
11 //
12 //===----------------------------------------------------------------------===//
13 
14 #ifndef LLVM_EXECUTIONENGINE_ORC_RPCUTILS_H
15 #define LLVM_EXECUTIONENGINE_ORC_RPCUTILS_H
16 
17 #include <map>
18 #include <vector>
19 
20 #include "llvm/ADT/Optional.h"
21 #include "llvm/ADT/STLExtras.h"
22 #include "llvm/ExecutionEngine/Orc/OrcError.h"
23 
24 #ifdef _MSC_VER
25 // concrt.h depends on eh.h for __uncaught_exception declaration
26 // even if we disable exceptions.
27 #include <eh.h>
28 
29 // Disable warnings from ppltasks.h transitively included by <future>.
30 #pragma warning(push)
31 #pragma warning(disable : 4530)
32 #pragma warning(disable : 4062)
33 #endif
34 
35 #include <future>
36 
37 #ifdef _MSC_VER
38 #pragma warning(pop)
39 #endif
40 
41 namespace llvm {
42 namespace orc {
43 namespace remote {
44 
45 /// Describes reserved RPC Function Ids.
46 ///
47 /// The default implementation will serve for integer and enum function id
48 /// types. If you want to use a custom type as your FunctionId you can
49 /// specialize this class and provide unique values for InvalidId,
50 /// ResponseId and FirstValidId.
51 
52 template <typename T> class RPCFunctionIdTraits {
53 public:
54   static const T InvalidId = static_cast<T>(0);
55   static const T ResponseId = static_cast<T>(1);
56   static const T FirstValidId = static_cast<T>(2);
57 };
58 
59 // Base class containing utilities that require partial specialization.
60 // These cannot be included in RPC, as template class members cannot be
61 // partially specialized.
62 class RPCBase {
63 protected:
64   // RPC Function description type.
65   //
66   // This class provides the information and operations needed to support the
67   // RPC primitive operations (call, expect, etc) for a given function. It
68   // is specialized for void and non-void functions to deal with the differences
69   // betwen the two. Both specializations have the same interface:
70   //
71   // Id - The function's unique identifier.
72   // OptionalReturn - The return type for asyncronous calls.
73   // ErrorReturn - The return type for synchronous calls.
74   // optionalToErrorReturn - Conversion from a valid OptionalReturn to an
75   //                         ErrorReturn.
76   // readResult - Deserialize a result from a channel.
77   // abandon - Abandon a promised (asynchronous) result.
78   // respond - Retun a result on the channel.
79   template <typename FunctionIdT, FunctionIdT FuncId, typename FnT>
80   class FunctionHelper {};
81 
82   // RPC Function description specialization for non-void functions.
83   template <typename FunctionIdT, FunctionIdT FuncId, typename RetT,
84             typename... ArgTs>
85   class FunctionHelper<FunctionIdT, FuncId, RetT(ArgTs...)> {
86   public:
87     static_assert(FuncId != RPCFunctionIdTraits<FunctionIdT>::InvalidId &&
88                       FuncId != RPCFunctionIdTraits<FunctionIdT>::ResponseId,
89                   "Cannot define custom function with InvalidId or ResponseId. "
90                   "Please use RPCFunctionTraits<FunctionIdT>::FirstValidId.");
91 
92     static const FunctionIdT Id = FuncId;
93 
94     typedef Optional<RetT> OptionalReturn;
95 
96     typedef Expected<RetT> ErrorReturn;
97 
optionalToErrorReturn(OptionalReturn && V)98     static ErrorReturn optionalToErrorReturn(OptionalReturn &&V) {
99       assert(V && "Return value not available");
100       return std::move(*V);
101     }
102 
103     template <typename ChannelT>
readResult(ChannelT & C,std::promise<OptionalReturn> & P)104     static Error readResult(ChannelT &C, std::promise<OptionalReturn> &P) {
105       RetT Val;
106       auto Err = deserialize(C, Val);
107       auto Err2 = endReceiveMessage(C);
108       Err = joinErrors(std::move(Err), std::move(Err2));
109 
110       if (Err) {
111         P.set_value(OptionalReturn());
112         return Err;
113       }
114       P.set_value(std::move(Val));
115       return Error::success();
116     }
117 
abandon(std::promise<OptionalReturn> & P)118     static void abandon(std::promise<OptionalReturn> &P) {
119       P.set_value(OptionalReturn());
120     }
121 
122     template <typename ChannelT, typename SequenceNumberT>
respond(ChannelT & C,SequenceNumberT SeqNo,ErrorReturn & Result)123     static Error respond(ChannelT &C, SequenceNumberT SeqNo,
124                          ErrorReturn &Result) {
125       FunctionIdT ResponseId = RPCFunctionIdTraits<FunctionIdT>::ResponseId;
126 
127       // If the handler returned an error then bail out with that.
128       if (!Result)
129         return Result.takeError();
130 
131       // Otherwise open a new message on the channel and send the result.
132       if (auto Err = startSendMessage(C))
133         return Err;
134       if (auto Err = serializeSeq(C, ResponseId, SeqNo, *Result))
135         return Err;
136       return endSendMessage(C);
137     }
138   };
139 
140   // RPC Function description specialization for void functions.
141   template <typename FunctionIdT, FunctionIdT FuncId, typename... ArgTs>
142   class FunctionHelper<FunctionIdT, FuncId, void(ArgTs...)> {
143   public:
144     static_assert(FuncId != RPCFunctionIdTraits<FunctionIdT>::InvalidId &&
145                       FuncId != RPCFunctionIdTraits<FunctionIdT>::ResponseId,
146                   "Cannot define custom function with InvalidId or ResponseId. "
147                   "Please use RPCFunctionTraits<FunctionIdT>::FirstValidId.");
148 
149     static const FunctionIdT Id = FuncId;
150 
151     typedef bool OptionalReturn;
152     typedef Error ErrorReturn;
153 
optionalToErrorReturn(OptionalReturn && V)154     static ErrorReturn optionalToErrorReturn(OptionalReturn &&V) {
155       assert(V && "Return value not available");
156       return Error::success();
157     }
158 
159     template <typename ChannelT>
readResult(ChannelT & C,std::promise<OptionalReturn> & P)160     static Error readResult(ChannelT &C, std::promise<OptionalReturn> &P) {
161       // Void functions don't have anything to deserialize, so we're good.
162       P.set_value(true);
163       return endReceiveMessage(C);
164     }
165 
abandon(std::promise<OptionalReturn> & P)166     static void abandon(std::promise<OptionalReturn> &P) { P.set_value(false); }
167 
168     template <typename ChannelT, typename SequenceNumberT>
respond(ChannelT & C,SequenceNumberT SeqNo,ErrorReturn & Result)169     static Error respond(ChannelT &C, SequenceNumberT SeqNo,
170                          ErrorReturn &Result) {
171       const FunctionIdT ResponseId =
172           RPCFunctionIdTraits<FunctionIdT>::ResponseId;
173 
174       // If the handler returned an error then bail out with that.
175       if (Result)
176         return std::move(Result);
177 
178       // Otherwise open a new message on the channel and send the result.
179       if (auto Err = startSendMessage(C))
180         return Err;
181       if (auto Err = serializeSeq(C, ResponseId, SeqNo))
182         return Err;
183       return endSendMessage(C);
184     }
185   };
186 
187   // Helper for the call primitive.
188   template <typename ChannelT, typename SequenceNumberT, typename Func>
189   class CallHelper;
190 
191   template <typename ChannelT, typename SequenceNumberT, typename FunctionIdT,
192             FunctionIdT FuncId, typename RetT, typename... ArgTs>
193   class CallHelper<ChannelT, SequenceNumberT,
194                    FunctionHelper<FunctionIdT, FuncId, RetT(ArgTs...)>> {
195   public:
call(ChannelT & C,SequenceNumberT SeqNo,const ArgTs &...Args)196     static Error call(ChannelT &C, SequenceNumberT SeqNo,
197                       const ArgTs &... Args) {
198       if (auto Err = startSendMessage(C))
199         return Err;
200       if (auto Err = serializeSeq(C, FuncId, SeqNo, Args...))
201         return Err;
202       return endSendMessage(C);
203     }
204   };
205 
206   // Helper for handle primitive.
207   template <typename ChannelT, typename SequenceNumberT, typename Func>
208   class HandlerHelper;
209 
210   template <typename ChannelT, typename SequenceNumberT, typename FunctionIdT,
211             FunctionIdT FuncId, typename RetT, typename... ArgTs>
212   class HandlerHelper<ChannelT, SequenceNumberT,
213                       FunctionHelper<FunctionIdT, FuncId, RetT(ArgTs...)>> {
214   public:
215     template <typename HandlerT>
handle(ChannelT & C,HandlerT Handler)216     static Error handle(ChannelT &C, HandlerT Handler) {
217       return readAndHandle(C, Handler, llvm::index_sequence_for<ArgTs...>());
218     }
219 
220   private:
221     typedef FunctionHelper<FunctionIdT, FuncId, RetT(ArgTs...)> Func;
222 
223     template <typename HandlerT, size_t... Is>
readAndHandle(ChannelT & C,HandlerT Handler,llvm::index_sequence<Is...> _)224     static Error readAndHandle(ChannelT &C, HandlerT Handler,
225                                llvm::index_sequence<Is...> _) {
226       std::tuple<ArgTs...> RPCArgs;
227       SequenceNumberT SeqNo;
228       // GCC 4.7 and 4.8 incorrectly issue a -Wunused-but-set-variable warning
229       // for RPCArgs. Void cast RPCArgs to work around this for now.
230       // FIXME: Remove this workaround once we can assume a working GCC version.
231       (void)RPCArgs;
232       if (auto Err = deserializeSeq(C, SeqNo, std::get<Is>(RPCArgs)...))
233         return Err;
234 
235       // We've deserialized the arguments, so unlock the channel for reading
236       // before we call the handler. This allows recursive RPC calls.
237       if (auto Err = endReceiveMessage(C))
238         return Err;
239 
240       // Run the handler and get the result.
241       auto Result = Handler(std::get<Is>(RPCArgs)...);
242 
243       // Return the result to the client.
244       return Func::template respond<ChannelT, SequenceNumberT>(C, SeqNo,
245                                                                Result);
246     }
247   };
248 
249   // Helper for wrapping member functions up as functors.
250   template <typename ClassT, typename RetT, typename... ArgTs>
251   class MemberFnWrapper {
252   public:
253     typedef RetT (ClassT::*MethodT)(ArgTs...);
MemberFnWrapper(ClassT & Instance,MethodT Method)254     MemberFnWrapper(ClassT &Instance, MethodT Method)
255         : Instance(Instance), Method(Method) {}
operator()256     RetT operator()(ArgTs &... Args) { return (Instance.*Method)(Args...); }
257 
258   private:
259     ClassT &Instance;
260     MethodT Method;
261   };
262 
263   // Helper that provides a Functor for deserializing arguments.
264   template <typename... ArgTs> class ReadArgs {
265   public:
operator()266     Error operator()() { return Error::success(); }
267   };
268 
269   template <typename ArgT, typename... ArgTs>
270   class ReadArgs<ArgT, ArgTs...> : public ReadArgs<ArgTs...> {
271   public:
ReadArgs(ArgT & Arg,ArgTs &...Args)272     ReadArgs(ArgT &Arg, ArgTs &... Args)
273         : ReadArgs<ArgTs...>(Args...), Arg(Arg) {}
274 
operator()275     Error operator()(ArgT &ArgVal, ArgTs &... ArgVals) {
276       this->Arg = std::move(ArgVal);
277       return ReadArgs<ArgTs...>::operator()(ArgVals...);
278     }
279 
280   private:
281     ArgT &Arg;
282   };
283 };
284 
285 /// Contains primitive utilities for defining, calling and handling calls to
286 /// remote procedures. ChannelT is a bidirectional stream conforming to the
287 /// RPCChannel interface (see RPCChannel.h), and FunctionIdT is a procedure
288 /// identifier type that must be serializable on ChannelT.
289 ///
290 /// These utilities support the construction of very primitive RPC utilities.
291 /// Their intent is to ensure correct serialization and deserialization of
292 /// procedure arguments, and to keep the client and server's view of the API in
293 /// sync.
294 ///
295 /// These utilities do not support return values. These can be handled by
296 /// declaring a corresponding '.*Response' procedure and expecting it after a
297 /// call). They also do not support versioning: the client and server *must* be
298 /// compiled with the same procedure definitions.
299 ///
300 ///
301 ///
302 /// Overview (see comments individual types/methods for details):
303 ///
304 /// Function<Id, Args...> :
305 ///
306 ///   associates a unique serializable id with an argument list.
307 ///
308 ///
309 /// call<Func>(Channel, Args...) :
310 ///
311 ///   Calls the remote procedure 'Func' by serializing Func's id followed by its
312 /// arguments and sending the resulting bytes to 'Channel'.
313 ///
314 ///
315 /// handle<Func>(Channel, <functor matching Error(Args...)> :
316 ///
317 ///   Handles a call to 'Func' by deserializing its arguments and calling the
318 /// given functor. This assumes that the id for 'Func' has already been
319 /// deserialized.
320 ///
321 /// expect<Func>(Channel, <functor matching Error(Args...)> :
322 ///
323 ///   The same as 'handle', except that the procedure id should not have been
324 /// read yet. Expect will deserialize the id and assert that it matches Func's
325 /// id. If it does not, and unexpected RPC call error is returned.
326 template <typename ChannelT, typename FunctionIdT = uint32_t,
327           typename SequenceNumberT = uint16_t>
328 class RPC : public RPCBase {
329 public:
330   /// RPC default constructor.
331   RPC() = default;
332 
333   /// RPC instances cannot be copied.
334   RPC(const RPC &) = delete;
335 
336   /// RPC instances cannot be copied.
337   RPC &operator=(const RPC &) = delete;
338 
339   /// RPC move constructor.
340   // FIXME: Remove once MSVC can synthesize move ops.
RPC(RPC && Other)341   RPC(RPC &&Other)
342       : SequenceNumberMgr(std::move(Other.SequenceNumberMgr)),
343         OutstandingResults(std::move(Other.OutstandingResults)) {}
344 
345   /// RPC move assignment.
346   // FIXME: Remove once MSVC can synthesize move ops.
347   RPC &operator=(RPC &&Other) {
348     SequenceNumberMgr = std::move(Other.SequenceNumberMgr);
349     OutstandingResults = std::move(Other.OutstandingResults);
350     return *this;
351   }
352 
353   /// Utility class for defining/referring to RPC procedures.
354   ///
355   /// Typedefs of this utility are used when calling/handling remote procedures.
356   ///
357   /// FuncId should be a unique value of FunctionIdT (i.e. not used with any
358   /// other Function typedef in the RPC API being defined.
359   ///
360   /// the template argument Ts... gives the argument list for the remote
361   /// procedure.
362   ///
363   /// E.g.
364   ///
365   ///   typedef Function<0, bool> Func1;
366   ///   typedef Function<1, std::string, std::vector<int>> Func2;
367   ///
368   ///   if (auto Err = call<Func1>(Channel, true))
369   ///     /* handle Err */;
370   ///
371   ///   if (auto Err = expect<Func2>(Channel,
372   ///         [](std::string &S, std::vector<int> &V) {
373   ///           // Stuff.
374   ///           return Error::success();
375   ///         })
376   ///     /* handle Err */;
377   ///
378   template <FunctionIdT FuncId, typename FnT>
379   using Function = FunctionHelper<FunctionIdT, FuncId, FnT>;
380 
381   /// Return type for asynchronous call primitives.
382   template <typename Func>
383   using AsyncCallResult = std::future<typename Func::OptionalReturn>;
384 
385   /// Return type for asynchronous call-with-seq primitives.
386   template <typename Func>
387   using AsyncCallWithSeqResult =
388       std::pair<std::future<typename Func::OptionalReturn>, SequenceNumberT>;
389 
390   /// Serialize Args... to channel C, but do not call C.send().
391   ///
392   /// Returns an error (on serialization failure) or a pair of:
393   /// (1) A future Optional<T> (or future<bool> for void functions), and
394   /// (2) A sequence number.
395   ///
396   /// This utility function is primarily used for single-threaded mode support,
397   /// where the sequence number can be used to wait for the corresponding
398   /// result. In multi-threaded mode the appendCallAsync method, which does not
399   /// return the sequence numeber, should be preferred.
400   template <typename Func, typename... ArgTs>
401   Expected<AsyncCallWithSeqResult<Func>>
appendCallAsyncWithSeq(ChannelT & C,const ArgTs &...Args)402   appendCallAsyncWithSeq(ChannelT &C, const ArgTs &... Args) {
403     auto SeqNo = SequenceNumberMgr.getSequenceNumber();
404     std::promise<typename Func::OptionalReturn> Promise;
405     auto Result = Promise.get_future();
406     OutstandingResults[SeqNo] =
407         createOutstandingResult<Func>(std::move(Promise));
408 
409     if (auto Err = CallHelper<ChannelT, SequenceNumberT, Func>::call(C, SeqNo,
410                                                                      Args...)) {
411       abandonOutstandingResults();
412       return std::move(Err);
413     } else
414       return AsyncCallWithSeqResult<Func>(std::move(Result), SeqNo);
415   }
416 
417   /// The same as appendCallAsyncWithSeq, except that it calls C.send() to
418   /// flush the channel after serializing the call.
419   template <typename Func, typename... ArgTs>
420   Expected<AsyncCallWithSeqResult<Func>>
callAsyncWithSeq(ChannelT & C,const ArgTs &...Args)421   callAsyncWithSeq(ChannelT &C, const ArgTs &... Args) {
422     auto Result = appendCallAsyncWithSeq<Func>(C, Args...);
423     if (!Result)
424       return Result;
425     if (auto Err = C.send()) {
426       abandonOutstandingResults();
427       return std::move(Err);
428     }
429     return Result;
430   }
431 
432   /// Serialize Args... to channel C, but do not call send.
433   /// Returns an error if serialization fails, otherwise returns a
434   /// std::future<Optional<T>> (or a future<bool> for void functions).
435   template <typename Func, typename... ArgTs>
appendCallAsync(ChannelT & C,const ArgTs &...Args)436   Expected<AsyncCallResult<Func>> appendCallAsync(ChannelT &C,
437                                                   const ArgTs &... Args) {
438     auto ResAndSeqOrErr = appendCallAsyncWithSeq<Func>(C, Args...);
439     if (ResAndSeqOrErr)
440       return std::move(ResAndSeqOrErr->first);
441     return ResAndSeqOrErr.getError();
442   }
443 
444   /// The same as appendCallAsync, except that it calls C.send to flush the
445   /// channel after serializing the call.
446   template <typename Func, typename... ArgTs>
callAsync(ChannelT & C,const ArgTs &...Args)447   Expected<AsyncCallResult<Func>> callAsync(ChannelT &C,
448                                             const ArgTs &... Args) {
449     auto ResAndSeqOrErr = callAsyncWithSeq<Func>(C, Args...);
450     if (ResAndSeqOrErr)
451       return std::move(ResAndSeqOrErr->first);
452     return ResAndSeqOrErr.getError();
453   }
454 
455   /// This can be used in single-threaded mode.
456   template <typename Func, typename HandleFtor, typename... ArgTs>
457   typename Func::ErrorReturn
callSTHandling(ChannelT & C,HandleFtor & HandleOther,const ArgTs &...Args)458   callSTHandling(ChannelT &C, HandleFtor &HandleOther, const ArgTs &... Args) {
459     if (auto ResultAndSeqNoOrErr = callAsyncWithSeq<Func>(C, Args...)) {
460       auto &ResultAndSeqNo = *ResultAndSeqNoOrErr;
461       if (auto Err = waitForResult(C, ResultAndSeqNo.second, HandleOther))
462         return std::move(Err);
463       return Func::optionalToErrorReturn(ResultAndSeqNo.first.get());
464     } else
465       return ResultAndSeqNoOrErr.takeError();
466   }
467 
468   // This can be used in single-threaded mode.
469   template <typename Func, typename... ArgTs>
callST(ChannelT & C,const ArgTs &...Args)470   typename Func::ErrorReturn callST(ChannelT &C, const ArgTs &... Args) {
471     return callSTHandling<Func>(C, handleNone, Args...);
472   }
473 
474   /// Start receiving a new function call.
475   ///
476   /// Calls startReceiveMessage on the channel, then deserializes a FunctionId
477   /// into Id.
startReceivingFunction(ChannelT & C,FunctionIdT & Id)478   Error startReceivingFunction(ChannelT &C, FunctionIdT &Id) {
479     if (auto Err = startReceiveMessage(C))
480       return Err;
481 
482     return deserialize(C, Id);
483   }
484 
485   /// Deserialize args for Func from C and call Handler. The signature of
486   /// handler must conform to 'Error(Args...)' where Args... matches
487   /// the arguments used in the Func typedef.
488   template <typename Func, typename HandlerT>
handle(ChannelT & C,HandlerT Handler)489   static Error handle(ChannelT &C, HandlerT Handler) {
490     return HandlerHelper<ChannelT, SequenceNumberT, Func>::handle(C, Handler);
491   }
492 
493   /// Helper version of 'handle' for calling member functions.
494   template <typename Func, typename ClassT, typename RetT, typename... ArgTs>
handle(ChannelT & C,ClassT & Instance,RetT (ClassT::* HandlerMethod)(ArgTs...))495   static Error handle(ChannelT &C, ClassT &Instance,
496                       RetT (ClassT::*HandlerMethod)(ArgTs...)) {
497     return handle<Func>(
498         C, MemberFnWrapper<ClassT, RetT, ArgTs...>(Instance, HandlerMethod));
499   }
500 
501   /// Deserialize a FunctionIdT from C and verify it matches the id for Func.
502   /// If the id does match, deserialize the arguments and call the handler
503   /// (similarly to handle).
504   /// If the id does not match, return an unexpect RPC call error and do not
505   /// deserialize any further bytes.
506   template <typename Func, typename HandlerT>
expect(ChannelT & C,HandlerT Handler)507   Error expect(ChannelT &C, HandlerT Handler) {
508     FunctionIdT FuncId;
509     if (auto Err = startReceivingFunction(C, FuncId))
510       return std::move(Err);
511     if (FuncId != Func::Id)
512       return orcError(OrcErrorCode::UnexpectedRPCCall);
513     return handle<Func>(C, Handler);
514   }
515 
516   /// Helper version of expect for calling member functions.
517   template <typename Func, typename ClassT, typename... ArgTs>
expect(ChannelT & C,ClassT & Instance,Error (ClassT::* HandlerMethod)(ArgTs...))518   static Error expect(ChannelT &C, ClassT &Instance,
519                       Error (ClassT::*HandlerMethod)(ArgTs...)) {
520     return expect<Func>(
521         C, MemberFnWrapper<ClassT, ArgTs...>(Instance, HandlerMethod));
522   }
523 
524   /// Helper for handling setter procedures - this method returns a functor that
525   /// sets the variables referred to by Args... to values deserialized from the
526   /// channel.
527   /// E.g.
528   ///
529   ///   typedef Function<0, bool, int> Func1;
530   ///
531   ///   ...
532   ///   bool B;
533   ///   int I;
534   ///   if (auto Err = expect<Func1>(Channel, readArgs(B, I)))
535   ///     /* Handle Args */ ;
536   ///
537   template <typename... ArgTs>
readArgs(ArgTs &...Args)538   static ReadArgs<ArgTs...> readArgs(ArgTs &... Args) {
539     return ReadArgs<ArgTs...>(Args...);
540   }
541 
542   /// Read a response from Channel.
543   /// This should be called from the receive loop to retrieve results.
544   Error handleResponse(ChannelT &C, SequenceNumberT *SeqNoRet = nullptr) {
545     SequenceNumberT SeqNo;
546     if (auto Err = deserialize(C, SeqNo)) {
547       abandonOutstandingResults();
548       return Err;
549     }
550 
551     if (SeqNoRet)
552       *SeqNoRet = SeqNo;
553 
554     auto I = OutstandingResults.find(SeqNo);
555     if (I == OutstandingResults.end()) {
556       abandonOutstandingResults();
557       return orcError(OrcErrorCode::UnexpectedRPCResponse);
558     }
559 
560     if (auto Err = I->second->readResult(C)) {
561       abandonOutstandingResults();
562       // FIXME: Release sequence numbers?
563       return Err;
564     }
565 
566     OutstandingResults.erase(I);
567     SequenceNumberMgr.releaseSequenceNumber(SeqNo);
568 
569     return Error::success();
570   }
571 
572   // Loop waiting for a result with the given sequence number.
573   // This can be used as a receive loop if the user doesn't have a default.
574   template <typename HandleOtherFtor>
575   Error waitForResult(ChannelT &C, SequenceNumberT TgtSeqNo,
576                       HandleOtherFtor &HandleOther = handleNone) {
577     bool GotTgtResult = false;
578 
579     while (!GotTgtResult) {
580       FunctionIdT Id = RPCFunctionIdTraits<FunctionIdT>::InvalidId;
581       if (auto Err = startReceivingFunction(C, Id))
582         return Err;
583       if (Id == RPCFunctionIdTraits<FunctionIdT>::ResponseId) {
584         SequenceNumberT SeqNo;
585         if (auto Err = handleResponse(C, &SeqNo))
586           return Err;
587         GotTgtResult = (SeqNo == TgtSeqNo);
588       } else if (auto Err = HandleOther(C, Id))
589         return Err;
590     }
591 
592     return Error::success();
593   }
594 
595   // Default handler for 'other' (non-response) functions when waiting for a
596   // result from the channel.
handleNone(ChannelT &,FunctionIdT)597   static Error handleNone(ChannelT &, FunctionIdT) {
598     return orcError(OrcErrorCode::UnexpectedRPCCall);
599   };
600 
601 private:
602   // Manage sequence numbers.
603   class SequenceNumberManager {
604   public:
605     SequenceNumberManager() = default;
606 
607     SequenceNumberManager(const SequenceNumberManager &) = delete;
608     SequenceNumberManager &operator=(const SequenceNumberManager &) = delete;
609 
SequenceNumberManager(SequenceNumberManager && Other)610     SequenceNumberManager(SequenceNumberManager &&Other)
611         : NextSequenceNumber(std::move(Other.NextSequenceNumber)),
612           FreeSequenceNumbers(std::move(Other.FreeSequenceNumbers)) {}
613 
614     SequenceNumberManager &operator=(SequenceNumberManager &&Other) {
615       NextSequenceNumber = std::move(Other.NextSequenceNumber);
616       FreeSequenceNumbers = std::move(Other.FreeSequenceNumbers);
617     }
618 
reset()619     void reset() {
620       std::lock_guard<std::mutex> Lock(SeqNoLock);
621       NextSequenceNumber = 0;
622       FreeSequenceNumbers.clear();
623     }
624 
getSequenceNumber()625     SequenceNumberT getSequenceNumber() {
626       std::lock_guard<std::mutex> Lock(SeqNoLock);
627       if (FreeSequenceNumbers.empty())
628         return NextSequenceNumber++;
629       auto SequenceNumber = FreeSequenceNumbers.back();
630       FreeSequenceNumbers.pop_back();
631       return SequenceNumber;
632     }
633 
releaseSequenceNumber(SequenceNumberT SequenceNumber)634     void releaseSequenceNumber(SequenceNumberT SequenceNumber) {
635       std::lock_guard<std::mutex> Lock(SeqNoLock);
636       FreeSequenceNumbers.push_back(SequenceNumber);
637     }
638 
639   private:
640     std::mutex SeqNoLock;
641     SequenceNumberT NextSequenceNumber = 0;
642     std::vector<SequenceNumberT> FreeSequenceNumbers;
643   };
644 
645   // Base class for results that haven't been returned from the other end of the
646   // RPC connection yet.
647   class OutstandingResult {
648   public:
~OutstandingResult()649     virtual ~OutstandingResult() {}
650     virtual Error readResult(ChannelT &C) = 0;
651     virtual void abandon() = 0;
652   };
653 
654   // Outstanding results for a specific function.
655   template <typename Func>
656   class OutstandingResultImpl : public OutstandingResult {
657   private:
658   public:
OutstandingResultImpl(std::promise<typename Func::OptionalReturn> && P)659     OutstandingResultImpl(std::promise<typename Func::OptionalReturn> &&P)
660         : P(std::move(P)) {}
661 
readResult(ChannelT & C)662     Error readResult(ChannelT &C) override { return Func::readResult(C, P); }
663 
abandon()664     void abandon() override { Func::abandon(P); }
665 
666   private:
667     std::promise<typename Func::OptionalReturn> P;
668   };
669 
670   // Create an outstanding result for the given function.
671   template <typename Func>
672   std::unique_ptr<OutstandingResult>
createOutstandingResult(std::promise<typename Func::OptionalReturn> && P)673   createOutstandingResult(std::promise<typename Func::OptionalReturn> &&P) {
674     return llvm::make_unique<OutstandingResultImpl<Func>>(std::move(P));
675   }
676 
677   // Abandon all outstanding results.
abandonOutstandingResults()678   void abandonOutstandingResults() {
679     for (auto &KV : OutstandingResults)
680       KV.second->abandon();
681     OutstandingResults.clear();
682     SequenceNumberMgr.reset();
683   }
684 
685   SequenceNumberManager SequenceNumberMgr;
686   std::map<SequenceNumberT, std::unique_ptr<OutstandingResult>>
687       OutstandingResults;
688 };
689 
690 } // end namespace remote
691 } // end namespace orc
692 } // end namespace llvm
693 
694 #endif
695