ZIO 的底层使用了 纤程 来支持多并发。 纤程非常强大,但是它们运行在底层,为了提高效率,ZIO 提供了基于纤程的上层操作。
如果可能,你应该总是选择使用上层操作,而不是直接和纤程打交道。为了完整起见,本节将介绍两种纤程以及在其上构建的一些上层操作。
纤程 Fibers
ZIO 的并发是构建在 纤程
上的,它是一种由 ZIO 运行时系统实现的轻量级的“绿色线程”。
和操作系统线程不同,纤程几乎不占用内存,具有可伸缩的堆栈,不会浪费资源,并且如果纤程处于挂起和无法访问状态,则会自动被垃圾回收。
纤程是由 ZIO 运行时调度的,并且会相互协作产生,即使在单线程环境(例如JavaScript,甚至是只配置一个线程的 JVM)中运行时,也可以实现多任务处理。
ZIO 中的所有效果都是由某个纤程执行的。如果你没有创建过纤程,那么纤程将由正在当前的操作(如果该操作是并发的或并行的)或由 ZIO 运行时系统自动创建。
即使您仅编写“单线程”代码,没有并行或并发操作,也将至少有一个纤程,作为执行 effect 的“主”纤程。
Fiber 数据类型
每条 ZIO 纤程都负责执行着某个 effect,并且由 ZIO 的 Fiber
数据类型来代表该运行计算的“句柄”。Fiber
数据类型于 Scala 的 Future
数据类型很类似。
Fiber[E, A]
数据类型在 ZIO 中有两个参数:
E
Failure Type. 如果纤程运行失败,将返回这种类型的值。A
Success Type. 如果纤程运行成功,则返回这种类型的值。
纤程并没有代表环境类型的 R
参数,以为它们执行的 effects 已经处于运行状态了,并且也已经为他们提供了所需的环境。
效果分支
创建纤程的最基本方法是对一个已存在的 effect 执行 fork
操作。 从概念上讲,效果分叉
后会在新的纤程上开始执行,并且返回对新创建的 Fiber
具柄。
下面的代码创建了一个纤程来执行 fib(100)
:
def fib(n: Long): UIO[Long] = UIO {
if (n <= 1) UIO.succeed(n)
else fib(n - 1).zipWith(fib(n - 2))(_ + _)
}.flatten
val fib100Fiber: UIO[Fiber[Nothing, Long]] =
for {
fiber <- fib(100).fork
} yield fiber
Joining Fibers
从 Fiber 中返回一个 effect 的方法之一,是使用 Fiber#join
。由方法 Fiber#join
返回的纤程可以是成功的,也可以是失败的:
for {
fiber <- IO.succeed("Hi!").fork
message <- fiber.join
} yield message
Awaiting Fibers
另一种纤程返回的方法是 Fiber#await
,它返回一个包含返回值的 effect,该值提供了有关纤程如何完成的完整信息。
for {
fiber <- IO.succeed("Hi!").fork
exit <- fiber.await
} yield exit
纤程的中断
可以中断不再需要结果的纤程从而立即终止纤程的执行,安全地释放所有资源并运行所有的终结器。
和 await
一样, Fiber#interrupt
返回一个纤程如何完成退出的说明。
for {
fiber <- IO.succeed("Hi!").forever.fork
exit <- fiber.interrupt
} yield exit
根据设计,由 Fiber#interrupt
返回的 effect 将不可恢复直到纤程彻底完成后。如果不需要恢复它,则可以对中断本身使用 fork
:
for {
fiber <- IO.succeed("Hi!").forever.fork
_ <- fiber.interrupt.fork // I don't care!
} yield ()
纤程的组合
ZIO 允许您使用 Fiber#zip
或 Fiber#zipWith
来组合纤程。
这些方法将两个纤程合并为一个单个的纤程,并产生两者的结果。如果任一个纤程发生失败,则合成的纤程也将失败。
for {
fiber1 <- IO.succeed("Hi!").fork
fiber2 <- IO.succeed("Bye!").fork
fiber = fiber1.zip(fiber2)
tuple <- fiber.join
} yield tuple
另一个组合纤程的方法是使用 Fiber#orElse
函数。如果第一个纤程成功,那么组合将返回该成功值,否则组合将执行并返回第二个纤程的返回值。(无论是否成功)
for {
fiber1 <- IO.fail("Uh oh!").fork
fiber2 <- IO.succeed("Hurray!").fork
fiber = fiber1.orElse(fiber2)
tuple <- fiber.join
} yield tuple
并行
ZIO 提供了很多操作用于实现 effect 的并行。这些方法都以 Par
作为后缀以有助于您判断何时将采用并行。
例如, 普通的 ZIO#zip
方法将两个 effect 按序列运算并合并在一起。但是还有一个 ZIO#zipPar
方法,可以将两个 effect 并行执行并合并在一起。
下表总结了一些顺序操作及其对应的并行版本:
Description | Sequential | Parallel |
---|---|---|
Zips two effects into one | ZIO#zip | ZIO#zipPar |
Zips two effects into one | ZIO#zipWith | ZIO#zipWithPar |
Collects from many effects | ZIO.collectAll | ZIO.collectAllPar |
Effectfully loop over values | ZIO.foreach | ZIO.foreachPar |
Reduces many values | ZIO.reduceAll | ZIO.reduceAllPar |
Merges many values | ZIO.mergeAll | ZIO.mergeAllPar |
对于所有并行操作,如果一个 effect 失败,则其他 effect 也将被中断,以最大程度地减少不必要的计算。
如果不希望这种快速失败的行为,则可以先使用 ZIO#either
或 ZIO#option
方法将潜在的失效转换为可靠的 effect。
Racing
ZIO 允许您可以并行运行多个 effect,并返回第一个成功的结果:
for {
winner <- IO.succeed("Hello").race(IO.succeed("Goodbye"))
} yield winner
如果您想要第一个成功或失败,而不是局限于第一个成功,则可以使用 left.either rase right.either
。
for {
winner <- IO("Hello").delay(10.seconds).either race IO.fail("Goodbye").either
} yield winner
超时
ZIO 使您可以使用 ZIO#timeout
方法控制任何 effect 的超时,该方法会返回一个新的以 Option
为其成功类型的 effect。如果 Option 等于 None
则表示效果完成前已超时。
import zio.duration._
IO.succeed("Hello").timeout(10.seconds)
如果 effect 超时,那么它会被中断,而不是在后台继续执行,因此不会浪费资源。
Next Steps
如果您对基本并发感到满意,那么下一步就是学习effects测试.