CompletableDeferred

package org.example.util;

import java.util.concurrent.*;
import java.util.function.*;

/**
 * CompletableDeferred 类是一个用于异步操作的工具类,提供了类似于 Kotlin 协程库中 CompletableDeferred 类的功能。
 * CompletableDeferred 类使用 CompletableFuture 类作为内部实现,封装了异步操作的结果和取消功能。
 *
 * @author TheEnd
 * @param <T> 异步操作的结果类型
 */
public class CompletableDeferred<T> {

    /**
     * 用于保存异步操作的结果和状态的 CompletableFuture 对象。
     */
    private final CompletableFuture<T> future;

    /**
     * 创建一个新的 CompletableDeferred 实例。
     * 在内部,它使用 CompletableFuture 类来实现异步操作的结果和取消功能。
     */
    public CompletableDeferred() {
        future = new CompletableFuture<>();
    }

    /**
     * 将异步操作标记为已完成,并设置结果值。
     * 将异步操作标记为已完成,并设置结果值。方法的参数 value 表示异步操作的结果值。
     * 在方法内部,调用 future.complete(value) 来将 CompletableFuture 对象标记为已完成,并将结果值设置为 value。
     * 通过调用该方法,可以手动完成异步操作,并将结果传递给等待该结果的代码。
     *
     * @param value 异步操作的结果值
     */
    public void complete(T value) {
        future.complete(value);
    }

    /**
     * 将异步操作标记为已完成,并设置异常结果。
     * 方法内部调用 future.completeExceptionally(exception) 来将 CompletableFuture 对象标记为已完成,并将异常结果设置为 exception。
     * 通过调用该方法,可以手动完成异步操作,并指示操作以异常的方式结束。
     * 这对于在异步操作出现错误或异常时进行处理非常有用。
     * 等待该结果的代码可以通过捕获 CompletionException 或 ExecutionException 来获取异常结果,并进行相应的处理。
     *
     * @param exception 异常结果
     */
    public void completeExceptionally(Throwable exception) {
        future.completeExceptionally(exception);
    }

