Cats IO

Cats IO 是一种将 side effect 编码为纯值的数据类型,它能够描述同步和异步计算。

介绍

IO[A] 类型的值代表一种计算模型,在对它进行求值时,它将执行某种效果运算并返回 A 类型的值。

IO 的值是纯的,不变的,因此保留了引用透明性,因此被用于函数式编程。 IO 是一种数据结构,仅表示对副作用计算的描述。

IO 可以描述以下同步或异步计算:

  1. 只能得到唯一解。
  2. 可能以成功结束或以失败结束,并且在失败的情况下,flatMap 链会被短路(IO 实现了 MonadError 代数结构)。
  3. 可以取消,但请注意,此功能依赖用户提供取消逻辑。

由抽象的过程所描述的 effect 将不会被执行直到“世界的尽头”,具体地说,直到某个“unsafe”方法被调用为止。 效果的执行结果是不被记忆的,这意味着内存开销最小(并且没有泄漏),并且,单个 effect 可以被以“引用透明”的方式被多次执行。例如:

import cats.effect.IO

val ioa = IO { println("hey!") }

val program: IO[Unit] =
  for {
     _ <- ioa
     _ <- ioa
  } yield ()

program.unsafeRunSync()
//=> hey!
//=> hey!
()

上面的这个的例子,“hey!”被打印了两次,因为这个 effect 在“单子”链中被重复地执行。

引用透明和惰性求值

IO 可以“冻结” side effect,因为它是一种惰性求值的数据类型。请参考以下分类并反复与标准库中的“Future”的进行比较,来理解求值模型(在 Scala 语境中)的全貌。

 EagerLazy
SynchronousA() => A
  Eval[A]
Asynchronous(A => Unit) => Unit() => (A => Unit) => Unit
 Future[A]IO[A]

通过与 Scala 的 Future 比较,IO 数据类型即使在处理副作用时也保留了引用透明性,并且它是惰性求值的。与像 Scala 这样的即时求值语言相比较,这是结果与产生结果的函数之间的区别。

Future 类似,通过 IO,您可以推断异步处理的结果,但是由于其纯性和惰性,可以将 IO 视为一个规范(直到“世界的尽头”才进行求值),从而可以对 IO 的求值模型施加更多的控制,并且更具可预测性。例如当组合多个 IO,或处理错误时,是以序列化的方式处理,还是以并行的方式处理。

注意惰性求值总是与引用透明性并存。参考以下示例:

for {
  _ <- addToGauge(32)
  _ <- addToGauge(32)
} yield ()

如果我们考虑引用透明性,则可以将该示例重写为:

val task = addToGauge(32)

for {
  _ <- task
  _ <- task
} yield ()

但是这不适用于 Future,但适用于 IO,此能力对于函数式编程至关重要。

堆栈安全

IO 在它的 flatMap 中是以 trampoline 的方式进行求值的,这意味着您可以在任意深度的递归函数中安全地调用 flatMap,而不必担心会顶爆堆栈:

def fib(n: Int, a: Long = 0, b: Long = 1): IO[Long] =
  IO(a + b).flatMap { b2 =>
    if (n > 0) 
      fib(n - 1, b, b2)
    else 
      IO.pure(a)
  }

根据 IO 中实现的类型类的层次结构。除某些函数外,所有这里定义的操作都可用于 IO。

Effects 介绍

IO 是一种强大的抽象,它可以效果化地描述多种不同的 effect:

纯值 — IO.pure & IO.unit

在 IO 的伴随类中定义了以下函数,可以用于将纯值加载到 IO 中,从而生成出“已经求值”的 IO 值:

def pure[A](a: A): IO[A] = ???

请注意,给定的参数形式是值传递而不是按名(by-name)传递。

例如,我们可以将一个数字(纯值)放入 IO 中,然后将其安全地与另一个打包了side effect 的 IO 组合在一起,因为它们将不执行任何操作:

IO.pure(25).flatMap(n => IO(println(s"Number is: $n")))

显然,IO.pure 无法挂起副作用,因为当参数被以值传递给它时,IO.pure 是即时求值的,因此请不要这样做:

IO.pure(println("THIS IS WRONG!"))

在这种情况下,println 将触发副作用,该副作用不会在 IO 中被挂起,有鉴于此,这样的代码可能不是我们想要的。

IO.unit 是 IO.pure(()) 的简化的别名,可在需要 IO[Unit] 值时重复使用,而无需担心触发任何其他副作用:

val unit: IO[Unit] = IO.pure(())

IO[Unit] 在 Scala 代码中使用的相当普遍,Unit 类型本身就意味着调用的返回有副作用,它可以作为 pure(()) 的快捷方式,并且可以起到优化的用处,因为返回了相同的引用。

同步效果 — IO.apply

它可能是最常用的构建器,等效于 Sync[IO].delay,描述了可以在当前线程和调用堆栈上立即做求值的 IO 操作:

def apply[A](body: => A): IO[A] = ???

请注意,给定的参数是通过“by-name”传递的,其执行被“暂停”在 IO 上下文中。

这个示例是在 JVM 之上使用阻塞 I/O 从控制台读取和写入信息:

def putStrLn(value: String) = IO(println(value))
val readLn = IO(scala.io.StdIn.readLine())

然后,我们可以以纯函数的方式使用它们来对与控制台的交互进行建模:

for {
  _ <- putStrLn("What's your name?")
  n <- readLn
  _ <- putStrLn(s"Hello, $n!")
} yield ()

异步 Effects — IO.async & IO.cancelable

IO可以通过 IO.asyncIO.cancelable 构建器描述异步过程。

IO.async 是基于 Async#async (请参阅 Async))的定义的操作,可以用于描述不可取消的简单异步过程,其函数签名为:

