Ecto and Databases

Master Ecto - Elixir's database wrapper and query DSL for working with SQL databases.

What is Ecto?

Ecto provides:

  • Repository pattern: Database access layer
  • Schemas: Map database tables to Elixir structs
  • Changesets: Data validation and casting
  • Query DSL: Type-safe database queries
  • Migrations: Version control for database schema

Setup

Add Dependencies

# mix.exs
defp deps do
  [
    {:ecto_sql, "~> 3.10"},
    {:postgrex, "~> 0.17"}  # For PostgreSQL
    # {:myxql, "~> 0.6"}    # For MySQL
    # {:ecto_sqlite3, "~> 0.12"}  # For SQLite
  ]
end

Generate Repo

mix ecto.gen.repo -r MyApp.Repo

Creates:

  • lib/myapp/repo.ex
  • Configuration in config/config.exs

Configure

# config/config.exs
config :myapp, MyApp.Repo,
  database: "myapp_dev",
  username: "postgres",
  password: "postgres",
  hostname: "localhost"

config :myapp, ecto_repos: [MyApp.Repo]

Add to Supervision Tree

# lib/myapp/application.ex
def start(_type, _args) do
  children = [
    MyApp.Repo
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end

Create Database

mix ecto.create

Schemas

Map database tables to Elixir structs.

Basic Schema

defmodule MyApp.Accounts.User do
  use Ecto.Schema

  schema "users" do
    field :name, :string
    field :email, :string
    field :age, :integer
    field :active, :boolean, default: true

    timestamps()  # Adds inserted_at and updated_at
  end
end

Field Types

field :string_field, :string
field :integer_field, :integer
field :float_field, :float
field :boolean_field, :boolean
field :binary_field, :binary
field :date_field, :date
field :time_field, :time
field :naive_datetime_field, :naive_datetime
field :utc_datetime_field, :utc_datetime
field :decimal_field, :decimal
field :map_field, :map
field :array_field, {:array, :string}
field :enum_field, Ecto.Enum, values: [:pending, :active, :inactive]

Virtual Fields

schema "users" do
  field :name, :string
  field :email, :string
  field :password, :string, virtual: true  # Not persisted
  field :password_hash, :string

  timestamps()
end

Associations

belongs_to

defmodule MyApp.Blog.Post do
  use Ecto.Schema

  schema "posts" do
    field :title, :string
    field :body, :text

    belongs_to :user, MyApp.Accounts.User

    timestamps()
  end
end

has_many

defmodule MyApp.Accounts.User do
  use Ecto.Schema

  schema "users" do
    field :name, :string

    has_many :posts, MyApp.Blog.Post

    timestamps()
  end
end

has_one

defmodule MyApp.Accounts.User do
  use Ecto.Schema

  schema "users" do
    field :name, :string

    has_one :profile, MyApp.Accounts.Profile

    timestamps()
  end
end

many_to_many

defmodule MyApp.Blog.Post do
  use Ecto.Schema

  schema "posts" do
    field :title, :string

    many_to_many :tags, MyApp.Blog.Tag,
      join_through: "posts_tags",
      on_replace: :delete

    timestamps()
  end
end

Migrations

Version control for database schema.

Generate Migration

mix ecto.gen.migration create_users

Basic Migration

defmodule MyApp.Repo.Migrations.CreateUsers do
  use Ecto.Migration

  def change do
    create table(:users) do
      add :name, :string, null: false
      add :email, :string, null: false
      add :age, :integer
      add :active, :boolean, default: true

      timestamps()
    end

    create unique_index(:users, [:email])
  end
end

Run Migrations

mix ecto.migrate

# Rollback
mix ecto.rollback

# Rollback specific number of migrations
mix ecto.rollback --step 2

# Migrate to specific version
mix ecto.migrate --to 20230101120000

Migration Operations

# Create table
create table(:users) do
  add :name, :string
end

# Alter table
alter table(:users) do
  add :username, :string
  modify :email, :string, null: false
  remove :age
end

# Drop table
drop table(:users)

# Rename table
rename table(:old_name), to: table(:new_name)

# Rename column
rename table(:users), :old_column, to: :new_column

# Create index
create index(:users, [:email])
create unique_index(:users, [:username])

# Drop index
drop index(:users, [:email])

# Add foreign key
alter table(:posts) do
  add :user_id, references(:users, on_delete: :delete_all)
end

# Execute raw SQL
execute "ALTER TABLE users ADD CONSTRAINT..."

Up/Down Migrations

defmodule MyApp.Repo.Migrations.AddFieldToUsers do
  use Ecto.Migration

  def up do
    alter table(:users) do
      add :new_field, :string
    end
  end

  def down do
    alter table(:users) do
      remove :new_field
    end
  end
end

Changesets

Validate and cast data before inserting/updating.

Basic Changeset

defmodule MyApp.Accounts.User do
  use Ecto.Schema
  import Ecto.Changeset

  schema "users" do
    field :name, :string
    field :email, :string
    field :age, :integer

    timestamps()
  end

  def changeset(user, attrs) do
    user
    |> cast(attrs, [:name, :email, :age])
    |> validate_required([:name, :email])
    |> validate_format(:email, ~r/@/)
    |> validate_number(:age, greater_than: 0)
    |> unique_constraint(:email)
  end
end

Validations

def changeset(user, attrs) do
  user
  |> cast(attrs, [:name, :email, :password, :age])
  |> validate_required([:name, :email, :password])
  |> validate_length(:name, min: 2, max: 100)
  |> validate_length(:password, min: 8)
  |> validate_format(:email, ~r/@/)
  |> validate_number(:age, greater_than: 0, less_than: 150)
  |> validate_inclusion(:role, ["user", "admin"])
  |> validate_exclusion(:username, ["admin", "root"])
  |> validate_confirmation(:password)
  |> unique_constraint(:email)
end

Custom Validations

def changeset(user, attrs) do
  user
  |> cast(attrs, [:username, :email])
  |> validate_required([:username, :email])
  |> validate_username()
end

defp validate_username(changeset) do
  validate_change(changeset, :username, fn :username, username ->
    if String.match?(username, ~r/^[a-z0-9_]+$/) do
      []
    else
      [username: "must contain only lowercase letters, numbers, and underscores"]
    end
  end)
end

Changeset Operations

# Create changeset
changeset = User.changeset(%User{}, %{name: "Alice", email: "alice@example.com"})

# Check if valid
changeset.valid?  # => true/false

# Get errors
changeset.errors  # => [name: {"is required", [validation: :required]}]

# Get changes
changeset.changes  # => %{name: "Alice", email: "alice@example.com"}

# Put change
changeset = put_change(changeset, :active, true)

# Get field
get_field(changeset, :name)  # => "Alice"
get_change(changeset, :name)  # => "Alice" (only if changed)

Nested Changesets

defmodule MyApp.Blog.Post do
  use Ecto.Schema
  import Ecto.Changeset

  schema "posts" do
    field :title, :string

    has_many :comments, MyApp.Blog.Comment

    timestamps()
  end

  def changeset(post, attrs) do
    post
    |> cast(attrs, [:title])
    |> cast_assoc(:comments)
  end
end

Queries

Repo Operations

# Insert
{:ok, user} = MyApp.Repo.insert(%User{name: "Alice", email: "alice@example.com"})

# Insert with changeset
changeset = User.changeset(%User{}, %{name: "Bob", email: "bob@example.com"})
{:ok, user} = MyApp.Repo.insert(changeset)

# Insert! (raises on error)
user = MyApp.Repo.insert!(%User{name: "Charlie", email: "charlie@example.com"})

# Get by ID
user = MyApp.Repo.get(User, 1)
user = MyApp.Repo.get!(User, 1)  # Raises if not found

# Get by field
user = MyApp.Repo.get_by(User, email: "alice@example.com")

# Get all
users = MyApp.Repo.all(User)

# Update
user = MyApp.Repo.get!(User, 1)
changeset = User.changeset(user, %{name: "Updated Name"})
{:ok, user} = MyApp.Repo.update(changeset)

# Delete
user = MyApp.Repo.get!(User, 1)
{:ok, user} = MyApp.Repo.delete(user)

# Delete all
{count, _} = MyApp.Repo.delete_all(User)

Query DSL

import Ecto.Query

# Basic query
query = from u in User, select: u
users = MyApp.Repo.all(query)

# Where
query = from u in User, where: u.age > 18
adults = MyApp.Repo.all(query)

# Multiple conditions
query = from u in User,
  where: u.age > 18 and u.active == true

# Select specific fields
query = from u in User, select: {u.name, u.email}
name_emails = MyApp.Repo.all(query)

# Select map
query = from u in User, select: %{name: u.name, email: u.email}

# Order
query = from u in User, order_by: u.name
query = from u in User, order_by: [desc: u.inserted_at]

# Limit and offset
query = from u in User, limit: 10, offset: 20

# Count
count = MyApp.Repo.one(from u in User, select: count(u.id))

# Aggregate functions
query = from u in User, select: {avg(u.age), max(u.age), min(u.age)}

Pipe-based Queries

User
|> where([u], u.age > 18)
|> where([u], u.active == true)
|> order_by([u], desc: u.inserted_at)
|> limit(10)
|> MyApp.Repo.all()

Join Queries

# Inner join
query = from u in User,
  join: p in assoc(u, :posts),
  where: p.published == true,
  select: {u, p}

# Left join
query = from u in User,
  left_join: p in assoc(u, :posts),
  select: {u, p}

# Preload
query = from u in User, preload: [:posts, :profile]
users = MyApp.Repo.all(query)

# Manual join
query = from u in User,
  join: p in Post, on: p.user_id == u.id,
  where: p.published == true

Preloading Associations

# Preload in query
users = MyApp.Repo.all(from u in User, preload: [:posts])

# Preload after fetch
users = MyApp.Repo.all(User)
users = MyApp.Repo.preload(users, [:posts])

# Nested preload
users = MyApp.Repo.all(from u in User, preload: [posts: :comments])

# Custom preload query
posts_query = from p in Post, where: p.published == true
users = MyApp.Repo.preload(users, posts: posts_query)

Dynamic Queries

def list_users(filters) do
  User
  |> filter_by_name(filters[:name])
  |> filter_by_age(filters[:age])
  |> MyApp.Repo.all()
end

defp filter_by_name(query, nil), do: query
defp filter_by_name(query, name) do
  from u in query, where: ilike(u.name, ^"%#{name}%")
end

defp filter_by_age(query, nil), do: query
defp filter_by_age(query, age) do
  from u in query, where: u.age == ^age
end

Transactions

MyApp.Repo.transaction(fn ->
  user = MyApp.Repo.insert!(%User{name: "Alice"})
  MyApp.Repo.insert!(%Post{title: "First Post", user_id: user.id})
end)

# With explicit rollback
MyApp.Repo.transaction(fn ->
  {:ok, user} = MyApp.Repo.insert(%User{name: "Bob"})
  
  case MyApp.Repo.insert(%Post{title: "Post", user_id: user.id}) do
    {:ok, post} -> post
    {:error, _} -> MyApp.Repo.rollback(:post_creation_failed)
  end
end)

Multi

Compose multiple operations:

alias Ecto.Multi

Multi.new()
|> Multi.insert(:user, User.changeset(%User{}, user_params))
|> Multi.run(:profile, fn repo, %{user: user} ->
  profile_params = Map.put(profile_params, :user_id, user.id)
  repo.insert(Profile.changeset(%Profile{}, profile_params))
end)
|> Multi.run(:notify, fn _repo, %{user: user} ->
  send_welcome_email(user)
  {:ok, :email_sent}
end)
|> MyApp.Repo.transaction()

Practical Examples

User CRUD Module

defmodule MyApp.Accounts do
  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.Accounts.User

  def list_users do
    Repo.all(User)
  end

  def get_user!(id), do: Repo.get!(User, id)

  def get_user_by_email(email) do
    Repo.get_by(User, email: email)
  end

  def create_user(attrs \\ %{}) do
    %User{}
    |> User.changeset(attrs)
    |> Repo.insert()
  end

  def update_user(%User{} = user, attrs) do
    user
    |> User.changeset(attrs)
    |> Repo.update()
  end

  def delete_user(%User{} = user) do
    Repo.delete(user)
  end

  def list_active_users do
    User
    |> where([u], u.active == true)
    |> order_by([u], desc: u.inserted_at)
    |> Repo.all()
  end
end

Pagination

defmodule MyApp.Accounts do
  def list_users(page \\ 1, per_page \\ 20) do
    offset = (page - 1) * per_page

    users = User
    |> order_by([u], desc: u.inserted_at)
    |> limit(^per_page)
    |> offset(^offset)
    |> Repo.all()

    total = Repo.aggregate(User, :count)

    %{
      users: users,
      page: page,
      per_page: per_page,
      total: total,
      total_pages: ceil(total / per_page)
    }
  end
end

Search Functionality

def search_users(search_term) do
  search_pattern = "%#{search_term}%"

  User
  |> where([u], ilike(u.name, ^search_pattern) or ilike(u.email, ^search_pattern))
  |> Repo.all()
end

Soft Delete

# Schema
schema "users" do
  field :name, :string
  field :deleted_at, :utc_datetime

  timestamps()
end

# Context
def soft_delete_user(%User{} = user) do
  user
  |> User.changeset(%{deleted_at: DateTime.utc_now()})
  |> Repo.update()
end

def list_active_users do
  User
  |> where([u], is_nil(u.deleted_at))
  |> Repo.all()
end

Exercises

  1. Create a schema for a Product with name, price, quantity, and category. Add proper validations

  2. Write a migration to create a many-to-many relationship between users and roles

  3. Build a context module with CRUD operations for managing blog posts

  4. Implement a search function that filters posts by title, body, or author name

  5. Create a transaction that transfers inventory between two warehouses

# Solutions

# 1. Product schema
defmodule MyApp.Store.Product do
  use Ecto.Schema
  import Ecto.Changeset

  schema "products" do
    field :name, :string
    field :price, :decimal
    field :quantity, :integer
    field :category, Ecto.Enum, values: [:electronics, :clothing, :food, :other]

    timestamps()
  end

  def changeset(product, attrs) do
    product
    |> cast(attrs, [:name, :price, :quantity, :category])
    |> validate_required([:name, :price, :quantity, :category])
    |> validate_number(:price, greater_than: 0)
    |> validate_number(:quantity, greater_than_or_equal_to: 0)
  end
end

# 2. Many-to-many migration
defmodule MyApp.Repo.Migrations.CreateUsersRoles do
  use Ecto.Migration

  def change do
    create table(:roles) do
      add :name, :string, null: false

      timestamps()
    end

    create unique_index(:roles, [:name])

    create table(:users_roles, primary_key: false) do
      add :user_id, references(:users, on_delete: :delete_all), null: false
      add :role_id, references(:roles, on_delete: :delete_all), null: false
    end

    create unique_index(:users_roles, [:user_id, :role_id])
  end
end

# 3. Blog context
defmodule MyApp.Blog do
  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.Blog.Post

  def list_posts do
    Post
    |> order_by([p], desc: p.inserted_at)
    |> preload(:user)
    |> Repo.all()
  end

  def get_post!(id) do
    Post
    |> preload(:user)
    |> Repo.get!(id)
  end

  def create_post(attrs) do
    %Post{}
    |> Post.changeset(attrs)
    |> Repo.insert()
  end

  def update_post(%Post{} = post, attrs) do
    post
    |> Post.changeset(attrs)
    |> Repo.update()
  end

  def delete_post(%Post{} = post) do
    Repo.delete(post)
  end

  def publish_post(%Post{} = post) do
    post
    |> Post.changeset(%{published: true, published_at: DateTime.utc_now()})
    |> Repo.update()
  end
end

# 4. Search function
def search_posts(query_string) do
  pattern = "%#{query_string}%"

  from(p in Post,
    left_join: u in assoc(p, :user),
    where: ilike(p.title, ^pattern) or
           ilike(p.body, ^pattern) or
           ilike(u.name, ^pattern),
    preload: [:user]
  )
  |> Repo.all()
end

# 5. Inventory transfer transaction
defmodule MyApp.Inventory do
  alias Ecto.Multi
  
  def transfer(product_id, from_warehouse_id, to_warehouse_id, quantity) do
    Multi.new()
    |> Multi.run(:from_stock, fn repo, _ ->
      update_stock(repo, product_id, from_warehouse_id, -quantity)
    end)
    |> Multi.run(:to_stock, fn repo, _ ->
      update_stock(repo, product_id, to_warehouse_id, quantity)
    end)
    |> Multi.insert(:transfer_log, fn %{from_stock: from, to_stock: to} ->
      %TransferLog{
        product_id: product_id,
        from_warehouse_id: from_warehouse_id,
        to_warehouse_id: to_warehouse_id,
        quantity: quantity
      }
    end)
    |> Repo.transaction()
  end

  defp update_stock(repo, product_id, warehouse_id, quantity_change) do
    case repo.get_by(Stock, product_id: product_id, warehouse_id: warehouse_id) do
      nil ->
        {:error, :stock_not_found}
      stock ->
        new_quantity = stock.quantity + quantity_change
        if new_quantity < 0 do
          {:error, :insufficient_stock}
        else
          stock
          |> Ecto.Changeset.change(%{quantity: new_quantity})
          |> repo.update()
        end
    end
  end
end

Next Steps

Continue to 12-phoenix-web.md to learn about the Phoenix web framework - controllers, views, templates, and LiveView.