1 /* 2 * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package test.java.nio.channels.Selector; 24 25 /* @test 26 * @bug 8199433 8208780 27 * @run testng SelectWithConsumer 28 * @summary Unit test for Selector select(Consumer), select(Consumer,long) and 29 * selectNow(Consumer) 30 */ 31 32 import java.io.Closeable; 33 import java.io.IOException; 34 import java.net.InetSocketAddress; 35 import java.nio.ByteBuffer; 36 import java.nio.channels.ClosedSelectorException; 37 import java.nio.channels.Pipe; 38 import java.nio.channels.SelectionKey; 39 import java.nio.channels.Selector; 40 import java.nio.channels.ServerSocketChannel; 41 import java.nio.channels.SocketChannel; 42 import java.nio.channels.WritableByteChannel; 43 import java.util.concurrent.Executors; 44 import java.util.concurrent.ScheduledExecutorService; 45 import java.util.concurrent.TimeUnit; 46 import java.util.concurrent.atomic.AtomicInteger; 47 import static java.util.concurrent.TimeUnit.*; 48 49 import org.testng.annotations.AfterTest; 50 import org.testng.annotations.Test; 51 import static org.testng.Assert.*; 52 53 import android.platform.test.annotations.LargeTest; 54 55 @Test 56 public class SelectWithConsumer { 57 58 /** 59 * Invoke the select methods that take an action and check that the 60 * accumulated ready ops notified to the action matches the expected ops. 61 */ testActionInvoked(SelectionKey key, int expectedOps)62 void testActionInvoked(SelectionKey key, int expectedOps) throws Exception { 63 Thread callerThread = Thread.currentThread(); 64 Selector sel = key.selector(); 65 int interestOps = key.interestOps(); 66 AtomicInteger notifiedOps = new AtomicInteger(); 67 68 if (expectedOps == 0) { 69 // ensure select(Consumer) does not block indefinitely 70 sel.wakeup(); 71 } else { 72 // ensure that the channel is ready for all expected operations 73 sel.select(); 74 while ((key.readyOps() & interestOps) != expectedOps) { 75 Thread.sleep(100); 76 sel.select(); 77 } 78 } 79 80 // select(Consumer) 81 notifiedOps.set(0); 82 int n = sel.select(k -> { 83 assertTrue(Thread.currentThread() == callerThread); 84 assertTrue(k == key); 85 int readyOps = key.readyOps(); 86 assertTrue((readyOps & interestOps) != 0); 87 assertTrue((readyOps & notifiedOps.get()) == 0); 88 notifiedOps.set(notifiedOps.get() | readyOps); 89 }); 90 assertTrue((n == 1) ^ (expectedOps == 0)); 91 assertTrue(notifiedOps.get() == expectedOps); 92 93 // select(Consumer, timeout) 94 notifiedOps.set(0); 95 n = sel.select(k -> { 96 assertTrue(Thread.currentThread() == callerThread); 97 assertTrue(k == key); 98 int readyOps = key.readyOps(); 99 assertTrue((readyOps & interestOps) != 0); 100 assertTrue((readyOps & notifiedOps.get()) == 0); 101 notifiedOps.set(notifiedOps.get() | readyOps); 102 }, 1000); 103 assertTrue((n == 1) ^ (expectedOps == 0)); 104 assertTrue(notifiedOps.get() == expectedOps); 105 106 // selectNow(Consumer) 107 notifiedOps.set(0); 108 n = sel.selectNow(k -> { 109 assertTrue(Thread.currentThread() == callerThread); 110 assertTrue(k == key); 111 int readyOps = key.readyOps(); 112 assertTrue((readyOps & interestOps) != 0); 113 assertTrue((readyOps & notifiedOps.get()) == 0); 114 notifiedOps.set(notifiedOps.get() | readyOps); 115 }); 116 assertTrue((n == 1) ^ (expectedOps == 0)); 117 assertTrue(notifiedOps.get() == expectedOps); 118 } 119 120 /** 121 * Test that an action is performed when a channel is ready for reading. 122 */ testReadable()123 public void testReadable() throws Exception { 124 Pipe p = Pipe.open(); 125 try (Selector sel = Selector.open()) { 126 Pipe.SinkChannel sink = p.sink(); 127 Pipe.SourceChannel source = p.source(); 128 source.configureBlocking(false); 129 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 130 131 // write to sink to ensure source is readable 132 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 133 134 // test that action is invoked 135 testActionInvoked(key, SelectionKey.OP_READ); 136 } finally { 137 closePipe(p); 138 } 139 } 140 141 /** 142 * Test that an action is performed when a channel is ready for writing. 143 */ testWritable()144 public void testWritable() throws Exception { 145 Pipe p = Pipe.open(); 146 try (Selector sel = Selector.open()) { 147 Pipe.SourceChannel source = p.source(); 148 Pipe.SinkChannel sink = p.sink(); 149 sink.configureBlocking(false); 150 SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE); 151 152 // test that action is invoked 153 testActionInvoked(key, SelectionKey.OP_WRITE); 154 } finally { 155 closePipe(p); 156 } 157 } 158 159 /** 160 * Test that an action is performed when a channel is ready for both 161 * reading and writing. 162 */ testReadableAndWriteable()163 public void testReadableAndWriteable() throws Exception { 164 ServerSocketChannel ssc = null; 165 SocketChannel sc = null; 166 SocketChannel peer = null; 167 try (Selector sel = Selector.open()) { 168 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 169 sc = SocketChannel.open(ssc.getLocalAddress()); 170 sc.configureBlocking(false); 171 SelectionKey key = sc.register(sel, (SelectionKey.OP_READ | 172 SelectionKey.OP_WRITE)); 173 174 // accept connection and write data so the source is readable 175 peer = ssc.accept(); 176 peer.write(messageBuffer()); 177 178 // test that action is invoked 179 testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE)); 180 } finally { 181 if (ssc != null) ssc.close(); 182 if (sc != null) sc.close(); 183 if (peer != null) peer.close(); 184 } 185 } 186 187 /** 188 * Test that the action is called for two selected channels 189 */ testTwoChannels()190 public void testTwoChannels() throws Exception { 191 Pipe p = Pipe.open(); 192 try (Selector sel = Selector.open()) { 193 Pipe.SourceChannel source = p.source(); 194 Pipe.SinkChannel sink = p.sink(); 195 source.configureBlocking(false); 196 sink.configureBlocking(false); 197 SelectionKey key1 = source.register(sel, SelectionKey.OP_READ); 198 SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE); 199 200 // write to sink to ensure that the source is readable 201 sink.write(messageBuffer()); 202 203 // wait for key1 to be readable 204 sel.select(); 205 assertTrue(key2.isWritable()); 206 while (!key1.isReadable()) { 207 Thread.sleep(20); 208 sel.select(); 209 } 210 211 var counter = new AtomicInteger(); 212 213 // select(Consumer) 214 counter.set(0); 215 int n = sel.select(k -> { 216 assertTrue(k == key1 || k == key2); 217 counter.incrementAndGet(); 218 }); 219 assertTrue(n == 2); 220 assertTrue(counter.get() == 2); 221 222 // select(Consumer, timeout) 223 counter.set(0); 224 n = sel.select(k -> { 225 assertTrue(k == key1 || k == key2); 226 counter.incrementAndGet(); 227 }, 1000); 228 assertTrue(n == 2); 229 assertTrue(counter.get() == 2); 230 231 // selectNow(Consumer) 232 counter.set(0); 233 n = sel.selectNow(k -> { 234 assertTrue(k == key1 || k == key2); 235 counter.incrementAndGet(); 236 }); 237 assertTrue(n == 2); 238 assertTrue(counter.get() == 2); 239 } finally { 240 closePipe(p); 241 } 242 } 243 244 /** 245 * Test calling select twice, the action should be invoked each time 246 */ testRepeatedSelect1()247 public void testRepeatedSelect1() throws Exception { 248 Pipe p = Pipe.open(); 249 try (Selector sel = Selector.open()) { 250 Pipe.SourceChannel source = p.source(); 251 Pipe.SinkChannel sink = p.sink(); 252 source.configureBlocking(false); 253 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 254 255 // write to sink to ensure that the source is readable 256 sink.write(messageBuffer()); 257 258 // test that action is invoked 259 testActionInvoked(key, SelectionKey.OP_READ); 260 testActionInvoked(key, SelectionKey.OP_READ); 261 262 } finally { 263 closePipe(p); 264 } 265 } 266 267 /** 268 * Test calling select twice. An I/O operation is performed after the 269 * first select so the channel will not be selected by the second select. 270 */ testRepeatedSelect2()271 public void testRepeatedSelect2() throws Exception { 272 Pipe p = Pipe.open(); 273 try (Selector sel = Selector.open()) { 274 Pipe.SourceChannel source = p.source(); 275 Pipe.SinkChannel sink = p.sink(); 276 source.configureBlocking(false); 277 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 278 279 // write to sink to ensure that the source is readable 280 sink.write(messageBuffer()); 281 282 // test that action is invoked 283 testActionInvoked(key, SelectionKey.OP_READ); 284 285 // read all bytes 286 int n; 287 ByteBuffer bb = ByteBuffer.allocate(100); 288 do { 289 n = source.read(bb); 290 bb.clear(); 291 } while (n > 0); 292 293 // test that action is not invoked 294 testActionInvoked(key, 0); 295 } finally { 296 closePipe(p); 297 } 298 } 299 300 /** 301 * Test timeout 302 */ testTimeout()303 public void testTimeout() throws Exception { 304 Pipe p = Pipe.open(); 305 try (Selector sel = Selector.open()) { 306 Pipe.SourceChannel source = p.source(); 307 Pipe.SinkChannel sink = p.sink(); 308 source.configureBlocking(false); 309 source.register(sel, SelectionKey.OP_READ); 310 long start = System.currentTimeMillis(); 311 int n = sel.select(k -> assertTrue(false), 1000L); 312 long duration = System.currentTimeMillis() - start; 313 assertTrue(n == 0); 314 assertTrue(duration > 500, "select took " + duration + " ms"); 315 } finally { 316 closePipe(p); 317 } 318 } 319 320 /** 321 * Test wakeup prior to select 322 */ testWakeupBeforeSelect()323 public void testWakeupBeforeSelect() throws Exception { 324 // select(Consumer) 325 try (Selector sel = Selector.open()) { 326 sel.wakeup(); 327 int n = sel.select(k -> assertTrue(false)); 328 assertTrue(n == 0); 329 } 330 331 // select(Consumer, timeout) 332 try (Selector sel = Selector.open()) { 333 sel.wakeup(); 334 long start = System.currentTimeMillis(); 335 int n = sel.select(k -> assertTrue(false), 60*1000); 336 long duration = System.currentTimeMillis() - start; 337 assertTrue(n == 0); 338 assertTrue(duration < 5000, "select took " + duration + " ms"); 339 } 340 } 341 342 /** 343 * Test wakeup during select 344 */ 345 @LargeTest 346 public void testWakeupDuringSelect() throws Exception { 347 // select(Consumer) 348 try (Selector sel = Selector.open()) { 349 scheduleWakeup(sel, 1, SECONDS); 350 int n = sel.select(k -> assertTrue(false)); 351 assertTrue(n == 0); 352 } 353 354 // select(Consumer, timeout) try(Selector sel = Selector.open())355 try (Selector sel = Selector.open()) { 356 scheduleWakeup(sel, 1, SECONDS); 357 long start = System.currentTimeMillis(); 358 int n = sel.select(k -> assertTrue(false), 60*1000); 359 long duration = System.currentTimeMillis() - start; 360 assertTrue(n == 0); 361 assertTrue(duration > 500 && duration < 10*1000, 362 "select took " + duration + " ms"); 363 } 364 } 365 366 /** 367 * Test invoking select with interrupt status set 368 */ testInterruptBeforeSelect()369 public void testInterruptBeforeSelect() throws Exception { 370 // select(Consumer) 371 try (Selector sel = Selector.open()) { 372 Thread.currentThread().interrupt(); 373 int n = sel.select(k -> assertTrue(false)); 374 assertTrue(n == 0); 375 assertTrue(Thread.currentThread().isInterrupted()); 376 assertTrue(sel.isOpen()); 377 } finally { 378 Thread.currentThread().interrupted(); // clear interrupt status 379 } 380 381 // select(Consumer, timeout) 382 try (Selector sel = Selector.open()) { 383 Thread.currentThread().interrupt(); 384 long start = System.currentTimeMillis(); 385 int n = sel.select(k -> assertTrue(false), 60*1000); 386 long duration = System.currentTimeMillis() - start; 387 assertTrue(n == 0); 388 assertTrue(duration < 5000, "select took " + duration + " ms"); 389 assertTrue(Thread.currentThread().isInterrupted()); 390 assertTrue(sel.isOpen()); 391 } finally { 392 Thread.currentThread().interrupted(); // clear interrupt status 393 } 394 } 395 396 /** 397 * Test interrupt thread during select 398 */ 399 public void testInterruptDuringSelect() throws Exception { 400 // select(Consumer) 401 try (Selector sel = Selector.open()) { 402 scheduleInterrupt(Thread.currentThread(), 1, SECONDS); 403 int n = sel.select(k -> assertTrue(false)); 404 assertTrue(n == 0); 405 assertTrue(Thread.currentThread().isInterrupted()); 406 assertTrue(sel.isOpen()); 407 } finally { 408 Thread.currentThread().interrupted(); // clear interrupt status 409 } 410 411 // select(Consumer, timeout) try(Selector sel = Selector.open())412 try (Selector sel = Selector.open()) { 413 scheduleInterrupt(Thread.currentThread(), 1, SECONDS); 414 long start = System.currentTimeMillis(); 415 int n = sel.select(k -> assertTrue(false), 60*1000); 416 long duration = System.currentTimeMillis() - start; 417 assertTrue(n == 0); 418 assertTrue(Thread.currentThread().isInterrupted()); 419 assertTrue(sel.isOpen()); 420 } finally { 421 Thread.currentThread().interrupted(); // clear interrupt status 422 } 423 } 424 425 /** 426 * Test invoking select on a closed selector 427 */ 428 @Test(expectedExceptions = ClosedSelectorException.class) testClosedSelector1()429 public void testClosedSelector1() throws Exception { 430 Selector sel = Selector.open(); 431 sel.close(); 432 sel.select(k -> assertTrue(false)); 433 } 434 @Test(expectedExceptions = ClosedSelectorException.class) testClosedSelector2()435 public void testClosedSelector2() throws Exception { 436 Selector sel = Selector.open(); 437 sel.close(); 438 sel.select(k -> assertTrue(false), 1000); 439 } 440 @Test(expectedExceptions = ClosedSelectorException.class) testClosedSelector3()441 public void testClosedSelector3() throws Exception { 442 Selector sel = Selector.open(); 443 sel.close(); 444 sel.selectNow(k -> assertTrue(false)); 445 } 446 447 /** 448 * Test closing selector while in a selection operation 449 */ 450 @LargeTest testCloseDuringSelect()451 public void testCloseDuringSelect() throws Exception { 452 // select(Consumer) 453 try (Selector sel = Selector.open()) { 454 scheduleClose(sel, 3, SECONDS); 455 int n = sel.select(k -> assertTrue(false)); 456 assertTrue(n == 0); 457 assertFalse(sel.isOpen()); 458 } 459 460 // select(Consumer, timeout) 461 try (Selector sel = Selector.open()) { 462 scheduleClose(sel, 3, SECONDS); 463 long start = System.currentTimeMillis(); 464 int n = sel.select(k -> assertTrue(false), 60*1000); 465 long duration = System.currentTimeMillis() - start; 466 assertTrue(n == 0); 467 assertTrue(duration > 2000 && duration < 10*1000, 468 "select took " + duration + " ms"); 469 assertFalse(sel.isOpen()); 470 } 471 } 472 473 /** 474 * Test action closing selector 475 */ 476 @Test(expectedExceptions = ClosedSelectorException.class) testActionClosingSelector()477 public void testActionClosingSelector() throws Exception { 478 Pipe p = Pipe.open(); 479 try (Selector sel = Selector.open()) { 480 Pipe.SourceChannel source = p.source(); 481 Pipe.SinkChannel sink = p.sink(); 482 source.configureBlocking(false); 483 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 484 485 // write to sink to ensure that the source is readable 486 sink.write(messageBuffer()); 487 488 // should relay ClosedSelectorException 489 sel.select(k -> { 490 assertTrue(k == key); 491 try { 492 sel.close(); 493 } catch (IOException ioe) { } 494 }); 495 } finally { 496 closePipe(p); 497 } 498 } 499 500 /** 501 * Test that the action is invoked while synchronized on the selector and 502 * its selected-key set. 503 */ testLocks()504 public void testLocks() throws Exception { 505 Pipe p = Pipe.open(); 506 try (Selector sel = Selector.open()) { 507 Pipe.SourceChannel source = p.source(); 508 Pipe.SinkChannel sink = p.sink(); 509 source.configureBlocking(false); 510 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 511 512 // write to sink to ensure that the source is readable 513 sink.write(messageBuffer()); 514 515 // select(Consumer) 516 sel.select(k -> { 517 assertTrue(k == key); 518 assertTrue(Thread.holdsLock(sel)); 519 assertFalse(Thread.holdsLock(sel.keys())); 520 assertTrue(Thread.holdsLock(sel.selectedKeys())); 521 }); 522 523 // select(Consumer, timeout) 524 sel.select(k -> { 525 assertTrue(k == key); 526 assertTrue(Thread.holdsLock(sel)); 527 assertFalse(Thread.holdsLock(sel.keys())); 528 assertTrue(Thread.holdsLock(sel.selectedKeys())); 529 }, 1000L); 530 531 // selectNow(Consumer) 532 sel.selectNow(k -> { 533 assertTrue(k == key); 534 assertTrue(Thread.holdsLock(sel)); 535 assertFalse(Thread.holdsLock(sel.keys())); 536 assertTrue(Thread.holdsLock(sel.selectedKeys())); 537 }); 538 } finally { 539 closePipe(p); 540 } 541 } 542 543 /** 544 * Test that selection operations remove cancelled keys from the selector's 545 * key and selected-key sets. 546 */ testCancel()547 public void testCancel() throws Exception { 548 Pipe p = Pipe.open(); 549 try (Selector sel = Selector.open()) { 550 Pipe.SinkChannel sink = p.sink(); 551 Pipe.SourceChannel source = p.source(); 552 553 // write to sink to ensure that the source is readable 554 sink.write(messageBuffer()); 555 556 source.configureBlocking(false); 557 SelectionKey key1 = source.register(sel, SelectionKey.OP_READ); 558 // make sure pipe source is readable before we do following checks. 559 // this is sometime necessary on windows where pipe is implemented 560 // as a pair of connected socket, so there is no guarantee that written 561 // bytes on sink side is immediately available on source side. 562 sel.select(); 563 564 sink.configureBlocking(false); 565 SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE); 566 sel.selectNow(); 567 568 assertTrue(sel.keys().contains(key1)); 569 assertTrue(sel.keys().contains(key2)); 570 assertTrue(sel.selectedKeys().contains(key1)); 571 assertTrue(sel.selectedKeys().contains(key2)); 572 573 // cancel key1 574 key1.cancel(); 575 int n = sel.selectNow(k -> assertTrue(k == key2)); 576 assertTrue(n == 1); 577 assertFalse(sel.keys().contains(key1)); 578 assertTrue(sel.keys().contains(key2)); 579 sel.selectNow(); 580 assertFalse(sel.selectedKeys().contains(key1)); 581 assertTrue(sel.selectedKeys().contains(key2)); 582 583 // cancel key2 584 key2.cancel(); 585 n = sel.selectNow(k -> assertTrue(false)); 586 assertTrue(n == 0); 587 assertFalse(sel.keys().contains(key1)); 588 assertFalse(sel.keys().contains(key2)); 589 assertFalse(sel.selectedKeys().contains(key1)); 590 assertFalse(sel.selectedKeys().contains(key2)); 591 } finally { 592 closePipe(p); 593 } 594 } 595 596 /** 597 * Test an action invoking select() 598 */ 599 @Test(enabled = false) testReentrantSelect1()600 public void testReentrantSelect1() throws Exception { 601 Pipe p = Pipe.open(); 602 try (Selector sel = Selector.open()) { 603 Pipe.SinkChannel sink = p.sink(); 604 Pipe.SourceChannel source = p.source(); 605 source.configureBlocking(false); 606 source.register(sel, SelectionKey.OP_READ); 607 608 // write to sink to ensure that the source is readable 609 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 610 611 int n = sel.select(k -> { 612 try { 613 sel.select(); 614 assertTrue(false); 615 } catch (IOException ioe) { 616 throw new RuntimeException(ioe); 617 } catch (IllegalStateException expected) { 618 } 619 }); 620 assertTrue(n == 1); 621 } finally { 622 closePipe(p); 623 } 624 } 625 626 /** 627 * Test an action invoking selectNow() 628 */ 629 @Test(enabled = false) testReentrantSelect2()630 public void testReentrantSelect2() throws Exception { 631 Pipe p = Pipe.open(); 632 try (Selector sel = Selector.open()) { 633 Pipe.SinkChannel sink = p.sink(); 634 Pipe.SourceChannel source = p.source(); 635 636 // write to sink to ensure that the source is readable 637 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 638 639 source.configureBlocking(false); 640 source.register(sel, SelectionKey.OP_READ); 641 int n = sel.select(k -> { 642 try { 643 sel.selectNow(); 644 assertTrue(false); 645 } catch (IOException ioe) { 646 throw new RuntimeException(ioe); 647 } catch (IllegalStateException expected) { 648 } 649 }); 650 assertTrue(n == 1); 651 } finally { 652 closePipe(p); 653 } 654 } 655 656 /** 657 * Test an action invoking select(Consumer) 658 */ 659 @Test(enabled = false) testReentrantSelect3()660 public void testReentrantSelect3() throws Exception { 661 Pipe p = Pipe.open(); 662 try (Selector sel = Selector.open()) { 663 Pipe.SinkChannel sink = p.sink(); 664 Pipe.SourceChannel source = p.source(); 665 666 // write to sink to ensure that the source is readable 667 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 668 669 source.configureBlocking(false); 670 source.register(sel, SelectionKey.OP_READ); 671 int n = sel.select(k -> { 672 try { 673 sel.select(x -> assertTrue(false)); 674 assertTrue(false); 675 } catch (IOException ioe) { 676 throw new RuntimeException(ioe); 677 } catch (IllegalStateException expected) { 678 } 679 }); 680 assertTrue(n == 1); 681 } finally { 682 closePipe(p); 683 } 684 } 685 686 /** 687 * Negative timeout 688 */ 689 @Test(expectedExceptions = IllegalArgumentException.class) testNegativeTimeout()690 public void testNegativeTimeout() throws Exception { 691 try (Selector sel = Selector.open()) { 692 sel.select(k -> { }, -1L); 693 } 694 } 695 696 /** 697 * Null action 698 */ 699 @Test(expectedExceptions = NullPointerException.class) testNull1()700 public void testNull1() throws Exception { 701 try (Selector sel = Selector.open()) { 702 sel.select(null); 703 } 704 } 705 @Test(expectedExceptions = NullPointerException.class) testNull2()706 public void testNull2() throws Exception { 707 try (Selector sel = Selector.open()) { 708 sel.select(null, 1000); 709 } 710 } 711 @Test(expectedExceptions = NullPointerException.class) testNull3()712 public void testNull3() throws Exception { 713 try (Selector sel = Selector.open()) { 714 sel.selectNow(null); 715 } 716 } 717 718 719 // -- support methods --- 720 721 private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1); 722 723 @AfterTest shutdownThreadPool()724 void shutdownThreadPool() { 725 POOL.shutdown(); 726 } 727 scheduleWakeup(Selector sel, long delay, TimeUnit unit)728 void scheduleWakeup(Selector sel, long delay, TimeUnit unit) { 729 POOL.schedule(() -> sel.wakeup(), delay, unit); 730 } 731 scheduleInterrupt(Thread t, long delay, TimeUnit unit)732 void scheduleInterrupt(Thread t, long delay, TimeUnit unit) { 733 POOL.schedule(() -> t.interrupt(), delay, unit); 734 } 735 scheduleClose(Closeable c, long delay, TimeUnit unit)736 void scheduleClose(Closeable c, long delay, TimeUnit unit) { 737 POOL.schedule(() -> { 738 try { 739 c.close(); 740 } catch (IOException ioe) { 741 ioe.printStackTrace(); 742 } 743 }, delay, unit); 744 } 745 scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit)746 void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) { 747 POOL.schedule(() -> { 748 try { 749 sink.write(buf); 750 } catch (IOException ioe) { 751 ioe.printStackTrace(); 752 } 753 }, delay, unit); 754 } 755 closePipe(Pipe p)756 static void closePipe(Pipe p) { 757 try { p.sink().close(); } catch (IOException ignore) { } 758 try { p.source().close(); } catch (IOException ignore) { } 759 } 760 messageBuffer()761 static ByteBuffer messageBuffer() { 762 try { 763 return ByteBuffer.wrap("message".getBytes("UTF-8")); 764 } catch (Exception e) { 765 throw new RuntimeException(e); 766 } 767 } 768 }