干货 | 携程基于Quasar协程的NIO实践

时间:2022-07-23
本文章向大家介绍干货 | 携程基于Quasar协程的NIO实践,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

作者简介

Ryan,携程Java开发工程师,对高并发、网络编程等领域有浓厚兴趣。

IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题。目前Java项目对接NIO的方式主要依靠回调,代码复杂度高,降低了代码可读性与可维护性。近年来Golang、Kotlin等语言的协程(Coroutine)能达到高性能与可读性的兼顾。

本文利用开源的Quasar框架提供的协程对系统进行NIO改造,解决以下两个问题:

1)提升单机任务的吞吐量,保证业务请求突增时系统的可伸缩性。

2)使用更轻量的协程同步等待IO,替代处理NIO常用的异步回调。

一、Java异步编程与非阻塞IO

本文改造的系统处理来自前台的任务,通过HTTP请求对端服务,还通过RPC调用内部服务。当业务高峰时,系统会遇到瞬时并发任务量数十倍激增的情况,系统的线程数量急剧增加造成性能下降。为此,不得不扩容以保证业务高峰时期的性能。

基于epoll的NIO框架Netty在一些框架级别的应用中已经得到了广泛使用,但在快速迭代的业务系统中的应用依然有一定的局限性。NIO 消除了线程的同步阻塞,意味着只能异步处理IO的结果,这与业务开发者顺序化的思维模式有一定差异。当业务逻辑复杂以及出现多次远程调用的情况下,多级回调难以实现和维护。

1.1 Java中的异步工具

Java项目大多使用JDK8,除线程外可以获得的异步的编程支持包括CompletableFuture,以及开源的RxJava、Vert.x等反应式编程框架等。这些工具使用了基于响应式编程的链式调用逐级传递事件,未从根本解决回调问题。

如下为将一段简单的逻辑判断使用CompletableFuture进行异步改造后的对比。原始版本使用getA方法获得第一步的请求结果,根据其相应选择使用getB1还是getB2获取第二步的响应作为结果。

HttpResponse a = getA();

HttpResponse b ;
if(a.getBody().equals("1")){
    b=getB1();
}
else{
    b=getB2();
}

String ans=b.getBody();

首先将三个获取响应的方法改为异步。此处假设getB1与getB2内部已经具有复杂逻辑,且不属于同一领域,不适合合并为一个方法。

private CompletableFuture<HttpResponse> getA();
private CompletableFuture<HttpResponse> getB1();
private CompletableFuture<HttpResponse> getB2();

然后使用CompletableFuture的链式调用,将两个步骤组合起来:

String ans = getA()
        .thenCompose(a -> {
            if (a.getBody().equals("1")) {
                return getB1();
            } else {
                return getB2();
            }
        }).get()
        .getBody();

使用CompletableFuture的链式回调后,代码变得不友好。RxJava等框架同样具有这个问题。这类反应式的编程工具更适合于数据流的传递。对于if/else、switch/case,乃至while/for、break/continue这类过程控制语句,实现与维护的难度都很大。业务系统需要类似于线程的同步等待,同时具有低资源消耗的编码工具,配合 NIO使用。当时使用NIO时,由于可以不占用线程,可以使用一种资源消耗更小的协程来等待。

1.2 协程

协程是一种进程自身来调度任务的调度模式。协程与线程不同之处在于,线程由内核调度,而协程的调度是进程自身完成的。协程只是一种抽象,最终的执行者是线程,每个线程只能同时执行一个协程,但大量的协程可以只拥有少量几个线程执行者,协程的调度器负责决定当前线程在执行那个协程,其余协程处于休眠并被调度器保存在内存中。

和线程类似,协程挂起时需要记录栈信息,以及方法执行的位置,这些信息会被协程调度器保存。协程从挂起到重新被执行不需要执行重量级的内核调用,而是直接将状态信息还原到执行线程的栈,高并发场景下,协程极大地避免了切换线程的开销。下图展示了协程调度器内部任务的流转。

协程中调用的方法是可以挂起的。不同于线程的阻塞会使线程休眠,协程在等待异步任务的结果时,会通知调度器将自己放入挂起队列,释放占用的线程以处理其他的协程。异步任务完毕后,通过回调将异步结果告知协程,并通知调度器将协程重新加入就绪队列执行。

1.3 Quasar任务调度原理

