1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.tradefed.util; 18 19 import com.android.tradefed.log.LogUtil.CLog; 20 21 import java.io.IOException; 22 import java.io.InputStreamReader; 23 import java.io.OutputStreamWriter; 24 import java.io.Reader; 25 import java.io.Writer; 26 27 /** 28 * A helper for interruptible process execution. 29 */ 30 public class ProcessHelper { 31 // Timeout values of destroying the process. 32 private static final int DESTROY_PROCESS_MAX_POLL_COUNT = 3; 33 private static final long DESTROY_PROCESS_POLL_INTERVAL_MSECS = 500; 34 private static final String DEBUG = "DEBUG"; 35 private static final String INFO = "INFO"; 36 private static final String WARN = "WARN"; 37 private static final String ERROR = "ERROR"; 38 39 // Timeout value of joining the stdout and stderr threads. 40 private static final int THREAD_JOIN_TIMEOUT_MSECS = 1000; 41 42 // The process being monitored. 43 private final Process mProcess; 44 45 // The stdout and stderr of the process. 46 private final Reader mStdoutReader; 47 private final Reader mStderrReader; 48 49 // The threads redirecting the stdout and stderr to buffers. 50 private final ReaderThread mStdoutThread; 51 private final ReaderThread mStderrThread; 52 53 // The buffers of stdout and stderr. 54 private final StringBuilder mStdout; 55 private final StringBuilder mStderr; 56 57 // The stdin of the process. 58 private final Writer mStdinWriter; 59 60 /** 61 * A thread that keeps reading string from an input stream. 62 */ 63 static class ReaderThread extends Thread { 64 private static final int BUF_SIZE = 16 * 1024; 65 private Reader mReader; 66 private StringBuilder mBuffer; 67 68 static enum LogType { 69 STDOUT, 70 STDERR; 71 } 72 73 private LogType mLogType; 74 75 /** 76 * @param reader the input stream to read from. 77 * @param buffer the buffer containing the input data. 78 * @param name the name of the thread. 79 * @param logType enum, type of log output. 80 */ ReaderThread(Reader reader, StringBuilder buffer, String name, LogType logType)81 public ReaderThread(Reader reader, StringBuilder buffer, String name, LogType logType) { 82 super(name); 83 mReader = reader; 84 mBuffer = buffer; 85 mLogType = logType; 86 } 87 88 /** 89 * Read string from the input stream until EOF. 90 */ 91 @Override run()92 public void run() { 93 char[] charBuffer = new char[BUF_SIZE]; 94 // reader will be closed in cleanUp() 95 try { 96 String currentLogLevel = INFO; 97 while (true) { 98 int readCount = mReader.read(charBuffer, 0, charBuffer.length); 99 if (readCount < 0) { 100 break; 101 } 102 String newRead = new String(charBuffer, 0, readCount); 103 104 int newLineLen = 0; 105 if (newRead.endsWith("\r\n")) { 106 newLineLen = 2; 107 } else if (newRead.endsWith("\n")) { 108 newLineLen = 1; 109 } 110 111 String newReadPrint = newRead.substring(0, newRead.length() - newLineLen); 112 switch (mLogType) { 113 case STDOUT: 114 // Logs coming from stdout for the process, which may contain 115 // python DEBUG and ERROR logs. 116 String[] tokens = newReadPrint.split("\\s+"); 117 if (tokens.length >= 4) { 118 String level = tokens[3]; 119 switch (tokens[3]) { 120 case DEBUG: 121 case INFO: 122 case WARN: 123 case ERROR: 124 currentLogLevel = level; 125 break; 126 default: 127 // Use the last known log level 128 } 129 } 130 131 switch (currentLogLevel) { 132 case DEBUG: 133 CLog.d(newReadPrint); 134 break; 135 case INFO: 136 CLog.i(newReadPrint); 137 break; 138 case WARN: 139 CLog.w(newReadPrint); 140 break; 141 case ERROR: 142 CLog.e(newReadPrint); 143 break; 144 default: 145 // This case should never happen 146 CLog.e("Error in current log level state."); 147 CLog.i(newReadPrint); 148 } 149 150 break; 151 case STDERR: 152 // Logs coming from stderr for the process, which is always 153 // ERROR level 154 CLog.e(newReadPrint); 155 break; 156 } 157 mBuffer.append(newRead); 158 } 159 } catch (IOException e) { 160 CLog.e("IOException during ProcessHelper#ReaderThread run."); 161 CLog.e(e); 162 } 163 } 164 } 165 166 /** 167 * This class waits for a process. It is run by {@link IRunUtil}. 168 */ 169 class VtsRunnable implements IRunUtil.IRunnableResult { 170 private boolean mCancelled = false; 171 private Thread mExecutionThread = null; 172 private final Object mLock = new Object(); 173 174 /** 175 * @return whether the command is successful. {@link RunUtil} returns 176 * {@link CommandStatus#SUCCESS} or {@link CommandStatus#FAILED} according to the 177 * this return value. 178 */ 179 @Override run()180 public boolean run() { 181 synchronized (mLock) { 182 mExecutionThread = Thread.currentThread(); 183 if (mCancelled) { 184 CLog.w("Process was cancelled before being awaited."); 185 return false; 186 } 187 } 188 boolean success; 189 try { 190 success = (mProcess.waitFor() == 0); 191 CLog.d("Process terminates normally."); 192 } catch (InterruptedException e) { 193 success = false; 194 CLog.e("Process is interrupted."); 195 } 196 return success; 197 } 198 199 /** 200 * This method makes {@link #run()} method stop waiting for the process. It can be called 201 * more than once. {@link RunUtil} calls this method if {@link CommandStatus} is TIMED_OUT 202 * or EXCEPTION. 203 */ 204 @Override cancel()205 public void cancel() { 206 CLog.w("Attempt to interrupt execution thread."); 207 synchronized (mLock) { 208 if (!mCancelled) { 209 mCancelled = true; 210 if (mExecutionThread != null) { 211 mExecutionThread.interrupt(); 212 } else { 213 CLog.d("Execution thread has not started."); 214 } 215 } else { 216 CLog.e("Execution thread has been cancelled."); 217 } 218 } 219 } 220 221 /** 222 * @return the thread of {@link #run()}; null if the thread has not started. 223 */ getExecutionThread()224 public Thread getExecutionThread() { 225 synchronized (mLock) { 226 return mExecutionThread; 227 } 228 } 229 } 230 231 /** 232 * Create an instance that monitors a running process. 233 * 234 * @param process the process to monitor. 235 */ ProcessHelper(Process process)236 public ProcessHelper(Process process) { 237 mProcess = process; 238 mStdout = new StringBuilder(); 239 mStderr = new StringBuilder(); 240 mStdinWriter = new OutputStreamWriter(mProcess.getOutputStream()); 241 mStdoutReader = new InputStreamReader(mProcess.getInputStream()); 242 mStderrReader = new InputStreamReader(mProcess.getErrorStream()); 243 mStdoutThread = new ReaderThread( 244 mStdoutReader, mStdout, "process-helper-stdout", ReaderThread.LogType.STDOUT); 245 mStderrThread = new ReaderThread( 246 mStderrReader, mStderr, "process-helper-stderr", ReaderThread.LogType.STDERR); 247 mStdoutThread.start(); 248 mStderrThread.start(); 249 } 250 251 /** 252 * Wait for the process until termination, timeout, or interrupt. 253 * 254 * @param timeoutMsecs the time to wait in milliseconds. 255 * @return {@link CommandStatus#SUCCESS} or {@link CommandStatus#FAILED} if the process 256 * terminated. {@link CommandStatus#TIMED_OUT} if timeout. {@link CommandStatus#EXCEPTION} for 257 * other types of errors. 258 * @throws RunInterruptedException if TradeFed interrupts the test invocation. 259 */ waitForProcess(long timeoutMsecs)260 public CommandStatus waitForProcess(long timeoutMsecs) throws RunInterruptedException { 261 VtsRunnable vtsRunnable = new VtsRunnable(); 262 CommandStatus status; 263 // Use default RunUtil because it can receive the notification of "invocation stop". 264 try { 265 status = RunUtil.getDefault().runTimed(timeoutMsecs, vtsRunnable, true); 266 } catch (RunInterruptedException e) { 267 // clear the flag set by default RunUtil. 268 Thread.interrupted(); 269 // RunUtil does not always call cancel() and join() when interrupted. 270 vtsRunnable.cancel(); 271 Thread execThread = vtsRunnable.getExecutionThread(); 272 if (execThread != null) { 273 joinThread(execThread, THREAD_JOIN_TIMEOUT_MSECS); 274 } 275 throw e; 276 } 277 if (CommandStatus.SUCCESS.equals(status) || CommandStatus.FAILED.equals(status)) { 278 // Join the receiver threads otherwise output might not be available yet. 279 joinThread(mStdoutThread, THREAD_JOIN_TIMEOUT_MSECS); 280 joinThread(mStderrThread, THREAD_JOIN_TIMEOUT_MSECS); 281 } else { 282 CLog.w("Process status is %s", status); 283 } 284 return status; 285 } 286 287 /** 288 * Write a string to stdin of the process. 289 * 290 * @param data the string. 291 * @throws IOException if the operation fails. 292 */ writeStdin(String data)293 public void writeStdin(String data) throws IOException { 294 mStdinWriter.write(data); 295 mStdinWriter.flush(); 296 } 297 298 /** 299 * Close stdin of the process. 300 * 301 * @throws IOException if the operation fails. 302 */ closeStdin()303 public void closeStdin() throws IOException { 304 mStdinWriter.close(); 305 } 306 307 /** 308 * @return the stdout of the process. As the buffer is not thread safe, the caller must call 309 * {@link #cleanUp()} or {@link #waitForProcess(long)} to ensure process termination before 310 * calling this method. 311 */ getStdout()312 public String getStdout() { 313 return mStdout.toString(); 314 } 315 316 /** 317 * @return the stderr of the process. As the buffer is not thread safe, the caller must call 318 * {@link #cleanUp()} or {@link #waitForProcess(long)} to ensure process termination before 319 * calling this method. 320 */ getStderr()321 public String getStderr() { 322 return mStderr.toString(); 323 } 324 325 /** 326 * @return whether the process is running. 327 */ isRunning()328 public boolean isRunning() { 329 try { 330 mProcess.exitValue(); 331 return false; 332 } catch (IllegalThreadStateException e) { 333 return true; 334 } 335 } 336 337 /** 338 * Kill the process if it is running. Clean up all threads and streams. 339 */ cleanUp()340 public void cleanUp() { 341 try { 342 for (int pollCount = 0; isRunning(); pollCount++) { 343 if (pollCount >= DESTROY_PROCESS_MAX_POLL_COUNT) { 344 CLog.e("Cannot destroy the process."); 345 break; 346 } 347 if (pollCount == 0) { 348 CLog.w("Kill the running process."); 349 mProcess.destroy(); 350 } else { 351 Thread.sleep(DESTROY_PROCESS_POLL_INTERVAL_MSECS); 352 } 353 } 354 } catch (InterruptedException e) { 355 CLog.e(e); 356 } 357 try { 358 closeStdin(); 359 } catch (IOException e) { 360 CLog.e(e); 361 } 362 try { 363 mStdoutReader.close(); 364 } catch (IOException e) { 365 CLog.e(e); 366 } 367 try { 368 mStderrReader.close(); 369 } catch (IOException e) { 370 CLog.e(e); 371 } 372 joinThread(mStdoutThread, THREAD_JOIN_TIMEOUT_MSECS); 373 joinThread(mStderrThread, THREAD_JOIN_TIMEOUT_MSECS); 374 } 375 376 /** 377 * Join a thread and log error. 378 * 379 * @param thread the thread to join. 380 * @param timeoutMsec the timeout in milliseconds. 381 * @return whether the thread is joined successfully. 382 */ joinThread(Thread thread, long timeoutMsec)383 private static boolean joinThread(Thread thread, long timeoutMsec) { 384 try { 385 thread.join(timeoutMsec); 386 } catch (InterruptedException e) { 387 CLog.e(e); 388 } 389 if (thread.isAlive()) { 390 CLog.e("Failed to join %s.", thread.getName()); 391 return false; 392 } 393 return true; 394 } 395 } 396