1 /*
2  * Copyright (C) 2018, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "statsd_writer.h"
17 
18 #include <android-base/threads.h>
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <inttypes.h>
22 #include <poll.h>
23 #include <private/android_logger.h>
24 #include <stdarg.h>
25 #include <stdatomic.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <sys/stat.h>
30 #include <sys/types.h>
31 #include <sys/uio.h>
32 #include <sys/un.h>
33 #include <time.h>
34 #include <unistd.h>
35 
36 #include "stats_socket_loss_reporter.h"
37 #include "utils.h"
38 
// Compatibility shims for glibc-2.17 in the Android tree.
#ifndef __BIONIC__

// TEMP_FAILURE_RETRY is not present in unistd.h for glibc-2.17.
//
// Fallback definition, used only when no TEMP_FAILURE_RETRY macro exists yet.
// It evaluates `exp`, and evaluates it again for as long as the result is -1
// with errno == EINTR; the value of the whole expression is the last result.
// `exp` is therefore evaluated once per attempt, so it must be safe to repeat.
// Relies on the GNU statement-expression and __typeof__ extensions.
#ifndef TEMP_FAILURE_RETRY
#define TEMP_FAILURE_RETRY(exp) ({       \
  __typeof__(exp) _rc;                   \
  do {                                   \
  _rc = (exp);                           \
  } while (_rc == -1 && errno == EINTR); \
  _rc; })
#endif  // TEMP_FAILURE_RETRY

#endif  // __BIONIC__
53 
// Mutex behind statsd_writer_init_lock()/statd_writer_trylock()/
// statsd_writer_init_unlock(). statsdOpen() assumes it is held, and
// statsdWrite() try-locks it around its close-and-reopen sequence.
static pthread_mutex_t log_init_lock = PTHREAD_MUTEX_INITIALIZER;
// Count of drops noted via statsdNoteDrop() that statsdWrite() has not yet
// managed to report; statsdWrite() swaps it to 0 and adds the value back if
// the report could not be written.
static atomic_int dropped = 0;
// `error` argument of the most recent statsdNoteDrop() call.
static atomic_int log_error = 0;
// `tag` argument of the most recent statsdNoteDrop() call.
static atomic_int atom_tag = 0;
58 
statsd_writer_init_lock()59 void statsd_writer_init_lock() {
60     /*
61      * If we trigger a signal handler in the middle of locked activity and the
62      * signal handler logs a message, we could get into a deadlock state.
63      */
64     pthread_mutex_lock(&log_init_lock);
65 }
66 
statd_writer_trylock()67 int statd_writer_trylock() {
68     return pthread_mutex_trylock(&log_init_lock);
69 }
70 
statsd_writer_init_unlock()71 void statsd_writer_init_unlock() {
72     pthread_mutex_unlock(&log_init_lock);
73 }
74 
// Forward declarations of the transport callbacks defined later in this file;
// they are needed here because the statsdLoggerWrite table refers to them.
static int statsdAvailable();
static int statsdOpen();
static void statsdClose();
static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr);
static void statsdNoteDrop(int error, int tag);
static int statsdIsClosed();
81 
// Transport descriptor for the statsd socket writer.
//
// `sock` holds either the connected socket descriptor (>= 0) or a negated
// errno describing why no socket is open. It starts out as -EBADF, and is
// read and replaced with atomic operations throughout this file.
struct android_log_transport_write statsdLoggerWrite = {
        .name = "statsd",
        .sock = -EBADF,
        .available = statsdAvailable,
        .open = statsdOpen,
        .close = statsdClose,
        .write = statsdWrite,
        .noteDrop = statsdNoteDrop,
        .isClosed = statsdIsClosed,
};
92 
93 /* log_init_lock assumed */
statsdOpen()94 static int statsdOpen() {
95     int i, ret = 0;
96 
97     i = atomic_load(&statsdLoggerWrite.sock);
98     if (i < 0) {
99         int flags = SOCK_DGRAM;
100 #ifdef SOCK_CLOEXEC
101         flags |= SOCK_CLOEXEC;
102 #endif
103 #ifdef SOCK_NONBLOCK
104         flags |= SOCK_NONBLOCK;
105 #endif
106         int sock = TEMP_FAILURE_RETRY(socket(PF_UNIX, flags, 0));
107         if (sock < 0) {
108             ret = -errno;
109         } else {
110             const int sndbuf = 2 * 1024 * 1024;  // set max send buffer size 2MB
111             socklen_t bufLen = sizeof(sndbuf);
112             // SO_RCVBUF does not have an effect on unix domain socket, but SO_SNDBUF does.
113             // Proceed to connect even setsockopt fails.
114             setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, bufLen);
115             struct sockaddr_un un;
116             memset(&un, 0, sizeof(struct sockaddr_un));
117             un.sun_family = AF_UNIX;
118             strcpy(un.sun_path, "/dev/socket/statsdw");
119 
120             if (TEMP_FAILURE_RETRY(
121                         connect(sock, (struct sockaddr*)&un, sizeof(struct sockaddr_un))) < 0) {
122                 ret = -errno;
123                 switch (ret) {
124                     case -ENOTCONN:
125                     case -ECONNREFUSED:
126                     case -ENOENT:
127                         i = atomic_exchange(&statsdLoggerWrite.sock, ret);
128                         break;
129                     default:
130                         break;
131                 }
132                 close(sock);
133             } else {
134                 ret = atomic_exchange(&statsdLoggerWrite.sock, sock);
135                 if ((ret >= 0) && (ret != sock)) {
136                     close(ret);
137                 }
138                 ret = 0;
139             }
140         }
141     }
142 
143     return ret;
144 }
145 
/*
 * Atomically replaces statsdLoggerWrite.sock with `negative_errno` and closes
 * the descriptor that was stored there, if there was one.
 */
