STM 介绍

STM 介绍

简介 ZIO 支持软件事务性内存 (STM),它是一种模块化的可组合的并发性数据结构。它允许我们在一个原子事务中组合并执行一组内存操作。 STM是一组并发任务之间的通信的抽象。STM 的主要优点是可组合性和模块化。我们可以编写可以与同样使用 STM 构建的任何其他抽象组合的并发抽象,同时不必暴露我们的抽象是如何确保安全的细节。而锁定机制则通常不是这样的。 Transactional 操作的想法并不新鲜,它们一直是分布式系统的基础,也是那些数据库之所以能够保证我们拥有 ACID 性质。STM是纯内存操作的。所有的操作都发生在内存中,与远程系统或数据库无关。与 ACID 属性的数据库概念非常相似,但缺少持久性,因为那对内存中的操作没有意义。 在事务性内存中,我们有关于 ACID 属性的几个方面: Atomicity(原子性)——在写操作中,我们需要原子更新,这意味着更新操作要么应该立即运行,要么根本不运行。 Consistency(一致性)——在读操作中,我们希望程序状态具有一致的视图,以确保各部分都引用相同的状态,在获得状态时获得相同的值。 Isolated(隔离性)——如果我们有多个同步更新,我们需要在隔离的事务中执行这些更新。每个事务不会影响其他并发事务。无论有多少纤程在运行多少数量的事务,都不必担心其它事务中发生的事情。 ZIO STM API 的灵感来自 Haskell 的 STM库,尽管 ZIO 中的实现完全不同。 问题 让我们从一个简单的inc函数开始,它接受一个 Int 类型的可变引用 并增加其 amount: 如果只有一个纤程,那不会有问题。这个函数看上去是正确的。但是,如果在读取计数器的值和设置新值之间,另一个纤程出现并改变了计数器的值,会发生什么?在我们读取计数器之后,另一个纤程恰好更新了计数器。所以这个函数是有竞争条件的,我们可以用下面的程序来测试: 由于上述程序运行了 10 个并发纤程来增加计数器值。但是,我们不能期望这个程序总是返回结果 10。 为了解决这个问题,我们需要原子地执行 get 和 set 操作。所幸 Ref 数据类型的一些 API,比如 update,updateAndGet和modify 提供了读写同步的原子操作。 需要注意的是有关 modify 操作最重要的一点是它不使用悲观锁定。它不对临界区使用任何锁定原语。它对潜在的操作碰撞有一个乐观的假设。 该 modify 函数执行以下三个步骤: 它假设其他纤程在大多数情况下不会改变共享状态并且在多数情况下不会互相干扰。因此它在不使用任何锁定原语的情况下读取共享状态。 你应该为最坏的情况做好准备。如果另一个纤程同时接入,会发生什么?因此,当我们开始写入新值时,应该检查所有可能影响的方面。它应该确保看到全局一致的状态,如果它看到了,那么它才可以改变那个值。 如果它遇到不一致的值,则不应继续。它应该使以上假设变得无效并中止更新共享状态。然后基于被修改过的值重新尝试 modify 操作。 让我们看看在没有任何锁定机制的情况下 Ref 如何实现 modify 功能: 正如我们所看到的,modify 操作是根据 compare-and-swap 操作来实现的,它帮助我们以原子方式执行读取和更新。 让我们将 inc 函数重命名为以下的 deposit,尝试将钱从一个账户转移到另一个账户的经典问题: 增加 withdraw 函数: 看起来还不错,但是实际上我们要先检查账户中是否有足够的余额可以提现。所以让我们修改它添加一个不变量来检查: 如果在检查和更新余额之间,另一条纤程来提取账户中的资金怎么办?所以这个解决方案包含一个错误。它有可能让账户达到负平衡。 假设我们最终达成了一个解决方案来自动退出,但问题仍然存在。我们需要以原子的方式将 withdraw 和 deposit 组合在一起以创建 transfer函数: 在上面的例子中,即使我们假设 withdraw 和 deposit 各自是原子的,我们也不能组合这两个事务。它们在并发环境中会产生错误。这段代码不能给我们保证 withdraw 都 deposit 两者在同一个原子操作中执行。同时执行此 transfer 方法的其他纤程可以覆盖共享状态并引入竞争条件。 我们需要一个解决方案来原子地组合事务。这就是STM发挥作用的地方。 可组合的并发 软件事务内存为我们提供了一种组合多个事务并在一个事务中执行它们的方法。 让我们继续我们的最后努力,将我们的 withdraw 方法转换为一个原子操作。为了使用 STM 来解决问题,我们将 Ref 替换 TRef. TRef 的含义是Transactional Reference;它是 STM 世界中的可变引用。STM 是一个一元的数据结构,代表一个可以事务化执行的 effect: 同样 deposit 操作是原子的,但为了能够和 withdraw 组合,我们也需要用 TRef 将其重构并 return STM: 在 STM 的世界中,我们可以组合所有操作,并直到世界的尽头,我们才在一个操作中原子地执行所有这些操作。为了能够让 withdraw 和 deposit 组合在一起,我们需要让它们都保持在 STM 的世界中。因此,我们不对它们中的每一个单独执行STM.atomically 或 STM#commit 方法。 现在我们可以在 STM 的世界中组合这两个函数来定义 transfer,并将它们转换为单一原子的 IO 方法: 假设我们正在将资金从一个账户转移到另一个账户。如果我们取出第一个账户但没有存入第二个账户,这种中间状态对任何外部纤程都是不可见的。如果不存在任何有冲突的变更,则事务完全成功。如果存在任何冲突或冲突更改,则整个 STM 将被重试。 它是如何工作的 STM 使用了和 Ref#modify 函数相同的思想,但具有可组合性特征。STM 的主要目标是提供一种机制来组合多个事务并在一个原子操作中执行它们。 可组合部分背后的机制是显而易见的。STM 有自己的世界。它有很多有用的 combinators(组合子),例如 flatMap 和orElse 可用于组合多个 STM 并创建更优雅的结果。在我们通过 STM#commit 或 […]

