1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 package org.apache.commons.compress.archivers.zip; 19 20 import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore; 21 import org.apache.commons.compress.parallel.InputStreamSupplier; 22 import org.apache.commons.compress.parallel.ScatterGatherBackingStore; 23 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier; 24 25 import java.io.File; 26 import java.io.IOException; 27 import java.util.ArrayList; 28 import java.util.List; 29 import java.util.concurrent.Callable; 30 import java.util.concurrent.ExecutionException; 31 import java.util.concurrent.ExecutorService; 32 import java.util.concurrent.Executors; 33 import java.util.concurrent.Future; 34 import java.util.concurrent.TimeUnit; 35 import java.util.concurrent.atomic.AtomicInteger; 36 import java.util.zip.Deflater; 37 38 import static java.util.Collections.synchronizedList; 39 import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest; 40 41 /** 42 * Creates a zip in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances. 43 * <p> 44 * Note that this class generally makes no guarantees about the order of things written to 45 * the output file. Things that need to come in a specific order (manifests, directories) 46 * must be handled by the client of this class, usually by writing these things to the 47 * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.</p> 48 * <p> 49 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of 50 * memory model consistency, this will be shut down by this class prior to completion. 51 * </p> 52 * @since 1.10 53 */ 54 public class ParallelScatterZipCreator { 55 private final List<ScatterZipOutputStream> streams = synchronizedList(new ArrayList<ScatterZipOutputStream>()); 56 private final ExecutorService es; 57 private final ScatterGatherBackingStoreSupplier backingStoreSupplier; 58 private final List<Future<Object>> futures = new ArrayList<>(); 59 60 private final long startedAt = System.currentTimeMillis(); 61 private long compressionDoneAt = 0; 62 private long scatterDoneAt; 63 64 private static class DefaultBackingStoreSupplier implements ScatterGatherBackingStoreSupplier { 65 final AtomicInteger storeNum = new AtomicInteger(0); 66 67 @Override get()68 public ScatterGatherBackingStore get() throws IOException { 69 final File tempFile = File.createTempFile("parallelscatter", "n" + storeNum.incrementAndGet()); 70 return new FileBasedScatterGatherBackingStore(tempFile); 71 } 72 } 73 createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)74 private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) 75 throws IOException { 76 final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get(); 77 // lifecycle is bound to the ScatterZipOutputStream returned 78 final StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs); //NOSONAR 79 return new ScatterZipOutputStream(bs, sc); 80 } 81 82 private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() { 83 @Override 84 protected ScatterZipOutputStream initialValue() { 85 try { 86 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier); 87 streams.add(scatterStream); 88 return scatterStream; 89 } catch (final IOException e) { 90 throw new RuntimeException(e); //NOSONAR 91 } 92 } 93 }; 94 95 /** 96 * Create a ParallelScatterZipCreator with default threads, which is set to the number of available 97 * processors, as defined by {@link java.lang.Runtime#availableProcessors} 98 */ ParallelScatterZipCreator()99 public ParallelScatterZipCreator() { 100 this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 101 } 102 103 /** 104 * Create a ParallelScatterZipCreator 105 * 106 * @param executorService The executorService to use for parallel scheduling. For technical reasons, 107 * this will be shut down by this class. 108 */ ParallelScatterZipCreator(final ExecutorService executorService)109 public ParallelScatterZipCreator(final ExecutorService executorService) { 110 this(executorService, new DefaultBackingStoreSupplier()); 111 } 112 113 /** 114 * Create a ParallelScatterZipCreator 115 * 116 * @param executorService The executorService to use. For technical reasons, this will be shut down 117 * by this class. 118 * @param backingStoreSupplier The supplier of backing store which shall be used 119 */ ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier)120 public ParallelScatterZipCreator(final ExecutorService executorService, 121 final ScatterGatherBackingStoreSupplier backingStoreSupplier) { 122 this.backingStoreSupplier = backingStoreSupplier; 123 es = executorService; 124 } 125 126 /** 127 * Adds an archive entry to this archive. 128 * <p> 129 * This method is expected to be called from a single client thread 130 * </p> 131 * 132 * @param zipArchiveEntry The entry to add. 133 * @param source The source input stream supplier 134 */ 135 addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source)136 public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { 137 submit(createCallable(zipArchiveEntry, source)); 138 } 139 140 /** 141 * Adds an archive entry to this archive. 142 * <p> 143 * This method is expected to be called from a single client thread 144 * </p> 145 * 146 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added. 147 * @since 1.13 148 */ addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier)149 public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { 150 submit(createCallable(zipArchiveEntryRequestSupplier)); 151 } 152 153 /** 154 * Submit a callable for compression. 155 * 156 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this. 157 * 158 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller. 159 */ submit(final Callable<Object> callable)160 public final void submit(final Callable<Object> callable) { 161 futures.add(es.submit(callable)); 162 } 163 164 /** 165 * Create a callable that will compress the given archive entry. 166 * 167 * <p>This method is expected to be called from a single client thread.</p> 168 * 169 * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submit submit}. 170 * The most common use case for using {@link #createCallable createCallable} and {@link #submit submit} from a 171 * client is if you want to wrap the callable in something that can be prioritized by the supplied 172 * {@link ExecutorService}, for instance to process large or slow files first. 173 * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client. 174 * 175 * @param zipArchiveEntry The entry to add. 176 * @param source The source input stream supplier 177 * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The 178 * value of this callable is not used, but any exceptions happening inside the compression 179 * will be propagated through the callable. 180 */ 181 createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source)182 public final Callable<Object> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { 183 final int method = zipArchiveEntry.getMethod(); 184 if (method == ZipMethod.UNKNOWN_CODE) { 185 throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry); 186 } 187 final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source); 188 return new Callable<Object>() { 189 @Override 190 public Object call() throws Exception { 191 tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequest); 192 return null; 193 } 194 }; 195 } 196 197 /** 198 * Create a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}. 199 * 200 * <p>This method is expected to be called from a single client thread.</p> 201 * 202 * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry 203 * to be added is supplied by a {@link ZipArchiveEntryRequestSupplier}. 204 * 205 * @see #createCallable(ZipArchiveEntry, InputStreamSupplier) 206 * 207 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added. 208 * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The 209 * value of this callable is not used, but any exceptions happening inside the compression 210 * will be propagated through the callable. 211 * @since 1.13 212 */ 213 public final Callable<Object> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { 214 return new Callable<Object>() { 215 @Override 216 public Object call() throws Exception { 217 tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequestSupplier.get()); 218 return null; 219 } 220 }; 221 } 222 223 /** 224 * Write the contents this to the target {@link ZipArchiveOutputStream}. 225 * <p> 226 * It may be beneficial to write things like directories and manifest files to the targetStream 227 * before calling this method. 228 * </p> 229 * 230 * <p>Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link 231 * Callable}s {@link #submit}ted to this instance throws an exception, the archive can not be created properly and 232 * this method will throw an exception.</p> 233 * 234 * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams 235 * @throws IOException If writing fails 236 * @throws InterruptedException If we get interrupted 237 * @throws ExecutionException If something happens in the parallel execution 238 */ 239 public void writeTo(final ZipArchiveOutputStream targetStream) 240 throws IOException, InterruptedException, ExecutionException { 241 242 // Make sure we catch any exceptions from parallel phase 243 try { 244 for (final Future<?> future : futures) { 245 future.get(); 246 } 247 } finally { 248 es.shutdown(); 249 } 250 251 es.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete 252 253 // It is important that all threads terminate before we go on, ensure happens-before relationship 254 compressionDoneAt = System.currentTimeMillis(); 255 256 synchronized (streams) { 257 for (final ScatterZipOutputStream scatterStream : streams) { 258 scatterStream.writeTo(targetStream); 259 scatterStream.close(); 260 } 261 } 262 263 scatterDoneAt = System.currentTimeMillis(); 264 } 265 266 /** 267 * Returns a message describing the overall statistics of the compression run 268 * 269 * @return A string 270 */ 271 public ScatterStatistics getStatisticsMessage() { 272 return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt); 273 } 274 } 275 276