Python’s Active Object Pattern: Concurrency Made Easy for Responsive Applications

Photo by Ruslan Alekso: https://www.pexels.com/photo/close-up-photo-of-light-bulb-2224219/

Introduction

Sometimes you need to decouple method execution from method invocation. In such cases the Active Object design pattern is a good choice. It allows for more flexible and above all concurrent systems. One the of the hallmarks of this pattern is the fact that each method call is encapsulated, or packaged if you will in an object.

These objects are then placed in a queue. The Active Object itself then processes the queue in a separate thread.

This all sounds rather abstract, so we will implement a very simplified example.

Implementation in Python

In this example, we’ll create a basic log processor. Instead of writing logs directly to the console, we’ll use a queue to process log entries asynchronously. This is especially useful when dealing with time-consuming tasks like writing logs to a database or a remote server.

Let’s start:

import queue
import threading
import time

In most logging systems, you can usually have several types of severity, this is how we implement that in Python:

class LogSeverityType:
    Info = "Info"
    Warning = "Warning"
    Error = "Error"

Now let’s implement the LogMessage:

class LogMessage:
    _severity: str = None
    _message: str = None

    def __init__(self, severity: str, initial_message: str):
        self._severity = severity
        self._message = initial_message

    @property
    def severity(self) -> str:
        return self._severity

    @property
    def message(self) -> str:
        return self._message

Nothing special, a LogMessage has a message and a severity

Before we start on our active object, we need to define a function to process the messages:

def process_message(message_to_process: LogMessage) -> None:
    print(f"Processing: ({message_to_process.severity}): {message_to_process.message}")
    time.sleep(0.05)
    print(f"Processed: ({message_to_process.severity}): {message_to_process.message}")

All this does is print out two messages, but you could imagine it doing more, like writing to a database or a webservice.

Now we come to our active object, the ActiveLogger:

class ActiveLogger:
    _queue: queue.Queue[LogMessage] = None
    _stop_event: threading.Event = None
    _log_thread: threading.Thread = None

    def __init__(self):
        self._stop_event = threading.Event()
        self._log_thread = threading.Thread(target=self._log_processor)
        self._queue = queue.Queue()

    def log(self, message: LogMessage) -> None:
        self._queue.put(message)

    def start_logging(self) -> None:
        self._log_thread.start()

    def stop_logging(self) -> None:
        self._stop_event.set()
        self._log_thread.join()

    def _log_processor(self) -> None:
        while not self._stop_event.is_set() or not self._queue.empty():
            message_log_processor: LogMessage = self._queue.get()
            process_message(message_log_processor)

Some notable points:

  1. The ActiveLogger has a queue for LogMessage objects.
  2. It also contains an as of yet unset stop_event
  3. And a thread on which it listens to incoming LogMessage objects and processes them, in the _log_processor() method.
  4. There is the log() method to send messages to the queue
  5. And of course there are some start and stop methods to switch the processor on or off.
  6. Note that in the stop() method we call the join() which waits for the logging thread to finish.

Time to test

Now we can test our setup by sending message to the ActiveLogger:

if __name__ == '__main__':
    logger: ActiveLogger = ActiveLogger()
    logger.start_logging()

    for i in range(1, 11):
        message: LogMessage = LogMessage(LogSeverityType.Info, f"Message number is {i}")
        logger.log(message)

    message = LogMessage(LogSeverityType.Error, "There has been an error")
    logger.log(message)

    logger.stop_logging()
    print("Stopped the processor")

We create our processor and turn it on, and the start sending messages to the queue, which are picked up in the thread. After that is all done, we stop the processor.

Conclusion

Python’s support for multi-threading makes implementing the Active Object pattern quite straightforward. This pattern is useful for various scenarios, such as message processing, printer spooling, or chat applications, where decoupling method execution from invocation and handling tasks concurrently is beneficial.

Leave a Reply

Your email address will not be published. Required fields are marked *