摘要

Here are a few guides for common patterns with ZIO: 使用模块和层: 如何借助ZIO环境构建大型的 ZIO 程序。 Test effects: 如何使用 ZIO Test 无缝测试效果化的程序. Mock services: 如何使用模拟(mocks)来测试服务之间的交互。 Handle errors: 如何处理 ZIO 中的错误(可声明的错误与无法预见的缺陷)。 Access system information: 如何使用ZIO访问环境变量和其他系统属性。

使用模块和层

用 ZLayer 注入 ZIO 环境 ZIO 是围绕3个参数设计的,R, E, A。R 代表运行 effect 时的要求,这意味着我们需要满足这些要求才能使 effect 可运行。我们将探讨我们可以用 R 做些什么,因为 R 在 ZIO 中起着至关重要的作用。 有关 ZIO 环境的简单案例 让我们构建一个用户管理的简单程序,该程序可以检索用户(如果存在)和创建用户。我们需要一个 DBConnection 来访问数据库,并且程序中的每个步骤都通过环境类型来表示这一点。然后,我们可以通过 flatMap 将两个(小的)步骤组合在一起,或者通过 for comprehension 更方便地进行组合。 结果是一个决于对 DBConnection 的依赖的程序。 要运行该程序,我们必须通过 provide 方法提供 DBConnection,然后再将运算提供给 ZIO 运行时。 请注意,通过 provide 为该环境提供 effect 的行为导致的结果是消除了返回的 effect 中的环境依赖,返回的 effect 中的环境类型呈现为 Any。 通常,我们不仅需要数据库连接。我们需要使我们能够执行不同操作的组件,并且需要将它们连接在一起。这也是模块化的作用。 我们的第一个 ZIO 模块 接下来,我们将看到如何定义模块,并使用它们来创建彼此依赖的不同的应用层。核心思想是,一个层依赖于紧接在下面的层,但是完全不知道其内部实现。 这种模块式的表述是 ZIO 管理应用程序组件之间的依赖关系的方式,在组合性方面提供了强大的功能,并提供了轻松更改不同实现的功能。这在测试/模拟阶段特别有用。 模块是什么? 模块是仅处理一个关注点的一组功能的集合。限制模块的范围可以提高我们理解代码的能力,因为我们一次只需要专注于一个主题,而不会在脑海中纠缠过多的概念。 ZIO 本身就通过模块来提供基本功能,比如查看 ZEnv 是如何定义的。 模块的构成 让我们按照以下简单步骤构建一个用于用户数据访问的模块: 定义一个指定模块的名称的对象,它可以是(非必须)一个包对象。 在模块对象中定义一个特质(trait)服务,该服务定义了我们模块所公开的接口,在本例中它包括两个方法:检索和创建用户。 在模块对象中,通过 ZLayer 定义该模块的不同实现(有关 ZLayer 的详细信息,请参见下文) 定义类型别名,例如 type ModuleName = Has[Service](有关以下内容的详细信息,请参见下文) 我们遇到了两种新的数据类型 Has 和 ZLayer,接下来让我们熟悉它们。 Has 数据类型 Has[A] 表示对A类型服务的依赖。可以通过 ++ 运算符将两个 Has[_] 水平合并,如: 此时您可能会问:如果结果类型只是两个 trait 的混合,那有什么用?为什么我们不仅仅依靠特质 mixins? Has 赋予的额外能力是,通过将服务类型交叉映射到服务实现,从而得到的结果数据结构混合了每个实例,不仅可以访问/提取/修改其中的某个实例,同时也保证了彼此之间的类型安全。 根据之前的说明,为 Has[Service] 定义类型别名非常方便。通常我们不直接创建一个 Has,而是通过 ZLayer 来实现。 ZLayer 数据类型 ZLayer[-RIn, +E, +ROut <: Has[_]] 表示根据输入 RIn 来生成 ROut 类型的环境值,可能产生的错误类型用 E 来表达。 遵循环境的概念,当 RIn = Any 时表示可以不需要输入,并可将类型别名简写为 Layer。 创建 ZLayer 的方法有很多,这是一个不完整的列表: 用 ZLayer.succeed 或 ZIO.asService 通过现有服务创建 Layer ZLayer.succeedMany 根据一个或多个服务创建一个 Layer。 ZLayer.fromFunction 通过一个将请求转换成服务的函数创建 Layer。 ZLayer.fromEffect 将 ZIO effect 提升为一个具有效果化的环境的 Layer。 ZLayer.fromAcquireRelease 具有资源获取/释放能力的 Layer。这个想法与 ZManaged 相同。 ZLayer.fromServices 从多个服务中构建 Layer。 非常合理地,这些构建服务的方法各自还有不同的变体:具有效果化的(以后缀M来标示),资源化的(后缀Managed)或不同服务的组合(后缀为Many)。 我们可以水平地组合 layerA 和 layerB,通过 layerA ++ layerB 来组合两个层以构建出同时具有两个需求层的组合层, 我们也可以垂直地组合层,这意味着将一层的输出用作下一层的输入以构建出下一层,从而得到需要将第一层作为输入的第二层输出:layerA >>> layerB 将模块连接在一起 在这里,我们定义了一个模块来处理 User 域对象的 CRUD 操作。我们还提供模块在内存中的实现。 然后,我们定义另一个模块来执行一些基本的日志功能。我们提供了它的基于zioConsole 的 consoleLogger […]

