1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_CFSTREAM
24 #import <CoreFoundation/CoreFoundation.h>
25 #import "src/core/lib/iomgr/cfstream_handle.h"
26 
27 #include <grpc/support/atm.h>
28 #include <grpc/support/sync.h>
29 
30 #include "src/core/lib/debug/trace.h"
31 #include "src/core/lib/iomgr/closure.h"
32 #include "src/core/lib/iomgr/exec_ctx.h"
33 
34 extern grpc_core::TraceFlag grpc_tcp_trace;
35 
Retain(void * info)36 void* CFStreamHandle::Retain(void* info) {
37   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
38   CFSTREAM_HANDLE_REF(handle, "retain");
39   return info;
40 }
41 
Release(void * info)42 void CFStreamHandle::Release(void* info) {
43   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
44   CFSTREAM_HANDLE_UNREF(handle, "release");
45 }
46 
CreateStreamHandle(CFReadStreamRef read_stream,CFWriteStreamRef write_stream)47 CFStreamHandle* CFStreamHandle::CreateStreamHandle(
48     CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
49   return new CFStreamHandle(read_stream, write_stream);
50 }
51 
ReadCallback(CFReadStreamRef stream,CFStreamEventType type,void * client_callback_info)52 void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
53                                   CFStreamEventType type,
54                                   void* client_callback_info) {
55   CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
56   CFSTREAM_HANDLE_REF(handle, "read callback");
57   dispatch_async(
58       dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
59         grpc_core::ExecCtx exec_ctx;
60         if (grpc_tcp_trace.enabled()) {
61           gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
62                   stream, type, client_callback_info);
63         }
64         switch (type) {
65           case kCFStreamEventOpenCompleted:
66             handle->open_event_.SetReady();
67             break;
68           case kCFStreamEventHasBytesAvailable:
69           case kCFStreamEventEndEncountered:
70             handle->read_event_.SetReady();
71             break;
72           case kCFStreamEventErrorOccurred:
73             handle->open_event_.SetReady();
74             handle->read_event_.SetReady();
75             break;
76           default:
77             GPR_UNREACHABLE_CODE(return );
78         }
79         CFSTREAM_HANDLE_UNREF(handle, "read callback");
80       });
81 }
WriteCallback(CFWriteStreamRef stream,CFStreamEventType type,void * clientCallBackInfo)82 void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
83                                    CFStreamEventType type,
84                                    void* clientCallBackInfo) {
85   CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
86   CFSTREAM_HANDLE_REF(handle, "write callback");
87   dispatch_async(
88       dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
89         grpc_core::ExecCtx exec_ctx;
90         if (grpc_tcp_trace.enabled()) {
91           gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
92                   stream, type, clientCallBackInfo);
93         }
94         switch (type) {
95           case kCFStreamEventOpenCompleted:
96             handle->open_event_.SetReady();
97             break;
98           case kCFStreamEventCanAcceptBytes:
99           case kCFStreamEventEndEncountered:
100             handle->write_event_.SetReady();
101             break;
102           case kCFStreamEventErrorOccurred:
103             handle->open_event_.SetReady();
104             handle->write_event_.SetReady();
105             break;
106           default:
107             GPR_UNREACHABLE_CODE(return );
108         }
109         CFSTREAM_HANDLE_UNREF(handle, "write callback");
110       });
111 }
112 
CFStreamHandle(CFReadStreamRef read_stream,CFWriteStreamRef write_stream)113 CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
114                                CFWriteStreamRef write_stream) {
115   gpr_ref_init(&refcount_, 1);
116   open_event_.InitEvent();
117   read_event_.InitEvent();
118   write_event_.InitEvent();
119   CFStreamClientContext ctx = {0, static_cast<void*>(this),
120                                CFStreamHandle::Retain, CFStreamHandle::Release,
121                                nil};
122   CFReadStreamSetClient(
123       read_stream,
124       kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
125           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
126       CFStreamHandle::ReadCallback, &ctx);
127   CFWriteStreamSetClient(
128       write_stream,
129       kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
130           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
131       CFStreamHandle::WriteCallback, &ctx);
132   CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(),
133                                   kCFRunLoopCommonModes);
134   CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(),
135                                    kCFRunLoopCommonModes);
136 }
137 
~CFStreamHandle()138 CFStreamHandle::~CFStreamHandle() {
139   open_event_.DestroyEvent();
140   read_event_.DestroyEvent();
141   write_event_.DestroyEvent();
142 }
143 
NotifyOnOpen(grpc_closure * closure)144 void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
145   open_event_.NotifyOn(closure);
146 }
147 
NotifyOnRead(grpc_closure * closure)148 void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
149   read_event_.NotifyOn(closure);
150 }
151 
NotifyOnWrite(grpc_closure * closure)152 void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
153   write_event_.NotifyOn(closure);
154 }
155 
Shutdown(grpc_error * error)156 void CFStreamHandle::Shutdown(grpc_error* error) {
157   open_event_.SetShutdown(GRPC_ERROR_REF(error));
158   read_event_.SetShutdown(GRPC_ERROR_REF(error));
159   write_event_.SetShutdown(GRPC_ERROR_REF(error));
160   GRPC_ERROR_UNREF(error);
161 }
162 
Ref(const char * file,int line,const char * reason)163 void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
164   if (grpc_tcp_trace.enabled()) {
165     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
166     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
167             "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
168             reason, val, val + 1);
169   }
170   gpr_ref(&refcount_);
171 }
172 
Unref(const char * file,int line,const char * reason)173 void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
174   if (grpc_tcp_trace.enabled()) {
175     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
176     gpr_log(GPR_ERROR,
177             "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
178             reason, val, val - 1);
179   }
180   if (gpr_unref(&refcount_)) {
181     delete this;
182   }
183 }
184 
185 #endif
186