Schedule

Schedule

Schedules 允许你定义和编写灵活的重复执行调度器,这些事件可以是重复的计算,或在出现错误时重试操作。Schedules 被用于以下场景: 重复 IO#repeat —— 重复执行 effect,直到调度计划结束。 IO#repeatOrElse —— 重复执行 effect,直到调度计划完成,如果得到错误,则返回另一个 effect 的结果。 IO#repeatOrElse0 —— 重复执行 effect,直到调度计划完成,如果得到错误,则返回另一个具有更多(定制)能力的 effect 的结果。 Retries IO#retry —— 重试一个 effect 直到成功。 IO#retryOrElse —— 运行一个 effect,如果失败则尝试另一个 effect,不断重试两者直到成功。 IO#retryOrElse0 —— 运行一个 effect,如果失败则尝试另一个具有更多(定制)能力的 effect,不断重试两者直到成功。 Schedules 定义了有状态的,可能有效果的事件的重复执行调度计划,并以允许以多种方式进行组合。 一个 Schedule[R, A, B] 它的输入类型为 A(A 在 retry 情况下为(前一调用的)错误类型,或在 repeat 情况下为(前一调用的)输出类型),并根据这些值和内部状态决定是重复执行还是返回结果。每个决策都会伴随(可能为零的)延迟,该延迟指示下一次重复发生之前需要多少时间停顿,并最终得到 B 类型输出值。 基本的 Schedules 一个永远循环的 Schedule: 一个循环 10 次的 Schedule: 一个每 10 毫秒循环一次的 Schedule: (缺省为平方)指数延迟的 Scheduler 以 fibonacci 计数(每次延迟是前两次延迟之和)的延迟: Schedule 组合器 给 Schedule 附加一个随机修正值。 修改 schedule 的延迟间隔: 依次串行组合两个调度器,先遵循第一个策略直到结束,然后遵循第二个策略直到结束: 合并两个调度器(取交集),仅在两个调度器都得到满足时才以两者之间的最大延迟重复执行调度: 合并两个调度器(取并集),如果两个调度计划中的任何一个想要重复执行,则使用两次重复之间的最小延迟来重复: 经过指定的时间后,停止重试: 仅在发生特定异常时重试:

Ref

Ref[A] 是对 A 类型的可变引用值的建模。它有两个基本操作:set,向 Ref 中填入新值;get,从中获得当前的值。Ref 上所有的操作都是原子且线程安全的,这为同步并发程序提供了可靠的基础。 更新一个 Ref 使用 Ref 最简单的手段是使用 update 或它更强大的好兄弟 modify。通过这些我们可以轻易地写一个像这样的 repeat 组合器。 状态转换器 可变量的阴暗面在于它们能够轻易地被改变;它们可以像圣诞节装饰品一样添加在任何地方并改变状态。比如: 作为函数式程序员,我们相当了解如何对付它。我们可以 S =>(A, S) 类型的函数形式来捕获状态的改变。Ref 提供了这样的编码能力,S 用来表达值的类型,并通过 modify 引介状态改变函数。 构建更复杂的并发原语 Ref 的级别足够低,以至于可以用作其他并发数据类型的基础。 信号量是一种经典的用于控制对共享资源的访问的抽象数据类型。它的定义为一个三元组形式 S = (v, P, V),其中 v 是当前可用资源的单位数,P 和 V 分别是对 v 进行递减和递增的运算;P 只能在 v 是非负数的时候才能对它进行递减,否则必须等待直到非负为止。 现在,有了 Refs,我们可以很容易实现它!唯一的困难在于 P,我们必须在 v 为负数时,或在我们读取并试图改变它时,它恰好被(别的线程)改变了,这时让 P 失败并重试。 一个不成熟的实现看上去可以像下面这样: 现在让我们伴随前几天在市场上发现的这些鳄鱼皮靴子摇滚起来,在夜总会测试我们的信号灯吧,来吧! 同时不用说,您应该看一下 ZIO 内建的 Semaphore,它可以完成所有的这些甚至更多工作而不会浪费任何 CPU 周期。 多态的 Refs Ref[A] 实际上是类型 ZRef[Nothing, Nothing, A, A] 的别名。ZRef 的类型签名如下: ZRef 是对可变引用的多态的,纯函数的描述。它的基本操作包括 set 和 get。set 接受一个类型为 A 的值并将引用设置为该新值,这个操作可能以 EA 错误类型失败。get 获取并返回当前的 B 类型的引用值,或者它可能以 EB 错误类型失败。 当 ZRef 的错误和值类型统一时,即 ZRef[E, E, A, A]。ZRef 还支持如上所述的原子 modify 和 update 操作。 一个简单的用例是可以获得引用值的只读或只读视图:

