1 /*
2  * Copyright (C) 2018, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "statsd_writer.h"
17 
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <inttypes.h>
21 #include <poll.h>
22 #include <private/android_logger.h>
23 #include <stdarg.h>
24 #include <stdatomic.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/stat.h>
29 #include <sys/types.h>
30 #include <sys/uio.h>
31 #include <sys/un.h>
32 #include <time.h>
33 #include <unistd.h>
34 
35 
// Compatibility shims for glibc-2.17 in the Android tree: supply the pieces
// that its unistd.h does not provide.
#if !defined(__BIONIC__)

// glibc-2.17's unistd.h has no declaration for gettid().
extern pid_t gettid();

// glibc-2.17's unistd.h has no TEMP_FAILURE_RETRY.
// Re-evaluates exp for as long as it yields -1 with errno == EINTR, then
// produces the last value of exp as the value of the whole expression.
#if !defined(TEMP_FAILURE_RETRY)
#define TEMP_FAILURE_RETRY(exp)                     \
    ({                                              \
        __typeof__(exp) _rc;                        \
        for (;;) {                                  \
            _rc = (exp);                            \
            if (_rc != -1 || errno != EINTR) break; \
        }                                           \
        _rc;                                        \
    })
#endif  // TEMP_FAILURE_RETRY

#endif  // __BIONIC__
53 
// Mutex taken by statsd_writer_init_lock()/statd_writer_trylock() and released
// by statsd_writer_init_unlock(). statsdWrite() holds it while it closes and
// reopens the socket.
static pthread_mutex_t log_init_lock = PTHREAD_MUTEX_INITIALIZER;
// Number of drops recorded by statsdNoteDrop() that statsdWrite() has not yet
// reported; statsdWrite() swaps it back to 0 when it sends a drop report.
static atomic_int dropped = 0;
// `error` argument of the most recent statsdNoteDrop() call.
static atomic_int log_error = 0;
// `tag` argument of the most recent statsdNoteDrop() call.
static atomic_int atom_tag = 0;
58 
59 void statsd_writer_init_lock() {
60     /*
61      * If we trigger a signal handler in the middle of locked activity and the
62      * signal handler logs a message, we could get into a deadlock state.
63      */
64     pthread_mutex_lock(&log_init_lock);
65 }
66 
67 int statd_writer_trylock() {
68     return pthread_mutex_trylock(&log_init_lock);
69 }
70 
71 void statsd_writer_init_unlock() {
72     pthread_mutex_unlock(&log_init_lock);
73 }
74 
/*
 * Forward declarations of the transport callbacks referenced by
 * statsdLoggerWrite below. They are written as full prototypes, matching the
 * parameter lists of the definitions later in this file, so that calls are
 * type-checked and the declarations keep their meaning under C23, where an
 * empty parameter list means "no parameters".
 */
