1 /* MtCoder.c -- Multi-thread Coder
2 2010-09-24 : Igor Pavlov : Public domain */
3 
4 #include "Precomp.h"
5 
6 #include <stdio.h>
7 
8 #include "MtCoder.h"
9 
/* Runs the *_Construct routine on each member of a CLoopThread: the two
   events (start / finished) and the thread handle. */
void LoopThread_Construct(CLoopThread *p)
{
  Event_Construct(&p->startEvent);
  Event_Construct(&p->finishedEvent);
  Thread_Construct(&p->thread);
}
16 
/* Releases the members of a CLoopThread: the thread handle first, then the
   start event and the finished event. Return values of the close calls are
   not inspected. */
void LoopThread_Close(CLoopThread *p)
{
  /* thread handle */
  Thread_Close(&p->thread);

  /* signalling events */
  Event_Close(&p->startEvent);
  Event_Close(&p->finishedEvent);
}
23 
LoopThreadFunc(void * pp)24 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
25 {
26   CLoopThread *p = (CLoopThread *)pp;
27   for (;;)
28   {
29     if (Event_Wait(&p->startEvent) != 0)
30       return SZ_ERROR_THREAD;
31     if (p->stop)
32       return 0;
33     p->res = p->func(p->param);
34     if (Event_Set(&p->finishedEvent) != 0)
35       return SZ_ERROR_THREAD;
36   }
37 }
38 
LoopThread_Create(CLoopThread * p)39 WRes LoopThread_Create(CLoopThread *p)
40 {
41   p->stop = 0;
42   RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
43   RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
44   return Thread_Create(&p->thread, LoopThreadFunc, p);
45 }
46 
LoopThread_StopAndWait(CLoopThread * p)47 WRes LoopThread_StopAndWait(CLoopThread *p)
48 {
49   p->stop = 1;
50   if (Event_Set(&p->startEvent) != 0)
51     return SZ_ERROR_THREAD;
52   return Thread_Wait(&p->thread);
53 }
54 
LoopThread_StartSubThread(CLoopThread * p)55 WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
LoopThread_WaitSubThread(CLoopThread * p)56 WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
57 
Progress(ICompressProgress * p,UInt64 inSize,UInt64 outSize)58 static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
59 {
60   return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
61 }
62 
/* Resets a CMtProgress: stores the callback, clears the sticky result to
   SZ_OK, zeroes both totals and every per-thread in/out counter
   (NUM_MT_CODER_THREADS_MAX slots). The critical section is not touched. */
static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
  unsigned slot = NUM_MT_CODER_THREADS_MAX;

  p->progress = progress;
  p->res = SZ_OK;
  p->totalInSize = 0;
  p->totalOutSize = 0;

  while (slot != 0)
  {
    slot--;
    p->inSizes[slot] = 0;
    p->outSizes[slot] = 0;
  }
}
72 
/* Zeroes the in/out counters of one thread slot. The totals are left as they
   are, and no lock is taken here. */
static void MtProgress_Reinit(CMtProgress *p, unsigned index)
{
  p->outSizes[index] = 0;
  p->inSizes[index] = 0;
}
78 
79 #define UPDATE_PROGRESS(size, prev, total) \
80   if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }
81 
MtProgress_Set(CMtProgress * p,unsigned index,UInt64 inSize,UInt64 outSize)82 SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
83 {
84   SRes res;
85   CriticalSection_Enter(&p->cs);
86   UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
87   UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
88   if (p->res == SZ_OK)
89     p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
90   res = p->res;
91   CriticalSection_Leave(&p->cs);
92   return res;
93 }
94 
/* Stores `res` as the progress result unless an earlier non-SZ_OK result is
   already recorded (first error wins). Done under the progress critical
   section. */
static void MtProgress_SetError(CMtProgress *p, SRes res)
{
  int nothingRecordedYet;

  CriticalSection_Enter(&p->cs);
  nothingRecordedYet = (p->res == SZ_OK);
  if (nothingRecordedYet)
    p->res = res;
  CriticalSection_Leave(&p->cs);
}
102 
/* Stores `res` as the coder result unless a non-SZ_OK result has already been
   recorded (first error wins). Done under the coder critical section. */