Queue

Queue 是一个基于 ZIO 的轻量级的,驻扎在内存的队列,具有可组合性,并且支持透明的背压。它是完全异步(无锁或无阻塞)的,纯函数且类型安全的。 Queue[A] 包含的值的类型为 A,并且具有两个基本操作:offer,将 A 放入队列中,和 take, 删除并返回队列中最旧的值。 新建一个队列 一个 Queue 可以是有界的(容量有限)或无界的。 当队列已满时,有几种策略可用于处理新值: 对默认的有界(bounded)队列是反压:填满后,任何提供数据的纤程都将被挂起,直到队列能够添加新内容为止; 下降(dropping)队列当队列满时会丢弃新的数据; 当滑动(sliding)队列满时会丢弃最陈旧的数据。 建立一个支持背压的有界队列: 建立一个下降队列: 建立一个滑动队列: 建立一个无界队列: 向队列添加数据 向队列添加值的最简单方法是 offer: 使用背压队列时,如果队列已满,offer 可能会暂时挂起:您可以使用 fork 让其在其它纤程中等待。 还可以使用 offerAll 一次添加多个值: 从队列中消费数据 take 操作从队列中删除最旧的数据并返回它。如果队列为空,它将时挂起,直到新的数据添加到队列后才继续。与 offer 一样,您可以使用 fork 让其在其他纤程中等待新值。 您可以通过 poll 来消费最旧的数据。如果队列为空,你会得到 None,否则数据将被包装在 Some 返回。 可以使用 takeUpTo 一次消费多个数据。如果队列中没有足够的数据要退回,则它将返回所有数据,而无需等待更多 offer 。 同样,您可以使用 takeAll 一次获得所有数据。它也无需等待就返回(如果队列为空,则为空列表)。 关闭队列 可以通过 shutdown 来中断(interrupt exception)所有处于 offer* 或 take* 而挂起的纤程。它还将清空队列,并让所有未来的 offer* 和 take* 调用立刻结束(interrupt exception)。 您可以在队列关闭时使用 awaitShutdown 执行 effect。它将等待直到队列被关闭。如果队列已经关闭,它将立即返回。 转换队列Transforming queues 实际上,Queue[A] 是 ZQueue[Any, Any, Nothing, Nothing, A, A] 类型的别名。完整版本的签名为: 它的含义是: 这个队列接收 A 类型的值。需要 RA 类型的环境支持入队操作,并且可能会因EA 类型的错误而失败; 队列将产生 B 类型输出值。需要 RB 型环境支持出队操作,并可能因 EB 型错误而失败。 请注意,基本的 Queue[A] 不会失败也不需要任何环境来支持进行任何操作。 基于输入和输出的不同类型参数,可以实现各种不同的队列组合: ZQueue#map 可以将队列的输出映射(到不同类型): ZQueue#mapM 我们还可以使用一个效果(effectful)函数来映射输出。例如,我们可以为每个元素加上出列的时间戳: ZQueue#contramapM 与 mapM 相似,我们也可以在元素入列时将效果(effectful)函数应用于元素。此例将使用其入队时间戳注释元素: 此例中的队列与上一个例子中的队列具有相同的类型((Long, String)),但是当元素排队时,时间戳附加到元素上。这反映在排队入队所需的环境类型中。 一个完整的示例:我们可以将此队列(contramapM)与 mapM 结合起来,以计算元素保留在队列中的时长: ZQueue#bothWith 我们也可以将两个队列组合成一个队列,该(组合)队列广播 offer (到两个队列中)并(同时)从两个队列中接收(take)消息: 附加资料 ZIO Queue Talk by John De Goes @ ScalaWave 2018 ZIO Queue Talk by Wiem Zine El Abidine @ PSUG 2018 Elevator Control System using ZIO Scalaz 8 IO vs Akka (typed) actors vs Monix

