1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <string.h>
22 
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
27 
28 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
29 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
30 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
31 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
32 #include "src/core/lib/gpr/host_port.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/gprpp/manual_constructor.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 #include "src/core/lib/surface/channel.h"
40 #include "src/core/lib/transport/metadata_batch.h"
41 #include "src/core/lib/transport/static_metadata.h"
42 #include "src/core/lib/transport/transport_impl.h"
43 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
44 
/* Size of the gRPC message prefix: 1 byte of flags (compressed bit) followed
   by a 4 byte big-endian payload length. See create_grpc_frame and
   parse_grpc_header. */
#define GRPC_HEADER_SIZE_IN_BYTES 5
/* Size of the scratch buffer handed to Cronet while incoming data is being
   discarded in flush-read mode (see maybe_flush_read). */
#define GRPC_FLUSH_READ_SIZE 4096

/* Forwards its arguments to gpr_log only when grpc_cronet_trace is non-zero.
   Wrapped in do/while(0) so that it expands to a single statement. */
#define CRONET_LOG(...)                          \
  do {                                           \
    if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \
  } while (0)

/* TODO (makdharma): Hook up into the wider tracing mechanism */
/* Non-zero enables CRONET_LOG output. Tracing is off by default. */
int grpc_cronet_trace = 0;
55 
/* Outcome of one execute_stream_op() attempt, consumed by
   execute_from_storage():
   - ACTION_TAKEN_WITH_CALLBACK: stop iterating over the pending ops.
   - ACTION_TAKEN_NO_CALLBACK:   try the same op again.
   - NO_ACTION_POSSIBLE:         move on to the next pending op. */
enum e_op_result {
  ACTION_TAKEN_WITH_CALLBACK,
  ACTION_TAKEN_NO_CALLBACK,
  NO_ACTION_POSSIBLE
};
61 
/* Identifiers of the individual steps tracked by the state machine. The values
   are used as indices into op_state::state_op_done[] and
   op_state::state_callback_received[], so they must stay contiguous starting
   at 0. OP_NUM_OPS is not a real op; it is the size of those arrays. */
enum e_op_id {
  OP_SEND_INITIAL_METADATA = 0,
  OP_SEND_MESSAGE,
  OP_SEND_TRAILING_METADATA,
  OP_RECV_MESSAGE,
  OP_RECV_INITIAL_METADATA,
  OP_RECV_TRAILING_METADATA,
  OP_CANCEL_ERROR,
  OP_ON_COMPLETE,
  OP_FAILED,
  OP_SUCCEEDED,
  OP_CANCELED,
  OP_RECV_MESSAGE_AND_ON_COMPLETE,
  OP_READ_REQ_MADE,
  OP_NUM_OPS
};
78 
79 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
80 
/* Forward declarations of the callbacks; the definitions appear further down
   in this file. */
static void on_stream_ready(bidirectional_stream*);
static void on_response_headers_received(
    bidirectional_stream*, const bidirectional_stream_header_array*,
    const char*);
static void on_write_completed(bidirectional_stream*, const char*);
static void on_read_completed(bidirectional_stream*, char*, int);
static void on_response_trailers_received(
    bidirectional_stream*, const bidirectional_stream_header_array*);
static void on_succeeded(bidirectional_stream*);
static void on_failed(bidirectional_stream*, int);
static void on_canceled(bidirectional_stream*);
/* Callback table for Cronet streams. This is a positional initializer.
   NOTE(review): the entry order (on_read_completed before on_write_completed)
   presumably mirrors the member order of bidirectional_stream_callback in
   bidirectional_stream_c.h - confirm before reordering. */
static bidirectional_stream_callback cronet_callbacks = {
    on_stream_ready,
    on_response_headers_received,
    on_read_completed,
    on_write_completed,
    on_response_trailers_received,
    on_succeeded,
    on_failed,
    on_canceled};
101 
/* Cronet transport object */
struct grpc_cronet_transport {
  grpc_transport base; /* must be first element in this structure */
  /* NOTE(review): presumably the Cronet engine the streams are created on -
     its use is not visible in this part of the file. */
  stream_engine* engine;
  /* NOTE(review): presumably the target host used when building request URLs
     (see convert_metadata_to_cronet_headers) - confirm against the caller. */
  char* host;
  /* When true, the callbacks explicitly call bidirectional_stream_flush()
     (see on_stream_ready and on_response_trailers_received). */
  bool use_packet_coalescing;
};
typedef struct grpc_cronet_transport grpc_cronet_transport;
110 
/* TODO (makdharma): reorder structure for memory efficiency per
   http://www.catb.org/esr/structure-packing/#_structure_reordering: */
/* Per-stream state of the receive path. */
struct read_state {
  /* vars to store data coming from server */
  /* Buffer currently handed to bidirectional_stream_read(). It either aliases
     grpc_header_bytes (never freed) or points at heap memory that is released
     by null_and_maybe_free_read_buffer(). */
  char* read_buffer;
  /* Asserted to be false before the first header read is issued in
     on_response_headers_received(). */
  bool length_field_received;
  /* Bytes received so far / still expected for the current read; both are
     updated in on_read_completed(). */
  int received_bytes;
  int remaining_bytes;
  int length_field;
  bool compressed;
  /* Inline storage for the 5 byte gRPC message prefix. */
  char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
  char* payload_field;
  /* Set once on_read_completed() reports count <= 0. */
  bool read_stream_closed;

  /* vars for holding data destined for the application */
  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
  grpc_slice_buffer read_slice_buffer;

  /* vars for trailing metadata */
  grpc_chttp2_incoming_metadata_buffer trailing_metadata;
  /* True when on_response_trailers_received() saw at least one trailer. */
  bool trailing_metadata_valid;

  /* vars for initial metadata */
  grpc_chttp2_incoming_metadata_buffer initial_metadata;
};
136 
/* Per-stream state of the send path. */
struct write_state {
  /* Heap buffer holding the frame being written. It is released in
     on_write_completed(), on_failed() and on_canceled(). */
  char* write_buffer;
};
140 
/* track state of one stream op */
struct op_state {
  /* Both arrays are indexed by enum e_op_id. */
  bool state_op_done[OP_NUM_OPS];
  bool state_callback_received[OP_NUM_OPS];
  /* A non-zero gRPC status code has been seen */
  bool fail_state;
  /* Transport is discarding all buffered messages */
  bool flush_read;
  /* Checked in on_stream_ready(): when set (and packet coalescing is enabled)
     the stream is flushed as soon as it becomes ready. */
  bool flush_cronet_when_ready;
  bool pending_write_for_trailer;
  /* Set in add_to_storage() when a batch containing send_message is queued. */
  bool pending_send_message;
  /* User requested RECV_TRAILING_METADATA */
  bool pending_recv_trailing_metadata;
  /* Cronet has not issued a callback of a bidirectional read */
  bool pending_read_from_cronet;
  grpc_error* cancel_error;
  /* data structure for storing data coming from server */
  struct read_state rs;
  /* data structure for storing data going to the server */
  struct write_state ws;
};
162 
/* One queued stream op batch together with its bookkeeping. Nodes are
   allocated in add_to_storage() and freed in remove_from_storage(). */
