Sink

ZSink[R, E, A, B] 用于消费从流中产生的元素。您可以将此接收器视为消费可变数量的 A 元素(可能为 0、1或很多!)的函数,可能因 E 类型错误而失败,或最终产生 B 类型的值。

ZSink 被作为参数传递给 ZStream#run

import zio._
import zio.stream._

val stream = ZStream.fromIterable(1 to 1000)

val sink = ZSink.sum[Int]

stream.run(sink)

建立 sinks

zio.stream 提供了多种不同的 sink 供使用。

将数据收集到 Chunk[A] 中:

ZSink.collectAll[Int]

尝试将第一个元素接收到一个 Option 中(如果流为空则返回 None):

ZSink.head[Int]

 stream.runDrain 的以下实现将忽略流中所有的输入:

ZSink.drain

产生一个给定类型的失败:

ZSink.fail("Boom")

基本的接收数据累积函数:

ZSink.foldLeft[Int, Int](0)(_ + _)

具有短路功能的折叠器:

ZSink.fold(0)(sum => sum >= 10)((acc, n: Int) => acc + n)

sinks 转换

创建 Sink 后,我们可以使用提供的操作对其进行转换。

并行运行两个接收器并返回先执行完成的那一个:

Sink.foldLeft[Int, Int](0)(_ + _).race(Sink.head[Int])

我们可以使用 contramap,通过给定 C => A,其中 C 是输入类型,而 A 是 Sink的接收的元素类型,将给定的输入转换为某个特定的 Sink

Sink.collectAll[String].contramap[Int](_.toString + "id")

dimapcontramap 的扩展,它还可以指定 Sink 的输出转换:

Sink.collectAll[String].dimap[Int, Chunk[String]](_.toString + "id", _.take(10))
Leave a Reply
Your email address will not be published.
*
*

BACK TO TOP