Promise

Promise[E, A] 是只能被设置一次的 IO[E, A] 类型的变量。 Promise 用于构建更高级别的并发原语,通常用于需要多个纤程相互协调传递值的情况。 建立 可以使用 Promise.make[E, A] 来创建 Promise,它返回 UIO[Promise[E, A]]。这是对创建 Promise 的描述,而不是实际的 Promise。不能在 IO 外部创建Promise,因为创建它们涉及分配可变内存,这是一种 effect,必须安全地在 IO 中捕获。 运算 完成 您可以通过几种不同的方式完成 Promise[E, A]: 使用 succeed 成功地设置 A 类型的值。 用 done 设置 Exit[E, A] —— await 将得到该退出值。 用 complete 获取 IO[E, A] effect 的结果 —— effect 将会被(而且只会被)执行一次,其结果会被所有等待该 promise 的纤程得到。 用 completeWith 将 effect IO[E, A] 设置为结果 —— 第一个调用 completeWith 的纤程赢得设置该 effect 的权限,并且而该 effect 在 promise 每次被 await 时(才)都会被求值,因此要小心使用completeWith(someEffect),尽可能使用 complete(someEffect) 除非确实需要让每个线程在 await 的时候执行效果。 用 fail 设置 E 类型失败。 通过 die 让 promise 抛出 Throwable 异常 通过 halt 为 promise 设置 Cause[E] 失败或异常 使用 interrupt 终止 promise 以下示例显示了所有这些的用法: 完成 Promise 的操作后会产生一个UIO[Boolean],其中 Boolean 表示已设置成功(true)还是已经被设置(false)。如下所示: 另一个关于 fail(…) 的例子: 再次重申,布尔值告诉我们操作是否成功(true),即成功设置了 Promise 的值或已经被设置而导致错误。 等待 您可以使用 await 从 Promise 中获取值,调用的纤程将被挂起直到 Promise 完成。 轮询 计算将暂停(以非阻塞方式),直到 Promise 出现一个值或错误为止。如果您不想暂停,而只想查询 Promise 是否已完成的状态,则可以使用 poll: 如果调用 poll 时 Promise 未完成,则 IO 将以返回 Unit 值的方式宣告失败,否则将获得 IO[E, A],其中 E 表示 Promise 完成但有错误,而 A 表示 Promise 成功完成返回一个 A 值。 isDone 返回 UIO[Boolean],如果 Promise 已经完成,则评估为 true。 使用案例 这是一个如何在两个纤程之间使用 Promise 交换数据的案例: 在上面的示例中,我们创建了一个 Promise 并让一个 Fiber(fiberA)在1秒后完成该 Promise,第二个 Fiber(fiberB)在该 Promise 上调用 await 以获取字符串,然后将其打印到屏幕上。该示例在 1 秒后将 hello world 打印到了屏幕上。请记住,这只是程序的描述,而不是执行本身。

Managed