    /**
     * 等待异步操作的完成,并返回结果值。
     * 在方法内部,调用 future.get() 来等待异步操作的完成,并获取结果值。
     * 通过调用 await() 方法,可以阻塞当前线程,直到异步操作完成,并获取到结果值。
     * 请注意,如果异步操作尚未完成,该方法将一直阻塞,直到操作完成或抛出异常。
     *
     * @return 异步操作的结果值
     * @throws InterruptedException 如果在等待过程中被中断,即其他线程调用了当前线程的 interrupt() 方法,那么会抛出该异常。这种情况下,可以根据需要进行相应的处理。
     * @throws ExecutionException   如果异步操作抛出异常,那么会将异常封装在 ExecutionException 中抛出。可以通过捕获该异常来获取异步操作的异常结果,并进行相应的处理。
     */
    public T await() throws InterruptedException, ExecutionException {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw e;
        } catch (Exception e) {
            throw new CancellationException("CompletableDeferred was cancelled.");
        }
    }

    /**
     * 等待异步操作的完成,并返回结果值。并忽略异常。
     * 在方法内部,调用 future.get() 来等待异步操作的完成,并获取结果值。
     * 通过调用 await() 方法,可以阻塞当前线程,直到异步操作完成,并获取到结果值。
     * 请注意,如果异步操作尚未完成,该方法将一直阻塞,直到操作完成或抛出异常。
     *
     * @return 异步操作的结果值
     */
    public T awaitIgnoreException() {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new CancellationException("CompletableDeferred was cancelled.");
        }
    }

    /**
     * 在指定的时间内等待异步操作的完成,并返回结果值。
     * 在方法内部,调用 future.get(timeout, unit) 来等待异步操作的完成,并获取结果值。
     * 如果在指定的时间内操作完成,那么将返回结果值;如果超过了指定的等待时间,将抛出 TimeoutException 异常。
     *
     * @param timeout 等待超时时间,表示等待的最长时间。
     * @param unit    超时时间的单位,可以是秒、毫秒、微秒等。
     * @return 异步操作的结果值
     * @throws InterruptedException 如果在等待过程中被中断,即其他线程调用了当前线程的 interrupt() 方法,那么会抛出该异常。这种情况下,可以根据需要进行响应的处理。
     * @throws ExecutionException   如果异步操作抛出异常,那么会将异常封装在 ExecutionException 中抛出。可以通过捕获该异常来获取异步操作的异常结果,并进行相应的处理。
     * @throws TimeoutException     如果等待超时,即超过了指定的等待时间,那么会抛出该异常。可以根据需要进行相应的处理。
     */
    public T await(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
        try {
            return future.get(timeout, unit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw e;
        } catch (Exception e) {
            throw new CancellationException("CompletableDeferred was cancelled.");
        }
    }

    /**
     * 在指定的时间内等待异步操作的完成,并返回结果值。并忽略异常。
     * 在方法内部,调用 future.get(timeout, unit) 来等待异步操作的完成,并获取结果值。
     * 如果在指定的时间内操作完成,那么将返回结果值;如果超过了指定的等待时间,将抛出 TimeoutException 异常。
     *
     * @param timeout 等待超时时间,表示等待的最长时间。
     * @param unit    超时时间的单位,可以是秒、毫秒、微秒等。
     * @return 异步操作的结果值
     */
    public T awaitIgnoreException(long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new CancellationException("CompletableDeferred was cancelled.");
        }
    }

    /**
     * 尝试取消异步操作的执行。
     * 默认会中断正在执行的操作。
     *
     * @return 如果成功取消了异步操作,则返回 true;如果无法取消或者已经完成,则返回 false
     */
    public boolean cancel() {
        return future.cancel(true);
    }

    /**
     * 尝试取消异步操作的执行。
     * 取消操作并不保证一定成功,它取决于异步操作的具体实现。如果成功取消了操作,你可以根据需要进行相应的处理。
     *
     * @param mayInterruptIfRunning 是否允许中断正在执行的操作;如果设置为 true,则会尝试中断正在执行的操作;如果设置为 false,则不会中断正在执行的操作。
     * @return 如果成功取消了异步操作,则返回 true;如果无法取消或者已经完成,则返回 false
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        return future.cancel(mayInterruptIfRunning);
    }

    /**
     * 检查异步操作是否已被取消。
     *
     * @return 如果异步操作已被取消,则返回 true;否则返回 false
     */
    public boolean isCancelled() {
        return future.isCancelled();
    }

    /**
     * 检查异步操作是否已经异常完成。
     * 在方法内部,调用 future.isCompletedExceptionally() 来检查异步操作是否已经异常完成。
     *
     * @return 如果异步操作已经异常完成,则返回 true;否则返回 false
     */
    public boolean isCompletedExceptionally() {
        return future.isCompletedExceptionally();
    }

    /**
     * 应用给定的函数对异步操作的结果进行转换,并返回一个新的 CompletableDeferred 对象,该对象表示转换后的异步操作。
     * 可以构建异步操作的处理链,实现结果的转换和串联。
     *
     * @param function 要应用的函数,接受异步操作的结果作为参数,并返回转换后的结果
     * @param <U>      转换后的结果类型
     * @return 一个新的 CompletableDeferred 对象,表示转换后的异步操作
     */
    public <U> CompletableDeferred<U> thenApply(Function<? super T, ? extends U> function) {
        // 创建一个新的 CompletableDeferred 对象,用于表示转换后的异步操作
        CompletableDeferred<U> deferred = new CompletableDeferred<>();
        // 使用 future.thenApply() 方法将给定的函数应用于异步操作的结果,并在完成时处理结果或异常
        future.thenApply(function).whenComplete((result, throwable) -> {
            if (throwable != null) {
                // 如果发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
                deferred.completeExceptionally(throwable);
            } else {
                // 如果结果正常完成,则使用 deferred.complete() 方法传播结果给新的异步操作
                deferred.complete(result);
            }
        });
        // 返回表示转换后的异步操作的 CompletableDeferred 对象
        return deferred;
    }

    /**
     * 对异步操作的结果应用给定的消费者函数,并返回一个新的 CompletableDeferred 对象,该对象表示处理结果后的异步操作。
     * 可以实现对结果的处理,例如打印结果、保存到数据库等操作。
     *
     * @param action 要应用的消费者函数,接受异步操作的结果作为参数,无返回值
     * @return 一个新的 CompletableDeferred 对象,表示处理结果后的异步操作
     */
    public CompletableDeferred<Void> thenAccept(Consumer<? super T> action) {
        // 创建一个新的 CompletableDeferred<Void> 对象,用于表示处理结果后的异步操作
        CompletableDeferred<Void> deferred = new CompletableDeferred<>();
        // 使用 future.thenAccept() 方法对异步操作的结果应用给定的消费者函数
        future.thenAccept(result -> {
            try {
                // 调用 action.accept() 方法将异步操作的结果传递给消费者函数进行处理
                action.accept(result);
                // 当处理完成时,使用 deferred.complete(null) 方法标记异步操作完成
                deferred.complete(null);
            } catch (Exception e) {
                // 如果发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
                deferred.completeExceptionally(e);
            }
        });
        // 返回表示处理结果后的异步操作的 CompletableDeferred<Void> 对象
        return deferred;
    }

    /**
     * 在异步操作发生异常时,应用给定的函数对异常进行处理,并返回一个新的 CompletableDeferred 对象,该对象表示处理异常后的异步操作。
     * 可以实现对异常情况的处理,例如返回默认值、进行错误日志记录等操作。
     *
     * @param function 用于处理异常的函数,接受异常对象作为参数,并返回处理后的结果
     * @return 一个新的 CompletableDeferred 对象,表示处理异常后的异步操作
     */
    public CompletableDeferred<T> exceptionally(Function<Throwable, ? extends T> function) {
        // 创建一个新的 CompletableDeferred<T> 对象,用于表示处理异常后的异步操作
        CompletableDeferred<T> deferred = new CompletableDeferred<>();
        // 使用 future.exceptionally() 方法对异步操作的异常进行处理
        future.exceptionally(throwable -> {
            try {
                // 调用 function.apply() 方法对异常进行处理,并获取处理后的结果
                T result = function.apply(throwable);
                // 使用 deferred.complete() 方法传播处理后的结果给新的异步操作
                deferred.complete(result);
            } catch (Exception e) {
                // 如果处理过程中发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
                deferred.completeExceptionally(e);
            }
            return null;
        });
        // 返回表示处理异常后的异步操作的 CompletableDeferred<T> 对象
        return deferred;
    }

    /**
     * 将当前异步操作的结果与另一个异步操作的结果进行组合,并应用给定的函数对组合结果进行处理,返回一个新的 CompletableDeferred 对象,该对象表示处理组合结果后的异步操作。
     * 可以实现对多个异步操作结果的组合和处理,例如进行数据合并、计算结果聚合等操作。
     *
     * @param other 另一个异步操作的 CompletableDeferred 对象
     * @param function 用于处理组合结果的函数,接受当前异步操作的结果和另一个异步操作的结果作为参数,并返回处理后的结果
     * @return 一个新的 CompletableDeferred 对象,表示处理组合结果后的异步操作
     */
    public <U, V> CompletableDeferred<V> thenCombine(CompletableDeferred<? extends U> other, BiFunction<? super T, ? super U, ? extends V> function) {
        // 创建一个新的 CompletableDeferred<V> 对象,用于表示处理组合结果后的异步操作
        CompletableDeferred<V> deferred = new CompletableDeferred<>();
        // 使用 future.thenCombine() 方法将当前异步操作的结果与另一个异步操作的结果进行组合,并应用给定的函数对组合结果进行处理
        future.thenCombine(other.future, function).whenComplete((result, throwable) -> {
            if (throwable != null) {
                // 如果组合过程中发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
                deferred.completeExceptionally(throwable);
            } else {
                // 如果组合过程没有发生异常,则使用 deferred.complete() 方法传播处理后的结果给新的异步操作
                deferred.complete(result);
            }
        });
        // 返回表示处理组合结果后的异步操作的 CompletableDeferred<V> 对象
        return deferred;
    }

    /**
     * 将当前异步操作的结果与另一个异步操作的结果进行处理,不返回任何结果,返回一个新的 CompletableDeferred 对象,该对象表示处理结果后的异步操作。
     * 可以实现对多个异步操作结果的处理,例如进行结果合并、执行副作用等操作。
     *
     * @param other 另一个异步操作的 CompletableDeferred 对象
     * @param action 用于处理结果的消费者函数,接受当前异步操作的结果和另一个异步操作的结果作为参数,不返回任何结果
     * @return 一个新的 CompletableDeferred 对象,表示处理结果后的异步操作
     */
    public <U> CompletableDeferred<Void> thenAcceptBoth(CompletableDeferred<? extends U> other, BiConsumer<? super T, ? super U> action) {
        // 创建一个新的 CompletableDeferred<Void> 对象,用于表示处理结果后的异步操作
        CompletableDeferred<Void> deferred = new CompletableDeferred<>();
        // 使用 future.thenAcceptBoth() 方法将当前异步操作的结果与另一个异步操作的结果进行处理
        future.thenAcceptBoth(other.future, (result1, result2) -> {
            try {
                // 调用 action.accept() 方法对结果进行处理
                action.accept(result1, result2);
                // 使用 deferred.complete() 方法表示处理结果完成
                deferred.complete(null);
            } catch (Exception e) {
                // 如果处理过程中发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
                deferred.completeExceptionally(e);
            }
        });
        // 返回表示处理结果后的异步操作的 CompletableDeferred<Void> 对象
        return deferred;
    }

    /**
     * 使用给定的 Supplier 函数创建一个异步操作,并返回一个新的 CompletableDeferred 对象,该对象表示异步操作的结果。
     * 可以实现在异步环境中执行耗时操作、并行处理等操作。
     *
     * @param supplier 用于创建异步操作的 Supplier 函数,不接受任何参数,返回一个结果对象
     * @return 一个新的 CompletableDeferred 对象,表示异步操作的结果
     */
    public static <U> CompletableDeferred<U> supplyAsync(Supplier<U> supplier) {
        // 创建一个新的 CompletableDeferred<U> 对象,用于表示异步操作的结果
        CompletableDeferred<U> deferred = new CompletableDeferred<>();
        // 使用 CompletableFuture.supplyAsync() 方法创建一个异步操作,并注册一个回调函数处理异步操作的结果
        CompletableFuture.supplyAsync(supplier).whenComplete((result, throwable) -> {
            if (throwable != null) {
                // 如果异步操作过程中发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
                deferred.completeExceptionally(throwable);
            } else {
                // 如果异步操作完成且没有发生异常,则使用 deferred.complete() 方法传播结果给新的异步操作
                deferred.complete(result);
            }
        });
        // 返回表示异步操作结果的 CompletableDeferred<U> 对象
        return deferred;
    }

}

示例

@Test
public void testAwait() {
    // 创建一个供应商函数,用于模拟一个耗时操作
    Supplier<Integer> supplier = () -> {
        try {
            // 模拟耗时操作
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 42;
    };

    // 创建一个 CompletableDeferred 对象
    CompletableDeferred<Integer> deferred = new CompletableDeferred<>();

    // 在另一个线程中执行耗时操作,并使用 complete() 方法传播结果给异步操作
    CompletableFuture.runAsync(() -> {
        Integer result = supplier.get();
        deferred.complete(result);
    });

    // 等待异步操作完成并获取结果
    try {
        Integer result = deferred.await();
        System.out.println("异步操作结果: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

发表评论