添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Spring5的函数式编程使用了ProjectReactor工程的类,使用最为多的就是Mono和Flux类型,其中Mono是针对0到1个元素进行操作,Flux是针对多个元素进行操作。要使用这两个类前提是自己要有jdk8,Lambda,函数式编程的基础,否则请先学习了以上知识再来接触Spring5的Mono和FLux。

我们针对Mono类的一行代码,来进行源码分析与讲解。代码如下:

Mono.just("hello").subscribe(System.out::println);

这行代码是生成有一个元素的Mono类,并打印“hello”字符串。

首先,通过Mono.just()方法生成了Mono类

	public static <T> Mono<T> just(T data) {
		return onAssembly(new MonoJust<>(data));

以上代码可以看到是创建了一个MonoJust类型,这个类和Mono,FLux类型一样在reactor-core包中,其实Mono的每个方法都会对应生成一个Mono的子类,其子类很多,如下:

截图只是列举了一部分,Mono采用这种每个方法都生成一个类的方式是与jdk8中的Stream流水线的最大区别,目的是为了重用任意阶段的结果,且其所有子类都实现了Mono类的方法:

public abstract void subscribe(CoreSubscriber<? super T> actual);

这个方法我们后面会讲,它的作用就是处理后面的函数式的逻辑。

在返回MonoJust类型后,我们调用了subscribe(System.out::println)方法,这个方法在Mono类有具体实现,方法如下:

	public final Disposable subscribe(Consumer<? super T> consumer) {
		Objects.requireNonNull(consumer, "consumer");
		return subscribe(consumer, null, null);

这个方法是所有Mono的子类执行subscribe(CoreSubscriber<? super T> actual)方法的入口,我们进入看下,执行到了如下方法:

	public final Disposable subscribe(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer,
			@Nullable Consumer<? super Subscription> subscriptionConsumer) {
		return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
				completeConsumer, subscriptionConsumer));

我们看到有四个参数,根据字面意思即可理解,lambda表达式是第一个参数consumer,可以看到执行Lambda的方法时候创建了一个类型LambdaMonoSubscriber,这个类型封装了封装了四个参数。

后面进入此方法:

	@Override
	public final void subscribe(Subscriber<? super T> actual) {
		onLastAssembly(this).subscribe(Operators.toCoreSubscriber(actual));

这个方法是重写了父类Publisher的suscribe方法,我们看到代码做了类型转换,通过类型转换就转换为了可以调用MonoJust中的subscribe方法了,MonoJust总的subscribe方法如下,所有的Mono子类都重写了此方法:

	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		actual.onSubscribe(Operators.scalarSubscription(actual, value));

然后通过LambdaMonoSubscriber的onSubscribe方法在调用Operators的request方法,再调用此LambdaMonoSubscriber的onNext方法,最后调用了我们自己写的Lambda表达式,如下:

	@Override
	public final void onNext(T x) {
		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
		if (s == Operators.cancelledSubscription()) {
			Operators.onNextDropped(x, Context.empty());
			return;
		if (consumer != null) {
			try {
				consumer.accept(x);

consumer就是我们传入进来的Lambda表达式,这个方法继承自Suscriber类

通过这种方式,Publisher发送的subscribe就被Subscriber类消费掉了。其实全程都是一个单线程的操作。借鉴了消费订阅模式。

关于Mono的方法很多,在此只是举了一个简单的例子,其他的方法也可以通过类似的方式去研究。

在此处有一篇比较好的介绍Reactor编程的文章

ReactorReactor有两种型,Flux<T>和Mono<T>。Flux似RaxJava的Observable,它可以触发零到多个事件,并根据实际情况结束处理或触发错误。 Mono最多只触发一个事件,它跟RxJava的Single和Maybe似,所以可以把Mono<Void>用于在异步任务完成时发出通知。 因为这两种型之间的简单... 浏览器打开 适合阅读的人群:本文适合对 Spring、Netty 等框架,以及 Java 8 的 Lambda、Stream 等特性有基本认识,希望了解 Spring 5 的反应式编程特性的技术人员阅读。一、前言最近几年,随着 Node.js、Golang 等新技术、新语言的出现,Java 的服务器端开发语言老大的地位受到了不小的挑战。虽然,Java 的市场份额依旧很大,短时间内也不会改变,但 Java 社区... 浏览器打开 反应式编程(Reactive Programming)这种新的编程范式越来越受到开发人员的欢迎。在 Java 社区中比较流行的是 RxJava 和 RxJava 2。本文要介绍的是另外一个新的反应式编程Reactor。 反应式编程介绍 反应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。比如求值一个简单的表达式 c=a+b,当 a 或者 b 的值发生变化 浏览器打开 FluxMonoReactor 中的两个基本概念。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法onNext(), onComplete()和 onError()会被调用。Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中 浏览器打开 搞懂webflux和reactive,首先要搞懂以下问题: 1. 关于Reactive Streams、Srping Reactor 和 Spring WebFlux之间的关系? 2. 反应式编程思想是什么?Backpressure背压又是什么? 3. 既然Webflux不是基于Servlet,那么Spring Security等基于Servlet的组件可以用吗? 4. 如何更好的理解MonoFlux? 5. spring官方如何reactive化? 6. 如何控制Backpressure? 浏览器打开 import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import reactor.core.publisher.Mono; import java.util.Date; @SpringBootTest 浏览器打开 官网:https://projectreactor.io/ 教程:https://projectreactor.io/docs/core/release/reference/#about-doc ReactorReactor有两种型,Flux<T>和Mono<T>。Flux似RaxJava的Observable,它可以触发零到多个事件,并根据实际情况结束... 浏览器打开