您现在的位置是:首页 > 正文

RxJava 两种生产和消费模式,(冷)cold和(热)hot

2024-01-30 21:34:23阅读 0

RxJava目前有两种发布和订阅模式

第一种 cold模式,这种模式观察者订阅被观察模式时,被观察者的动作会重放,举例说明:
@NonNull
		Flowable<@NonNull Object> observeOn =Flowable.create(e -> {
			e.onNext(1); Thread.sleep(1000);
			e.onNext(2); Thread.sleep(1000);
			e.onNext(3); Thread.sleep(1000);
			e.onNext(4); Thread.sleep(1000);
			e.onNext(5); Thread.sleep(1000);
			e.onNext(6); Thread.sleep(1000);
		}, BackpressureStrategy.BUFFER).observeOn(Schedulers.computation());
		Thread.sleep(1000 * 1);
		observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		Thread.sleep(1000 * 1);
		observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

这种会大音两次1,2,3,4,5,6,即使时不同的线程,第二次会等第一次完成过后开始(因为未设置subscribeOn,所以是单线程的,第二次会在第一次完成后再开始)。而且是当观察者订阅被观察者的时候触发被观察者的动作。

RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6

我们create中lambda的方法会再次运行

相似的ReplaySubject

还有一种当订阅后会重新处理已发送的数据ReplaySubject

ReplaySubject<Integer> replaySubject = ReplaySubject.create(1);
		replaySubject.onNext(1);
		replaySubject.onNext(2);
		replaySubject.onNext(3);
		System.out.println("subscribe 1");
		replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		replaySubject.onNext(4);
		replaySubject.onNext(5);
		System.out.println("subscribe 2");
		replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		replaySubject.onNext(6);

控制台输出为

subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 1
RxComputationThreadPool-1 + next + 2
RxComputationThreadPool-1 + next + 3
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 1
RxComputationThreadPool-2 + next + 2
RxComputationThreadPool-2 + next + 3
RxComputationThreadPool-2 + next + 4
RxComputationThreadPool-2 + next + 5
RxComputationThreadPool-2 + next + 6

但是Flowable.create和ReplaySubject的模式不太一样。
对于Flowable来说,新的subscribe来临时,会重新执行create方法里面的FlowableOnSubscribe的apply方法。
对于ReplaySubject是把前面onNext()的数据保存到list中,然后新的subscribe来临时重新遍历list消费。这里需要注意ReplaySubject有内存泄漏的风险。见io.reactivex.rxjava3.subjects.ReplaySubject.buffer。

第二种是HOT模式。

1、使用cold + publish()方法修改cold为hot。

@NonNull
		ConnectableFlowable<@NonNull Object> publish = Flowable.create(e -> {
			e.onNext(1); Thread.sleep(1000);System.out.println(1+ " " + Thread.currentThread().getName());
			e.onNext(2); Thread.sleep(1000);System.out.println(2+ " " + Thread.currentThread().getName());
			e.onNext(3); Thread.sleep(1000);System.out.println(3+ " " + Thread.currentThread().getName());
			e.onNext(4); Thread.sleep(1000);System.out.println(4+ " " + Thread.currentThread().getName());
			e.onNext(5); Thread.sleep(1000);System.out.println(5+ " " + Thread.currentThread().getName());
			e.onNext(6); Thread.sleep(1000);System.out.println(6+ " " + Thread.currentThread().getName());
		}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.computation()).publish();
		System.out.println("connect");
		publish.connect();
		System.out.println("subscribe 1");
		publish.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		Thread.sleep(3000 * 1);
		System.out.println("subscribe 2");
		publish.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

下面是输出,可以看到没有出现重放。

connect
subscribe 1
main + next + 1
1 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 2
2 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 3
3 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 4
subscribe 2
4 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-2 + next + 5
5 RxComputationThreadPool-1
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6
6 RxComputationThreadPool-1

