Promise[E, A]
是只能被设置一次的 IO[E, A]
类型的变量。
Promise 用于构建更高级别的并发原语,通常用于需要多个纤程相互协调传递值的情况。
建立
可以使用 Promise.make[E, A]
来创建 Promise,它返回 UIO[Promise[E, A]]
。这是对创建 Promise 的描述,而不是实际的 Promise。不能在 IO 外部创建Promise,因为创建它们涉及分配可变内存,这是一种 effect,必须安全地在 IO 中捕获。
运算
完成
您可以通过几种不同的方式完成 Promise[E, A]
:
- 使用
succeed
成功地设置A
类型的值。 - 用
done
设置Exit[E, A]
——await
将得到该退出值。 - 用
complete
获取IO[E, A]
effect 的结果 —— effect 将会被(而且只会被)执行一次,其结果会被所有等待该 promise 的纤程得到。 - 用
completeWith
将 effectIO[E, A]
设置为结果 —— 第一个调用completeWith
的纤程赢得设置该 effect 的权限,并且而该 effect 在 promise 每次被 await 时(才)都会被求值,因此要小心使用completeWith(someEffect)
,尽可能使用complete(someEffect)
除非确实需要让每个线程在 await 的时候执行效果。 - 用
fail
设置 E 类型失败。 - 通过
die
让 promise 抛出Throwable
异常 - 通过
halt
为 promise 设置Cause[E]
失败或异常 - 使用
interrupt
终止 promise
以下示例显示了所有这些的用法:
import zio._
val race: IO[String, Int] = for {
p <- Promise.make[String, Int]
_ <- p.succeed(1).fork
_ <- p.complete(ZIO.succeed(2)).fork
_ <- p.completeWith(ZIO.succeed(3)).fork
_ <- p.done(Exit.succeed(4)).fork
_ <- p.fail("5")
_ <- p.halt(Cause.die(new Error("6")))
_ <- p.die(new Error("7"))
_ <- p.interrupt.fork
value <- p.await
} yield value
完成 Promise 的操作后会产生一个UIO[Boolean],其中 Boolean 表示已设置成功(true)还是已经被设置(false)。如下所示:
val ioPromise1: UIO[Promise[Exception, String]] = Promise.make[Exception, String]
val ioBooleanSucceeded: UIO[Boolean] = ioPromise1.flatMap(promise => promise.succeed("I'm done"))
另一个关于 fail(...)
的例子:
val ioPromise2: UIO[Promise[Exception, Nothing]] = Promise.make[Exception, Nothing]
val ioBooleanFailed: UIO[Boolean] = ioPromise2.flatMap(promise => promise.fail(new Exception("boom")))
再次重申,布尔值告诉我们操作是否成功(true),即成功设置了 Promise 的值或已经被设置而导致错误。
等待
您可以使用 await
从 Promise 中获取值,调用的纤程将被挂起直到 Promise 完成。
val ioPromise3: UIO[Promise[Exception, String]] = Promise.make[Exception, String]
val ioGet: IO[Exception, String] = ioPromise3.flatMap(promise => promise.await)
轮询
计算将暂停(以非阻塞方式),直到 Promise 出现一个值或错误为止。如果您不想暂停,而只想查询 Promise 是否已完成的状态,则可以使用 poll
:
val ioPromise4: UIO[Promise[Exception, String]] = Promise.make[Exception, String]
val ioIsItDone: UIO[Option[IO[Exception, String]]] = ioPromise4.flatMap(p => p.poll)
val ioIsItDone2: IO[Option[Nothing], IO[Exception, String]] = ioPromise4.flatMap(p => p.poll.get)
如果调用 poll
时 Promise 未完成,则 IO 将以返回 Unit 值的方式宣告失败,否则将获得 IO[E, A]
,其中 E
表示 Promise 完成但有错误,而 A
表示 Promise 成功完成返回一个 A 值。
isDone
返回 UIO[Boolean]
,如果 Promise 已经完成,则评估为 true
。
使用案例
这是一个如何在两个纤程之间使用 Promise 交换数据的案例:
import java.io.IOException
import zio.console._
import zio.duration._
import zio.clock._
val program: ZIO[Console with Clock, IOException, Unit] =
for {
promise <- Promise.make[Nothing, String]
sendHelloWorld = (IO.succeed("hello world") <* sleep(1.second)).flatMap(promise.succeed)
getAndPrint = promise.await.flatMap(putStrLn(_))
fiberA <- sendHelloWorld.fork
fiberB <- getAndPrint.fork
_ <- (fiberA zip fiberB).join
} yield ()
在上面的示例中,我们创建了一个 Promise 并让一个 Fiber(fiberA)在1秒后完成该 Promise,第二个 Fiber(fiberB)在该 Promise 上调用 await 以获取字符串,然后将其打印到屏幕上。该示例在 1 秒后将 hello world 打印到了屏幕上。请记住,这只是程序的描述,而不是执行本身。