Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Add score_trace_url to evaluation_run

Revision ID: 041
Revises: 040
Create Date: 2026-01-24 19:34:46.763908

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes


revision = "041"
down_revision = "040"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"evaluation_run",
sa.Column(
"score_trace_url",
sqlmodel.sql.sqltypes.AutoString(),
nullable=True,
comment="S3 URL where per-trace evaluation scores are stored",
),
)


def downgrade():
op.drop_column("evaluation_run", "score_trace_url")
37 changes: 36 additions & 1 deletion backend/app/core/storage_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def upload_jsonl_to_object_store(
results: list[dict],
filename: str,
subdirectory: str,
as_json_array: bool = False,
) -> str | None:
"""
Upload JSONL (JSON Lines) content to object store.
Expand Down Expand Up @@ -115,7 +116,10 @@ def upload_jsonl_to_object_store(
file_path = Path(subdirectory) / filename

# Convert results to JSONL
jsonl_content = "\n".join([json.dumps(result) for result in results])
if as_json_array:
jsonl_content = json.dumps(results, ensure_ascii=False)
else:
jsonl_content = "\n".join([json.dumps(result) for result in results])
content_bytes = jsonl_content.encode("utf-8")

# Create UploadFile-like object
Expand Down Expand Up @@ -152,6 +156,37 @@ def upload_jsonl_to_object_store(
return None


def load_json_from_object_store(storage: CloudStorage, url: str) -> list | None:
logger.info(f"[load_json_from_object_store] Loading JSON from '{url}")
try:
body = storage.stream(url)
content = body.read()

data = json.loads(content.decode("utf-8"))

logger.info(
f"[load_json_from_object_store] Download successful | "
f"url='{url}', size={len(content)} bytes"
)
return data
except CloudStorageError as e:
logger.warning(
f"[load_json_from_object_store] failed to load JSON from '{url}': {e}",
)
return None
except json.JSONDecodeError as e:
logger.warning(
f"[load_json_from_object_store] JSON decode error loading JSON from '{url}': {e}",
)
return None
except Exception as e:
logger.warning(
f"[load_json_from_object_store] unexpected error loading JSON from '{url}': {e}",
exc_info=True,
)
return None


def generate_timestamped_filename(base_name: str, extension: str = "csv") -> str:
"""
Generate a filename with timestamp.
Expand Down
66 changes: 60 additions & 6 deletions backend/app/crud/evaluations/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def update_evaluation_run(
status: str | None = None,
error_message: str | None = None,
object_store_url: str | None = None,
score_trace_url: str | None = None,
score: dict | None = None,
embedding_batch_job_id: int | None = None,
) -> EvaluationRun:
Expand Down Expand Up @@ -179,6 +180,8 @@ def update_evaluation_run(
eval_run.score = score
if embedding_batch_job_id is not None:
eval_run.embedding_batch_job_id = embedding_batch_job_id
if score_trace_url is not None:
eval_run.score_trace_url = score_trace_url

# Always update timestamp
eval_run.updated_at = now()
Expand Down Expand Up @@ -296,6 +299,8 @@ def save_score(
Updated EvaluationRun instance, or None if not found
"""
from app.core.db import engine
from app.core.cloud.storage import get_cloud_storage
from app.core.storage_utils import upload_jsonl_to_object_store

with Session(engine) as session:
eval_run = get_evaluation_run_by_id(
Expand All @@ -304,10 +309,59 @@ def save_score(
organization_id=organization_id,
project_id=project_id,
)
if eval_run:
update_evaluation_run(session=session, eval_run=eval_run, score=score)
logger.info(
f"[save_score] Saved score | evaluation_id={eval_run_id} | "
f"traces={len(score.get('traces', []))}"
)
if not eval_run:
return None

traces = score.get("traces", [])
summary_score = score.get("summary_scores", [])
score_trace_url = None

if traces:
try:
storage = get_cloud_storage(session=session, project_id=project_id)
score_trace_url = upload_jsonl_to_object_store(
storage=storage,
results=traces,
filename=f"traces_{eval_run_id}.json",
subdirectory=f"evaluations/score/{eval_run_id}",
as_json_array=True,
)
if score_trace_url:
logger.info(
f"[save_score] uploaded traces to S3 | "
f"evaluation_id={eval_run_id} | url={score_trace_url} | "
f"traces_count={len(traces)}"
)
else:
logger.warning(
f"[save_score] failed to upload traces to S3, "
f"falling back to DB storage | evaluation_id={eval_run_id}"
)
except Exception as e:
logger.error(
f"[save_score] Error uploading traces to S3: {e} | "
f"evaluation_id={eval_run_id}",
exc_info=True,
)

# IF TRACES DATA IS STORED IN S3 URL THEN HERE WE ARE JUST STORING THE SUMMARY SCORE
# TODO: Evaluate weather this behaviour is needed or completely discard the storing data in db
if score_trace_url:
db_score = {"summary_scores": summary_score}
else:
# fallback to store data in db if failed to store in s3
db_score = score

update_evaluation_run(
session=session,
eval_run=eval_run,
score=db_score,
score_trace_url=score_trace_url,
)

logger.info(
f"[save_score] Saved score | evaluation_id={eval_run_id} | "
f"traces={len(score.get('traces', []))}"
)

return eval_run
7 changes: 7 additions & 0 deletions backend/app/models/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ class EvaluationRun(SQLModel, table=True):
description="Object store URL of processed evaluation results for future reference",
sa_column_kwargs={"comment": "S3 URL of processed evaluation results"},
)
score_trace_url: str | None = SQLField(
default=None,
description="S3 URL per-trace score data is stored",
sa_column_kwargs={
"comment": "S3 URL where per-trace evaluation scores are stored"
},
)
total_items: int = SQLField(
default=0,
description="Total number of items evaluated (set during processing)",
Expand Down
44 changes: 41 additions & 3 deletions backend/app/services/evaluations/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ def get_evaluation_with_scores(
Returns:
Tuple of (EvaluationRun or None, error_message or None)
"""
from app.core.cloud.storage import get_cloud_storage
from app.core.storage_utils import load_json_from_object_store

logger.info(
f"[get_evaluation_with_scores] Fetching status for evaluation run | "
f"evaluation_id={evaluation_id} | "
Expand Down Expand Up @@ -282,9 +285,41 @@ def get_evaluation_with_scores(
return eval_run, None

# Check if we already have cached traces
has_cached_traces = eval_run.score is not None and "traces" in eval_run.score
if not resync_score and has_cached_traces:
return eval_run, None
has_cached_traces_s3 = eval_run.score_trace_url is not None
has_cached_traces_db = eval_run.score is not None and "traces" in eval_run.score
if not resync_score:
if has_cached_traces_s3:
try:
storage = get_cloud_storage(session=session, project_id=project_id)
traces = load_json_from_object_store(
storage=storage, url=eval_run.score_trace_url
)
if traces is not None:
eval_run.score = {
"summary_scores": (eval_run.score or {}).get(
"summary_scores", []
),
"traces": traces,
}
logger.info(
f"[get_evaluation_with_scores] Loaded traces from S3 | "
f"evaluation_id={evaluation_id} | "
f"traces_count={len(traces)}"
)
return eval_run, None
except Exception as e:
logger.error(
f"[get_evaluation_with_scores] Error loading traces from S3: {e} | "
f"evaluation_id={evaluation_id}",
exc_info=True,
)

elif has_cached_traces_db:
logger.info(
f"[get_evaluation_with_scores] Returning traces from DB | "
f"evaluation_id={evaluation_id}"
)
return eval_run, None

langfuse = get_langfuse_client(
session=session,
Expand Down Expand Up @@ -344,4 +379,7 @@ def get_evaluation_with_scores(
score=score,
)

if eval_run:
eval_run.score = score

return eval_run, None