struct op_and_state {
  grpc_transport_stream_op_batch op; /* Copy of the batch passed in */
  struct op_state state;             /* Per-op progress flags */
  bool done; /* When set, execute_from_storage() removes this node */
  struct stream_obj* s;      /* Pointer back to the stream object */
  struct op_and_state* next; /* next op_and_state in the linked list */
};
170 
/* Singly linked list of the ops queued on a stream. New ops are pushed at the
   head (see add_to_storage). */
struct op_storage {
  int num_pending_ops;       /* Number of nodes currently in the list */
  struct op_and_state* head; /* First node, or nullptr when empty */
};
175 
/* Per-stream object. The Cronet callbacks recover it from the `annotation`
   field of the bidirectional_stream they are invoked with. */
struct stream_obj {
  /* Arena passed to grpc_chttp2_incoming_metadata_buffer_init() for incoming
     metadata. */
  gpr_arena* arena;
  struct op_and_state* oas;
  grpc_transport_stream_op_batch* curr_op;
  grpc_cronet_transport* curr_ct;
  grpc_stream* curr_gs;
  /* Underlying Cronet stream; reset to nullptr once the stream is destroyed in
     on_failed / on_canceled / on_succeeded. */
  bidirectional_stream* cbs;
  /* Request headers; header_array.headers is freed in on_stream_ready (and on
     failure/cancellation). */
  bidirectional_stream_header_array header_array;

  /* Stream level state. Some state will be tracked both at stream and stream_op
   * level */
  struct op_state state;

  /* OP storage */
  struct op_storage storage;

  /* Mutex to protect storage */
  gpr_mu mu;

  /* Refcount object of the stream */
  grpc_stream_refcount* refcount;
};
typedef struct stream_obj stream_obj;
199 
/* Stream ref-counting helpers. In debug builds (NDEBUG not defined) the
   `reason` string is forwarded to grpc_stream_ref/grpc_stream_unref; in release
   builds the macros drop it and the single-argument overloads are used. */
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
  grpc_cronet_stream_ref((stream), (reason))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream), (reason))
/* Take a reference on the stream's refcount object. */
void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
  grpc_stream_ref(s->refcount, reason);
}
/* Drop a reference on the stream's refcount object. */
void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
  grpc_stream_unref(s->refcount, reason);
}
#else
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream))
/* Take a reference on the stream's refcount object. */
void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
/* Drop a reference on the stream's refcount object. */
void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
#endif
218 
/* Forward declaration: execute_from_storage() calls this before its definition
   appears. */
