1 /*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define DEBUG false
17 #include "Log.h"
18
19 #include "FdBuffer.h"
20
21 #include <cutils/log.h>
22 #include <utils/SystemClock.h>
23
24 #include <fcntl.h>
25 #include <poll.h>
26 #include <unistd.h>
27 #include <wait.h>
28
29 namespace android {
30 namespace os {
31 namespace incidentd {
32
33 const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB
34 const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max
35
FdBuffer()36 FdBuffer::FdBuffer()
37 : mBuffer(BUFFER_SIZE), mStartTime(-1), mFinishTime(-1), mTimedOut(false), mTruncated(false) {}
38
~FdBuffer()39 FdBuffer::~FdBuffer() {}
40
read(int fd,int64_t timeout)41 status_t FdBuffer::read(int fd, int64_t timeout) {
42 struct pollfd pfds = {.fd = fd, .events = POLLIN};
43 mStartTime = uptimeMillis();
44
45 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
46
47 while (true) {
48 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
49 mTruncated = true;
50 break;
51 }
52 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
53
54 int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
55 if (remainingTime <= 0) {
56 VLOG("timed out due to long read");
57 mTimedOut = true;
58 break;
59 }
60
61 int count = poll(&pfds, 1, remainingTime);
62 if (count == 0) {
63 VLOG("timed out due to block calling poll");
64 mTimedOut = true;
65 break;
66 } else if (count < 0) {
67 VLOG("poll failed: %s", strerror(errno));
68 return -errno;
69 } else {
70 if ((pfds.revents & POLLERR) != 0) {
71 VLOG("return event has error %s", strerror(errno));
72 return errno != 0 ? -errno : UNKNOWN_ERROR;
73 } else {
74 ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
75 if (amt < 0) {
76 if (errno == EAGAIN || errno == EWOULDBLOCK) {
77 continue;
78 } else {
79 VLOG("Fail to read %d: %s", fd, strerror(errno));
80 return -errno;
81 }
82 } else if (amt == 0) {
83 VLOG("Reached EOF of fd=%d", fd);
84 break;
85 }
86 mBuffer.wp()->move(amt);
87 }
88 }
89 }
90 mFinishTime = uptimeMillis();
91 return NO_ERROR;
92 }
93
readFully(int fd)94 status_t FdBuffer::readFully(int fd) {
95 mStartTime = uptimeMillis();
96
97 while (true) {
98 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
99 // Don't let it get too big.
100 mTruncated = true;
101 VLOG("Truncating data");
102 break;
103 }
104 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
105
106 ssize_t amt =
107 TEMP_FAILURE_RETRY(::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()));
108 if (amt < 0) {
109 VLOG("Fail to read %d: %s", fd, strerror(errno));
110 return -errno;
111 } else if (amt == 0) {
112 VLOG("Done reading %zu bytes", mBuffer.size());
113 // We're done.
114 break;
115 }
116 mBuffer.wp()->move(amt);
117 }
118
119 mFinishTime = uptimeMillis();
120 return NO_ERROR;
121 }
122
readProcessedDataInStream(int fd,unique_fd toFd,unique_fd fromFd,int64_t timeoutMs,const bool isSysfs)123 status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd,
124 int64_t timeoutMs, const bool isSysfs) {
125 struct pollfd pfds[] = {
126 {.fd = fd, .events = POLLIN},
127 {.fd = toFd.get(), .events = POLLOUT},
128 {.fd = fromFd.get(), .events = POLLIN},
129 };
130
131 mStartTime = uptimeMillis();
132
133 // mark all fds non blocking
134 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
135 fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK);
136 fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK);
137
138 // A circular buffer holds data read from fd and writes to parsing process
139 uint8_t cirBuf[BUFFER_SIZE];
140 size_t cirSize = 0;
141 int rpos = 0, wpos = 0;
142
143 // This is the buffer used to store processed data
144 while (true) {
145 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
146 mTruncated = true;
147 break;
148 }
149 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
150
151 int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
152 if (remainingTime <= 0) {
153 VLOG("timed out due to long read");
154 mTimedOut = true;
155 break;
156 }
157
158 // wait for any pfds to be ready to perform IO
159 int count = poll(pfds, 3, remainingTime);
160 if (count == 0) {
161 VLOG("timed out due to block calling poll");
162 mTimedOut = true;
163 break;
164 } else if (count < 0) {
165 VLOG("Fail to poll: %s", strerror(errno));
166 return -errno;
167 }
168
169 // make sure no errors occur on any fds
170 for (int i = 0; i < 3; ++i) {
171 if ((pfds[i].revents & POLLERR) != 0) {
172 if (i == 0 && isSysfs) {
173 VLOG("fd %d is sysfs, ignore its POLLERR return value", fd);
174 continue;
175 }
176 VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno));
177 return errno != 0 ? -errno : UNKNOWN_ERROR;
178 }
179 }
180
181 // read from fd
182 if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
183 ssize_t amt;
184 if (rpos >= wpos) {
185 amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
186 } else {
187 amt = ::read(fd, cirBuf + rpos, wpos - rpos);
188 }
189 if (amt < 0) {
190 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
191 VLOG("Fail to read fd %d: %s", fd, strerror(errno));
192 return -errno;
193 } // otherwise just continue
194 } else if (amt == 0) {
195 VLOG("Reached EOF of input file %d", fd);
196 pfds[0].fd = -1; // reach EOF so don't have to poll pfds[0].
197 } else {
198 rpos += amt;
199 cirSize += amt;
200 }
201 }
202
203 // write to parsing process
204 if (cirSize > 0 && pfds[1].fd != -1) {
205 ssize_t amt;
206 if (rpos > wpos) {
207 amt = ::write(toFd.get(), cirBuf + wpos, rpos - wpos);
208 } else {
209 amt = ::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos);
210 }
211 if (amt < 0) {
212 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
213 VLOG("Fail to write toFd %d: %s", toFd.get(), strerror(errno));
214 return -errno;
215 } // otherwise just continue
216 } else {
217 wpos += amt;
218 cirSize -= amt;
219 }
220 }
221
222 // if buffer is empty and fd is closed, close write fd.
223 if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
224 VLOG("Close write pipe %d", toFd.get());
225 toFd.reset();
226 pfds[1].fd = -1;
227 }
228
229 // circular buffer, reset rpos and wpos
230 if (rpos >= BUFFER_SIZE) {
231 rpos = 0;
232 }
233 if (wpos >= BUFFER_SIZE) {
234 wpos = 0;
235 }
236
237 // read from parsing process
238 ssize_t amt = ::read(fromFd.get(), mBuffer.writeBuffer(), mBuffer.currentToWrite());
239 if (amt < 0) {
240 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
241 VLOG("Fail to read fromFd %d: %s", fromFd.get(), strerror(errno));
242 return -errno;
243 } // otherwise just continue
244 } else if (amt == 0) {
245 VLOG("Reached EOF of fromFd %d", fromFd.get());
246 break;
247 } else {
248 mBuffer.wp()->move(amt);
249 }
250 }
251
252 mFinishTime = uptimeMillis();
253 return NO_ERROR;
254 }
255
size() const256 size_t FdBuffer::size() const { return mBuffer.size(); }
257
data() const258 EncodedBuffer::iterator FdBuffer::data() const { return mBuffer.begin(); }
259
260 } // namespace incidentd
261 } // namespace os
262 } // namespace android
263