1 /*
2 * Copyright (C) 2018, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "statsd_writer.h"
17
18 #include <android-base/threads.h>
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <inttypes.h>
22 #include <poll.h>
23 #include <private/android_logger.h>
24 #include <stdarg.h>
25 #include <stdatomic.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <sys/stat.h>
30 #include <sys/types.h>
31 #include <sys/uio.h>
32 #include <sys/un.h>
33 #include <time.h>
34 #include <unistd.h>
35
36 #include "stats_socket_loss_reporter.h"
37 #include "utils.h"
38
39 // Compatibility shims for glibc-2.17 in the Android tree.
40 #ifndef __BIONIC__
41
// Fallback for C libraries whose unistd.h does not supply TEMP_FAILURE_RETRY
// (glibc-2.17 in the Android tree): keep re-evaluating `exp` for as long as it
// yields -1 with errno set to EINTR, then produce the last value of `exp`.
#ifndef TEMP_FAILURE_RETRY
#define TEMP_FAILURE_RETRY(exp)                \
    ({                                         \
        __typeof__(exp) _rc;                   \
        for (;;) {                             \
            _rc = (exp);                       \
            if (_rc != -1 || errno != EINTR) { \
                break;                         \
            }                                  \
        }                                      \
        _rc;                                   \
    })
#endif  // TEMP_FAILURE_RETRY
51
52 #endif // __BIONIC__
53
// Mutex behind statsd_writer_init_lock() / statd_writer_trylock() /
// statsd_writer_init_unlock(). statsdOpen() assumes it is held, and
// statsdWrite() takes it (non-blocking) around its close-and-reopen sequence.
static pthread_mutex_t log_init_lock = PTHREAD_MUTEX_INITIALIZER;
// Count of drops recorded by statsdNoteDrop() that statsdWrite() has not yet
// managed to report to statsd.
static atomic_int dropped = 0;
// Error value passed to the most recent statsdNoteDrop() call.
static atomic_int log_error = 0;
// Tag value passed to the most recent statsdNoteDrop() call.
static atomic_int atom_tag = 0;
58
statsd_writer_init_lock()59 void statsd_writer_init_lock() {
60 /*
61 * If we trigger a signal handler in the middle of locked activity and the
62 * signal handler logs a message, we could get into a deadlock state.
63 */
64 pthread_mutex_lock(&log_init_lock);
65 }
66
statd_writer_trylock()67 int statd_writer_trylock() {
68 return pthread_mutex_trylock(&log_init_lock);
69 }
70
statsd_writer_init_unlock()71 void statsd_writer_init_unlock() {
72 pthread_mutex_unlock(&log_init_lock);
73 }
74
// Forward declarations of the transport callbacks; they are referenced by the
// statsdLoggerWrite initializer before their definitions appear in this file.
static int statsdAvailable();
static int statsdOpen();
static void statsdClose();
static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr);
static void statsdNoteDrop(int error, int tag);
static int statsdIsClosed();
81
// Transport descriptor for the statsd socket writer.
//
// `sock` holds either the connected socket descriptor (>= 0) or a negative
// errno value: it starts out as -EBADF, statsdClose() resets it to -EBADF, and
// statsdOpen() / statsdWrite() may store -ENOTCONN, -ECONNREFUSED or -ENOENT in
// it when statsd cannot be reached. The remaining members point at the static
// callbacks defined in this file.
struct android_log_transport_write statsdLoggerWrite = {
    .name = "statsd",
    .sock = -EBADF,
    .available = statsdAvailable,
    .open = statsdOpen,
    .close = statsdClose,
    .write = statsdWrite,
    .noteDrop = statsdNoteDrop,
    .isClosed = statsdIsClosed,
};
92
/*
 * Connects the transport to the statsd datagram socket unless it already holds
 * an open descriptor. The caller is assumed to hold log_init_lock.
 *
 * Returns 0 if a socket was already open or the connection succeeded, and a
 * negative errno value if socket() or connect() failed. When connect() fails
 * with ENOTCONN, ECONNREFUSED or ENOENT that negative errno is also stored in
 * statsdLoggerWrite.sock.
 */
