Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .fides/db_dataset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,8 @@ dataset:
data_categories: [system.operations]
- name: digest_config_id
data_categories: [system.operations]
- name: condition_tree
data_categories: [system.operations]
- name: parent_id
data_categories: [system.operations]
- name: digest_condition_type
Expand Down Expand Up @@ -1277,6 +1279,8 @@ dataset:
data_categories: [system.operations]
- name: manual_task_id
data_categories: [system.operations]
- name: condition_tree
data_categories: [system.operations]
- name: parent_id
data_categories: [system.operations]
- name: condition_type
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""add jsonb tree column

Revision ID: 85ce2c1c9579
Revises: b9c8e7f6d5a4
Create Date: 2025-12-16 16:30:52.073758

"""

import json

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import Session

# revision identifiers, used by Alembic.
revision = "85ce2c1c9579"
down_revision = "b9c8e7f6d5a4"
branch_labels = None
depends_on = None


def build_condition_tree(
db: Session, table_name: str, row_id: str, id_column: str = "id"
):
"""Recursively build a condition tree from row-based storage.

Returns:
dict: Condition tree as a dictionary (ConditionLeaf or ConditionGroup format)
"""
result = db.execute(
sa.text(
f"SELECT condition_type, field_address, operator, value, logical_operator "
f"FROM {table_name} WHERE {id_column} = :row_id"
),
{"row_id": row_id},
).fetchone()

if not result:
return None

condition_type, field_address, operator, value, logical_operator = result

if condition_type == "leaf":
parsed_value = value
if isinstance(value, str):
try:
parsed_value = json.loads(value)
except (json.JSONDecodeError, TypeError):
parsed_value = value

return {
"field_address": field_address,
"operator": operator,
"value": parsed_value,
}

# It's a group - get children ordered by sort_order
children_rows = db.execute(
sa.text(
f"SELECT {id_column} FROM {table_name} "
f"WHERE parent_id = :parent_id ORDER BY sort_order"
),
{"parent_id": row_id},
).fetchall()

child_conditions = []
for (child_id,) in children_rows:
child_tree = build_condition_tree(db, table_name, child_id, id_column)
if child_tree:
child_conditions.append(child_tree)

if not child_conditions:
return None

return {
"logical_operator": logical_operator,
"conditions": child_conditions,
}


def migrate_conditions(db: Session, table_name: str):
"""Migrate existing row-based condition trees to JSONB format for the given table."""
root_rows = db.execute(
sa.text(f"SELECT id FROM {table_name} WHERE parent_id IS NULL")
).fetchall()

for (root_id,) in root_rows:
tree = build_condition_tree(db, table_name, root_id)

if tree:
db.execute(
sa.text(
f"UPDATE {table_name} "
"SET condition_tree = :tree WHERE id = :root_id"
),
{"tree": json.dumps(tree), "root_id": root_id},
)


def upgrade():
# Step 1: Add condition_tree column to both tables
op.add_column(
"digest_condition",
sa.Column(
"condition_tree", postgresql.JSONB(astext_type=sa.Text()), nullable=True
),
)
op.add_column(
"manual_task_conditional_dependency",
sa.Column(
"condition_tree", postgresql.JSONB(astext_type=sa.Text()), nullable=True
),
)

# Step 2: Migrate existing row-based trees to JSONB
db = Session(op.get_bind())
migrate_conditions(db, "manual_task_conditional_dependency")
migrate_conditions(db, "digest_condition")
db.commit()


def downgrade():
op.drop_column("manual_task_conditional_dependency", "condition_tree")
op.drop_column("digest_condition", "condition_tree")
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, Union

from pydantic import TypeAdapter
from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Session
Expand All @@ -17,6 +18,10 @@
from sqlalchemy.orm.relationships import RelationshipProperty


# TypeAdapter for deserializing JSONB to Condition (handles Union discrimination)
ConditionTypeAdapter: TypeAdapter[Condition] = TypeAdapter(Condition)


class ConditionalDependencyError(Exception):
"""Exception for conditional dependency errors."""

Expand All @@ -41,9 +46,12 @@ class ConditionalDependencyBase(Base):
"""Abstract base class for all conditional dependency models.

This class provides a common structure for building hierarchical condition trees
that can be evaluated to determine when certain actions should be taken.
or storing condition trees as a single JSONB object that can be evaluated to
determine when certain actions should be taken.

Architecture:
- JSONB Storage: Full condition tree stored as a single JSONB object
- Pydantic Integration: Uses Condition schema for serialization/deserialization
- Tree Structure: Supports parent-child relationships for complex logic
- Two Node Types: 'leaf' (individual conditions) and 'group' (logical operators)
- Flexible Schema: Uses JSONB for dynamic value storage
Expand Down Expand Up @@ -83,6 +91,9 @@ class ConditionalDependencyBase(Base):

