1 /* 2 * Copyright (C) 2018 Square, Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package okio 17 18 import org.junit.After 19 import org.junit.Assert.assertEquals 20 import org.junit.Assert.assertFalse 21 import org.junit.Assert.assertTrue 22 import org.junit.Assert.fail 23 import org.junit.Rule 24 import org.junit.Test 25 import java.io.IOException 26 import java.util.concurrent.CountDownLatch 27 import java.util.concurrent.Executors 28 import java.util.concurrent.TimeUnit 29 import kotlin.test.assertFailsWith 30 import org.junit.rules.Timeout as JUnitTimeout 31 32 class PipeKotlinTest { 33 @JvmField @Rule val timeout = JUnitTimeout(5, TimeUnit.SECONDS) 34 35 private val executorService = Executors.newScheduledThreadPool(1) 36 37 @After @Throws(Exception::class) tearDownnull38 fun tearDown() { 39 executorService.shutdown() 40 } 41 pipenull42 @Test fun pipe() { 43 val pipe = Pipe(6) 44 pipe.sink.write(Buffer().writeUtf8("abc"), 3L) 45 46 val readBuffer = Buffer() 47 assertEquals(3L, pipe.source.read(readBuffer, 6L)) 48 assertEquals("abc", readBuffer.readUtf8()) 49 50 pipe.sink.close() 51 assertEquals(-1L, pipe.source.read(readBuffer, 6L)) 52 53 pipe.source.close() 54 } 55 foldnull56 @Test fun fold() { 57 val pipe = Pipe(128) 58 59 val pipeSink = pipe.sink.buffer() 60 pipeSink.writeUtf8("hello") 61 pipeSink.emit() 62 63 val pipeSource = pipe.source.buffer() 64 assertEquals("hello", pipeSource.readUtf8(5)) 65 66 val foldedSinkBuffer = Buffer() 67 var foldedSinkClosed = false 68 val foldedSink = object : ForwardingSink(foldedSinkBuffer) { 69 override fun close() { 70 foldedSinkClosed = true 71 super.close() 72 } 73 } 74 pipe.fold(foldedSink) 75 76 pipeSink.writeUtf8("world") 77 pipeSink.emit() 78 assertEquals("world", foldedSinkBuffer.readUtf8(5)) 79 80 assertFailsWith<IllegalStateException> { 81 pipeSource.readUtf8() 82 } 83 84 pipeSink.close() 85 assertTrue(foldedSinkClosed) 86 } 87 foldWritesPipeContentsToSinknull88 @Test fun foldWritesPipeContentsToSink() { 89 val pipe = Pipe(128) 90 91 val pipeSink = pipe.sink.buffer() 92 pipeSink.writeUtf8("hello") 93 pipeSink.emit() 94 95 val foldSink = Buffer() 96 pipe.fold(foldSink) 97 98 assertEquals("hello", foldSink.readUtf8(5)) 99 } 100 foldUnblocksBlockedWritenull101 @Test fun foldUnblocksBlockedWrite() { 102 val pipe = Pipe(4) 103 val foldSink = Buffer() 104 105 val latch = CountDownLatch(1) 106 executorService.schedule( 107 { 108 pipe.fold(foldSink) 109 latch.countDown() 110 }, 111 500, TimeUnit.MILLISECONDS 112 ) 113 114 val sink = pipe.sink.buffer() 115 sink.writeUtf8("abcdefgh") // Blocks writing 8 bytes to a 4 byte pipe. 116 sink.close() 117 118 latch.await() 119 assertEquals("abcdefgh", foldSink.readUtf8()) 120 } 121 accessSourceAfterFoldnull122 @Test fun accessSourceAfterFold() { 123 val pipe = Pipe(100L) 124 pipe.fold(Buffer()) 125 assertFailsWith<IllegalStateException> { 126 pipe.source.read(Buffer(), 1L) 127 } 128 } 129 honorsPipeSinkTimeoutOnWritingWhenItIsSmallernull130 @Test fun honorsPipeSinkTimeoutOnWritingWhenItIsSmaller() { 131 val pipe = Pipe(4) 132 val underlying = TimeoutWritingSink() 133 134 underlying.timeout.timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS) 135 pipe.sink.timeout().timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS) 136 137 pipe.fold(underlying) 138 139 assertDuration(smallerTimeoutNanos) { 140 pipe.sink.write(Buffer().writeUtf8("abc"), 3) 141 } 142 assertEquals(biggerTimeoutNanos, underlying.timeout().timeoutNanos()) 143 } 144 honorsUnderlyingTimeoutOnWritingWhenItIsSmallernull145 @Test fun honorsUnderlyingTimeoutOnWritingWhenItIsSmaller() { 146 val pipe = Pipe(4) 147 val underlying = TimeoutWritingSink() 148 149 underlying.timeout.timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS) 150 pipe.sink.timeout().timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS) 151 152 pipe.fold(underlying) 153 154 assertDuration(smallerTimeoutNanos) { 155 pipe.sink.write(Buffer().writeUtf8("abc"), 3) 156 } 157 assertEquals(smallerTimeoutNanos, underlying.timeout().timeoutNanos()) 158 } 159 honorsPipeSinkTimeoutOnFlushingWhenItIsSmallernull160 @Test fun honorsPipeSinkTimeoutOnFlushingWhenItIsSmaller() { 161 val pipe = Pipe(4) 162 val underlying = TimeoutFlushingSink() 163 164 underlying.timeout.timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS) 165 pipe.sink.timeout().timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS) 166 167 pipe.fold(underlying) 168 169 assertDuration(smallerTimeoutNanos) { 170 pipe.sink.flush() 171 } 172 assertEquals(biggerTimeoutNanos, underlying.timeout().timeoutNanos()) 173 } 174 honorsUnderlyingTimeoutOnFlushingWhenItIsSmallernull175 @Test fun honorsUnderlyingTimeoutOnFlushingWhenItIsSmaller() { 176 val pipe = Pipe(4) 177 val underlying = TimeoutFlushingSink() 178 179 underlying.timeout.timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS) 180 pipe.sink.timeout().timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS) 181 182 pipe.fold(underlying) 183 184 assertDuration(smallerTimeoutNanos) { 185 pipe.sink.flush() 186 } 187 assertEquals(smallerTimeoutNanos, underlying.timeout().timeoutNanos()) 188 } 189 honorsPipeSinkTimeoutOnClosingWhenItIsSmallernull190 @Test fun honorsPipeSinkTimeoutOnClosingWhenItIsSmaller() { 191 val pipe = Pipe(4) 192 val underlying = TimeoutClosingSink() 193 194 underlying.timeout.timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS) 195 pipe.sink.timeout().timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS) 196 197 pipe.fold(underlying) 198 199 assertDuration(smallerTimeoutNanos) { 200 pipe.sink.close() 201 } 202 assertEquals(biggerTimeoutNanos, underlying.timeout().timeoutNanos()) 203 } 204 honorsUnderlyingTimeoutOnClosingWhenItIsSmallernull205 @Test fun honorsUnderlyingTimeoutOnClosingWhenItIsSmaller() { 206 val pipe = Pipe(4) 207 val underlying = TimeoutClosingSink() 208 209 underlying.timeout.timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS) 210 pipe.sink.timeout().timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS) 211 212 pipe.fold(underlying) 213 214 assertDuration(smallerTimeoutNanos) { 215 pipe.sink.close() 216 } 217 assertEquals(smallerTimeoutNanos, underlying.timeout().timeoutNanos()) 218 } 219 honorsPipeSinkTimeoutOnWritingWhenUnderlyingSinkTimeoutIsZeronull220 @Test fun honorsPipeSinkTimeoutOnWritingWhenUnderlyingSinkTimeoutIsZero() { 221 val pipeSinkTimeoutNanos = smallerTimeoutNanos 222 223 val pipe = Pipe(4) 224 val underlying = TimeoutWritingSink() 225 226 pipe.sink.timeout().timeout(pipeSinkTimeoutNanos, TimeUnit.NANOSECONDS) 227 228 pipe.fold(underlying) 229 230 assertDuration(pipeSinkTimeoutNanos) { 231 pipe.sink.write(Buffer().writeUtf8("abc"), 3) 232 } 233 assertEquals(0L, underlying.timeout().timeoutNanos()) 234 } 235 honorsUnderlyingSinkTimeoutOnWritingWhenPipeSinkTimeoutIsZeronull236 @Test fun honorsUnderlyingSinkTimeoutOnWritingWhenPipeSinkTimeoutIsZero() { 237 val underlyingSinkTimeoutNanos = smallerTimeoutNanos 238 239 val pipe = Pipe(4) 240 val underlying = TimeoutWritingSink() 241 242 underlying.timeout().timeout(underlyingSinkTimeoutNanos, TimeUnit.NANOSECONDS) 243 244 pipe.fold(underlying) 245 246 assertDuration(underlyingSinkTimeoutNanos) { 247 pipe.sink.write(Buffer().writeUtf8("abc"), 3) 248 } 249 assertEquals(underlyingSinkTimeoutNanos, underlying.timeout().timeoutNanos()) 250 } 251 honorsPipeSinkTimeoutOnFlushingWhenUnderlyingSinkTimeoutIsZeronull252 @Test fun honorsPipeSinkTimeoutOnFlushingWhenUnderlyingSinkTimeoutIsZero() { 253 val pipeSinkTimeoutNanos = smallerTimeoutNanos 254 255 val pipe = Pipe(4) 256 val underlying = TimeoutFlushingSink() 257 258 pipe.sink.timeout().timeout(pipeSinkTimeoutNanos, TimeUnit.NANOSECONDS) 259 260 pipe.fold(underlying) 261 262 assertDuration(pipeSinkTimeoutNanos) { 263 pipe.sink.flush() 264 } 265 assertEquals(0L, underlying.timeout().timeoutNanos()) 266 } 267 honorsUnderlyingSinkTimeoutOnFlushingWhenPipeSinkTimeoutIsZeronull268 @Test fun honorsUnderlyingSinkTimeoutOnFlushingWhenPipeSinkTimeoutIsZero() { 269 val underlyingSinkTimeoutNanos = smallerTimeoutNanos 270 271 val pipe = Pipe(4) 272 val underlying = TimeoutFlushingSink() 273 274 underlying.timeout().timeout(underlyingSinkTimeoutNanos, TimeUnit.NANOSECONDS) 275 276 pipe.fold(underlying) 277 278 assertDuration(underlyingSinkTimeoutNanos) { 279 pipe.sink.flush() 280 } 281 assertEquals(underlyingSinkTimeoutNanos, underlying.timeout().timeoutNanos()) 282 } 283 honorsPipeSinkTimeoutOnClosingWhenUnderlyingSinkTimeoutIsZeronull284 @Test fun honorsPipeSinkTimeoutOnClosingWhenUnderlyingSinkTimeoutIsZero() { 285 val pipeSinkTimeoutNanos = smallerTimeoutNanos 286 287 val pipe = Pipe(4) 288 val underlying = TimeoutClosingSink() 289 290 pipe.sink.timeout().timeout(pipeSinkTimeoutNanos, TimeUnit.NANOSECONDS) 291 292 pipe.fold(underlying) 293 294 assertDuration(pipeSinkTimeoutNanos) { 295 pipe.sink.close() 296 } 297 assertEquals(0L, underlying.timeout().timeoutNanos()) 298 } 299 honorsUnderlyingSinkTimeoutOnClosingWhenPipeSinkTimeoutIsZeronull300 @Test fun honorsUnderlyingSinkTimeoutOnClosingWhenPipeSinkTimeoutIsZero() { 301 val underlyingSinkTimeoutNanos = smallerTimeoutNanos 302 303 val pipe = Pipe(4) 304 val underlying = TimeoutClosingSink() 305 306 underlying.timeout().timeout(underlyingSinkTimeoutNanos, TimeUnit.NANOSECONDS) 307 308 pipe.fold(underlying) 309 310 assertDuration(underlyingSinkTimeoutNanos) { 311 pipe.sink.close() 312 } 313 assertEquals(underlyingSinkTimeoutNanos, underlying.timeout().timeoutNanos()) 314 } 315 honorsPipeSinkDeadlineOnWritingWhenItIsSmallernull316 @Test fun honorsPipeSinkDeadlineOnWritingWhenItIsSmaller() { 317 val pipe = Pipe(4) 318 val underlying = TimeoutWritingSink() 319 320 val underlyingOriginalDeadline = System.nanoTime() + biggerDeadlineNanos 321 underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline) 322 pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + smallerDeadlineNanos) 323 324 pipe.fold(underlying) 325 326 assertDuration(smallerDeadlineNanos) { 327 pipe.sink.write(Buffer().writeUtf8("abc"), 3) 328 } 329 assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime()) 330 } 331 honorsPipeSinkDeadlineOnWritingWhenUnderlyingSinkHasNoDeadlinenull332 @Test fun honorsPipeSinkDeadlineOnWritingWhenUnderlyingSinkHasNoDeadline() { 333 val deadlineNanos = smallerDeadlineNanos 334 335 val pipe = Pipe(4) 336 val underlying = TimeoutWritingSink() 337 338 underlying.timeout.clearDeadline() 339 pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + deadlineNanos) 340 341 pipe.fold(underlying) 342 343 assertDuration(deadlineNanos) { 344 pipe.sink.write(Buffer().writeUtf8("abc"), 3) 345 } 346 assertFalse(underlying.timeout().hasDeadline()) 347 } 348 honorsUnderlyingSinkDeadlineOnWritingWhenItIsSmallernull349 @Test fun honorsUnderlyingSinkDeadlineOnWritingWhenItIsSmaller() { 350 val pipe = Pipe(4) 351 val underlying = TimeoutWritingSink() 352 353 val underlyingOriginalDeadline = System.nanoTime() + smallerDeadlineNanos 354 underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline) 355 pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + biggerDeadlineNanos) 356 357 pipe.fold(underlying) 358 359 assertDuration(smallerDeadlineNanos) { 360 pipe.sink.write(Buffer().writeUtf8("abc"), 3) 361 } 362 assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime()) 363 } 364 honorsUnderlyingSinkDeadlineOnWritingWhenPipeSinkHasNoDeadlinenull365 @Test fun honorsUnderlyingSinkDeadlineOnWritingWhenPipeSinkHasNoDeadline() { 366 val deadlineNanos = smallerDeadlineNanos 367 368 val pipe = Pipe(4) 369 val underlying = TimeoutWritingSink() 370 371 val underlyingOriginalDeadline = System.nanoTime() + deadlineNanos 372 underlying.timeout().deadlineNanoTime(underlyingOriginalDeadline) 373 pipe.sink.timeout().clearDeadline() 374 375 pipe.fold(underlying) 376 377 assertDuration(deadlineNanos) { 378 pipe.sink.write(Buffer().writeUtf8("abc"), 3) 379 } 380 assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime()) 381 } 382 honorsPipeSinkDeadlineOnFlushingWhenItIsSmallernull383 @Test fun honorsPipeSinkDeadlineOnFlushingWhenItIsSmaller() { 384 val pipe = Pipe(4) 385 val underlying = TimeoutFlushingSink() 386 387 val underlyingOriginalDeadline = System.nanoTime() + biggerDeadlineNanos 388 underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline) 389 pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + smallerDeadlineNanos) 390 391 pipe.fold(underlying) 392 393 assertDuration(smallerDeadlineNanos) { 394 pipe.sink.flush() 395 } 396 assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime()) 397 } 398 honorsPipeSinkDeadlineOnFlushingWhenUnderlyingSinkHasNoDeadlinenull399 @Test fun honorsPipeSinkDeadlineOnFlushingWhenUnderlyingSinkHasNoDeadline() { 400 val deadlineNanos = smallerDeadlineNanos 401 402 val pipe = Pipe(4) 403 val underlying = TimeoutFlushingSink() 404 405 underlying.timeout.clearDeadline() 406 pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + deadlineNanos) 407 408 pipe.fold(underlying) 409 410 assertDuration(deadlineNanos) { 411 pipe.sink.flush() 412 } 413 assertFalse(underlying.timeout().hasDeadline()) 414 } 415 honorsUnderlyingSinkDeadlineOnFlushingWhenItIsSmallernull416 @Test fun honorsUnderlyingSinkDeadlineOnFlushingWhenItIsSmaller() { 417 val pipe = Pipe(4) 418 val underlying = TimeoutFlushingSink() 419 420 val underlyingOriginalDeadline = System.nanoTime() + smallerDeadlineNanos 421 underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline) 422 pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + biggerDeadlineNanos) 423 424 pipe.fold(underlying) 425 426 assertDuration(smallerDeadlineNanos) { 427 pipe.sink.flush() 428 } 429 assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime()) 430 } 431 honorsUnderlyingSinkDeadlineOnFlushingWhenPipeSinkHasNoDeadlinenull432 @Test fun honorsUnderlyingSinkDeadlineOnFlushingWhenPipeSinkHasNoDeadline() { 433 val deadlineNanos = smallerDeadlineNanos 434 435 val pipe = Pipe(4) 436 val underlying = TimeoutFlushingSink() 437 438 val underlyingOriginalDeadline = System.nanoTime() + deadlineNanos 439 underlying.timeout().deadlineNanoTime(underlyingOriginalDeadline) 440 pipe.sink.timeout().clearDeadline() 441 442 pipe.fold(underlying) 443 444 assertDuration(deadlineNanos) { 445 pipe.sink.flush() 446 } 447 assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime()) 448 } 449 honorsPipeSinkDeadlineOnClosingWhenItIsSmallernull450 @Test fun honorsPipeSinkDeadlineOnClosingWhenItIsSmaller() { 451 val pipe = Pipe(4) 452 val underlying = TimeoutClosingSink() 453 454 val underlyingOriginalDeadline = System.nanoTime() + biggerDeadlineNanos 455 underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline) 456 pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + smallerDeadlineNanos) 457 458 pipe.fold(underlying) 459 460 assertDuration(smallerDeadlineNanos) { 461 pipe.sink.close() 462 } 463 assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime()) 464 } 465 honorsPipeSinkDeadlineOnClosingWhenUnderlyingSinkHasNoDeadlinenull466 @Test fun honorsPipeSinkDeadlineOnClosingWhenUnderlyingSinkHasNoDeadline() { 467 val deadlineNanos = smallerDeadlineNanos 468 469 val pipe = Pipe(4) 470 val underlying = TimeoutClosingSink() 471 472 underlying.timeout.clearDeadline() 473 pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + deadlineNanos) 474 475 pipe.fold(underlying) 476 477 assertDuration(deadlineNanos) { 478 pipe.sink.close() 479 } 480 assertFalse(underlying.timeout().hasDeadline()) 481 } 482 honorsUnderlyingSinkDeadlineOnClosingWhenItIsSmallernull483 @Test fun honorsUnderlyingSinkDeadlineOnClosingWhenItIsSmaller() { 484 val pipe = Pipe(4) 485 val underlying = TimeoutClosingSink() 486 487 val underlyingOriginalDeadline = System.nanoTime() + smallerDeadlineNanos 488 underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline) 489 pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + biggerDeadlineNanos) 490 491 pipe.fold(underlying) 492 493 assertDuration(smallerDeadlineNanos) { 494 pipe.sink.close() 495 } 496 assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime()) 497 } 498 honorsUnderlyingSinkDeadlineOnClosingWhenPipeSinkHasNoDeadlinenull499 @Test fun honorsUnderlyingSinkDeadlineOnClosingWhenPipeSinkHasNoDeadline() { 500 val deadlineNanos = smallerDeadlineNanos 501 502 val pipe = Pipe(4) 503 val underlying = TimeoutClosingSink() 504 505 val underlyingOriginalDeadline = System.nanoTime() + deadlineNanos 506 underlying.timeout().deadlineNanoTime(underlyingOriginalDeadline) 507 pipe.sink.timeout().clearDeadline() 508 509 pipe.fold(underlying) 510 511 assertDuration(deadlineNanos) { 512 pipe.sink.close() 513 } 514 assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime()) 515 } 516 foldingTwiceThrowsnull517 @Test fun foldingTwiceThrows() { 518 val pipe = Pipe(128) 519 pipe.fold(Buffer()) 520 assertFailsWith<IllegalStateException> { 521 pipe.fold(Buffer()) 522 } 523 } 524 sinkWriteThrowsIOExceptionUnblockBlockedWriternull525 @Test fun sinkWriteThrowsIOExceptionUnblockBlockedWriter() { 526 val pipe = Pipe(4) 527 528 val foldFuture = executorService.schedule( 529 { 530 val foldFailure = assertFailsWith<IOException> { 531 pipe.fold(object : ForwardingSink(blackholeSink()) { 532 override fun write(source: Buffer, byteCount: Long) { 533 throw IOException("boom") 534 } 535 }) 536 } 537 assertEquals("boom", foldFailure.message) 538 }, 539 500, TimeUnit.MILLISECONDS 540 ) 541 542 val writeFailure = assertFailsWith<IOException> { 543 val pipeSink = pipe.sink.buffer() 544 pipeSink.writeUtf8("abcdefghij") 545 pipeSink.emit() // Block writing 10 bytes to a 4 byte pipe. 546 } 547 assertEquals("source is closed", writeFailure.message) 548 549 foldFuture.get() // Confirm no unexpected exceptions. 550 } 551 foldHoldsNoLocksWhenForwardingWritesnull552 @Test fun foldHoldsNoLocksWhenForwardingWrites() { 553 val pipe = Pipe(4) 554 555 val pipeSink = pipe.sink.buffer() 556 pipeSink.writeUtf8("abcd") 557 pipeSink.emit() 558 559 pipe.fold(object : ForwardingSink(blackholeSink()) { 560 override fun write(source: Buffer, byteCount: Long) { 561 assertFalse(Thread.holdsLock(pipe.buffer)) 562 } 563 }) 564 } 565 566 /** 567 * Flushing the pipe wasn't causing the sink to be flushed when it was later folded. This was 568 * causing problems because the folded data was stalled. 569 */ foldFlushesWhenThereIsFoldedDatanull570 @Test fun foldFlushesWhenThereIsFoldedData() { 571 val pipe = Pipe(128) 572 val pipeSink = pipe.sink.buffer() 573 pipeSink.writeUtf8("hello") 574 pipeSink.emit() 575 576 val ultimateSink = Buffer() 577 val unnecessaryWrapper = (ultimateSink as Sink).buffer() 578 579 pipe.fold(unnecessaryWrapper) 580 581 // Data should not have been flushed through the wrapper to the ultimate sink. 582 assertEquals("hello", ultimateSink.readUtf8()) 583 } 584 foldDoesNotFlushWhenThereIsNoFoldedDatanull585 @Test fun foldDoesNotFlushWhenThereIsNoFoldedData() { 586 val pipe = Pipe(128) 587 588 val ultimateSink = Buffer() 589 val unnecessaryWrapper = (ultimateSink as Sink).buffer() 590 unnecessaryWrapper.writeUtf8("hello") 591 592 pipe.fold(unnecessaryWrapper) 593 594 // Data should not have been flushed through the wrapper to the ultimate sink. 595 assertEquals("", ultimateSink.readUtf8()) 596 } 597 foldingClosesUnderlyingSinkWhenPipeSinkIsClosenull598 @Test fun foldingClosesUnderlyingSinkWhenPipeSinkIsClose() { 599 val pipe = Pipe(128) 600 601 val pipeSink = pipe.sink.buffer() 602 pipeSink.writeUtf8("world") 603 pipeSink.close() 604 605 val foldedSinkBuffer = Buffer() 606 var foldedSinkClosed = false 607 val foldedSink = object : ForwardingSink(foldedSinkBuffer) { 608 override fun close() { 609 foldedSinkClosed = true 610 super.close() 611 } 612 } 613 614 pipe.fold(foldedSink) 615 assertEquals("world", foldedSinkBuffer.readUtf8(5)) 616 assertTrue(foldedSinkClosed) 617 } 618 cancelPreventsSinkWritenull619 @Test fun cancelPreventsSinkWrite() { 620 val pipe = Pipe(8) 621 pipe.cancel() 622 623 val pipeSink = pipe.sink.buffer() 624 pipeSink.writeUtf8("hello world") 625 626 try { 627 pipeSink.emit() 628 fail() 629 } catch (e: IOException) { 630 assertEquals("canceled", e.message) 631 } 632 } 633 cancelPreventsSinkFlushnull634 @Test fun cancelPreventsSinkFlush() { 635 val pipe = Pipe(8) 636 pipe.cancel() 637 638 try { 639 pipe.sink.flush() 640 fail() 641 } catch (e: IOException) { 642 assertEquals("canceled", e.message) 643 } 644 } 645 sinkCloseAfterCancelDoesNotThrownull646 @Test fun sinkCloseAfterCancelDoesNotThrow() { 647 val pipe = Pipe(8) 648 pipe.cancel() 649 pipe.sink.close() 650 } 651 cancelInterruptsSinkWritenull652 @Test fun cancelInterruptsSinkWrite() { 653 val pipe = Pipe(8) 654 655 executorService.schedule( 656 { 657 pipe.cancel() 658 }, 659 smallerTimeoutNanos, TimeUnit.NANOSECONDS 660 ) 661 662 val pipeSink = pipe.sink.buffer() 663 pipeSink.writeUtf8("hello world") 664 665 assertDuration(smallerTimeoutNanos) { 666 try { 667 pipeSink.emit() 668 fail() 669 } catch (e: IOException) { 670 assertEquals("canceled", e.message) 671 } 672 } 673 } 674 cancelPreventsSourceReadnull675 @Test fun cancelPreventsSourceRead() { 676 val pipe = Pipe(8) 677 pipe.cancel() 678 679 val pipeSource = pipe.source.buffer() 680 681 try { 682 pipeSource.require(1) 683 fail() 684 } catch (e: IOException) { 685 assertEquals("canceled", e.message) 686 } 687 } 688 sourceCloseAfterCancelDoesNotThrownull689 @Test fun sourceCloseAfterCancelDoesNotThrow() { 690 val pipe = Pipe(8) 691 pipe.cancel() 692 pipe.source.close() 693 } 694 cancelInterruptsSourceReadnull695 @Test fun cancelInterruptsSourceRead() { 696 val pipe = Pipe(8) 697 698 executorService.schedule( 699 { 700 pipe.cancel() 701 }, 702 smallerTimeoutNanos, TimeUnit.NANOSECONDS 703 ) 704 705 val pipeSource = pipe.source.buffer() 706 707 assertDuration(smallerTimeoutNanos) { 708 try { 709 pipeSource.require(1) 710 fail() 711 } catch (e: IOException) { 712 assertEquals("canceled", e.message) 713 } 714 } 715 } 716 cancelPreventsSinkFoldnull717 @Test fun cancelPreventsSinkFold() { 718 val pipe = Pipe(8) 719 pipe.cancel() 720 721 var foldedSinkClosed = false 722 val foldedSink = object : ForwardingSink(Buffer()) { 723 override fun close() { 724 foldedSinkClosed = true 725 super.close() 726 } 727 } 728 729 try { 730 pipe.fold(foldedSink) 731 fail() 732 } catch (e: IOException) { 733 assertEquals("canceled", e.message) 734 } 735 736 // But the fold is still performed so close() closes everything. 737 assertFalse(foldedSinkClosed) 738 pipe.sink.close() 739 assertTrue(foldedSinkClosed) 740 } 741 cancelInterruptsSinkFoldnull742 @Test fun cancelInterruptsSinkFold() { 743 val pipe = Pipe(128) 744 val pipeSink = pipe.sink.buffer() 745 pipeSink.writeUtf8("hello") 746 pipeSink.emit() 747 748 var foldedSinkClosed = false 749 val foldedSink = object : ForwardingSink(Buffer()) { 750 override fun write(source: Buffer, byteCount: Long) { 751 assertEquals("hello", source.readUtf8(byteCount)) 752 753 // Write bytes to the original pipe so the pipe write doesn't complete! 754 pipeSink.writeUtf8("more bytes") 755 pipeSink.emit() 756 757 // Cancel while the pipe is writing. 758 pipe.cancel() 759 } 760 761 override fun close() { 762 foldedSinkClosed = true 763 super.close() 764 } 765 } 766 767 try { 768 pipe.fold(foldedSink) 769 fail() 770 } catch (e: IOException) { 771 assertEquals("canceled", e.message) 772 } 773 774 // But the fold is still performed so close() closes everything. 775 assertFalse(foldedSinkClosed) 776 pipe.sink.close() 777 assertTrue(foldedSinkClosed) 778 } 779 assertDurationnull780 private fun assertDuration(expected: Long, block: () -> Unit) { 781 val start = System.currentTimeMillis() 782 block() 783 val elapsed = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - start) 784 785 assertEquals( 786 expected.toDouble(), elapsed.toDouble(), 787 TimeUnit.MILLISECONDS.toNanos(200).toDouble() 788 ) 789 } 790 791 /** Writes on this sink never complete. They can only time out. */ 792 class TimeoutWritingSink : Sink { 793 val timeout = object : AsyncTimeout() { timedOutnull794 override fun timedOut() { 795 synchronized(this@TimeoutWritingSink) { 796 (this@TimeoutWritingSink as Object).notifyAll() 797 } 798 } 799 } 800 writenull801 override fun write(source: Buffer, byteCount: Long) { 802 timeout.enter() 803 try { 804 synchronized(this) { 805 (this as Object).wait() 806 } 807 } finally { 808 timeout.exit() 809 } 810 source.skip(byteCount) 811 } 812 flushnull813 override fun flush() = Unit 814 815 override fun close() = Unit 816 817 override fun timeout() = timeout 818 } 819 820 /** Flushes on this sink never complete. They can only time out. */ 821 class TimeoutFlushingSink : Sink { 822 val timeout = object : AsyncTimeout() { 823 override fun timedOut() { 824 synchronized(this@TimeoutFlushingSink) { 825 (this@TimeoutFlushingSink as Object).notifyAll() 826 } 827 } 828 } 829 830 override fun write(source: Buffer, byteCount: Long) = source.skip(byteCount) 831 832 override fun flush() { 833 timeout.enter() 834 try { 835 synchronized(this) { 836 (this as Object).wait() 837 } 838 } finally { 839 timeout.exit() 840 } 841 } 842 843 override fun close() = Unit 844 845 override fun timeout() = timeout 846 } 847 848 /** Closes on this sink never complete. They can only time out. */ 849 class TimeoutClosingSink : Sink { 850 val timeout = object : AsyncTimeout() { timedOutnull851 override fun timedOut() { 852 synchronized(this@TimeoutClosingSink) { 853 (this@TimeoutClosingSink as Object).notifyAll() 854 } 855 } 856 } 857 writenull858 override fun write(source: Buffer, byteCount: Long) = source.skip(byteCount) 859 860 override fun flush() = Unit 861 862 override fun close() { 863 timeout.enter() 864 try { 865 synchronized(this) { 866 (this as Object).wait() 867 } 868 } finally { 869 timeout.exit() 870 } 871 } 872 timeoutnull873 override fun timeout() = timeout 874 } 875 876 companion object { 877 val smallerTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(500L) 878 val biggerTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(1500L) 879 880 val smallerDeadlineNanos = TimeUnit.MILLISECONDS.toNanos(500L) 881 val biggerDeadlineNanos = TimeUnit.MILLISECONDS.toNanos(1500L) 882 } 883 } 884