1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.spotify.futures;
17
18 import org.hamcrest.CustomTypeSafeMatcher;
19 import org.hamcrest.Matcher;
20 import org.jmock.lib.concurrent.DeterministicScheduler;
21 import org.junit.Before;
22 import org.junit.Rule;
23 import org.junit.Test;
24 import org.junit.rules.ExpectedException;
25
26 import java.lang.reflect.Constructor;
27 import java.lang.reflect.InvocationTargetException;
28 import java.time.Duration;
29 import java.util.List;
30 import java.util.Optional;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionStage;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.ScheduledFuture;
36 import java.util.function.Supplier;
37 import java.util.stream.Stream;
38
39 import static com.spotify.futures.CompletableFutures.allAsList;
40 import static com.spotify.futures.CompletableFutures.combine;
41 import static com.spotify.futures.CompletableFutures.combineFutures;
42 import static com.spotify.futures.CompletableFutures.dereference;
43 import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
44 import static com.spotify.futures.CompletableFutures.exceptionallyCompose;
45 import static com.spotify.futures.CompletableFutures.getCompleted;
46 import static com.spotify.futures.CompletableFutures.getException;
47 import static com.spotify.futures.CompletableFutures.handleCompose;
48 import static com.spotify.futures.CompletableFutures.joinList;
49 import static com.spotify.futures.CompletableFutures.poll;
50 import static com.spotify.futures.CompletableFutures.successfulAsList;
51 import static java.util.Arrays.asList;
52 import static java.util.Collections.emptyList;
53 import static java.util.Collections.singletonList;
54 import static java.util.concurrent.CompletableFuture.completedFuture;
55 import static java.util.concurrent.TimeUnit.MILLISECONDS;
56 import static java.util.concurrent.TimeUnit.SECONDS;
57 import static java.util.stream.Collectors.toList;
58 import static org.hamcrest.Matchers.both;
59 import static org.hamcrest.Matchers.contains;
60 import static org.hamcrest.Matchers.hasProperty;
61 import static org.hamcrest.Matchers.hasSize;
62 import static org.hamcrest.Matchers.nullValue;
63 import static org.hamcrest.core.Is.is;
64 import static org.hamcrest.core.Is.isA;
65 import static org.hamcrest.core.IsNot.not;
66 import static org.junit.Assert.assertNull;
67 import static org.junit.Assert.assertThat;
68 import static org.mockito.Matchers.any;
69 import static org.mockito.Matchers.anyLong;
70 import static org.mockito.Matchers.eq;
71 import static org.mockito.Mockito.mock;
72 import static org.mockito.Mockito.verify;
73 import static org.mockito.Mockito.when;
74
75 public class CompletableFuturesTest {
76
77 private DeterministicScheduler executor;
78
79 @Rule
80 public ExpectedException exception = ExpectedException.none();
81
82 @Before
83 public void setUp() {
84 executor = new DeterministicScheduler();
85 }
86
87 @Test
88 public void allAsList_empty() throws Exception {
89 final List<CompletionStage<String>> input = emptyList();
90 assertThat(allAsList(input), completesTo(emptyList()));
91 }
92
93 @Test
94 public void allAsList_one() throws Exception {
95 final String value = "a";
96 final List<CompletionStage<String>> input = singletonList(completedFuture(value));
97 assertThat(allAsList(input), completesTo(singletonList(value)));
98 }
99
100 @Test
101 public void allAsList_multiple() throws Exception {
102 final List<String> values = asList("a", "b", "c");
103 final List<CompletableFuture<String>> input = values.stream()
104 .map(CompletableFuture::completedFuture)
105 .collect(toList());
106 assertThat(allAsList(input), completesTo(values));
107 }
108
109 @Test
110 public void allAsList_exceptional() throws Exception {
111 final RuntimeException ex = new RuntimeException("boom");
112 final List<CompletionStage<String>> input = asList(
113 completedFuture("a"),
114 exceptionallyCompletedFuture(ex),
115 completedFuture("b")
116 );
117
118 exception.expectCause(is(ex));
119 allAsList(input).get();
120 }
121
122 @Test
123 public void allAsList_null() throws Exception {
124 exception.expect(NullPointerException.class);
125 allAsList(null);
126 }
127
128 @Test
129 public void allAsList_containsNull() throws Exception {
130 final List<CompletionStage<String>> input = asList(
131 completedFuture("a"),
132 null,
133 completedFuture("b")
134 );
135
136 exception.expect(NullPointerException.class);
137 allAsList(input);
138 }
139
140 @Test
141 public void successfulAsList_exceptionalAndNull() throws Exception {
142 final List<CompletableFuture<String>> input = asList(
143 completedFuture("a"),
144 exceptionallyCompletedFuture(new RuntimeException("boom")),
145 completedFuture(null),
146 completedFuture("d")
147 );
148 final List<String> expected = asList("a", "default", null, "d");
149 assertThat(successfulAsList(input, t -> "default"), completesTo(expected));
150 }
151
152 @Test
153 public void getCompleted_done() throws Exception {
154 final CompletionStage<String> future = completedFuture("hello");
155 assertThat(getCompleted(future), is("hello"));
156 }
157
158 @Test
159 public void getCompleted_exceptional() throws Exception {
160 final Exception ex = new Exception("boom");
161 final CompletionStage<String> future = exceptionallyCompletedFuture(ex);
162 exception.expectCause(is(ex));
163 getCompleted(future);
164 }
165
166 @Test
167 public void getCompleted_nilResult() throws Exception {
168 final CompletableFuture<Void> future = completedFuture(null);
169 assertNull(getCompleted(future));
170 }
171
172 @Test
173 public void getCompleted_pending() throws Exception {
174 final CompletionStage<String> future = new CompletableFuture<>();
175
176 exception.expect(IllegalStateException.class);
177 getCompleted(future);
178 }
179
180 @Test
181 public void getException_completedExceptionally() throws Exception {
182 final Exception ex = new Exception("boom");
183 final CompletionStage<String> future = exceptionallyCompletedFuture(ex);
184 assertThat(getException(future), is(ex));
185 }
186
187 @Test
188 public void getException_completedNormally() throws Exception {
189 final CompletionStage<String> future = completedFuture("hello");
190 exception.expect(IllegalStateException.class);
191 getException(future);
192 }
193
194 @Test
195 public void getException_pending() throws Exception {
196 final CompletionStage<String> future = new CompletableFuture<>();
197 exception.expect(IllegalStateException.class);
198 getException(future);
199 }
200
201 @Test
202 public void getException_cancelled() throws Exception {
203 final CompletionStage<String> future = new CompletableFuture<>();
204 future.toCompletableFuture().cancel(true);
205 exception.expect(CancellationException.class);
206 getException(future);
207 }
208
209 @Test
210 public void getException_returnsNullIfImplementationDoesNotThrow() throws Exception {
211 final CompletableFuture<Void> future = new NonThrowingFuture<>();
212 future.completeExceptionally(new NullPointerException());
213 assertNull(getException(future));
214 }
215
216 @Test
217 public void exceptionallyCompletedFuture_completed() throws Exception {
218 final CompletableFuture<String> future = exceptionallyCompletedFuture(new Exception("boom"));
219 assertThat(future.isCompletedExceptionally(), is(true));
220 }
221
222 @Test
223 public void exceptionallyCompletedFuture_throws() throws Exception {
224 final Exception ex = new Exception("boom");
225 final CompletableFuture<String> future = exceptionallyCompletedFuture(ex);
226
227 exception.expectCause(is(ex));
228 future.get();
229 }
230
231 @Test
232 public void exceptionallyCompletedFuture_null() throws Exception {
233 exception.expect(NullPointerException.class);
234 exceptionallyCompletedFuture(null);
235 }
236
237 @Test
238 public void joinList_empty() throws Exception {
239 final List<String> result = Stream.<CompletableFuture<String>>of()
240 .collect(joinList())
241 .get();
242
243 assertThat(result, not(nullValue()));
244 assertThat(result, hasSize(0));
245 }
246
247 @Test
248 public void joinList_one() throws Exception {
249 final List<String> result = Stream.of(completedFuture("a"))
250 .collect(joinList())
251 .get();
252
253 assertThat(result, hasSize(1));
254 assertThat(result, contains("a"));
255 }
256
257 @Test
258 public void joinList_two() throws Exception {
259 final CompletableFuture<String> a = completedFuture("hello");
260 final CompletableFuture<String> b = completedFuture("world");
261
262 final List<String> result = Stream.of(a, b)
263 .collect(joinList())
264 .get();
265 assertThat(result, contains("hello", "world"));
266 }
267
268 @Test
269 public void joinList_mixedStageTypes() throws Exception {
270
271 final CompletionStage<String> a = completedFuture("hello");
272 final CompletableFuture<String> b = completedFuture("world");
273
274 final List<String> result = Stream.of(a, b)
275 .collect(joinList())
276 .get();
277 assertThat(result, contains("hello", "world"));
278 }
279
280 @Test
281 public void joinList_mixedValueTypes() throws Exception {
282
283 final CompletionStage<Integer> a = completedFuture(3);
284 final CompletableFuture<Long> b = completedFuture(4L);
285
286 final List<? extends Number> result = Stream.of(a, b)
287 .collect(joinList())
288 .get();
289 assertThat(result, contains(3, 4L));
290 }
291
292 @Test
293 public void joinList_exceptional() throws Exception {
294 final RuntimeException ex = new RuntimeException("boom");
295 final CompletableFuture<String> a = completedFuture("hello");
296 final CompletableFuture<String> b = exceptionallyCompletedFuture(ex);
297
298 final CompletableFuture<List<String>> result = Stream.of(a, b).collect(joinList());
299
300 exception.expectCause(is(ex));
301 result.get();
302 }
303
304 @Test
305 public void joinList_containsNull() throws Exception {
306 final CompletableFuture<String> a = completedFuture("hello");
307 final CompletableFuture<String> b = null;
308 final Stream<CompletableFuture<String>> stream = Stream.of(a, b);
309
310 exception.expect(NullPointerException.class);
311 stream.collect(joinList());
312 }
313
314 @Test
315 public void dereference_completed() throws Exception {
316 final CompletionStage<String> future = completedFuture("hello");
317 final CompletionStage<String> dereferenced = dereference(completedFuture(future));
318
319 assertThat(dereferenced, completesTo("hello"));
320 }
321
322 @Test
323 public void dereference_exceptional() throws Exception {
324 final IllegalArgumentException ex = new IllegalArgumentException();
325 final CompletionStage<Object> future = exceptionallyCompletedFuture(ex);
326 final CompletionStage<Object> dereferenced = dereference(completedFuture(future));
327
328 exception.expectCause(is(ex));
329 getCompleted(dereferenced);
330 }
331
332 @Test
333 public void dereference_null() throws Exception {
334 final CompletionStage<Object> dereferenced = dereference(completedFuture(null));
335
336 exception.expectCause(isA(NullPointerException.class));
337 getCompleted(dereferenced);
338 }
339
340 @Test
341 public void exceptionallyCompose_complete() throws Exception {
342 final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
343 final CompletableFuture<String> fallback = completedFuture("hello");
344
345 final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> fallback);
346
347 assertThat(composed, completesTo("hello"));
348 }
349
350 @Test
351 public void exceptionallyCompose_exceptional() throws Exception {
352 final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
353 final IllegalStateException fallbackException = new IllegalStateException();
354 final CompletableFuture<String> fallback = exceptionallyCompletedFuture(fallbackException);
355
356 final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> fallback);
357
358 exception.expectCause(is(fallbackException));
359 getCompleted(composed);
360 }
361
362 @Test
363 public void exceptionallyCompose_unused() throws Exception {
364 final CompletionStage<String> future = completedFuture("hello");
365 final IllegalStateException fallbackException = new IllegalStateException();
366 final CompletableFuture<String> fallback = exceptionallyCompletedFuture(fallbackException);
367
368 final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> fallback);
369 assertThat(composed, completesTo("hello"));
370 }
371
372 @Test
373 public void exceptionallyCompose_throws() throws Exception {
374 final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
375 final IllegalStateException ex = new IllegalStateException();
376
377 final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> {
378 throw ex;
379 });
380
381 exception.expectCause(is(ex));
382 getCompleted(composed);
383 }
384
385 @Test
386 public void exceptionallyCompose_returnsNull() throws Exception {
387 final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
388
389 final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> null);
390
391 exception.expectCause(isA(NullPointerException.class));
392 getCompleted(composed);
393 }
394
395 @Test
396 public void handleCompose_completed() throws Exception {
397 final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
398
399 final CompletionStage<String> composed =
400 handleCompose(future, (s, t) -> completedFuture("hello"));
401
402 assertThat(composed, completesTo("hello"));
403 }
404
405 @Test
406 public void handleCompose_failure() throws Exception {
407 final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
408 final IllegalStateException ex = new IllegalStateException();
409
410 final CompletionStage<String> composed =
411 handleCompose(future, (s, t) -> exceptionallyCompletedFuture(ex));
412
413 exception.expectCause(is(ex));
414 getCompleted(composed);
415 }
416
417 @Test
418 public void handleCompose_throws() throws Exception {
419 final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
420 final IllegalStateException ex = new IllegalStateException();
421
422 final CompletionStage<String> composed = handleCompose(future, (s, throwable) -> { throw ex; });
423
424 exception.expectCause(is(ex));
425 getCompleted(composed);
426 }
427
428 @Test
429 public void handleCompose_returnsNull() throws Exception {
430 final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
431 final CompletionStage<String> composed = handleCompose(future, (s, throwable) -> null);
432
433 exception.expectCause(isA(NullPointerException.class));
434 getCompleted(composed);
435 }
436
437 @Test
438 public void combine2_completed() throws Exception {
439 final CompletionStage<String> future = combine(
440 completedFuture("a"), completedFuture("b"),
441 (a, b) -> a + b);
442
443 assertThat(future, completesTo("ab"));
444 }
445
446 @Test
447 public void combine2_exceptional() throws Exception {
448 final CompletionStage<String> future = combine(
449 completedFuture("a"),
450 exceptionallyCompletedFuture(new IllegalStateException()),
451 (a, b) -> a + b);
452
453 exception.expectCause(isA(IllegalStateException.class));
454 getCompleted(future);
455 }
456
457 @Test
458 public void combine3_completed() throws Exception {
459 final CompletionStage<String> future = combine(
460 completedFuture("a"), completedFuture("b"), completedFuture("c"),
461 (a, b, c) -> a + b + c);
462
463 assertThat(future, completesTo("abc"));
464 }
465
466 @Test
467 public void combine3_exceptional() throws Exception {
468 final CompletionStage<String> future = combine(
469 completedFuture("a"), completedFuture("b"),
470 exceptionallyCompletedFuture(new IllegalStateException()),
471 (a, b, c) -> a + b + c);
472
473 exception.expectCause(isA(IllegalStateException.class));
474 getCompleted(future);
475 }
476
477 @Test
478 public void combine4_completed() throws Exception {
479 final CompletionStage<String> future = combine(
480 completedFuture("a"), completedFuture("b"), completedFuture("c"),
481 completedFuture("d"),
482 (a, b, c, d) -> a + b + c + d);
483
484 assertThat(future, completesTo("abcd"));
485 }
486
487 @Test
488 public void combine4_exceptional() throws Exception {
489 final CompletionStage<String> future = combine(
490 completedFuture("a"), completedFuture("b"), completedFuture("c"),
491 exceptionallyCompletedFuture(new IllegalStateException()),
492 (a, b, c, d)-> a + b + c + d);
493
494 exception.expectCause(isA(IllegalStateException.class));
495 getCompleted(future);
496 }
497
498 @Test
499 public void combine4_incomplete() throws Exception {
500 final CompletionStage<String> future = combine(
501 completedFuture("a"), completedFuture("b"), completedFuture("c"),
502 incompleteFuture(),
503 (a, b, c, d) -> a + b + c + d);
504 exception.expect(isA(IllegalStateException.class));
505 getCompleted(future);
506 }
507
508 @Test
509 public void combine5_completed() throws Exception {
510 final CompletionStage<String> future = combine(
511 completedFuture("a"), completedFuture("b"), completedFuture("c"),
512 completedFuture("d"), completedFuture("e"),
513 (a, b, c, d, e) -> a + b + c + d + e);
514
515 assertThat(future, completesTo("abcde"));
516 }
517
518 @Test
519 public void combine5_exceptional() throws Exception {
520 final CompletionStage<String> future = combine(
521 completedFuture("a"), completedFuture("b"), completedFuture("c"),
522 completedFuture("d"),
523 exceptionallyCompletedFuture(new IllegalStateException()),
524 (a, b, c, d, e)-> a + b + c + d + e);
525
526 exception.expectCause(isA(IllegalStateException.class));
527 getCompleted(future);
528 }
529
530 @Test
531 public void combine5_incomplete() throws Exception {
532 final CompletionStage<String> future = combine(
533 completedFuture("a"), completedFuture("b"), completedFuture("c"),
534 completedFuture("d"),
535 incompleteFuture(),
536 (a, b, c, d, e) -> a + b + c + d + e);
537 exception.expect(isA(IllegalStateException.class));
538 getCompleted(future);
539 }
540
541 @Test
542 public void combine6_completed() throws Exception {
543 final CompletionStage<String> future = combine(
544 completedFuture("a"), completedFuture("b"), completedFuture("c"),
545 completedFuture("d"), completedFuture("e"), completedFuture("f"),
546 (a, b, c, d, e, f) -> a + b + c + d + e + f);
547
548 assertThat(future, completesTo("abcdef"));
549 }
550
551 @Test
552 public void combine6_exceptional() throws Exception {
553 final CompletionStage<String> future = combine(
554 completedFuture("a"), completedFuture("b"), completedFuture("c"),
555 completedFuture("d"), completedFuture("e"),
556 exceptionallyCompletedFuture(new IllegalStateException()),
557 (a, b, c, d, e, f)-> a + b + c + d + e + f);
558
559 exception.expectCause(isA(IllegalStateException.class));
560 getCompleted(future);
561 }
562
563 @Test
564 public void combine6_incomplete() throws Exception {
565 final CompletionStage<String> future = combine(
566 completedFuture("a"), completedFuture("b"), completedFuture("c"),
567 completedFuture("d"), completedFuture("e"),
568 incompleteFuture(),
569 (a, b, c, d, e, f) -> a + b + c + d + e + f);
570 exception.expect(isA(IllegalStateException.class));
571 getCompleted(future);
572 }
573
574 @Test
575 public void combineFutures2_completed() throws Exception {
576 final CompletionStage<String> future = combineFutures(
577 completedFuture("a"),
578 completedFuture("b"),
579 (a, b) -> completedFuture(a + b));
580
581 assertThat(future, completesTo("ab"));
582 }
583
584 @Test
585 public void combineFutures2_incomplete() throws Exception {
586 final CompletionStage<String> future = combineFutures(
587 completedFuture("a"),
588 incompleteFuture(),
589 (a, b) -> completedFuture(a + b));
590
591 exception.expect(isA(IllegalStateException.class));
592 getCompleted(future);
593 }
594
595 @Test
596 public void combineFutures2_exceptional() throws Exception {
597 final CompletionStage<String> future = combineFutures(
598 completedFuture("a"),
599 exceptionallyCompletedFuture(new IllegalStateException()),
600 (a, b) -> completedFuture(a + b));
601
602 exception.expectCause(isA(IllegalStateException.class));
603 getCompleted(future);
604 }
605
606 @Test
607 public void combineFutures3_completed() throws Exception {
608 final CompletionStage<String> future = combineFutures(
609 completedFuture("a"),
610 completedFuture("b"),
611 completedFuture("c"),
612 (a, b, c) -> completedFuture(a + b + c));
613
614 assertThat(future, completesTo("abc"));
615 }
616
617 @Test
618 public void combineFutures3_incomplete() throws Exception {
619 final CompletionStage<String> future = combineFutures(
620 completedFuture("a"),
621 completedFuture("b"),
622 incompleteFuture(),
623 (a, b, c) -> completedFuture(a + b + c));
624
625 exception.expect(isA(IllegalStateException.class));
626 getCompleted(future);
627 }
628
629 @Test
630 public void combineFutures3_exceptional() throws Exception {
631 final CompletionStage<String> future = combineFutures(
632 completedFuture("a"),
633 completedFuture("b"),
634 exceptionallyCompletedFuture(new IllegalStateException()),
635 (a, b, c) -> completedFuture(a + b + c));
636
637 exception.expectCause(isA(IllegalStateException.class));
638 getCompleted(future);
639 }
640
641 @Test
642 public void combineFutures4_completed() throws Exception {
643 final CompletionStage<String> future = combineFutures(
644 completedFuture("a"),
645 completedFuture("b"),
646 completedFuture("c"),
647 completedFuture("d"),
648 (a, b, c, d) -> completedFuture(a + b + c + d));
649
650 assertThat(future, completesTo("abcd"));
651 }
652
653 @Test
654 public void combineFutures4_incomplete() throws Exception {
655 final CompletionStage<String> future = combineFutures(
656 completedFuture("a"),
657 completedFuture("b"),
658 completedFuture("c"),
659 incompleteFuture(),
660 (a, b, c, d) -> completedFuture(a + b + c + d));
661
662 exception.expect(isA(IllegalStateException.class));
663 getCompleted(future);
664 }
665
666 @Test
667 public void combineFutures4_exceptional() throws Exception {
668 final CompletionStage<String> future = combineFutures(
669 completedFuture("a"),
670 completedFuture("b"),
671 completedFuture("c"),
672 exceptionallyCompletedFuture(new IllegalStateException()),
673 (a, b, c, d) -> completedFuture(a + b + c + d));
674
675 exception.expectCause(isA(IllegalStateException.class));
676 getCompleted(future);
677 }
678
679 @Test
680 public void combineFutures5_completed() throws Exception {
681 final CompletionStage<String> future = combineFutures(
682 completedFuture("a"),
683 completedFuture("b"),
684 completedFuture("c"),
685 completedFuture("d"),
686 completedFuture("e"),
687 (a, b, c, d, e) -> completedFuture(a + b + c + d + e));
688
689 assertThat(future, completesTo("abcde"));
690 }
691
692 @Test
693 public void combineFutures5_incomplete() throws Exception {
694 final CompletionStage<String> future = combineFutures(
695 completedFuture("a"),
696 completedFuture("b"),
697 completedFuture("c"),
698 completedFuture("d"),
699 incompleteFuture(),
700 (a, b, c, d, e) -> completedFuture(a + b + c + d + e));
701
702 exception.expect(isA(IllegalStateException.class));
703 getCompleted(future);
704 }
705
706 @Test
707 public void combineFutures5_exceptional() throws Exception {
708 final CompletionStage<String> future = combineFutures(
709 completedFuture("a"),
710 completedFuture("b"),
711 completedFuture("c"),
712 completedFuture("d"),
713 exceptionallyCompletedFuture(new IllegalStateException()),
714 (a, b, c, d, e) -> completedFuture(a + b + c + d + e));
715
716 exception.expectCause(isA(IllegalStateException.class));
717 getCompleted(future);
718 }
719
720 @Test
721 public void combineFutures6_completed() throws Exception {
722 final CompletionStage<String> future = combineFutures(
723 completedFuture("a"),
724 completedFuture("b"),
725 completedFuture("c"),
726 completedFuture("d"),
727 completedFuture("e"),
728 completedFuture("f"),
729 (a, b, c, d, e, f) -> completedFuture(a + b + c + d + e + f));
730
731 assertThat(future, completesTo("abcdef"));
732 }
733
734 @Test
735 public void combineFutures6_incomplete() throws Exception {
736 final CompletionStage<String> future = combineFutures(
737 completedFuture("a"),
738 completedFuture("b"),
739 completedFuture("c"),
740 completedFuture("d"),
741 completedFuture("e"),
742 incompleteFuture(),
743 (a, b, c, d, e, f) -> completedFuture(a + b + c + d + e + f));
744
745 exception.expect(isA(IllegalStateException.class));
746 getCompleted(future);
747 }
748
749 @Test
750 public void combineFutures6_exceptional() throws Exception {
751 final CompletionStage<String> future = combineFutures(
752 completedFuture("a"),
753 completedFuture("b"),
754 completedFuture("c"),
755 completedFuture("d"),
756 completedFuture("e"),
757 exceptionallyCompletedFuture(new IllegalStateException()),
758 (a, b, c, d, e, f) -> completedFuture(a + b + c + d + e + f));
759
760 exception.expectCause(isA(IllegalStateException.class));
761 getCompleted(future);
762 }
763
764 @Test
765 public void ctor_preventInstantiation() throws Exception {
766 exception.expect(both(isA(InvocationTargetException.class))
767 .and(hasProperty("cause", isA(IllegalAccessError.class))));
768
769 final Constructor<CompletableFutures> ctor = CompletableFutures.class.getDeclaredConstructor();
770 ctor.setAccessible(true);
771 ctor.newInstance();
772 }
773
774 @Test
775 public void poll_done() throws Exception {
776 final Supplier<Optional<String>> supplier = () -> Optional.of("done");
777 final CompletableFuture<String> future = poll(supplier, Duration.ofMillis(2), executor);
778
779 executor.runNextPendingCommand();
780 assertThat(future, completesTo("done"));
781 }
782
783 @Test
784 public void poll_twice() throws Exception {
785 final List<Optional<String>> results = asList(Optional.empty(), Optional.of("done"));
786 final Supplier<Optional<String>> supplier = results.iterator()::next;
787 final CompletableFuture<String> future = poll(supplier, Duration.ofMillis(2), executor);
788
789 executor.tick(1, MILLISECONDS);
790 assertThat(future.isDone(), is(false));
791
792 executor.tick(10, MILLISECONDS);
793 assertThat(future, completesTo("done"));
794 }
795
796 @Test
797 public void poll_taskReturnsNull() throws Exception {
798 final Supplier<Optional<String>> supplier = () -> null;
799 final CompletableFuture<String> future = poll(supplier, Duration.ofMillis(2), executor);
800
801 executor.runNextPendingCommand();
802 exception.expectCause(isA(NullPointerException.class));
803 future.get();
804 }
805
806 @Test
807 public void poll_taskThrows() throws Exception {
808 final RuntimeException ex = new RuntimeException("boom");
809 final Supplier<Optional<String>> supplier = () -> {throw ex;};
810 final CompletableFuture<String> future = poll(supplier, Duration.ofMillis(2), executor);
811
812 executor.runNextPendingCommand();
813 exception.expectCause(is(ex));
814 future.get();
815 }
816
817 @Test
818 public void poll_scheduled() throws Exception {
819 final ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
820 final Supplier<Optional<String>> supplier = () -> Optional.of("hello");
821 poll(supplier, Duration.ofMillis(2), executor);
822
823 verify(executor).scheduleAtFixedRate(any(), eq(0L), eq(2L), eq(MILLISECONDS));
824 }
825
826 @Test
827 @SuppressWarnings("unchecked")
828 public void poll_resultFutureCanceled() throws Exception {
829 final ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
830 final ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
831 when(executor.scheduleAtFixedRate(any(), anyLong(), anyLong(), any()))
832 .thenReturn(scheduledFuture);
833
834 final CompletableFuture<String> future = poll(Optional::empty, Duration.ofMillis(2), executor);
835 future.cancel(true);
836
837 verify(scheduledFuture).cancel(true);
838 }
839
840 @Test
841 public void poll_notRunningAfterCancel() throws Exception {
842 final CompletableFuture<String> future = poll(Optional::empty, Duration.ofMillis(2), executor);
843
844 future.cancel(true);
845
846 executor.tick(5, MILLISECONDS);
847 assertThat(executor.isIdle(), is(true));
848 }
849
850 private static <T> CompletableFuture<T> incompleteFuture() {
851 return new CompletableFuture<>();
852 }
853
854 private static <T> Matcher<CompletionStage<T>> completesTo(final T expected) {
855 return completesTo(is(expected));
856 }
857
858 private static <T> Matcher<CompletionStage<T>> completesTo(final Matcher<T> expected) {
859 return new CustomTypeSafeMatcher<CompletionStage<T>>("completes to " + String.valueOf(expected)) {
860 @Override
861 protected boolean matchesSafely(CompletionStage<T> item) {
862 try {
863 final T value = item.toCompletableFuture().get(1, SECONDS);
864 return expected.matches(value);
865 } catch (Exception ex) {
866 return false;
867 }
868 }
869 };
870 }
871
872 private static class NonThrowingFuture<T> extends CompletableFuture<T> {
873 @Override
874 public T join() {
875 if (this.isCompletedExceptionally()) {
876 return null;
877 }
878 return super.join();
879 }
880 }
881
882 }