def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = ???

提供的参数是一个可以用来注入处理成功(Right(a))的或失败(Left(error))结果的回调函数。用户可要求触发任何异步副作用,然后用注入的回调函数来处理结果。

例如以下简单的例子可用于转换 Scala 的 Future 的代码。虽然您并不需要像这样转换,因为我们已经在 IO.fromFuture 中定义了转换操作:

import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure}

def convert[A](fa: => Future[A])(implicit ec: ExecutionContext): IO[A] =
  IO.async { cb =>
    // This triggers evaluation of the by-name param and of onComplete, 
    // so it's OK to have side effects in this callback
    fa.onComplete {
      case Success(a) => cb(Right(a))
      case Failure(e) => cb(Left(e))
    }
  }

可撤销的处理

对于构建可取消的 IO 任务,您需要使用 IO.cancelable 构建器,该构建器符合Concurrent#cancelable(请参见 Concurrent)的定义,具有以下签名:

def cancelable[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = ???

因此它与 IO.async 相似,但是用户期待它的注册函数的返回类型为 IO[Unit] 的,其中包含取消运算所需的逻辑 。

重要提示:取消是在 IO 完成任务之前终止执行的能力,为释放任何获得的资源提供可能性,这在竞争条件下很有用,可防止泄漏。

作为示例,我们要实现一个基于 Java 的 ScheduledExecutorService 的定期休眠(sleep)程序为例:

import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration._

def delayedTick(d: FiniteDuration)
  (implicit sc: ScheduledExecutorService): IO[Unit] = {
 
  IO.cancelable { cb =>
    val r = new Runnable { def run() = cb(Right(())) }
    val f = sc.schedule(r, d.length, d.unit)
    
    // 返回需要被提前取消并释放资源的 cancellation token
    IO(f.cancel(false)).void
  }
}

注意,这个延时滴答功能(需要 Timer 支持)已经通过 IO.sleep 提供了,所以你不需要自己去编写。

更多有关 ‘‘cancellation’’ 的细节请参考下面。

IO.never

IO.never 表示一个由 async 定义的,不会结束的 IO,可重复使用:

val never: IO[Nothing] = IO.async(_ => ())

在某​​些情况下不结束运算是很有用的,例如比赛条件。例如,给定 IO.race,我们有以下运算:

IO.race(lh, IO.never) <-> lh.map(Left(_))

IO.race(IO.never, rh) <-> rh.map(Right(_))

推迟计算 — IO.suspend

IO.suspend 构建器等效于:

IO.suspend(f) <-> IO(f).flatten

因此它对于暂停 effect 很有用,但这会推迟返回IO的完成结果。这对于建模堆栈安全的尾递归循环很有用:

import cats.effect.IO

def fib(n: Int, a: Long, b: Long): IO[Long] =
  IO.suspend {
    if (n > 0)
      fib(n - 1, b, a + b)
    else
      IO.pure(a)
  }

通常,像这样的函数最终会在 JVM 顶部产生堆栈溢出错误。通过使用 IO.suspend 并使用 IO 循环执行所有这些运算,它的每个求值循环是惰性的,并且使用恒定大小的内存。当然,您也可以使用 flatMap,在这个示例中 suspend 显得得更好些。

我们可以使用 Scala 的 @tailrec 机制来标注此功能,同时通过使用 IO,我们还可以通过插入异步边界来保持(计算资源使用的)公平性:

import cats.effect._

def fib(n: Int, a: Long, b: Long)(implicit cs: ContextShift[IO]): IO[Long] =
  IO.suspend {
    if (n == 0) IO.pure(a) else {
      val next = fib(n - 1, b, a + b)
      // Every 100 cycles, introduce a logical thread fork
      if (n % 100 == 0)
        cs.shift *> next
      else
        next
    }
  }

这里我们还有比 @tailrec 循环更有趣的东西。可以看出,IO 允许对求值进行非常精确的控制。

并发与取消

IO 可以描述可中断的异步过程。作为实施细节:

  1. 并非所有的任务都可以被取消。取消状态只会在到达异步边界之后被检查。可以通过以下方式实现:
    • 使用 IO.cancelableIO.asyncIO.asyncF or IO.bracket 来构建。
    • 使用 IO.cancelBoundary 或 IO.shift

请注意,第二点是第一点的结果。包括但不限于 Mvar.takeMvar.putDeferred.get 这些阻塞操作都可以被取消。

我们还应注意,仅当 FlatMap 调用链到达异步边界之后才可以取消。在每 NFlatMap 的异步调用边界之后执行取消检查。 N 的值被硬编码为 512

例子如下:

import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

def retryUntilRight[A, B](io: IO[Either[A, B]]): IO[B] = {
  io.flatMap {
    case Right(b) => IO.pure(b)
    case Left(_) => retryUntilRight(io)
  }
}

// 不可取消的无终止计算
val notCancelable: IO[Int] = retryUntilRight(IO(Left(0)))

// 可取消的无终止计算,IO.shift 在 `flatMap` 之前建立了异步边界。
val cancelable: IO[Int] = IO.shift *> retryUntilRight(IO(Left(0)))
  1. 可取消的IO任务通常通过 cancel 终止。

这对于来自 Java 的人们来说这可能是一个困惑点,他们可能会期盼通过 Thread.interrupt 或旧的过时的 Thread.stop

IO 的取消不这样工作,因为 Java 的线程中断本质上是不安全不可靠不可移植的!

接下来的小节将更详细地介绍与取消有关的操作。

构建可取消的 IO 任务

可取消的 IO 任务可以通过 IO.cancelable 构建器来描述。就像之前已经给出的的使用 Java 的  ScheduledExecutorService 来实现的例子 delayedTick, 让我们回顾以下:

import java.util.concurrent.ScheduledExecutorService

import cats.effect.IO

import scala.concurrent.duration.FiniteDuration

def sleep(d: FiniteDuration)
  (implicit sc: ScheduledExecutorService): IO[Unit] = {

  IO.cancelable { cb => 
    val r = new Runnable { def run() = cb(Right(())) }
    val f = sc.schedule(r, d.length, d.unit)
    // 返回一个可以让我们取消执行计划的函数
    IO(f.cancel(false)).void
  }
}

重点:如果您未为任务指定取消逻辑,则该任务是不可取消的。所以,例如使用Java 的阻塞 I/O:

import java.io._

import cats.effect.IO

import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal

def unsafeFileToString(file: File) = {
  // 让人抓狂的 Java :-)
  val in = new BufferedReader(
    new InputStreamReader(new FileInputStream(file), "utf-8"))
  
  try {
    // 不可取消的循环
    val sb = new StringBuilder()
    var hasNext = true
    while (hasNext) {
      hasNext = false
      val line = in.readLine()
      if (line != null) {
        hasNext = true
        sb.append(line)
      }
    }
    sb.toString
  } finally {
    in.close()
  }
}

def readFile(file: File)(implicit ec: ExecutionContext) =
  IO.async[String] { cb =>
    ec.execute(() => {
      try {
        // 结束信号
        cb(Right(unsafeFileToString(file)))
      } catch {
        case NonFatal(e) =>
          cb(Left(e))
      }
    })
  }

显然这是无法取消的,IO 的实现并无使该循环取消的魔力。不,我们不该使用Java 的 Thread.interrupt,因为那将是不安全和不可靠的,并且无论如何, IO 都应该保持它能够在平台之间移植。

对于(如何安全地取消)这件事来讲有很大的灵活性,包括在这里,我们可以简单地引入一个可设置为 false 的变量,以便在 while 循环中检查它:

import java.io.File
import java.util.concurrent.atomic.AtomicBoolean

import cats.effect.IO

import scala.concurrent.ExecutionContext
import scala.io.Source
import scala.util.control.NonFatal

def unsafeFileToString(file: File, isActive: AtomicBoolean) = {
  val sc = new StringBuilder
  val linesIterator = Source.fromFile(file).getLines()
  var hasNext = true
  while (hasNext && isActive.get) {
    sc.append(linesIterator.next())
    hasNext = linesIterator.hasNext
  }
  sc.toString
}

def readFile(file: File)(implicit ec: ExecutionContext) =
  IO.cancelable[String] { cb =>
    val isActive = new AtomicBoolean(true)
    
    ec.execute(() => {
      try {
        // 结束信号
        cb(Right(unsafeFileToString(file, isActive)))
      } catch {
        case NonFatal(e) =>
          cb(Left(e))
      }
    })    
    // 当取消时设置它
    IO(isActive.set(false)).void
  }

小心: 取消,是一个并发动作!

有些问题并非总是显而易见的,不是从上面的示例中得出的,但是您可能会想这样做:

import java.io._

import cats.effect.IO

import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal

def readLine(in: BufferedReader)(implicit ec: ExecutionContext) =
  IO.cancelable[String] { cb =>
    ec.execute(() => cb(
      try Right(in.readLine()) 
      catch { case NonFatal(e) => Left(e) }))
      
    // 取消逻辑不是线程安全的!
    IO(in.close()).void
  }

这样的操作在通过 IO(FS2,Monix 之类的 stream 库)的抽象流,流化地处理 IO 块时可能很有用。

但是它所描述的操作并不正确,因为 in.close() 是并发于 in.readLine() 的,这可能导致引发异常,并且在许多情况下可能导致数据损毁。这是绝对禁止的。我们想中断 IO 所做的一切,但不以破坏数据为代价。

因此,用户需要处理线程安全问题。以下这是一种实现方法:

import java.io._
import java.util.concurrent.atomic.AtomicBoolean

import cats.effect.IO

import scala.util.control.NonFatal
import scala.concurrent.ExecutionContext

def readLine(in: BufferedReader)(implicit ec: ExecutionContext) =
  IO.cancelable[String] { cb =>
    val isActive = new AtomicBoolean(true)
    ec.execute { () => 
      if (isActive.getAndSet(false)) {
        try cb(Right(in.readLine()))
        catch { case NonFatal(e) => cb(Left(e)) }
      }
      // 注意这路并没有 else 子句;如果取消被执行,我们不调用回调函数,任务将永不终结。;-)
    }
    // 取消逻辑
    IO {
      // 线程安全
      if (isActive.getAndSet(false))
        in.close()
    }.void
  }

在这个例子中,取消逻辑本身调用了 in.close(),但由于我们使用了原子对象的 getAndSet 创建了线程安全防护措施,因此调用是安全的。

这里使用了 AtomicBoolean 来确保线程安全,也不必回避通过 synchronize 块或任何 JVM 内部提供的其它并发原语来建立锁/互斥体,这些功能对这些副作用函数而言是需要的。不用担心,这通常仅在 IO.cancelableIO.asyncIO.apply 中才需要,因为这些构建器代表了与不纯净的世界互动的交互界面,正是因为这些暗面(起到隔离作用),才保证一旦您处于 IO 的上下文中,您就可以使用更高级的方法来撰写并发任务。

不幸的是,共享内存并发既是使用内核线程的福音也是诅咒。在 JavaScript 之类的 N:1 平台上,这不是一个大问题,因为(在那里)您不会获得进程内的 CPU 并行性。这就是现实,这是经过权衡后巨大的退让。

启动并发 + 取消

您可以将 IO 当作绿色线程看待,可以通过基于 Concurrent#startIO#start 来执行 “fork”操作。它基于以下签名:

def start: IO[Fiber[IO, A]]

返回的是一个纤程。你可以认为纤程是轻量级纤程,一个纤程是可以(通过join)连接或(通过cancel)取消的,纯净的和轻量的线程等效物。

例如:

import cats.effect.{ContextShift, IO}

import scala.concurrent.ExecutionContext

// 被用于 IO.start 来启动逻辑线程分支
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val launchMissiles: IO[Unit] = IO.raiseError(new Exception("boom!"))
val runToBunker = IO(println("To the bunker!!!"))

for {
  fiber <- launchMissiles.start
  _ <- runToBunker.handleErrorWith { error =>
    // 撤退, 取消发射!(也许我们应该在发射之前撤退到掩体里?)
    fiber.cancel *> IO.raiseError(error)
  }
  aftermath <- fiber.join
} yield aftermath

实现说明:

  • *> 操作定义在 Cats 中,你可以将它看做是以下表达式的别名:lh.flatMap(_ => rh)

runCancelable & unsafeRunCancelable

以上是纯的 cancel 操作,可以通过线程来访问。第二种访问并取消任务的方法是通过 runCancelable(纯函数方式)和 unsafeRunCancelable(非安全方式)。

依赖于副作用的 unsafeRunCancelable 的示例,请注意,这种代码不纯,应谨慎使用:

import cats.effect.IO

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Needed for `sleep`
implicit val timer = IO.timer(ExecutionContext.global)

// 延迟打印
val io: IO[Unit] = IO.sleep(10.seconds) *> IO(println("Hello!"))

val cancel: IO[Unit] = 
  io.unsafeRunCancelable(r => println(s"Done: $r"))

// ... if a race condition happens, we can cancel it,
// thus canceling the scheduling of `IO.sleep`
cancel.unsafeRunSync()

作为顶替选项的 runCancelable,它符合  ConcurrentEffect 定义的法则。基于同样的道理,SyncIO 会将执行暂时挂起(直到世界的尽头):

import cats.effect.SyncIO
import cats.syntax.flatMap._

val pureResult: SyncIO[IO[Unit]] = io.runCancelable { r => 
  IO(println(s"Done: $r"))
}

// On evaluation, this will first execute the source, then it 
// will cancel it, because it makes perfect sense :-)
pureResult.toIO.flatten

不可取消标记

给定一个可取消的 IO,我们可以将其变成无法取消的 IO:

import cats.effect.IO

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Needed for `sleep`
implicit val timer = IO.timer(ExecutionContext.global)

// Our reference from above
val io: IO[Unit] = IO.sleep(10.seconds) *> IO(println("Hello!"))

// 这个 IO 将不可取消,哪怕你去尝试
io.uncancelable

有时您需要确保 IO 的执行是原子的,换句话说,要么全部执行,要么都不执行。这就是此(不可取消)操作的作用。—— 可取消的 IO 从定义上来说并不是原子的,在某些情况下,我们需要使它们成为原子的。

它符合 Concurrent#uncancelable (see Concurrent) 中定义的法则。

IO.cancelBoundary

将一个可取消边界返回给一个 IO 任务,供其在循环中检查其取消状态,当发现取消状态被置位时取消绑定的程序。

这个操作非常类似 IO.shift,因为它也可以被用于 flatMap 调用链以便取消一长串调用:

import cats.effect.IO

def fib(n: Int, a: Long, b: Long): IO[Long] =
  IO.suspend {
    if (n <= 0) IO.pure(a) else {
      val next = fib(n - 1, b, a + b)

      // 每一百次循环检查一次状态
      if (n % 100 == 0)
        IO.cancelBoundary *> next
      else
        next
    }
  }

如本节开头所述,需要明确地管理资源分配公平性,该协议易于遵循并且可以被以所见即所得的方式预测。

与 IO.shift 比较

IO.cancelBoundary 本质上是 IO.shift 的轻量版本,它没有转移到不同的线程池的能力。它避免了对逻辑的 fork, 因此从某种意义上讲它很轻便。

竞争执行的条件 — race & racePair

“竞赛条件”指的是一种逻辑,它在两个或多个任务之间产生竞赛,胜利者会立即返回信号,而失败者则通常会被取消。

IO 为参与竞赛的同伴提供两种操作:

// 简单模式
def race[A, B](lh: IO[A], rh: IO[B])
  (implicit cs: ContextShift[IO]): IO[Either[A, B]]
  
// 高级模式
def racePair[A, B](lh: IO[A], rh: IO[B])
  (implicit cs: ContextShift[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]]

简单的版本中 IO.race 将立即取消失败者,而第二个版本为您提供一个线程,让您决定下一步要做什么。

因此 race 可以像这样从 racePair 中派生出来:

import cats.effect.{ContextShift, IO}

def race[A, B](lh: IO[A], rh: IO[B])
  (implicit cs: ContextShift[IO]): IO[Either[A, B]] = {
  
  IO.racePair(lh, rh).flatMap {
    case Left((a, fiber)) => 
      fiber.cancel.map(_ => Left(a))
    case Right((fiber, b)) => 
      fiber.cancel.map(_ => Right(b))
  }
}

我们可以使用 race 实现一个 “timeout” 运算:

import cats.effect.{ContextShift, Timer, IO}

import scala.concurrent.CancellationException
import scala.concurrent.duration.FiniteDuration

def timeoutTo[A](fa: IO[A], after: FiniteDuration, fallback: IO[A])
  (implicit timer: Timer[IO], cs: ContextShift[IO]): IO[A] = {

  IO.race(fa, timer.sleep(after)).flatMap {
    case Left(a) => IO.pure(a)
    case Right(_) => fallback
  }
}

def timeout[A](fa: IO[A], after: FiniteDuration)
  (implicit timer: Timer[IO], cs: ContextShift[IO]): IO[A] = {

  val error = new CancellationException(after.toString)
  timeoutTo(fa, after, IO.raiseError(error))
}

有关如何获得一个 Timer[IO] 参考并行章节

与 Haskell 的“异步中断”比较

Haskell 将中断称为“异步异常”,通过从另一个线程(并发地)抛出异常来中断正在运行的任务。

而对于 cats.effect 的 “cancel”操作来说,它只是执行您在 IO.cancelable 构建器中指定的任何内容。取决于 IO.cancelable 任务的实现,它有可能无止境地执行下去。如果我们使用一个不纯的签名来描述这个取消操作的话,将是这样的:

() => Unit

与 Haskell(也许还有即将到来的 Scalaz 8 IO)相比较,它在中断中发送一个 Throwable 给任务,目标任务则以这个 Throwable 结束任务。它的不纯签名如下:

Throwable => Unit

Throwable => Unit 允许任务的逻辑知道取消的原因,但是取消是为了切断与生产者的连接,并尽快关闭所有资源,因为由于某些竞争条件,您不再对结果感兴趣。

并且 Throwable => Unit 范围太宽泛,这也有点令人困惑。可能会诱使用户通过此通道将消息发送回生产者,以便对其进行控制以更改输出结果 —— 但是取消就是取消,我们这样做是为了释放资源,并且竞争条件的实现将关闭连接,不允许取消的任务向下游再发送任何内容。

因此,这会使用户感到困惑,并且唯一的实际用途是根据收到的错误以不同的方式释放资源。但是,考虑到复杂性的增加,这不是值得追求的用例。

资源的安全获取与释放

现状

在主流命令式语言中,您通常可以 try/finally 声明来获取和安全地释放资源。模式如下:

import java.io._

def javaReadFirstLine(file: File): String = {
  val in = new BufferedReader(new FileReader(file))
  try {
    in.readLine()
  } finally {
    in.close()
  }
}

它的确存在以下类似的问题:

  1. 该语句显然是用于副作用计算的,在 FP 抽象中不能使用。
  2. 它仅用于同步执行,因此在处理具有异步性的抽象(例如 IO,Task,Future)时,我们无法使用它。
  3. 不管异常类型如何,finally 都会执行。因此,如果遇到内存不足错误,它仍然会尝试关闭文件句柄,从而不必要地延迟了进程崩溃。
  4. 如果 try 主体抛出异常,然后 finally 主体也抛出异常,则 finally 异常会被重新抛出,从而隐藏了原始问题。

bracket

通过 bracket 操作,我们可以轻松地重述上述内容:

import java.io._

import cats.effect.IO

def readFirstLine(file: File): IO[String] =
  IO(new BufferedReader(new FileReader(file))).bracket { in =>
    // 使用 (the try block)
    IO(in.readLine())
  } { in =>
    // 释放 reader (the finally block)
    IO(in.close()).void
  }

要点:

  1. 它是纯的,因此可以用于 FP。
  2. 它适用于异步 IO 运算。
  3. 无论 use 动作的退出状态如何,都会发生 release 动作,因此它伴随执行于“成功”,“抛出错误”或“取消”等行为。
  4. 如果 use 动作引发错误,然后 release 动作也引发错误,则报告的错误将是 use 的错误,而 release 引发的错误将(通过 System.err)被记录。

需要特别注意的是,bracket 在任务被取消时也会调用 release 动作。考虑以下示例:

import java.io._

import cats.effect.{ContextShift, IO}

import scala.concurrent.ExecutionContext

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

def readFile(file: File): IO[String] = {
  // 在异步边界之后打开文件,以确保该处理不会阻塞当前线程
  val acquire = IO.shift *> IO(new BufferedReader(new FileReader(file)))
    
  acquire.bracket { in =>
    // 使用 (the try block)
    IO {
      // 警告!难看的低阶 Java 代码。
      val content = new StringBuilder()
      var line: String = null
      do {
        line = in.readLine()
        if (line != null) content.append(line)
      } while (line != null)
      content.toString()
    }
  } { in =>
    // 释放 reader (the finally block)
    // 这是有问题的代码,如果这段 IO 被取消,它可能导致数据损毁
    IO(in.close()).void
  }
}

这个循环可能很慢,我们可能正在处理一个大文件,正如 “并发与取消” 中所述的,取消是一项并发操作,不管使用中正在发生什么。

在这种情况下,在具有多线程功能的 JVM 上,与该循环同时调用 io.close() 可能导致数据损坏。为了防止这种情况,可能需要根据情况进行同步:

import java.io._

import cats.effect.{ContextShift, IO}

import scala.concurrent.ExecutionContext

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

def readFile(file: File): IO[String] = {
  // 在打开文件之前建立异步边界,以确保操作不会阻塞当前线程
  val acquire = IO.shift *> IO(new BufferedReader(new FileReader(file)))
    
  // 挂起执行,因为我们将要修改一个共享变量。
  IO.suspend {
    // 共享取消标志
    var isCanceled = false
    
    acquire.bracket { in =>
      IO {
        val content = new StringBuilder()
        var line: String = null
        do {
        // 同步访问 isCanceled 和 reader
          line = in.synchronized {
            if (!isCanceled)
              in.readLine()
            else
              null
          }
          if (line != null) content.append(line)
        } while (line != null)
        content.toString()
      }
    } { in =>
      IO {
        // 同步访问 isCanceled 和 reader
        in.synchronized {
          isCanceled = true
          in.close()
        }
      }.void
    }
  }
}

bracketCase

bracketCase 操作是一般化的的 bracket ,它的 release 会收到一个ExitCase,以示区分对待以下情况:

  1. 任务成功结束
  2. 任务失败
  3. 任务被取消

用例:

import java.io.BufferedReader
import cats.effect.IO
import cats.effect.ExitCase.{Completed, Error, Canceled}

def readLine(in: BufferedReader): IO[String] =
  IO.pure(in).bracketCase { in =>
    IO(in.readLine())
  } { 
    case (_, Completed | Error(_)) =>
      // Do nothing
      IO.unit
    case (in, Canceled) =>
      IO(in.close())
  }

在以上示例中,我们仅在发生任务取消的情况下关闭传递来的资源。至于我们为什么这样做 —— 考虑到向我们提供了 BufferedReader 引用,通常,这种资源的生产者也应负责释放它。因此如果此函数在成功状态下也释放了 BufferedReader,那么这将是一个有缺陷的实现。

记住古老的 C++习惯用法“资源获取就是初始化(RAII)”,它说的是资源的生存期应该与其父资源的生存期联系在一起。

但是如果我们检测到取消,我们可能就需要(主动)关闭该资源,因为在取消事件中,此 IO 返回结果后我们已经没有了有效的“执行环路”,因此也就没有人可以去释放它。

转换

在 IO 伴随对象中定义了两个有用的操作,可以将 scala 的 FutureEither 提取到 IO 中。

fromFuture

构造一个用以评估给定的 Future 并计算出结果或失败的 IO 的定义如下:

import cats.effect.IO
import scala.concurrent.Future

def fromFuture[A](iof: IO[Future[A]]): IO[A] = ???

由于 Future 会立即地进行估值计算并具有记忆性,因此此函数将其作为 IO 的参数来使用,可以延迟进行估值。如果这种惰性能够被适当地传递到定义 Future 的地方,则可以确保由 IO 完全管理此计算,从而具有引用透明性。

惰性求值,等效于 by-name 参数:

import cats.effect.{ContextShift, IO}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import ExecutionContext.Implicits.global

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

IO.fromFuture(IO {
  Future(println("I come from the Future!"))
})

饥饿求值:

val f = Future.successful("I come from the Future!")

IO.fromFuture(IO.pure(f))

fromEither

将一个 [Throwable, A] 加载到 IO[A] 上下文中:

import cats.effect.IO

def fromEither[A](e: Either[Throwable, A]): IO[A] = e.fold(IO.raiseError, IO.pure)

错误处理

Cats Effect 中有一个 MonadError[IO, Throwable] 实例,所有错误处理都通过它来完成。这意味着您可以在 IO 中使用由 MonadError 提供的所有的所有操作,乃至于使用 ApplicativeError 提供的所有操作,只要错误类型为 Throwable 即可。诸如 raiseErrorattempthandleErrorWithrecoverWith 等操作。只需确保将诸如 cats.implicits._ 等语法导入即可:

raiseError

构造一个 IO,推升指定的异常到处理序列中。

import cats.effect.IO

val boom: IO[Unit] = IO.raiseError(new Exception("boom"))
boom.unsafeRunSync()

attempt

将处理序列中任何可能存在的异常,物化到可对其进行处理的值空间。这类似于 try/catch 中的 catch 子句,是 IO.raiseError 的反函数。例:

import cats.effect.IO

val boom: IO[Unit] = IO.raiseError(new Exception("boom"))
boom.attempt.unsafeRunSync()

参考 MonadError 类型类以获得更多信息。

案例: 指数式递归重试

使用 IO,您可以轻松地对循环进行建模,重新估值计算,直到满足成功或其他条件为止。

例如这个例子演示如何实现指数(函数参数)的重试:

import cats.effect.{IO, Timer}

import scala.concurrent.duration._

def retryWithBackoff[A](ioa: IO[A], initialDelay: FiniteDuration, maxRetries: Int)
  (implicit timer: Timer[IO]): IO[A] = {

  ioa.handleErrorWith { error =>
    if (maxRetries > 0)
      IO.sleep(initialDelay) *> retryWithBackoff(ioa, initialDelay * 2, maxRetries - 1)
    else
      IO.raiseError(error)
  }
}

线程转移

IO 提供了一种函数转移shift)的能力,使您可以更好地控制操作的执行。