反应式流

检出 interop-reactiveStreams 模块以获得对反应式流的互操作支持。 反应式流的 Producer 和 Subscriber ZIO 通过将 zio.stream.Stream 转换到 org.reactivestreams.Publisher,和将 zio.stream.Sink 转换到 org.reactivestreams.Subscriber 来集成 Reactive Streams。反之亦然。简单地引入 zio.interop.reactiveStreams._ 来让转换生效。 例子 首先,让我们导入一些内容。 我们使用以下发布者和订阅者作为示例: 将发布者作为流 将一个发布者作为流来使用时最多缓冲 qSize 个元素。如果可能的话,qSize 应该是 2 的幂以达到最佳性能。默认值为 16。 将订阅者作为接收器 当将一个订阅者连接到 Stream 时,需要一个旁支通道来处理故障。因此,toSink 返回 Promise 和 Sink 的元组。当流失败时,Promise 必须呈现失败状态,toSink 上的 type 参数代表 Stream 的错误类型。 将流作为发布者 将接收器作为订阅者 toSubscriber 返回一个 Subscriber 和一个IO,这个 IO 负责在接收器执行结束后,或当发布者产生错误时,返回结果。作为订阅者的用接收器最多缓冲 qSize 个元素。qSize 应该尽可能是 2 的幂以达到最佳性能。默认值为16。

