ZLayer[A, E, B] 描述应用程序的一层:应用程序中的每个层都需要一些服务(作为输入)并产生出一些服务(作为输出)。Layers 可以被视基于给定他们的依赖关系(其他服务)而产生这些服务绑定所需要的清单。 层的构造可以使资源的使用效果化,并且使资源被安全地获取,使用完后安全地释放。 默认情况下,层(Layer)是共享的,这意味着如果同一图层使用两次,则该图层将仅分配一次。由于层具有出色的可组合性,因此它们是 ZIO 中创建依赖于其他服务的服务的惯用方式。 最简单的 ZLayer 应用 具有依赖项的 ZLayer 应用 复杂的 ZLayer 依赖案例
Chunk[A] 代表一大块类型为 A 的值。块的设计块通常由数组支持,但向下层元素公开纯函数式的,安全的接口,并且它们对使用数组代价高昂的操作(例如重复级联)采用懒模式。 新建一个块(chunk) 块连接(Concatenating): ++ 操作返回当前块与指定块的串联块。例如: 块搜集(collecting): collect 过滤、映射块中的元素并返回新的块。以下是如何使用 collect 函数从Chunk[A] 中挑选所有字符串的例子: 如何使用 collect 函数从 Chunk[A] 中挑选所有数字[A]: collectWhile (从左到右)收集元素,直到选择条件首次返回“ false”: 或者另外一个例子: 块删除(dropping): drop 函数从块中删除前 n 个元素。 dropWhile 依据条件函数删除所有返回 true 的元素: 块比较(Comparing): 块转换(Converting) toArray 将一个块转换成 Array。 toSeq 将一个块t转换成 Seq。
Stream[E, A] 表示一个可以产生 A 类型输出值,或可能以 E 类型为失败值的,效果化的流。 新建 Stream 或产生自 Iterable: 转换一个 Stream ZIO Stream 提供了许多标准的转换函数,例如:map,partition,grouped,groupByKey,groupedWithin等。以下是如何使用它们的示例。 map partition partition 根据函数参数将 stream 分成多个流元组。第一个流包含评估为 true的所有元素,第二个流包含评估为 false 的所有元素。较快的流可能比较慢的流领先,领先的程度受缓冲大小的限制。两个流都以 ZManaged 类型打包。在下面的示例中,左流仅包含偶数。 grouped 可以使用分组(grouped)函数将流的结果划分为指定的块大小。 groupByKey 可以使用 groupByKey 或 groupBy,按函数的执行结果对流进行分区。在下面的示例中,检查的结果被分组并计数。 groupedWithin groupedWithin 允许按时间或块大小对事件进行分组,以先满足者为准。在下面的示例中,每个块均最多包含 30 个元素,并且每 3 秒生成一次。 消费一个 Stream 使用 Sink Sink[E, A0, A, B] 表示接受的消费类型为 A,最终产生或者 E 型的错误, B 型的成功结果,以及剩余的类型为 A0。 例如,您可以使用 Sink.foldLeft 将 Stream 中的数据累加到单一个 ZIO 值: 在多个流上工作 您可以使用合并方法合并多个流: 或合并(zip)多个流: 然后您可以将流中的原属合并为单个 ZIO 值: 流压缩 解压 如果您读取到 Content-Encoding: deflate, Content-Encoding: gzip 或其它此类压缩数据流,则以下转换器可能会有所帮助: inflate 转换器可以根据 RFC 1951 标准对 deflated 格式的压缩输入流进行解压缩。 gunzip 转换器可以根据 RFC 1952 标准对 gzipped 格式的压缩输入流进行解压缩。 如果输入未经过正确的压缩,这两种解压缩方法都将以 CompressionException 作为失败类型。 压缩 deflate 转换器根据 RFC 1951 标准对流中的字节进行压缩。
ZSink[R, E, A, B] 用于消费从流中产生的元素。您可以将此接收器视为消费可变数量的 A 元素(可能为 0、1或很多!)的函数,可能因 E 类型错误而失败,或最终产生 B 类型的值。 ZSink 被作为参数传递给 ZStream#run: 建立 sinks zio.stream 提供了多种不同的 sink 供使用。 将数据收集到 Chunk[A] 中: 尝试将第一个元素接收到一个 Option 中(如果流为空则返回 None): stream.runDrain 的以下实现将忽略流中所有的输入: 产生一个给定类型的失败: 基本的接收数据累积函数: 具有短路功能的折叠器: sinks 转换 创建 Sink 后,我们可以使用提供的操作对其进行转换。 并行运行两个接收器并返回先执行完成的那一个: 我们可以使用 contramap,通过给定 C => A,其中 C 是输入类型,而 A 是 Sink的接收的元素类型,将给定的输入转换为某个特定的 Sink dimap 是 contramap 的扩展,它还可以指定 Sink 的输出转换:
在 OO 中如果我们要实现某种“行为”,一般来讲我们会采用在 interface 或抽象基类中定义方法,然后在子类中实现方法的做法,这也是 OO 所倡导的“继承”和“重写”。假设我们现在要写一个“超级玛丽”游戏,在这个游戏中我们当然要实现一个 Mario,并且这个 Mario 至少需要会 move 和 Jump,也就是说我们肯定需要这样两个方法。并且考虑到游戏中肯定不止只有 Mario 会跑会跳,很多怪物也都会,所以我们需要设计一个叫 Npc 的基类,并且在这里定义一些虚函数,于是我们得到这样一个实现: So far so good. 可是写着写着我们发现,其实并不是每一个 Monster 都会 jump,比如 Mushroom就不会 jump,可是因为我们将 jump 这个方法定义在了基类里,这导致我们不得不为每一个怪物都实现 jump方法,这显然就不合适了,为了应对这一个小小的挑战,Easy! 我们只需对对象进行重构,将 jump方法拆出来单独放在一个 interface里就可以了,修改后我们得到一个新的 Jumpable interface,它包含了从 Npc 里面拆出来的 jump 方法: Mario 也调整为: 而 Mushroom 就可以实现为: 这样看上去好多了。 Continue… 接着我们遇到一个新的挑战,我们发现有些怪物不仅会走,而且会飞!参考之前的例子,我们又实现了一个 Flyable interface 成功地解决了这个问题。到目前为止 一切都还在 OO 的轨道上正常运行。接下来我们又遇到了一个挑战:我们要实现一个会“变形”的怪物,这个怪物初期的时候会飞,可是在遭打打击后会失去飞的能力但是获得走的能力,我们需要再次重构能力之间的关系,……并且不断有新的怪物出现,有些怪物可能只会飞,有的会游泳……,甚至我们还遇到连最基本的 move 都不会的怪物,比如“食人草” ……随着不断地重构,我们会发现我们陷入了重构陷阱,我们需要不断从基类中分离出新的接口,也可能将新的方法添加到基类中去。而每次对基类的改动都会造成巨大的扰动,这种扰动随着代码规模的扩大,修改也越来越困难。甚至有些方法,它们需要同时出现在没有继承关系的两个对象中,这种需求通过重构基类都无法解决。 为什么会出现这样的麻烦呢?这是因为 OO 强调的高内聚加强了方法之间的耦合,而这种耦合对程序是有害的,我们很难在设计的初期就对未来功能的耦合性做很好的预测,特别是对具有版本演进的应用而言,这种前瞻性是很难一步做到位的。这就使我们不断进入重构陷阱。有没有一种方法让我们避免这样的陷阱呢?答案就是接下来介绍的“鸭类”(duck mode)模型。 先介绍“鸭类”这个名称的来源。假设我们需要得到一只鸭子,先思考一下凭什么我们认为“它”将会是一只合格的鸭子呢?可能最初我们只需要它会游泳,并且它会“嘎嘎”叫就行。如果以 OO 的观点来看这有点开玩笑了,会游泳和嘎嘎叫的可不止鸭子,它至少要来自禽类。可是别忘记了,鸭子可不会飞!所以它来自不来自禽类对于鸭子来说其实意义并不大!我们只关注它此刻能做什么,而不是关注它来自什么,如果一只猴子学会了游泳和嘎嘎叫,那么我们就用它来冒充鸭子有什么不可以吗?是的,“鸭类”就是这么任性,我们将这种面向能力的类统称为“鸭类”。在更多场合下你也可能看到“蛋糕模型”这个词汇,它们基本上指的是同一个意思。 那么怎么构造出一个鸭类呢?我们需要了解一个新的词汇叫 mix in(混入),以混入的角度,我们不需要考虑基类。坚守一个可能连 move 方法都是多余的基类真的是毫无意义。从混入的角度来看一个对象的本质,它只是一个专有的数据结构,比如身高,重量,颜色……等等,而所有的“方法”都是在构造的时候才添加进去,就好像工厂里组装台上的流水线。 还是以 Super Mario为例,其实通过不断地重构,现在我们已经获得了许许多多小的部件,比如 flyable, movable, jumping, swimable…和它们不同的实现,它们都被拆到了不同的“类”中,确切滴说,此时我们不再称它们为“类”,因为我们并不打算遵循OO的理念来使用它们,我们给他们一个新的称谓“特质”以有别于传统的“类”,在Scala语言中它有一个专有的关键词:trait。接下来我们是这样构建我们的Npc: 在以上代码中我们先是构建了一系列以行为为准则的“特质”,接下来当我们构建Mario 这个玩家的时候,我们既没有去实现它的行为也没有继承自父类,只是用一个关键词 with 来声明我们希望这个npc具有某种特定的 move 方式(在这里我们使用了特设泛型参数来指定我们需要一个专属于 Mario 的 Move 实例来提供支持),然后我们就可以在 play 里面直接使用这个方法了。这里的魔法还包括了那个奇怪的 “self: =>” 语法,这条语法被称为“自类型”,它的意识是“你认为你是什么你就是什么”。至于这个“什么”,就是冒号后指定的那一系列特质。在这里的意识是:我认为我是一个具有 “Mario走”和“Mario跳”特质的“Mario”(感觉好拗口)。此外我们还用到隐式来为将这些特质的实例注入到行为中去。关于隐式,这是另一个话题,在此我们不展开,现在我们只需要知道它为 Mario 对象提供了“依赖注入”就可以了。 让我们再来看一下它如何让我们具有构建不同的 Turtle 的能力 以上代码我们可以看到我们可以随意根据需要组合出只能飞的 Turtle,只能走的 Turtle 和即能飞也能走的 Turtle,随着我们提供的特质部件越来越多,我们甚至可以随时创造出能飞的、能游的……乌龟,并且这种能力不限于乌龟,我们也可以将这些部件用于不同的“妖怪”,使得它们也能具有不同的行为能力,甚至产生出组合妖怪,非常灵活。 从以上的例子我们可以看到当我们摆脱了继承的约束而实现了混入后,我们获得新对象的能力不仅没有削弱,反而更加强大,甚至我们还依然可以为方法提供依赖注入。这给予了我们更大的设计自由度。
信号量 Semaphore 数据类型,它允许通过 withPermit 方法在纤程之间进行同步,该方法可以安全地获取和释放许可证。信号量是基于 Ref[A] 数据类型的。 操作 例如,异步任务可以通过获取和释放具有给定数量许可的信号量来完成彼此的同步。当信号量中的许可值不足,获取操作无法执行时,该任务将在纤程队列中被置于挂起状态,直到有足够的许可值时被唤醒: (以上)二值信号量只是一种特殊的信号量。我们可以要求获取和释放任意指定数量的信号量: withPermit(及其对应的计数版本 withPermits)的保证是,无论任务是成功,失败还是被中断,许可证在被成功获取之后都会被释放。
Schedules 允许你定义和编写灵活的重复执行调度器,这些事件可以是重复的计算,或在出现错误时重试操作。Schedules 被用于以下场景: 重复 IO#repeat —— 重复执行 effect,直到调度计划结束。 IO#repeatOrElse —— 重复执行 effect,直到调度计划完成,如果得到错误,则返回另一个 effect 的结果。 IO#repeatOrElse0 —— 重复执行 effect,直到调度计划完成,如果得到错误,则返回另一个具有更多(定制)能力的 effect 的结果。 Retries IO#retry —— 重试一个 effect 直到成功。 IO#retryOrElse —— 运行一个 effect,如果失败则尝试另一个 effect,不断重试两者直到成功。 IO#retryOrElse0 —— 运行一个 effect,如果失败则尝试另一个具有更多(定制)能力的 effect,不断重试两者直到成功。 Schedules 定义了有状态的,可能有效果的事件的重复执行调度计划,并以允许以多种方式进行组合。 一个 Schedule[R, A, B] 它的输入类型为 A(A 在 retry 情况下为(前一调用的)错误类型,或在 repeat 情况下为(前一调用的)输出类型),并根据这些值和内部状态决定是重复执行还是返回结果。每个决策都会伴随(可能为零的)延迟,该延迟指示下一次重复发生之前需要多少时间停顿,并最终得到 B 类型输出值。 基本的 Schedules 一个永远循环的 Schedule: 一个循环 10 次的 Schedule: 一个每 10 毫秒循环一次的 Schedule: (缺省为平方)指数延迟的 Scheduler 以 fibonacci 计数(每次延迟是前两次延迟之和)的延迟: Schedule 组合器 给 Schedule 附加一个随机修正值。 修改 schedule 的延迟间隔: 依次串行组合两个调度器,先遵循第一个策略直到结束,然后遵循第二个策略直到结束: 合并两个调度器(取交集),仅在两个调度器都得到满足时才以两者之间的最大延迟重复执行调度: 合并两个调度器(取并集),如果两个调度计划中的任何一个想要重复执行,则使用两次重复之间的最小延迟来重复: 经过指定的时间后,停止重试: 仅在发生特定异常时重试:
Ref[A] 是对 A 类型的可变引用值的建模。它有两个基本操作:set,向 Ref 中填入新值;get,从中获得当前的值。Ref 上所有的操作都是原子且线程安全的,这为同步并发程序提供了可靠的基础。 更新一个 Ref 使用 Ref 最简单的手段是使用 update 或它更强大的好兄弟 modify。通过这些我们可以轻易地写一个像这样的 repeat 组合器。 状态转换器 可变量的阴暗面在于它们能够轻易地被改变;它们可以像圣诞节装饰品一样添加在任何地方并改变状态。比如: 作为函数式程序员,我们相当了解如何对付它。我们可以 S =>(A, S) 类型的函数形式来捕获状态的改变。Ref 提供了这样的编码能力,S 用来表达值的类型,并通过 modify 引介状态改变函数。 构建更复杂的并发原语 Ref 的级别足够低,以至于可以用作其他并发数据类型的基础。 信号量是一种经典的用于控制对共享资源的访问的抽象数据类型。它的定义为一个三元组形式 S = (v, P, V),其中 v 是当前可用资源的单位数,P 和 V 分别是对 v 进行递减和递增的运算;P 只能在 v 是非负数的时候才能对它进行递减,否则必须等待直到非负为止。 现在,有了 Refs,我们可以很容易实现它!唯一的困难在于 P,我们必须在 v 为负数时,或在我们读取并试图改变它时,它恰好被(别的线程)改变了,这时让 P 失败并重试。 一个不成熟的实现看上去可以像下面这样: 现在让我们伴随前几天在市场上发现的这些鳄鱼皮靴子摇滚起来,在夜总会测试我们的信号灯吧,来吧! 同时不用说,您应该看一下 ZIO 内建的 Semaphore,它可以完成所有的这些甚至更多工作而不会浪费任何 CPU 周期。 多态的 Refs Ref[A] 实际上是类型 ZRef[Nothing, Nothing, A, A] 的别名。ZRef 的类型签名如下: ZRef 是对可变引用的多态的,纯函数的描述。它的基本操作包括 set 和 get。set 接受一个类型为 A 的值并将引用设置为该新值,这个操作可能以 EA 错误类型失败。get 获取并返回当前的 B 类型的引用值,或者它可能以 EB 错误类型失败。 当 ZRef 的错误和值类型统一时,即 ZRef[E, E, A, A]。ZRef 还支持如上所述的原子 modify 和 update 操作。 一个简单的用例是可以获得引用值的只读或只读视图:
Queue 是一个基于 ZIO 的轻量级的,驻扎在内存的队列,具有可组合性,并且支持透明的背压。它是完全异步(无锁或无阻塞)的,纯函数且类型安全的。 Queue[A] 包含的值的类型为 A,并且具有两个基本操作:offer,将 A 放入队列中,和 take, 删除并返回队列中最旧的值。 新建一个队列 一个 Queue 可以是有界的(容量有限)或无界的。 当队列已满时,有几种策略可用于处理新值: 对默认的有界(bounded)队列是反压:填满后,任何提供数据的纤程都将被挂起,直到队列能够添加新内容为止; 下降(dropping)队列当队列满时会丢弃新的数据; 当滑动(sliding)队列满时会丢弃最陈旧的数据。 建立一个支持背压的有界队列: 建立一个下降队列: 建立一个滑动队列: 建立一个无界队列: 向队列添加数据 向队列添加值的最简单方法是 offer: 使用背压队列时,如果队列已满,offer 可能会暂时挂起:您可以使用 fork 让其在其它纤程中等待。 还可以使用 offerAll 一次添加多个值: 从队列中消费数据 take 操作从队列中删除最旧的数据并返回它。如果队列为空,它将时挂起,直到新的数据添加到队列后才继续。与 offer 一样,您可以使用 fork 让其在其他纤程中等待新值。 您可以通过 poll 来消费最旧的数据。如果队列为空,你会得到 None,否则数据将被包装在 Some 返回。 可以使用 takeUpTo 一次消费多个数据。如果队列中没有足够的数据要退回,则它将返回所有数据,而无需等待更多 offer 。 同样,您可以使用 takeAll 一次获得所有数据。它也无需等待就返回(如果队列为空,则为空列表)。 关闭队列 可以通过 shutdown 来中断(interrupt exception)所有处于 offer* 或 take* 而挂起的纤程。它还将清空队列,并让所有未来的 offer* 和 take* 调用立刻结束(interrupt exception)。 您可以在队列关闭时使用 awaitShutdown 执行 effect。它将等待直到队列被关闭。如果队列已经关闭,它将立即返回。 转换队列Transforming queues 实际上,Queue[A] 是 ZQueue[Any, Any, Nothing, Nothing, A, A] 类型的别名。完整版本的签名为: 它的含义是: 这个队列接收 A 类型的值。需要 RA 类型的环境支持入队操作,并且可能会因EA 类型的错误而失败; 队列将产生 B 类型输出值。需要 RB 型环境支持出队操作,并可能因 EB 型错误而失败。 请注意,基本的 Queue[A] 不会失败也不需要任何环境来支持进行任何操作。 基于输入和输出的不同类型参数,可以实现各种不同的队列组合: ZQueue#map 可以将队列的输出映射(到不同类型): ZQueue#mapM 我们还可以使用一个效果(effectful)函数来映射输出。例如,我们可以为每个元素加上出列的时间戳: ZQueue#contramapM 与 mapM 相似,我们也可以在元素入列时将效果(effectful)函数应用于元素。此例将使用其入队时间戳注释元素: 此例中的队列与上一个例子中的队列具有相同的类型((Long, String)),但是当元素排队时,时间戳附加到元素上。这反映在排队入队所需的环境类型中。 一个完整的示例:我们可以将此队列(contramapM)与 mapM 结合起来,以计算元素保留在队列中的时长: ZQueue#bothWith 我们也可以将两个队列组合成一个队列,该(组合)队列广播 offer (到两个队列中)并(同时)从两个队列中接收(take)消息: 附加资料 ZIO Queue Talk by John De Goes @ ScalaWave 2018 ZIO Queue Talk by Wiem Zine El Abidine @ PSUG 2018 Elevator Control System using ZIO Scalaz 8 IO vs Akka (typed) actors vs Monix
Promise[E, A] 是只能被设置一次的 IO[E, A] 类型的变量。 Promise 用于构建更高级别的并发原语,通常用于需要多个纤程相互协调传递值的情况。 建立 可以使用 Promise.make[E, A] 来创建 Promise,它返回 UIO[Promise[E, A]]。这是对创建 Promise 的描述,而不是实际的 Promise。不能在 IO 外部创建Promise,因为创建它们涉及分配可变内存,这是一种 effect,必须安全地在 IO 中捕获。 运算 完成 您可以通过几种不同的方式完成 Promise[E, A]: 使用 succeed 成功地设置 A 类型的值。 用 done 设置 Exit[E, A] —— await 将得到该退出值。 用 complete 获取 IO[E, A] effect 的结果 —— effect 将会被(而且只会被)执行一次,其结果会被所有等待该 promise 的纤程得到。 用 completeWith 将 effect IO[E, A] 设置为结果 —— 第一个调用 completeWith 的纤程赢得设置该 effect 的权限,并且而该 effect 在 promise 每次被 await 时(才)都会被求值,因此要小心使用completeWith(someEffect),尽可能使用 complete(someEffect) 除非确实需要让每个线程在 await 的时候执行效果。 用 fail 设置 E 类型失败。 通过 die 让 promise 抛出 Throwable 异常 通过 halt 为 promise 设置 Cause[E] 失败或异常 使用 interrupt 终止 promise 以下示例显示了所有这些的用法: 完成 Promise 的操作后会产生一个UIO[Boolean],其中 Boolean 表示已设置成功(true)还是已经被设置(false)。如下所示: 另一个关于 fail(…) 的例子: 再次重申,布尔值告诉我们操作是否成功(true),即成功设置了 Promise 的值或已经被设置而导致错误。 等待 您可以使用 await 从 Promise 中获取值,调用的纤程将被挂起直到 Promise 完成。 轮询 计算将暂停(以非阻塞方式),直到 Promise 出现一个值或错误为止。如果您不想暂停,而只想查询 Promise 是否已完成的状态,则可以使用 poll: 如果调用 poll 时 Promise 未完成,则 IO 将以返回 Unit 值的方式宣告失败,否则将获得 IO[E, A],其中 E 表示 Promise 完成但有错误,而 A 表示 Promise 成功完成返回一个 A 值。 isDone 返回 UIO[Boolean],如果 Promise 已经完成,则评估为 true。 使用案例 这是一个如何在两个纤程之间使用 Promise 交换数据的案例: 在上面的示例中,我们创建了一个 Promise 并让一个 Fiber(fiberA)在1秒后完成该 Promise,第二个 Fiber(fiberB)在该 Promise 上调用 await 以获取字符串,然后将其打印到屏幕上。该示例在 1 秒后将 hello world 打印到了屏幕上。请记住,这只是程序的描述,而不是执行本身。