STM 介绍

STM 介绍

简介 ZIO 支持软件事务性内存 (STM),它是一种模块化的可组合的并发性数据结构。它允许我们在一个原子事务中组合并执行一组内存操作。 STM是一组并发任务之间的通信的抽象。STM 的主要优点是可组合性和模块化。我们可以编写可以与同样使用 STM 构建的任何其他抽象组合的并发抽象,同时不必暴露我们的抽象是如何确保安全的细节。而锁定机制则通常不是这样的。 Transactional 操作的想法并不新鲜,它们一直是分布式系统的基础,也是那些数据库之所以能够保证我们拥有 ACID 性质。STM是纯内存操作的。所有的操作都发生在内存中,与远程系统或数据库无关。与 ACID 属性的数据库概念非常相似,但缺少持久性,因为那对内存中的操作没有意义。 在事务性内存中,我们有关于 ACID 属性的几个方面: Atomicity(原子性)——在写操作中,我们需要原子更新,这意味着更新操作要么应该立即运行,要么根本不运行。 Consistency(一致性)——在读操作中,我们希望程序状态具有一致的视图,以确保各部分都引用相同的状态,在获得状态时获得相同的值。 Isolated(隔离性)——如果我们有多个同步更新,我们需要在隔离的事务中执行这些更新。每个事务不会影响其他并发事务。无论有多少纤程在运行多少数量的事务,都不必担心其它事务中发生的事情。 ZIO STM API 的灵感来自 Haskell 的 STM库,尽管 ZIO 中的实现完全不同。 问题 让我们从一个简单的inc函数开始,它接受一个 Int 类型的可变引用 并增加其 amount: 如果只有一个纤程,那不会有问题。这个函数看上去是正确的。但是,如果在读取计数器的值和设置新值之间,另一个纤程出现并改变了计数器的值,会发生什么?在我们读取计数器之后,另一个纤程恰好更新了计数器。所以这个函数是有竞争条件的,我们可以用下面的程序来测试: 由于上述程序运行了 10 个并发纤程来增加计数器值。但是,我们不能期望这个程序总是返回结果 10。 为了解决这个问题,我们需要原子地执行 get 和 set 操作。所幸 Ref 数据类型的一些 API,比如 update,updateAndGet和modify 提供了读写同步的原子操作。 需要注意的是有关 modify 操作最重要的一点是它不使用悲观锁定。它不对临界区使用任何锁定原语。它对潜在的操作碰撞有一个乐观的假设。 该 modify 函数执行以下三个步骤: 它假设其他纤程在大多数情况下不会改变共享状态并且在多数情况下不会互相干扰。因此它在不使用任何锁定原语的情况下读取共享状态。 你应该为最坏的情况做好准备。如果另一个纤程同时接入,会发生什么?因此,当我们开始写入新值时,应该检查所有可能影响的方面。它应该确保看到全局一致的状态,如果它看到了,那么它才可以改变那个值。 如果它遇到不一致的值,则不应继续。它应该使以上假设变得无效并中止更新共享状态。然后基于被修改过的值重新尝试 modify 操作。 让我们看看在没有任何锁定机制的情况下 Ref 如何实现 modify 功能: 正如我们所看到的,modify 操作是根据 compare-and-swap 操作来实现的,它帮助我们以原子方式执行读取和更新。 让我们将 inc 函数重命名为以下的 deposit,尝试将钱从一个账户转移到另一个账户的经典问题: 增加 withdraw 函数: 看起来还不错,但是实际上我们要先检查账户中是否有足够的余额可以提现。所以让我们修改它添加一个不变量来检查: 如果在检查和更新余额之间,另一条纤程来提取账户中的资金怎么办?所以这个解决方案包含一个错误。它有可能让账户达到负平衡。 假设我们最终达成了一个解决方案来自动退出,但问题仍然存在。我们需要以原子的方式将 withdraw 和 deposit 组合在一起以创建 transfer函数: 在上面的例子中,即使我们假设 withdraw 和 deposit 各自是原子的,我们也不能组合这两个事务。它们在并发环境中会产生错误。这段代码不能给我们保证 withdraw 都 deposit 两者在同一个原子操作中执行。同时执行此 transfer 方法的其他纤程可以覆盖共享状态并引入竞争条件。 我们需要一个解决方案来原子地组合事务。这就是STM发挥作用的地方。 可组合的并发 软件事务内存为我们提供了一种组合多个事务并在一个事务中执行它们的方法。 让我们继续我们的最后努力,将我们的 withdraw 方法转换为一个原子操作。为了使用 STM 来解决问题,我们将 Ref 替换 TRef. TRef 的含义是Transactional Reference;它是 STM 世界中的可变引用。STM 是一个一元的数据结构,代表一个可以事务化执行的 effect: 同样 deposit 操作是原子的,但为了能够和 withdraw 组合,我们也需要用 TRef 将其重构并 return STM: 在 STM 的世界中,我们可以组合所有操作,并直到世界的尽头,我们才在一个操作中原子地执行所有这些操作。为了能够让 withdraw 和 deposit 组合在一起,我们需要让它们都保持在 STM 的世界中。因此,我们不对它们中的每一个单独执行STM.atomically 或 STM#commit 方法。 现在我们可以在 STM 的世界中组合这两个函数来定义 transfer,并将它们转换为单一原子的 IO 方法: 假设我们正在将资金从一个账户转移到另一个账户。如果我们取出第一个账户但没有存入第二个账户,这种中间状态对任何外部纤程都是不可见的。如果不存在任何有冲突的变更,则事务完全成功。如果存在任何冲突或冲突更改,则整个 STM 将被重试。 它是如何工作的 STM 使用了和 Ref#modify 函数相同的思想,但具有可组合性特征。STM 的主要目标是提供一种机制来组合多个事务并在一个原子操作中执行它们。 可组合部分背后的机制是显而易见的。STM 有自己的世界。它有很多有用的 combinators(组合子),例如 flatMap 和orElse 可用于组合多个 STM 并创建更优雅的结果。在我们通过 STM#commit 或 […]

