1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/compression/message_compress.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 
28 #include <zlib.h>
29 
30 #include "src/core/lib/slice/slice_internal.h"
31 
32 #define OUTPUT_BLOCK_SIZE 1024
33 
zlib_body(z_stream * zs,grpc_slice_buffer * input,grpc_slice_buffer * output,int (* flate)(z_stream * zs,int flush))34 static int zlib_body(z_stream* zs, grpc_slice_buffer* input,
35                      grpc_slice_buffer* output,
36                      int (*flate)(z_stream* zs, int flush)) {
37   int r;
38   int flush;
39   size_t i;
40   grpc_slice outbuf = GRPC_SLICE_MALLOC(OUTPUT_BLOCK_SIZE);
41   const uInt uint_max = ~static_cast<uInt>(0);
42 
43   GPR_ASSERT(GRPC_SLICE_LENGTH(outbuf) <= uint_max);
44   zs->avail_out = static_cast<uInt> GRPC_SLICE_LENGTH(outbuf);
45   zs->next_out = GRPC_SLICE_START_PTR(outbuf);
46   flush = Z_NO_FLUSH;
47   for (i = 0; i < input->count; i++) {
48     if (i == input->count - 1) flush = Z_FINISH;
49     GPR_ASSERT(GRPC_SLICE_LENGTH(input->slices[i]) <= uint_max);
50     zs->avail_in = static_cast<uInt> GRPC_SLICE_LENGTH(input->slices[i]);
51     zs->next_in = GRPC_SLICE_START_PTR(input->slices[i]);
52     do {
53       if (zs->avail_out == 0) {
54         grpc_slice_buffer_add_indexed(output, outbuf);
55         outbuf = GRPC_SLICE_MALLOC(OUTPUT_BLOCK_SIZE);
56         GPR_ASSERT(GRPC_SLICE_LENGTH(outbuf) <= uint_max);
57         zs->avail_out = static_cast<uInt> GRPC_SLICE_LENGTH(outbuf);
58         zs->next_out = GRPC_SLICE_START_PTR(outbuf);
59       }
60       r = flate(zs, flush);
61       if (r < 0 && r != Z_BUF_ERROR /* not fatal */) {
62         gpr_log(GPR_INFO, "zlib error (%d)", r);
63         goto error;
64       }
65     } while (zs->avail_out == 0);
66     if (zs->avail_in) {
67       gpr_log(GPR_INFO, "zlib: not all input consumed");
68       goto error;
69     }
70   }
71 
72   GPR_ASSERT(outbuf.refcount);
73   outbuf.data.refcounted.length -= zs->avail_out;
74   grpc_slice_buffer_add_indexed(output, outbuf);
75 
76   return 1;
77 
78 error:
79   grpc_slice_unref_internal(outbuf);
80   return 0;
81 }
82 
zalloc_gpr(void * opaque,unsigned int items,unsigned int size)83 static void* zalloc_gpr(void* opaque, unsigned int items, unsigned int size) {
84   return gpr_malloc(items * size);
85 }
86 
zfree_gpr(void * opaque,void * address)87 static void zfree_gpr(void* opaque, void* address) { gpr_free(address); }
88 
zlib_compress(grpc_slice_buffer * input,grpc_slice_buffer * output,int gzip)89 static int zlib_compress(grpc_slice_buffer* input, grpc_slice_buffer* output,
90                          int gzip) {
91   z_stream zs;
92   int r;
93   size_t i;
94   size_t count_before = output->count;
95   size_t length_before = output->length;
96   memset(&zs, 0, sizeof(zs));
97   zs.zalloc = zalloc_gpr;
98   zs.zfree = zfree_gpr;
99   r = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 | (gzip ? 16 : 0),
100                    8, Z_DEFAULT_STRATEGY);
101   GPR_ASSERT(r == Z_OK);
102   r = zlib_body(&zs, input, output, deflate) && output->length < input->length;
103   if (!r) {
104     for (i = count_before; i < output->count; i++) {
105       grpc_slice_unref_internal(output->slices[i]);
106     }
107     output->count = count_before;
108     output->length = length_before;
109   }
110   deflateEnd(&zs);
111   return r;
112 }
113 
zlib_decompress(grpc_slice_buffer * input,grpc_slice_buffer * output,int gzip)114 static int zlib_decompress(grpc_slice_buffer* input, grpc_slice_buffer* output,
115                            int gzip) {
116   z_stream zs;
117   int r;
118   size_t i;
119   size_t count_before = output->count;
120   size_t length_before = output->length;
121   memset(&zs, 0, sizeof(zs));
122   zs.zalloc = zalloc_gpr;
123   zs.zfree = zfree_gpr;
124   r = inflateInit2(&zs, 15 | (gzip ? 16 : 0));
125   GPR_ASSERT(r == Z_OK);
126   r = zlib_body(&zs, input, output, inflate);
127   if (!r) {
128     for (i = count_before; i < output->count; i++) {
129       grpc_slice_unref_internal(output->slices[i]);
130     }
131     output->count = count_before;
132     output->length = length_before;
133   }
134   inflateEnd(&zs);
135   return r;
136 }
137 
copy(grpc_slice_buffer * input,grpc_slice_buffer * output)138 static int copy(grpc_slice_buffer* input, grpc_slice_buffer* output) {
139   size_t i;
140   for (i = 0; i < input->count; i++) {
141     grpc_slice_buffer_add(output, grpc_slice_ref_internal(input->slices[i]));
142   }
143   return 1;
144 }
145 
compress_inner(grpc_message_compression_algorithm algorithm,grpc_slice_buffer * input,grpc_slice_buffer * output)146 static int compress_inner(grpc_message_compression_algorithm algorithm,
147                           grpc_slice_buffer* input, grpc_slice_buffer* output) {
148   switch (algorithm) {
149     case GRPC_MESSAGE_COMPRESS_NONE:
150       /* the fallback path always needs to be send uncompressed: we simply
151          rely on that here */
152       return 0;
153     case GRPC_MESSAGE_COMPRESS_DEFLATE:
154       return zlib_compress(input, output, 0);
155     case GRPC_MESSAGE_COMPRESS_GZIP:
156       return zlib_compress(input, output, 1);
157     case GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT:
158       break;
159   }
160   gpr_log(GPR_ERROR, "invalid compression algorithm %d", algorithm);
161   return 0;
162 }
163 
grpc_msg_compress(grpc_message_compression_algorithm algorithm,grpc_slice_buffer * input,grpc_slice_buffer * output)164 int grpc_msg_compress(grpc_message_compression_algorithm algorithm,
165                       grpc_slice_buffer* input, grpc_slice_buffer* output) {
166   if (!compress_inner(algorithm, input, output)) {
167     copy(input, output);
168     return 0;
169   }
170   return 1;
171 }
172 
grpc_msg_decompress(grpc_message_compression_algorithm algorithm,grpc_slice_buffer * input,grpc_slice_buffer * output)173 int grpc_msg_decompress(grpc_message_compression_algorithm algorithm,
174                         grpc_slice_buffer* input, grpc_slice_buffer* output) {
175   switch (algorithm) {
176     case GRPC_MESSAGE_COMPRESS_NONE:
177       return copy(input, output);
178     case GRPC_MESSAGE_COMPRESS_DEFLATE:
179       return zlib_decompress(input, output, 0);
180     case GRPC_MESSAGE_COMPRESS_GZIP:
181       return zlib_decompress(input, output, 1);
182     case GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT:
183       break;
184   }
185   gpr_log(GPR_ERROR, "invalid compression algorithm %d", algorithm);
186   return 0;
187 }
188