1 #include "../test.h" 2 #include <rxcpp/operators/rx-take.hpp> 3 4 SCENARIO("take 2", "[take][operators]"){ 5 GIVEN("a source"){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 auto xs = sc.make_hot_observable({ 11 on.next(150, 1), 12 on.next(210, 2), 13 on.next(220, 3), 14 on.next(230, 4), 15 on.next(240, 5), 16 on.completed(250) 17 }); 18 19 WHEN("2 values are taken"){ 20 21 auto res = w.start( __anon4424e56f0102() 22 [xs]() { 23 return xs 24 | rxo::take(2) 25 // forget type to workaround lambda deduction bug on msvc 2013 26 | rxo::as_dynamic(); 27 } 28 ); 29 30 THEN("the output only contains items sent while subscribed"){ 31 auto required = rxu::to_vector({ 32 on.next(210, 2), 33 on.next(220, 3), 34 on.completed(220) 35 }); 36 auto actual = res.get_observer().messages(); 37 REQUIRE(required == actual); 38 } 39 40 THEN("there was 1 subscription/unsubscription to the source"){ 41 auto required = rxu::to_vector({ 42 on.subscribe(200, 220) 43 }); 44 auto actual = xs.subscriptions(); 45 REQUIRE(required == actual); 46 } 47 48 } 49 } 50 } 51 52 SCENARIO("take, complete after", "[take][operators]"){ 53 GIVEN("a source"){ 54 auto sc = rxsc::make_test(); 55 auto w = sc.create_worker(); 56 const rxsc::test::messages<int> on; 57 58 auto xs = sc.make_hot_observable({ 59 on.next(70, 6), 60 on.next(150, 4), 61 on.next(210, 9), 62 on.next(230, 13), 63 on.next(270, 7), 64 on.next(280, 1), 65 on.next(300, -1), 66 on.next(310, 3), 67 on.next(340, 8), 68 on.next(370, 11), 69 on.next(410, 15), 70 on.next(415, 16), 71 on.next(460, 72), 72 on.next(510, 76), 73 on.next(560, 32), 74 on.next(570, -100), 75 on.next(580, -3), 76 on.next(590, 5), 77 on.next(630, 10), 78 on.completed(690) 79 }); 80 81 WHEN("20 values are taken"){ 82 83 auto res = w.start( __anon4424e56f0202() 84 [xs]() { 85 return xs 86 .take(20) 87 // forget type to workaround lambda deduction bug on msvc 2013 88 .as_dynamic(); 89 } 90 ); 91 92 THEN("the output only contains items sent while subscribed"){ 93 auto required = rxu::to_vector({ 94 on.next(210, 9), 95 on.next(230, 13), 96 on.next(270, 7), 97 on.next(280, 1), 98 on.next(300, -1), 99 on.next(310, 3), 100 on.next(340, 8), 101 on.next(370, 11), 102 on.next(410, 15), 103 on.next(415, 16), 104 on.next(460, 72), 105 on.next(510, 76), 106 on.next(560, 32), 107 on.next(570, -100), 108 on.next(580, -3), 109 on.next(590, 5), 110 on.next(630, 10), 111 on.completed(690) 112 }); 113 auto actual = res.get_observer().messages(); 114 REQUIRE(required == actual); 115 } 116 117 THEN("there was 1 subscription/unsubscription to the source"){ 118 auto required = rxu::to_vector({ 119 on.subscribe(200, 690) 120 }); 121 auto actual = xs.subscriptions(); 122 REQUIRE(required == actual); 123 } 124 125 } 126 } 127 } 128 129 SCENARIO("take, complete same", "[take][operators]"){ 130 GIVEN("a source"){ 131 auto sc = rxsc::make_test(); 132 auto w = sc.create_worker(); 133 const rxsc::test::messages<int> on; 134 135 auto xs = sc.make_hot_observable({ 136 on.next(70, 6), 137 on.next(150, 4), 138 on.next(210, 9), 139 on.next(230, 13), 140 on.next(270, 7), 141 on.next(280, 1), 142 on.next(300, -1), 143 on.next(310, 3), 144 on.next(340, 8), 145 on.next(370, 11), 146 on.next(410, 15), 147 on.next(415, 16), 148 on.next(460, 72), 149 on.next(510, 76), 150 on.next(560, 32), 151 on.next(570, -100), 152 on.next(580, -3), 153 on.next(590, 5), 154 on.next(630, 10), 155 on.completed(690) 156 }); 157 158 WHEN("17 values are taken"){ 159 160 auto res = w.start( __anon4424e56f0302() 161 [xs]() { 162 return xs 163 .take(17) 164 // forget type to workaround lambda deduction bug on msvc 2013 165 .as_dynamic(); 166 } 167 ); 168 169 THEN("the output only contains items sent while subscribed"){ 170 auto required = rxu::to_vector({ 171 on.next(210, 9), 172 on.next(230, 13), 173 on.next(270, 7), 174 on.next(280, 1), 175 on.next(300, -1), 176 on.next(310, 3), 177 on.next(340, 8), 178 on.next(370, 11), 179 on.next(410, 15), 180 on.next(415, 16), 181 on.next(460, 72), 182 on.next(510, 76), 183 on.next(560, 32), 184 on.next(570, -100), 185 on.next(580, -3), 186 on.next(590, 5), 187 on.next(630, 10), 188 on.completed(630) 189 }); 190 auto actual = res.get_observer().messages(); 191 REQUIRE(required == actual); 192 } 193 194 THEN("there was 1 subscription/unsubscription to the source"){ 195 auto required = rxu::to_vector({ 196 on.subscribe(200, 630) 197 }); 198 auto actual = xs.subscriptions(); 199 REQUIRE(required == actual); 200 } 201 202 } 203 } 204 } 205 206 SCENARIO("take, complete before", "[take][operators]"){ 207 GIVEN("a source"){ 208 auto sc = rxsc::make_test(); 209 auto w = sc.create_worker(); 210 const rxsc::test::messages<int> on; 211 212 auto xs = sc.make_hot_observable({ 213 on.next(70, 6), 214 on.next(150, 4), 215 on.next(210, 9), 216 on.next(230, 13), 217 on.next(270, 7), 218 on.next(280, 1), 219 on.next(300, -1), 220 on.next(310, 3), 221 on.next(340, 8), 222 on.next(370, 11), 223 on.next(410, 15), 224 on.next(415, 16), 225 on.next(460, 72), 226 on.next(510, 76), 227 on.next(560, 32), 228 on.next(570, -100), 229 on.next(580, -3), 230 on.next(590, 5), 231 on.next(630, 10), 232 on.completed(690) 233 }); 234 235 WHEN("10 values are taken"){ 236 237 auto res = w.start( __anon4424e56f0402() 238 [xs]() { 239 return xs 240 .take(10) 241 // forget type to workaround lambda deduction bug on msvc 2013 242 .as_dynamic(); 243 } 244 ); 245 246 THEN("the output only contains items sent while subscribed"){ 247 auto required = rxu::to_vector({ 248 on.next(210, 9), 249 on.next(230, 13), 250 on.next(270, 7), 251 on.next(280, 1), 252 on.next(300, -1), 253 on.next(310, 3), 254 on.next(340, 8), 255 on.next(370, 11), 256 on.next(410, 15), 257 on.next(415, 16), 258 on.completed(415) 259 }); 260 auto actual = res.get_observer().messages(); 261 REQUIRE(required == actual); 262 } 263 264 THEN("there was 1 subscription/unsubscription to the source"){ 265 auto required = rxu::to_vector({ 266 on.subscribe(200, 415) 267 }); 268 auto actual = xs.subscriptions(); 269 REQUIRE(required == actual); 270 } 271 272 } 273 } 274 } 275 276 SCENARIO("take, error after", "[take][operators]"){ 277 GIVEN("a source"){ 278 auto sc = rxsc::make_test(); 279 auto w = sc.create_worker(); 280 const rxsc::test::messages<int> on; 281 282 std::runtime_error ex("take on_error from source"); 283 284 auto xs = sc.make_hot_observable({ 285 on.next(70, 6), 286 on.next(150, 4), 287 on.next(210, 9), 288 on.next(230, 13), 289 on.next(270, 7), 290 on.next(280, 1), 291 on.next(300, -1), 292 on.next(310, 3), 293 on.next(340, 8), 294 on.next(370, 11), 295 on.next(410, 15), 296 on.next(415, 16), 297 on.next(460, 72), 298 on.next(510, 76), 299 on.next(560, 32), 300 on.next(570, -100), 301 on.next(580, -3), 302 on.next(590, 5), 303 on.next(630, 10), 304 on.error(690, ex) 305 }); 306 307 WHEN("20 values are taken"){ 308 309 auto res = w.start( __anon4424e56f0502() 310 [xs]() { 311 return xs 312 .take(20) 313 // forget type to workaround lambda deduction bug on msvc 2013 314 .as_dynamic(); 315 } 316 ); 317 318 THEN("the output only contains items sent while subscribed"){ 319 auto required = rxu::to_vector({ 320 on.next(210, 9), 321 on.next(230, 13), 322 on.next(270, 7), 323 on.next(280, 1), 324 on.next(300, -1), 325 on.next(310, 3), 326 on.next(340, 8), 327 on.next(370, 11), 328 on.next(410, 15), 329 on.next(415, 16), 330 on.next(460, 72), 331 on.next(510, 76), 332 on.next(560, 32), 333 on.next(570, -100), 334 on.next(580, -3), 335 on.next(590, 5), 336 on.next(630, 10), 337 on.error(690, ex) 338 }); 339 auto actual = res.get_observer().messages(); 340 REQUIRE(required == actual); 341 } 342 343 THEN("there was 1 subscription/unsubscription to the source"){ 344 auto required = rxu::to_vector({ 345 on.subscribe(200, 690) 346 }); 347 auto actual = xs.subscriptions(); 348 REQUIRE(required == actual); 349 } 350 351 } 352 } 353 } 354 355 SCENARIO("take, error same", "[take][operators]"){ 356 GIVEN("a source"){ 357 auto sc = rxsc::make_test(); 358 auto w = sc.create_worker(); 359 const rxsc::test::messages<int> on; 360 361 auto xs = sc.make_hot_observable({ 362 on.next(70, 6), 363 on.next(150, 4), 364 on.next(210, 9), 365 on.next(230, 13), 366 on.next(270, 7), 367 on.next(280, 1), 368 on.next(300, -1), 369 on.next(310, 3), 370 on.next(340, 8), 371 on.next(370, 11), 372 on.next(410, 15), 373 on.next(415, 16), 374 on.next(460, 72), 375 on.next(510, 76), 376 on.next(560, 32), 377 on.next(570, -100), 378 on.next(580, -3), 379 on.next(590, 5), 380 on.next(630, 10), 381 on.error(690, std::runtime_error("error in unsubscribed stream")) 382 }); 383 384 WHEN("17 values are taken"){ 385 386 auto res = w.start( __anon4424e56f0602() 387 [xs]() { 388 return xs 389 .take(17) 390 // forget type to workaround lambda deduction bug on msvc 2013 391 .as_dynamic(); 392 } 393 ); 394 395 THEN("the output only contains items sent while subscribed"){ 396 auto required = rxu::to_vector({ 397 on.next(210, 9), 398 on.next(230, 13), 399 on.next(270, 7), 400 on.next(280, 1), 401 on.next(300, -1), 402 on.next(310, 3), 403 on.next(340, 8), 404 on.next(370, 11), 405 on.next(410, 15), 406 on.next(415, 16), 407 on.next(460, 72), 408 on.next(510, 76), 409 on.next(560, 32), 410 on.next(570, -100), 411 on.next(580, -3), 412 on.next(590, 5), 413 on.next(630, 10), 414 on.completed(630) 415 }); 416 auto actual = res.get_observer().messages(); 417 REQUIRE(required == actual); 418 } 419 420 THEN("there was 1 subscription/unsubscription to the source"){ 421 auto required = rxu::to_vector({ 422 on.subscribe(200, 630) 423 }); 424 auto actual = xs.subscriptions(); 425 REQUIRE(required == actual); 426 } 427 428 } 429 } 430 } 431 432 SCENARIO("take, error before", "[take][operators]"){ 433 GIVEN("a source"){ 434 auto sc = rxsc::make_test(); 435 auto w = sc.create_worker(); 436 const rxsc::test::messages<int> on; 437 438 auto xs = sc.make_hot_observable({ 439 on.next(70, 6), 440 on.next(150, 4), 441 on.next(210, 9), 442 on.next(230, 13), 443 on.next(270, 7), 444 on.next(280, 1), 445 on.next(300, -1), 446 on.next(310, 3), 447 on.next(340, 8), 448 on.next(370, 11), 449 on.next(410, 15), 450 on.next(415, 16), 451 on.next(460, 72), 452 on.next(510, 76), 453 on.next(560, 32), 454 on.next(570, -100), 455 on.next(580, -3), 456 on.next(590, 5), 457 on.next(630, 10), 458 on.error(690, std::runtime_error("error in unsubscribed stream")) 459 }); 460 461 WHEN("3 values are taken"){ 462 463 auto res = w.start( __anon4424e56f0702() 464 [xs]() { 465 return xs 466 .take(3) 467 // forget type to workaround lambda deduction bug on msvc 2013 468 .as_dynamic(); 469 } 470 ); 471 472 THEN("the output only contains items sent while subscribed"){ 473 auto required = rxu::to_vector({ 474 on.next(210, 9), 475 on.next(230, 13), 476 on.next(270, 7), 477 on.completed(270) 478 }); 479 auto actual = res.get_observer().messages(); 480 REQUIRE(required == actual); 481 } 482 483 THEN("there was 1 subscription/unsubscription to the source"){ 484 auto required = rxu::to_vector({ 485 on.subscribe(200, 270) 486 }); 487 auto actual = xs.subscriptions(); 488 REQUIRE(required == actual); 489 } 490 491 } 492 } 493 } 494 495 SCENARIO("take, dispose before", "[take][operators]"){ 496 GIVEN("a source"){ 497 auto sc = rxsc::make_test(); 498 auto w = sc.create_worker(); 499 const rxsc::test::messages<int> on; 500 501 auto xs = sc.make_hot_observable({ 502 on.next(70, 6), 503 on.next(150, 4), 504 on.next(210, 9), 505 on.next(230, 13), 506 on.next(270, 7), 507 on.next(280, 1), 508 on.next(300, -1), 509 on.next(310, 3), 510 on.next(340, 8), 511 on.next(370, 11), 512 on.next(410, 15), 513 on.next(415, 16), 514 on.next(460, 72), 515 on.next(510, 76), 516 on.next(560, 32), 517 on.next(570, -100), 518 on.next(580, -3), 519 on.next(590, 5), 520 on.next(630, 10) 521 }); 522 523 WHEN("3 values are taken"){ 524 525 auto res = w.start( __anon4424e56f0802() 526 [xs]() { 527 return xs 528 .take(3) 529 // forget type to workaround lambda deduction bug on msvc 2013 530 .as_dynamic(); 531 }, 532 250 533 ); 534 535 THEN("the output only contains items sent while subscribed"){ 536 auto required = rxu::to_vector({ 537 on.next(210, 9), 538 on.next(230, 13) 539 }); 540 auto actual = res.get_observer().messages(); 541 REQUIRE(required == actual); 542 } 543 544 THEN("there was 1 subscription/unsubscription to the source"){ 545 auto required = rxu::to_vector({ 546 on.subscribe(200, 250) 547 }); 548 auto actual = xs.subscriptions(); 549 REQUIRE(required == actual); 550 } 551 552 } 553 } 554 } 555 556 SCENARIO("take, dispose after", "[take][operators]"){ 557 GIVEN("a source"){ 558 auto sc = rxsc::make_test(); 559 auto w = sc.create_worker(); 560 const rxsc::test::messages<int> on; 561 562 auto xs = sc.make_hot_observable({ 563 on.next(70, 6), 564 on.next(150, 4), 565 on.next(210, 9), 566 on.next(230, 13), 567 on.next(270, 7), 568 on.next(280, 1), 569 on.next(300, -1), 570 on.next(310, 3), 571 on.next(340, 8), 572 on.next(370, 11), 573 on.next(410, 15), 574 on.next(415, 16), 575 on.next(460, 72), 576 on.next(510, 76), 577 on.next(560, 32), 578 on.next(570, -100), 579 on.next(580, -3), 580 on.next(590, 5), 581 on.next(630, 10) 582 }); 583 584 WHEN("3 values are taken"){ 585 586 auto res = w.start( __anon4424e56f0902() 587 [xs]() { 588 return xs 589 .take(3) 590 // forget type to workaround lambda deduction bug on msvc 2013 591 .as_dynamic(); 592 }, 593 400 594 ); 595 596 THEN("the output only contains items sent while subscribed"){ 597 auto required = rxu::to_vector({ 598 on.next(210, 9), 599 on.next(230, 13), 600 on.next(270, 7), 601 on.completed(270) 602 }); 603 auto actual = res.get_observer().messages(); 604 REQUIRE(required == actual); 605 } 606 607 THEN("there was 1 subscription/unsubscription to the source"){ 608 auto required = rxu::to_vector({ 609 on.subscribe(200, 270) 610 }); 611 auto actual = xs.subscriptions(); 612 REQUIRE(required == actual); 613 } 614 615 } 616 } 617 } 618 619