Stream

Stream[E, A] 表示一个可以产生 A 类型输出值,或可能以 E 类型为失败值的,效果化的流。

新建 Stream

import zio.stream._

val stream: Stream[Nothing, Int] = Stream(1,2,3)

或产生自 Iterable

import zio.stream._

val streamFromIterable: Stream[Nothing, Int] = Stream.fromIterable(0 to 100)

转换一个 Stream

ZIO Stream 提供了许多标准的转换函数,例如:mappartitiongroupedgroupByKeygroupedWithin等。以下是如何使用它们的示例。

map

import zio.stream._

val intStream: Stream[Nothing, Int] = Stream.fromIterable(0 to 100)
val stringStream: Stream[Nothing, String] = intStream.map(_.toString)

partition

partition 根据函数参数将 stream 分成多个流元组。第一个流包含评估为 true的所有元素,第二个流包含评估为 false 的所有元素。较快的流可能比较慢的流领先,领先的程度受缓冲大小的限制。两个流都以 ZManaged 类型打包。在下面的示例中,左流仅包含偶数。

import zio._
import zio.stream._

val partitionResult: ZManaged[Any, Nothing, (ZStream[Any, Nothing, Int], ZStream[Any, Nothing, Int])] =
  Stream
    .fromIterable(0 to 100)
    .partition(_ % 2 == 0, buffer = 50)

grouped

可以使用分组(grouped)函数将流的结果划分为指定的块大小。

import zio._
import zio.stream._

val groupedResult: ZStream[Any, Nothing, Chunk[Int]] =
  Stream
    .fromIterable(0 to 100)
    .grouped(50)

groupByKey

可以使用 groupByKeygroupBy,按函数的执行结果对流进行分区。在下面的示例中,检查的结果被分组并计数。

import zio._
import zio.stream._

  case class Exam(person: String, score: Int)

  val examResults = Seq(
    Exam("Alex", 64),
    Exam("Michael", 97),
    Exam("Bill", 77),
    Exam("John", 78),
    Exam("Bobby", 71)
  )

  val groupByKeyResult: ZStream[Any, Nothing, (Int, Int)] =
    Stream
      .fromIterable(examResults)
      .groupByKey(exam => exam.score / 10 * 10) {
        case (k, s) => ZStream.fromEffect(s.runCollect.map(l => k -> l.size))
      }

groupedWithin

groupedWithin 允许按时间或块大小对事件进行分组,以先满足者为准。在下面的示例中,每个块均最多包含 30 个元素,并且每 3 秒生成一次。

import zio._
import zio.stream._
import zio.duration._
import zio.clock.Clock

val groupedWithinResult: ZStream[Any with Clock, Nothing, Chunk[Int]] =
  Stream.fromIterable(0 to 10)
    .repeat(Schedule.spaced(1.seconds))
    .groupedWithin(30, 10.seconds)

消费一个 Stream

import zio._
import zio.console._
import zio.stream._

val result: RIO[Console, Unit] = Stream.fromIterable(0 to 100).foreach(i => putStrLn(i.toString))

使用 Sink

Sink[E, A0, A, B] 表示接受的消费类型为 A,最终产生或者 E 型的错误, B 型的成功结果,以及剩余的类型为 A0。

例如,您可以使用 Sink.foldLeft 将 Stream 中的数据累加到单一个 ZIO 值:

import zio._
import zio.stream._

def streamReduce(total: Int, element: Int): Int = total + element
val resultFromSink: UIO[Int] = Stream(1,2,3).run(Sink.foldLeft(0)(streamReduce))

在多个流上工作

您可以使用合并方法合并多个流:

import zio.stream._

val merged: Stream[Nothing, Int] = Stream(1,2,3).merge(Stream(2,3,4))

或合并(zip)多个流:

import zio.stream._

val zippedStream: Stream[Nothing, (Int, Int)] = Stream(1,2,3).zip(Stream(2,3,4))

然后您可以将流中的原属合并为单个 ZIO 值:

import zio._

def tupleStreamReduce(total: Int, element: (Int, Int)) = {
  val (a,b) = element
  total + (a +b)
} 

val reducedResult: UIO[Int] = zippedStream.run(Sink.foldLeft(0)(tupleStreamReduce))

流压缩

解压

如果您读取到 Content-Encoding: deflate, Content-Encoding: gzip 或其它此类压缩数据流,则以下转换器可能会有所帮助:

  • inflate 转换器可以根据 RFC 1951 标准对 deflated 格式的压缩输入流进行解压缩。
  • gunzip 转换器可以根据 RFC 1952 标准对 gzipped 格式的压缩输入流进行解压缩。

如果输入未经过正确的压缩,这两种解压缩方法都将以 CompressionException 作为失败类型。

import zio.stream.ZStream
import zio.stream.Transducer.{ gunzip, inflate }
import zio.stream.compression.CompressionException

def decompressDeflated(deflated: ZStream[Any, Nothing, Byte]): ZStream[Any, CompressionException, Byte] = {
  val bufferSize: Int = 64 * 1024 // Internal buffer size. Few times bigger than upstream chunks should work well.
  val noWrap: Boolean = false     // For HTTP Content-Encoding should be false.
  deflated.transduce(inflate(bufferSize, noWrap))
}

def decompressGzipped(gzipped: ZStream[Any, Nothing, Byte]): ZStream[Any, CompressionException, Byte] = {
  val bufferSize: Int = 64 * 1024 // Internal buffer size. Few times bigger than upstream chunks should work well.
  gzipped.transduce(gunzip(bufferSize))
}

压缩

deflate 转换器根据  RFC 1951 标准对流中的字节进行压缩。

import zio.stream.ZStream
import zio.stream.Transducer.deflate
import zio.stream.compression.{CompressionLevel, CompressionStrategy, FlushMode}

def compressWithDeflate(clearText: ZStream[Any, Nothing, Byte]): ZStream[Any, Nothing, Byte] = {
  val bufferSize: Int = 64 * 1024 // Internal buffer size. Few times bigger than upstream chunks should work well.
  val noWrap: Boolean = false // For HTTP Content-Encoding should be false.
  val level: CompressionLevel = CompressionLevel.DefaultCompression
  val strategy: CompressionStrategy = CompressionStrategy.DefaultStrategy
  val flushMode: FlushMode = FlushMode.NoFlush
  clearText.transduce(deflate(bufferSize, noWrap, level, strategy, flushMode))
}

def deflateWithDefaultParameters(clearText: ZStream[Any, Nothing, Byte]): ZStream[Any, Nothing, Byte] =
  clearText.transduce(deflate())
Leave a Reply
Your email address will not be published.
*
*

BACK TO TOP