1 /* 2 * Copyright (C) 2018, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #include "statsd_writer.h" 17 18 #include <errno.h> 19 #include <fcntl.h> 20 #include <inttypes.h> 21 #include <poll.h> 22 #include <private/android_logger.h> 23 #include <stdarg.h> 24 #include <stdatomic.h> 25 #include <stdio.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #include <sys/stat.h> 29 #include <sys/types.h> 30 #include <sys/uio.h> 31 #include <sys/un.h> 32 #include <time.h> 33 #include <unistd.h> 34 35 36 // Compatibility shims for glibc-2.17 in the Android tree. 37 #ifndef __BIONIC__ 38 39 // gettid() is not present in unistd.h for glibc-2.17. 40 extern pid_t gettid(); 41 42 // TEMP_FAILURE_RETRY is not present in unistd.h for glibc-2.17. 43 #ifndef TEMP_FAILURE_RETRY 44 #define TEMP_FAILURE_RETRY(exp) ({ \ 45 __typeof__(exp) _rc; \ 46 do { \ 47 _rc = (exp); \ 48 } while (_rc == -1 && errno == EINTR); \ 49 _rc; }) 50 #endif // TEMP_FAILURE_RETRY 51 52 #endif // __BIONIC__ 53 54 static pthread_mutex_t log_init_lock = PTHREAD_MUTEX_INITIALIZER; 55 static atomic_int dropped = 0; 56 static atomic_int log_error = 0; 57 static atomic_int atom_tag = 0; 58 59 void statsd_writer_init_lock() { 60 /* 61 * If we trigger a signal handler in the middle of locked activity and the 62 * signal handler logs a message, we could get into a deadlock state. 63 */ 64 pthread_mutex_lock(&log_init_lock); 65 } 66 67 int statd_writer_trylock() { 68 return pthread_mutex_trylock(&log_init_lock); 69 } 70 71 void statsd_writer_init_unlock() { 72 pthread_mutex_unlock(&log_init_lock); 73 } 74 75 static int statsdAvailable(); 76 static int statsdOpen(); 77 static void statsdClose(); 78 static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr); 79 static void statsdNoteDrop(); 80 static int statsdIsClosed(); 81 82 struct android_log_transport_write statsdLoggerWrite = { 83 .name = "statsd", 84 .sock = -EBADF, 85 .available = statsdAvailable, 86 .open = statsdOpen, 87 .close = statsdClose, 88 .write = statsdWrite, 89 .noteDrop = statsdNoteDrop, 90 .isClosed = statsdIsClosed, 91 }; 92 93 /* log_init_lock assumed */ 94 static int statsdOpen() { 95 int i, ret = 0; 96 97 i = atomic_load(&statsdLoggerWrite.sock); 98 if (i < 0) { 99 int flags = SOCK_DGRAM; 100 #ifdef SOCK_CLOEXEC 101 flags |= SOCK_CLOEXEC; 102 #endif 103 #ifdef SOCK_NONBLOCK 104 flags |= SOCK_NONBLOCK; 105 #endif 106 int sock = TEMP_FAILURE_RETRY(socket(PF_UNIX, flags, 0)); 107 if (sock < 0) { 108 ret = -errno; 109 } else { 110 int sndbuf = 1 * 1024 * 1024; // set max send buffer size 1MB 111 socklen_t bufLen = sizeof(sndbuf); 112 // SO_RCVBUF does not have an effect on unix domain socket, but SO_SNDBUF does. 113 // Proceed to connect even setsockopt fails. 114 setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, bufLen); 115 struct sockaddr_un un; 116 memset(&un, 0, sizeof(struct sockaddr_un)); 117 un.sun_family = AF_UNIX; 118 strcpy(un.sun_path, "/dev/socket/statsdw"); 119 120 if (TEMP_FAILURE_RETRY( 121 connect(sock, (struct sockaddr*)&un, sizeof(struct sockaddr_un))) < 0) { 122 ret = -errno; 123 switch (ret) { 124 case -ENOTCONN: 125 case -ECONNREFUSED: 126 case -ENOENT: 127 i = atomic_exchange(&statsdLoggerWrite.sock, ret); 128 /* FALLTHRU */ 129 default: 130 break; 131 } 132 close(sock); 133 } else { 134 ret = atomic_exchange(&statsdLoggerWrite.sock, sock); 135 if ((ret >= 0) && (ret != sock)) { 136 close(ret); 137 } 138 ret = 0; 139 } 140 } 141 } 142 143 return ret; 144 } 145 146 static void __statsdClose(int negative_errno) { 147 int sock = atomic_exchange(&statsdLoggerWrite.sock, negative_errno); 148 if (sock >= 0) { 149 close(sock); 150 } 151 } 152 153 static void statsdClose() { 154 __statsdClose(-EBADF); 155 } 156 157 static int statsdAvailable() { 158 if (atomic_load(&statsdLoggerWrite.sock) < 0) { 159 if (access("/dev/socket/statsdw", W_OK) == 0) { 160 return 0; 161 } 162 return -EBADF; 163 } 164 return 1; 165 } 166 167 static void statsdNoteDrop(int error, int tag) { 168 atomic_fetch_add_explicit(&dropped, 1, memory_order_relaxed); 169 atomic_exchange_explicit(&log_error, error, memory_order_relaxed); 170 atomic_exchange_explicit(&atom_tag, tag, memory_order_relaxed); 171 } 172 173 static int statsdIsClosed() { 174 if (atomic_load(&statsdLoggerWrite.sock) < 0) { 175 return 1; 176 } 177 return 0; 178 } 179 180 static int statsdWrite(struct timespec* ts, struct iovec* vec, size_t nr) { 181 ssize_t ret; 182 int sock; 183 static const unsigned headerLength = 1; 184 struct iovec newVec[nr + headerLength]; 185 android_log_header_t header; 186 size_t i, payloadSize; 187 188 sock = atomic_load(&statsdLoggerWrite.sock); 189 if (sock < 0) switch (sock) { 190 case -ENOTCONN: 191 case -ECONNREFUSED: 192 case -ENOENT: 193 break; 194 default: 195 return -EBADF; 196 } 197 /* 198 * struct { 199 * // what we provide to socket 200 * android_log_header_t header; 201 * // caller provides 202 * union { 203 * struct { 204 * char prio; 205 * char payload[]; 206 * } string; 207 * struct { 208 * uint32_t tag 209 * char payload[]; 210 * } binary; 211 * }; 212 * }; 213 */ 214 215 header.tid = gettid(); 216 header.realtime.tv_sec = ts->tv_sec; 217 header.realtime.tv_nsec = ts->tv_nsec; 218 219 newVec[0].iov_base = (unsigned char*)&header; 220 newVec[0].iov_len = sizeof(header); 221 222 // If we dropped events before, try to tell statsd. 223 if (sock >= 0) { 224 int32_t snapshot = atomic_exchange_explicit(&dropped, 0, memory_order_relaxed); 225 if (snapshot) { 226 android_log_event_long_t buffer; 227 header.id = LOG_ID_STATS; 228 // store the last log error in the tag field. This tag field is not used by statsd. 229 buffer.header.tag = atomic_load(&log_error); 230 buffer.payload.type = EVENT_TYPE_LONG; 231 // format: 232 // |atom_tag|dropped_count| 233 int64_t composed_long = atomic_load(&atom_tag); 234 // Send 2 int32's via an int64. 235 composed_long = ((composed_long << 32) | ((int64_t)snapshot)); 236 buffer.payload.data = composed_long; 237 238 newVec[headerLength].iov_base = &buffer; 239 newVec[headerLength].iov_len = sizeof(buffer); 240 241 ret = TEMP_FAILURE_RETRY(writev(sock, newVec, 2)); 242 if (ret != (ssize_t)(sizeof(header) + sizeof(buffer))) { 243 atomic_fetch_add_explicit(&dropped, snapshot, memory_order_relaxed); 244 } 245 } 246 } 247 248 header.id = LOG_ID_STATS; 249 250 for (payloadSize = 0, i = headerLength; i < nr + headerLength; i++) { 251 newVec[i].iov_base = vec[i - headerLength].iov_base; 252 payloadSize += newVec[i].iov_len = vec[i - headerLength].iov_len; 253 254 if (payloadSize > LOGGER_ENTRY_MAX_PAYLOAD) { 255 newVec[i].iov_len -= payloadSize - LOGGER_ENTRY_MAX_PAYLOAD; 256 if (newVec[i].iov_len) { 257 ++i; 258 } 259 break; 260 } 261 } 262 263 /* 264 * The write below could be lost, but will never block. 265 * 266 * ENOTCONN occurs if statsd has died. 267 * ENOENT occurs if statsd is not running and socket is missing. 268 * ECONNREFUSED occurs if we can not reconnect to statsd. 269 * EAGAIN occurs if statsd is overloaded. 270 */ 271 if (sock < 0) { 272 ret = sock; 273 } else { 274 ret = TEMP_FAILURE_RETRY(writev(sock, newVec, i)); 275 if (ret < 0) { 276 ret = -errno; 277 } 278 } 279 switch (ret) { 280 case -ENOTCONN: 281 case -ECONNREFUSED: 282 case -ENOENT: 283 if (statd_writer_trylock()) { 284 return ret; /* in a signal handler? try again when less stressed 285 */ 286 } 287 __statsdClose(ret); 288 ret = statsdOpen(); 289 statsd_writer_init_unlock(); 290 291 if (ret < 0) { 292 return ret; 293 } 294 295 ret = TEMP_FAILURE_RETRY(writev(atomic_load(&statsdLoggerWrite.sock), newVec, i)); 296 if (ret < 0) { 297 ret = -errno; 298 } 299 /* FALLTHRU */ 300 default: 301 break; 302 } 303 304 if (ret > (ssize_t)sizeof(header)) { 305 ret -= sizeof(header); 306 } 307 308 return ret; 309 } 310