1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define DEBUG false
17 #include "Log.h"
18 
19 #include "FdBuffer.h"
20 
21 #include <cutils/log.h>
22 #include <utils/SystemClock.h>
23 
24 #include <fcntl.h>
25 #include <poll.h>
26 #include <unistd.h>
27 #include <wait.h>
28 
29 namespace android {
30 namespace os {
31 namespace incidentd {
32 
33 const ssize_t BUFFER_SIZE = 16 * 1024;  // 16 KB
34 const ssize_t MAX_BUFFER_COUNT = 256;   // 4 MB max
35 
FdBuffer()36 FdBuffer::FdBuffer()
37     : mBuffer(BUFFER_SIZE), mStartTime(-1), mFinishTime(-1), mTimedOut(false), mTruncated(false) {}
38 
~FdBuffer()39 FdBuffer::~FdBuffer() {}
40 
read(int fd,int64_t timeout)41 status_t FdBuffer::read(int fd, int64_t timeout) {
42     struct pollfd pfds = {.fd = fd, .events = POLLIN};
43     mStartTime = uptimeMillis();
44 
45     fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
46 
47     while (true) {
48         if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
49             mTruncated = true;
50             break;
51         }
52         if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
53 
54         int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
55         if (remainingTime <= 0) {
56             VLOG("timed out due to long read");
57             mTimedOut = true;
58             break;
59         }
60 
61         int count = poll(&pfds, 1, remainingTime);
62         if (count == 0) {
63             VLOG("timed out due to block calling poll");
64             mTimedOut = true;
65             break;
66         } else if (count < 0) {
67             VLOG("poll failed: %s", strerror(errno));
68             return -errno;
69         } else {
70             if ((pfds.revents & POLLERR) != 0) {
71                 VLOG("return event has error %s", strerror(errno));
72                 return errno != 0 ? -errno : UNKNOWN_ERROR;
73             } else {
74                 ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
75                 if (amt < 0) {
76                     if (errno == EAGAIN || errno == EWOULDBLOCK) {
77                         continue;
78                     } else {
79                         VLOG("Fail to read %d: %s", fd, strerror(errno));
80                         return -errno;
81                     }
82                 } else if (amt == 0) {
83                     VLOG("Reached EOF of fd=%d", fd);
84                     break;
85                 }
86                 mBuffer.wp()->move(amt);
87             }
88         }
89     }
90     mFinishTime = uptimeMillis();
91     return NO_ERROR;
92 }
93 
readFully(int fd)94 status_t FdBuffer::readFully(int fd) {
95     mStartTime = uptimeMillis();
96 
97     while (true) {
98         if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
99             // Don't let it get too big.
100             mTruncated = true;
101             VLOG("Truncating data");
102             break;
103         }
104         if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
105 
106         ssize_t amt =
107                 TEMP_FAILURE_RETRY(::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()));
108         if (amt < 0) {
109             VLOG("Fail to read %d: %s", fd, strerror(errno));
110             return -errno;
111         } else if (amt == 0) {
112             VLOG("Done reading %zu bytes", mBuffer.size());
113             // We're done.
114             break;
115         }
116         mBuffer.wp()->move(amt);
117     }
118 
119     mFinishTime = uptimeMillis();
120     return NO_ERROR;
121 }
122 
readProcessedDataInStream(int fd,unique_fd toFd,unique_fd fromFd,int64_t timeoutMs,const bool isSysfs)123 status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd,
124                                              int64_t timeoutMs, const bool isSysfs) {
125     struct pollfd pfds[] = {
126             {.fd = fd, .events = POLLIN},
127             {.fd = toFd.get(), .events = POLLOUT},
128             {.fd = fromFd.get(), .events = POLLIN},
129     };
130 
131     mStartTime = uptimeMillis();
132 
133     // mark all fds non blocking
134     fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
135     fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK);
136     fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK);
137 
138     // A circular buffer holds data read from fd and writes to parsing process
139     uint8_t cirBuf[BUFFER_SIZE];
140     size_t cirSize = 0;
141     int rpos = 0, wpos = 0;
142 
143     // This is the buffer used to store processed data
144     while (true) {
145         if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
146             mTruncated = true;
147             break;
148         }
149         if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
150 
151         int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
152         if (remainingTime <= 0) {
153             VLOG("timed out due to long read");
154             mTimedOut = true;
155             break;
156         }
157 
158         // wait for any pfds to be ready to perform IO
159         int count = poll(pfds, 3, remainingTime);
160         if (count == 0) {
161             VLOG("timed out due to block calling poll");
162             mTimedOut = true;
163             break;
164         } else if (count < 0) {
165             VLOG("Fail to poll: %s", strerror(errno));
166             return -errno;
167         }
168 
169         // make sure no errors occur on any fds
170         for (int i = 0; i < 3; ++i) {
171             if ((pfds[i].revents & POLLERR) != 0) {
172                 if (i == 0 && isSysfs) {
173                     VLOG("fd %d is sysfs, ignore its POLLERR return value", fd);
174                     continue;
175                 }
176                 VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno));
177                 return errno != 0 ? -errno : UNKNOWN_ERROR;
178             }
179         }
180 
181         // read from fd
182         if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
183             ssize_t amt;
184             if (rpos >= wpos) {
185                 amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
186             } else {
187                 amt = ::read(fd, cirBuf + rpos, wpos - rpos);
188             }
189             if (amt < 0) {
190                 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
191                     VLOG("Fail to read fd %d: %s", fd, strerror(errno));
192                     return -errno;
193                 }  // otherwise just continue
194             } else if (amt == 0) {
195                 VLOG("Reached EOF of input file %d", fd);
196                 pfds[0].fd = -1;  // reach EOF so don't have to poll pfds[0].
197             } else {
198                 rpos += amt;
199                 cirSize += amt;
200             }
201         }
202 
203         // write to parsing process
204         if (cirSize > 0 && pfds[1].fd != -1) {
205             ssize_t amt;
206             if (rpos > wpos) {
207                 amt = ::write(toFd.get(), cirBuf + wpos, rpos - wpos);
208             } else {
209                 amt = ::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos);
210             }
211             if (amt < 0) {
212                 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
213                     VLOG("Fail to write toFd %d: %s", toFd.get(), strerror(errno));
214                     return -errno;
215                 }  // otherwise just continue
216             } else {
217                 wpos += amt;
218                 cirSize -= amt;
219             }
220         }
221 
222         // if buffer is empty and fd is closed, close write fd.
223         if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
224             VLOG("Close write pipe %d", toFd.get());
225             toFd.reset();
226             pfds[1].fd = -1;
227         }
228 
229         // circular buffer, reset rpos and wpos
230         if (rpos >= BUFFER_SIZE) {
231             rpos = 0;
232         }
233         if (wpos >= BUFFER_SIZE) {
234             wpos = 0;
235         }
236 
237         // read from parsing process
238         ssize_t amt = ::read(fromFd.get(), mBuffer.writeBuffer(), mBuffer.currentToWrite());
239         if (amt < 0) {
240             if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
241                 VLOG("Fail to read fromFd %d: %s", fromFd.get(), strerror(errno));
242                 return -errno;
243             }  // otherwise just continue
244         } else if (amt == 0) {
245             VLOG("Reached EOF of fromFd %d", fromFd.get());
246             break;
247         } else {
248             mBuffer.wp()->move(amt);
249         }
250     }
251 
252     mFinishTime = uptimeMillis();
253     return NO_ERROR;
254 }
255 
size() const256 size_t FdBuffer::size() const { return mBuffer.size(); }
257 
data() const258 EncodedBuffer::iterator FdBuffer::data() const { return mBuffer.begin(); }
259 
260 }  // namespace incidentd
261 }  // namespace os
262 }  // namespace android
263