Quasar(https://github.com/puniverse/quasar)是一个开源的Java协程框架,通过利用Java instrument技术对字节码进行修改,使方法挂起前后可以保存和恢复JVM栈帧,方法内部已执行到的字节码位置也通过增加状态机的方式记录,在下次恢复执行可直接跳转至最新位置。以如下方法为例,该方法分为两步,第一步为initial初始化,第二部为通过NIO获取网络响应。

public String instrumentDemo(){
    initial();
    String ans = getFromNIO();
    return ans;
}

Quasar会在initial前增加一个flag字段,表明当前方法执行的位置。第一次执行方法时,检查到flag为0,修改flag为1并继续往下执行initial方法。执行getFromNIO方法前插入字节码指令将栈帧中的数据全部保存在一个Quasar自定义的栈结构中,在执行getFromNIO后,挂起协程,让出线程资源。直至NIO异步完成后,协程调度器将第二次执行该方法,检测到flag为1,将会调用jump指令跳转到returnans语句前,并将保存的栈结构还原到当前栈中,最后调用人return ans语句,方法执行完毕。

二、系统异步IO改造

在项目中添加Quasar依赖后,可以使用Fiber类新建协程。建立的方法与线程类似。

new Fiber(()->{
    //方法体
}).start();

2.1 整合Netty与Quasar

系统使用的Http框架是基于Netty的async-http-client(https://github.com/AsyncHttpClient/async-http-client),该框架提供了异步回调和CompletableFuture两种对响应的异步处理方式。

CompletableFuture自JDK8推出,与之前的Future类最大的不同在于,提供了异步任务跨线程的通知和控制机制。即,任务的等待者可以在CompletableFuture注册任务完成或异常时的回调,而执行者也可以通过它通知等待者。Quaasr框架对它也做了支持,提供了API用于在协程中等待CompletableFuture的结果。调用后,协程将挂起,直至future状态为已完成。

AsyncCompletionStage.get(future)

通过CompletableFuture作为通知中介,我们可以将AsyncHttpClient与Quasar做整合,挂起协程等待IO结果。

//创建HttpClient
AsyncHttpClient httpClient = Dsl.asyncHttpClient();
//创建请求
Request request = createRequest();
//将网络请求交给HttpClient执行
CompletableFuture<Response> future = httpClient.executeRequest(request)
.toCompletableFuture();
//通过Quasar挂起协程
Response response = AsyncCompletionStage.get(future);
//获取网络结果后,通过future传递response并唤醒协程重新执行
deal(response);

过程可由下图表示。

Quasar框架AsyncCompletionStage.get内部完成的工作相当于,在HttpClient返回的future上注册回调,回调的内容是“IO操作完成后通知调度器唤醒协程”,这样将NIO异步回调全部操作封装在协程调度器中,用户代码看起来是同步等待的形式,避免了自行实现回调处理带来的繁琐,解决了前文所述的回调地狱。

2.2 声明挂起方法

Quasar需要织入字节码接管挂起方法的调度,在项目主pom下添加quasar-maven-plugin插件,该插件将在编译后的class文件中修改字节码。

<plugin>
    <groupId>com.vlkan</groupId>
    <artifactId>quasar-maven-plugin</artifactId>
    <version>0.7.9</version>
    <executions>
        <execution>
            <goals>
                <goal>instrument</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Quasar通过识别方法是否抛出了该框架定义的SuspendExecution异常决定是否修改字节码。Quasar框架在AsyncCompletionStage.get方法上声明了SuspendExceution异常,该异常是捕获异常,但仅作为识别挂起方法的声明,在运行时不会实际抛出。使用者必须逐层抛出该异常直至新建协程的一层。当方法内部存在try/catch语句时,也必须抛出该异常。

public void startFiber() throws ExecutionException, InterruptedException {
    Fiber<Void> fiber = new Fiber<Void>(() -> {
        //不用继续抛出异常
        Response response = waitNextLayer1();
        deal(response);
    }).start();
}

private Response waitNextLayer1() throws SuspendExecution {
    return waitNextLayer2();
}

private Response waitNextLayer2() throws SuspendExecution {
       CompletableFuture<Response> future = httpClient.executeRequest(request)
.toCompletableFuture();
    try {
        // Quasar框架工具类抛出SuspendExecution
        return AsyncCompletionStage.get(future);
    } catch (Exception e) {
        return null;
    }
}

2.3 异步RPC调用

目前主流的RPC框架都基于NIO实现,支持异步回调,有的RPC框架已经直接提供了返回CompletableFuture或ListenableFuture(Guava工具类提供)的异步接口,通过使用ComplatableFuture,可以按前文类似的方法将Quasar与RPC框架结合起来。当RPC框架没有该返回类型时,一般会提供如下类似的带泛型的异步回调接口:

interface Callback<TResponse> {
    void callback(TResponse TResponse, Exception e);
}

这种情况,可以使用者自己创建ComplatableFuture,在回调中设置其状态,并调用AsyncCompletionStage.get等待这个future。

CompletableFuture<Response> future=new CompletableFuture<>();
//调用hello接口的异步API
new RpcClient().helloAsync(request, new Callback<Response>() {
    public void callback(Response response, Exception e) {
        if (e == null) future.complete(response);
        else future.completeExceptionally(e);
    }
});
//在此处调用Quasar的API,挂起直至RPC调用完成
Response response = AsyncCompletionStage.get(future);

上述代码依然具有异步回调不直观的缺点,通过JDK8的函数式接口可以实现一个通用的调用模板,将异步回调变为同步等待的形式。

@FunctionalInterface
private interface RpcAsyncCall<TRequest, TResponse> {
    void request(TRequest request, Callback<TResponse> callback);
}
public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution {
    CompletableFuture<TResponse> future = new CompletableFuture<>();

    call.request(request, (response, e) -> {
        if (e == null) future.complete(response);
        else future.completeExceptionally(e);
    });

    try {
  //使用Quasar等待Future结果
        return AsyncCompletionStage.get(future);
    } catch (Exception e) {
        return null;
    }
}

最后的调用可简化一行代码,该方法适用于所有该Rpc框架提供的异步接口。

Response response= waitRpc(new RpcClient()::helloAsync, request);

2.4 阻塞操作的处理

Quasar协程使用的时候有一定的限制,由于调度器线程池大小固定,在协程中不能阻塞线程,执行线程将被占用。对于某些暂时只能依靠阻塞IO的调用,如数据库,消息队列等,无法使用协程等待其结果,当这些阻塞操作量不大的情况下,可使用另一个可伸缩的线程池等待结果,避免对协程调度器的影响。

public void waitBlocking() throws SuspendExecution {
    //从DB获取结果
    String ans = waitBlocking(this::selectFromDB);
}

private ExecutorService threadPool = Executors.newCachedThreadPool();

private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution {
    CompletableFuture<T> future = new CompletableFuture<>();
    threadPool.submit(() -> {
        T ans = supplier.get();
        future.complete(ans);
    });

    try {
        return AsyncCompletionStage.get(future);
    } catch (Exception e) {
        return null;
    }
}

2.5 并发工具的使用

协程对并发锁的使用有比较大的限制,需要使用者理解线程锁与协程的调度机制。在synchronized同步块的内部,不能包含挂起协程的语句。当持有锁的协程挂起后会让出线程资源,由于锁的可重入性,另一个运行在同一个线程上的协程再加锁时同样会成功。另一方面,协程挂起后恢复执行时,也可能会在另一个线程上运行。出现两个线程操作共享资源的异常。同时未持有锁的线程释放时,会出现IllegalMonitorStateException异常。

但如果同步块的内部没有挂起协程的语句,则线程锁的机制仍然有效。线程的在执行过程中可能切换,而协程的调度在每个执行线程上是串行的,协程持有的锁在不包含挂起操作时,会在占用线程执行完毕直到退出同步块为止,不会发生锁失效的情况。

JDK并发包中的工具可分为两类,一类是Lock、Semaphore、CountDownLatch等具有线程可重入性的工具,不能在未释放资源前使用挂起协程的操作,而另一类则是原子变量、并发容器等不会让出线程的工具,仍可正常使用,但要注意高并发的情况下锁的性能。此外,在使用并发工具的阻塞方法,如await时,可能导致协程的执行线程中发生阻塞。

三、总结

系统运行在4核心的主机上,线程池构成如下。

业务逻辑运行在Quasar的协程调度线程池中,线程池大小为CPU核数。HTTP请求与RPC调用均通过内部的NIO线程池管理。此外定义了一个core size为8的可伸缩的线程池用于少量消息队列、DB等阻塞IO的操作。其余的线程是系统中引入的其他组件所新建的线程,正常情况下不会成为系统性能的瓶颈。

改造后,在业务高峰流量激增数十倍的情况下线程数量依然稳定,而CPU利用率也从平均5%以下提升至10%-60%,在瞬时与高峰流量下能保持稳定。集群CPU核数在保留一定的业务冗余以应对业务高峰的情况下,缩减至1/5。

3.1 限制与风险

Quasar协程不是Java的语言标准,没有JVM层面的支持,使用时必须手动抛出异常声明每一个挂起方法,对代码有一定的侵入性。使用不当时,可能出现异常。

代码的try/catch时可能同时捕获SuspendExecution异常,从而忘记标记方法,此方法字节码不会被修改,结合Quasar的原理不难看出,当没有织入字节码时,挂起方法恢复执行,无法还原方法栈帧和执行状态,将会出现语句被重复执行、空指针等错误。运行时空指针、死循环的症状,排查的重点是是否漏加SuspendExecution标记。

在新线程而不是新协程中使用挂起方法时,会出现同样的问题。Thread的构造方法中传入的是Runnable接口对象,其run方法没有声明SuspendExecution异常,run内部的语句不会被织入字节码,造成上述异常。

3.2 总结与展望

协程使得NIO能够更好地应用在Java中,比回调方法更易读易维护。对系统的改造集中在底层通信封装和对方法的标记上,业务逻辑无需修改。虽然具有一定的代码侵入性和理解成本,但这种学习成本能逐渐被代码的可维护性优势抵消。

异步编程最佳的实现方式是:“Codes Like Sync,Works Like Async”,即以同步的方式编码,达到异步的效果与性能,兼顾可维护性与可伸缩性。OpenJDK 在2018年创建了Loom 项目(https://wiki.openjdk.java.net/display/loom),目标是在JVM上实现轻量级的线程,并解除JVM线程与内核线程的映射。相信会给Java生态带来巨大的改变。