1 /*
2  *  Licensed to the Apache Software Foundation (ASF) under one or more
3  *  contributor license agreements.  See the NOTICE file distributed with
4  *  this work for additional information regarding copyright ownership.
5  *  The ASF licenses this file to You under the Apache License, Version 2.0
6  *  (the "License"); you may not use this file except in compliance with
7  *  the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  */
18 package org.apache.commons.compress.archivers.zip;
19 
20 import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
21 import org.apache.commons.compress.parallel.InputStreamSupplier;
22 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
23 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
24 
25 import java.io.File;
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.zip.Deflater;
37 
38 import static java.util.Collections.synchronizedList;
39 import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
40 
41 /**
42  * Creates a zip in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
43  * <p>
44  * Note that this class generally makes no guarantees about the order of things written to
45  * the output file. Things that need to come in a specific order (manifests, directories)
46  * must be handled by the client of this class, usually by writing these things to the
47  * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.</p>
48  * <p>
49  * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of
50  * memory model consistency, this will be shut down by this class prior to completion.
51  * </p>
52  * @since 1.10
53  */
54 public class ParallelScatterZipCreator {
55     private final List<ScatterZipOutputStream> streams = synchronizedList(new ArrayList<ScatterZipOutputStream>());
56     private final ExecutorService es;
57     private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
58     private final List<Future<Object>> futures = new ArrayList<>();
59 
60     private final long startedAt = System.currentTimeMillis();
61     private long compressionDoneAt = 0;
62     private long scatterDoneAt;
63 
64     private static class DefaultBackingStoreSupplier implements ScatterGatherBackingStoreSupplier {
65         final AtomicInteger storeNum = new AtomicInteger(0);
66 
67         @Override
get()68         public ScatterGatherBackingStore get() throws IOException {
69             final File tempFile = File.createTempFile("parallelscatter", "n" + storeNum.incrementAndGet());
70             return new FileBasedScatterGatherBackingStore(tempFile);
71         }
72     }
73 
createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)74     private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)
75             throws IOException {
76         final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
77         // lifecycle is bound to the ScatterZipOutputStream returned
78         final StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs); //NOSONAR
79         return new ScatterZipOutputStream(bs, sc);
80     }
81 
82     private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
83         @Override
84         protected ScatterZipOutputStream initialValue() {
85             try {
86                 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
87                 streams.add(scatterStream);
88                 return scatterStream;
89             } catch (final IOException e) {
90                 throw new RuntimeException(e); //NOSONAR
91             }
92         }
93     };
94 
95     /**
96      * Create a ParallelScatterZipCreator with default threads, which is set to the number of available
97      * processors, as defined by {@link java.lang.Runtime#availableProcessors}
98      */
ParallelScatterZipCreator()99     public ParallelScatterZipCreator() {
100         this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
101     }
102 
103     /**
104      * Create a ParallelScatterZipCreator
105      *
106      * @param executorService The executorService to use for parallel scheduling. For technical reasons,
107      *                        this will be shut down by this class.
108      */
ParallelScatterZipCreator(final ExecutorService executorService)109     public ParallelScatterZipCreator(final ExecutorService executorService) {
110         this(executorService, new DefaultBackingStoreSupplier());
111     }
112 
113     /**
114      * Create a ParallelScatterZipCreator
115      *
116      * @param executorService The executorService to use. For technical reasons, this will be shut down
117      *                        by this class.
118      * @param backingStoreSupplier The supplier of backing store which shall be used
119      */
ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier)120     public ParallelScatterZipCreator(final ExecutorService executorService,
121                                      final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
122         this.backingStoreSupplier = backingStoreSupplier;
123         es = executorService;
124     }
125 
126     /**
127      * Adds an archive entry to this archive.
128      * <p>
129      * This method is expected to be called from a single client thread
130      * </p>
131      *
132      * @param zipArchiveEntry The entry to add.
133      * @param source          The source input stream supplier
134      */
135 
addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source)136     public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
137         submit(createCallable(zipArchiveEntry, source));
138     }
139 
140     /**
141      * Adds an archive entry to this archive.
142      * <p>
143      * This method is expected to be called from a single client thread
144      * </p>
145      *
146      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
147      * @since 1.13
148      */
addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier)149     public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
150         submit(createCallable(zipArchiveEntryRequestSupplier));
151     }
152 
153     /**
154      * Submit a callable for compression.
155      *
156      * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
157      *
158      * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
159      */
submit(final Callable<Object> callable)160     public final void submit(final Callable<Object> callable) {
161         futures.add(es.submit(callable));
162     }
163 
164     /**
165      * Create a callable that will compress the given archive entry.
166      *
167      * <p>This method is expected to be called from a single client thread.</p>
168      *
169      * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submit submit}.
170      * The most common use case for using {@link #createCallable createCallable} and {@link #submit submit} from a
171      * client is if you want to wrap the callable in something that can be prioritized by the supplied
172      * {@link ExecutorService}, for instance to process large or slow files first.
173      * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
174      *
175      * @param zipArchiveEntry The entry to add.
176      * @param source          The source input stream supplier
177      * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The
178      * value of this callable is not used, but any exceptions happening inside the compression
179      * will be propagated through the callable.
180      */
181 
createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source)182     public final Callable<Object> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
183         final int method = zipArchiveEntry.getMethod();
184         if (method == ZipMethod.UNKNOWN_CODE) {
185             throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
186         }
187         final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
188         return new Callable<Object>() {
189             @Override
190             public Object call() throws Exception {
191                 tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequest);
192                 return null;
193             }
194         };
195     }
196 
197     /**
198      * Create a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
199      *
200      * <p>This method is expected to be called from a single client thread.</p>
201      *
202      * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry
203      * to be added is supplied by a {@link ZipArchiveEntryRequestSupplier}.
204      *
205      * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
206      *
207      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
208      * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The
209      * value of this callable is not used, but any exceptions happening inside the compression
210      * will be propagated through the callable.
211      * @since 1.13
212      */
213     public final Callable<Object> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
214         return new Callable<Object>() {
215             @Override
216             public Object call() throws Exception {
217                 tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequestSupplier.get());
218                 return null;
219             }
220         };
221     }
222 
223     /**
224      * Write the contents this to the target {@link ZipArchiveOutputStream}.
225      * <p>
226      * It may be beneficial to write things like directories and manifest files to the targetStream
227      * before calling this method.
228      * </p>
229      *
230      * <p>Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link
231      * Callable}s {@link #submit}ted to this instance throws an exception, the archive can not be created properly and
232      * this method will throw an exception.</p>
233      *
234      * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
235      * @throws IOException          If writing fails
236      * @throws InterruptedException If we get interrupted
237      * @throws ExecutionException   If something happens in the parallel execution
238      */
239     public void writeTo(final ZipArchiveOutputStream targetStream)
240             throws IOException, InterruptedException, ExecutionException {
241 
242         // Make sure we catch any exceptions from parallel phase
243         try {
244             for (final Future<?> future : futures) {
245                 future.get();
246             }
247         } finally {
248             es.shutdown();
249         }
250 
251         es.awaitTermination(1000 * 60L, TimeUnit.SECONDS);  // == Infinity. We really *must* wait for this to complete
252 
253         // It is important that all threads terminate before we go on, ensure happens-before relationship
254         compressionDoneAt = System.currentTimeMillis();
255 
256         synchronized (streams) {
257             for (final ScatterZipOutputStream scatterStream : streams) {
258                 scatterStream.writeTo(targetStream);
259                 scatterStream.close();
260             }
261         }
262 
263         scatterDoneAt = System.currentTimeMillis();
264     }
265 
266     /**
267      * Returns a message describing the overall statistics of the compression run
268      *
269      * @return A string
270      */
271     public ScatterStatistics getStatisticsMessage() {
272         return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
273     }
274 }
275 
276