1 /* 2 * Copyright (C) 2016 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not 5 * use this file except in compliance with the License. You may obtain a copy of 6 * the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations under 14 * the License. 15 */ 16 17 package com.trilead.ssh2; 18 19 import com.googlecode.android_scripting.Log; 20 21 import java.io.File; 22 import java.io.FileOutputStream; 23 import java.io.IOException; 24 import java.io.InputStream; 25 26 /** 27 * A <code>StreamGobbler</code> is an InputStream that uses an internal worker thread to constantly 28 * consume input from another InputStream. It uses a buffer to store the consumed data. The buffer 29 * size is automatically adjusted, if needed. 30 * <p> 31 * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR InputStreams 32 * with instances of this class, then you don't have to bother about the shared window of STDOUT and 33 * STDERR in the low level SSH-2 protocol, since all arriving data will be immediatelly consumed by 34 * the worker threads. Also, as a side effect, the streams will be buffered (e.g., single byte 35 * read() operations are faster). 36 * <p> 37 * Other SSH for Java libraries include this functionality by default in their STDOUT and STDERR 38 * InputStream implementations, however, please be aware that this approach has also a downside: 39 * <p> 40 * If you do not call the StreamGobbler's <code>read()</code> method often enough and the peer is 41 * constantly sending huge amounts of data, then you will sooner or later encounter a low memory 42 * situation due to the aggregated data (well, it also depends on the Java heap size). Joe Average 43 * will like this class anyway - a paranoid programmer would never use such an approach. 44 * <p> 45 * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't", see 46 * http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html. 47 * 48 * @author Christian Plattner, plattner@trilead.com 49 * @version $Id: StreamGobbler.java,v 1.1 2007/10/15 12:49:56 cplattne Exp $ 50 */ 51 52 public class StreamGobbler extends InputStream { 53 class GobblerThread extends Thread { 54 @Override run()55 public void run() { 56 57 while (true) { 58 try { 59 byte[] saveBuffer = null; 60 61 int avail = is.read(buffer, write_pos, buffer.length - write_pos); 62 63 synchronized (synchronizer) { 64 if (avail <= 0) { 65 isEOF = true; 66 synchronizer.notifyAll(); 67 break; 68 } 69 write_pos += avail; 70 71 int space_available = buffer.length - write_pos; 72 73 if (space_available == 0) { 74 if (read_pos > 0) { 75 saveBuffer = new byte[read_pos]; 76 System.arraycopy(buffer, 0, saveBuffer, 0, read_pos); 77 System.arraycopy(buffer, read_pos, buffer, 0, buffer.length - read_pos); 78 write_pos -= read_pos; 79 read_pos = 0; 80 } else { 81 write_pos = 0; 82 saveBuffer = buffer; 83 } 84 } 85 86 synchronizer.notifyAll(); 87 } 88 89 writeToFile(saveBuffer); 90 91 } catch (IOException e) { 92 synchronized (synchronizer) { 93 exception = e; 94 synchronizer.notifyAll(); 95 break; 96 } 97 } 98 } 99 } 100 } 101 102 private InputStream is; 103 private GobblerThread t; 104 105 private Object synchronizer = new Object(); 106 107 private boolean isEOF = false; 108 private boolean isClosed = false; 109 private IOException exception = null; 110 111 private byte[] buffer; 112 private int read_pos = 0; 113 private int write_pos = 0; 114 private final FileOutputStream mLogStream; 115 private final int mBufferSize; 116 StreamGobbler(InputStream is, File log, int buffer_size)117 public StreamGobbler(InputStream is, File log, int buffer_size) { 118 this.is = is; 119 mBufferSize = buffer_size; 120 FileOutputStream out = null; 121 try { 122 out = new FileOutputStream(log, false); 123 } catch (IOException e) { 124 Log.e(e); 125 } 126 mLogStream = out; 127 buffer = new byte[mBufferSize]; 128 t = new GobblerThread(); 129 t.setDaemon(true); 130 t.start(); 131 } 132 writeToFile(byte[] buffer)133 public void writeToFile(byte[] buffer) { 134 if (mLogStream != null && buffer != null) { 135 try { 136 mLogStream.write(buffer); 137 } catch (IOException e) { 138 Log.e(e); 139 } 140 } 141 } 142 143 @Override read()144 public int read() throws IOException { 145 synchronized (synchronizer) { 146 if (isClosed) { 147 throw new IOException("This StreamGobbler is closed."); 148 } 149 150 while (read_pos == write_pos) { 151 if (exception != null) { 152 throw exception; 153 } 154 155 if (isEOF) { 156 return -1; 157 } 158 159 try { 160 synchronizer.wait(); 161 } catch (InterruptedException e) { 162 } 163 } 164 165 int b = buffer[read_pos++] & 0xff; 166 167 return b; 168 } 169 } 170 171 @Override available()172 public int available() throws IOException { 173 synchronized (synchronizer) { 174 if (isClosed) { 175 throw new IOException("This StreamGobbler is closed."); 176 } 177 178 return write_pos - read_pos; 179 } 180 } 181 182 @Override read(byte[] b)183 public int read(byte[] b) throws IOException { 184 return read(b, 0, b.length); 185 } 186 187 @Override close()188 public void close() throws IOException { 189 synchronized (synchronizer) { 190 if (isClosed) { 191 return; 192 } 193 isClosed = true; 194 isEOF = true; 195 synchronizer.notifyAll(); 196 is.close(); 197 } 198 } 199 200 @Override read(byte[] b, int off, int len)201 public int read(byte[] b, int off, int len) throws IOException { 202 if (b == null) { 203 throw new NullPointerException(); 204 } 205 206 if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length)) { 207 throw new IndexOutOfBoundsException(); 208 } 209 210 if (len == 0) { 211 return 0; 212 } 213 214 synchronized (synchronizer) { 215 if (isClosed) { 216 throw new IOException("This StreamGobbler is closed."); 217 } 218 219 while (read_pos == write_pos) { 220 if (exception != null) { 221 throw exception; 222 } 223 224 if (isEOF) { 225 return -1; 226 } 227 228 try { 229 synchronizer.wait(); 230 } catch (InterruptedException e) { 231 } 232 } 233 234 int avail = write_pos - read_pos; 235 236 avail = (avail > len) ? len : avail; 237 238 System.arraycopy(buffer, read_pos, b, off, avail); 239 240 read_pos += avail; 241 242 return avail; 243 } 244 } 245 } 246