View Javadoc
/*
 * Copyright (c) 2016 Spotify AB
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
16  package com.spotify.futures;
17  
18  import org.hamcrest.CustomTypeSafeMatcher;
19  import org.hamcrest.Matcher;
20  import org.jmock.lib.concurrent.DeterministicScheduler;
21  import org.junit.Before;
22  import org.junit.Rule;
23  import org.junit.Test;
24  import org.junit.rules.ExpectedException;
25  
26  import java.lang.reflect.Constructor;
27  import java.lang.reflect.InvocationTargetException;
28  import java.time.Duration;
29  import java.util.List;
30  import java.util.Optional;
31  import java.util.concurrent.CancellationException;
32  import java.util.concurrent.CompletableFuture;
33  import java.util.concurrent.CompletionStage;
34  import java.util.concurrent.ScheduledExecutorService;
35  import java.util.concurrent.ScheduledFuture;
36  import java.util.function.Supplier;
37  import java.util.stream.Stream;
38  
39  import static com.spotify.futures.CompletableFutures.allAsList;
40  import static com.spotify.futures.CompletableFutures.combine;
41  import static com.spotify.futures.CompletableFutures.combineFutures;
42  import static com.spotify.futures.CompletableFutures.dereference;
43  import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
44  import static com.spotify.futures.CompletableFutures.exceptionallyCompose;
45  import static com.spotify.futures.CompletableFutures.getCompleted;
46  import static com.spotify.futures.CompletableFutures.getException;
47  import static com.spotify.futures.CompletableFutures.handleCompose;
48  import static com.spotify.futures.CompletableFutures.joinList;
49  import static com.spotify.futures.CompletableFutures.poll;
50  import static com.spotify.futures.CompletableFutures.successfulAsList;
51  import static java.util.Arrays.asList;
52  import static java.util.Collections.emptyList;
53  import static java.util.Collections.singletonList;
54  import static java.util.concurrent.CompletableFuture.completedFuture;
55  import static java.util.concurrent.TimeUnit.MILLISECONDS;
56  import static java.util.concurrent.TimeUnit.SECONDS;
57  import static java.util.stream.Collectors.toList;
58  import static org.hamcrest.Matchers.both;
59  import static org.hamcrest.Matchers.contains;
60  import static org.hamcrest.Matchers.hasProperty;
61  import static org.hamcrest.Matchers.hasSize;
62  import static org.hamcrest.Matchers.nullValue;
63  import static org.hamcrest.core.Is.is;
64  import static org.hamcrest.core.Is.isA;
65  import static org.hamcrest.core.IsNot.not;
66  import static org.junit.Assert.assertNull;
67  import static org.junit.Assert.assertThat;
68  import static org.mockito.Matchers.any;
69  import static org.mockito.Matchers.anyLong;
70  import static org.mockito.Matchers.eq;
71  import static org.mockito.Mockito.mock;
72  import static org.mockito.Mockito.verify;
73  import static org.mockito.Mockito.when;
74  
75  public class CompletableFuturesTest {
76  
77    private DeterministicScheduler executor;
78  
79    @Rule
80    public ExpectedException exception = ExpectedException.none();
81  
82    @Before
83    public void setUp() {
84      executor = new DeterministicScheduler();
85    }
86  
87    @Test
88    public void allAsList_empty() throws Exception {
89      final List<CompletionStage<String>> input = emptyList();
90      assertThat(allAsList(input), completesTo(emptyList()));
91    }
92  
93    @Test
94    public void allAsList_one() throws Exception {
95      final String value = "a";
96      final List<CompletionStage<String>> input = singletonList(completedFuture(value));
97      assertThat(allAsList(input), completesTo(singletonList(value)));
98    }
99  
100   @Test
101   public void allAsList_multiple() throws Exception {
102     final List<String> values = asList("a", "b", "c");
103     final List<CompletableFuture<String>> input = values.stream()
104         .map(CompletableFuture::completedFuture)
105         .collect(toList());
106     assertThat(allAsList(input), completesTo(values));
107   }
108 
109   @Test
110   public void allAsList_exceptional() throws Exception {
111     final RuntimeException ex = new RuntimeException("boom");
112     final List<CompletionStage<String>> input = asList(
113         completedFuture("a"),
114         exceptionallyCompletedFuture(ex),
115         completedFuture("b")
116     );
117 
118     exception.expectCause(is(ex));
119     allAsList(input).get();
120   }
121 
122   @Test
123   public void allAsList_null() throws Exception {
124     exception.expect(NullPointerException.class);
125     allAsList(null);
126   }
127 
128   @Test
129   public void allAsList_containsNull() throws Exception {
130     final List<CompletionStage<String>> input = asList(
131         completedFuture("a"),
132         null,
133         completedFuture("b")
134     );
135 
136     exception.expect(NullPointerException.class);
137     allAsList(input);
138   }
139 
140   @Test
141   public void successfulAsList_exceptionalAndNull() throws Exception {
142     final List<CompletableFuture<String>> input = asList(
143         completedFuture("a"),
144         exceptionallyCompletedFuture(new RuntimeException("boom")),
145         completedFuture(null),
146         completedFuture("d")
147     );
148     final List<String> expected = asList("a", "default", null, "d");
149     assertThat(successfulAsList(input, t -> "default"), completesTo(expected));
150   }
151 
152   @Test
153   public void getCompleted_done() throws Exception {
154     final CompletionStage<String> future = completedFuture("hello");
155     assertThat(getCompleted(future), is("hello"));
156   }
157 
158   @Test
159   public void getCompleted_exceptional() throws Exception {
160     final Exception ex = new Exception("boom");
161     final CompletionStage<String> future = exceptionallyCompletedFuture(ex);
162     exception.expectCause(is(ex));
163     getCompleted(future);
164   }
165 
166   @Test
167   public void getCompleted_nilResult() throws Exception {
168     final CompletableFuture<Void> future = completedFuture(null);
169     assertNull(getCompleted(future));
170   }
171 
172   @Test
173   public void getCompleted_pending() throws Exception {
174     final CompletionStage<String> future = new CompletableFuture<>();
175 
176     exception.expect(IllegalStateException.class);
177     getCompleted(future);
178   }
179 
180   @Test
181   public void getException_completedExceptionally() throws Exception {
182     final Exception ex = new Exception("boom");
183     final CompletionStage<String> future = exceptionallyCompletedFuture(ex);
184     assertThat(getException(future), is(ex));
185   }
186 
187   @Test
188   public void getException_completedNormally() throws Exception {
189     final CompletionStage<String> future = completedFuture("hello");
190     exception.expect(IllegalStateException.class);
191     getException(future);
192   }
193 
194   @Test
195   public void getException_pending() throws Exception {
196     final CompletionStage<String> future = new CompletableFuture<>();
197     exception.expect(IllegalStateException.class);
198     getException(future);
199   }
200 
201   @Test
202   public void getException_cancelled() throws Exception {
203     final CompletionStage<String> future = new CompletableFuture<>();
204     future.toCompletableFuture().cancel(true);
205     exception.expect(CancellationException.class);
206     getException(future);
207   }
208 
209   @Test
210   public void getException_returnsNullIfImplementationDoesNotThrow() throws Exception {
211     final CompletableFuture<Void> future = new NonThrowingFuture<>();
212     future.completeExceptionally(new NullPointerException());
213     assertNull(getException(future));
214   }
215 
216   @Test
217   public void exceptionallyCompletedFuture_completed() throws Exception {
218     final CompletableFuture<String> future = exceptionallyCompletedFuture(new Exception("boom"));
219     assertThat(future.isCompletedExceptionally(), is(true));
220   }
221 
222   @Test
223   public void exceptionallyCompletedFuture_throws() throws Exception {
224     final Exception ex = new Exception("boom");
225     final CompletableFuture<String> future = exceptionallyCompletedFuture(ex);
226 
227     exception.expectCause(is(ex));
228     future.get();
229   }
230 
231   @Test
232   public void exceptionallyCompletedFuture_null() throws Exception {
233     exception.expect(NullPointerException.class);
234     exceptionallyCompletedFuture(null);
235   }
236 
237   @Test
238   public void joinList_empty() throws Exception {
239     final List<String> result = Stream.<CompletableFuture<String>>of()
240         .collect(joinList())
241         .get();
242 
243     assertThat(result, not(nullValue()));
244     assertThat(result, hasSize(0));
245   }
246 
247   @Test
248   public void joinList_one() throws Exception {
249     final List<String> result = Stream.of(completedFuture("a"))
250         .collect(joinList())
251         .get();
252 
253     assertThat(result, hasSize(1));
254     assertThat(result, contains("a"));
255   }
256 
257   @Test
258   public void joinList_two() throws Exception {
259     final CompletableFuture<String> a = completedFuture("hello");
260     final CompletableFuture<String> b = completedFuture("world");
261 
262     final List<String> result = Stream.of(a, b)
263         .collect(joinList())
264         .get();
265     assertThat(result, contains("hello", "world"));
266   }
267 
268   @Test
269   public void joinList_mixedStageTypes() throws Exception {
270     // Note that a and b use different subclasses of CompletionStage
271     final CompletionStage<String> a = completedFuture("hello");
272     final CompletableFuture<String> b = completedFuture("world");
273 
274     final List<String> result = Stream.of(a, b)
275         .collect(joinList())
276         .get();
277     assertThat(result, contains("hello", "world"));
278   }
279 
280   @Test
281   public void joinList_mixedValueTypes() throws Exception {
282     // Note that a and b have different result types
283     final CompletionStage<Integer> a = completedFuture(3);
284     final CompletableFuture<Long> b = completedFuture(4L);
285 
286     final List<? extends Number> result = Stream.of(a, b)
287         .collect(joinList())
288         .get();
289     assertThat(result, contains(3, 4L));
290   }
291 
292   @Test
293   public void joinList_exceptional() throws Exception {
294     final RuntimeException ex = new RuntimeException("boom");
295     final CompletableFuture<String> a = completedFuture("hello");
296     final CompletableFuture<String> b = exceptionallyCompletedFuture(ex);
297 
298     final CompletableFuture<List<String>> result = Stream.of(a, b).collect(joinList());
299 
300     exception.expectCause(is(ex));
301     result.get();
302   }
303 
304   @Test
305   public void joinList_containsNull() throws Exception {
306     final CompletableFuture<String> a = completedFuture("hello");
307     final CompletableFuture<String> b = null;
308     final Stream<CompletableFuture<String>> stream = Stream.of(a, b);
309 
310     exception.expect(NullPointerException.class);
311     stream.collect(joinList());
312   }
313 
314   @Test
315   public void dereference_completed() throws Exception {
316     final CompletionStage<String> future = completedFuture("hello");
317     final CompletionStage<String> dereferenced = dereference(completedFuture(future));
318 
319     assertThat(dereferenced, completesTo("hello"));
320   }
321 
322   @Test
323   public void dereference_exceptional() throws Exception {
324     final IllegalArgumentException ex = new IllegalArgumentException();
325     final CompletionStage<Object> future = exceptionallyCompletedFuture(ex);
326     final CompletionStage<Object> dereferenced = dereference(completedFuture(future));
327 
328     exception.expectCause(is(ex));
329     getCompleted(dereferenced);
330   }
331 
332   @Test
333   public void dereference_null() throws Exception {
334     final CompletionStage<Object> dereferenced = dereference(completedFuture(null));
335 
336     exception.expectCause(isA(NullPointerException.class));
337     getCompleted(dereferenced);
338   }
339 
340   @Test
341   public void exceptionallyCompose_complete() throws Exception {
342     final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
343     final CompletableFuture<String> fallback = completedFuture("hello");
344 
345     final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> fallback);
346 
347     assertThat(composed, completesTo("hello"));
348   }
349 
350   @Test
351   public void exceptionallyCompose_exceptional() throws Exception {
352     final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
353     final IllegalStateException fallbackException = new IllegalStateException();
354     final CompletableFuture<String> fallback = exceptionallyCompletedFuture(fallbackException);
355 
356     final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> fallback);
357 
358     exception.expectCause(is(fallbackException));
359     getCompleted(composed);
360   }
361 
362   @Test
363   public void exceptionallyCompose_unused() throws Exception {
364     final CompletionStage<String> future = completedFuture("hello");
365     final IllegalStateException fallbackException = new IllegalStateException();
366     final CompletableFuture<String> fallback = exceptionallyCompletedFuture(fallbackException);
367 
368     final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> fallback);
369     assertThat(composed, completesTo("hello"));
370   }
371 
372   @Test
373   public void exceptionallyCompose_throws() throws Exception {
374     final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
375     final IllegalStateException ex = new IllegalStateException();
376 
377     final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> {
378       throw ex;
379     });
380 
381     exception.expectCause(is(ex));
382     getCompleted(composed);
383   }
384 
385   @Test
386   public void exceptionallyCompose_returnsNull() throws Exception {
387     final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
388 
389     final CompletionStage<String> composed = exceptionallyCompose(future, throwable -> null);
390 
391     exception.expectCause(isA(NullPointerException.class));
392     getCompleted(composed);
393   }
394 
395   @Test
396   public void handleCompose_completed() throws Exception {
397     final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
398 
399     final CompletionStage<String> composed =
400         handleCompose(future, (s, t) -> completedFuture("hello"));
401 
402     assertThat(composed, completesTo("hello"));
403   }
404 
405   @Test
406   public void handleCompose_failure() throws Exception {
407     final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
408     final IllegalStateException ex = new IllegalStateException();
409 
410     final CompletionStage<String> composed =
411         handleCompose(future, (s, t) -> exceptionallyCompletedFuture(ex));
412 
413     exception.expectCause(is(ex));
414     getCompleted(composed);
415   }
416 
417   @Test
418   public void handleCompose_throws() throws Exception {
419     final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
420     final IllegalStateException ex = new IllegalStateException();
421 
422     final CompletionStage<String> composed = handleCompose(future, (s, throwable) -> { throw ex; });
423 
424     exception.expectCause(is(ex));
425     getCompleted(composed);
426   }
427 
428   @Test
429   public void handleCompose_returnsNull() throws Exception {
430     final CompletionStage<String> future = exceptionallyCompletedFuture(new Exception("boom"));
431     final CompletionStage<String> composed = handleCompose(future, (s, throwable) -> null);
432 
433     exception.expectCause(isA(NullPointerException.class));
434     getCompleted(composed);
435   }
436 
437   @Test
438   public void combine2_completed() throws Exception {
439     final CompletionStage<String> future = combine(
440         completedFuture("a"), completedFuture("b"),
441         (a, b) -> a + b);
442 
443     assertThat(future, completesTo("ab"));
444   }
445 
446   @Test
447   public void combine2_exceptional() throws Exception {
448     final CompletionStage<String> future = combine(
449         completedFuture("a"),
450         exceptionallyCompletedFuture(new IllegalStateException()),
451         (a, b) -> a + b);
452 
453     exception.expectCause(isA(IllegalStateException.class));
454     getCompleted(future);
455   }
456 
457   @Test
458   public void combine3_completed() throws Exception {
459     final CompletionStage<String> future = combine(
460         completedFuture("a"), completedFuture("b"), completedFuture("c"),
461         (a, b, c) -> a + b + c);
462 
463     assertThat(future, completesTo("abc"));
464   }
465 
466   @Test
467   public void combine3_exceptional() throws Exception {
468     final CompletionStage<String> future = combine(
469         completedFuture("a"), completedFuture("b"),
470         exceptionallyCompletedFuture(new IllegalStateException()),
471         (a, b, c) -> a + b + c);
472 
473     exception.expectCause(isA(IllegalStateException.class));
474     getCompleted(future);
475   }
476 
477   @Test
478   public void combine4_completed() throws Exception {
479     final CompletionStage<String> future = combine(
480         completedFuture("a"), completedFuture("b"), completedFuture("c"),
481         completedFuture("d"),
482         (a, b, c, d) -> a + b + c + d);
483 
484     assertThat(future, completesTo("abcd"));
485   }
486 
487   @Test
488   public void combine4_exceptional() throws Exception {
489     final CompletionStage<String> future = combine(
490         completedFuture("a"), completedFuture("b"), completedFuture("c"),
491         exceptionallyCompletedFuture(new IllegalStateException()),
492             (a, b, c, d)-> a + b + c + d);
493 
494     exception.expectCause(isA(IllegalStateException.class));
495     getCompleted(future);
496   }
497 
498   @Test
499   public void combine4_incomplete() throws Exception {
500     final CompletionStage<String> future = combine(
501         completedFuture("a"), completedFuture("b"), completedFuture("c"),
502         incompleteFuture(),
503             (a, b, c, d) -> a + b + c + d);
504     exception.expect(isA(IllegalStateException.class));
505     getCompleted(future);
506   }
507 
508   @Test
509   public void combine5_completed() throws Exception {
510     final CompletionStage<String> future = combine(
511         completedFuture("a"), completedFuture("b"), completedFuture("c"),
512         completedFuture("d"), completedFuture("e"),
513         (a, b, c, d, e) -> a + b + c + d + e);
514 
515     assertThat(future, completesTo("abcde"));
516   }
517 
518   @Test
519   public void combine5_exceptional() throws Exception {
520     final CompletionStage<String> future = combine(
521         completedFuture("a"), completedFuture("b"), completedFuture("c"),
522         completedFuture("d"),
523         exceptionallyCompletedFuture(new IllegalStateException()),
524             (a, b, c, d, e)-> a + b + c + d + e);
525 
526     exception.expectCause(isA(IllegalStateException.class));
527     getCompleted(future);
528   }
529 
530   @Test
531   public void combine5_incomplete() throws Exception {
532     final CompletionStage<String> future = combine(
533         completedFuture("a"), completedFuture("b"), completedFuture("c"),
534         completedFuture("d"),
535         incompleteFuture(),
536         (a, b, c, d, e) -> a + b + c + d + e);
537     exception.expect(isA(IllegalStateException.class));
538     getCompleted(future);
539   }
540 
541   @Test
542   public void combine6_completed() throws Exception {
543     final CompletionStage<String> future = combine(
544         completedFuture("a"), completedFuture("b"), completedFuture("c"),
545         completedFuture("d"), completedFuture("e"), completedFuture("f"),
546         (a, b, c, d, e, f) -> a + b + c + d + e + f);
547 
548     assertThat(future, completesTo("abcdef"));
549   }
550 
551   @Test
552   public void combine6_exceptional() throws Exception {
553     final CompletionStage<String> future = combine(
554         completedFuture("a"), completedFuture("b"), completedFuture("c"),
555         completedFuture("d"), completedFuture("e"),
556         exceptionallyCompletedFuture(new IllegalStateException()),
557         (a, b, c, d, e, f)-> a + b + c + d + e + f);
558 
559     exception.expectCause(isA(IllegalStateException.class));
560     getCompleted(future);
561   }
562 
563   @Test
564   public void combine6_incomplete() throws Exception {
565     final CompletionStage<String> future = combine(
566         completedFuture("a"), completedFuture("b"), completedFuture("c"),
567         completedFuture("d"), completedFuture("e"),
568         incompleteFuture(),
569         (a, b, c, d, e, f) -> a + b + c + d + e + f);
570     exception.expect(isA(IllegalStateException.class));
571     getCompleted(future);
572   }
573 
574   @Test
575   public void combineFutures2_completed() throws Exception {
576     final CompletionStage<String> future = combineFutures(
577         completedFuture("a"),
578         completedFuture("b"),
579         (a, b) -> completedFuture(a + b));
580 
581     assertThat(future, completesTo("ab"));
582   }
583 
584   @Test
585   public void combineFutures2_incomplete() throws Exception {
586     final CompletionStage<String> future = combineFutures(
587         completedFuture("a"),
588         incompleteFuture(),
589         (a, b) -> completedFuture(a + b));
590 
591     exception.expect(isA(IllegalStateException.class));
592     getCompleted(future);
593   }
594 
595   @Test
596   public void combineFutures2_exceptional() throws Exception {
597     final CompletionStage<String> future = combineFutures(
598         completedFuture("a"),
599         exceptionallyCompletedFuture(new IllegalStateException()),
600         (a, b) -> completedFuture(a + b));
601 
602     exception.expectCause(isA(IllegalStateException.class));
603     getCompleted(future);
604   }
605 
606   @Test
607   public void combineFutures3_completed() throws Exception {
608     final CompletionStage<String> future = combineFutures(
609         completedFuture("a"),
610         completedFuture("b"),
611         completedFuture("c"),
612         (a, b, c) -> completedFuture(a + b + c));
613 
614     assertThat(future, completesTo("abc"));
615   }
616 
617   @Test
618   public void combineFutures3_incomplete() throws Exception {
619     final CompletionStage<String> future = combineFutures(
620         completedFuture("a"),
621         completedFuture("b"),
622         incompleteFuture(),
623         (a, b, c) -> completedFuture(a + b + c));
624 
625     exception.expect(isA(IllegalStateException.class));
626     getCompleted(future);
627   }
628 
629   @Test
630   public void combineFutures3_exceptional() throws Exception {
631     final CompletionStage<String> future = combineFutures(
632         completedFuture("a"),
633         completedFuture("b"),
634         exceptionallyCompletedFuture(new IllegalStateException()),
635         (a, b, c) -> completedFuture(a + b + c));
636 
637     exception.expectCause(isA(IllegalStateException.class));
638     getCompleted(future);
639   }
640 
641   @Test
642   public void combineFutures4_completed() throws Exception {
643     final CompletionStage<String> future = combineFutures(
644         completedFuture("a"),
645         completedFuture("b"),
646         completedFuture("c"),
647         completedFuture("d"),
648         (a, b, c, d) -> completedFuture(a + b + c + d));
649 
650     assertThat(future, completesTo("abcd"));
651   }
652 
653   @Test
654   public void combineFutures4_incomplete() throws Exception {
655     final CompletionStage<String> future = combineFutures(
656         completedFuture("a"),
657         completedFuture("b"),
658         completedFuture("c"),
659         incompleteFuture(),
660         (a, b, c, d) -> completedFuture(a + b + c + d));
661 
662     exception.expect(isA(IllegalStateException.class));
663     getCompleted(future);
664   }
665 
666   @Test
667   public void combineFutures4_exceptional() throws Exception {
668     final CompletionStage<String> future = combineFutures(
669         completedFuture("a"),
670         completedFuture("b"),
671         completedFuture("c"),
672         exceptionallyCompletedFuture(new IllegalStateException()),
673         (a, b, c, d) -> completedFuture(a + b + c + d));
674 
675     exception.expectCause(isA(IllegalStateException.class));
676     getCompleted(future);
677   }
678 
679   @Test
680   public void combineFutures5_completed() throws Exception {
681     final CompletionStage<String> future = combineFutures(
682         completedFuture("a"),
683         completedFuture("b"),
684         completedFuture("c"),
685         completedFuture("d"),
686         completedFuture("e"),
687         (a, b, c, d, e) -> completedFuture(a + b + c + d + e));
688 
689     assertThat(future, completesTo("abcde"));
690   }
691 
692   @Test
693   public void combineFutures5_incomplete() throws Exception {
694     final CompletionStage<String> future = combineFutures(
695         completedFuture("a"),
696         completedFuture("b"),
697         completedFuture("c"),
698         completedFuture("d"),
699         incompleteFuture(),
700         (a, b, c, d, e) -> completedFuture(a + b + c + d + e));
701 
702     exception.expect(isA(IllegalStateException.class));
703     getCompleted(future);
704   }
705 
706   @Test
707   public void combineFutures5_exceptional() throws Exception {
708     final CompletionStage<String> future = combineFutures(
709         completedFuture("a"),
710         completedFuture("b"),
711         completedFuture("c"),
712         completedFuture("d"),
713         exceptionallyCompletedFuture(new IllegalStateException()),
714         (a, b, c, d, e) -> completedFuture(a + b + c + d + e));
715 
716     exception.expectCause(isA(IllegalStateException.class));
717     getCompleted(future);
718   }
719 
720   @Test
721   public void combineFutures6_completed() throws Exception {
722     final CompletionStage<String> future = combineFutures(
723         completedFuture("a"),
724         completedFuture("b"),
725         completedFuture("c"),
726         completedFuture("d"),
727         completedFuture("e"),
728         completedFuture("f"),
729         (a, b, c, d, e, f) -> completedFuture(a + b + c + d + e + f));
730 
731     assertThat(future, completesTo("abcdef"));
732   }
733 
734   @Test
735   public void combineFutures6_incomplete() throws Exception {
736     final CompletionStage<String> future = combineFutures(
737         completedFuture("a"),
738         completedFuture("b"),
739         completedFuture("c"),
740         completedFuture("d"),
741         completedFuture("e"),
742         incompleteFuture(),
743         (a, b, c, d, e, f) -> completedFuture(a + b + c + d + e + f));
744 
745     exception.expect(isA(IllegalStateException.class));
746     getCompleted(future);
747   }
748 
749   @Test
750   public void combineFutures6_exceptional() throws Exception {
751     final CompletionStage<String> future = combineFutures(
752         completedFuture("a"),
753         completedFuture("b"),
754         completedFuture("c"),
755         completedFuture("d"),
756         completedFuture("e"),
757         exceptionallyCompletedFuture(new IllegalStateException()),
758         (a, b, c, d, e, f) -> completedFuture(a + b + c + d + e + f));
759 
760     exception.expectCause(isA(IllegalStateException.class));
761     getCompleted(future);
762   }
763 
764   @Test
765   public void ctor_preventInstantiation() throws Exception {
766     exception.expect(both(isA(InvocationTargetException.class))
767                          .and(hasProperty("cause", isA(IllegalAccessError.class))));
768 
769     final Constructor<CompletableFutures> ctor = CompletableFutures.class.getDeclaredConstructor();
770     ctor.setAccessible(true);
771     ctor.newInstance();
772   }
773 
774   @Test
775   public void poll_done() throws Exception {
776     final Supplier<Optional<String>> supplier = () -> Optional.of("done");
777     final CompletableFuture<String> future = poll(supplier, Duration.ofMillis(2), executor);
778 
779     executor.runNextPendingCommand();
780     assertThat(future, completesTo("done"));
781   }
782 
783   @Test
784   public void poll_twice() throws Exception {
785     final List<Optional<String>> results = asList(Optional.empty(), Optional.of("done"));
786     final Supplier<Optional<String>> supplier = results.iterator()::next;
787     final CompletableFuture<String> future = poll(supplier, Duration.ofMillis(2), executor);
788 
789     executor.tick(1, MILLISECONDS);
790     assertThat(future.isDone(), is(false));
791 
792     executor.tick(10, MILLISECONDS);
793     assertThat(future, completesTo("done"));
794   }
795 
796   @Test
797   public void poll_taskReturnsNull() throws Exception {
798     final Supplier<Optional<String>> supplier = () -> null;
799     final CompletableFuture<String> future = poll(supplier, Duration.ofMillis(2), executor);
800 
801     executor.runNextPendingCommand();
802     exception.expectCause(isA(NullPointerException.class));
803     future.get();
804   }
805 
806   @Test
807   public void poll_taskThrows() throws Exception {
808     final RuntimeException ex = new RuntimeException("boom");
809     final Supplier<Optional<String>> supplier = () ->  {throw ex;};
810     final CompletableFuture<String> future = poll(supplier, Duration.ofMillis(2), executor);
811 
812     executor.runNextPendingCommand();
813     exception.expectCause(is(ex));
814     future.get();
815   }
816 
817   @Test
818   public void poll_scheduled() throws Exception {
819     final ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
820     final Supplier<Optional<String>> supplier = () -> Optional.of("hello");
821     poll(supplier, Duration.ofMillis(2), executor);
822 
823     verify(executor).scheduleAtFixedRate(any(), eq(0L), eq(2L), eq(MILLISECONDS));
824   }
825 
826   @Test
827   @SuppressWarnings("unchecked")
828   public void poll_resultFutureCanceled() throws Exception {
829     final ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
830     final ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
831     when(executor.scheduleAtFixedRate(any(), anyLong(), anyLong(), any()))
832         .thenReturn(scheduledFuture);
833 
834     final CompletableFuture<String> future = poll(Optional::empty, Duration.ofMillis(2), executor);
835     future.cancel(true);
836 
837     verify(scheduledFuture).cancel(true);
838   }
839 
840   @Test
841   public void poll_notRunningAfterCancel() throws Exception {
842     final CompletableFuture<String> future = poll(Optional::empty, Duration.ofMillis(2), executor);
843 
844     future.cancel(true);
845 
846     executor.tick(5, MILLISECONDS);
847     assertThat(executor.isIdle(), is(true));
848   }
849 
850   private static <T> CompletableFuture<T> incompleteFuture() {
851     return new CompletableFuture<>();
852   }
853 
854   private static <T> Matcher<CompletionStage<T>> completesTo(final T expected) {
855     return completesTo(is(expected));
856   }
857 
858   private static <T> Matcher<CompletionStage<T>> completesTo(final Matcher<T> expected) {
859     return new CustomTypeSafeMatcher<CompletionStage<T>>("completes to " + String.valueOf(expected)) {
860       @Override
861       protected boolean matchesSafely(CompletionStage<T> item) {
862         try {
863           final T value = item.toCompletableFuture().get(1, SECONDS);
864           return expected.matches(value);
865         } catch (Exception ex) {
866           return false;
867         }
868       }
869     };
870   }
871 
872   private static class NonThrowingFuture<T> extends CompletableFuture<T> {
873     @Override
874     public T join() {
875       if (this.isCompletedExceptionally()) {
876         return null;
877       }
878       return super.join();
879     }
880   }
881 
882 }