Commit 6d6c1d2

fix: fix spark xsd check to raise MessageBearingError
1 parent c5bb802 commit 6d6c1d2

File tree

2 files changed (+9 -15 lines)

src/dve/core_engine/backends/implementations/spark/readers/xml.py

Lines changed: 8 additions & 9 deletions
@@ -5,24 +5,23 @@
 from typing import Any, Optional
 
 from pydantic import BaseModel
+from pyspark.errors.exceptions.base import AnalysisException
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql import functions as sf
 from pyspark.sql.column import Column
 from pyspark.sql.types import StringType, StructField, StructType
-from pyspark.sql.utils import AnalysisException
 from typing_extensions import Literal
 
 from dve.core_engine.backends.base.reader import read_function
-from dve.core_engine.backends.exceptions import EmptyFileError
+from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
 from dve.core_engine.backends.implementations.spark.spark_helpers import (
     df_is_empty,
     get_type_from_annotation,
     spark_write_parquet,
 )
 from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader
-from dve.core_engine.backends.utilities import dump_errors
 from dve.core_engine.type_hints import URI, EntityName
-from dve.parser.file_handling import get_content_length, get_parent
+from dve.parser.file_handling import get_content_length
 from dve.parser.file_handling.service import open_stream
 
 SparkXMLMode = Literal["PERMISSIVE", "FAILFAST", "DROPMALFORMED"]
@@ -44,7 +43,7 @@ def read_to_dataframe(
     ) -> DataFrame:
         """Stream an XML file into a Spark data frame"""
         if not self.spark:
-            self.spark = SparkSession.builder.getOrCreate()
+            self.spark = SparkSession.builder.getOrCreate()  # type: ignore
         spark_schema = get_type_from_annotation(schema)
         return self.spark.createDataFrame(  # type: ignore
             list(self.read_to_py_iterator(resource, entity_name, schema)),
@@ -90,7 +89,7 @@ def __init__(
             rules_location=rules_location,
         )
 
-        self.spark_session = spark_session or SparkSession.builder.getOrCreate()
+        self.spark_session = spark_session or SparkSession.builder.getOrCreate()  # type: ignore
         self.sampling_ratio = sampling_ratio
         self.exclude_attribute = exclude_attribute
         self.mode = mode
@@ -122,9 +121,9 @@ def read_to_dataframe(
         if self.xsd_location:
             msg = self._run_xmllint(file_uri=resource)
             if msg:
-                working_folder = get_parent(resource)
-                dump_errors(
-                    working_folder=working_folder, step_name="file_transformation", messages=[msg]
+                raise MessageBearingError(
+                    "Submitted file failed XSD validation.",
+                    messages=[msg],
                 )
 
         spark_schema: StructType = get_type_from_annotation(schema)
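
The effect of the change above is that a failed XSD check now raises an exception instead of dumping errors to the working folder, so calling code must handle it. A minimal sketch of caller-side handling, assuming the exception exposes the FeedbackMessage objects it was constructed with via a messages attribute; the reader, resource, entity_name, and schema names below are placeholders, not part of this commit:

from dve.core_engine.backends.exceptions import MessageBearingError


def read_with_feedback(reader, resource, entity_name, schema):
    """Hypothetical helper showing caller-side handling of the new XSD-validation error."""
    try:
        return reader.read_to_dataframe(resource, entity_name, schema)
    except MessageBearingError as exc:
        # Assumption: the messages passed as `messages=[msg]` in the diff are
        # available on the exception; surface them before re-raising.
        for message in exc.messages:
            print(message)
        raise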

src/dve/core_engine/backends/readers/xml_linting.py

Lines changed: 1 addition & 6 deletions
@@ -11,12 +11,7 @@
 from uuid import uuid4
 
 from dve.core_engine.message import FeedbackMessage
-from dve.parser.file_handling import (
-    copy_resource,
-    get_file_name,
-    get_resource_exists,
-    open_stream,
-)
+from dve.parser.file_handling import copy_resource, get_file_name, get_resource_exists, open_stream
 from dve.parser.file_handling.implementations.file import file_uri_to_local_path
 from dve.parser.type_hints import URI