static int statsdOpen() {
    if (atomic_load(&statsdLoggerWrite.sock) >= 0) {
        return 0;  // Already connected; nothing to do.
    }

    int type = SOCK_DGRAM;
#ifdef SOCK_CLOEXEC
    type |= SOCK_CLOEXEC;
#endif
#ifdef SOCK_NONBLOCK
    type |= SOCK_NONBLOCK;
#endif

    const int fd = TEMP_FAILURE_RETRY(socket(PF_UNIX, type, 0));
    if (fd < 0) {
        return -errno;
    }

    // Request a 2MB send buffer. SO_RCVBUF has no effect on a unix domain
    // socket, but SO_SNDBUF does. A setsockopt failure is ignored on purpose:
    // we still go on to connect.
    const int sndbuf = 2 * 1024 * 1024;
    setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf));

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, "/dev/socket/statsdw");

    if (TEMP_FAILURE_RETRY(connect(fd, (struct sockaddr*)&addr, sizeof(addr))) < 0) {
        // Capture errno before close() has a chance to overwrite it.
        const int err = -errno;
        if (err == -ENOTCONN || err == -ECONNREFUSED || err == -ENOENT) {
            // Remember why statsd is unreachable so statsdWrite() can retry.
            atomic_exchange(&statsdLoggerWrite.sock, err);
        }
        close(fd);
        return err;
    }

    // Publish the new descriptor and close a different descriptor it replaced.
    const int previous = atomic_exchange(&statsdLoggerWrite.sock, fd);
    if (previous >= 0 && previous != fd) {
        close(previous);
    }
    return 0;
}
145
/*
 * Replaces the transport's socket value with `negative_errno` and closes the
 * descriptor that was stored there, if there was one.
 */
static void __statsdClose(int negative_errno) {
    const int previous = atomic_exchange(&statsdLoggerWrite.sock, negative_errno);
    if (previous < 0) {
        return;  // No open descriptor was stored.
    }
    close(previous);
}
152
statsdClose()153 static void statsdClose() {
154 __statsdClose(-EBADF);
155 }
156
/*
 * Reports whether the statsd transport can be used.
 *
 * Returns 1 when a socket is already open, 0 when no socket is open but
 * /dev/socket/statsdw is writable, and -EBADF otherwise.
 */
static int statsdAvailable() {
    if (atomic_load(&statsdLoggerWrite.sock) >= 0) {
        return 1;
    }
    return (access("/dev/socket/statsdw", W_OK) == 0) ? 0 : -EBADF;
}
166
/*
 * Records that one event was dropped: increments the pending drop counter,
 * remembers `error` and `tag` as the most recent ones, and forwards both to
 * the StatsSocketLossReporter singleton.
 */
static void statsdNoteDrop(int error, int tag) {
    atomic_fetch_add_explicit(&dropped, 1, memory_order_relaxed);
    // The previous values are not needed, so plain relaxed stores suffice.
    atomic_store_explicit(&log_error, error, memory_order_relaxed);
    atomic_store_explicit(&atom_tag, tag, memory_order_relaxed);

    StatsSocketLossReporter::getInstance().noteDrop(error, tag);
}
174
/* Returns 1 when the transport holds no open socket (sock < 0), else 0. */
static int statsdIsClosed() {
    return (atomic_load(&statsdLoggerWrite.sock) < 0) ? 1 : 0;
}
181
/*
 * Writes one stats event to the statsd socket.
 *
 * The datagram consists of an android_log_header_t (filled in here from the
 * calling thread id and `ts`, with id LOG_ID_STATS) followed by the caller's
 * `nr` iovecs in `vec`. The caller payload is truncated so that it never
 * exceeds LOGGER_ENTRY_MAX_PAYLOAD bytes.
 *
 * When the socket is open and drops were recorded via statsdNoteDrop(), a
 * separate drop-report datagram is sent first. If that report is not written
 * in full, the drop count is added back so a later call retries it.
 *
 * If the stored socket value or the write result is -ENOTCONN, -ECONNREFUSED
 * or -ENOENT, one close/reopen/resend cycle is attempted, provided the init
 * lock can be taken without blocking.
 *
 * Returns the writev() byte count, minus the header size when more than a
 * header was written, or a negative errno value on failure. Returns -EBADF
 * immediately when the stored socket value is any other negative number.
 */