Managed 是一个封装了资源的 acquire 和 release 的数据结构。 Managed[E, A] 表示一个类型为 A 的托管资源,它可以通过 use 方法被使用。资源将在使用之前自动获取资源,并在使用之后自动释放资源。 如果资源无法在 use 范围内生效,这意味着您可能在获得资源后,在 use 中将其浪费掉,然后在资源消耗完后再次使用它,根据资源提供的功能类型它可能已经不再有效,并且可能会因检查错误而失败。 在此示例中,Managed 类在调用 use 时创建队列,并在 doSomething 完成时调用 shutdown。 创建一个 Managed 如上例所示,可以通过传递 acquire 函数和 release 函数来创建 Managed。 也可以从 effect 中创建。在这种情况下,release 函数将不执行任何操作。 您也可以从纯值创建 Managed。 ZIO 环境中的 Managed Managed[E, A] 实际上是 ZManaged[Any, E, A] 的别名。如果您希望acquire,release 或 use 函数中使用环境 R,请使用 ZManaged 来代替 Managed。 合并 Managed 可以使用 flatMap 将多个 Managed 合并在一起,以得到获取和释放所有资源的单个 Managed。

Cats IO

Cats IO 是一种将 side effect 编码为纯值的数据类型,它能够描述同步和异步计算。 介绍 IO[A] 类型的值代表一种计算模型,在对它进行求值时,它将执行某种效果运算并返回 A 类型的值。 IO 的值是纯的,不变的,因此保留了引用透明性,因此被用于函数式编程。 IO 是一种数据结构,仅表示对副作用计算的描述。 IO 可以描述以下同步或异步计算: 只能得到唯一解。 可能以成功结束或以失败结束,并且在失败的情况下,flatMap 链会被短路(IO 实现了 MonadError 代数结构)。 可以取消,但请注意,此功能依赖用户提供取消逻辑。 由抽象的过程所描述的 effect 将不会被执行直到“世界的尽头”,具体地说,直到某个“unsafe”方法被调用为止。 效果的执行结果是不被记忆的,这意味着内存开销最小(并且没有泄漏),并且,单个 effect 可以被以“引用透明”的方式被多次执行。例如: 上面的这个的例子,“hey!”被打印了两次,因为这个 effect 在“单子”链中被重复地执行。 引用透明和惰性求值 IO 可以“冻结” side effect,因为它是一种惰性求值的数据类型。请参考以下分类并反复与标准库中的“Future”的进行比较,来理解求值模型(在 Scala 语境中)的全貌。   Eager Lazy Synchronous A () => A     Eval[A] Asynchronous (A => Unit) => Unit () => (A => Unit) => Unit   Future[A] IO[A] 通过与 Scala 的 Future 比较,IO 数据类型即使在处理副作用时也保留了引用透明性,并且它是惰性求值的。与像 Scala 这样的即时求值语言相比较,这是结果与产生结果的函数之间的区别。 与 Future 类似,通过 IO,您可以推断异步处理的结果,但是由于其纯性和惰性,可以将 IO 视为一个规范(直到“世界的尽头”才进行求值),从而可以对 IO 的求值模型施加更多的控制,并且更具可预测性。例如当组合多个 IO,或处理错误时,是以序列化的方式处理,还是以并行的方式处理。 注意惰性求值总是与引用透明性并存。参考以下示例: 如果我们考虑引用透明性,则可以将该示例重写为: 但是这不适用于 Future,但适用于 IO,此能力对于函数式编程至关重要。 堆栈安全 IO 在它的 flatMap 中是以 trampoline 的方式进行求值的,这意味着您可以在任意深度的递归函数中安全地调用 flatMap,而不必担心会顶爆堆栈: 根据 IO 中实现的类型类的层次结构。除某些函数外,所有这里定义的操作都可用于 IO。 Effects 介绍 IO 是一种强大的抽象,它可以效果化地描述多种不同的 effect: 纯值 — IO.pure & IO.unit 在 IO 的伴随类中定义了以下函数,可以用于将纯值加载到 IO 中,从而生成出“已经求值”的 IO 值: 请注意,给定的参数形式是值传递而不是按名(by-name)传递。 例如,我们可以将一个数字(纯值)放入 IO 中,然后将其安全地与另一个打包了side effect 的 IO 组合在一起,因为它们将不执行任何操作: 显然,IO.pure 无法挂起副作用,因为当参数被以值传递给它时,IO.pure 是即时求值的,因此请不要这样做: 在这种情况下,println 将触发副作用,该副作用不会在 IO 中被挂起,有鉴于此,这样的代码可能不是我们想要的。 IO.unit 是 IO.pure(()) 的简化的别名,可在需要 IO[Unit] 值时重复使用,而无需担心触发任何其他副作用: IO[Unit] 在 Scala 代码中使用的相当普遍,Unit 类型本身就意味着调用的返回有副作用,它可以作为 pure(()) 的快捷方式,并且可以起到优化的用处,因为返回了相同的引用。 同步效果 — IO.apply 它可能是最常用的构建器,等效于 Sync[IO].delay,描述了可以在当前线程和调用堆栈上立即做求值的 IO 操作: 请注意,给定的参数是通过“by-name”传递的,其执行被“暂停”在 IO 上下文中。 这个示例是在 JVM 之上使用阻塞 I/O 从控制台读取和写入信息: 然后,我们可以以纯函数的方式使用它们来对与控制台的交互进行建模: 异步 […]

