1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/transport/byte_stream.h"
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
24 
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/iomgr/exec_ctx.h"
27 #include "src/core/lib/slice/slice_internal.h"
28 
29 #include "test/core/util/test_config.h"
30 
31 #include <gtest/gtest.h>
32 
33 namespace grpc_core {
34 namespace {
35 
36 //
37 // SliceBufferByteStream tests
38 //
39 
NotCalledClosure(void * arg,grpc_error * error)40 void NotCalledClosure(void* arg, grpc_error* error) { GPR_ASSERT(false); }
41 
// Checks that a SliceBufferByteStream reports the combined length of the
// slices it was built from and returns them one per Next()/Pull() pair, in
// insertion order.
TEST(SliceBufferByteStream, Basic) {
  grpc_core::ExecCtx exec_ctx;
  // Expected payload: two static slices, "foo" then "bar".
  const grpc_slice expected_slices[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  // Fill a slice buffer with the payload.
  grpc_slice_buffer source;
  grpc_slice_buffer_init(&source);
  for (const grpc_slice& slice : expected_slices) {
    grpc_slice_buffer_add(&source, slice);
  }
  // Build the stream, then destroy the source buffer right away; the stream
  // is still expected to report and serve the data afterwards.
  SliceBufferByteStream stream(&source, 0);
  grpc_slice_buffer_destroy_internal(&source);
  EXPECT_EQ(6U, stream.length());
  // Closure that asserts if it is ever run.
  grpc_closure on_next;
  GRPC_CLOSURE_INIT(&on_next, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  // All-ones size_t, i.e. the largest representable value.
  const size_t kSizeMax = ~static_cast<size_t>(0);
  // Pull the slices back one by one.  Next() is expected to return true
  // (synchronous completion) every time.
  for (const grpc_slice& expected : expected_slices) {
    ASSERT_TRUE(stream.Next(kSizeMax, &on_next));
    grpc_slice actual;
    grpc_error* pull_error = stream.Pull(&actual);
    EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
    EXPECT_TRUE(grpc_slice_eq(expected, actual));
    grpc_slice_unref_internal(actual);
  }
  // Release the stream.
  stream.Orphan();
}
73 
// Checks that once Shutdown() has been called on a SliceBufferByteStream,
// the following Pull() returns the shutdown error instead of more data.
TEST(SliceBufferByteStream, Shutdown) {
  grpc_core::ExecCtx exec_ctx;
  // Expected payload: two static slices, "foo" then "bar".
  const grpc_slice expected_slices[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  // Fill a slice buffer with the payload.
  grpc_slice_buffer source;
  grpc_slice_buffer_init(&source);
  for (const grpc_slice& slice : expected_slices) {
    grpc_slice_buffer_add(&source, slice);
  }
  // Build the stream, then destroy the source buffer right away.
  SliceBufferByteStream stream(&source, 0);
  grpc_slice_buffer_destroy_internal(&source);
  EXPECT_EQ(6U, stream.length());
  // Closure that asserts if it is ever run.
  grpc_closure on_next;
  GRPC_CLOSURE_INIT(&on_next, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  // All-ones size_t, i.e. the largest representable value.
  const size_t kSizeMax = ~static_cast<size_t>(0);
  // The first slice is read normally.
  ASSERT_TRUE(stream.Next(kSizeMax, &on_next));
  grpc_slice pulled;
  grpc_error* pull_error = stream.Pull(&pulled);
  EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
  EXPECT_TRUE(grpc_slice_eq(expected_slices[0], pulled));
  grpc_slice_unref_internal(pulled);
  // Shut the stream down.  An extra ref is passed to Shutdown() so the test
  // keeps its own reference for the comparison below.
  grpc_error* shutdown_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
  stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
  // Next() must still complete synchronously, and Pull() must now hand back
  // the very same error object.
  ASSERT_TRUE(stream.Next(kSizeMax, &on_next));
  pull_error = stream.Pull(&pulled);
  EXPECT_TRUE(pull_error == shutdown_error);
  // Drop the ref returned by Pull() and the test's own ref.
  GRPC_ERROR_UNREF(pull_error);
  GRPC_ERROR_UNREF(shutdown_error);
  // Release the stream.
  stream.Orphan();
}
113 
114 //
115 // CachingByteStream tests
116 //
117 
// Checks that a CachingByteStream layered over a SliceBufferByteStream
// returns the underlying slices unchanged and in order.
TEST(CachingByteStream, Basic) {
  grpc_core::ExecCtx exec_ctx;
  // Expected payload: two static slices, "foo" then "bar".
  const grpc_slice expected_slices[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  // Build the underlying slice-buffer byte stream from the payload.
  grpc_slice_buffer source;
  grpc_slice_buffer_init(&source);
  for (const grpc_slice& slice : expected_slices) {
    grpc_slice_buffer_add(&source, slice);
  }
  SliceBufferByteStream underlying_stream(&source, 0);
  grpc_slice_buffer_destroy_internal(&source);
  // Wrap the underlying stream in a cache and open a caching stream on it.
  // NOTE(review): the OrphanablePtr wraps a stack-allocated stream; this
  // presumably relies on Orphan() not deleting the object -- confirm.
  ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
  ByteStreamCache::CachingByteStream stream(&cache);
  // Closure that asserts if it is ever run.
  grpc_closure on_next;
  GRPC_CLOSURE_INIT(&on_next, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  // All-ones size_t, i.e. the largest representable value.
  const size_t kSizeMax = ~static_cast<size_t>(0);
  // Pull the slices back one by one.  Next() is expected to complete
  // synchronously because the underlying byte stream always does.
  for (const grpc_slice& expected : expected_slices) {
    ASSERT_TRUE(stream.Next(kSizeMax, &on_next));
    grpc_slice actual;
    grpc_error* pull_error = stream.Pull(&actual);
    EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
    EXPECT_TRUE(grpc_slice_eq(expected, actual));
    grpc_slice_unref_internal(actual);
  }
  // Release the caching stream first, then the cache.
  stream.Orphan();
  cache.Destroy();
}
152 
// Checks that Reset() on a CachingByteStream rewinds it: after reading one
// slice and resetting, the stream yields every slice again from the start.
TEST(CachingByteStream, Reset) {
  grpc_core::ExecCtx exec_ctx;
  // Expected payload: two static slices, "foo" then "bar".
  const grpc_slice expected_slices[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  // Build the underlying slice-buffer byte stream from the payload.
  grpc_slice_buffer source;
  grpc_slice_buffer_init(&source);
  for (const grpc_slice& slice : expected_slices) {
    grpc_slice_buffer_add(&source, slice);
  }
  SliceBufferByteStream underlying_stream(&source, 0);
  grpc_slice_buffer_destroy_internal(&source);
  // Wrap the underlying stream in a cache and open a caching stream on it.
  // NOTE(review): the OrphanablePtr wraps a stack-allocated stream; this
  // presumably relies on Orphan() not deleting the object -- confirm.
  ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
  ByteStreamCache::CachingByteStream stream(&cache);
  // Closure that asserts if it is ever run.
  grpc_closure on_next;
  GRPC_CLOSURE_INIT(&on_next, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  // All-ones size_t, i.e. the largest representable value.
  const size_t kSizeMax = ~static_cast<size_t>(0);
  // Consume just the first slice.
  {
    ASSERT_TRUE(stream.Next(kSizeMax, &on_next));
    grpc_slice first;
    grpc_error* pull_error = stream.Pull(&first);
    EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
    EXPECT_TRUE(grpc_slice_eq(expected_slices[0], first));
    grpc_slice_unref_internal(first);
  }
  // Rewind the caching stream; reading must start over at the first slice.
  stream.Reset();
  for (const grpc_slice& expected : expected_slices) {
    ASSERT_TRUE(stream.Next(kSizeMax, &on_next));
    grpc_slice actual;
    grpc_error* pull_error = stream.Pull(&actual);
    EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
    EXPECT_TRUE(grpc_slice_eq(expected, actual));
    grpc_slice_unref_internal(actual);
  }
  // Release the caching stream first, then the cache.
  stream.Orphan();
  cache.Destroy();
}
194 
// Checks that two CachingByteStreams attached to the same ByteStreamCache
// read independently: stream1 reads the first slice, stream2 then reads all
// slices, and stream1 afterwards continues with the second slice.
TEST(CachingByteStream, SharedCache) {
  grpc_core::ExecCtx exec_ctx;
  // Create and populate slice buffer byte stream.
  grpc_slice_buffer buffer;
  grpc_slice_buffer_init(&buffer);
  grpc_slice input[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
    grpc_slice_buffer_add(&buffer, input[i]);
  }
  SliceBufferByteStream underlying_stream(&buffer, 0);
  grpc_slice_buffer_destroy_internal(&buffer);
  // Create cache and two caching streams.
  // NOTE(review): the OrphanablePtr wraps a stack-allocated stream; this
  // presumably relies on Orphan() not deleting the object -- confirm.
  ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
  ByteStreamCache::CachingByteStream stream1(&cache);
  ByteStreamCache::CachingByteStream stream2(&cache);
  grpc_closure closure;
  GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  // Every Next() below uses ASSERT_TRUE, matching the other tests in this
  // file: if Next() did not complete synchronously there is nothing ready to
  // Pull(), so the test must stop rather than continue.
  // Read one slice from stream1.
  ASSERT_TRUE(stream1.Next(~static_cast<size_t>(0), &closure));
  grpc_slice output;
  grpc_error* error = stream1.Pull(&output);
  EXPECT_TRUE(error == GRPC_ERROR_NONE);
  EXPECT_TRUE(grpc_slice_eq(input[0], output));
  grpc_slice_unref_internal(output);
  // Read all slices from stream2.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
    ASSERT_TRUE(stream2.Next(~static_cast<size_t>(0), &closure));
    error = stream2.Pull(&output);
    EXPECT_TRUE(error == GRPC_ERROR_NONE);
    EXPECT_TRUE(grpc_slice_eq(input[i], output));
    grpc_slice_unref_internal(output);
  }
  // Now read the second slice from stream1.
  ASSERT_TRUE(stream1.Next(~static_cast<size_t>(0), &closure));
  error = stream1.Pull(&output);
  EXPECT_TRUE(error == GRPC_ERROR_NONE);
  EXPECT_TRUE(grpc_slice_eq(input[1], output));
  grpc_slice_unref_internal(output);
  // Clean up.
  stream1.Orphan();
  stream2.Orphan();
  cache.Destroy();
}
242 
243 }  // namespace
244 }  // namespace grpc_core
245 
main(int argc,char ** argv)246 int main(int argc, char** argv) {
247   grpc_init();
248   grpc_test_init(argc, argv);
249   ::testing::InitGoogleTest(&argc, argv);
250   int retval = RUN_ALL_TESTS();
251   grpc_shutdown();
252   return retval;
253 }
254