Queue

Queue 是一个基于 ZIO 的轻量级的,驻扎在内存的队列,具有可组合性,并且支持透明的背压。它是完全异步(无锁或无阻塞)的,纯函数且类型安全的。

Queue[A] 包含的值的类型为 A,并且具有两个基本操作:offer,将 A 放入队列中,和 take, 删除并返回队列中最旧的值。

import zio._

val res: UIO[Int] = for {
  queue <- Queue.bounded[Int](100)
  _ <- queue.offer(1)
  v1 <- queue.take
} yield v1

新建一个队列

一个 Queue 可以是有界的(容量有限)或无界的。

当队列已满时,有几种策略可用于处理新值:

  • 对默认的有界(bounded)队列是反压:填满后,任何提供数据的纤程都将被挂起,直到队列能够添加新内容为止;
  • 下降(dropping)队列当队列满时会丢弃新的数据;
  • 滑动(sliding)队列满时会丢弃最陈旧的数据。

建立一个支持背压的有界队列:

val boundedQueue: UIO[Queue[Int]] = Queue.bounded[Int](100)

建立一个下降队列:

val droppingQueue: UIO[Queue[Int]] = Queue.dropping[Int](100)

建立一个滑动队列:

val slidingQueue: UIO[Queue[Int]] = Queue.sliding[Int](100)

建立一个无界队列:

val unboundedQueue: UIO[Queue[Int]] = Queue.unbounded[Int]

向队列添加数据

向队列添加值的最简单方法是 offer

val res1: UIO[Unit] = for {
  queue <- Queue.bounded[Int](100)
  _ <- queue.offer(1)
} yield ()

使用背压队列时,如果队列已满,offer 可能会暂时挂起:您可以使用 fork 让其在其它纤程中等待。

val res2: UIO[Unit] = for {
  queue <- Queue.bounded[Int](1)
  _ <- queue.offer(1)
  f <- queue.offer(1).fork // will be suspended because the queue is full
  _ <- queue.take
  _ <- f.join
} yield ()

还可以使用 offerAll 一次添加多个值:

val res3: UIO[Unit] = for {
  queue <- Queue.bounded[Int](100)
  items = Range.inclusive(1, 10).toList
  _ <- queue.offerAll(items)
} yield ()

从队列中消费数据

take 操作从队列中删除最旧的数据并返回它。如果队列为空,它将时挂起,直到新的数据添加到队列后才继续。与 offer 一样,您可以使用 fork 让其在其他纤程中等待新值。

val oldestItem: UIO[String] = for {
  queue <- Queue.bounded[String](100)
  f <- queue.take.fork // will be suspended because the queue is empty
  _ <- queue.offer("something")
  v <- f.join
} yield v

您可以通过 poll 来消费最旧的数据。如果队列为空,你会得到 None,否则数据将被包装在 Some 返回。

val polled: UIO[Option[Int]] = for {
  queue <- Queue.bounded[Int](100)
  _ <- queue.offer(10)
  _ <- queue.offer(20)
  head <- queue.poll
} yield head

可以使用 takeUpTo 一次消费多个数据。如果队列中没有足够的数据要退回,则它将返回所有数据,而无需等待更多 offer

val taken: UIO[List[Int]] = for {
  queue <- Queue.bounded[Int](100)
  _ <- queue.offer(10)
  _ <- queue.offer(20)
  list  <- queue.takeUpTo(5)
} yield list

同样,您可以使用 takeAll 一次获得所有数据。它也无需等待就返回(如果队列为空,则为空列表)。

val all: UIO[List[Int]] = for {
  queue <- Queue.bounded[Int](100)
  _ <- queue.offer(10)
  _ <- queue.offer(20)
  list  <- queue.takeAll
} yield list

关闭队列

可以通过 shutdown 来中断(interrupt exception)所有处于 offer*take* 而挂起的纤程。它还将清空队列,并让所有未来的 offer* 和 take* 调用立刻结束(interrupt exception)。

