Cats IO 是一种将 side effect 编码为纯值的数据类型,它能够描述同步和异步计算。
介绍
IO[A]
类型的值代表一种计算模型,在对它进行求值时,它将执行某种效果运算并返回 A
类型的值。
IO
的值是纯的,不变的,因此保留了引用透明性,因此被用于函数式编程。 IO
是一种数据结构,仅表示对副作用计算的描述。
IO
可以描述以下同步或异步计算:
- 只能得到唯一解。
- 可能以成功结束或以失败结束,并且在失败的情况下,
flatMap
链会被短路(IO 实现了MonadError
代数结构)。 - 可以取消,但请注意,此功能依赖用户提供取消逻辑。
由抽象的过程所描述的 effect 将不会被执行直到“世界的尽头”,具体地说,直到某个“unsafe”方法被调用为止。 效果的执行结果是不被记忆的,这意味着内存开销最小(并且没有泄漏),并且,单个 effect 可以被以“引用透明”的方式被多次执行。例如:
import cats.effect.IO
val ioa = IO { println("hey!") }
val program: IO[Unit] =
for {
_ <- ioa
_ <- ioa
} yield ()
program.unsafeRunSync()
//=> hey!
//=> hey!
()
上面的这个的例子,“hey!”被打印了两次,因为这个 effect 在“单子”链中被重复地执行。
引用透明和惰性求值
IO 可以“冻结” side effect,因为它是一种惰性求值的数据类型。请参考以下分类并反复与标准库中的“Future”的进行比较,来理解求值模型(在 Scala 语境中)的全貌。
Eager | Lazy | |
---|---|---|
Synchronous | A | () => A |
Eval[A] | ||
Asynchronous | (A => Unit) => Unit | () => (A => Unit) => Unit |
Future[A] | IO[A] |
通过与 Scala 的 Future
比较,IO
数据类型即使在处理副作用时也保留了引用透明性,并且它是惰性求值的。与像 Scala 这样的即时求值语言相比较,这是结果与产生结果的函数之间的区别。
与 Future
类似,通过 IO,您可以推断异步处理的结果,但是由于其纯性和惰性,可以将 IO 视为一个规范(直到“世界的尽头”才进行求值),从而可以对 IO 的求值模型施加更多的控制,并且更具可预测性。例如当组合多个 IO,或处理错误时,是以序列化的方式处理,还是以并行的方式处理。
注意惰性求值总是与引用透明性并存。参考以下示例:
for {
_ <- addToGauge(32)
_ <- addToGauge(32)
} yield ()
如果我们考虑引用透明性,则可以将该示例重写为:
val task = addToGauge(32)
for {
_ <- task
_ <- task
} yield ()
但是这不适用于 Future
,但适用于 IO
,此能力对于函数式编程至关重要。
堆栈安全
IO
在它的 flatMap
中是以 trampoline
的方式进行求值的,这意味着您可以在任意深度的递归函数中安全地调用 flatMap
,而不必担心会顶爆堆栈:
def fib(n: Int, a: Long = 0, b: Long = 1): IO[Long] =
IO(a + b).flatMap { b2 =>
if (n > 0)
fib(n - 1, b, b2)
else
IO.pure(a)
}
根据 IO
中实现的类型类的层次结构。除某些函数外,所有这里定义的操作都可用于 IO。
Effects 介绍
IO
是一种强大的抽象,它可以效果化地描述多种不同的 effect:
纯值 — IO.pure & IO.unit
在 IO 的伴随类中定义了以下函数,可以用于将纯值加载到 IO
中,从而生成出“已经求值”的 IO
值:
def pure[A](a: A): IO[A] = ???
请注意,给定的参数形式是值传递而不是按名(by-name)传递。
例如,我们可以将一个数字(纯值)放入 IO 中,然后将其安全地与另一个打包了side effect 的 IO 组合在一起,因为它们将不执行任何操作:
IO.pure(25).flatMap(n => IO(println(s"Number is: $n")))
显然,IO.pure
无法挂起副作用,因为当参数被以值传递给它时,IO.pure 是即时求值的,因此请不要这样做:
IO.pure(println("THIS IS WRONG!"))
在这种情况下,println
将触发副作用,该副作用不会在 IO 中被挂起,有鉴于此,这样的代码可能不是我们想要的。
IO.unit
是 IO.pure(())
的简化的别名,可在需要 IO[Unit]
值时重复使用,而无需担心触发任何其他副作用:
val unit: IO[Unit] = IO.pure(())
IO[Unit]
在 Scala 代码中使用的相当普遍,Unit
类型本身就意味着调用的返回有副作用,它可以作为 pure(())
的快捷方式,并且可以起到优化的用处,因为返回了相同的引用。
同步效果 — IO.apply
它可能是最常用的构建器,等效于 Sync[IO].delay
,描述了可以在当前线程和调用堆栈上立即做求值的 IO
操作:
def apply[A](body: => A): IO[A] = ???
请注意,给定的参数是通过“by-name”传递的,其执行被“暂停”在 IO 上下文中。
这个示例是在 JVM 之上使用阻塞 I/O 从控制台读取和写入信息:
def putStrLn(value: String) = IO(println(value))
val readLn = IO(scala.io.StdIn.readLine())
然后,我们可以以纯函数的方式使用它们来对与控制台的交互进行建模:
for {
_ <- putStrLn("What's your name?")
n <- readLn
_ <- putStrLn(s"Hello, $n!")
} yield ()
异步 Effects — IO.async & IO.cancelable
IO可以通过 IO.async
和 IO.cancelable
构建器描述异步过程。
IO.async 是基于 Async#async
(请参阅 Async))的定义的操作,可以用于描述不可取消的简单异步过程,其函数签名为:
def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = ???
提供的参数是一个可以用来注入处理成功(Right(a)
)的或失败(Left(error)
)结果的回调函数。用户可要求触发任何异步副作用,然后用注入的回调函数来处理结果。
例如以下简单的例子可用于转换 Scala 的 Future
的代码。虽然您并不需要像这样转换,因为我们已经在 IO.fromFuture
中定义了转换操作:
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure}
def convert[A](fa: => Future[A])(implicit ec: ExecutionContext): IO[A] =
IO.async { cb =>
// This triggers evaluation of the by-name param and of onComplete,
// so it's OK to have side effects in this callback
fa.onComplete {
case Success(a) => cb(Right(a))
case Failure(e) => cb(Left(e))
}
}
可撤销的处理
对于构建可取消的 IO 任务,您需要使用 IO.cancelable
构建器,该构建器符合Concurrent#cancelable(请参见 Concurrent)的定义,具有以下签名:
def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = ???
因此它与 IO.async
相似,但是用户期待它的注册函数的返回类型为 IO[Unit]
的,其中包含取消运算所需的逻辑 。
重要提示:取消是在 IO 完成任务之前终止执行的能力,为释放任何获得的资源提供可能性,这在竞争条件下很有用,可防止泄漏。
作为示例,我们要实现一个基于 Java 的 ScheduledExecutorService
的定期休眠(sleep)程序为例:
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration._
def delayedTick(d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[Unit] = {
IO.cancelable { cb =>
val r = new Runnable { def run() = cb(Right(())) }
val f = sc.schedule(r, d.length, d.unit)
// 返回需要被提前取消并释放资源的 cancellation token
IO(f.cancel(false)).void
}
}
注意,这个延时滴答功能(需要 Timer 支持)已经通过 IO.sleep
提供了,所以你不需要自己去编写。
更多有关 ‘‘cancellation’’ 的细节请参考下面。
IO.never
IO.never
表示一个由 async
定义的,不会结束
的 IO,可重复使用:
val never: IO[Nothing] = IO.async(_ => ())
在某些情况下不结束运算是很有用的,例如比赛条件。例如,给定 IO.race
,我们有以下运算:
IO.race(lh, IO.never) <-> lh.map(Left(_))
IO.race(IO.never, rh) <-> rh.map(Right(_))
推迟计算 — IO.suspend
IO.suspend
构建器等效于:
IO.suspend(f) <-> IO(f).flatten
因此它对于暂停 effect 很有用,但这会推迟返回IO的完成结果。这对于建模堆栈安全的尾递归循环很有用:
import cats.effect.IO
def fib(n: Int, a: Long, b: Long): IO[Long] =
IO.suspend {
if (n > 0)
fib(n - 1, b, a + b)
else
IO.pure(a)
}
通常,像这样的函数最终会在 JVM 顶部产生堆栈溢出错误。通过使用 IO.suspend
并使用 IO 循环执行所有这些运算,它的每个求值循环是惰性的,并且使用恒定大小的内存。当然,您也可以使用 flatMap
,在这个示例中 suspend
显得得更好些。
我们可以使用 Scala 的 @tailrec
机制来标注此功能,同时通过使用 IO,我们还可以通过插入异步边界来保持(计算资源使用的)公平性:
import cats.effect._
def fib(n: Int, a: Long, b: Long)(implicit cs: ContextShift[IO]): IO[Long] =
IO.suspend {
if (n == 0) IO.pure(a) else {
val next = fib(n - 1, b, a + b)
// Every 100 cycles, introduce a logical thread fork
if (n % 100 == 0)
cs.shift *> next
else
next
}
}
这里我们还有比 @tailrec 循环更有趣的东西。可以看出,IO 允许对求值进行非常精确的控制。
并发与取消
IO 可以描述可中断的异步过程。作为实施细节:
- 并非所有的任务都可以被取消。取消状态只会在到达异步边界之后被检查。可以通过以下方式实现:
- 使用
IO.cancelable
,IO.async
,IO.asyncF
orIO.bracket
来构建。 - 使用
IO.cancelBoundary
或IO.shift
- 使用
请注意,第二点是第一点的结果。包括但不限于 Mvar.take
,Mvar.put
和Deferred.get
这些阻塞操作都可以被取消。
我们还应注意,仅当 FlatMap
调用链到达异步边界之后才可以取消。在每 N
个 FlatMap
的异步调用边界之后执行取消检查。 N
的值被硬编码为 512
。
例子如下:
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
def retryUntilRight[A, B](io: IO[Either[A, B]]): IO[B] = {
io.flatMap {
case Right(b) => IO.pure(b)
case Left(_) => retryUntilRight(io)
}
}
// 不可取消的无终止计算
val notCancelable: IO[Int] = retryUntilRight(IO(Left(0)))
// 可取消的无终止计算,IO.shift 在 `flatMap` 之前建立了异步边界。
val cancelable: IO[Int] = IO.shift *> retryUntilRight(IO(Left(0)))
- 可取消的IO任务通常通过 cancel 终止。
这对于来自 Java 的人们来说这可能是一个困惑点,他们可能会期盼通过 Thread.interrupt
或旧的过时的 Thread.stop
:
IO 的取消不这样工作,因为 Java 的线程中断本质上是不安全
,不可靠
且不可移植
的!
接下来的小节将更详细地介绍与取消有关的操作。
构建可取消的 IO 任务
可取消的 IO 任务可以通过 IO.cancelable
构建器来描述。就像之前已经给出的的使用 Java 的 ScheduledExecutorService
来实现的例子 delayedTick
, 让我们回顾以下:
import java.util.concurrent.ScheduledExecutorService
import cats.effect.IO
import scala.concurrent.duration.FiniteDuration
def sleep(d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[Unit] = {
IO.cancelable { cb =>
val r = new Runnable { def run() = cb(Right(())) }
val f = sc.schedule(r, d.length, d.unit)
// 返回一个可以让我们取消执行计划的函数
IO(f.cancel(false)).void
}
}
重点:如果您未为任务指定取消逻辑,则该任务是不可取消的。所以,例如使用Java 的阻塞 I/O:
import java.io._
import cats.effect.IO
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
def unsafeFileToString(file: File) = {
// 让人抓狂的 Java :-)
val in = new BufferedReader(
new InputStreamReader(new FileInputStream(file), "utf-8"))
try {
// 不可取消的循环
val sb = new StringBuilder()
var hasNext = true
while (hasNext) {
hasNext = false
val line = in.readLine()
if (line != null) {
hasNext = true
sb.append(line)
}
}
sb.toString
} finally {
in.close()
}
}
def readFile(file: File)(implicit ec: ExecutionContext) =
IO.async[String] { cb =>
ec.execute(() => {
try {
// 结束信号
cb(Right(unsafeFileToString(file)))
} catch {
case NonFatal(e) =>
cb(Left(e))
}
})
}
显然这是无法取消的,IO 的实现并无使该循环取消的魔力。不,我们不该使用Java 的 Thread.interrupt
,因为那将是不安全和不可靠的,并且无论如何, IO 都应该保持它能够在平台之间移植。
对于(如何安全地取消)这件事来讲有很大的灵活性,包括在这里,我们可以简单地引入一个可设置为 false 的变量,以便在 while
循环中检查它:
import java.io.File
import java.util.concurrent.atomic.AtomicBoolean
import cats.effect.IO
import scala.concurrent.ExecutionContext
import scala.io.Source
import scala.util.control.NonFatal
def unsafeFileToString(file: File, isActive: AtomicBoolean) = {
val sc = new StringBuilder
val linesIterator = Source.fromFile(file).getLines()
var hasNext = true
while (hasNext && isActive.get) {
sc.append(linesIterator.next())
hasNext = linesIterator.hasNext
}
sc.toString
}
def readFile(file: File)(implicit ec: ExecutionContext) =
IO.cancelable[String] { cb =>
val isActive = new AtomicBoolean(true)
ec.execute(() => {
try {
// 结束信号
cb(Right(unsafeFileToString(file, isActive)))
} catch {
case NonFatal(e) =>
cb(Left(e))
}
})
// 当取消时设置它
IO(isActive.set(false)).void
}
小心: 取消,是一个并发动作!
有些问题并非总是显而易见的,不是从上面的示例中得出的,但是您可能会想这样做:
import java.io._
import cats.effect.IO
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
def readLine(in: BufferedReader)(implicit ec: ExecutionContext) =
IO.cancelable[String] { cb =>
ec.execute(() => cb(
try Right(in.readLine())
catch { case NonFatal(e) => Left(e) }))
// 取消逻辑不是线程安全的!
IO(in.close()).void
}
这样的操作在通过 IO(FS2,Monix 之类的 stream 库)的抽象流,流化地处理 IO 块时可能很有用。
但是它所描述的操作并不正确,因为 in.close()
是并发于 in.readLine()
的,这可能导致引发异常,并且在许多情况下可能导致数据损毁。这是绝对禁止的。我们想中断 IO 所做的一切,但不以破坏数据为代价。
因此,用户需要处理线程安全问题。以下这是一种实现方法:
import java.io._
import java.util.concurrent.atomic.AtomicBoolean
import cats.effect.IO
import scala.util.control.NonFatal
import scala.concurrent.ExecutionContext
def readLine(in: BufferedReader)(implicit ec: ExecutionContext) =
IO.cancelable[String] { cb =>
val isActive = new AtomicBoolean(true)
ec.execute { () =>
if (isActive.getAndSet(false)) {
try cb(Right(in.readLine()))
catch { case NonFatal(e) => cb(Left(e)) }
}
// 注意这路并没有 else 子句;如果取消被执行,我们不调用回调函数,任务将永不终结。;-)
}
// 取消逻辑
IO {
// 线程安全
if (isActive.getAndSet(false))
in.close()
}.void
}
在这个例子中,取消逻辑本身调用了 in.close()
,但由于我们使用了原子对象的 getAndSet
创建了线程安全防护措施,因此调用是安全的。
这里使用了 AtomicBoolean 来确保线程安全,也不必回避通过 synchronize 块或任何 JVM 内部提供的其它并发原语来建立锁/互斥体,这些功能对这些副作用函数而言是需要的。不用担心,这通常仅在 IO.cancelable
,IO.async
或 IO.apply
中才需要,因为这些构建器代表了与不纯净的世界互动的交互界面,正是因为这些暗面(起到隔离作用),才保证一旦您处于 IO 的上下文中,您就可以使用更高级的方法来撰写并发任务。
不幸的是,共享内存并发既是使用内核线程的福音也是诅咒。在 JavaScript 之类的 N:1 平台上,这不是一个大问题,因为(在那里)您不会获得进程内的 CPU 并行性。这就是现实,这是经过权衡后巨大的退让。
启动并发 + 取消
您可以将 IO 当作绿色线程看待,可以通过基于 Concurrent#start
的 IO#start
来执行 “fork
”操作。它基于以下签名:
def start: IO[Fiber[IO, A]]
返回的是一个纤程。你可以认为纤程是轻量级纤程,一个纤程是可以(通过join)连接或(通过cancel)取消的,纯净的和轻量的线程等效物。
例如:
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext
// 被用于 IO.start 来启动逻辑线程分支
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val launchMissiles: IO[Unit] = IO.raiseError(new Exception("boom!"))
val runToBunker = IO(println("To the bunker!!!"))
for {
fiber <- launchMissiles.start
_ <- runToBunker.handleErrorWith { error =>
// 撤退, 取消发射!(也许我们应该在发射之前撤退到掩体里?)
fiber.cancel *> IO.raiseError(error)
}
aftermath <- fiber.join
} yield aftermath
实现说明:
*>
操作定义在 Cats 中,你可以将它看做是以下表达式的别名:lh.flatMap(_ => rh)
runCancelable & unsafeRunCancelable
以上是纯的 cancel
操作,可以通过线程来访问。第二种访问并取消任务的方法是通过 runCancelable
(纯函数方式)和 unsafeRunCancelable
(非安全方式)。
依赖于副作用的 unsafeRunCancelable
的示例,请注意,这种代码不纯,应谨慎使用:
import cats.effect.IO
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
// Needed for `sleep`
implicit val timer = IO.timer(ExecutionContext.global)
// 延迟打印
val io: IO[Unit] = IO.sleep(10.seconds) *> IO(println("Hello!"))
val cancel: IO[Unit] =
io.unsafeRunCancelable(r => println(s"Done: $r"))
// ... if a race condition happens, we can cancel it,
// thus canceling the scheduling of `IO.sleep`
cancel.unsafeRunSync()
作为顶替选项的 runCancelable
,它符合 ConcurrentEffect 定义的法则。基于同样的道理,SyncIO
会将执行暂时挂起(直到世界的尽头):
import cats.effect.SyncIO
import cats.syntax.flatMap._
val pureResult: SyncIO[IO[Unit]] = io.runCancelable { r =>
IO(println(s"Done: $r"))
}
// On evaluation, this will first execute the source, then it
// will cancel it, because it makes perfect sense :-)
pureResult.toIO.flatten
不可取消标记
给定一个可取消的 IO,我们可以将其变成无法取消的 IO:
import cats.effect.IO
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
// Needed for `sleep`
implicit val timer = IO.timer(ExecutionContext.global)
// Our reference from above
val io: IO[Unit] = IO.sleep(10.seconds) *> IO(println("Hello!"))
// 这个 IO 将不可取消,哪怕你去尝试
io.uncancelable
有时您需要确保 IO 的执行是原子的,换句话说,要么全部执行,要么都不执行。这就是此(不可取消)操作的作用。—— 可取消的 IO 从定义上来说并不是原子的,在某些情况下,我们需要使它们成为原子的。
它符合 Concurrent#uncancelable
(see Concurrent) 中定义的法则。
IO.cancelBoundary
将一个可取消边界返回给一个 IO 任务,供其在循环中检查其取消状态,当发现取消状态被置位时取消绑定的程序。
这个操作非常类似 IO.shift
,因为它也可以被用于 flatMap
调用链以便取消一长串调用:
import cats.effect.IO
def fib(n: Int, a: Long, b: Long): IO[Long] =
IO.suspend {
if (n <= 0) IO.pure(a) else {
val next = fib(n - 1, b, a + b)
// 每一百次循环检查一次状态
if (n % 100 == 0)
IO.cancelBoundary *> next
else
next
}
}
如本节开头所述,需要明确地管理资源分配公平性,该协议易于遵循并且可以被以所见即所得的方式预测。
与 IO.shift 比较
IO.cancelBoundary
本质上是 IO.shift
的轻量版本,它没有转移到不同的线程池的能力。它避免了对逻辑的 fork, 因此从某种意义上讲它很轻便。
竞争执行的条件 — race & racePair
“竞赛条件”指的是一种逻辑,它在两个或多个任务之间产生竞赛,胜利者会立即返回信号,而失败者则通常会被取消。
IO 为参与竞赛的同伴提供两种操作:
// 简单模式
def race[A, B](lh: IO[A], rh: IO[B])
(implicit cs: ContextShift[IO]): IO[Either[A, B]]
// 高级模式
def racePair[A, B](lh: IO[A], rh: IO[B])
(implicit cs: ContextShift[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]]
简单的版本中 IO.race
将立即取消失败者,而第二个版本为您提供一个线程,让您决定下一步要做什么。
因此 race
可以像这样从 racePair
中派生出来:
import cats.effect.{ContextShift, IO}
def race[A, B](lh: IO[A], rh: IO[B])
(implicit cs: ContextShift[IO]): IO[Either[A, B]] = {
IO.racePair(lh, rh).flatMap {
case Left((a, fiber)) =>
fiber.cancel.map(_ => Left(a))
case Right((fiber, b)) =>
fiber.cancel.map(_ => Right(b))
}
}
我们可以使用 race
实现一个 “timeout” 运算:
import cats.effect.{ContextShift, Timer, IO}
import scala.concurrent.CancellationException
import scala.concurrent.duration.FiniteDuration
def timeoutTo[A](fa: IO[A], after: FiniteDuration, fallback: IO[A])
(implicit timer: Timer[IO], cs: ContextShift[IO]): IO[A] = {
IO.race(fa, timer.sleep(after)).flatMap {
case Left(a) => IO.pure(a)
case Right(_) => fallback
}
}
def timeout[A](fa: IO[A], after: FiniteDuration)
(implicit timer: Timer[IO], cs: ContextShift[IO]): IO[A] = {
val error = new CancellationException(after.toString)
timeoutTo(fa, after, IO.raiseError(error))
}
有关如何获得一个 Timer[IO]
参考并行
章节
与 Haskell 的“异步中断”比较
Haskell 将中断称为“异步异常”,通过从另一个线程(并发地)抛出异常来中断正在运行的任务。
而对于 cats.effect 的 “cancel
”操作来说,它只是执行您在 IO.cancelable
构建器中指定的任何内容。取决于 IO.cancelable
任务的实现,它有可能无止境地执行下去。如果我们使用一个不纯的签名来描述这个取消操作的话,将是这样的:
() => Unit
与 Haskell(也许还有即将到来的 Scalaz 8 IO
)相比较,它在中断中发送一个 Throwable 给任务,目标任务则以这个 Throwable 结束任务。它的不纯签名如下:
Throwable => Unit
Throwable => Unit
允许任务的逻辑知道取消的原因,但是取消是为了切断与生产者的连接,并尽快关闭所有资源,因为由于某些竞争条件,您不再对结果感兴趣。
并且 Throwable => Unit
范围太宽泛,这也有点令人困惑。可能会诱使用户通过此通道将消息发送回生产者,以便对其进行控制以更改输出结果 —— 但是取消就是取消,我们这样做是为了释放资源,并且竞争条件的实现将关闭连接,不允许取消的任务向下游再发送任何内容。
因此,这会使用户感到困惑,并且唯一的实际用途是根据收到的错误以不同的方式释放资源。但是,考虑到复杂性的增加,这不是值得追求的用例。
资源的安全获取与释放
现状
在主流命令式语言中,您通常可以 try/finally
声明来获取和安全地释放资源。模式如下:
import java.io._
def javaReadFirstLine(file: File): String = {
val in = new BufferedReader(new FileReader(file))
try {
in.readLine()
} finally {
in.close()
}
}
它的确存在以下类似的问题:
- 该语句显然是用于副作用计算的,在 FP 抽象中不能使用。
- 它仅用于同步执行,因此在处理具有异步性的抽象(例如 IO,Task,Future)时,我们无法使用它。
- 不管异常类型如何,
finally
都会执行。因此,如果遇到内存不足错误,它仍然会尝试关闭文件句柄,从而不必要地延迟了进程崩溃。 - 如果
try
主体抛出异常,然后finally
主体也抛出异常,则finally
异常会被重新抛出,从而隐藏了原始问题。
bracket
通过 bracket
操作,我们可以轻松地重述上述内容:
import java.io._
import cats.effect.IO
def readFirstLine(file: File): IO[String] =
IO(new BufferedReader(new FileReader(file))).bracket { in =>
// 使用 (the try block)
IO(in.readLine())
} { in =>
// 释放 reader (the finally block)
IO(in.close()).void
}
要点:
- 它是纯的,因此可以用于 FP。
- 它适用于异步 IO 运算。
- 无论
use
动作的退出状态如何,都会发生release
动作,因此它伴随执行于“成功”,“抛出错误”或“取消”等行为。 - 如果 use 动作引发错误,然后 release 动作也引发错误,则报告的错误将是 use 的错误,而 release 引发的错误将(通过
System.err
)被记录。
需要特别注意的是,bracket 在任务被取消时也会调用 release 动作。考虑以下示例:
import java.io._
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
def readFile(file: File): IO[String] = {
// 在异步边界之后打开文件,以确保该处理不会阻塞当前线程
val acquire = IO.shift *> IO(new BufferedReader(new FileReader(file)))
acquire.bracket { in =>
// 使用 (the try block)
IO {
// 警告!难看的低阶 Java 代码。
val content = new StringBuilder()
var line: String = null
do {
line = in.readLine()
if (line != null) content.append(line)
} while (line != null)
content.toString()
}
} { in =>
// 释放 reader (the finally block)
// 这是有问题的代码,如果这段 IO 被取消,它可能导致数据损毁
IO(in.close()).void
}
}
这个循环可能很慢,我们可能正在处理一个大文件,正如 “并发与取消” 中所述的,取消是一项并发操作,不管使用中正在发生什么。
在这种情况下,在具有多线程功能的 JVM 上,与该循环同时调用 io.close()
可能导致数据损坏。为了防止这种情况,可能需要根据情况进行同步:
import java.io._
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
def readFile(file: File): IO[String] = {
// 在打开文件之前建立异步边界,以确保操作不会阻塞当前线程
val acquire = IO.shift *> IO(new BufferedReader(new FileReader(file)))
// 挂起执行,因为我们将要修改一个共享变量。
IO.suspend {
// 共享取消标志
var isCanceled = false
acquire.bracket { in =>
IO {
val content = new StringBuilder()
var line: String = null
do {
// 同步访问 isCanceled 和 reader
line = in.synchronized {
if (!isCanceled)
in.readLine()
else
null
}
if (line != null) content.append(line)
} while (line != null)
content.toString()
}
} { in =>
IO {
// 同步访问 isCanceled 和 reader
in.synchronized {
isCanceled = true
in.close()
}
}.void
}
}
}
bracketCase
bracketCase
操作是一般化的的 bracket
,它的 release 会收到一个ExitCase
,以示区分对待以下情况:
- 任务成功结束
- 任务失败
- 任务被取消
用例:
import java.io.BufferedReader
import cats.effect.IO
import cats.effect.ExitCase.{Completed, Error, Canceled}
def readLine(in: BufferedReader): IO[String] =
IO.pure(in).bracketCase { in =>
IO(in.readLine())
} {
case (_, Completed | Error(_)) =>
// Do nothing
IO.unit
case (in, Canceled) =>
IO(in.close())
}
在以上示例中,我们仅在发生任务取消的情况下关闭传递来的资源。至于我们为什么这样做 —— 考虑到向我们提供了 BufferedReader 引用,通常,这种资源的生产者也应负责释放它。因此如果此函数在成功状态下也释放了 BufferedReader,那么这将是一个有缺陷的实现。
记住古老的 C++习惯用法“资源获取就是初始化(RAII)”,它说的是资源的生存期应该与其父资源的生存期联系在一起。
但是如果我们检测到取消,我们可能就需要(主动)关闭该资源,因为在取消事件中,此 IO 返回结果后我们已经没有了有效的“执行环路”,因此也就没有人可以去释放它。
转换
在 IO 伴随对象中定义了两个有用的操作,可以将 scala 的 Future
和 Either
提取到 IO 中。
fromFuture
构造一个用以评估给定的 Future
并计算出结果或失败的 IO 的定义如下:
import cats.effect.IO
import scala.concurrent.Future
def fromFuture[A](iof: IO[Future[A]]): IO[A] = ???
由于 Future
会立即地进行估值计算并具有记忆性,因此此函数将其作为 IO 的参数来使用,可以延迟进行估值。如果这种惰性能够被适当地传递到定义 Future
的地方,则可以确保由 IO 完全管理此计算,从而具有引用透明性。
惰性求值,等效于 by-name 参数:
import cats.effect.{ContextShift, IO}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import ExecutionContext.Implicits.global
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
IO.fromFuture(IO {
Future(println("I come from the Future!"))
})
饥饿求值:
val f = Future.successful("I come from the Future!")
IO.fromFuture(IO.pure(f))
fromEither
将一个 [Throwable, A]
加载到 IO[A]
上下文中:
import cats.effect.IO
def fromEither[A](e: Either[Throwable, A]): IO[A] = e.fold(IO.raiseError, IO.pure)
错误处理
Cats Effect 中有一个 MonadError[IO, Throwable]
实例,所有错误处理都通过它来完成。这意味着您可以在 IO 中使用由 MonadError
提供的所有的所有操作,乃至于使用 ApplicativeError
提供的所有操作,只要错误类型为 Throwable
即可。诸如 raiseError
,attempt
,handleErrorWith
,recoverWith
等操作。只需确保将诸如 cats.implicits._ 等语法导入即可:
raiseError
构造一个 IO
,推升指定的异常到处理序列中。
import cats.effect.IO
val boom: IO[Unit] = IO.raiseError(new Exception("boom"))
boom.unsafeRunSync()
attempt
将处理序列中任何可能存在的异常,物化到可对其进行处理的值空间。这类似于 try/catch
中的 catch
子句,是 IO.raiseError
的反函数。例:
import cats.effect.IO
val boom: IO[Unit] = IO.raiseError(new Exception("boom"))
boom.attempt.unsafeRunSync()
参考 MonadError 类型类以获得更多信息。
案例: 指数式递归重试
使用 IO,您可以轻松地对循环进行建模,重新估值计算,直到满足成功或其他条件为止。
例如这个例子演示如何实现指数(函数参数)的重试:
import cats.effect.{IO, Timer}
import scala.concurrent.duration._
def retryWithBackoff[A](ioa: IO[A], initialDelay: FiniteDuration, maxRetries: Int)
(implicit timer: Timer[IO]): IO[A] = {
ioa.handleErrorWith { error =>
if (maxRetries > 0)
IO.sleep(initialDelay) *> retryWithBackoff(ioa, initialDelay * 2, maxRetries - 1)
else
IO.raiseError(error)
}
}
线程转移
IO 提供了一种函数转移
(shift
)的能力,使您可以更好地控制操作的执行。
shift
请注意 IO.shift
函数有 2 个重载:
- 一个以管理着线程池的 ContextShift 作为参数,以触发异步边界。
- 另一个则将 Scala 的
ExecutionContext
作为线程池。
默认情况下,请使用前者,而将后者仅用于需要对线程池进行细粒度控制的应用。
默认情况下,Cats Effect 提供了用于管理线程池的 ContextShift[IO]
实例,但前提是在当前范围内存在 ExecutionContext
或使用 IOApp
:
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext.Implicits.global
implicit val contextShift: ContextShift[IO] = IO.contextShift(global)
我们可以在执行某些任务之前在 FlatMap
链中先引入异步边界:
val task = IO(println("task"))
IO.shift(contextShift).flatMap(_ => task)
请注意,ContextShift
值是从上下文隐式中获取的,因此您也可以写成以下操作:
IO.shift.flatMap(_ => task)
或者使用 Cats 语法:
IO.shift *> task
// 等效于
implicitly[ContextShift[IO]].shift *> task
或者,我们也可以在评估某个任务之后再指定异步边界:
task.flatMap(a => IO.shift.map(_ => a))
或使用 Cats 语法:
task <* IO.shift
// 等效于
task <* implicitly[ContextShift[IO]].shift
一个完整一些的示例:
import java.util.concurrent.Executors
import cats.effect.IO
import scala.concurrent.ExecutionContext
val cachedThreadPool = Executors.newCachedThreadPool()
val BlockingFileIO = ExecutionContext.fromExecutor(cachedThreadPool)
implicit val Main = ExecutionContext.global
val ioa: IO[Unit] =
for {
_ <- IO(println("Enter your name: "))
_ <- IO.shift(BlockingFileIO)
name <- IO(scala.io.StdIn.readLine())
_ <- IO.shift(Main)
_ <- IO(println(s"Welcome $name!"))
_ <- IO(cachedThreadPool.shutdown())
} yield ()
我们首先要求用户输入其名称,然后我们将线程转移到 BlockingFileIO
执行上下文,因为我们认为接下去的操作会长时间阻塞该线程,而我们不希望在主线程中发生这种情况执行。在昂贵的 IO 操作(readLine
)返回响应后,我们将线程移回定义为隐式值的主执行上下文中,最后程序在控制台中显示一条消息并关闭线程池,以结束所有在主执行上下文中的操作。
shift
的另一个不太常见的应用是重置线程堆栈并将控制权交还给线程池。例如:
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
lazy val doStuff = IO(println("stuff"))
lazy val repeat: IO[Unit] =
for {
_ <- doStuff
_ <- IO.shift
_ <- repeat
} yield ()
在此示例中,repeat
是一个运行时间很长的 IO(实际上是无限长的!),它将持续拥有底层线程资源直到永远。这可能会有问题,因此我们注入IO.shift
以将控制权重新切换回线程池,从而使其有机会重新安排执行时间并为别的代码提供更好的公平性。这种转移还会弹出线程堆栈,一直跳回到线程池,并有效地转移(trampolining
)剩余的计算。尽管线程迁移不是完全必要的,但在某些情况下可能有助于减轻主线程池的使用。
因此,此功能具有四个重要的用例:
- 将阻塞操作移出主计算池。
- 防御性地将异步计算重新转移回主计算池。
- 为了公平起见,将控制权交还给线程池。
- 防止构造不正确的异步调用导致的堆栈溢出。
IO 已针对所有同步和异步衔接进行了优化。这意味着您可以在任意深度的递归函数中安全地调用 flatMap
,而不必担心会摧毁堆栈。例如,您可以这样做:
import cats.effect.IO
def signal[A](a: A): IO[A] = IO.async(_(Right(a)))
def loop(n: Int): IO[Int] =
signal(n).flatMap { x =>
if (x > 0) loop(n - 1) else IO.pure(0)
}
并行
由于在 Cats 库及其 IO 实例中引入了并行类型类,因此可以并行执行两个或更多给定的 IO。
注意:所有并行操作都需要在范围内存在隐式 ContextShift[IO]
(请参见ContextShift)。可以通过以下方法为您的范围提供 ContextShift
属性:
- 使用 IOApp 来构建您的程序。
- 通过使用函数
IO.contextShift(executionContext)
来为用户自定义ContextShift
。
parMapN
以下例子可以并行运行任意数量的 IO,并允许您将一个函数应用于结果(如同 map
一样)。所有的 IO 都成功完成或失败后,它才完成处理:
import cats.effect.{ContextShift, IO}
import cats.implicits._
import scala.concurrent.ExecutionContext
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val ioA = IO(println("Running ioA"))
val ioB = IO(println("Running ioB"))
val ioC = IO(println("Running ioC"))
// 确保在当前范围内存在隐式 ContextShift[IO].
val program = (ioA, ioB, ioC).parMapN { (_, _, _) => () }
program.unsafeRunSync()
//=> Running ioB
//=> Running ioC
//=> Running ioA
()
以下例子中,如果任何 IO 失败,则整个计算的结果将失败,而未完成的任务将被取消:
import cats.effect.{ContextShift, ExitCase, IO}
import cats.implicits._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
val a = IO.raiseError[Unit](new Exception("boom")) <* IO(println("Running ioA"))
val b = (IO.sleep(1.second) *> IO(println("Running ioB")))
.guaranteeCase {
case ExitCase.Canceled => IO(println("ioB was canceled!"))
case _ => IO.unit
}
val parFailure = (a, b).parMapN { (_, _) => () }
parFailure.attempt.unsafeRunSync()
//=> ioB was canceled!
//=> java.lang.Exception: boom
//=> ... 43 elided
()
以下例子由于其中一项任务立即失败了,另一项任务被取消计算并且立即返回,因此通过 parMapN
来进行并行运算在返回错误之前不会等待 10 秒:
import cats.effect.{ContextShift, Timer, IO}
import cats.implicits._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val ioA = IO.sleep(10.seconds) *> IO(println("Delayed!"))
val ioB = IO.raiseError[Unit](new Exception("dummy"))
(ioA, ioB).parMapN((_, _) => ())
parSequence
如果您有一个 IO 列表,并且希望用一个 IO 来存放结果列表,则可以使用parSequence 并行执行 IO 任务。
import cats.data.NonEmptyList
import cats.effect.{ContextShift, Timer, IO}
import cats.syntax.parallel._
import scala.concurrent.ExecutionContext
// Needed for IO.start to do a logical thread fork
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val anIO = IO(1)
val aLotOfIOs =
NonEmptyList.of(anIO, anIO)
val ioOfList = aLotOfIOs.parSequence
cats.Traverse.sequence
也可以同步执行此操作。
parTraverse
如果您有数据列表以及将每个项目转换为 IO 的方法,但是您只想要一个 IO 来存放所有结果,则可以使用 parTraverse
并行运行这些计算。
import cats.data.NonEmptyList
import cats.effect.{ContextShift, Timer, IO}
import cats.syntax.parallel._
import scala.concurrent.ExecutionContext
// Needed for IO.start to do a logical thread fork
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val results = NonEmptyList.of(1, 2, 3).parTraverse { i =>
IO(i)
}
cats.Traverse.traverse 用于同步运行每个步骤。
“不安全” 的操作
在前面的示例中,我们几乎一直在使用一些“不安全”的操作,但是我们从未对它们进行任何解释,因此这里进行说明。所有带有 unsafe
前缀的操作都是不纯函数,并且会产生副作用(例如 Haskell 具有不安全 PerformIO
)。但不要害怕这个名字!您应该使用诸如 map
和 flatMap
之类的函数以单子形式编写程序以组合其他函数,并且理想情况下,应仅在程序的最后一次调用这些不安全的操作之一。
unsafeRunSync
以不纯的副作用来运行封装的 effect 以产生出结果。
如果计算中的任何组件是异步的,则当前线程将被阻塞,并等待异步计算的结果。在 JavaScript 中,将抛出异常以避免生成死锁。默认情况下,此阻塞将是无时间界的,如果要将线程块的执行限制在某个固定时间内,请使用unsafeRunTimed
来代替。
effect 中抛出(被挂起)的任何异常将在评估期间重新抛出。
IO(println("Sync!")).unsafeRunSync()
// Sync!
unsafeRunAsync
以不纯的副作用运行封装的 effect,将其中的结果传递给回调函数。
effect
中引发的任何异常都将传递给回调中的 Either
。回调将最多调用一次。请注意,如果构造一个永不返,且又永远不会阻塞线程的 IO,尝试使用此方法去执行该 IO 会导致永远不会调用回调的情况。
IO(println("Async!")).unsafeRunAsync(_ => ())
// Async!
unsafeRunCancelable
运行一个 IO,并将封装的 effect
的结果传递给给定的回调,但是允许这个过程可以被打断。
IO(println("Potentially cancelable!")).unsafeRunCancelable(_ => ())
// Potentially cancelable!
// res59: cats.effect.package.CancelToken[IO] = Suspend(
// cats.effect.internals.IOConnection$Impl$$Lambda$19396/541685241@bdae020
// )
unsafeRunTimed
与 unsafeRunSync
类似,增加了在等待异步结果时的有限的阻塞等待时间。
请注意,limit
参数并不限制总计算时间,而是充当任何单个异步块的时间上限。因此,如果您对仅由同步操作组成的 IO 传递了5秒的限制,则评估可能要花费5秒以上的时间!
此外,如果您对由多个合并(join
)在一起的异步操作组成的 IO 传递 5 秒的限制,则计算可能最多需要 n*5
秒,其中 n 是加入的异步操作的数量。
一旦达到异步阻塞时限,评估将“立即”中止并且不返回任何内容。
请注意,此功能仅用于测试目的,它永远不应该出现在您的生产代码中!如果要实现超时或类似的方法,绝对不适合使用该功能。如果您需要那种功能,则应该使用第三方库(例如 fs2 或 Monix)。
import scala.concurrent.duration._
IO(println("Timed!")).unsafeRunTimed(5.seconds)
unsafeToFuture
评估 effect 并将结果保存在 Future
中。Evaluates the effect and produces the result in a Future
.
这与 unsafeRunAsync
相似,因为它以非阻塞的风格以副作用的方式对 IO 进行评估,但是它使用的是 Future
而不是显式回调。仅当您需要与遗留的 Scala 代码进行互操作时,才应真正使用此功能。
IO("Gimme a Future!").unsafeToFuture()
最佳实践
本节介绍了使用IO的一些最佳实践:
保持粒度
最好保持较清晰的粒度,因此请不要执行以下操作:
IO {
readingFile
writingToDatabase
sendBytesOverTcp
launchMissiles
}
在 FP 中,我们欢迎以推理的方式组建程序,并且由于 IO 是 Monad,因此您可以利用 for-comprehension
将较小的程序组合成较大的程序。例如:
val program =
for {
data <- readFile
_ <- writeToDatabase(data)
_ <- sendBytesOverTcp(data)
_ <- launchMissiles
} yield ()
comprehension 中的每个步骤都是一个小程序,生成的程序是所有这些小步骤彼此一起组成。 最终形成合成的 IO。
在 map / flatMap 中使用纯函数
当使用 map
或 flatMap
时,不建议传递具有 side effect 的函数,因为 map
函数也应该是纯函数。所以应该避免:
IO.pure(123).map(n => println(s"NOT RECOMMENDED! $n"))
也应该避免以下这种情况,因为 side effect 不会在返回的 IO 值中挂起:
IO.pure(123).flatMap { n =>
println(s"NOT RECOMMENDED! $n")
IO.unit
}
正确的方法是这样的:
IO.pure(123).flatMap { n =>
// 正确地挂起副作用
IO(println(s"RECOMMENDED! $n"))
}
请注意,就 IO 的实际行为而言,类似 IO.pure(x).map(f)
等同于 IO(f(x))
,而 IO.pure(x).flatMap(f)
等同与 IO.suspend(f(x))
。
但是您不应该这样做,因为 Sync 类型类中没有描述这种行为的法则,而那些法则是您的代码得到保障的唯一依据。例如,上述等效性将来可能会在错误处理的时候被打破。因此,出于安全原因,目前应该保持这种写法,同时您可以其视为将来可能更改的实现细节。
请坚持使用纯函数。