基本并发处理

ZIO 的底层使用了 纤程 来支持多并发。 纤程非常强大,但是它们运行在底层,为了提高效率,ZIO 提供了基于纤程的上层操作。

如果可能,你应该总是选择使用上层操作,而不是直接和纤程打交道。为了完整起见,本节将介绍两种纤程以及在其上构建的一些上层操作。

纤程 Fibers

ZIO 的并发是构建在 纤程 上的,它是一种由 ZIO 运行时系统实现的轻量级的“绿色线程”。

和操作系统线程不同,纤程几乎不占用内存,具有可伸缩的堆栈,不会浪费资源,并且如果纤程处于挂起和无法访问状态,则会自动被垃圾回收。

纤程是由 ZIO 运行时调度的,并且会相互协作产生,即使在单线程环境(例如JavaScript,甚至是只配置一个线程的 JVM)中运行时,也可以实现多任务处理。

ZIO 中的所有效果都是由某个纤程执行的。如果你没有创建过纤程,那么纤程将由正在当前的操作(如果该操作是并发的或并行的)或由 ZIO 运行时系统自动创建。

即使您仅编写“单线程”代码,没有并行或并发操作,也将至少有一个纤程,作为执行 effect 的“主”纤程。

Fiber 数据类型

每条 ZIO 纤程都负责执行着某个 effect,并且由 ZIO 的 Fiber 数据类型来代表该运行计算的“句柄”。Fiber 数据类型于 Scala 的 Future 数据类型很类似。

Fiber[E, A] 数据类型在 ZIO 中有两个参数:

  • E Failure Type. 如果纤程运行失败,将返回这种类型的值。
  • A Success Type. 如果纤程运行成功,则返回这种类型的值。

纤程并没有代表环境类型的 R 参数,以为它们执行的 effects 已经处于运行状态了,并且也已经为他们提供了所需的环境。

效果分支

创建纤程的最基本方法是对一个已存在的 effect 执行 fork 操作。 从概念上讲,效果分叉后会在新的纤程上开始执行,并且返回对新创建的 Fiber 具柄。

下面的代码创建了一个纤程来执行 fib(100):

def fib(n: Long): UIO[Long] = UIO {
  if (n <= 1) UIO.succeed(n)
  else fib(n - 1).zipWith(fib(n - 2))(_ + _)
}.flatten

val fib100Fiber: UIO[Fiber[Nothing, Long]] = 
  for {
    fiber <- fib(100).fork
  } yield fiber

Joining Fibers

从 Fiber 中返回一个 effect 的方法之一,是使用 Fiber#join。由方法 Fiber#join 返回的纤程可以是成功的,也可以是失败的:

for {
  fiber   <- IO.succeed("Hi!").fork
  message <- fiber.join
} yield message

Awaiting Fibers

另一种纤程返回的方法是 Fiber#await,它返回一个包含返回值的 effect,该值提供了有关纤程如何完成的完整信息。

for {
  fiber <- IO.succeed("Hi!").fork
  exit  <- fiber.await
} yield exit

纤程的中断

可以中断不再需要结果的纤程从而立即终止纤程的执行,安全地释放所有资源并运行所有的终结器。

和 await 一样, Fiber#interrupt 返回一个纤程如何完成退出的说明。

for {
  fiber <- IO.succeed("Hi!").forever.fork
  exit  <- fiber.interrupt
} yield exit

根据设计,由 Fiber#interrupt 返回的 effect 将不可恢复直到纤程彻底完成后。如果不需要恢复它,则可以对中断本身使用 fork

for {
  fiber <- IO.succeed("Hi!").forever.fork
  _     <- fiber.interrupt.fork // I don't care!
} yield ()

纤程的组合

ZIO 允许您使用  Fiber#zip 或 Fiber#zipWith 来组合纤程。

这些方法将两个纤程合并为一个单个的纤程,并产生两者的结果。如果任一个纤程发生失败,则合成的纤程也将失败。

for {
  fiber1 <- IO.succeed("Hi!").fork
  fiber2 <- IO.succeed("Bye!").fork
  fiber   = fiber1.zip(fiber2)
  tuple  <- fiber.join
} yield tuple

另一个组合纤程的方法是使用 Fiber#orElse 函数。如果第一个纤程成功,那么组合将返回该成功值,否则组合将执行并返回第二个纤程的返回值。(无论是否成功)

for {
  fiber1 <- IO.fail("Uh oh!").fork
  fiber2 <- IO.succeed("Hurray!").fork
  fiber   = fiber1.orElse(fiber2)
  tuple  <- fiber.join
} yield tuple

并行

ZIO 提供了很多操作用于实现 effect 的并行。这些方法都以 Par 作为后缀以有助于您判断何时将采用并行。

例如, 普通的 ZIO#zip 方法将两个 effect 按序列运算并合并在一起。但是还有一个 ZIO#zipPar 方法,可以将两个 effect 并行执行并合并在一起。

下表总结了一些顺序操作及其对应的并行版本:

DescriptionSequentialParallel
Zips two effects into oneZIO#zipZIO#zipPar
Zips two effects into oneZIO#zipWithZIO#zipWithPar
Collects from many effectsZIO.collectAllZIO.collectAllPar
Effectfully loop over valuesZIO.foreachZIO.foreachPar
Reduces many valuesZIO.reduceAllZIO.reduceAllPar
Merges many valuesZIO.mergeAllZIO.mergeAllPar

对于所有并行操作,如果一个 effect 失败,则其他 effect 也将被中断,以最大程度地减少不必要的计算。

如果不希望这种快速失败的行为,则可以先使用 ZIO#eitherZIO#option 方法将潜在的失效转换为可靠的 effect。

Racing

ZIO 允许您可以并行运行多个 effect,并返回第一个成功的结果:

for {
  winner <- IO.succeed("Hello").race(IO.succeed("Goodbye"))
} yield winner

如果您想要第一个成功或失败,而不是局限于第一个成功,则可以使用 left.either rase right.either

for {
    winner <- IO("Hello").delay(10.seconds).either race IO.fail("Goodbye").either
} yield winner

超时

ZIO 使您可以使用 ZIO#timeout 方法控制任何 effect 的超时,该方法会返回一个新的以 Option 为其成功类型的 effect。如果 Option 等于 None 则表示效果完成前已超时。

import zio.duration._

IO.succeed("Hello").timeout(10.seconds)

如果 effect 超时,那么它会被中断,而不是在后台继续执行,因此不会浪费资源。

Next Steps

如果您对基本并发感到满意,那么下一步就是学习effects测试.

Leave a Reply
Your email address will not be published.
*
*

BACK TO TOP