Concurrency
Rust's ownership system makes concurrency safe at compile time. The compiler prevents data races before your code ever runs - a concept known as "fearless concurrency."
For Python developers: This is where Rust truly shines! With no GIL (Global Interpreter Lock), you get true parallelism across CPU cores, and the compiler still rules out data races before the program runs.
Python's GIL vs Rust's Concurrency
Python's limitation:
import threading

# Multiple threads, but the GIL prevents true parallelism:
# only one thread executes Python bytecode at a time
def worker():
    total = 0
    for i in range(10000000):
        total += i
    return total

threads = [threading.Thread(target=worker) for _ in range(4)]
# These won't run in parallel for CPU-bound work!
Rust's advantage:
use std::thread;

// True parallelism - all threads run simultaneously
fn main() {
    let handles: Vec<_> = (0..4)
        .map(|_| {
            thread::spawn(|| {
                // u64: this sum (~5e13) would overflow an i32
                let mut total: u64 = 0;
                for i in 0..10_000_000u64 {
                    total += i;
                }
                total
            })
        })
        .collect();

    // All 4 threads genuinely run in parallel; wait for each to finish
    for handle in handles {
        handle.join().unwrap();
    }
}
Threads
Spawning Threads
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..5 {
            println!("spawned thread: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..3 {
        println!("main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }

    // Wait for the spawned thread to finish
    handle.join().unwrap();
    println!("All done");
}
Output (interleaved, order varies):
main thread: 1
spawned thread: 1
main thread: 2
spawned thread: 2
spawned thread: 3
spawned thread: 4
All done
Moving Data into Threads
use std::thread;

fn main() {
    let name = String::from("Alice");

    // Must use move to transfer ownership
    let handle = thread::spawn(move || {
        println!("Hello from {}", name);
    });

    // println!("{}", name); // ERROR: name was moved

    handle.join().unwrap();
}
Without move, the closure borrows name. But the thread might outlive the scope where name is defined, so the compiler rejects it. move transfers ownership to the thread.
Multiple Threads
use std::thread;

fn main() {
    let mut handles = vec![];

    for i in 0..5 {
        let handle = thread::spawn(move || {
            println!("Thread {} running", i);
            i * i
        });
        handles.push(handle);
    }

    let results: Vec<i32> = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .collect();

    println!("Results: {:?}", results);
}
Thread Builder
use std::thread;

fn main() {
    let builder = thread::Builder::new()
        .name("worker-1".to_string())
        .stack_size(32 * 1024); // 32 KB stack

    let handle = builder.spawn(|| {
        let name = thread::current().name().unwrap_or("unknown").to_string();
        println!("Running on thread: {}", name);
    }).unwrap();

    handle.join().unwrap();
}
Message Passing with Channels
Channels allow threads to communicate by sending messages. Rust's channels follow the pattern: multiple producers, single consumer (mpsc).
Basic Channel
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let msg = String::from("hello from thread");
        tx.send(msg).unwrap();
        // println!("{}", msg); // ERROR: msg was moved by send
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
Sending Multiple Messages
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messages = vec!["hi", "from", "the", "thread"];
        for msg in messages {
            tx.send(msg.to_string()).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
        // tx is dropped here, closing the channel
    });

    // Iterate over received messages
    for received in rx {
        println!("Got: {}", received);
    }
    println!("Channel closed");
}
Multiple Producers
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Clone the sender for each thread
    for i in 0..3 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            let msg = format!("message from thread {}", i);
            tx_clone.send(msg).unwrap();
        });
    }

    // Drop the original sender so the channel closes
    // when all clones are dropped
    drop(tx);

    for received in rx {
        println!("Got: {}", received);
    }
}
Channel Methods
| Method | Behavior |
|---|---|
| tx.send(val) | Send value; returns Result |
| rx.recv() | Block until a message arrives |
| rx.try_recv() | Return immediately with a Result |
| rx.recv_timeout(dur) | Block, but give up after the timeout |
| rx.iter() | Iterate until the channel closes |
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    // Keep the sender alive (but unused) so the channel stays open;
    // the type annotation is needed because nothing is ever sent
    let (_tx, rx) = mpsc::channel::<String>();

    // Non-blocking receive
    match rx.try_recv() {
        Ok(msg) => println!("Got: {}", msg),
        Err(mpsc::TryRecvError::Empty) => println!("Nothing yet"),
        Err(mpsc::TryRecvError::Disconnected) => println!("Channel closed"),
    }

    // Receive with timeout
    match rx.recv_timeout(Duration::from_secs(1)) {
        Ok(msg) => println!("Got: {}", msg),
        Err(_) => println!("Timed out"),
    }
}
Shared State with Mutex
Mutex Basics
A Mutex<T> provides mutual exclusion - only one thread can access the data at a time.
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        // Lock the mutex to access the data
        let mut num = m.lock().unwrap();
        *num = 6;
        // Lock is automatically released when the MutexGuard is dropped
    }

    println!("m = {:?}", m); // m = Mutex { data: 6, ... }
}
Sharing Mutex Across Threads with Arc
Arc<T> (atomically reference counted) is the thread-safe counterpart of Rc<T>:
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap()); // 10
}
Why Arc and Not Rc?
| Type | Thread-Safe | Overhead |
|---|---|---|
| Rc<T> | No | Lower (non-atomic ops) |
| Arc<T> | Yes | Slightly higher (atomic ops) |
Use Rc<T> in single-threaded code, Arc<T> when sharing across threads.
RwLock: Multiple Readers, One Writer
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));
    let mut handles = vec![];

    // Multiple readers
    for i in 0..3 {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let read = data.read().unwrap();
            println!("Reader {}: {:?}", i, *read);
        }));
    }

    // Single writer
    {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let mut write = data.write().unwrap();
            write.push(4);
            println!("Writer added 4");
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final: {:?}", *data.read().unwrap());
}
| Lock Type | Readers | Writers | Use When |
|---|---|---|---|
| Mutex<T> | 1 at a time | 1 at a time | Mostly writes, simple access |
| RwLock<T> | Many simultaneous | 1 at a time | Many reads, few writes |
Practical Patterns
Worker Pool
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let rx = Arc::new(Mutex::new(rx));
    let mut workers = vec![];

    // Spawn worker threads
    for id in 0..4 {
        let rx = Arc::clone(&rx);
        let worker = thread::spawn(move || {
            loop {
                // The lock guard is dropped at the end of this statement
                let task = rx.lock().unwrap().recv();
                match task {
                    Ok(n) => println!("Worker {}: processing {}", id, n),
                    Err(_) => {
                        println!("Worker {}: shutting down", id);
                        break;
                    }
                }
            }
        });
        workers.push(worker);
    }

    // Send work
    for i in 0..10 {
        tx.send(i).unwrap();
    }

    // Close the channel
    drop(tx);

    // Wait for workers
    for worker in workers {
        worker.join().unwrap();
    }
}
Parallel Computation
use std::thread;