static enum e_op_result execute_stream_op(struct op_and_state* oas);
220 
221 /*
222   Utility function to translate enum into string for printing
223 */
op_result_string(enum e_op_result i)224 static const char* op_result_string(enum e_op_result i) {
225   switch (i) {
226     case ACTION_TAKEN_WITH_CALLBACK:
227       return "ACTION_TAKEN_WITH_CALLBACK";
228     case ACTION_TAKEN_NO_CALLBACK:
229       return "ACTION_TAKEN_NO_CALLBACK";
230     case NO_ACTION_POSSIBLE:
231       return "NO_ACTION_POSSIBLE";
232   }
233   GPR_UNREACHABLE_CODE(return "UNKNOWN");
234 }
235 
op_id_string(enum e_op_id i)236 static const char* op_id_string(enum e_op_id i) {
237   switch (i) {
238     case OP_SEND_INITIAL_METADATA:
239       return "OP_SEND_INITIAL_METADATA";
240     case OP_SEND_MESSAGE:
241       return "OP_SEND_MESSAGE";
242     case OP_SEND_TRAILING_METADATA:
243       return "OP_SEND_TRAILING_METADATA";
244     case OP_RECV_MESSAGE:
245       return "OP_RECV_MESSAGE";
246     case OP_RECV_INITIAL_METADATA:
247       return "OP_RECV_INITIAL_METADATA";
248     case OP_RECV_TRAILING_METADATA:
249       return "OP_RECV_TRAILING_METADATA";
250     case OP_CANCEL_ERROR:
251       return "OP_CANCEL_ERROR";
252     case OP_ON_COMPLETE:
253       return "OP_ON_COMPLETE";
254     case OP_FAILED:
255       return "OP_FAILED";
256     case OP_SUCCEEDED:
257       return "OP_SUCCEEDED";
258     case OP_CANCELED:
259       return "OP_CANCELED";
260     case OP_RECV_MESSAGE_AND_ON_COMPLETE:
261       return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
262     case OP_READ_REQ_MADE:
263       return "OP_READ_REQ_MADE";
264     case OP_NUM_OPS:
265       return "OP_NUM_OPS";
266   }
267   return "UNKNOWN";
268 }
269 
null_and_maybe_free_read_buffer(stream_obj * s)270 static void null_and_maybe_free_read_buffer(stream_obj* s) {
271   if (s->state.rs.read_buffer &&
272       s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
273     gpr_free(s->state.rs.read_buffer);
274   }
275   s->state.rs.read_buffer = nullptr;
276 }
277 
maybe_flush_read(stream_obj * s)278 static void maybe_flush_read(stream_obj* s) {
279   /* To enter flush read state (discarding all the buffered messages in
280    * transport layer), two conditions must be satisfied: 1) non-zero grpc status
281    * has been received, and 2) an op requesting the status code
282    * (RECV_TRAILING_METADATA) is issued by the user. (See
283    * doc/status_ordering.md) */
284   /* Whenever the evaluation of any of the two condition is changed, we check
285    * whether we should enter the flush read state. */
286   if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
287     if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
288       CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
289       s->state.flush_read = true;
290       null_and_maybe_free_read_buffer(s);
291       s->state.rs.read_buffer =
292           static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
293       if (!s->state.pending_read_from_cronet) {
294         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
295         bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
296                                   GRPC_FLUSH_READ_SIZE);
297         s->state.pending_read_from_cronet = true;
298       }
299     }
300   }
301 }
302 
make_error_with_desc(int error_code,const char * desc)303 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
304   grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
305   error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
306   return error;
307 }
308 
309 /*
310   Add a new stream op to op storage.
311 */
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)312 static void add_to_storage(struct stream_obj* s,
313                            grpc_transport_stream_op_batch* op) {
314   struct op_storage* storage = &s->storage;
315   /* add new op at the beginning of the linked list. The memory is freed
316   in remove_from_storage */
317   struct op_and_state* new_op = static_cast<struct op_and_state*>(
318       gpr_malloc(sizeof(struct op_and_state)));
319   memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch));
320   memset(&new_op->state, 0, sizeof(new_op->state));
321   new_op->s = s;
322   new_op->done = false;
323   gpr_mu_lock(&s->mu);
324   new_op->next = storage->head;
325   storage->head = new_op;
326   storage->num_pending_ops++;
327   if (op->send_message) {
328     s->state.pending_send_message = true;
329   }
330   if (op->recv_trailing_metadata) {
331     s->state.pending_recv_trailing_metadata = true;
332     maybe_flush_read(s);
333   }
334   CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
335              storage->num_pending_ops);
336   gpr_mu_unlock(&s->mu);
337 }
338 
339 /*
340   Traverse the linked list and delete op and free memory
341 */
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)342 static void remove_from_storage(struct stream_obj* s,
343                                 struct op_and_state* oas) {
344   struct op_and_state* curr;
345   if (s->storage.head == nullptr || oas == nullptr) {
346     return;
347   }
348   if (s->storage.head == oas) {
349     s->storage.head = oas->next;
350     gpr_free(oas);
351     s->storage.num_pending_ops--;
352     CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
353                s->storage.num_pending_ops);
354   } else {
355     for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
356       if (curr->next == oas) {
357         curr->next = oas->next;
358         s->storage.num_pending_ops--;
359         CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
360                    s->storage.num_pending_ops);
361         gpr_free(oas);
362         break;
363       } else if (GPR_UNLIKELY(curr->next == nullptr)) {
364         CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
365       }
366     }
367   }
368 }
369 
370 /*
371   Cycle through ops and try to take next action. Break when either
372   an action with callback is taken, or no action is possible.
373   This can get executed from the Cronet network thread via cronet callback
374   or on the application supplied thread via the perform_stream_op function.
375 */
execute_from_storage(stream_obj * s)376 static void execute_from_storage(stream_obj* s) {
377   gpr_mu_lock(&s->mu);
378   for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
379     CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
380     GPR_ASSERT(curr->done == 0);
381     enum e_op_result result = execute_stream_op(curr);
382     CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
383                op_result_string(result));
384     /* if this op is done, then remove it and free memory */
385     if (curr->done) {
386       struct op_and_state* next = curr->next;
387       remove_from_storage(s, curr);
388       curr = next;
389     }
390     /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
391     if (result == NO_ACTION_POSSIBLE) {
392       curr = curr->next;
393     } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
394       break;
395     }
396   }
397   gpr_mu_unlock(&s->mu);
398 }
399 
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_chttp2_incoming_metadata_buffer * mds)400 static void convert_cronet_array_to_metadata(
401     const bidirectional_stream_header_array* header_array,
402     grpc_chttp2_incoming_metadata_buffer* mds) {
403   for (size_t i = 0; i < header_array->count; i++) {
404     CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
405                header_array->headers[i].key, header_array->headers[i].value);
406     grpc_slice key = grpc_slice_intern(
407         grpc_slice_from_static_string(header_array->headers[i].key));
408     grpc_slice value;
409     if (grpc_is_binary_header(key)) {
410       value = grpc_slice_from_static_string(header_array->headers[i].value);
411       value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
412           value, grpc_chttp2_base64_infer_length_after_decode(value)));
413     } else {
414       value = grpc_slice_intern(
415           grpc_slice_from_static_string(header_array->headers[i].value));
416     }
417     GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
418                       grpc_chttp2_incoming_metadata_buffer_add(
419                           mds, grpc_mdelem_from_slices(key, value)));
420   }
421 }
422 
423 /*
424   Cronet callback
425 */
on_failed(bidirectional_stream * stream,int net_error)426 static void on_failed(bidirectional_stream* stream, int net_error) {
427   gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
428   grpc_core::ExecCtx exec_ctx;
429 
430   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
431   gpr_mu_lock(&s->mu);
432   bidirectional_stream_destroy(s->cbs);
433   s->state.state_callback_received[OP_FAILED] = true;
434   s->cbs = nullptr;
435   if (s->header_array.headers) {
436     gpr_free(s->header_array.headers);
437     s->header_array.headers = nullptr;
438   }
439   if (s->state.ws.write_buffer) {
440     gpr_free(s->state.ws.write_buffer);
441     s->state.ws.write_buffer = nullptr;
442   }
443   null_and_maybe_free_read_buffer(s);
444   gpr_mu_unlock(&s->mu);
445   execute_from_storage(s);
446   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
447 }
448 
449 /*
450   Cronet callback
451 */
on_canceled(bidirectional_stream * stream)452 static void on_canceled(bidirectional_stream* stream) {
453   CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
454   grpc_core::ExecCtx exec_ctx;
455 
456   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
457   gpr_mu_lock(&s->mu);
458   bidirectional_stream_destroy(s->cbs);
459   s->state.state_callback_received[OP_CANCELED] = true;
460   s->cbs = nullptr;
461   if (s->header_array.headers) {
462     gpr_free(s->header_array.headers);
463     s->header_array.headers = nullptr;
464   }
465   if (s->state.ws.write_buffer) {
466     gpr_free(s->state.ws.write_buffer);
467     s->state.ws.write_buffer = nullptr;
468   }
469   null_and_maybe_free_read_buffer(s);
470   gpr_mu_unlock(&s->mu);
471   execute_from_storage(s);
472   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
473 }
474 
475 /*
476   Cronet callback
477 */
on_succeeded(bidirectional_stream * stream)478 static void on_succeeded(bidirectional_stream* stream) {
479   CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
480   grpc_core::ExecCtx exec_ctx;
481 
482   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
483   gpr_mu_lock(&s->mu);
484   bidirectional_stream_destroy(s->cbs);
485   s->state.state_callback_received[OP_SUCCEEDED] = true;
486   s->cbs = nullptr;
487   null_and_maybe_free_read_buffer(s);
488   gpr_mu_unlock(&s->mu);
489   execute_from_storage(s);
490   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
491 }
492 
493 /*
494   Cronet callback
495 */
on_stream_ready(bidirectional_stream * stream)496 static void on_stream_ready(bidirectional_stream* stream) {
497   CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
498   grpc_core::ExecCtx exec_ctx;
499   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
500   grpc_cronet_transport* t = s->curr_ct;
501   gpr_mu_lock(&s->mu);
502   s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
503   s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
504   /* Free the memory allocated for headers */
505   if (s->header_array.headers) {
506     gpr_free(s->header_array.headers);
507     s->header_array.headers = nullptr;
508   }
509   /* Send the initial metadata on wire if there is no SEND_MESSAGE or
510    * SEND_TRAILING_METADATA ops pending */
511   if (t->use_packet_coalescing) {
512     if (s->state.flush_cronet_when_ready) {
513       CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
514       bidirectional_stream_flush(stream);
515     }
516   }
517   gpr_mu_unlock(&s->mu);
518   execute_from_storage(s);
519 }
520 
521 /*
522   Cronet callback
523 */
on_response_headers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * headers,const char * negotiated_protocol)524 static void on_response_headers_received(
525     bidirectional_stream* stream,
526     const bidirectional_stream_header_array* headers,
527     const char* negotiated_protocol) {
528   grpc_core::ExecCtx exec_ctx;
529   CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
530              headers, negotiated_protocol);
531   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
532 
533   /* Identify if this is a header or a trailer (in a trailer-only response case)
534    */
535   for (size_t i = 0; i < headers->count; i++) {
536     if (0 == strcmp("grpc-status", headers->headers[i].key)) {
537       on_response_trailers_received(stream, headers);
538       return;
539     }
540   }
541 
542   gpr_mu_lock(&s->mu);
543   memset(&s->state.rs.initial_metadata, 0,
544          sizeof(s->state.rs.initial_metadata));
545   grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata,
546                                             s->arena);
547   convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
548   s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
549   if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
550         s->state.state_callback_received[OP_FAILED])) {
551     /* Do an extra read to trigger on_succeeded() callback in case connection
552      is closed */
553     GPR_ASSERT(s->state.rs.length_field_received == false);
554     s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
555     s->state.rs.compressed = false;
556     s->state.rs.received_bytes = 0;
557     s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
558     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
559     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
560                               s->state.rs.remaining_bytes);
561     s->state.pending_read_from_cronet = true;
562   }
563   gpr_mu_unlock(&s->mu);
564   execute_from_storage(s);
565 }
566 
567 /*
568   Cronet callback
569 */
on_write_completed(bidirectional_stream * stream,const char * data)570 static void on_write_completed(bidirectional_stream* stream, const char* data) {
571   grpc_core::ExecCtx exec_ctx;
572   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
573   CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
574   gpr_mu_lock(&s->mu);
575   if (s->state.ws.write_buffer) {
576     gpr_free(s->state.ws.write_buffer);
577     s->state.ws.write_buffer = nullptr;
578   }
579   s->state.state_callback_received[OP_SEND_MESSAGE] = true;
580   gpr_mu_unlock(&s->mu);
581   execute_from_storage(s);
582 }
583 
584 /*
585   Cronet callback
586 */
on_read_completed(bidirectional_stream * stream,char * data,int count)587 static void on_read_completed(bidirectional_stream* stream, char* data,
588                               int count) {
589   grpc_core::ExecCtx exec_ctx;
590   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
591   CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
592              count);
593   gpr_mu_lock(&s->mu);
594   s->state.pending_read_from_cronet = false;
595   s->state.state_callback_received[OP_RECV_MESSAGE] = true;
596   if (count > 0 && s->state.flush_read) {
597     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
598     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
599                               GRPC_FLUSH_READ_SIZE);
600     s->state.pending_read_from_cronet = true;
601     gpr_mu_unlock(&s->mu);
602   } else if (count > 0) {
603     s->state.rs.received_bytes += count;
604     s->state.rs.remaining_bytes -= count;
605     if (s->state.rs.remaining_bytes > 0) {
606       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
607       s->state.state_op_done[OP_READ_REQ_MADE] = true;
608       bidirectional_stream_read(
609           s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
610           s->state.rs.remaining_bytes);
611       s->state.pending_read_from_cronet = true;
612       gpr_mu_unlock(&s->mu);
613     } else {
614       gpr_mu_unlock(&s->mu);
615       execute_from_storage(s);
616     }
617   } else {
618     null_and_maybe_free_read_buffer(s);
619     s->state.rs.read_stream_closed = true;
620     gpr_mu_unlock(&s->mu);
621     execute_from_storage(s);
622   }
623 }
624 
625 /*
626   Cronet callback
627 */
on_response_trailers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * trailers)628 static void on_response_trailers_received(
629     bidirectional_stream* stream,
630     const bidirectional_stream_header_array* trailers) {
631   grpc_core::ExecCtx exec_ctx;
632   CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
633              trailers);
634   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
635   grpc_cronet_transport* t = s->curr_ct;
636   gpr_mu_lock(&s->mu);
637   memset(&s->state.rs.trailing_metadata, 0,
638          sizeof(s->state.rs.trailing_metadata));
639   s->state.rs.trailing_metadata_valid = false;
640   grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata,
641                                             s->arena);
642   convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
643   if (trailers->count > 0) {
644     s->state.rs.trailing_metadata_valid = true;
645   }
646   for (size_t i = 0; i < trailers->count; i++) {
647     if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
648         0 != strcmp(trailers->headers[i].value, "0")) {
649       s->state.fail_state = true;
650       maybe_flush_read(s);
651     }
652   }
653   s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
654   /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
655    * trigger on_succeeded */
656   if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
657       !(s->state.state_op_done[OP_CANCEL_ERROR] ||
658         s->state.state_callback_received[OP_FAILED])) {
659     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
660     s->state.state_callback_received[OP_SEND_MESSAGE] = false;
661     bidirectional_stream_write(s->cbs, "", 0, true);
662     if (t->use_packet_coalescing) {
663       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
664       bidirectional_stream_flush(s->cbs);
665     }
666     s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
667 
668     gpr_mu_unlock(&s->mu);
669   } else {
670     gpr_mu_unlock(&s->mu);
671     execute_from_storage(s);
672   }
673 }
674 
675 /*
676  Utility function that takes the data from s->write_slice_buffer and assembles
677  into a contiguous byte stream with 5 byte gRPC header prepended.
678 */
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)679 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
680                               char** pp_write_buffer,
681                               size_t* p_write_buffer_size, uint32_t flags) {
682   grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
683   size_t length = GRPC_SLICE_LENGTH(slice);
684   *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
685   /* This is freed in the on_write_completed callback */
686   char* write_buffer =
687       static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
688   *pp_write_buffer = write_buffer;
689   uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
690   /* Append 5 byte header */
691   /* Compressed flag */
692   *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
693   /* Message length */
694   *p++ = static_cast<uint8_t>(length >> 24);
695   *p++ = static_cast<uint8_t>(length >> 16);
696   *p++ = static_cast<uint8_t>(length >> 8);
697   *p++ = static_cast<uint8_t>(length);
698   /* append actual data */
699   memcpy(p, GRPC_SLICE_START_PTR(slice), length);
700   grpc_slice_unref_internal(slice);
701 }
702 
703 /*
704  Convert metadata in a format that Cronet can consume
705 */
convert_metadata_to_cronet_headers(grpc_linked_mdelem * head,const char * host,char ** pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)706 static void convert_metadata_to_cronet_headers(
707     grpc_linked_mdelem* head, const char* host, char** pp_url,
708     bidirectional_stream_header** pp_headers, size_t* p_num_headers,
709     const char** method) {
710   grpc_linked_mdelem* curr = head;
711   /* Walk the linked list and get number of header fields */
712   size_t num_headers_available = 0;
713   while (curr != nullptr) {
714     curr = curr->next;
715     num_headers_available++;
716   }
717   /* Allocate enough memory. It is freed in the on_stream_ready callback
718    */
719   bidirectional_stream_header* headers =
720       static_cast<bidirectional_stream_header*>(gpr_malloc(
721           sizeof(bidirectional_stream_header) * num_headers_available));
722   *pp_headers = headers;
723 
724   /* Walk the linked list again, this time copying the header fields.
725     s->num_headers can be less than num_headers_available, as some headers
726     are not used for cronet.
727     TODO (makdharma): Eliminate need to traverse the LL second time for perf.
728    */
729   curr = head;
730   size_t num_headers = 0;
731   while (num_headers < num_headers_available) {
732     grpc_mdelem mdelem = curr->md;
733     curr = curr->next;
734     char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
735     char* value;
736     if (grpc_is_binary_header(GRPC_MDKEY(mdelem))) {
737       grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
738       value = grpc_slice_to_c_string(wire_value);
739       grpc_slice_unref_internal(wire_value);
740     } else {
741       value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
742     }
743     if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
744         grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_AUTHORITY)) {
745       /* Cronet populates these fields on its own */
746       gpr_free(key);
747       gpr_free(value);
748       continue;
749     }
750     if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
751       if (grpc_slice_eq(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
752         *method = "PUT";
753       } else {
754         /* POST method in default*/
755         *method = "POST";
756       }
757       gpr_free(key);
758       gpr_free(value);
759       continue;
760     }
761     if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
762       /* Create URL by appending :path value to the hostname */
763       gpr_asprintf(pp_url, "https://%s%s", host, value);
764       gpr_free(key);
765       gpr_free(value);
766       continue;
767     }
768     CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
769     headers[num_headers].key = key;
770     headers[num_headers].value = value;
771     num_headers++;
772     if (curr == nullptr) {
773       break;
774     }
775   }
776   *p_num_headers = num_headers;
777 }
778 
/*
  Decode the 5 byte gRPC message prefix at data.
  data[0] carries the flags: bit 0 set means the message is compressed.
  data[1..4] hold the message length, most significant byte first.
  The results are stored in *compressed and *length.
*/
static void parse_grpc_header(const uint8_t* data, int* length,
                              bool* compressed) {
  *compressed = ((data[0] & 0x01) == 0x01);
  /* Assemble the length in an unsigned type: shifting a promoted (signed) int
     left by 24 overflows when the first length byte is >= 0x80. */
  uint32_t message_length = data[1];
  message_length = (message_length << 8) | data[2];
  message_length = (message_length << 8) | data[3];
  message_length = (message_length << 8) | data[4];
  /* Lengths above INT_MAX do not fit the int out-parameter */
  *length = (int)message_length;
}
790 
header_has_authority(grpc_linked_mdelem * head)791 static bool header_has_authority(grpc_linked_mdelem* head) {
792   while (head != nullptr) {
793     if (grpc_slice_eq(GRPC_MDKEY(head->md), GRPC_MDSTR_AUTHORITY)) {
794       return true;
795     }
796     head = head->next;
797   }
798   return false;
799 }
800 
/*
  Op Execution: Decide if one of the actions contained in the stream op can be
  executed. This is the heart of the state machine.

  curr_op   The batch being examined. Only its "which ops were requested"
            flags are read, and only when op_id is OP_ON_COMPLETE.
  s         The stream. s->state is the stream-wide progress record
            (state_op_done[] / state_callback_received[] and pending flags).
  op_state  Progress record of this particular batch. It is consulted for the
            ops that are tracked per batch rather than per stream
            (OP_SEND_MESSAGE, OP_RECV_MESSAGE, OP_ON_COMPLETE).
  op_id     The action the caller would like to execute.

  Returns true when op_id may be executed now, false otherwise. The function
  does not modify any state; apart from the return value its only effect is
  trace output through CRONET_LOG.
*/
static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
                          struct stream_obj* s, struct op_state* op_state,
                          enum e_op_id op_id) {
  struct op_state* stream_state = &s->state;
  grpc_cronet_transport* t = s->curr_ct;
  bool result = true;
  /* When call is canceled, every op can be run, except under following
  conditions
  */
  bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
                               stream_state->state_callback_received[OP_FAILED];
  if (is_canceled_or_failed) {
    /* Send ops and a further cancel are always refused once the stream is
       cancelled or failed. */
    if (op_id == OP_SEND_INITIAL_METADATA) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_SEND_MESSAGE) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_SEND_TRAILING_METADATA) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_CANCEL_ERROR) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* Receive ops are refused only if they were already executed. Initial
       and trailing metadata are tracked per stream, recv message per batch. */
    /* already executed */
    if (op_id == OP_RECV_INITIAL_METADATA &&
        stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_RECV_TRAILING_METADATA &&
        stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* ON_COMPLETE can be processed if one of the following conditions is met:
     * 1. the stream failed
     * 2. the stream is cancelled, and the callback is received
     * 3. the stream succeeded before cancel is effective
     * 4. the stream is cancelled, and the stream is never started */
    if (op_id == OP_ON_COMPLETE &&
        !(stream_state->state_callback_received[OP_FAILED] ||
          stream_state->state_callback_received[OP_CANCELED] ||
          stream_state->state_callback_received[OP_SUCCEEDED] ||
          !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
  } else if (op_id == OP_SEND_INITIAL_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
  } else if (op_id == OP_RECV_INITIAL_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
    /* we haven't sent headers yet. */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
    /* we haven't received headers yet. */
    /* (not blocking if trailing metadata has already been delivered) */
    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
             !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
      result = false;
  } else if (op_id == OP_SEND_MESSAGE) {
    /* already executed (note we're checking op specific state, not stream
     state) */
    if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
    /* we haven't sent headers yet. */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
  } else if (op_id == OP_RECV_MESSAGE) {
    /* already executed */
    /* (per-batch state, like OP_SEND_MESSAGE above) */
    if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
    /* we haven't received headers yet. */
    /* (not blocking if trailing metadata has already been delivered) */
    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
             !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
      result = false;
  } else if (op_id == OP_RECV_TRAILING_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
    /* we have asked for but haven't received message yet. */
    else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
             !stream_state->state_op_done[OP_RECV_MESSAGE])
      result = false;
    /* we haven't received trailers  yet. */
    else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
      result = false;
    /* we haven't received on_succeeded  yet. */
    else if (!stream_state->state_callback_received[OP_SUCCEEDED])
      result = false;
  } else if (op_id == OP_SEND_TRAILING_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
    /* we haven't sent initial metadata yet */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
    /* we haven't sent message yet */
    else if (stream_state->pending_send_message &&
             !stream_state->state_op_done[OP_SEND_MESSAGE])
      result = false;
    /* we haven't got on_write_completed for the send yet */
    /* (waived when packet coalescing is on and the message write is being
       held back to go out together with the trailer) */
    else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
             !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
             !(t->use_packet_coalescing &&
               stream_state->pending_write_for_trailer))
      result = false;
  } else if (op_id == OP_CANCEL_ERROR) {
    /* already executed */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
  } else if (op_id == OP_ON_COMPLETE) {
    /* already executed (note we're checking op specific state, not stream
    state) */
    if (op_state->state_op_done[OP_ON_COMPLETE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* Check if every op that was asked for is done. */
    /* TODO(muxi): We should not consider the recv ops here, since they
     * have their own callbacks.  We should invoke a batch's on_complete
     * as soon as all of the batch's send ops are complete, even if
     * there are still recv ops pending. */
    else if (curr_op->send_initial_metadata &&
             !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_message &&
               !op_state->state_op_done[OP_SEND_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_message &&
               !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_trailing_metadata &&
               !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_initial_metadata &&
               !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_message &&
               !op_state->state_op_done[OP_RECV_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->cancel_stream &&
               !stream_state->state_callback_received[OP_CANCELED]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_trailing_metadata) {
      /* We aren't done with trailing metadata yet */
      if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
        CRONET_LOG(GPR_DEBUG, "Because");
        result = false;
      }
      /* We've asked for actual message in an earlier op, and it hasn't been
        delivered yet. */
      else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
        /* If this op is not the one asking for read, (which means some earlier
          op has asked), and the read hasn't been delivered. */
        if (!curr_op->recv_message &&
            !stream_state->state_callback_received[OP_SUCCEEDED]) {
          CRONET_LOG(GPR_DEBUG, "Because");
          result = false;
        }
      }
    }
    /* We should see at least one on_write_completed for the trailers that we
      sent */
    /* NOTE(review): this is the last link of the else-if chain, so it is only
       evaluated when the batch does not request recv_trailing_metadata. */
    else if (curr_op->send_trailing_metadata &&
             !stream_state->state_callback_received[OP_SEND_MESSAGE])
      result = false;
  }
  CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
             result ? "YES" : "NO");
  return result;
}
988 
/*
  Executes at most one action of the batch wrapped by oas.

  The candidate actions are tried in a fixed order: send initial metadata,
  send message, send trailing metadata, recv initial metadata, recv message,
  recv trailing metadata, cancel, on_complete. The first action that the
  batch requests and that op_can_be_run() permits is executed; the others are
  left for a later call.

  oas  The batch (oas->op), the stream it belongs to (oas->s) and the
       per-batch progress record (oas->state). Stream-wide progress lives in
       oas->s->state.

  Returns NO_ACTION_POSSIBLE when nothing could be executed, otherwise
  ACTION_TAKEN_WITH_CALLBACK or ACTION_TAKEN_NO_CALLBACK.
  NOTE(review): judging by the names, WITH_CALLBACK presumably means progress
  now depends on a cronet callback; confirm against the caller.

  TODO (makdharma): Break down this function in smaller chunks for readability.
*/
static enum e_op_result execute_stream_op(struct op_and_state* oas) {
  grpc_transport_stream_op_batch* stream_op = &oas->op;
  struct stream_obj* s = oas->s;
  grpc_cronet_transport* t = s->curr_ct;
  struct op_state* stream_state = &s->state;
  enum e_op_result result = NO_ACTION_POSSIBLE;
  if (stream_op->send_initial_metadata &&
      op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
    /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
     * on_failed */
    GPR_ASSERT(s->cbs == nullptr);
    GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
    s->cbs =
        bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
    CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
    if (t->use_packet_coalescing) {
      bidirectional_stream_disable_auto_flush(s->cbs, true);
      bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
    }
    /* Translate the metadata batch into a URL, an HTTP method (default
       "POST") and a cronet header array. */
    char* url = nullptr;
    const char* method = "POST";
    s->header_array.headers = nullptr;
    convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata
                                           .send_initial_metadata->list.head,
                                       t->host, &url, &s->header_array.headers,
                                       &s->header_array.count, &method);
    s->header_array.capacity = s->header_array.count;
    CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
    bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
    if (url) {
      gpr_free(url);
    }
    /* Release the key/value strings of every header. The headers array
       itself is not freed here. */
    unsigned int header_index;
    for (header_index = 0; header_index < s->header_array.count;
         header_index++) {
      gpr_free((void*)s->header_array.headers[header_index].key);
      gpr_free((void*)s->header_array.headers[header_index].value);
    }
    stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
    /* With coalescing, a batch that sends nothing but headers asks for a
       flush later via flush_cronet_when_ready. */
    if (t->use_packet_coalescing) {
      if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
        s->state.flush_cronet_when_ready = true;
      }
    }
    result = ACTION_TAKEN_WITH_CALLBACK;
  } else if (stream_op->send_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
    stream_state->pending_send_message = false;
    /* NOTE(review): op_can_be_run() refuses OP_SEND_MESSAGE once the OP_FAILED
       callback has been received, so this first branch looks unreachable. */
    if (stream_state->state_callback_received[OP_FAILED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
    } else {
      grpc_slice_buffer write_slice_buffer;
      grpc_slice slice;
      grpc_slice_buffer_init(&write_slice_buffer);
      /* Request the whole message at once and pull it as a single slice. */
      if (1 != stream_op->payload->send_message.send_message->Next(
                   stream_op->payload->send_message.send_message->length(),
                   nullptr)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      if (GRPC_ERROR_NONE !=
          stream_op->payload->send_message.send_message->Pull(&slice)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      grpc_slice_buffer_add(&write_slice_buffer, slice);
      if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
        /* Empty request not handled yet */
        gpr_log(GPR_ERROR, "Empty request is not supported");
        GPR_ASSERT(write_slice_buffer.count == 1);
      }
      if (write_slice_buffer.count > 0) {
        /* Build the framed message into ws.write_buffer and hand it to
           cronet. */
        size_t write_buffer_size;
        create_grpc_frame(
            &write_slice_buffer, &stream_state->ws.write_buffer,
            &write_buffer_size,
            stream_op->payload->send_message.send_message->flags());
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
                   stream_state->ws.write_buffer);
        stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
        bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
                                   static_cast<int>(write_buffer_size), false);
        grpc_slice_buffer_destroy_internal(&write_slice_buffer);
        if (t->use_packet_coalescing) {
          if (!stream_op->send_trailing_metadata) {
            CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
            bidirectional_stream_flush(s->cbs);
            result = ACTION_TAKEN_WITH_CALLBACK;
          } else {
            /* The same batch also sends trailers: do not flush yet, the
               trailing-metadata step flushes both writes. */
            stream_state->pending_write_for_trailer = true;
            result = ACTION_TAKEN_NO_CALLBACK;
          }
        } else {
          result = ACTION_TAKEN_WITH_CALLBACK;
        }
      } else {
        result = NO_ACTION_POSSIBLE;
      }
    }
    /* Recorded at both stream and batch level; the send_message handle is
       reset afterwards. */
    stream_state->state_op_done[OP_SEND_MESSAGE] = true;
    oas->state.state_op_done[OP_SEND_MESSAGE] = true;
    stream_op->payload->send_message.send_message.reset();
  } else if (stream_op->send_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_SEND_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
    /* NOTE(review): op_can_be_run() refuses OP_SEND_TRAILING_METADATA once the
       OP_FAILED callback has been received, so this first branch looks
       unreachable. */
    if (stream_state->state_callback_received[OP_FAILED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
    } else {
      /* A zero-length write with the final argument set to true. */
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      bidirectional_stream_write(s->cbs, "", 0, true);
      if (t->use_packet_coalescing) {
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
        bidirectional_stream_flush(s->cbs);
      }
      result = ACTION_TAKEN_WITH_CALLBACK;
    }
    stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
  } else if (stream_op->recv_initial_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
    /* In every case recv_initial_metadata_ready is scheduled with
       GRPC_ERROR_NONE. Metadata is published only on the normal path (last
       branch). */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.initial_metadata,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata);
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    }
    stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->recv_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
    /* The first four branches finish the op immediately: recv_message_ready
       is scheduled with GRPC_ERROR_NONE and *recv_message is not written. */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      CRONET_LOG(GPR_DEBUG, "Stream failed.");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.read_stream_closed == true) {
      /* No more data will be received */
      CRONET_LOG(GPR_DEBUG, "read stream closed");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->flush_read) {
      CRONET_LOG(GPR_DEBUG, "flush read");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.length_field_received == false) {
      /* The length prefix of the next message has not been parsed yet. */
      if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
          stream_state->rs.remaining_bytes == 0) {
        /* All header bytes have arrived: decode them. */
        /* Start a read operation for data */
        stream_state->rs.length_field_received = true;
        parse_grpc_header(
            reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
            &stream_state->rs.length_field, &stream_state->rs.compressed);
        CRONET_LOG(GPR_DEBUG, "length field = %d",
                   stream_state->rs.length_field);
        if (stream_state->rs.length_field > 0) {
          /* Allocate a buffer of exactly length_field bytes and ask cronet
             to fill it. */
          stream_state->rs.read_buffer = static_cast<char*>(
              gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
          GPR_ASSERT(stream_state->rs.read_buffer);
          stream_state->rs.remaining_bytes = stream_state->rs.length_field;
          stream_state->rs.received_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                    stream_state->rs.remaining_bytes);
          stream_state->pending_read_from_cronet = true;
          result = ACTION_TAKEN_WITH_CALLBACK;
        } else {
          /* length_field <= 0: deliver an empty message right away. */
          stream_state->rs.remaining_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
          /* Clean up read_slice_buffer in case there is unread data. */
          grpc_slice_buffer_destroy_internal(
              &stream_state->rs.read_slice_buffer);
          grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
          uint32_t flags = 0;
          if (stream_state->rs.compressed) {
            flags |= GRPC_WRITE_INTERNAL_COMPRESS;
          }
          stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
          stream_op->payload->recv_message.recv_message->reset(
              stream_state->rs.sbs.get());
          GRPC_CLOSURE_SCHED(
              stream_op->payload->recv_message.recv_message_ready,
              GRPC_ERROR_NONE);
          stream_state->state_op_done[OP_RECV_MESSAGE] = true;
          oas->state.state_op_done[OP_RECV_MESSAGE] = true;

          /* Extra read to trigger on_succeed */
          stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
          stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
          stream_state->rs.received_bytes = 0;
          stream_state->rs.compressed = false;
          stream_state->rs.length_field_received = false;
          CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                    stream_state->rs.remaining_bytes);
          stream_state->pending_read_from_cronet = true;
          result = ACTION_TAKEN_NO_CALLBACK;
        }
      } else if (stream_state->rs.remaining_bytes == 0) {
        /* Start a read operation for first 5 bytes (GRPC header) */
        stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
        stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
        stream_state->rs.received_bytes = 0;
        stream_state->rs.compressed = false;
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
        stream_state->state_op_done[OP_READ_REQ_MADE] =
            true; /* Indicates that at least one read request has been made */
        bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                  stream_state->rs.remaining_bytes);
        stream_state->pending_read_from_cronet = true;
        result = ACTION_TAKEN_WITH_CALLBACK;
      } else {
        /* A header read is still outstanding (remaining_bytes != 0). */
        result = NO_ACTION_POSSIBLE;
      }
    } else if (stream_state->rs.remaining_bytes == 0) {
      /* Length known and the whole payload has arrived: copy it into a slice
         and deliver it. If remaining_bytes != 0 no branch matches and result
         keeps its initial NO_ACTION_POSSIBLE value. */
      CRONET_LOG(GPR_DEBUG, "read operation complete");
      grpc_slice read_data_slice =
          GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
      uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
      memcpy(dst_p, stream_state->rs.read_buffer,
             static_cast<size_t>(stream_state->rs.length_field));
      null_and_maybe_free_read_buffer(s);
      /* Clean up read_slice_buffer in case there is unread data. */
      grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
                            read_data_slice);
      uint32_t flags = 0;
      if (stream_state->rs.compressed) {
        flags = GRPC_WRITE_INTERNAL_COMPRESS;
      }
      stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
      stream_op->payload->recv_message.recv_message->reset(
          stream_state->rs.sbs.get());
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      /* Do an extra read to trigger on_succeeded() callback in case connection
         is closed */
      stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
      stream_state->rs.compressed = false;
      stream_state->rs.received_bytes = 0;
      stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
      stream_state->rs.length_field_received = false;
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
      bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                stream_state->rs.remaining_bytes);
      stream_state->pending_read_from_cronet = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    }
  } else if (stream_op->recv_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
    /* Pick the error handed to recv_trailing_metadata_ready: the stored
       cancel error, an UNAVAILABLE error on failure, or none (publishing the
       trailing metadata if it is valid). */
    grpc_error* error = GRPC_ERROR_NONE;
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      error = GRPC_ERROR_REF(stream_state->cancel_error);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
    } else if (oas->s->state.rs.trailing_metadata_valid) {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.trailing_metadata,
          stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
      stream_state->rs.trailing_metadata_valid = false;
    }
    GRPC_CLOSURE_SCHED(
        stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
        error);
    stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->cancel_stream &&
             op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
    /* Only a stream that was actually created (s->cbs != nullptr) is
       cancelled through cronet. */
    if (s->cbs) {
      CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
      bidirectional_stream_cancel(s->cbs);
      result = ACTION_TAKEN_WITH_CALLBACK;
    } else {
      result = ACTION_TAKEN_NO_CALLBACK;
    }
    stream_state->state_op_done[OP_CANCEL_ERROR] = true;
    /* Keep the first cancel error only; it is reported by later ops. */
    if (!stream_state->cancel_error) {
      stream_state->cancel_error =
          GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
    }
  } else if (stream_op->on_complete &&
             op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
    /* on_complete receives the cancel error, an UNAVAILABLE error, or no
       error, in that order of precedence. */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      GRPC_CLOSURE_SCHED(stream_op->on_complete,
                         GRPC_ERROR_REF(stream_state->cancel_error));
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      GRPC_CLOSURE_SCHED(
          stream_op->on_complete,
          make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
    } else {
      /* All actions in this stream_op are complete. Call the on_complete
       * callback
       */
      GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
    }
    oas->state.state_op_done[OP_ON_COMPLETE] = true;
    oas->done = true;
    /* reset any send message state, only if this ON_COMPLETE is about a send.
     */
    if (stream_op->send_message) {
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      stream_state->state_op_done[OP_SEND_MESSAGE] = false;
    }
    result = ACTION_TAKEN_NO_CALLBACK;
    /* If this is the on_complete callback being called for a received message -
      make a note */
    if (stream_op->recv_message)
      stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
  } else {
    result = NO_ACTION_POSSIBLE;
  }
  return result;
}
1352 
1353 /*
1354   Functions used by upper layers to access transport functionality.
1355 */
1356 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,gpr_arena * arena)1357 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1358                        grpc_stream_refcount* refcount, const void* server_data,
1359                        gpr_arena* arena) {
1360   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1361 
1362   s->refcount = refcount;
1363   GRPC_CRONET_STREAM_REF(s, "cronet transport");
1364   memset(&s->storage, 0, sizeof(s->storage));
1365   s->storage.head = nullptr;
1366   memset(&s->state, 0, sizeof(s->state));
1367   s->curr_op = nullptr;
1368   s->cbs = nullptr;
1369   memset(&s->header_array, 0, sizeof(s->header_array));
1370   memset(&s->state.rs, 0, sizeof(s->state.rs));
1371   memset(&s->state.ws, 0, sizeof(s->state.ws));
1372   memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
1373   memset(s->state.state_callback_received, 0,
1374          sizeof(s->state.state_callback_received));
1375   s->state.fail_state = s->state.flush_read = false;
1376   s->state.cancel_error = nullptr;
1377   s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false;
1378   s->state.pending_send_message = false;
1379   s->state.pending_recv_trailing_metadata = false;
1380   s->state.pending_read_from_cronet = false;
1381 
1382   s->curr_gs = gs;
1383   s->curr_ct = reinterpret_cast<grpc_cronet_transport*>(gt);
1384   s->arena = arena;
1385 
1386   gpr_mu_init(&s->mu);
1387   return 0;
1388 }
1389 
set_pollset_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)1390 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1391                                    grpc_pollset* pollset) {}
1392 
set_pollset_set_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)1393 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1394                                        grpc_pollset_set* pollset_set) {}
1395 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1396 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1397                               grpc_transport_stream_op_batch* op) {
1398   CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1399   if (op->send_initial_metadata &&
1400       header_has_authority(op->payload->send_initial_metadata
1401                                .send_initial_metadata->list.head)) {
1402     /* Cronet does not support :authority header field. We cancel the call when
1403      this field is present in metadata */
1404     if (op->recv_initial_metadata) {
1405       GRPC_CLOSURE_SCHED(
1406           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1407           GRPC_ERROR_CANCELLED);
1408     }
1409     if (op->recv_message) {
1410       GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
1411                          GRPC_ERROR_CANCELLED);
1412     }
1413     if (op->recv_trailing_metadata) {
1414       GRPC_CLOSURE_SCHED(
1415           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1416           GRPC_ERROR_CANCELLED);
1417     }
1418     GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
1419     return;
1420   }
1421   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1422   add_to_storage(s, op);
1423   execute_from_storage(s);
1424 }
1425 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1426 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1427                            grpc_closure* then_schedule_closure) {
1428   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1429   null_and_maybe_free_read_buffer(s);
1430   /* Clean up read_slice_buffer in case there is unread data. */
1431   grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer);
1432   GRPC_ERROR_UNREF(s->state.cancel_error);
1433   GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
1434 }
1435 
destroy_transport(grpc_transport * gt)1436 static void destroy_transport(grpc_transport* gt) {}
1437 
get_endpoint(grpc_transport * gt)1438 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1439 
perform_op(grpc_transport * gt,grpc_transport_op * op)1440 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1441 
1442 static const grpc_transport_vtable grpc_cronet_vtable = {
1443     sizeof(stream_obj),
1444     "cronet_http",
1445     init_stream,
1446     set_pollset_do_nothing,
1447     set_pollset_set_do_nothing,
1448     perform_stream_op,
1449     perform_op,
1450     destroy_stream,
1451     destroy_transport,
1452     get_endpoint};
1453 
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void * reserved)1454 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1455                                              const grpc_channel_args* args,
1456                                              void* reserved) {
1457   grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1458       gpr_malloc(sizeof(grpc_cronet_transport)));
1459   if (!ct) {
1460     goto error;
1461   }
1462   ct->base.vtable = &grpc_cronet_vtable;
1463   ct->engine = static_cast<stream_engine*>(engine);
1464   ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1465   if (!ct->host) {
1466     goto error;
1467   }
1468   strcpy(ct->host, target);
1469 
1470   ct->use_packet_coalescing = true;
1471   if (args) {
1472     for (size_t i = 0; i < args->num_args; i++) {
1473       if (0 ==
1474           strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1475         if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1476           gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1477                   GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1478         } else {
1479           ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1480         }
1481       }
1482     }
1483   }
1484 
1485   return &ct->base;
1486 
1487 error:
1488   if (ct) {
1489     if (ct->host) {
1490       gpr_free(ct->host);
1491     }
1492     gpr_free(ct);
1493   }
1494 
1495   return nullptr;
1496 }
1497