1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define DEBUG false
17 #include "Log.h"
18 
19 #include "FdBuffer.h"
20 #include "incidentd_util.h"
21 
22 #include <log/log.h>
23 #include <utils/SystemClock.h>
24 
25 #include <fcntl.h>
26 #include <poll.h>
27 #include <unistd.h>
28 #include <wait.h>
29 
30 namespace android {
31 namespace os {
32 namespace incidentd {
33 
// Capacity, in bytes, of the circular staging buffer that readProcessedDataInStream keeps on
// its stack (16 KB).
constexpr ssize_t BUFFER_SIZE = 16 * 1024;

// Upper bound, in bytes, on the data accumulated in mBuffer (96 MB). The read loops in this
// file stop and set mTruncated once mBuffer->size() reaches this value.
constexpr ssize_t MAX_BUFFER_SIZE = 96 * 1024 * 1024;
36 
FdBuffer()37 FdBuffer::FdBuffer(): FdBuffer(get_buffer_from_pool(), /* isBufferPooled= */ true)  {
38 }
39 
FdBuffer(sp<EncodedBuffer> buffer,bool isBufferPooled)40 FdBuffer::FdBuffer(sp<EncodedBuffer> buffer, bool isBufferPooled)
41         :mBuffer(buffer),
42          mStartTime(-1),
43          mFinishTime(-1),
44          mTimedOut(false),
45          mTruncated(false),
46          mIsBufferPooled(isBufferPooled) {
47 }
48 
~FdBuffer()49 FdBuffer::~FdBuffer() {
50     if (mIsBufferPooled) {
51         return_buffer_to_pool(mBuffer);
52     }
53 }
54 
read(int fd,int64_t timeout)55 status_t FdBuffer::read(int fd, int64_t timeout) {
56     struct pollfd pfds = {.fd = fd, .events = POLLIN};
57     mStartTime = uptimeMillis();
58 
59     fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
60 
61     while (true) {
62         if (mBuffer->size() >= MAX_BUFFER_SIZE) {
63             mTruncated = true;
64             VLOG("Truncating data");
65             break;
66         }
67         if (mBuffer->writeBuffer() == NULL) {
68             VLOG("No memory");
69             return NO_MEMORY;
70         }
71 
72         int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
73         if (remainingTime <= 0) {
74             VLOG("timed out due to long read");
75             mTimedOut = true;
76             break;
77         }
78 
79         int count = TEMP_FAILURE_RETRY(poll(&pfds, 1, remainingTime));
80         if (count == 0) {
81             VLOG("timed out due to block calling poll");
82             mTimedOut = true;
83             break;
84         } else if (count < 0) {
85             VLOG("poll failed: %s", strerror(errno));
86             return -errno;
87         } else {
88             if ((pfds.revents & POLLERR) != 0) {
89                 VLOG("return event has error %s", strerror(errno));
90                 return errno != 0 ? -errno : UNKNOWN_ERROR;
91             } else {
92                 ssize_t amt = TEMP_FAILURE_RETRY(
93                         ::read(fd, mBuffer->writeBuffer(), mBuffer->currentToWrite()));
94                 if (amt < 0) {
95                     if (errno == EAGAIN || errno == EWOULDBLOCK) {
96                         continue;
97                     } else {
98                         VLOG("Fail to read %d: %s", fd, strerror(errno));
99                         return -errno;
100                     }
101                 } else if (amt == 0) {
102                     VLOG("Reached EOF of fd=%d", fd);
103                     break;
104                 }
105                 mBuffer->wp()->move(amt);
106             }
107         }
108     }
109     mFinishTime = uptimeMillis();
110     return NO_ERROR;
111 }
112 
readFully(int fd)113 status_t FdBuffer::readFully(int fd) {
114     mStartTime = uptimeMillis();
115 
116     while (true) {
117         if (mBuffer->size() >= MAX_BUFFER_SIZE) {
118             // Don't let it get too big.
119             mTruncated = true;
120             VLOG("Truncating data");
121             break;
122         }
123         if (mBuffer->writeBuffer() == NULL) {
124             VLOG("No memory");
125             return NO_MEMORY;
126         }
127 
128         ssize_t amt =
129                 TEMP_FAILURE_RETRY(::read(fd, mBuffer->writeBuffer(), mBuffer->currentToWrite()));
130         if (amt < 0) {
131             VLOG("Fail to read %d: %s", fd, strerror(errno));
132             return -errno;
133         } else if (amt == 0) {
134             VLOG("Done reading %zu bytes", mBuffer->size());
135             // We're done.
136             break;
137         }
138         mBuffer->wp()->move(amt);
139     }
140 
141     mFinishTime = uptimeMillis();
142     return NO_ERROR;
143 }
144 
readProcessedDataInStream(int fd,unique_fd toFd,unique_fd fromFd,int64_t timeoutMs,const bool isSysfs)145 status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd,
146                                              int64_t timeoutMs, const bool isSysfs) {
147     struct pollfd pfds[] = {
148             {.fd = fd, .events = POLLIN},
149             {.fd = toFd.get(), .events = POLLOUT},
150             {.fd = fromFd.get(), .events = POLLIN},
151     };
152 
153     mStartTime = uptimeMillis();
154 
155     // mark all fds non blocking
156     fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
157     fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK);
158     fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK);
159 
160     // A circular buffer holds data read from fd and writes to parsing process
161     uint8_t cirBuf[BUFFER_SIZE];
162     size_t cirSize = 0;
163     int rpos = 0, wpos = 0;
164 
165     // This is the buffer used to store processed data
166     while (true) {
167         if (mBuffer->size() >= MAX_BUFFER_SIZE) {
168             VLOG("Truncating data");
169             mTruncated = true;
170             break;
171         }
172         if (mBuffer->writeBuffer() == NULL) {
173             VLOG("No memory");
174             return NO_MEMORY;
175         }
176 
177         int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
178         if (remainingTime <= 0) {
179             VLOG("timed out due to long read");
180             mTimedOut = true;
181             break;
182         }
183 
184         // wait for any pfds to be ready to perform IO
185         int count = TEMP_FAILURE_RETRY(poll(pfds, 3, remainingTime));
186         if (count == 0) {
187             VLOG("timed out due to block calling poll");
188             mTimedOut = true;
189             break;
190         } else if (count < 0) {
191             VLOG("Fail to poll: %s", strerror(errno));
192             return -errno;
193         }
194 
195         // make sure no errors occur on any fds
196         for (int i = 0; i < 3; ++i) {
197             if ((pfds[i].revents & POLLERR) != 0) {
198                 if (i == 0 && isSysfs) {
199                     VLOG("fd %d is sysfs, ignore its POLLERR return value", fd);
200                     continue;
201                 }
202                 VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno));
203                 return errno != 0 ? -errno : UNKNOWN_ERROR;
204             }
205         }
206 
207         // read from fd
208         if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
209             ssize_t amt;
210             if (rpos >= wpos) {
211                 amt = TEMP_FAILURE_RETRY(::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos));
212             } else {
213                 amt = TEMP_FAILURE_RETRY(::read(fd, cirBuf + rpos, wpos - rpos));
214             }
215             if (amt < 0) {
216                 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
217                     VLOG("Fail to read fd %d: %s", fd, strerror(errno));
218                     return -errno;
219                 }  // otherwise just continue
220             } else if (amt == 0) {
221                 VLOG("Reached EOF of input file %d", fd);
222                 pfds[0].fd = -1;  // reach EOF so don't have to poll pfds[0].
223             } else {
224                 rpos += amt;
225                 cirSize += amt;
226             }
227         }
228 
229         // write to parsing process
230         if (cirSize > 0 && pfds[1].fd != -1) {
231             ssize_t amt;
232             if (rpos > wpos) {
233                 amt = TEMP_FAILURE_RETRY(::write(toFd.get(), cirBuf + wpos, rpos - wpos));
234             } else {
235                 amt = TEMP_FAILURE_RETRY(::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos));
236             }
237             if (amt < 0) {
238                 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
239                     VLOG("Fail to write toFd %d: %s", toFd.get(), strerror(errno));
240                     return -errno;
241                 }  // otherwise just continue
242             } else {
243                 wpos += amt;
244                 cirSize -= amt;
245             }
246         }
247 
248         // if buffer is empty and fd is closed, close write fd.
249         if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
250             VLOG("Close write pipe %d", toFd.get());
251             toFd.reset();
252             pfds[1].fd = -1;
253         }
254 
255         // circular buffer, reset rpos and wpos
256         if (rpos >= BUFFER_SIZE) {
257             rpos = 0;
258         }
259         if (wpos >= BUFFER_SIZE) {
260             wpos = 0;
261         }
262 
263         // read from parsing process
264         ssize_t amt = TEMP_FAILURE_RETRY(
265                 ::read(fromFd.get(), mBuffer->writeBuffer(), mBuffer->currentToWrite()));
266         if (amt < 0) {
267             if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
268                 VLOG("Fail to read fromFd %d: %s", fromFd.get(), strerror(errno));
269                 return -errno;
270             }  // otherwise just continue
271         } else if (amt == 0) {
272             VLOG("Reached EOF of fromFd %d", fromFd.get());
273             break;
274         } else {
275             mBuffer->wp()->move(amt);
276         }
277     }
278 
279     mFinishTime = uptimeMillis();
280     return NO_ERROR;
281 }
282 
write(uint8_t const * buf,size_t size)283 status_t FdBuffer::write(uint8_t const* buf, size_t size) {
284     return mBuffer->writeRaw(buf, size);
285 }
286 
write(const sp<ProtoReader> & reader)287 status_t FdBuffer::write(const sp<ProtoReader>& reader) {
288     return mBuffer->writeRaw(reader);
289 }
290 
write(const sp<ProtoReader> & reader,size_t size)291 status_t FdBuffer::write(const sp<ProtoReader>& reader, size_t size) {
292     return mBuffer->writeRaw(reader, size);
293 }
294 
size() const295 size_t FdBuffer::size() const {
296     return mBuffer->size();
297 }
298 
data() const299 sp<EncodedBuffer> FdBuffer::data() const {
300     return mBuffer;
301 }
302 
303 }  // namespace incidentd
304 }  // namespace os
305 }  // namespace android
306