/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import "GRPCCompletionQueue.h"

#import <grpc/grpc.h>

// Attributes used to create every completion queue in this file: the current
// attribute version, the GRPC_CQ_NEXT completion type (events are pulled with
// grpc_completion_queue_next) and the default polling type.
const grpc_completion_queue_attributes kCompletionQueueAttr = {
    GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL};

@implementation GRPCCompletionQueue

// Returns the shared completion queue. The instance is created on first use,
// exactly once, and every later call returns that same object.
+ (instancetype)completionQueue {
  static GRPCCompletionQueue *singleton = nil;
  static dispatch_once_t onceToken;
  dispatch_once(&onceToken, ^{
    singleton = [[self alloc] init];
  });
  return singleton;
}

// Creates the underlying grpc_completion_queue and starts a background loop
// that pulls events from it and runs the completion handler attached to each
// one. The loop ends, and the underlying queue is destroyed, once the queue
// reports GRPC_QUEUE_SHUTDOWN (see -dealloc).
- (instancetype)init {
  if ((self = [super init])) {
    _unmanagedQueue = grpc_completion_queue_create(
        grpc_completion_queue_factory_lookup(&kCompletionQueueAttr), &kCompletionQueueAttr, NULL);

    // This is for the following block to capture the pointer by value (instead
    // of retaining self and doing self->_unmanagedQueue). This is essential
    // because the block doesn't end until after grpc_completion_queue_shutdown
    // is called, and we only want that to happen after nobody's using the queue
    // anymore (i.e. on self dealloc). So the block would never end if it
    // retained self.
    grpc_completion_queue *unmanagedQueue = _unmanagedQueue;

    // Start a loop on a concurrent queue to read events from the completion
    // queue and dispatch each.
    static dispatch_once_t initialization;
    static dispatch_queue_t gDefaultConcurrentQueue;
    dispatch_once(&initialization, ^{
      gDefaultConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    });
    dispatch_async(gDefaultConcurrentQueue, ^{
      while (YES) {
        // This block does not return until the queue is shut down, so the
        // autorelease pool GCD provides for it cannot be relied on to drain
        // between events. Give every iteration its own pool so that objects
        // autoreleased by a completion handler are released as soon as that
        // handler has finished, instead of piling up for the life of the loop.
        @autoreleasepool {
          // The following call blocks until an event is available.
          grpc_event event =
              grpc_completion_queue_next(unmanagedQueue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
          GRPCQueueCompletionHandler handler;
          switch (event.type) {
            case GRPC_OP_COMPLETE:
              // The tag is a completion block that was handed to the gRPC C
              // library as a void*. __bridge_transfer gives ownership of it to
              // ARC, so it is released once it has been invoked.
              handler = (__bridge_transfer GRPCQueueCompletionHandler)event.tag;
              handler(event.success);
              break;
            case GRPC_QUEUE_SHUTDOWN:
              // No more events will arrive: free the queue and end the loop.
              grpc_completion_queue_destroy(unmanagedQueue);
              return;
            default:
              [NSException raise:@"Unrecognized completion type" format:@""];
          }
        }
      }
    });
  }
  return self;
}

// Requests shutdown of the underlying queue. The queue itself is destroyed by
// the event loop started in -init, once it receives GRPC_QUEUE_SHUTDOWN.
- (void)dealloc {
  // This makes the completion queue produce a GRPC_QUEUE_SHUTDOWN event *after*
  // all other pending events are flushed. What this means is all the blocks
  // passed to the gRPC C library as void* are eventually called, even if some
  // are called after self is dealloc'd.
  grpc_completion_queue_shutdown(_unmanagedQueue);
}
@end