IO

IO[E, A] 类型的值,是一个可能导致 E 类型的失效,或永远运行,或产生 A 类型成功值的 effect。 IO 的值是不可变的,并且所有的 IO 函数都会产生新的 IO 值,使得 IO 像其它普通的 Scala 不变数据结构一样可以被推理和使用。 IO 值实际上啥也不做;它们只是一个描述交互效果的模型的值。 IO 可以被 ZIO 运行时系统解释成与外部世界的交互效果。理想情况下,这个过程在应用程序的 main 函数中一次性发生的。 App 类自动提供了此功能。 纯值 您可以通过 IO.succeed 将一个纯值装载入 IO 绝对不要使用任何构造函数将不纯代码导入 IO。这样做的结果是不确定的。 不会失败的 IO UIO[A] 类型的 IO 值(它的错误类型为 Nothing)被认为是不会失败的,因为Nothing 类型表示不存在的,即,不能有 Nothing 类型的实际值。此类型的值可能会产生 A 类型的成功结果,但永远不会导致 E 类型的失败。 不产生有效值的 IO IO[E, Nothing] 类型的 IO 值(其中值类型为 Nothing)被认为是无有效值的,因为 Nothing 类型表示不存在的,即不能有 Nothing 类型的实际值。此类型的值可能会得到 E 类型的失败,但永远不会产生成功的结果值。 不纯的代码 您可以使用 IO 的 effectTotal 方法将同步效果的代码导入为纯函数程序: 它的执行结果有可能是任何 Throwable 类型的失败。 如果(这个结果的)范围太广,可以使用 ZIO 的 refineOrDie 方法仅保留对某些类型的异常的关注,而其他任何类型的异常则直接导致“死亡”: 您可以使用 IO 的 effectAsync 方法将异步效果的代码导入为纯函数程序: 在此示例中,假设 Http.req 方法在得到异步执行的结果后将调用指定的回调函数。 映射 您可以通过调用 map 方法时给予函数 A => B 来将 IO[E, A] 更改为 IO[E, B]。这可以将前一操作产生的值转换为另一个值。 您可以在调用 mapError 方法时使用函数 E => E2 ,将 IO[E, A] 转换为 IO[E2, A]: 链式调用 您可以使用 flatMap 方法依次执行两个操作。第二动作的执行取决于第一动作产生的值。 您可以使用 Scala 的 for comprehension 语法使这种类型的代码更紧凑: Brackets bracket 是内置原语,可让您安全地获取和释放资源。 Brackets 被用在类似 try/catch/finally 的场景,但是 brackets 可以被用于同步和异步,可以和纤程中断无缝配合。它基于不同的错误模型构建,以确保不会丢失任何错误。 Brackets 由一个获取方法,一个使用方法(用于使用获取的资源),和一个释放方法组成。 释放动作由运行时来保证执行,哪怕使用中抛出了异常或执行的纤程被中断。 Brackets 支持组合语义,因此,如果将一个 bracket 嵌套在另一个 bracket 内,如果外部 bracket 获取了资源,那么即使内部 bracket 的释放失败,外部 bracket 的释放动作也依然会得到执行。 有一个名为 ensuring 的方法提供了另一个类似 finally 的功能: 一个完整的使用 brackets 的例子

