diff --git a/examples/workflow/README.md b/examples/workflow/README.md index ac70cfa8..3b102e23 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -461,3 +461,44 @@ app1 - received workflow error from app2 ``` among others. This shows that the workflow calls are failing as expected, and they are being handled as expected too. + +### Versioning + +This example demonstrates how to version a workflow. The Dapr CLI can be started using the following commands: + + + +```sh +dapr run --app-id wf-versioning-example -- python3 versioning.py part1 +dapr run --app-id wf-versioning-example -- python3 versioning.py part2 +``` + diff --git a/examples/workflow/versioning.py b/examples/workflow/versioning.py new file mode 100644 index 00000000..0e15a638 --- /dev/null +++ b/examples/workflow/versioning.py @@ -0,0 +1,272 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import time +from datetime import timedelta + +import dapr.ext.workflow as wf +from durabletask.task import TaskFailedError + +wfr = wf.WorkflowRuntime() + +current_test = 0 +def print_test(message: str): + print(f'test{current_test}: {message}', flush=True) + +def test_full_versioning(client: wf.DaprWorkflowClient): + global current_test + + # Start with only one version defined. Running the workflow should run this version as it normally would.
+ current_test = 1 + @wfr.versioned_workflow(name="workflow", is_latest=True) + def version1_workflow(ctx: wf.DaprWorkflowContext): + print_test('Received workflow call for version1') + yield ctx.wait_for_external_event(name='event') + print_test('Finished workflow for version1') + return 1 + + print_test('triggering workflow') + instance_id = client.schedule_new_workflow(workflow=version1_workflow) + client.raise_workflow_event(instance_id, event_name='event') + client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + + # Now we start a workflow, but introduce a latest version half way. It should resume the execution in the old version. + current_test = 2 + print_test('triggering workflow') + instance_id = client.schedule_new_workflow(workflow=version1_workflow) + time.sleep(2) # wait for the workflow to start and wait for the event + + @wfr.versioned_workflow(name="workflow", is_latest=True) + def version2_workflow(ctx: wf.DaprWorkflowContext): + print_test('Received workflow call for version2') + yield ctx.wait_for_external_event(name='event') + print_test('Finished workflow for version2') + return 1 + + client.raise_workflow_event(instance_id, event_name='event') + client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + + + # Now we have the two versions defined, running the workflow now should run v2 as it's the latest version. 
+ current_test = 3 + print_test('triggering workflow') + instance_id = client.schedule_new_workflow(workflow=version1_workflow) + client.raise_workflow_event(instance_id, event_name='event') + client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + +def test_patching(client: wf.DaprWorkflowClient): + global current_test + + @wfr.workflow + def patching_workflow(ctx: wf.DaprWorkflowContext): + # This function will be changed throughout the test, to simulate different scenarios + return workflow_code(ctx) + + + # Runs the patched branch by default + current_test = 4 + def workflow_code(ctx: wf.DaprWorkflowContext): + print_test('start') + if ctx.is_patched('patch1'): + print_test('patch1 is patched') + else: + print_test('patch1 is not patched') + return 1 + + instance_id = client.schedule_new_workflow(workflow=patching_workflow) + client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + + # When the execution passed the place where a patch is introduced, it should not be patched. + def workflow_code(ctx: wf.DaprWorkflowContext): + print_test('start') + yield ctx.wait_for_external_event(name='event') + if ctx.is_patched('patch2'): + print_test('patch2 is patched') + else: + print_test('patch2 is not patched') + return 1 + + current_test = 5 + instance_id = client.schedule_new_workflow(workflow=patching_workflow) + time.sleep(2) + def workflow_code(ctx: wf.DaprWorkflowContext): + print_test('start') + if ctx.is_patched('patch1'): + print_test('patch1 is patched') + else: + print_test('patch1 is not patched') + yield ctx.wait_for_external_event(name='event') + if ctx.is_patched('patch2'): + print_test('patch2 is patched') + else: + print_test('patch2 is not patched') + return 1 + + client.raise_workflow_event(instance_id, event_name='event') + client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + + # It remembers previous patches.
+ def workflow_code(ctx: wf.DaprWorkflowContext): + print_test('start') + if ctx.is_patched('patch1'): + pass # keep it silenced for now, we'll add logs later and this ones would confuse the test + else: + pass + yield ctx.wait_for_external_event(name='event') + if ctx.is_patched('patch2'): + print_test('patch2 is patched') + else: + print_test('patch2 is not patched') + return 1 + + current_test = 6 + instance_id = client.schedule_new_workflow(workflow=patching_workflow) + time.sleep(2) + def workflow_code(ctx: wf.DaprWorkflowContext): + print_test('start') + if ctx.is_patched('patch1'): + print_test('patch1 is patched') + else: + print_test('patch1 is not patched') + yield ctx.wait_for_external_event(name='event') + if ctx.is_patched('patch2'): + print_test('patch2 is patched') + else: + print_test('patch2 is not patched') + return 1 + + client.raise_workflow_event(instance_id, event_name='event') + client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + + +def test_full_versioning_stall(client: wf.DaprWorkflowClient): + global current_test + + new_wfr() + @wfr.versioned_workflow(name="stall_workflow", is_latest=True) + def version1_workflow(ctx: wf.DaprWorkflowContext): + print_test('Received workflow call for version1') + yield ctx.wait_for_external_event(name='event') + print_test('Finished workflow for version1') + return 1 + + wfr.start() + current_test = 7 + instance_id = client.schedule_new_workflow(workflow=version1_workflow) + time.sleep(2) + new_wfr() + + @wfr.versioned_workflow(name="stall_workflow", is_latest=True) + def version2_workflow(ctx: wf.DaprWorkflowContext): + print_test('Received workflow call for version2') + yield ctx.wait_for_external_event(name='event') + print_test('Finished workflow for version2') + return 1 + wfr.start() + client.raise_workflow_event(instance_id, event_name='event') + time.sleep(2) + md = client.get_workflow_state(instance_id) + if md.runtime_status == wf.WorkflowStatus.STALLED: + 
print_test('Workflow is stalled') + else: + print_test('Workflow is not stalled') + +def test_patching_stall(client: wf.DaprWorkflowClient): + global current_test + + current_test = 8 + @wfr.workflow + def patching_workflow(ctx: wf.DaprWorkflowContext): + # This function will be changed throughout the test, to simulate different scenarios + return workflow_code(ctx) + + def workflow_code(ctx: wf.DaprWorkflowContext): + if ctx.is_patched('patch1'): + pass + else: + pass + yield ctx.wait_for_external_event(name='event') + return 1 + + instance_id = client.schedule_new_workflow(workflow=patching_workflow) + time.sleep(2) + + def workflow_code(ctx: wf.DaprWorkflowContext): + # Removed patch1 check + yield ctx.wait_for_external_event(name='event') + return 1 + + client.raise_workflow_event(instance_id, event_name='event') + time.sleep(2) + md = client.get_workflow_state(instance_id) + if md.runtime_status == wf.WorkflowStatus.STALLED: + print_test('Workflow is stalled') + else: + print_test('Workflow is not stalled') + + +def new_wfr(): + global wfr + wfr.shutdown() + wfr = wf.WorkflowRuntime() + + +def main(): + args = sys.argv[1:] + if len(args) == 0: + print('Usage: python versioning.py ') + return + if args[0] == 'part1': + wfr.start() + time.sleep(2) # wait for workflow runtime to start + client = wf.DaprWorkflowClient() + + test_full_versioning(client) + test_patching(client) + + test_full_versioning_stall(client) + test_patching_stall(client) + wfr.shutdown() + elif args[0] == 'part2': + global current_test + current_test = 100 + print_test('part2') + @wfr.versioned_workflow(name="stall_workflow", is_latest=False) + def version1_workflow(ctx: wf.DaprWorkflowContext): + print_test('Received workflow call for version1') + yield ctx.wait_for_external_event(name='event') + print_test('Finished stalled version1 workflow') + return 1 + @wfr.versioned_workflow(name="stall_workflow", is_latest=True) + def version2_workflow(ctx: wf.DaprWorkflowContext): + 
print_test('Received workflow call for version2') + yield ctx.wait_for_external_event(name='event') + print_test('Finished stalled version2 workflow') + return 1 + + @wfr.workflow + def patching_workflow(ctx: wf.DaprWorkflowContext): + if ctx.is_patched('patch1'): + pass + else: + pass + yield ctx.wait_for_external_event(name='event') + print_test('Finished stalled patching workflow') + return 1 + + wfr.start() + time.sleep(10) + wfr.shutdown() + +if __name__ == '__main__': + main() diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index 714def3f..d649d2ff 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -154,6 +154,11 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None: self.__obj.continue_as_new(new_input, save_events=save_events) + def is_patched(self, patch_name: str) -> bool: + self._logger.debug(f'{self.instance_id}: Checking if {patch_name} is patched') + return self.__obj.is_patched(patch_name) + + def when_all(tasks: List[task.Task[T]]) -> task.WhenAllTask[T]: """Returns a task that completes when all of the provided tasks complete or when one of the tasks fail.""" diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 593e55c6..71092d07 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -108,6 +108,38 @@ def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = ) fn.__dict__['_workflow_registered'] = True + def register_versioned_workflow(self, fn: Workflow, *, name: str, version_name: Optional[str] = None, is_latest: bool): + self._logger.info(f"Registering version {version_name} of workflow '{fn.__name__}' with runtime") + + def 
orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None): + """Responsible to call Workflow function in orchestrationWrapper""" + daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) + if inp is None: + return fn(daprWfContext) + return fn(daprWfContext, inp) + + if hasattr(fn, '_workflow_registered'): + # whenever a workflow is registered, it has a _dapr_alternate_name attribute + alt_name = fn.__dict__['_dapr_alternate_name'] + raise ValueError(f'Workflow {fn.__name__} already registered as {alt_name}') + if hasattr(fn, '_dapr_alternate_name'): + alt_name = fn._dapr_alternate_name + if name is not None: + m = f'Workflow {fn.__name__} already has an alternate name {alt_name}' + raise ValueError(m) + else: + fn.__dict__['_dapr_alternate_name'] = name + + actual_version_name = version_name if version_name is not None else fn.__name__ + + self.__worker._registry.add_named_orchestrator( + name, + orchestrationWrapper, + version_name=actual_version_name, + is_latest=is_latest, + ) + fn.__dict__['_workflow_registered'] = True + def register_activity(self, fn: Activity, *, name: Optional[str] = None): """Registers a workflow activity as a function that takes a specified input type and returns a specified output type. 
@@ -146,6 +178,29 @@ def shutdown(self): """Stops the listening for work items on a background thread.""" self.__worker.stop() + def versioned_workflow(self, __fn: Workflow = None, *, name: str, version_name: Optional[str] = None, is_latest: bool): + def wrapper(fn: Workflow): + self.register_versioned_workflow(fn, name=name, version_name=version_name, is_latest=is_latest) + + @wraps(fn) + def innerfn(): + return fn + + if hasattr(fn, '_dapr_alternate_name'): + innerfn.__dict__['_dapr_alternate_name'] = fn.__dict__['_dapr_alternate_name'] + else: + innerfn.__dict__['_dapr_alternate_name'] = name + + innerfn.__signature__ = inspect.signature(fn) + return innerfn + + if __fn: + # This case is true when the decorator is used without arguments + # and the function to be decorated is passed as the first argument. + return wrapper(__fn) + + return wrapper + def workflow(self, __fn: Workflow = None, *, name: Optional[str] = None): """Decorator to register a workflow function. diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_state.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_state.py index af1d7e73..79b2c95a 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_state.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_state.py @@ -27,6 +27,7 @@ class WorkflowStatus(Enum): TERMINATED = 4 PENDING = 5 SUSPENDED = 6 + STALLED = 7 class WorkflowState: @@ -53,6 +54,8 @@ def runtime_status(self) -> WorkflowStatus: return WorkflowStatus.PENDING elif self.__obj.runtime_status == client.OrchestrationStatus.SUSPENDED: return WorkflowStatus.SUSPENDED + elif self.__obj.runtime_status == client.OrchestrationStatus.STALLED: + return WorkflowStatus.STALLED else: return WorkflowStatus.UNKNOWN