1 /*
2  * Copyright (C) 2016 Google Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy of
6  * the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  */
16 
17 package com.trilead.ssh2;
18 
19 import com.googlecode.android_scripting.Log;
20 
21 import java.io.File;
22 import java.io.FileOutputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 
26 /**
27  * A <code>StreamGobbler</code> is an InputStream that uses an internal worker thread to constantly
28  * consume input from another InputStream. It uses a buffer to store the consumed data. The buffer
29  * size is automatically adjusted, if needed.
30  * <p>
31  * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR InputStreams
32  * with instances of this class, then you don't have to bother about the shared window of STDOUT and
33  * STDERR in the low level SSH-2 protocol, since all arriving data will be immediatelly consumed by
34  * the worker threads. Also, as a side effect, the streams will be buffered (e.g., single byte
35  * read() operations are faster).
36  * <p>
37  * Other SSH for Java libraries include this functionality by default in their STDOUT and STDERR
38  * InputStream implementations, however, please be aware that this approach has also a downside:
39  * <p>
40  * If you do not call the StreamGobbler's <code>read()</code> method often enough and the peer is
41  * constantly sending huge amounts of data, then you will sooner or later encounter a low memory
42  * situation due to the aggregated data (well, it also depends on the Java heap size). Joe Average
43  * will like this class anyway - a paranoid programmer would never use such an approach.
44  * <p>
45  * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't", see
46  * http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html.
47  *
48  * @author Christian Plattner, plattner@trilead.com
49  * @version $Id: StreamGobbler.java,v 1.1 2007/10/15 12:49:56 cplattne Exp $
50  */
51 
52 public class StreamGobbler extends InputStream {
53   class GobblerThread extends Thread {
54     @Override
run()55     public void run() {
56 
57       while (true) {
58         try {
59           byte[] saveBuffer = null;
60 
61           int avail = is.read(buffer, write_pos, buffer.length - write_pos);
62 
63           synchronized (synchronizer) {
64             if (avail <= 0) {
65               isEOF = true;
66               synchronizer.notifyAll();
67               break;
68             }
69             write_pos += avail;
70 
71             int space_available = buffer.length - write_pos;
72 
73             if (space_available == 0) {
74               if (read_pos > 0) {
75                 saveBuffer = new byte[read_pos];
76                 System.arraycopy(buffer, 0, saveBuffer, 0, read_pos);
77                 System.arraycopy(buffer, read_pos, buffer, 0, buffer.length - read_pos);
78                 write_pos -= read_pos;
79                 read_pos = 0;
80               } else {
81                 write_pos = 0;
82                 saveBuffer = buffer;
83               }
84             }
85 
86             synchronizer.notifyAll();
87           }
88 
89           writeToFile(saveBuffer);
90 
91         } catch (IOException e) {
92           synchronized (synchronizer) {
93             exception = e;
94             synchronizer.notifyAll();
95             break;
96           }
97         }
98       }
99     }
100   }
101 
102   private InputStream is;
103   private GobblerThread t;
104 
105   private Object synchronizer = new Object();
106 
107   private boolean isEOF = false;
108   private boolean isClosed = false;
109   private IOException exception = null;
110 
111   private byte[] buffer;
112   private int read_pos = 0;
113   private int write_pos = 0;
114   private final FileOutputStream mLogStream;
115   private final int mBufferSize;
116 
StreamGobbler(InputStream is, File log, int buffer_size)117   public StreamGobbler(InputStream is, File log, int buffer_size) {
118     this.is = is;
119     mBufferSize = buffer_size;
120     FileOutputStream out = null;
121     try {
122       out = new FileOutputStream(log, false);
123     } catch (IOException e) {
124       Log.e(e);
125     }
126     mLogStream = out;
127     buffer = new byte[mBufferSize];
128     t = new GobblerThread();
129     t.setDaemon(true);
130     t.start();
131   }
132 
writeToFile(byte[] buffer)133   public void writeToFile(byte[] buffer) {
134     if (mLogStream != null && buffer != null) {
135       try {
136         mLogStream.write(buffer);
137       } catch (IOException e) {
138         Log.e(e);
139       }
140     }
141   }
142 
143   @Override
read()144   public int read() throws IOException {
145     synchronized (synchronizer) {
146       if (isClosed) {
147         throw new IOException("This StreamGobbler is closed.");
148       }
149 
150       while (read_pos == write_pos) {
151         if (exception != null) {
152           throw exception;
153         }
154 
155         if (isEOF) {
156           return -1;
157         }
158 
159         try {
160           synchronizer.wait();
161         } catch (InterruptedException e) {
162         }
163       }
164 
165       int b = buffer[read_pos++] & 0xff;
166 
167       return b;
168     }
169   }
170 
171   @Override
available()172   public int available() throws IOException {
173     synchronized (synchronizer) {
174       if (isClosed) {
175         throw new IOException("This StreamGobbler is closed.");
176       }
177 
178       return write_pos - read_pos;
179     }
180   }
181 
182   @Override
read(byte[] b)183   public int read(byte[] b) throws IOException {
184     return read(b, 0, b.length);
185   }
186 
187   @Override
close()188   public void close() throws IOException {
189     synchronized (synchronizer) {
190       if (isClosed) {
191         return;
192       }
193       isClosed = true;
194       isEOF = true;
195       synchronizer.notifyAll();
196       is.close();
197     }
198   }
199 
200   @Override
read(byte[] b, int off, int len)201   public int read(byte[] b, int off, int len) throws IOException {
202     if (b == null) {
203       throw new NullPointerException();
204     }
205 
206     if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length)) {
207       throw new IndexOutOfBoundsException();
208     }
209 
210     if (len == 0) {
211       return 0;
212     }
213 
214     synchronized (synchronizer) {
215       if (isClosed) {
216         throw new IOException("This StreamGobbler is closed.");
217       }
218 
219       while (read_pos == write_pos) {
220         if (exception != null) {
221           throw exception;
222         }
223 
224         if (isEOF) {
225           return -1;
226         }
227 
228         try {
229           synchronizer.wait();
230         } catch (InterruptedException e) {
231         }
232       }
233 
234       int avail = write_pos - read_pos;
235 
236       avail = (avail > len) ? len : avail;
237 
238       System.arraycopy(buffer, read_pos, b, off, avail);
239 
240       read_pos += avail;
241 
242       return avail;
243     }
244   }
245 }
246