shift

请注意 IO.shift 函数有 2 个重载:

  • 一个以管理着线程池的 ContextShift 作为参数,以触发异步边界。
  • 另一个则将 Scala 的 ExecutionContext 作为线程池。

默认情况下,请使用前者,而将后者仅用于需要对线程池进行细粒度控制的应用。

默认情况下,Cats Effect 提供了用于管理线程池的 ContextShift[IO] 实例,但前提是在当前范围内存在 ExecutionContext 或使用 IOApp

import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext.Implicits.global

implicit val contextShift: ContextShift[IO] = IO.contextShift(global)

我们可以在执行某些任务之前在 FlatMap 链中先引入异步边界:

val task = IO(println("task"))

IO.shift(contextShift).flatMap(_ => task)

请注意,ContextShift 值是从上下文隐式中获取的,因此您也可以写成以下操作:

IO.shift.flatMap(_ => task)

或者使用 Cats 语法:

IO.shift *> task
// 等效于
implicitly[ContextShift[IO]].shift *> task

或者,我们也可以在评估某个任务之后再指定异步边界:

task.flatMap(a => IO.shift.map(_ => a))

或使用 Cats 语法:

task <* IO.shift
// 等效于
task <* implicitly[ContextShift[IO]].shift

一个完整一些的示例:

