反应式流

检出 interop-reactiveStreams 模块以获得对反应式流的互操作支持。

反应式流的 Producer 和 Subscriber

ZIO 通过将 zio.stream.Stream 转换到 org.reactivestreams.Publisher,和将 zio.stream.Sink 转换到 org.reactivestreams.Subscriber 来集成 Reactive Streams。反之亦然。简单地引入 zio.interop.reactiveStreams._ 来让转换生效。

例子

首先,让我们导入一些内容。

import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactiveStreams._
import zio.stream._

val runtime = Runtime.default

我们使用以下发布者和订阅者作为示例:

val publisher = new RangePublisher(3, 10)
val subscriber = new SyncSubscriber[Int] {
  override protected def whenNext(v: Int): Boolean = {
    print(s"$v, ")
    true
  }
}

将发布者作为流

将一个发布者作为流来使用时最多缓冲 qSize 个元素。如果可能的话,qSize 应该是 2 的幂以达到最佳性能。默认值为 16。

val streamFromPublisher = publisher.toStream(qSize = 16)
runtime.unsafeRun(
  streamFromPublisher.run(Sink.collectAll[Integer])
)

将订阅者作为接收器

当将一个订阅者连接到 Stream 时,需要一个旁支通道来处理故障。因此,toSink 返回 PromiseSink 的元组。当流失败时,Promise 必须呈现失败状态,toSink 上的 type 参数代表 Stream 的错误类型。

val asSink = subscriber.toSink[Throwable]
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
runtime.unsafeRun(
  asSink.flatMap { case (errorP, sink) =>
    failingStream.run(sink).catchAll(errorP.fail)
  }
)

将流作为发布者

val stream = Stream.range(3, 13)
runtime.unsafeRun(
  stream.toPublisher.flatMap { publisher =>
    UIO(publisher.subscribe(subscriber))
  }
)

将接收器作为订阅者

toSubscriber 返回一个 Subscriber 和一个IO,这个 IO 负责在接收器执行结束后,或当发布者产生错误时,返回结果。作为订阅者的用接收器最多缓冲 qSize 个元素。qSize 应该尽可能是 2 的幂以达到最佳性能。默认值为16。

val sink = Sink.collectAll[Integer]
runtime.unsafeRun(
  sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) => 
    UIO(publisher.subscribe(subscriber)) *> result
  }
)
Leave a Reply
Your email address will not be published.
*
*

BACK TO TOP