TReentrantLock

TReentrantLock(可重入锁)允许效果性地、安全地并发访问某些可变状态,从而允多个纤程读取其中的状态(因为并发读取是安全的),但是只有一个纤程可以修改状态(以防止数据损坏)。此外,TReentrantLock 是使用 STM 实现的,读写操作都可以当作事务进行提交,因此可以将其用作纯 ZIO effect 解决方案的基石,并在内部允许以简单且可组合的方式锁定多个状态(感谢 STM)。 TReentrantLock 是可重入的读/写锁。可重入锁是一个纤程可以多次取得而不会对其自身造成阻塞的锁。在难以跟踪您是否已经获得锁的情况下,此功能很有用。如果锁是不可重入的,则在你获得该锁后再次去获得它时会被阻塞,从而实际上导致了死锁。 语义 该锁对读取方和写入方都提供了重新获得读取或写入锁的重入保证。在释放由写纤程持有的所有写锁之前,不允许读纤程进入。除非没有其他纤程持有写入锁,或者想要写入的纤程已经持有读取锁且没有其他纤程也持有读取锁,否则不允许写纤程进入。 此锁还允许从读锁升级到写锁(自动),以及从写锁降级到读锁(前提是您从读锁升级到写锁是自动的)。 创建一个可重入锁 获取一个读锁 获取一个写锁 多个纤程可同时持有读锁 锁升级和降级 如果您的纤程已经持有读锁,则可以将其升级为写锁,前提是(除了当前纤程外)没有其他读取纤程持有该锁。 在有争议的情况下获取写锁定 只有满足以下条件之一,才能立即获取写锁定: 没有其他的(写入)锁持有人。 当前的纤程已经持有读取锁,且没有其他方持有读取锁。 如果以上两种情况都不满足,则尝试获取写锁定将在语义上阻塞当前纤程。这是一个示例,说明仅当所有其他读取器(尝试获取写锁的纤程除外)释放对(读或写)锁的锁定后,该纤程才能获得写锁。 此例中纤程 f1 获得读取锁定,并在释放它之前休眠 5 秒钟。同时纤程 f2 尝试获取读锁定,并立即尝试获取写锁定。但是,f2 必须在语义上阻塞大约5 秒钟才能获得写锁,因为 f1 到时才将释放其对锁的保留,然后 f2 才能获取写锁定。 更安全的方法(readLock and writeLock) 对于一些简单的使用案例,应避免使用 acquireRead,acquireWrite,releaseRead 和 releaseWrite ,而应该使用诸如 readLock 和 writeLock 之类的方法。借助 Managed 构造,readLock 和 writeLock 可以自动获取并释放锁。下面的程序是一个比上面的例子更安全的版本,它确保一旦通过可重入锁完成操作,我们就不会占用(会释放)任何资源。

