1 /*-------------------------------------------------------------------------
2  * drawElements Stream Library
3  * ---------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Buffered and threaded input and output streams
22  *//*--------------------------------------------------------------------*/
23 
24 #include "deThreadStream.h"
25 #include "deStreamCpyThread.h"
26 #include "deRingbuffer.h"
27 #include "stdlib.h"
28 
29 typedef struct deThreadInStream_s
30 {
31 	deRingbuffer*		ringbuffer;
32 	deInStream*			input;
33 	deInStream			consumerStream;
34 	deOutStream			producerStream;
35 	deThread			thread;
36 	int					bufferSize;
37 } deThreadInStream;
38 
39 typedef struct deThreadOutStream_s
40 {
41 	deRingbuffer*		ringbuffer;
42 	deInStream			consumerStream;
43 	deOutStream			producerStream;
44 	deStreamCpyThread*	thread;
45 } deThreadOutStream;
46 
inStreamCopy(void * arg)47 static void inStreamCopy (void* arg)
48 {
49 	deThreadInStream* threadStream = (deThreadInStream*)arg;
50 
51 	deUint8* buffer = malloc(sizeof(deUint8) * (size_t)threadStream->bufferSize);
52 
53 	for(;;)
54 	{
55 		deInt32 read	= 0;
56 		deInt32 written	= 0;
57 		deStreamResult readResult = DE_STREAMRESULT_ERROR;
58 
59 		readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read);
60 		DE_ASSERT(readResult != DE_STREAMRESULT_ERROR);
61 		while (written < read)
62 		{
63 			deInt32 wrote = 0;
64 
65 			/* \todo [mika] Handle errors */
66 			deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote);
67 
68 			written += wrote;
69 		}
70 
71 		if (readResult == DE_STREAMRESULT_END_OF_STREAM)
72 		{
73 			break;
74 		}
75 	}
76 
77 	deOutStream_flush(&(threadStream->producerStream));
78 	deRingbuffer_stop(threadStream->ringbuffer);
79 	free(buffer);
80 
81 }
82 
threadInStream_read(deStreamData * stream,void * buf,deInt32 bufSize,deInt32 * numRead)83 static deStreamResult threadInStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* numRead)
84 {
85 	deThreadInStream* threadStream = (deThreadInStream*)stream;
86 	return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead);
87 }
88 
/*--------------------------------------------------------------------*//*!
 * \brief Error string handler of the threaded input stream
 *
 * \param stream	Stream data; interpreted as a deThreadInStream.
 * \return Error string reported by the ring buffer consumer stream.
 *
 * \todo [mika] Errors from the copy thread itself are not reported yet.
 *//*--------------------------------------------------------------------*/
static const char* threadInStream_getError (deStreamData* stream)
{
	deInStream* const consumer = &(((deThreadInStream*)stream)->consumerStream);

	return deInStream_getError(consumer);
}
96 
threadInStream_getStatus(deStreamData * stream)97 static deStreamStatus threadInStream_getStatus (deStreamData* stream)
98 {
99 	deThreadInStream* threadStream = (deThreadInStream*)stream;
100 
101 	/* \todo [mika] Add handling for status on thread stream */
102 	return deInStream_getStatus(&(threadStream->consumerStream));
103 }
104 
105 /* \note [mika] Used by both in and out stream */
threadStream_deinit(deStreamData * stream)106 static deStreamResult threadStream_deinit (deStreamData* stream)
107 {
108 	deThreadInStream* threadStream = (deThreadInStream*)stream;
109 
110 	deRingbuffer_stop(threadStream->ringbuffer);
111 
112 	deThread_join(threadStream->thread);
113 	deThread_destroy(threadStream->thread);
114 
115 	deOutStream_deinit(&(threadStream->producerStream));
116 	deInStream_deinit(&(threadStream->consumerStream));
117 
118 	deRingbuffer_destroy(threadStream->ringbuffer);
119 
120 	return DE_STREAMRESULT_SUCCESS;
121 }
122 
123 static const deIOStreamVFTable threadInStreamVFTable = {
124 	threadInStream_read,
125 	DE_NULL,
126 	threadInStream_getError,
127 	DE_NULL,
128 	threadStream_deinit,
129 	threadInStream_getStatus
130 };
131 
/*--------------------------------------------------------------------*//*!
 * \brief Initialise a threaded input stream on top of another input stream
 *
 * \param stream				Stream object to initialise.
 * \param input					Stream the copy thread reads from.
 * \param ringbufferBlockSize	Block size passed to deRingbuffer_create();
 *								also used as the copy thread's buffer size.
 * \param ringbufferBlockCount	Block count passed to deRingbuffer_create().
 *
 * Allocates the state block, creates the ring buffer with its producer and
 * consumer streams, starts the copy thread (inStreamCopy) and installs the
 * threaded-input handler table on \c stream. Allocation failures are only
 * caught by DE_ASSERT.
 *//*--------------------------------------------------------------------*/
