Promise

Promise[E, A] 是只能被设置一次的 IO[E, A] 类型的变量。

Promise 用于构建更高级别的并发原语,通常用于需要多个纤程相互协调传递值的情况。

建立

可以使用 Promise.make[E, A] 来创建 Promise,它返回 UIO[Promise[E, A]]。这是对创建 Promise 的描述,而不是实际的 Promise。不能在 IO 外部创建Promise,因为创建它们涉及分配可变内存,这是一种 effect,必须安全地在 IO 中捕获。

运算

完成

您可以通过几种不同的方式完成 Promise[E, A]

  • 使用 succeed 成功地设置 A 类型的值。
  • done 设置 Exit[E, A] —— await 将得到该退出值。
  • complete 获取 IO[E, A] effect 的结果 —— effect 将会被(而且只会被)执行一次,其结果会被所有等待该 promise 的纤程得到。
  • 用 completeWith 将 effect IO[E, A] 设置为结果 —— 第一个调用 completeWith 的纤程赢得设置该 effect 的权限,并且而该 effect 在 promise 每次被 await 时(才)都会被求值,因此要小心使用completeWith(someEffect),尽可能使用 complete(someEffect) 除非确实需要让每个线程在 await 的时候执行效果。
  • fail 设置 E 类型失败。
  • 通过 die 让 promise 抛出 Throwable 异常
  • 通过 halt 为 promise 设置 Cause[E] 失败或异常
  • 使用 interrupt 终止 promise

以下示例显示了所有这些的用法:

import zio._

val race: IO[String, Int] = for {
    p     <- Promise.make[String, Int]
    _     <- p.succeed(1).fork
    _     <- p.complete(ZIO.succeed(2)).fork
    _     <- p.completeWith(ZIO.succeed(3)).fork
    _     <- p.done(Exit.succeed(4)).fork
    _     <- p.fail("5")
    _     <- p.halt(Cause.die(new Error("6")))
    _     <- p.die(new Error("7"))
    _     <- p.interrupt.fork
    value <- p.await
  } yield value

完成 Promise 的操作后会产生一个UIO[Boolean],其中 Boolean 表示已设置成功(true)还是已经被设置(false)。如下所示:

val ioPromise1: UIO[Promise[Exception, String]] = Promise.make[Exception, String]
val ioBooleanSucceeded: UIO[Boolean] = ioPromise1.flatMap(promise => promise.succeed("I'm done"))

另一个关于 fail(...) 的例子:

val ioPromise2: UIO[Promise[Exception, Nothing]] = Promise.make[Exception, Nothing]
val ioBooleanFailed: UIO[Boolean] = ioPromise2.flatMap(promise => promise.fail(new Exception("boom")))

再次重申,布尔值告诉我们操作是否成功(true),即成功设置了 Promise 的值或已经被设置而导致错误。

等待

您可以使用 await 从 Promise 中获取值,调用的纤程将被挂起直到 Promise 完成。

val ioPromise3: UIO[Promise[Exception, String]] = Promise.make[Exception, String]
val ioGet: IO[Exception, String] = ioPromise3.flatMap(promise => promise.await)

轮询

计算将暂停(以非阻塞方式),直到 Promise 出现一个值或错误为止。如果您不想暂停,而只想查询 Promise 是否已完成的状态,则可以使用 poll

val ioPromise4: UIO[Promise[Exception, String]] = Promise.make[Exception, String]
val ioIsItDone: UIO[Option[IO[Exception, String]]] = ioPromise4.flatMap(p => p.poll)
val ioIsItDone2: IO[Option[Nothing], IO[Exception, String]] = ioPromise4.flatMap(p => p.poll.get)

如果调用 poll 时 Promise 未完成,则 IO 将以返回 Unit 值的方式宣告失败,否则将获得 IO[E, A],其中 E 表示 Promise 完成但有错误,而 A 表示 Promise 成功完成返回一个 A 值。

isDone 返回 UIO[Boolean],如果 Promise 已经完成,则评估为 true

使用案例

这是一个如何在两个纤程之间使用 Promise 交换数据的案例:

import java.io.IOException
import zio.console._
import zio.duration._
import zio.clock._

val program: ZIO[Console with Clock, IOException, Unit] = 
  for {
    promise         <-  Promise.make[Nothing, String]
    sendHelloWorld  =   (IO.succeed("hello world") <* sleep(1.second)).flatMap(promise.succeed)
    getAndPrint     =   promise.await.flatMap(putStrLn(_))
    fiberA          <-  sendHelloWorld.fork
    fiberB          <-  getAndPrint.fork
    _               <-  (fiberA zip fiberB).join
    } yield ()

在上面的示例中,我们创建了一个 Promise 并让一个 Fiber(fiberA)在1秒后完成该 Promise,第二个 Fiber(fiberB)在该 Promise 上调用 await 以获取字符串,然后将其打印到屏幕上。该示例在 1 秒后将 hello world 打印到了屏幕上。请记住,这只是程序的描述,而不是执行本身。

Leave a Reply
Your email address will not be published.
*
*

BACK TO TOP