1 /*
2  * Copyright (C) 2018 Square, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package okio
17 
18 import org.junit.After
19 import org.junit.Assert.assertEquals
20 import org.junit.Assert.assertFalse
21 import org.junit.Assert.assertTrue
22 import org.junit.Assert.fail
23 import org.junit.Rule
24 import org.junit.Test
25 import java.io.IOException
26 import java.util.concurrent.CountDownLatch
27 import java.util.concurrent.Executors
28 import java.util.concurrent.TimeUnit
29 import kotlin.test.assertFailsWith
30 import org.junit.rules.Timeout as JUnitTimeout
31 
32 class PipeKotlinTest {
33   @JvmField @Rule val timeout = JUnitTimeout(5, TimeUnit.SECONDS)
34 
35   private val executorService = Executors.newScheduledThreadPool(1)
36 
37   @After @Throws(Exception::class)
tearDownnull38   fun tearDown() {
39     executorService.shutdown()
40   }
41 
pipenull42   @Test fun pipe() {
43     val pipe = Pipe(6)
44     pipe.sink.write(Buffer().writeUtf8("abc"), 3L)
45 
46     val readBuffer = Buffer()
47     assertEquals(3L, pipe.source.read(readBuffer, 6L))
48     assertEquals("abc", readBuffer.readUtf8())
49 
50     pipe.sink.close()
51     assertEquals(-1L, pipe.source.read(readBuffer, 6L))
52 
53     pipe.source.close()
54   }
55 
foldnull56   @Test fun fold() {
57     val pipe = Pipe(128)
58 
59     val pipeSink = pipe.sink.buffer()
60     pipeSink.writeUtf8("hello")
61     pipeSink.emit()
62 
63     val pipeSource = pipe.source.buffer()
64     assertEquals("hello", pipeSource.readUtf8(5))
65 
66     val foldedSinkBuffer = Buffer()
67     var foldedSinkClosed = false
68     val foldedSink = object : ForwardingSink(foldedSinkBuffer) {
69       override fun close() {
70         foldedSinkClosed = true
71         super.close()
72       }
73     }
74     pipe.fold(foldedSink)
75 
76     pipeSink.writeUtf8("world")
77     pipeSink.emit()
78     assertEquals("world", foldedSinkBuffer.readUtf8(5))
79 
80     assertFailsWith<IllegalStateException> {
81       pipeSource.readUtf8()
82     }
83 
84     pipeSink.close()
85     assertTrue(foldedSinkClosed)
86   }
87 
foldWritesPipeContentsToSinknull88   @Test fun foldWritesPipeContentsToSink() {
89     val pipe = Pipe(128)
90 
91     val pipeSink = pipe.sink.buffer()
92     pipeSink.writeUtf8("hello")
93     pipeSink.emit()
94 
95     val foldSink = Buffer()
96     pipe.fold(foldSink)
97 
98     assertEquals("hello", foldSink.readUtf8(5))
99   }
100 
foldUnblocksBlockedWritenull101   @Test fun foldUnblocksBlockedWrite() {
102     val pipe = Pipe(4)
103     val foldSink = Buffer()
104 
105     val latch = CountDownLatch(1)
106     executorService.schedule(
107       {
108         pipe.fold(foldSink)
109         latch.countDown()
110       },
111       500, TimeUnit.MILLISECONDS
112     )
113 
114     val sink = pipe.sink.buffer()
115     sink.writeUtf8("abcdefgh") // Blocks writing 8 bytes to a 4 byte pipe.
116     sink.close()
117 
118     latch.await()
119     assertEquals("abcdefgh", foldSink.readUtf8())
120   }
121 
accessSourceAfterFoldnull122   @Test fun accessSourceAfterFold() {
123     val pipe = Pipe(100L)
124     pipe.fold(Buffer())
125     assertFailsWith<IllegalStateException> {
126       pipe.source.read(Buffer(), 1L)
127     }
128   }
129 
honorsPipeSinkTimeoutOnWritingWhenItIsSmallernull130   @Test fun honorsPipeSinkTimeoutOnWritingWhenItIsSmaller() {
131     val pipe = Pipe(4)
132     val underlying = TimeoutWritingSink()
133 
134     underlying.timeout.timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
135     pipe.sink.timeout().timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
136 
137     pipe.fold(underlying)
138 
139     assertDuration(smallerTimeoutNanos) {
140       pipe.sink.write(Buffer().writeUtf8("abc"), 3)
141     }
142     assertEquals(biggerTimeoutNanos, underlying.timeout().timeoutNanos())
143   }
144 
honorsUnderlyingTimeoutOnWritingWhenItIsSmallernull145   @Test fun honorsUnderlyingTimeoutOnWritingWhenItIsSmaller() {
146     val pipe = Pipe(4)
147     val underlying = TimeoutWritingSink()
148 
149     underlying.timeout.timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
150     pipe.sink.timeout().timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
151 
152     pipe.fold(underlying)
153 
154     assertDuration(smallerTimeoutNanos) {
155       pipe.sink.write(Buffer().writeUtf8("abc"), 3)
156     }
157     assertEquals(smallerTimeoutNanos, underlying.timeout().timeoutNanos())
158   }
159 
honorsPipeSinkTimeoutOnFlushingWhenItIsSmallernull160   @Test fun honorsPipeSinkTimeoutOnFlushingWhenItIsSmaller() {
161     val pipe = Pipe(4)
162     val underlying = TimeoutFlushingSink()
163 
164     underlying.timeout.timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
165     pipe.sink.timeout().timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
166 
167     pipe.fold(underlying)
168 
169     assertDuration(smallerTimeoutNanos) {
170       pipe.sink.flush()
171     }
172     assertEquals(biggerTimeoutNanos, underlying.timeout().timeoutNanos())
173   }
174 
honorsUnderlyingTimeoutOnFlushingWhenItIsSmallernull175   @Test fun honorsUnderlyingTimeoutOnFlushingWhenItIsSmaller() {
176     val pipe = Pipe(4)
177     val underlying = TimeoutFlushingSink()
178 
179     underlying.timeout.timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
180     pipe.sink.timeout().timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
181 
182     pipe.fold(underlying)
183 
184     assertDuration(smallerTimeoutNanos) {
185       pipe.sink.flush()
186     }
187     assertEquals(smallerTimeoutNanos, underlying.timeout().timeoutNanos())
188   }
189 
honorsPipeSinkTimeoutOnClosingWhenItIsSmallernull190   @Test fun honorsPipeSinkTimeoutOnClosingWhenItIsSmaller() {
191     val pipe = Pipe(4)
192     val underlying = TimeoutClosingSink()
193 
194     underlying.timeout.timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
195     pipe.sink.timeout().timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
196 
197     pipe.fold(underlying)
198 
199     assertDuration(smallerTimeoutNanos) {
200       pipe.sink.close()
201     }
202     assertEquals(biggerTimeoutNanos, underlying.timeout().timeoutNanos())
203   }
204 
honorsUnderlyingTimeoutOnClosingWhenItIsSmallernull205   @Test fun honorsUnderlyingTimeoutOnClosingWhenItIsSmaller() {
206     val pipe = Pipe(4)
207     val underlying = TimeoutClosingSink()
208 
209     underlying.timeout.timeout(smallerTimeoutNanos, TimeUnit.NANOSECONDS)
210     pipe.sink.timeout().timeout(biggerTimeoutNanos, TimeUnit.NANOSECONDS)
211 
212     pipe.fold(underlying)
213 
214     assertDuration(smallerTimeoutNanos) {
215       pipe.sink.close()
216     }
217     assertEquals(smallerTimeoutNanos, underlying.timeout().timeoutNanos())
218   }
219 
honorsPipeSinkTimeoutOnWritingWhenUnderlyingSinkTimeoutIsZeronull220   @Test fun honorsPipeSinkTimeoutOnWritingWhenUnderlyingSinkTimeoutIsZero() {
221     val pipeSinkTimeoutNanos = smallerTimeoutNanos
222 
223     val pipe = Pipe(4)
224     val underlying = TimeoutWritingSink()
225 
226     pipe.sink.timeout().timeout(pipeSinkTimeoutNanos, TimeUnit.NANOSECONDS)
227 
228     pipe.fold(underlying)
229 
230     assertDuration(pipeSinkTimeoutNanos) {
231       pipe.sink.write(Buffer().writeUtf8("abc"), 3)
232     }
233     assertEquals(0L, underlying.timeout().timeoutNanos())
234   }
235 
honorsUnderlyingSinkTimeoutOnWritingWhenPipeSinkTimeoutIsZeronull236   @Test fun honorsUnderlyingSinkTimeoutOnWritingWhenPipeSinkTimeoutIsZero() {
237     val underlyingSinkTimeoutNanos = smallerTimeoutNanos
238 
239     val pipe = Pipe(4)
240     val underlying = TimeoutWritingSink()
241 
242     underlying.timeout().timeout(underlyingSinkTimeoutNanos, TimeUnit.NANOSECONDS)
243 
244     pipe.fold(underlying)
245 
246     assertDuration(underlyingSinkTimeoutNanos) {
247       pipe.sink.write(Buffer().writeUtf8("abc"), 3)
248     }
249     assertEquals(underlyingSinkTimeoutNanos, underlying.timeout().timeoutNanos())
250   }
251 
honorsPipeSinkTimeoutOnFlushingWhenUnderlyingSinkTimeoutIsZeronull252   @Test fun honorsPipeSinkTimeoutOnFlushingWhenUnderlyingSinkTimeoutIsZero() {
253     val pipeSinkTimeoutNanos = smallerTimeoutNanos
254 
255     val pipe = Pipe(4)
256     val underlying = TimeoutFlushingSink()
257 
258     pipe.sink.timeout().timeout(pipeSinkTimeoutNanos, TimeUnit.NANOSECONDS)
259 
260     pipe.fold(underlying)
261 
262     assertDuration(pipeSinkTimeoutNanos) {
263       pipe.sink.flush()
264     }
265     assertEquals(0L, underlying.timeout().timeoutNanos())
266   }
267 
honorsUnderlyingSinkTimeoutOnFlushingWhenPipeSinkTimeoutIsZeronull268   @Test fun honorsUnderlyingSinkTimeoutOnFlushingWhenPipeSinkTimeoutIsZero() {
269     val underlyingSinkTimeoutNanos = smallerTimeoutNanos
270 
271     val pipe = Pipe(4)
272     val underlying = TimeoutFlushingSink()
273 
274     underlying.timeout().timeout(underlyingSinkTimeoutNanos, TimeUnit.NANOSECONDS)
275 
276     pipe.fold(underlying)
277 
278     assertDuration(underlyingSinkTimeoutNanos) {
279       pipe.sink.flush()
280     }
281     assertEquals(underlyingSinkTimeoutNanos, underlying.timeout().timeoutNanos())
282   }
283 
honorsPipeSinkTimeoutOnClosingWhenUnderlyingSinkTimeoutIsZeronull284   @Test fun honorsPipeSinkTimeoutOnClosingWhenUnderlyingSinkTimeoutIsZero() {
285     val pipeSinkTimeoutNanos = smallerTimeoutNanos
286 
287     val pipe = Pipe(4)
288     val underlying = TimeoutClosingSink()
289 
290     pipe.sink.timeout().timeout(pipeSinkTimeoutNanos, TimeUnit.NANOSECONDS)
291 
292     pipe.fold(underlying)
293 
294     assertDuration(pipeSinkTimeoutNanos) {
295       pipe.sink.close()
296     }
297     assertEquals(0L, underlying.timeout().timeoutNanos())
298   }
299 
honorsUnderlyingSinkTimeoutOnClosingWhenPipeSinkTimeoutIsZeronull300   @Test fun honorsUnderlyingSinkTimeoutOnClosingWhenPipeSinkTimeoutIsZero() {
301     val underlyingSinkTimeoutNanos = smallerTimeoutNanos
302 
303     val pipe = Pipe(4)
304     val underlying = TimeoutClosingSink()
305 
306     underlying.timeout().timeout(underlyingSinkTimeoutNanos, TimeUnit.NANOSECONDS)
307 
308     pipe.fold(underlying)
309 
310     assertDuration(underlyingSinkTimeoutNanos) {
311       pipe.sink.close()
312     }
313     assertEquals(underlyingSinkTimeoutNanos, underlying.timeout().timeoutNanos())
314   }
315 
honorsPipeSinkDeadlineOnWritingWhenItIsSmallernull316   @Test fun honorsPipeSinkDeadlineOnWritingWhenItIsSmaller() {
317     val pipe = Pipe(4)
318     val underlying = TimeoutWritingSink()
319 
320     val underlyingOriginalDeadline = System.nanoTime() + biggerDeadlineNanos
321     underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
322     pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + smallerDeadlineNanos)
323 
324     pipe.fold(underlying)
325 
326     assertDuration(smallerDeadlineNanos) {
327       pipe.sink.write(Buffer().writeUtf8("abc"), 3)
328     }
329     assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
330   }
331 
honorsPipeSinkDeadlineOnWritingWhenUnderlyingSinkHasNoDeadlinenull332   @Test fun honorsPipeSinkDeadlineOnWritingWhenUnderlyingSinkHasNoDeadline() {
333     val deadlineNanos = smallerDeadlineNanos
334 
335     val pipe = Pipe(4)
336     val underlying = TimeoutWritingSink()
337 
338     underlying.timeout.clearDeadline()
339     pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + deadlineNanos)
340 
341     pipe.fold(underlying)
342 
343     assertDuration(deadlineNanos) {
344       pipe.sink.write(Buffer().writeUtf8("abc"), 3)
345     }
346     assertFalse(underlying.timeout().hasDeadline())
347   }
348 
honorsUnderlyingSinkDeadlineOnWritingWhenItIsSmallernull349   @Test fun honorsUnderlyingSinkDeadlineOnWritingWhenItIsSmaller() {
350     val pipe = Pipe(4)
351     val underlying = TimeoutWritingSink()
352 
353     val underlyingOriginalDeadline = System.nanoTime() + smallerDeadlineNanos
354     underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
355     pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + biggerDeadlineNanos)
356 
357     pipe.fold(underlying)
358 
359     assertDuration(smallerDeadlineNanos) {
360       pipe.sink.write(Buffer().writeUtf8("abc"), 3)
361     }
362     assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
363   }
364 
honorsUnderlyingSinkDeadlineOnWritingWhenPipeSinkHasNoDeadlinenull365   @Test fun honorsUnderlyingSinkDeadlineOnWritingWhenPipeSinkHasNoDeadline() {
366     val deadlineNanos = smallerDeadlineNanos
367 
368     val pipe = Pipe(4)
369     val underlying = TimeoutWritingSink()
370 
371     val underlyingOriginalDeadline = System.nanoTime() + deadlineNanos
372     underlying.timeout().deadlineNanoTime(underlyingOriginalDeadline)
373     pipe.sink.timeout().clearDeadline()
374 
375     pipe.fold(underlying)
376 
377     assertDuration(deadlineNanos) {
378       pipe.sink.write(Buffer().writeUtf8("abc"), 3)
379     }
380     assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
381   }
382 
honorsPipeSinkDeadlineOnFlushingWhenItIsSmallernull383   @Test fun honorsPipeSinkDeadlineOnFlushingWhenItIsSmaller() {
384     val pipe = Pipe(4)
385     val underlying = TimeoutFlushingSink()
386 
387     val underlyingOriginalDeadline = System.nanoTime() + biggerDeadlineNanos
388     underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
389     pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + smallerDeadlineNanos)
390 
391     pipe.fold(underlying)
392 
393     assertDuration(smallerDeadlineNanos) {
394       pipe.sink.flush()
395     }
396     assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
397   }
398 
honorsPipeSinkDeadlineOnFlushingWhenUnderlyingSinkHasNoDeadlinenull399   @Test fun honorsPipeSinkDeadlineOnFlushingWhenUnderlyingSinkHasNoDeadline() {
400     val deadlineNanos = smallerDeadlineNanos
401 
402     val pipe = Pipe(4)
403     val underlying = TimeoutFlushingSink()
404 
405     underlying.timeout.clearDeadline()
406     pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + deadlineNanos)
407 
408     pipe.fold(underlying)
409 
410     assertDuration(deadlineNanos) {
411       pipe.sink.flush()
412     }
413     assertFalse(underlying.timeout().hasDeadline())
414   }
415 
honorsUnderlyingSinkDeadlineOnFlushingWhenItIsSmallernull416   @Test fun honorsUnderlyingSinkDeadlineOnFlushingWhenItIsSmaller() {
417     val pipe = Pipe(4)
418     val underlying = TimeoutFlushingSink()
419 
420     val underlyingOriginalDeadline = System.nanoTime() + smallerDeadlineNanos
421     underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
422     pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + biggerDeadlineNanos)
423 
424     pipe.fold(underlying)
425 
426     assertDuration(smallerDeadlineNanos) {
427       pipe.sink.flush()
428     }
429     assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
430   }
431 
honorsUnderlyingSinkDeadlineOnFlushingWhenPipeSinkHasNoDeadlinenull432   @Test fun honorsUnderlyingSinkDeadlineOnFlushingWhenPipeSinkHasNoDeadline() {
433     val deadlineNanos = smallerDeadlineNanos
434 
435     val pipe = Pipe(4)
436     val underlying = TimeoutFlushingSink()
437 
438     val underlyingOriginalDeadline = System.nanoTime() + deadlineNanos
439     underlying.timeout().deadlineNanoTime(underlyingOriginalDeadline)
440     pipe.sink.timeout().clearDeadline()
441 
442     pipe.fold(underlying)
443 
444     assertDuration(deadlineNanos) {
445       pipe.sink.flush()
446     }
447     assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
448   }
449 
honorsPipeSinkDeadlineOnClosingWhenItIsSmallernull450   @Test fun honorsPipeSinkDeadlineOnClosingWhenItIsSmaller() {
451     val pipe = Pipe(4)
452     val underlying = TimeoutClosingSink()
453 
454     val underlyingOriginalDeadline = System.nanoTime() + biggerDeadlineNanos
455     underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
456     pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + smallerDeadlineNanos)
457 
458     pipe.fold(underlying)
459 
460     assertDuration(smallerDeadlineNanos) {
461       pipe.sink.close()
462     }
463     assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
464   }
465 
honorsPipeSinkDeadlineOnClosingWhenUnderlyingSinkHasNoDeadlinenull466   @Test fun honorsPipeSinkDeadlineOnClosingWhenUnderlyingSinkHasNoDeadline() {
467     val deadlineNanos = smallerDeadlineNanos
468 
469     val pipe = Pipe(4)
470     val underlying = TimeoutClosingSink()
471 
472     underlying.timeout.clearDeadline()
473     pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + deadlineNanos)
474 
475     pipe.fold(underlying)
476 
477     assertDuration(deadlineNanos) {
478       pipe.sink.close()
479     }
480     assertFalse(underlying.timeout().hasDeadline())
481   }
482 
honorsUnderlyingSinkDeadlineOnClosingWhenItIsSmallernull483   @Test fun honorsUnderlyingSinkDeadlineOnClosingWhenItIsSmaller() {
484     val pipe = Pipe(4)
485     val underlying = TimeoutClosingSink()
486 
487     val underlyingOriginalDeadline = System.nanoTime() + smallerDeadlineNanos
488     underlying.timeout.deadlineNanoTime(underlyingOriginalDeadline)
489     pipe.sink.timeout().deadlineNanoTime(System.nanoTime() + biggerDeadlineNanos)
490 
491     pipe.fold(underlying)
492 
493     assertDuration(smallerDeadlineNanos) {
494       pipe.sink.close()
495     }
496     assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
497   }
498 
honorsUnderlyingSinkDeadlineOnClosingWhenPipeSinkHasNoDeadlinenull499   @Test fun honorsUnderlyingSinkDeadlineOnClosingWhenPipeSinkHasNoDeadline() {
500     val deadlineNanos = smallerDeadlineNanos
501 
502     val pipe = Pipe(4)
503     val underlying = TimeoutClosingSink()
504 
505     val underlyingOriginalDeadline = System.nanoTime() + deadlineNanos
506     underlying.timeout().deadlineNanoTime(underlyingOriginalDeadline)
507     pipe.sink.timeout().clearDeadline()
508 
509     pipe.fold(underlying)
510 
511     assertDuration(deadlineNanos) {
512       pipe.sink.close()
513     }
514     assertEquals(underlyingOriginalDeadline, underlying.timeout().deadlineNanoTime())
515   }
516 
foldingTwiceThrowsnull517   @Test fun foldingTwiceThrows() {
518     val pipe = Pipe(128)
519     pipe.fold(Buffer())
520     assertFailsWith<IllegalStateException> {
521       pipe.fold(Buffer())
522     }
523   }
524 
sinkWriteThrowsIOExceptionUnblockBlockedWriternull525   @Test fun sinkWriteThrowsIOExceptionUnblockBlockedWriter() {
526     val pipe = Pipe(4)
527 
528     val foldFuture = executorService.schedule(
529       {
530         val foldFailure = assertFailsWith<IOException> {
531           pipe.fold(object : ForwardingSink(blackholeSink()) {
532             override fun write(source: Buffer, byteCount: Long) {
533               throw IOException("boom")
534             }
535           })
536         }
537         assertEquals("boom", foldFailure.message)
538       },
539       500, TimeUnit.MILLISECONDS
540     )
541 
542     val writeFailure = assertFailsWith<IOException> {
543       val pipeSink = pipe.sink.buffer()
544       pipeSink.writeUtf8("abcdefghij")
545       pipeSink.emit() // Block writing 10 bytes to a 4 byte pipe.
546     }
547     assertEquals("source is closed", writeFailure.message)
548 
549     foldFuture.get() // Confirm no unexpected exceptions.
550   }
551 
foldHoldsNoLocksWhenForwardingWritesnull552   @Test fun foldHoldsNoLocksWhenForwardingWrites() {
553     val pipe = Pipe(4)
554 
555     val pipeSink = pipe.sink.buffer()
556     pipeSink.writeUtf8("abcd")
557     pipeSink.emit()
558 
559     pipe.fold(object : ForwardingSink(blackholeSink()) {
560       override fun write(source: Buffer, byteCount: Long) {
561         assertFalse(Thread.holdsLock(pipe.buffer))
562       }
563     })
564   }
565 
566   /**
567    * Flushing the pipe wasn't causing the sink to be flushed when it was later folded. This was
568    * causing problems because the folded data was stalled.
569    */
foldFlushesWhenThereIsFoldedDatanull570   @Test fun foldFlushesWhenThereIsFoldedData() {
571     val pipe = Pipe(128)
572     val pipeSink = pipe.sink.buffer()
573     pipeSink.writeUtf8("hello")
574     pipeSink.emit()
575 
576     val ultimateSink = Buffer()
577     val unnecessaryWrapper = (ultimateSink as Sink).buffer()
578 
579     pipe.fold(unnecessaryWrapper)
580 
581     // Data should not have been flushed through the wrapper to the ultimate sink.
582     assertEquals("hello", ultimateSink.readUtf8())
583   }
584 
foldDoesNotFlushWhenThereIsNoFoldedDatanull585   @Test fun foldDoesNotFlushWhenThereIsNoFoldedData() {
586     val pipe = Pipe(128)
587 
588     val ultimateSink = Buffer()
589     val unnecessaryWrapper = (ultimateSink as Sink).buffer()
590     unnecessaryWrapper.writeUtf8("hello")
591 
592     pipe.fold(unnecessaryWrapper)
593 
594     // Data should not have been flushed through the wrapper to the ultimate sink.
595     assertEquals("", ultimateSink.readUtf8())
596   }
597 
foldingClosesUnderlyingSinkWhenPipeSinkIsClosenull598   @Test fun foldingClosesUnderlyingSinkWhenPipeSinkIsClose() {
599     val pipe = Pipe(128)
600 
601     val pipeSink = pipe.sink.buffer()
602     pipeSink.writeUtf8("world")
603     pipeSink.close()
604 
605     val foldedSinkBuffer = Buffer()
606     var foldedSinkClosed = false
607     val foldedSink = object : ForwardingSink(foldedSinkBuffer) {
608       override fun close() {
609         foldedSinkClosed = true
610         super.close()
611       }
612     }
613 
614     pipe.fold(foldedSink)
615     assertEquals("world", foldedSinkBuffer.readUtf8(5))
616     assertTrue(foldedSinkClosed)
617   }
618 
cancelPreventsSinkWritenull619   @Test fun cancelPreventsSinkWrite() {
620     val pipe = Pipe(8)
621     pipe.cancel()
622 
623     val pipeSink = pipe.sink.buffer()
624     pipeSink.writeUtf8("hello world")
625 
626     try {
627       pipeSink.emit()
628       fail()
629     } catch (e: IOException) {
630       assertEquals("canceled", e.message)
631     }
632   }
633 
cancelPreventsSinkFlushnull634   @Test fun cancelPreventsSinkFlush() {
635     val pipe = Pipe(8)
636     pipe.cancel()
637 
638     try {
639       pipe.sink.flush()
640       fail()
641     } catch (e: IOException) {
642       assertEquals("canceled", e.message)
643     }
644   }
645 
sinkCloseAfterCancelDoesNotThrownull646   @Test fun sinkCloseAfterCancelDoesNotThrow() {
647     val pipe = Pipe(8)
648     pipe.cancel()
649     pipe.sink.close()
650   }
651 
cancelInterruptsSinkWritenull652   @Test fun cancelInterruptsSinkWrite() {
653     val pipe = Pipe(8)
654 
655     executorService.schedule(
656       {
657         pipe.cancel()
658       },
659       smallerTimeoutNanos, TimeUnit.NANOSECONDS
660     )
661 
662     val pipeSink = pipe.sink.buffer()
663     pipeSink.writeUtf8("hello world")
664 
665     assertDuration(smallerTimeoutNanos) {
666       try {
667         pipeSink.emit()
668         fail()
669       } catch (e: IOException) {
670         assertEquals("canceled", e.message)
671       }
672     }
673   }
674 
cancelPreventsSourceReadnull675   @Test fun cancelPreventsSourceRead() {
676     val pipe = Pipe(8)
677     pipe.cancel()
678 
679     val pipeSource = pipe.source.buffer()
680 
681     try {
682       pipeSource.require(1)
683       fail()
684     } catch (e: IOException) {
685       assertEquals("canceled", e.message)
686     }
687   }
688 
sourceCloseAfterCancelDoesNotThrownull689   @Test fun sourceCloseAfterCancelDoesNotThrow() {
690     val pipe = Pipe(8)
691     pipe.cancel()
692     pipe.source.close()
693   }
694 
cancelInterruptsSourceReadnull695   @Test fun cancelInterruptsSourceRead() {
696     val pipe = Pipe(8)
697 
698     executorService.schedule(
699       {
700         pipe.cancel()
701       },
702       smallerTimeoutNanos, TimeUnit.NANOSECONDS
703     )
704 
705     val pipeSource = pipe.source.buffer()
706 
707     assertDuration(smallerTimeoutNanos) {
708       try {
709         pipeSource.require(1)
710         fail()
711       } catch (e: IOException) {
712         assertEquals("canceled", e.message)
713       }
714     }
715   }
716 
cancelPreventsSinkFoldnull717   @Test fun cancelPreventsSinkFold() {
718     val pipe = Pipe(8)
719     pipe.cancel()
720 
721     var foldedSinkClosed = false
722     val foldedSink = object : ForwardingSink(Buffer()) {
723       override fun close() {
724         foldedSinkClosed = true
725         super.close()
726       }
727     }
728 
729     try {
730       pipe.fold(foldedSink)
731       fail()
732     } catch (e: IOException) {
733       assertEquals("canceled", e.message)
734     }
735 
736     // But the fold is still performed so close() closes everything.
737     assertFalse(foldedSinkClosed)
738     pipe.sink.close()
739     assertTrue(foldedSinkClosed)
740   }
741 
cancelInterruptsSinkFoldnull742   @Test fun cancelInterruptsSinkFold() {
743     val pipe = Pipe(128)
744     val pipeSink = pipe.sink.buffer()
745     pipeSink.writeUtf8("hello")
746     pipeSink.emit()
747 
748     var foldedSinkClosed = false
749     val foldedSink = object : ForwardingSink(Buffer()) {
750       override fun write(source: Buffer, byteCount: Long) {
751         assertEquals("hello", source.readUtf8(byteCount))
752 
753         // Write bytes to the original pipe so the pipe write doesn't complete!
754         pipeSink.writeUtf8("more bytes")
755         pipeSink.emit()
756 
757         // Cancel while the pipe is writing.
758         pipe.cancel()
759       }
760 
761       override fun close() {
762         foldedSinkClosed = true
763         super.close()
764       }
765     }
766 
767     try {
768       pipe.fold(foldedSink)
769       fail()
770     } catch (e: IOException) {
771       assertEquals("canceled", e.message)
772     }
773 
774     // But the fold is still performed so close() closes everything.
775     assertFalse(foldedSinkClosed)
776     pipe.sink.close()
777     assertTrue(foldedSinkClosed)
778   }
779 
assertDurationnull780   private fun assertDuration(expected: Long, block: () -> Unit) {
781     val start = System.currentTimeMillis()
782     block()
783     val elapsed = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - start)
784 
785     assertEquals(
786       expected.toDouble(), elapsed.toDouble(),
787       TimeUnit.MILLISECONDS.toNanos(200).toDouble()
788     )
789   }
790 
791   /** Writes on this sink never complete. They can only time out. */
792   class TimeoutWritingSink : Sink {
793     val timeout = object : AsyncTimeout() {
timedOutnull794       override fun timedOut() {
795         synchronized(this@TimeoutWritingSink) {
796           (this@TimeoutWritingSink as Object).notifyAll()
797         }
798       }
799     }
800 
writenull801     override fun write(source: Buffer, byteCount: Long) {
802       timeout.enter()
803       try {
804         synchronized(this) {
805           (this as Object).wait()
806         }
807       } finally {
808         timeout.exit()
809       }
810       source.skip(byteCount)
811     }
812 
flushnull813     override fun flush() = Unit
814 
815     override fun close() = Unit
816 
817     override fun timeout() = timeout
818   }
819 
820   /** Flushes on this sink never complete. They can only time out. */
821   class TimeoutFlushingSink : Sink {
822     val timeout = object : AsyncTimeout() {
823       override fun timedOut() {
824         synchronized(this@TimeoutFlushingSink) {
825           (this@TimeoutFlushingSink as Object).notifyAll()
826         }
827       }
828     }
829 
830     override fun write(source: Buffer, byteCount: Long) = source.skip(byteCount)
831 
832     override fun flush() {
833       timeout.enter()
834       try {
835         synchronized(this) {
836           (this as Object).wait()
837         }
838       } finally {
839         timeout.exit()
840       }
841     }
842 
843     override fun close() = Unit
844 
845     override fun timeout() = timeout
846   }
847 
848   /** Closes on this sink never complete. They can only time out. */
849   class TimeoutClosingSink : Sink {
850     val timeout = object : AsyncTimeout() {
timedOutnull851       override fun timedOut() {
852         synchronized(this@TimeoutClosingSink) {
853           (this@TimeoutClosingSink as Object).notifyAll()
854         }
855       }
856     }
857 
writenull858     override fun write(source: Buffer, byteCount: Long) = source.skip(byteCount)
859 
860     override fun flush() = Unit
861 
862     override fun close() {
863       timeout.enter()
864       try {
865         synchronized(this) {
866           (this as Object).wait()
867         }
868       } finally {
869         timeout.exit()
870       }
871     }
872 
timeoutnull873     override fun timeout() = timeout
874   }
875 
876   companion object {
877     val smallerTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(500L)
878     val biggerTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(1500L)
879 
880     val smallerDeadlineNanos = TimeUnit.MILLISECONDS.toNanos(500L)
881     val biggerDeadlineNanos = TimeUnit.MILLISECONDS.toNanos(1500L)
882   }
883 }
884