-
Notifications
You must be signed in to change notification settings - Fork 0
Add postgresql READ\WRITE routing #915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds PostgreSQL master-replica read/write routing functionality to support database scalability. The implementation introduces a custom routing session that directs write operations to the master database while allowing read operations to use a replica database.
Changes:
- New database routing infrastructure with
RoutingSessionclass that routes database operations between master and replica - Configuration options for single or master-replica database modes
- Master database availability checks added to all write API endpoints
- Error handling for database unavailability with OperationalError suppression and user-facing error responses
Reviewed changes
Copilot reviewed 30 out of 30 changed files in this pull request and generated 18 comments.
Show a summary per file
| File | Description |
|---|---|
| app/database.py | New file implementing RoutingSession for master-replica routing and engine configuration |
| app/config.py | Added replica database configuration fields and removed engine property from Settings |
| app/ioc.py | Modified session factory to support routing mode, removed engine provider |
| app/api/utils.py | New utility function to check master database availability before write operations |
| app/multidirectory.py | Updated to use engines dictionary instead of settings.engine |
| tests/conftest.py | Updated test configuration to use engines dictionary |
| app/ldap_protocol/ldap_requests/base.py | Added RESPONSE_TYPE class variable and OperationalError handling in request handlers |
| app/ldap_protocol/ldap_requests/*.py | Added RESPONSE_TYPE class variable to request classes |
| app/ldap_protocol/ldap_requests/bind.py | Added OperationalError suppression for user login attribute updates |
| app/ldap_protocol/session_storage/repository.py | Added OperationalError suppression for session key creation |
| app/api/**/router*.py | Added check_master_db dependency to write endpoints |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| logger.critical("-- CALL RoutingSession.get_bind --") | ||
|
|
||
| if isinstance(clause, Update | Insert | Delete): | ||
| logger.critical("MASTER") | ||
| return engines["master"].sync_engine | ||
|
|
||
| if self._force_master or self._flushing: | ||
| logger.critical("MASTER") | ||
| return engines["master"].sync_engine | ||
| else: | ||
| logger.critical("REPLICA") |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are critical-level log messages left in production code. These debug statements should be removed or changed to debug level before merging to production, as they will pollute logs with unnecessary output.
| logger.critical("-- CALL RoutingSession.get_bind --") | |
| if isinstance(clause, Update | Insert | Delete): | |
| logger.critical("MASTER") | |
| return engines["master"].sync_engine | |
| if self._force_master or self._flushing: | |
| logger.critical("MASTER") | |
| return engines["master"].sync_engine | |
| else: | |
| logger.critical("REPLICA") | |
| logger.debug("-- CALL RoutingSession.get_bind --") | |
| if isinstance(clause, Update | Insert | Delete): | |
| logger.debug("MASTER") | |
| return engines["master"].sync_engine | |
| if self._force_master or self._flushing: | |
| logger.debug("MASTER") | |
| return engines["master"].sync_engine | |
| else: | |
| logger.debug("REPLICA") |
| detail="Master DB is not available", | ||
| ) | ||
| else: | ||
| session.sync_session.set_force_master(False) # type: ignore |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing space before inline comment. Should be set_force_master(False) # type: ignore with two spaces before the comment, following PEP 8 style guidelines.
| session.sync_session.set_force_master(False) # type: ignore | |
| session.sync_session.set_force_master(False) # type: ignore |
| settings = Settings.from_os() | ||
|
|
||
| engines = { | ||
| "master": create_async_engine( | ||
| str(settings.POSTGRES_URI), | ||
| pool_size=settings.INSTANCE_DB_POOL_SIZE, | ||
| max_overflow=settings.INSTANCE_DB_POOL_OVERFLOW, | ||
| pool_timeout=settings.INSTANCE_DB_POOL_TIMEOUT, | ||
| pool_recycle=settings.INSTANCE_DB_POOL_RECYCLE, | ||
| pool_pre_ping=False, | ||
| future=True, | ||
| echo=False, | ||
| logging_name="master", | ||
| connect_args={"connect_timeout": settings.POSTGRES_CONNECT_TIMEOUT}, | ||
| ), | ||
| } | ||
| if settings.POSTGRES_RW_MODE == "master_replica": | ||
| engines["replica"] = create_async_engine( | ||
| str(settings.REPLICA_POSTGRES_URI), | ||
| pool_size=settings.INSTANCE_DB_POOL_SIZE, | ||
| max_overflow=settings.INSTANCE_DB_POOL_OVERFLOW, | ||
| pool_timeout=settings.INSTANCE_DB_POOL_TIMEOUT, | ||
| pool_recycle=settings.INSTANCE_DB_POOL_RECYCLE, | ||
| pool_pre_ping=False, | ||
| future=True, | ||
| echo=False, | ||
| logging_name="replica", | ||
| connect_args={ | ||
| "connect_timeout": settings.POSTGRES_REPLICA_CONNECT_TIMEOUT, | ||
| }, | ||
| ) |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating a module-level Settings instance with Settings.from_os() is executed at import time, which creates a tight coupling and makes testing difficult. This also means the engines dictionary is created at import time rather than application startup. Consider moving engine creation to an initialization function that can be called during application startup, allowing for better dependency injection and testing.
| @audit_router.get( | ||
| "/destinations", | ||
| error_map=error_map, | ||
| dependencies=[Depends(check_master_db)], |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GET endpoints should not require check_master_db dependency. Reading audit destinations does not perform write operations and should be able to use the replica database. This dependency should only be added to POST, PUT, PATCH, and DELETE endpoints that perform write operations.
| dependencies=[Depends(check_master_db)], |
| try: | ||
| session.sync_session.set_force_master(True) # type: ignore | ||
| await session.execute(text("SELECT 1")) | ||
| except Exception as e: |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Catching broad Exception type instead of specific database-related exceptions. This should catch specific exceptions like OperationalError, DatabaseError, or TimeoutError to avoid masking unexpected errors. Using a broad except will hide programming errors and other issues unrelated to database connectivity.
| @computed_field # type: ignore | ||
| @cached_property | ||
| def REPLICA_POSTGRES_URI(self) -> PostgresDsn: # noqa | ||
| """Build replica postgres DSN.""" | ||
| return PostgresDsn( | ||
| f"{self.POSTGRES_SCHEMA}://" | ||
| f"{self.POSTGRES_REPLICA_USER}:" | ||
| f"{self.POSTGRES_REPLICA_PASSWORD}@" | ||
| f"{self.POSTGRES_REPLICA_HOST}/" | ||
| f"{self.POSTGRES_REPLICA_DB}", | ||
| ) |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When POSTGRES_RW_MODE is "master_replica", the REPLICA_POSTGRES_URI will be constructed with empty strings for user, password, host, and db if these fields are not provided. This could lead to invalid connection strings. Consider adding validation to ensure all replica-related fields are provided when POSTGRES_RW_MODE is set to "master_replica".
| class RoutingSession(Session): | ||
| _force_master: bool = False | ||
|
|
||
| @property | ||
| def force_master(self) -> bool: | ||
| return self._force_master | ||
|
|
||
| def set_force_master(self, value: bool) -> None: | ||
| self._force_master = value | ||
|
|
||
| def get_bind(self, mapper=None, clause=None) -> Engine: # type: ignore # noqa: ARG002 | ||
| logger.critical("-- CALL RoutingSession.get_bind --") | ||
|
|
||
| if isinstance(clause, Update | Insert | Delete): | ||
| logger.critical("MASTER") | ||
| return engines["master"].sync_engine | ||
|
|
||
| if self._force_master or self._flushing: | ||
| logger.critical("MASTER") | ||
| return engines["master"].sync_engine | ||
| else: | ||
| logger.critical("REPLICA") | ||
| return engines["replica"].sync_engine |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a synchronous Session subclass (RoutingSession) with async_sessionmaker appears incorrect. The RoutingSession inherits from sqlalchemy.orm.Session (synchronous), but it's being used with async_sessionmaker which expects an async session class. The get_bind method returns sync_engine from AsyncEngine objects, which may cause issues. This should likely inherit from AsyncSession or use a different approach for routing async sessions.
| class AbandonRequest(BaseRequest): | ||
| """Abandon protocol.""" | ||
|
|
||
| RESPONSE_TYPE: ClassVar[type] = type(None) |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using type(None) for RESPONSE_TYPE in AbandonRequest seems unusual since this will cause self.RESPONSE_TYPE(...) to attempt calling type(None)(...) which would fail. For requests that don't produce responses, consider using a sentinel value or None, or ensuring the exception handling path checks for this special case before attempting to instantiate RESPONSE_TYPE.
| class AbandonRequest(BaseRequest): | |
| """Abandon protocol.""" | |
| RESPONSE_TYPE: ClassVar[type] = type(None) | |
| class _AbandonNoResponse: | |
| """Sentinel response type for AbandonRequest (no response payload).""" | |
| pass | |
| class AbandonRequest(BaseRequest): | |
| """Abandon protocol.""" | |
| RESPONSE_TYPE: ClassVar[type] = _AbandonNoResponse |
| _force_master: bool = False | ||
|
|
||
| @property | ||
| def force_master(self) -> bool: | ||
| return self._force_master | ||
|
|
||
| def set_force_master(self, value: bool) -> None: | ||
| self._force_master = value |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _force_master attribute is not thread-safe. If the same RoutingSession instance is accessed by multiple threads concurrently, race conditions could occur when reading or writing this flag. While SQLAlchemy sessions are generally not thread-safe and shouldn't be shared across threads, this should be documented or protected if concurrent access is possible.
| else: | ||
| session.sync_session.set_force_master(False) # type: ignore |
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The else clause sets force_master back to False after a successful check, but this happens within the check_master_db function's session context. Since this is a dependency that runs before the endpoint handler, the force_master flag will still be False when the actual endpoint code runs, potentially allowing reads from replica even though this endpoint was designated as requiring master access. The force_master flag should remain True throughout the entire request for write endpoints.
| else: | |
| session.sync_session.set_force_master(False) # type: ignore |
No description provided.