[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rI7dsGXh-1657721282486)(img/b33888d8-4408-4fd5-9215-3a609f5cea79.png)]
通过使用以下操作符之一,可以通过到达顺序(flatMap
)、保持最后发射的顺序(switchMap
)或通过保持原始顺序(concatMap
)将给定的可观察对象转换为单个可观察对象:concatMap
、concatMapDelayError
、concatMapEager
、concatMapEagerDelayError
、concatMapIterable
、flatMap
、flatMapIterable
、switchMap
,或switchMapDelayError
。下面的示例演示了如何通过随机选择可观察对象的顺序来更改输出的内容。(flatMap
、concatMap
、switchMap
:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5a7ELn9t-1657721282491)(img/c3fdb310-ec85-4469-86b6-329586a071bd.png)]
concatMap
实现将c
字符串附加到给定的a
、b
和c
字符串中的每一个,因此,输出是ac
、bc
和cc
。
flatMap
实现将f
字符串附加到给定的a
、b
和c
字符串中的每一个,如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PeMOYZtM-1657721282491)(img/96750fcd-8d66-48dc-b62d-dea37238d9bf.png)]
由于随机延迟,顺序与预期的af
、bf
、cf
不同,运行几次就会输出预期的顺序。
下面的代码段显示了不同的输出。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AmB5BdJs-1657721282491)(img/98226dc4-761c-43f3-a4cf-d70b57c77f86.png)]
switchMap
实现将s
字符串附加到给定的a
、b
和c
字符串列表中的最后一个元素。
注意advanceTimeBy
的用法。没有这个电话,什么都不会打印,因为发射被推迟了。
这些是在发生可恢复的故障(例如服务暂时关闭)时要使用的操作符。他们通过重新订阅来工作,希望这次能顺利完成。可用的 RxJava 方法如下:
retry
:错误时永远重放同一流程,直到成功retryUntil
:重试,直到给定的stop
函数返回true
retryWhen
:基于接收错误/异常的重试逻辑函数,在错误情况下永远重放相同的流,直到成功为止
在下面的示例中,我们使用只包含两个值的zip
来创建重试逻辑,该逻辑在一个时间段后重试两次以运行失败的序列,或者用 500 乘以重试计数。当连接到无响应的 Web 服务时,尤其是从每次重试都会消耗设备电池的移动设备时,可以使用此方法:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aXhV9JCk-1657721282499)(img/70d6545a-2809-4a40-8d32-b37e6e937ae9.png)]
在线程调度方面,可观测是不可知的——在多线程环境中,这是调度器的工作。一些操作符提供了可以将调度器作为参数的变体。有一些特定的调用允许从下游(使用操作符的点,这是observeOn
的情况)或不考虑调用位置(调用位置无关紧要,因为这是subscribeOn
方法的情况)观察流。在下面的示例中,我们将从上游和下游打印当前线程。注意,在subscribeOn
的情况下,线程总是相同的:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5qza8GLT-1657721282499)(img/4acf684b-55fd-4938-845d-b40f900a1022.png)]
注意map
方法中的线程主要用法:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y5mpwm36-1657721282499)(img/027fbe82-d16e-4588-99b5-8abae35211b6.png)]
请注意,map
方法不再使用线程main
。
RxJava2.0 提供了更多来自io.reactivex.schedulers.Schedulers
工厂的调度器,每个调度器都有特定的用途:
computation()
:返回用于计算工作的Scheduler
实例io()
:返回一个用于 I/O 工作的Scheduler
实例single()
:对于需要在同一后台线程上强顺序执行的工作,返回Scheduler
实例trampoline()
:返回一个Scheduler
实例,该实例在一个参与线程上以 FIFO 方式执行给定的工作newThread()
:返回一个Scheduler
实例,该实例为每个工作单元创建一个新线程from(Executor executor)
:将Executor
转换成新的Scheduler
实例,并将工作委托给它
有一个只用于特殊测试目的的Scheduler
,称为io.reactivex.schedulers.TestScheduler
。我们已经使用了它,因为它允许手动推进虚拟时间,因此非常适合于测试依赖于时间的流,而不必等待时间通过(例如,单元测试)。
主体是可观察的和订户的混合体,因为它们都接收和发射事件。RxJava2.0 提供了五个主题:
AsyncSubject
:仅发射源可观测到的最后一个值,后跟一个完成BehaviorSubject
:发射最近发射的值,然后是可观测源发射的任何值PublishSubject
:仅向订阅方发送订阅时间之后源发送的项目ReplaySubject
:向任何订户发送源发出的所有项目,即使没有订阅UnicastSubject
:只允许单个用户在其生存期内订阅
在下面的示例中,我们将展示 RxJava 在实时处理从多个传感器接收到的温度中的用法。传感器数据由 Spring 引导服务器提供(随机生成)。服务器配置为接受传感器名称作为配置,以便我们可以为每个实例更改它。我们将启动五个实例,并在客户端显示警告,如果其中一个传感器输出超过 80 摄氏度。
使用以下命令可以从 bash 轻松启动多个传感器:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TiNyK0FI-1657721282500)(img/2570a499-f35d-448f-92a0-e008531b9272.png)]
服务器端代码很简单,我们只配置了一个 REST 控制器,将传感器数据输出为 JSON,如下代码所示:
@RestController
publicclass SensorController
@Value("${sensor.name}")
private String sensorName;
@RequestMapping(value="/sensor", method=RequestMethod.GET,
produces=MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<SensorData> sensor() throws Exception
SensorData data = new SensorData(sensorName);
HttpHeaders headers = new HttpHeaders();
headers.set(HttpHeaders.CONTENT_LENGTH, String.valueOf(new
ObjectMapper().writeValueAsString(data).length()));
returnnew ResponseEntity<SensorData>(data, headers,
HttpStatus.CREATED);
传感器数据是在SensorData
构造器中随机生成的(注意 Lombock 库的使用,以摆脱获取设置器代码):
@Data
publicclass SensorData
@JsonProperty
Double humidity;
@JsonProperty
Double temperature;
@JsonProperty
String sensorName;
public SensorData(String sensorName)
this.sensorName = sensorName;
humidity = Double.valueOf(20 + 80 * Math.random());
temperature = Double.valueOf(80 + 20 * Math.random());
现在我们已经启动了服务器,我们可以从支持 RxJava 的客户端连接到它。
客户端代码使用 rxapache http 库:
publicclass Main
@JsonIgnoreProperties(ignoreUnknown = true)
staticclass SensorTemperature
Double temperature;
String sensorName;
public Double getTemperature()
return temperature;
publicvoid setTemperature(Double temperature)
this.temperature = temperature;
public String getSensorName()
return sensorName;
publicvoid setSensorName(String sensorName)
this.sensorName = sensorName;
@Override
public String toString()
return sensorName + " temperature=" + temperature;
SensorTemperature
是我们的客户资料。它是服务器可以提供的内容的快照。其余信息将被 Jackson 数据绑定器忽略:
publicstaticvoid main(String[] args) throws Exception
final RequestConfig requestConfig = RequestConfig.custom()
.setSocketTimeout(3000)
.setConnectTimeout(500).build();
final CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom()
.setDefaultRequestConfig(requestConfig)
.setMaxConnPerRoute(20)
.setMaxConnTotal(50)
.build();
httpClient.start();
在前面的代码中,我们通过设置 TCP/IP 超时和允许的连接数来设置并启动 HTTP 客户端:
Observable.range(1, 5).map(x ->
Try.withCatch(() -> new URI("http", null, "127.0.0.1", 8080 + x, "/sensor", null, null), URISyntaxException.class).orElse(null))
.flatMap(address -> ObservableHttp.createRequest(HttpAsyncMethods.createGet(address), httpClient)
.toObservable())
.flatMap(response -> response.getContent().map(bytes -> new String(bytes)))
.onErrorReturn(error -> "{"temperature":0,"sensorName":""}")
.map(json ->
Try.withCatch(() -> new ObjectMapper().readValue(json, SensorTemperature.class), Exception.class)
.orElse(new SensorTemperature()))
.repeatWhen(observable -> observable.delay(500, TimeUnit.MILLISECONDS))
.subscribeOn(Schedulers.io())
.subscribe(x -> {
if (x.getTemperature() > 90) {
System.out.println("Temperature warning for " + x.getSensorName());
} else {
System.out.println(x.toString());
}, Throwable::printStackTrace);
前面的代码基于范围创建 URL 列表,将其转换为响应列表,将响应字节展开为字符串,将字符串转换为 JSON,并将结果打印到控制台。如果温度超过 90 度,它将打印一条警告信息。它通过在 I/O 调度器中运行来完成所有这些,每 500 毫秒重复一次,如果出现错误,它将返回默认值。请注意Try
单子的用法,因为选中的异常是由 Lambda 代码引发的,因此需要通过转换为可由 RxJava 在onError
中处理的未选中表达式或在 Lambda 块中本地处理来处理。
由于客户端永远旋转,部分输出如下:
NuclearCell2 temperature=83.92902289170053
Temperature warning for NuclearCell1
Temperature warning for NuclearCell3
Temperature warning for NuclearCell4
NuclearCell5 temperature=84.23921169948811
Temperature warning for NuclearCell1
NuclearCell2 temperature=83.16267124851476
Temperature warning for NuclearCell3
NuclearCell4 temperature=81.34379085987851
Temperature warning for NuclearCell5
NuclearCell2 temperature=88.4133065761349
在本章中,我们学习了反应式编程,然后重点介绍了可用的最常用的反应式库之一——RxJava。我们学习了反应式编程抽象及其在 RxJava 中的实现。我们通过了解可观察对象、调度器和订阅是如何工作的、最常用的方法以及它们是如何使用的,从而通过具体的示例迈出了进入 RxJava 世界的第一步。
在下一章中,我们将学习最常用的反应式编程模式,以及如何在代码中应用它们。
使用RxJava实现反应式编程
RxJava是对Java和Android进行反应式编程的具体实现,它受到了函数式编程的影响。RxJava倡导函数组合,避免出现全局状态和副作用,并且要以流的方式思考,进而组合异步和基于事件的程序。它起源于观察者模式的生产者和消费者回调,并且扩展了几十个操作符来实现组合、转换、调度、节流、错误处理和生命周期管理。
反应式编程和RxJava
反应式编程是一个通用的编程术语,它主要关注对变更做出反应,比如数据值或事件。反应式编程通常可以按照命令式的方式实现。回调就是以一种命令式实现
rxjava适用于大量的任务之间没有依赖关系,可以并发执行的情况。
回调方法 (onNext, onCompleted, onError)
Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法的一个子集:
onNext(T item)
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现...
反应器设计模式(Reactor
pattern)是一种为处理并发服务请求,并将请求提交到一个或
者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。
二、什么场景下使用Reactor模式?
出版商(Publisher)接口声明了一个方法subscribe()(订阅)。
订阅人(Subscriber)可以通过此方法向出版商(Publisher)发起订阅。
出版商(Publisher)创建数据,并将数据发送给订阅的订阅人(Subscriber)。
public interface Publisher<...