添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

1. Fuseable

控制变量解析 [?]

/** Indicates the QueueSubscription can't support the requested mode. */
int NONE = 0;
/** Indicates the QueueSubscription can perform sync-fusion. */
int SYNC = 1;
/** Indicates the QueueSubscription can perform only async-fusion. */
int ASYNC = 2;
/** Indicates the QueueSubscription should decide what fusion it performs (input only). */
int ANY = 3;
 * Indicates that the queue will be drained from another thread
 * thus any queue-exit computation may be invalid at that point.
 * For example, an {@code asyncSource.map().publishOn().subscribe()} sequence where {@code asyncSource}
 * is async-fuseable: publishOn may fuse the whole sequence into a single Queue. That in turn
 * could invoke the mapper function from its {@code poll()} method from another thread,
 * whereas the unfused sequence would have invoked the mapper on the previous thread.
 * If such mapper invocation is costly, it would escape its thread boundary this way.
int THREAD_BARRIER = 4;

内部接口解析

ConditionalSubscriber [?]
interface ConditionalSubscriber<T> extends CoreSubscriber<T> { 
	boolean tryOnNext(T t);
 

一种订阅者变体,它可以立即判断是否使用了该值,如果没有使用,则直接允许发送新值。这就避免了对丢弃的值进行通常的请求(1)往返。

QueueSubscription [?]
interface QueueSubscription<T> extends Queue<T>, Subscription {
	int requestFusion(int requestedMode);
	......
 

支持基于订阅的基于队列融合优化的契约。 同步源具有固定的大小,可以以拉方式发出它们的项,因此在许多情况下避免了请求计算开销。 异步源可以同时充当队列和订阅,节省了分配另一个队列的大部分时间。

int requestFusion(int requestedMode);

从该队列订阅请求特定的融合模式。 一个人应该请求SYNC, ASYNC或任何模式(从不是NONE),实现者应该返回NONE, SYNC或ASYNC(从不是ANY)。 例如,如果一个源只支持异步融合,而中间操作符只支持同步融合源,那么操作符可以请求同步融合,源可以通过NONE拒绝它,这样操作符也可以向下游返回NONE,而融合不会发生。

SynchronousSubscription
interface SynchronousSubscription<T> extends QueueSubscription<T> {
	@Override
	default int requestFusion(int requestedMode) {
		if ((requestedMode & Fuseable.SYNC) != 0) {
			return Fuseable.SYNC;
		return NONE;
 

同步源的基类,它有固定的大小,可以以拉的方式发出它们的项,因此在许多情况下避免了请求计算开销。

仅当请求模式为SYNC时,返回SYNC进行握手;否则,返回NONE拒绝。

ScalarCallable
interface ScalarCallable<T> extends Callable<T> { }
 

指示目标可以返回值或null的标记接口,否则立即失败,从而成为程序集时优化的可行目标。

2. CoreSubscriber

public interface CoreSubscriber<T> extends Subscriber<T> {
	 * Request a {@link Context} from dependent components which can include downstream
	 * operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
	 * @return a resolved context or {@link Context#empty()}
	default Context currentContext(){
		return Context.empty();
	 * Implementors should initialize any state used by {@link #onNext(Object)} before
	 * calling {@link Subscription#request(long)}. Should further {@code onNext} related
	 * state modification occur, thread-safety will be required.
	 *    Note that an invalid request {@code <= 0} will not produce an onError and
	 *    will simply be ignored or reported through a debug-enabled
	 *    {@link reactor.util.Logger}.
	 * {@inheritDoc}
	@Override
	void onSubscribe(Subscription s);
 

一个上下文感知的订阅者,与反应流的原始订阅者相比,放宽了§1.3和§3.9的规则。如果在接收到的订阅上执行了一个无效的请求<= 0,该请求将不会产生一个onError,并且会被忽略。 规则松弛是在反应流公共域下初步建立的。

一个持有上下文的订阅者

3. CorePublisher

public interface CorePublisher<T> extends Publisher<T> {
	 * An internal {@link Publisher#subscribe(Subscriber)} that will bypass
	 * {@link Hooks#onLastOperator(Function)} pointcut.
	 * In addition to behave as expected by {@link Publisher#subscribe(Subscriber)}
	 * in a controlled manner, it supports direct subscribe-time {@link Context} passing.
	 * @param subscriber the {@link Subscriber} interested into the published sequence
	 * @see Publisher#subscribe(Subscriber)
	void subscribe(CoreSubscriber<? super T> subscriber);
 

支持CoreSubscriber的发布者。

允许包含上下文的发布者

4. Disposable

表示可以取消/释放任务或资源。 dispose方法的调用是/应该是幂等的。

常见于Task对象(例如SchedulerTask)或Subscriber对象,提供可取消执行的功能

例如,通过状态机的方式,当调用dispose方法时将状态置为取消,同时调用Future.cancel试图取消任务

当在实际执行时,需要先检查状态机的状态是否可执行

* Cancel or dispose the underlying task or resource. * Implementations are required to make this method idempotent. void dispose(); * Optionally return {@literal true} when the resource or task is disposed. * Implementations are not required to track disposition and as such may never * return {@literal true} even when disposed. However, they MUST only return true * when there's a guarantee the resource or task is disposed. * @return {@literal true} when there's a guarantee the resource or task is disposed. default boolean isDisposed() { return false;

void dispose();

取消或释放底层任务或资源。 需要实现使这个方法幂等。

default boolean isDisposed()

当资源或任务被释放时,可选地返回true。 实现不需要跟踪处理,因此即使处理也可能永远不会返回true。但是,只有在保证资源或任务被释放时,它们才必须返回true。

Disposables支持类

一个支持类,它为专用的可丢弃子接口(可丢弃的子接口)的实现提供工厂方法。复合,Disposable.Swap)。

  1. static final class ListCompositeDisposable implements Disposable.Composite, Scannable

使用List维护的复合容器

  1. static final class SwapDisposable implements Disposable.Swap

使用AtomicReferenceFieldUpdater进行原子更新的一次性容器

  1. static final class SimpleDisposable extends AtomicBoolean implements Disposable

简单可处置实例,在调用dispose时进行处理

  1. static final class AlwaysDisposable implements Disposable

始终被处理的容器,调用dispose()不做任何操作,isdispose()总是返回true

  1. static final class NeverDisposable implements Disposable

永远不被处理的实例

内部接口解析

interface Swap extends Disposable, Supplier<Disposable> {
	boolean update(@Nullable Disposable next);
	boolean replace(@Nullable Disposable next);
 

一种一次性容器,允许原子地更新/替换其内部一次性容器,同时允许处理容器本身。

常用于定时器执行,当符合时间条件时,使用新的Task替换原有的定时任务

Composite

interface Composite extends Disposable 
 

一次性容器本身就是一次性的。通过使用dispose()将一次性物品累积起来并一次性处理掉。使用add(Disposable)方法将所有权交给容器,容器现在负责处理这些方法。但是,您可以通过保留一个引用并使用remove(可丢弃的)来重新获得单个元素的所有权,这就把处理所述元素的责任交给了您。请注意,一旦丢弃,容器就不能再使用,您需要一个新的Disposable.Composite。

复合容器,用于执行一组DisposableTask,并在单个Task执行后从复合容器中移除

Flux.range(1, 100).buffer(20).subscribe(System.out::println); //一直收集元素,直到Predicate返回true,开启下一次收集 Flux.range(1, 10).bufferUntil(i -> i%3 == 0).subscribe(System.out::println); //只收集Predicate为true import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.Re.
官网:https://projectreactor.io/ 教程:https://projectreactor.io/docs/core/release/reference/#about-doc Reactor的类型 Reactor有两种类型,Flux<T>和Mono<T>。Flux类似RaxJava的Observable,它可以触发零到多个事件,并根据实际情况结束...
Flux.interval(Duration.ofSeconds(1)).buffer(Duration.ofSeconds(60),Duration.ofSeconds(15)).subscribe(System.out::println); Thread.sleep(180*1000l); Flux.interval(Duration.ofSeconds(1)) 模... Function<Flux<String>, Flux<String>> filterAndMap = f -> f.filter(color ->!color.equals("orange")) .map(String :: toUpperCase); Flux.fromIterable(Arrays.asList("blue","green","o
我们都知道,在执行 io 操作时都需要在执行finally 里面进行 colse 方法进行 关闭流,这样可以出现 资源出现占用等一系列问题,在jdk1.7中出现了try-with-resources 特性帮助我们做了这些事情,我们先来看看 api文档对特性的描述 jdk文档try-with-resources特性描述 文档中给出了几个实例,我们先来看看前面2个实例就可以很好的看出了try-with-resources 特性 : static String readFirstLineFromFile(Stri
(一)简介 2009年微软为了应对高并发的服务器端开发,提出了Reactive Programming,中文称响应式编程(或反应式编程)。之后Java 社区如Netflix 和 TypeSafe 公司提供了 RxJava 和 Akka Stream 技术,让 Java 平台也有了能够实现反应式编程的框架,但因缺少简单易用的技术将反应式编程推广普及,并同诸如 MVC 框架、HTTP 客户端、数据库技术等整合,所以整体应用范围并不大。 本文将解析 Spring 的 Reactor 项目的源码。主要目的是让自己能深入理解 Reactor 这个项目,以及 Spring 5 和 Spring Boot 2。 Project Reactor 项目地址:https://github.com/reactor Reactor 项目主要包含 Reactor CoreReactor Netty 两部分。Reactor Cor...
这个Reactor项目的主要构件就是 reactor-core,一个反应性的库,它关注于反应流规范和以Java 8为目标。 Reactor引入了可组合的反应类型,这些类型实现了Publisher ,但也提供了丰富的操作符词汇表,尤其是Flux和Mono。Flux对象表示0..N个项目的反应序列。而Mono对象表示(0.. .1)单值。 类型中存在一些语义上的区别,表示一种异步处理粗略基...
原文:http://blog.51cto.com/liukang/2090191 Project Reactor与Spring是兄弟项目,侧重于Server端的响应式编程,主要artifact是reactor-core,这是一个基于Java 8的实现了响应式流规范(Reactive Streams specification)的响应式库。 1.Flux与Mono Reactor中发布者(Pu...