1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
20 #define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <grpc/slice_buffer.h>
25 #include "src/core/lib/gprpp/abstract.h"
26 #include "src/core/lib/gprpp/orphanable.h"
27 #include "src/core/lib/iomgr/closure.h"
28 
/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
 * compression for the message. The value sets only the most significant bit
 * (bit 31) of a 32-bit flags word. */
#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
/** Mask of all valid internal flags. At present this expands to
 * GRPC_WRITE_INTERNAL_COMPRESS alone, the only internal flag defined here. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
34 
35 namespace grpc_core {
36 
37 class ByteStream : public Orphanable {
38  public:
~ByteStream()39   virtual ~ByteStream() {}
40 
41   // Returns true if the bytes are available immediately (in which case
42   // on_complete will not be called), or false if the bytes will be available
43   // asynchronously (in which case on_complete will be called when they
44   // are available).
45   //
46   // max_size_hint can be set as a hint as to the maximum number
47   // of bytes that would be acceptable to read.
48   virtual bool Next(size_t max_size_hint,
49                     grpc_closure* on_complete) GRPC_ABSTRACT;
50 
51   // Returns the next slice in the byte stream when it is available, as
52   // indicated by Next().
53   //
54   // Once a slice is returned into *slice, it is owned by the caller.
55   virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT;
56 
57   // Shuts down the byte stream.
58   //
59   // If there is a pending call to on_complete from Next(), it will be
60   // invoked with the error passed to Shutdown().
61   //
62   // The next call to Pull() (if any) will return the error passed to
63   // Shutdown().
64   virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT;
65 
length()66   uint32_t length() const { return length_; }
flags()67   uint32_t flags() const { return flags_; }
68 
set_flags(uint32_t flags)69   void set_flags(uint32_t flags) { flags_ = flags; }
70 
71   GRPC_ABSTRACT_BASE_CLASS
72 
73  protected:
ByteStream(uint32_t length,uint32_t flags)74   ByteStream(uint32_t length, uint32_t flags)
75       : length_(length), flags_(flags) {}
76 
77  private:
78   const uint32_t length_;
79   uint32_t flags_;
80 };
81 
82 //
83 // SliceBufferByteStream
84 //
85 // A ByteStream that wraps a slice buffer.
86 //
87 
88 class SliceBufferByteStream : public ByteStream {
89  public:
90   // Removes all slices in slice_buffer, leaving it empty.
91   SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags);
92 
93   ~SliceBufferByteStream();
94 
95   void Orphan() override;
96 
97   bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
98   grpc_error* Pull(grpc_slice* slice) override;
99   void Shutdown(grpc_error* error) override;
100 
101  private:
102   grpc_slice_buffer backing_buffer_;
103   size_t cursor_ = 0;
104   grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
105 };
106 
107 //
108 // CachingByteStream
109 //
// A ByteStream that wraps an underlying byte stream but caches
111 // the resulting slices in a slice buffer.  If an initial attempt fails
112 // without fully draining the underlying stream, a new caching stream
113 // can be created from the same underlying cache, in which case it will
114 // return whatever is in the backing buffer before continuing to read the
115 // underlying stream.
116 //
117 // NOTE: No synchronization is done, so it is not safe to have multiple
118 // CachingByteStreams simultaneously drawing from the same underlying
119 // ByteStreamCache at the same time.
120 //
121 
122 class ByteStreamCache {
123  public:
124   class CachingByteStream : public ByteStream {
125    public:
126     explicit CachingByteStream(ByteStreamCache* cache);
127 
128     ~CachingByteStream();
129 
130     void Orphan() override;
131 
132     bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
133     grpc_error* Pull(grpc_slice* slice) override;
134     void Shutdown(grpc_error* error) override;
135 
136     // Resets the byte stream to the start of the underlying stream.
137     void Reset();
138 
139    private:
140     ByteStreamCache* cache_;
141     size_t cursor_ = 0;
142     size_t offset_ = 0;
143     grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
144   };
145 
146   explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream);
147 
148   ~ByteStreamCache();
149 
150   // Must not be destroyed while still in use by a CachingByteStream.
151   void Destroy();
152 
cache_buffer()153   grpc_slice_buffer* cache_buffer() { return &cache_buffer_; }
154 
155  private:
156   OrphanablePtr<ByteStream> underlying_stream_;
157   uint32_t length_;
158   uint32_t flags_;
159   grpc_slice_buffer cache_buffer_;
160 };
161 
162 }  // namespace grpc_core
163 
164 #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */
165