How to Create an Easy Thread-Safe Object Pool in Python with ContextManager

Introduction

Sometimes, to save time and resources, it’s handy to have a collection of ready-to-use items, which we call a “pool.” This is especially helpful for things that are costly to create, like database connections. Here’s the simple concept: You gather these items in a group, often in a list, and have a manager who gives them out when required and keeps an eye on how they’re used.

One problem we face is that, besides getting objects from a pool, we also need to return them. We could do this manually, but it’s more elegant to have it done automatically, and that’s where a Context Manager comes in.

For this post I will use code and ideas from two previous articles, one on creating an object pool and one about Resource Acquisitiion is Initialization.

Implementation in Python

We will start by defining our imports:

import threading
import traceback
from typing import Self

Next we need to define the object in our object pool, this case a DbConnection:

class DbConnection:
    _id: int = None

    def __init__(self, initial_value: int):
        self._id = initial_value

    @property
    def id(self) -> int:
        return self._id

This is very simple class, as its only attribute is an id.

Next we come to the ConnectionPool itself:

This looks a bit complicated, so let’s break it down:

class ConnectionPool:
    _idle_connections: list[DbConnection] = None
    _active_connections: list[DbConnection] = None
    _pool_lock: threading.Lock = None

    def __init__(self, initial_connections: list[DbConnection]):
        self._idle_connections = initial_connections
        self._active_connections = []
        self._pool_lock = threading.Lock()

    def get_connection(self) -> DbConnection | None:
        self._pool_lock.acquire()

        try:
            if len(self._idle_connections) == 0:
                return None
            result: DbConnection = self._idle_connections[0]
            self._idle_connections = self._idle_connections[1:]
            self._active_connections.append(result)
            print(f"Gave out connection with Id {result.id}")
            return result
        finally:
            self._pool_lock.release()

    def remove_from_active(self, connection: DbConnection) -> DbConnection | None:
        active_length = len(self._active_connections)
        for index, conn in enumerate(self._active_connections):
            if conn.id == connection.id:
                self._active_connections[active_length - 1], self._active_connections[index] = self._active_connections[
                    index], self._active_connections[active_length - 1]
                self._active_connections = self._active_connections[:active_length - 1]
                return connection
        return None

    def return_connection(self, connection: DbConnection):
        self._pool_lock.acquire()
        try:
            remove_result = self.remove_from_active(connection)
            if remove_result is not None:
                self._idle_connections.append(connection)
        finally:
            self._pool_lock.release()

    def _return_all_connections(self):
        self._pool_lock.acquire()
        try:
            current_connections = self._active_connections.copy()
            for conn in current_connections:
                remove_result=self.remove_from_active(conn)
                if remove_result is not None:
                    self._idle_connections.append(conn)
        finally:
            self._pool_lock.release()

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type: type | None, exc_val: Exception | None, exc_tb: traceback):
        self._return_all_connections()

The attributes and the constructor

The attributes and consructor look like this:

    _idle_connections: list[DbConnection] = None
    _active_connections: list[DbConnection] = None
    _pool_lock: threading.Lock = None

    def __init__(self, initial_connections: list[DbConnection]):
        self._idle_connections = initial_connections
        self._active_connections = []
        self._pool_lock = threading.Lock()

In summary, we have a list of free connections called _idleConnections, a list for connections in use called _activeConnections, and a lock called _poolLock to make the code thread-safe. In the constructor, we receive a list of initial connections and initialize the attributes.

Getting connections

For our connection pool we need to be able to get ready-made connections from the pool:

    def get_connection(self) -> DbConnection | None:
        self._pool_lock.acquire()

        try:
            if len(self._idle_connections) == 0:
                return None
            result: DbConnection = self._idle_connections[0]
            self._idle_connections = self._idle_connections[1:]
            self._active_connections.append(result)
            print(f"Gave out connection with Id {result.id}")
            return result
        finally:
            self._pool_lock.release()

This method locks the code, checks for available connections, gives one out if possible, and releases the lock.

Removing an active connection

There’s also a method for removing an active connection from the pool:

    def remove_from_active(self, connection: DbConnection) -> DbConnection | None:
        active_length = len(self._active_connections)
        for index, conn in enumerate(self._active_connections):
            if conn.id == connection.id:
                self._active_connections[active_length - 1], self._active_connections[index] = self._active_connections[
                    index], self._active_connections[active_length - 1]
                self._active_connections = self._active_connections[:active_length - 1]
                return connection
        return None

This method removes a connection from the list of active connections.

Returning a single connection

Returning a single connection to the pool is achieved with this method:

    def return_connection(self, connection: DbConnection):
        self._pool_lock.acquire()
        try:
            remove_result = self.remove_from_active(connection)
            if remove_result is not None:
                self._idle_connections.append(connection)
        finally:
            self._pool_lock.release()

It locks the thread, removes the connection from active, and adds it back to the idle connections.

Returning all connections

When exiting the context manager, all connections are returned:

    def _return_all_connections(self):
        self._pool_lock.acquire()
        try:
            current_connections = self._active_connections.copy()
            for conn in current_connections:
                remove_result=self.remove_from_active(conn)
                if remove_result is not None:
                    self._idle_connections.append(conn)
        finally:
            self._pool_lock.release()

This method locks the thread, copies the active connections, iterates through them, removes each connection, and adds it back to the idle connections.

The Context Manager methods

In order to use the Context Manager, we create two special methods:

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type: type | None, exc_val: Exception | None, exc_tb: traceback):
        self._return_all_connections()

The __enter__ method sets things up and returns the current instance. The __exit__ method is executed when the context manager exits, and it handles exceptions and returns all connections to the pool.

Testing time

Now we can test our little setup:

if __name__ == "__main__":
    connections: list[DbConnection] = [DbConnection(1), DbConnection(2)]
    connection_pool: ConnectionPool = ConnectionPool(connections)
    with connection_pool:
        connection1: DbConnection = connection_pool.get_connection()
        if connection1 is None:
            print("Could not get connection1")
        else:
            print(f"Got oonnection with id {connection1.id}")

        connection2: DbConnection = connection_pool.get_connection()
        if connection2 is None:
            print("Could not get connection2")
        else:
            print(f"Got oonnection with id {connection2.id}")

        connection3: DbConnection = connection_pool.get_connection()
        if connection3 is None:
            print("Could not get connection3")
        else:
            print(f"Got oonnection with id {connection3.id}")

    connection3: DbConnection = connection_pool.get_connection()
    if connection3 is None:
        print("Could not get connection3")
    else:
        print(f"Got oonnection3 with id {connection3.id}")

In the testing section, we create a ConnectionPool with two connections and use the context manager to get and return connections. This demonstrates how the combination of a Context Manager and a connection pool can simplify resource management.

Conclusion

The Object Pool pattern is a useful technique for efficiently managing and reusing resources. Using a Context Manager makes it even more convenient and secure.

Leave a Reply

Your email address will not be published. Required fields are marked *