Lines Matching refs:mtctx

869 static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {  in ZSTDMT_expandJobsTable()  argument
871 if (nbJobs > mtctx->jobIDMask+1) { /* need more job capacity */ in ZSTDMT_expandJobsTable()
872 ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); in ZSTDMT_expandJobsTable()
873 mtctx->jobIDMask = 0; in ZSTDMT_expandJobsTable()
874 mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem); in ZSTDMT_expandJobsTable()
875 if (mtctx->jobs==NULL) return ERROR(memory_allocation); in ZSTDMT_expandJobsTable()
877 mtctx->jobIDMask = nbJobs - 1; in ZSTDMT_expandJobsTable()
892 ZSTDMT_CCtx* mtctx; in ZSTDMT_createCCtx_advanced_internal() local
903 mtctx = (ZSTDMT_CCtx*) ZSTD_customCalloc(sizeof(ZSTDMT_CCtx), cMem); in ZSTDMT_createCCtx_advanced_internal()
904 if (!mtctx) return NULL; in ZSTDMT_createCCtx_advanced_internal()
905 ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers); in ZSTDMT_createCCtx_advanced_internal()
906 mtctx->cMem = cMem; in ZSTDMT_createCCtx_advanced_internal()
907 mtctx->allJobsCompleted = 1; in ZSTDMT_createCCtx_advanced_internal()
909 mtctx->factory = pool; in ZSTDMT_createCCtx_advanced_internal()
910 mtctx->providedFactory = 1; in ZSTDMT_createCCtx_advanced_internal()
913 mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem); in ZSTDMT_createCCtx_advanced_internal()
914 mtctx->providedFactory = 0; in ZSTDMT_createCCtx_advanced_internal()
916 mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem); in ZSTDMT_createCCtx_advanced_internal()
918 mtctx->jobIDMask = nbJobs - 1; in ZSTDMT_createCCtx_advanced_internal()
919 mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem); in ZSTDMT_createCCtx_advanced_internal()
920 mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem); in ZSTDMT_createCCtx_advanced_internal()
921 mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem); in ZSTDMT_createCCtx_advanced_internal()
922 initError = ZSTDMT_serialState_init(&mtctx->serial); in ZSTDMT_createCCtx_advanced_internal()
923 mtctx->roundBuff = kNullRoundBuff; in ZSTDMT_createCCtx_advanced_internal()
924 …if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initEr… in ZSTDMT_createCCtx_advanced_internal()
925 ZSTDMT_freeCCtx(mtctx); in ZSTDMT_createCCtx_advanced_internal()
929 return mtctx; in ZSTDMT_createCCtx_advanced_internal()
947 static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) in ZSTDMT_releaseAllJobResources() argument
951 for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) { in ZSTDMT_releaseAllJobResources()
953 ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex; in ZSTDMT_releaseAllJobResources()
954 ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond; in ZSTDMT_releaseAllJobResources()
956 …DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.st… in ZSTDMT_releaseAllJobResources()
957 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); in ZSTDMT_releaseAllJobResources()
960 ZSTD_memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID])); in ZSTDMT_releaseAllJobResources()
961 mtctx->jobs[jobID].job_mutex = mutex; in ZSTDMT_releaseAllJobResources()
962 mtctx->jobs[jobID].job_cond = cond; in ZSTDMT_releaseAllJobResources()
964 mtctx->inBuff.buffer = g_nullBuffer; in ZSTDMT_releaseAllJobResources()
965 mtctx->inBuff.filled = 0; in ZSTDMT_releaseAllJobResources()
966 mtctx->allJobsCompleted = 1; in ZSTDMT_releaseAllJobResources()
969 static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx) in ZSTDMT_waitForAllJobsCompleted() argument
972 while (mtctx->doneJobID < mtctx->nextJobID) { in ZSTDMT_waitForAllJobsCompleted()
973 unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask; in ZSTDMT_waitForAllJobsCompleted()
974 ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); in ZSTDMT_waitForAllJobsCompleted()
975 while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) { in ZSTDMT_waitForAllJobsCompleted()
976 …DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to bloc… in ZSTDMT_waitForAllJobsCompleted()
977 ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); in ZSTDMT_waitForAllJobsCompleted()
979 ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); in ZSTDMT_waitForAllJobsCompleted()
980 mtctx->doneJobID++; in ZSTDMT_waitForAllJobsCompleted()
984 size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) in ZSTDMT_freeCCtx() argument
986 if (mtctx==NULL) return 0; /* compatible with free on NULL */ in ZSTDMT_freeCCtx()
987 if (!mtctx->providedFactory) in ZSTDMT_freeCCtx()
988 POOL_free(mtctx->factory); /* stop and free worker threads */ in ZSTDMT_freeCCtx()
989 ZSTDMT_releaseAllJobResources(mtctx); /* release job resources into pools first */ in ZSTDMT_freeCCtx()
990 ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); in ZSTDMT_freeCCtx()
991 ZSTDMT_freeBufferPool(mtctx->bufPool); in ZSTDMT_freeCCtx()
992 ZSTDMT_freeCCtxPool(mtctx->cctxPool); in ZSTDMT_freeCCtx()
993 ZSTDMT_freeSeqPool(mtctx->seqPool); in ZSTDMT_freeCCtx()
994 ZSTDMT_serialState_free(&mtctx->serial); in ZSTDMT_freeCCtx()
995 ZSTD_freeCDict(mtctx->cdictLocal); in ZSTDMT_freeCCtx()
996 if (mtctx->roundBuff.buffer) in ZSTDMT_freeCCtx()
997 ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem); in ZSTDMT_freeCCtx()
998 ZSTD_customFree(mtctx, mtctx->cMem); in ZSTDMT_freeCCtx()
1002 size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx) in ZSTDMT_sizeof_CCtx() argument
1004 if (mtctx == NULL) return 0; /* supports sizeof NULL */ in ZSTDMT_sizeof_CCtx()
1005 return sizeof(*mtctx) in ZSTDMT_sizeof_CCtx()
1006 + POOL_sizeof(mtctx->factory) in ZSTDMT_sizeof_CCtx()
1007 + ZSTDMT_sizeof_bufferPool(mtctx->bufPool) in ZSTDMT_sizeof_CCtx()
1008 + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription) in ZSTDMT_sizeof_CCtx()
1009 + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool) in ZSTDMT_sizeof_CCtx()
1010 + ZSTDMT_sizeof_seqPool(mtctx->seqPool) in ZSTDMT_sizeof_CCtx()
1011 + ZSTD_sizeof_CDict(mtctx->cdictLocal) in ZSTDMT_sizeof_CCtx()
1012 + mtctx->roundBuff.capacity; in ZSTDMT_sizeof_CCtx()
1018 static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers) in ZSTDMT_resize() argument
1020 if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation); in ZSTDMT_resize()
1021 FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , ""); in ZSTDMT_resize()
1022 mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers); in ZSTDMT_resize()
1023 if (mtctx->bufPool == NULL) return ERROR(memory_allocation); in ZSTDMT_resize()
1024 mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers); in ZSTDMT_resize()
1025 if (mtctx->cctxPool == NULL) return ERROR(memory_allocation); in ZSTDMT_resize()
1026 mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers); in ZSTDMT_resize()
1027 if (mtctx->seqPool == NULL) return ERROR(memory_allocation); in ZSTDMT_resize()
1028 ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers); in ZSTDMT_resize()
1036 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams) in ZSTDMT_updateCParams_whileCompressing() argument
1038 …U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compres… in ZSTDMT_updateCParams_whileCompressing()
1042 mtctx->params.compressionLevel = compressionLevel; in ZSTDMT_updateCParams_whileCompressing()
1045 mtctx->params.cParams = cParams; in ZSTDMT_updateCParams_whileCompressing()
1053 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) in ZSTDMT_getFrameProgression() argument
1057 fps.ingested = mtctx->consumed + mtctx->inBuff.filled; in ZSTDMT_getFrameProgression()
1058 fps.consumed = mtctx->consumed; in ZSTDMT_getFrameProgression()
1059 fps.produced = fps.flushed = mtctx->produced; in ZSTDMT_getFrameProgression()
1060 fps.currentJobID = mtctx->nextJobID; in ZSTDMT_getFrameProgression()
1063 unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); in ZSTDMT_getFrameProgression()
1065 mtctx->doneJobID, lastJobNb, mtctx->jobReady) in ZSTDMT_getFrameProgression()
1066 for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { in ZSTDMT_getFrameProgression()
1067 unsigned const wJobID = jobNb & mtctx->jobIDMask; in ZSTDMT_getFrameProgression()
1068 ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID]; in ZSTDMT_getFrameProgression()
1080 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); in ZSTDMT_getFrameProgression()
1087 size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx) in ZSTDMT_toFlushNow() argument
1090 unsigned const jobID = mtctx->doneJobID; in ZSTDMT_toFlushNow()
1091 assert(jobID <= mtctx->nextJobID); in ZSTDMT_toFlushNow()
1092 if (jobID == mtctx->nextJobID) return 0; /* no active job => nothing to flush */ in ZSTDMT_toFlushNow()
1095 { unsigned const wJobID = jobID & mtctx->jobIDMask; in ZSTDMT_toFlushNow()
1096 ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID]; in ZSTDMT_toFlushNow()
1113 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); in ZSTDMT_toFlushNow()
1190 ZSTDMT_CCtx* mtctx, in ZSTDMT_initCStream_internal() argument
1196 (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx); in ZSTDMT_initCStream_internal()
1203 if (params.nbWorkers != mtctx->params.nbWorkers) in ZSTDMT_initCStream_internal()
1204 FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) , ""); in ZSTDMT_initCStream_internal()
1211 if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ in ZSTDMT_initCStream_internal()
1212 ZSTDMT_waitForAllJobsCompleted(mtctx); in ZSTDMT_initCStream_internal()
1213 ZSTDMT_releaseAllJobResources(mtctx); in ZSTDMT_initCStream_internal()
1214 mtctx->allJobsCompleted = 1; in ZSTDMT_initCStream_internal()
1217 mtctx->params = params; in ZSTDMT_initCStream_internal()
1218 mtctx->frameContentSize = pledgedSrcSize; in ZSTDMT_initCStream_internal()
1220 ZSTD_freeCDict(mtctx->cdictLocal); in ZSTDMT_initCStream_internal()
1221 mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, in ZSTDMT_initCStream_internal()
1223 params.cParams, mtctx->cMem); in ZSTDMT_initCStream_internal()
1224 mtctx->cdict = mtctx->cdictLocal; in ZSTDMT_initCStream_internal()
1225 if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation); in ZSTDMT_initCStream_internal()
1227 ZSTD_freeCDict(mtctx->cdictLocal); in ZSTDMT_initCStream_internal()
1228 mtctx->cdictLocal = NULL; in ZSTDMT_initCStream_internal()
1229 mtctx->cdict = cdict; in ZSTDMT_initCStream_internal()
1232 mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(&params); in ZSTDMT_initCStream_internal()
1233 DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10)); in ZSTDMT_initCStream_internal()
1234 mtctx->targetSectionSize = params.jobSize; in ZSTDMT_initCStream_internal()
1235 if (mtctx->targetSectionSize == 0) { in ZSTDMT_initCStream_internal()
1236 mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(&params); in ZSTDMT_initCStream_internal()
1238 assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX); in ZSTDMT_initCStream_internal()
1242 U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20); in ZSTDMT_initCStream_internal()
1246 mtctx->rsync.hash = 0; in ZSTDMT_initCStream_internal()
1247 mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1; in ZSTDMT_initCStream_internal()
1248 mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH); in ZSTDMT_initCStream_internal()
1250 …if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPr… in ZSTDMT_initCStream_internal()
1251 …DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), (U32)param… in ZSTDMT_initCStream_internal()
1252 DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); in ZSTDMT_initCStream_internal()
1253 ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize)); in ZSTDMT_initCStream_internal()
1256 …size_t const windowSize = mtctx->params.ldmParams.enableLdm ? (1U << mtctx->params.cParams.windowL… in ZSTDMT_initCStream_internal()
1263 size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0); in ZSTDMT_initCStream_internal()
1264 size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers; in ZSTDMT_initCStream_internal()
1266 size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1); in ZSTDMT_initCStream_internal()
1267 size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers; in ZSTDMT_initCStream_internal()
1269 if (mtctx->roundBuff.capacity < capacity) { in ZSTDMT_initCStream_internal()
1270 if (mtctx->roundBuff.buffer) in ZSTDMT_initCStream_internal()
1271 ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem); in ZSTDMT_initCStream_internal()
1272 mtctx->roundBuff.buffer = (BYTE*)ZSTD_customMalloc(capacity, mtctx->cMem); in ZSTDMT_initCStream_internal()
1273 if (mtctx->roundBuff.buffer == NULL) { in ZSTDMT_initCStream_internal()
1274 mtctx->roundBuff.capacity = 0; in ZSTDMT_initCStream_internal()
1277 mtctx->roundBuff.capacity = capacity; in ZSTDMT_initCStream_internal()
1280 DEBUGLOG(4, "roundBuff capacity : %u KB", (U32)(mtctx->roundBuff.capacity>>10)); in ZSTDMT_initCStream_internal()
1281 mtctx->roundBuff.pos = 0; in ZSTDMT_initCStream_internal()
1282 mtctx->inBuff.buffer = g_nullBuffer; in ZSTDMT_initCStream_internal()
1283 mtctx->inBuff.filled = 0; in ZSTDMT_initCStream_internal()
1284 mtctx->inBuff.prefix = kNullRange; in ZSTDMT_initCStream_internal()
1285 mtctx->doneJobID = 0; in ZSTDMT_initCStream_internal()
1286 mtctx->nextJobID = 0; in ZSTDMT_initCStream_internal()
1287 mtctx->frameEnded = 0; in ZSTDMT_initCStream_internal()
1288 mtctx->allJobsCompleted = 0; in ZSTDMT_initCStream_internal()
1289 mtctx->consumed = 0; in ZSTDMT_initCStream_internal()
1290 mtctx->produced = 0; in ZSTDMT_initCStream_internal()
1291 if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize, in ZSTDMT_initCStream_internal()
1321 static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective end… in ZSTDMT_createCompressionJob() argument
1323 unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask; in ZSTDMT_createCompressionJob()
1326 if (mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask) { in ZSTDMT_createCompressionJob()
1328 assert((mtctx->nextJobID & mtctx->jobIDMask) == (mtctx->doneJobID & mtctx->jobIDMask)); in ZSTDMT_createCompressionJob()
1332 if (!mtctx->jobReady) { in ZSTDMT_createCompressionJob()
1333 BYTE const* src = (BYTE const*)mtctx->inBuff.buffer.start; in ZSTDMT_createCompressionJob()
1335 mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefix.size); in ZSTDMT_createCompressionJob()
1336 mtctx->jobs[jobID].src.start = src; in ZSTDMT_createCompressionJob()
1337 mtctx->jobs[jobID].src.size = srcSize; in ZSTDMT_createCompressionJob()
1338 assert(mtctx->inBuff.filled >= srcSize); in ZSTDMT_createCompressionJob()
1339 mtctx->jobs[jobID].prefix = mtctx->inBuff.prefix; in ZSTDMT_createCompressionJob()
1340 mtctx->jobs[jobID].consumed = 0; in ZSTDMT_createCompressionJob()
1341 mtctx->jobs[jobID].cSize = 0; in ZSTDMT_createCompressionJob()
1342 mtctx->jobs[jobID].params = mtctx->params; in ZSTDMT_createCompressionJob()
1343 mtctx->jobs[jobID].cdict = mtctx->nextJobID==0 ? mtctx->cdict : NULL; in ZSTDMT_createCompressionJob()
1344 mtctx->jobs[jobID].fullFrameSize = mtctx->frameContentSize; in ZSTDMT_createCompressionJob()
1345 mtctx->jobs[jobID].dstBuff = g_nullBuffer; in ZSTDMT_createCompressionJob()
1346 mtctx->jobs[jobID].cctxPool = mtctx->cctxPool; in ZSTDMT_createCompressionJob()
1347 mtctx->jobs[jobID].bufPool = mtctx->bufPool; in ZSTDMT_createCompressionJob()
1348 mtctx->jobs[jobID].seqPool = mtctx->seqPool; in ZSTDMT_createCompressionJob()
1349 mtctx->jobs[jobID].serial = &mtctx->serial; in ZSTDMT_createCompressionJob()
1350 mtctx->jobs[jobID].jobID = mtctx->nextJobID; in ZSTDMT_createCompressionJob()
1351 mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0); in ZSTDMT_createCompressionJob()
1352 mtctx->jobs[jobID].lastJob = endFrame; in ZSTDMT_createCompressionJob()
1353mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx-… in ZSTDMT_createCompressionJob()
1354 mtctx->jobs[jobID].dstFlushed = 0; in ZSTDMT_createCompressionJob()
1357 mtctx->roundBuff.pos += srcSize; in ZSTDMT_createCompressionJob()
1358 mtctx->inBuff.buffer = g_nullBuffer; in ZSTDMT_createCompressionJob()
1359 mtctx->inBuff.filled = 0; in ZSTDMT_createCompressionJob()
1362 size_t const newPrefixSize = MIN(srcSize, mtctx->targetPrefixSize); in ZSTDMT_createCompressionJob()
1363 mtctx->inBuff.prefix.start = src + srcSize - newPrefixSize; in ZSTDMT_createCompressionJob()
1364 mtctx->inBuff.prefix.size = newPrefixSize; in ZSTDMT_createCompressionJob()
1366 mtctx->inBuff.prefix = kNullRange; in ZSTDMT_createCompressionJob()
1367 mtctx->frameEnded = endFrame; in ZSTDMT_createCompressionJob()
1368 if (mtctx->nextJobID == 0) { in ZSTDMT_createCompressionJob()
1370 mtctx->params.fParams.checksumFlag = 0; in ZSTDMT_createCompressionJob()
1374 && (mtctx->nextJobID>0)/*single job must also write frame header*/ ) { in ZSTDMT_createCompressionJob()
1377 ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID); in ZSTDMT_createCompressionJob()
1378 mtctx->nextJobID++; in ZSTDMT_createCompressionJob()
1384 mtctx->nextJobID, in ZSTDMT_createCompressionJob()
1385 (U32)mtctx->jobs[jobID].src.size, in ZSTDMT_createCompressionJob()
1386 mtctx->jobs[jobID].lastJob, in ZSTDMT_createCompressionJob()
1387 mtctx->nextJobID, in ZSTDMT_createCompressionJob()
1389 if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) { in ZSTDMT_createCompressionJob()
1390 mtctx->nextJobID++; in ZSTDMT_createCompressionJob()
1391 mtctx->jobReady = 0; in ZSTDMT_createCompressionJob()
1393 … DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID); in ZSTDMT_createCompressionJob()
1394 mtctx->jobReady = 1; in ZSTDMT_createCompressionJob()
1406 static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlus… in ZSTDMT_flushProduced() argument
1408 unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask; in ZSTDMT_flushProduced()
1410 blockToFlush, mtctx->doneJobID, mtctx->nextJobID); in ZSTDMT_flushProduced()
1413 ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex); in ZSTDMT_flushProduced()
1415 && (mtctx->doneJobID < mtctx->nextJobID) ) { in ZSTDMT_flushProduced()
1416 assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize); in ZSTDMT_flushProduced()
1417 … while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { /* nothing to flush */ in ZSTDMT_flushProduced()
1418 if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].src.size) { in ZSTDMT_flushProduced()
1420mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].src.size); in ZSTDMT_flushProduced()
1424 mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); in ZSTDMT_flushProduced()
1425 …ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex); /* block w… in ZSTDMT_flushProduced()
1429 { size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */ in ZSTDMT_flushProduced()
1430 size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */ in ZSTDMT_flushProduced()
1431 …size_t const srcSize = mtctx->jobs[wJobID].src.size; /* read-only, could be done after mutex… in ZSTDMT_flushProduced()
1432 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); in ZSTDMT_flushProduced()
1435 mtctx->doneJobID, ZSTD_getErrorName(cSize)); in ZSTDMT_flushProduced()
1436 ZSTDMT_waitForAllJobsCompleted(mtctx); in ZSTDMT_flushProduced()
1437 ZSTDMT_releaseAllJobResources(mtctx); in ZSTDMT_flushProduced()
1443 && mtctx->jobs[wJobID].frameChecksumNeeded ) { in ZSTDMT_flushProduced()
1444 U32 const checksum = (U32)XXH64_digest(&mtctx->serial.xxhState); in ZSTDMT_flushProduced()
1446 … MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum); in ZSTDMT_flushProduced()
1448mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ in ZSTDMT_flushProduced()
1449 mtctx->jobs[wJobID].frameChecksumNeeded = 0; in ZSTDMT_flushProduced()
1453 … size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); in ZSTDMT_flushProduced()
1455 (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize); in ZSTDMT_flushProduced()
1456 assert(mtctx->doneJobID < mtctx->nextJobID); in ZSTDMT_flushProduced()
1457 assert(cSize >= mtctx->jobs[wJobID].dstFlushed); in ZSTDMT_flushProduced()
1458 assert(mtctx->jobs[wJobID].dstBuff.start != NULL); in ZSTDMT_flushProduced()
1461 (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, in ZSTDMT_flushProduced()
1465mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ in ZSTDMT_flushProduced()
1468 …&& (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job… in ZSTDMT_flushProduced()
1470 mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); in ZSTDMT_flushProduced()
1471 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); in ZSTDMT_flushProduced()
1473 mtctx->jobs[wJobID].dstBuff = g_nullBuffer; in ZSTDMT_flushProduced()
1474mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future che… in ZSTDMT_flushProduced()
1475 mtctx->consumed += srcSize; in ZSTDMT_flushProduced()
1476 mtctx->produced += cSize; in ZSTDMT_flushProduced()
1477 mtctx->doneJobID++; in ZSTDMT_flushProduced()
1481 if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed); in ZSTDMT_flushProduced()
1484 if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */ in ZSTDMT_flushProduced()
1485 if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */ in ZSTDMT_flushProduced()
1486 …if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted i… in ZSTDMT_flushProduced()
1487mtctx->allJobsCompleted = mtctx->frameEnded; /* all jobs are entirely flushed => if this one is … in ZSTDMT_flushProduced()
1488 …if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame … in ZSTDMT_flushProduced()
1497 static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx) in ZSTDMT_getInputDataInUse() argument
1499 unsigned const firstJobID = mtctx->doneJobID; in ZSTDMT_getInputDataInUse()
1500 unsigned const lastJobID = mtctx->nextJobID; in ZSTDMT_getInputDataInUse()
1504 unsigned const wJobID = jobID & mtctx->jobIDMask; in ZSTDMT_getInputDataInUse()
1507 ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex); in ZSTDMT_getInputDataInUse()
1508 consumed = mtctx->jobs[wJobID].consumed; in ZSTDMT_getInputDataInUse()
1509 ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); in ZSTDMT_getInputDataInUse()
1511 if (consumed < mtctx->jobs[wJobID].src.size) { in ZSTDMT_getInputDataInUse()
1512 range_t range = mtctx->jobs[wJobID].prefix; in ZSTDMT_getInputDataInUse()
1515 range = mtctx->jobs[wJobID].src; in ZSTDMT_getInputDataInUse()
1518 assert(range.start <= mtctx->jobs[wJobID].src.start); in ZSTDMT_getInputDataInUse()
1566 static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) in ZSTDMT_waitForLdmComplete() argument
1568 if (mtctx->params.ldmParams.enableLdm) { in ZSTDMT_waitForLdmComplete()
1569 ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex; in ZSTDMT_waitForLdmComplete()
1575 while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) { in ZSTDMT_waitForLdmComplete()
1577 ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex); in ZSTDMT_waitForLdmComplete()
1589 static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) in ZSTDMT_tryGetInputRange() argument
1591 range_t const inUse = ZSTDMT_getInputDataInUse(mtctx); in ZSTDMT_tryGetInputRange()
1592 size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos; in ZSTDMT_tryGetInputRange()
1593 size_t const target = mtctx->targetSectionSize; in ZSTDMT_tryGetInputRange()
1597 assert(mtctx->inBuff.buffer.start == NULL); in ZSTDMT_tryGetInputRange()
1598 assert(mtctx->roundBuff.capacity >= target); in ZSTDMT_tryGetInputRange()
1604 BYTE* const start = (BYTE*)mtctx->roundBuff.buffer; in ZSTDMT_tryGetInputRange()
1605 size_t const prefixSize = mtctx->inBuff.prefix.size; in ZSTDMT_tryGetInputRange()
1613 ZSTDMT_waitForLdmComplete(mtctx, buffer); in ZSTDMT_tryGetInputRange()
1614 ZSTD_memmove(start, mtctx->inBuff.prefix.start, prefixSize); in ZSTDMT_tryGetInputRange()
1615 mtctx->inBuff.prefix.start = start; in ZSTDMT_tryGetInputRange()
1616 mtctx->roundBuff.pos = prefixSize; in ZSTDMT_tryGetInputRange()
1618 buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos; in ZSTDMT_tryGetInputRange()
1625 assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix)); in ZSTDMT_tryGetInputRange()
1627 ZSTDMT_waitForLdmComplete(mtctx, buffer); in ZSTDMT_tryGetInputRange()
1630 (size_t)mtctx->inBuff.prefix.start, in ZSTDMT_tryGetInputRange()
1631 (size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size); in ZSTDMT_tryGetInputRange()
1637 mtctx->inBuff.buffer = buffer; in ZSTDMT_tryGetInputRange()
1638 mtctx->inBuff.filled = 0; in ZSTDMT_tryGetInputRange()
1639 assert(mtctx->roundBuff.pos + buffer.capacity <= mtctx->roundBuff.capacity); in ZSTDMT_tryGetInputRange()
1655 findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) in findSynchronizationPoint() argument
1658 U64 const primePower = mtctx->rsync.primePower; in findSynchronizationPoint()
1659 U64 const hitMask = mtctx->rsync.hitMask; in findSynchronizationPoint()
1666 syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled); in findSynchronizationPoint()
1668 if (!mtctx->params.rsyncable) in findSynchronizationPoint()
1671 if (mtctx->inBuff.filled + syncPoint.toLoad < RSYNC_LENGTH) in findSynchronizationPoint()
1681 if (mtctx->inBuff.filled >= RSYNC_LENGTH) { in findSynchronizationPoint()
1686 prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH; in findSynchronizationPoint()
1704 pos = RSYNC_LENGTH - mtctx->inBuff.filled; in findSynchronizationPoint()
1705 prev = (BYTE const*)mtctx->inBuff.buffer.start - pos; in findSynchronizationPoint()
1706 hash = ZSTD_rollingHash_compute(mtctx->inBuff.buffer.start, mtctx->inBuff.filled); in findSynchronizationPoint()
1730 size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx) in ZSTDMT_nextInputSizeHint() argument
1732 size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled; in ZSTDMT_nextInputSizeHint()
1733 if (hintInSize==0) hintInSize = mtctx->targetSectionSize; in ZSTDMT_nextInputSizeHint()
1741 size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, in ZSTDMT_compressStream_generic() argument
1752 if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) { in ZSTDMT_compressStream_generic()
1758 if ( (!mtctx->jobReady) in ZSTDMT_compressStream_generic()
1760 if (mtctx->inBuff.buffer.start == NULL) { in ZSTDMT_compressStream_generic()
1761 assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */ in ZSTDMT_compressStream_generic()
1762 if (!ZSTDMT_tryGetInputRange(mtctx)) { in ZSTDMT_compressStream_generic()
1767 assert(mtctx->doneJobID != mtctx->nextJobID); in ZSTDMT_compressStream_generic()
1769 …yGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.sta… in ZSTDMT_compressStream_generic()
1771 if (mtctx->inBuff.buffer.start != NULL) { in ZSTDMT_compressStream_generic()
1772 syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input); in ZSTDMT_compressStream_generic()
1776 assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize); in ZSTDMT_compressStream_generic()
1778 … (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize); in ZSTDMT_compressStream_generic()
1779 …ZSTD_memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + in… in ZSTDMT_compressStream_generic()
1781 mtctx->inBuff.filled += syncPoint.toLoad; in ZSTDMT_compressStream_generic()
1792 …assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->par… in ZSTDMT_compressStream_generic()
1796 if ( (mtctx->jobReady) in ZSTDMT_compressStream_generic()
1797 || (mtctx->inBuff.filled >= mtctx->targetSectionSize) /* filled enough : let's compress */ in ZSTDMT_compressStream_generic()
1798 … || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */ in ZSTDMT_compressStream_generic()
1799 …|| ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size… in ZSTDMT_compressStream_generic()
1800 size_t const jobSize = mtctx->inBuff.filled; in ZSTDMT_compressStream_generic()
1801 assert(mtctx->inBuff.filled <= mtctx->targetSectionSize); in ZSTDMT_compressStream_generic()
1802 FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , ""); in ZSTDMT_compressStream_generic()
1806 …{ size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, end… in ZSTDMT_compressStream_generic()