1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.reactive 6 7 import kotlinx.coroutines.* 8 import org.reactivestreams.* 9 import org.reactivestreams.tck.* 10 import org.testng.* 11 import org.testng.annotations.* 12 13 14 class ReactiveStreamTckTest : TestBase() { 15 16 @Factory(dataProvider = "dispatchers") createTestsnull17 fun createTests(dispatcher: Dispatcher): Array<Any> { 18 return arrayOf(ReactiveStreamTckTestSuite(dispatcher)) 19 } 20 21 @DataProvider(name = "dispatchers") <lambda>null22 public fun dispatchers(): Array<Array<Any>> = Dispatcher.values().map { arrayOf<Any>(it) }.toTypedArray() 23 24 25 public class ReactiveStreamTckTestSuite( 26 private val dispatcher: Dispatcher 27 ) : PublisherVerification<Long>(TestEnvironment(500, 500)) { 28 createPublishernull29 override fun createPublisher(elements: Long): Publisher<Long> = 30 publish(dispatcher.dispatcher) { 31 for (i in 1..elements) send(i) 32 } 33 createFailedPublishernull34 override fun createFailedPublisher(): Publisher<Long> = 35 publish(dispatcher.dispatcher) { 36 throw TestException() 37 } 38 39 @Test optional_spec105_emptyStreamMustTerminateBySignallingOnCompletenull40 public override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() { 41 throw SkipException("Skipped") 42 } 43 44 class TestException : Exception() 45 } 46 } 47 48 enum class Dispatcher(val dispatcher: CoroutineDispatcher) { 49 DEFAULT(Dispatchers.Default), 50 UNCONFINED(Dispatchers.Unconfined) 51 } 52