Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,44 @@ app1 - received workflow error from app2
```
among others. This shows that the workflow calls are failing as expected, and they are being handled as expected too.


### Versioning

This example demonstrates how to version a workflow. The Dapr CLI can be started using the following command:

<!--STEP
name: Run the versioning example
match_order: none
expected_stdout_lines:
- "== APP == test1: triggering workflow"
- "== APP == test1: Received workflow call for version1"
- "== APP == test1: Finished workflow for version1"
- "== APP == test2: triggering workflow"
- "== APP == test2: Received workflow call for version1"
- "== APP == test2: Finished workflow for version1"
- "== APP == test3: triggering workflow"
- "== APP == test3: Received workflow call for version2"
- "== APP == test3: Finished workflow for version2"
- "== APP == test4: start"
- "== APP == test4: patch1 is patched"
- "== APP == test5: start"
- "== APP == test5: patch1 is not patched"
- "== APP == test5: patch2 is patched"
- "== APP == test6: start"
- "== APP == test6: patch1 is patched"
- "== APP == test6: patch2 is patched"
- "== APP == test7: Received workflow call for version1"
- "== APP == test7: Workflow is stalled"
- "== APP == test8: Workflow is stalled"
- "== APP == test100: part2"
- "== APP == test100: Received workflow call for version1"
- "== APP == test100: Finished stalled version1 workflow"
- "== APP == test100: Finished stalled patching workflow"
timeout_seconds: 60
-->

```sh
dapr run --app-id wf-versioning-example -- python3 versioning.py part1
dapr run --app-id wf-versioning-example -- python3 versioning.py part2
```
<!--END_STEP-->
272 changes: 272 additions & 0 deletions examples/workflow/versioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
# -*- coding: utf-8 -*-
# Copyright 2026 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import time
from datetime import timedelta

import dapr.ext.workflow as wf
from durabletask.task import TaskFailedError

wfr = wf.WorkflowRuntime()

current_test = 0
def print_test(message: str):
print(f'test{current_test}: {message}', flush=True)

def test_full_versioning(client: wf.DaprWorkflowClient):
global current_test

# Start with only one version defined. Runnig the workflow should run this version as it normally would.
current_test = 1
@wfr.versioned_workflow(name="workflow", is_latest=True)
def version1_workflow(ctx: wf.DaprWorkflowContext):
print_test('Received workflow call for version1')
yield ctx.wait_for_external_event(name='event')
print_test('Finished workflow for version1')
return 1

print_test('triggering workflow')
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
client.raise_workflow_event(instance_id, event_name='event')
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)

# Now we start a workflow, but introduce a latest version half way. It should resume the execution in the old version.
current_test = 2
print_test('triggering workflow')
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
time.sleep(2) # wait for the workflow to start and wait for the event

@wfr.versioned_workflow(name="workflow", is_latest=True)
def version2_workflow(ctx: wf.DaprWorkflowContext):
print_test('Received workflow call for version2')
yield ctx.wait_for_external_event(name='event')
print_test('Finished workflow for version2')
return 1

client.raise_workflow_event(instance_id, event_name='event')
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)


# Now we have the two versions defined, running the workflow now should run v2 as it's the latest version.
current_test = 3
print_test('triggering workflow')
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
client.raise_workflow_event(instance_id, event_name='event')
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)

def test_patching(client: wf.DaprWorkflowClient):
global current_test

@wfr.workflow
def patching_workflow(ctx: wf.DaprWorkflowContext):
# This function will be changed throughout the test, to simulate different scenarios
return workflow_code(ctx)


# Runs the patched branch by default
current_test = 4
def workflow_code(ctx: wf.DaprWorkflowContext):
print_test('start')
if ctx.is_patched('patch1'):
print_test('patch1 is patched')
else:
print_test('patch1 is not patched')
return 1

instance_id = client.schedule_new_workflow(workflow=patching_workflow)
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)

# When the execution passed the place where a patch is introduced, it should be not patched.
def workflow_code(ctx: wf.DaprWorkflowContext):
print_test('start')
yield ctx.wait_for_external_event(name='event')
if ctx.is_patched('patch2'):
print_test('patch2 is patched')
else:
print_test('patch2 is not patched')
return 1

current_test = 5
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
time.sleep(2)
def workflow_code(ctx: wf.DaprWorkflowContext):
print_test('start')
if ctx.is_patched('patch1'):
print_test('patch1 is patched')
else:
print_test('patch1 is not patched')
yield ctx.wait_for_external_event(name='event')
if ctx.is_patched('patch2'):
print_test('patch2 is patched')
else:
print_test('patch2 is not patched')
return 1

client.raise_workflow_event(instance_id, event_name='event')
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)

# It remembers previous patches.
def workflow_code(ctx: wf.DaprWorkflowContext):
print_test('start')
if ctx.is_patched('patch1'):
pass # keep it silenced for now, we'll add logs later and this ones would confuse the test
else:
pass
yield ctx.wait_for_external_event(name='event')
if ctx.is_patched('patch2'):
print_test('patch2 is patched')
else:
print_test('patch2 is not patched')
return 1

current_test = 6
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
time.sleep(2)
def workflow_code(ctx: wf.DaprWorkflowContext):
print_test('start')
if ctx.is_patched('patch1'):
print_test('patch1 is patched')
else:
print_test('patch1 is not patched')
yield ctx.wait_for_external_event(name='event')
if ctx.is_patched('patch2'):
print_test('patch2 is patched')
else:
print_test('patch2 is not patched')
return 1

client.raise_workflow_event(instance_id, event_name='event')
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)


def test_full_versioning_stall(client: wf.DaprWorkflowClient):
global current_test

new_wfr()
@wfr.versioned_workflow(name="stall_workflow", is_latest=True)
def version1_workflow(ctx: wf.DaprWorkflowContext):
print_test('Received workflow call for version1')
yield ctx.wait_for_external_event(name='event')
print_test('Finished workflow for version1')
return 1

wfr.start()
current_test = 7
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
time.sleep(2)
new_wfr()

@wfr.versioned_workflow(name="stall_workflow", is_latest=True)
def version2_workflow(ctx: wf.DaprWorkflowContext):
print_test('Received workflow call for version2')
yield ctx.wait_for_external_event(name='event')
print_test('Finished workflow for version2')
return 1
wfr.start()
client.raise_workflow_event(instance_id, event_name='event')
time.sleep(2)
md = client.get_workflow_state(instance_id)
if md.runtime_status == wf.WorkflowStatus.STALLED:
print_test('Workflow is stalled')
else:
print_test('Workflow is not stalled')

def test_patching_stall(client: wf.DaprWorkflowClient):
global current_test

current_test = 8
@wfr.workflow
def patching_workflow(ctx: wf.DaprWorkflowContext):
# This function will be changed throughout the test, to simulate different scenarios
return workflow_code(ctx)

def workflow_code(ctx: wf.DaprWorkflowContext):
if ctx.is_patched('patch1'):
pass
else:
pass
yield ctx.wait_for_external_event(name='event')
return 1

instance_id = client.schedule_new_workflow(workflow=patching_workflow)
time.sleep(2)

def workflow_code(ctx: wf.DaprWorkflowContext):
# Removed patch1 check
yield ctx.wait_for_external_event(name='event')
return 1

client.raise_workflow_event(instance_id, event_name='event')
time.sleep(2)
md = client.get_workflow_state(instance_id)
if md.runtime_status == wf.WorkflowStatus.STALLED:
print_test('Workflow is stalled')
else:
print_test('Workflow is not stalled')


def new_wfr():
global wfr
wfr.shutdown()
wfr = wf.WorkflowRuntime()


def main():
args = sys.argv[1:]
if len(args) == 0:
print('Usage: python versioning.py <part1|part2>')
return
if args[0] == 'part1':
wfr.start()
time.sleep(2) # wait for workflow runtime to start
client = wf.DaprWorkflowClient()

test_full_versioning(client)
test_patching(client)

test_full_versioning_stall(client)
test_patching_stall(client)
wfr.shutdown()
elif args[0] == 'part2':
global current_test
current_test = 100
print_test('part2')
@wfr.versioned_workflow(name="stall_workflow", is_latest=False)
def version1_workflow(ctx: wf.DaprWorkflowContext):
print_test('Received workflow call for version1')
yield ctx.wait_for_external_event(name='event')
print_test('Finished stalled version1 workflow')
return 1
@wfr.versioned_workflow(name="stall_workflow", is_latest=True)
def version2_workflow(ctx: wf.DaprWorkflowContext):
print_test('Received workflow call for version2')
yield ctx.wait_for_external_event(name='event')
print_test('Finished stalled version2 workflow')
return 1

@wfr.workflow
def patching_workflow(ctx: wf.DaprWorkflowContext):
if ctx.is_patched('patch1'):
pass
else:
pass
yield ctx.wait_for_external_event(name='event')
print_test('Finished stalled patching workflow')
return 1

wfr.start()
time.sleep(10)
wfr.shutdown()

if __name__ == '__main__':
main()
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
self.__obj.continue_as_new(new_input, save_events=save_events)


def is_patched(self, patch_name: str) -> bool:
self._logger.debug(f'{self.instance_id}: Checking if {patch_name} is patched')
return self.__obj.is_patched(patch_name)


def when_all(tasks: List[task.Task[T]]) -> task.WhenAllTask[T]:
"""Returns a task that completes when all of the provided tasks complete or when one of the
tasks fail."""
Expand Down
55 changes: 55 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,38 @@ def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] =
)
fn.__dict__['_workflow_registered'] = True

def register_versioned_workflow(self, fn: Workflow, *, name: str, version_name: Optional[str] = None, is_latest: bool):
self._logger.info(f"Registering version {version_name} of workflow '{fn.__name__}' with runtime")

def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
"""Responsible to call Workflow function in orchestrationWrapper"""
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
if inp is None:
return fn(daprWfContext)
return fn(daprWfContext, inp)

if hasattr(fn, '_workflow_registered'):
# whenever a workflow is registered, it has a _dapr_alternate_name attribute
alt_name = fn.__dict__['_dapr_alternate_name']
raise ValueError(f'Workflow {fn.__name__} already registered as {alt_name}')
if hasattr(fn, '_dapr_alternate_name'):
alt_name = fn._dapr_alternate_name
if name is not None:
m = f'Workflow {fn.__name__} already has an alternate name {alt_name}'
raise ValueError(m)
else:
fn.__dict__['_dapr_alternate_name'] = name

actual_version_name = version_name if version_name is not None else fn.__name__

self.__worker._registry.add_named_orchestrator(
name,
orchestrationWrapper,
version_name=actual_version_name,
is_latest=is_latest,
)
fn.__dict__['_workflow_registered'] = True

def register_activity(self, fn: Activity, *, name: Optional[str] = None):
"""Registers a workflow activity as a function that takes
a specified input type and returns a specified output type.
Expand Down Expand Up @@ -146,6 +178,29 @@ def shutdown(self):
"""Stops the listening for work items on a background thread."""
self.__worker.stop()

def versioned_workflow(self, __fn: Workflow = None, *, name: str, version_name: Optional[str] = None, is_latest: bool):
def wrapper(fn: Workflow):
self.register_versioned_workflow(fn, name=name, version_name=version_name, is_latest=is_latest)

@wraps(fn)
def innerfn():
return fn

if hasattr(fn, '_dapr_alternate_name'):
innerfn.__dict__['_dapr_alternate_name'] = fn.__dict__['_dapr_alternate_name']
else:
innerfn.__dict__['_dapr_alternate_name'] = name

innerfn.__signature__ = inspect.signature(fn)
return innerfn

if __fn:
# This case is true when the decorator is used without arguments
# and the function to be decorated is passed as the first argument.
return wrapper(__fn)

return wrapper

def workflow(self, __fn: Workflow = None, *, name: Optional[str] = None):
"""Decorator to register a workflow function.

Expand Down
Loading
Loading