1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; 20 import com.google.common.util.concurrent.Service.State; 21 22 import junit.framework.TestCase; 23 24 import java.util.concurrent.CyclicBarrier; 25 import java.util.concurrent.ExecutionException; 26 import java.util.concurrent.Executors; 27 import java.util.concurrent.Future; 28 import java.util.concurrent.ScheduledExecutorService; 29 import java.util.concurrent.ScheduledFuture; 30 import java.util.concurrent.ScheduledThreadPoolExecutor; 31 import java.util.concurrent.TimeUnit; 32 import java.util.concurrent.atomic.AtomicBoolean; 33 import java.util.concurrent.atomic.AtomicInteger; 34 import java.util.concurrent.atomic.AtomicReference; 35 36 /** 37 * Unit test for {@link AbstractScheduledService}. 38 * 39 * @author Luke Sandberg 40 */ 41 42 public class AbstractScheduledServiceTest extends TestCase { 43 44 volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS); 45 volatile ScheduledFuture<?> future = null; 46 47 volatile boolean atFixedRateCalled = false; 48 volatile boolean withFixedDelayCalled = false; 49 volatile boolean scheduleCalled = false; 50 51 final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) { 52 @Override 53 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, 54 long delay, TimeUnit unit) { 55 return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit); 56 } 57 }; 58 testServiceStartStop()59 public void testServiceStartStop() throws Exception { 60 NullService service = new NullService(); 61 service.startAsync().awaitRunning(); 62 assertFalse(future.isDone()); 63 service.stopAsync().awaitTerminated(); 64 assertTrue(future.isCancelled()); 65 } 66 67 private class NullService extends AbstractScheduledService { runOneIteration()68 @Override protected void runOneIteration() throws Exception {} scheduler()69 @Override protected Scheduler scheduler() { return configuration; } executor()70 @Override protected ScheduledExecutorService executor() { return executor; } 71 } 72 testFailOnExceptionFromRun()73 public void testFailOnExceptionFromRun() throws Exception { 74 TestService service = new TestService(); 75 service.runException = new Exception(); 76 service.startAsync().awaitRunning(); 77 service.runFirstBarrier.await(); 78 service.runSecondBarrier.await(); 79 try { 80 future.get(); 81 fail(); 82 } catch (ExecutionException e) { 83 // An execution exception holds a runtime exception (from throwables.propogate) that holds our 84 // original exception. 85 assertEquals(service.runException, e.getCause().getCause()); 86 } 87 assertEquals(service.state(), Service.State.FAILED); 88 } 89 testFailOnExceptionFromStartUp()90 public void testFailOnExceptionFromStartUp() { 91 TestService service = new TestService(); 92 service.startUpException = new Exception(); 93 try { 94 service.startAsync().awaitRunning(); 95 fail(); 96 } catch (IllegalStateException e) { 97 assertEquals(service.startUpException, e.getCause()); 98 } 99 assertEquals(0, service.numberOfTimesRunCalled.get()); 100 assertEquals(Service.State.FAILED, service.state()); 101 } 102 testFailOnExceptionFromShutDown()103 public void testFailOnExceptionFromShutDown() throws Exception { 104 TestService service = new TestService(); 105 service.shutDownException = new Exception(); 106 service.startAsync().awaitRunning(); 107 service.runFirstBarrier.await(); 108 service.stopAsync(); 109 service.runSecondBarrier.await(); 110 try { 111 service.awaitTerminated(); 112 fail(); 113 } catch (IllegalStateException e) { 114 assertEquals(service.shutDownException, e.getCause()); 115 } 116 assertEquals(Service.State.FAILED, service.state()); 117 } 118 testRunOneIterationCalledMultipleTimes()119 public void testRunOneIterationCalledMultipleTimes() throws Exception { 120 TestService service = new TestService(); 121 service.startAsync().awaitRunning(); 122 for (int i = 1; i < 10; i++) { 123 service.runFirstBarrier.await(); 124 assertEquals(i, service.numberOfTimesRunCalled.get()); 125 service.runSecondBarrier.await(); 126 } 127 service.runFirstBarrier.await(); 128 service.stopAsync(); 129 service.runSecondBarrier.await(); 130 service.stopAsync().awaitTerminated(); 131 } 132 testExecutorOnlyCalledOnce()133 public void testExecutorOnlyCalledOnce() throws Exception { 134 TestService service = new TestService(); 135 service.startAsync().awaitRunning(); 136 // It should be called once during startup. 137 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 138 for (int i = 1; i < 10; i++) { 139 service.runFirstBarrier.await(); 140 assertEquals(i, service.numberOfTimesRunCalled.get()); 141 service.runSecondBarrier.await(); 142 } 143 service.runFirstBarrier.await(); 144 service.stopAsync(); 145 service.runSecondBarrier.await(); 146 service.stopAsync().awaitTerminated(); 147 // Only called once overall. 148 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 149 } 150 testDefaultExecutorIsShutdownWhenServiceIsStopped()151 public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception { 152 final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference(); 153 AbstractScheduledService service = new AbstractScheduledService() { 154 @Override protected void runOneIteration() throws Exception {} 155 156 @Override protected ScheduledExecutorService executor() { 157 executor.set(super.executor()); 158 return executor.get(); 159 } 160 161 @Override protected Scheduler scheduler() { 162 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); 163 } 164 }; 165 166 service.startAsync(); 167 assertFalse(service.executor().isShutdown()); 168 service.awaitRunning(); 169 service.stopAsync(); 170 service.awaitTerminated(); 171 assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS)); 172 } 173 testDefaultExecutorIsShutdownWhenServiceFails()174 public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception { 175 final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference(); 176 AbstractScheduledService service = new AbstractScheduledService() { 177 @Override protected void startUp() throws Exception { 178 throw new Exception("Failed"); 179 } 180 181 @Override protected void runOneIteration() throws Exception {} 182 183 @Override protected ScheduledExecutorService executor() { 184 executor.set(super.executor()); 185 return executor.get(); 186 } 187 188 @Override protected Scheduler scheduler() { 189 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); 190 } 191 }; 192 193 try { 194 service.startAsync().awaitRunning(); 195 fail("Expected service to fail during startup"); 196 } catch (IllegalStateException expected) {} 197 198 assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS)); 199 } 200 testSchedulerOnlyCalledOnce()201 public void testSchedulerOnlyCalledOnce() throws Exception { 202 TestService service = new TestService(); 203 service.startAsync().awaitRunning(); 204 // It should be called once during startup. 205 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 206 for (int i = 1; i < 10; i++) { 207 service.runFirstBarrier.await(); 208 assertEquals(i, service.numberOfTimesRunCalled.get()); 209 service.runSecondBarrier.await(); 210 } 211 service.runFirstBarrier.await(); 212 service.stopAsync(); 213 service.runSecondBarrier.await(); 214 service.awaitTerminated(); 215 // Only called once overall. 216 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 217 } 218 219 private class TestService extends AbstractScheduledService { 220 CyclicBarrier runFirstBarrier = new CyclicBarrier(2); 221 CyclicBarrier runSecondBarrier = new CyclicBarrier(2); 222 223 volatile boolean startUpCalled = false; 224 volatile boolean shutDownCalled = false; 225 AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0); 226 AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0); 227 AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0); 228 volatile Exception runException = null; 229 volatile Exception startUpException = null; 230 volatile Exception shutDownException = null; 231 232 @Override runOneIteration()233 protected void runOneIteration() throws Exception { 234 assertTrue(startUpCalled); 235 assertFalse(shutDownCalled); 236 numberOfTimesRunCalled.incrementAndGet(); 237 assertEquals(State.RUNNING, state()); 238 runFirstBarrier.await(); 239 runSecondBarrier.await(); 240 if (runException != null) { 241 throw runException; 242 } 243 } 244 245 @Override startUp()246 protected void startUp() throws Exception { 247 assertFalse(startUpCalled); 248 assertFalse(shutDownCalled); 249 startUpCalled = true; 250 assertEquals(State.STARTING, state()); 251 if (startUpException != null) { 252 throw startUpException; 253 } 254 } 255 256 @Override shutDown()257 protected void shutDown() throws Exception { 258 assertTrue(startUpCalled); 259 assertFalse(shutDownCalled); 260 shutDownCalled = true; 261 if (shutDownException != null) { 262 throw shutDownException; 263 } 264 } 265 266 @Override executor()267 protected ScheduledExecutorService executor() { 268 numberOfTimesExecutorCalled.incrementAndGet(); 269 return executor; 270 } 271 272 @Override scheduler()273 protected Scheduler scheduler() { 274 numberOfTimesSchedulerCalled.incrementAndGet(); 275 return configuration; 276 } 277 } 278 279 public static class SchedulerTest extends TestCase { 280 // These constants are arbitrary and just used to make sure that the correct method is called 281 // with the correct parameters. 282 private static final int initialDelay = 10; 283 private static final int delay = 20; 284 private static final TimeUnit unit = TimeUnit.MILLISECONDS; 285 286 // Unique runnable object used for comparison. 287 final Runnable testRunnable = new Runnable() {@Override public void run() {}}; 288 boolean called = false; 289 assertSingleCallWithCorrectParameters(Runnable command, long initialDelay, long delay, TimeUnit unit)290 private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay, 291 long delay, TimeUnit unit) { 292 assertFalse(called); // only called once. 293 called = true; 294 assertEquals(SchedulerTest.initialDelay, initialDelay); 295 assertEquals(SchedulerTest.delay, delay); 296 assertEquals(SchedulerTest.unit, unit); 297 assertEquals(testRunnable, command); 298 } 299 testFixedRateSchedule()300 public void testFixedRateSchedule() { 301 Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit); 302 schedule.schedule(null, new ScheduledThreadPoolExecutor(1) { 303 @Override 304 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, 305 long period, TimeUnit unit) { 306 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 307 return null; 308 } 309 }, testRunnable); 310 assertTrue(called); 311 } 312 testFixedDelaySchedule()313 public void testFixedDelaySchedule() { 314 Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit); 315 schedule.schedule(null, new ScheduledThreadPoolExecutor(10) { 316 @Override 317 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, 318 long delay, TimeUnit unit) { 319 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 320 return null; 321 } 322 }, testRunnable); 323 assertTrue(called); 324 } 325 326 private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler { 327 public AtomicInteger scheduleCounter = new AtomicInteger(0); 328 @Override getNextSchedule()329 protected Schedule getNextSchedule() throws Exception { 330 scheduleCounter.incrementAndGet(); 331 return new Schedule(0, TimeUnit.SECONDS); 332 } 333 } 334 testCustomSchedule_startStop()335 public void testCustomSchedule_startStop() throws Exception { 336 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 337 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 338 final AtomicBoolean shouldWait = new AtomicBoolean(true); 339 Runnable task = new Runnable() { 340 @Override public void run() { 341 try { 342 if (shouldWait.get()) { 343 firstBarrier.await(); 344 secondBarrier.await(); 345 } 346 } catch (Exception e) { 347 throw new RuntimeException(e); 348 } 349 } 350 }; 351 TestCustomScheduler scheduler = new TestCustomScheduler(); 352 Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task); 353 firstBarrier.await(); 354 assertEquals(1, scheduler.scheduleCounter.get()); 355 secondBarrier.await(); 356 firstBarrier.await(); 357 assertEquals(2, scheduler.scheduleCounter.get()); 358 shouldWait.set(false); 359 secondBarrier.await(); 360 future.cancel(false); 361 } 362 testCustomSchedulerServiceStop()363 public void testCustomSchedulerServiceStop() throws Exception { 364 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService(); 365 service.startAsync().awaitRunning(); 366 service.firstBarrier.await(); 367 assertEquals(1, service.numIterations.get()); 368 service.stopAsync(); 369 service.secondBarrier.await(); 370 service.awaitTerminated(); 371 // Sleep for a while just to ensure that our task wasn't called again. 372 Thread.sleep(unit.toMillis(3 * delay)); 373 assertEquals(1, service.numIterations.get()); 374 } 375 testBig()376 public void testBig() throws Exception { 377 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() { 378 @Override protected Scheduler scheduler() { 379 return new AbstractScheduledService.CustomScheduler() { 380 @Override 381 protected Schedule getNextSchedule() throws Exception { 382 // Explicitly yield to increase the probability of a pathological scheduling. 383 Thread.yield(); 384 return new Schedule(0, TimeUnit.SECONDS); 385 } 386 }; 387 } 388 }; 389 service.useBarriers = false; 390 service.startAsync().awaitRunning(); 391 Thread.sleep(50); 392 service.useBarriers = true; 393 service.firstBarrier.await(); 394 int numIterations = service.numIterations.get(); 395 service.stopAsync(); 396 service.secondBarrier.await(); 397 service.awaitTerminated(); 398 assertEquals(numIterations, service.numIterations.get()); 399 } 400 401 private static class TestAbstractScheduledCustomService extends AbstractScheduledService { 402 final AtomicInteger numIterations = new AtomicInteger(0); 403 volatile boolean useBarriers = true; 404 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 405 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 406 runOneIteration()407 @Override protected void runOneIteration() throws Exception { 408 numIterations.incrementAndGet(); 409 if (useBarriers) { 410 firstBarrier.await(); 411 secondBarrier.await(); 412 } 413 } 414 executor()415 @Override protected ScheduledExecutorService executor() { 416 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 417 return Executors.newScheduledThreadPool(10); 418 } 419 startUp()420 @Override protected void startUp() throws Exception {} 421 shutDown()422 @Override protected void shutDown() throws Exception {} 423 scheduler()424 @Override protected Scheduler scheduler() { 425 return new CustomScheduler() { 426 @Override 427 protected Schedule getNextSchedule() throws Exception { 428 return new Schedule(delay, unit); 429 }}; 430 } 431 } 432 testCustomSchedulerFailure()433 public void testCustomSchedulerFailure() throws Exception { 434 TestFailingCustomScheduledService service = new TestFailingCustomScheduledService(); 435 service.startAsync().awaitRunning(); 436 for (int i = 1; i < 4; i++) { 437 service.firstBarrier.await(); 438 assertEquals(i, service.numIterations.get()); 439 service.secondBarrier.await(); 440 } 441 Thread.sleep(1000); 442 try { 443 service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS); 444 fail(); 445 } catch (IllegalStateException e) { 446 assertEquals(State.FAILED, service.state()); 447 } 448 } 449 450 private static class TestFailingCustomScheduledService extends AbstractScheduledService { 451 final AtomicInteger numIterations = new AtomicInteger(0); 452 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 453 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 454 runOneIteration()455 @Override protected void runOneIteration() throws Exception { 456 numIterations.incrementAndGet(); 457 firstBarrier.await(); 458 secondBarrier.await(); 459 } 460 executor()461 @Override protected ScheduledExecutorService executor() { 462 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 463 return Executors.newScheduledThreadPool(10); 464 } 465 scheduler()466 @Override protected Scheduler scheduler() { 467 return new CustomScheduler() { 468 @Override 469 protected Schedule getNextSchedule() throws Exception { 470 if (numIterations.get() > 2) { 471 throw new IllegalStateException("Failed"); 472 } 473 return new Schedule(delay, unit); 474 }}; 475 } 476 } 477 } 478 } 479