ash-flare


Fault-tolerant supervision trees for Rust with distributed capabilities inspired by Erlang/OTP. Build resilient systems that automatically recover from failures with supervisor trees, restart strategies, and distributed supervision.

Features

  • 🌲 Supervision Trees: Hierarchical supervision with nested supervisors and workers
  • 🔄 Restart Strategies: OneForOne, OneForAll, and RestForOne strategies
  • ⚡ Restart Policies: Permanent, Temporary, and Transient restart behaviors
  • 📊 Restart Intensity: Configurable restart limits with sliding time windows
  • 🗂️ Stateful Workers: Optional shared in-memory KV store for workers (StatefulSupervisorSpec)
  • 🌐 Distributed: Run supervisors across processes or machines via TCP/Unix sockets
  • 🔌 Generic Workers: Trait-based worker system for any async workload
  • 🛠️ Dynamic Management: Add/remove children at runtime
  • 📝 Structured Logging: Built-in support for slog structured logging

Quick Start

Add to your Cargo.toml:

cargo add ash-flare
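
The examples below also use tokio, async-trait, and (for the stateful examples) serde_json; if they are not already dependencies, add them too:

cargo add tokio --features full
cargo add async-trait
cargo add serde_json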

Basic Example

use ash_flare::{SupervisorSpec, SupervisorHandle, RestartPolicy, Worker};
use async_trait::async_trait;

// Define your worker
struct Counter {
    id: u32,
    max: u32,
}

#[async_trait]
impl Worker for Counter {
    type Error = std::io::Error;

    async fn run(&mut self) -> Result<(), Self::Error> {
        for i in 0..self.max {
            println!("Counter {}: {}", self.id, i);
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    // Build supervisor tree
    let spec = SupervisorSpec::new("root")
        .with_worker("counter-1", || Counter { id: 1, max: 5 }, RestartPolicy::Permanent)
        .with_worker("counter-2", || Counter { id: 2, max: 5 }, RestartPolicy::Permanent);

    // Start supervision tree
    let handle = SupervisorHandle::start(spec);
    
    // Query children
    let children = handle.which_children().await.unwrap();
    println!("Running children: {}", children.len());
    
    // Graceful shutdown
    handle.shutdown().await.unwrap();
}

Restart Strategies

OneForOne

Restarts only the failed child (default):

use ash_flare::{SupervisorSpec, RestartStrategy};

let spec = SupervisorSpec::new("supervisor")
    .with_restart_strategy(RestartStrategy::OneForOne);

OneForAll

Restarts all children if any child fails:

let spec = SupervisorSpec::new("supervisor")
    .with_restart_strategy(RestartStrategy::OneForAll);

RestForOne

Restarts the failed child and all children started after it:

let spec = SupervisorSpec::new("supervisor")
    .with_restart_strategy(RestartStrategy::RestForOne);
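
To make the differences concrete, here is a sketch reusing the Counter worker from the Basic Example, with three children started in order:

use ash_flare::{SupervisorSpec, RestartStrategy, RestartPolicy};

let spec = SupervisorSpec::new("ordered")
    .with_restart_strategy(RestartStrategy::RestForOne)
    // Children start in declaration order; restart scope follows that order.
    .with_worker("counter-1", || Counter { id: 1, max: 5 }, RestartPolicy::Permanent)
    .with_worker("counter-2", || Counter { id: 2, max: 5 }, RestartPolicy::Permanent)
    .with_worker("counter-3", || Counter { id: 3, max: 5 }, RestartPolicy::Permanent);

// If counter-2 fails: OneForOne restarts only counter-2, OneForAll restarts
// all three, and RestForOne restarts counter-2 and counter-3 but not counter-1.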

Restart Policies

Control when a child should be restarted:

use ash_flare::RestartPolicy;

// Always restart (default)
RestartPolicy::Permanent

// Never restart
RestartPolicy::Temporary

// Restart only on abnormal termination
RestartPolicy::Transient
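
As a minimal sketch of mixing policies in one spec (the worker types here are placeholders, not part of the crate):

use ash_flare::{SupervisorSpec, RestartPolicy};

let spec = SupervisorSpec::new("mixed")
    // Restarted no matter how it exits.
    .with_worker("heartbeat", || Heartbeat::new(), RestartPolicy::Permanent)
    // Runs once and is never restarted, even on failure.
    .with_worker("migration", || Migration::new(), RestartPolicy::Temporary)
    // Restarted only if run() returns an Err; a normal Ok(()) exit is final.
    .with_worker("ingest", || Ingest::new(), RestartPolicy::Transient);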

Nested Supervisors

Build hierarchical supervision trees:

let database_supervisor = SupervisorSpec::new("database")
    .with_worker("db-pool", || DbPool::new(), RestartPolicy::Permanent)
    .with_worker("db-cache", || DbCache::new(), RestartPolicy::Transient);

let app_supervisor = SupervisorSpec::new("app")
    .with_supervisor(database_supervisor)
    .with_worker("http-server", || HttpServer::new(), RestartPolicy::Permanent);

let handle = SupervisorHandle::start(app_supervisor);

Restart Intensity

Configure maximum restart attempts within a time window:

use ash_flare::RestartIntensity;

let spec = SupervisorSpec::new("supervisor")
    .with_restart_intensity(RestartIntensity {
        max_restarts: 5,      // Maximum restarts
        within_seconds: 10,   // Within time window
    });
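
If a supervisor exceeds this budget (more than max_restarts restarts within within_seconds), it gives up and shuts itself down; the "restart intensity exceeded" log line under Structured Logging below is what that looks like.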

Stateful Workers with Shared Store

Use StatefulSupervisorSpec for workers that need to share state via an in-memory KV store:

use ash_flare::{StatefulSupervisorSpec, StatefulSupervisorHandle, Worker, WorkerContext, RestartPolicy};
use async_trait::async_trait;
use std::sync::Arc;

struct AuctionWorker {
    id: u32,
    ctx: Arc<WorkerContext>,
}

#[async_trait]
impl Worker for AuctionWorker {
    type Error = std::io::Error;

    async fn run(&mut self) -> Result<(), Self::Error> {
        // Read from shared store
        let current_bid = self.ctx.get("highest_bid")
            .and_then(|v| v.as_u64())
            .unwrap_or(0);

        // Update shared store
        self.ctx.set("highest_bid", serde_json::json!(current_bid + 100));

        // Atomic update
        self.ctx.update("bid_count", |v| {
            let count = v.and_then(|v| v.as_u64()).unwrap_or(0);
            Some(serde_json::json!(count + 1))
        });

        Ok(())
    }
}

// Create stateful supervisor (WorkerContext auto-initialized)
let spec = StatefulSupervisorSpec::new("auction-supervisor")
    .with_worker(
        "auction-worker",
        |ctx: Arc<WorkerContext>| AuctionWorker { id: 1, ctx },
        RestartPolicy::Permanent,
    );

let handle = StatefulSupervisorHandle::start(spec);

Or use the stateful_supervision_tree! macro for a more declarative approach:

use ash_flare::stateful_supervision_tree;

let spec = stateful_supervision_tree! {
    name: "auction-supervisor",
    strategy: OneForOne,
    intensity: (5, 10),
    workers: [
        ("bidder-1", |ctx| AuctionWorker::new(1, ctx), Permanent),
        ("bidder-2", |ctx| AuctionWorker::new(2, ctx), Permanent),
    ],
    supervisors: []
};
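
The intensity: (5, 10) tuple presumably maps to RestartIntensity { max_restarts: 5, within_seconds: 10 } from the Restart Intensity section above.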

WorkerContext API:

  • get(key) - Retrieve a value
  • set(key, value) - Store a value
  • delete(key) - Remove a key
  • update(key, fn) - Atomic update with a function

The store is process-local, concurrent-safe (backed by DashMap), and persists across worker restarts.
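
As a quick sketch of all four calls from inside a worker's run() (assuming self.ctx is the Arc<WorkerContext> shown above and values are serde_json::Values, as in the auction example):

// Store a value.
self.ctx.set("status", serde_json::json!("running"));

// Read it back; get returns an Option.
let status = self.ctx.get("status");

// Atomically bump a counter; the closure receives the current value, if any.
self.ctx.update("ticks", |v| {
    let n = v.and_then(|v| v.as_u64()).unwrap_or(0);
    Some(serde_json::json!(n + 1))
});

// Remove a key.
self.ctx.delete("status");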

Dynamic Supervision

Add and remove children at runtime:

// Dynamically add a worker
let child_id = handle
    .start_child("dynamic-worker", || MyWorker::new(), RestartPolicy::Temporary)
    .await
    .unwrap();

// Terminate a specific child
handle.terminate_child(&child_id).await.unwrap();

// List all running children
let children = handle.which_children().await.unwrap();

Distributed Supervision

Run supervisors across processes or machines:

use ash_flare::distributed::{SupervisorServer, RemoteSupervisorHandle};

// Start supervisor server
let handle = SupervisorHandle::start(spec);
let server = SupervisorServer::new(handle);

tokio::spawn(async move {
    server.listen_tcp("127.0.0.1:8080").await.unwrap();
});

// Connect from another process/machine
let remote = RemoteSupervisorHandle::connect_tcp("127.0.0.1:8080").await.unwrap();
let children = remote.which_children().await.unwrap();
remote.shutdown().await.unwrap();
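
The feature list also mentions Unix sockets for same-machine setups. The method names below are guesses modeled on the TCP API, not confirmed by this README, so check the crate docs before relying on them:

// Hypothetical Unix-socket variants of listen_tcp / connect_tcp.
tokio::spawn(async move {
    server.listen_unix("/tmp/ash-flare.sock").await.unwrap();
});

let remote = RemoteSupervisorHandle::connect_unix("/tmp/ash-flare.sock").await.unwrap();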

Worker Lifecycle

Implement the Worker trait with optional lifecycle hooks:

use ash_flare::Worker;
use async_trait::async_trait;

struct MyWorker;

#[async_trait]
impl Worker for MyWorker {
    type Error = std::io::Error;

    async fn initialize(&mut self) -> Result<(), Self::Error> {
        // Called once before run()
        println!("Worker initializing...");
        Ok(())
    }

    async fn run(&mut self) -> Result<(), Self::Error> {
        // Main worker loop
        loop {
            // Do work, yielding back to the runtime between iterations...
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        }
    }

    async fn shutdown(&mut self) -> Result<(), Self::Error> {
        // Called during graceful shutdown
        println!("Worker shutting down...");
        Ok(())
    }
}

Error Handling

Workers return errors that trigger restart policies:

#[async_trait]
impl Worker for MyWorker {
    type Error = MyError;

    async fn run(&mut self) -> Result<(), Self::Error> {
        match self.do_work().await {
            Ok(_) => Ok(()), // Normal termination
            Err(e) => Err(e), // Triggers restart based on policy
        }
    }
}

Structured Logging

Ash Flare uses slog for structured logging. To see logs, set up a global logger:

use slog::{Drain, Logger, o};
use slog_async::Async;
use slog_term::{FullFormat, TermDecorator};

fn main() {
    // Set up logger
    let decorator = TermDecorator::new().build();
    let drain = FullFormat::new(decorator).build().fuse();
    let drain = Async::new(drain).build().fuse();
    let logger = Logger::root(drain, o!());
    
    // Set as global logger
    let _guard = slog_scope::set_global_logger(logger);
    
    // Your supervision tree code here...
}
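
This setup relies on the slog family of crates; if they are not already dependencies, add them first:

cargo add slog slog-async slog-term slog-scope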

Logs include structured data for easy filtering:

INFO server listening on tcp; address: "127.0.0.1:8080"
DEBUG child terminated; supervisor: "root", child: "worker-1", reason: Normal
ERROR restart intensity exceeded, shutting down; supervisor: "root"

Examples

Check the examples/ directory for more:

  • counter.rs - Basic supervisor with multiple workers
  • distributed.rs - Network-distributed supervisors
  • super_tree.rs - Complex nested supervision trees
  • interactive_demo.rs - Interactive supervisor management

Run an example:

cargo run --example counter

License

MIT License - see LICENSE file for details.

Acknowledgments

Inspired by Erlang/OTP's supervision model.

Some code was generated with the help of AI tools.