TSemaphore

TSemaphore 是具有事务语义的信号量,可用于控制对公共资源的访问。它拥有一定数量的许可证,并且可以获取或释放许可证。 创建一个 TSemaphore 创建一个具有 10 个许可证的 TSemaphore: 获取一个许可证 一旦外部程序获得许可证,这会减少 TSemaphore 包含的剩余许可证数量。当用户想要访问受限共享资源时,就需要获取可: 请注意,如果在信号量中没有剩余的许可证时,尝试获取许可证则具有阻塞的语意,直到有许可证为止。请注意,阻塞语义不会阻塞线程,只当有许可证被释放时系统才会尝试重试 STM 事务。 释放一个许可证 访问完共享资源后,必须释放许可证,以便其它第三方可以访问共享资源: 查询可用的许可证 您可以使用 available 查询在 TSemaphore 中的剩余许可数量: 上面的代码创建一个具有两个许可证的 TSemaphore,然后立刻获得但不释放一个许可。然后,“available” 将会报告仅剩一个许可证。 执行带有自动获取和释放功能的 STM 操作 您可以将任意在 TSemaphore 上 acquire 和 release 许可证的 STM 操作,作为一个事务的一部分。与其: 不如: 最佳实践是使用 withPermit 而不是直接使用 acquire 和 release,除非更复杂的,比如涉及多个 STM 动作,并且它们不以 acquire 作为事务的起点,也不以 release 作为终点的情况。 获取和释放多个许可证 使用 acquireN 和 releaseN 一次可以获取和释放若干个许可证:

TSet

TSet[A] 是一个可以参与 STM 中事务可变集合。 创建一个 TSet 创建一个空的 TSet: 或创建具有指定值的 TSet: 或者,您可以通过具有值的集合来创建 TSet: 如果提供了重复项,则取最后一个。 将一个值加入 TSet 可以通过以下方式将新元素添加到集合中: 如果集合已经包含元素,则不会进行任何修改。 从 TSet 中删除元素 从 TSet 中删除元素的最简单方法是使用 delete 函数: 同样,可以删除满足所提供函数参数的每个元素: 或者,您可以保留所有与函数参数相匹配的元素: 请注意,retainIf 和 removeIf 与 filter 和 filterNot 具有相同的目的。分别命名它们的原因是要强调其本质上的区别。也就是说,retainIf 和 removeIf 都是破坏性的,调用它们会修改集合本身。 TSet 的并集 Set A 和 Set B 的并集表示属于 Set A 或 Set B 或两者都属于的元素集。使用 A union B 函数来修改 A 集合(取得两者的并集)。 TSet 的交集 Set A 和 Set B 的交集指的是同时属于 A 和 B 的元素集。使用 A intersect B 函数修改 A 集合(来取得交集)。 TSet 的差集 Set A 和 Set B 的差集是包含于集合 A但,集合 B 中没有元素。使用 A diff B 函数来获得。The difference between sets A and B is the set containing elements of set A but not in B. Using A diff B method modifies set A. 转换 TSet 的元素 transform(A => A) 函数允许基于集合中的每个元素计算一个新值: 以下可以压缩 TSet: 上面示例得到的结果集只有一个元素。 请注意,transform 的作用与 map 相同。对其进行不同命名的原因是要强调其本质上的区别。也就是说,transform 是破坏性的,调用它会修改集合本身。 可以通过 transformM 效果化地映射元素: fold 可以使用指定的二元运算折叠 TSet 的元素: 元素可以通过 foldM 有效折叠:The elements can be folded effectfully via foldM: 对 TSet 中的元素执行side-effect foreach 用于对集合中的每个元素执行副作用: 检查 TSet 的成员 检查元素是否存在于 TSet 中: 将 TSet 转换为 List 可以按以下方式获取集合元素的列表: TSet 的大小 集的大小可以通过以下方式获得:

TRef

