DebugEN
科技森
专注于Java开发~每天都会更新文章~
  1. 首页
  2. Java
  3. 正文

我在RxJava使用线程池时遇到的问题

2021年12月30日 1081点热度 0人点赞 0条评论 作者: kejisen

最近在瞎折腾rxjava,写了一段自认为能并发执行的代码如下:

// 大小为5的线程池
ThreadPoolExecutor exec = new ThreadPoolExecutor(
    5, 5, 200, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

Flowable.just(1, 2, 3, 4, 5)
        .subscribeOn(Schedulers.from(exec))
        .subscribe(i -> {
            Thread.sleep(1000L);
            System.out.println(i + "\t" + Thread.currentThread().getName());
        });

由于我在subscribe中sleep了1s,所以我认为这五个数字会并发的执行到subscribe中去,期待会有如下的输出

1   pool-1-thread-1
2   pool-1-thread-2
3   pool-1-thread-3
4   pool-1-thread-4
5   pool-1-thread-5

然而事与愿违,实际的输出是这样的

1   pool-1-thread-1
2   pool-1-thread-1
3   pool-1-thread-1
4   pool-1-thread-1
5   pool-1-thread-1

嗯嗯?为什么没有并发执行subscribe里的代码呢?我以为是我自己的代码有问题,又陆续尝试了内置的一些Scheduler,consumer均是在同一个线程中执行的,好吧,看来是我理解错rxjava的Schedulers了,这货的from方法接收一个Executor参数,并不是指接下来的任务会提交给这个线程池并发的执行。

这大概也是RxJava和CompletableFuture的区别之一吧。搜索了一圈,用rxjava实现并发主要以以下几个方法

  1. 在flatMap中使用obseveOn
Flowable.just(1, 2, 3, 4, 5)
        .flatMap(i -> Flowable.just(i).observeOn(Schedulers.from(exec))
                .doOnNext(d -> {
                    System.out.println(d + "\t" + Thread.currentThread().getName());
                    Thread.sleep(1000L);
                }))
        .subscribe();
  1. 在flatMap中使用Future
Flowable.just(1, 2, 3, 4, 5)
        .flatMap(i -> Flowable.fromFuture(CompletableFuture.completedFuture(i), Schedulers.from(exec))
                .doOnNext(d -> {
                    System.out.println(d + "\t" + Thread.currentThread().getName());
                    Thread.sleep(1000L);
                })
        )
        .subscribe();
  1. 使用ParallelFlowable
Flowable.just(1, 2, 3, 4, 5)
        .parallel()
        .runOn(Schedulers.from(exec))
        .doOnNext(d -> {
            System.out.println(d + "\t" + Thread.currentThread().getName());
            Thread.sleep(1000L);
        })
        .sequential()
        .subscribe();

有两个要注意的地方

  1. RxJava在执行并发的时候,并不会使用Executor的maximumPollSize这个属性,corePollSize有多大,那么最大就有多少个线程
  2. parallel()有一个重载方法可以传入并发数,默认为cpu核心数,在单核的服务器上这个数字是1,也就是不管Executor有多少个线程,只会用一个线程去执行任务

后续

最近发现在使用UnicastProcessor和ParallelFlowable的时候有cpu占用高的情况,经过跟踪发现是这样的问题:

Flowable是支持背压的,所以在元素弹出过快的时候会抛出异常,而我又使用了retry,使得在抛出异常的时候会重新订阅Flowable,而UnicastProcessor只能被订阅一次,所以抛出了大量的IllegalStateException

我还发现了其他问题

  1. subscribeOn是对subscribe中的代码不起作用的,应该用observeOn
  2. 之所以没有用到maximumPoolSize,是因为没有达到阀值,应该给blockingqueue设置一个大小
标签: java rxjava
最后更新:2021年12月30日

kejisen

保持饥渴的专注,追求最佳的品质

点赞
< 上一篇
下一篇 >

文章评论

取消回复
最新 热点 随机
最新 热点 随机
【原创】记录一次失败的折腾——使用jkeymaster实现的按键监听 【原创】这些年我用过的IDEA插件 【原创】在windows上使用VNC远程连接linux桌面 我在RxJava使用线程池时遇到的问题 [原创文章] Swagger生成pdf格式的接口文档 [个人翻译]Java HTTP工具类的客户端证书认证 [原创] 如何从 Git 的提交历史记录中删除大文件 [翻译] 创建一个只读的Repository接口(Spring Data) [翻译] 反射的用法——用Java调用私有方法 Java 虚拟机最多可以支持多少个线程? 排查Hibernate的慢查询日志–这是查找慢查询的最简单方法 [翻译] 使用apache poi在excel文件中插入一行数据 [翻译] 在Spring 中@EntityScan与@ComponentScan注解有什么区别 [原创] 从QQ音乐网页版扒歌词的补充说明 [原创] 介绍java maven项目的多种打包方式 原创——在Java中生成随机数 将G1垃圾回收的内存使用量减少20%(翻译) [原创] java8 lambda表达式的toMap造成的空指针异常 [原创] 在Spring Boot中使用CommandLineRunner来在启动时执行代码 [转载] Kafka 节点重启失败导致数据丢失的分析排查与解决之道
排查Hibernate的慢查询日志–这是查找慢查询的最简单方法 [原创] 从QQ音乐网页版扒歌词的补充说明 [个人翻译]Java HTTP工具类的客户端证书认证 [原创] 关于Java的Base64编码 【原创】这些年我用过的IDEA插件 java maven项目的几种打包方式 我在RxJava使用线程池时遇到的问题 [原创] java8 lambda表达式的toMap造成的空指针异常 从QQ音乐获取并解析音乐的歌词 将G1垃圾回收的内存使用量减少20%(翻译) [原创] 如何从 Git 的提交历史记录中删除大文件 [原创] 如何使用okhttp发起application/json类型的请求 [原创] 在spring-boot中使用querydsl Spring Boot项目修改Tomcat端口号 [翻译] 使用apache poi在excel文件中插入一行数据 [原创] 如何使用java(javamail)发送带附件的邮件 [原创文章] Swagger生成pdf格式的接口文档 [翻译] 创建一个只读的Repository接口(Spring Data) [原创]javaslang(vavr.io)中Try的使用 使用Spring RestTemplate压缩请求
标签聚合
springboot linux base64 歌词 spring json elasticsearch java maven qq音乐

COPYRIGHT © 2020 Kejisen. ALL RIGHTS RESERVED.

THEME KRATOS MADE BY VTROIS