val takeFromShutdownQueue: UIO[Unit] = for {
  queue <- Queue.bounded[Int](3)
  f <- queue.take.fork
  _ <- queue.shutdown // will interrupt f
  _ <- f.join // Will terminate
} yield ()

您可以在队列关闭时使用 awaitShutdown 执行 effect。它将等待直到队列被关闭。如果队列已经关闭,它将立即返回。

val awaitShutdown: UIO[Unit] = for {
  queue <- Queue.bounded[Int](3)
  p <- Promise.make[Nothing, Boolean]
  f <- queue.awaitShutdown.fork
  _ <- queue.shutdown
  _ <- f.join
} yield ()

转换队列Transforming queues

实际上,Queue[A]ZQueue[Any, Any, Nothing, Nothing, A, A] 类型的别名。完整版本的签名为:

trait ZQueue[RA, RB, EA, EB, A, B]

它的含义是:

  • 这个队列接收 A 类型的值。需要 RA 类型的环境支持入队操作,并且可能会因EA 类型的错误而失败;
  • 队列将产生 B 类型输出值。需要 RB 型环境支持出队操作,并可能因 EB 型错误而失败。

请注意,基本的 Queue[A] 不会失败也不需要任何环境来支持进行任何操作。

基于输入和输出的不同类型参数,可以实现各种不同的队列组合:

ZQueue#map

可以将队列的输出映射(到不同类型):

val mapped: UIO[String] = 
  for {
    queue  <- Queue.bounded[Int](3)
    mapped = queue.map(_.toString)
    _      <- mapped.offer(1)
    s      <- mapped.take
  } yield s

ZQueue#mapM

我们还可以使用一个效果(effectful)函数来映射输出。例如,我们可以为每个元素加上出列的时间戳:

import java.util.concurrent.TimeUnit
import zio.clock._

val currentTimeMillis = currentTime(TimeUnit.MILLISECONDS)

val annotatedOut: UIO[ZQueue[Any, Clock, Nothing, Nothing, String, (Long, String)]] =
  for {
    queue <- Queue.bounded[String](3)
    mapped = queue.mapM { el =>
      currentTimeMillis.map((_, el))
    }
  } yield mapped

ZQueue#contramapM

mapM 相似,我们也可以在元素入列时将效果(effectful)函数应用于元素。此例将使用其入队时间戳注释元素:

val annotatedIn: UIO[ZQueue[Clock, Any, Nothing, Nothing, String, (Long, String)]] =
  for {
    queue <- Queue.bounded[(Long, String)](3)
    mapped = queue.contramapM { el: String =>
      currentTimeMillis.map((_, el))
    }
  } yield mapped

此例中的队列与上一个例子中的队列具有相同的类型((Long, String)),但是当元素排队时,时间戳附加到元素上。这反映在排队入队所需的环境类型中。

一个完整的示例:我们可以将此队列(contramapM)与 mapM 结合起来,以计算元素保留在队列中的时长:

import zio.duration._

val timeQueued: UIO[ZQueue[Clock, Clock, Nothing, Nothing, String, (Duration, String)]] =
  for {
    queue <- Queue.bounded[(Long, String)](3)
    enqueueTimestamps = queue.contramapM { el: String =>
      currentTimeMillis.map((_, el))
    }
    durations = enqueueTimestamps.mapM { case (enqueueTs, el) =>
      currentTimeMillis
        .map(dequeueTs => ((dequeueTs - enqueueTs).millis, el))
    }
  } yield durations

ZQueue#bothWith

我们也可以将两个队列组合成一个队列,该(组合)队列广播 offer (到两个队列中)并(同时)从两个队列中接收(take)消息:

val fromComposedQueues: UIO[(Int, String)] = 
  for {
    q1       <- Queue.bounded[Int](3)
    q2       <- Queue.bounded[Int](3)
    q2Mapped =  q2.map(_.toString)
    both     =  q1.bothWith(q2Mapped)((_, _))
    _        <- both.offer(1)
    iAndS    <- both.take
    (i, s)   =  iAndS
  } yield (i, s)

附加资料

Leave a Reply
Your email address will not be published.
*
*

BACK TO TOP