背景

最近在项目里需要实现这样一个接口:从多个数据源分别查询不同的数据,再将所有的结果组装在一起,返回最终结果。本身上没有什么难点,只是这一段代码需要维护在服务端中,服务端本身在公司的 RPC框架下,就是被多个客户端并发调用的。现在的情况就是,服务端线程A和线程B分别在处理客户端A和B发送过来的请求,同时,线程A又开启了多个线程去并发查询各自的数据,B也一样。

我考虑用一个公共的线程池,将查询数据的过程封装成各种任务,然后提交到线程池中,获取返回的各个future。最基本的想法就是在你的接口内部维护一个list或者类似的容器,主线程将提交的任务 的结果都保存在list中。提交完所有的任务后,再遍历list里的各个future,去get结果,这种做法的最大坏处就是你只能按照任务提交的顺序去依次get,很可能你阻塞在get的时候,其他任务结果 已经返回了,但你不得不等待着;当然这种你也需要为不同的任务设置不同的超时时间,不能一直阻塞,从而影响最终结果的返回。

不过我之前在《Java 并发编程实战》中看到过CompletionService,知道这个东西可以用于提交一组任务,本质上还是委托给了一个线程池,然后在其take()或者poll方法中,取返回结果。所以 我当时毫不犹豫的选择了这个,从此开启了挖坑之旅。伪代码大致如下(实际封装的比这个复杂):

for(int i = 0; i < n; i++) {
    completionService.submit(callable);
}

for(int i = 0; i < n; i++) {
    Future<Result> poll = completionService.poll(100, TimeUnit.MILLISECONDS);
    if (poll != null) {
        return poll.get();
    }
}

completionService是这样构造的

private static java.util.concurrent.ExecutorService executor = new ThreadPoolExecutor(32, 64,
        30, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000), new ThreadRenameFactory("FeatureGet"));

private CompletionService<Result> completionService = new ExecutorCompletionService<>(executor);

注意,我们肯定是要对所有请求都复用一个线程池的,这个线程池做的工作就是从各个不同的数据源查询相关的数据,进行相关的逻辑处理,然后返回。要注意的是,我们这现在是一个RPC服务,实现类 本身在服务启动的时候,只会产生一个proxy实例,后面所有的调用方请求都是通过这同一个实例来调用的。

RPC实现类

这儿就是问题的关键了,可以看到上面的代码。executor和completionService我都是定义为了这个实现类的成员变量(不管 静态or非静态的),刚刚提到实现类的proxy实例只有一个,那么内存中也只有一个completionService对象,调用方A的请求和 调用方B的请求都使用这个对象。它们各自起的查询线程返回的结果都保存在了同一个completionService内部维护的一个LinkedBlockingQueue 中。如果因为某种原因(底层db查询耗时长,我poll等100ms就不等了),第一个请求的结果没有全部返回,第二次请求就可能拿到 第一次请求的结果,从而导致结果的乱序。

关键

只要意识到RPC实现类的proxy实例只有一个,那么这里应该做的就是executor不变,设置为类的成员变量(此处是否为static都 可以),completionService必须在方法内部定义,即每个处理调用方请求的线程都拥有一个自己的completionService,才能 保证数据的正确性。

CompletionService

关于这个接口,其实在JDK 1.5就已经出现了,它内部维护了一个队列来保存已完成任务的结果future,因此不同的线程只要使用 不同的completionService对象是没有问题的。注意在实际使用的时候,往往用poll方法而不是take方法取数据,防止一直阻塞。

CompletableFuture

CompletableFuture在Java 8中才出现,但是其功能非常强大,上面的功能完全可以用CompletableFuture.allOf来实现, 关于CompletableFuture的API内容挺多的,但是其功能都很有意思,可以下次在项目中用一下其中的功能。