1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 // https://developers.google.com/protocol-buffers/
4 //
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are
7 // met:
8 //
9 // * Redistributions of source code must retain the above copyright
10 // notice, this list of conditions and the following disclaimer.
11 // * Redistributions in binary form must reproduce the above
12 // copyright notice, this list of conditions and the following disclaimer
13 // in the documentation and/or other materials provided with the
14 // distribution.
15 // * Neither the name of Google Inc. nor the names of its
16 // contributors may be used to endorse or promote products derived from
17 // this software without specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31 // Author: kenton@google.com (Kenton Varda)
32 // Based on original Protocol Buffers design by
33 // Sanjay Ghemawat, Jeff Dean, and others.
34
35 #ifdef _MSC_VER
36 #include <io.h>
37 #else
38 #include <unistd.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <fcntl.h>
42 #endif
43 #include <errno.h>
44 #include <iostream>
45 #include <algorithm>
46
47 #include <google/protobuf/io/zero_copy_stream_impl.h>
48 #include <google/protobuf/stubs/common.h>
49 #include <google/protobuf/stubs/stl_util.h>
50
51
52 namespace google {
53 namespace protobuf {
54 namespace io {
55
#if defined(_WIN32)
// lseek() cannot be trusted on Win32: when it is handed a descriptor that
// does not support seeking, the value it returns is undefined rather than an
// error indication.  Replace it with an expression that always evaluates to
// the failure value, so code below treats every descriptor as non-seekable.
#define lseek(fd, offset, origin) ((off_t)-1)
#endif  // defined(_WIN32)
61
62 namespace {
63
// Closes |fd|, calling close() again for as long as it fails with
// errno == EINTR.  Returns the value of the last close() call; when that
// value is negative, errno holds a cause other than EINTR.
//
// NOTE(review): POSIX leaves the state of the descriptor unspecified after
// close() fails with EINTR, so retrying may not be appropriate on every
// platform -- confirm for the targets this is built for.
int close_no_eintr(int fd) {
  for (;;) {
    const int status = close(fd);
    if (status >= 0 || errno != EINTR) {
      return status;
    }
    // close() reported EINTR; go around again.
  }
}
72
73 } // namespace
74
75
76 // ===================================================================
77
FileInputStream(int file_descriptor,int block_size)78 FileInputStream::FileInputStream(int file_descriptor, int block_size)
79 : copying_input_(file_descriptor),
80 impl_(©ing_input_, block_size) {
81 }
82
~FileInputStream()83 FileInputStream::~FileInputStream() {}
84
Close()85 bool FileInputStream::Close() {
86 return copying_input_.Close();
87 }
88
Next(const void ** data,int * size)89 bool FileInputStream::Next(const void** data, int* size) {
90 return impl_.Next(data, size);
91 }
92
BackUp(int count)93 void FileInputStream::BackUp(int count) {
94 impl_.BackUp(count);
95 }
96
Skip(int count)97 bool FileInputStream::Skip(int count) {
98 return impl_.Skip(count);
99 }
100
ByteCount() const101 int64 FileInputStream::ByteCount() const {
102 return impl_.ByteCount();
103 }
104
CopyingFileInputStream(int file_descriptor)105 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
106 int file_descriptor)
107 : file_(file_descriptor),
108 close_on_delete_(false),
109 is_closed_(false),
110 errno_(0),
111 previous_seek_failed_(false) {
112 }
113
~CopyingFileInputStream()114 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
115 if (close_on_delete_) {
116 if (!Close()) {
117 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
118 }
119 }
120 }
121
Close()122 bool FileInputStream::CopyingFileInputStream::Close() {
123 GOOGLE_CHECK(!is_closed_);
124
125 is_closed_ = true;
126 if (close_no_eintr(file_) != 0) {
127 // The docs on close() do not specify whether a file descriptor is still
128 // open after close() fails with EIO. However, the glibc source code
129 // seems to indicate that it is not.
130 errno_ = errno;
131 return false;
132 }
133
134 return true;
135 }
136
Read(void * buffer,int size)137 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
138 GOOGLE_CHECK(!is_closed_);
139
140 int result;
141 do {
142 result = read(file_, buffer, size);
143 } while (result < 0 && errno == EINTR);
144
145 if (result < 0) {
146 // Read error (not EOF).
147 errno_ = errno;
148 }
149
150 return result;
151 }
152
Skip(int count)153 int FileInputStream::CopyingFileInputStream::Skip(int count) {
154 GOOGLE_CHECK(!is_closed_);
155
156 if (!previous_seek_failed_ &&
157 lseek(file_, count, SEEK_CUR) != (off_t)-1) {
158 // Seek succeeded.
159 return count;
160 } else {
161 // Failed to seek.
162
163 // Note to self: Don't seek again. This file descriptor doesn't
164 // support it.
165 previous_seek_failed_ = true;
166
167 // Use the default implementation.
168 return CopyingInputStream::Skip(count);
169 }
170 }
171
172 // ===================================================================
173
FileOutputStream(int file_descriptor,int block_size)174 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
175 : copying_output_(file_descriptor),
176 impl_(©ing_output_, block_size) {
177 }
178
~FileOutputStream()179 FileOutputStream::~FileOutputStream() {
180 impl_.Flush();
181 }
182
Close()183 bool FileOutputStream::Close() {
184 bool flush_succeeded = impl_.Flush();
185 return copying_output_.Close() && flush_succeeded;
186 }
187
Flush()188 bool FileOutputStream::Flush() {
189 return impl_.Flush();
190 }
191
Next(void ** data,int * size)192 bool FileOutputStream::Next(void** data, int* size) {
193 return impl_.Next(data, size);
194 }
195
BackUp(int count)196 void FileOutputStream::BackUp(int count) {
197 impl_.BackUp(count);
198 }
199
ByteCount() const200 int64 FileOutputStream::ByteCount() const {
201 return impl_.ByteCount();
202 }
203
CopyingFileOutputStream(int file_descriptor)204 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
205 int file_descriptor)
206 : file_(file_descriptor),
207 close_on_delete_(false),
208 is_closed_(false),
209 errno_(0) {
210 }
211
~CopyingFileOutputStream()212 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
213 if (close_on_delete_) {
214 if (!Close()) {
215 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
216 }
217 }
218 }
219
Close()220 bool FileOutputStream::CopyingFileOutputStream::Close() {
221 GOOGLE_CHECK(!is_closed_);
222
223 is_closed_ = true;
224 if (close_no_eintr(file_) != 0) {
225 // The docs on close() do not specify whether a file descriptor is still
226 // open after close() fails with EIO. However, the glibc source code
227 // seems to indicate that it is not.
228 errno_ = errno;
229 return false;
230 }
231
232 return true;
233 }
234
Write(const void * buffer,int size)235 bool FileOutputStream::CopyingFileOutputStream::Write(
236 const void* buffer, int size) {
237 GOOGLE_CHECK(!is_closed_);
238 int total_written = 0;
239
240 const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
241
242 while (total_written < size) {
243 int bytes;
244 do {
245 bytes = write(file_, buffer_base + total_written, size - total_written);
246 } while (bytes < 0 && errno == EINTR);
247
248 if (bytes <= 0) {
249 // Write error.
250
251 // FIXME(kenton): According to the man page, if write() returns zero,
252 // there was no error; write() simply did not write anything. It's
253 // unclear under what circumstances this might happen, but presumably
254 // errno won't be set in this case. I am confused as to how such an
255 // event should be handled. For now I'm treating it as an error, since
256 // retrying seems like it could lead to an infinite loop. I suspect
257 // this never actually happens anyway.
258
259 if (bytes < 0) {
260 errno_ = errno;
261 }
262 return false;
263 }
264 total_written += bytes;
265 }
266
267 return true;
268 }
269
270 // ===================================================================
271
IstreamInputStream(istream * input,int block_size)272 IstreamInputStream::IstreamInputStream(istream* input, int block_size)
273 : copying_input_(input),
274 impl_(©ing_input_, block_size) {
275 }
276
~IstreamInputStream()277 IstreamInputStream::~IstreamInputStream() {}
278
Next(const void ** data,int * size)279 bool IstreamInputStream::Next(const void** data, int* size) {
280 return impl_.Next(data, size);
281 }
282
BackUp(int count)283 void IstreamInputStream::BackUp(int count) {
284 impl_.BackUp(count);
285 }
286
Skip(int count)287 bool IstreamInputStream::Skip(int count) {
288 return impl_.Skip(count);
289 }
290
ByteCount() const291 int64 IstreamInputStream::ByteCount() const {
292 return impl_.ByteCount();
293 }
294
CopyingIstreamInputStream(istream * input)295 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
296 istream* input)
297 : input_(input) {
298 }
299
~CopyingIstreamInputStream()300 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
301
Read(void * buffer,int size)302 int IstreamInputStream::CopyingIstreamInputStream::Read(
303 void* buffer, int size) {
304 input_->read(reinterpret_cast<char*>(buffer), size);
305 int result = input_->gcount();
306 if (result == 0 && input_->fail() && !input_->eof()) {
307 return -1;
308 }
309 return result;
310 }
311
312 // ===================================================================
313
OstreamOutputStream(ostream * output,int block_size)314 OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
315 : copying_output_(output),
316 impl_(©ing_output_, block_size) {
317 }
318
~OstreamOutputStream()319 OstreamOutputStream::~OstreamOutputStream() {
320 impl_.Flush();
321 }
322
Next(void ** data,int * size)323 bool OstreamOutputStream::Next(void** data, int* size) {
324 return impl_.Next(data, size);
325 }
326
BackUp(int count)327 void OstreamOutputStream::BackUp(int count) {
328 impl_.BackUp(count);
329 }
330
ByteCount() const331 int64 OstreamOutputStream::ByteCount() const {
332 return impl_.ByteCount();
333 }
334
CopyingOstreamOutputStream(ostream * output)335 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
336 ostream* output)
337 : output_(output) {
338 }
339
~CopyingOstreamOutputStream()340 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
341 }
342
Write(const void * buffer,int size)343 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
344 const void* buffer, int size) {
345 output_->write(reinterpret_cast<const char*>(buffer), size);
346 return output_->good();
347 }
348
349 // ===================================================================
350
ConcatenatingInputStream(ZeroCopyInputStream * const streams[],int count)351 ConcatenatingInputStream::ConcatenatingInputStream(
352 ZeroCopyInputStream* const streams[], int count)
353 : streams_(streams), stream_count_(count), bytes_retired_(0) {
354 }
355
~ConcatenatingInputStream()356 ConcatenatingInputStream::~ConcatenatingInputStream() {
357 }
358
Next(const void ** data,int * size)359 bool ConcatenatingInputStream::Next(const void** data, int* size) {
360 while (stream_count_ > 0) {
361 if (streams_[0]->Next(data, size)) return true;
362
363 // That stream is done. Advance to the next one.
364 bytes_retired_ += streams_[0]->ByteCount();
365 ++streams_;
366 --stream_count_;
367 }
368
369 // No more streams.
370 return false;
371 }
372
BackUp(int count)373 void ConcatenatingInputStream::BackUp(int count) {
374 if (stream_count_ > 0) {
375 streams_[0]->BackUp(count);
376 } else {
377 GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
378 }
379 }
380
Skip(int count)381 bool ConcatenatingInputStream::Skip(int count) {
382 while (stream_count_ > 0) {
383 // Assume that ByteCount() can be used to find out how much we actually
384 // skipped when Skip() fails.
385 int64 target_byte_count = streams_[0]->ByteCount() + count;
386 if (streams_[0]->Skip(count)) return true;
387
388 // Hit the end of the stream. Figure out how many more bytes we still have
389 // to skip.
390 int64 final_byte_count = streams_[0]->ByteCount();
391 GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
392 count = target_byte_count - final_byte_count;
393
394 // That stream is done. Advance to the next one.
395 bytes_retired_ += final_byte_count;
396 ++streams_;
397 --stream_count_;
398 }
399
400 return false;
401 }
402
ByteCount() const403 int64 ConcatenatingInputStream::ByteCount() const {
404 if (stream_count_ == 0) {
405 return bytes_retired_;
406 } else {
407 return bytes_retired_ + streams_[0]->ByteCount();
408 }
409 }
410
411
412 // ===================================================================
413
LimitingInputStream(ZeroCopyInputStream * input,int64 limit)414 LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
415 int64 limit)
416 : input_(input), limit_(limit) {
417 prior_bytes_read_ = input_->ByteCount();
418 }
419
~LimitingInputStream()420 LimitingInputStream::~LimitingInputStream() {
421 // If we overshot the limit, back up.
422 if (limit_ < 0) input_->BackUp(-limit_);
423 }
424
Next(const void ** data,int * size)425 bool LimitingInputStream::Next(const void** data, int* size) {
426 if (limit_ <= 0) return false;
427 if (!input_->Next(data, size)) return false;
428
429 limit_ -= *size;
430 if (limit_ < 0) {
431 // We overshot the limit. Reduce *size to hide the rest of the buffer.
432 *size += limit_;
433 }
434 return true;
435 }
436
BackUp(int count)437 void LimitingInputStream::BackUp(int count) {
438 if (limit_ < 0) {
439 input_->BackUp(count - limit_);
440 limit_ = count;
441 } else {
442 input_->BackUp(count);
443 limit_ += count;
444 }
445 }
446
Skip(int count)447 bool LimitingInputStream::Skip(int count) {
448 if (count > limit_) {
449 if (limit_ < 0) return false;
450 input_->Skip(limit_);
451 limit_ = 0;
452 return false;
453 } else {
454 if (!input_->Skip(count)) return false;
455 limit_ -= count;
456 return true;
457 }
458 }
459
ByteCount() const460 int64 LimitingInputStream::ByteCount() const {
461 if (limit_ < 0) {
462 return input_->ByteCount() + limit_ - prior_bytes_read_;
463 } else {
464 return input_->ByteCount() - prior_bytes_read_;
465 }
466 }
467
468
469 // ===================================================================
470
471 } // namespace io
472 } // namespace protobuf
473 } // namespace google
474