void deThreadInStream_init (deInStream* stream, deInStream* input, int ringbufferBlockSize, int ringbufferBlockCount)
{
	deThreadInStream* state = malloc(sizeof(deThreadInStream));

	DE_ASSERT(state);

	state->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
	DE_ASSERT(state->ringbuffer);

	state->input		= input;
	state->bufferSize	= ringbufferBlockSize;

	deProducerStream_init(&state->producerStream, state->ringbuffer);
	deConsumerStream_init(&state->consumerStream, state->ringbuffer);

	/* Start the copy thread only after every field it reads has been set. */
	state->thread = deThread_create(inStreamCopy, state, DE_NULL);

	stream->ioStream.vfTable	= &threadInStreamVFTable;
	stream->ioStream.streamData	= state;
}
151 
threadOutStream_write(deStreamData * stream,const void * buf,deInt32 bufSize,deInt32 * numWritten)152 static deStreamResult threadOutStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* numWritten)
153 {
154 	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
155 	return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten);
156 }
157 
/*--------------------------------------------------------------------*//*!
 * \brief Error string handler of the threaded output stream
 *
 * \param stream	Stream data; interpreted as a deThreadOutStream.
 * \return Error string reported by the ring buffer producer stream.
 *
 * \todo [mika] Errors from the copy thread itself are not reported yet.
 *//*--------------------------------------------------------------------*/
static const char* threadOutStream_getError (deStreamData* stream)
{
	deOutStream* const producer = &(((deThreadOutStream*)stream)->producerStream);

	return deOutStream_getError(producer);
}
165 
threadOutStream_getStatus(deStreamData * stream)166 static deStreamStatus threadOutStream_getStatus (deStreamData* stream)
167 {
168 	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
169 
170 	/* \todo [mika] Add handling for errors on thread stream */
171 	return deOutStream_getStatus(&(threadStream->producerStream));
172 }
173 
threadOutStream_flush(deStreamData * stream)174 static deStreamResult threadOutStream_flush (deStreamData* stream)
175 {
176 	deThreadOutStream* threadStream = (deThreadOutStream*)stream;
177 
178 	return deOutStream_flush(&(threadStream->producerStream));
179 }
180 
181 static const deIOStreamVFTable threadOutStreamVFTable = {
182 	DE_NULL,
183 	threadOutStream_write,
184 	threadOutStream_getError,
185 	threadOutStream_flush,
186 	threadStream_deinit,
187 	threadOutStream_getStatus
188 };
189 
/*--------------------------------------------------------------------*//*!
 * \brief Initialise a threaded output stream on top of another output stream
 *
 * \param stream				Stream object to initialise.
 * \param output				Stream the copy thread writes to.
 * \param ringbufferBlockSize	Block size passed to deRingbuffer_create();
 *								also passed to the copy thread as its buffer size.
 * \param ringbufferBlockCount	Block count passed to deRingbuffer_create().
 *
 * Allocates the state block, creates the ring buffer with its producer and
 * consumer streams, creates a stream copy thread from the consumer stream to
 * \c output and installs the threaded-output handler table on \c stream.
 * Allocation failures are only caught by DE_ASSERT.
 *//*--------------------------------------------------------------------*/
void deThreadOutStream_init (deOutStream* stream, deOutStream* output, int ringbufferBlockSize, int ringbufferBlockCount)
{
	deThreadOutStream* state = malloc(sizeof(deThreadOutStream));

	DE_ASSERT(state);

	state->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
	DE_ASSERT(state->ringbuffer);

	deProducerStream_init(&state->producerStream, state->ringbuffer);
	deConsumerStream_init(&state->consumerStream, state->ringbuffer);

	/* The copy thread gets the consumer stream as its input, so it is created after the streams. */
	state->thread = deStreamCpyThread_create(&state->consumerStream, output, ringbufferBlockSize);

	stream->ioStream.vfTable	= &threadOutStreamVFTable;
	stream->ioStream.streamData	= state;
}
207 
208