blocking
目录
1 blockingFirst
2 blockingforeach
3 blockingIterable
4 blockingLast
5 blockingLatest
6 blockingMostrecent
7 blockingNext
8 blockingSingle
9 blockingSubscribe
1 blockingFirst
T |
blockingFirst()
返回此Flowable发出的第一个项,如果它没有发出任何项,则抛出NoSuchElementException。 |
T |
blockingFirst(T defaultItem)
返回此Flowable发出的第一个项,如果它不发出任何项,则返回默认值。 |
1.1 blockingFirst图解
1.2 blockingFirst测试用例
测试代码
@Test
public void doBlockingFirst() {
System.out.println("######doAny#####");
integer result = Flowable.just(10, 2, 3, 4, 5).blockingFirst();
System.out.println(result);
}
测试结果
######doAny#####
10
1.3 blockingFirst分析说明
blockingFirst返回发射的第一个item
1.4 实用场景
后续完善
2 blockingForEach
void |
blockingForEach(consumer<? super T> onNext)
以阻塞方式消耗上游Flowable并使用当前线程上的每个上游项调用给定的Consumer,直到上游终止。 |
2.1 blockingForEach测试用例
@Test
public void doBlockingForEach() {
System.out.println("######doBlockingForEach#####");
Flowable.just(10, 2, 3, 4, 5).blockingForEach(new Consumer<Integer>() {
@Override
public void accept(Integer result) throws Exception {
System.out.println("blocking result = " + result);
}
});
}
测试结果
######doBlockingForEach#####
blocking result = 10
blocking result = 2
blocking result = 3
blocking result = 4
blocking result = 5
2.2 blockingForEach说明
blockingForEach会使用指定的Consumer消费Flowable发出的每一个项目
2.4 实用场景
后续完善
3 blockingIterable
Iterable<T> |
blockingIterable()
将此Flowable转换为Iterable。 |
Iterable<T> |
blockingIterable(int bufferSize)
将此Flowable转换为Iterable,限制 |
3.1 blockingIterable图解
3.2 blockingIterable测试用例
@Test
public void blockingIterable() {
System.out.println("######blocksingIterable#####");
Iterable<Integer> iterable = Flowable.just(10, 9, 8, 7).blockingIterable();
for(Integer aiterable :iterable) {
System.out.println("value = " + aIterable);
}
}
测试结果
######blockingIterable#####
value = 10
value = 9
value = 8
value = 7
3.3 实用场景
后续完善
4 blockingLast
T |
blockingLast()
返回此Flowable发出的最后一项,如果此Flowable没有发出任何项,则抛出NoSuchElementException。 |
T |
blockingLast(T defaultItem)
返回此Flowable发出的最后一项,如果它没有发出任何项,则返回默认值。 |
4.1 blockingLast图解
4.2 blockingLast测试用例
@Test
public void blockingLast() {
System.out.println("######blockingLast#####");
Integer last = Flowable.just(10, 9, 8, 7).blockingLast();
System.out.println("Last value = "+ last);
}
测试结果
######blockingLast#####
Last value = 7
4.3 实用场景
后续完善
5 blockingLatest
Iterable<T> |
blockingLatest()
返回一个Iterable,它将会阻塞,直到此Flowable发出的新项目,并将其返回。 |
5.1 blockingLatest图解
5.2 blockingLatest测试用例
@Test
public void blockingLatest() {
System.out.println("######blockingLatest#####");
Iterable<Integer> latest = Flowable.just(10, 9, 8, 7).delay(1, TimeUnit.SECONDS).blockingLatest();
if(latest.iterator().hasNext()) {
System.out.println("Latest value = "+ latest.iterator().next());
} else {
System.out.println("Latest value is null " );
}
}
测试结果
######blockingLatest#####
Latest value = 10
5.3 实用场景
后续完善
6 blockingMostRecent
Iterable<T> |
blockingMostRecent(T initialItem) 返回一个Iterable,会阻塞等待此Flowable发出最近的项,并将这个项返回。 |
6.1 blockingMostRecent图解
6.2 blockingMostRecent测试用例
@Test
public void blockingMostRecent() {
System.out.println("######blockingMostRecent#####");
Flowable publisher = Flowable.just("我说","123","木头人").delay(100,TimeUnit.MILLISECONDS);
Iterable<String> mostRecent = publisher.blockingMostRecent("null");
mostRecent.forEach(new java.util.function.Consumer<String>() {
@Override
public void accept(String mostRecent) {
System.out.println("mostRecent = " + mostRecent);
}
});
}
######blockingMostRecent#####
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = null
mostRecent = 我说
mostRecent = 我说
mostRecent = 我说
mostRecent = 123
mostRecent = 123
mostRecent = 123
mostRecent = 木头人
mostRecent = 木头人
mostRecent = 木头人
6.3 blockingMostRecent测试用例分析
blockingMostRecent这个操作符会阻塞等待Flowable发射项目,测试用例中反复打印null就是阻塞等待的证据,直到有新的值发射
6.4 实用场景
后续完善
7 blockingNext
Iterable<T> |
|
7.1 blockingNext图解
源码中注释里面指向说明文档图,与takeLast一样
7.2 blockingNext测试用例
测试1:
@Test
public void blockingNext() {
System.out.println("######blockingNext#####");
Flowable publisher = Flowable.just("我说","123","木头人").delay(1,TimeUnit.SECONDS);
publisher.subscribe(new Consumer<String>() {
@Override
public void accept(String value) throws Exception {
System.out.println("value = " + value);
}
});
Iterable<String> next = publisher.blockingNext();
next.forEach(new java.util.function.Consumer<String>() {
@Override
public void accept(String next) {
System.out.println("next = " + next);
}
});
}
}
测试1结果
######blockingNext#####
value = 我说
value = 123
value = 木头人
next = 我说
测试2
@Test
public void blockingNext1() {
System.out.println("######blockingNext#####");
Flowable publisher = Flowable.Interval(0, 1, TimeUnit.SECONDS);
Iterable<Long> next = publisher.blockingNext();
publisher.subscribe(new Consumer<Long>() {
@Override
public void accept(Long value) throws Exception {
System.out.println("value = " + value);
}
});
next.forEach(new java.util.function.Consumer<Long>() {
@Override
public void accept(Long next) {
System.out.println("next = " + next);
}
});
}
测试二结果
######blockingNext#####
value = 0
next = 0
value = 1
next = 1
value = 2
next = 2
...继续
7.3 blockingNext测试用例分析
Flowable实例在使用blockingNext操作符时会返回一个Iterable,该Iterable在每次迭代时阻塞,直到Flowable发出一个新项,然后Iterable返回该项,上面的测试1只是发射了三个值后就结束了,所以只打印出第一个值便结束了,剩下的还来不及返回便结束了;测试2,每隔1s发射一个长整型的值,从0开始,每发射一次+1,这时候next也会跟着一直打印。其实这说明了一个问题,也就是说
blockingNext操作符是对Flowable实例的发射做了阻塞式的监听,在Flowable没有结束前就会一直阻塞等待Flowable的发射消息,发射一次返回一次。
7.4 实用场景
后续完善
8 blockingSingle
T |
blockingSingle()
如果此Flowable在发射单个项目后便完成,则返回被发射的单个项目,如果它发出多个项目,则抛出illegalargumentException。 源码的注释是:If this {@code Flowable} completes after emitting a single item, return that item, otherwise throw a {@code NoSuchElementException}. 但是NoSuchElementException应该写错了错误的,在做测试时候抛出的异常是IllegalArgumentException,下面重载接口是正确的 |
T |
blockingSingle(T defaultItem)
如果此Flowable在发射单个项目后便完成,则返回被发射的单个项目; 如果它发出多个项目,则抛出IllegalArgumentException; 如果它没有发出任何项目,则返回默认值。 |
8.1 blockingSingle图解
图解地址:http://reactivex.io/documentation/operators/first.html
源码中指向的图解就是这个,看起来似乎和某些操作符使用了相同的,这说明这些操作符表达的是一个类似操作
8.2 blockingSingle测试用例
测试1
@Test
public void blockingSingle() {
System.out.println("######blockingSingle#####");
Flowable<Long> publisher = Flowable.just(1L, 2L);
Long singleItem = publisher.blockingSingle();
System.out.println("singleItem = " + singleItem);
}
测试1结果:
######blockingNext#####
java.lang.IllegalArgumentException: sequence contains more than one element!
at io.reactivex.internal.operators.flowable.FlowableSingleSingle$SingleElementsubscriber.onNext(FlowableSingleSingle.java:82)
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:84)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.scheduledthreadpoolexecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Process finished with exit code 255
测试2
@Test
public void blockingSingle2() {
System.out.println("######blockingSingle2#####");
Flowable<Long> publisher = Flowable.just(1L);
Long singleItem = publisher.blockingSingle();
System.out.println("singleItem = " + singleItem);
}
测试2结果
######blockingSingle2#####
singleItem = 1
Process finished with exit code 0
测试3
@Test
public void blockingSingle3() {
System.out.println("######blockingSingle2#####");
Flowable<Long> publisher = Flowable.empty();
Long singleItem = publisher.blockingSingle(-1L);
System.out.println("singleItem = " + singleItem);
}
测试3结果
######blockingSingle2#####
singleItem = -1
Process finished with exit code 0
8.3 blockingSingle测试用例分析
如果Flowable在发射完单个项目便结束了,那么blockingSingle操作符会返回这个发射的结果,如果Flowable发射项目在2个一个上会报错
(java.lang.IllegalArgumentException: Sequence contains more than one element!)
测试3是对其重载方法做测试,通过 Flowable.empty()不会发射项目,blockingSingle操作符会返回默认的值-1
8.4 实用场景
后续完善
9 blockingSubscribe
void |
blockingSubscribe()
Runs the source Flowable to a terminal event, ignoring any values and rethrowing any exception. 将观察源运行到结束为止,忽略任何值并重新抛出任何异常。 |
void |
blockingSubscribe(Consumer<? super T> onNext)
订阅(观察)源并调用当前线程上的给定回调。 |
void |
blockingSubscribe(Consumer<? super T> onNext, Consumer<? super throwable> onERROR)
订阅(观察)源并调用当前线程上的给定回调。 |
void |
blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
订阅(观察)源并调用当前线程上的给定回调。 |
void |
blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, int bufferSize)
订阅(观察)源并调用当前线程上的给定回调。 |
void |
blockingSubscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, int bufferSize)
订阅(观察)源并调用当前线程上的给定回调。 |
void |
blockingSubscribe(Consumer<? super T> onNext, int bufferSize)
订阅(观察)源并调用当前线程上的给定回调。 |
void |
blockingSubscribe(Subscriber<? super T> subscriber)
订阅源并在当前线程上调用Subscriber方法。 |
9.1 blockingSubscribe测试用例
测试1
@Test
public void blockingSubscribe() {
System.out.println("######blockingSubscribe#####");
Flowable<Long> source = Flowable.just(1L, 2L, 3L);
source.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("aLong = " + aLong);
}
});
source.blockingSubscribe();
}
测试1结果
######blockingSubscribe#####
aLong = 1
aLong = 2
aLong = 3
Process finished with exit code 0
测试2
@Test
public void blockingSubscribe2() {
System.out.println("######blockingSubscribe#####");
Flowable<Long> publisher = Flowable.just(4L, 5L, 3L);
publisher.blockingSubscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("aLong = " + aLong);
}
});
测试2结果
######blockingSubscribe#####
aLong = 4
aLong = 5
aLong = 3
Process finished with exit code 0
9.2 blockingSubscribe测试用例分析
上面只是列举了最初的两个重载blockingSubscribe操作符,而且两个的运行结果看不出来有什么不同,实际上这里体现的是在当前线程订阅。与forEach类似,这个操作符后面会讲到。
9.3 实用场景
后续完善
相关阅读
在前面的章节中,我们简要地介绍了卷积神经网络的来龙去脉。接下来我们逐一来解释它之所以成功的几个核心要素。卷积神经网络的名字
本文转自:汇川技术小型PLC梯形图编程系列教程(零):梯形图编程学习指南http://www.yanjuntech.cn/archives/1969 本人目前接触的都
Chef 安装教程【系列教程一】 版本均为官方最新稳定版 Chef Server 12.17.33、Chef Manage 2.5.16、Chef Workstation 0.1.13
详解Dell EMC发布的PowerMax存储和R系列计算系统
今年5月1日,戴尔E
原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://eilfei2000.b