1 /* 2 * Copyright (c) 2016 Spotify AB 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not 5 * use this file except in compliance with the License. You may obtain a copy of 6 * the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations under 14 * the License. 15 */ 16 package com.spotify.futures; 17 18 import java.time.Duration; 19 import java.util.ArrayList; 20 import java.util.List; 21 import java.util.Optional; 22 import java.util.concurrent.CancellationException; 23 import java.util.concurrent.CompletableFuture; 24 import java.util.concurrent.CompletionException; 25 import java.util.concurrent.CompletionStage; 26 import java.util.concurrent.ScheduledExecutorService; 27 import java.util.concurrent.ScheduledFuture; 28 import java.util.concurrent.TimeUnit; 29 import java.util.function.BiFunction; 30 import java.util.function.Function; 31 import java.util.function.Supplier; 32 import java.util.stream.Collector; 33 34 import static java.util.stream.Collectors.collectingAndThen; 35 import static java.util.stream.Collectors.toList; 36 37 /** 38 * A collection of static utility methods that extend the 39 * {@link java.util.concurrent.CompletableFuture Java completable future} API. 40 * 41 * @since 0.1.0 42 */ 43 public final class CompletableFutures { 44 45 private CompletableFutures() { 46 throw new IllegalAccessError("This class must not be instantiated."); 47 } 48 49 /** 50 * Returns a new {@link CompletableFuture} which completes to a list of all values of its input 51 * stages, if all succeed. The list of results is in the same order as the input stages. 52 * 53 * <p> If any of the given stages complete exceptionally, then the returned future also does so, 54 * with a {@link CompletionException} holding this exception as its cause. 55 * 56 * <p> If no stages are provided, returns a future holding an empty list. 57 * 58 * @param stages the stages to combine 59 * @param <T> the common super-type of all of the input stages, that determines the monomorphic 60 * type of the output future 61 * @return a future that completes to a list of the results of the supplied stages 62 * @throws NullPointerException if the stages list or any of its elements are {@code null} 63 * @since 0.1.0 64 */ 65 public static <T> CompletableFuture<List<T>> allAsList( 66 List<? extends CompletionStage<? extends T>> stages) { 67 // We use traditional for-loops instead of streams here for performance reasons, 68 // see AllAsListBenchmark 69 70 @SuppressWarnings("unchecked") // generic array creation 71 final CompletableFuture<? extends T>[] all = new CompletableFuture[stages.size()]; 72 for (int i = 0; i < stages.size(); i++) { 73 all[i] = stages.get(i).toCompletableFuture(); 74 } 75 return CompletableFuture.allOf(all) 76 .thenApply(ignored -> { 77 final List<T> result = new ArrayList<>(all.length); 78 for (int i = 0; i < all.length; i++) { 79 result.add(all[i].join()); 80 } 81 return result; 82 }); 83 } 84 85 /** 86 * Returns a new {@link CompletableFuture} which completes to a list of values of those input 87 * stages that succeeded. The list of results is in the same order as the input stages. For failed 88 * stages, the defaultValueMapper will be called, and the value returned from that function will 89 * be put in the resulting list. 90 * 91 * <p>If no stages are provided, returns a future holding an empty list. 92 * 93 * @param stages the stages to combine. 94 * @param defaultValueMapper a function that will be called when a future completes exceptionally 95 * to provide a default value to place in the resulting list 96 * @param <T> the common type of all of the input stages, that determines the type of the 97 * output future 98 * @return a future that completes to a list of the results of the supplied stages 99 * @throws NullPointerException if the stages list or any of its elements are {@code null} 100 */ 101 public static <T> CompletableFuture<List<T>> successfulAsList( 102 List<? extends CompletionStage<T>> stages, 103 Function<Throwable, ? extends T> defaultValueMapper) { 104 return stages.stream() 105 .map(f -> f.exceptionally(defaultValueMapper)) 106 .collect(joinList()); 107 } 108 109 /** 110 * Returns a new {@code CompletableFuture} that is already exceptionally completed with 111 * the given exception. 112 * 113 * @param throwable the exception 114 * @param <T> an arbitrary type for the returned future; can be anything since the future 115 * will be exceptionally completed and thus there will never be a value of type 116 * {@code T} 117 * @return a future that exceptionally completed with the supplied exception 118 * @throws NullPointerException if the supplied throwable is {@code null} 119 * @since 0.1.0 120 */ 121 public static <T> CompletableFuture<T> exceptionallyCompletedFuture(Throwable throwable) { 122 final CompletableFuture<T> future = new CompletableFuture<>(); 123 future.completeExceptionally(throwable); 124 return future; 125 } 126 127 /** 128 * Collect a stream of {@link CompletionStage}s into a single future holding a list of the 129 * joined entities. 130 * 131 * <p> Usage: 132 * 133 * <pre>{@code 134 * collection.stream() 135 * .map(this::someAsyncFunc) 136 * .collect(joinList()) 137 * .thenApply(this::consumeList) 138 * }</pre> 139 * 140 * <p> The generated {@link CompletableFuture} will complete to a list of all entities, in the 141 * order they were encountered in the original stream. Similar to 142 * {@link CompletableFuture#allOf(CompletableFuture[])}, if any of the input futures complete 143 * exceptionally, then the returned CompletableFuture also does so, with a 144 * {@link CompletionException} holding this exception as its cause. 145 * 146 * @param <T> the common super-type of all of the input stages, that determines the monomorphic 147 * type of the output future 148 * @param <S> the implementation of {@link CompletionStage} that the stream contains 149 * @return a new {@link CompletableFuture} according to the rules outlined in the method 150 * description 151 * @throws NullPointerException if any future in the stream is {@code null} 152 * @since 0.1.0 153 */ 154 public static <T, S extends CompletionStage<? extends T>> 155 Collector<S, ?, CompletableFuture<List<T>>> joinList() { 156 return collectingAndThen(toList(), CompletableFutures::allAsList); 157 } 158 159 /** 160 * Checks that a stage is completed. 161 * 162 * @param stage the {@link CompletionStage} to check 163 * @throws IllegalStateException if the stage is not completed 164 * @since 0.1.0 165 */ 166 public static void checkCompleted(CompletionStage<?> stage) { 167 if (!stage.toCompletableFuture().isDone()) { 168 throw new IllegalStateException("future was not completed"); 169 } 170 } 171 172 /** 173 * Gets the value of a completed stage. 174 * 175 * @param stage a completed {@link CompletionStage} 176 * @param <T> the type of the value that the stage completes into 177 * @return the value of the stage if it has one 178 * @throws IllegalStateException if the stage is not completed 179 * @since 0.1.0 180 */ 181 public static <T> T getCompleted(CompletionStage<T> stage) { 182 CompletableFuture<T> future = stage.toCompletableFuture(); 183 checkCompleted(future); 184 return future.join(); 185 } 186 187 /** 188 * Gets the exception from an exceptionally completed future 189 * @param stage an exceptionally completed {@link CompletionStage} 190 * @param <T> the type of the value that the stage completes into 191 * @return the exception the stage has completed with 192 * @throws IllegalStateException if the stage is not completed exceptionally 193 * @throws CancellationException if the stage was cancelled 194 * @throws UnsupportedOperationException if the {@link CompletionStage} does not 195 * support the {@link CompletionStage#toCompletableFuture()} operation 196 */ 197 public static <T> Throwable getException(CompletionStage<T> stage) { 198 CompletableFuture<T> future = stage.toCompletableFuture(); 199 if (!future.isCompletedExceptionally()) { 200 throw new IllegalStateException("future was not completed exceptionally"); 201 } 202 try { 203 future.join(); 204 return null; 205 } catch (CompletionException x) { 206 return x.getCause(); 207 } 208 } 209 210 /** 211 * Returns a new stage that, when this stage completes either normally or exceptionally, is 212 * executed with this stage's result and exception as arguments to the supplied function. 213 * 214 * <p> When this stage is complete, the given function is invoked with the result (or {@code null} 215 * if none) and the exception (or {@code null} if none) of this stage as arguments, and the 216 * function's result is used to complete the returned stage. 217 * 218 * <p> This differs from 219 * {@link java.util.concurrent.CompletionStage#handle(java.util.function.BiFunction)} in that the 220 * function should return a {@link java.util.concurrent.CompletionStage} rather than the value 221 * directly. 222 * 223 * @param stage the {@link CompletionStage} to compose 224 * @param fn the function to use to compute the value of the 225 * returned {@link CompletionStage} 226 * @param <T> the type of the input stage's value. 227 * @param <U> the function's return type 228 * @return the new {@link CompletionStage} 229 * @since 0.1.0 230 */ 231 public static <T, U> CompletionStage<U> handleCompose( 232 CompletionStage<T> stage, 233 BiFunction<? super T, Throwable, ? extends CompletionStage<U>> fn) { 234 return dereference(stage.handle(fn)); 235 } 236 237 /** 238 * Returns a new stage that, when this stage completes 239 * exceptionally, is executed with this stage's exception as the 240 * argument to the supplied function. Otherwise, if this stage 241 * completes normally, then the returned stage also completes 242 * normally with the same value. 243 * 244 * <p>This differs from 245 * {@link java.util.concurrent.CompletionStage#exceptionally(java.util.function.Function)} 246 * in that the function should return a {@link java.util.concurrent.CompletionStage} rather than 247 * the value directly. 248 * 249 * @param stage the {@link CompletionStage} to compose 250 * @param fn the function to use to compute the value of the 251 * returned {@link CompletionStage} if this stage completed 252 * exceptionally 253 * @param <T> the type of the input stage's value. 254 * @return the new {@link CompletionStage} 255 * @since 0.1.0 256 */ 257 public static <T> CompletionStage<T> exceptionallyCompose( 258 CompletionStage<T> stage, 259 Function<Throwable, ? extends CompletionStage<T>> fn) { 260 return dereference(wrap(stage).exceptionally(fn)); 261 } 262 263 /** 264 * This takes a stage of a stage of a value and returns a plain stage of a value. 265 * 266 * @param stage a {@link CompletionStage} of a {@link CompletionStage} of a value 267 * @param <T> the type of the inner stage's value. 268 * @return the {@link CompletionStage} of the value 269 * @since 0.1.0 270 */ 271 public static <T> CompletionStage<T> dereference( 272 CompletionStage<? extends CompletionStage<T>> stage) { 273 return stage.thenCompose(Function.identity()); 274 } 275 276 private static <T> CompletionStage<CompletionStage<T>> wrap(CompletionStage<T> future) { 277 //noinspection unchecked 278 return future.thenApply(CompletableFuture::completedFuture); 279 } 280 281 /** 282 * Combines multiple stages by applying a function. 283 * 284 * @param a the first stage. 285 * @param b the second stage. 286 * @param function the combining function. 287 * @param <R> the type of the combining function's return value. 288 * @param <A> the type of the first stage's value. 289 * @param <B> the type of the second stage's value. 290 * @return a stage that completes into the return value of the supplied function. 291 * @since 0.1.0 292 */ 293 public static <R, A, B> CompletionStage<R> combine( 294 CompletionStage<A> a, CompletionStage<B> b, 295 BiFunction<A, B, R> function) { 296 return a.thenCombine(b, function); 297 } 298 299 /** 300 * Combines multiple stages by applying a function. 301 * 302 * @param a the first stage. 303 * @param b the second stage. 304 * @param c the third stage. 305 * @param function the combining function. 306 * @param <R> the type of the combining function's return value. 307 * @param <A> the type of the first stage's value. 308 * @param <B> the type of the second stage's value. 309 * @param <C> the type of the third stage's value. 310 * @return a stage that completes into the return value of the supplied function. 311 * @since 0.1.0 312 */ 313 public static <R, A, B, C> CompletionStage<R> combine( 314 CompletionStage<A> a, CompletionStage<B> b, CompletionStage<C> c, 315 Function3<A, B, C, R> function) { 316 final CompletableFuture<A> af = a.toCompletableFuture(); 317 final CompletableFuture<B> bf = b.toCompletableFuture(); 318 final CompletableFuture<C> cf = c.toCompletableFuture(); 319 320 return CompletableFuture.allOf(af, bf, cf) 321 .thenApply(ignored -> function.apply(af.join(), bf.join(), cf.join())); 322 } 323 324 /** 325 * Combines multiple stages by applying a function. 326 * 327 * @param a the first stage. 328 * @param b the second stage. 329 * @param c the third stage. 330 * @param d the fourth stage. 331 * @param function the combining function. 332 * @param <R> the type of the combining function's return value. 333 * @param <A> the type of the first stage's value. 334 * @param <B> the type of the second stage's value. 335 * @param <C> the type of the third stage's value. 336 * @param <D> the type of the fourth stage's value. 337 * @return a stage that completes into the return value of the supplied function. 338 * @since 0.1.0 339 */ 340 public static <R, A, B, C, D> CompletionStage<R> combine( 341 CompletionStage<A> a, CompletionStage<B> b, CompletionStage<C> c, CompletionStage<D> d, 342 Function4<A, B, C, D, R> function) { 343 final CompletableFuture<A> af = a.toCompletableFuture(); 344 final CompletableFuture<B> bf = b.toCompletableFuture(); 345 final CompletableFuture<C> cf = c.toCompletableFuture(); 346 final CompletableFuture<D> df = d.toCompletableFuture(); 347 348 return CompletableFuture.allOf(af, bf, cf, df) 349 .thenApply(ignored -> function.apply(af.join(), bf.join(), cf.join(), df.join())); 350 } 351 352 /** 353 * Combines multiple stages by applying a function. 354 * 355 * @param a the first stage. 356 * @param b the second stage. 357 * @param c the third stage. 358 * @param d the fourth stage. 359 * @param e the fifth stage. 360 * @param function the combining function. 361 * @param <R> the type of the combining function's return value. 362 * @param <A> the type of the first stage's value. 363 * @param <B> the type of the second stage's value. 364 * @param <C> the type of the third stage's value. 365 * @param <D> the type of the fourth stage's value. 366 * @param <E> the type of the fifth stage's value. 367 * @return a stage that completes into the return value of the supplied function. 368 * @since 0.1.0 369 */ 370 public static <R, A, B, C, D, E> CompletionStage<R> combine( 371 CompletionStage<A> a, CompletionStage<B> b, CompletionStage<C> c, 372 CompletionStage<D> d, CompletionStage<E> e, 373 Function5<A, B, C, D, E, R> function) { 374 final CompletableFuture<A> af = a.toCompletableFuture(); 375 final CompletableFuture<B> bf = b.toCompletableFuture(); 376 final CompletableFuture<C> cf = c.toCompletableFuture(); 377 final CompletableFuture<D> df = d.toCompletableFuture(); 378 final CompletableFuture<E> ef = e.toCompletableFuture(); 379 380 return CompletableFuture.allOf(af, bf, cf, df, ef) 381 .thenApply(ignored -> 382 function.apply(af.join(), bf.join(), cf.join(), df.join(), ef.join())); 383 } 384 385 /** 386 * Combines multiple stages by applying a function. 387 * 388 * @param a the first stage. 389 * @param b the second stage. 390 * @param c the third stage. 391 * @param d the fourth stage. 392 * @param e the fifth stage. 393 * @param f the sixth stage. 394 * @param function the combining function. 395 * @param <R> the type of the combining function's return value. 396 * @param <A> the type of the first stage's value. 397 * @param <B> the type of the second stage's value. 398 * @param <C> the type of the third stage's value. 399 * @param <D> the type of the fourth stage's value. 400 * @param <E> the type of the fifth stage's value. 401 * @param <F> the type of the sixth stage's value. 402 * @return a stage that completes into the return value of the supplied function. 403 * @since 0.3.2 404 */ 405 public static <R, A, B, C, D, E, F> CompletionStage<R> combine( 406 CompletionStage<A> a, CompletionStage<B> b, CompletionStage<C> c, 407 CompletionStage<D> d, CompletionStage<E> e, CompletionStage<F> f, 408 Function6<A, B, C, D, E, F, R> function) { 409 final CompletableFuture<A> af = a.toCompletableFuture(); 410 final CompletableFuture<B> bf = b.toCompletableFuture(); 411 final CompletableFuture<C> cf = c.toCompletableFuture(); 412 final CompletableFuture<D> df = d.toCompletableFuture(); 413 final CompletableFuture<E> ef = e.toCompletableFuture(); 414 final CompletableFuture<F> ff = f.toCompletableFuture(); 415 416 return CompletableFuture.allOf(af, bf, cf, df, ef, ff) 417 .thenApply(ignored -> 418 function.apply(af.join(), 419 bf.join(), 420 cf.join(), 421 df.join(), 422 ef.join(), 423 ff.join())); 424 } 425 426 /** 427 * Composes multiple stages into another stage using a function. 428 * 429 * @param a the first stage. 430 * @param b the second stage. 431 * @param function the combining function. 432 * @param <R> the type of the composed {@link CompletionStage}. 433 * @param <A> the type of the first stage's value. 434 * @param <B> the type of the second stage's value. 435 * @return a stage that is composed from the input stages using the function. 436 * @throws UnsupportedOperationException if any of the {@link CompletionStage}s 437 * do not interoperate with CompletableFuture 438 */ 439 public static <R, A, B> CompletionStage<R> combineFutures( 440 CompletionStage<A> a, 441 CompletionStage<B> b, 442 BiFunction<A, B, CompletionStage<R>> function) { 443 final CompletableFuture<A> af = a.toCompletableFuture(); 444 final CompletableFuture<B> bf = b.toCompletableFuture(); 445 446 return CompletableFuture.allOf(af, bf) 447 .thenCompose(ignored -> function.apply(af.join(), bf.join())); 448 } 449 450 /** 451 * Composes multiple stages into another stage using a function. 452 * 453 * @param a the first stage. 454 * @param b the second stage. 455 * @param c the third stage. 456 * @param function the combining function. 457 * @param <R> the type of the composed {@link CompletionStage}. 458 * @param <A> the type of the first stage's value. 459 * @param <B> the type of the second stage's value. 460 * @param <C> the type of the third stage's value. 461 * @return a stage that is composed from the input stages using the function. 462 * @throws UnsupportedOperationException if any of the {@link CompletionStage}s 463 * do not interoperate with CompletableFuture 464 */ 465 public static <R, A, B, C> CompletionStage<R> combineFutures( 466 CompletionStage<A> a, 467 CompletionStage<B> b, 468 CompletionStage<C> c, 469 Function3<A, B, C, CompletionStage<R>> function) { 470 final CompletableFuture<A> af = a.toCompletableFuture(); 471 final CompletableFuture<B> bf = b.toCompletableFuture(); 472 final CompletableFuture<C> cf = c.toCompletableFuture(); 473 474 return CompletableFuture.allOf(af, bf, cf) 475 .thenCompose(ignored -> function.apply(af.join(), 476 bf.join(), 477 cf.join())); 478 } 479 480 /** 481 * Composes multiple stages into another stage using a function. 482 * 483 * @param a the first stage. 484 * @param b the second stage. 485 * @param c the third stage. 486 * @param d the fourth stage. 487 * @param function the combining function. 488 * @param <R> the type of the composed {@link CompletionStage}. 489 * @param <A> the type of the first stage's value. 490 * @param <B> the type of the second stage's value. 491 * @param <C> the type of the third stage's value. 492 * @param <D> the type of the fourth stage's value. 493 * @return a stage that is composed from the input stages using the function. 494 * @throws UnsupportedOperationException if any of the {@link CompletionStage}s 495 * do not interoperate with CompletableFuture 496 */ 497 public static <R, A, B, C, D> CompletionStage<R> combineFutures( 498 CompletionStage<A> a, 499 CompletionStage<B> b, 500 CompletionStage<C> c, 501 CompletionStage<D> d, 502 Function4<A, B, C, D, CompletionStage<R>> function) { 503 final CompletableFuture<A> af = a.toCompletableFuture(); 504 final CompletableFuture<B> bf = b.toCompletableFuture(); 505 final CompletableFuture<C> cf = c.toCompletableFuture(); 506 final CompletableFuture<D> df = d.toCompletableFuture(); 507 508 return CompletableFuture.allOf(af, bf, cf, df) 509 .thenCompose(ignored -> function.apply(af.join(), bf.join(), cf.join(), df.join())); 510 } 511 512 /** 513 * Composes multiple stages into another stage using a function. 514 * 515 * @param a the first stage. 516 * @param b the second stage. 517 * @param c the third stage. 518 * @param d the fourth stage. 519 * @param e the fifth stage. 520 * @param function the combining function. 521 * @param <R> the type of the composed {@link CompletionStage}. 522 * @param <A> the type of the first stage's value. 523 * @param <B> the type of the second stage's value. 524 * @param <C> the type of the third stage's value. 525 * @param <D> the type of the fourth stage's value. 526 * @param <E> the type of the fifth stage's value. 527 * @return a stage that is composed from the input stages using the function. 528 * @throws UnsupportedOperationException if any of the {@link CompletionStage}s 529 * do not interoperate with CompletableFuture 530 */ 531 public static <R, A, B, C, D, E> CompletionStage<R> combineFutures( 532 CompletionStage<A> a, 533 CompletionStage<B> b, 534 CompletionStage<C> c, 535 CompletionStage<D> d, 536 CompletionStage<E> e, 537 Function5<A, B, C, D, E, CompletionStage<R>> function) { 538 final CompletableFuture<A> af = a.toCompletableFuture(); 539 final CompletableFuture<B> bf = b.toCompletableFuture(); 540 final CompletableFuture<C> cf = c.toCompletableFuture(); 541 final CompletableFuture<D> df = d.toCompletableFuture(); 542 final CompletableFuture<E> ef = e.toCompletableFuture(); 543 544 return CompletableFuture.allOf(af, bf, cf, df, ef) 545 .thenCompose(ignored -> function.apply(af.join(), 546 bf.join(), 547 cf.join(), 548 df.join(), 549 ef.join())); 550 } 551 552 /** 553 * Composes multiple stages into another stage using a function. 554 * 555 * @param a the first stage. 556 * @param b the second stage. 557 * @param c the third stage. 558 * @param d the fourth stage. 559 * @param e the fifth stage. 560 * @param f the sixth stage. 561 * @param function the combining function. 562 * @param <R> the type of the composed {@link CompletionStage}. 563 * @param <A> the type of the first stage's value. 564 * @param <B> the type of the second stage's value. 565 * @param <C> the type of the third stage's value. 566 * @param <D> the type of the fourth stage's value. 567 * @param <E> the type of the fifth stage's value. 568 * @param <F> the type of the sixth stage's value. 569 * @return a stage that is composed from the input stages using the function. 570 * @throws UnsupportedOperationException if any of the {@link CompletionStage}s 571 * do not interoperate with CompletableFuture 572 */ 573 public static <R, A, B, C, D, E, F> CompletionStage<R> combineFutures( 574 CompletionStage<A> a, 575 CompletionStage<B> b, 576 CompletionStage<C> c, 577 CompletionStage<D> d, 578 CompletionStage<E> e, 579 CompletionStage<F> f, 580 Function6<A, B, C, D, E, F, CompletionStage<R>> function) { 581 final CompletableFuture<A> af = a.toCompletableFuture(); 582 final CompletableFuture<B> bf = b.toCompletableFuture(); 583 final CompletableFuture<C> cf = c.toCompletableFuture(); 584 final CompletableFuture<D> df = d.toCompletableFuture(); 585 final CompletableFuture<E> ef = e.toCompletableFuture(); 586 final CompletableFuture<F> ff = f.toCompletableFuture(); 587 588 return CompletableFuture.allOf(af, bf, cf, df, ef, ff) 589 .thenCompose(ignored -> function.apply(af.join(), 590 bf.join(), 591 cf.join(), 592 df.join(), 593 ef.join(), 594 ff.join())); 595 } 596 597 /** 598 * Polls an external resource periodically until it returns a non-empty result. 599 * 600 * <p> The polling task should return {@code Optional.empty()} until it becomes available, and 601 * then {@code Optional.of(result)}. If the polling task throws an exception or returns null, 602 * that will cause the result future to complete exceptionally. 603 * 604 * <p> Canceling the returned future will cancel the scheduled polling task as well. 605 * 606 * <p> Note that on a ScheduledThreadPoolExecutor the polling task might remain allocated for up 607 * to {@code frequency} time after completing or being cancelled. If you have lots of polling 608 * operations or a long polling frequency, consider setting {@code removeOnCancelPolicy} to true. 609 * See {@link java.util.concurrent.ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}. 610 * 611 * @param pollingTask the polling task 612 * @param frequency the frequency to run the polling task at 613 * @param executorService the executor service to schedule the polling task on 614 * @param <T> the type of the result of the polling task, that will be returned when 615 * the task succeeds. 616 * @return a future completing to the result of the polling task once that becomes available 617 */ 618 public static <T> CompletableFuture<T> poll( 619 final Supplier<Optional<T>> pollingTask, 620 final Duration frequency, 621 final ScheduledExecutorService executorService) { 622 final CompletableFuture<T> result = new CompletableFuture<>(); 623 final ScheduledFuture<?> scheduled = executorService.scheduleAtFixedRate( 624 () -> pollTask(pollingTask, result), 0, frequency.toMillis(), TimeUnit.MILLISECONDS); 625 result.whenComplete((r, ex) -> scheduled.cancel(true)); 626 return result; 627 } 628 629 private static <T> void pollTask( 630 final Supplier<Optional<T>> pollingTask, 631 final CompletableFuture<T> resultFuture) { 632 try { 633 pollingTask.get().ifPresent(resultFuture::complete); 634 } catch (Exception ex) { 635 resultFuture.completeExceptionally(ex); 636 } 637 } 638 639 }