Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1215,16 +1215,17 @@ def can_get_connection(self) -> bool:
version="5.3.0",
)
async def get_connection(self, command_name=None, *keys, **options):
"""Get a connected connection from the pool"""
async with self._lock:
"""Get a connected connection from the pool"""
connection = self.get_available_connection()
try:
await self.ensure_connection(connection)
except BaseException:
await self.release(connection)
raise

return connection
# We now perform the connection check outside of the lock.
try:
await self.ensure_connection(connection)
return connection
except BaseException:
await self.release(connection)
raise

def get_available_connection(self):
"""Get a connection from the pool, without making sure it is connected"""
Expand Down
75 changes: 75 additions & 0 deletions tests/test_asyncio/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,81 @@ async def test_pool_disconnect(self, master_host):
await pool.disconnect(inuse_connections=False)
assert conn.is_connected

async def test_lock_not_held_during_connection_establishment(self):
"""
Test that the connection pool lock is not held during the
ensure_connection call, which involves socket connection and handshake.
This is important for performance under high load.
"""
lock_states = []

class SlowConnectConnection(DummyConnection):
"""Connection that simulates slow connection establishment"""

async def connect(self):
# Check if the pool's lock is held during connection
# We access the pool through the outer scope
lock_states.append(pool._lock.locked())
# Simulate slow connection
await asyncio.sleep(0.01)
self._connected = True

async with self.get_pool(connection_class=SlowConnectConnection) as pool:
# Get a connection - this should call connect() outside the lock
connection = await pool.get_connection()

# Verify the lock was NOT held during connect
assert len(lock_states) > 0, "connect() should have been called"
assert lock_states[0] is False, (
"Lock should not be held during connection establishment"
)

await pool.release(connection)

async def test_concurrent_connection_acquisition_performance(self):
"""
Test that multiple concurrent connection acquisitions don't block
each other during connection establishment.
"""
connection_delay = 0.05
num_connections = 3

class SlowConnectConnection(DummyConnection):
"""Connection that simulates slow connection establishment"""

async def connect(self):
# Simulate slow connection (e.g., network latency, TLS handshake)
await asyncio.sleep(connection_delay)
self._connected = True

async with self.get_pool(
connection_class=SlowConnectConnection, max_connections=10
) as pool:
# Start acquiring multiple connections concurrently
start_time = asyncio.get_running_loop().time()

# Try to get connections concurrently
connections = await asyncio.gather(
*[pool.get_connection() for _ in range(num_connections)]
)

elapsed_time = asyncio.get_running_loop().time() - start_time

# With proper lock handling, these should complete mostly in parallel
# If the lock was held during connect(), it would take num_connections * connection_delay
# With lock only during pop, it should take ~connection_delay (connections in parallel)
# We allow 2.5x overhead for system variance
max_allowed_time = connection_delay * 2.5
assert elapsed_time < max_allowed_time, (
f"Concurrent connections took {elapsed_time:.3f}s, "
f"expected < {max_allowed_time:.3f}s. "
f"This suggests lock was held during connection establishment."
)

# Clean up
for conn in connections:
await pool.release(conn)


class TestBlockingConnectionPool:
@asynccontextmanager
Expand Down