Stream[E, A]
表示一个可以产生 A
类型输出值,或可能以 E
类型为失败值的,效果化的流。
新建 Stream
import zio.stream._
val stream: Stream[Nothing, Int] = Stream(1,2,3)
或产生自 Iterable
:
import zio.stream._
val streamFromIterable: Stream[Nothing, Int] = Stream.fromIterable(0 to 100)
转换一个 Stream
ZIO Stream 提供了许多标准的转换函数,例如:map
,partition
,grouped
,groupByKey
,groupedWithin
等。以下是如何使用它们的示例。
map
import zio.stream._
val intStream: Stream[Nothing, Int] = Stream.fromIterable(0 to 100)
val stringStream: Stream[Nothing, String] = intStream.map(_.toString)
partition
partition
根据函数参数将 stream 分成多个流元组。第一个流包含评估为 true的所有元素,第二个流包含评估为 false 的所有元素。较快的流可能比较慢的流领先,领先的程度受缓冲大小的限制。两个流都以 ZManaged
类型打包。在下面的示例中,左流仅包含偶数。
import zio._
import zio.stream._
val partitionResult: ZManaged[Any, Nothing, (ZStream[Any, Nothing, Int], ZStream[Any, Nothing, Int])] =
Stream
.fromIterable(0 to 100)
.partition(_ % 2 == 0, buffer = 50)
grouped
可以使用分组(grouped
)函数将流的结果划分为指定的块大小。
import zio._
import zio.stream._
val groupedResult: ZStream[Any, Nothing, Chunk[Int]] =
Stream
.fromIterable(0 to 100)
.grouped(50)
groupByKey
可以使用 groupByKey
或 groupBy
,按函数的执行结果对流进行分区。在下面的示例中,检查的结果被分组并计数。
import zio._
import zio.stream._
case class Exam(person: String, score: Int)
val examResults = Seq(
Exam("Alex", 64),
Exam("Michael", 97),
Exam("Bill", 77),
Exam("John", 78),
Exam("Bobby", 71)
)
val groupByKeyResult: ZStream[Any, Nothing, (Int, Int)] =
Stream
.fromIterable(examResults)
.groupByKey(exam => exam.score / 10 * 10) {
case (k, s) => ZStream.fromEffect(s.runCollect.map(l => k -> l.size))
}
groupedWithin
groupedWithin
允许按时间或块大小对事件进行分组,以先满足者为准。在下面的示例中,每个块均最多包含 30 个元素,并且每 3 秒生成一次。
import zio._
import zio.stream._
import zio.duration._
import zio.clock.Clock
val groupedWithinResult: ZStream[Any with Clock, Nothing, Chunk[Int]] =
Stream.fromIterable(0 to 10)
.repeat(Schedule.spaced(1.seconds))
.groupedWithin(30, 10.seconds)
消费一个 Stream
import zio._
import zio.console._
import zio.stream._
val result: RIO[Console, Unit] = Stream.fromIterable(0 to 100).foreach(i => putStrLn(i.toString))
使用 Sink
Sink[E, A0, A, B]
表示接受的消费类型为 A,最终产生或者 E 型的错误, B 型的成功结果,以及剩余的类型为 A0。
例如,您可以使用 Sink.foldLeft
将 Stream 中的数据累加到单一个 ZIO 值:
import zio._
import zio.stream._
def streamReduce(total: Int, element: Int): Int = total + element
val resultFromSink: UIO[Int] = Stream(1,2,3).run(Sink.foldLeft(0)(streamReduce))
在多个流上工作
您可以使用合并方法合并多个流:
import zio.stream._
val merged: Stream[Nothing, Int] = Stream(1,2,3).merge(Stream(2,3,4))
或合并(zip)多个流:
import zio.stream._
val zippedStream: Stream[Nothing, (Int, Int)] = Stream(1,2,3).zip(Stream(2,3,4))
然后您可以将流中的原属合并为单个 ZIO 值:
import zio._
def tupleStreamReduce(total: Int, element: (Int, Int)) = {
val (a,b) = element
total + (a +b)
}
val reducedResult: UIO[Int] = zippedStream.run(Sink.foldLeft(0)(tupleStreamReduce))
流压缩
解压
如果您读取到 Content-Encoding: deflate, Content-Encoding: gzip
或其它此类压缩数据流,则以下转换器可能会有所帮助:
inflate
转换器可以根据 RFC 1951 标准对 deflated 格式的压缩输入流进行解压缩。gunzip
转换器可以根据 RFC 1952 标准对 gzipped 格式的压缩输入流进行解压缩。
如果输入未经过正确的压缩,这两种解压缩方法都将以 CompressionException
作为失败类型。
import zio.stream.ZStream
import zio.stream.Transducer.{ gunzip, inflate }
import zio.stream.compression.CompressionException
def decompressDeflated(deflated: ZStream[Any, Nothing, Byte]): ZStream[Any, CompressionException, Byte] = {
val bufferSize: Int = 64 * 1024 // Internal buffer size. Few times bigger than upstream chunks should work well.
val noWrap: Boolean = false // For HTTP Content-Encoding should be false.
deflated.transduce(inflate(bufferSize, noWrap))
}
def decompressGzipped(gzipped: ZStream[Any, Nothing, Byte]): ZStream[Any, CompressionException, Byte] = {
val bufferSize: Int = 64 * 1024 // Internal buffer size. Few times bigger than upstream chunks should work well.
gzipped.transduce(gunzip(bufferSize))
}
压缩
deflate
转换器根据 RFC 1951 标准对流中的字节进行压缩。
import zio.stream.ZStream
import zio.stream.Transducer.deflate
import zio.stream.compression.{CompressionLevel, CompressionStrategy, FlushMode}
def compressWithDeflate(clearText: ZStream[Any, Nothing, Byte]): ZStream[Any, Nothing, Byte] = {
val bufferSize: Int = 64 * 1024 // Internal buffer size. Few times bigger than upstream chunks should work well.
val noWrap: Boolean = false // For HTTP Content-Encoding should be false.
val level: CompressionLevel = CompressionLevel.DefaultCompression
val strategy: CompressionStrategy = CompressionStrategy.DefaultStrategy
val flushMode: FlushMode = FlushMode.NoFlush
clearText.transduce(deflate(bufferSize, noWrap, level, strategy, flushMode))
}
def deflateWithDefaultParameters(clearText: ZStream[Any, Nothing, Byte]): ZStream[Any, Nothing, Byte] =
clearText.transduce(deflate())