CompletableDeferred
package org.example.util;
import java.util.concurrent.*;
import java.util.function.*;
/**
* CompletableDeferred 类是一个用于异步操作的工具类,提供了类似于 Kotlin 协程库中 CompletableDeferred 类的功能。
* CompletableDeferred 类使用 CompletableFuture 类作为内部实现,封装了异步操作的结果和取消功能。
*
* @author TheEnd
* @param <T> 异步操作的结果类型
*/
public class CompletableDeferred<T> {
/**
* 用于保存异步操作的结果和状态的 CompletableFuture 对象。
*/
private final CompletableFuture<T> future;
/**
* 创建一个新的 CompletableDeferred 实例。
* 在内部,它使用 CompletableFuture 类来实现异步操作的结果和取消功能。
*/
public CompletableDeferred() {
future = new CompletableFuture<>();
}
/**
* 将异步操作标记为已完成,并设置结果值。
* 将异步操作标记为已完成,并设置结果值。方法的参数 value 表示异步操作的结果值。
* 在方法内部,调用 future.complete(value) 来将 CompletableFuture 对象标记为已完成,并将结果值设置为 value。
* 通过调用该方法,可以手动完成异步操作,并将结果传递给等待该结果的代码。
*
* @param value 异步操作的结果值
*/
public void complete(T value) {
future.complete(value);
}
/**
* 将异步操作标记为已完成,并设置异常结果。
* 方法内部调用 future.completeExceptionally(exception) 来将 CompletableFuture 对象标记为已完成,并将异常结果设置为 exception。
* 通过调用该方法,可以手动完成异步操作,并指示操作以异常的方式结束。
* 这对于在异步操作出现错误或异常时进行处理非常有用。
* 等待该结果的代码可以通过捕获 CompletionException 或 ExecutionException 来获取异常结果,并进行相应的处理。
*
* @param exception 异常结果
*/
public void completeExceptionally(Throwable exception) {
future.completeExceptionally(exception);
}
/**
* 等待异步操作的完成,并返回结果值。
* 在方法内部,调用 future.get() 来等待异步操作的完成,并获取结果值。
* 通过调用 await() 方法,可以阻塞当前线程,直到异步操作完成,并获取到结果值。
* 请注意,如果异步操作尚未完成,该方法将一直阻塞,直到操作完成或抛出异常。
*
* @return 异步操作的结果值
* @throws InterruptedException 如果在等待过程中被中断,即其他线程调用了当前线程的 interrupt() 方法,那么会抛出该异常。这种情况下,可以根据需要进行相应的处理。
* @throws ExecutionException 如果异步操作抛出异常,那么会将异常封装在 ExecutionException 中抛出。可以通过捕获该异常来获取异步操作的异常结果,并进行相应的处理。
*/
public T await() throws InterruptedException, ExecutionException {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw e;
} catch (Exception e) {
throw new CancellationException("CompletableDeferred was cancelled.");
}
}
/**
* 等待异步操作的完成,并返回结果值。并忽略异常。
* 在方法内部,调用 future.get() 来等待异步操作的完成,并获取结果值。
* 通过调用 await() 方法,可以阻塞当前线程,直到异步操作完成,并获取到结果值。
* 请注意,如果异步操作尚未完成,该方法将一直阻塞,直到操作完成或抛出异常。
*
* @return 异步操作的结果值
*/
public T awaitIgnoreException() {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (Exception e) {
throw new CancellationException("CompletableDeferred was cancelled.");
}
}
/**
* 在指定的时间内等待异步操作的完成,并返回结果值。
* 在方法内部,调用 future.get(timeout, unit) 来等待异步操作的完成,并获取结果值。
* 如果在指定的时间内操作完成,那么将返回结果值;如果超过了指定的等待时间,将抛出 TimeoutException 异常。
*
* @param timeout 等待超时时间,表示等待的最长时间。
* @param unit 超时时间的单位,可以是秒、毫秒、微秒等。
* @return 异步操作的结果值
* @throws InterruptedException 如果在等待过程中被中断,即其他线程调用了当前线程的 interrupt() 方法,那么会抛出该异常。这种情况下,可以根据需要进行响应的处理。
* @throws ExecutionException 如果异步操作抛出异常,那么会将异常封装在 ExecutionException 中抛出。可以通过捕获该异常来获取异步操作的异常结果,并进行相应的处理。
* @throws TimeoutException 如果等待超时,即超过了指定的等待时间,那么会抛出该异常。可以根据需要进行相应的处理。
*/
public T await(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
try {
return future.get(timeout, unit);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw e;
} catch (Exception e) {
throw new CancellationException("CompletableDeferred was cancelled.");
}
}
/**
* 在指定的时间内等待异步操作的完成,并返回结果值。并忽略异常。
* 在方法内部,调用 future.get(timeout, unit) 来等待异步操作的完成,并获取结果值。
* 如果在指定的时间内操作完成,那么将返回结果值;如果超过了指定的等待时间,将抛出 TimeoutException 异常。
*
* @param timeout 等待超时时间,表示等待的最长时间。
* @param unit 超时时间的单位,可以是秒、毫秒、微秒等。
* @return 异步操作的结果值
*/
public T awaitIgnoreException(long timeout, TimeUnit unit) {
try {
return future.get(timeout, unit);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch (Exception e) {
throw new CancellationException("CompletableDeferred was cancelled.");
}
}
/**
* 尝试取消异步操作的执行。
* 默认会中断正在执行的操作。
*
* @return 如果成功取消了异步操作,则返回 true;如果无法取消或者已经完成,则返回 false
*/
public boolean cancel() {
return future.cancel(true);
}
/**
* 尝试取消异步操作的执行。
* 取消操作并不保证一定成功,它取决于异步操作的具体实现。如果成功取消了操作,你可以根据需要进行相应的处理。
*
* @param mayInterruptIfRunning 是否允许中断正在执行的操作;如果设置为 true,则会尝试中断正在执行的操作;如果设置为 false,则不会中断正在执行的操作。
* @return 如果成功取消了异步操作,则返回 true;如果无法取消或者已经完成,则返回 false
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
/**
* 检查异步操作是否已被取消。
*
* @return 如果异步操作已被取消,则返回 true;否则返回 false
*/
public boolean isCancelled() {
return future.isCancelled();
}
/**
* 检查异步操作是否已经异常完成。
* 在方法内部,调用 future.isCompletedExceptionally() 来检查异步操作是否已经异常完成。
*
* @return 如果异步操作已经异常完成,则返回 true;否则返回 false
*/
public boolean isCompletedExceptionally() {
return future.isCompletedExceptionally();
}
/**
* 应用给定的函数对异步操作的结果进行转换,并返回一个新的 CompletableDeferred 对象,该对象表示转换后的异步操作。
* 可以构建异步操作的处理链,实现结果的转换和串联。
*
* @param function 要应用的函数,接受异步操作的结果作为参数,并返回转换后的结果
* @param <U> 转换后的结果类型
* @return 一个新的 CompletableDeferred 对象,表示转换后的异步操作
*/
public <U> CompletableDeferred<U> thenApply(Function<? super T, ? extends U> function) {
// 创建一个新的 CompletableDeferred 对象,用于表示转换后的异步操作
CompletableDeferred<U> deferred = new CompletableDeferred<>();
// 使用 future.thenApply() 方法将给定的函数应用于异步操作的结果,并在完成时处理结果或异常
future.thenApply(function).whenComplete((result, throwable) -> {
if (throwable != null) {
// 如果发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
deferred.completeExceptionally(throwable);
} else {
// 如果结果正常完成,则使用 deferred.complete() 方法传播结果给新的异步操作
deferred.complete(result);
}
});
// 返回表示转换后的异步操作的 CompletableDeferred 对象
return deferred;
}
/**
* 对异步操作的结果应用给定的消费者函数,并返回一个新的 CompletableDeferred 对象,该对象表示处理结果后的异步操作。
* 可以实现对结果的处理,例如打印结果、保存到数据库等操作。
*
* @param action 要应用的消费者函数,接受异步操作的结果作为参数,无返回值
* @return 一个新的 CompletableDeferred 对象,表示处理结果后的异步操作
*/
public CompletableDeferred<Void> thenAccept(Consumer<? super T> action) {
// 创建一个新的 CompletableDeferred<Void> 对象,用于表示处理结果后的异步操作
CompletableDeferred<Void> deferred = new CompletableDeferred<>();
// 使用 future.thenAccept() 方法对异步操作的结果应用给定的消费者函数
future.thenAccept(result -> {
try {
// 调用 action.accept() 方法将异步操作的结果传递给消费者函数进行处理
action.accept(result);
// 当处理完成时,使用 deferred.complete(null) 方法标记异步操作完成
deferred.complete(null);
} catch (Exception e) {
// 如果发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
deferred.completeExceptionally(e);
}
});
// 返回表示处理结果后的异步操作的 CompletableDeferred<Void> 对象
return deferred;
}
/**
* 在异步操作发生异常时,应用给定的函数对异常进行处理,并返回一个新的 CompletableDeferred 对象,该对象表示处理异常后的异步操作。
* 可以实现对异常情况的处理,例如返回默认值、进行错误日志记录等操作。
*
* @param function 用于处理异常的函数,接受异常对象作为参数,并返回处理后的结果
* @return 一个新的 CompletableDeferred 对象,表示处理异常后的异步操作
*/
public CompletableDeferred<T> exceptionally(Function<Throwable, ? extends T> function) {
// 创建一个新的 CompletableDeferred<T> 对象,用于表示处理异常后的异步操作
CompletableDeferred<T> deferred = new CompletableDeferred<>();
// 使用 future.exceptionally() 方法对异步操作的异常进行处理
future.exceptionally(throwable -> {
try {
// 调用 function.apply() 方法对异常进行处理,并获取处理后的结果
T result = function.apply(throwable);
// 使用 deferred.complete() 方法传播处理后的结果给新的异步操作
deferred.complete(result);
} catch (Exception e) {
// 如果处理过程中发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
deferred.completeExceptionally(e);
}
return null;
});
// 返回表示处理异常后的异步操作的 CompletableDeferred<T> 对象
return deferred;
}
/**
* 将当前异步操作的结果与另一个异步操作的结果进行组合,并应用给定的函数对组合结果进行处理,返回一个新的 CompletableDeferred 对象,该对象表示处理组合结果后的异步操作。
* 可以实现对多个异步操作结果的组合和处理,例如进行数据合并、计算结果聚合等操作。
*
* @param other 另一个异步操作的 CompletableDeferred 对象
* @param function 用于处理组合结果的函数,接受当前异步操作的结果和另一个异步操作的结果作为参数,并返回处理后的结果
* @return 一个新的 CompletableDeferred 对象,表示处理组合结果后的异步操作
*/
public <U, V> CompletableDeferred<V> thenCombine(CompletableDeferred<? extends U> other, BiFunction<? super T, ? super U, ? extends V> function) {
// 创建一个新的 CompletableDeferred<V> 对象,用于表示处理组合结果后的异步操作
CompletableDeferred<V> deferred = new CompletableDeferred<>();
// 使用 future.thenCombine() 方法将当前异步操作的结果与另一个异步操作的结果进行组合,并应用给定的函数对组合结果进行处理
future.thenCombine(other.future, function).whenComplete((result, throwable) -> {
if (throwable != null) {
// 如果组合过程中发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
deferred.completeExceptionally(throwable);
} else {
// 如果组合过程没有发生异常,则使用 deferred.complete() 方法传播处理后的结果给新的异步操作
deferred.complete(result);
}
});
// 返回表示处理组合结果后的异步操作的 CompletableDeferred<V> 对象
return deferred;
}
/**
* 将当前异步操作的结果与另一个异步操作的结果进行处理,不返回任何结果,返回一个新的 CompletableDeferred 对象,该对象表示处理结果后的异步操作。
* 可以实现对多个异步操作结果的处理,例如进行结果合并、执行副作用等操作。
*
* @param other 另一个异步操作的 CompletableDeferred 对象
* @param action 用于处理结果的消费者函数,接受当前异步操作的结果和另一个异步操作的结果作为参数,不返回任何结果
* @return 一个新的 CompletableDeferred 对象,表示处理结果后的异步操作
*/
public <U> CompletableDeferred<Void> thenAcceptBoth(CompletableDeferred<? extends U> other, BiConsumer<? super T, ? super U> action) {
// 创建一个新的 CompletableDeferred<Void> 对象,用于表示处理结果后的异步操作
CompletableDeferred<Void> deferred = new CompletableDeferred<>();
// 使用 future.thenAcceptBoth() 方法将当前异步操作的结果与另一个异步操作的结果进行处理
future.thenAcceptBoth(other.future, (result1, result2) -> {
try {
// 调用 action.accept() 方法对结果进行处理
action.accept(result1, result2);
// 使用 deferred.complete() 方法表示处理结果完成
deferred.complete(null);
} catch (Exception e) {
// 如果处理过程中发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
deferred.completeExceptionally(e);
}
});
// 返回表示处理结果后的异步操作的 CompletableDeferred<Void> 对象
return deferred;
}
/**
* 使用给定的 Supplier 函数创建一个异步操作,并返回一个新的 CompletableDeferred 对象,该对象表示异步操作的结果。
* 可以实现在异步环境中执行耗时操作、并行处理等操作。
*
* @param supplier 用于创建异步操作的 Supplier 函数,不接受任何参数,返回一个结果对象
* @return 一个新的 CompletableDeferred 对象,表示异步操作的结果
*/
public static <U> CompletableDeferred<U> supplyAsync(Supplier<U> supplier) {
// 创建一个新的 CompletableDeferred<U> 对象,用于表示异步操作的结果
CompletableDeferred<U> deferred = new CompletableDeferred<>();
// 使用 CompletableFuture.supplyAsync() 方法创建一个异步操作,并注册一个回调函数处理异步操作的结果
CompletableFuture.supplyAsync(supplier).whenComplete((result, throwable) -> {
if (throwable != null) {
// 如果异步操作过程中发生异常,则使用 deferred.completeExceptionally() 方法将异常传播给新的异步操作
deferred.completeExceptionally(throwable);
} else {
// 如果异步操作完成且没有发生异常,则使用 deferred.complete() 方法传播结果给新的异步操作
deferred.complete(result);
}
});
// 返回表示异步操作结果的 CompletableDeferred<U> 对象
return deferred;
}
}
示例
@Test
public void testAwait() {
// 创建一个供应商函数,用于模拟一个耗时操作
Supplier<Integer> supplier = () -> {
try {
// 模拟耗时操作
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 42;
};
// 创建一个 CompletableDeferred 对象
CompletableDeferred<Integer> deferred = new CompletableDeferred<>();
// 在另一个线程中执行耗时操作,并使用 complete() 方法传播结果给异步操作
CompletableFuture.runAsync(() -> {
Integer result = supplier.get();
deferred.complete(result);
});
// 等待异步操作完成并获取结果
try {
Integer result = deferred.await();
System.out.println("异步操作结果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}