1 /*
2  * Copyright (C) 2013 Google Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.caliper.runner;
16 
17 import static com.google.common.base.Preconditions.checkArgument;
18 import static com.google.common.base.Preconditions.checkState;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNotSame;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23 
24 import com.google.caliper.bridge.LogMessage;
25 import com.google.caliper.bridge.OpenedSocket;
26 import com.google.caliper.runner.FakeWorkers.DummyLogMessage;
27 import com.google.caliper.runner.StreamService.StreamItem;
28 import com.google.caliper.runner.StreamService.StreamItem.Kind;
29 import com.google.caliper.util.Parser;
30 import com.google.common.collect.Sets;
31 import com.google.common.util.concurrent.ListenableFuture;
32 import com.google.common.util.concurrent.ListenableFutureTask;
33 import com.google.common.util.concurrent.MoreExecutors;
34 import com.google.common.util.concurrent.Service.Listener;
35 import com.google.common.util.concurrent.Service.State;
36 
37 import org.junit.After;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.junit.runner.RunWith;
41 import org.junit.runners.JUnit4;
42 
43 import java.io.File;
44 import java.io.FileNotFoundException;
45 import java.io.IOException;
46 import java.io.PrintWriter;
47 import java.io.StringWriter;
48 import java.net.ServerSocket;
49 import java.net.SocketException;
50 import java.text.ParseException;
51 import java.util.Set;
52 import java.util.UUID;
53 import java.util.concurrent.Callable;
54 import java.util.concurrent.CountDownLatch;
55 import java.util.concurrent.TimeUnit;
56 
57 /**
58  * Tests for {@link StreamService}.
59  */
60 @RunWith(JUnit4.class)
61 
62 public class StreamServiceTest {
63 
64   private ServerSocket serverSocket;
65   private final StringWriter writer = new StringWriter();
66   private final PrintWriter stdout = new PrintWriter(writer, true);
67   private final Parser<LogMessage> parser = new Parser<LogMessage>() {
68     @Override public LogMessage parse(final CharSequence text) throws ParseException {
69       return new DummyLogMessage(text.toString());
70     }
71   };
72 
73   private StreamService service;
74   private final CountDownLatch terminalLatch = new CountDownLatch(1);
75   private static final int TRIAL_NUMBER = 3;
76 
setUp()77   @Before public void setUp() throws IOException {
78     serverSocket = new ServerSocket(0);
79   }
80 
closeSocket()81   @After public void closeSocket() throws IOException {
82     serverSocket.close();
83   }
84 
stopService()85   @After public void stopService() {
86     if (service != null && service.state() != State.FAILED && service.state() != State.TERMINATED) {
87       service.stopAsync().awaitTerminated();
88     }
89   }
90 
testReadOutput()91   @Test public void testReadOutput() throws Exception {
92     makeService(FakeWorkers.PrintClient.class, "foo", "bar");
93     service.startAsync().awaitRunning();
94     StreamItem item1 = readItem();
95     assertEquals(Kind.DATA, item1.kind());
96     Set<String> lines = Sets.newHashSet();
97     lines.add(item1.content().toString());
98     StreamItem item2 = readItem();
99     assertEquals(Kind.DATA, item2.kind());
100     lines.add(item2.content().toString());
101     assertEquals(Sets.newHashSet("foo", "bar"), lines);
102     assertEquals(State.RUNNING, service.state());
103     StreamItem item3 = readItem();
104     assertEquals(Kind.EOF, item3.kind());
105     awaitStopped(100, TimeUnit.MILLISECONDS);
106     assertTerminated();
107   }
108 
failingProcess()109   @Test public void failingProcess() throws Exception {
110     makeService(FakeWorkers.Exit.class, "1");
111     service.startAsync().awaitRunning();
112     assertEquals(Kind.EOF, readItem().kind());
113     awaitStopped(100, TimeUnit.MILLISECONDS);
114     assertEquals(State.FAILED, service.state());
115   }
116 
processDoesntExit()117   @Test public void processDoesntExit() throws Exception {
118     // close all fds and then sleep
119     makeService(FakeWorkers.CloseAndSleep.class);
120     service.startAsync().awaitRunning();
121     assertEquals(Kind.EOF, readItem().kind());
122     awaitStopped(200, TimeUnit.MILLISECONDS);  // we
123     assertEquals(State.FAILED, service.state());
124   }
125 
testSocketInputOutput()126   @Test public void testSocketInputOutput() throws Exception {
127     int localport = serverSocket.getLocalPort();
128     // read from the socket and echo it back
129     makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport));
130 
131     service.startAsync().awaitRunning();
132     assertEquals(new DummyLogMessage("start"), readItem().content());
133     service.sendMessage(new DummyLogMessage("hello socket world"));
134     assertEquals(new DummyLogMessage("hello socket world"), readItem().content());
135     service.closeWriter();
136     assertEquals(State.RUNNING, service.state());
137     StreamItem nextItem = readItem();
138     assertEquals("Expected EOF " + nextItem, Kind.EOF, nextItem.kind());
139     awaitStopped(100, TimeUnit.MILLISECONDS);
140     assertTerminated();
141   }
142 
testSocketClosesBeforeProcess()143   @Test public void testSocketClosesBeforeProcess() throws Exception {
144     int localport = serverSocket.getLocalPort();
145     // read from the socket and echo it back
146     makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport), "foo");
147     service.startAsync().awaitRunning();
148     assertEquals(new DummyLogMessage("start"), readItem().content());
149     service.sendMessage(new DummyLogMessage("hello socket world"));
150     assertEquals(new DummyLogMessage("hello socket world"), readItem().content());
151     service.closeWriter();
152 
153     assertEquals("foo", readItem().content().toString());
154 
155     assertEquals(State.RUNNING, service.state());
156     assertEquals(Kind.EOF, readItem().kind());
157     awaitStopped(100, TimeUnit.MILLISECONDS);
158     assertTerminated();
159   }
160 
failsToAcceptConnection()161   @Test public void failsToAcceptConnection() throws Exception {
162     serverSocket.close();  // This will force serverSocket.accept to throw a SocketException
163     makeService(FakeWorkers.Sleeper.class, Long.toString(TimeUnit.MINUTES.toMillis(10)));
164     try {
165       service.startAsync().awaitRunning();
166       fail();
167     } catch (IllegalStateException expected) {}
168     assertEquals(SocketException.class, service.failureCause().getClass());
169   }
170 
171   /** Reads an item, asserting that there was no timeout. */
readItem()172   private StreamItem readItem() throws InterruptedException {
173     StreamItem item = service.readItem(10, TimeUnit.SECONDS);
174     assertNotSame("Timed out while reading item from worker", Kind.TIMEOUT, item.kind());
175     return item;
176   }
177 
178   /**
179    * Wait for the service to reach a terminal state without calling stop.
180    */
awaitStopped(long time, TimeUnit unit)181   private void awaitStopped(long time, TimeUnit unit) throws InterruptedException {
182     assertTrue(terminalLatch.await(time, unit));
183   }
184 
assertTerminated()185   private void assertTerminated() {
186     State state = service.state();
187     if (state != State.TERMINATED) {
188       if (state == State.FAILED) {
189         throw new AssertionError(service.failureCause());
190       }
191       fail("Expected service to be terminated but was: " + state);
192     }
193   }
194 
195   @SuppressWarnings("resource")
makeService(Class<?> main, String ...args)196   private void makeService(Class<?> main, String ...args) {
197     checkState(service == null, "You can only make one StreamService per test");
198     UUID trialId = UUID.randomUUID();
199     TrialOutputLogger trialOutput = new TrialOutputLogger(new TrialOutputFactory() {
200       @Override public FileAndWriter getTrialOutputFile(int trialNumber)
201           throws FileNotFoundException {
202         checkArgument(trialNumber == TRIAL_NUMBER);
203         return new FileAndWriter(new File("/tmp/not-a-file"), stdout);
204       }
205 
206       @Override public void persistFile(File f) {
207         throw new UnsupportedOperationException();
208       }
209 
210     }, TRIAL_NUMBER, trialId, null /* experiment */);
211     try {
212       // normally the TrialRunLoop opens/closes the logger
213       trialOutput.open();
214     } catch (IOException e) {
215       throw new RuntimeException(e);
216     }
217     service = new StreamService(
218         new WorkerProcess(FakeWorkers.createProcessBuilder(main, args),
219             trialId,
220             getSocketFuture(),
221             new RuntimeShutdownHookRegistrar()),
222         parser,
223         trialOutput);
224     service.addListener(new Listener() {
225       @Override public void starting() {}
226       @Override public void running() {}
227       @Override public void stopping(State from) {}
228       @Override public void terminated(State from) {
229         terminalLatch.countDown();
230       }
231       @Override public void failed(State from, Throwable failure) {
232         terminalLatch.countDown();
233       }
234     }, MoreExecutors.directExecutor());
235   }
236 
getSocketFuture()237   private ListenableFuture<OpenedSocket> getSocketFuture() {
238     ListenableFutureTask<OpenedSocket> openSocketTask = ListenableFutureTask.create(
239         new Callable<OpenedSocket>() {
240           @Override
241           public OpenedSocket call() throws Exception {
242             return OpenedSocket.fromSocket(serverSocket.accept());
243           }
244         });
245     // N.B. this thread will block on serverSocket.accept until a connection is accepted or the
246     // socket is closed, so no matter what this thread will die with the test.
247     Thread opener = new Thread(openSocketTask, "SocketOpener");
248     opener.setDaemon(true);
249     opener.start();
250     return openSocketTask;
251   }
252 }
253