2、或者使用Subject对象
这里使用了PublishSubject,他的观察者只处理订阅过后的数据。subject包含了其他类型的对象,可以参考RxJava 的 Subject

		PublishSubject<Integer> publishSubject = PublishSubject.create();
		publishSubject.onNext(1);
		publishSubject.onNext(2);
		publishSubject.onNext(3);
		System.out.println("subscribe 1");
		publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		publishSubject.onNext(4);
		publishSubject.onNext(5);
		System.out.println("subscribe 2");
		publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));
		publishSubject.onNext(6);

执行过后的打印

subscribe 1
subscribe 2
RxComputationThreadPool-1 + next + 4
RxComputationThreadPool-1 + next + 5
RxComputationThreadPool-1 + next + 6
RxComputationThreadPool-2 + next + 6

网站文章

  • (pytorch进阶之路)四种Position Embedding的原理及实现

    (pytorch进阶之路)四种Position Embedding的原理及实现

    定义子函数,获得每个window中两两patch之间二维的位置偏差,使用torch.meshgrid函数,根据x轴和y轴范围得到网格每个点的x坐标和y坐标,将其堆叠,获取任何两个点之间的横轴与纵轴坐标...

    2024-01-30 21:33:56
  • otter学习 | canal和otter的关系? 热门推荐

    otter学习 | canal和otter的关系? 热门推荐

    在回答这问题之前,首先来看一张canal&amp;otter和mysql复制的类比图: mysql的自带复制技术可分成三步: master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看); slave将master的binary log events拷贝到它的中继日志(...

    2024-01-30 21:33:49
  • 设计模式(19)命令模式

    设计模式(19)命令模式

    1、定义:命令模式(Command Pattern)是一种行为设计模式,它将请求封装为一个对象,从而使你可以使用不同的请求对客户端进行参数化。(2)具体命令类(Concrete Command):实现...

    2024-01-30 21:33:42
  • 自定义线程池

    自定义线程池

    一:参数分析 我们要想自定义线程池,必须先了解线程池的工作流程,才能自己定义线程池。下图是ThreadPoolExecutor的构造方法。 我们可以通过下面的场景理解ThreadPoolExecuto...

    2024-01-30 21:33:13
  • MobaXterm登录密码重置问题

    MobaXterm登录密码重置问题

    登录MobaXterm提示输入密码,密码输入多次后无果,密码忘记如法使用MobaXterm软件,经过查询后可采用密码重置的方式处理。使用浏览器打开如下网址:https://mobaxterm.moba...

    2024-01-30 21:33:04
  • 括号匹配数据结构

    学习分享本周学习的是数据结构的括号匹配,所谓括号匹配指的是在命令端输入一行只含有括号的代码,然后运行代码,判断每一个左括号是否有一个右括号与之对应,从而判断输入的数据是否违法代码如下:#define ...

    2024-01-30 21:32:57
  • echart图表保存为图片的两种方式

    echart图表保存为图片的两种方式

    将echarts图表和ucharts图表保存为图片

    2024-01-30 21:32:28
  • 头歌大数据——MapReduce 基础实战 答案 无解析

    头歌大数据——MapReduce 基础实战 答案 无解析

    2024-01-30 21:32:20
  • JMeter巧用计数器实现CSV数据文件设置的功能

    JMeter巧用计数器实现CSV数据文件设置的功能

    需求本次压测范围包含登录接口,但是压测环境user表用户数据量太少,和生产环境数据量不是一个量级,因此,需要先通过并发跑注册接口造用户数据需要参数化的字段是username和phone说明:本次演示的接口是示例接口,非实际生产环境接口注册接口如下:方案一:CSV 数据文件设置我们先通过代码(python或者java均可)造一定量的参数化数据写在参数化reg.txt文件中pac...

    2024-01-30 21:32:13
  • android线,android_线

    android线,android_线

    说明:android螺纹。android无非就是一个线程Main Thread和Worker Thread。(除了主线程Main Thread是Worker Thread)Main Thread 也叫...

    2024-01-30 21:31:35