检出 interop-reactiveStreams
模块以获得对反应式流的互操作支持。
反应式流的 Producer
和 Subscriber
ZIO 通过将 zio.stream.Stream
转换到 org.reactivestreams.Publisher
,和将 zio.stream.Sink
转换到 org.reactivestreams.Subscriber
来集成 Reactive Streams。反之亦然。简单地引入 zio.interop.reactiveStreams._
来让转换生效。
例子
首先,让我们导入一些内容。
import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactiveStreams._
import zio.stream._
val runtime = Runtime.default
我们使用以下发布者和订阅者作为示例:
val publisher = new RangePublisher(3, 10)
val subscriber = new SyncSubscriber[Int] {
override protected def whenNext(v: Int): Boolean = {
print(s"$v, ")
true
}
}
将发布者作为流
将一个发布者作为流来使用时最多缓冲 qSize
个元素。如果可能的话,qSize 应该是 2 的幂以达到最佳性能。默认值为 16。
val streamFromPublisher = publisher.toStream(qSize = 16)
runtime.unsafeRun(
streamFromPublisher.run(Sink.collectAll[Integer])
)
将订阅者作为接收器
当将一个订阅者连接到 Stream 时,需要一个旁支通道来处理故障。因此,toSink
返回 Promise
和 Sink
的元组。当流失败时,Promise
必须呈现失败状态,toSink
上的 type 参数代表 Stream 的错误类型。
val asSink = subscriber.toSink[Throwable]
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
runtime.unsafeRun(
asSink.flatMap { case (errorP, sink) =>
failingStream.run(sink).catchAll(errorP.fail)
}
)
将流作为发布者
val stream = Stream.range(3, 13)
runtime.unsafeRun(
stream.toPublisher.flatMap { publisher =>
UIO(publisher.subscribe(subscriber))
}
)
将接收器作为订阅者
toSubscriber
返回一个 Subscriber
和一个IO,这个 IO 负责在接收器执行结束后,或当发布者产生错误时,返回结果。作为订阅者的用接收器最多缓冲 qSize
个元素。qSize 应该尽可能是 2 的幂以达到最佳性能。默认值为16。
val sink = Sink.collectAll[Integer]
runtime.unsafeRun(
sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
UIO(publisher.subscribe(subscriber)) *> result
}
)