Lesson 5 — Asynchronous Programming with Tokio
🎯 Learning Objectives
- Hiểu khái niệm async/await trong Rust và khi nào nên dùng.
- Biết cách dùng Tokio runtime để chạy tác vụ bất đồng bộ.
✅ Explanation & Key Concepts
- Cần
#[tokio::main]để bootstrap runtime. tokio::spawnchạy future trên worker thread pool.
- Async trong Rust = concurrency (xen kẽ, không block).
- Muốn tận dụng nhiều core: kết hợp async + multi-thread.
💻 Example Implementation
Cargo.toml
[package]
name = "lesson05_tokio_async"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1", features = ["full"] }
src/main.rs
// cargo-deps: tokio="1" use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() { let task1 = tokio::spawn(async { for i in 1..=3 { println!("Task 1 (Hi, dp) — step {}", i); sleep(Duration::from_millis(400)).await; } }); let task2 = tokio::spawn(async { for i in 1..=3 { println!("Task 2 — step {}", i); sleep(Duration::from_millis(600)).await; } }); // Await both tasks let _ = tokio::join!(task1, task2); println!("All async tasks done."); }
📖 Tokio Cookbook — Các Pattern Thường Dùng
1. Chạy nhiều task song song với join!
// cargo-deps: tokio="1" use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() { let f1 = async { sleep(Duration::from_secs(1)).await; "done f1" }; let f2 = async { sleep(Duration::from_secs(2)).await; "done f2" }; let (r1, r2) = tokio::join!(f1, f2); println!("{r1}, {r2}"); }
2. Spawn Blocking cho CPU-bound code
// cargo-deps: tokio="1" #[tokio::main] async fn main() { let heavy = tokio::task::spawn_blocking(|| { let mut sum = 0; for i in 0..8_000 { sum += i; } sum }); println!("Result = {}", heavy.await.unwrap()); }
3. Channels để giao tiếp giữa tasks
// cargo-deps: tokio="1" use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel(32); tokio::spawn(async move { for i in 1..=5 { tx.send(format!("msg {i}")).await.unwrap(); } }); while let Some(msg) = rx.recv().await { println!("Got: {msg}"); } }
4. Semaphore để giới hạn song song
// cargo-deps: tokio="1" use tokio::sync::Semaphore; use std::sync::Arc; #[tokio::main] async fn main() { let semaphore = Arc::new(Semaphore::new(2)); for i in 1..=5 { let permit = semaphore.clone().acquire_owned().await.unwrap(); tokio::spawn(async move { println!("Task {i} started"); tokio::time::sleep(std::time::Duration::from_millis(8)).await; println!("Task {i} done"); drop(permit); }); } tokio::time::sleep(std::time::Duration::from_millis(8)).await; }
5. Timeout cho Future
// cargo-deps: tokio="1" use tokio::time::{timeout, Duration}; #[tokio::main] async fn main() { let res = timeout(Duration::from_secs(1), async { tokio::time::sleep(Duration::from_secs(2)).await; "done" }).await; match res { Ok(v) => println!("Finished: {v}"), Err(_) => println!("Timeout!"), } }
6. Interval — Task lặp lại định kỳ
// cargo-deps: tokio="1" use tokio::time; #[tokio::main] async fn main() { let mut interval = time::interval(time::Duration::from_secs(2)); for _ in 0..3 { interval.tick().await; println!("Tick at {:?}", time::Instant::now()); } }
📊 Comparison: Tokio Concurrency Primitives
tokio::spawn➜ Tạo task mới trên runtime (thread pool). Trả vềJoinHandle<T>. Chạy song song với task hiện tại.join!/try_join!➜ Không tạo task mới. Poll nhiều futures trong chính task hiện tại cho đến khi tất cả xong (hoặc error vớitry_join!).select!➜ Không tạo task mới. Chờ một trong số các futures hoàn tất rồi tiếp tục nhánh đó; các futures còn lại bị hủy (drop) trừ khi được giữ lại.spawn_blocking➜ Đưa code blocking sang thread pool riêng để không chặn runtime.
| API | Execution Context | Description | Recommended Use |
|---|---|---|---|
tokio::spawn(fut) | Worker threads của runtime | Lên lịch một future chạy độc lập; trả JoinHandle để await sau | Tách tác vụ dài / độc lập; fan-out nhiều việc song song thực sự |
tokio::join!(a,b,...) | Task hiện tại | Poll đồng thời nhiều futures trong cùng task cho đến khi tất cả xong | Cần tất cả cùng hoàn tất; chạy đồng thời nhưng không tách task |
tokio::try_join!(a,b,...) | Task hiện tại | Như join! nhưng trả về Err ngay khi có future lỗi | Muốn propagate lỗi sớm, hủy phần còn lại |
tokio::select!{ ... } | Task hiện tại | Chọn nhánh hoàn tất đầu tiên; futures còn lại bị drop (bị hủy) | Đua nhiều nguồn (race), timeout, cancel một nhánh khi nhánh khác xong |
tokio::task::spawn_blocking(f) | Thread pool blocking | Chạy closure blocking (CPU/I/O sync) tách khỏi runtime | Bọc code không async (hash nặng, nén, đọc file lớn, v.v.) |
🛠️ Hands-on Exercises
- Basic: Viết async fn
say_after(msg: &str, delay_ms: u64)in ra message saudelay_msms. Gọi 2 hàm và.awaitchúng. - Intermediate: Dùng
tokio::join!để chạy 3 tác vụ song song (mỗi tác vụ in 5 dòng với sleep khác nhau). - Challenge: Viết mini HTTP fetcher dùng
reqwest+ Tokio để tải nội dung 3 URL cùng lúc và in độ dài nội dung.
🐞 Common Pitfalls & Debugging Tips
- Quên thêm
#[tokio::main]→ không có runtime. - Block bằng
std::thread::sleeptrong async code → chặn toàn bộ runtime. - Dùng
.awaittuần tự thay vìtokio::join!→ mất tính song song. - Không phân biệt
spawn(chạy task độc lập) vớijoin!(chạy và chờ đồng thời).
❓ Q&A — Common Questions
Q1. Async khác gì với multi-threading truyền thống?
- Async là cooperative concurrency: các tác vụ nhường quyền khi
.await, giúp tận dụng tốt I/O chờ. Multi-threading là preemptive: OS scheduler xen kẽ CPU time giữa nhiều thread, phù hợp CPU-bound.
Q2. async fn thực sự trả về gì?
- Nó trả về một type ẩn danh
impl Future<Output = T>. Future chỉ chạy khi được poll bởi runtime;.awaitđăng ký tiếp tục khi sẵn sàng.
Q3. Khi nào nên dùng tokio::spawn thay vì .await?
- Dùng
spawnkhi muốn một công việc chạy độc lập (không chặn luồng logic hiện tại) hoặc fan-out nhiều việc song song thật sự; còn.awaittrực tiếp thì chờ xong mới đi tiếp trong cùng task.
🔄 Migration Notes (Rust 2024+)
- Async/await đã stable từ Rust 1.39.
- Tokio 1.x tương thích Rust 2024 edition.
- Tận dụng
tokio::task::spawn_blockingcho code blocking (CPU-heavy).
📚 References
📖 Glossary of Terms
- Future: giá trị biểu diễn tính toán bất đồng bộ.
- .await: chờ completion của future.
- Tokio runtime: executor chạy futures.
- tokio::spawn: chạy future trên thread pool.
- join!: macro chờ nhiều future cùng lúc.
- spawn_blocking: tách code blocking sang thread pool riêng.
- mpsc channel: hàng đợi gửi/nhận dữ liệu giữa tasks.
- Semaphore: công cụ giới hạn số task chạy song song.
🌿 Wisdom Note
Đại thành nhược khuyết, kỳ dụng bất bì.
Great accomplishment may appear incomplete, yet its use is never exhausted.
Async programming cũng vậy: nhìn có vẻ phức tạp, nhưng dùng đúng sẽ mở rộng vô hạn khả năng xử lý đồng thời.