TRef[A] 是可以参与 STM 事务的对不可变值的可变引用。其可变引用可以在事务内检索和设置,并被强制保证原子性,一致性并与其他事务的隔离。 TRef 在 STM 内存中发生改变时,使用了低级机制来创建事务。 创建一个 TRef 在事务内部创建TRef: 或者在事务内创建一个TRef,然后立即提交该事务,这使您可以存储并传递一个值引用。 从 TRef 中取出值 从一次事务中取回结果值: 或多次事务提交后取回值: 给 TRef 设置一个值 设置该值将覆盖现的引用内容。 在单个交易中设置值: 或(在)多次交易(中多次设置): 更新 TRef 中的值 update(A => A) 函数允许根据 TRef 中旧的值计算新的值。 在单次事务中更新值: 或(在)多次事务(中多次更新): 修改 TRef 中的值 modify(A => (B, A)): B 函数和 update 相似,但是它允许返回一些(B 类型的)信息。 在单个事务中修改值: 或(在)多次事务(中多次修改): 使用例子 这是使用 TRef 在两个纤程之间传递值的例子: 在此示例中,我们为发送者和接收者创建并提交两个事务引用,以便能够提取它们的值。在接下来的步骤中,我们创建一个原子事务,仅在发送者帐户中有足够的可用余额时才更新两个帐户。然后,我们(fork)以异步运行它。在接下去的纤程中,我们暂停它的执行直到发件人余额发生变化(在这种情况下达到零为止)。 最后,我们提取两个帐户的新值并将其合并为一个结果。 ZTRef 和 Ref[A] 类似,TRef[A] 实际上是 ZTRef[+EA, +EB, -A, +B] 类型的别名, ZTRef[+EA, +EB, -A, +B] 是一个多态的事务性引用,支持 ZRef 所提供的所有运算。有关多态引用的更多讨论,请参见 ZRef.

TQueue

TQueue[A] 是一个可以参与 STM 事务的可变队列。 创建一个 TQueue 创建一个具有指定容量的空的有界 TQueue: 创建一个空的无容量限制的 TQueue: 将元素加入一个 TQueue 将元素放入一个 TQueue: 如果队列未满,则指定的元素将被成功添加到队列中。否则,它将等待队列中的空插槽位。 另外,您可以使用一个元素列表来填充队列: 从 TQueue 中取回元素 您可以从队列中取回第一个元素,如下例: 如果队列为空,它将阻塞等待您所期待的元素。 可以通过使用 poll 方法来避免此阻塞行为,该方法将返回一个元素(如果存在)否则返回 None: 取回队列的前n个元素: 可以按以下方式获取队列的所有元素: TQueue 的大小 可以按以下方式获取队列中元素的个数:

TPromise

TPromise 是一个可以设置一次,并且可以参与 STM 事务的可变参考。 创建一个 TPromise 创建一个 TPromise: 结束一个 TPromise 成功完成 TPromise: 得到一个失败的 TPromise: 另外,您也可以使用 done 组合器,并通过传递 Either[E, A] 来完成 Promise: 设置它的值后,之后任何尝试对其进行设置的操作都会返回 false。 从一个 TPromise 中得到值 如果 Promise 已经完成,则返回结果,否则返回 None: 或者,您可以(阻塞)等待 Promise 的完成并将值返回:

TPriorityQueue

TPriorityQueue[A] 是一个可以参与 STM 事务的可变队列。一个 TPriorityQueue 中包含了类型为 A 的带有顺序定义的值。与 TQueue 不同,take 返回的是最高优先级(指定顺序中的第一个)值,而不是队列中的第一个值。当从队列中取出时,不保证共享相同优先级的元素的顺序。 创建一个 TPriorityQueue 您可以使用 empty 函数创建一个空的 TPriorityQueue: 请注意,TPriorityQueue 的创建使用了隐式 Ordering。默认情况下,take 将返回指定顺序中第一个的值。例如,在按时间排序的事件队列中,最早的事件将被首先取出。如果您想要不同的行为,可以使用自定义的 Ordering。 您还可以使用 fromIterable 或 make 构造函数创建一个使用指定元素初始化的TPriorityQueue。fromIterable 构造函数采用 Iterable,而 make 构造函数采用可变参数元素序列。 向 TPriorityQueue 添加元素 您可以使用 offer 或 offerAll 方法将元素添加入 TPriorityQueue。如果您要同时向队列添加多个元素,则 offerAll 方法会更加高效。 从 TPriorityQueue 中获取元素 使用 take 从 TPriorityQueue 中获取一个元素。take 在语义上会阻塞,直到队列中至少要取一个值为止。您还可以使用 takeAll 立即获取队列中当前的所有值,或使用 takeUpTo 立即获取队列中指定数量的元素。 您也可以使用 takeOption 方法从队列中获取第一个值(如果不存在也不会被挂起),或者使用 peek 方法观察队列中的第一个元素(如果存在)而不将其从队列中删除。 有时,您想要对队列的当前状态进行快照而不修改它。为此,toChunk 组合器或其变体 toList 或 toVector 非常有用。这些函数将返回一个不可变的集合,该集合由当前队列中的所有元素组成,而队列的状态保持不变。 TPriorityQueue 的大小 您可以使用 size 方法检查 TPriorityQueue 的大小:

