1 /* MtDec.h -- Multi-thread Decoder
2 2018-07-04 : Igor Pavlov : Public domain */
3 
4 #ifndef __MT_DEC_H
5 #define __MT_DEC_H
6 
7 #include "7zTypes.h"
8 
9 #ifndef _7ZIP_ST
10 #include "Threads.h"
11 #endif
12 
13 EXTERN_C_BEGIN
14 
15 #ifndef _7ZIP_ST
16 
/* Upper bound on the number of decoder threads; it is used as the size of
   the CMtDec::threads[] array.
   This whole section is compiled only when _7ZIP_ST is NOT defined (see the
   enclosing #ifndef _7ZIP_ST), so the former "#else ... 1" single-thread
   fallback could never be selected and has been removed. */
#define MTDEC__THREADS_MAX 32
22 
23 
/* Progress / error state used by the MtProgress_*() functions declared below.
   NOTE(review): the CCriticalSection member suggests that the other fields are
   accessed under (cs) from several threads - confirm in the implementation. */
typedef struct
{
  // progress callback interface; passed to MtProgress_Init()
  ICompressProgress *progress;
  // stored result code (presumably what MtProgress_GetError() / MtProgress_SetError() read and write)
  SRes res;
  // input / output size counters (presumably accumulated by MtProgress_ProgressAdd())
  UInt64 totalInSize;
  UInt64 totalOutSize;
  // critical section object belonging to this progress state
  CCriticalSection cs;
} CMtProgress;
32 
/* CMtProgress API.
   NOTE(review): only the declarations are visible in this header; the
   descriptions below are inferred from names and signatures and must be
   confirmed against the implementation. */

// initializes (p) for use with the (progress) callback
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress);
// reports progress; the _ST suffix presumably means "single-thread" (no locking) - TODO confirm
SRes MtProgress_Progress_ST(CMtProgress *p);
// adds (inSize) / (outSize) to the counters of (p), presumably reporting progress; returns SRes
SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize);
// returns the SRes stored in (p)
SRes MtProgress_GetError(CMtProgress *p);
// stores (res) in (p); whether an earlier error is preserved is not visible here
void MtProgress_SetError(CMtProgress *p, SRes res);
38 
// forward declaration: CMtDecThread points back to the decoder defined later in this file
struct _CMtDec;

/* Per-thread state of the multi-thread decoder.
   CMtDec holds an array of MTDEC__THREADS_MAX of these (CMtDec::threads). */
typedef struct
{
  // decoder object this thread state refers to (presumably the CMtDec that contains it)
  struct _CMtDec *mtDec;
  // presumably the position of this item in CMtDec::threads[] - TODO confirm
  unsigned index;
  // input buffer storage; see MtDecThread_FreeInBufs()
  void *inBuf;

  size_t inDataSize_Start; // size of input data in start block
  UInt64 inDataSize;       // total size of input data in all blocks

  // thread object and two auto-reset events
  // NOTE(review): canRead / canWrite presumably hand the read and write turns
  // from one thread to the next - confirm in the implementation
  CThread thread;
  CAutoResetEvent canRead;
  CAutoResetEvent canWrite;
  // NOTE(review): purpose is not visible in this header
  void  *allocaPtr;
} CMtDecThread;
55 
// releases the input buffers of thread (t) (presumably the storage behind t->inBuf - TODO confirm)
void MtDecThread_FreeInBufs(CMtDecThread *t);
57 
58 
/* Parse state; it is the type of the output field CMtDecCallbackInfo::state. */
typedef enum
{
  MTDEC_PARSE_CONTINUE, // continue this block with more input data
  MTDEC_PARSE_OVERFLOW, // MT buffers overflowed; need to switch to single-thread
  MTDEC_PARSE_NEW,      // new block
  MTDEC_PARSE_END       // end of block threading. But we can still return to threading after Write(&needContinue)
} EMtDecParseState;
66 
/* In/out parameter block passed to IMtDecCallback::Parse() as (ci). */
typedef struct
{
  // in
  int startCall;
  const Byte *src;
  size_t srcSize;
      // in  : (srcSize == 0) is allowed
      // out : is it allowed to return less than was actually used ?
  int srcFinished;

  // out
  EMtDecParseState state;
  BoolInt canCreateNewThread;
  UInt64 outPos; // check it (size_t)
} CMtDecCallbackInfo;
82 
83 
/* Table of callbacks supplied by the user of CMtDec (see CMtDec::mtCallback).
   Every callback receives an opaque object pointer (p) (presumably
   CMtDec::mtCallbackObject) and a (coderIndex). */
