1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_POSIX_SOCKET_TCP
24 
25 #include "src/core/lib/iomgr/tcp_posix.h"
26 
27 #include <errno.h>
28 #include <limits.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31 #include <stdbool.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/socket.h>
36 #include <sys/types.h>
37 #include <unistd.h>
38 #include <algorithm>
39 #include <unordered_map>
40 
41 #include <grpc/slice.h>
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/string_util.h>
45 #include <grpc/support/sync.h>
46 #include <grpc/support/time.h>
47 
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/debug/stats.h"
50 #include "src/core/lib/debug/trace.h"
51 #include "src/core/lib/gpr/string.h"
52 #include "src/core/lib/gpr/useful.h"
53 #include "src/core/lib/gprpp/sync.h"
54 #include "src/core/lib/iomgr/buffer_list.h"
55 #include "src/core/lib/iomgr/ev_posix.h"
56 #include "src/core/lib/iomgr/executor.h"
57 #include "src/core/lib/iomgr/sockaddr_utils.h"
58 #include "src/core/lib/iomgr/socket_utils_posix.h"
59 #include "src/core/lib/profiling/timers.h"
60 #include "src/core/lib/slice/slice_internal.h"
61 #include "src/core/lib/slice/slice_string_helpers.h"
62 
63 #ifndef SOL_TCP
64 #define SOL_TCP IPPROTO_TCP
65 #endif
66 
67 #ifndef TCP_INQ
68 #define TCP_INQ 36
69 #define TCP_CM_INQ TCP_INQ
70 #endif
71 
72 #ifdef GRPC_HAVE_MSG_NOSIGNAL
73 #define SENDMSG_FLAGS MSG_NOSIGNAL
74 #else
75 #define SENDMSG_FLAGS 0
76 #endif
77 
78 // TCP zero copy sendmsg flag.
79 // NB: We define this here as a fallback in case we're using an older set of
80 // library headers that has not defined MSG_ZEROCOPY. Since this constant is
81 // part of the kernel, we are guaranteed it will never change/disagree so
82 // defining it here is safe.
83 #ifndef MSG_ZEROCOPY
84 #define MSG_ZEROCOPY 0x4000000
85 #endif
86 
87 #ifdef GRPC_MSG_IOVLEN_TYPE
88 typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
89 #else
90 typedef size_t msg_iovlen_type;
91 #endif
92 
93 extern grpc_core::TraceFlag grpc_tcp_trace;
94 
95 namespace grpc_core {
96 
97 class TcpZerocopySendRecord {
98  public:
TcpZerocopySendRecord()99   TcpZerocopySendRecord() { grpc_slice_buffer_init(&buf_); }
100 
~TcpZerocopySendRecord()101   ~TcpZerocopySendRecord() {
102     AssertEmpty();
103     grpc_slice_buffer_destroy_internal(&buf_);
104   }
105 
106   // Given the slices that we wish to send, and the current offset into the
107   //   slice buffer (indicating which have already been sent), populate an iovec
108   //   array that will be used for a zerocopy enabled sendmsg().
109   msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx,
110                                size_t* unwind_byte_idx, size_t* sending_length,
111                                iovec* iov);
112 
113   // A sendmsg() may not be able to send the bytes that we requested at this
114   // time, returning EAGAIN (possibly due to backpressure). In this case,
115   // unwind the offset into the slice buffer so we retry sending these bytes.
UnwindIfThrottled(size_t unwind_slice_idx,size_t unwind_byte_idx)116   void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) {
117     out_offset_.byte_idx = unwind_byte_idx;
118     out_offset_.slice_idx = unwind_slice_idx;
119   }
120 
121   // Update the offset into the slice buffer based on how much we wanted to sent
122   // vs. what sendmsg() actually sent (which may be lower, possibly due to
123   // backpressure).
124   void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent);
125 
126   // Indicates whether all underlying data has been sent or not.
AllSlicesSent()127   bool AllSlicesSent() { return out_offset_.slice_idx == buf_.count; }
128 
129   // Reset this structure for a new tcp_write() with zerocopy.
PrepareForSends(grpc_slice_buffer * slices_to_send)130   void PrepareForSends(grpc_slice_buffer* slices_to_send) {
131     AssertEmpty();
132     out_offset_.slice_idx = 0;
133     out_offset_.byte_idx = 0;
134     grpc_slice_buffer_swap(slices_to_send, &buf_);
135     Ref();
136   }
137 
138   // References: 1 reference per sendmsg(), and 1 for the tcp_write().
Ref()139   void Ref() { ref_.FetchAdd(1, MemoryOrder::RELAXED); }
140 
141   // Unref: called when we get an error queue notification for a sendmsg(), if a
142   //  sendmsg() failed or when tcp_write() is done.
Unref()143   bool Unref() {
144     const intptr_t prior = ref_.FetchSub(1, MemoryOrder::ACQ_REL);
145     GPR_DEBUG_ASSERT(prior > 0);
146     if (prior == 1) {
147       AllSendsComplete();
148       return true;
149     }
150     return false;
151   }
152 
153  private:
154   struct OutgoingOffset {
155     size_t slice_idx = 0;
156     size_t byte_idx = 0;
157   };
158 
AssertEmpty()159   void AssertEmpty() {
160     GPR_DEBUG_ASSERT(buf_.count == 0);
161     GPR_DEBUG_ASSERT(buf_.length == 0);
162     GPR_DEBUG_ASSERT(ref_.Load(MemoryOrder::RELAXED) == 0);
163   }
164 
165   // When all sendmsg() calls associated with this tcp_write() have been
166   // completed (ie. we have received the notifications for each sequence number
167   // for each sendmsg()) and all reference counts have been dropped, drop our
168   // reference to the underlying data since we no longer need it.
AllSendsComplete()169   void AllSendsComplete() {
170     GPR_DEBUG_ASSERT(ref_.Load(MemoryOrder::RELAXED) == 0);
171     grpc_slice_buffer_reset_and_unref_internal(&buf_);
172   }
173 
174   grpc_slice_buffer buf_;
175   Atomic<intptr_t> ref_;
176   OutgoingOffset out_offset_;
177 };
178 
179 class TcpZerocopySendCtx {
180  public:
181   static constexpr int kDefaultMaxSends = 4;
182   static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;  // 16KB
183 
TcpZerocopySendCtx(int max_sends=kDefaultMaxSends,size_t send_bytes_threshold=kDefaultSendBytesThreshold)184   explicit TcpZerocopySendCtx(
185       int max_sends = kDefaultMaxSends,
186       size_t send_bytes_threshold = kDefaultSendBytesThreshold)
187       : max_sends_(max_sends),
188         free_send_records_size_(max_sends),
189         threshold_bytes_(send_bytes_threshold) {
190     send_records_ = static_cast<TcpZerocopySendRecord*>(
191         gpr_malloc(max_sends * sizeof(*send_records_)));
192     free_send_records_ = static_cast<TcpZerocopySendRecord**>(
193         gpr_malloc(max_sends * sizeof(*free_send_records_)));
194     if (send_records_ == nullptr || free_send_records_ == nullptr) {
195       gpr_free(send_records_);
196       gpr_free(free_send_records_);
197       gpr_log(GPR_INFO, "Disabling TCP TX zerocopy due to memory pressure.\n");
198       memory_limited_ = true;
199     } else {
200       for (int idx = 0; idx < max_sends_; ++idx) {
201         new (send_records_ + idx) TcpZerocopySendRecord();
202         free_send_records_[idx] = send_records_ + idx;
203       }
204     }
205   }
206 
~TcpZerocopySendCtx()207   ~TcpZerocopySendCtx() {
208     if (send_records_ != nullptr) {
209       for (int idx = 0; idx < max_sends_; ++idx) {
210         send_records_[idx].~TcpZerocopySendRecord();
211       }
212     }
213     gpr_free(send_records_);
214     gpr_free(free_send_records_);
215   }
216 
217   // True if we were unable to allocate the various bookkeeping structures at
218   // transport initialization time. If memory limited, we do not zerocopy.
memory_limited() const219   bool memory_limited() const { return memory_limited_; }
220 
221   // TCP send zerocopy maintains an implicit sequence number for every
222   // successful sendmsg() with zerocopy enabled; the kernel later gives us an
223   // error queue notification with this sequence number indicating that the
224   // underlying data buffers that we sent can now be released. Once that
225   // notification is received, we can release the buffers associated with this
226   // zerocopy send record. Here, we associate the sequence number with the data
227   // buffers that were sent with the corresponding call to sendmsg().
NoteSend(TcpZerocopySendRecord * record)228   void NoteSend(TcpZerocopySendRecord* record) {
229     record->Ref();
230     AssociateSeqWithSendRecord(last_send_, record);
231     ++last_send_;
232   }
233 
234   // If sendmsg() actually failed, though, we need to revert the sequence number
235   // that we speculatively bumped before calling sendmsg(). Note that we bump
236   // this sequence number and perform relevant bookkeeping (see: NoteSend())
237   // *before* calling sendmsg() since, if we called it *after* sendmsg(), then
238   // there is a possible race with the release notification which could occur on
239   // another thread before we do the necessary bookkeeping. Hence, calling
240   // NoteSend() *before* sendmsg() and implementing an undo function is needed.
UndoSend()241   void UndoSend() {
242     --last_send_;
243     if (ReleaseSendRecord(last_send_)->Unref()) {
244       // We should still be holding the ref taken by tcp_write().
245       GPR_DEBUG_ASSERT(0);
246     }
247   }
248 
249   // Simply associate this send record (and the underlying sent data buffers)
250   // with the implicit sequence number for this zerocopy sendmsg().
AssociateSeqWithSendRecord(uint32_t seq,TcpZerocopySendRecord * record)251   void AssociateSeqWithSendRecord(uint32_t seq, TcpZerocopySendRecord* record) {
252     MutexLock guard(&lock_);
253     ctx_lookup_.emplace(seq, record);
254   }
255 
256   // Get a send record for a send that we wish to do with zerocopy.
GetSendRecord()257   TcpZerocopySendRecord* GetSendRecord() {
258     MutexLock guard(&lock_);
259     return TryGetSendRecordLocked();
260   }
261 
262   // A given send record corresponds to a single tcp_write() with zerocopy
263   // enabled. This can result in several sendmsg() calls to flush all of the
264   // data to wire. Each sendmsg() takes a reference on the
265   // TcpZerocopySendRecord, and corresponds to a single sequence number.
266   // ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a
267   // single sequence number. This is called either when we receive the relevant
268   // error queue notification (saying that we can discard the underlying
269   // buffers for this sendmsg()) is received from the kernel - or, in case
270   // sendmsg() was unsuccessful to begin with.
ReleaseSendRecord(uint32_t seq)271   TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) {
272     MutexLock guard(&lock_);
273     return ReleaseSendRecordLocked(seq);
274   }
275 
276   // After all the references to a TcpZerocopySendRecord are released, we can
277   // add it back to the pool (of size max_sends_). Note that we can only have
278   // max_sends_ tcp_write() instances with zerocopy enabled in flight at the
279   // same time.
PutSendRecord(TcpZerocopySendRecord * record)280   void PutSendRecord(TcpZerocopySendRecord* record) {
281     GPR_DEBUG_ASSERT(record >= send_records_ &&
282                      record < send_records_ + max_sends_);
283     MutexLock guard(&lock_);
284     PutSendRecordLocked(record);
285   }
286 
287   // Indicate that we are disposing of this zerocopy context. This indicator
288   // will prevent new zerocopy writes from being issued.
Shutdown()289   void Shutdown() { shutdown_.Store(true, MemoryOrder::RELEASE); }
290 
291   // Indicates that there are no inflight tcp_write() instances with zerocopy
292   // enabled.
AllSendRecordsEmpty()293   bool AllSendRecordsEmpty() {
294     MutexLock guard(&lock_);
295     return free_send_records_size_ == max_sends_;
296   }
297 
enabled() const298   bool enabled() const { return enabled_; }
299 
set_enabled(bool enabled)300   void set_enabled(bool enabled) {
301     GPR_DEBUG_ASSERT(!enabled || !memory_limited());
302     enabled_ = enabled;
303   }
304 
305   // Only use zerocopy if we are sending at least this many bytes. The
306   // additional overhead of reading the error queue for notifications means that
307   // zerocopy is not useful for small transfers.
threshold_bytes() const308   size_t threshold_bytes() const { return threshold_bytes_; }
309 
310  private:
ReleaseSendRecordLocked(uint32_t seq)311   TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq) {
312     auto iter = ctx_lookup_.find(seq);
313     GPR_DEBUG_ASSERT(iter != ctx_lookup_.end());
314     TcpZerocopySendRecord* record = iter->second;
315     ctx_lookup_.erase(iter);
316     return record;
317   }
318 
TryGetSendRecordLocked()319   TcpZerocopySendRecord* TryGetSendRecordLocked() {
320     if (shutdown_.Load(MemoryOrder::ACQUIRE)) {
321       return nullptr;
322     }
323     if (free_send_records_size_ == 0) {
324       return nullptr;
325     }
326     free_send_records_size_--;
327     return free_send_records_[free_send_records_size_];
328   }
329 
PutSendRecordLocked(TcpZerocopySendRecord * record)330   void PutSendRecordLocked(TcpZerocopySendRecord* record) {
331     GPR_DEBUG_ASSERT(free_send_records_size_ < max_sends_);
332     free_send_records_[free_send_records_size_] = record;
333     free_send_records_size_++;
334   }
335 
336   TcpZerocopySendRecord* send_records_;
337   TcpZerocopySendRecord** free_send_records_;
338   int max_sends_;
339   int free_send_records_size_;
340   Mutex lock_;
341   uint32_t last_send_ = 0;
342   Atomic<bool> shutdown_;
343   bool enabled_ = false;
344   size_t threshold_bytes_ = kDefaultSendBytesThreshold;
345   std::unordered_map<uint32_t, TcpZerocopySendRecord*> ctx_lookup_;
346   bool memory_limited_ = false;
347 };
348 
349 }  // namespace grpc_core
350 
351 using grpc_core::TcpZerocopySendCtx;
352 using grpc_core::TcpZerocopySendRecord;
353 
354 namespace {
/* State of one POSIX TCP endpoint. The object is reference counted (see
 * TCP_REF / TCP_UNREF) and is torn down by tcp_free() when the last reference
 * is dropped. */
struct grpc_tcp {
  /* The two arguments are forwarded verbatim to the zerocopy send context;
   * every other field is left for the code that creates the endpoint to
   * fill in. */
  grpc_tcp(int max_sends, size_t send_bytes_threshold)
      : tcp_zerocopy_send_ctx(max_sends, send_bytes_threshold) {}
  /* Endpoint entry points such as tcp_shutdown() and tcp_destroy()
   * reinterpret_cast a grpc_endpoint* straight to grpc_tcp*; presumably that
   * pointer is &base, so keep this as the first data member. */
  grpc_endpoint base;
  /* Event-manager wrapper around the socket, used for read/write readiness
   * notifications, shutdown and orphaning. */
  grpc_fd* em_fd;
  /* Raw socket descriptor, used directly for recvmsg() and recorded in
   * annotated errors. */
  int fd;
  /* Used by the endpoint read function to distinguish the very first read call
   * from the rest */
  bool is_first_read;
  /* Running estimate of how many bytes to offer per read; adjusted by
   * finish_estimate(). */
  double target_length;
  /* Bytes accumulated by add_to_estimate() since the last finish_estimate(). */
  double bytes_read_this_round;
  grpc_core::RefCount refcount;
  gpr_atm shutdown_count;

  /* Bounds applied to the target read size in get_target_read_size(). */
  int min_read_chunk_size;
  int max_read_chunk_size;

  /* garbage after the last read */
  grpc_slice_buffer last_read_buffer;

  /* Destination of the read in progress; cleared by call_read_cb(). */
  grpc_slice_buffer* incoming_buffer;
  int inq;          /* bytes pending on the socket from the last read. */
  bool inq_capable; /* cache whether kernel supports inq */

  grpc_slice_buffer* outgoing_buffer;
  /* byte within outgoing_buffer->slices[0] to write next */
  size_t outgoing_byte_idx;

  /* Completion callback of the read in progress; cleared by call_read_cb()
   * just before it is run. */
  grpc_closure* read_cb;
  grpc_closure* write_cb;
  /* Both are handed to grpc_fd_orphan() when the endpoint is freed. */
  grpc_closure* release_fd_cb;
  int* release_fd;

  /* Closures registered with the fd by notify_on_read() and
   * notify_on_write() respectively. */
  grpc_closure read_done_closure;
  grpc_closure write_done_closure;
  grpc_closure error_closure;

  /* Peer address; logged on reads and attached to annotated errors as the
   * target address. */
  std::string peer_string;
  std::string local_address;

  /* Supplies the resource quota consulted when sizing reads; shut down with
   * the endpoint and unreffed when it is freed. */
  grpc_resource_user* resource_user;
  grpc_resource_user_slice_allocator slice_allocator;

  grpc_core::TracedBuffer* tb_head; /* List of traced buffers */
  gpr_mu tb_mu; /* Lock for access to list of traced buffers */

  /* grpc_endpoint_write takes an argument which if non-null means that the
   * transport layer wants the TCP layer to collect timestamps for this write.
   * This arg is forwarded to the timestamps callback function when the ACK
   * timestamp is received from the kernel. This arg is a (void *) which allows
   * users of this API to pass in a pointer to any kind of structure. This
   * structure could actually be a tag or any book-keeping object that the user
   * can use to distinguish between different traced writes. The only
   * requirement from the TCP endpoint layer is that this arg should be non-null
   * if the user wants timestamps for the write. */
  void* outgoing_buffer_arg;
  /* A counter which starts at 0. It is initialized the first time the socket
   * options for collecting timestamps are set, and is incremented with each
   * byte sent. */
  int bytes_counter;
  bool socket_ts_enabled; /* True if timestamping options are set on the socket
                           */
  bool ts_capable;        /* Cache whether we can set timestamping options */
  gpr_atm stop_error_notification; /* Set to 1 if we do not want to be notified
                                      on errors anymore */
  /* Zerocopy bookkeeping for this endpoint. */
  TcpZerocopySendCtx tcp_zerocopy_send_ctx;
  TcpZerocopySendRecord* current_zerocopy_send = nullptr;
};
423 
/* Header of the backup poller allocation. The grpc_pollset itself is stored
 * immediately after this struct in the same allocation (see
 * BACKUP_POLLER_POLLSET and the sizeof(*p) + grpc_pollset_size() allocation in
 * cover_self()). */
struct backup_poller {
  /* Mutex guarding the pollset; filled in by grpc_pollset_init(). */
  gpr_mu* pollset_mu;
  /* Closure used to (re)schedule run_poller and, on teardown, done_poller. */
  grpc_closure run_poller;
};
428 
429 }  // namespace
430 
431 static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp);
432 
433 #define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))
434 
435 static gpr_atm g_uncovered_notifications_pending;
436 static gpr_atm g_backup_poller; /* backup_poller* */
437 
438 static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error);
439 static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error);
440 static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */,
441                                                  grpc_error* error);
442 
done_poller(void * bp,grpc_error *)443 static void done_poller(void* bp, grpc_error* /*error_ignored*/) {
444   backup_poller* p = static_cast<backup_poller*>(bp);
445   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
446     gpr_log(GPR_INFO, "BACKUP_POLLER:%p destroy", p);
447   }
448   grpc_pollset_destroy(BACKUP_POLLER_POLLSET(p));
449   gpr_free(p);
450 }
451 
run_poller(void * bp,grpc_error *)452 static void run_poller(void* bp, grpc_error* /*error_ignored*/) {
453   backup_poller* p = static_cast<backup_poller*>(bp);
454   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
455     gpr_log(GPR_INFO, "BACKUP_POLLER:%p run", p);
456   }
457   gpr_mu_lock(p->pollset_mu);
458   grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 10 * GPR_MS_PER_SEC;
459   GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS();
460   GRPC_LOG_IF_ERROR(
461       "backup_poller:pollset_work",
462       grpc_pollset_work(BACKUP_POLLER_POLLSET(p), nullptr, deadline));
463   gpr_mu_unlock(p->pollset_mu);
464   /* last "uncovered" notification is the ref that keeps us polling, if we get
465    * there try a cas to release it */
466   if (gpr_atm_no_barrier_load(&g_uncovered_notifications_pending) == 1 &&
467       gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) {
468     gpr_mu_lock(p->pollset_mu);
469     bool cas_ok =
470         gpr_atm_full_cas(&g_backup_poller, reinterpret_cast<gpr_atm>(p), 0);
471     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
472       gpr_log(GPR_INFO, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok);
473     }
474     gpr_mu_unlock(p->pollset_mu);
475     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
476       gpr_log(GPR_INFO, "BACKUP_POLLER:%p shutdown", p);
477     }
478     grpc_pollset_shutdown(BACKUP_POLLER_POLLSET(p),
479                           GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p,
480                                             grpc_schedule_on_exec_ctx));
481   } else {
482     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
483       gpr_log(GPR_INFO, "BACKUP_POLLER:%p reschedule", p);
484     }
485     grpc_core::Executor::Run(&p->run_poller, GRPC_ERROR_NONE,
486                              grpc_core::ExecutorType::DEFAULT,
487                              grpc_core::ExecutorJobType::LONG);
488   }
489 }
490 
drop_uncovered(grpc_tcp *)491 static void drop_uncovered(grpc_tcp* /*tcp*/) {
492   backup_poller* p =
493       reinterpret_cast<backup_poller*>(gpr_atm_acq_load(&g_backup_poller));
494   gpr_atm old_count =
495       gpr_atm_full_fetch_add(&g_uncovered_notifications_pending, -1);
496   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
497     gpr_log(GPR_INFO, "BACKUP_POLLER:%p uncover cnt %d->%d", p,
498             static_cast<int>(old_count), static_cast<int>(old_count) - 1);
499   }
500   GPR_ASSERT(old_count != 1);
501 }
502 
503 // gRPC API considers a Write operation to be done the moment it clears ‘flow
504 // control’ i.e., not necessarily sent on the wire. This means that the
505 // application MIGHT not call `grpc_completion_queue_next/pluck` in a timely
506 // manner when its `Write()` API is acked.
507 //
508 // We need to ensure that the fd is 'covered' (i.e being monitored by some
509 // polling thread and progress is made) and hence add it to a backup poller here
cover_self(grpc_tcp * tcp)510 static void cover_self(grpc_tcp* tcp) {
511   backup_poller* p;
512   gpr_atm old_count =
513       gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2);
514   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
515     gpr_log(GPR_INFO, "BACKUP_POLLER: cover cnt %d->%d",
516             static_cast<int>(old_count), 2 + static_cast<int>(old_count));
517   }
518   if (old_count == 0) {
519     GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED();
520     p = static_cast<backup_poller*>(
521         gpr_zalloc(sizeof(*p) + grpc_pollset_size()));
522     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
523       gpr_log(GPR_INFO, "BACKUP_POLLER:%p create", p);
524     }
525     grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
526     gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p);
527     grpc_core::Executor::Run(
528         GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, nullptr),
529         GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT,
530         grpc_core::ExecutorJobType::LONG);
531   } else {
532     while ((p = reinterpret_cast<backup_poller*>(
533                 gpr_atm_acq_load(&g_backup_poller))) == nullptr) {
534       // spin waiting for backup poller
535     }
536   }
537   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
538     gpr_log(GPR_INFO, "BACKUP_POLLER:%p add %p", p, tcp);
539   }
540   grpc_pollset_add_fd(BACKUP_POLLER_POLLSET(p), tcp->em_fd);
541   if (old_count != 0) {
542     drop_uncovered(tcp);
543   }
544 }
545 
notify_on_read(grpc_tcp * tcp)546 static void notify_on_read(grpc_tcp* tcp) {
547   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
548     gpr_log(GPR_INFO, "TCP:%p notify_on_read", tcp);
549   }
550   grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_done_closure);
551 }
552 
notify_on_write(grpc_tcp * tcp)553 static void notify_on_write(grpc_tcp* tcp) {
554   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
555     gpr_log(GPR_INFO, "TCP:%p notify_on_write", tcp);
556   }
557   if (!grpc_event_engine_run_in_background()) {
558     cover_self(tcp);
559   }
560   grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure);
561 }
562 
tcp_drop_uncovered_then_handle_write(void * arg,grpc_error * error)563 static void tcp_drop_uncovered_then_handle_write(void* arg, grpc_error* error) {
564   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
565     gpr_log(GPR_INFO, "TCP:%p got_write: %s", arg, grpc_error_string(error));
566   }
567   drop_uncovered(static_cast<grpc_tcp*>(arg));
568   tcp_handle_write(arg, error);
569 }
570 
add_to_estimate(grpc_tcp * tcp,size_t bytes)571 static void add_to_estimate(grpc_tcp* tcp, size_t bytes) {
572   tcp->bytes_read_this_round += static_cast<double>(bytes);
573 }
574 
finish_estimate(grpc_tcp * tcp)575 static void finish_estimate(grpc_tcp* tcp) {
576   /* If we read >80% of the target buffer in one read loop, increase the size
577      of the target buffer to either the amount read, or twice its previous
578      value */
579   if (tcp->bytes_read_this_round > tcp->target_length * 0.8) {
580     tcp->target_length =
581         GPR_MAX(2 * tcp->target_length, tcp->bytes_read_this_round);
582   } else {
583     tcp->target_length =
584         0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round;
585   }
586   tcp->bytes_read_this_round = 0;
587 }
588 
get_target_read_size(grpc_tcp * tcp)589 static size_t get_target_read_size(grpc_tcp* tcp) {
590   grpc_resource_quota* rq = grpc_resource_user_quota(tcp->resource_user);
591   double pressure = grpc_resource_quota_get_memory_pressure(rq);
592   double target =
593       tcp->target_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0);
594   size_t sz = ((static_cast<size_t> GPR_CLAMP(target, tcp->min_read_chunk_size,
595                                               tcp->max_read_chunk_size)) +
596                255) &
597               ~static_cast<size_t>(255);
598   /* don't use more than 1/16th of the overall resource quota for a single read
599    * alloc */
600   size_t rqmax = grpc_resource_quota_peek_size(rq);
601   if (sz > rqmax / 16 && rqmax > 1024) {
602     sz = rqmax / 16;
603   }
604   return sz;
605 }
606 
tcp_annotate_error(grpc_error * src_error,grpc_tcp * tcp)607 static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) {
608   return grpc_error_set_str(
609       grpc_error_set_int(
610           grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
611           /* All tcp errors are marked with UNAVAILABLE so that application may
612            * choose to retry. */
613           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
614       GRPC_ERROR_STR_TARGET_ADDRESS,
615       grpc_slice_from_copied_string(tcp->peer_string.c_str()));
616 }
617 
618 static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error);
619 static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error);
620 
tcp_shutdown(grpc_endpoint * ep,grpc_error * why)621 static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) {
622   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
623   ZerocopyDisableAndWaitForRemaining(tcp);
624   grpc_fd_shutdown(tcp->em_fd, why);
625   grpc_resource_user_shutdown(tcp->resource_user);
626 }
627 
tcp_free(grpc_tcp * tcp)628 static void tcp_free(grpc_tcp* tcp) {
629   grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
630                  "tcp_unref_orphan");
631   grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
632   grpc_resource_user_unref(tcp->resource_user);
633   /* The lock is not really necessary here, since all refs have been released */
634   gpr_mu_lock(&tcp->tb_mu);
635   grpc_core::TracedBuffer::Shutdown(
636       &tcp->tb_head, tcp->outgoing_buffer_arg,
637       GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed"));
638   gpr_mu_unlock(&tcp->tb_mu);
639   tcp->outgoing_buffer_arg = nullptr;
640   gpr_mu_destroy(&tcp->tb_mu);
641   delete tcp;
642 }
643 
644 #ifndef NDEBUG
645 #define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), DEBUG_LOCATION)
646 #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), DEBUG_LOCATION)
tcp_unref(grpc_tcp * tcp,const char * reason,const grpc_core::DebugLocation & debug_location)647 static void tcp_unref(grpc_tcp* tcp, const char* reason,
648                       const grpc_core::DebugLocation& debug_location) {
649   if (GPR_UNLIKELY(tcp->refcount.Unref(debug_location, reason))) {
650     tcp_free(tcp);
651   }
652 }
653 
tcp_ref(grpc_tcp * tcp,const char * reason,const grpc_core::DebugLocation & debug_location)654 static void tcp_ref(grpc_tcp* tcp, const char* reason,
655                     const grpc_core::DebugLocation& debug_location) {
656   tcp->refcount.Ref(debug_location, reason);
657 }
658 #else
659 #define TCP_UNREF(tcp, reason) tcp_unref((tcp))
660 #define TCP_REF(tcp, reason) tcp_ref((tcp))
tcp_unref(grpc_tcp * tcp)661 static void tcp_unref(grpc_tcp* tcp) {
662   if (GPR_UNLIKELY(tcp->refcount.Unref())) {
663     tcp_free(tcp);
664   }
665 }
666 
tcp_ref(grpc_tcp * tcp)667 static void tcp_ref(grpc_tcp* tcp) { tcp->refcount.Ref(); }
668 #endif
669 
tcp_destroy(grpc_endpoint * ep)670 static void tcp_destroy(grpc_endpoint* ep) {
671   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
672   grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
673   if (grpc_event_engine_can_track_errors()) {
674     ZerocopyDisableAndWaitForRemaining(tcp);
675     gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
676     grpc_fd_set_error(tcp->em_fd);
677   }
678   TCP_UNREF(tcp, "destroy");
679 }
680 
call_read_cb(grpc_tcp * tcp,grpc_error * error)681 static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
682   grpc_closure* cb = tcp->read_cb;
683 
684   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
685     gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
686     size_t i;
687     const char* str = grpc_error_string(error);
688     gpr_log(GPR_INFO, "READ %p (peer=%s) error=%s", tcp,
689             tcp->peer_string.c_str(), str);
690 
691     if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
692       for (i = 0; i < tcp->incoming_buffer->count; i++) {
693         char* dump = grpc_dump_slice(tcp->incoming_buffer->slices[i],
694                                      GPR_DUMP_HEX | GPR_DUMP_ASCII);
695         gpr_log(GPR_DEBUG, "DATA: %s", dump);
696         gpr_free(dump);
697       }
698     }
699   }
700 
701   tcp->read_cb = nullptr;
702   tcp->incoming_buffer = nullptr;
703   grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
704 }
705 
706 #define MAX_READ_IOVEC 4
tcp_do_read(grpc_tcp * tcp)707 static void tcp_do_read(grpc_tcp* tcp) {
708   GPR_TIMER_SCOPE("tcp_do_read", 0);
709   struct msghdr msg;
710   struct iovec iov[MAX_READ_IOVEC];
711   ssize_t read_bytes;
712   size_t total_read_bytes = 0;
713   size_t iov_len =
714       std::min<size_t>(MAX_READ_IOVEC, tcp->incoming_buffer->count);
715 #ifdef GRPC_LINUX_ERRQUEUE
716   constexpr size_t cmsg_alloc_space =
717       CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) + CMSG_SPACE(sizeof(int));
718 #else
719   constexpr size_t cmsg_alloc_space = 24 /* CMSG_SPACE(sizeof(int)) */;
720 #endif /* GRPC_LINUX_ERRQUEUE */
721   char cmsgbuf[cmsg_alloc_space];
722   for (size_t i = 0; i < iov_len; i++) {
723     iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
724     iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
725   }
726 
727   do {
728     /* Assume there is something on the queue. If we receive TCP_INQ from
729      * kernel, we will update this value, otherwise, we have to assume there is
730      * always something to read until we get EAGAIN. */
731     tcp->inq = 1;
732 
733     msg.msg_name = nullptr;
734     msg.msg_namelen = 0;
735     msg.msg_iov = iov;
736     msg.msg_iovlen = static_cast<msg_iovlen_type>(iov_len);
737     if (tcp->inq_capable) {
738       msg.msg_control = cmsgbuf;
739       msg.msg_controllen = sizeof(cmsgbuf);
740     } else {
741       msg.msg_control = nullptr;
742       msg.msg_controllen = 0;
743     }
744     msg.msg_flags = 0;
745 
746     GRPC_STATS_INC_TCP_READ_OFFER(tcp->incoming_buffer->length);
747     GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(tcp->incoming_buffer->count);
748 
749     do {
750       GPR_TIMER_SCOPE("recvmsg", 0);
751       GRPC_STATS_INC_SYSCALL_READ();
752       read_bytes = recvmsg(tcp->fd, &msg, 0);
753     } while (read_bytes < 0 && errno == EINTR);
754 
755     /* We have read something in previous reads. We need to deliver those
756      * bytes to the upper layer. */
757     if (read_bytes <= 0 && total_read_bytes > 0) {
758       tcp->inq = 1;
759       break;
760     }
761 
762     if (read_bytes < 0) {
763       /* NB: After calling call_read_cb a parallel call of the read handler may
764        * be running. */
765       if (errno == EAGAIN) {
766         finish_estimate(tcp);
767         tcp->inq = 0;
768         /* We've consumed the edge, request a new one */
769         notify_on_read(tcp);
770       } else {
771         grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
772         call_read_cb(tcp,
773                      tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp));
774         TCP_UNREF(tcp, "read");
775       }
776       return;
777     }
778     if (read_bytes == 0) {
779       /* 0 read size ==> end of stream
780        *
781        * We may have read something, i.e., total_read_bytes > 0, but
782        * since the connection is closed we will drop the data here, because we
783        * can't call the callback multiple times. */
784       grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
785       call_read_cb(
786           tcp, tcp_annotate_error(
787                    GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
788       TCP_UNREF(tcp, "read");
789       return;
790     }
791 
792     GRPC_STATS_INC_TCP_READ_SIZE(read_bytes);
793     add_to_estimate(tcp, static_cast<size_t>(read_bytes));
794     GPR_DEBUG_ASSERT((size_t)read_bytes <=
795                      tcp->incoming_buffer->length - total_read_bytes);
796 
797 #ifdef GRPC_HAVE_TCP_INQ
798     if (tcp->inq_capable) {
799       GPR_DEBUG_ASSERT(!(msg.msg_flags & MSG_CTRUNC));
800       struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
801       for (; cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
802         if (cmsg->cmsg_level == SOL_TCP && cmsg->cmsg_type == TCP_CM_INQ &&
803             cmsg->cmsg_len == CMSG_LEN(sizeof(int))) {
804           tcp->inq = *reinterpret_cast<int*>(CMSG_DATA(cmsg));
805           break;
806         }
807       }
808     }
809 #endif /* GRPC_HAVE_TCP_INQ */
810 
811     total_read_bytes += read_bytes;
812     if (tcp->inq == 0 || total_read_bytes == tcp->incoming_buffer->length) {
813       /* We have filled incoming_buffer, and we cannot read any more. */
814       break;
815     }
816 
817     /* We had a partial read, and still have space to read more data.
818      * So, adjust IOVs and try to read more. */
819     size_t remaining = read_bytes;
820     size_t j = 0;
821     for (size_t i = 0; i < iov_len; i++) {
822       if (remaining >= iov[i].iov_len) {
823         remaining -= iov[i].iov_len;
824         continue;
825       }
826       if (remaining > 0) {
827         iov[j].iov_base = static_cast<char*>(iov[i].iov_base) + remaining;
828         iov[j].iov_len = iov[i].iov_len - remaining;
829         remaining = 0;
830       } else {
831         iov[j].iov_base = iov[i].iov_base;
832         iov[j].iov_len = iov[i].iov_len;
833       }
834       ++j;
835     }
836     iov_len = j;
837   } while (true);
838 
839   if (tcp->inq == 0) {
840     finish_estimate(tcp);
841   }
842 
843   GPR_DEBUG_ASSERT(total_read_bytes > 0);
844   if (total_read_bytes < tcp->incoming_buffer->length) {
845     grpc_slice_buffer_trim_end(tcp->incoming_buffer,
846                                tcp->incoming_buffer->length - total_read_bytes,
847                                &tcp->last_read_buffer);
848   }
849   call_read_cb(tcp, GRPC_ERROR_NONE);
850   TCP_UNREF(tcp, "read");
851 }
852 
tcp_read_allocation_done(void * tcpp,grpc_error * error)853 static void tcp_read_allocation_done(void* tcpp, grpc_error* error) {
854   grpc_tcp* tcp = static_cast<grpc_tcp*>(tcpp);
855   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
856     gpr_log(GPR_INFO, "TCP:%p read_allocation_done: %s", tcp,
857             grpc_error_string(error));
858   }
859   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
860     grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
861     grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
862     call_read_cb(tcp, GRPC_ERROR_REF(error));
863     TCP_UNREF(tcp, "read");
864   } else {
865     tcp_do_read(tcp);
866   }
867 }
868 
tcp_continue_read(grpc_tcp * tcp)869 static void tcp_continue_read(grpc_tcp* tcp) {
870   size_t target_read_size = get_target_read_size(tcp);
871   /* Wait for allocation only when there is no buffer left. */
872   if (tcp->incoming_buffer->length == 0 &&
873       tcp->incoming_buffer->count < MAX_READ_IOVEC) {
874     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
875       gpr_log(GPR_INFO, "TCP:%p alloc_slices", tcp);
876     }
877     if (GPR_UNLIKELY(!grpc_resource_user_alloc_slices(&tcp->slice_allocator,
878                                                       target_read_size, 1,
879                                                       tcp->incoming_buffer))) {
880       // Wait for allocation.
881       return;
882     }
883   }
884   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
885     gpr_log(GPR_INFO, "TCP:%p do_read", tcp);
886   }
887   tcp_do_read(tcp);
888 }
889 
tcp_handle_read(void * arg,grpc_error * error)890 static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) {
891   grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
892   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
893     gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp, grpc_error_string(error));
894   }
895 
896   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
897     grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
898     grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
899     call_read_cb(tcp, GRPC_ERROR_REF(error));
900     TCP_UNREF(tcp, "read");
901   } else {
902     tcp_continue_read(tcp);
903   }
904 }
905 
tcp_read(grpc_endpoint * ep,grpc_slice_buffer * incoming_buffer,grpc_closure * cb,bool urgent)906 static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
907                      grpc_closure* cb, bool urgent) {
908   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
909   GPR_ASSERT(tcp->read_cb == nullptr);
910   tcp->read_cb = cb;
911   tcp->incoming_buffer = incoming_buffer;
912   grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
913   grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
914   TCP_REF(tcp, "read");
915   if (tcp->is_first_read) {
916     /* Endpoint read called for the very first time. Register read callback with
917      * the polling engine */
918     tcp->is_first_read = false;
919     notify_on_read(tcp);
920   } else if (!urgent && tcp->inq == 0) {
921     /* Upper layer asked to read more but we know there is no pending data
922      * to read from previous reads. So, wait for POLLIN.
923      */
924     notify_on_read(tcp);
925   } else {
926     /* Not the first time. We may or may not have more bytes available. In any
927      * case call tcp->read_done_closure (i.e tcp_handle_read()) which does the
928      * right thing (i.e calls tcp_do_read() which either reads the available
929      * bytes or calls notify_on_read() to be notified when new bytes become
930      * available */
931     grpc_core::Closure::Run(DEBUG_LOCATION, &tcp->read_done_closure,
932                             GRPC_ERROR_NONE);
933   }
934 }
935 
936 /* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
937  * of bytes sent. */
tcp_send(int fd,const struct msghdr * msg,int additional_flags=0)938 ssize_t tcp_send(int fd, const struct msghdr* msg, int additional_flags = 0) {
939   GPR_TIMER_SCOPE("sendmsg", 1);
940   ssize_t sent_length;
941   do {
942     /* TODO(klempner): Cork if this is a partial write */
943     GRPC_STATS_INC_SYSCALL_WRITE();
944     sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);
945   } while (sent_length < 0 && errno == EINTR);
946   return sent_length;
947 }
948 
949 /** This is to be called if outgoing_buffer_arg is not null. On linux platforms,
950  * this will call sendmsg with socket options set to collect timestamps inside
951  * the kernel. On return, sent_length is set to the return value of the sendmsg
952  * call. Returns false if setting the socket options failed. This is not
953  * implemented for non-linux platforms currently, and crashes out.
954  */
955 static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
956                                       size_t sending_length,
957                                       ssize_t* sent_length,
958                                       int additional_flags = 0);
959 
960 /** The callback function to be invoked when we get an error on the socket. */
961 static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
962 
963 static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
964     grpc_tcp* tcp, grpc_slice_buffer* buf);
965 
966 #ifdef GRPC_LINUX_ERRQUEUE
967 static bool process_errors(grpc_tcp* tcp);
968 
tcp_get_send_zerocopy_record(grpc_tcp * tcp,grpc_slice_buffer * buf)969 static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
970     grpc_tcp* tcp, grpc_slice_buffer* buf) {
971   TcpZerocopySendRecord* zerocopy_send_record = nullptr;
972   const bool use_zerocopy =
973       tcp->tcp_zerocopy_send_ctx.enabled() &&
974       tcp->tcp_zerocopy_send_ctx.threshold_bytes() < buf->length;
975   if (use_zerocopy) {
976     zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord();
977     if (zerocopy_send_record == nullptr) {
978       process_errors(tcp);
979       zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord();
980     }
981     if (zerocopy_send_record != nullptr) {
982       zerocopy_send_record->PrepareForSends(buf);
983       GPR_DEBUG_ASSERT(buf->count == 0);
984       GPR_DEBUG_ASSERT(buf->length == 0);
985       tcp->outgoing_byte_idx = 0;
986       tcp->outgoing_buffer = nullptr;
987     }
988   }
989   return zerocopy_send_record;
990 }
991 
ZerocopyDisableAndWaitForRemaining(grpc_tcp * tcp)992 static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {
993   tcp->tcp_zerocopy_send_ctx.Shutdown();
994   while (!tcp->tcp_zerocopy_send_ctx.AllSendRecordsEmpty()) {
995     process_errors(tcp);
996   }
997 }
998 
tcp_write_with_timestamps(grpc_tcp * tcp,struct msghdr * msg,size_t sending_length,ssize_t * sent_length,int additional_flags)999 static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
1000                                       size_t sending_length,
1001                                       ssize_t* sent_length,
1002                                       int additional_flags) {
1003   if (!tcp->socket_ts_enabled) {
1004     uint32_t opt = grpc_core::kTimestampingSocketOptions;
1005     if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
1006                    static_cast<void*>(&opt), sizeof(opt)) != 0) {
1007       if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1008         gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket.");
1009       }
1010       return false;
1011     }
1012     tcp->bytes_counter = -1;
1013     tcp->socket_ts_enabled = true;
1014   }
1015   /* Set control message to indicate that you want timestamps. */
1016   union {
1017     char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
1018     struct cmsghdr align;
1019   } u;
1020   cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
1021   cmsg->cmsg_level = SOL_SOCKET;
1022   cmsg->cmsg_type = SO_TIMESTAMPING;
1023   cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
1024   *reinterpret_cast<int*>(CMSG_DATA(cmsg)) =
1025       grpc_core::kTimestampingRecordingOptions;
1026   msg->msg_control = u.cmsg_buf;
1027   msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));
1028 
1029   /* If there was an error on sendmsg the logic in tcp_flush will handle it. */
1030   ssize_t length = tcp_send(tcp->fd, msg, additional_flags);
1031   *sent_length = length;
1032   /* Only save timestamps if all the bytes were taken by sendmsg. */
1033   if (sending_length == static_cast<size_t>(length)) {
1034     gpr_mu_lock(&tcp->tb_mu);
1035     grpc_core::TracedBuffer::AddNewEntry(
1036         &tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length),
1037         tcp->fd, tcp->outgoing_buffer_arg);
1038     gpr_mu_unlock(&tcp->tb_mu);
1039     tcp->outgoing_buffer_arg = nullptr;
1040   }
1041   return true;
1042 }
1043 
1044 static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
1045                                             TcpZerocopySendRecord* record,
1046                                             uint32_t seq, const char* tag);
1047 // Reads \a cmsg to process zerocopy control messages.
process_zerocopy(grpc_tcp * tcp,struct cmsghdr * cmsg)1048 static void process_zerocopy(grpc_tcp* tcp, struct cmsghdr* cmsg) {
1049   GPR_DEBUG_ASSERT(cmsg);
1050   auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(cmsg));
1051   GPR_DEBUG_ASSERT(serr->ee_errno == 0);
1052   GPR_DEBUG_ASSERT(serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY);
1053   const uint32_t lo = serr->ee_info;
1054   const uint32_t hi = serr->ee_data;
1055   for (uint32_t seq = lo; seq <= hi; ++seq) {
1056     // TODO(arjunroy): It's likely that lo and hi refer to zerocopy sequence
1057     // numbers that are generated by a single call to grpc_endpoint_write; ie.
1058     // we can batch the unref operation. So, check if record is the same for
1059     // both; if so, batch the unref/put.
1060     TcpZerocopySendRecord* record =
1061         tcp->tcp_zerocopy_send_ctx.ReleaseSendRecord(seq);
1062     GPR_DEBUG_ASSERT(record);
1063     UnrefMaybePutZerocopySendRecord(tcp, record, seq, "CALLBACK RCVD");
1064   }
1065 }
1066 
// Returns true when \a cmsg is an extended-error control message at the IPv4
// level (SOL_IP / IP_RECVERR) or at the IPv6 level (SOL_IPV6 / IPV6_RECVERR).
static bool CmsgIsIpLevel(const cmsghdr& cmsg) {
  switch (cmsg.cmsg_level) {
    case SOL_IPV6:
      return cmsg.cmsg_type == IPV6_RECVERR;
    case SOL_IP:
      return cmsg.cmsg_type == IP_RECVERR;
    default:
      return false;
  }
}
1072 
CmsgIsZeroCopy(const cmsghdr & cmsg)1073 static bool CmsgIsZeroCopy(const cmsghdr& cmsg) {
1074   if (!CmsgIsIpLevel(cmsg)) {
1075     return false;
1076   }
1077   auto serr = reinterpret_cast<const sock_extended_err*> CMSG_DATA(&cmsg);
1078   return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY;
1079 }
1080 
1081 /** Reads \a cmsg to derive timestamps from the control messages. If a valid
1082  * timestamp is found, the traced buffer list is updated with this timestamp.
1083  * The caller of this function should be looping on the control messages found
1084  * in \a msg. \a cmsg should point to the control message that the caller wants
1085  * processed.
1086  * On return, a pointer to a control message is returned. On the next iteration,
1087  * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */
process_timestamp(grpc_tcp * tcp,msghdr * msg,struct cmsghdr * cmsg)1088 struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
1089                                   struct cmsghdr* cmsg) {
1090   auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
1091   cmsghdr* opt_stats = nullptr;
1092   if (next_cmsg == nullptr) {
1093     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1094       gpr_log(GPR_ERROR, "Received timestamp without extended error");
1095     }
1096     return cmsg;
1097   }
1098 
1099   /* Check if next_cmsg is an OPT_STATS msg */
1100   if (next_cmsg->cmsg_level == SOL_SOCKET &&
1101       next_cmsg->cmsg_type == SCM_TIMESTAMPING_OPT_STATS) {
1102     opt_stats = next_cmsg;
1103     next_cmsg = CMSG_NXTHDR(msg, opt_stats);
1104     if (next_cmsg == nullptr) {
1105       if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1106         gpr_log(GPR_ERROR, "Received timestamp without extended error");
1107       }
1108       return opt_stats;
1109     }
1110   }
1111 
1112   if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
1113       !(next_cmsg->cmsg_type == IP_RECVERR ||
1114         next_cmsg->cmsg_type == IPV6_RECVERR)) {
1115     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1116       gpr_log(GPR_ERROR, "Unexpected control message");
1117     }
1118     return cmsg;
1119   }
1120 
1121   auto tss =
1122       reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg));
1123   auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
1124   if (serr->ee_errno != ENOMSG ||
1125       serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
1126     gpr_log(GPR_ERROR, "Unexpected control message");
1127     return cmsg;
1128   }
1129   /* The error handling can potentially be done on another thread so we need
1130    * to protect the traced buffer list. A lock free list might be better. Using
1131    * a simple mutex for now. */
1132   gpr_mu_lock(&tcp->tb_mu);
1133   grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, opt_stats,
1134                                             tss);
1135   gpr_mu_unlock(&tcp->tb_mu);
1136   return next_cmsg;
1137 }
1138 
1139 /** For linux platforms, reads the socket's error queue and processes error
1140  * messages from the queue.
1141  */
process_errors(grpc_tcp * tcp)1142 static bool process_errors(grpc_tcp* tcp) {
1143   bool processed_err = false;
1144   struct iovec iov;
1145   iov.iov_base = nullptr;
1146   iov.iov_len = 0;
1147   struct msghdr msg;
1148   msg.msg_name = nullptr;
1149   msg.msg_namelen = 0;
1150   msg.msg_iov = &iov;
1151   msg.msg_iovlen = 0;
1152   msg.msg_flags = 0;
1153   /* Allocate enough space so we don't need to keep increasing this as size
1154    * of OPT_STATS increase */
1155   constexpr size_t cmsg_alloc_space =
1156       CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) +
1157       CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) +
1158       CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t)));
1159   /* Allocate aligned space for cmsgs received along with timestamps */
1160   union {
1161     char rbuf[cmsg_alloc_space];
1162     struct cmsghdr align;
1163   } aligned_buf;
1164   msg.msg_control = aligned_buf.rbuf;
1165   msg.msg_controllen = sizeof(aligned_buf.rbuf);
1166   int r, saved_errno;
1167   while (true) {
1168     do {
1169       r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE);
1170       saved_errno = errno;
1171     } while (r < 0 && saved_errno == EINTR);
1172 
1173     if (r == -1 && saved_errno == EAGAIN) {
1174       return processed_err; /* No more errors to process */
1175     }
1176     if (r == -1) {
1177       return processed_err;
1178     }
1179     if (GPR_UNLIKELY((msg.msg_flags & MSG_CTRUNC) != 0)) {
1180       gpr_log(GPR_ERROR, "Error message was truncated.");
1181     }
1182 
1183     if (msg.msg_controllen == 0) {
1184       /* There was no control message found. It was probably spurious. */
1185       return processed_err;
1186     }
1187     bool seen = false;
1188     for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
1189          cmsg = CMSG_NXTHDR(&msg, cmsg)) {
1190       if (CmsgIsZeroCopy(*cmsg)) {
1191         process_zerocopy(tcp, cmsg);
1192         seen = true;
1193         processed_err = true;
1194       } else if (cmsg->cmsg_level == SOL_SOCKET &&
1195                  cmsg->cmsg_type == SCM_TIMESTAMPING) {
1196         cmsg = process_timestamp(tcp, &msg, cmsg);
1197         seen = true;
1198         processed_err = true;
1199       } else {
1200         /* Got a control message that is not a timestamp or zerocopy. Don't know
1201          * how to handle this. */
1202         if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1203           gpr_log(GPR_INFO,
1204                   "unknown control message cmsg_level:%d cmsg_type:%d",
1205                   cmsg->cmsg_level, cmsg->cmsg_type);
1206         }
1207         return processed_err;
1208       }
1209     }
1210     if (!seen) {
1211       return processed_err;
1212     }
1213   }
1214 }
1215 
tcp_handle_error(void * arg,grpc_error * error)1216 static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
1217   grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
1218   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1219     gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
1220   }
1221 
1222   if (error != GRPC_ERROR_NONE ||
1223       static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
1224     /* We aren't going to register to hear on error anymore, so it is safe to
1225      * unref. */
1226     TCP_UNREF(tcp, "error-tracking");
1227     return;
1228   }
1229 
1230   /* We are still interested in collecting timestamps, so let's try reading
1231    * them. */
1232   bool processed = process_errors(tcp);
1233   /* This might not a timestamps error. Set the read and write closures to be
1234    * ready. */
1235   if (!processed) {
1236     grpc_fd_set_readable(tcp->em_fd);
1237     grpc_fd_set_writable(tcp->em_fd);
1238   }
1239   grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
1240 }
1241 
1242 #else  /* GRPC_LINUX_ERRQUEUE */
tcp_get_send_zerocopy_record(grpc_tcp * tcp,grpc_slice_buffer * buf)1243 static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
1244     grpc_tcp* tcp, grpc_slice_buffer* buf) {
1245   return nullptr;
1246 }
1247 
ZerocopyDisableAndWaitForRemaining(grpc_tcp * tcp)1248 static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {}
1249 
tcp_write_with_timestamps(grpc_tcp *,struct msghdr *,size_t,ssize_t *,int)1250 static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/,
1251                                       size_t /*sending_length*/,
1252                                       ssize_t* /*sent_length*/,
1253                                       int /*additional_flags*/) {
1254   gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
1255   GPR_ASSERT(0);
1256   return false;
1257 }
1258 
tcp_handle_error(void *,grpc_error *)1259 static void tcp_handle_error(void* /*arg*/ /* grpc_tcp */,
1260                              grpc_error* /*error*/) {
1261   gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
1262   GPR_ASSERT(0);
1263 }
1264 #endif /* GRPC_LINUX_ERRQUEUE */
1265 
1266 /* If outgoing_buffer_arg is filled, shuts down the list early, so that any
1267  * release operations needed can be performed on the arg */
tcp_shutdown_buffer_list(grpc_tcp * tcp)1268 void tcp_shutdown_buffer_list(grpc_tcp* tcp) {
1269   if (tcp->outgoing_buffer_arg) {
1270     gpr_mu_lock(&tcp->tb_mu);
1271     grpc_core::TracedBuffer::Shutdown(
1272         &tcp->tb_head, tcp->outgoing_buffer_arg,
1273         GRPC_ERROR_CREATE_FROM_STATIC_STRING("TracedBuffer list shutdown"));
1274     gpr_mu_unlock(&tcp->tb_mu);
1275     tcp->outgoing_buffer_arg = nullptr;
1276   }
1277 }
1278 
1279 #if defined(IOV_MAX) && IOV_MAX < 1000
1280 #define MAX_WRITE_IOVEC IOV_MAX
1281 #else
1282 #define MAX_WRITE_IOVEC 1000
1283 #endif
PopulateIovs(size_t * unwind_slice_idx,size_t * unwind_byte_idx,size_t * sending_length,iovec * iov)1284 msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
1285                                                     size_t* unwind_byte_idx,
1286                                                     size_t* sending_length,
1287                                                     iovec* iov) {
1288   msg_iovlen_type iov_size;
1289   *unwind_slice_idx = out_offset_.slice_idx;
1290   *unwind_byte_idx = out_offset_.byte_idx;
1291   for (iov_size = 0;
1292        out_offset_.slice_idx != buf_.count && iov_size != MAX_WRITE_IOVEC;
1293        iov_size++) {
1294     iov[iov_size].iov_base =
1295         GRPC_SLICE_START_PTR(buf_.slices[out_offset_.slice_idx]) +
1296         out_offset_.byte_idx;
1297     iov[iov_size].iov_len =
1298         GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]) -
1299         out_offset_.byte_idx;
1300     *sending_length += iov[iov_size].iov_len;
1301     ++(out_offset_.slice_idx);
1302     out_offset_.byte_idx = 0;
1303   }
1304   GPR_DEBUG_ASSERT(iov_size > 0);
1305   return iov_size;
1306 }
1307 
UpdateOffsetForBytesSent(size_t sending_length,size_t actually_sent)1308 void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
1309                                                      size_t actually_sent) {
1310   size_t trailing = sending_length - actually_sent;
1311   while (trailing > 0) {
1312     size_t slice_length;
1313     out_offset_.slice_idx--;
1314     slice_length = GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]);
1315     if (slice_length > trailing) {
1316       out_offset_.byte_idx = slice_length - trailing;
1317       break;
1318     } else {
1319       trailing -= slice_length;
1320     }
1321   }
1322 }
1323 
1324 // returns true if done, false if pending; if returning true, *error is set
do_tcp_flush_zerocopy(grpc_tcp * tcp,TcpZerocopySendRecord * record,grpc_error ** error)1325 static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
1326                                   grpc_error** error) {
1327   struct msghdr msg;
1328   struct iovec iov[MAX_WRITE_IOVEC];
1329   msg_iovlen_type iov_size;
1330   ssize_t sent_length = 0;
1331   size_t sending_length;
1332   size_t unwind_slice_idx;
1333   size_t unwind_byte_idx;
1334   while (true) {
1335     sending_length = 0;
1336     iov_size = record->PopulateIovs(&unwind_slice_idx, &unwind_byte_idx,
1337                                     &sending_length, iov);
1338     msg.msg_name = nullptr;
1339     msg.msg_namelen = 0;
1340     msg.msg_iov = iov;
1341     msg.msg_iovlen = iov_size;
1342     msg.msg_flags = 0;
1343     bool tried_sending_message = false;
1344     // Before calling sendmsg (with or without timestamps): we
1345     // take a single ref on the zerocopy send record.
1346     tcp->tcp_zerocopy_send_ctx.NoteSend(record);
1347     if (tcp->outgoing_buffer_arg != nullptr) {
1348       if (!tcp->ts_capable ||
1349           !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
1350                                      MSG_ZEROCOPY)) {
1351         /* We could not set socket options to collect Fathom timestamps.
1352          * Fallback on writing without timestamps. */
1353         tcp->ts_capable = false;
1354         tcp_shutdown_buffer_list(tcp);
1355       } else {
1356         tried_sending_message = true;
1357       }
1358     }
1359     if (!tried_sending_message) {
1360       msg.msg_control = nullptr;
1361       msg.msg_controllen = 0;
1362       GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
1363       GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
1364       sent_length = tcp_send(tcp->fd, &msg, MSG_ZEROCOPY);
1365     }
1366     if (sent_length < 0) {
1367       // If this particular send failed, drop ref taken earlier in this method.
1368       tcp->tcp_zerocopy_send_ctx.UndoSend();
1369       if (errno == EAGAIN) {
1370         record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
1371         return false;
1372       } else if (errno == EPIPE) {
1373         *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
1374         tcp_shutdown_buffer_list(tcp);
1375         return true;
1376       } else {
1377         *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
1378         tcp_shutdown_buffer_list(tcp);
1379         return true;
1380       }
1381     }
1382     tcp->bytes_counter += sent_length;
1383     record->UpdateOffsetForBytesSent(sending_length,
1384                                      static_cast<size_t>(sent_length));
1385     if (record->AllSlicesSent()) {
1386       *error = GRPC_ERROR_NONE;
1387       return true;
1388     }
1389   }
1390 }
1391 
UnrefMaybePutZerocopySendRecord(grpc_tcp * tcp,TcpZerocopySendRecord * record,uint32_t seq,const char *)1392 static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
1393                                             TcpZerocopySendRecord* record,
1394                                             uint32_t seq,
1395                                             const char* /* tag */) {
1396   if (record->Unref()) {
1397     tcp->tcp_zerocopy_send_ctx.PutSendRecord(record);
1398   }
1399 }
1400 
tcp_flush_zerocopy(grpc_tcp * tcp,TcpZerocopySendRecord * record,grpc_error ** error)1401 static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
1402                                grpc_error** error) {
1403   bool done = do_tcp_flush_zerocopy(tcp, record, error);
1404   if (done) {
1405     // Either we encountered an error, or we successfully sent all the bytes.
1406     // In either case, we're done with this record.
1407     UnrefMaybePutZerocopySendRecord(tcp, record, 0, "flush_done");
1408   }
1409   return done;
1410 }
1411 
tcp_flush(grpc_tcp * tcp,grpc_error ** error)1412 static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
1413   struct msghdr msg;
1414   struct iovec iov[MAX_WRITE_IOVEC];
1415   msg_iovlen_type iov_size;
1416   ssize_t sent_length = 0;
1417   size_t sending_length;
1418   size_t trailing;
1419   size_t unwind_slice_idx;
1420   size_t unwind_byte_idx;
1421 
1422   // We always start at zero, because we eagerly unref and trim the slice
1423   // buffer as we write
1424   size_t outgoing_slice_idx = 0;
1425 
1426   while (true) {
1427     sending_length = 0;
1428     unwind_slice_idx = outgoing_slice_idx;
1429     unwind_byte_idx = tcp->outgoing_byte_idx;
1430     for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count &&
1431                        iov_size != MAX_WRITE_IOVEC;
1432          iov_size++) {
1433       iov[iov_size].iov_base =
1434           GRPC_SLICE_START_PTR(
1435               tcp->outgoing_buffer->slices[outgoing_slice_idx]) +
1436           tcp->outgoing_byte_idx;
1437       iov[iov_size].iov_len =
1438           GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) -
1439           tcp->outgoing_byte_idx;
1440       sending_length += iov[iov_size].iov_len;
1441       outgoing_slice_idx++;
1442       tcp->outgoing_byte_idx = 0;
1443     }
1444     GPR_ASSERT(iov_size > 0);
1445 
1446     msg.msg_name = nullptr;
1447     msg.msg_namelen = 0;
1448     msg.msg_iov = iov;
1449     msg.msg_iovlen = iov_size;
1450     msg.msg_flags = 0;
1451     bool tried_sending_message = false;
1452     if (tcp->outgoing_buffer_arg != nullptr) {
1453       if (!tcp->ts_capable ||
1454           !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length)) {
1455         /* We could not set socket options to collect Fathom timestamps.
1456          * Fallback on writing without timestamps. */
1457         tcp->ts_capable = false;
1458         tcp_shutdown_buffer_list(tcp);
1459       } else {
1460         tried_sending_message = true;
1461       }
1462     }
1463     if (!tried_sending_message) {
1464       msg.msg_control = nullptr;
1465       msg.msg_controllen = 0;
1466 
1467       GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
1468       GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
1469 
1470       sent_length = tcp_send(tcp->fd, &msg);
1471     }
1472 
1473     if (sent_length < 0) {
1474       if (errno == EAGAIN) {
1475         tcp->outgoing_byte_idx = unwind_byte_idx;
1476         // unref all and forget about all slices that have been written to this
1477         // point
1478         for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
1479           grpc_slice_buffer_remove_first(tcp->outgoing_buffer);
1480         }
1481         return false;
1482       } else if (errno == EPIPE) {
1483         *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
1484         grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
1485         tcp_shutdown_buffer_list(tcp);
1486         return true;
1487       } else {
1488         *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
1489         grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
1490         tcp_shutdown_buffer_list(tcp);
1491         return true;
1492       }
1493     }
1494 
1495     GPR_ASSERT(tcp->outgoing_byte_idx == 0);
1496     tcp->bytes_counter += sent_length;
1497     trailing = sending_length - static_cast<size_t>(sent_length);
1498     while (trailing > 0) {
1499       size_t slice_length;
1500 
1501       outgoing_slice_idx--;
1502       slice_length =
1503           GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]);
1504       if (slice_length > trailing) {
1505         tcp->outgoing_byte_idx = slice_length - trailing;
1506         break;
1507       } else {
1508         trailing -= slice_length;
1509       }
1510     }
1511     if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
1512       *error = GRPC_ERROR_NONE;
1513       grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
1514       return true;
1515     }
1516   }
1517 }
1518 
tcp_handle_write(void * arg,grpc_error * error)1519 static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
1520   grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
1521   grpc_closure* cb;
1522 
1523   if (error != GRPC_ERROR_NONE) {
1524     cb = tcp->write_cb;
1525     tcp->write_cb = nullptr;
1526     if (tcp->current_zerocopy_send != nullptr) {
1527       UnrefMaybePutZerocopySendRecord(tcp, tcp->current_zerocopy_send, 0,
1528                                       "handle_write_err");
1529       tcp->current_zerocopy_send = nullptr;
1530     }
1531     grpc_core::Closure::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error));
1532     TCP_UNREF(tcp, "write");
1533     return;
1534   }
1535 
1536   bool flush_result =
1537       tcp->current_zerocopy_send != nullptr
1538           ? tcp_flush_zerocopy(tcp, tcp->current_zerocopy_send, &error)
1539           : tcp_flush(tcp, &error);
1540   if (!flush_result) {
1541     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1542       gpr_log(GPR_INFO, "write: delayed");
1543     }
1544     notify_on_write(tcp);
1545     // tcp_flush does not populate error if it has returned false.
1546     GPR_DEBUG_ASSERT(error == GRPC_ERROR_NONE);
1547   } else {
1548     cb = tcp->write_cb;
1549     tcp->write_cb = nullptr;
1550     tcp->current_zerocopy_send = nullptr;
1551     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1552       const char* str = grpc_error_string(error);
1553       gpr_log(GPR_INFO, "write: %s", str);
1554     }
1555     // No need to take a ref on error since tcp_flush provides a ref.
1556     grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
1557     TCP_UNREF(tcp, "write");
1558   }
1559 }
1560 
tcp_write(grpc_endpoint * ep,grpc_slice_buffer * buf,grpc_closure * cb,void * arg)1561 static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
1562                       grpc_closure* cb, void* arg) {
1563   GPR_TIMER_SCOPE("tcp_write", 0);
1564   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1565   grpc_error* error = GRPC_ERROR_NONE;
1566   TcpZerocopySendRecord* zerocopy_send_record = nullptr;
1567 
1568   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1569     size_t i;
1570 
1571     for (i = 0; i < buf->count; i++) {
1572       gpr_log(GPR_INFO, "WRITE %p (peer=%s)", tcp, tcp->peer_string.c_str());
1573       if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
1574         char* data =
1575             grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
1576         gpr_log(GPR_DEBUG, "DATA: %s", data);
1577         gpr_free(data);
1578       }
1579     }
1580   }
1581 
1582   GPR_ASSERT(tcp->write_cb == nullptr);
1583   GPR_DEBUG_ASSERT(tcp->current_zerocopy_send == nullptr);
1584 
1585   if (buf->length == 0) {
1586     grpc_core::Closure::Run(
1587         DEBUG_LOCATION, cb,
1588         grpc_fd_is_shutdown(tcp->em_fd)
1589             ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
1590                                  tcp)
1591             : GRPC_ERROR_NONE);
1592     tcp_shutdown_buffer_list(tcp);
1593     return;
1594   }
1595 
1596   zerocopy_send_record = tcp_get_send_zerocopy_record(tcp, buf);
1597   if (zerocopy_send_record == nullptr) {
1598     // Either not enough bytes, or couldn't allocate a zerocopy context.
1599     tcp->outgoing_buffer = buf;
1600     tcp->outgoing_byte_idx = 0;
1601   }
1602   tcp->outgoing_buffer_arg = arg;
1603   if (arg) {
1604     GPR_ASSERT(grpc_event_engine_can_track_errors());
1605   }
1606 
1607   bool flush_result =
1608       zerocopy_send_record != nullptr
1609           ? tcp_flush_zerocopy(tcp, zerocopy_send_record, &error)
1610           : tcp_flush(tcp, &error);
1611   if (!flush_result) {
1612     TCP_REF(tcp, "write");
1613     tcp->write_cb = cb;
1614     tcp->current_zerocopy_send = zerocopy_send_record;
1615     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1616       gpr_log(GPR_INFO, "write: delayed");
1617     }
1618     notify_on_write(tcp);
1619   } else {
1620     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
1621       const char* str = grpc_error_string(error);
1622       gpr_log(GPR_INFO, "write: %s", str);
1623     }
1624     grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
1625   }
1626 }
1627 
tcp_add_to_pollset(grpc_endpoint * ep,grpc_pollset * pollset)1628 static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
1629   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1630   grpc_pollset_add_fd(pollset, tcp->em_fd);
1631 }
1632 
tcp_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pollset_set)1633 static void tcp_add_to_pollset_set(grpc_endpoint* ep,
1634                                    grpc_pollset_set* pollset_set) {
1635   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1636   grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
1637 }
1638 
tcp_delete_from_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pollset_set)1639 static void tcp_delete_from_pollset_set(grpc_endpoint* ep,
1640                                         grpc_pollset_set* pollset_set) {
1641   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1642   ZerocopyDisableAndWaitForRemaining(tcp);
1643   grpc_pollset_set_del_fd(pollset_set, tcp->em_fd);
1644 }
1645 
tcp_get_peer(grpc_endpoint * ep)1646 static absl::string_view tcp_get_peer(grpc_endpoint* ep) {
1647   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1648   return tcp->peer_string;
1649 }
1650 
tcp_get_local_address(grpc_endpoint * ep)1651 static absl::string_view tcp_get_local_address(grpc_endpoint* ep) {
1652   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1653   return tcp->local_address;
1654 }
1655 
tcp_get_fd(grpc_endpoint * ep)1656 static int tcp_get_fd(grpc_endpoint* ep) {
1657   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1658   return tcp->fd;
1659 }
1660 
tcp_get_resource_user(grpc_endpoint * ep)1661 static grpc_resource_user* tcp_get_resource_user(grpc_endpoint* ep) {
1662   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1663   return tcp->resource_user;
1664 }
1665 
tcp_can_track_err(grpc_endpoint * ep)1666 static bool tcp_can_track_err(grpc_endpoint* ep) {
1667   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1668   if (!grpc_event_engine_can_track_errors()) {
1669     return false;
1670   }
1671   struct sockaddr addr;
1672   socklen_t len = sizeof(addr);
1673   if (getsockname(tcp->fd, &addr, &len) < 0) {
1674     return false;
1675   }
1676   if (addr.sa_family == AF_INET || addr.sa_family == AF_INET6) {
1677     return true;
1678   }
1679   return false;
1680 }
1681 
1682 static const grpc_endpoint_vtable vtable = {tcp_read,
1683                                             tcp_write,
1684                                             tcp_add_to_pollset,
1685                                             tcp_add_to_pollset_set,
1686                                             tcp_delete_from_pollset_set,
1687                                             tcp_shutdown,
1688                                             tcp_destroy,
1689                                             tcp_get_resource_user,
1690                                             tcp_get_peer,
1691                                             tcp_get_local_address,
1692                                             tcp_get_fd,
1693                                             tcp_can_track_err};
1694 
1695 #define MAX_CHUNK_SIZE (32 * 1024 * 1024)
1696 
grpc_tcp_create(grpc_fd * em_fd,const grpc_channel_args * channel_args,const char * peer_string)1697 grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
1698                                const grpc_channel_args* channel_args,
1699                                const char* peer_string) {
1700   static constexpr bool kZerocpTxEnabledDefault = false;
1701   int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
1702   int tcp_max_read_chunk_size = 4 * 1024 * 1024;
1703   int tcp_min_read_chunk_size = 256;
1704   bool tcp_tx_zerocopy_enabled = kZerocpTxEnabledDefault;
1705   int tcp_tx_zerocopy_send_bytes_thresh =
1706       grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold;
1707   int tcp_tx_zerocopy_max_simult_sends =
1708       grpc_core::TcpZerocopySendCtx::kDefaultMaxSends;
1709   grpc_resource_quota* resource_quota = grpc_resource_quota_create(nullptr);
1710   if (channel_args != nullptr) {
1711     for (size_t i = 0; i < channel_args->num_args; i++) {
1712       if (0 ==
1713           strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
1714         grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
1715         tcp_read_chunk_size =
1716             grpc_channel_arg_get_integer(&channel_args->args[i], options);
1717       } else if (0 == strcmp(channel_args->args[i].key,
1718                              GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) {
1719         grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
1720         tcp_min_read_chunk_size =
1721             grpc_channel_arg_get_integer(&channel_args->args[i], options);
1722       } else if (0 == strcmp(channel_args->args[i].key,
1723                              GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) {
1724         grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
1725         tcp_max_read_chunk_size =
1726             grpc_channel_arg_get_integer(&channel_args->args[i], options);
1727       } else if (0 ==
1728                  strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
1729         grpc_resource_quota_unref_internal(resource_quota);
1730         resource_quota =
1731             grpc_resource_quota_ref_internal(static_cast<grpc_resource_quota*>(
1732                 channel_args->args[i].value.pointer.p));
1733       } else if (0 == strcmp(channel_args->args[i].key,
1734                              GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) {
1735         tcp_tx_zerocopy_enabled = grpc_channel_arg_get_bool(
1736             &channel_args->args[i], kZerocpTxEnabledDefault);
1737       } else if (0 == strcmp(channel_args->args[i].key,
1738                              GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD)) {
1739         grpc_integer_options options = {
1740             grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold, 0,
1741             INT_MAX};
1742         tcp_tx_zerocopy_send_bytes_thresh =
1743             grpc_channel_arg_get_integer(&channel_args->args[i], options);
1744       } else if (0 == strcmp(channel_args->args[i].key,
1745                              GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)) {
1746         grpc_integer_options options = {
1747             grpc_core::TcpZerocopySendCtx::kDefaultMaxSends, 0, INT_MAX};
1748         tcp_tx_zerocopy_max_simult_sends =
1749             grpc_channel_arg_get_integer(&channel_args->args[i], options);
1750       }
1751     }
1752   }
1753 
1754   if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) {
1755     tcp_min_read_chunk_size = tcp_max_read_chunk_size;
1756   }
1757   tcp_read_chunk_size = GPR_CLAMP(tcp_read_chunk_size, tcp_min_read_chunk_size,
1758                                   tcp_max_read_chunk_size);
1759 
1760   grpc_tcp* tcp = new grpc_tcp(tcp_tx_zerocopy_max_simult_sends,
1761                                tcp_tx_zerocopy_send_bytes_thresh);
1762   tcp->base.vtable = &vtable;
1763   tcp->peer_string = peer_string;
1764   tcp->fd = grpc_fd_wrapped_fd(em_fd);
1765   grpc_resolved_address resolved_local_addr;
1766   memset(&resolved_local_addr, 0, sizeof(resolved_local_addr));
1767   resolved_local_addr.len = sizeof(resolved_local_addr.addr);
1768   if (getsockname(tcp->fd,
1769                   reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
1770                   &resolved_local_addr.len) < 0) {
1771     tcp->local_address = "";
1772   } else {
1773     tcp->local_address = grpc_sockaddr_to_uri(&resolved_local_addr);
1774   }
1775   tcp->read_cb = nullptr;
1776   tcp->write_cb = nullptr;
1777   tcp->current_zerocopy_send = nullptr;
1778   tcp->release_fd_cb = nullptr;
1779   tcp->release_fd = nullptr;
1780   tcp->incoming_buffer = nullptr;
1781   tcp->target_length = static_cast<double>(tcp_read_chunk_size);
1782   tcp->min_read_chunk_size = tcp_min_read_chunk_size;
1783   tcp->max_read_chunk_size = tcp_max_read_chunk_size;
1784   tcp->bytes_read_this_round = 0;
1785   /* Will be set to false by the very first endpoint read function */
1786   tcp->is_first_read = true;
1787   tcp->bytes_counter = -1;
1788   tcp->socket_ts_enabled = false;
1789   tcp->ts_capable = true;
1790   tcp->outgoing_buffer_arg = nullptr;
1791   if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {
1792 #ifdef GRPC_LINUX_ERRQUEUE
1793     const int enable = 1;
1794     auto err =
1795         setsockopt(tcp->fd, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable));
1796     if (err == 0) {
1797       tcp->tcp_zerocopy_send_ctx.set_enabled(true);
1798     } else {
1799       gpr_log(GPR_ERROR, "Failed to set zerocopy options on the socket.");
1800     }
1801 #endif
1802   }
1803   /* paired with unref in grpc_tcp_destroy */
1804   new (&tcp->refcount) grpc_core::RefCount(
1805       1, GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace) ? "tcp" : nullptr);
1806   gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
1807   tcp->em_fd = em_fd;
1808   grpc_slice_buffer_init(&tcp->last_read_buffer);
1809   tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
1810   grpc_resource_user_slice_allocator_init(
1811       &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
1812   grpc_resource_quota_unref_internal(resource_quota);
1813   gpr_mu_init(&tcp->tb_mu);
1814   tcp->tb_head = nullptr;
1815   GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp,
1816                     grpc_schedule_on_exec_ctx);
1817   if (grpc_event_engine_run_in_background()) {
1818     // If there is a polling engine always running in the background, there is
1819     // no need to run the backup poller.
1820     GRPC_CLOSURE_INIT(&tcp->write_done_closure, tcp_handle_write, tcp,
1821                       grpc_schedule_on_exec_ctx);
1822   } else {
1823     GRPC_CLOSURE_INIT(&tcp->write_done_closure,
1824                       tcp_drop_uncovered_then_handle_write, tcp,
1825                       grpc_schedule_on_exec_ctx);
1826   }
1827   /* Always assume there is something on the queue to read. */
1828   tcp->inq = 1;
1829 #ifdef GRPC_HAVE_TCP_INQ
1830   int one = 1;
1831   if (setsockopt(tcp->fd, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) {
1832     tcp->inq_capable = true;
1833   } else {
1834     gpr_log(GPR_DEBUG, "cannot set inq fd=%d errno=%d", tcp->fd, errno);
1835     tcp->inq_capable = false;
1836   }
1837 #else
1838   tcp->inq_capable = false;
1839 #endif /* GRPC_HAVE_TCP_INQ */
1840   /* Start being notified on errors if event engine can track errors. */
1841   if (grpc_event_engine_can_track_errors()) {
1842     /* Grab a ref to tcp so that we can safely access the tcp struct when
1843      * processing errors. We unref when we no longer want to track errors
1844      * separately. */
1845     TCP_REF(tcp, "error-tracking");
1846     gpr_atm_rel_store(&tcp->stop_error_notification, 0);
1847     GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
1848                       grpc_schedule_on_exec_ctx);
1849     grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
1850   }
1851 
1852   return &tcp->base;
1853 }
1854 
grpc_tcp_fd(grpc_endpoint * ep)1855 int grpc_tcp_fd(grpc_endpoint* ep) {
1856   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1857   GPR_ASSERT(ep->vtable == &vtable);
1858   return grpc_fd_wrapped_fd(tcp->em_fd);
1859 }
1860 
grpc_tcp_destroy_and_release_fd(grpc_endpoint * ep,int * fd,grpc_closure * done)1861 void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
1862                                      grpc_closure* done) {
1863   grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1864   GPR_ASSERT(ep->vtable == &vtable);
1865   tcp->release_fd = fd;
1866   tcp->release_fd_cb = done;
1867   grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
1868   if (grpc_event_engine_can_track_errors()) {
1869     /* Stop errors notification. */
1870     ZerocopyDisableAndWaitForRemaining(tcp);
1871     gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
1872     grpc_fd_set_error(tcp->em_fd);
1873   }
1874   TCP_UNREF(tcp, "destroy");
1875 }
1876 
1877 #endif /* GRPC_POSIX_SOCKET_TCP */
1878