reactor
原文:http://blog.51cto.com/liukang/2090191
Project reactor与Spring是兄弟项目,侧重于Server端的响应式编程,主要artifact是reactor-core,这是一个基于java 8的实现了响应式流规范(Reactive Streams specification)的响应式库。
1.Flux与Mono
Reactor中发布者(Publisher)由Flux和Mono两个类定义,它们都提供了丰富的操作符。一个Flux对象代表一个包含0~N个元素的响应式序列,而一个Mono对象代表一个包含0/1个元素的结果。
即然是“数据流”的发布者,Flux和Mono都可以发现三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。
Flux.just(1, 2, 3, 4, 5, 6);
Mono.just(1);
Flux和Mono提供了多种创建数据流的方法,just就是一种比较直接的声明数据流的方式,其参数就是数据元素。
对于上面的Flux,还可以通过如下方式声明:
integer[] array = new Integer[]{1,2,3,4,5,6};
Flux.fromArray(array);
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);
// 只有完成信号的空数据流
Flux.just();
Flux.empty();
Mono.empty();
Mono.justOrEmpty(optional.empty());
// 只有错误信号的数据流
Flux.ERROR(new Exception("some error"));
Mono.error(new Exception("some error"));
2.订阅前什么都不会发生
数据流有了,假设我们想把每个数据元素原封不动地打印出来
Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
Mono.just(1).subscribe(System.out::println);
此外,Flux和Mono还提供了多个subscribe方法的变体:
// 订阅并触发数据流
subscribe();
// 订阅并指定对正常数据元素如何处理
subscribe(consumer<? super T> consumer);
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super throwable> errorConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
如果是订阅上边声明的Flux:
Flux.just(1, 2, 3, 4, 5, 6).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!"));
输出 :
1
2
3
4
5
6
Completed!
这里需要注意的一点是,Flux.just(1,2,3,4,5,6)仅仅声明了这个数据流,此时数据元素并未发出,只有subscribe()方法调用的时候才会触发数据流,所以,订阅前什么都不会发生。
3.测试与调试
在命令世界,调试通常都是非常直观的:直接看stack trace就可以找到问题出现的位置,以有其他信息。当你切换到响应式异步代码,事情就变得复杂多了,先了解一个基本的单元测试工具-StepVerifier。
最常见的测试Reactor序列的场景就是定义一个Flux或Mono,然后在订阅它的时候测试它的行为。
当你的测试关注于每一个数据元素的时候,就非常贴近使用StepVerifier的测场景。
private Flux<Integer> generateFluxFrom1To6() {
return Flux.just(1, 2, 3, 4, 5, 6);
}
private Mono<Integer> generateMonoWithError() {
return Mono.error(new Exception("some error"));
}
@Test
public void testViaStepVerifier() {
StepVerifier.create(generateFluxFrom1To6())
.expectNext(1, 2, 3, 4, 5, 6)
.expectComplete()
.verify();
StepVerifier.create(generateMonoWithError())
.expectErrormessage("some error")
.verify();
4.操作符
(1)map-元素映射为新元素
map操作符可以将数据元素进行转换/映射,得到一个新元素。
StepVerifier.create(Flux.range(1, 6) // 1
.map(i -> i * i)) // 2
.expectNext(1, 4, 9, 16, 25, 36) //3
.expectComplete(); // 4
(2)flatMap-元素映射为流
StepVerifier.create(
Flux.just("flux", "mono")
.flatMap(s -> Flux.fromArray(s.split("\\s*")) // 1.对于每一个字符串s,将其拆分为包含一个字符串流
.delayElements(Duration.ofMillis(100))) // 2.对于每个元素延迟100ms
.doOnNext(System.out::print)) // 3.对每个元素进行打印
.expectNextCount(8) // 4.验证是否发出8个元素
.verifyComplete();
flatMap通常用于每个元素又引入数据流的情况,比如我们有一串url,需要请求每个url并收集response数据,假设响应式的请求方法如下:
Mono<HttpResponse> requestUrl(String url) {...}
而url数据流为一个Flux<String> urlFluxt,那么为了得到所有HttpReponse,就需要用到flatMap
urlFlux.flatMap(url -> requestUrl(url));
(3)filter-过滤
filter操作符可以对数据元素进行筛选。
StepVerifier.create(Flux.range(1, 6)
.filter(i -> i % 2 == 1) // 1
.map(i -> i * i))
.expectNext(1, 9, 25) // 2
.verifyComplete();
(4)zip
看到zip这个词可能会联想到拉链,它能够将多个流一对一的合并起来。zip有多个方法变体,这里介绍最常见的二合一的。
举个例子,假设我们有一个关于zip方法的说明desc,我们希望将这句话拆分为一个一个的单词并以每200ms一个的速度发出,除了前flatMap的例子中用到的delayElements,可以如下操作:
private Flux<String> getZipDescFlux() {
String desc = "Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.";
return Flux.fromArray(desc.split("\\s+")); // 1.用空格拆分
}
@Test
public void testSimpleOperators() throws InterruptedException {
countdownlatch countDownLatch = new CountDownLatch(1); // 2.不使用它的话,测试方法所在线程会直接返回而不等待数据流发出完毕
Flux.zip(
getZipDescFlux(),
Flux.Interval(Duration.ofMillis(200))) // 3.每200ms发出一个数据,因为zip是一对一的,所以字符串流也将具有同样的速度
.subscribe(t -> System.out.println(t.getT1()), null, countDownLatch::countDown); // 4.zip之后的中元素为Tuple2
countDownLatch.await(10, TimeUnit.SECONDS); // 5.最多等待10s
}
5.调试器与线程模型
在以往的多线程开发场景中,我们通常使用Executors工具类来创建线程池。但在Reactor中,使用调试器(Scheduler)来处理这些事情。Scheduler是一个拥有多个实现类的抽象接口。Schedulers(工具类)提供的静态方法可搭建以下几中线程执行环境:
- 当前线程(Schedulers.immediate())
- 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用,直到该调度器被废弃。如果你想使用独占的线程,请使用Schedulers.newSingle()
- 弹性线程斌(Schedulers.elastic()),线程池如果空闲时间过长(默认60s)就会废弃。对于IO阻塞的场景比较适用。Schedulers.elastic()能够给一个阻塞的任务分配它自己的线程,从而不会妨碍其他任务和资源 。
- 固定大小线程池(Schedulers.parallel()), 所创建的线程斌的大小与cpu个数等同。
- 自定义线程池(Schedulers.fromExecutorService(ExecutorService)),基于自定义ExecutorService创建Scheduler。
举例:
(1)同步变异步
private String getStringSync() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printstacktrace();
}
return "Hello, Reactor!";
}
正常情况下,调用这个方法会被阻塞2s,然后同步地返回结果,我们借助elastic调度器将其变为异步,由于是异步,为了保证测试方法所在的线程能够等待结果的返回,我们使用CountDownLatch
@Test
public void testSyncToAsync() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.fromCallable(() -> getStringSync()) // 1
.subscribeOn(Schedulers.elastic()) // 2
.subscribe(System.out::println, null, countDownLatch::countDown);
countDownLatch.await(10, TimeUnit.SECONDS);
}
6.切换调度器操作符
Reactor提供了两种在响应式链中调整调度器Scheduler的方法:publishOn和subscribeOn。它们都接收一个Scheduler作为参数。但是publishOn在链中出现的位置有讲究,而subscribeOn则无所谓。
7.错误处理
(1)捕获并返回一个静态的缺省值
Flux.range(1, 6)
.map(i -> 10/(i-3))
.onErrorReturn(0) // 1.这个方法能够在收到错误信号时提供一个缺省值
.map(i -> i*i)
.subscribe(System.out::println, System.err::println);
(2)捕获并执行一个异常处理方法或计算一个候补值来顶替
Flux.range(1, 6)
.map(i -> 10/(i-3))
.onErrorResume(e -> Mono.just(new Random().nextint(6))) // 提供新的数据流
.map(i -> i*i)
.subscribe(System.out::println, System.err::println);
(3)捕获,并再包装为某一个业务相关的异常,然后再抛出业务异常
Flux.just("timeout1")
.flatMap(k -> callExternalService(k)) // 1
.onErrorMap(original -> new BusinessException("SLA exceeded", original)); // 2
(4)捕获,记录错误日志,然后继续抛出
Flux.just(endpoint1, endpoint2)
.flatMap(k -> callExternalService(k))
.doOnError(e -> { // 1
log("uh oh, falling back, service failed for key " + k); // 2
})
.onErrorResume(e -> getFromcache(k));
(5)使用finally来清理资源,或使用java7引入try-with-resource
Flux.using(
() -> getResource(), // 1
resource -> Flux.just(resource.getAll()), // 2
MyResource::clean // 3.最终清理资源
);
(6)重试
Flux.range(1, 6)
.map(i -> 10 / (3 - i))
.retry(1)
.subscribe(System.out::println, System.err::println);
Thread.sleep(100); // 确保序列执行完
8.回压
public void testBackpressure() {
Flux.range(1, 6) // 1.是一个快的Publiser
.doOnRequest(n -> System.out.println("Request " + n + " values...")) // 2.每次request的时候打印request的个数
.subscribe(new Basesubscriber<Integer>() { // 3.通过重写BaseSubscriber的方法来自定义Subscriber
@Override
protected void hookOnSubscribe(Subscription subscription) { // 4.在订阅的时候执行
System.out.println("Subscribed and make a request...");
request(1); // 5.订阅时首先向上游请求1个元素
}
@Override
protected void hookOnNext(Integer value) { // 6.每次收到一个元素的时候操作
try {
TimeUnit.SECONDS.sleep(1); // 7.模拟慢的subscriber
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Get value [" + value + "]"); // 8
request(1); // 9
}
});
}
Reactor提供了一个BaseSubscriber,我们可以通过扩展它的自定义Subscriber,Subscriber通过request(n)的方法来告知上游它的需求速度 。
最后欢迎大家访问我的个人网站:1024s