Commit 4f63ea9

feat: add persistence of error aggregates to pipeline
1 parent: 90ff789

File tree

5 files changed: +36 −5 lines changed

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 8 additions & 2 deletions

@@ -5,6 +5,8 @@
 from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.models import SubmissionInfo
 from dve.core_engine.type_hints import URI, Failed
+from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
+from dve.parser.file_handling.service import _get_implementation
 from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
 from dve.pipeline.utils import SubmissionStatus
 from dve.parser import file_handling as fh
@@ -18,13 +20,17 @@ class FoundryDDBPipeline(DDBDVEPipeline):
     def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
         """Write out key audit relations to parquet for persisting to datasets"""
         write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
+        if isinstance(_get_implementation(write_to), LocalFilesystemImplementation):
+            write_to = fh.file_uri_to_local_path(write_to)
+            write_to.parent.mkdir(parents=True, exist_ok=True)
+            write_to = write_to.as_posix()
         self.write_parquet(
             self._audit_tables._processing_status.get_relation(),
-            write_to + "processing_status.parquet",
+            fh.joinuri(write_to, "processing_status.parquet"),
         )
         self.write_parquet(
             self._audit_tables._submission_statistics.get_relation(),
-            write_to + "submission_statistics.parquet",
+            fh.joinuri(write_to, "submission_statistics.parquet"),
         )
         return write_to

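Note: the LocalFilesystemImplementation branch above exists because file:// URIs can't be handed straight to the parquet writer: local targets are converted to a plain path and the parent directory is created first, while remote URIs pass through untouched. A minimal standalone sketch of that pattern (the helper name and the urllib/pathlib stand-ins for dve's _get_implementation and fh.file_uri_to_local_path are hypothetical):

    from pathlib import Path
    from urllib.parse import urlparse

    def ensure_local_write_target(uri: str) -> str:
        """Create parent directories for local targets; pass remote URIs through."""
        parsed = urlparse(uri)
        if parsed.scheme in ("", "file"):  # local filesystem target
            local = Path(parsed.path or uri)
            local.parent.mkdir(parents=True, exist_ok=True)
            return local.as_posix()
        return uri  # remote schemes (e.g. s3://) are left to the storage backend
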
src/dve/pipeline/pipeline.py

Lines changed: 18 additions & 1 deletion

@@ -2,6 +2,7 @@
 """Generic Pipeline object to define how DVE should be interacted with."""
 from itertools import starmap
 import json
+from pathlib import Path
 import re
 from collections import defaultdict
 from collections.abc import Generator, Iterable, Iterator
@@ -16,6 +17,8 @@

 from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.message import FeedbackMessage
+from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
+from dve.parser.file_handling.service import _get_implementation
 import dve.reporting.excel_report as er
 from dve.core_engine.backends.base.auditing import BaseAuditingManager
 from dve.core_engine.backends.base.contract import BaseDataContract
@@ -635,7 +638,18 @@ def business_rule_step(
         )

         return successful_files, unsucessful_files, failed_processing
-
+
+    def _publish_error_aggregates(self, submission_id: str, aggregates_df: pl.DataFrame) -> URI:
+        """Store error aggregates as parquet for auditing"""
+        output_uri = fh.joinuri(self.processed_files_path, submission_id, "audit", "error_aggregates.parquet")
+        if isinstance(_get_implementation(output_uri), LocalFilesystemImplementation):
+            output_uri = fh.file_uri_to_local_path(output_uri)
+            output_uri.parent.mkdir(parents=True, exist_ok=True)
+            output_uri = output_uri.as_posix()
+        aggregates_df = aggregates_df.with_columns(pl.lit(submission_id).alias("submission_id"))
+        aggregates_df.write_parquet(output_uri)
+        return output_uri
+
     @lru_cache()  # noqa: B019
     def _get_error_dataframes(self, submission_id: str):
         if not self.processed_files_path:
@@ -738,6 +752,8 @@ def error_report(self,
         )
         with fh.open_stream(report_uri, "wb") as stream:
             stream.write(er.ExcelFormat.convert_to_bytes(workbook))
+
+        self._publish_error_aggregates(submission_info.submission_id, aggregates)

         return submission_info, submission_status, sub_stats, report_uri

@@ -842,3 +858,4 @@ def cluster_pipeline_run(
        )

        yield from report_results  # type: ignore
+

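Note: after error_report runs, each submission's audit directory gains an error_aggregates.parquet stamped with its submission_id. A quick local sanity check (the processed/sub-123 path and id are hypothetical; polars is already a pipeline dependency, imported as pl):

    import polars as pl

    # read back the persisted aggregates for one submission
    agg = pl.read_parquet("processed/sub-123/audit/error_aggregates.parquet")
    assert "submission_id" in agg.columns  # column added by _publish_error_aggregates
    print(agg.head())
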
tests/features/movies.feature

Lines changed: 2 additions & 0 deletions

@@ -41,6 +41,7 @@ Feature: Pipeline tests using the movies dataset
       | record_count             | 5 |
       | number_record_rejections | 4 |
       | number_warnings          | 1 |
+    And the error aggregates are persisted

   Scenario: Validate and filter movies (duckdb)
     Given I submit the movies file movies.json for processing
@@ -76,4 +77,5 @@ Feature: Pipeline tests using the movies dataset
       | record_count             | 5 |
       | number_record_rejections | 4 |
       | number_warnings          | 1 |
+    And the error aggregates are persisted

tests/features/steps/steps_post_pipeline.py

Lines changed: 6 additions & 0 deletions
@@ -111,3 +111,9 @@ def check_stats_record(context):
         get_pipeline(context)._audit_tables.get_submission_statistics(sub_info.submission_id).dict()
     )
     assert all([val == stats.get(fld) for fld, val in expected.items()])
+
+@then("the error aggregates are persisted")
+def check_error_aggregates_persisted(context):
+    processing_location = get_processing_location(context)
+    agg_file = Path(processing_location, "audit", "error_aggregates.parquet")
+    assert agg_file.exists() and agg_file.is_file()

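Note: the hunk starts at line 111, so the module's import block isn't shown. The new step relies on Path; if pathlib isn't already imported in steps_post_pipeline.py, it needs:

    from pathlib import Path  # required by check_error_aggregates_persisted
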
tests/test_pipeline/test_foundry_ddb_pipeline.py

Lines changed: 2 additions & 2 deletions
@@ -49,7 +49,7 @@ def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn):
     output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
     assert fh.get_resource_exists(report_uri)
     assert not output_loc
-    assert len(list(fh.iter_prefix(audit_files))) == 2
+    assert len(list(fh.iter_prefix(audit_files))) == 3


 def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn):
@@ -86,7 +86,7 @@ def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn):
     output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
     assert fh.get_resource_exists(report_uri)
     assert len(list(fh.iter_prefix(output_loc))) == 2
-    assert len(list(fh.iter_prefix(audit_files))) == 2
+    assert len(list(fh.iter_prefix(audit_files))) == 3

 def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
     # using spark reader config - should error in file transformation - check gracefully handled

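Note: the audit prefix now holds three files per submission (processing_status.parquet, submission_statistics.parquet, and the new error_aggregates.parquet), which is why both assertions move from 2 to 3.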