Implements the middleware pattern for your Ecto repos

vereis/ecto_middleware

EctoMiddleware

Intercept and transform Ecto repository operations using a middleware pipeline pattern.

EctoMiddleware provides a clean, composable way to add cross-cutting concerns to your Ecto operations. Inspired by Absinthe's middleware and Plug, it allows you to transform data before and after database operations, or replace operations entirely.

Features

  • Transform data before it reaches the database (normalization, validation, enrichment)
  • Transform data after database operations (logging, notifications, caching)
  • Replace operations entirely (soft deletes, read-through caching, authorization)
  • Halt execution with authorization checks or validation failures
  • Telemetry integration for observability
  • Backwards compatible with v1.x middleware

Installation

Add ecto_middleware to your dependencies in mix.exs:

def deps do
  [
    {:ecto_middleware, "~> 2.0"}
  ]
end

Quick Start

1. Enable middleware in your Repo

defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres

  use EctoMiddleware.Repo

  # Define middleware for specific operations
  @impl EctoMiddleware.Repo
  def middleware(action, resource) when is_insert(action, resource) do
    [NormalizeEmail, HashPassword, AuditLog]
  end

  def middleware(action, resource) when is_delete(action, resource) do
    [SoftDelete, AuditLog]
  end

  def middleware(_action, _resource) do
    [AuditLog]
  end
end

2. Write middleware

defmodule NormalizeEmail do
  use EctoMiddleware

  @impl EctoMiddleware
  def process_before(changeset, _resolution) do
    case Ecto.Changeset.fetch_change(changeset, :email) do
      {:ok, email} ->
        {:cont, Ecto.Changeset.put_change(changeset, :email, String.downcase(email))}

      :error ->
        {:cont, changeset}
    end
  end
end

3. Use your Repo as normal

# Middleware runs automatically!

%User{}
|> User.changeset(%{name: "Alice", email: "ALICE@EXAMPLE.COM"})
|> MyApp.Repo.insert()
# => Email is normalized to "alice@example.com" before insertion

Core Concepts

The API

Implement process_before/2 to transform data before the database operation:

defmodule AddTimestamp do
  use EctoMiddleware

  @impl EctoMiddleware
  def process_before(changeset, _resolution) do
    {:cont, Ecto.Changeset.put_change(changeset, :processed_at, DateTime.utc_now())}
  end
end

Implement process_after/2 to process data after the database operation:

defmodule NotifyAdmin do
  use EctoMiddleware

  @impl EctoMiddleware
  def process_after({:ok, user} = result, _resolution) do
    Task.start(fn -> send_notification(user) end)
    {:cont, result}
  end

  def process_after(result, _resolution) do
    {:cont, result}
  end
end

Implement process/2 to wrap around the entire operation:

defmodule MeasureLatency do
  use EctoMiddleware

  require Logger

  @impl EctoMiddleware
  def process(resource, resolution) do
    start_time = System.monotonic_time()

    # Yields control to the next middleware (or the Repo operation) in the chain,
    # then resumes here once it returns.
    {result, _updated_resolution} = yield(resource, resolution)

    # System.monotonic_time/0 returns native time units, so convert before logging.
    duration = System.monotonic_time() - start_time
    duration_us = System.convert_time_unit(duration, :native, :microsecond)
    Logger.info("#{resolution.action} took #{duration_us}µs")

    result
  end
end

Important: When using process/2, you must call yield/2 to continue the middleware chain.
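
To see why the call to yield/2 matters, here is a minimal sketch of the wrap-around ("onion") pattern in plain Elixir. It is an illustration only and not EctoMiddleware's implementation: the module OnionSketch, its functions, and the `next` argument (which stands in for yield/2) are all hypothetical names.

```elixir
defmodule OnionSketch do
  @moduledoc """
  Hypothetical illustration of wrap-around middleware.
  Each middleware receives the resource and a `next` function
  that plays the role of `yield/2`.
  """

  # Folds the middleware list (right to left) around the final operation,
  # producing one function that runs the whole chain.
  def run(middleware, resource, operation) do
    chain =
      List.foldr(middleware, operation, fn mw, next ->
        fn res -> mw.(res, next) end
      end)

    chain.(resource)
  end

  # Transforms the resource, then hands it to the rest of the chain.
  def trim(resource, next), do: next.(String.trim(resource))

  # Runs the rest of the chain first, then wraps its result.
  def tag(resource, next) do
    result = next.(resource)
    {:tagged, result}
  end

  # Never calls `next`, so nothing after it runs, including the operation.
  def forgetful(_resource, _next), do: :skipped
end

operation = fn resource -> {:ok, String.upcase(resource)} end

OnionSketch.run([&OnionSketch.trim/2, &OnionSketch.tag/2], "  alice ", operation)
# => {:tagged, {:ok, "ALICE"}}

OnionSketch.run([&OnionSketch.forgetful/2, &OnionSketch.tag/2], "  alice ", operation)
# => :skipped
```

In the second call the operation is never reached because the first middleware does not hand control on. In this sketch, that is the equivalent of a process/2 implementation that omits yield/2.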

Halting Execution

Return {:halt, value} to stop the middleware chain and return a value immediately:

defmodule RequireAuth do
  use EctoMiddleware

  @impl EctoMiddleware
  def process(resource, resolution) do
    if authorized?(resolution) do
      {result, _} = yield(resource, resolution)
      result
    else
      {:halt, {:error, :unauthorized}}
    end
  end

  defp authorized?(resolution) do
    get_private(resolution, :current_user) != nil
  end
end

Guards for Operation Detection

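EctoMiddleware provides guards for detecting which operation is running. They are especially useful for insert_or_update, where the action name alone does not say whether the changeset will be inserted or updated: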

defmodule ConditionalMiddleware do
  use EctoMiddleware

  @impl EctoMiddleware
  def process_before(changeset, resolution) when is_insert(changeset, resolution) do
    {:cont, add_created_metadata(changeset)}
  end

  def process_before(changeset, resolution) when is_update(changeset, resolution) do
    {:cont, add_updated_metadata(changeset)}
  end

  def process_before(changeset, _resolution) do
    {:cont, changeset}
  end
end

Return Value Conventions

process_before/2 and process_after/2

Bare return values are supported, but for clarity prefer one of the explicit tuples:

  • {:cont, value} - Continue to next middleware
  • {:halt, value} - Stop execution, return value
  • {:cont, value, updated_resolution} - Continue with updated resolution
  • {:halt, value, updated_resolution} - Stop with updated resolution

Bare values are always treated as {:cont, value}.
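
The conventions above can be sketched as a small normalization step followed by a short-circuiting reduce. This is a plain-Elixir illustration under stated assumptions, not the library's code: ReturnValueSketch, normalize/2, and run/3 are hypothetical names, and the "resolution" here is just a map.

```elixir
defmodule ReturnValueSketch do
  # Hypothetical illustration; not EctoMiddleware's implementation.

  # Maps every documented return shape onto {tag, value, resolution}.
  def normalize({:cont, value}, resolution), do: {:cont, value, resolution}
  def normalize({:halt, value}, resolution), do: {:halt, value, resolution}
  def normalize({:cont, value, updated}, _resolution), do: {:cont, value, updated}
  def normalize({:halt, value, updated}, _resolution), do: {:halt, value, updated}
  # Anything else is a bare value and continues the chain.
  def normalize(bare, resolution), do: {:cont, bare, resolution}

  # Runs each step in order and stops at the first {:halt, ...}.
  def run(steps, value, resolution) do
    Enum.reduce_while(steps, {value, resolution}, fn step, {val, res} ->
      case normalize(step.(val, res), res) do
        {:cont, new_val, new_res} -> {:cont, {new_val, new_res}}
        {:halt, new_val, new_res} -> {:halt, {new_val, new_res}}
      end
    end)
  end
end

steps = [
  fn value, _res -> value + 1 end,                                # bare value
  fn value, res -> {:cont, value * 2, Map.put(res, :seen, true)} end,
  fn _value, _res -> {:halt, :stopped} end,                       # stops here
  fn _value, _res -> raise "never reached" end
]

ReturnValueSketch.run(steps, 1, %{})
# => {:stopped, %{seen: true}}
```

Note that a bare `{:ok, user}` tuple still counts as a bare value in this sketch, because only tuples tagged `:cont` or `:halt` are treated specially.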

Telemetry

EctoMiddleware emits telemetry events for observability:

Pipeline Events

  • [:ecto_middleware, :pipeline, :start] - Pipeline execution starts

    • Measurements: %{system_time: integer()}
    • Metadata: %{repo: module(), action: atom(), pipeline_id: reference()}
  • [:ecto_middleware, :pipeline, :stop] - Pipeline execution completes

    • Measurements: %{duration: integer()}
    • Metadata: %{repo: module(), action: atom()}
  • [:ecto_middleware, :pipeline, :exception] - Pipeline execution fails

    • Measurements: %{duration: integer()}
    • Metadata: %{repo: module(), action: atom(), kind: atom(), reason: term()}

Middleware Events

  • [:ecto_middleware, :middleware, :start] - Individual middleware starts

    • Measurements: %{system_time: integer()}
    • Metadata: %{middleware: module(), pipeline_id: reference()}
  • [:ecto_middleware, :middleware, :stop] - Individual middleware completes

    • Measurements: %{duration: integer()}
    • Metadata: %{middleware: module(), result: :cont | :halt}
  • [:ecto_middleware, :middleware, :exception] - Individual middleware fails

    • Measurements: %{duration: integer()}
    • Metadata: %{middleware: module(), kind: atom(), reason: term()}

Example handler:

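require Logger

:telemetry.attach(
  "log-slow-middleware",
  [:ecto_middleware, :middleware, :stop],
  fn _event, %{duration: duration}, %{middleware: middleware}, _config ->
    # The 1ms threshold assumes duration is in nanoseconds. If your events
    # report native time units, convert with System.convert_time_unit/3 first.
    if duration > 1_000_000 do
      Logger.warning("Slow middleware: #{inspect(middleware)} took #{duration}ns")
    end
  end,
  nil
)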
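
For longer-lived handlers, the telemetry library recommends a captured module function over an anonymous function. The sketch below attaches one handler to the pipeline :stop and :exception events listed above. MyApp.MiddlewareTelemetry, describe/3, and the handler id are hypothetical names. The conversion assumes durations are reported in native time units, which this README does not confirm.

```elixir
defmodule MyApp.MiddlewareTelemetry do
  # Hypothetical handler module; event names and metadata keys are taken
  # from the lists above.
  require Logger

  @events [
    [:ecto_middleware, :pipeline, :stop],
    [:ecto_middleware, :pipeline, :exception]
  ]

  # Call once at application start, e.g. from Application.start/2.
  def attach do
    :telemetry.attach_many(
      "my-app-ecto-middleware",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Builds the log line; kept separate so it can be tested without telemetry.
  def describe([:ecto_middleware, :pipeline, :stop], %{duration: duration}, metadata) do
    %{repo: repo, action: action} = metadata
    "#{inspect(repo)}.#{action} pipeline finished in #{to_us(duration)}us"
  end

  def describe([:ecto_middleware, :pipeline, :exception], %{duration: duration}, metadata) do
    %{repo: repo, action: action, kind: kind, reason: reason} = metadata
    "#{inspect(repo)}.#{action} pipeline failed after #{to_us(duration)}us: #{kind} #{inspect(reason)}"
  end

  # Assumption: durations are in native time units.
  defp to_us(native), do: System.convert_time_unit(native, :native, :microsecond)
end
```

Keeping the message formatting in describe/3 means the handler logic can be unit-tested by calling it directly, without attaching to telemetry.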

Migration from V1

V1 middleware continues to work but emits deprecation warnings. See the Migration Guide for details.

Key differences:

  • Use EctoMiddleware.Repo instead of EctoMiddleware in Repo modules
  • Implement process_before/2, process_after/2, or process/2 instead of middleware/2
  • No need for EctoMiddleware.Super marker
  • Return {:cont, value} or {:halt, value} instead of bare values

Silencing Deprecation Warnings

During migration, you can silence warnings temporarily:

# In config/config.exs
config :ecto_middleware, :silence_deprecation_warnings, true

This should only be used during migration - deprecated APIs will be removed in v3.0.

Example Usage in the Wild

EctoHooks - Adds before_* and after_* callbacks to Ecto schemas, similar to the old Ecto.Model callbacks. Implemented entirely using EctoMiddleware.

See the implementation for real-world middleware examples.

Documentation

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License. See LICENSE for details.
