View Javadoc
1   /*
2    * Copyright (c) 2016 Spotify AB
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5    * use this file except in compliance with the License. You may obtain a copy of
6    * the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations under
14   * the License.
15   */
16  package com.spotify.futures;
17  
18  import java.time.Duration;
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.Optional;
22  import java.util.concurrent.CancellationException;
23  import java.util.concurrent.CompletableFuture;
24  import java.util.concurrent.CompletionException;
25  import java.util.concurrent.CompletionStage;
26  import java.util.concurrent.ScheduledExecutorService;
27  import java.util.concurrent.ScheduledFuture;
28  import java.util.concurrent.TimeUnit;
29  import java.util.function.BiFunction;
30  import java.util.function.Function;
31  import java.util.function.Supplier;
32  import java.util.stream.Collector;
33  
34  import static java.util.stream.Collectors.collectingAndThen;
35  import static java.util.stream.Collectors.toList;
36  
37  /**
38   * A collection of static utility methods that extend the
39   * {@link java.util.concurrent.CompletableFuture Java completable future} API.
40   *
41   * @since 0.1.0
42   */
43  public final class CompletableFutures {
44  
45    private CompletableFutures() {
46      throw new IllegalAccessError("This class must not be instantiated.");
47    }
48  
49    /**
50     * Returns a new {@link CompletableFuture} which completes to a list of all values of its input
51     * stages, if all succeed.  The list of results is in the same order as the input stages.
52     *
53     * <p> If any of the given stages complete exceptionally, then the returned future also does so,
54     * with a {@link CompletionException} holding this exception as its cause.
55     *
56     * <p> If no stages are provided, returns a future holding an empty list.
57     *
58     * @param stages the stages to combine
59     * @param <T>    the common super-type of all of the input stages, that determines the monomorphic
60     *               type of the output future
61     * @return a future that completes to a list of the results of the supplied stages
62     * @throws NullPointerException if the stages list or any of its elements are {@code null}
63     * @since 0.1.0
64     */
65    public static <T> CompletableFuture<List<T>> allAsList(
66        List<? extends CompletionStage<? extends T>> stages) {
67      // We use traditional for-loops instead of streams here for performance reasons,
68      // see AllAsListBenchmark
69  
70      @SuppressWarnings("unchecked") // generic array creation
71      final CompletableFuture<? extends T>[] all = new CompletableFuture[stages.size()];
72      for (int i = 0; i < stages.size(); i++) {
73        all[i] = stages.get(i).toCompletableFuture();
74      }
75      return CompletableFuture.allOf(all)
76          .thenApply(ignored -> {
77            final List<T> result = new ArrayList<>(all.length);
78            for (int i = 0; i < all.length; i++) {
79              result.add(all[i].join());
80            }
81            return result;
82          });
83    }
84  
85    /**
86     * Returns a new {@link CompletableFuture} which completes to a list of values of those input
87     * stages that succeeded. The list of results is in the same order as the input stages. For failed
88     * stages, the defaultValueMapper will be called, and the value returned from that function will
89     * be put in the resulting list.
90     *
91     * <p>If no stages are provided, returns a future holding an empty list.
92     *
93     * @param stages the stages to combine.
94     * @param defaultValueMapper a function that will be called when a future completes exceptionally
95     * to provide a default value to place in the resulting list
96     * @param <T>    the common type of all of the input stages, that determines the type of the
97     *               output future
98     * @return a future that completes to a list of the results of the supplied stages
99     * @throws NullPointerException if the stages list or any of its elements are {@code null}
100    */
101   public static <T> CompletableFuture<List<T>> successfulAsList(
102       List<? extends CompletionStage<T>> stages,
103       Function<Throwable, ? extends T> defaultValueMapper) {
104     return stages.stream()
105         .map(f -> f.exceptionally(defaultValueMapper))
106         .collect(joinList());
107   }
108 
109   /**
110    * Returns a new {@code CompletableFuture} that is already exceptionally completed with
111    * the given exception.
112    *
113    * @param throwable the exception
114    * @param <T>       an arbitrary type for the returned future; can be anything since the future
115    *                  will be exceptionally completed and thus there will never be a value of type
116    *                  {@code T}
117    * @return a future that exceptionally completed with the supplied exception
118    * @throws NullPointerException if the supplied throwable is {@code null}
119    * @since 0.1.0
120    */
121   public static <T> CompletableFuture<T> exceptionallyCompletedFuture(Throwable throwable) {
122     final CompletableFuture<T> future = new CompletableFuture<>();
123     future.completeExceptionally(throwable);
124     return future;
125   }
126 
127   /**
128    * Collect a stream of {@link CompletionStage}s into a single future holding a list of the
129    * joined entities.
130    *
131    * <p> Usage:
132    *
133    * <pre>{@code
134    * collection.stream()
135    *     .map(this::someAsyncFunc)
136    *     .collect(joinList())
137    *     .thenApply(this::consumeList)
138    * }</pre>
139    *
140    * <p> The generated {@link CompletableFuture} will complete to a list of all entities, in the
141    * order they were encountered in the original stream.  Similar to
142    * {@link CompletableFuture#allOf(CompletableFuture[])}, if any of the input futures complete
143    * exceptionally, then the returned CompletableFuture also does so, with a
144    * {@link CompletionException} holding this exception as its cause.
145    *
146    * @param <T> the common super-type of all of the input stages, that determines the monomorphic
147    *            type of the output future
148    * @param <S> the implementation of {@link CompletionStage} that the stream contains
149    * @return a new {@link CompletableFuture} according to the rules outlined in the method
150    * description
151    * @throws NullPointerException if any future in the stream is {@code null}
152    * @since 0.1.0
153    */
154   public static <T, S extends CompletionStage<? extends T>>
155   Collector<S, ?, CompletableFuture<List<T>>> joinList() {
156     return collectingAndThen(toList(), CompletableFutures::allAsList);
157   }
158 
159   /**
160    * Checks that a stage is completed.
161    *
162    * @param stage the {@link CompletionStage} to check
163    * @throws IllegalStateException if the stage is not completed
164    * @since 0.1.0
165    */
166   public static void checkCompleted(CompletionStage<?> stage) {
167     if (!stage.toCompletableFuture().isDone()) {
168       throw new IllegalStateException("future was not completed");
169     }
170   }
171 
172   /**
173    * Gets the value of a completed stage.
174    *
175    * @param stage a completed {@link CompletionStage}
176    * @param <T>   the type of the value that the stage completes into
177    * @return the value of the stage if it has one
178    * @throws IllegalStateException if the stage is not completed
179    * @since 0.1.0
180    */
181   public static <T> T getCompleted(CompletionStage<T> stage) {
182     CompletableFuture<T> future = stage.toCompletableFuture();
183     checkCompleted(future);
184     return future.join();
185   }
186 
187   /**
188    * Gets the exception from an exceptionally completed future
189    * @param stage an exceptionally completed {@link CompletionStage}
190    * @param <T>   the type of the value that the stage completes into
191    * @return the exception the stage has completed with
192    * @throws IllegalStateException if the stage is not completed exceptionally
193    * @throws CancellationException if the stage was cancelled
194    * @throws UnsupportedOperationException if the {@link CompletionStage} does not
195    * support the {@link CompletionStage#toCompletableFuture()} operation
196    */
197   public static <T> Throwable getException(CompletionStage<T> stage) {
198     CompletableFuture<T> future = stage.toCompletableFuture();
199     if (!future.isCompletedExceptionally()) {
200       throw new IllegalStateException("future was not completed exceptionally");
201     }
202     try {
203       future.join();
204       return null;
205     } catch (CompletionException x) {
206       return x.getCause();
207     }
208   }
209 
210   /**
211    * Returns a new stage that, when this stage completes either normally or exceptionally, is
212    * executed with this stage's result and exception as arguments to the supplied function.
213    *
214    * <p> When this stage is complete, the given function is invoked with the result (or {@code null}
215    * if none) and the exception (or {@code null} if none) of this stage as arguments, and the
216    * function's result is used to complete the returned stage.
217    *
218    * <p> This differs from
219    * {@link java.util.concurrent.CompletionStage#handle(java.util.function.BiFunction)} in that the
220    * function should return a {@link java.util.concurrent.CompletionStage} rather than the value
221    * directly.
222    *
223    * @param stage the {@link CompletionStage} to compose
224    * @param fn    the function to use to compute the value of the
225    *              returned {@link CompletionStage}
226    * @param <T>   the type of the input stage's value.
227    * @param <U>   the function's return type
228    * @return the new {@link CompletionStage}
229    * @since 0.1.0
230    */
231   public static <T, U> CompletionStage<U> handleCompose(
232       CompletionStage<T> stage,
233       BiFunction<? super T, Throwable, ? extends CompletionStage<U>> fn) {
234     return dereference(stage.handle(fn));
235   }
236 
237   /**
238    * Returns a new stage that, when this stage completes
239    * exceptionally, is executed with this stage's exception as the
240    * argument to the supplied function.  Otherwise, if this stage
241    * completes normally, then the returned stage also completes
242    * normally with the same value.
243    *
244    * <p>This differs from
245    * {@link java.util.concurrent.CompletionStage#exceptionally(java.util.function.Function)}
246    * in that the function should return a {@link java.util.concurrent.CompletionStage} rather than
247    * the value directly.
248    *
249    * @param stage the {@link CompletionStage} to compose
250    * @param fn    the function to use to compute the value of the
251    *              returned {@link CompletionStage} if this stage completed
252    *              exceptionally
253    * @param <T>   the type of the input stage's value.
254    * @return the new {@link CompletionStage}
255    * @since 0.1.0
256    */
257   public static <T> CompletionStage<T> exceptionallyCompose(
258       CompletionStage<T> stage,
259       Function<Throwable, ? extends CompletionStage<T>> fn) {
260     return dereference(wrap(stage).exceptionally(fn));
261   }
262 
263   /**
264    * This takes a stage of a stage of a value and returns a plain stage of a value.
265    *
266    * @param stage a {@link CompletionStage} of a {@link CompletionStage} of a value
267    * @param <T>   the type of the inner stage's value.
268    * @return the {@link CompletionStage} of the value
269    * @since 0.1.0
270    */
271   public static <T> CompletionStage<T> dereference(
272       CompletionStage<? extends CompletionStage<T>> stage) {
273     return stage.thenCompose(Function.identity());
274   }
275 
276   private static <T> CompletionStage<CompletionStage<T>> wrap(CompletionStage<T> future) {
277     //noinspection unchecked
278     return future.thenApply(CompletableFuture::completedFuture);
279   }
280 
281   /**
282    * Combines multiple stages by applying a function.
283    *
284    * @param a        the first stage.
285    * @param b        the second stage.
286    * @param function the combining function.
287    * @param <R>      the type of the combining function's return value.
288    * @param <A>      the type of the first stage's value.
289    * @param <B>      the type of the second stage's value.
290    * @return a stage that completes into the return value of the supplied function.
291    * @since 0.1.0
292    */
293   public static <R, A, B> CompletionStage<R> combine(
294       CompletionStage<A> a, CompletionStage<B> b,
295       BiFunction<A, B, R> function) {
296     return a.thenCombine(b, function);
297   }
298 
299   /**
300    * Combines multiple stages by applying a function.
301    *
302    * @param a        the first stage.
303    * @param b        the second stage.
304    * @param c        the third stage.
305    * @param function the combining function.
306    * @param <R>      the type of the combining function's return value.
307    * @param <A>      the type of the first stage's value.
308    * @param <B>      the type of the second stage's value.
309    * @param <C>      the type of the third stage's value.
310    * @return a stage that completes into the return value of the supplied function.
311    * @since 0.1.0
312    */
313   public static <R, A, B, C> CompletionStage<R> combine(
314       CompletionStage<A> a, CompletionStage<B> b, CompletionStage<C> c,
315       Function3<A, B, C, R> function) {
316     final CompletableFuture<A> af = a.toCompletableFuture();
317     final CompletableFuture<B> bf = b.toCompletableFuture();
318     final CompletableFuture<C> cf = c.toCompletableFuture();
319 
320     return CompletableFuture.allOf(af, bf, cf)
321         .thenApply(ignored -> function.apply(af.join(), bf.join(), cf.join()));
322   }
323 
324   /**
325    * Combines multiple stages by applying a function.
326    *
327    * @param a        the first stage.
328    * @param b        the second stage.
329    * @param c        the third stage.
330    * @param d        the fourth stage.
331    * @param function the combining function.
332    * @param <R>      the type of the combining function's return value.
333    * @param <A>      the type of the first stage's value.
334    * @param <B>      the type of the second stage's value.
335    * @param <C>      the type of the third stage's value.
336    * @param <D>      the type of the fourth stage's value.
337    * @return a stage that completes into the return value of the supplied function.
338    * @since 0.1.0
339    */
340   public static <R, A, B, C, D> CompletionStage<R> combine(
341       CompletionStage<A> a, CompletionStage<B> b, CompletionStage<C> c, CompletionStage<D> d,
342       Function4<A, B, C, D, R> function) {
343     final CompletableFuture<A> af = a.toCompletableFuture();
344     final CompletableFuture<B> bf = b.toCompletableFuture();
345     final CompletableFuture<C> cf = c.toCompletableFuture();
346     final CompletableFuture<D> df = d.toCompletableFuture();
347 
348     return CompletableFuture.allOf(af, bf, cf, df)
349         .thenApply(ignored -> function.apply(af.join(), bf.join(), cf.join(), df.join()));
350   }
351 
352   /**
353    * Combines multiple stages by applying a function.
354    *
355    * @param a        the first stage.
356    * @param b        the second stage.
357    * @param c        the third stage.
358    * @param d        the fourth stage.
359    * @param e        the fifth stage.
360    * @param function the combining function.
361    * @param <R>      the type of the combining function's return value.
362    * @param <A>      the type of the first stage's value.
363    * @param <B>      the type of the second stage's value.
364    * @param <C>      the type of the third stage's value.
365    * @param <D>      the type of the fourth stage's value.
366    * @param <E>      the type of the fifth stage's value.
367    * @return a stage that completes into the return value of the supplied function.
368    * @since 0.1.0
369    */
370   public static <R, A, B, C, D, E> CompletionStage<R> combine(
371       CompletionStage<A> a, CompletionStage<B> b, CompletionStage<C> c,
372       CompletionStage<D> d, CompletionStage<E> e,
373       Function5<A, B, C, D, E, R> function) {
374     final CompletableFuture<A> af = a.toCompletableFuture();
375     final CompletableFuture<B> bf = b.toCompletableFuture();
376     final CompletableFuture<C> cf = c.toCompletableFuture();
377     final CompletableFuture<D> df = d.toCompletableFuture();
378     final CompletableFuture<E> ef = e.toCompletableFuture();
379 
380     return CompletableFuture.allOf(af, bf, cf, df, ef)
381         .thenApply(ignored ->
382                        function.apply(af.join(), bf.join(), cf.join(), df.join(), ef.join()));
383   }
384 
385   /**
386    * Combines multiple stages by applying a function.
387    *
388    * @param a        the first stage.
389    * @param b        the second stage.
390    * @param c        the third stage.
391    * @param d        the fourth stage.
392    * @param e        the fifth stage.
393    * @param f        the sixth stage.
394    * @param function the combining function.
395    * @param <R>      the type of the combining function's return value.
396    * @param <A>      the type of the first stage's value.
397    * @param <B>      the type of the second stage's value.
398    * @param <C>      the type of the third stage's value.
399    * @param <D>      the type of the fourth stage's value.
400    * @param <E>      the type of the fifth stage's value.
401    * @param <F>      the type of the sixth stage's value.
402    * @return a stage that completes into the return value of the supplied function.
403    * @since 0.3.2
404    */
405   public static <R, A, B, C, D, E, F> CompletionStage<R> combine(
406       CompletionStage<A> a, CompletionStage<B> b, CompletionStage<C> c,
407       CompletionStage<D> d, CompletionStage<E> e, CompletionStage<F> f,
408       Function6<A, B, C, D, E, F, R> function) {
409     final CompletableFuture<A> af = a.toCompletableFuture();
410     final CompletableFuture<B> bf = b.toCompletableFuture();
411     final CompletableFuture<C> cf = c.toCompletableFuture();
412     final CompletableFuture<D> df = d.toCompletableFuture();
413     final CompletableFuture<E> ef = e.toCompletableFuture();
414     final CompletableFuture<F> ff = f.toCompletableFuture();
415 
416     return CompletableFuture.allOf(af, bf, cf, df, ef, ff)
417         .thenApply(ignored ->
418                        function.apply(af.join(),
419                                       bf.join(),
420                                       cf.join(),
421                                       df.join(),
422                                       ef.join(),
423                                       ff.join()));
424   }
425 
426   /**
427    * Composes multiple stages into another stage using a function.
428    *
429    * @param a        the first stage.
430    * @param b        the second stage.
431    * @param function the combining function.
432    * @param <R>      the type of the composed {@link CompletionStage}.
433    * @param <A>      the type of the first stage's value.
434    * @param <B>      the type of the second stage's value.
435    * @return a stage that is composed from the input stages using the function.
436    * @throws UnsupportedOperationException if any of the {@link CompletionStage}s
437    * do not interoperate with CompletableFuture
438    */
439   public static <R, A, B> CompletionStage<R> combineFutures(
440       CompletionStage<A> a,
441       CompletionStage<B> b,
442       BiFunction<A, B, CompletionStage<R>> function) {
443     final CompletableFuture<A> af = a.toCompletableFuture();
444     final CompletableFuture<B> bf = b.toCompletableFuture();
445 
446     return CompletableFuture.allOf(af, bf)
447         .thenCompose(ignored -> function.apply(af.join(), bf.join()));
448   }
449 
450   /**
451    * Composes multiple stages into another stage using a function.
452    *
453    * @param a        the first stage.
454    * @param b        the second stage.
455    * @param c        the third stage.
456    * @param function the combining function.
457    * @param <R>      the type of the composed {@link CompletionStage}.
458    * @param <A>      the type of the first stage's value.
459    * @param <B>      the type of the second stage's value.
460    * @param <C>      the type of the third stage's value.
461    * @return a stage that is composed from the input stages using the function.
462    * @throws UnsupportedOperationException if any of the {@link CompletionStage}s
463    * do not interoperate with CompletableFuture
464    */
465   public static <R, A, B, C> CompletionStage<R> combineFutures(
466       CompletionStage<A> a,
467       CompletionStage<B> b,
468       CompletionStage<C> c,
469       Function3<A, B, C, CompletionStage<R>> function) {
470     final CompletableFuture<A> af = a.toCompletableFuture();
471     final CompletableFuture<B> bf = b.toCompletableFuture();
472     final CompletableFuture<C> cf = c.toCompletableFuture();
473 
474     return CompletableFuture.allOf(af, bf, cf)
475         .thenCompose(ignored -> function.apply(af.join(),
476                                                bf.join(),
477                                                cf.join()));
478   }
479 
480   /**
481    * Composes multiple stages into another stage using a function.
482    *
483    * @param a        the first stage.
484    * @param b        the second stage.
485    * @param c        the third stage.
486    * @param d        the fourth stage.
487    * @param function the combining function.
488    * @param <R>      the type of the composed {@link CompletionStage}.
489    * @param <A>      the type of the first stage's value.
490    * @param <B>      the type of the second stage's value.
491    * @param <C>      the type of the third stage's value.
492    * @param <D>      the type of the fourth stage's value.
493    * @return a stage that is composed from the input stages using the function.
494    * @throws UnsupportedOperationException if any of the {@link CompletionStage}s
495    * do not interoperate with CompletableFuture
496    */
497   public static <R, A, B, C, D> CompletionStage<R> combineFutures(
498       CompletionStage<A> a,
499       CompletionStage<B> b,
500       CompletionStage<C> c,
501       CompletionStage<D> d,
502       Function4<A, B, C, D, CompletionStage<R>> function) {
503     final CompletableFuture<A> af = a.toCompletableFuture();
504     final CompletableFuture<B> bf = b.toCompletableFuture();
505     final CompletableFuture<C> cf = c.toCompletableFuture();
506     final CompletableFuture<D> df = d.toCompletableFuture();
507 
508     return CompletableFuture.allOf(af, bf, cf, df)
509         .thenCompose(ignored -> function.apply(af.join(), bf.join(), cf.join(), df.join()));
510   }
511 
512   /**
513    * Composes multiple stages into another stage using a function.
514    *
515    * @param a        the first stage.
516    * @param b        the second stage.
517    * @param c        the third stage.
518    * @param d        the fourth stage.
519    * @param e        the fifth stage.
520    * @param function the combining function.
521    * @param <R>      the type of the composed {@link CompletionStage}.
522    * @param <A>      the type of the first stage's value.
523    * @param <B>      the type of the second stage's value.
524    * @param <C>      the type of the third stage's value.
525    * @param <D>      the type of the fourth stage's value.
526    * @param <E>      the type of the fifth stage's value.
527    * @return a stage that is composed from the input stages using the function.
528    * @throws UnsupportedOperationException if any of the {@link CompletionStage}s
529    * do not interoperate with CompletableFuture
530    */
531   public static <R, A, B, C, D, E> CompletionStage<R> combineFutures(
532       CompletionStage<A> a,
533       CompletionStage<B> b,
534       CompletionStage<C> c,
535       CompletionStage<D> d,
536       CompletionStage<E> e,
537       Function5<A, B, C, D, E, CompletionStage<R>> function) {
538     final CompletableFuture<A> af = a.toCompletableFuture();
539     final CompletableFuture<B> bf = b.toCompletableFuture();
540     final CompletableFuture<C> cf = c.toCompletableFuture();
541     final CompletableFuture<D> df = d.toCompletableFuture();
542     final CompletableFuture<E> ef = e.toCompletableFuture();
543 
544     return CompletableFuture.allOf(af, bf, cf, df, ef)
545         .thenCompose(ignored -> function.apply(af.join(),
546                                                bf.join(),
547                                                cf.join(),
548                                                df.join(),
549                                                ef.join()));
550   }
551 
552   /**
553    * Composes multiple stages into another stage using a function.
554    *
555    * @param a        the first stage.
556    * @param b        the second stage.
557    * @param c        the third stage.
558    * @param d        the fourth stage.
559    * @param e        the fifth stage.
560    * @param f        the sixth stage.
561    * @param function the combining function.
562    * @param <R>      the type of the composed {@link CompletionStage}.
563    * @param <A>      the type of the first stage's value.
564    * @param <B>      the type of the second stage's value.
565    * @param <C>      the type of the third stage's value.
566    * @param <D>      the type of the fourth stage's value.
567    * @param <E>      the type of the fifth stage's value.
568    * @param <F>      the type of the sixth stage's value.
569    * @return a stage that is composed from the input stages using the function.
570    * @throws UnsupportedOperationException if any of the {@link CompletionStage}s
571    * do not interoperate with CompletableFuture
572    */
573   public static <R, A, B, C, D, E, F> CompletionStage<R> combineFutures(
574       CompletionStage<A> a,
575       CompletionStage<B> b,
576       CompletionStage<C> c,
577       CompletionStage<D> d,
578       CompletionStage<E> e,
579       CompletionStage<F> f,
580       Function6<A, B, C, D, E, F, CompletionStage<R>> function) {
581     final CompletableFuture<A> af = a.toCompletableFuture();
582     final CompletableFuture<B> bf = b.toCompletableFuture();
583     final CompletableFuture<C> cf = c.toCompletableFuture();
584     final CompletableFuture<D> df = d.toCompletableFuture();
585     final CompletableFuture<E> ef = e.toCompletableFuture();
586     final CompletableFuture<F> ff = f.toCompletableFuture();
587 
588     return CompletableFuture.allOf(af, bf, cf, df, ef, ff)
589         .thenCompose(ignored -> function.apply(af.join(),
590                                                bf.join(),
591                                                cf.join(),
592                                                df.join(),
593                                                ef.join(),
594                                                ff.join()));
595   }
596 
597   /**
598    * Polls an external resource periodically until it returns a non-empty result.
599    *
600    * <p> The polling task should return {@code Optional.empty()} until it becomes available, and
601    * then {@code Optional.of(result)}.  If the polling task throws an exception or returns null,
602    * that will cause the result future to complete exceptionally.
603    *
604    * <p> Canceling the returned future will cancel the scheduled polling task as well.
605    *
606    * <p> Note that on a ScheduledThreadPoolExecutor the polling task might remain allocated for up
607    * to {@code frequency} time after completing or being cancelled.  If you have lots of polling
608    * operations or a long polling frequency, consider setting {@code removeOnCancelPolicy} to true.
609    * See {@link java.util.concurrent.ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}.
610    *
611    * @param pollingTask     the polling task
612    * @param frequency       the frequency to run the polling task at
613    * @param executorService the executor service to schedule the polling task on
614    * @param <T>             the type of the result of the polling task, that will be returned when
615    *                        the task succeeds.
616    * @return a future completing to the result of the polling task once that becomes available
617    */
618   public static <T> CompletableFuture<T> poll(
619       final Supplier<Optional<T>> pollingTask,
620       final Duration frequency,
621       final ScheduledExecutorService executorService) {
622     final CompletableFuture<T> result = new CompletableFuture<>();
623     final ScheduledFuture<?> scheduled = executorService.scheduleAtFixedRate(
624         () -> pollTask(pollingTask, result), 0, frequency.toMillis(), TimeUnit.MILLISECONDS);
625     result.whenComplete((r, ex) -> scheduled.cancel(true));
626     return result;
627   }
628 
629   private static <T> void pollTask(
630       final Supplier<Optional<T>> pollingTask,
631       final CompletableFuture<T> resultFuture) {
632     try {
633       pollingTask.get().ifPresent(resultFuture::complete);
634     } catch (Exception ex) {
635       resultFuture.completeExceptionally(ex);
636     }
637   }
638 
639 }