import java.util.concurrent.Executors

import cats.effect.IO
import scala.concurrent.ExecutionContext

val cachedThreadPool = Executors.newCachedThreadPool()
val BlockingFileIO   = ExecutionContext.fromExecutor(cachedThreadPool)
implicit val Main = ExecutionContext.global

val ioa: IO[Unit] =
  for {
    _     <- IO(println("Enter your name: "))
    _     <- IO.shift(BlockingFileIO)
    name  <- IO(scala.io.StdIn.readLine())
    _     <- IO.shift(Main)
    _     <- IO(println(s"Welcome $name!"))
    _     <- IO(cachedThreadPool.shutdown())
  } yield ()

我们首先要求用户输入其名称,然后我们将线程转移到 BlockingFileIO 执行上下文,因为我们认为接下去的操作会长时间阻塞该线程,而我们不希望在主线程中发生这种情况执行。在昂贵的 IO 操作(readLine)返回响应后,我们将线程移回定义为隐式值的主执行上下文中,最后程序在控制台中显示一条消息并关闭线程池,以结束所有在主执行上下文中的操作。

shift 的另一个不太常见的应用是重置线程堆栈并将控制权交还给线程池。例如:

import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

lazy val doStuff = IO(println("stuff"))

lazy val repeat: IO[Unit] =
  for {
    _ <- doStuff
    _ <- IO.shift
    _ <- repeat
} yield ()

