Queue
是一个基于 ZIO 的轻量级的,驻扎在内存的队列,具有可组合性,并且支持透明的背压。它是完全异步(无锁或无阻塞)的,纯函数且类型安全的。
Queue[A]
包含的值的类型为 A,并且具有两个基本操作:offer
,将 A 放入队列中,和 take
, 删除并返回队列中最旧的值。
import zio._
val res: UIO[Int] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(1)
v1 <- queue.take
} yield v1
新建一个队列
一个 Queue
可以是有界的(容量有限)或无界的。
当队列已满时,有几种策略可用于处理新值:
- 对默认的
有界(bounded)队列
是反压:填满后,任何提供数据的纤程都将被挂起,直到队列能够添加新内容为止; 下降(dropping)队列
当队列满时会丢弃新的数据;- 当
滑动(sliding)队列
满时会丢弃最陈旧的数据。
建立一个支持背压的有界队列:
val boundedQueue: UIO[Queue[Int]] = Queue.bounded[Int](100)
建立一个下降队列:
val droppingQueue: UIO[Queue[Int]] = Queue.dropping[Int](100)
建立一个滑动队列:
val slidingQueue: UIO[Queue[Int]] = Queue.sliding[Int](100)
建立一个无界队列:
val unboundedQueue: UIO[Queue[Int]] = Queue.unbounded[Int]
向队列添加数据
向队列添加值的最简单方法是 offer
:
val res1: UIO[Unit] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(1)
} yield ()
使用背压队列时,如果队列已满,offer
可能会暂时挂起:您可以使用 fork
让其在其它纤程中等待。
val res2: UIO[Unit] = for {
queue <- Queue.bounded[Int](1)
_ <- queue.offer(1)
f <- queue.offer(1).fork // will be suspended because the queue is full
_ <- queue.take
_ <- f.join
} yield ()
还可以使用 offerAll
一次添加多个值:
val res3: UIO[Unit] = for {
queue <- Queue.bounded[Int](100)
items = Range.inclusive(1, 10).toList
_ <- queue.offerAll(items)
} yield ()
从队列中消费数据
take
操作从队列中删除最旧的数据并返回它。如果队列为空,它将时挂起,直到新的数据添加到队列后才继续。与 offer
一样,您可以使用 fork 让其在其他纤程中等待新值。
val oldestItem: UIO[String] = for {
queue <- Queue.bounded[String](100)
f <- queue.take.fork // will be suspended because the queue is empty
_ <- queue.offer("something")
v <- f.join
} yield v
您可以通过 poll
来消费最旧的数据。如果队列为空,你会得到 None
,否则数据将被包装在 Some
返回。
val polled: UIO[Option[Int]] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(10)
_ <- queue.offer(20)
head <- queue.poll
} yield head
可以使用 takeUpTo
一次消费多个数据。如果队列中没有足够的数据要退回,则它将返回所有数据,而无需等待更多 offer
。
val taken: UIO[List[Int]] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(10)
_ <- queue.offer(20)
list <- queue.takeUpTo(5)
} yield list
同样,您可以使用 takeAll
一次获得所有数据。它也无需等待就返回(如果队列为空,则为空列表)。
val all: UIO[List[Int]] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(10)
_ <- queue.offer(20)
list <- queue.takeAll
} yield list
关闭队列
可以通过 shutdown
来中断(interrupt exception)所有处于 offer*
或 take*
而挂起的纤程。它还将清空队列,并让所有未来的 offer* 和 take* 调用立刻结束(interrupt exception)。
val takeFromShutdownQueue: UIO[Unit] = for {
queue <- Queue.bounded[Int](3)
f <- queue.take.fork
_ <- queue.shutdown // will interrupt f
_ <- f.join // Will terminate
} yield ()
您可以在队列关闭时使用 awaitShutdown 执行 effect。它将等待直到队列被关闭。如果队列已经关闭,它将立即返回。
val awaitShutdown: UIO[Unit] = for {
queue <- Queue.bounded[Int](3)
p <- Promise.make[Nothing, Boolean]
f <- queue.awaitShutdown.fork
_ <- queue.shutdown
_ <- f.join
} yield ()
转换队列Transforming queues
实际上,Queue[A]
是 ZQueue[Any, Any, Nothing, Nothing, A, A]
类型的别名。完整版本的签名为:
trait ZQueue[RA, RB, EA, EB, A, B]
它的含义是:
- 这个队列接收
A
类型的值。需要RA
类型的环境支持入队操作,并且可能会因EA
类型的错误而失败; - 队列将产生
B
类型输出值。需要RB
型环境支持出队操作,并可能因EB
型错误而失败。
请注意,基本的 Queue[A] 不会失败也不需要任何环境来支持进行任何操作。
基于输入和输出的不同类型参数,可以实现各种不同的队列组合:
ZQueue#map
可以将队列的输出映射(到不同类型):
val mapped: UIO[String] =
for {
queue <- Queue.bounded[Int](3)
mapped = queue.map(_.toString)
_ <- mapped.offer(1)
s <- mapped.take
} yield s
ZQueue#mapM
我们还可以使用一个效果(effectful)函数来映射输出。例如,我们可以为每个元素加上出列的时间戳:
import java.util.concurrent.TimeUnit
import zio.clock._
val currentTimeMillis = currentTime(TimeUnit.MILLISECONDS)
val annotatedOut: UIO[ZQueue[Any, Clock, Nothing, Nothing, String, (Long, String)]] =
for {
queue <- Queue.bounded[String](3)
mapped = queue.mapM { el =>
currentTimeMillis.map((_, el))
}
} yield mapped
ZQueue#contramapM
与 mapM
相似,我们也可以在元素入列时将效果(effectful)函数应用于元素。此例将使用其入队时间戳注释元素:
val annotatedIn: UIO[ZQueue[Clock, Any, Nothing, Nothing, String, (Long, String)]] =
for {
queue <- Queue.bounded[(Long, String)](3)
mapped = queue.contramapM { el: String =>
currentTimeMillis.map((_, el))
}
} yield mapped
此例中的队列与上一个例子中的队列具有相同的类型((Long, String)
),但是当元素排队时,时间戳附加到元素上。这反映在排队入队所需的环境类型中。
一个完整的示例:我们可以将此队列(contramapM
)与 mapM
结合起来,以计算元素保留在队列中的时长:
import zio.duration._
val timeQueued: UIO[ZQueue[Clock, Clock, Nothing, Nothing, String, (Duration, String)]] =
for {
queue <- Queue.bounded[(Long, String)](3)
enqueueTimestamps = queue.contramapM { el: String =>
currentTimeMillis.map((_, el))
}
durations = enqueueTimestamps.mapM { case (enqueueTs, el) =>
currentTimeMillis
.map(dequeueTs => ((dequeueTs - enqueueTs).millis, el))
}
} yield durations
ZQueue#bothWith
我们也可以将两个队列组合成一个队列,该(组合)队列广播 offer
(到两个队列中)并(同时)从两个队列中接收(take
)消息:
val fromComposedQueues: UIO[(Int, String)] =
for {
q1 <- Queue.bounded[Int](3)
q2 <- Queue.bounded[Int](3)
q2Mapped = q2.map(_.toString)
both = q1.bothWith(q2Mapped)((_, _))
_ <- both.offer(1)
iAndS <- both.take
(i, s) = iAndS
} yield (i, s)