1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/byte_stream.h"
22 
23 #include <stdlib.h>
24 #include <string.h>
25 
26 #include <grpc/support/log.h>
27 
28 #include "src/core/lib/gprpp/memory.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 
31 namespace grpc_core {
32 
33 //
34 // SliceBufferByteStream
35 //
36 
SliceBufferByteStream(grpc_slice_buffer * slice_buffer,uint32_t flags)37 SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer,
38                                              uint32_t flags)
39     : ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) {
40   GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
41   grpc_slice_buffer_init(&backing_buffer_);
42   grpc_slice_buffer_swap(slice_buffer, &backing_buffer_);
43 }
44 
~SliceBufferByteStream()45 SliceBufferByteStream::~SliceBufferByteStream() {}
46 
Orphan()47 void SliceBufferByteStream::Orphan() {
48   grpc_slice_buffer_destroy_internal(&backing_buffer_);
49   GRPC_ERROR_UNREF(shutdown_error_);
50   // Note: We do not actually delete the object here, since
51   // SliceBufferByteStream is usually allocated as part of a larger
52   // object and has an OrphanablePtr of itself passed down through the
53   // filter stack.
54 }
55 
Next(size_t max_size_hint,grpc_closure * on_complete)56 bool SliceBufferByteStream::Next(size_t max_size_hint,
57                                  grpc_closure* on_complete) {
58   GPR_ASSERT(cursor_ < backing_buffer_.count);
59   return true;
60 }
61 
Pull(grpc_slice * slice)62 grpc_error* SliceBufferByteStream::Pull(grpc_slice* slice) {
63   if (shutdown_error_ != GRPC_ERROR_NONE) {
64     return GRPC_ERROR_REF(shutdown_error_);
65   }
66   GPR_ASSERT(cursor_ < backing_buffer_.count);
67   *slice = grpc_slice_ref_internal(backing_buffer_.slices[cursor_]);
68   ++cursor_;
69   return GRPC_ERROR_NONE;
70 }
71 
Shutdown(grpc_error * error)72 void SliceBufferByteStream::Shutdown(grpc_error* error) {
73   GRPC_ERROR_UNREF(shutdown_error_);
74   shutdown_error_ = error;
75 }
76 
77 //
78 // ByteStreamCache
79 //
80 
ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream)81 ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream)
82     : underlying_stream_(std::move(underlying_stream)),
83       length_(underlying_stream_->length()),
84       flags_(underlying_stream_->flags()) {
85   grpc_slice_buffer_init(&cache_buffer_);
86 }
87 
~ByteStreamCache()88 ByteStreamCache::~ByteStreamCache() { Destroy(); }
89 
Destroy()90 void ByteStreamCache::Destroy() {
91   underlying_stream_.reset();
92   if (cache_buffer_.length > 0) {
93     grpc_slice_buffer_destroy_internal(&cache_buffer_);
94   }
95 }
96 
97 //
98 // ByteStreamCache::CachingByteStream
99 //
100 
CachingByteStream(ByteStreamCache * cache)101 ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache)
102     : ByteStream(cache->length_, cache->flags_), cache_(cache) {}
103 
~CachingByteStream()104 ByteStreamCache::CachingByteStream::~CachingByteStream() {}
105 
Orphan()106 void ByteStreamCache::CachingByteStream::Orphan() {
107   GRPC_ERROR_UNREF(shutdown_error_);
108   // Note: We do not actually delete the object here, since
109   // CachingByteStream is usually allocated as part of a larger
110   // object and has an OrphanablePtr of itself passed down through the
111   // filter stack.
112 }
113 
Next(size_t max_size_hint,grpc_closure * on_complete)114 bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint,
115                                               grpc_closure* on_complete) {
116   if (shutdown_error_ != GRPC_ERROR_NONE) return true;
117   if (cursor_ < cache_->cache_buffer_.count) return true;
118   GPR_ASSERT(cache_->underlying_stream_ != nullptr);
119   return cache_->underlying_stream_->Next(max_size_hint, on_complete);
120 }
121 
Pull(grpc_slice * slice)122 grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
123   if (shutdown_error_ != GRPC_ERROR_NONE) {
124     return GRPC_ERROR_REF(shutdown_error_);
125   }
126   if (cursor_ < cache_->cache_buffer_.count) {
127     *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]);
128     ++cursor_;
129     offset_ += GRPC_SLICE_LENGTH(*slice);
130     return GRPC_ERROR_NONE;
131   }
132   GPR_ASSERT(cache_->underlying_stream_ != nullptr);
133   grpc_error* error = cache_->underlying_stream_->Pull(slice);
134   if (error == GRPC_ERROR_NONE) {
135     grpc_slice_buffer_add(&cache_->cache_buffer_,
136                           grpc_slice_ref_internal(*slice));
137     ++cursor_;
138     offset_ += GRPC_SLICE_LENGTH(*slice);
139     // Orphan the underlying stream if it's been drained.
140     if (offset_ == cache_->underlying_stream_->length()) {
141       cache_->underlying_stream_.reset();
142     }
143   }
144   return error;
145 }
146 
Shutdown(grpc_error * error)147 void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) {
148   GRPC_ERROR_UNREF(shutdown_error_);
149   shutdown_error_ = GRPC_ERROR_REF(error);
150   if (cache_->underlying_stream_ != nullptr) {
151     cache_->underlying_stream_->Shutdown(error);
152   }
153 }
154 
Reset()155 void ByteStreamCache::CachingByteStream::Reset() {
156   cursor_ = 0;
157   offset_ = 0;
158 }
159 
160 }  // namespace grpc_core
161