static int statsdAvailable(void);
static int statsdOpen(void);
static void statsdClose(void);
static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr);
static void statsdNoteDrop(int error, int tag);
static int statsdIsClosed(void);
81 
// Transport descriptor for the statsd socket writer: a name, the current
// socket state and the callbacks implemented in this file.
struct android_log_transport_write statsdLoggerWrite = {
        .name = "statsd",
        // Holds the connected socket fd when >= 0, otherwise a negative errno
        // value. Starts out as -EBADF; statsdOpen() stores the fd here.
        .sock = -EBADF,
        .available = statsdAvailable,
        .open = statsdOpen,
        .close = statsdClose,
        .write = statsdWrite,
        .noteDrop = statsdNoteDrop,
        .isClosed = statsdIsClosed,
};
92 
93 /* log_init_lock assumed */
94 static int statsdOpen() {
95     int i, ret = 0;
96 
97     i = atomic_load(&statsdLoggerWrite.sock);
98     if (i < 0) {
99         int flags = SOCK_DGRAM;
100 #ifdef SOCK_CLOEXEC
101         flags |= SOCK_CLOEXEC;
102 #endif
103 #ifdef SOCK_NONBLOCK
104         flags |= SOCK_NONBLOCK;
105 #endif
106         int sock = TEMP_FAILURE_RETRY(socket(PF_UNIX, flags, 0));
107         if (sock < 0) {
108             ret = -errno;
109         } else {
110             int sndbuf = 1 * 1024 * 1024;  // set max send buffer size 1MB
111             socklen_t bufLen = sizeof(sndbuf);
112             // SO_RCVBUF does not have an effect on unix domain socket, but SO_SNDBUF does.
113             // Proceed to connect even setsockopt fails.
114             setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, bufLen);
115             struct sockaddr_un un;
116             memset(&un, 0, sizeof(struct sockaddr_un));
117             un.sun_family = AF_UNIX;
118             strcpy(un.sun_path, "/dev/socket/statsdw");
119 
120             if (TEMP_FAILURE_RETRY(
121                         connect(sock, (struct sockaddr*)&un, sizeof(struct sockaddr_un))) < 0) {
122                 ret = -errno;
123                 switch (ret) {
124                     case -ENOTCONN:
125                     case -ECONNREFUSED:
126                     case -ENOENT:
127                         i = atomic_exchange(&statsdLoggerWrite.sock, ret);
128                     /* FALLTHRU */
129                     default:
130                         break;
131                 }
132                 close(sock);
133             } else {
134                 ret = atomic_exchange(&statsdLoggerWrite.sock, sock);
135                 if ((ret >= 0) && (ret != sock)) {
136                     close(ret);
137                 }
138                 ret = 0;
139             }
140         }
141     }
142 
143     return ret;
144 }
145 
146 static void __statsdClose(int negative_errno) {
147     int sock = atomic_exchange(&statsdLoggerWrite.sock, negative_errno);
148     if (sock >= 0) {
149         close(sock);
150     }
151 }
152 
/* Closes the statsd socket and leaves -EBADF in statsdLoggerWrite.sock. */
static void statsdClose() {
    const int closed_state = -EBADF;
    __statsdClose(closed_state);
}
156 
157 static int statsdAvailable() {
158     if (atomic_load(&statsdLoggerWrite.sock) < 0) {
159         if (access("/dev/socket/statsdw", W_OK) == 0) {
160             return 0;
161         }
162         return -EBADF;
163     }
164     return 1;
165 }
166 
167 static void statsdNoteDrop(int error, int tag) {
168     atomic_fetch_add_explicit(&dropped, 1, memory_order_relaxed);
169     atomic_exchange_explicit(&log_error, error, memory_order_relaxed);
170     atomic_exchange_explicit(&atom_tag, tag, memory_order_relaxed);
171 }
172 
173 static int statsdIsClosed() {
174     if (atomic_load(&statsdLoggerWrite.sock) < 0) {
175         return 1;
176     }
177     return 0;
178 }
179 
180 static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr) {
181     ssize_t ret;
182     int sock;
183     static const unsigned headerLength = 1;
184     struct iovec newVec[nr + headerLength];
185     android_log_header_t header;
186     size_t i, payloadSize;
187 
188     sock = atomic_load(&statsdLoggerWrite.sock);
189     if (sock < 0) switch (sock) {
190             case -ENOTCONN:
191             case -ECONNREFUSED:
192             case -ENOENT:
193                 break;
194             default:
195                 return -EBADF;
196         }
197     /*
198      *  struct {
199      *      // what we provide to socket
200      *      android_log_header_t header;
201      *      // caller provides
202      *      union {
203      *          struct {
204      *              char     prio;
205      *              char     payload[];
206      *          } string;
207      *          struct {
208      *              uint32_t tag
209      *              char     payload[];
210      *          } binary;
211      *      };
212      *  };
213      */
214 
215     header.tid = gettid();
216     header.realtime.tv_sec = ts->tv_sec;
217     header.realtime.tv_nsec = ts->tv_nsec;
218 
219     newVec[0].iov_base = (unsigned char*)&header;
220     newVec[0].iov_len = sizeof(header);
221 
222     // If we dropped events before, try to tell statsd.
223     if (sock >= 0) {
224         int32_t snapshot = atomic_exchange_explicit(&dropped, 0, memory_order_relaxed);
225         if (snapshot) {
226             android_log_event_long_t buffer;
227             header.id = LOG_ID_STATS;
228             // store the last log error in the tag field. This tag field is not used by statsd.
229             buffer.header.tag = atomic_load(&log_error);
230             buffer.payload.type = EVENT_TYPE_LONG;
231             // format:
232             // |atom_tag|dropped_count|
233             int64_t composed_long = atomic_load(&atom_tag);
234             // Send 2 int32's via an int64.
235             composed_long = ((composed_long << 32) | ((int64_t)snapshot));
236             buffer.payload.data = composed_long;
237 
238             newVec[headerLength].iov_base = &buffer;
239             newVec[headerLength].iov_len = sizeof(buffer);
240 
241             ret = TEMP_FAILURE_RETRY(writev(sock, newVec, 2));
242             if (ret != (ssize_t)(sizeof(header) + sizeof(buffer))) {
243                 atomic_fetch_add_explicit(&dropped, snapshot, memory_order_relaxed);
244             }
245         }
246     }
247 
248     header.id = LOG_ID_STATS;
249 
250     for (payloadSize = 0, i = headerLength; i < nr + headerLength; i++) {
251         newVec[i].iov_base = vec[i - headerLength].iov_base;
252         payloadSize += newVec[i].iov_len = vec[i - headerLength].iov_len;
253 
254         if (payloadSize > LOGGER_ENTRY_MAX_PAYLOAD) {
255             newVec[i].iov_len -= payloadSize - LOGGER_ENTRY_MAX_PAYLOAD;
256             if (newVec[i].iov_len) {
257                 ++i;
258             }
259             break;
260         }
261     }
262 
263     /*
264      * The write below could be lost, but will never block.
265      *
266      * ENOTCONN occurs if statsd has died.
267      * ENOENT occurs if statsd is not running and socket is missing.
268      * ECONNREFUSED occurs if we can not reconnect to statsd.
269      * EAGAIN occurs if statsd is overloaded.
270      */
271     if (sock < 0) {
272         ret = sock;
273     } else {
274         ret = TEMP_FAILURE_RETRY(writev(sock, newVec, i));
275         if (ret < 0) {
276             ret = -errno;
277         }
278     }
279     switch (ret) {
280         case -ENOTCONN:
281         case -ECONNREFUSED:
282         case -ENOENT:
283             if (statd_writer_trylock()) {
284                 return ret; /* in a signal handler? try again when less stressed
285                              */
286             }
287             __statsdClose(ret);
288             ret = statsdOpen();
289             statsd_writer_init_unlock();
290 
291             if (ret < 0) {
292                 return ret;
293             }
294 
295             ret = TEMP_FAILURE_RETRY(writev(atomic_load(&statsdLoggerWrite.sock), newVec, i));
296             if (ret < 0) {
297                 ret = -errno;
298             }
299         /* FALLTHRU */
300         default:
301             break;
302     }
303 
304     if (ret > (ssize_t)sizeof(header)) {
305         ret -= sizeof(header);
306     }
307 
308     return ret;
309 }
310