TMap

TMap[A] 是一种可以参与 STM 事务的可变映射。 创建一个 TMap 创建一个空的 TMap: 或创建具有指定值的 TMap: 或者,您可以通过提供的元组集合来创建 TMap: 将键值对存入一个 TMap 可以通过以下方式将新的键值对添加到 map 映射: 在 map 中添加条目的另一种方法是使用 merge: 如果该键不存在于 map 中,则其行为类似于简单的 put 方法。否则,使用提供的函数将现有值与新值合并。 从 TMap 中删除元素 从 TMap 中删除键值对的最简单方法是使用采用 delete 删除某个键: 同样,它可以删除满足函数参数的每个键值对: 或者,您可以保留所有与函数参数匹配的键值对: 请注意,retainIf 和 removeIf 与 filter 和 filterNot 具有相同的目的。但是分别命名它们的原因是要强调其本质上存在的区别。也就是,retainIf 和removeIf 都是破坏性的,调用它们会修改原集合。(而 filter 和 filterNot 只是返回新集合而不修改原集合) 从 TMap 中读取 可以通过以下方式获取与键关联的值: 或者,如果 map 映射中不存在该键,则可以提供默认值: 转换 TMap 中的条目 函数 transform((K, V) => (K, V)) 可以用于为 map 映射中的每一个条目计算新的值: 请注意,它也可以用来压缩 TMap: TransformM 可以用于效果化地映射条目: 函数 transformValues(V => V) 可以为 map 中的每个值计算一个新值: 可以通过 transformValuesM 效果化地处理 map 中的这些值: 请注意,transform 和 transformValues 的用途与 map 和 mapValues 相同。 之所以分别命名它们的原因是要强调其本质上的区别。也就是说,transform 和 transformValues 都是破坏性的,调用它们可以修改原集合。 fold 使用指定的两个关联运算来遍历折叠 TMap 中的元素: foldM 可以效果化地折叠原属: 对 TMap 键值对执行side effect计算 foreach 用于对映射中的每个键值对执行 side-effect 计算: 检查 TMap 中的成员 检查键值对是否存在于 TMap 中: 将 TMap 转换为 List 可以通过以下方式转换成元组列表: 可以按如下方式获得键列表: 可以按以下方式获得值列表:

TArray

TArray 是可以参与 STM 事务的可变引用的数组。 创建一个 TArray 创建一个空的 TArray: 或创建具有指定值的 TArray: 或者,您可以通过指定的集合来创建 TArray: 从 TArray 读取值 可以通过以下方式获得数组的第 n 个元素: 访问不存在的索引会引发 ArrayIndexOutOfBoundsException 异常并中止事务。 更新 TArray 中的值 可以按照以下步骤更新数组的第n个元素: 可以通过 updateM 效果化地更新数组的第n个元素: 更新不存在的索引会引发 ArrayIndexOutOfBoundsException 并中止事务。 转换 TArray 的元素 transform(A => A) 函数可以为数组中的每一个元素计算新值: 可以通过 transformM 效果化地映射元素: fold 通过两个指定的运算遍历折叠 TArray 中的元素: 可以通过 foldM 效果化地进行遍历折叠: 对 TArray 元素执行 side-effect 运算 foreach 用于对数组中的每个元素执行 side-effect 运算:

BACK TO TOP