static void __statsdClose(int negative_errno) {
    const int previous = atomic_exchange(&statsdLoggerWrite.sock, negative_errno);
    if (previous < 0) {
        // The slot held an error code, not a descriptor.
        return;
    }
    close(previous);
}
152 
statsdClose()153 static void statsdClose() {
154     __statsdClose(-EBADF);
155 }
156 
statsdAvailable()157 static int statsdAvailable() {
158     if (atomic_load(&statsdLoggerWrite.sock) < 0) {
159         if (access("/dev/socket/statsdw", W_OK) == 0) {
160             return 0;
161         }
162         return -EBADF;
163     }
164     return 1;
165 }
166 
statsdNoteDrop(int error,int tag)167 static void statsdNoteDrop(int error, int tag) {
168     atomic_fetch_add_explicit(&dropped, 1, memory_order_relaxed);
169     atomic_exchange_explicit(&log_error, error, memory_order_relaxed);
170     atomic_exchange_explicit(&atom_tag, tag, memory_order_relaxed);
171 
172     StatsSocketLossReporter::getInstance().noteDrop(error, tag);
173 }
174 
/*
 * Transport `isClosed` callback.
 *
 * Returns 1 when statsdLoggerWrite.sock holds a negative value (no open
 * socket), 0 when it holds a descriptor.
 */
