要在不影响当前进程的情况下执行 effect,可以使用纤程,这是一种轻量级的并发机制。
您可以通过 fork
让任何 IO[E, A]
立即产生出一个纤程 (UIO[Fiber[E, A]]
)。可以通过 join 来合并一个持有的纤程,该调用将会得到该纤程的返回值,或者也可以中断(interrupt)
该纤程的执行,一个终止的纤程会安全地释放该纤程持有的所有资源。
import zio._
val analyzed =
for {
fiber1 <- analyzeData(data).delay(2 seconds).fork // IO[E, Analysis]
fiber2 <- validateData(data).fork // IO[E, Boolean]
// Do other stuff
valid <- fiber2.join
_ <- if (!valid) fiber1.interrupt
else IO.unit
analyzed <- fiber1.join
} yield analyzed
在 JVM 上,纤程会使用到线程,但不会无限制地消耗线程。相反,纤程(对有限的线程以)高竞态地方式协同运行。
def fib(n: Int): UIO[Int] =
if (n <= 1) {
IO.succeed(1)
} else {
for {
fiber1 <- fib(n - 2).fork
fiber2 <- fib(n - 1).fork
v2 <- fiber2.join
v1 <- fiber1.join
} yield v1 + v2
}
直到纤程已完成或已被彻底中断并且其所有终结器都已运行,中断操作才会返回。这些精确的语义是为了允许构建一个不会泄漏资源的程序。
fork0
是一个更强大的 fork
变种,它允许指定监管程序,该监管程序可以处理任何来自受监管的纤程的任何的不可恢复的错误,包括终结器中发生的所有此类错误。如果未指定监管程序,则将采用父纤程的监管程序,并以此递归直到根处理程序为止,监管程序可以在运行时(runtime)中指定(缺省的监管程序仅打印堆栈跟踪)。
错误模型
IO
的错误模型简单且一致,支持类型错误和终止,并且不违背 Functor
层次结构中的任何法则。
一个 IO[E, A]
的值只会引发E类型的错误。这个错误可以通过 either
方法来恢复。这是一个不会失败的 effect,因为失败值被作为 Either
的成功值的一部分返回。
val error: Task[String] = IO.fail(new RuntimeException("Some Error"))
val errorEither: ZIO[Any, Nothing, Either[Throwable, String]] = error.either
除了类型 E
错误外,一个纤程可能由于以下原因终止:
- 纤程自行终止或被另一个纤程中断。 “主”纤程不能被中断,因为它不是从任何其他纤程中分支出来的。
- 纤程无法处理某些
E
型错误。只有在IO.fail
未被处理时才会发生。对于UIO[A]
类型的值,这种类型的故障是不可能的。 - 纤程中的缺陷会导致不可恢复的错误。但是只能通过两种方式产生这种缺陷:
- 将一个偏函数传递给高阶函数,比如
map
或flatMap
。例如,io.map(_ => throw e)
或io.flatMap(a => throw e)
。解决此问题的方法是不要将不纯函数传递给像 ZIO 这样的纯函数库,因为这样做会导致违反代数定律和破坏方程式推理。 - 在
IO.effectTotal
等函数中使用会抛出错误的代码。要在 IO 中使用不完全效果的代码,正确的解决办法是使用诸如IO.effect
之类的方法,该方法可以将异常安全地转换为值。
- 将一个偏函数传递给高阶函数,比如
当纤程被终止时,终止原因被以 Throwable 的方式传递给纤程的监管程序,该管理程序可以选择记录日志,打印堆栈跟踪记录,重新启动纤程或执行其它符合上下文的其他操作。
纤程如果被中断,其自身无法停止这个过程。哪怕在中断过程中某些终结器抛出了不可恢复的错误,所有终结器也都依然会被执行。终结器抛出的错误将被传递给纤程的监管器。
在任何情况下,都不会丢失任何错误,这使得 IO 错误模型比 Scala 和 Java 中的try/catch/finally
结构更易于诊断,因为它们很容易丢失错误。
并行
zipPar
可以用于并行计算:
def bigCompute(m1: Matrix, m2: Matrix, v: Matrix): UIO[Matrix] =
for {
t <- computeInverse(m1).zipPar(computeInverse(m2))
(i1, i2) = t
r <- applyMatrices(i1, i2, v)
} yield r
zipPar
组合器具有资源安全的语义。如果一个计算失败,另一计算将被中断,以防止浪费资源。
Racing
两个 IO 操作可以执行“竞速”运算,这意味着它们将并行执行,并且将返回先成功
的运算的值。
fib(100) race fib(200)
race
组合器也是资源安全的,这意味着如果两个操作之一返回一个值,则另一个将被中断,以防止浪费资源。
race
和 zipPar
组合器是功能更强大的 raceWith
组合器的特例,它可以在两个操作中的第一个成功执行时执行用户定义的逻辑。
线程切换 – JVM
默认情况下,纤程不保证它们会在哪个线程上执行。它们可能会在线程之间切换,尤其是长时间执行时。
纤程只会在运行时(Runtime
)系统的线程池中的线程间转移,这意味着默认情况下,长时间运行的纤程最终将回到运行时系统的线程池中,哪怕它们是从其他线程(池)中启动的,(异步)恢复后也是如此。
出于性能方面的考虑,纤程会尝试在同一个线程上执行一个(可配置的)最短的时间,然后切换到其它纤程。从异步回调中恢复的纤程将在它的启动线程上恢复,并持续一段时间,然后被切换到运行时线程池上恢复运行。
这样的默认配置有助于确保堆栈安全和协作式多任务处理。如果不需要自动线程转移,则可以在运行时(Runtime
)中更改它们。