1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import org.reactivestreams.*
9 import org.reactivestreams.tck.*
10 import org.testng.*
11 import org.testng.annotations.*
12 
13 
14 class ReactiveStreamTckTest : TestBase() {
15 
16     @Factory(dataProvider = "dispatchers")
createTestsnull17     fun createTests(dispatcher: Dispatcher): Array<Any> {
18         return arrayOf(ReactiveStreamTckTestSuite(dispatcher))
19     }
20 
21     @DataProvider(name = "dispatchers")
<lambda>null22     public fun dispatchers(): Array<Array<Any>> = Dispatcher.values().map { arrayOf<Any>(it) }.toTypedArray()
23 
24 
25     public class ReactiveStreamTckTestSuite(
26         private val dispatcher: Dispatcher
27     ) : PublisherVerification<Long>(TestEnvironment(500, 500)) {
28 
createPublishernull29         override fun createPublisher(elements: Long): Publisher<Long> =
30             publish(dispatcher.dispatcher) {
31                 for (i in 1..elements) send(i)
32             }
33 
createFailedPublishernull34         override fun createFailedPublisher(): Publisher<Long> =
35             publish(dispatcher.dispatcher) {
36                 throw TestException()
37             }
38 
39         @Test
optional_spec105_emptyStreamMustTerminateBySignallingOnCompletenull40         public override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() {
41             throw SkipException("Skipped")
42         }
43 
44         class TestException : Exception()
45     }
46 }
47 
48 enum class Dispatcher(val dispatcher: CoroutineDispatcher) {
49     DEFAULT(Dispatchers.Default),
50     UNCONFINED(Dispatchers.Unconfined)
51 }
52