在此示例中,repeat 是一个运行时间很长的 IO(实际上是无限长的!),它将持续拥有底层线程资源直到永远。这可能会有问题,因此我们注入IO.shift 以将控制权重新切换回线程池,从而使其有机会重新安排执行时间并为别的代码提供更好的公平性。这种转移还会弹出线程堆栈,一直跳回到线程池,并有效地转移(trampolining)剩余的计算。尽管线程迁移不是完全必要的,但在某些情况下可能有助于减轻主线程池的使用。

因此,此功能具有四个重要的用例:

  • 将阻塞操作移出主计算池。
  • 防御性地将异步计算重新转移回主计算池。
  • 为了公平起见,将控制权交还给线程池。
  • 防止构造不正确的异步调用导致的堆栈溢出。

IO 已针对所有同步和异步衔接进行了优化。这意味着您可以在任意深度的递归函数中安全地调用 flatMap,而不必担心会摧毁堆栈。例如,您可以这样做:

import cats.effect.IO

def signal[A](a: A): IO[A] = IO.async(_(Right(a)))

def loop(n: Int): IO[Int] =
  signal(n).flatMap { x =>
    if (x > 0) loop(n - 1) else IO.pure(0)
  }

并行

由于在 Cats 库及其 IO 实例中引入了并行类型类,因此可以并行执行两个或更多给定的 IO。

