1/*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import "GRPCCall.h"
20
21#import "GRPCCall+OAuth2.h"
22
23#import <RxLibrary/GRXConcurrentWriteable.h>
24#import <RxLibrary/GRXImmediateSingleWriter.h>
25#include <grpc/grpc.h>
26#include <grpc/support/time.h>
27
28#import "private/GRPCConnectivityMonitor.h"
29#import "private/GRPCHost.h"
30#import "private/GRPCRequestHeaders.h"
31#import "private/GRPCWrappedCall.h"
32#import "private/NSData+GRPC.h"
33#import "private/NSDictionary+GRPC.h"
34#import "private/NSError+GRPC.h"
35
// At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
// SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
// and RECV_STATUS_ON_CLIENT.
// Used as the initial capacity of the op batch that unary calls accumulate.
NSInteger kMaxClientBatch = 6;

// Keys under which the response headers and trailers are stored in the userInfo of the NSError
// that a failed call finishes with.
NSString *const kGRPCHeadersKey = @"io.grpc.HeadersKey";
NSString *const kGRPCTrailersKey = @"io.grpc.TrailersKey";
// Maps a "<host>/<path>" string to an NSNumber with the initial-metadata flags configured for
// that method through +setCallSafety:host:path:. Created in +initialize.
static NSMutableDictionary *callFlags;

// Name of the request header, and prefix of its value, used to send the token obtained from the
// call's token provider.
static NSString *const kAuthorizationHeader = @"authorization";
static NSString *const kBearerPrefix = @"Bearer ";

// Name of the environment variable consulted before touching GRPCConnectivityMonitor: when its
// value starts with '1' the call neither registers nor unregisters itself as an observer.
const char *kCFStreamVarName = "grpc_cfstream";
49
// Private class extension. GRPCCall conforms to GRXWriteable so that it can be handed to its own
// requests writer as the destination of the request messages.
@interface GRPCCall ()<GRXWriteable>
// Make them read-write.
// They are assigned from the handlers installed in -invokeCall.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
// YES from the moment a token is requested from the token provider until the call is started with
// the result, or until -cancel is invoked. See -startWithWriteable: and -cancel.
@property(atomic) BOOL isWaitingForToken;
@end
56
57// The following methods of a C gRPC call object aren't reentrant, and thus
58// calls to them must be serialized:
59// - start_batch
60// - destroy
61//
62// start_batch with a SEND_MESSAGE argument can only be called after the
63// OP_COMPLETE event for any previous write is received. This is achieved by
64// pausing the requests writer immediately every time it writes a value, and
65// resuming it again when OP_COMPLETE is received.
66//
67// Similarly, start_batch with a RECV_MESSAGE argument can only be called after
// the OP_COMPLETE event for any previous read is received. This is easier to
69// enforce, as we're writing the received messages into the writeable:
70// start_batch is enqueued once upon receiving the OP_COMPLETE event for the
71// RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
72// each RECV_MESSAGE batch.
@implementation GRPCCall {
  // Serial queue on which the operations of the wrapped call are started (see the note on
  // non-reentrant methods above).
  dispatch_queue_t _callQueue;

  // Copies of the host and path given to the designated initializer.
  NSString *_host;
  NSString *_path;
  // Wrapper of the underlying call object. Nil until -startCallWithWriteable: creates it.
  GRPCWrappedCall *_wrappedCall;
  // NOTE(review): not referenced by any method in this file; the class methods of
  // GRPCConnectivityMonitor are used instead. Confirm whether this ivar is still needed.
  GRPCConnectivityMonitor *_connectivityMonitor;

  // The C gRPC library has fewer guarantees on the ordering of events than we
  // do. Particularly, in the face of errors, there's no ordering guarantee at
  // all. This wrapper over our actual writeable ensures thread-safety and
  // correct ordering.
  GRXConcurrentWriteable *_responseWriteable;

  // The network thread wants the requestWriter to resume (when the server is ready for more input),
  // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
  // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
  // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
  // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
  // pause the writer immediately on writeValue:, so we need our locking to be recursive.
  // In this file that locking is done with @synchronized(_requestWriter).
  GRXWriter *_requestWriter;

  // To create a retain cycle when a call is started, up until it finishes. See
  // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
  // reference to the call object if all they're interested in is the handler being executed when
  // the response arrives.
  GRPCCall *_retainSelf;

  // Headers sent with the request. Passed to -sendHeaders: when the call is started.
  GRPCRequestHeaders *_requestHeaders;

  // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
  // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
  // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
  // the SendClose op is added.
  BOOL _unaryCall;
  NSMutableArray *_unaryOpBatch;

  // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
  // queue
  dispatch_queue_t _responseQueue;

  // Whether the call is finished. If it is, should not call finishWithError again.
  // Only read and written inside @synchronized(self), in -maybeFinishWithError:.
  BOOL _finished;
}

// Backing ivar of the state property. The methods of this class access _state directly, mostly
// inside @synchronized(self).
@synthesize state = _state;
119
// One-time, class-wide setup: initializes the gRPC core library and creates the table of
// per-method call flags.
+ (void)initialize {
  // The runtime may also send +initialize on behalf of subclasses, so everything below is skipped
  // unless the receiver is GRPCCall itself. See ref at:
  // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc
  if (self != [GRPCCall class]) {
    return;
  }
  grpc_init();
  callFlags = [NSMutableDictionary dictionary];
}
128
// Records the initial-metadata flags to send with calls to the method identified by |host| and
// |path|. A |callSafety| value other than the three handled below leaves the current setting
// untouched.
//
// callFlags is a plain NSMutableDictionary shared by every call, and these class methods can be
// invoked from any thread, so every access to it is serialized on the dictionary itself.
+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
  NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
  @synchronized(callFlags) {
    switch (callSafety) {
      case GRPCCallSafetyDefault:
        callFlags[hostAndPath] = @0;
        break;
      case GRPCCallSafetyIdempotentRequest:
        callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
        break;
      case GRPCCallSafetyCacheableRequest:
        callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
        break;
      default:
        break;
    }
  }
}

// Returns the initial-metadata flags previously recorded for |host| and |path| with
// +setCallSafety:host:path:, or 0 when nothing has been recorded for that pair.
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
  NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
  NSNumber *flags = nil;
  // Same lock as the writer side, so a lookup never overlaps a mutation of the dictionary.
  @synchronized(callFlags) {
    flags = callFlags[hostAndPath];
  }
  // Messaging nil yields 0, which is the "no flags" value.
  return [flags unsignedIntValue];
}
150
// Forwards to the designated initializer with nil arguments. Because that initializer raises
// NSInvalidArgumentException when host or path is nil, calling plain -init always raises.
- (instancetype)init {
  return [self initWithHost:nil path:nil requestsWriter:nil];
}
154
// Designated initializer.
// Raises NSInvalidArgumentException when |host| or |path| is nil, or when |requestWriter| is no
// longer in the not-started state.
- (instancetype)initWithHost:(NSString *)host
                        path:(NSString *)path
              requestsWriter:(GRXWriter *)requestWriter {
  if (host == nil || path == nil) {
    [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."];
  }
  if (requestWriter.state != GRXWriterStateNotStarted) {
    [NSException raise:NSInvalidArgumentException
                format:@"The requests writer can't be already started."];
  }

  self = [super init];
  if (self == nil) {
    return nil;
  }

  _host = [host copy];
  _path = [path copy];
  _requestWriter = requestWriter;
  _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];

  // Serial queue to invoke the non-reentrant methods of the grpc_call object.
  _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);

  // Responses go to the main queue unless -setResponseDispatchQueue: replaces it before start.
  _responseQueue = dispatch_get_main_queue();

  // A writer that emits a single immediate value marks the call as unary; its ops are collected
  // in _unaryOpBatch rather than being started one at a time.
  _unaryCall = [requestWriter isKindOfClass:[GRXImmediateSingleWriter class]];
  if (_unaryCall) {
    _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
  }

  return self;
}
186
// Replaces the queue used to deliver responses. The new queue is only adopted while the call is
// still in the not-started state; in any other state the request is silently ignored.
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
  BOOL notStartedYet = (_state == GRXWriterStateNotStarted);
  if (notStartedYet) {
    _responseQueue = queue;
  }
}
193
194#pragma mark Finish
195
// Moves the call to its terminal state: marks both this writer and the requests writer as
// finished, hands |errorOrNil| (or a successful completion when it is nil) to the response
// writeable, stops observing connectivity changes and drops the self-retain taken in
// -startWithWriteable:.
// The callers in this file go through -maybeFinishWithError:, which runs this at most once.
- (void)finishWithError:(NSError *)errorOrNil {
  @synchronized(self) {
    _state = GRXWriterStateFinished;
  }

  // Stop any request messages that were still coming.
  @synchronized(_requestWriter) {
    _requestWriter.state = GRXWriterStateFinished;
  }

  if (errorOrNil == nil) {
    [_responseWriteable enqueueSuccessfulCompletion];
  } else {
    [_responseWriteable cancelWithError:errorOrNil];
  }

  // The connectivity monitor is only in use when CFStream isn't enabled.
  const char *cfStreamSetting = getenv(kCFStreamVarName);
  BOOL cfStreamEnabled = (cfStreamSetting != NULL && cfStreamSetting[0] == '1');
  if (!cfStreamEnabled) {
    [GRPCConnectivityMonitor unregisterObserver:self];
  }

  // Keep this last: once the self-retain is gone, the call can be deallocated if nothing else
  // retains it.
  _retainSelf = nil;
}
221
// Asks the wrapped call to cancel. While no wrapped call exists yet (_wrappedCall is nil) the
// message is a no-op. This does not finish the response writeable; callers follow it with
// -maybeFinishWithError:.
- (void)cancelCall {
  // Can be called from any thread, any number of times.
  [_wrappedCall cancel];
}
226
// Cancels the RPC on behalf of the app and finishes it with a GRPCErrorCodeCancelled error.
- (void)cancel {
  if (self.isWaitingForToken) {
    // Still waiting for the token provider: clearing the flag keeps the pending token handler
    // from starting the call.
    self.isWaitingForToken = NO;
  } else {
    [self cancelCall];
  }

  NSDictionary *userInfo = @{NSLocalizedDescriptionKey : @"Canceled by app"};
  NSError *cancellation = [NSError errorWithDomain:kGRPCErrorDomain
                                              code:GRPCErrorCodeCancelled
                                          userInfo:userInfo];
  [self maybeFinishWithError:cancellation];
}
239
// Finishes the call with |errorOrNil| unless it has already been finished. Whichever caller
// flips _finished first is the only one that goes on to invoke -finishWithError:.
- (void)maybeFinishWithError:(NSError *)errorOrNil {
  BOOL isFirstToFinish = NO;
  @synchronized(self) {
    if (!_finished) {
      _finished = YES;
      isFirstToFinish = YES;
    }
  }
  // -finishWithError: is invoked outside the critical section above.
  if (isFirstToFinish) {
    [self finishWithError:errorOrNil];
  }
}
252
// Hands the reference to the wrapped call over to a block that runs on the call queue, so the
// reference held by that block is dropped there rather than on whichever thread deallocates the
// GRPCCall.
// NOTE(review): presumably this is how the C call's destroy gets serialized with start_batch, as
// the comment above @implementation requires; confirm against GRPCWrappedCall.
- (void)dealloc {
  // __block makes the variable itself shared with the block, so that the assignment inside the
  // block clears it.
  __block GRPCWrappedCall *wrappedCall = _wrappedCall;
  dispatch_async(_callQueue, ^{
    wrappedCall = nil;
  });
}
259
260#pragma mark Read messages
261
// Starts a batch containing a single receive-message op whose result is passed to |handler|.
// Only called from the call queue.
// The handler will be called from the network queue.
- (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
  // TODO(jcanizales): Add error handlers for async failures
  GRPCOpRecvMessage *receiveOp = [[GRPCOpRecvMessage alloc] initWithHandler:handler];
  [_wrappedCall startBatchWithOperations:@[ receiveOp ]];
}
268
// Requests the next response message from the server and forwards it to the response writeable.
// Called initially from the network queue once response headers are received,
// then "recursively" from the responseWriteable queue after each response from the
// server has been written.
// If the call is currently paused, this is a noop. Restarting the call will invoke this
// method.
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
  @synchronized(self) {
    BOOL paused = (self.state == GRXWriterStatePaused);
    if (paused) {
      return;
    }
  }

  dispatch_async(_callQueue, ^{
    // The read handler only holds weak references to the call and to its writeable.
    __weak GRPCCall *weakCall = self;
    __weak GRXConcurrentWriteable *weakResponses = self->_responseWriteable;
    void (^messageHandler)(grpc_byte_buffer *) = ^(grpc_byte_buffer *message) {
      __strong GRPCCall *call = weakCall;
      __strong GRXConcurrentWriteable *responses = weakResponses;
      if (message == NULL) {
        // No more messages from the server
        return;
      }

      NSData *payload = [NSData grpc_dataWithByteBuffer:message];
      grpc_byte_buffer_destroy(message);
      if (payload) {
        // Ask for the following message only once this one has been handed over.
        [responses enqueueValue:payload
              completionHandler:^{
                [call startNextRead];
              }];
        return;
      }

      // The app doesn't have enough memory to hold the server response. We
      // don't want to throw, because the app shouldn't crash for a behavior
      // that's on the hands of any server to have. Instead we finish and ask
      // the server to cancel.
      [call cancelCall];
      NSDictionary *userInfo = @{
        NSLocalizedDescriptionKey : @"Client does not have enough memory to "
                                    @"hold the server response."
      };
      NSError *outOfMemory = [NSError errorWithDomain:kGRPCErrorDomain
                                                 code:GRPCErrorCodeResourceExhausted
                                             userInfo:userInfo];
      [call maybeFinishWithError:outOfMemory];
    };
    [self startReadWithHandler:messageHandler];
  });
}
317
318#pragma mark Send headers
319
// Builds the send-metadata op for |headers|, using the call flags registered for this call's host
// and path. A unary call appends the op to its pending batch; any other call starts it right
// away as a batch of its own.
- (void)sendHeaders:(NSDictionary *)headers {
  // TODO(jcanizales): Add error handlers for async failures
  uint32_t flags = [GRPCCall callFlagsForHost:_host path:_path];
  // No clean-up needed after SEND_INITIAL_METADATA, hence the nil handler.
  GRPCOpSendMetadata *metadataOp =
      [[GRPCOpSendMetadata alloc] initWithMetadata:headers flags:flags handler:nil];
  if (_unaryCall) {
    [_unaryOpBatch addObject:metadataOp];
    return;
  }
  [_wrappedCall startBatchWithOperations:@[ metadataOp ]];
}
332
333#pragma mark GRXWriteable implementation
334
// Sends |message| to the server. When the send op completes, the requests writer (paused in
// -writeValue:) is resumed.
// Only called from the call queue. The error handler will be called from the
// network queue if the write didn't succeed.
// If the call is a unary call, parameter \a errorHandler will be ignored and
// the error handler of GRPCOpSendClose will be executed in case of error.
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
  __weak GRPCCall *weakCall = self;
  GRPCOpSendMessage *sendOp = [[GRPCOpSendMessage alloc]
      initWithMessage:message
              handler:^{
                // Resume the request writer, if the call is still around.
                GRPCCall *call = weakCall;
                if (call == nil) {
                  return;
                }
                @synchronized(call->_requestWriter) {
                  call->_requestWriter.state = GRXWriterStateStarted;
                }
              }];

  if (_unaryCall) {
    // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
    // TODO (mxyan): unify the error handlers of all Ops into a single closure.
    [_unaryOpBatch addObject:sendOp];
    return;
  }
  [_wrappedCall startBatchWithOperations:@[ sendOp ] errorHandler:errorHandler];
}
361
// GRXWriteable entry point: receives one request message from the requests writer and schedules
// it to be sent on the call queue.
- (void)writeValue:(id)value {
  // TODO(jcanizales): Throw/assert if value isn't NSData.

  // Hold further input back until the C layer notifies us that writes can proceed; the send op's
  // handler resumes the writer.
  @synchronized(_requestWriter) {
    _requestWriter.state = GRXWriterStatePaused;
  }

  dispatch_block_t sendValue = ^{
    // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
    [self writeMessage:value withErrorHandler:nil];
  };
  dispatch_async(_callQueue, sendValue);
}
376
// Closes the requests stream. For a unary call this is also the point at which the accumulated
// op batch, with the close op appended, is started.
// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
  GRPCOpSendClose *closeOp = [[GRPCOpSendClose alloc] init];
  NSArray *batch = nil;
  if (_unaryCall) {
    [_unaryOpBatch addObject:closeOp];
    batch = _unaryOpBatch;
  } else {
    batch = @[ closeOp ];
  }
  [_wrappedCall startBatchWithOperations:batch errorHandler:errorHandler];
}
388
// GRXWriteable entry point: the requests writer has no more messages. A writer that finished with
// an error cancels the whole call; otherwise the requests stream is closed on the call queue.
- (void)writesFinishedWithError:(NSError *)errorOrNil {
  if (errorOrNil != nil) {
    [self cancel];
    return;
  }

  dispatch_async(_callQueue, ^{
    // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
    [self finishRequestWithErrorHandler:nil];
  });
}
399
400#pragma mark Invoke
401
// Starts two single-op batches on the wrapped call: one receiving the response headers and one
// receiving the final status.
// Both handlers will eventually be called, from the network queue. Writes can start immediately
// after this.
// The first one (headersHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
                   completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
  // TODO(jcanizales): Add error handlers for async failures
  GRPCOpRecvMetadata *headersOp = [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler];
  [_wrappedCall startBatchWithOperations:@[ headersOp ]];

  GRPCOpRecvStatus *statusOp = [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler];
  [_wrappedCall startBatchWithOperations:@[ statusOp ]];
}
414
// Installs the handlers for the response headers and for the final status of the RPC, then starts
// the requests writer with this call as its writeable.
// Both handlers only hold a weak reference to the call and do nothing if it is already gone.
- (void)invokeCall {
  __weak GRPCCall *weakSelf = self;
  [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
    // Response headers received.
    __strong GRPCCall *strongSelf = weakSelf;
    if (strongSelf) {
      strongSelf.responseHeaders = headers;
      [strongSelf startNextRead];
    }
  }
      completionHandler:^(NSError *error, NSDictionary *trailers) {
        __strong GRPCCall *strongSelf = weakSelf;
        if (strongSelf) {
          strongSelf.responseTrailers = trailers;

          if (error) {
            // Re-create the error with the response metadata added to its userInfo.
            NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
            if (error.userInfo) {
              [userInfo addEntriesFromDictionary:error.userInfo];
            }
            // Guarded like the headers below: storing nil through the subscript raises on older
            // Foundation versions, and an absent key is what callers see for "no trailers" anyway.
            NSDictionary *responseTrailers = strongSelf.responseTrailers;
            if (responseTrailers) {
              userInfo[kGRPCTrailersKey] = responseTrailers;
            }
            // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
            // called before this one, so an error might end up with trailers but no headers. We
            // shouldn't call finishWithError until after both blocks are called. It is also when
            // this is done that we can provide a merged view of response headers and trailers in a
            // thread-safe way.
            NSDictionary *responseHeaders = strongSelf.responseHeaders;
            if (responseHeaders) {
              userInfo[kGRPCHeadersKey] = responseHeaders;
            }
            error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
          }
          [strongSelf maybeFinishWithError:error];
        }
      }];
  // Now that the RPC has been initiated, request writes can start.
  @synchronized(_requestWriter) {
    [_requestWriter startWithWriteable:self];
  }
}
454
455#pragma mark GRXWriter implementation
456
// Creates the objects backing the RPC (the concurrent wrapper around |writeable| and the wrapped
// call), sends the request headers, installs the response handlers and, unless CFStream is
// enabled, registers for connectivity changes.
- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
  GRXConcurrentWriteable *responses =
      [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
  _responseWriteable = responses;

  GRPCWrappedCall *call = [[GRPCWrappedCall alloc] initWithHost:_host
                                                     serverName:_serverName
                                                           path:_path
                                                        timeout:_timeout];
  _wrappedCall = call;
  NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");

  [self sendHeaders:_requestHeaders];
  [self invokeCall];

  // The connectivity monitor is only needed when CFStream isn't enabled.
  const char *cfStreamSetting = getenv(kCFStreamVarName);
  BOOL cfStreamEnabled = (cfStreamSetting != NULL && cfStreamSetting[0] == '1');
  if (!cfStreamEnabled) {
    [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
  }
}
476
// GRXWriter entry point: starts the RPC, delivering its responses to |writeable|. When a token
// provider is set, the call is only started once the provider has answered, and the token (if
// any) is sent in the authorization header.
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  @synchronized(self) {
    _state = GRXWriterStateStarted;
  }

  // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
  // This makes RPCs in which the call isn't externally retained possible (as long as it is started
  // before being autoreleased).
  // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
  // that the life of the instance is determined by this retain cycle.
  _retainSelf = self;

  if (self.tokenProvider == nil) {
    // Nothing to wait for.
    [self startCallWithWriteable:writeable];
    return;
  }

  self.isWaitingForToken = YES;
  __weak typeof(self) weakSelf = self;
  [self.tokenProvider getTokenWithHandler:^(NSString *token) {
    __strong typeof(weakSelf) strongSelf = weakSelf;
    // -cancel clears the flag; in that case, or if the call is gone, there is nothing to start.
    if (strongSelf == nil || !strongSelf.isWaitingForToken) {
      return;
    }
    if (token != nil) {
      NSString *headerValue = [kBearerPrefix stringByAppendingString:token];
      strongSelf.requestHeaders[kAuthorizationHeader] = headerValue;
    }
    [strongSelf startCallWithWriteable:writeable];
    strongSelf.isWaitingForToken = NO;
  }];
}
507
// Manual state transition requested by the user of the call.
//  - Finished: the response writeable is cancelled silently and released.
//  - Paused:   recorded; -startNextRead becomes a noop while paused.
//  - Started:  only meaningful when currently paused; reading resumes.
// Any other request, or any request made before the call starts or after it finishes, is ignored.
- (void)setState:(GRXWriterState)newState {
  @synchronized(self) {
    // Manual transitions are only allowed from the started or paused states.
    BOOL notRunning = (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished);
    if (notRunning) {
      return;
    }

    if (newState == GRXWriterStateFinished) {
      _state = newState;
      // Per GRXWriter's contract, setting the state to Finished manually
      // means one doesn't wish the writeable to be messaged anymore.
      [_responseWriteable cancelSilently];
      _responseWriteable = nil;
    } else if (newState == GRXWriterStatePaused) {
      _state = newState;
    } else if (newState == GRXWriterStateStarted && _state == GRXWriterStatePaused) {
      _state = newState;
      [self startNextRead];
    }
    // A request for GRXWriterStateNotStarted, or for Started while already started, changes
    // nothing.
  }
}
537
// Selector registered with GRPCConnectivityMonitor in -startCallWithWriteable:. Cancels the
// underlying call and finishes the RPC with a GRPCErrorCodeUnavailable error.
- (void)connectivityChanged:(NSNotification *)note {
  // Cancel underlying call upon this notification
  __strong GRPCCall *strongSelf = self;
  if (strongSelf == nil) {
    return;
  }

  [strongSelf cancelCall];
  NSDictionary *userInfo = @{NSLocalizedDescriptionKey : @"Connectivity lost."};
  NSError *unavailable = [NSError errorWithDomain:kGRPCErrorDomain
                                             code:GRPCErrorCodeUnavailable
                                         userInfo:userInfo];
  [strongSelf maybeFinishWithError:unavailable];
}
551
552@end
553