【Rust通道】消息在飞翔:Rust通道机制深度揭秘
✨✨ 欢迎大家来到景天科技苑✨✨
🎈🎈 养成好习惯,先赞后看哦~🎈🎈
🏆 作者简介:景天科技苑
🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。
🏆《博客》:Rust开发,Python全栈,Golang开发,云原生开发,PyQt5和Tkinter桌面开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi,flask等框架,云原生K8S,linux,shell脚本等实操经验,网站搭建,数据库等分享。所属的专栏:Rust高性能并发编程
景天的主页:景天科技苑
文章目录
- Rust通道
-
- 一、mpsc
-
- 1.1 异步通道
- 1.2 同步通道
- 1.3 缓冲为0的同步通道
- 1.4 非阻塞接收与遍历
- 二、crossbeam-channel
-
- 2.1 基本概念
- 2.2 添加依赖
- 2.3 基本用法
-
- 2.3.1 无界通道
- 2.3.2 有界通道
- 2.4 非阻塞接收 / 发送
- 2.5 超时收发
- 2.6 选择器 select!
- 2.7 关闭通道
- 2.8 多个消费者
- 2.9 与 std::sync::mpsc 对比
- 三、flume
-
- 3.1 基本概念
- 3.2 同步使用示例
-
- 3.2.1 有界通道(阻塞)
- 3.2.2 无界通道(不会阻塞)
- 3.3 非阻塞 API
- 3.4 异步使用
- 3.5 通道关闭与 Drop 行为
-
- 3.5.1 自动关闭通道
- 3.6 选择操作
- 3.7 cloning 与多个消费者
- 3.8 注意事项
- 四、oneshot
-
- 4.1 依赖
- 4.2 基本原理
- 4.3 基本用法示例(同步代码)
- 4.4 异步使用(结合 async-std 或 tokio)
-
- 4.4.1 异步接收 async fn recv() 接口
- 4.4.2 使用 tokio runtime
- 4.5 Sender 与 Receiver 接口
- 4.6 常见错误处理
- 4.7 对比 tokio::sync::oneshot
- 4.8 应用场景
- 4.9 示例:任务完成通知
- 4.10 总结
Rust通道
Rust 中的通道 (channel) 是一种用于不同线程之间传递消息的机制。主要有以下几个特点:
• 通道提供了一种在线程之间安全传递数据的方式。向通道发送数据不会导致竞争条件或死锁。
通道运用了 Rust 的所有权系统来确保消息只被一个接收者获取。当一个值通过通道发送时, 发送者会失去这个值的所有权。
• 通道可以设置为同步的或异步的。同步通道在没有接收者准备好时会阻塞发送者。异步通道则会在后台缓冲未处理的消息。
• 通道可以是有边界的或无边界的。有边界意味着通道有一个固定长度的缓冲区, 当缓冲区填满时发送会被阻塞。无边界通道则没有这种限制。
• 通道是泛型的, 可以传递任何实现了 Send 和 Sync trait 的数据。通道最适合在不同线程间传递较大的数据, 或者作为线程安全的任务分配机制。
对于只传递少量数据的情况, 原子类型或 Mutex 可能更高效。
通道在 Rust 中被广泛应用于各种多线程、并发场景中。正确使用可以大大简化多线程编程的复杂性和风险。
一、mpsc
Rust 的标准库提供了一个 std::sync::mpsc 模块,用于实现多生产者、单消费者的通道。
mpsc(multiple producer, single consumer) 是一种特定类型的通道, 用于在多个发送者和单个接收者之间传递消息。它有以下几个主要特点:
• mpsc 通道只允许有一个接收者。这简化了所有权传递, 因为每条消息只能被唯一获取一次。
• 多个发送者可以同时向一个 mpsc 通道发送消息。通道会自动处理同步并发写访问。
• mpsc 通道既支持同步,也支持异步,同步通道需要设置边界(缓冲大小)。
• 通过 mpsc 发送的值必须实现 Send trait。这确保发送的类型可以安全的在线程间移动。
• 接收端可以通过轮询或者等待接收消息。try_recv 不会阻塞,recv 会阻塞直到有消息可用。
• mpsc 通道在发送端关闭后, 接收端会收到一个 None 消息, 表示通道的生命周期结束。
• mpsc 通道通常用来构建线程安全的生产者-消费者模式。多个生产者通过通道发送消息, 一个消费者接收处理。
• 由于只有单个接收者,mpsc 通道是最高效的通道实现之一。它们的吞吐量可以达到很高的水平。
具体来说,这个模块模块提供了基于消息的通信, 具体定义了三种类型: - Sender - SyncSender - Receiver
Sender 或 SyncSender 用于向 Receiver 发送数据。两种 sender 都是可克隆的 (多生产者), 所以多个线程可以同时向一个 receiver(单消费者) 发送。
这些通道有两种类型:
- 异步的, 无限缓冲的通道。channel 函数会返回一个 (Sender,Receiver) 元组, 其中所有发送都是异步的 (永不阻塞)。这个通道在概念上具有无限的缓冲区。
- 同步的, 有边界的通道。sync_channel 函数会返回一个 (SyncSender,Receiver) 元组, 用于挂起消息的存储由一个固定大小的预分配缓冲区组成。
所有发送都是同步的, 会阻塞直到有缓冲区空间可用。注意边界大小可以设置为 0, 这会使通道变成一个 ‘‘约定” 通道, 每个发送方原子地把一条消息传给接收方。
std::sync::mpsc 提供两个主要的通道类型:
异步通道(无界):通过 mpsc::channel() 创建,发送端不阻塞。
同步通道(有界):通过 mpsc::sync_channel(capacity) 创建,缓冲区满时发送会阻塞。
通道由一对端点组成:
Sender:发送端,支持 clone 多生产者。
Receiver:接收端,仅允许一个线程接收。
总之,mpsc 模块通过 Sender、SyncSender 和 Receiver 三种类型的通道, 提供了多生产者单消费者、异步和同步、无限缓冲和有边界缓冲等不同形式的 FIFO 队列通信机制。
1.1 异步通道
异步通道(无界):通过 mpsc::channel() 创建,发送端不阻塞。
下面是一个单生产者 单消费者的例子:
use std::sync::mpsc;use std::thread;fn main() { // 创建一个通道,用于在两个线程之间发送消息 let (sender, receiver) = mpsc::channel(); // 启动一个生产者线程 thread::spawn(move || { let message = \"Hello from the producer!\"; // 发送消息到通道 sender.send(message).expect(\"Failed to send message\"); }); // 主线程作为消费者,接收来自生产者线程的消息 let received_message = receiver.recv().expect(\"Failed to receive message\"); println!(\"Received message: {}\", received_message);}
在这个例子中,我们使用 std::sync::mpsc::channel() 创建了一个通道,其中 sender用于发送消息,receiver 用于接收消息。
我们在一个新的线程中启动了生产者,它发送了一条消息,而主线程作为消费者接收并输出了这条消息。
通道是一种线程安全的方式,确保多个线程之间的消息传递不会引发数据竞争等问题。
在实际应用中,你可以使用通道来实现不同线程之间的数据传递和同步。
以下是一个使用 Rust 的 std::sync::mpsc 实现 多生产者、单消费者模型的例子。
在这个例子中,多个线程将消息发送到一个通道,而单个线程从通道中接收这些消息。
use std::sync::mpsc;use std::thread;fn main() { // 创建一个多生产者单消费者信道 let (sender, receiver) = mpsc::channel(); // 启动三个生产者线程 for i in 0..3 { let tx = sender.clone(); // 克隆发送端。每个线程都有自己独立的发送端 thread::spawn(move || { tx.send(i).expect(\"Failed to send message\"); }); } // 主线程作为消费者,接收来自生产者的消息 for _ in 0..3 { let received_message = receiver.recv().expect(\"Failed to receive message\"); println!(\"Received message: {}\", received_message); }}
1.2 同步通道
同步通道(有界):通过 mpsc::sync_channel(capacity) 创建,缓冲区满时发送会阻塞。
以下是一个使用同步通道进行多线程同步的简单例子:
📌 背景:mpsc::sync_channel
sync_channel(cap) 是有容量限制的通道,发送端 send() 会在通道已满时阻塞,直到有消息被接收。
通道一端全部关闭后,另一端的 recv() 会返回 Err(RecvError)。
特点:
缓冲区满时,发送会阻塞。
适合实现“背压”机制,限制生产速度。
use std::sync::mpsc::sync_channel;use std::thread;fn main() { // 创建一个同步通道,容量为3 let (tx, rx) = sync_channel(3); // 在子线程中发送消息 for _ in 0..3 { let tx = tx.clone(); // cloned tx dropped within thread thread::spawn(move || tx.send(\"ok\").unwrap()); } //如果不写 drop(tx),主线程还持有一个发送端 tx,这意味着: //即使所有子线程都结束了,通道的发送端仍然未全部关闭。 //所以,rx.recv() 永远不会返回 Err,而是一直阻塞等待新的消息。 //从而导致:主线程的 while let 死循环,不会退出! drop(tx); while let Ok(msg) = rx.recv() { println!(\"{msg}\"); } println!(\"mpsc_example4 completed\");}
✅ drop(tx) 的作用
drop(tx) 显式地丢弃主线程持有的发送端。这样:
所有发送端(包括主线程和子线程的 tx)都会被 drop。
子线程结束后,整个发送端都关闭。
接收端 rx.recv() 在收到最后一个消息后会返回 Err。
while let Ok(msg) = rx.recv() 退出循环,程序正常结束。
1.3 缓冲为0的同步通道
以下是一个同步通道缓冲为 0 的简单例子:
use std::sync::mpsc;use std::thread;fn main() { // 创建一个缓冲为0的通道 let (sender, receiver) = mpsc::sync_channel::<i32>(0); //创建一个容器 let mut container = Vec::new(); // 启动生产者线程 let producer = thread::spawn(move || { for i in 0..5 { sender.send(i).expect(\"Failed to send message\"); println!(\"Sent message: {}\", i); } }); container.push(producer); // 启动消费者线程 let consumer = thread::spawn(move || { for _ in 0..5 { let received_message = receiver.recv().expect(\"Failed to receive message\"); println!(\"Received message: {}\", received_message); } }); container.push(consumer); // 等待所有线程结束 for handle in container { handle.join().unwrap(); }}
可以看到,生产者发送一个,消费者就接收一个
1.4 非阻塞接收与遍历
try_recv() 非阻塞接收:
match rx.try_recv() { Ok(msg) => println!(\"Got: {}\", msg), Err(_) => println!(\"No message available\"),}
recv_timeout() 超时接收:
use std::time::Duration;match rx.recv_timeout(Duration::from_secs(2)) { Ok(msg) => println!(\"Got: {}\", msg), Err(_) => println!(\"Timeout or disconnected\"),}
for 循环接收:
for msg in rx { println!(\"Got: {}\", msg);}
通道关闭时循环自动结束。
既然有 mpsc,那么是不是有 spmc,即单生产者多消费者, 也就是广播的功能;是不是有mpmc, 即多生产者多消费者的功能呢?
是不是有 spsc, 即单生产者单消费者的功能呢?
答案是有的,但是不是标准库提供的,而是第三方库提供的,比如 crossbeam-channel,
这个库提供了 mpsc、spmc、mpmc、bounded、unbounded 等多种通道类型,可以满足不同的需求。
而对于 spsc, 同步通道缓冲区为零就可以满足需求,当然也有一个专门的类型叫做 oneshot 的,专门实现这个功能。
接下来我会介绍一些知名的通道库,比如 crossbeam-channel、flume、tokio、crossfire 等,这些库都是非常优秀的,可以满足不同的需求。
二、crossbeam-channel
crossbeam-channel 是 Rust 中 crossbeam 库的一部分,提供了多生产者多消费者(MPMC) 的通道(channel)实现,
比标准库中的 std::sync::mpsc 更高效且功能更丰富,常用于多线程间通信。
2.1 基本概念
crossbeam-channel 提供了两类通道:
无界通道(unbounded):无限容量,发送不会阻塞;
有界通道(bounded):有固定容量,满时发送会阻塞。
支持的特性:
多发送者、多接收者;
非阻塞发送/接收;
超时与选择器(select);
关闭通道后通知;
高性能(比标准库更快);
2.2 添加依赖
# Cargo.toml[dependencies]crossbeam-channel = \"0.5.15\"
2.3 基本用法
2.3.1 无界通道
use crossbeam_channel::unbounded;use std::thread;fn main() { // 创建无界通道 let (sender, receiver) = unbounded(); // 启动一个新线程,发送消息 thread::spawn(move || { sender.send(\"hello\").unwrap(); }); // 在主线程中接收消息 let msg = receiver.recv().unwrap(); println!(\"Received: {}\", msg);}
2.3.2 有界通道
use crossbeam_channel::bounded;use std::thread;fn main() { // 创建一个容量为2的有界通道 let (sender, receiver) = bounded(2); // 容量为2 sender.send(1).unwrap(); sender.send(2).unwrap(); // 第三个发送会阻塞直到有一个元素被接收 thread::spawn(move || { sender.send(3).unwrap(); }); //主线程接收 for _ in 0..3 { let val = receiver.recv().unwrap(); println!(\"Got: {}\", val); }}
2.4 非阻塞接收 / 发送
use crossbeam_channel::unbounded;fn main() { let (s, r) = unbounded::<i32>(); assert!(s.try_send(1).is_ok()); assert_eq!(r.try_recv(), Ok(1)); assert_eq!(r.try_recv(), Err(crossbeam_channel::TryRecvError::Empty));}
2.5 超时收发
use crossbeam_channel::bounded;use std::time::Duration;fn main() { //创建无缓冲通道 let (s, r) = bounded::<i32>(0); //创建子线程发送数据,等待2秒后发送数据 std::thread::spawn(move || { std::thread::sleep(Duration::from_secs(2)); s.send(42).unwrap(); }); //接收数据,等待1秒超时 match r.recv_timeout(Duration::from_secs(1)) { Ok(v) => println!(\"Received: {}\", v), Err(e) => println!(\"Timeout: {}\", e), }}
1秒就超时,程序等待了2秒才发送,因此超时
2.6 选择器 select!
支持多个通道监听,是 crossbeam-channel 的强大功能。
use crossbeam_channel::{ unbounded, select };use std::thread;use std::time::Duration;fn main() { let (s1, r1) = unbounded(); let (s2, r2) = unbounded(); thread::spawn(move || { std::thread::sleep(Duration::from_secs(2)); s1.send(\"from s1\").unwrap(); }); thread::spawn(move || { s2.send(\"from s2\").unwrap(); }); //select! 哪个线程先发送,就先接收哪个线程的发送 select! { recv(r1) -> msg => println!(\"Got r1: {:?}\", msg), recv(r2) -> msg => println!(\"Got r2: {:?}\", msg), }}
支持的操作有:
recv(channel)
send(channel, value)
default => {…}(可选兜底)
after(duration) => {…}(超时)
基本语法结构
select! { recv(receiver1) -> msg => { /* handle msg */ }, send(sender2, val) => { /* sent val */ }, default => { /* fallback if nothing is ready */ }, after(Duration::from_secs(2)) => { /* timeout case */ },}
每一分支的语义:
分支语法 含义
recv(chan) -> msg => {…} 尝试从通道接收消息,成功后绑定为 msg
send(chan, val) => {…} 尝试向通道发送 val
after(duration) => {…} 超时等待 duration 后执行
default => {…} 所有分支都不就绪时立即执行此分支
🧪 执行行为说明(核心机制)
公平选择:select! 会在多个就绪分支中随机选择一个执行(避免偏向第一个)。
阻塞等待:如果没有 default 或 after,且所有通道都不就绪,则阻塞等待。
一次选择一次执行:执行完一个分支后退出 select!,不会再匹配其他分支。
语法限制:不能匹配多个 recv() 到同一个变量名;否则会编译失败。
2.7 关闭通道
通道在所有 Sender 被 drop 后,接收端自动关闭:
use crossbeam_channel::unbounded;fn main() { let (s, r) = unbounded::<i32>(); drop(s); // 所有发送者都被关闭 match r.recv() { Ok(v) => println!(\"Received: {}\", v), Err(_) => println!(\"Channel closed\"), }}
也可以手动关闭(但 crossbeam-channel 本身不提供 close 方法,需 drop)。
2.8 多个消费者
多个线程都可以 clone Receiver:
use crossbeam_channel::unbounded;use std::thread;fn main() { //创建无界通道 let (s, r) = unbounded(); //克隆接收者 let r1 = r.clone(); let r2 = r.clone(); //创建两个线程,在两个线程中分别接收消息 thread::spawn(move || { println!(\"Thread 1 got: {:?}\", r1.recv()); }); thread::spawn(move || { println!(\"Thread 2 got: {:?}\", r2.recv()); }); //在主线程中发送消息 s.send(\"hello\").unwrap(); //等待所有线程结束 thread::sleep(std::time::Duration::from_secs(1));}
⚠️ 注意:一个消息只会被一个 receiver 接收到。
2.9 与 std::sync::mpsc 对比
三、flume
flume 是 Rust 中一个功能强大的 多生产者多消费者(MPMC)通道库,旨在替代标准库的 std::sync::mpsc,提供更高效、灵活且易于使用的通道通信机制。
它支持同步、异步、选择(select!)等高级功能,并兼容 tokio、async-std 等异步运行时。
flume 是一个 Rust 中的异步无锁多生产者多消费者(mpmc)通道库,专注于提供高性能的异步通信。
它基于一种无锁的设计,使得在多线程环境中进行并发操作更为高效。
• 功能丰富: 提供无限、有限和会合队列
• 快速: 性能总是比 std::sync::mpsc 快, 有时也比 crossbeam-channel 快
• 安全: 整个代码库中没有任何 unsafe 代码!
• 灵活: Sender 和 Receiver 都实现了 Send + Sync + Clone
• 熟悉: 可以无缝替换 std::sync::mpsc
• 强大: 额外支持像 MPMC 和发送超时/截止时间等功能
• 简单: 依赖少, 代码库精简, 编译快
• 异步: 支持 async, 可以与同步代码混合使用
• 人性化: 提供强大的 select 式接口
3.1 基本概念
flume 提供的核心函数是:
fn flume::bounded
(cap: usize) -> (Sender
, Receiver
)
fn flume::unbounded
() -> (Sender
, Receiver
)
bounded(cap):创建有界通道(容量为 cap)。
unbounded():创建无界通道。
Sender
:发送端,可 clone。
Receiver
:接收端,支持阻塞、非阻塞和异步接收。
3.2 同步使用示例
3.2.1 有界通道(阻塞)
use flume::bounded;use std::thread;fn main() { // 创建一个容量为2的通道 let (tx, rx) = bounded(2); let producer = thread::spawn(move || { for i in 0..5 { tx.send(i).unwrap(); // 当缓冲区满,会阻塞 println!(\"sent {}\", i); } }); let consumer = thread::spawn(move || { for _ in 0..5 { let val = rx.recv().unwrap(); // 阻塞等待 println!(\"received {}\", val); } }); // 等待线程结束 producer.join().unwrap(); consumer.join().unwrap();}
3.2.2 无界通道(不会阻塞)
use flume::unbounded;fn main() { let (tx, rx) = unbounded(); tx.send(\"hello\").unwrap(); println!(\"Got: {}\", rx.recv().unwrap());}
3.3 非阻塞 API
try_send() 和 try_recv()
use flume::bounded;fn main() { let (tx, rx) = bounded(1); tx.send(1).unwrap(); if let Err(e) = tx.try_send(2) { println!(\"try_send failed: {}\", e); } let val = rx.try_recv().unwrap(); println!(\"Got: {}\", val);}
3.4 异步使用
flume 的 Sender 和 Receiver 都实现了 futures::Sink 和 Stream。
use flume::bounded;use tokio::task;#[tokio::main]async fn main() { //创建一个容量为2的通道 let (tx, rx) = bounded(2); //创建一个生产者任务 let producer = task::spawn(async move { for i in 0..5 { tx.send_async(i).await.unwrap(); println!(\"async sent {}\", i); } }); //创建一个消费者任务 let consumer = task::spawn(async move { while let Ok(v) = rx.recv_async().await { println!(\"async received {}\", v); } }); //等待生产者和消费者任务完成 producer.await.unwrap(); consumer.await.unwrap();}
3.5 通道关闭与 Drop 行为
3.5.1 自动关闭通道
Sender 和 Receiver 都实现 Drop。
当所有 Sender 被 drop,recv() 或 recv_async() 会返回 Err(RecvError)。
let (tx, rx) = flume::unbounded::<i32>();drop(tx); // 所有发送端 dropassert!(rx.recv().is_err());
3.6 选择操作
Flume 支持从多个接收器中选择第一个可用的消息:
use flume::bounded;use flume::Selector;fn main() { //创建两个通道 let (tx1, rx1) = bounded(1); let (tx2, rx2) = bounded(1); //发送数据 tx1.send(\"from tx1\").unwrap(); tx2.send(\"from tx2\").unwrap(); //创建选择器 let sel = Selector::new() .recv(&rx1, |msg| { println!(\"rx1 got: {:?}\", msg); }) .recv(&rx2, |msg| { println!(\"rx2 got: {:?}\", msg); }); //等待数据 sel.wait();}
3.7 cloning 与多个消费者
多个消费者
use flume::unbounded;use std::thread;fn main() { let (tx, rx) = unbounded(); let rx2 = rx.clone(); // 多个消费者 thread::spawn(move || { while let Ok(msg) = rx.recv() { println!(\"Thread 1 received: {}\", msg); } }); thread::spawn(move || { while let Ok(msg) = rx2.recv() { println!(\"Thread 2 received: {}\", msg); } }); tx.send(\"hello\").unwrap(); tx.send(\"world\").unwrap(); drop(tx); // 关闭发送端}
3.8 注意事项
默认发送/接收是阻塞的,如需异步必须使用 *_async。
select! 语法和 tokio::select! 不一样,不能混用。
flume 的 Receiver 是可克隆的,但不是广播语义,每个消费者竞争同一条消息(即每条消息只会被一个消费者处理)。
四、oneshot
Rust 中的 oneshot 库提供了单次 (one-shot) 通道的实现。
单次通道意味着它只能发送和接收一个值, 发送或接收后通道就会关闭。
这可以用来在线程之间发送一个信号或值, 而不需要维护一个持续的通道。
oneshot 提供了 Sender 和 Receiver 结构体来表示单次通道的两端。发送方使用Sender, 接收方使用 Receiver。
注意:这个 crate 与 Tokio 的 tokio::sync::oneshot 是不同的实现,但接口设计类似,适用于非 tokio 依赖的异步场景。
4.1 依赖
# Cargo.toml[dependencies]oneshot = \"0.1.11\"
4.2 基本原理
该 crate 提供了 oneshot::channel() 函数,它返回 (Sender
, Receiver
):
Sender
:负责发送一次值;
Receiver
:负责接收一次值;
一旦发送完成,channel 被关闭,无法重用。
4.3 基本用法示例(同步代码)
use oneshot::channel;use std::thread;fn main() { let (sender, receiver) = channel(); // 启动一个线程发送数据 thread::spawn(move || { sender.send(\"Hello from thread!\").unwrap(); }); // 接收数据 match receiver.recv() { Ok(msg) => println!(\"Got: {}\", msg), Err(_) => println!(\"Sender dropped\"), }}
4.4 异步使用(结合 async-std 或 tokio)
4.4.1 异步接收 async fn recv() 接口
需要async_std crate
use oneshot::channel;use async_std::task;fn main() { task::block_on(async { let (sender, receiver) = channel(); // 异步发送 task::spawn(async move { sender.send(\"async world\").unwrap(); }); // 异步接收 let msg = receiver.await.unwrap(); println!(\"Received: {}\", msg); });}
4.4.2 使用 tokio runtime
不推荐这个 crate,因为 tokio 提供自己的 oneshot
use oneshot::channel;use tokio::task;#[tokio::main]async fn main() { let (sender, receiver) = channel(); task::spawn(async move { sender.send(\"tokio world\").unwrap(); }); let msg = receiver.await.unwrap(); println!(\"Received: {}\", msg);}
4.5 Sender 与 Receiver 接口
Sender<T>
send(val: T) -> Result:发送一个值;
若接收者已被丢弃,返回 Err(val)。
Receiver<T>
recv():阻塞接收(同步);
await:异步接收;
如果发送者提前 drop,会返回 Err(Canceled)。
4.6 常见错误处理
use oneshot::channel;fn main() { let (_sender, mut receiver) = channel::<u32>(); match receiver.recv() { Ok(val) => println!(\"Received {}\", val), Err(_) => println!(\"Sender was dropped before sending\"), }}
4.7 对比 tokio::sync::oneshot
4.8 应用场景
异步任务之间的一次性响应机制
实现 cancel 或通知机制
用于 promise/response 模型
控制异步流程中的某个“完成标志”
4.9 示例:任务完成通知
use oneshot::channel;use async_std::task;use std::time::Duration;fn main() { task::block_on(async { let (done_tx, done_rx) = channel(); task::spawn(async move { task::sleep(Duration::from_secs(2)).await; done_tx.send(\"Done\").unwrap(); }); println!(\"Waiting...\"); let msg = done_rx.await.unwrap(); println!(\"Task finished: {}\", msg); });}
4.10 总结
oneshot crate 是轻量级、无 tokio 依赖的一次性通信工具;
支持 同步与异步 接收;
更适用于 非 tokio 环境 的单值异步通信;
如果你在使用 tokio,优先使用 tokio::sync::oneshot。