static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr) {
    ssize_t ret;
    int sock;
    static const unsigned headerLength = 1;
    struct iovec newVec[nr + headerLength];
    android_log_header_t header;
    size_t i, payloadSize;

    sock = atomic_load(&statsdLoggerWrite.sock);
    if (sock < 0) {
        switch (sock) {
            case -ENOTCONN:
            case -ECONNREFUSED:
            case -ENOENT:
                // statsd was unreachable earlier; continue so the reconnect
                // logic further down gets a chance to run.
                break;
            default:
                return -EBADF;
        }
    }
    /*
     *  struct {
     *      // what we provide to socket
     *      android_log_header_t header;
     *      // caller provides
     *      union {
     *          struct {
     *              char     prio;
     *              char     payload[];
     *          } string;
     *          struct {
     *              uint32_t tag
     *              char     payload[];
     *          } binary;
     *      };
     *  };
     */

    header.id = LOG_ID_STATS;
    header.tid = android::base::GetThreadId();
    header.realtime.tv_sec = ts->tv_sec;
    header.realtime.tv_nsec = ts->tv_nsec;

    newVec[0].iov_base = (unsigned char*)&header;
    newVec[0].iov_len = sizeof(header);

    // If we dropped events before, try to tell statsd.
    if (sock >= 0) {
        int32_t snapshot = atomic_exchange_explicit(&dropped, 0, memory_order_relaxed);
        if (snapshot) {
            android_log_event_long_t buffer;
            // Store the last log error in the tag field. This tag field is not
            // used by statsd.
            buffer.header.tag = atomic_load(&log_error);
            buffer.payload.type = EVENT_TYPE_LONG;
            // Send 2 int32's via an int64, format: |atom_tag|dropped_count|
            // Compose with unsigned widths so a negative count cannot
            // sign-extend over the tag half and no negative value is shifted.
            const uint64_t tagBits = (uint32_t)atomic_load(&atom_tag);
            buffer.payload.data = (int64_t)((tagBits << 32) | (uint32_t)snapshot);

            // The report gets its own vector: newVec only has nr + headerLength
            // slots, so using newVec[headerLength] would overrun it when nr == 0.
            struct iovec dropVec[2];
            dropVec[0] = newVec[0];
            dropVec[1].iov_base = &buffer;
            dropVec[1].iov_len = sizeof(buffer);

            ret = TEMP_FAILURE_RETRY(writev(sock, dropVec, 2));
            if (ret != (ssize_t)(sizeof(header) + sizeof(buffer))) {
                // Report not delivered in full: put the count back for later.
                atomic_fetch_add_explicit(&dropped, snapshot, memory_order_relaxed);
            } else {
                // try to send socket loss info only when socket connection established
                // and it is proved by previous write that socket is available
                StatsSocketLossReporter::getInstance().dumpAtomsLossStats();
            }
        }
    }

    // Append the caller's vectors after the header, truncating the total
    // payload to LOGGER_ENTRY_MAX_PAYLOAD bytes. On exit `i` is the number of
    // iovecs (header included) to hand to writev().
    for (payloadSize = 0, i = headerLength; i < nr + headerLength; i++) {
        newVec[i].iov_base = vec[i - headerLength].iov_base;
        payloadSize += newVec[i].iov_len = vec[i - headerLength].iov_len;

        if (payloadSize > LOGGER_ENTRY_MAX_PAYLOAD) {
            newVec[i].iov_len -= payloadSize - LOGGER_ENTRY_MAX_PAYLOAD;
            if (newVec[i].iov_len) {
                // Keep the partially used vector; drop it if nothing is left.
                ++i;
            }
            break;
        }
    }

    /*
     * The write below could be lost, but will never block.
     *
     * ENOTCONN occurs if statsd has died.
     * ENOENT occurs if statsd is not running and socket is missing.
     * ECONNREFUSED occurs if we can not reconnect to statsd.
     * EAGAIN occurs if statsd is overloaded.
     */
    if (sock < 0) {
        ret = sock;
    } else {
        ret = TEMP_FAILURE_RETRY(writev(sock, newVec, i));
        if (ret < 0) {
            ret = -errno;
        }
    }
    switch (ret) {
        case -ENOTCONN:
        case -ECONNREFUSED:
        case -ENOENT:
            if (statd_writer_trylock()) {
                return ret; /* in a signal handler? try again when less stressed */
            }
            __statsdClose(ret);
            ret = statsdOpen();
            statsd_writer_init_unlock();

            if (ret < 0) {
                return ret;
            }

            // Reconnected: resend the event once on the fresh socket.
            ret = TEMP_FAILURE_RETRY(writev(atomic_load(&statsdLoggerWrite.sock), newVec, i));
            if (ret < 0) {
                ret = -errno;
            }
            break;
        default:
            break;
    }

    // Report payload bytes only: hide the header we prepended.
    if (ret > (ssize_t)sizeof(header)) {
        ret -= sizeof(header);
    }

    return ret;
}
316