Lesson 5 — Asynchronous Programming with Tokio
Learning Objectives
- Hiểu khái niệm async/await trong Rust và khi nào nên dùng.
- Biết cách dùng Tokio runtime để chạy tác vụ bất đồng bộ.
Explanation & Key Concepts
- Cần
#[tokio::main]để bootstrap runtime. tokio::spawnchạy future trên worker thread pool.
- Async trong Rust = concurrency (xen kẽ, không block).
- Muốn tận dụng nhiều core: kết hợp async + multi-thread.
Example Implementation
Cargo.toml
[package]
name = "lesson05_tokio_async"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1", features = ["full"] }
src/main.rs
// cargo-deps: tokio="1" use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() { let task1 = tokio::spawn(async { for i in 1..=3 { println!("Task 1 (Hi, dp) — step {}", i); sleep(Duration::from_millis(400)).await; } }); let task2 = tokio::spawn(async { for i in 1..=3 { println!("Task 2 — step {}", i); sleep(Duration::from_millis(600)).await; } }); // Await both tasks let _ = tokio::join!(task1, task2); println!("All async tasks done."); }
Tokio Cookbook — Các Pattern Thường Dùng
1. Chạy nhiều task song song với join!
// cargo-deps: tokio="1" use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() { let f1 = async { sleep(Duration::from_secs(1)).await; "done f1" }; let f2 = async { sleep(Duration::from_secs(2)).await; "done f2" }; let (r1, r2) = tokio::join!(f1, f2); println!("{r1}, {r2}"); }
2. Spawn Blocking cho CPU-bound code
// cargo-deps: tokio="1" #[tokio::main] async fn main() { let heavy = tokio::task::spawn_blocking(|| { let mut sum = 0; for i in 0..8_000 { sum += i; } sum }); println!("Result = {}", heavy.await.unwrap()); }
3. Channels để giao tiếp giữa tasks
// cargo-deps: tokio="1" use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel(32); tokio::spawn(async move { for i in 1..=5 { tx.send(format!("msg {i}")).await.unwrap(); } }); while let Some(msg) = rx.recv().await { println!("Got: {msg}"); } }
4. Semaphore để giới hạn song song
// cargo-deps: tokio="1" use tokio::sync::Semaphore; use std::sync::Arc; #[tokio::main] async fn main() { let semaphore = Arc::new(Semaphore::new(2)); for i in 1..=5 { let permit = semaphore.clone().acquire_owned().await.unwrap(); tokio::spawn(async move { println!("Task {i} started"); tokio::time::sleep(std::time::Duration::from_millis(8)).await; println!("Task {i} done"); drop(permit); }); } tokio::time::sleep(std::time::Duration::from_millis(8)).await; }
5. Timeout cho Future
// cargo-deps: tokio="1" use tokio::time::{timeout, Duration}; #[tokio::main] async fn main() { let res = timeout(Duration::from_secs(1), async { tokio::time::sleep(Duration::from_secs(2)).await; "done" }).await; match res { Ok(v) => println!("Finished: {v}"), Err(_) => println!("Timeout!"), } }
6. Interval — Task lặp lại định kỳ
// cargo-deps: tokio="1" use tokio::time; #[tokio::main] async fn main() { let mut interval = time::interval(time::Duration::from_secs(2)); for _ in 0..3 { interval.tick().await; println!("Tick at {:?}", time::Instant::now()); } }
Comparison: Tokio Concurrency Primitives
tokio::spawn➜ Tạo task mới trên runtime (thread pool). Trả vềJoinHandle<T>. Chạy song song với task hiện tại.join!/try_join!➜ Không tạo task mới. Poll nhiều futures trong chính task hiện tại cho đến khi tất cả xong (hoặc error vớitry_join!).select!➜ Không tạo task mới. Chờ một trong số các futures hoàn tất rồi tiếp tục nhánh đó; các futures còn lại bị hủy (drop) trừ khi được giữ lại.spawn_blocking➜ Đưa code blocking sang thread pool riêng để không chặn runtime.
| API | Execution Context | Description | Recommended Use |
|---|---|---|---|
tokio::spawn(fut) | Worker threads của runtime | Lên lịch một future chạy độc lập; trả JoinHandle để await sau | Tách tác vụ dài / độc lập; fan-out nhiều việc song song thực sự |
tokio::join!(a,b,...) | Task hiện tại | Poll đồng thời nhiều futures trong cùng task cho đến khi tất cả xong | Cần tất cả cùng hoàn tất; chạy đồng thời nhưng không tách task |
tokio::try_join!(a,b,...) | Task hiện tại | Như join! nhưng trả về Err ngay khi có future lỗi | Muốn propagate lỗi sớm, hủy phần còn lại |
tokio::select!{ ... } | Task hiện tại | Chọn nhánh hoàn tất đầu tiên; futures còn lại bị drop (bị hủy) | Đua nhiều nguồn (race), timeout, cancel một nhánh khi nhánh khác xong |
tokio::task::spawn_blocking(f) | Thread pool blocking | Chạy closure blocking (CPU/I/O sync) tách khỏi runtime | Bọc code không async (hash nặng, nén, đọc file lớn, v.v.) |
Hands-on Exercises
- Basic: Viết async fn
say_after(msg: &str, delay_ms: u64)in ra message saudelay_msms. Gọi 2 hàm và.awaitchúng. - Intermediate: Dùng
tokio::join!để chạy 3 tác vụ song song (mỗi tác vụ in 5 dòng với sleep khác nhau). - Challenge: Viết mini HTTP fetcher dùng
reqwest+ Tokio để tải nội dung 3 URL cùng lúc và in độ dài nội dung.
Common Pitfalls & Debugging Tips
- Quên thêm
#[tokio::main]→ không có runtime. - Block bằng
std::thread::sleeptrong async code → chặn toàn bộ runtime. - Dùng
.awaittuần tự thay vìtokio::join!→ mất tính song song. - Không phân biệt
spawn(chạy task độc lập) vớijoin!(chạy và chờ đồng thời).
Q&A — Common Questions
Q1. Async khác gì với multi-threading truyền thống?
- Async là cooperative concurrency: các tác vụ nhường quyền khi
.await, giúp tận dụng tốt I/O chờ. Multi-threading là preemptive: OS scheduler xen kẽ CPU time giữa nhiều thread, phù hợp CPU-bound.
Q2. async fn thực sự trả về gì?
- Nó trả về một type ẩn danh
impl Future<Output = T>. Future chỉ chạy khi được poll bởi runtime;.awaitđăng ký tiếp tục khi sẵn sàng.
Q3. Khi nào nên dùng tokio::spawn thay vì .await?
- Dùng
spawnkhi muốn một công việc chạy độc lập (không chặn luồng logic hiện tại) hoặc fan-out nhiều việc song song thật sự; còn.awaittrực tiếp thì chờ xong mới đi tiếp trong cùng task.
Migration Notes (Rust 2024+)
- Async/await đã stable từ Rust 1.39.
- Tokio 1.x tương thích Rust 2024 edition.
- Tận dụng
tokio::task::spawn_blockingcho code blocking (CPU-heavy).
References
Glossary of Terms
- Future: giá trị biểu diễn tính toán bất đồng bộ.
- .await: chờ completion của future.
- Tokio runtime: executor chạy futures.
- tokio::spawn: chạy future trên thread pool.
- join!: macro chờ nhiều future cùng lúc.
- spawn_blocking: tách code blocking sang thread pool riêng.
- mpsc channel: hàng đợi gửi/nhận dữ liệu giữa tasks.
- Semaphore: công cụ giới hạn số task chạy song song.
Wisdom Note
Đại thành nhược khuyết, kỳ dụng bất bì.
Great accomplishment may appear incomplete, yet its use is never exhausted.
Async programming cũng vậy: nhìn có vẻ phức tạp, nhưng dùng đúng sẽ mở rộng vô hạn khả năng xử lý đồng thời.