fn parallel_sum(data: &[i32], num_threads: usize) -> i32 {
    // Ceiling division so every element lands in some chunk
    let chunk_size = (data.len() + num_threads - 1) / num_threads;
    let mut handles = vec![];

    for chunk in data.chunks(chunk_size) {
        let chunk = chunk.to_vec();
        handles.push(thread::spawn(move || {
            chunk.iter().sum::<i32>()
        }));
    }

    handles.into_iter()
        .map(|h| h.join().unwrap())
        .sum()
}

fn main() {
    let data: Vec<i32> = (1..=100).collect();
    let sum = parallel_sum(&data, 4);
    println!("Sum: {}", sum); // 5050
}
Shared Configuration
use std::sync::Arc;
use std::thread;

#[derive(Debug)]
struct Config {
    db_url: String,
    max_retries: u32,
}

fn main() {
    let config = Arc::new(Config {
        db_url: "postgres://localhost/mydb".to_string(),
        max_retries: 3,
    });
    let mut handles = vec![];

    for i in 0..3 {
        let config = Arc::clone(&config);
        handles.push(thread::spawn(move || {
            println!("Thread {}: connecting to {}", i, config.db_url);
            println!("Thread {}: max retries = {}", i, config.max_retries);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
Avoiding Deadlocks
Deadlock Example
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let lock_a = Arc::new(Mutex::new(0));
    let lock_b = Arc::new(Mutex::new(0));

    // DON'T DO THIS - potential deadlock
    let la = Arc::clone(&lock_a);
    let lb = Arc::clone(&lock_b);
    let t1 = thread::spawn(move || {
        let _a = la.lock().unwrap(); // Locks A first
        let _b = lb.lock().unwrap(); // Then tries to lock B
    });

    let la = Arc::clone(&lock_a);
    let lb = Arc::clone(&lock_b);
    let t2 = thread::spawn(move || {
        let _b = lb.lock().unwrap(); // Locks B first
        let _a = la.lock().unwrap(); // Then tries to lock A
        // If t1 holds A and waits for B, and t2 holds B and waits for A -> DEADLOCK
    });

    // Joining may hang forever if the deadlock is hit
    let _ = t1.join();
    let _ = t2.join();
}
Prevention Rules
- Always acquire locks in the same order across threads
- Hold locks for as short a time as possible
- Avoid nested locks when possible
- Prefer channels over shared state when practical
Send and Sync Traits
These marker traits control what can be shared across threads:
| Trait | Meaning |
|---|---|
Send | Type can be transferred to another thread |
Sync | Type can be referenced from multiple threads |
Most types are Send and Sync automatically. Notable exceptions:
- Rc<T> is neither Send nor Sync (use Arc<T> instead)
- Cell<T> and RefCell<T> are Send but not Sync
- Raw pointers are neither
The compiler enforces these at compile time - you can't accidentally share non-thread-safe types.
Practice Exercises
Exercise 1: Parallel Map
Write a function that applies a function to each element in parallel:
fn parallel_map<T, U, F>(data: Vec<T>, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Clone + 'static,
{
    // Your code
}
Exercise 2: Chat System
Build a simple chat simulation using channels:
- Multiple "users" send messages to a central receiver
- The receiver prints all messages with the sender name
Exercise 3: Thread-Safe Counter
Create a counter that multiple threads can increment and read safely. Include a get() method and an increment() method.
Common Mistakes
1. Forgetting move for Thread Closures
let name = String::from("Alice");
thread::spawn(|| {
    println!("{}", name); // ERROR: closure may outlive borrowed value
});
Fix: Use move closure: thread::spawn(move || { ... }).
2. Using Rc Instead of Arc
use std::rc::Rc;

let data = Rc::new(5);
thread::spawn(move || {
    println!("{}", data); // ERROR: Rc<i32> cannot be sent between threads
});
Fix: Use Arc<T> for cross-thread sharing.
3. Forgetting to Drop Sender
let (tx, rx) = mpsc::channel();
// ... spawn threads with cloned tx ...

// If you don't drop(tx), the rx iterator never ends
for msg in rx { // Blocks forever
    println!("{}", msg);
}
Fix: drop(tx) after spawning threads so the channel closes properly.
Key Takeaways
- Threads with thread::spawn run code concurrently
- move closures transfer ownership to threads
- Channels (mpsc) pass messages between threads
- Mutex<T> provides exclusive access to shared data
- Arc<T> enables thread-safe reference counting
- RwLock<T> allows multiple readers or one writer
- Send and Sync traits are enforced at compile time
- Prefer channels over shared state when possible
Next Steps
In the next chapter, we'll explore Cargo in depth and the broader Rust ecosystem.
Quick Reference
// Spawn a thread
let handle = thread::spawn(move || { /* ... */ });
handle.join().unwrap();

// Channel
let (tx, rx) = mpsc::channel();
tx.send(value).unwrap();
let val = rx.recv().unwrap();

// Mutex with Arc
let data = Arc::new(Mutex::new(0));
let data_clone = Arc::clone(&data);
thread::spawn(move || {
    let mut lock = data_clone.lock().unwrap();
    *lock += 1;
});

// RwLock
let data = Arc::new(RwLock::new(vec![]));
let read = data.read().unwrap();
drop(read); // release the read guard first, or the next line blocks
let mut write = data.write().unwrap();