diff --git a/backend/app/alembic/versions/042_add_score_trace_url_to_evaluation_run.py b/backend/app/alembic/versions/042_add_score_trace_url_to_evaluation_run.py new file mode 100644 index 000000000..46231d4b2 --- /dev/null +++ b/backend/app/alembic/versions/042_add_score_trace_url_to_evaluation_run.py @@ -0,0 +1,32 @@ +"""Add score_trace_url to evaluation_run + +Revision ID: 042 +Revises: 041 +Create Date: 2026-01-24 19:34:46.763908 + +""" +from alembic import op +import sqlalchemy as sa +import sqlmodel.sql.sqltypes + + +revision = "042" +down_revision = "041" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "evaluation_run", + sa.Column( + "score_trace_url", + sqlmodel.sql.sqltypes.AutoString(), + nullable=True, + comment="S3 URL where per-trace evaluation scores are stored", + ), + ) + + +def downgrade(): + op.drop_column("evaluation_run", "score_trace_url") diff --git a/backend/app/core/storage_utils.py b/backend/app/core/storage_utils.py index 63830d7d0..5b557872a 100644 --- a/backend/app/core/storage_utils.py +++ b/backend/app/core/storage_utils.py @@ -88,6 +88,7 @@ def upload_jsonl_to_object_store( results: list[dict], filename: str, subdirectory: str, + as_json_array: bool = False, ) -> str | None: """ Upload JSONL (JSON Lines) content to object store. 
@@ -115,7 +116,10 @@ def upload_jsonl_to_object_store( file_path = Path(subdirectory) / filename # Convert results to JSONL - jsonl_content = "\n".join([json.dumps(result) for result in results]) + if as_json_array: + jsonl_content = json.dumps(results, ensure_ascii=False) + else: + jsonl_content = "\n".join([json.dumps(result) for result in results]) content_bytes = jsonl_content.encode("utf-8") # Create UploadFile-like object @@ -152,6 +156,37 @@ def upload_jsonl_to_object_store( return None +def load_json_from_object_store(storage: CloudStorage, url: str) -> list | None: + logger.info(f"[load_json_from_object_store] Loading JSON from '{url}'") + try: + body = storage.stream(url) + content = body.read() + + data = json.loads(content.decode("utf-8")) + + logger.info( + f"[load_json_from_object_store] Download successful | " + f"url='{url}', size={len(content)} bytes" + ) + return data + except CloudStorageError as e: + logger.warning( + f"[load_json_from_object_store] failed to load JSON from '{url}': {e}", + ) + return None + except json.JSONDecodeError as e: + logger.warning( + f"[load_json_from_object_store] JSON decode error loading JSON from '{url}': {e}", + ) + return None + except Exception as e: + logger.warning( + f"[load_json_from_object_store] unexpected error loading JSON from '{url}': {e}", + exc_info=True, + ) + return None + + def generate_timestamped_filename(base_name: str, extension: str = "csv") -> str: """ Generate a filename with timestamp. 
diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py index 6cca562f2..01014f2fa 100644 --- a/backend/app/crud/evaluations/core.py +++ b/backend/app/crud/evaluations/core.py @@ -186,6 +186,7 @@ def update_evaluation_run( status: str | None = None, error_message: str | None = None, object_store_url: str | None = None, + score_trace_url: str | None = None, score: dict | None = None, embedding_batch_job_id: int | None = None, ) -> EvaluationRun: @@ -218,6 +219,8 @@ def update_evaluation_run( eval_run.score = score if embedding_batch_job_id is not None: eval_run.embedding_batch_job_id = embedding_batch_job_id + if score_trace_url is not None: + eval_run.score_trace_url = score_trace_url # Always update timestamp eval_run.updated_at = now() @@ -335,6 +338,8 @@ def save_score( Updated EvaluationRun instance, or None if not found """ from app.core.db import engine + from app.core.cloud.storage import get_cloud_storage + from app.core.storage_utils import upload_jsonl_to_object_store with Session(engine) as session: eval_run = get_evaluation_run_by_id( @@ -343,12 +348,61 @@ def save_score( organization_id=organization_id, project_id=project_id, ) - if eval_run: - update_evaluation_run(session=session, eval_run=eval_run, score=score) - logger.info( - f"[save_score] Saved score | evaluation_id={eval_run_id} | " - f"traces={len(score.get('traces', []))}" - ) + if not eval_run: + return None + + traces = score.get("traces", []) + summary_score = score.get("summary_scores", []) + score_trace_url = None + + if traces: + try: + storage = get_cloud_storage(session=session, project_id=project_id) + score_trace_url = upload_jsonl_to_object_store( + storage=storage, + results=traces, + filename=f"traces_{eval_run_id}.json", + subdirectory=f"evaluations/score/{eval_run_id}", + as_json_array=True, + ) + if score_trace_url: + logger.info( + f"[save_score] uploaded traces to S3 | " + f"evaluation_id={eval_run_id} | url={score_trace_url} | " + 
f"traces_count={len(traces)}" + ) + else: + logger.warning( + f"[save_score] failed to upload traces to S3, " + f"falling back to DB storage | evaluation_id={eval_run_id}" + ) + except Exception as e: + logger.error( + f"[save_score] Error uploading traces to S3: {e} | " + f"evaluation_id={eval_run_id}", + exc_info=True, + ) + + # IF TRACES DATA IS STORED IN S3 URL THEN HERE WE ARE JUST STORING THE SUMMARY SCORE + # TODO: Evaluate weather this behaviour is needed or completely discard the storing data in db + if score_trace_url: + db_score = {"summary_scores": summary_score} + else: + # fallback to store data in db if failed to store in s3 + db_score = score + + update_evaluation_run( + session=session, + eval_run=eval_run, + score=db_score, + score_trace_url=score_trace_url, + ) + + logger.info( + f"[save_score] Saved score | evaluation_id={eval_run_id} | " + f"traces={len(score.get('traces', []))}" + ) + return eval_run diff --git a/backend/app/models/evaluation.py b/backend/app/models/evaluation.py index 6ae4542fb..5aa8bf29d 100644 --- a/backend/app/models/evaluation.py +++ b/backend/app/models/evaluation.py @@ -250,6 +250,13 @@ class EvaluationRun(SQLModel, table=True): description="Object store URL of processed evaluation results for future reference", sa_column_kwargs={"comment": "S3 URL of processed evaluation results"}, ) + score_trace_url: str | None = SQLField( + default=None, + description="S3 URL per-trace score data is stored", + sa_column_kwargs={ + "comment": "S3 URL where per-trace evaluation scores are stored" + }, + ) total_items: int = SQLField( default=0, description="Total number of items evaluated (set during processing)", diff --git a/backend/app/services/evaluations/evaluation.py b/backend/app/services/evaluations/evaluation.py index 785eb02af..3fc463086 100644 --- a/backend/app/services/evaluations/evaluation.py +++ b/backend/app/services/evaluations/evaluation.py @@ -189,6 +189,9 @@ def get_evaluation_with_scores( Returns: Tuple of 
(EvaluationRun or None, error_message or None) """ + from app.core.cloud.storage import get_cloud_storage + from app.core.storage_utils import load_json_from_object_store + logger.info( f"[get_evaluation_with_scores] Fetching status for evaluation run | " f"evaluation_id={evaluation_id} | " @@ -227,9 +230,41 @@ def get_evaluation_with_scores( return eval_run, None # Check if we already have cached traces - has_cached_traces = eval_run.score is not None and "traces" in eval_run.score - if not resync_score and has_cached_traces: - return eval_run, None + has_cached_traces_s3 = eval_run.score_trace_url is not None + has_cached_traces_db = eval_run.score is not None and "traces" in eval_run.score + if not resync_score: + if has_cached_traces_s3: + try: + storage = get_cloud_storage(session=session, project_id=project_id) + traces = load_json_from_object_store( + storage=storage, url=eval_run.score_trace_url + ) + if traces is not None: + eval_run.score = { + "summary_scores": (eval_run.score or {}).get( + "summary_scores", [] + ), + "traces": traces, + } + logger.info( + f"[get_evaluation_with_scores] Loaded traces from S3 | " + f"evaluation_id={evaluation_id} | " + f"traces_count={len(traces)}" + ) + return eval_run, None + except Exception as e: + logger.error( + f"[get_evaluation_with_scores] Error loading traces from S3: {e} | " + f"evaluation_id={evaluation_id}", + exc_info=True, + ) + + elif has_cached_traces_db: + logger.info( + f"[get_evaluation_with_scores] Returning traces from DB | " + f"evaluation_id={evaluation_id}" + ) + return eval_run, None langfuse = get_langfuse_client( session=session, @@ -289,4 +324,7 @@ def get_evaluation_with_scores( score=score, ) + if eval_run: + eval_run.score = score + return eval_run, None