STM

软件事务性存储(STM)是一种允许任意组合原子操作的技术。它是数据库系统中事务管理的类似物。 STM可以原子方式执行多组内存操作。该 API 受到 Haskell 的 STM 库 启发,尽管 ZIO 中的实现完全不同。 与传统的低级锁定机制相比,STM具有许多优点: 组合性 避免死锁 异常或超时的时候自动回滚 避免由并发和锁粒度带来的压力 传统的使用锁的并发程序的主要问题在于难以实现组合,各自正确的代码片段组合在一起时可能会失败,因为当越来越多的锁被引入代码时,获取和释放锁的顺序不当很容易导致死锁。 事务性存储消除了许多基于底层锁的低级编程带来的困难,因为事务性数据结构是无锁的。 事务性数据结构 STM事务中可以有多种事务性数据结构: TRef: 一个对不可变量的可变引用 TPromise: 一个只能设置一次的可变引用 TArray: 一个可变引用的数组 TMap: 一个可变的 map TQueue: 一个可变的队列 TSet: 一个可变的集合 TSemaphore: 信号量 TReentrantLock: 重入锁 由于STM非常重视组合性,因此您可以在这些数据结构上建立并定义自己的并发数据结构。例如,您可以使用 TRef,TMap 和 TQueue 构建事务优先级队列。 STM 数据类型 STM[E, A] 表示可以以事务性的方式执行,并得到失败 E 或成功 A 的 effect。它 有一种更强大的变体 ZSTM[R, E, A],它支持环境类型 R,就如同 ZIO[R, E, A]。 STM(和它的变体ZSTM)数据类型不如 ZIO[R, E, A] 数据类型强大,这是因为它不允许您执行任意的 effect。因为 STM 内的动作可以执行任意次数(也可以回滚),所以在内存性事务中只能执行 STM 操作和纯的计算。STM 操作也不能在事务之外执行,因此您不能在 STM.atomical 的保护范围之外意外地读写事务数据结构(或不明确地提交事务)。例如: transferMoney 描述了发送方和接收方之间的原子事务过程。如果发件人帐户中没有足够的资金,交易将失败。这意味着个人帐户在自动完成借贷交易过程中,如果事务在中间失败,则整个过程将被回滚,并且看起来什么也没有发生过一样。在这里,我们看到 STM effect 通过 for-comprehension 按顺序来组合,并且通过 STM.atomical(或在任何单个 STM effect上调用 commit)将 STM effect 转换为可以执行的 ZIO effect。通过使用 STM.atomically(或 commit),程序员可以将 STM.atomically 中的各个操作视为不可分割,从而将它整体上看待成一个原子事务。 错误 STM 就像 ZIO 一样通过错误通道来支持错误输出。在 transferMoney 中,我们看到了一个产生错误的示例(STM.fail)。 STM 中的错误具有中止的语义:如果原子事务遇到错误,则该事务将回滚并且无效。 重试 STM.retry 是阻塞状态下的事务实现组合的关键。例如,如果我们要等待汇款人的账户有足够的钱时进行汇款(而不是立即失败),我们就可以用 STM.retry 来代替: STM.retry 将重试整个事务,直到成功为止(而不是像前面的示例一样失败)。但是请注意,仅当其下的事务数据结构发生改变时,重试才会开始。STM.retry 组合器还有许多其他变体,例如 STM.check。您可以用 STM.check(senderBal < amount) 替换,而不是 if (senderBal < amount) STM.retry else STM.unit。 选项性组合 STM 事务一般按顺序组合在一起,依次执行两个STM effect。然而,STM 事务也可以通过 orTry 选项性地组合在一起,只要其中一个执行通过即可。假设我们有两个 STM effect sA 和 sB,则我们可以以 sA orTry sB 的形式组合这两个 effect。事务将首先尝试运行 sA,如果无效需要重试,则 sA 被放弃,转而运行 sB,现在,如果 sB 也需要重试,则尝试重试整个过程,但是在这之前,它会等待被 sA 或 sB 调用的事务数据结构发生变换。使用 orTry 是一种优雅的技术,可用于确定 STM 事务是否需要阻塞。例如,我们可以用 orTry 来将(retry 版本的) transferMoneyNoMatterWhat 转变为会立刻失败的 STM 事务,如果发件人没有足够的钱,则该交易将立即失败,而无需重试: 因为 orTry 的语义,事务将因为没有钱而立即失败。