static void MtCoder_SetError(CMtCoder* p, SRes res)
{
  CriticalSection_Enter(&p->cs);
  if (p->res != SZ_OK)
  {
    /* An earlier error is kept. */
  }
  else
  {
    p->res = res;
  }
  CriticalSection_Leave(&p->cs);
}
110 
111 /* ---------- MtThread ---------- */
112 
/* Puts one worker slot into its initial state: remembers the owning coder,
   clears both buffer pointers, and runs the *_Construct routine on the
   canRead / canWrite events and on the embedded loop thread.
   The slot's index is not set here. */
void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
{
  p->mtCoder = mtCoder;

  p->inBuf = 0;
  p->outBuf = 0;

  Event_Construct(&p->canRead);
  Event_Construct(&p->canWrite);

  LoopThread_Construct(&p->thread);
}
122 
/* Evaluates x once; if it is non-zero, returns SZ_ERROR_THREAD from the
   enclosing function. Wrapped in do/while(0) so that the macro followed by a
   semicolon is exactly one statement and can sit in an unbraced if/else. */
#define RINOK_THREAD(x) do { if ((x) != 0) return SZ_ERROR_THREAD; } while (0)
124 
/* Closes the two hand-over events of a worker slot (canRead, then canWrite).
   Results of the close calls are ignored. */
static void CMtThread_CloseEvents(CMtThread *p)
{
  /* read token */
  Event_Close(&p->canRead);
  /* write token */
  Event_Close(&p->canWrite);
}
130 
/* Tears down one worker slot:
   - closes the canRead / canWrite events;
   - if the loop thread was created, stops it, waits for it and closes it
     (the result of LoopThread_StopAndWait is not checked);
   - frees both buffers through the coder's allocator when one is set, and
     clears the buffer pointers in either case. */
static void CMtThread_Destruct(CMtThread *p)
{
  CMtThread_CloseEvents(p);

  if (Thread_WasCreated(&p->thread.thread))
  {
    LoopThread_StopAndWait(&p->thread);
    LoopThread_Close(&p->thread);
  }

  if (p->mtCoder->alloc)
  {
    IAlloc_Free(p->mtCoder->alloc, p->outBuf);
    IAlloc_Free(p->mtCoder->alloc, p->inBuf);
  }

  p->outBuf = 0;
  p->inBuf = 0;
}
149 
150 #define MY_BUF_ALLOC(buf, size, newSize) \
151   if (buf == 0 || size != newSize) \
152   { IAlloc_Free(p->mtCoder->alloc, buf); \
153     size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
154     if (buf == 0) return SZ_ERROR_MEM; }
155 
CMtThread_Prepare(CMtThread * p)156 static SRes CMtThread_Prepare(CMtThread *p)
157 {
158   MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
159   MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
160 
161   p->stopReading = False;
162   p->stopWriting = False;
163   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
164   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
165 
166   return SZ_OK;
167 }
168 
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)169 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
170 {
171   size_t size = *processedSize;
172   *processedSize = 0;
173   while (size != 0)
174   {
175     size_t curSize = size;
176     SRes res = stream->Read(stream, data, &curSize);
177     *processedSize += curSize;
178     data += curSize;
179     size -= curSize;
180     RINOK(res);
181     if (curSize == 0)
182       return SZ_OK;
183   }
184   return SZ_OK;
185 }
186 
/* Yields a pointer to the worker slot that follows p in the coder's ring of
   threads: slot index + 1, wrapping to slot 0 after slot numThreads - 1.
   The argument and the whole expansion are parenthesized so that any pointer
   expression (e.g. &arr[i] or base + i) can be passed and the result can be
   used inside a larger expression. p is evaluated more than once. */
#define GET_NEXT_THREAD(p) \
  (&(p)->mtCoder->threads[(p)->index == (p)->mtCoder->numThreads - 1 ? 0 : (p)->index + 1])