注意:所有并行操作都需要在范围内存在隐式 ContextShift[IO](请参见ContextShift)。可以通过以下方法为您的范围提供 ContextShift 属性:

  1. 使用 IOApp 来构建您的程序。
  2. 通过使用函数 IO.contextShift(executionContext) 来为用户自定义 ContextShift

parMapN

以下例子可以并行运行任意数量的 IO,并允许您将一个函数应用于结果(如同 map 一样)。所有的 IO 都成功完成或失败后,它才完成处理:

import cats.effect.{ContextShift, IO}
import cats.implicits._

import scala.concurrent.ExecutionContext

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val ioA = IO(println("Running ioA"))
val ioB = IO(println("Running ioB"))
val ioC = IO(println("Running ioC"))

// 确保在当前范围内存在隐式 ContextShift[IO]. 
val program = (ioA, ioB, ioC).parMapN { (_, _, _) => () }

program.unsafeRunSync()
//=> Running ioB
//=> Running ioC
//=> Running ioA
()

以下例子中,如果任何 IO 失败,则整个计算的结果将失败,而未完成的任务将被取消:

import cats.effect.{ContextShift, ExitCase, IO}
import cats.implicits._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)

val a = IO.raiseError[Unit](new Exception("boom")) <* IO(println("Running ioA"))
val b = (IO.sleep(1.second) *> IO(println("Running ioB")))
  .guaranteeCase {
    case ExitCase.Canceled => IO(println("ioB was canceled!"))
    case _ => IO.unit
  }

