ZIO 与外部 Java 代码具有完全的互操作性。让我向您展示它的工作原理,然后讲解第一个案例,明天您就可以在工作中使用纯函数式 Java 了。
From Java CompletionStage and back
CompletionStage
是(Java 提供的)最便捷的用于模拟函数式异步效果的 API(例如 ZIO)的接口,因此我们从它开始。轻而易举地:
def loggedStage[A](stage: => CompletionStage[A]): Task[A] =
ZIO.fromCompletionStage(UIO {
stage.thenApplyAsync { a =>
println("Stage completed with " + a)
a
}
})
您甚至可以将其变成纤程!
def stageToFiber[A](stage: => CompletionStage[A]): Fiber[Throwable, A] =
Fiber.fromCompletionStage(future)
该 API 创建了一个不绑定具体对象的纤程。
此外,您如果希望将 ZIO 值转换为 CompletionStage
也易如反掌:
def taskToStage[A](task: Task[A]): UIO[CompletableFuture[A]] =
task.toCompletableFuture
正如您所看到,它返回 CompletionStage
接口的具体类,即CompletableFuture
。需要指出的是,只要可以将类型 E
的值转换为Throwable
,那么任何 IO[E, A]
都可以变成 CompletableFuture
:
def ioToStage[E, A](io: IO[E, A])(toThrowable: E => Throwable): UIO[CompletableFuture[A]] =
io.toCompletableFutureWith(toThrowable)
Java 的 Future 类
您可以通过 ZIO.fromFutureJava
将任何 java.util.concurrent.Future
嵌入 ZIO 计算中。一个简单的 Apache Async HTTP 客户端的例子看起来如下:
def execute(client: HttpAsyncClient, request: HttpUriRequest): RIO[Blocking, HttpResponse] =
ZIO.fromFutureJava(UIO {
client.execute(request, null)
})
就这么简单。请注意,从对产出值的签名中可以看出,ZIO 在内部使用了阻塞Future#get
调用。显然,它被运行在阻塞线程池上,我想您应该清楚地知道。如果可能的话,请如上所述使用 ZIO.fromCompletionStage
。
如果您需要,也可以使用 Fiber.fromFutureJava
将它转换为纤程。类似又有差别:
def execute(client: HttpAsyncClient, request: HttpUriRequest): Fiber[Throwable, HttpResponse] =
Fiber.fromFutureJava {
client.execute(request, null)
}
NIO 完成具柄
通过提供完成处理具柄,Java 对使用 NIO API 的通讯通道执行异步处理,它通过将完成处理具柄挂接到可中断 I/O 中来实现。例如,读取文件的内容:
def readFile(file: AsynchronousFileChannel): Task[Chunk[Byte]] = for {
pos <- Ref.make(0)
buf <- ZIO.effectTotal(ByteBuffer.allocate(1024))
contents <- Ref.make[Chunk[Byte]](Chunk.empty)
def go = pos.get.flatMap { p =>
ZIO.effectAsyncWithCompletionHandler[Chunk[Byte]] { handler =>
file.read(buf, p, buf, handler)
}.flatMap {
case -1 => contents.get
case n =>
ZIO.effectTotal {
val arr = Array.ofDim[Byte](n)
buf.get(arr, 0, n)
buf.clear()
Chunk.fromArray(arr)
}.flatMap { slice =>
contents.update(_ ++ slice)
} *> pos.update(_ + n) *> go
}
}
dump <- go
} yield dump
如您所见,ZIO 在此处提供了 CPS 样式的 API,与上面的两个例子有所不同,但是仍然非常优雅。