1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tradefed.util;
18 
19 import com.android.tradefed.log.LogUtil.CLog;
20 
21 import java.io.IOException;
22 import java.io.InputStreamReader;
23 import java.io.OutputStreamWriter;
24 import java.io.Reader;
25 import java.io.Writer;
26 
27 /**
28  * A helper for interruptible process execution.
29  */
30 public class ProcessHelper {
    // Upper bound on the polling iterations cleanUp() performs while the process is still
    // running: the first iteration calls destroy(), each later one sleeps for the poll interval.
    private static final int DESTROY_PROCESS_MAX_POLL_COUNT = 3;
    // Time cleanUp() sleeps between two checks of whether the process is still running.
    private static final long DESTROY_PROCESS_POLL_INTERVAL_MSECS = 500;
    // Log level names that ReaderThread looks for in the stdout of the process in order to
    // choose the CLog level at which that output is echoed.
    private static final String DEBUG = "DEBUG";
    private static final String INFO = "INFO";
    private static final String WARN = "WARN";
    private static final String ERROR = "ERROR";

    // Timeout for joining the stdout and stderr reader threads; waitForProcess() also uses it
    // when joining the execution thread after an interruption.
    private static final int THREAD_JOIN_TIMEOUT_MSECS = 1000;

    // The process being monitored.
    private final Process mProcess;

    // Readers wrapping the stdout and stderr streams of the process; closed in cleanUp().
    private final Reader mStdoutReader;
    private final Reader mStderrReader;

    // The threads copying stdout and stderr into the buffers below; started by the constructor.
    private final ReaderThread mStdoutThread;
    private final ReaderThread mStderrThread;

    // The buffers accumulating everything read from stdout and stderr. They are written by the
    // reader threads and are not synchronized.
    private final StringBuilder mStdout;
    private final StringBuilder mStderr;

    // Writer wrapping the stdin stream of the process.
    private final Writer mStdinWriter;
59 
60     /**
61      * A thread that keeps reading string from an input stream.
62      */
63     static class ReaderThread extends Thread {
64         private static final int BUF_SIZE = 16 * 1024;
65         private Reader mReader;
66         private StringBuilder mBuffer;
67 
68         static enum LogType {
69             STDOUT,
70             STDERR;
71         }
72 
73         private LogType mLogType;
74 
75         /**
76          * @param reader the input stream to read from.
77          * @param buffer the buffer containing the input data.
78          * @param name the name of the thread.
79          * @param logType enum, type of log output.
80          */
ReaderThread(Reader reader, StringBuilder buffer, String name, LogType logType)81         public ReaderThread(Reader reader, StringBuilder buffer, String name, LogType logType) {
82             super(name);
83             mReader = reader;
84             mBuffer = buffer;
85             mLogType = logType;
86         }
87 
88         /**
89          * Read string from the input stream until EOF.
90          */
91         @Override
run()92         public void run() {
93             char[] charBuffer = new char[BUF_SIZE];
94             // reader will be closed in cleanUp()
95             try {
96                 String currentLogLevel = INFO;
97                 while (true) {
98                     int readCount = mReader.read(charBuffer, 0, charBuffer.length);
99                     if (readCount < 0) {
100                         break;
101                     }
102                     String newRead = new String(charBuffer, 0, readCount);
103 
104                     int newLineLen = 0;
105                     if (newRead.endsWith("\r\n")) {
106                         newLineLen = 2;
107                     } else if (newRead.endsWith("\n")) {
108                         newLineLen = 1;
109                     }
110 
111                     String newReadPrint = newRead.substring(0, newRead.length() - newLineLen);
112                     switch (mLogType) {
113                         case STDOUT:
114                             // Logs coming from stdout for the process, which may contain
115                             // python DEBUG and ERROR logs.
116                             String[] tokens = newReadPrint.split("\\s+");
117                             if (tokens.length >= 4) {
118                                 String level = tokens[3];
119                                 switch (tokens[3]) {
120                                     case DEBUG:
121                                     case INFO:
122                                     case WARN:
123                                     case ERROR:
124                                         currentLogLevel = level;
125                                         break;
126                                     default:
127                                         // Use the last known log level
128                                 }
129                             }
130 
131                             switch (currentLogLevel) {
132                                 case DEBUG:
133                                     CLog.d(newReadPrint);
134                                     break;
135                                 case INFO:
136                                     CLog.i(newReadPrint);
137                                     break;
138                                 case WARN:
139                                     CLog.w(newReadPrint);
140                                     break;
141                                 case ERROR:
142                                     CLog.e(newReadPrint);
143                                     break;
144                                 default:
145                                     // This case should never happen
146                                     CLog.e("Error in current log level state.");
147                                     CLog.i(newReadPrint);
148                             }
149 
150                             break;
151                         case STDERR:
152                             // Logs coming from stderr for the process, which is always
153                             // ERROR level
154                             CLog.e(newReadPrint);
155                             break;
156                     }
157                     mBuffer.append(newRead);
158                 }
159             } catch (IOException e) {
160                 CLog.e("IOException during ProcessHelper#ReaderThread run.");
161                 CLog.e(e);
162             }
163         }
164     }
165 
166     /**
167      * This class waits for a process. It is run by {@link IRunUtil}.
168      */
169     class VtsRunnable implements IRunUtil.IRunnableResult {
170         private boolean mCancelled = false;
171         private Thread mExecutionThread = null;
172         private final Object mLock = new Object();
173 
174         /**
175          * @return whether the command is successful. {@link RunUtil} returns
176          * {@link CommandStatus#SUCCESS} or {@link CommandStatus#FAILED} according to the
177          * this return value.
178          */
179         @Override
run()180         public boolean run() {
181             synchronized (mLock) {
182                 mExecutionThread = Thread.currentThread();
183                 if (mCancelled) {
184                     CLog.w("Process was cancelled before being awaited.");
185                     return false;
186                 }
187             }
188             boolean success;
189             try {
190                 success = (mProcess.waitFor() == 0);
191                 CLog.d("Process terminates normally.");
192             } catch (InterruptedException e) {
193                 success = false;
194                 CLog.e("Process is interrupted.");
195             }
196             return success;
197         }
198 
199         /**
200          * This method makes {@link #run()} method stop waiting for the process. It can be called
201          * more than once. {@link RunUtil} calls this method if {@link CommandStatus} is TIMED_OUT
202          * or EXCEPTION.
203          */
204         @Override
cancel()205         public void cancel() {
206             CLog.w("Attempt to interrupt execution thread.");
207             synchronized (mLock) {
208                 if (!mCancelled) {
209                     mCancelled = true;
210                     if (mExecutionThread != null) {
211                         mExecutionThread.interrupt();
212                     } else {
213                         CLog.d("Execution thread has not started.");
214                     }
215                 } else {
216                     CLog.e("Execution thread has been cancelled.");
217                 }
218             }
219         }
220 
221         /**
222          * @return the thread of {@link #run()}; null if the thread has not started.
223          */
getExecutionThread()224         public Thread getExecutionThread() {
225             synchronized (mLock) {
226                 return mExecutionThread;
227             }
228         }
229     }
230 
231     /**
232      * Create an instance that monitors a running process.
233      *
234      * @param process the process to monitor.
235      */
ProcessHelper(Process process)236     public ProcessHelper(Process process) {
237         mProcess = process;
238         mStdout = new StringBuilder();
239         mStderr = new StringBuilder();
240         mStdinWriter = new OutputStreamWriter(mProcess.getOutputStream());
241         mStdoutReader = new InputStreamReader(mProcess.getInputStream());
242         mStderrReader = new InputStreamReader(mProcess.getErrorStream());
243         mStdoutThread = new ReaderThread(
244                 mStdoutReader, mStdout, "process-helper-stdout", ReaderThread.LogType.STDOUT);
245         mStderrThread = new ReaderThread(
246                 mStderrReader, mStderr, "process-helper-stderr", ReaderThread.LogType.STDERR);
247         mStdoutThread.start();
248         mStderrThread.start();
249     }
250 
251     /**
252      * Wait for the process until termination, timeout, or interrupt.
253      *
254      * @param timeoutMsecs the time to wait in milliseconds.
255      * @return {@link CommandStatus#SUCCESS} or {@link CommandStatus#FAILED} if the process
256      * terminated. {@link CommandStatus#TIMED_OUT} if timeout. {@link CommandStatus#EXCEPTION} for
257      * other types of errors.
258      * @throws RunInterruptedException if TradeFed interrupts the test invocation.
259      */
waitForProcess(long timeoutMsecs)260     public CommandStatus waitForProcess(long timeoutMsecs) throws RunInterruptedException {
261         VtsRunnable vtsRunnable = new VtsRunnable();
262         CommandStatus status;
263         // Use default RunUtil because it can receive the notification of "invocation stop".
264         try {
265             status = RunUtil.getDefault().runTimed(timeoutMsecs, vtsRunnable, true);
266         } catch (RunInterruptedException e) {
267             // clear the flag set by default RunUtil.
268             Thread.interrupted();
269             // RunUtil does not always call cancel() and join() when interrupted.
270             vtsRunnable.cancel();
271             Thread execThread = vtsRunnable.getExecutionThread();
272             if (execThread != null) {
273                 joinThread(execThread, THREAD_JOIN_TIMEOUT_MSECS);
274             }
275             throw e;
276         }
277         if (CommandStatus.SUCCESS.equals(status) || CommandStatus.FAILED.equals(status)) {
278             // Join the receiver threads otherwise output might not be available yet.
279             joinThread(mStdoutThread, THREAD_JOIN_TIMEOUT_MSECS);
280             joinThread(mStderrThread, THREAD_JOIN_TIMEOUT_MSECS);
281         } else {
282             CLog.w("Process status is %s", status);
283         }
284         return status;
285     }
286 
287     /**
288      * Write a string to stdin of the process.
289      *
290      * @param data the string.
291      * @throws IOException if the operation fails.
292      */
writeStdin(String data)293     public void writeStdin(String data) throws IOException {
294         mStdinWriter.write(data);
295         mStdinWriter.flush();
296     }
297 
298     /**
299      * Close stdin of the process.
300      *
301      * @throws IOException if the operation fails.
302      */
closeStdin()303     public void closeStdin() throws IOException {
304         mStdinWriter.close();
305     }
306 
307     /**
308      * @return the stdout of the process. As the buffer is not thread safe, the caller must call
309      * {@link #cleanUp()} or {@link #waitForProcess(long)} to ensure process termination before
310      * calling this method.
311      */
getStdout()312     public String getStdout() {
313         return mStdout.toString();
314     }
315 
316     /**
317      * @return the stderr of the process. As the buffer is not thread safe, the caller must call
318      * {@link #cleanUp()} or {@link #waitForProcess(long)} to ensure process termination before
319      * calling this method.
320      */
getStderr()321     public String getStderr() {
322         return mStderr.toString();
323     }
324 
325     /**
326      * @return whether the process is running.
327      */
isRunning()328     public boolean isRunning() {
329         try {
330             mProcess.exitValue();
331             return false;
332         } catch (IllegalThreadStateException e) {
333             return true;
334         }
335     }
336 
337     /**
338      * Kill the process if it is running. Clean up all threads and streams.
339      */
cleanUp()340     public void cleanUp() {
341         try {
342             for (int pollCount = 0; isRunning(); pollCount++) {
343                 if (pollCount >= DESTROY_PROCESS_MAX_POLL_COUNT) {
344                     CLog.e("Cannot destroy the process.");
345                     break;
346                 }
347                 if (pollCount == 0) {
348                     CLog.w("Kill the running process.");
349                     mProcess.destroy();
350                 } else {
351                     Thread.sleep(DESTROY_PROCESS_POLL_INTERVAL_MSECS);
352                 }
353             }
354         } catch (InterruptedException e) {
355             CLog.e(e);
356         }
357         try {
358             closeStdin();
359         } catch (IOException e) {
360             CLog.e(e);
361         }
362         try {
363             mStdoutReader.close();
364         } catch (IOException e) {
365             CLog.e(e);
366         }
367         try {
368             mStderrReader.close();
369         } catch (IOException e) {
370             CLog.e(e);
371         }
372         joinThread(mStdoutThread, THREAD_JOIN_TIMEOUT_MSECS);
373         joinThread(mStderrThread, THREAD_JOIN_TIMEOUT_MSECS);
374     }
375 
376     /**
377      * Join a thread and log error.
378      *
379      * @param thread the thread to join.
380      * @param timeoutMsec the timeout in milliseconds.
381      * @return whether the thread is joined successfully.
382      */
joinThread(Thread thread, long timeoutMsec)383     private static boolean joinThread(Thread thread, long timeoutMsec) {
384         try {
385             thread.join(timeoutMsec);
386         } catch (InterruptedException e) {
387             CLog.e(e);
388         }
389         if (thread.isAlive()) {
390             CLog.e("Failed to join %s.", thread.getName());
391             return false;
392         }
393         return true;
394     }
395 }
396