diff --git a/README.md b/README.md index 624ae96..a7a63ee 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,788 @@ -# conductor-sharp +# ConductorSharp -A Conductor client library with some quality of life additions and a builder for conductor workflows. -[More info on wiki](https://github.com/codaxy/conductor-sharp/wiki) +A comprehensive .NET client library for [Conductor](https://github.com/conductor-oss/conductor) workflow orchestration engine. Features a strongly-typed workflow builder DSL, task handlers, and quality-of-life additions for building robust workflow applications. -## Installing ConductorSharp +[![NuGet](https://img.shields.io/nuget/v/ConductorSharp.Client.svg)](https://www.nuget.org/packages/ConductorSharp.Client) +[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) -If you require Conductor API client use: +[📖 More info on wiki](https://github.com/codaxy/conductor-sharp/wiki) -`dotnet add package ConductorSharp.Client --version 1.0.3` +## Table of Contents -If you also require workflow and task registration, worker scheduling and execution, workflow builder, use: +- [Installation](#installation) +- [Quick Start](#quick-start) +- [Core Concepts](#core-concepts) + - [Workflow Definition](#workflow-definition) + - [Task Handlers](#task-handlers) + - [Input/Output Models](#inputoutput-models) +- [Task Types](#task-types) +- [Configuration](#configuration) +- [Pipeline Behaviors](#pipeline-behaviors) +- [Health Checks](#health-checks) +- [Patterns Package](#patterns-package) +- [Kafka Cancellation Notifier](#kafka-cancellation-notifier) +- [Toolkit CLI](#toolkit-cli) +- [API Services](#api-services) +- [Running the Examples](#running-the-examples) -`dotnet add package ConductorSharp.Engine --version 1.0.3` +## Installation -## Running the example +### Core Packages -### Conductor setup +```bash +# API client for Conductor +dotnet add package ConductorSharp.Client --version 3.5.0 -Clone Conductor repo and run docker-compose file according to https://github.com/Netflix/conductor. +# Workflow engine with builder DSL, task handlers, and worker scheduling +dotnet add package ConductorSharp.Engine --version 3.5.0 +``` -The conductor UI can be accessed at http://localhost:5000 +### Additional Packages -### Starting the example solution +```bash +# Built-in tasks (WaitSeconds, ReadWorkflowTasks, C# Lambda Tasks) +dotnet add package ConductorSharp.Patterns --version 3.5.0 -Selecting and running the Docker Compose as startup for the .NET project will register the task and workflow definitions. +# Kafka-based task cancellation notifications +dotnet add package ConductorSharp.KafkaCancellationNotifier --version 3.5.0 -The example projects provide a simple API to test running the workflow and fetching workflow definitions. +# CLI tool for scaffolding task/workflow definitions +dotnet tool install --global ConductorSharp.Toolkit --version 3.0.1-beta3 +``` + +## Quick Start + +### 1. Configure Services + +```csharp +using ConductorSharp.Engine.Extensions; + +var builder = Host.CreateDefaultBuilder() + .ConfigureServices((_, services) => + { + services + .AddConductorSharp(baseUrl: "http://localhost:8080") + .AddExecutionManager( + maxConcurrentWorkers: 10, + sleepInterval: 500, + longPollInterval: 100, + domain: null, + typeof(Program).Assembly + ) + .AddPipelines(pipelines => + { + pipelines.AddRequestResponseLogging(); + pipelines.AddValidation(); + }); + + services.RegisterWorkflow(); + }); +``` + +### 2. Define a Task Handler + +```csharp +using ConductorSharp.Engine.Builders.Metadata; +using ConductorSharp.Engine.Interface; +using MediatR; + +public class PrepareEmailRequest : IRequest +{ + public string CustomerName { get; set; } + public string Address { get; set; } +} + +public class PrepareEmailResponse +{ + public string EmailBody { get; set; } +} + +[OriginalName("EMAIL_prepare")] +public class PrepareEmailHandler : ITaskRequestHandler +{ + public async Task Handle(PrepareEmailRequest request, CancellationToken cancellationToken) + { + var body = $"Hello {request.CustomerName} at {request.Address}!"; + return new PrepareEmailResponse { EmailBody = body }; + } +} +``` + +### 3. Define a Workflow + +```csharp +using ConductorSharp.Engine.Builders; +using ConductorSharp.Engine.Builders.Metadata; + +public class SendNotificationInput : WorkflowInput +{ + public int CustomerId { get; set; } +} + +public class SendNotificationOutput : WorkflowOutput +{ + public string EmailBody { get; set; } +} + +[OriginalName("NOTIFICATION_send")] +[WorkflowMetadata(OwnerEmail = "team@example.com")] +public class SendNotificationWorkflow : Workflow +{ + public SendNotificationWorkflow( + WorkflowDefinitionBuilder builder + ) : base(builder) { } + + public CustomerGetV1 GetCustomer { get; set; } + public EmailPrepareV1 PrepareEmail { get; set; } + + public override void BuildDefinition() + { + _builder.AddTask( + wf => wf.GetCustomer, + wf => new() { CustomerId = wf.WorkflowInput.CustomerId } + ); + + _builder.AddTask( + wf => wf.PrepareEmail, + wf => new() + { + CustomerName = wf.GetCustomer.Output.Name, + Address = wf.GetCustomer.Output.Address + } + ); + + _builder.SetOutput(wf => new() + { + EmailBody = wf.PrepareEmail.Output.EmailBody + }); + } +} +``` + +## Core Concepts + +### Workflow Definition + +Workflows are defined by inheriting from `Workflow`: + +```csharp +public class MyWorkflow : Workflow +{ + public MyWorkflow(WorkflowDefinitionBuilder builder) + : base(builder) { } + + // Task properties - these become task references in the workflow + public SomeTaskV1 FirstTask { get; set; } + public AnotherTaskV1 SecondTask { get; set; } + + public override void BuildDefinition() + { + // Add tasks with strongly-typed input expressions + _builder.AddTask(wf => wf.FirstTask, wf => new() { Input = wf.WorkflowInput.SomeValue }); + _builder.AddTask(wf => wf.SecondTask, wf => new() { Input = wf.FirstTask.Output.Result }); + + // Set workflow output + _builder.SetOutput(wf => new() { Result = wf.SecondTask.Output.Value }); + } +} +``` + +### Task Handlers + +Two approaches for implementing task handlers: + +#### Interface-based (Recommended) + +```csharp +[OriginalName("MY_TASK_name")] +public class MyTaskHandler : ITaskRequestHandler +{ + private readonly ConductorSharpExecutionContext _context; + + public MyTaskHandler(ConductorSharpExecutionContext context) + { + _context = context; // Access workflow/task metadata + } + + public async Task Handle(MyTaskRequest request, CancellationToken cancellationToken) + { + // Implementation + return new MyTaskResponse { /* ... */ }; + } +} +``` + +#### Abstract class-based + +```csharp +[OriginalName("MY_TASK_name")] +public class MyTaskHandler : TaskRequestHandler +{ + public override async Task Handle(MyTaskRequest request, CancellationToken cancellationToken) + { + return new MyTaskResponse { /* ... */ }; + } +} +``` + +### Input/Output Models + +```csharp +// Workflow I/O +public class MyWorkflowInput : WorkflowInput +{ + public string CustomerId { get; set; } +} + +public class MyWorkflowOutput : WorkflowOutput +{ + public string Result { get; set; } +} + +// Task I/O +public class MyTaskRequest : IRequest +{ + [Required] + public string InputValue { get; set; } +} + +public class MyTaskResponse +{ + public string OutputValue { get; set; } +} +``` + +### Metadata Attributes + +| Attribute | Target | Description | +|-----------|--------|-------------| +| `[OriginalName("NAME")]` | Class | Custom task/workflow name in Conductor | +| `[WorkflowMetadata(...)]` | Class | Workflow metadata (OwnerEmail, OwnerApp, Description, FailureWorkflow) | +| `[Version(n)]` | Class | Version number for sub-workflow references | +| `[TaskDomain("domain")]` | Class | Assign task to specific domain | + +## Task Types + +### Simple Task + +```csharp +_builder.AddTask(wf => wf.MySimpleTask, wf => new() { Input = wf.WorkflowInput.Value }); +``` + +### Sub-Workflow Task + +```csharp +public SubWorkflowTaskModel ChildWorkflow { get; set; } + +_builder.AddTask(wf => wf.ChildWorkflow, wf => new() { CustomerId = wf.WorkflowInput.CustomerId }); +``` + +### Switch Task (Conditional Branching) + +```csharp +public SwitchTaskModel SwitchTask { get; set; } +public TaskA TaskInCaseA { get; set; } +public TaskB TaskInCaseB { get; set; } + +_builder.AddTask( + wf => wf.SwitchTask, + wf => new SwitchTaskInput { SwitchCaseValue = wf.WorkflowInput.Operation }, + new DecisionCases + { + ["caseA"] = builder => builder.AddTask(wf => wf.TaskInCaseA, wf => new() { }), + ["caseB"] = builder => builder.AddTask(wf => wf.TaskInCaseB, wf => new() { }), + DefaultCase = builder => { /* default case tasks */ } + } +); +``` + +### Dynamic Task + +```csharp +public DynamicTaskModel DynamicHandler { get; set; } + +_builder.AddTask( + wf => wf.DynamicHandler, + wf => new() + { + TaskInput = new() { CustomerId = wf.WorkflowInput.CustomerId }, + TaskToExecute = wf.WorkflowInput.TaskName // Task name resolved at runtime + } +); +``` + +### Dynamic Fork-Join Task + +```csharp +public DynamicForkJoinTaskModel DynamicFork { get; set; } + +_builder.AddTask( + wf => wf.DynamicFork, + wf => new DynamicForkJoinInput + { + DynamicTasks = /* list of tasks */, + DynamicTasksInput = /* corresponding inputs */ + } +); +``` + +### Lambda Task (JavaScript) + +```csharp +public LambdaTaskModel LambdaTask { get; set; } + +_builder.AddTask( + wf => wf.LambdaTask, + wf => new() { Value = wf.WorkflowInput.Input }, + script: "return { result: $.Value.toUpperCase() }" +); +``` + +### Wait Task + +```csharp +public WaitTaskModel WaitTask { get; set; } + +_builder.AddTask( + wf => wf.WaitTask, + wf => new WaitTaskInput { Duration = "1h" } // or Until = "2024-01-01T00:00:00Z" +); +``` + +### Terminate Task + +```csharp +public TerminateTaskModel TerminateTask { get; set; } + +_builder.AddTask( + wf => wf.TerminateTask, + wf => new TerminateTaskInput + { + TerminationStatus = "COMPLETED", + WorkflowOutput = new { Result = "Done" } + } +); +``` + +### Event Task + +```csharp +public EventTaskModel EventTask { get; set; } + +_builder.AddTask( + wf => wf.EventTask, + wf => new() { EventData = wf.WorkflowInput.Data }, + sink: "kafka:my-topic" +); +``` + +### Human Task + +```csharp +public HumanTaskModel HumanTask { get; set; } + +_builder.AddTask( + wf => wf.HumanTask, + wf => new HumanTaskInput { /* ... */ } +); +``` + +### JSON JQ Transform Task + +```csharp +public JsonJqTransformTaskModel TransformTask { get; set; } + +_builder.AddTask( + wf => wf.TransformTask, + wf => new() { QueryExpression = ".data | map(.name)", Data = wf.WorkflowInput.Items } +); +``` + +### PassThrough Task (Raw Definition) + +For tasks not covered by the builder: + +```csharp +_builder.AddTasks(new WorkflowTask +{ + Name = "CUSTOM_task", + TaskReferenceName = "custom_ref", + Type = "CUSTOM", + InputParameters = new Dictionary { ["key"] = "value" } +}); +``` + +### Optional Tasks + +Mark tasks as optional (workflow continues on failure): + +```csharp +_builder.AddTask(wf => wf.OptionalTask, wf => new() { }).AsOptional(); +``` + +## Configuration + +### Execution Manager + +```csharp +services + .AddConductorSharp(baseUrl: "http://localhost:8080") + .AddExecutionManager( + maxConcurrentWorkers: 10, // Max concurrent task executions + sleepInterval: 500, // Base polling interval (ms) + longPollInterval: 100, // Long poll timeout (ms) + domain: "my-domain", // Optional worker domain + typeof(Program).Assembly // Assemblies containing handlers + ); +``` + +### Multiple Conductor Instances + +```csharp +services + .AddConductorSharp(baseUrl: "http://primary-conductor:8080") + .AddAlternateClient( + baseUrl: "http://secondary-conductor:8080", + key: "Secondary", + apiPath: "api", + ignoreInvalidCertificate: false + ); + +// Usage with keyed services +public class MyController( + IWorkflowService primaryService, + [FromKeyedServices("Secondary")] IWorkflowService secondaryService +) { } +``` + +### Poll Timing Strategies + +```csharp +// Default: Inverse exponential backoff +.AddExecutionManager(...) + +// Constant interval polling +.AddExecutionManager(...) +.UseConstantPollTimingStrategy() +``` + +### Worker Task Registration + +Register standalone tasks without workflow: + +```csharp +services.RegisterWorkerTask(options => +{ + options.OwnerEmail = "team@example.com"; + options.Description = "My task description"; +}); +``` + +## Pipeline Behaviors + +Behaviors form a middleware pipeline for task execution (powered by MediatR): + +```csharp +.AddPipelines(pipelines => +{ + // Add custom behavior (runs first) + pipelines.AddCustomBehavior(typeof(MyCustomBehavior<,>)); + + // Built-in behaviors + pipelines.AddExecutionTaskTracking(); // Track task execution metrics + pipelines.AddContextLogging(); // Add context to log scopes + pipelines.AddRequestResponseLogging(); // Log requests/responses + pipelines.AddValidation(); // Validate using DataAnnotations +}) +``` + +### Custom Behavior Example + +```csharp +public class TimingBehavior : IPipelineBehavior +{ + public async Task Handle( + TRequest request, + RequestHandlerDelegate next, + CancellationToken cancellationToken) + { + var sw = Stopwatch.StartNew(); + var response = await next(); + Console.WriteLine($"Execution took {sw.ElapsedMilliseconds}ms"); + return response; + } +} +``` + +## Health Checks + +### ASP.NET Core Integration + +```csharp +// In Program.cs +builder.Services.AddHealthChecks() + .AddCheck("conductor-worker"); + +// Configure health service +.AddExecutionManager(...) +.SetHealthCheckService() // or InMemoryHealthService +``` + +### Available Health Services + +| Service | Description | +|---------|-------------| +| `InMemoryHealthService` | In-memory health state (default) | +| `FileHealthService` | Persists health to `CONDUCTORSHARP_HEALTH.json` file | + +### Execution Context + +Access workflow/task metadata in handlers: + +```csharp +public class MyHandler : ITaskRequestHandler +{ + private readonly ConductorSharpExecutionContext _context; + + public MyHandler(ConductorSharpExecutionContext context) + { + _context = context; + } + + public async Task Handle(MyRequest request, CancellationToken cancellationToken) + { + var workflowId = _context.WorkflowId; + var taskId = _context.TaskId; + var correlationId = _context.CorrelationId; + // ... + } +} +``` + +## Patterns Package + +Additional built-in tasks and utilities: + +```csharp +.AddExecutionManager(...) +.AddConductorSharpPatterns() // Adds WaitSeconds, ReadWorkflowTasks +.AddCSharpLambdaTasks() // Adds C# lambda task support +``` + +### WaitSeconds Task + +```csharp +public WaitSeconds WaitTask { get; set; } + +_builder.AddTask(wf => wf.WaitTask, wf => new() { Seconds = 30 }); +``` + +### ReadWorkflowTasks Task + +Read task data from another workflow: + +```csharp +public ReadWorkflowTasks ReadTasks { get; set; } + +_builder.AddTask( + wf => wf.ReadTasks, + wf => new() + { + WorkflowId = wf.WorkflowInput.TargetWorkflowId, + TaskNames = "task1,task2" // Comma-separated reference names + } +); +``` + +### C# Lambda Tasks + +Execute C# code inline in workflows: + +```csharp +public CSharpLambdaTaskModel InlineLambda { get; set; } + +_builder.AddTask( + wf => wf.InlineLambda, + wf => new() { Value = wf.WorkflowInput.Input }, + input => new LambdaOutput { Result = input.Value.ToUpperInvariant() } +); +``` + +## Kafka Cancellation Notifier + +Handle task cancellation via Kafka events: + +```csharp +.AddExecutionManager(...) +.AddKafkaCancellationNotifier( + kafkaBootstrapServers: "localhost:9092", + topicName: "conductor.status.task", + groupId: "my-worker-group", + createTopicOnStartup: true +) +``` + +**appsettings.json:** +```json +{ + "Conductor": { + "BaseUrl": "http://localhost:8080", + "MaxConcurrentWorkers": 10, + "SleepInterval": 500, + "LongPollInterval": 100, + "KafkaCancellationNotifier": { + "BootstrapServers": "localhost:9092", + "GroupId": "my-worker", + "TopicName": "conductor.status.task" + } + } +} +``` + +## Toolkit CLI + +Generate C# models from existing Conductor task/workflow definitions. + +### Installation + +```bash +dotnet tool install --global ConductorSharp.Toolkit --version 3.0.1-beta3 +``` + +### Configuration + +Create `conductorsharp.yaml`: + +```yaml +baseUrl: http://localhost:8080 +apiPath: api +namespace: MyApp.Generated +destination: ./Generated +``` + +### Usage + +```bash +# Scaffold all tasks and workflows +dotnet-conductorsharp + +# Use custom config file +dotnet-conductorsharp -f myconfig.yaml + +# Filter by name +dotnet-conductorsharp -n CUSTOMER_get -n ORDER_create + +# Filter by owner email +dotnet-conductorsharp -e team@example.com + +# Filter by owner app +dotnet-conductorsharp -a my-application + +# Skip tasks or workflows +dotnet-conductorsharp --no-tasks +dotnet-conductorsharp --no-workflows + +# Preview without generating files +dotnet-conductorsharp --dry-run +``` + +### Command Options + +| Option | Description | +|--------|-------------| +| `-f, --file` | Configuration file path (default: `conductorsharp.yaml`) | +| `-n, --name` | Filter by task/workflow name (can specify multiple) | +| `-a, --app` | Filter by owner app | +| `-e, --email` | Filter by owner email | +| `--no-tasks` | Skip task scaffolding | +| `--no-workflows` | Skip workflow scaffolding | +| `--dry-run` | Preview what would be generated | + +## API Services + +Inject these services to interact with Conductor programmatically: + +| Service | Description | +|---------|-------------| +| `IWorkflowService` | Start, pause, resume, terminate workflows | +| `ITaskService` | Update tasks, get logs, poll for tasks | +| `IMetadataService` | Manage workflow/task definitions | +| `IAdminService` | Admin operations, queue management | +| `IEventService` | Event handlers | +| `IQueueAdminService` | Queue administration | +| `IWorkflowBulkService` | Bulk workflow operations | +| `IHealthService` | Conductor server health | +| `IExternalPayloadService` | External payload storage | + +### Example Usage + +```csharp +public class WorkflowController : ControllerBase +{ + private readonly IWorkflowService _workflowService; + private readonly IMetadataService _metadataService; + + public WorkflowController(IWorkflowService workflowService, IMetadataService metadataService) + { + _workflowService = workflowService; + _metadataService = metadataService; + } + + [HttpPost("start")] + public async Task StartWorkflow([FromBody] StartRequest request) + { + return await _workflowService.StartAsync(new StartWorkflowRequest + { + Name = "MY_workflow", + Version = 1, + Input = new Dictionary { ["customerId"] = request.CustomerId } + }); + } + + [HttpGet("definitions")] + public async Task> GetDefinitions() + { + return await _metadataService.ListWorkflowsAsync(); + } +} +``` + +## Running the Examples + +### Prerequisites + +1. Clone and run Conductor: + ```bash + git clone https://github.com/conductor-oss/conductor.git + cd conductor + docker-compose up -d + ``` + +2. Conductor UI available at: http://localhost:5000 (may vary by version) + +### Starting the Examples + +The solution includes three example projects: + +| Project | Description | +|---------|-------------| +| `ConductorSharp.Definitions` | Console app with workflow definitions | +| `ConductorSharp.ApiEnabled` | Web API with workflow execution endpoints | +| `ConductorSharp.NoApi` | Console app with Kafka cancellation support | + +```bash +# Run with Docker Compose +docker-compose up + +# Or run individual projects +cd examples/ConductorSharp.Definitions +dotnet run +``` + +## License + +MIT License - see [LICENSE](LICENSE) for details. + +## Contributing + +Contributions are welcome! Please feel free to submit a Pull Request.