val parFailure = (a, b).parMapN { (_, _) => () }

parFailure.attempt.unsafeRunSync()
//=> ioB was canceled!
//=> java.lang.Exception: boom
//=>  ... 43 elided
()

以下例子由于其中一项任务立即失败了,另一项任务被取消计算并且立即返回,因此通过 parMapN 来进行并行运算在返回错误之前不会等待 10 秒:

import cats.effect.{ContextShift, Timer, IO}
import cats.implicits._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

val ioA = IO.sleep(10.seconds) *> IO(println("Delayed!"))
val ioB = IO.raiseError[Unit](new Exception("dummy"))

(ioA, ioB).parMapN((_, _) => ())

parSequence

如果您有一个 IO 列表,并且希望用一个 IO 来存放结果列表,则可以使用parSequence 并行执行 IO 任务。

import cats.data.NonEmptyList
import cats.effect.{ContextShift, Timer, IO}
import cats.syntax.parallel._

import scala.concurrent.ExecutionContext 

// Needed for IO.start to do a logical thread fork
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

val anIO = IO(1)

val aLotOfIOs = 
  NonEmptyList.of(anIO, anIO)

val ioOfList = aLotOfIOs.parSequence

cats.Traverse.sequence 也可以同步执行此操作。

parTraverse

如果您有数据列表以及将每个项目转换为 IO 的方法,但是您只想要一个 IO 来存放所有结果,则可以使用 parTraverse 并行运行这些计算。

