ZSink[R, E, A, B]
用于消费从流中产生的元素。您可以将此接收器视为消费可变数量的 A 元素(可能为 0、1或很多!)的函数,可能因 E 类型错误而失败,或最终产生 B 类型的值。
Contents
hide
ZSink 被作为参数传递给 ZStream#run
:
import zio._
import zio.stream._
val stream = ZStream.fromIterable(1 to 1000)
val sink = ZSink.sum[Int]
stream.run(sink)
建立 sinks
zio.stream
提供了多种不同的 sink 供使用。
将数据收集到 Chunk[A]
中:
ZSink.collectAll[Int]
尝试将第一个元素接收到一个 Option 中(如果流为空则返回 None):
ZSink.head[Int]
stream.runDrain
的以下实现将忽略流中所有的输入:
ZSink.drain
产生一个给定类型的失败:
ZSink.fail("Boom")
基本的接收数据累积函数:
ZSink.foldLeft[Int, Int](0)(_ + _)
具有短路功能的折叠器:
ZSink.fold(0)(sum => sum >= 10)((acc, n: Int) => acc + n)
sinks 转换
创建 Sink 后,我们可以使用提供的操作对其进行转换。
并行运行两个接收器并返回先执行完成的那一个:
Sink.foldLeft[Int, Int](0)(_ + _).race(Sink.head[Int])
我们可以使用 contramap
,通过给定 C => A
,其中 C
是输入类型,而 A
是 Sink的接收的元素类型,将给定的输入转换为某个特定的 Sink
Sink.collectAll[String].contramap[Int](_.toString + "id")
dimap
是 contramap
的扩展,它还可以指定 Sink 的输出转换:
Sink.collectAll[String].dimap[Int, Chunk[String]](_.toString + "id", _.take(10))