Stream[E, A] 表示一个可以产生 A 类型输出值,或可能以 E 类型为失败值的,效果化的流。 新建 Stream 或产生自 Iterable: 转换一个 Stream ZIO Stream 提供了许多标准的转换函数,例如:map,partition,grouped,groupByKey,groupedWithin等。以下是如何使用它们的示例。 map partition partition 根据函数参数将 stream 分成多个流元组。第一个流包含评估为 true的所有元素,第二个流包含评估为 false 的所有元素。较快的流可能比较慢的流领先,领先的程度受缓冲大小的限制。两个流都以 ZManaged 类型打包。在下面的示例中,左流仅包含偶数。 grouped 可以使用分组(grouped)函数将流的结果划分为指定的块大小。 groupByKey 可以使用 groupByKey 或 groupBy,按函数的执行结果对流进行分区。在下面的示例中,检查的结果被分组并计数。 groupedWithin groupedWithin 允许按时间或块大小对事件进行分组,以先满足者为准。在下面的示例中,每个块均最多包含 30 个元素,并且每 3 秒生成一次。 消费一个 Stream 使用 Sink Sink[E, A0, A, B] 表示接受的消费类型为 A,最终产生或者 E 型的错误, B 型的成功结果,以及剩余的类型为 A0。 例如,您可以使用 Sink.foldLeft 将 Stream 中的数据累加到单一个 ZIO 值: 在多个流上工作 您可以使用合并方法合并多个流: 或合并(zip)多个流: 然后您可以将流中的原属合并为单个 ZIO 值: 流压缩 解压 如果您读取到 Content-Encoding: deflate, Content-Encoding: gzip 或其它此类压缩数据流,则以下转换器可能会有所帮助: inflate 转换器可以根据 RFC 1951 标准对 deflated 格式的压缩输入流进行解压缩。 gunzip 转换器可以根据 RFC 1952 标准对 gzipped 格式的压缩输入流进行解压缩。 如果输入未经过正确的压缩,这两种解压缩方法都将以 CompressionException 作为失败类型。 压缩 deflate 转换器根据 RFC 1951 标准对流中的字节进行压缩。
ZSink[R, E, A, B] 用于消费从流中产生的元素。您可以将此接收器视为消费可变数量的 A 元素(可能为 0、1或很多!)的函数,可能因 E 类型错误而失败,或最终产生 B 类型的值。 ZSink 被作为参数传递给 ZStream#run: 建立 sinks zio.stream 提供了多种不同的 sink 供使用。 将数据收集到 Chunk[A] 中: 尝试将第一个元素接收到一个 Option 中(如果流为空则返回 None): stream.runDrain 的以下实现将忽略流中所有的输入: 产生一个给定类型的失败: 基本的接收数据累积函数: 具有短路功能的折叠器: sinks 转换 创建 Sink 后,我们可以使用提供的操作对其进行转换。 并行运行两个接收器并返回先执行完成的那一个: 我们可以使用 contramap,通过给定 C => A,其中 C 是输入类型,而 A 是 Sink的接收的元素类型,将给定的输入转换为某个特定的 Sink dimap 是 contramap 的扩展,它还可以指定 Sink 的输出转换:
信号量 Semaphore 数据类型,它允许通过 withPermit 方法在纤程之间进行同步,该方法可以安全地获取和释放许可证。信号量是基于 Ref[A] 数据类型的。 操作 例如,异步任务可以通过获取和释放具有给定数量许可的信号量来完成彼此的同步。当信号量中的许可值不足,获取操作无法执行时,该任务将在纤程队列中被置于挂起状态,直到有足够的许可值时被唤醒: (以上)二值信号量只是一种特殊的信号量。我们可以要求获取和释放任意指定数量的信号量: withPermit(及其对应的计数版本 withPermits)的保证是,无论任务是成功,失败还是被中断,许可证在被成功获取之后都会被释放。
Schedules 允许你定义和编写灵活的重复执行调度器,这些事件可以是重复的计算,或在出现错误时重试操作。Schedules 被用于以下场景: 重复 IO#repeat —— 重复执行 effect,直到调度计划结束。 IO#repeatOrElse —— 重复执行 effect,直到调度计划完成,如果得到错误,则返回另一个 effect 的结果。 IO#repeatOrElse0 —— 重复执行 effect,直到调度计划完成,如果得到错误,则返回另一个具有更多(定制)能力的 effect 的结果。 Retries IO#retry —— 重试一个 effect 直到成功。 IO#retryOrElse —— 运行一个 effect,如果失败则尝试另一个 effect,不断重试两者直到成功。 IO#retryOrElse0 —— 运行一个 effect,如果失败则尝试另一个具有更多(定制)能力的 effect,不断重试两者直到成功。 Schedules 定义了有状态的,可能有效果的事件的重复执行调度计划,并以允许以多种方式进行组合。 一个 Schedule[R, A, B] 它的输入类型为 A(A 在 retry 情况下为(前一调用的)错误类型,或在 repeat 情况下为(前一调用的)输出类型),并根据这些值和内部状态决定是重复执行还是返回结果。每个决策都会伴随(可能为零的)延迟,该延迟指示下一次重复发生之前需要多少时间停顿,并最终得到 B 类型输出值。 基本的 Schedules 一个永远循环的 Schedule: 一个循环 10 次的 Schedule: 一个每 10 毫秒循环一次的 Schedule: (缺省为平方)指数延迟的 Scheduler 以 fibonacci 计数(每次延迟是前两次延迟之和)的延迟: Schedule 组合器 给 Schedule 附加一个随机修正值。 修改 schedule 的延迟间隔: 依次串行组合两个调度器,先遵循第一个策略直到结束,然后遵循第二个策略直到结束: 合并两个调度器(取交集),仅在两个调度器都得到满足时才以两者之间的最大延迟重复执行调度: 合并两个调度器(取并集),如果两个调度计划中的任何一个想要重复执行,则使用两次重复之间的最小延迟来重复: 经过指定的时间后,停止重试: 仅在发生特定异常时重试:
Ref[A] 是对 A 类型的可变引用值的建模。它有两个基本操作:set,向 Ref 中填入新值;get,从中获得当前的值。Ref 上所有的操作都是原子且线程安全的,这为同步并发程序提供了可靠的基础。 更新一个 Ref 使用 Ref 最简单的手段是使用 update 或它更强大的好兄弟 modify。通过这些我们可以轻易地写一个像这样的 repeat 组合器。 状态转换器 可变量的阴暗面在于它们能够轻易地被改变;它们可以像圣诞节装饰品一样添加在任何地方并改变状态。比如: 作为函数式程序员,我们相当了解如何对付它。我们可以 S =>(A, S) 类型的函数形式来捕获状态的改变。Ref 提供了这样的编码能力,S 用来表达值的类型,并通过 modify 引介状态改变函数。 构建更复杂的并发原语 Ref 的级别足够低,以至于可以用作其他并发数据类型的基础。 信号量是一种经典的用于控制对共享资源的访问的抽象数据类型。它的定义为一个三元组形式 S = (v, P, V),其中 v 是当前可用资源的单位数,P 和 V 分别是对 v 进行递减和递增的运算;P 只能在 v 是非负数的时候才能对它进行递减,否则必须等待直到非负为止。 现在,有了 Refs,我们可以很容易实现它!唯一的困难在于 P,我们必须在 v 为负数时,或在我们读取并试图改变它时,它恰好被(别的线程)改变了,这时让 P 失败并重试。 一个不成熟的实现看上去可以像下面这样: 现在让我们伴随前几天在市场上发现的这些鳄鱼皮靴子摇滚起来,在夜总会测试我们的信号灯吧,来吧! 同时不用说,您应该看一下 ZIO 内建的 Semaphore,它可以完成所有的这些甚至更多工作而不会浪费任何 CPU 周期。 多态的 Refs Ref[A] 实际上是类型 ZRef[Nothing, Nothing, A, A] 的别名。ZRef 的类型签名如下: ZRef 是对可变引用的多态的,纯函数的描述。它的基本操作包括 set 和 get。set 接受一个类型为 A 的值并将引用设置为该新值,这个操作可能以 EA 错误类型失败。get 获取并返回当前的 B 类型的引用值,或者它可能以 EB 错误类型失败。 当 ZRef 的错误和值类型统一时,即 ZRef[E, E, A, A]。ZRef 还支持如上所述的原子 modify 和 update 操作。 一个简单的用例是可以获得引用值的只读或只读视图:
Queue 是一个基于 ZIO 的轻量级的,驻扎在内存的队列,具有可组合性,并且支持透明的背压。它是完全异步(无锁或无阻塞)的,纯函数且类型安全的。 Queue[A] 包含的值的类型为 A,并且具有两个基本操作:offer,将 A 放入队列中,和 take, 删除并返回队列中最旧的值。 新建一个队列 一个 Queue 可以是有界的(容量有限)或无界的。 当队列已满时,有几种策略可用于处理新值: 对默认的有界(bounded)队列是反压:填满后,任何提供数据的纤程都将被挂起,直到队列能够添加新内容为止; 下降(dropping)队列当队列满时会丢弃新的数据; 当滑动(sliding)队列满时会丢弃最陈旧的数据。 建立一个支持背压的有界队列: 建立一个下降队列: 建立一个滑动队列: 建立一个无界队列: 向队列添加数据 向队列添加值的最简单方法是 offer: 使用背压队列时,如果队列已满,offer 可能会暂时挂起:您可以使用 fork 让其在其它纤程中等待。 还可以使用 offerAll 一次添加多个值: 从队列中消费数据 take 操作从队列中删除最旧的数据并返回它。如果队列为空,它将时挂起,直到新的数据添加到队列后才继续。与 offer 一样,您可以使用 fork 让其在其他纤程中等待新值。 您可以通过 poll 来消费最旧的数据。如果队列为空,你会得到 None,否则数据将被包装在 Some 返回。 可以使用 takeUpTo 一次消费多个数据。如果队列中没有足够的数据要退回,则它将返回所有数据,而无需等待更多 offer 。 同样,您可以使用 takeAll 一次获得所有数据。它也无需等待就返回(如果队列为空,则为空列表)。 关闭队列 可以通过 shutdown 来中断(interrupt exception)所有处于 offer* 或 take* 而挂起的纤程。它还将清空队列,并让所有未来的 offer* 和 take* 调用立刻结束(interrupt exception)。 您可以在队列关闭时使用 awaitShutdown 执行 effect。它将等待直到队列被关闭。如果队列已经关闭,它将立即返回。 转换队列Transforming queues 实际上,Queue[A] 是 ZQueue[Any, Any, Nothing, Nothing, A, A] 类型的别名。完整版本的签名为: 它的含义是: 这个队列接收 A 类型的值。需要 RA 类型的环境支持入队操作,并且可能会因EA 类型的错误而失败; 队列将产生 B 类型输出值。需要 RB 型环境支持出队操作,并可能因 EB 型错误而失败。 请注意,基本的 Queue[A] 不会失败也不需要任何环境来支持进行任何操作。 基于输入和输出的不同类型参数,可以实现各种不同的队列组合: ZQueue#map 可以将队列的输出映射(到不同类型): ZQueue#mapM 我们还可以使用一个效果(effectful)函数来映射输出。例如,我们可以为每个元素加上出列的时间戳: ZQueue#contramapM 与 mapM 相似,我们也可以在元素入列时将效果(effectful)函数应用于元素。此例将使用其入队时间戳注释元素: 此例中的队列与上一个例子中的队列具有相同的类型((Long, String)),但是当元素排队时,时间戳附加到元素上。这反映在排队入队所需的环境类型中。 一个完整的示例:我们可以将此队列(contramapM)与 mapM 结合起来,以计算元素保留在队列中的时长: ZQueue#bothWith 我们也可以将两个队列组合成一个队列,该(组合)队列广播 offer (到两个队列中)并(同时)从两个队列中接收(take)消息: 附加资料 ZIO Queue Talk by John De Goes @ ScalaWave 2018 ZIO Queue Talk by Wiem Zine El Abidine @ PSUG 2018 Elevator Control System using ZIO Scalaz 8 IO vs Akka (typed) actors vs Monix
Promise[E, A] 是只能被设置一次的 IO[E, A] 类型的变量。 Promise 用于构建更高级别的并发原语,通常用于需要多个纤程相互协调传递值的情况。 建立 可以使用 Promise.make[E, A] 来创建 Promise,它返回 UIO[Promise[E, A]]。这是对创建 Promise 的描述,而不是实际的 Promise。不能在 IO 外部创建Promise,因为创建它们涉及分配可变内存,这是一种 effect,必须安全地在 IO 中捕获。 运算 完成 您可以通过几种不同的方式完成 Promise[E, A]: 使用 succeed 成功地设置 A 类型的值。 用 done 设置 Exit[E, A] —— await 将得到该退出值。 用 complete 获取 IO[E, A] effect 的结果 —— effect 将会被(而且只会被)执行一次,其结果会被所有等待该 promise 的纤程得到。 用 completeWith 将 effect IO[E, A] 设置为结果 —— 第一个调用 completeWith 的纤程赢得设置该 effect 的权限,并且而该 effect 在 promise 每次被 await 时(才)都会被求值,因此要小心使用completeWith(someEffect),尽可能使用 complete(someEffect) 除非确实需要让每个线程在 await 的时候执行效果。 用 fail 设置 E 类型失败。 通过 die 让 promise 抛出 Throwable 异常 通过 halt 为 promise 设置 Cause[E] 失败或异常 使用 interrupt 终止 promise 以下示例显示了所有这些的用法: 完成 Promise 的操作后会产生一个UIO[Boolean],其中 Boolean 表示已设置成功(true)还是已经被设置(false)。如下所示: 另一个关于 fail(…) 的例子: 再次重申,布尔值告诉我们操作是否成功(true),即成功设置了 Promise 的值或已经被设置而导致错误。 等待 您可以使用 await 从 Promise 中获取值,调用的纤程将被挂起直到 Promise 完成。 轮询 计算将暂停(以非阻塞方式),直到 Promise 出现一个值或错误为止。如果您不想暂停,而只想查询 Promise 是否已完成的状态,则可以使用 poll: 如果调用 poll 时 Promise 未完成,则 IO 将以返回 Unit 值的方式宣告失败,否则将获得 IO[E, A],其中 E 表示 Promise 完成但有错误,而 A 表示 Promise 成功完成返回一个 A 值。 isDone 返回 UIO[Boolean],如果 Promise 已经完成,则评估为 true。 使用案例 这是一个如何在两个纤程之间使用 Promise 交换数据的案例: 在上面的示例中,我们创建了一个 Promise 并让一个 Fiber(fiberA)在1秒后完成该 Promise,第二个 Fiber(fiberB)在该 Promise 上调用 await 以获取字符串,然后将其打印到屏幕上。该示例在 1 秒后将 hello world 打印到了屏幕上。请记住,这只是程序的描述,而不是执行本身。
Managed 是一个封装了资源的 acquire 和 release 的数据结构。 Managed[E, A] 表示一个类型为 A 的托管资源,它可以通过 use 方法被使用。资源将在使用之前自动获取资源,并在使用之后自动释放资源。 如果资源无法在 use 范围内生效,这意味着您可能在获得资源后,在 use 中将其浪费掉,然后在资源消耗完后再次使用它,根据资源提供的功能类型它可能已经不再有效,并且可能会因检查错误而失败。 在此示例中,Managed 类在调用 use 时创建队列,并在 doSomething 完成时调用 shutdown。 创建一个 Managed 如上例所示,可以通过传递 acquire 函数和 release 函数来创建 Managed。 也可以从 effect 中创建。在这种情况下,release 函数将不执行任何操作。 您也可以从纯值创建 Managed。 ZIO 环境中的 Managed Managed[E, A] 实际上是 ZManaged[Any, E, A] 的别名。如果您希望acquire,release 或 use 函数中使用环境 R,请使用 ZManaged 来代替 Managed。 合并 Managed 可以使用 flatMap 将多个 Managed 合并在一起,以得到获取和释放所有资源的单个 Managed。
Posted on in Cats, Cats Effect, Cats中文文档, scala, 泛函
Cats IO 是一种将 side effect 编码为纯值的数据类型,它能够描述同步和异步计算。 介绍 IO[A] 类型的值代表一种计算模型,在对它进行求值时,它将执行某种效果运算并返回 A 类型的值。 IO 的值是纯的,不变的,因此保留了引用透明性,因此被用于函数式编程。 IO 是一种数据结构,仅表示对副作用计算的描述。 IO 可以描述以下同步或异步计算: 只能得到唯一解。 可能以成功结束或以失败结束,并且在失败的情况下,flatMap 链会被短路(IO 实现了 MonadError 代数结构)。 可以取消,但请注意,此功能依赖用户提供取消逻辑。 由抽象的过程所描述的 effect 将不会被执行直到“世界的尽头”,具体地说,直到某个“unsafe”方法被调用为止。 效果的执行结果是不被记忆的,这意味着内存开销最小(并且没有泄漏),并且,单个 effect 可以被以“引用透明”的方式被多次执行。例如: 上面的这个的例子,“hey!”被打印了两次,因为这个 effect 在“单子”链中被重复地执行。 引用透明和惰性求值 IO 可以“冻结” side effect,因为它是一种惰性求值的数据类型。请参考以下分类并反复与标准库中的“Future”的进行比较,来理解求值模型(在 Scala 语境中)的全貌。 Eager Lazy Synchronous A () => A Eval[A] Asynchronous (A => Unit) => Unit () => (A => Unit) => Unit Future[A] IO[A] 通过与 Scala 的 Future 比较,IO 数据类型即使在处理副作用时也保留了引用透明性,并且它是惰性求值的。与像 Scala 这样的即时求值语言相比较,这是结果与产生结果的函数之间的区别。 与 Future 类似,通过 IO,您可以推断异步处理的结果,但是由于其纯性和惰性,可以将 IO 视为一个规范(直到“世界的尽头”才进行求值),从而可以对 IO 的求值模型施加更多的控制,并且更具可预测性。例如当组合多个 IO,或处理错误时,是以序列化的方式处理,还是以并行的方式处理。 注意惰性求值总是与引用透明性并存。参考以下示例: 如果我们考虑引用透明性,则可以将该示例重写为: 但是这不适用于 Future,但适用于 IO,此能力对于函数式编程至关重要。 堆栈安全 IO 在它的 flatMap 中是以 trampoline 的方式进行求值的,这意味着您可以在任意深度的递归函数中安全地调用 flatMap,而不必担心会顶爆堆栈: 根据 IO 中实现的类型类的层次结构。除某些函数外,所有这里定义的操作都可用于 IO。 Effects 介绍 IO 是一种强大的抽象,它可以效果化地描述多种不同的 effect: 纯值 — IO.pure & IO.unit 在 IO 的伴随类中定义了以下函数,可以用于将纯值加载到 IO 中,从而生成出“已经求值”的 IO 值: 请注意,给定的参数形式是值传递而不是按名(by-name)传递。 例如,我们可以将一个数字(纯值)放入 IO 中,然后将其安全地与另一个打包了side effect 的 IO 组合在一起,因为它们将不执行任何操作: 显然,IO.pure 无法挂起副作用,因为当参数被以值传递给它时,IO.pure 是即时求值的,因此请不要这样做: 在这种情况下,println 将触发副作用,该副作用不会在 IO 中被挂起,有鉴于此,这样的代码可能不是我们想要的。 IO.unit 是 IO.pure(()) 的简化的别名,可在需要 IO[Unit] 值时重复使用,而无需担心触发任何其他副作用: IO[Unit] 在 Scala 代码中使用的相当普遍,Unit 类型本身就意味着调用的返回有副作用,它可以作为 pure(()) 的快捷方式,并且可以起到优化的用处,因为返回了相同的引用。 同步效果 — IO.apply 它可能是最常用的构建器,等效于 Sync[IO].delay,描述了可以在当前线程和调用堆栈上立即做求值的 IO 操作: 请注意,给定的参数是通过“by-name”传递的,其执行被“暂停”在 IO 上下文中。 这个示例是在 JVM 之上使用阻塞 I/O 从控制台读取和写入信息: 然后,我们可以以纯函数的方式使用它们来对与控制台的交互进行建模: 异步 […]
IO[E, A] 类型的值,是一个可能导致 E 类型的失效,或永远运行,或产生 A 类型成功值的 effect。 IO 的值是不可变的,并且所有的 IO 函数都会产生新的 IO 值,使得 IO 像其它普通的 Scala 不变数据结构一样可以被推理和使用。 IO 值实际上啥也不做;它们只是一个描述交互效果的模型的值。 IO 可以被 ZIO 运行时系统解释成与外部世界的交互效果。理想情况下,这个过程在应用程序的 main 函数中一次性发生的。 App 类自动提供了此功能。 纯值 您可以通过 IO.succeed 将一个纯值装载入 IO 绝对不要使用任何构造函数将不纯代码导入 IO。这样做的结果是不确定的。 不会失败的 IO UIO[A] 类型的 IO 值(它的错误类型为 Nothing)被认为是不会失败的,因为Nothing 类型表示不存在的,即,不能有 Nothing 类型的实际值。此类型的值可能会产生 A 类型的成功结果,但永远不会导致 E 类型的失败。 不产生有效值的 IO IO[E, Nothing] 类型的 IO 值(其中值类型为 Nothing)被认为是无有效值的,因为 Nothing 类型表示不存在的,即不能有 Nothing 类型的实际值。此类型的值可能会得到 E 类型的失败,但永远不会产生成功的结果值。 不纯的代码 您可以使用 IO 的 effectTotal 方法将同步效果的代码导入为纯函数程序: 它的执行结果有可能是任何 Throwable 类型的失败。 如果(这个结果的)范围太广,可以使用 ZIO 的 refineOrDie 方法仅保留对某些类型的异常的关注,而其他任何类型的异常则直接导致“死亡”: 您可以使用 IO 的 effectAsync 方法将异步效果的代码导入为纯函数程序: 在此示例中,假设 Http.req 方法在得到异步执行的结果后将调用指定的回调函数。 映射 您可以通过调用 map 方法时给予函数 A => B 来将 IO[E, A] 更改为 IO[E, B]。这可以将前一操作产生的值转换为另一个值。 您可以在调用 mapError 方法时使用函数 E => E2 ,将 IO[E, A] 转换为 IO[E2, A]: 链式调用 您可以使用 flatMap 方法依次执行两个操作。第二动作的执行取决于第一动作产生的值。 您可以使用 Scala 的 for comprehension 语法使这种类型的代码更紧凑: Brackets bracket 是内置原语,可让您安全地获取和释放资源。 Brackets 被用在类似 try/catch/finally 的场景,但是 brackets 可以被用于同步和异步,可以和纤程中断无缝配合。它基于不同的错误模型构建,以确保不会丢失任何错误。 Brackets 由一个获取方法,一个使用方法(用于使用获取的资源),和一个释放方法组成。 释放动作由运行时来保证执行,哪怕使用中抛出了异常或执行的纤程被中断。 Brackets 支持组合语义,因此,如果将一个 bracket 嵌套在另一个 bracket 内,如果外部 bracket 获取了资源,那么即使内部 bracket 的释放失败,外部 bracket 的释放动作也依然会得到执行。 有一个名为 ensuring 的方法提供了另一个类似 finally 的功能: 一个完整的使用 brackets 的例子