Processes

Master Elixir's lightweight concurrency with processes, message passing, spawn, links, monitors, and GenServer.

Understanding Processes

Elixir processes are NOT OS threads:

  • Lightweight: ~2KB memory each - spawn millions
  • Isolated: Own memory, no shared state
  • Concurrent: Run simultaneously on BEAM VM
  • Fast: ~microseconds to spawn
  • Fault-tolerant: One crash doesn't affect others

Why Processes?

# Sequential (slow)
result1 = expensive_operation1()
result2 = expensive_operation2()
result3 = expensive_operation3()

# Concurrent (fast)
# Note: spawn/1 returns a PID, not the function's result, so the return
# values of the operations are discarded here. Use message passing or
# Task.async/await when the results are needed.
pid1 = spawn(fn -> expensive_operation1() end)
pid2 = spawn(fn -> expensive_operation2() end)
pid3 = spawn(fn -> expensive_operation3() end)
# All run simultaneously!

Spawning Processes

spawn/1

# Basic spawn: returns the new process's PID immediately
pid = spawn(fn -> IO.puts("Hello from process!") end)
# => #PID<0.123.0>

# Process executes and terminates
# "Hello from process!" is printed

# Spawn with work (the function's return value is discarded)
pid = spawn(fn ->
  result = 1 + 1
  IO.puts("Result: #{result}")
end)

spawn/3

defmodule Math do
  @doc """
  Prints `a + b = sum` to standard output.
  """
  def add(a, b) do
    sum = a + b
    IO.puts("#{a} + #{b} = #{sum}")
  end
end

pid = spawn(Math, :add, [2, 3])
# => Prints "2 + 3 = 5"

Process Identity

# Current process PID
self()  # => #PID<0.105.0>

# Check if process is alive
pid = spawn(fn -> :timer.sleep(1000) end)
Process.alive?(pid)  # => true
:timer.sleep(1100)
Process.alive?(pid)  # => false

Message Passing

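Processes communicate by sending messages. Sending is asynchronous and never blocks the sender; receive blocks until a matching message arrives (or until its optional timeout expires).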

send and receive

# Spawn a process that waits for a single message
pid = spawn(fn ->
  receive do
    {:hello, msg} -> IO.puts("Got hello: #{msg}")
    {:goodbye, msg} -> IO.puts("Got goodbye: #{msg}")
  end
end)

# Send a message
send(pid, {:hello, "world"})
# => Prints "Got hello: world"

receive with Multiple Clauses

pid = spawn(fn ->
  receive do
    {:add, a, b} -> IO.puts("#{a} + #{b} = #{a + b}")
    {:multiply, a, b} -> IO.puts("#{a} * #{b} = #{a * b}")
    _ -> IO.puts("Unknown message")
  end
end)

send(pid, {:add, 5, 3})      # => "5 + 3 = 8"
send(pid, {:multiply, 4, 7}) # Process already terminated, message lost!

receive in a Loop

defmodule Echo do
  @doc """
  Message loop: prints the payload of every `{:echo, msg}` message and keeps
  running until it receives `:stop`.
  """
  def loop do
    receive do
      :stop ->
        # Returning without recursing lets the process terminate
        IO.puts("Stopping")

      {:echo, msg} ->
        print_echo(msg)
        # Recurse to wait for the next message
        loop()
    end
  end

  defp print_echo(msg), do: IO.puts("Echo: #{msg}")
end

pid = spawn(&Echo.loop/0)
send(pid, {:echo, "hello"})  # => "Echo: hello"
send(pid, {:echo, "world"})  # => "Echo: world"
send(pid, :stop)             # => "Stopping"

Receiving with Timeout

receive do
  {:hello, msg} -> msg
after
  1000 -> "No message in 1 second"
end

Reply Pattern

defmodule Calculator do
  @doc """
  Spawns the calculator process and returns its PID.
  """
  def start, do: spawn(&loop/0)

  # Answers `{:add, a, b, caller}` by sending `{:result, a + b}` to `caller`,
  # then waits for the next request. `:stop` ends the loop.
  defp loop do
    receive do
      :stop ->
        :ok

      {:add, a, b, caller} ->
        sum = a + b
        send(caller, {:result, sum})
        loop()
    end
  end
end

pid = Calculator.start()
send(pid, {:add, 5, 3, self()})

receive do
  {:result, value} -> IO.puts("Result: #{value}")
after
  1000 -> IO.puts("Timeout")
end
# => "Result: 8"

Process Linking

When processes should fail together:

spawn_link/1

# Linked processes
spawn_link(fn ->
  raise "I crashed!"
end)
# Current process also crashes!

# Trap exits to handle it: exit signals from linked processes are then
# delivered as {:EXIT, pid, reason} messages instead of crashing the receiver
Process.flag(:trap_exit, true)

pid = spawn_link(fn ->
  raise "I crashed!"
end)

receive do
  {:EXIT, ^pid, reason} ->
    IO.puts("Process crashed: #{inspect(reason)}")
end

# Process.link/1: link the current process to an already-running process
pid = spawn(fn -> :timer.sleep(5000) end)
Process.link(pid)

# Now they're linked
Process.info(self(), :links)
# => {:links, [#PID<...>]}

# Process.unlink/1: remove the link again
pid = spawn_link(fn -> :timer.sleep(5000) end)
Process.unlink(pid)
# No longer linked

Process Monitoring

One-way observation - doesn't link:

spawn_monitor/1

{pid, ref} = spawn_monitor(fn ->
  :timer.sleep(1000)
  raise "Oops"
end)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    IO.puts("Process died: #{inspect(reason)}")
end

Monitor Existing Process

pid = spawn(fn -> :timer.sleep(1000) end)
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    # `reason` is :normal for this process, but exit reasons can be tuples,
    # which cannot be interpolated directly; inspect/1 handles any term.
    IO.puts("Process finished: #{inspect(reason)}")
after
  2000 -> IO.puts("Timeout")
end

Demonitor

# `pid` must refer to an existing process
ref = Process.monitor(pid)
# Stop monitoring. A :DOWN message that was already delivered stays in the
# mailbox unless Process.demonitor(ref, [:flush]) is used.
Process.demonitor(ref)

# Links vs. monitors (`work/0` is a placeholder for your own function)
# Links: Bidirectional, crash together (supervisor pattern)
spawn_link(fn -> work() end)

# Monitors: Unidirectional, observer gets notified (watching pattern)
spawn_monitor(fn -> work() end)

Named Processes

Register processes with atoms:

Process.register/2

pid = spawn(fn ->
  receive do
    msg -> IO.puts("Got: #{msg}")
  end

  # Stay alive after the first message. A name is removed automatically
  # when its process exits, which would make the whereis/unregister calls
  # below return nil / raise ArgumentError.
  receive do
    :stop -> :ok
  end
end)

Process.register(pid, :my_process)

send(:my_process, "Hello!")
# => "Got: Hello!"

# Whereis
Process.whereis(:my_process)
# => #PID<0.123.0>

# Unregister
Process.unregister(:my_process)

# The name is gone, so stop the process through its PID
send(pid, :stop)

Named spawn

defmodule Counter do
  @doc """
  Spawns the counter loop starting at 0 and registers it as `:counter`.

  Returns `true`, the result of `Process.register/2`.
  """
  def start do
    pid = spawn(fn -> loop(0) end)
    Process.register(pid, :counter)
  end

  # The running total lives in the argument of the recursive call.
  defp loop(count) do
    receive do
      {:get, caller} ->
        send(caller, {:count, count})
        loop(count)

      :increment ->
        loop(count + 1)
    end
  end
end

Counter.start()
send(:counter, :increment)
send(:counter, :increment)
send(:counter, {:get, self()})

receive do
  {:count, n} -> IO.puts("Count: #{n}")
end
# => "Count: 2"

Task Module

Higher-level abstractions for processes:

Task.async/1 and Task.await/1

# Run concurrently, wait for result
task = Task.async(fn ->
  :timer.sleep(1000)
  "Done!"
end)

# Do other work...

result = Task.await(task)
# => "Done!" (after 1 second)

Multiple Concurrent Tasks

tasks = [
  Task.async(fn -> fetch_users() end),
  Task.async(fn -> fetch_orders() end),
  Task.async(fn -> fetch_products() end)
]

results = Task.await_many(tasks)
# => [users, orders, products]

Task.start/1

Fire and forget:

Task.start(fn ->
  # Background work, don't care about result
  send_email()
end)

Task.start_link/1

Linked task:

Task.start_link(fn ->
  # Linked to the caller: if this crashes, the exit signal propagates and
  # the caller crashes too, unless it traps exits
  critical_work()
end)

Agent Module

Simple state management:

Creating an Agent

{:ok, agent} = Agent.start_link(fn -> 0 end)

# Get value
Agent.get(agent, fn state -> state end)
# => 0

# Update value
Agent.update(agent, fn state -> state + 1 end)

Agent.get(agent, fn state -> state end)
# => 1

Named Agent

{:ok, _} = Agent.start_link(fn -> %{} end, name: :config)

Agent.update(:config, fn state -> Map.put(state, :key, "value") end)

Agent.get(:config, fn state -> state end)
# => %{key: "value"}

Agent Functions

defmodule Counter do
  @doc """
  Starts an Agent registered under this module's name, holding `initial`.
  """
  def start_link(initial \\ 0) do
    Agent.start_link(fn -> initial end, name: __MODULE__)
  end

  @doc "Returns the current count."
  def value, do: Agent.get(__MODULE__, fn count -> count end)

  @doc "Adds one to the count."
  def increment, do: adjust(1)

  @doc "Subtracts one from the count."
  def decrement, do: adjust(-1)

  # Shifts the stored count by `delta`.
  defp adjust(delta) do
    Agent.update(__MODULE__, fn count -> count + delta end)
  end
end

Counter.start_link(10)
Counter.increment()
Counter.value()  # => 11

GenServer

Generic server behavior - the foundation of OTP:

Basic GenServer

defmodule Stack do
  use GenServer

  ## Client API

  @doc "Starts the stack server, registered under the module name."
  def start_link(initial_stack) do
    GenServer.start_link(__MODULE__, initial_stack, name: __MODULE__)
  end

  @doc "Pushes `item` onto the stack without waiting for a reply."
  def push(item), do: GenServer.cast(__MODULE__, {:push, item})

  @doc "Removes and returns the top item, or `nil` when the stack is empty."
  def pop, do: GenServer.call(__MODULE__, :pop)

  ## Server Callbacks

  @impl true
  def init(initial_stack), do: {:ok, initial_stack}

  @impl true
  def handle_cast({:push, item}, stack), do: {:noreply, [item | stack]}

  @impl true
  def handle_call(:pop, _from, []), do: {:reply, nil, []}
  def handle_call(:pop, _from, [top | rest]), do: {:reply, top, rest}
end

# Usage
{:ok, _pid} = Stack.start_link([1, 2, 3])
Stack.push(4)
Stack.pop()  # => 4
Stack.pop()  # => 1 (the head of the initial list [1, 2, 3] is the top of the stack)

GenServer Callbacks

# Reference skeleton, not runnable as-is: `initial_state`, `reply` and
# `new_state` stand for values you compute inside each callback.
defmodule MyServer do
  use GenServer
  
  # Initialize state
  def init(args) do
    {:ok, initial_state}
    # or {:ok, state, timeout}
    # or :ignore
    # or {:stop, reason}
  end
  
  # Synchronous - returns reply
  def handle_call(request, from, state) do
    {:reply, reply, new_state}
    # or {:noreply, new_state}
    # or {:stop, reason, reply, new_state}
  end
  
  # Asynchronous - no reply
  def handle_cast(request, state) do
    {:noreply, new_state}
    # or {:stop, reason, new_state}
  end
  
  # Handle messages sent directly to process
  # (anything that did not arrive through GenServer.call/cast)
  def handle_info(msg, state) do
    {:noreply, new_state}
  end
  
  # Cleanup before termination
  def terminate(reason, state) do
    # Cleanup code
    :ok
  end
end

call vs cast

# call: Synchronous, waits for reply
GenServer.call(pid, :get_state)

# cast: Asynchronous, fire and forget
GenServer.cast(pid, {:update, value})

GenServer with State

defmodule KVStore do
  use GenServer

  ## Client API

  @doc """
  Starts the store with an empty map, registered under the module name.

  Accepts (and ignores) one optional argument so the module also works with
  the `child_spec/1` generated by `use GenServer`, which calls `start_link/1`.
  """
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc "Stores `value` under `key` without waiting for a reply."
  def put(key, value) do
    GenServer.cast(__MODULE__, {:put, key, value})
  end

  @doc "Returns the value stored under `key`, or `nil` when it is absent."
  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  @doc "Removes `key` without waiting for a reply."
  def delete(key) do
    GenServer.cast(__MODULE__, {:delete, key})
  end

  ## Server Callbacks

  @impl true
  def init(_), do: {:ok, %{}}

  @impl true
  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  def handle_cast({:delete, key}, state) do
    {:noreply, Map.delete(state, key)}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end

KVStore.start_link()
KVStore.put(:name, "Alice")
KVStore.get(:name)  # => "Alice"

GenServer with Timer

defmodule Ticker do
  use GenServer

  @doc """
  Starts the ticker, registered under the module name. `interval` is the
  delay in milliseconds passed to `Process.send_after/3` between ticks.
  """
  def start_link(interval) do
    GenServer.start_link(__MODULE__, interval, name: __MODULE__)
  end

  @impl true
  def init(interval) do
    state = %{interval: interval, count: 0}
    schedule_tick(interval)
    {:ok, state}
  end

  @impl true
  def handle_info(:tick, state) do
    current = state.count
    IO.puts("Tick #{current}")
    # Re-arm the timer so the ticks keep coming
    schedule_tick(state.interval)
    {:noreply, %{state | count: current + 1}}
  end

  # Sends `:tick` to this process after `interval` milliseconds.
  defp schedule_tick(interval), do: Process.send_after(self(), :tick, interval)
end

Ticker.start_link(1000)
# Prints "Tick 0", "Tick 1", etc. every second

Practical Examples

Concurrent URL Fetcher

defmodule URLFetcher do
  # Overall time budget, in milliseconds, for all requests to complete
  @timeout_ms 10_000

  @doc """
  Fetches every URL in `urls` concurrently, one task per URL.

  Returns a list, in the same order as `urls`, of `{:ok, url, body}` for
  HTTP 200 responses and `{:error, url, reason}` otherwise. The calling
  process exits if the tasks have not all finished within 10 seconds.
  """
  def fetch_all(urls) do
    urls
    |> Enum.map(fn url -> Task.async(fn -> fetch(url) end) end)
    # Task.await_many/2 takes the timeout as a plain integer; a keyword
    # list such as `timeout: 10_000` does not match its guard and raises
    |> Task.await_many(@timeout_ms)
  end

  # Performs one GET request and tags the outcome with the URL.
  defp fetch(url) do
    case HTTPoison.get(url) do
      {:ok, %{status_code: 200, body: body}} ->
        {:ok, url, body}

      {:ok, %{status_code: code}} ->
        {:error, url, "Status #{code}"}

      {:error, reason} ->
        {:error, url, reason}
    end
  end
end

urls = ["https://example.com", "https://example.org"]
URLFetcher.fetch_all(urls)

Worker Pool

defmodule Worker do
  use GenServer

  @doc "Starts an unnamed worker process that keeps `id` as its state."
  def start_link(id) do
    GenServer.start_link(__MODULE__, id)
  end

  # Callbacks are marked with @impl, as in the other GenServers, so the
  # compiler checks them against the behaviour.
  @impl true
  def init(id) do
    {:ok, id}
  end

  @impl true
  def handle_call({:work, task}, _from, id) do
    result = perform_work(task)
    {:reply, {:ok, result, id}, id}
  end

  # Simulates one second of work and returns a completion message.
  defp perform_work(task) do
    :timer.sleep(1000)
    "Completed: #{task}"
  end
end

defmodule WorkerPool do
  @doc """
  Starts one `Worker` per id in `1..size` and returns the list of their PIDs.
  """
  def start(size) do
    for id <- 1..size do
      {:ok, pid} = Worker.start_link(id)
      pid
    end
  end

  @doc """
  Assigns `tasks` to `workers` round-robin, issues the calls concurrently and
  returns the replies in task order.
  """
  def distribute_work(workers, tasks) do
    # Cycling the workers pairs every task with a worker, wrapping around
    assignments = Enum.zip(tasks, Stream.cycle(workers))

    pending =
      for {task, worker} <- assignments do
        Task.async(fn -> GenServer.call(worker, {:work, task}) end)
      end

    Task.await_many(pending)
  end
end

workers = WorkerPool.start(3)
tasks = ["task1", "task2", "task3", "task4", "task5"]
WorkerPool.distribute_work(workers, tasks)

PubSub System

defmodule PubSub do
  use GenServer

  ## Client API

  @doc """
  Starts the broker with no subscriptions, registered under the module name.

  Accepts (and ignores) one optional argument so the module also works with
  the `child_spec/1` generated by `use GenServer`, which calls `start_link/1`.
  """
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc "Subscribes the calling process to `topic`."
  def subscribe(topic) do
    GenServer.cast(__MODULE__, {:subscribe, topic, self()})
  end

  @doc """
  Sends `{:message, topic, message}` to every process subscribed to `topic`.
  """
  def publish(topic, message) do
    GenServer.cast(__MODULE__, {:publish, topic, message})
  end

  ## Server Callbacks

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  # State maps each topic to a list of subscriber PIDs. Subscribers are never
  # removed, and subscribing twice stores the PID twice.
  @impl true
  def handle_cast({:subscribe, topic, pid}, state) do
    {:noreply, Map.update(state, topic, [pid], &[pid | &1])}
  end

  def handle_cast({:publish, topic, message}, state) do
    state
    |> Map.get(topic, [])
    |> Enum.each(&send(&1, {:message, topic, message}))

    {:noreply, state}
  end
end

# Usage
PubSub.start_link()

# subscribe/1 registers the *calling* process, so the process that
# subscribes must also be the one that receives.
PubSub.subscribe(:news)

PubSub.publish(:news, "Breaking news!")

receive do
  {:message, topic, msg} ->
    IO.puts("Received on #{topic}: #{msg}")
end
# => "Received on news: Breaking news!"

Rate Limiter

defmodule RateLimiter do
  use GenServer

  @doc """
  Starts the limiter, registered under the module name, allowing at most
  `max_requests` accepted checks within any `window_ms` milliseconds.
  """
  def start_link(max_requests, window_ms) do
    GenServer.start_link(__MODULE__, {max_requests, window_ms}, name: __MODULE__)
  end

  @doc "Returns `:ok` if the request is allowed, `:rate_limited` otherwise."
  def check, do: GenServer.call(__MODULE__, :check)

  @impl true
  def init({max_requests, window_ms}) do
    {:ok, %{max_requests: max_requests, window_ms: window_ms, requests: []}}
  end

  @impl true
  def handle_call(:check, _from, state) do
    now = System.monotonic_time(:millisecond)
    window_start = now - state.window_ms

    # Keep only the timestamps that still fall inside the sliding window
    in_window = for stamp <- state.requests, stamp > window_start, do: stamp

    if length(in_window) >= state.max_requests do
      {:reply, :rate_limited, state}
    else
      {:reply, :ok, %{state | requests: [now | in_window]}}
    end
  end
end

# Allow 5 requests per 1 second
RateLimiter.start_link(5, 1000)

Enum.each(1..10, fn i ->
  case RateLimiter.check() do
    :ok -> IO.puts("Request #{i}: OK")
    :rate_limited -> IO.puts("Request #{i}: RATE LIMITED")
  end
  :timer.sleep(100)
end)

Exercises

  1. Create a process that echoes messages back to the sender

  2. Implement a GenServer-based counter with increment, decrement, and get operations

  3. Build a parallel map function using spawn and message passing (the solution below uses Task, which is built on the same primitives)

  4. Create a simple cache using Agent that expires entries after a timeout

  5. Implement a GenServer that manages a queue of jobs and processes them one at a time

# Solutions

# 1. Echo process
defmodule Echo do
  @doc "Spawns an echo process and returns its PID."
  def start, do: spawn(&loop/0)

  # Replies `{:echo, msg}` to the sender of each `{sender, msg}` tuple, then
  # waits for the next one.
  defp loop do
    receive do
      {sender, msg} -> send(sender, {:echo, msg})
    end

    loop()
  end
end

pid = Echo.start()
send(pid, {self(), "Hello"})
receive do
  {:echo, msg} -> IO.puts("Got: #{msg}")
end

# 2. GenServer counter
defmodule Counter do
  use GenServer

  ## Client API

  @doc "Starts the counter at `initial`, registered under the module name."
  def start_link(initial \\ 0) do
    GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  end

  @doc "Adds one to the counter without waiting for a reply."
  def increment, do: GenServer.cast(__MODULE__, :increment)

  @doc "Subtracts one from the counter without waiting for a reply."
  def decrement, do: GenServer.cast(__MODULE__, :decrement)

  @doc "Returns the current value."
  def get, do: GenServer.call(__MODULE__, :get)

  ## Server Callbacks

  @impl true
  def init(initial) do
    {:ok, initial}
  end

  @impl true
  def handle_cast(:decrement, count) do
    {:noreply, count - 1}
  end

  def handle_cast(:increment, count) do
    {:noreply, count + 1}
  end

  @impl true
  def handle_call(:get, _from, count) do
    {:reply, count, count}
  end
end

# 3. Parallel map
defmodule ParallelMap do
  @doc """
  Applies `func` to every element of `list`, running each call in its own
  task, and returns the results in the original order.
  """
  def pmap(list, func) do
    # Start every task first so they all run concurrently
    tasks = for item <- list, do: Task.async(fn -> func.(item) end)

    # Awaiting in start order keeps the results aligned with the input
    for task <- tasks, do: Task.await(task)
  end
end

ParallelMap.pmap([1, 2, 3, 4], fn x ->
  :timer.sleep(1000)
  x * 2
end)
# => [2, 4, 6, 8] (takes ~1 second, not 4)

# 4. Cache with expiry
defmodule Cache do
  use Agent

  @doc """
  Starts the cache with an empty map, registered under the module name.

  Accepts (and ignores) one optional argument so the module also works with
  the `child_spec/1` generated by `use Agent`, which calls `start_link/1`.
  """
  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  @doc """
  Stores `value` under `key` for `ttl` milliseconds (default: 60 seconds).
  """
  def put(key, value, ttl \\ 60_000) do
    expires_at = System.monotonic_time(:millisecond) + ttl
    Agent.update(__MODULE__, &Map.put(&1, key, {value, expires_at}))
  end

  @doc """
  Looks up `key`.

  Returns `{:ok, value}` while the entry is fresh, `:expired` once its
  deadline has passed, and `:not_found` when the key was never stored.
  Expired entries are reported but not removed from the map.
  """
  def get(key) do
    Agent.get(__MODULE__, fn state ->
      case Map.get(state, key) do
        {value, expires_at} ->
          if System.monotonic_time(:millisecond) < expires_at do
            {:ok, value}
          else
            :expired
          end

        nil ->
          :not_found
      end
    end)
  end
end

# 5. Job queue GenServer
defmodule JobQueue do
  use GenServer

  ## Client API

  @doc """
  Starts the job queue, registered under the module name.

  Accepts (and ignores) one optional argument so the module also works with
  the `child_spec/1` generated by `use GenServer`, which calls `start_link/1`.
  """
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @doc "Enqueues `job`; jobs are processed one at a time in arrival order."
  def add_job(job) do
    GenServer.cast(__MODULE__, {:add, job})
  end

  ## Server Callbacks

  # State: `queue` is an Erlang :queue (constant-time enqueue, unlike
  # `list ++ [job]`); `processing` is true while a :process_next message is
  # pending or being handled.
  @impl true
  def init(_) do
    {:ok, %{queue: :queue.new(), processing: false}}
  end

  @impl true
  def handle_cast({:add, job}, %{queue: queue, processing: processing} = state) do
    # Start the processing loop only if it is not already running. The flag
    # is set right here so that several adds arriving before the first
    # :process_next is handled do not each schedule their own message.
    if not processing do
      send(self(), :process_next)
    end

    {:noreply, %{state | queue: :queue.in(job, queue), processing: true}}
  end

  @impl true
  def handle_info(:process_next, %{queue: queue} = state) do
    case :queue.out(queue) do
      {{:value, job}, rest} ->
        # Process job (the sleep simulates one second of work)
        IO.puts("Processing: #{job}")
        :timer.sleep(1000)

        # Process next
        send(self(), :process_next)
        {:noreply, %{state | queue: rest, processing: true}}

      {:empty, _queue} ->
        # Nothing left: stop looping until the next add_job/1
        {:noreply, %{state | processing: false}}
    end
  end
end

Next Steps

Continue to 09-mix-projects.md to learn about Mix build tool, project structure, dependencies, and creating your own applications.