1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package dexfuzz;
18 
19 import java.io.BufferedReader;
20 import java.io.InputStream;
21 import java.io.InputStreamReader;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.Semaphore;
26 
27 /**
28  * process.waitFor() can block if its output buffers are not drained.
29  * These threads are used to keep the buffers drained, and provide the final
30  * output once the command has finished executing. Each Executor has its own
31  * output and error StreamConsumers.
32  */
33 public class StreamConsumer extends Thread {
34   private List<String> output;
35   private BufferedReader reader;
36 
37   private State state;
38 
39   private Semaphore workToBeDone;
40   private Semaphore outputIsReady;
41 
42   enum State {
43     WAITING,
44     CONSUMING,
45     SHOULD_STOP_CONSUMING,
46     FINISHED,
47     ERROR
48   }
49 
50   /**
51    * Create a StreamConsumer, will be immediately ready to start consuming.
52    */
StreamConsumer()53   public StreamConsumer() {
54     output = new ArrayList<String>();
55     workToBeDone = new Semaphore(0);
56     outputIsReady = new Semaphore(0);
57 
58     state = State.WAITING;
59   }
60 
61   /**
62    * Executor should call this to provide its StreamConsumers with the Streams
63    * for a Process it is about to call waitFor() on.
64    */
giveStreamAndStartConsuming(InputStream stream)65   public void giveStreamAndStartConsuming(InputStream stream) {
66     output.clear();
67 
68     reader = new BufferedReader(new InputStreamReader(stream));
69 
70     changeState(State.CONSUMING, State.WAITING);
71 
72     // Tell consumer there is work to be done.
73     workToBeDone.release();
74   }
75 
76   /**
77    * Executor should call this once its call to waitFor() returns.
78    */
processFinished()79   public void processFinished() {
80     changeState(State.SHOULD_STOP_CONSUMING, State.CONSUMING);
81   }
82 
83   /**
84    * Executor should call this to get the captured output of this StreamConsumer.
85    */
getOutput()86   public List<String> getOutput() {
87 
88     try {
89       // Wait until the output is ready.
90       outputIsReady.acquire();
91     } catch (InterruptedException e) {
92       Log.error("Client of StreamConsumer was interrupted while waiting for output?");
93       return null;
94     }
95 
96     // Take a copy of the Strings, so when we call output.clear(), we don't
97     // clear the ExecutionResult's list.
98     List<String> copy = new ArrayList<String>(output);
99     return copy;
100   }
101 
102   /**
103    * Executor should call this when we're shutting down.
104    */
shutdown()105   public void shutdown() {
106     changeState(State.FINISHED, State.WAITING);
107 
108     // Tell Consumer there is work to be done (it will check first if FINISHED has been set.)
109     workToBeDone.release();
110   }
111 
consume()112   private void consume() {
113     try {
114 
115       if (checkState(State.SHOULD_STOP_CONSUMING)) {
116         // Caller already called processFinished() before we even started
117         // consuming. Just get what we can and finish.
118         while (reader.ready()) {
119           output.add(reader.readLine());
120         }
121       } else {
122         // Caller's process is still executing, so just loop and consume.
123         while (checkState(State.CONSUMING)) {
124           Thread.sleep(50);
125           while (reader.ready()) {
126             output.add(reader.readLine());
127           }
128         }
129       }
130 
131       if (checkState(State.SHOULD_STOP_CONSUMING)) {
132         changeState(State.WAITING, State.SHOULD_STOP_CONSUMING);
133       } else {
134         Log.error("StreamConsumer stopped consuming, but was not told to?");
135         setErrorState();
136       }
137 
138       reader.close();
139 
140     } catch (IOException e) {
141       Log.error("StreamConsumer caught IOException while consuming");
142       setErrorState();
143     } catch (InterruptedException e) {
144       Log.error("StreamConsumer caught InterruptedException while consuming");
145       setErrorState();
146     }
147 
148     // Tell client of Consumer that the output is ready.
149     outputIsReady.release();
150   }
151 
152   @Override
run()153   public void run() {
154     while (checkState(State.WAITING)) {
155       try {
156         // Wait until there is work to be done
157         workToBeDone.acquire();
158       } catch (InterruptedException e) {
159         Log.error("StreamConsumer caught InterruptedException while waiting for work");
160         setErrorState();
161         break;
162       }
163 
164       // Check first if we're done
165       if (checkState(State.FINISHED)) {
166         break;
167       }
168 
169       // Make sure we're either supposed to be consuming
170       // or supposed to be finishing up consuming
171       if (!(checkState(State.CONSUMING) || checkState(State.SHOULD_STOP_CONSUMING))) {
172         Log.error("invalid state: StreamConsumer told about work, but not CONSUMING?");
173         Log.error("state was: " + getCurrentState());
174         setErrorState();
175         break;
176       }
177 
178       consume();
179     }
180   }
181 
checkState(State expectedState)182   private synchronized boolean checkState(State expectedState) {
183     return (expectedState == state);
184   }
185 
changeState(State newState, State previousState)186   private synchronized void changeState(State newState, State previousState) {
187     if (state != previousState) {
188       Log.error("StreamConsumer Unexpected state: " + state + ", expected " + previousState);
189       state = State.ERROR;
190     } else {
191       state = newState;
192     }
193   }
194 
setErrorState()195   private synchronized void setErrorState() {
196     state = State.ERROR;
197   }
198 
getCurrentState()199   private synchronized State getCurrentState() {
200     return state;
201   }
202 }
203