import cats.data.NonEmptyList
import cats.effect.{ContextShift, Timer, IO}
import cats.syntax.parallel._

import scala.concurrent.ExecutionContext 

// Needed for IO.start to do a logical thread fork
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

val results = NonEmptyList.of(1, 2, 3).parTraverse { i =>
  IO(i)
}

cats.Traverse.traverse 用于同步运行每个步骤。

“不安全” 的操作

在前面的示例中,我们几乎一直在使用一些“不安全”的操作,但是我们从未对它们进行任何解释,因此这里进行说明。所有带有 unsafe 前缀的操作都是不纯函数,并且会产生副作用(例如 Haskell 具有不安全 PerformIO)。但不要害怕这个名字!您应该使用诸如 mapflatMap 之类的函数以单子形式编写程序以组合其他函数,并且理想情况下,应仅在程序的最后一次调用这些不安全的操作之一。

unsafeRunSync

以不纯的副作用来运行封装的 effect 以产生出结果。

如果计算中的任何组件是异步的,则当前线程将被阻塞,并等待异步计算的结果。在 JavaScript 中,将抛出异常以避免生成死锁。默认情况下,此阻塞将是无时间界的,如果要将线程块的执行限制在某个固定时间内,请使用unsafeRunTimed来代替。

effect 中抛出(被挂起)的任何异常将在评估期间重新抛出。

IO(println("Sync!")).unsafeRunSync()
// Sync!

unsafeRunAsync

以不纯的副作用运行封装的 effect,将其中的结果传递给回调函数。

effect 中引发的任何异常都将传递给回调中的 Either。回调将最多调用一次。请注意,如果构造一个永不返,且又永远不会阻塞线程的 IO,尝试使用此方法去执行该 IO 会导致永远不会调用回调的情况。

IO(println("Async!")).unsafeRunAsync(_ => ())
// Async!

unsafeRunCancelable

运行一个 IO,并将封装的 effect 的结果传递给给定的回调,但是允许这个过程可以被打断。

IO(println("Potentially cancelable!")).unsafeRunCancelable(_ => ())
// Potentially cancelable!
// res59: cats.effect.package.CancelToken[IO] = Suspend(
//   cats.effect.internals.IOConnection$Impl$$Lambda$19396/541685241@bdae020
// )

unsafeRunTimed

unsafeRunSync 类似,增加了在等待异步结果时的有限的阻塞等待时间。

请注意,limit 参数并不限制总计算时间,而是充当任何单个异步块的时间上限。因此,如果您对仅由同步操作组成的 IO 传递了5秒的限制,则评估可能要花费5秒以上的时间!

此外,如果您对由多个合并(join)在一起的异步操作组成的 IO 传递 5 秒的限制,则计算可能最多需要 n*5 秒,其中 n 是加入的异步操作的数量。

一旦达到异步阻塞时限,评估将“立即”中止并且不返回任何内容。

请注意,此功能仅用于测试目的,它永远不应该出现在您的生产代码中!如果要实现超时或类似的方法,绝对不适合使用该功能。如果您需要那种功能,则应该使用第三方库(例如 fs2 或 Monix)。

import scala.concurrent.duration._

IO(println("Timed!")).unsafeRunTimed(5.seconds)

unsafeToFuture

评估 effect 并将结果保存在 Future 中。Evaluates the effect and produces the result in a Future.

这与 unsafeRunAsync 相似,因为它以非阻塞的风格以副作用的方式对 IO 进行评估,但是它使用的是 Future 而不是显式回调。仅当您需要与遗留的 Scala 代码进行互操作时,才应真正使用此功能。

IO("Gimme a Future!").unsafeToFuture()

最佳实践

本节介绍了使用IO的一些最佳实践:

保持粒度

最好保持较清晰的粒度,因此请不要执行以下操作:

IO {
  readingFile
  writingToDatabase
  sendBytesOverTcp
  launchMissiles
}

在 FP 中,我们欢迎以推理的方式组建程序,并且由于 IO 是 Monad,因此您可以利用 for-comprehension 将较小的程序组合成较大的程序。例如:

val program =
  for {
    data <- readFile
    _    <- writeToDatabase(data)
    _    <- sendBytesOverTcp(data)
    _    <- launchMissiles
  } yield ()

comprehension 中的每个步骤都是一个小程序,生成的程序是所有这些小步骤彼此一起组成。 最终形成合成的 IO。

在 map / flatMap 中使用纯函数

当使用 mapflatMap 时,不建议传递具有 side effect 的函数,因为 map 函数也应该是纯函数。所以应该避免:

IO.pure(123).map(n => println(s"NOT RECOMMENDED! $n"))

也应该避免以下这种情况,因为 side effect 不会在返回的 IO 值中挂起:

IO.pure(123).flatMap { n =>
  println(s"NOT RECOMMENDED! $n")
  IO.unit
}

正确的方法是这样的:

IO.pure(123).flatMap { n =>
  // 正确地挂起副作用
  IO(println(s"RECOMMENDED! $n"))
}

请注意,就 IO 的实际行为而言,类似 IO.pure(x).map(f) 等同于 IO(f(x)),而 IO.pure(x).flatMap(f) 等同与 IO.suspend(f(x))

但是您不应该这样做,因为 Sync 类型类中没有描述这种行为的法则,而那些法则是您的代码得到保障的唯一依据。例如,上述等效性将来可能会在错误处理的时候被打破。因此,出于安全原因,目前应该保持这种写法,同时您可以其视为将来可能更改的实现细节。

请坚持使用纯函数。

Leave a Reply
Your email address will not be published.
*
*

BACK TO TOP