22import logging
33from abc import ABC , abstractmethod
44from dataclasses import dataclass
5- from typing import Any , Dict , Optional
5+ from typing import Any , Dict , Optional , Sequence , Union
66
77from pymongo .errors import PyMongoError
88
@@ -19,6 +19,7 @@ class ExecutionContext:
1919
2020 query : str
2121 execution_mode : str = "standard"
22+ parameters : Optional [Union [Sequence [Any ], Dict [str , Any ]]] = None
2223
2324 def __repr__ (self ) -> str :
2425 return f"ExecutionContext(mode={ self .execution_mode } , " f"query={ self .query } )"
@@ -38,13 +39,15 @@ def execute(
3839 self ,
3940 context : ExecutionContext ,
4041 connection : Any ,
42+ parameters : Optional [Union [Sequence [Any ], Dict [str , Any ]]] = None ,
4143 ) -> Optional [Dict [str , Any ]]:
4244 """
4345 Execute query and return result set.
4446
4547 Args:
4648 context: ExecutionContext with query and subquery info
4749 connection: MongoDB connection
50+ parameters: Sequence for positional (?) or Dict for named (:param) parameters
4851
4952 Returns:
5053 command_result with query results
@@ -86,19 +89,59 @@ def _parse_sql(self, sql: str) -> ExecutionPlan:
8689 _logger .error (f"SQL parsing failed: { e } " )
8790 raise SqlSyntaxError (f"Failed to parse SQL: { e } " )
8891
89- def _execute_execution_plan (self , execution_plan : ExecutionPlan , db : Any ) -> Optional [Dict [str , Any ]]:
92+ def _replace_placeholders (self , obj : Any , parameters : Sequence [Any ]) -> Any :
93+ """Recursively replace ? placeholders with parameter values in filter/projection dicts"""
94+ param_index = [0 ] # Use list to allow modification in nested function
95+
96+ def replace_recursive (value : Any ) -> Any :
97+ if isinstance (value , str ):
98+ # Replace ? with the next parameter value
99+ if value == "?" :
100+ if param_index [0 ] < len (parameters ):
101+ result = parameters [param_index [0 ]]
102+ param_index [0 ] += 1
103+ return result
104+ else :
105+ raise ProgrammingError (
106+ f"Not enough parameters provided: expected at least { param_index [0 ] + 1 } "
107+ )
108+ return value
109+ elif isinstance (value , dict ):
110+ return {k : replace_recursive (v ) for k , v in value .items ()}
111+ elif isinstance (value , list ):
112+ return [replace_recursive (item ) for item in value ]
113+ else :
114+ return value
115+
116+ return replace_recursive (obj )
117+
118+ def _execute_execution_plan (
119+ self ,
120+ execution_plan : ExecutionPlan ,
121+ db : Any ,
122+ parameters : Optional [Sequence [Any ]] = None ,
123+ ) -> Optional [Dict [str , Any ]]:
90124 """Execute an ExecutionPlan against MongoDB using db.command"""
91125 try :
92126 # Get database
93127 if not execution_plan .collection :
94128 raise ProgrammingError ("No collection specified in query" )
95129
130+ # Replace placeholders with parameters in filter_stage only (not in projection)
131+ filter_stage = execution_plan .filter_stage or {}
132+
133+ if parameters :
134+ # Positional parameters with ? (named parameters are converted to positional in execute())
135+ filter_stage = self ._replace_placeholders (filter_stage , parameters )
136+
137+ projection_stage = execution_plan .projection_stage or {}
138+
96139 # Build MongoDB find command
97- find_command = {"find" : execution_plan .collection , "filter" : execution_plan . filter_stage or {} }
140+ find_command = {"find" : execution_plan .collection , "filter" : filter_stage }
98141
99142 # Apply projection if specified
100- if execution_plan . projection_stage :
101- find_command ["projection" ] = execution_plan . projection_stage
143+ if projection_stage :
144+ find_command ["projection" ] = projection_stage
102145
103146 # Apply sort if specified
104147 if execution_plan .sort_stage :
@@ -135,14 +178,28 @@ def execute(
135178 self ,
136179 context : ExecutionContext ,
137180 connection : Any ,
181+ parameters : Optional [Union [Sequence [Any ], Dict [str , Any ]]] = None ,
138182 ) -> Optional [Dict [str , Any ]]:
139183 """Execute standard query directly against MongoDB"""
140184 _logger .debug (f"Using standard execution for query: { context .query [:100 ]} " )
141185
186+ # Preprocess query to convert named parameters to positional
187+ processed_query = context .query
188+ processed_params = parameters
189+ if isinstance (parameters , dict ):
190+ # Convert :param_name to ? for parsing
191+ import re
192+
193+ param_names = re .findall (r":(\w+)" , context .query )
194+ # Convert dict parameters to list in order of appearance
195+ processed_params = [parameters [name ] for name in param_names ]
196+ # Replace :param_name with ?
197+ processed_query = re .sub (r":(\w+)" , "?" , context .query )
198+
142199 # Parse the query
143- self ._execution_plan = self ._parse_sql (context . query )
200+ self ._execution_plan = self ._parse_sql (processed_query )
144201
145- return self ._execute_execution_plan (self ._execution_plan , connection .database )
202+ return self ._execute_execution_plan (self ._execution_plan , connection .database , processed_params )
146203
147204
148205class ExecutionPlanFactory :
0 commit comments