__abstract__ = True

# JSONB storage for full condition tree
condition_tree = Column(JSONB, nullable=True)

# Tree structure - parent_id defined in concrete classes for proper foreign keys
condition_type = Column(
EnumColumn(ConditionalDependencyType), nullable=False, index=True
Expand Down Expand Up @@ -195,25 +206,19 @@ def to_condition_group(self) -> ConditionGroup:

@classmethod
def get_root_condition(cls, db: Session, **kwargs: Any) -> Optional[Condition]:
"""Get the root condition tree for a parent entity.
"""Get the condition tree for a parent entity.

This abstract method must be implemented by concrete subclasses to define
how to retrieve the root condition node for their specific use case.
how to retrieve the condition tree for their specific use case.
The root condition represents the top-level node in a condition tree.

Implementation Guidelines:
1. Query for conditions with parent_id=None for the given parent entity
2. Return None if no root condition exists
3. Convert the database model to a Condition schema object
4. Handle any domain-specific filtering or validation

Args:
db: SQLAlchemy database session for querying
**kwargs: Keyword arguments specific to each implementation.
Examples:
- manual_task_id: ID of the manual task (for single-type hierarchies)
- digest_config_id: ID of the digest config (for multi-type hierarchies)
- digest_condition_type: Type of digest condition (for multi-type hierarchies)
- manual_task_id: ID of the manual task
- digest_config_id: ID of the digest config
- digest_condition_type: Type of digest condition (RECEIVER, CONTENT, PRIORITY)

Returns:
Optional[Condition]: Root condition tree (ConditionLeaf or ConditionGroup) or None
Expand All @@ -222,90 +227,10 @@ def get_root_condition(cls, db: Session, **kwargs: Any) -> Optional[Condition]:
Raises:
NotImplementedError: If called on the base class directly

Example Implementation:
>>> @classmethod
>>> def get_root_condition(cls, db: Session, *, manual_task_id: str) -> Optional[Condition]:
... root = db.query(cls).filter(
... cls.manual_task_id == manual_task_id,
... cls.parent_id.is_(None)
... ).first()
... if not root:
... return None
... return root.to_condition_leaf() if root.condition_type == 'leaf' else root.to_condition_group()
"""
raise NotImplementedError(
f"Subclasses of {cls.__name__} must implement get_root_condition(). "
f"This method should query for the root condition (parent_id=None) "
f"and return it as a Condition schema object, or None if not found. "
f"See the docstring for implementation guidelines and examples."
)

def get_depth(self) -> int:
"""Calculate the depth of this node in the condition tree.

Returns:
int: Depth level (0 for root, 1 for direct children, etc.)

Note:
Requires the 'parent' relationship to be defined in concrete classes.
"""
depth = 0
current = self
try:
while hasattr(current, "parent") and current.parent is not None: # type: ignore[attr-defined]
depth += 1
current = current.parent # type: ignore[attr-defined]
except AttributeError:
# If parent relationship not defined, we can't calculate depth
pass
return depth

def get_tree_summary(self) -> str:
"""Generate a human-readable summary of this condition tree.

Returns:
str: Multi-line string representation of the condition tree structure

Example:
>>> print(condition.get_tree_summary())
Group (AND) [depth: 0, order: 0]
├── Leaf: user.role == "admin" [depth: 1, order: 0]
├── Leaf: request.priority >= 3 [depth: 1, order: 1]
└── Group (OR) [depth: 1, order: 2]
├── Leaf: user.dept == "security" [depth: 2, order: 0]
└── Leaf: user.dept == "compliance" [depth: 2, order: 1]
"""

def _build_tree_lines(
node: "ConditionalDependencyBase", prefix: str = "", is_last: bool = True
) -> list[str]:
lines = []

# Current node info
if node.condition_type == ConditionalDependencyType.leaf:
node_desc = f"Leaf: {node.field_address} {node.operator} {node.value}"
else:
node_desc = f"Group ({node.logical_operator.upper() if node.logical_operator else 'UNKNOWN'})"

depth = node.get_depth()
connector = "└── " if is_last else "├── "
lines.append(
f"{prefix}{connector}{node_desc} [depth: {depth}, order: {node.sort_order}]"
)

# Add children if this is a group
if node.condition_type == ConditionalDependencyType.group:
try:
children = sorted([child for child in node.children], key=lambda x: x.sort_order) # type: ignore[attr-defined]
for i, child in enumerate(children):
is_last_child = i == len(children) - 1
child_prefix = prefix + (" " if is_last else "│ ")
lines.extend(
_build_tree_lines(child, child_prefix, is_last_child)
)
except AttributeError:
lines.append(f"{prefix} [children relationship not defined]")

return lines

return "\n".join(_build_tree_lines(self))
Loading
Loading