Scalaz 7.x

ZIO 实例 如果您是 Scalaz 7.2 的忠实用户,那么 interop-scala7x 模块为它的几种类型类提供了 ZIO 支持,请查看源代码以获取更多详细信息。 例子 通过 ZIO 并行执行 Applicative 实例 由于 Applicative 和 Monad 相关的法则,ZIO 的 Applicative 实例必须通过 bind 来获得,因此组合多个 Applicative effect 时将只能串行获得。为了解除这个限制,ZIO 通过非 Monad 的标记(Tag)来并行执行 Applicative 实例。 例子

Monix

签出 interop-monix 模块以获得与 Monix 的互操作支持。 转换 Task 互操作层提供对以下转换的支持: 将 Task[A] 转为 UIO[Task[A]] 将 Task[A] 转为 Task[A] 要将 IO 转换为 Task,请使用以下方法: 要执行反方向的转换,请使用以下定义在 IO 伴随对象中的扩展方法: 请注意,为了将 Task 转换为 IO,需要使用适当的 Scheduler。 例子 转换 Coeval 要将 IO 转换为 Coeval,请使用以下方法: 要执行反方向的转换,请使用以下定义在 IO 伴随对象中的方法: 例子

JavaScript

通过将以下内容添加到您的 build.sbt 中,使得在 Scala.js 项目中支持 ZIO: 例子 您的 main 函数可以像下面这样通过扩展 App 得到。这个例子使用 scala-js-dom 来访问 DOM;要运行该示例,您将需要将该库作为依赖项添加到 build.sbt中。

Java

ZIO 与外部 Java 代码具有完全的互操作性。让我向您展示它的工作原理,然后讲解第一个案例,明天您就可以在工作中使用纯函数式 Java 了。 From Java CompletionStage and back CompletionStage 是(Java 提供的)最便捷的用于模拟函数式异步效果的 API(例如 ZIO)的接口,因此我们从它开始。轻而易举地: 您甚至可以将其变成纤程! 该 API 创建了一个不绑定具体对象的纤程。 此外,您如果希望将 ZIO 值转换为 CompletionStage 也易如反掌: 正如您所看到,它返回 CompletionStage 接口的具体类,即CompletableFuture。需要指出的是,只要可以将类型 E 的值转换为Throwable,那么任何 IO[E, A] 都可以变成 CompletableFuture: Java 的 Future 类 您可以通过 ZIO.fromFutureJava 将任何 java.util.concurrent.Future 嵌入 ZIO 计算中。一个简单的 Apache Async HTTP 客户端的例子看起来如下: 就这么简单。请注意,从对产出值的签名中可以看出,ZIO 在内部使用了阻塞Future#get 调用。显然,它被运行在阻塞线程池上,我想您应该清楚地知道。如果可能的话,请如上所述使用 ZIO.fromCompletionStage。 如果您需要,也可以使用 Fiber.fromFutureJava 将它转换为纤程。类似又有差别: NIO 完成具柄 通过提供完成处理具柄,Java 对使用 NIO API 的通讯通道执行异步处理,它通过将完成处理具柄挂接到可中断 I/O 中来实现。例如,读取文件的内容: 如您所见,ZIO 在此处提供了 CPS 样式的 API,与上面的两个例子有所不同,但是仍然非常优雅。

Future

Scala Future ZIO 现在提供了与 Scala 的 Future 的基本互操作性,并且不需要额外的模块提供支持。 转换自 Future 可以使用 ZIO.fromFuture 将 Scala 的 Future 转换为 ZIO effect: Scala 的 Future 也可以使用 Fiber.fromFuture 转换为 Fiber: 这是一个纯操作,因此于任何纤程执行公平性而言都是一个明智的注脚。 转换到 Future ZIO Task effect 可以通过 ZIO#toFuture 转换为 Future: 因为将 Task 转换为(即时的)Future 是效果化的,所以 ZIO#toFuture 的返回值是一个 effect。要真正开始执行 Future 并访问其中,必须使用 runtime 来执行 effect。 ZIO Fiber 可以通过 Fiber#toFuture 转换为 Future: 执行 Future Runtime 类型具有方法 unsafeRunToFuture,该方法可以异步执行 ZIO effect,并在 effect 执行完成时返回 Future。

BACK TO TOP