static int statsdIsClosed() {
    const int current = atomic_load(&statsdLoggerWrite.sock);
    return (current < 0) ? 1 : 0;
}
181 
statsdWrite(struct timespec * ts,struct iovec * vec,size_t nr)182 static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr) {
183     ssize_t ret;
184     int sock;
185     static const unsigned headerLength = 1;
186     struct iovec newVec[nr + headerLength];
187     android_log_header_t header;
188     size_t i, payloadSize;
189 
190     sock = atomic_load(&statsdLoggerWrite.sock);
191     if (sock < 0) switch (sock) {
192             case -ENOTCONN:
193             case -ECONNREFUSED:
194             case -ENOENT:
195                 break;
196             default:
197                 return -EBADF;
198         }
199     /*
200      *  struct {
201      *      // what we provide to socket
202      *      android_log_header_t header;
203      *      // caller provides
204      *      union {
205      *          struct {
206      *              char     prio;
207      *              char     payload[];
208      *          } string;
209      *          struct {
210      *              uint32_t tag
211      *              char     payload[];
212      *          } binary;
213      *      };
214      *  };
215      */
216 
217     header.tid = android::base::GetThreadId();
218     header.realtime.tv_sec = ts->tv_sec;
219     header.realtime.tv_nsec = ts->tv_nsec;
220 
221     newVec[0].iov_base = (unsigned char*)&header;
222     newVec[0].iov_len = sizeof(header);
223 
224     // If we dropped events before, try to tell statsd.
225     if (sock >= 0) {
226         int32_t snapshot = atomic_exchange_explicit(&dropped, 0, memory_order_relaxed);
227         if (snapshot) {
228             android_log_event_long_t buffer;
229             header.id = LOG_ID_STATS;
230             // store the last log error in the tag field. This tag field is not used by statsd.
231             buffer.header.tag = atomic_load(&log_error);
232             buffer.payload.type = EVENT_TYPE_LONG;
233             // format:
234             // |atom_tag|dropped_count|
235             int64_t composed_long = atomic_load(&atom_tag);
236             // Send 2 int32's via an int64.
237             composed_long = ((composed_long << 32) | ((int64_t)snapshot));
238             buffer.payload.data = composed_long;
239 
240             newVec[headerLength].iov_base = &buffer;
241             newVec[headerLength].iov_len = sizeof(buffer);
242 
243             ret = TEMP_FAILURE_RETRY(writev(sock, newVec, 2));
244             if (ret != (ssize_t)(sizeof(header) + sizeof(buffer))) {
245                 atomic_fetch_add_explicit(&dropped, snapshot, memory_order_relaxed);
246             } else {
247                 // try to send socket loss info only when socket connection established
248                 // and it is proved by previous write that socket is available
249                 StatsSocketLossReporter::getInstance().dumpAtomsLossStats();
250             }
251         }
252     }
253 
254     header.id = LOG_ID_STATS;
255 
256     for (payloadSize = 0, i = headerLength; i < nr + headerLength; i++) {
257         newVec[i].iov_base = vec[i - headerLength].iov_base;
258         payloadSize += newVec[i].iov_len = vec[i - headerLength].iov_len;
259 
260         if (payloadSize > LOGGER_ENTRY_MAX_PAYLOAD) {
261             newVec[i].iov_len -= payloadSize - LOGGER_ENTRY_MAX_PAYLOAD;
262             if (newVec[i].iov_len) {
263                 ++i;
264             }
265             break;
266         }
267     }
268 
269     /*
270      * The write below could be lost, but will never block.
271      *
272      * ENOTCONN occurs if statsd has died.
273      * ENOENT occurs if statsd is not running and socket is missing.
274      * ECONNREFUSED occurs if we can not reconnect to statsd.
275      * EAGAIN occurs if statsd is overloaded.
276      */
277     if (sock < 0) {
278         ret = sock;
279     } else {
280         ret = TEMP_FAILURE_RETRY(writev(sock, newVec, i));
281         if (ret < 0) {
282             ret = -errno;
283         }
284     }
285     switch (ret) {
286         case -ENOTCONN:
287         case -ECONNREFUSED:
288         case -ENOENT:
289             if (statd_writer_trylock()) {
290                 return ret; /* in a signal handler? try again when less stressed
291                              */
292             }
293             __statsdClose(ret);
294             ret = statsdOpen();
295             statsd_writer_init_unlock();
296 
297             if (ret < 0) {
298                 return ret;
299             }
300 
301             ret = TEMP_FAILURE_RETRY(writev(atomic_load(&statsdLoggerWrite.sock), newVec, i));
302             if (ret < 0) {
303                 ret = -errno;
304             }
305             break;
306         default:
307             break;
308     }
309 
310     if (ret > (ssize_t)sizeof(header)) {
311         ret -= sizeof(header);
312     }
313 
314     return ret;
315 }
316