完整的代码请参考 github.com/javahongxi/…
我们的一些企业对于HTTP服务有一些非正常的做法,它们客户端的请求body是加密的,即在服务端需要对请求body进行解密,而服务端响应的body也要求加密。本文就来揭秘这一需求在 WebFlux 中如何实现,我们给 request/response body 均增加一个表示时间戳的字段 start/end 来模拟请求数据解密和响应数据加密,思路如下。
首先我们需要知道,WebFlux 的过滤器/拦截器是统一用 WebFilter 来表示的,与 Spring MVC 类似,对于 application/json 请求,WebFlux 读取 body inputstream 也只能读取一次,对于query params / form-data / form-x-www 请求,可以反复获取。 所以,怎么修改 application/json 请求的 request/response body 是一个难点。网上找遍了例子,大都是错误的例子或是hello demo,完全没法解决本文的需求。经过苦心探索,本人终于找到了解决方案,代码如下。
import lombok.extern.slf4j.Slf4j;
import org.hongxi.sample.webflux.support.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import java.util.Map;
* Created by shenhongxi on 2021/4/29.
@Slf4j
@Order(-1)
@Component
public class ModifyBodyFilter implements WebFilter {
@Autowired
private ServerCodecConfigurer codecConfigurer;
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
if (WebUtils.shouldNotFilter(exchange)) {
return chain.filter(exchange);
return ParamUtils.from(exchange)
.map(params -> decorate(exchange, params))
.flatMap(chain::filter);
private ServerWebExchange decorate(ServerWebExchange exchange, Map<String, Object> params) {
if (params.isEmpty()) {
return exchange;
ServerHttpResponse serverHttpResponse = new ModifiedServerHttpResponse(exchange, codecConfigurer.getReaders());
MediaType contentType = exchange.getRequest().getHeaders().getContentType();
if (contentType != null && contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
Map<String, Object> decrypted = Crypto.decrypt(params);
exchange.getAttributes().put(WebUtils.REQUEST_PARAMS_ATTR, decrypted);
byte[] rawBody = JacksonUtils.serialize(decrypted);
ServerHttpRequest serverHttpRequest = new ModifiedServerHttpRequest(exchange.getRequest(), rawBody);
return exchange.mutate().request(serverHttpRequest).response(serverHttpResponse).build();
ServerWebExchange serverWebExchange = new ModifiedServerWebExchange(exchange);
return serverWebExchange.mutate().response(serverHttpResponse).build();
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.codec.DecoderHttpMessageReader;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.web.bind.support.WebExchangeDataBinder;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.Map;
* Created by shenhongxi on 2021/4/29.
public abstract class ParamUtils {
@SuppressWarnings("rawtypes")
private static final HttpMessageReader messageReader =
new DecoderHttpMessageReader<>(new Jackson2JsonDecoder());
@SuppressWarnings("unchecked")
public static Mono<Map<String, Object>> from(ServerWebExchange exchange) {
Map<String, Object> data = exchange.getAttribute(WebUtils.REQUEST_PARAMS_ATTR);
if (data != null) {
return Mono.just(data);
Mono<Map<String, Object>> params;
MediaType contentType = exchange.getRequest().getHeaders().getContentType();
if (contentType != null && contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
params = (Mono<Map<String, Object>>) messageReader.readMono(
ResolvableType.forType(Map.class), exchange.getRequest(), Collections.emptyMap());
} else {
params = WebExchangeDataBinder.extractValuesToBind(exchange);
return params.doOnNext(e -> exchange.getAttributes().put(WebUtils.REQUEST_PARAMS_ATTR, e));
import io.netty.buffer.ByteBufAllocator;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import reactor.core.publisher.Flux;
import java.nio.charset.StandardCharsets;
* Created by shenhongxi on 2021/4/29.
public class ModifiedServerHttpRequest extends ServerHttpRequestDecorator {
private final byte[] rawBody;
public ModifiedServerHttpRequest(ServerHttpRequest delegate, byte[] rawBody) {
super(delegate);
this.rawBody = rawBody;
@Override
public Flux<DataBuffer> getBody() {
NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(this.rawBody.length);
buffer.write(this.rawBody);
return Flux.just(buffer);
@Override
public HttpHeaders getHeaders() {
// 必须 new,不能直接操作 super.getHeaders()(readonly)
HttpHeaders headers = new HttpHeaders();
headers.addAll(super.getHeaders());
headers.setContentLength(this.rawBody.length);
return headers;
* @return body json string
public String bodyString() {
return new String(rawBody, StandardCharsets.UTF_8);
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
* Created by shenhongxi on 2021/4/29.
public class ModifiedServerHttpResponse extends ServerHttpResponseDecorator {
private final ServerWebExchange exchange;
private final List<HttpMessageReader<?>> messageReaders;
public ModifiedServerHttpResponse(ServerWebExchange exchange,
List<HttpMessageReader<?>> messageReaders) {
super(exchange.getResponse());
this.exchange = exchange;
this.messageReaders = messageReaders;
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
// 这里只是借用 ClientResponse 这个类获取修改之前的 body
// server 端最终返回的是 ServerResponse/ServerHttpResponse
ClientResponse clientResponse = prepareClientResponse(body, httpHeaders);
Mono<byte[]> modifiedBody = clientResponse.bodyToMono(byte[].class)
.flatMap(originalBody -> Mono.just(Crypto.encrypt(originalBody)));
BodyInserter<Mono<byte[]>, ReactiveHttpOutputMessage> bodyInserter =
BodyInserters.fromPublisher(modifiedBody, byte[].class);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
exchange.getResponse().getHeaders());
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
Mono<DataBuffer> messageBody = DataBufferUtils.join(outputMessage.getBody());
HttpHeaders headers = getDelegate().getHeaders();
if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
|| headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
return getDelegate().writeWith(messageBody);
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMapSequential(p -> p));
private ClientResponse prepareClientResponse(Publisher<? extends DataBuffer> body, HttpHeaders httpHeaders) {
ClientResponse.Builder builder = ClientResponse.create(HttpStatus.OK, messageReaders);
return builder.headers(headers -> headers.putAll(httpHeaders)).body(Flux.from(body)).build();
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;
import reactor.core.publisher.Mono;
* Created by shenhongxi on 2021/4/29.
public class ModifiedServerWebExchange extends ServerWebExchangeDecorator {
public ModifiedServerWebExchange(ServerWebExchange delegate) {
super(delegate);
@Override
public Mono<MultiValueMap<String, String>> getFormData() {
return super.getFormData()
.map(Crypto::decrypt)
.doOnNext(decrypted -> getDelegate().getAttributes()
.put(WebUtils.REQUEST_PARAMS_ATTR, decrypted)
import org.springframework.util.MultiValueMap;
import org.springframework.util.MultiValueMapAdapter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
* Created by shenhongxi on 2021/4/29.
public class Crypto {
* 模拟解密逻辑:添加一个请求参数 start
* @param requestBody
* @return
public static Map<String, Object> decrypt(Map<String, Object> requestBody) {
Map<String, Object> decrypted = new HashMap<>(requestBody);
decrypted.put("start", System.currentTimeMillis());
return decrypted;
* 模拟解密逻辑:添加一个请求参数 start
* @param formData
* @return
public static MultiValueMap<String, String> decrypt(MultiValueMap<String, String> formData) {
MultiValueMap<String, String> decrypted = new MultiValueMapAdapter<>(formData);
decrypted.put("start", Collections.singletonList(String.valueOf(System.currentTimeMillis())));
return decrypted;
* 模拟加密逻辑:添加一个响应参数 end
* @param responseBody
* @return
public static byte[] encrypt(byte[] responseBody) {
Map<String, Object> result = JacksonUtils.deserialize(responseBody, Map.class);
result.put("end", System.currentTimeMillis());
return JacksonUtils.serialize(result);
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
* Created by shenhongxi on 2021/5/3.
public class BodyInserterContext implements BodyInserter.Context {
private final ExchangeStrategies exchangeStrategies;
public BodyInserterContext() {
this.exchangeStrategies = ExchangeStrategies.withDefaults();
public BodyInserterContext(ExchangeStrategies exchangeStrategies) {
this.exchangeStrategies = exchangeStrategies;
@Override
public List<HttpMessageWriter<?>> messageWriters() {
return exchangeStrategies.messageWriters();
@Override
public Optional<ServerHttpRequest> serverRequest() {
return Optional.empty();
@Override
public Map<String, Object> hints() {
return Collections.emptyMap();
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.web.server.ServerWebExchange;
* Created by shenhongxi on 2021/5/3.
public class CachedBodyOutputMessage implements ReactiveHttpOutputMessage {
private final DataBufferFactory bufferFactory;
private final HttpHeaders httpHeaders;
private boolean cached = false;
private Flux<DataBuffer> body = Flux
.error(new IllegalStateException("The body is not set. " + "Did handling complete with success?"));
public CachedBodyOutputMessage(ServerWebExchange exchange, HttpHeaders httpHeaders) {
this.bufferFactory = exchange.getResponse().bufferFactory();
this.httpHeaders = httpHeaders;
@Override
public void beforeCommit(Supplier<? extends Mono<Void>> action) {
@Override
public boolean isCommitted() {
return false;
boolean isCached() {
return this.cached;
@Override
public HttpHeaders getHeaders() {
return this.httpHeaders;
@Override
public DataBufferFactory bufferFactory() {
return this.bufferFactory;
* Return the request body, or an error stream if the body was never set or when.
* @return body as {@link Flux}
public Flux<DataBuffer> getBody() {
return this.body;
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
this.body = Flux.from(body);
this.cached = true;
return Mono.empty();
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMap(p -> p));
@Override
public Mono<Void> setComplete() {
return writeWith(Flux.empty());
完整的代码请参考 github.com/javahongxi/…