Introduction
Sometimes, to save time and resources, it’s handy to have a collection of ready-to-use items, which we call a “pool.” This is especially helpful for things that are costly to create, like database connections. Here’s the simple concept: You gather these items in a group, often in a list, and have a manager who gives them out when required and keeps an eye on how they’re used.
One problem we face is that, besides getting objects from a pool, we also need to return them. We could do this manually, but it’s more elegant to have it done automatically, and that’s where a Context Manager comes in.
For this post I will use code and ideas from two previous articles, one on creating an object pool and one about Resource Acquisitiion is Initialization.
Implementation in Python
We will start by defining our imports:
import threading
import traceback
from typing import Self
Next we need to define the object in our object pool, this case a DbConnection
:
class DbConnection:
_id: int = None
def __init__(self, initial_value: int):
self._id = initial_value
@property
def id(self) -> int:
return self._id
This is very simple class, as its only attribute is an id.
Next we come to the ConnectionPool
itself:
This looks a bit complicated, so let’s break it down:
class ConnectionPool:
_idle_connections: list[DbConnection] = None
_active_connections: list[DbConnection] = None
_pool_lock: threading.Lock = None
def __init__(self, initial_connections: list[DbConnection]):
self._idle_connections = initial_connections
self._active_connections = []
self._pool_lock = threading.Lock()
def get_connection(self) -> DbConnection | None:
self._pool_lock.acquire()
try:
if len(self._idle_connections) == 0:
return None
result: DbConnection = self._idle_connections[0]
self._idle_connections = self._idle_connections[1:]
self._active_connections.append(result)
print(f"Gave out connection with Id {result.id}")
return result
finally:
self._pool_lock.release()
def remove_from_active(self, connection: DbConnection) -> DbConnection | None:
active_length = len(self._active_connections)
for index, conn in enumerate(self._active_connections):
if conn.id == connection.id:
self._active_connections[active_length - 1], self._active_connections[index] = self._active_connections[
index], self._active_connections[active_length - 1]
self._active_connections = self._active_connections[:active_length - 1]
return connection
return None
def return_connection(self, connection: DbConnection):
self._pool_lock.acquire()
try:
remove_result = self.remove_from_active(connection)
if remove_result is not None:
self._idle_connections.append(connection)
finally:
self._pool_lock.release()
def _return_all_connections(self):
self._pool_lock.acquire()
try:
current_connections = self._active_connections.copy()
for conn in current_connections:
remove_result=self.remove_from_active(conn)
if remove_result is not None:
self._idle_connections.append(conn)
finally:
self._pool_lock.release()
def __enter__(self) -> Self:
return self
def __exit__(self, exc_type: type | None, exc_val: Exception | None, exc_tb: traceback):
self._return_all_connections()
The attributes and the constructor
The attributes and consructor look like this:
_idle_connections: list[DbConnection] = None
_active_connections: list[DbConnection] = None
_pool_lock: threading.Lock = None
def __init__(self, initial_connections: list[DbConnection]):
self._idle_connections = initial_connections
self._active_connections = []
self._pool_lock = threading.Lock()
In summary, we have a list of free connections called _idleConnections, a list for connections in use called _activeConnections, and a lock called _poolLock to make the code thread-safe. In the constructor, we receive a list of initial connections and initialize the attributes.
Getting connections
For our connection pool we need to be able to get ready-made connections from the pool:
def get_connection(self) -> DbConnection | None:
self._pool_lock.acquire()
try:
if len(self._idle_connections) == 0:
return None
result: DbConnection = self._idle_connections[0]
self._idle_connections = self._idle_connections[1:]
self._active_connections.append(result)
print(f"Gave out connection with Id {result.id}")
return result
finally:
self._pool_lock.release()
This method locks the code, checks for available connections, gives one out if possible, and releases the lock.
Removing an active connection
There’s also a method for removing an active connection from the pool:
def remove_from_active(self, connection: DbConnection) -> DbConnection | None:
active_length = len(self._active_connections)
for index, conn in enumerate(self._active_connections):
if conn.id == connection.id:
self._active_connections[active_length - 1], self._active_connections[index] = self._active_connections[
index], self._active_connections[active_length - 1]
self._active_connections = self._active_connections[:active_length - 1]
return connection
return None
This method removes a connection from the list of active connections.
Returning a single connection
Returning a single connection to the pool is achieved with this method:
def return_connection(self, connection: DbConnection):
self._pool_lock.acquire()
try:
remove_result = self.remove_from_active(connection)
if remove_result is not None:
self._idle_connections.append(connection)
finally:
self._pool_lock.release()
It locks the thread, removes the connection from active, and adds it back to the idle connections.
Returning all connections
When exiting the context manager, all connections are returned:
def _return_all_connections(self):
self._pool_lock.acquire()
try:
current_connections = self._active_connections.copy()
for conn in current_connections:
remove_result=self.remove_from_active(conn)
if remove_result is not None:
self._idle_connections.append(conn)
finally:
self._pool_lock.release()
This method locks the thread, copies the active connections, iterates through them, removes each connection, and adds it back to the idle connections.
The Context Manager methods
In order to use the Context Manager, we create two special methods:
def __enter__(self) -> Self:
return self
def __exit__(self, exc_type: type | None, exc_val: Exception | None, exc_tb: traceback):
self._return_all_connections()
The __enter__
method sets things up and returns the current instance. The __exit__
method is executed when the context manager exits, and it handles exceptions and returns all connections to the pool.
Testing time
Now we can test our little setup:
if __name__ == "__main__":
connections: list[DbConnection] = [DbConnection(1), DbConnection(2)]
connection_pool: ConnectionPool = ConnectionPool(connections)
with connection_pool:
connection1: DbConnection = connection_pool.get_connection()
if connection1 is None:
print("Could not get connection1")
else:
print(f"Got oonnection with id {connection1.id}")
connection2: DbConnection = connection_pool.get_connection()
if connection2 is None:
print("Could not get connection2")
else:
print(f"Got oonnection with id {connection2.id}")
connection3: DbConnection = connection_pool.get_connection()
if connection3 is None:
print("Could not get connection3")
else:
print(f"Got oonnection with id {connection3.id}")
connection3: DbConnection = connection_pool.get_connection()
if connection3 is None:
print("Could not get connection3")
else:
print(f"Got oonnection3 with id {connection3.id}")
In the testing section, we create a ConnectionPool with two connections and use the context manager to get and return connections. This demonstrates how the combination of a Context Manager and a connection pool can simplify resource management.
Conclusion
The Object Pool pattern is a useful technique for efficiently managing and reusing resources. Using a Context Manager makes it even more convenient and secure.