FiberRef

FiberRef[A] 表示对一个 A 类型值的可变引用建模。它有两个基本操作,set:将引用设置为新值;get:取回的当前值。 与 Ref[A] 不同,FiberRef[A] 中的值只会与当前执行中的纤程绑定。拥有同一个 FiberRef[A] 的不同纤程可以独立设置和读取参考值,而不会发生冲突。 您可以将 FiberRef 视为与 Java 的 ThreadLocal 类似。 操作 FiberRef[A] 具有几乎与 Ref[A] 相同的 API。它包括一些熟知的方法,例如: FiberRef#get:返回当前引用的值。 FiberRef#set:设置当前引用的值。 FiberRef#update/FiberRef#updateSome:用指定的函数更新引用值。 FiberRef#modify/FiberRef#modifySome:用指定的函数更新引用值,并允许该函数返回一个值。 您也可以用 locally 让 FiberRef 值的访问范围仅限定于给定的 effect: 传递 FiberRef[A] 在 ZIO#fork 指令中具有 copy-on-fork 的语意。 简而言之这意味着子纤程一开始就具有父纤程在 FiberRef 中的值。当子纤程改变了该值,这个变化只能被子纤程自己看到,父纤程还将保持自己的值。 您可以使用 Fiber#inheritRefs 方法从一个纤程中继承所有 FiberRef 的值: 请注意,join 会自动调用 inheritRefs。这实际上意味着以下两个效果的行为相同: 此外,您可以自定义如何(如果有的话)在建立分支纤程时更新值以及在合并纤程时如何合并值。为此,请在 FiberRef#make 期间指定所需的行为: 内存安全 FiberRef 中的值在其宿主纤程结束后,会被自动垃圾收集。无法访问的FiberRef(在用户代码中没有对其的引用)中的所有特定纤程的值也会被自动垃圾回收,哪怕它们曾经在当前运行的纤程中被使用过。

Fiber(纤程)

