A C++ library that provides utilities for managing thread-safe operations. This library is designed to simplify concurrent programming by offering easy-to-use and robust thread-safe abstractions.

tranglecong/trlc_threadsafe

TRLC-ThreadSafe

Modern C++ concurrency library using policy-based design for composable, high-performance multithreading primitives.

Features

  • Header-Only: Zero build dependencies, just include and use
  • Policy-Based Design: Compose components with compile-time policies
  • Type-Safe: Leverage C++17 templates for compile-time safety
  • High Performance: Lock-free algorithms, cache-aligned data structures
  • Cross-Platform: Linux, macOS, Windows support
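
As a rough illustration of what "compose components with compile-time policies" can mean, the sketch below selects a locking strategy through a template parameter. The names CounterSketch, NoLock and countTo are hypothetical and are not part of this library; only the standard library is used.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical policy: satisfies BasicLockable but does nothing.
struct NoLock {
    void lock() {}
    void unlock() {}
};

// A counter whose synchronization strategy is a compile-time policy.
template <typename LockPolicy = std::mutex>
class CounterSketch {
public:
    void increment() {
        std::lock_guard<LockPolicy> guard(lock_);
        ++value_;
    }
    std::size_t value() const {
        std::lock_guard<LockPolicy> guard(lock_);
        return value_;
    }

private:
    mutable LockPolicy lock_;
    std::size_t value_ = 0;
};

// Same call sites for both policies; only the template argument differs.
inline std::size_t countTo(std::size_t n) {
    CounterSketch<NoLock> local;  // single-threaded use: no locking cost
    CounterSketch<> shared;       // default policy: std::mutex
    for (std::size_t i = 0; i < n; ++i) {
        local.increment();
        shared.increment();
    }
    assert(local.value() == shared.value());
    return shared.value();
}
```

Because the policy is a type, an unused strategy costs nothing at run time, and a policy that lacks lock() or unlock() is rejected by the compiler.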

Library Components

Containers

  • BoundedQueue<T, N> - Fixed-size MPMC queue with backpressure
  • UnboundedQueue<T> - Dynamic MPMC queue
  • Channel<T> - Go-style message passing (buffered/unbuffered)
  • ConcurrentMap<K, V> - Sharded hash map for concurrent access

Synchronization

  • Guarded<T> - Mutex-protected data with scoped locking
  • RwGuarded<T> - Reader-writer lock wrapper
  • Event - Manual/auto-reset signaling primitive
  • Notification - One-shot signaling
  • WaitGroup - Go-style synchronization
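
For readers unfamiliar with Go's WaitGroup, the idea can be sketched with a mutex and a condition variable. This is a minimal sketch under the assumption that the library's WaitGroup follows the usual add/done/wait shape; WaitGroupSketch and runWorkers are hypothetical names and the code uses only the standard library.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class WaitGroupSketch {
public:
    // Register n more units of outstanding work.
    void add(int n) {
        std::lock_guard<std::mutex> lk(m_);
        count_ += n;
    }
    // Mark one unit of work as finished; wake waiters when none remain.
    void done() {
        std::lock_guard<std::mutex> lk(m_);
        if (--count_ == 0) cv_.notify_all();
    }
    // Block until the outstanding count reaches zero.
    void wait() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return count_ == 0; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_ = 0;
};

// Starts `workers` threads that each add 1 to a shared total and returns
// the total observed right after wait() returns.
inline int runWorkers(int workers) {
    WaitGroupSketch wg;
    std::atomic<int> total{0};
    std::vector<std::thread> threads;
    wg.add(workers);  // register all work before any thread can call done()
    for (int i = 0; i < workers; ++i) {
        threads.emplace_back([&] {
            total.fetch_add(1);
            wg.done();
        });
    }
    wg.wait();
    const int seen = total.load();
    for (auto& t : threads) t.join();
    return seen;
}
```

Calling add() before the threads start avoids the race where wait() sees a zero count before any work has been registered.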

Lock-Free

  • SPSCQueue<T, N> - Wait-free single-producer, single-consumer queue
  • MPSCQueue<T> - Lock-free multi-producer, single-consumer queue
  • WorkStealingDeque<T> - Chase-Lev work-stealing deque
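
To make the single-producer, single-consumer idea concrete, here is a minimal ring-buffer sketch built on two atomic indices. It is not the library's SPSCQueue: SpscRingSketch is a hypothetical name, one slot is deliberately left empty so the usable capacity is N - 1, and T is assumed to be default-constructible and copyable.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <optional>

template <typename T, std::size_t N>
class SpscRingSketch {
    static_assert(N >= 2, "one slot stays empty, so N must be at least 2");

public:
    // Producer thread only. Returns false when the ring is full.
    bool tryPush(const T& value) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % N;
        if (next == head_.load(std::memory_order_acquire)) return false;
        slots_[tail] = value;
        tail_.store(next, std::memory_order_release);  // publish the slot
        return true;
    }

    // Consumer thread only. Returns std::nullopt when the ring is empty.
    std::optional<T> tryPop() {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return std::nullopt;
        T value = slots_[head];
        head_.store((head + 1) % N, std::memory_order_release);  // free the slot
        return value;
    }

private:
    std::array<T, N> slots_{};
    std::atomic<std::size_t> head_{0};  // next slot to read
    std::atomic<std::size_t> tail_{0};  // next slot to write
};
```

Each index is written by exactly one thread, which is why neither operation needs a compare-and-swap loop or a lock.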

Execution

  • ThreadPool - Work-stealing thread pool with task submission
  • Pipeline<In, Out> - Staged data processing with backpressure
  • EventBus<Events...> - Type-safe publish-subscribe with policies
  • ActorSystem - Message-based concurrency

Foundation

  • Result<T, E> - Rust-style error handling with monadic operations
  • Option<T> - Maybe/Optional with monadic operations
  • Status - gRPC-style status codes

Examples

💡 Complete working examples in examples/showcase/:

  • result_railway.cpp - Railway-oriented programming with monadic operations
  • eventbus_policies.cpp - Type-erased execution policies (inline/threadpool)
  • channel_select.cpp - Go-style CSP with select multiplexing
  • queue_overflow.cpp - Demonstration of the five overflow policies
  • pipeline_backpressure.cpp - Multi-stage processing with backpressure
  • threadpool_work_stealing.cpp - Work-stealing load balancing

See the examples/ directory for 40+ complete examples covering all components.

1. Result - Railway-Oriented Programming

#include <trlc/threadsafe/core/error/result.hpp>

enum class Error { ParseError, ValidationError, NetworkError };

// Chain operations - errors automatically short-circuit
auto processRequest(const std::string& json) {
    return parseJSON(json)                    // Result<Data, Error>
        .andThen(validateSchema)              // Only runs if parse succeeded
        .andThen(enrichWithUserData)          // Only runs if validation passed
        .map([](Data d) { return d.value * 2; })  // Transform success
        .orElse([](Error e) {                 // Fallback on any error
            log("Failed: {}", e);
            return loadCachedData();
        });
}

// Pattern matching for exhaustive handling
auto result = processRequest(request_body);
result.match(
    [](int value) { sendResponse(200, value); },
    [](Error e) { sendResponse(500, errorMsg(e)); }
);

// Monadic operations: map, andThen, orElse, filter
auto age = getUser(id)
    .map([](User u) { return u.age; })
    .filter([](int age) { return age >= 18; })
    .valueOr(0);
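
The short-circuiting behaviour of andThen and map can be shown with a small stand-alone type built on std::variant. This is a sketch of the general technique, not the library's Result: ResultSketch, ParseError, parseInt, checkPercent and percentTimesTwo are all hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <type_traits>
#include <utility>
#include <variant>

enum class ParseError { Empty, NotANumber, OutOfRange };

template <typename T, typename E>
class ResultSketch {
public:
    static ResultSketch ok(T v) { return ResultSketch(std::in_place_index<0>, std::move(v)); }
    static ResultSketch err(E e) { return ResultSketch(std::in_place_index<1>, std::move(e)); }

    bool isOk() const { return data_.index() == 0; }
    const T& value() const { return std::get<0>(data_); }
    const E& error() const { return std::get<1>(data_); }

    // map: transform the success value; an error passes through untouched.
    template <typename F>
    auto map(F&& f) const -> ResultSketch<std::invoke_result_t<F, const T&>, E> {
        using U = std::invoke_result_t<F, const T&>;
        if (isOk()) return ResultSketch<U, E>::ok(f(value()));
        return ResultSketch<U, E>::err(error());
    }

    // andThen: run a step that can itself fail; skipped once an error exists.
    template <typename F>
    auto andThen(F&& f) const -> std::invoke_result_t<F, const T&> {
        using R = std::invoke_result_t<F, const T&>;
        if (isOk()) return f(value());
        return R::err(error());
    }

    T valueOr(T fallback) const { return isOk() ? value() : std::move(fallback); }

private:
    template <std::size_t I, typename U>
    ResultSketch(std::in_place_index_t<I> tag, U&& v) : data_(tag, std::forward<U>(v)) {}

    std::variant<T, E> data_;
};

using IntResult = ResultSketch<int, ParseError>;

// Accepts short strings of decimal digits only.
inline IntResult parseInt(const std::string& s) {
    if (s.empty()) return IntResult::err(ParseError::Empty);
    for (char c : s) {
        if (c < '0' || c > '9') return IntResult::err(ParseError::NotANumber);
    }
    return IntResult::ok(std::stoi(s));
}

inline IntResult checkPercent(int v) {
    return v <= 100 ? IntResult::ok(v) : IntResult::err(ParseError::OutOfRange);
}

// Each step runs only if every earlier step succeeded.
inline int percentTimesTwo(const std::string& text) {
    return parseInt(text)
        .andThen(checkPercent)
        .map([](int v) { return v * 2; })
        .valueOr(-1);
}
```

With this sketch, percentTimesTwo("21") yields 42, while an empty string, "abc" and "150" all fall through to the -1 fallback without running the later steps.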

2. EventBus - Pub-Sub with Execution Policies

#include <trlc/threadsafe/patterns/event_bus.hpp>
#include <trlc/threadsafe/exec/thread_pool.hpp>

struct SensorData { int id; double temp; };
struct Alert { std::string msg; };

// Inline: <1μs, runs on publisher thread
auto ui_bus = makeInlineEventBus<SensorData, Alert>();

// ThreadPool: ~100μs, async execution
auto pool = std::make_shared<exec::ThreadPool>(4);
auto io_bus = makeThreadPoolEventBus<SensorData, Alert>(pool);

// Subscribe - multiple handlers, decoupled
auto h1 = ui_bus->subscribe<SensorData>([](const SensorData& d) {
    dashboard.update(d);  // Fast UI update
});

auto h2 = io_bus->subscribe<SensorData>([](const SensorData& d) {
    database.save(d);     // Heavy I/O - don't block!
    if (d.temp > 80) {
        sendAlert({"Overheat: " + std::to_string(d.temp)});
    }
});

auto h3 = io_bus->subscribe<Alert>([](const Alert& a) {
    smtp.send(a.msg);     // Network call - async
});

// Publish - fire and forget
ui_bus->publish(SensorData{1, 75.5});   // Runs immediately
io_bus->publish(SensorData{2, 85.2});   // Queued, returns instantly

// RAII: auto-unsubscribe when handles destroyed
pool->shutdown();
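
The "inline" execution policy, where handlers run on the publisher's thread, can be sketched with one handler list per event type stored in a tuple. This is an illustrative sketch only: InlineBusSketch and countHotReadings are hypothetical names, the sketch is not thread-safe, and it has no unsubscribe handles.

```cpp
#include <cassert>
#include <functional>
#include <tuple>
#include <utility>
#include <vector>

template <typename... Events>
class InlineBusSketch {
    template <typename Event>
    using Handlers = std::vector<std::function<void(const Event&)>>;

public:
    // Register a handler for one of the event types listed in Events.
    template <typename Event>
    void subscribe(std::function<void(const Event&)> handler) {
        std::get<Handlers<Event>>(handlers_).push_back(std::move(handler));
    }

    // Call every handler for Event synchronously on the caller's thread.
    template <typename Event>
    void publish(const Event& event) const {
        for (const auto& h : std::get<Handlers<Event>>(handlers_)) h(event);
    }

private:
    std::tuple<Handlers<Events>...> handlers_;  // one list per event type
};

struct SensorData { int id; double temp; };
struct Alert { int sensor_id; };

// Publishes two readings; the hot one triggers a nested Alert publish.
inline int countHotReadings() {
    InlineBusSketch<SensorData, Alert> bus;
    int alerts = 0;
    bus.subscribe<SensorData>([&](const SensorData& d) {
        if (d.temp > 80.0) bus.publish(Alert{d.id});
    });
    bus.subscribe<Alert>([&](const Alert&) { ++alerts; });
    bus.publish(SensorData{1, 75.5});
    bus.publish(SensorData{2, 85.2});
    return alerts;
}
```

Publishing a type that is not in the Events list fails to compile, which is one way a bus can be type-safe without runtime type checks.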

3. Channel - Go-Style CSP

#include <trlc/threadsafe/containers/channel.hpp>

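// Unbuffered: Rendezvous (both sides must be ready)
Channel<Request, Unbuffered> rpc;
Channel<Response, Unbuffered> response_ch;  // Assumed reply channel; Response is a placeholder type
std::thread server([&] {
    auto req = rpc.recv();        // Blocks until client sends
    auto res = process(req.value());
    response_ch.send(res);        // Client is waiting
});
rpc.send(Request{"data"});        // Synchronization point
auto reply = response_ch.recv();  // Take the reply so the server's send can complete
server.join();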

// Buffered: Producer-consumer with automatic backpressure
Channel<Image, Bounded<8>> images;
std::thread producer([&] {
    for (auto& img : camera.stream()) {
        images.send(img);         // Blocks if buffer full
    }
    images.close();               // Signal completion
});

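std::thread consumer([&] {
    // C++ has no `while (init; condition)` form, so loop and break explicitly
    for (;;) {
        auto img = images.recv();
        if (!img.isOk()) break;   // Channel closed and drained
        processImage(img.value());
    }
});

producer.join();
consumer.join();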

// Select pattern: Multiplexing multiple channels
Channel<int, Bounded<4>> ch1, ch2, ch3;
while (running) {
    auto r1 = ch1.tryRecv();
    auto r2 = ch2.tryRecv();
    auto r3 = ch3.tryRecv();
    
    if (r1.isOk()) handleSource1(r1.value());
    if (r2.isOk()) handleSource2(r2.value());
    if (r3.isOk()) handleSource3(r3.value());
}

// Timeout operations
using namespace std::chrono_literals;  // Enables the 100ms literal (requires <chrono>)
auto result = ch1.recvFor(100ms);
if (result.isErr() && result.error() == ChannelStatus::timeout) {
    handleTimeout();
}
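
The buffered send/recv/close behaviour described above can be approximated with a mutex, two condition variables and a deque. This is a minimal sketch under those assumptions, not the library's Channel: ChannelSketch and sumThroughChannel are hypothetical names, and recv() reports "closed and drained" through std::optional rather than the library's result type.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>

template <typename T>
class ChannelSketch {
public:
    explicit ChannelSketch(std::size_t capacity) : capacity_(capacity) {}

    // Blocks while the buffer is full. Returns false if the channel is closed.
    bool send(T value) {
        std::unique_lock<std::mutex> lk(m_);
        not_full_.wait(lk, [this] { return closed_ || buf_.size() < capacity_; });
        if (closed_) return false;
        buf_.push_back(std::move(value));
        not_empty_.notify_one();
        return true;
    }

    // Blocks while the buffer is empty. Returns nullopt once closed and drained.
    std::optional<T> recv() {
        std::unique_lock<std::mutex> lk(m_);
        not_empty_.wait(lk, [this] { return closed_ || !buf_.empty(); });
        if (buf_.empty()) return std::nullopt;
        T value = std::move(buf_.front());
        buf_.pop_front();
        not_full_.notify_one();
        return value;
    }

    // Wakes every blocked sender and receiver.
    void close() {
        std::lock_guard<std::mutex> lk(m_);
        closed_ = true;
        not_full_.notify_all();
        not_empty_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<T> buf_;
    std::size_t capacity_;
    bool closed_ = false;
};

// Producer sends 1..n through a 2-slot channel; the caller sums what arrives.
inline int sumThroughChannel(int n) {
    ChannelSketch<int> ch(2);
    std::thread producer([&] {
        for (int i = 1; i <= n; ++i) ch.send(i);
        ch.close();
    });
    int sum = 0;
    for (;;) {
        auto v = ch.recv();
        if (!v) break;  // closed and drained
        sum += *v;
    }
    producer.join();
    return sum;
}
```

Items already buffered when close() is called are still delivered, so the consumer sees every value the producer managed to send.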

4. BoundedQueue - 5 Overflow Policies

#include <trlc/threadsafe/containers/bounded_queue.hpp>

// Block: Waits when full (critical data - orders, payments)
BoundedQueue<Order, 128> orders;
orders.push(order);  // Blocks if full

// Fail: Returns error (need to know capacity reached)
BoundedQueue<Request, 128, OverflowPolicy::Fail> requests;
if (auto res = requests.tryPush(req); res.isErr()) {
    metrics.queueFull++;
    return Error::ServiceOverloaded;
}

// DropOldest: Keep latest (sensor data, stock prices)
BoundedQueue<Temperature, 64, OverflowPolicy::DropOldest> sensors;
sensors.tryPush(temp);  // Always succeeds, drops old data

// DropNewest: Keep history (audit logs, telemetry)
BoundedQueue<AuditLog, 256, OverflowPolicy::DropNewest> logs;
logs.tryPush(log);  // Keeps old logs if full

// Overwrite: Ring buffer (video frames, audio samples)
BoundedQueue<Frame, 32, OverflowPolicy::Overwrite> frames;
frames.tryPush(frame);  // Overwrites oldest position

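// Consumer (same for all policies)
// C++ has no `while (init; condition)` form, so loop and break explicitly
for (;;) {
    auto item = orders.tryPop();
    if (!item.isOk()) break;   // Queue is empty
    process(item.value());
}

// MPMC safe: Multiple producers and consumers
std::thread t1([&] { orders.push(Order{1}); });
std::thread t2([&] { orders.push(Order{2}); });
std::thread c1([&] { orders.tryPop(); });
std::thread c2([&] { orders.tryPop(); });

// A std::thread must be joined (or detached) before it is destroyed
t1.join(); t2.join(); c1.join(); c2.join();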
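
The difference between the non-blocking overflow policies is easiest to see on a tiny queue. The sketch below models three of them (Fail, DropOldest, DropNewest) with a mutex-protected deque. All names here (Overflow, Push, PolicyQueueSketch, pushOneToFive) are hypothetical, and the return values are an assumption for illustration rather than the library's actual tryPush contract.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

enum class Overflow { Fail, DropOldest, DropNewest };
enum class Push { Stored, Rejected, Discarded, Evicted };

template <typename T, std::size_t N, Overflow Policy>
class PolicyQueueSketch {
public:
    Push tryPush(T value) {
        std::lock_guard<std::mutex> lk(m_);
        if (items_.size() == N) {
            if constexpr (Policy == Overflow::Fail) {
                return Push::Rejected;   // caller is told the queue is full
            } else if constexpr (Policy == Overflow::DropNewest) {
                return Push::Discarded;  // incoming item is dropped silently
            } else {
                items_.pop_front();      // evict the oldest to make room
                items_.push_back(std::move(value));
                return Push::Evicted;
            }
        }
        items_.push_back(std::move(value));
        return Push::Stored;
    }

    // Removes and returns everything currently queued, oldest first.
    std::vector<T> drain() {
        std::lock_guard<std::mutex> lk(m_);
        std::vector<T> out(items_.begin(), items_.end());
        items_.clear();
        return out;
    }

private:
    std::mutex m_;
    std::deque<T> items_;
};

// Pushes 1..5 into a 3-slot queue and returns what survives.
template <Overflow Policy>
std::vector<int> pushOneToFive() {
    PolicyQueueSketch<int, 3, Policy> q;
    for (int i = 1; i <= 5; ++i) q.tryPush(i);
    return q.drain();
}
```

With a capacity of three, DropOldest ends up holding 3, 4, 5, while Fail and DropNewest both hold 1, 2, 3; they differ only in whether the caller is told about the overflow.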

5. Pipeline - Multi-Stage Processing

#include <trlc/threadsafe/patterns/pipeline.hpp>

// Image processing: Decode → Filter → Encode
Pipeline<ImageData, CompressedImage> pipeline;

// Stage 1: Decode (2 workers)
Pipeline<ImageData, CompressedImage>::StageConfig cfg1;
cfg1.workers = 2;
cfg1.buffer_size = 16;
pipeline.addStage<ImageData, RawImage>(
    [](const ImageData& img) { return decode(img); },
    cfg1
);

// Stage 2: Apply filters (4 workers - GPU bound)
Pipeline<ImageData, CompressedImage>::StageConfig cfg2;
cfg2.workers = 4;
cfg2.buffer_size = 32;
pipeline.addStage<RawImage, FilteredImage>(
    [](const RawImage& raw) { return applyFilters(raw); },
    cfg2
);

// Stage 3: Compress (2 workers)
Pipeline<ImageData, CompressedImage>::StageConfig cfg3;
cfg3.workers = 2;
cfg3.buffer_size = 16;
pipeline.addStage<FilteredImage, CompressedImage>(
    [](const FilteredImage& f) { return compress(f); },
    cfg3
);

pipeline.start();

// Producer: Feed data (automatic backpressure)
for (auto& img : camera.capture()) {
    pipeline.submit(img);  // Blocks if stage 1 buffer full
}

// Consumer: Get results
// C++ has no `while (init; condition)` form, so loop and break explicitly
for (;;) {
    auto result = pipeline.receive();
    if (!result.isOk()) break;
    network.send(result.value());
}

pipeline.stop();

6. ThreadPool - Work Stealing

#include <trlc/threadsafe/exec/thread_pool.hpp>

ThreadPool pool(std::thread::hardware_concurrency());

// Submit 10,000 tasks
std::atomic<int> completed{0};
std::vector<std::future<Result>> futures;

for (int i = 0; i < 10000; ++i) {
    futures.push_back(pool.submit([&completed, i]() {
        auto result = processFile(files[i]);  // Varying duration
        completed.fetch_add(1);
        return result;
    }));
}

// Work stealing: Fast threads steal from busy threads
// No manual load balancing needed!

pool.waitIdle();
std::cout << "Completed: " << completed.load() << std::endl;

// Check results
for (auto& f : futures) {
    if (auto res = f.get(); res.isErr()) {
        handleError(res.error());
    }
}

// Fire-and-forget style (<100ns submission latency)
for (int i = 0; i < 1000; ++i) {
    pool.submit([i]() {
        processTask(i);
    });
}

pool.waitIdle();   // Let the queued tasks finish
pool.shutdown();   // Shut down only after the last submission
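
For comparison with the example above, the submit-and-get-a-future pattern can be sketched with a single shared job queue and std::packaged_task. This sketch deliberately omits work stealing and is not the library's ThreadPool; PoolSketch and sumOfSquares are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

class PoolSketch {
public:
    explicit PoolSketch(std::size_t workers) {
        for (std::size_t i = 0; i < workers; ++i) {
            threads_.emplace_back([this] { run(); });
        }
    }

    // Drains the remaining jobs, then joins every worker.
    ~PoolSketch() {
        {
            std::lock_guard<std::mutex> lk(m_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& t : threads_) t.join();
    }

    // Queues `f` and returns a future for its result.
    template <typename F>
    auto submit(F f) -> std::future<std::invoke_result_t<F>> {
        using R = std::invoke_result_t<F>;
        // shared_ptr makes the move-only packaged_task copyable for std::function
        auto task = std::make_shared<std::packaged_task<R()>>(std::move(f));
        std::future<R> fut = task->get_future();
        {
            std::lock_guard<std::mutex> lk(m_);
            jobs_.push([task] { (*task)(); });
        }
        cv_.notify_one();
        return fut;
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // stopping and nothing left to do
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // run outside the lock
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> jobs_;
    std::vector<std::thread> threads_;
    bool stopping_ = false;
};

// Computes 1^2 + 2^2 + ... + n^2 with one task per term.
inline long sumOfSquares(int n) {
    PoolSketch pool(4);
    std::vector<std::future<long>> futures;
    for (int i = 1; i <= n; ++i) {
        futures.push_back(pool.submit([i] { return static_cast<long>(i) * i; }));
    }
    long total = 0;
    for (auto& f : futures) total += f.get();
    return total;
}
```

A single shared queue is simple but every worker contends on one mutex; per-worker deques with stealing, as the library describes, are the usual way to reduce that contention.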

7. Option & Result - Monadic Operations

#include <trlc/threadsafe/core/error/option.hpp>
#include <trlc/threadsafe/core/error/result.hpp>

// Option: Nullable values with monadic operations
Option<User> findUser(int id);

auto name = findUser(123)
    .map([](const User& u) { return u.name; })
    .filter([](auto& n) { return !n.empty(); })
    .valueOr("Guest");

if (auto user = findUser(123); user.isSome()) {
    process(user.value());
}

// Result: Error handling with context
enum class Error { NotFound, ParseError, ValidationError };

Result<Config, Error> loadConfig() {
    return readFile("config.json")
        .andThen(parseJSON)
        .andThen(validateConfig)
        .orElse([](Error e) {
            log("Config error: {}", e);
            return loadDefaults();
        });
}

loadConfig().match(
    [](const Config& c) { app.init(c); },
    [](Error e) { app.initMinimal(); }
);

// Compose multiple operations
auto result = getUser(id)
    .andThen(checkPermissions)
    .andThen(loadProfile)
    .map([](Profile p) { return p.settings; })
    .orElse([](Error e) { return defaultSettings(); });

8. Guarded - RAII Mutex

#include <trlc/threadsafe/sync/guarded.hpp>

struct Account { double balance; };
Guarded<Account> account{Account{1000.0}};

// Scoped lock with lambda
account.with([](Account& acc) {
    acc.balance -= 100.0;
});  // Auto-unlock

// Manual lock guard
{
    auto guard = account.lock();
    guard->balance += 50.0;
}  // Auto-unlock

// Try-lock (non-blocking) - returns Option<Guard>
auto maybe_guard = account.tryLock();
if (maybe_guard.isSome()) {
    maybe_guard.value()->balance += 10.0;
} else {
    metrics.lockContentions++;
}

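// Timeout lock - assumed to return Option<Guard>, like tryLock()
using namespace std::chrono_literals;  // Enables the 100ms literal (requires <chrono>)
auto timed_guard = account.tryLockFor(100ms);
if (timed_guard.isSome()) {
    timed_guard.value()->balance -= 25.0;
} else {
    log("Lock timeout");
}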

// Multi-threaded access
std::thread t1([&] { account.with([](auto& a) { a.balance += 100; }); });
std::thread t2([&] { account.with([](auto& a) { a.balance -= 50; }); });
t1.join();  // A std::thread must be joined before it is destroyed
t2.join();
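
The core of the "mutex-protected data" idea is that the value is reachable only through a function that holds the lock. A minimal sketch of that idea, assuming nothing about the library's internals, could look like the following; GuardedSketch and depositConcurrently are hypothetical names.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class GuardedSketch {
public:
    explicit GuardedSketch(T initial) : value_(std::move(initial)) {}

    // Runs `f` with exclusive access and returns its result by value,
    // so no reference to the protected data can escape the lock.
    template <typename F>
    auto with(F&& f) {
        std::lock_guard<std::mutex> lk(m_);
        return std::forward<F>(f)(value_);
    }

private:
    std::mutex m_;
    T value_;
};

struct Account { double balance; };

// Several threads each deposit 1.0 `depositsPerThread` times.
inline double depositConcurrently(int threads, int depositsPerThread) {
    GuardedSketch<Account> account{Account{0.0}};
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&] {
            for (int k = 0; k < depositsPerThread; ++k) {
                account.with([](Account& a) { a.balance += 1.0; });
            }
        });
    }
    for (auto& w : workers) w.join();
    return account.with([](Account& a) { return a.balance; });
}
```

Since the mutex and the data live in the same object, it is not possible to touch the balance without going through with(), which is the property the wrapper exists to enforce.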

9. ConcurrentMap - Sharded HashMap

#include <trlc/threadsafe/containers/concurrent_map.hpp>

ConcurrentMap<SessionId, UserSession> sessions;  // 16 shards

// Insert (returns Option<V> - old value if key existed)
auto old = sessions.insert("sess_123", UserSession{...});
if (old.isSome()) {
    log("Replaced session: {}", old.value().user_id);
}

// Get (returns Option<V>)
auto session = sessions.get("sess_123");
if (session.isSome()) {
    process(session.value());
} else {
    createNewSession();
}

// Update with lambda (atomic)
sessions.update("sess_123", [](UserSession& s) {
    s.last_activity = now();
    s.request_count++;
});  // Only this shard locked

// Remove (returns Option<V>)
auto removed = sessions.remove("sess_123");
removed.map([](const UserSession& s) {
    cleanup(s);
});

// Lock-free size
std::cout << "Active: " << sessions.size() << std::endl;

// Concurrent access from multiple threads
std::thread t1([&] { sessions.insert("s1", UserSession{}); });
std::thread t2([&] { sessions.insert("s2", UserSession{}); });
std::thread t3([&] { sessions.get("s1"); });
std::thread t4([&] { sessions.update("s2", [](auto& s) { s.hits++; }); });

// A std::thread must be joined (or detached) before it is destroyed
t1.join(); t2.join(); t3.join(); t4.join();
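
Sharding means hashing each key to one of several independently locked sub-maps, so operations on different shards do not contend. The sketch below illustrates that technique with std::unordered_map. ShardedMapSketch is a hypothetical name, std::optional stands in for the library's Option, and unlike the "lock-free size" mentioned above, size() here simply locks each shard in turn.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

template <typename K, typename V, std::size_t Shards = 16>
class ShardedMapSketch {
    struct Shard {
        mutable std::mutex m;
        std::unordered_map<K, V> data;
    };

public:
    // Inserts or replaces; returns the previous value if the key existed.
    std::optional<V> insert(const K& key, V value) {
        Shard& s = shardFor(key);
        std::lock_guard<std::mutex> lk(s.m);
        auto it = s.data.find(key);
        if (it == s.data.end()) {
            s.data.emplace(key, std::move(value));
            return std::nullopt;
        }
        std::optional<V> old(std::move(it->second));
        it->second = std::move(value);
        return old;
    }

    // Returns a copy of the value, or nullopt if the key is absent.
    std::optional<V> get(const K& key) const {
        const Shard& s = shardFor(key);
        std::lock_guard<std::mutex> lk(s.m);
        auto it = s.data.find(key);
        if (it == s.data.end()) return std::nullopt;
        return it->second;
    }

    // Applies `f` to the value in place; only this key's shard is locked.
    template <typename F>
    bool update(const K& key, F&& f) {
        Shard& s = shardFor(key);
        std::lock_guard<std::mutex> lk(s.m);
        auto it = s.data.find(key);
        if (it == s.data.end()) return false;
        f(it->second);
        return true;
    }

    std::size_t size() const {
        std::size_t total = 0;
        for (const Shard& s : shards_) {
            std::lock_guard<std::mutex> lk(s.m);
            total += s.data.size();
        }
        return total;
    }

private:
    Shard& shardFor(const K& key) { return shards_[std::hash<K>{}(key) % Shards]; }
    const Shard& shardFor(const K& key) const { return shards_[std::hash<K>{}(key) % Shards]; }

    std::array<Shard, Shards> shards_;
};
```

More shards lower the chance that two threads need the same lock, at the cost of extra memory and a slower whole-map operation such as size().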

Quick Start

Prerequisites

  • C++17 compatible compiler (GCC 9+, Clang 11+, MSVC 2019+)
  • CMake 3.15+
  • GoogleTest (automatically downloaded)

Building from Source

Linux/macOS:

cmake -B build -S . -DCMAKE_BUILD_TYPE=Release
cmake --build build --parallel
cd build && ctest --output-on-failure

Windows:

cmake -B build -S .
cmake --build build --config Release --parallel
cd build
ctest -C Release --output-on-failure

Using in Your Project

Header-only (recommended):

# In your CMakeLists.txt
add_subdirectory(external/trlc-threadsafe)
target_link_libraries(your_target PRIVATE trlc::threadsafe)

Include in code:

#include <trlc/threadsafe/containers/bounded_queue.hpp>
#include <trlc/threadsafe/patterns/event_bus.hpp>
#include <trlc/threadsafe/exec/thread_pool.hpp>

using namespace trlc::threadsafe;

Contributing

Contributions are welcome! Please follow these guidelines:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes with tests
  4. Ensure all tests pass locally (./scripts/test-all-compilers.sh)
  5. Commit with descriptive messages (git commit -m "Add amazing feature")
  6. Push to your branch (git push origin feature/amazing-feature)
  7. Open a Pull Request

License

MIT License - see LICENSE file for details.
