TArray 是可以参与 STM 事务的可变引用的数组。 创建一个 TArray 创建一个空的 TArray: 或创建具有指定值的 TArray: 或者,您可以通过指定的集合来创建 TArray: 从 TArray 读取值 可以通过以下方式获得数组的第 n 个元素: 访问不存在的索引会引发 ArrayIndexOutOfBoundsException 异常并中止事务。 更新 TArray 中的值 可以按照以下步骤更新数组的第n个元素: 可以通过 updateM 效果化地更新数组的第n个元素: 更新不存在的索引会引发 ArrayIndexOutOfBoundsException 并中止事务。 转换 TArray 的元素 transform(A => A) 函数可以为数组中的每一个元素计算新值: 可以通过 transformM 效果化地映射元素: fold 通过两个指定的运算遍历折叠 TArray 中的元素: 可以通过 foldM 效果化地进行遍历折叠: 对 TArray 元素执行 side-effect 运算 foreach 用于对数组中的每个元素执行 side-effect 运算:
ZLayer[A, E, B] 描述应用程序的一层:应用程序中的每个层都需要一些服务(作为输入)并产生出一些服务(作为输出)。Layers 可以被视基于给定他们的依赖关系(其他服务)而产生这些服务绑定所需要的清单。 层的构造可以使资源的使用效果化,并且使资源被安全地获取,使用完后安全地释放。 默认情况下,层(Layer)是共享的,这意味着如果同一图层使用两次,则该图层将仅分配一次。由于层具有出色的可组合性,因此它们是 ZIO 中创建依赖于其他服务的服务的惯用方式。 最简单的 ZLayer 应用 具有依赖项的 ZLayer 应用 复杂的 ZLayer 依赖案例
Chunk[A] 代表一大块类型为 A 的值。块的设计块通常由数组支持,但向下层元素公开纯函数式的,安全的接口,并且它们对使用数组代价高昂的操作(例如重复级联)采用懒模式。 新建一个块(chunk) 块连接(Concatenating): ++ 操作返回当前块与指定块的串联块。例如: 块搜集(collecting): collect 过滤、映射块中的元素并返回新的块。以下是如何使用 collect 函数从Chunk[A] 中挑选所有字符串的例子: 如何使用 collect 函数从 Chunk[A] 中挑选所有数字[A]: collectWhile (从左到右)收集元素,直到选择条件首次返回“ false”: 或者另外一个例子: 块删除(dropping): drop 函数从块中删除前 n 个元素。 dropWhile 依据条件函数删除所有返回 true 的元素: 块比较(Comparing): 块转换(Converting) toArray 将一个块转换成 Array。 toSeq 将一个块t转换成 Seq。
Stream[E, A] 表示一个可以产生 A 类型输出值,或可能以 E 类型为失败值的,效果化的流。 新建 Stream 或产生自 Iterable: 转换一个 Stream ZIO Stream 提供了许多标准的转换函数,例如:map,partition,grouped,groupByKey,groupedWithin等。以下是如何使用它们的示例。 map partition partition 根据函数参数将 stream 分成多个流元组。第一个流包含评估为 true的所有元素,第二个流包含评估为 false 的所有元素。较快的流可能比较慢的流领先,领先的程度受缓冲大小的限制。两个流都以 ZManaged 类型打包。在下面的示例中,左流仅包含偶数。 grouped 可以使用分组(grouped)函数将流的结果划分为指定的块大小。 groupByKey 可以使用 groupByKey 或 groupBy,按函数的执行结果对流进行分区。在下面的示例中,检查的结果被分组并计数。 groupedWithin groupedWithin 允许按时间或块大小对事件进行分组,以先满足者为准。在下面的示例中,每个块均最多包含 30 个元素,并且每 3 秒生成一次。 消费一个 Stream 使用 Sink Sink[E, A0, A, B] 表示接受的消费类型为 A,最终产生或者 E 型的错误, B 型的成功结果,以及剩余的类型为 A0。 例如,您可以使用 Sink.foldLeft 将 Stream 中的数据累加到单一个 ZIO 值: 在多个流上工作 您可以使用合并方法合并多个流: 或合并(zip)多个流: 然后您可以将流中的原属合并为单个 ZIO 值: 流压缩 解压 如果您读取到 Content-Encoding: deflate, Content-Encoding: gzip 或其它此类压缩数据流,则以下转换器可能会有所帮助: inflate 转换器可以根据 RFC 1951 标准对 deflated 格式的压缩输入流进行解压缩。 gunzip 转换器可以根据 RFC 1952 标准对 gzipped 格式的压缩输入流进行解压缩。 如果输入未经过正确的压缩,这两种解压缩方法都将以 CompressionException 作为失败类型。 压缩 deflate 转换器根据 RFC 1951 标准对流中的字节进行压缩。
ZSink[R, E, A, B] 用于消费从流中产生的元素。您可以将此接收器视为消费可变数量的 A 元素(可能为 0、1或很多!)的函数,可能因 E 类型错误而失败,或最终产生 B 类型的值。 ZSink 被作为参数传递给 ZStream#run: 建立 sinks zio.stream 提供了多种不同的 sink 供使用。 将数据收集到 Chunk[A] 中: 尝试将第一个元素接收到一个 Option 中(如果流为空则返回 None): stream.runDrain 的以下实现将忽略流中所有的输入: 产生一个给定类型的失败: 基本的接收数据累积函数: 具有短路功能的折叠器: sinks 转换 创建 Sink 后,我们可以使用提供的操作对其进行转换。 并行运行两个接收器并返回先执行完成的那一个: 我们可以使用 contramap,通过给定 C => A,其中 C 是输入类型,而 A 是 Sink的接收的元素类型,将给定的输入转换为某个特定的 Sink dimap 是 contramap 的扩展,它还可以指定 Sink 的输出转换:
信号量 Semaphore 数据类型,它允许通过 withPermit 方法在纤程之间进行同步,该方法可以安全地获取和释放许可证。信号量是基于 Ref[A] 数据类型的。 操作 例如,异步任务可以通过获取和释放具有给定数量许可的信号量来完成彼此的同步。当信号量中的许可值不足,获取操作无法执行时,该任务将在纤程队列中被置于挂起状态,直到有足够的许可值时被唤醒: (以上)二值信号量只是一种特殊的信号量。我们可以要求获取和释放任意指定数量的信号量: withPermit(及其对应的计数版本 withPermits)的保证是,无论任务是成功,失败还是被中断,许可证在被成功获取之后都会被释放。
Schedules 允许你定义和编写灵活的重复执行调度器,这些事件可以是重复的计算,或在出现错误时重试操作。Schedules 被用于以下场景: 重复 IO#repeat —— 重复执行 effect,直到调度计划结束。 IO#repeatOrElse —— 重复执行 effect,直到调度计划完成,如果得到错误,则返回另一个 effect 的结果。 IO#repeatOrElse0 —— 重复执行 effect,直到调度计划完成,如果得到错误,则返回另一个具有更多(定制)能力的 effect 的结果。 Retries IO#retry —— 重试一个 effect 直到成功。 IO#retryOrElse —— 运行一个 effect,如果失败则尝试另一个 effect,不断重试两者直到成功。 IO#retryOrElse0 —— 运行一个 effect,如果失败则尝试另一个具有更多(定制)能力的 effect,不断重试两者直到成功。 Schedules 定义了有状态的,可能有效果的事件的重复执行调度计划,并以允许以多种方式进行组合。 一个 Schedule[R, A, B] 它的输入类型为 A(A 在 retry 情况下为(前一调用的)错误类型,或在 repeat 情况下为(前一调用的)输出类型),并根据这些值和内部状态决定是重复执行还是返回结果。每个决策都会伴随(可能为零的)延迟,该延迟指示下一次重复发生之前需要多少时间停顿,并最终得到 B 类型输出值。 基本的 Schedules 一个永远循环的 Schedule: 一个循环 10 次的 Schedule: 一个每 10 毫秒循环一次的 Schedule: (缺省为平方)指数延迟的 Scheduler 以 fibonacci 计数(每次延迟是前两次延迟之和)的延迟: Schedule 组合器 给 Schedule 附加一个随机修正值。 修改 schedule 的延迟间隔: 依次串行组合两个调度器,先遵循第一个策略直到结束,然后遵循第二个策略直到结束: 合并两个调度器(取交集),仅在两个调度器都得到满足时才以两者之间的最大延迟重复执行调度: 合并两个调度器(取并集),如果两个调度计划中的任何一个想要重复执行,则使用两次重复之间的最小延迟来重复: 经过指定的时间后,停止重试: 仅在发生特定异常时重试:
Ref[A] 是对 A 类型的可变引用值的建模。它有两个基本操作:set,向 Ref 中填入新值;get,从中获得当前的值。Ref 上所有的操作都是原子且线程安全的,这为同步并发程序提供了可靠的基础。 更新一个 Ref 使用 Ref 最简单的手段是使用 update 或它更强大的好兄弟 modify。通过这些我们可以轻易地写一个像这样的 repeat 组合器。 状态转换器 可变量的阴暗面在于它们能够轻易地被改变;它们可以像圣诞节装饰品一样添加在任何地方并改变状态。比如: 作为函数式程序员,我们相当了解如何对付它。我们可以 S =>(A, S) 类型的函数形式来捕获状态的改变。Ref 提供了这样的编码能力,S 用来表达值的类型,并通过 modify 引介状态改变函数。 构建更复杂的并发原语 Ref 的级别足够低,以至于可以用作其他并发数据类型的基础。 信号量是一种经典的用于控制对共享资源的访问的抽象数据类型。它的定义为一个三元组形式 S = (v, P, V),其中 v 是当前可用资源的单位数,P 和 V 分别是对 v 进行递减和递增的运算;P 只能在 v 是非负数的时候才能对它进行递减,否则必须等待直到非负为止。 现在,有了 Refs,我们可以很容易实现它!唯一的困难在于 P,我们必须在 v 为负数时,或在我们读取并试图改变它时,它恰好被(别的线程)改变了,这时让 P 失败并重试。 一个不成熟的实现看上去可以像下面这样: 现在让我们伴随前几天在市场上发现的这些鳄鱼皮靴子摇滚起来,在夜总会测试我们的信号灯吧,来吧! 同时不用说,您应该看一下 ZIO 内建的 Semaphore,它可以完成所有的这些甚至更多工作而不会浪费任何 CPU 周期。 多态的 Refs Ref[A] 实际上是类型 ZRef[Nothing, Nothing, A, A] 的别名。ZRef 的类型签名如下: ZRef 是对可变引用的多态的,纯函数的描述。它的基本操作包括 set 和 get。set 接受一个类型为 A 的值并将引用设置为该新值,这个操作可能以 EA 错误类型失败。get 获取并返回当前的 B 类型的引用值,或者它可能以 EB 错误类型失败。 当 ZRef 的错误和值类型统一时,即 ZRef[E, E, A, A]。ZRef 还支持如上所述的原子 modify 和 update 操作。 一个简单的用例是可以获得引用值的只读或只读视图:
Queue 是一个基于 ZIO 的轻量级的,驻扎在内存的队列,具有可组合性,并且支持透明的背压。它是完全异步(无锁或无阻塞)的,纯函数且类型安全的。 Queue[A] 包含的值的类型为 A,并且具有两个基本操作:offer,将 A 放入队列中,和 take, 删除并返回队列中最旧的值。 新建一个队列 一个 Queue 可以是有界的(容量有限)或无界的。 当队列已满时,有几种策略可用于处理新值: 对默认的有界(bounded)队列是反压:填满后,任何提供数据的纤程都将被挂起,直到队列能够添加新内容为止; 下降(dropping)队列当队列满时会丢弃新的数据; 当滑动(sliding)队列满时会丢弃最陈旧的数据。 建立一个支持背压的有界队列: 建立一个下降队列: 建立一个滑动队列: 建立一个无界队列: 向队列添加数据 向队列添加值的最简单方法是 offer: 使用背压队列时,如果队列已满,offer 可能会暂时挂起:您可以使用 fork 让其在其它纤程中等待。 还可以使用 offerAll 一次添加多个值: 从队列中消费数据 take 操作从队列中删除最旧的数据并返回它。如果队列为空,它将时挂起,直到新的数据添加到队列后才继续。与 offer 一样,您可以使用 fork 让其在其他纤程中等待新值。 您可以通过 poll 来消费最旧的数据。如果队列为空,你会得到 None,否则数据将被包装在 Some 返回。 可以使用 takeUpTo 一次消费多个数据。如果队列中没有足够的数据要退回,则它将返回所有数据,而无需等待更多 offer 。 同样,您可以使用 takeAll 一次获得所有数据。它也无需等待就返回(如果队列为空,则为空列表)。 关闭队列 可以通过 shutdown 来中断(interrupt exception)所有处于 offer* 或 take* 而挂起的纤程。它还将清空队列,并让所有未来的 offer* 和 take* 调用立刻结束(interrupt exception)。 您可以在队列关闭时使用 awaitShutdown 执行 effect。它将等待直到队列被关闭。如果队列已经关闭,它将立即返回。 转换队列Transforming queues 实际上,Queue[A] 是 ZQueue[Any, Any, Nothing, Nothing, A, A] 类型的别名。完整版本的签名为: 它的含义是: 这个队列接收 A 类型的值。需要 RA 类型的环境支持入队操作,并且可能会因EA 类型的错误而失败; 队列将产生 B 类型输出值。需要 RB 型环境支持出队操作,并可能因 EB 型错误而失败。 请注意,基本的 Queue[A] 不会失败也不需要任何环境来支持进行任何操作。 基于输入和输出的不同类型参数,可以实现各种不同的队列组合: ZQueue#map 可以将队列的输出映射(到不同类型): ZQueue#mapM 我们还可以使用一个效果(effectful)函数来映射输出。例如,我们可以为每个元素加上出列的时间戳: ZQueue#contramapM 与 mapM 相似,我们也可以在元素入列时将效果(effectful)函数应用于元素。此例将使用其入队时间戳注释元素: 此例中的队列与上一个例子中的队列具有相同的类型((Long, String)),但是当元素排队时,时间戳附加到元素上。这反映在排队入队所需的环境类型中。 一个完整的示例:我们可以将此队列(contramapM)与 mapM 结合起来,以计算元素保留在队列中的时长: ZQueue#bothWith 我们也可以将两个队列组合成一个队列,该(组合)队列广播 offer (到两个队列中)并(同时)从两个队列中接收(take)消息: 附加资料 ZIO Queue Talk by John De Goes @ ScalaWave 2018 ZIO Queue Talk by Wiem Zine El Abidine @ PSUG 2018 Elevator Control System using ZIO Scalaz 8 IO vs Akka (typed) actors vs Monix