typedef struct
{
  // Parse(): reads the "in" fields of (ci) and fills its "out" fields
  void (*Parse)(void *p, unsigned coderIndex, CMtDecCallbackInfo *ci);

  // PreCode() and Code():
  // (SRes_return_result != SZ_OK) means stop decoding, no need for other blocks
  SRes (*PreCode)(void *p, unsigned coderIndex);
  SRes (*Code)(void *p, unsigned coderIndex,
      const Byte *src, size_t srcSize, int srcFinished,
      UInt64 *inCodePos, UInt64 *outCodePos, int *stop);
  // stop - means stop further Code() calls


  /* Write() must be called if Parse() was called.
      (needWrite) is set if all of the following hold:
      {
            (was not interrupted by progress)
         && (was not interrupted in previous block)
      }
      NOTE(review): (needWrite) presumably refers to the (needWriteToStream)
      parameter below - confirm.

    out:
      if (*needContinue), decoder still needs to continue decoding with a new iteration,
         even after MTDEC_PARSE_END
      if (*canRecode), we didn't flush current block data, so we can still decode the current block later.
  */
  SRes (*Write)(void *p, unsigned coderIndex,
      BoolInt needWriteToStream,
      const Byte *src, size_t srcSize,
      // int srcFinished,
      BoolInt *needContinue,
      BoolInt *canRecode);
} IMtDecCallback;
116 
117 
118 
/* Multi-thread decoder object.
   The "input variables" are the caller-facing fields; the "internal variables"
   hold the decoder's own state.
   NOTE(review): presumably the caller sets the input variables after
   MtDec_Construct() and before MtDec_Code() - confirm in the implementation. */
typedef struct _CMtDec
{
  /* input variables */

  size_t inBufSize;        /* size of input block */
  unsigned numThreadsMax;
  // size_t inBlockMax;
  unsigned numThreadsMax_2;

  // input stream interface
  ISeqInStream *inStream;
  // const Byte *inData;
  // size_t inDataSize;

  // progress callback interface and allocator
  ICompressProgress *progress;
  ISzAllocPtr alloc;

  // callback table and the opaque object passed to it (presumably as the (p) argument)
  IMtDecCallback *mtCallback;
  void *mtCallbackObject;


  /* internal variables */

  size_t allocatedBufsSize;

  BoolInt exitThread;
  WRes exitThreadWRes;

  UInt64 blockIndex;
  BoolInt isAllocError;
  BoolInt overflow;
  SRes threadingErrorSRes;

  BoolInt needContinue;

  // CAutoResetEvent finishedEvent;

  SRes readRes;
  SRes codeRes;

  BoolInt wasInterrupted;

  unsigned numStartedThreads_Limit;
  unsigned numStartedThreads;

  // NOTE(review): crossBlock / crossStart / crossEnd presumably describe a buffer
  // and the range of data in it that crosses a block boundary - see MtDec_GetCrossBuff(); confirm
  Byte *crossBlock;
  size_t crossStart;
  size_t crossEnd;
  UInt64 readProcessed;
  BoolInt readWasFinished;
  UInt64 inProcessed;

  unsigned filledThreadStart;
  unsigned numFilledThreads;

  // NOTE(review): this guard is redundant here - the whole section of the header
  // is already inside an outer "#ifndef _7ZIP_ST", so these fields are always present
  #ifndef _7ZIP_ST
  BoolInt needInterrupt;
  UInt64 interruptIndex;
  CMtProgress mtProgress;
  // per-thread state, one item per possible thread
  CMtDecThread threads[MTDEC__THREADS_MAX];
  #endif
} CMtDec;
180 
181 
/* Constructor / destructor pair for CMtDec.
   NOTE(review): presumably MtDec_Construct() must be called before any other
   MtDec_*() function and MtDec_Destruct() releases what the decoder owns -
   confirm in the implementation. */
void MtDec_Construct(CMtDec *p);
void MtDec_Destruct(CMtDec *p);
184 
/*
MtDec_Code() returns:
  SZ_OK - in most cases
  MY_SRes_HRESULT_FROM_WRes(WRes_error) - in case of an unexpected error in a threading function
*/

SRes MtDec_Code(CMtDec *p);
/* NOTE(review): only the declarations are visible in this header; the
   descriptions below are inferred from names and signatures - confirm
   against the implementation. */

// returns a pointer to a byte buffer of (p) (presumably related to CMtDec::crossBlock)
Byte *MtDec_GetCrossBuff(CMtDec *p);

// prepares (p) for reading; the meaning of the int result is not visible here
int MtDec_PrepareRead(CMtDec *p);
// returns a pointer to input data; (*inLim) is an output (presumably the size / limit of the returned data)
const Byte *MtDec_Read(CMtDec *p, size_t *inLim);
196 
197 #endif
198 
199 EXTERN_C_END
200 
201 #endif
202