188 
MtThread_Process(CMtThread * p,Bool * stop)189 static SRes MtThread_Process(CMtThread *p, Bool *stop)
190 {
191   CMtThread *next;
192   *stop = True;
193   if (Event_Wait(&p->canRead) != 0)
194     return SZ_ERROR_THREAD;
195 
196   next = GET_NEXT_THREAD(p);
197 
198   if (p->stopReading)
199   {
200     next->stopReading = True;
201     return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
202   }
203 
204   {
205     size_t size = p->mtCoder->blockSize;
206     size_t destSize = p->outBufSize;
207 
208     RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
209     next->stopReading = *stop = (size != p->mtCoder->blockSize);
210     if (Event_Set(&next->canRead) != 0)
211       return SZ_ERROR_THREAD;
212 
213     RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
214         p->outBuf, &destSize, p->inBuf, size, *stop));
215 
216     MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
217 
218     if (Event_Wait(&p->canWrite) != 0)
219       return SZ_ERROR_THREAD;
220     if (p->stopWriting)
221       return SZ_ERROR_FAIL;
222     if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
223       return SZ_ERROR_WRITE;
224     return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
225   }
226 }
227 
ThreadFunc(void * pp)228 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
229 {
230   CMtThread *p = (CMtThread *)pp;
231   for (;;)
232   {
233     Bool stop;
234     CMtThread *next = GET_NEXT_THREAD(p);
235     SRes res = MtThread_Process(p, &stop);
236     if (res != SZ_OK)
237     {
238       MtCoder_SetError(p->mtCoder, res);
239       MtProgress_SetError(&p->mtCoder->mtProgress, res);
240       next->stopReading = True;
241       next->stopWriting = True;
242       Event_Set(&next->canRead);
243       Event_Set(&next->canWrite);
244       return res;
245     }
246     if (stop)
247       return 0;
248   }
249 }
250 
/* Initial set-up of a CMtCoder: clears the allocator pointer, constructs all
   NUM_MT_CODER_THREADS_MAX worker slots (each gets its position as index and
   a back-pointer to this coder) and initializes the coder and progress
   critical sections. */
void MtCoder_Construct(CMtCoder* p)
{
  unsigned slot = 0;

  p->alloc = 0;

  while (slot < NUM_MT_CODER_THREADS_MAX)
  {
    p->threads[slot].index = slot;
    CMtThread_Construct(&p->threads[slot], p);
    slot++;
  }

  CriticalSection_Init(&p->cs);
  CriticalSection_Init(&p->mtProgress.cs);
}
264 
/* Counterpart of MtCoder_Construct: destructs every one of the
   NUM_MT_CODER_THREADS_MAX worker slots (in ascending order), then deletes
   the coder and progress critical sections. */
void MtCoder_Destruct(CMtCoder* p)
{
  unsigned slot = 0;

  while (slot != NUM_MT_CODER_THREADS_MAX)
  {
    CMtThread_Destruct(&p->threads[slot]);
    slot++;
  }

  CriticalSection_Delete(&p->cs);
  CriticalSection_Delete(&p->mtProgress.cs);
}
273 
MtCoder_Code(CMtCoder * p)274 SRes MtCoder_Code(CMtCoder *p)
275 {
276   unsigned i, numThreads = p->numThreads;
277   SRes res = SZ_OK;
278   p->res = SZ_OK;
279 
280   MtProgress_Init(&p->mtProgress, p->progress);
281 
282   for (i = 0; i < numThreads; i++)
283   {
284     RINOK(CMtThread_Prepare(&p->threads[i]));
285   }
286 
287   for (i = 0; i < numThreads; i++)
288   {
289     CMtThread *t = &p->threads[i];
290     CLoopThread *lt = &t->thread;
291 
292     if (!Thread_WasCreated(&lt->thread))
293     {
294       lt->func = ThreadFunc;
295       lt->param = t;
296 
297       if (LoopThread_Create(lt) != SZ_OK)
298       {
299         res = SZ_ERROR_THREAD;
300         break;
301       }
302     }
303   }
304 
305   if (res == SZ_OK)
306   {
307     unsigned j;
308     for (i = 0; i < numThreads; i++)
309     {
310       CMtThread *t = &p->threads[i];
311       if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
312       {
313         res = SZ_ERROR_THREAD;
314         p->threads[0].stopReading = True;
315         break;
316       }
317     }
318 
319     Event_Set(&p->threads[0].canWrite);
320     Event_Set(&p->threads[0].canRead);
321 
322     for (j = 0; j < i; j++)
323       LoopThread_WaitSubThread(&p->threads[j].thread);
324   }
325 
326   for (i = 0; i < numThreads; i++)
327     CMtThread_CloseEvents(&p->threads[i]);
328   return (res == SZ_OK) ? p->res : res;
329 }
330