Processes
Master Elixir's lightweight concurrency with processes, message passing, spawn, links, monitors, and GenServer.
Understanding Processes
Elixir processes are NOT OS threads:
- Lightweight: ~2KB memory each - spawn millions
- Isolated: Own memory, no shared state
- Concurrent: Run simultaneously on BEAM VM
- Fast: ~microseconds to spawn
- Fault-tolerant: One crash doesn't affect others
Why Processes?
# Sequential (slow): each call blocks until the previous one has returned
result1 = expensive_operation1()
result2 = expensive_operation2()
result3 = expensive_operation3()
# Concurrent (fast): spawn/1 returns a PID immediately, not the function's result
pid1 = spawn(fn -> expensive_operation1() end)
pid2 = spawn(fn -> expensive_operation2() end)
pid3 = spawn(fn -> expensive_operation3() end)
# All run simultaneously!
# NOTE: the return values of the spawned functions are discarded here. To collect
# them, have each process send its result back as a message, or use Task.async/await.
Spawning Processes
spawn/1
# Basic spawn: run an anonymous function in a brand-new process
pid = spawn(fn -> IO.puts("Hello from process!") end)
# => #PID<0.123.0>
# The new process prints "Hello from process!" and then terminates

# Spawn with work: the function body may contain several expressions
pid =
  spawn(fn ->
    sum = 1 + 1
    IO.puts("Result: #{sum}")
  end)
spawn/3
defmodule Math do
  # Prints "a + b = sum" for the two given numbers.
  # Returns :ok (the return value of IO.puts/1).
  def add(a, b) do
    sum = a + b
    IO.puts("#{a} + #{b} = #{sum}")
  end
end

# spawn/3 takes a module, a function name and a list of arguments
pid = spawn(Math, :add, [2, 3])
# => Prints "2 + 3 = 5"
Process Identity
# PID of the process evaluating this code
self() # => #PID<0.105.0>

# Process.alive?/1 tells whether a PID still refers to a running process
pid = spawn(fn -> Process.sleep(1000) end)
Process.alive?(pid) # => true

# Wait until the spawned process has finished sleeping and exited
Process.sleep(1100)
Process.alive?(pid) # => false
Message Passing
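Processes communicate via messages. Sending is asynchronous and non-blocking: `send/2` returns immediately, and the message waits in the recipient's mailbox. Receiving is different: `receive` blocks until a matching message arrives (or an optional timeout expires).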
send and receive
# Spawn a receiver that waits for exactly one tagged message
pid =
  spawn(fn ->
    receive do
      {:hello, text} -> IO.puts("Got hello: #{text}")
      {:goodbye, text} -> IO.puts("Got goodbye: #{text}")
    end
  end)

# Deliver a message to the receiver's mailbox
send(pid, {:hello, "world"})
# => Prints "Got hello: world"
receive with Multiple Clauses
# The receiver handles ONE message (whichever arrives first) and then exits
pid =
  spawn(fn ->
    receive do
      {:add, a, b} ->
        IO.puts("#{a} + #{b} = #{a + b}")

      {:multiply, a, b} ->
        IO.puts("#{a} * #{b} = #{a * b}")

      _ ->
        IO.puts("Unknown message")
    end
  end)

send(pid, {:add, 5, 3}) # => "5 + 3 = 8"
send(pid, {:multiply, 4, 7}) # Process already terminated, message lost!
receive in a Loop
defmodule Echo do
  # Receive loop: prints every {:echo, msg} it gets and recurses to wait for
  # the next message; :stop prints "Stopping" and returns, ending the process.
  def loop do
    receive do
      :stop ->
        # No recursive call here, so the process terminates
        IO.puts("Stopping")

      {:echo, msg} ->
        IO.puts("Echo: #{msg}")
        # Tail call keeps the process alive for further messages
        loop()
    end
  end
end

pid = spawn(Echo, :loop, [])
send(pid, {:echo, "hello"}) # => "Echo: hello"
send(pid, {:echo, "world"}) # => "Echo: world"
send(pid, :stop) # => "Stopping"
Receiving with Timeout
# Wait for a {:hello, msg} message; give up after one second and
# evaluate to the fallback string instead
receive do
  {:hello, msg} ->
    msg
after
  1_000 ->
    "No message in 1 second"
end
Reply Pattern
defmodule Calculator do
  # Spawns the calculator process and returns its PID.
  def start, do: spawn(&loop/0)

  # Serves {:add, a, b, caller} requests by sending {:result, a + b} back to
  # `caller`; :stop ends the loop.
  defp loop do
    receive do
      :stop ->
        :ok

      {:add, a, b, caller} ->
        sum = a + b
        send(caller, {:result, sum})
        loop()
    end
  end
end

pid = Calculator.start()

# Include our own PID so the calculator knows where to send the reply
send(pid, {:add, 5, 3, self()})

receive do
  {:result, value} ->
    IO.puts("Result: #{value}")
after
  1_000 ->
    IO.puts("Timeout")
end
# => "Result: 8"
Process Linking
When processes should fail together:
spawn_link/1
# Linked processes: an abnormal exit propagates across the link
spawn_link(fn -> raise "I crashed!" end)
# Current process also crashes!

# Trap exits to handle it: the exit signal is converted into an
# {:EXIT, pid, reason} message instead of killing this process
Process.flag(:trap_exit, true)

pid = spawn_link(fn -> raise "I crashed!" end)

receive do
  {:EXIT, ^pid, reason} ->
    details = inspect(reason)
    IO.puts("Process crashed: #{details}")
end
Link Existing Process
# Start an unlinked process first...
pid = spawn(fn -> Process.sleep(5000) end)

# ...then link the current process to it after the fact
Process.link(pid)
# Now they're linked

# The link shows up in the current process's info
Process.info(self(), :links)
# => {:links, [#PID<...>]}
Unlink
# Start a linked process, then remove the link again
pid = spawn_link(fn -> Process.sleep(5000) end)

Process.unlink(pid)
# No longer linked
Process Monitoring
One-way observation - doesn't link:
spawn_monitor/1
# spawn_monitor/1 starts the process and monitors it in one step,
# returning both the PID and the monitor reference
{pid, ref} =
  spawn_monitor(fn ->
    :timer.sleep(1000)
    raise "Oops"
  end)

# The monitoring process keeps running and is told about the crash
receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    details = inspect(reason)
    IO.puts("Process died: #{details}")
end
Monitor Existing Process
# Start a process, then begin monitoring it after the fact
pid = spawn(fn -> :timer.sleep(1000) end)
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    # Exit reasons can be any term (e.g. an {exception, stacktrace} tuple), and
    # tuples cannot be interpolated directly, so always go through inspect/1
    IO.puts("Process finished: #{inspect(reason)}")
after
  2000 -> IO.puts("Timeout")
end
# => "Process finished: :normal"
Demonitor
# `pid` is a process started earlier (see the previous example)
ref = Process.monitor(pid)

# Stop monitoring. The :flush option also removes a {:DOWN, ref, ...} message
# for this monitor that may already be sitting in the mailbox, so no stale
# notification is left behind.
Process.demonitor(ref, [:flush])
Links vs Monitors
# Links: Bidirectional, crash together (supervisor pattern)
# An abnormal exit on either side takes the other process down unless it traps exits
spawn_link(fn -> work() end)
# Monitors: Unidirectional, observer gets notified (watching pattern)
# The caller receives a {:DOWN, ref, :process, pid, reason} message and keeps running
spawn_monitor(fn -> work() end)
# NOTE: work/0 is a placeholder for your own function
Named Processes
Register processes with atoms:
Process.register/2
pid =
  spawn(fn ->
    receive do
      msg -> IO.puts("Got: #{msg}")
    end

    # Stay alive after the first message: a name is released automatically
    # when its process exits, which would make the whereis/unregister calls
    # below return nil / raise ArgumentError
    receive do
      :stop -> :ok
    end
  end)

Process.register(pid, :my_process)

# A registered name can be used anywhere a PID is accepted
send(:my_process, "Hello!")
# => "Got: Hello!"

# Whereis
Process.whereis(:my_process)
# => #PID<0.123.0>

# Unregister (raises ArgumentError if the name is not registered)
Process.unregister(:my_process)

# The process itself is unaffected by unregistering; stop it via its PID
send(pid, :stop)
Named spawn
defmodule Counter do
  # Spawns the counter loop starting at 0 and registers it as :counter.
  # Returns the result of Process.register/2 (true).
  def start do
    pid = spawn(fn -> loop(0) end)
    Process.register(pid, :counter)
  end

  # The current count lives in the argument of the recursive call.
  defp loop(count) do
    receive do
      {:get, caller} ->
        send(caller, {:count, count})
        loop(count)

      :increment ->
        loop(count + 1)
    end
  end
end

Counter.start()

# Messages are addressed to the registered name rather than a PID
send(:counter, :increment)
send(:counter, :increment)
send(:counter, {:get, self()})

receive do
  {:count, n} ->
    IO.puts("Count: #{n}")
end
# => "Count: 2"
Task Module
Higher-level abstractions for processes:
Task.async/1 and Task.await/1
# Start the work in a separate process; a task handle comes back right away
task =
  Task.async(fn ->
    Process.sleep(1000)
    "Done!"
  end)

# Do other work...

# Block until the task has produced its result
result = Task.await(task)
# => "Done!" (after 1 second)
Multiple Concurrent Tasks
# Start all three fetches at once; each Task.async/1 call returns immediately
# NOTE: fetch_users/0, fetch_orders/0 and fetch_products/0 are placeholders
tasks = [
Task.async(fn -> fetch_users() end),
Task.async(fn -> fetch_orders() end),
Task.async(fn -> fetch_products() end)
]
# Wait for every task; results come back in the same order as `tasks`
results = Task.await_many(tasks)
# => [users, orders, products]
Task.start/1
Fire and forget:
# Task.start/1 returns {:ok, pid}; the task is neither linked nor awaited
Task.start(fn ->
# Background work, don't care about result
# NOTE: send_email/0 is a placeholder for your own function
send_email()
end)
Task.start_link/1
Linked task:
# Task.start_link/1 returns {:ok, pid} and links the task to the caller
Task.start_link(fn ->
# If this crashes, the exit signal propagates over the link: the parent
# crashes too, unless it traps exits (then it receives an {:EXIT, pid, reason} message)
# NOTE: critical_work/0 is a placeholder for your own function
critical_work()
end)
Agent Module
Simple state management:
Creating an Agent
# Start an agent whose initial state is the result of the given function
{:ok, agent} = Agent.start_link(fn -> 0 end)

# Get value: the function runs inside the agent and its result is returned
Agent.get(agent, & &1)
# => 0

# Update value: the function's result becomes the new state
Agent.update(agent, &(&1 + 1))

Agent.get(agent, & &1)
# => 1
Named Agent
# Registering the agent under a name removes the need to pass its PID around
{:ok, _} = Agent.start_link(fn -> %{} end, name: :config)

Agent.update(:config, &Map.put(&1, :key, "value"))

Agent.get(:config, & &1)
# => %{key: "value"}
Agent Functions
defmodule Counter do
  # Starts the agent registered under this module's name, holding `initial`.
  def start_link(initial \\ 0) do
    Agent.start_link(fn -> initial end, name: __MODULE__)
  end

  # Returns the current count.
  def value, do: Agent.get(__MODULE__, fn count -> count end)

  # Adds one to the count; returns :ok.
  def increment, do: Agent.update(__MODULE__, fn count -> count + 1 end)

  # Subtracts one from the count; returns :ok.
  def decrement, do: Agent.update(__MODULE__, fn count -> count - 1 end)
end

Counter.start_link(10)
Counter.increment()
Counter.value() # => 11
GenServer
Generic server behavior - the foundation of OTP:
Basic GenServer
defmodule Stack do
  use GenServer

  ## Client API

  # Starts the server registered under the module name; `initial_stack` is a
  # list whose head is the top of the stack.
  def start_link(initial_stack) do
    GenServer.start_link(__MODULE__, initial_stack, name: __MODULE__)
  end

  # Asynchronously pushes `item` on top of the stack.
  def push(item), do: GenServer.cast(__MODULE__, {:push, item})

  # Synchronously removes and returns the top item, or nil when empty.
  def pop, do: GenServer.call(__MODULE__, :pop)

  ## Server callbacks

  @impl true
  def init(initial_stack), do: {:ok, initial_stack}

  @impl true
  def handle_cast({:push, item}, stack), do: {:noreply, [item | stack]}

  @impl true
  def handle_call(:pop, _from, []), do: {:reply, nil, []}

  def handle_call(:pop, _from, [top | rest]), do: {:reply, top, rest}
end
# Usage
{:ok, _pid} = Stack.start_link([1, 2, 3])
Stack.push(4) # stack is now [4, 1, 2, 3]
Stack.pop() # => 4
Stack.pop() # => 1 (the head of the initial list sits directly below the pushed item)
GenServer Callbacks
# Skeleton listing the GenServer callbacks and the return tuples each may produce.
# NOTE: initial_state, reply and new_state are placeholders, not bound variables;
# replace them with real values before compiling this module.
defmodule MyServer do
use GenServer
# Initialize state; `args` is the value passed to GenServer.start_link
def init(args) do
{:ok, initial_state}
# or {:ok, state, timeout}
# or :ignore
# or {:stop, reason}
end
# Synchronous - returns reply; invoked for GenServer.call/2,3
# `from` identifies the caller waiting for the reply
def handle_call(request, from, state) do
{:reply, reply, new_state}
# or {:noreply, new_state}
# or {:stop, reason, reply, new_state}
end
# Asynchronous - no reply; invoked for GenServer.cast/2
def handle_cast(request, state) do
{:noreply, new_state}
# or {:stop, reason, new_state}
end
# Handle messages sent directly to process (send/2, timers, monitor :DOWN messages)
def handle_info(msg, state) do
{:noreply, new_state}
end
# Cleanup before termination; the return value is ignored
def terminate(reason, state) do
# Cleanup code
:ok
end
end
call vs cast
# call: Synchronous, waits for reply (handled by handle_call/3)
# The caller exits if no reply arrives within the timeout (5 seconds by default)
GenServer.call(pid, :get_state)
# cast: Asynchronous, fire and forget (handled by handle_cast/2)
# Always returns :ok immediately, even if the server does not exist
GenServer.cast(pid, {:update, value})
GenServer with State
defmodule KVStore do
  use GenServer

  ## Client API

  # Starts the store with an empty map, registered under the module name.
  #
  # Accepts (and ignores) one optional argument so the module also works with
  # the child_spec/1 generated by `use GenServer`, which calls start_link/1
  # when the module is listed as a supervisor child. Calling start_link/0
  # keeps working as before.
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  # Asynchronously stores `value` under `key`.
  def put(key, value) do
    GenServer.cast(__MODULE__, {:put, key, value})
  end

  # Returns the value stored under `key`, or nil when absent.
  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  # Asynchronously removes `key`.
  def delete(key) do
    GenServer.cast(__MODULE__, {:delete, key})
  end

  ## Server callbacks

  # Use the map handed over by start_link instead of discarding it.
  @impl true
  def init(initial) when is_map(initial), do: {:ok, initial}

  @impl true
  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  def handle_cast({:delete, key}, state) do
    {:noreply, Map.delete(state, key)}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end

KVStore.start_link()
KVStore.put(:name, "Alice")
KVStore.get(:name) # => "Alice"
GenServer with Timer
defmodule Ticker do
  use GenServer

  # Starts the ticker; `interval` is the delay between ticks in milliseconds.
  def start_link(interval) do
    GenServer.start_link(__MODULE__, interval, name: __MODULE__)
  end

  @impl true
  def init(interval) do
    # Arm the first timer before the server starts handling messages
    schedule_tick(interval)
    {:ok, %{interval: interval, count: 0}}
  end

  # Timer messages are plain messages, so they arrive in handle_info/2.
  @impl true
  def handle_info(:tick, state) do
    tick_no = state.count
    IO.puts("Tick #{tick_no}")

    # Re-arm the timer for the next tick
    schedule_tick(state.interval)
    {:noreply, %{state | count: tick_no + 1}}
  end

  # Asks the runtime to deliver :tick to this process after `interval` ms.
  defp schedule_tick(interval), do: Process.send_after(self(), :tick, interval)
end

Ticker.start_link(1000)
# Prints "Tick 0", "Tick 1", etc. every second
Practical Examples
Concurrent URL Fetcher
defmodule URLFetcher do
  # Fetches every URL concurrently (one task per URL) and returns one result
  # tuple per URL, in input order:
  #
  #   {:ok, url, body}       - HTTP 200
  #   {:error, url, reason}  - any other status or a transport error
  #
  # The caller exits if the tasks have not all finished within 10 seconds.
  def fetch_all(urls) do
    urls
    |> Enum.map(fn url -> Task.async(fn -> fetch(url) end) end)
    # Task.await_many/2 takes the timeout as a plain integer (or :infinity);
    # a keyword list such as [timeout: 10_000] raises FunctionClauseError
    |> Task.await_many(10_000)
  end

  # Performs a single GET request and normalises the outcome.
  defp fetch(url) do
    case HTTPoison.get(url) do
      {:ok, %{status_code: 200, body: body}} ->
        {:ok, url, body}

      {:ok, %{status_code: code}} ->
        {:error, url, "Status #{code}"}

      {:error, reason} ->
        {:error, url, reason}
    end
  end
end
# HTTPoison is a third-party HTTP client and must be a dependency of the project
urls = ["https://example.com", "https://example.org"]
URLFetcher.fetch_all(urls)
# => a list with one {:ok, url, body} or {:error, url, reason} tuple per URL
Worker Pool
defmodule Worker do
  use GenServer

  # Starts an unnamed worker; `id` is kept as the server state and echoed back
  # with every result so callers can see which worker handled a task.
  def start_link(id) do
    GenServer.start_link(__MODULE__, id)
  end

  # @impl true lets the compiler check that these really are GenServer
  # callbacks (catching typos and wrong arities), as in the other servers
  @impl true
  def init(id) do
    {:ok, id}
  end

  # Runs the task synchronously and replies with {:ok, result, worker_id}.
  @impl true
  def handle_call({:work, task}, _from, id) do
    result = perform_work(task)
    {:reply, {:ok, result, id}, id}
  end

  # Simulates one second of work.
  defp perform_work(task) do
    :timer.sleep(1000)
    "Completed: #{task}"
  end
end
defmodule WorkerPool do
  # Starts `size` workers with ids 1..size and returns their PIDs.
  def start(size) do
    for id <- 1..size do
      {:ok, pid} = Worker.start_link(id)
      pid
    end
  end

  # Assigns tasks to workers round-robin, runs the calls concurrently and
  # returns the replies in task order.
  def distribute_work(workers, tasks) do
    # Stream.cycle/1 repeats the worker list so every task gets a partner
    assignments = Enum.zip(tasks, Stream.cycle(workers))

    pending =
      Enum.map(assignments, fn {task, worker} ->
        Task.async(fn -> GenServer.call(worker, {:work, task}) end)
      end)

    Task.await_many(pending)
  end
end

workers = WorkerPool.start(3)
tasks = ["task1", "task2", "task3", "task4", "task5"]
WorkerPool.distribute_work(workers, tasks)
PubSub System
defmodule PubSub do
  use GenServer

  ## Client API

  # Starts the broker registered under the module name.
  #
  # Accepts (and ignores) one optional argument so the module works with the
  # child_spec/1 generated by `use GenServer`, which calls start_link/1 under
  # a supervisor. start_link/0 keeps working as before.
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  # Subscribes the CALLING process to `topic`.
  def subscribe(topic) do
    GenServer.cast(__MODULE__, {:subscribe, topic, self()})
  end

  # Sends {:message, topic, message} to every subscriber of `topic`.
  def publish(topic, message) do
    GenServer.cast(__MODULE__, {:publish, topic, message})
  end

  ## Server callbacks

  # State: a map of topic => list of subscriber PIDs.
  @impl true
  def init(_) do
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:subscribe, topic, pid}, state) do
    subscribers = Map.get(state, topic, [])

    if pid in subscribers do
      # Already subscribed: adding the PID again would deliver every message twice
      {:noreply, state}
    else
      # Monitor the subscriber so it can be dropped once it exits
      Process.monitor(pid)
      {:noreply, Map.put(state, topic, [pid | subscribers])}
    end
  end

  def handle_cast({:publish, topic, message}, state) do
    subscribers = Map.get(state, topic, [])

    Enum.each(subscribers, fn pid ->
      send(pid, {:message, topic, message})
    end)

    {:noreply, state}
  end

  # A subscriber exited: remove it from every topic so the lists do not grow
  # without bound. One :DOWN arrives per topic the process subscribed to;
  # removal is idempotent.
  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    pruned = Map.new(state, fn {topic, pids} -> {topic, List.delete(pids, pid)} end)
    {:noreply, pruned}
  end

  # Ignore any other stray message rather than crashing on it.
  def handle_info(_msg, state) do
    {:noreply, state}
  end
end
# Usage
PubSub.start_link()

# subscribe/1 registers the CALLING process, so the process that subscribes
# must be the one that waits for the message. (Subscribing here and receiving
# in a separately spawned process would leave that process waiting forever.)
PubSub.subscribe(:news)

# Both casts come from this process, so the broker is guaranteed to handle
# the subscription before the publication
PubSub.publish(:news, "Breaking news!")

receive do
  {:message, topic, msg} ->
    IO.puts("Received on #{topic}: #{msg}")
after
  1000 ->
    IO.puts("No message received")
end
# => "Received on news: Breaking news!"
Rate Limiter
defmodule RateLimiter do
  use GenServer

  # Starts a limiter that admits at most `max_requests` per `window_ms`
  # milliseconds (sliding window), registered under the module name.
  def start_link(max_requests, window_ms) do
    GenServer.start_link(__MODULE__, {max_requests, window_ms}, name: __MODULE__)
  end

  # Returns :ok when the request is admitted, :rate_limited otherwise.
  def check, do: GenServer.call(__MODULE__, :check)

  # State keeps the limits plus the timestamps of admitted requests.
  @impl true
  def init({max_requests, window_ms}) do
    {:ok, %{max_requests: max_requests, window_ms: window_ms, requests: []}}
  end

  @impl true
  def handle_call(:check, _from, state) do
    now = System.monotonic_time(:millisecond)
    window_start = now - state.window_ms

    # Keep only the timestamps that still fall inside the window
    in_window = for stamp <- state.requests, stamp > window_start, do: stamp

    if length(in_window) >= state.max_requests do
      {:reply, :rate_limited, state}
    else
      {:reply, :ok, %{state | requests: [now | in_window]}}
    end
  end
end
# Allow 5 requests per 1 second
RateLimiter.start_link(5, 1000)

# Issue 10 requests, 100 ms apart, and report the verdict for each
Enum.each(1..10, fn i ->
  line =
    case RateLimiter.check() do
      :ok -> "Request #{i}: OK"
      :rate_limited -> "Request #{i}: RATE LIMITED"
    end

  IO.puts(line)
  :timer.sleep(100)
end)
Exercises
Create a process that echoes messages back to the sender
Implement a GenServer-based counter with increment, decrement, and get operations
Build a parallel map function using spawn and message passing
Create a simple cache using Agent that expires entries after a timeout
Implement a GenServer that manages a queue of jobs and processes them one at a time
# Solutions
# 1. Echo process
defmodule Echo do
  # Spawns the echo process and returns its PID.
  def start, do: spawn(&loop/0)

  # Answers every {sender, msg} tuple with {:echo, msg}, forever.
  defp loop do
    receive do
      {sender, msg} ->
        send(sender, {:echo, msg})
    end

    loop()
  end
end

pid = Echo.start()

# Include our own PID so the echo process knows where to reply
send(pid, {self(), "Hello"})

receive do
  {:echo, msg} ->
    IO.puts("Got: #{msg}")
end
# 2. GenServer counter
defmodule Counter do
  use GenServer

  ## Client API

  # Starts the counter at `initial`, registered under the module name.
  def start_link(initial \\ 0) do
    GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  end

  # Asynchronously adds one.
  def increment, do: GenServer.cast(__MODULE__, :increment)

  # Asynchronously subtracts one.
  def decrement, do: GenServer.cast(__MODULE__, :decrement)

  # Synchronously returns the current count.
  def get, do: GenServer.call(__MODULE__, :get)

  ## Server callbacks (the state is the bare integer count)

  @impl true
  def init(initial) do
    {:ok, initial}
  end

  @impl true
  def handle_cast(:increment, count) do
    {:noreply, count + 1}
  end

  def handle_cast(:decrement, count) do
    {:noreply, count - 1}
  end

  @impl true
  def handle_call(:get, _from, count) do
    {:reply, count, count}
  end
end
# 3. Parallel map
defmodule ParallelMap do
  # Applies `func` to every element of `list` in parallel, using one process
  # per element, and returns the results in input order.
  #
  # Built from spawn_link/1 and message passing, as the exercise asks:
  #   1. each worker computes func.(item) and sends {ref, result} to the caller;
  #   2. the caller collects the replies by matching on each ref in order, so
  #      the output order does not depend on which worker finishes first.
  #
  # `timeout` (default 5000 ms, the same default Task.await/2 uses) bounds the
  # wait for each individual result; the caller exits with {:timeout, _} when
  # it is exceeded. Workers are linked, so an exception inside `func` also
  # brings down the caller instead of leaving it waiting.
  def pmap(list, func, timeout \\ 5_000) do
    parent = self()

    refs =
      Enum.map(list, fn item ->
        # A unique reference ties each reply to its input position
        ref = make_ref()
        spawn_link(fn -> send(parent, {ref, func.(item)}) end)
        ref
      end)

    Enum.map(refs, fn ref ->
      receive do
        {^ref, result} -> result
      after
        timeout -> exit({:timeout, {__MODULE__, :pmap, [ref]}})
      end
    end)
  end
end
# Each call takes a second, but all four run at the same time
slow_double = fn x ->
  :timer.sleep(1000)
  x * 2
end

ParallelMap.pmap([1, 2, 3, 4], slow_double)
# => [2, 4, 6, 8] (takes ~1 second, not 4)
# 4. Cache with expiry
defmodule Cache do
  use Agent

  # Starts the cache with no entries, registered under the module name.
  #
  # Accepts (and ignores) one optional argument so the module works with the
  # child_spec/1 generated by `use Agent`, which calls start_link/1 under a
  # supervisor. start_link/0 keeps working as before.
  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Stores `value` under `key` for `ttl` milliseconds (default: one minute).
  def put(key, value, ttl \\ 60_000) do
    # Monotonic time never jumps backwards, unlike wall-clock time
    expires_at = System.monotonic_time(:millisecond) + ttl
    Agent.update(__MODULE__, &Map.put(&1, key, {value, expires_at}))
  end

  # Looks up `key`.
  #
  # Returns {:ok, value} for a live entry, :expired for an entry whose TTL has
  # passed, and :not_found for an unknown key. An expired entry is deleted as
  # it is reported, so stale data does not pile up in memory; later lookups of
  # the same key therefore return :not_found.
  def get(key) do
    Agent.get_and_update(__MODULE__, fn state ->
      case Map.fetch(state, key) do
        {:ok, {value, expires_at}} ->
          if System.monotonic_time(:millisecond) < expires_at do
            {{:ok, value}, state}
          else
            {:expired, Map.delete(state, key)}
          end

        :error ->
          {:not_found, state}
      end
    end)
  end
end
# 5. Job queue GenServer
defmodule JobQueue do
  use GenServer

  ## Client API

  # Starts the queue registered under the module name.
  #
  # Accepts (and ignores) one optional argument so the module works with the
  # child_spec/1 generated by `use GenServer`, which calls start_link/1 under
  # a supervisor. start_link/0 keeps working as before.
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Asynchronously appends `job`; jobs run one at a time in FIFO order.
  def add_job(job) do
    GenServer.cast(__MODULE__, {:add, job})
  end

  ## Server callbacks

  # State:
  #   queue      - Erlang :queue of pending jobs (O(1) enqueue and dequeue)
  #   processing - true while a :process_next message is scheduled or running
  @impl true
  def init(_) do
    {:ok, %{queue: :queue.new(), processing: false}}
  end

  @impl true
  def handle_cast({:add, job}, %{queue: queue, processing: processing} = state) do
    # Kick off the processing loop only when it is idle
    if not processing do
      send(self(), :process_next)
    end

    # Mark the loop as running right away. Otherwise several jobs added before
    # the first :process_next is handled would each schedule their own tick.
    {:noreply, %{state | queue: :queue.in(job, queue), processing: true}}
  end

  @impl true
  def handle_info(:process_next, %{queue: queue} = state) do
    case :queue.out(queue) do
      {:empty, _} ->
        # Nothing left: go idle until the next add_job/1
        {:noreply, %{state | processing: false}}

      {{:value, job}, rest} ->
        # Process job (the sleep stands in for real work)
        IO.puts("Processing: #{job}")
        :timer.sleep(1000)

        # Process next
        send(self(), :process_next)
        {:noreply, %{state | queue: rest, processing: true}}
    end
  end
end
Next Steps
Continue to 09-mix-projects.md to learn about the Mix build tool, project structure, dependencies, and creating your own applications.