要在不影响当前进程的情况下执行 effect,可以使用纤程,这是一种轻量级的并发机制。 您可以通过 fork 让任何 IO[E, A] 立即产生出一个纤程 (UIO[Fiber[E, A]])。可以通过 join 来合并一个持有的纤程,该调用将会得到该纤程的返回值,或者也可以中断(interrupt)该纤程的执行,一个终止的纤程会安全地释放该纤程持有的所有资源。 在 JVM 上,纤程会使用到线程,但不会无限制地消耗线程。相反,纤程(对有限的线程以)高竞态地方式协同运行。 直到纤程已完成或已被彻底中断并且其所有终结器都已运行,中断操作才会返回。这些精确的语义是为了允许构建一个不会泄漏资源的程序。 fork0 是一个更强大的 fork 变种,它允许指定监管程序,该监管程序可以处理任何来自受监管的纤程的任何的不可恢复的错误,包括终结器中发生的所有此类错误。如果未指定监管程序,则将采用父纤程的监管程序,并以此递归直到根处理程序为止,监管程序可以在运行时(runtime)中指定(缺省的监管程序仅打印堆栈跟踪)。 错误模型 IO 的错误模型简单且一致,支持类型错误和终止,并且不违背 Functor 层次结构中的任何法则。 一个 IO[E, A] 的值只会引发E类型的错误。这个错误可以通过 either方法来恢复。这是一个不会失败的 effect,因为失败值被作为 Either 的成功值的一部分返回。 除了类型 E 错误外,一个纤程可能由于以下原因终止: 纤程自行终止或被另一个纤程中断。 “主”纤程不能被中断,因为它不是从任何其他纤程中分支出来的。 纤程无法处理某些 E 型错误。只有在 IO.fail 未被处理时才会发生。对于 UIO[A] 类型的值,这种类型的故障是不可能的。 纤程中的缺陷会导致不可恢复的错误。但是只能通过两种方式产生这种缺陷: 将一个偏函数传递给高阶函数,比如 map 或 flatMap。例如,io.map(_ => throw e) 或 io.flatMap(a => throw e)。解决此问题的方法是不要将不纯函数传递给像 ZIO 这样的纯函数库,因为这样做会导致违反代数定律和破坏方程式推理。 在 IO.effectTotal 等函数中使用会抛出错误的代码。要在 IO 中使用不完全效果的代码,正确的解决办法是使用诸如 IO.effect 之类的方法,该方法可以将异常安全地转换为值。 当纤程被终止时,终止原因被以 Throwable 的方式传递给纤程的监管程序,该管理程序可以选择记录日志,打印堆栈跟踪记录,重新启动纤程或执行其它符合上下文的其他操作。 纤程如果被中断,其自身无法停止这个过程。哪怕在中断过程中某些终结器抛出了不可恢复的错误,所有终结器也都依然会被执行。终结器抛出的错误将被传递给纤程的监管器。 在任何情况下,都不会丢失任何错误,这使得 IO 错误模型比 Scala 和 Java 中的try/catch/finally 结构更易于诊断,因为它们很容易丢失错误。 并行 zipPar 可以用于并行计算: zipPar 组合器具有资源安全的语义。如果一个计算失败,另一计算将被中断,以防止浪费资源。 Racing 两个 IO 操作可以执行“竞速”运算,这意味着它们将并行执行,并且将返回先成功的运算的值。 race 组合器也是资源安全的,这意味着如果两个操作之一返回一个值,则另一个将被中断,以防止浪费资源。 race 和 zipPar 组合器是功能更强大的 raceWith 组合器的特例,它可以在两个操作中的第一个成功执行时执行用户定义的逻辑。 线程切换 – JVM 默认情况下,纤程不保证它们会在哪个线程上执行。它们可能会在线程之间切换,尤其是长时间执行时。 纤程只会在运行时(Runtime)系统的线程池中的线程间转移,这意味着默认情况下,长时间运行的纤程最终将回到运行时系统的线程池中,哪怕它们是从其他线程(池)中启动的,(异步)恢复后也是如此。 出于性能方面的考虑,纤程会尝试在同一个线程上执行一个(可配置的)最短的时间,然后切换到其它纤程。从异步回调中恢复的纤程将在它的启动线程上恢复,并持续一段时间,然后被切换到运行时线程池上恢复运行。 这样的默认配置有助于确保堆栈安全和协作式多任务处理。如果不需要自动线程转移,则可以在运行时(Runtime)中更改它们。

BACK TO TOP