Introduction
Sometimes you need to decouple method execution from method invocation. In such cases the Active Object design pattern is a good choice. It allows for more flexible and above all concurrent systems. One the of the hallmarks of this pattern is the fact that each method call is encapsulated, or packaged if you will in an object.
These objects are then placed in a queue. The Active Object itself then processes the queue in a separate thread.
This all sounds rather abstract, so we will implement a very simplified example.
Implementation in Python
In this example, we’ll create a basic log processor. Instead of writing logs directly to the console, we’ll use a queue to process log entries asynchronously. This is especially useful when dealing with time-consuming tasks like writing logs to a database or a remote server.
Let’s start:
import queue
import threading
import time
In most logging systems, you can usually have several types of severity, this is how we implement that in Python:
class LogSeverityType:
Info = "Info"
Warning = "Warning"
Error = "Error"
Now let’s implement the LogMessage
:
class LogMessage:
_severity: str = None
_message: str = None
def __init__(self, severity: str, initial_message: str):
self._severity = severity
self._message = initial_message
@property
def severity(self) -> str:
return self._severity
@property
def message(self) -> str:
return self._message
Nothing special, a LogMessage
has a message and a severity
Before we start on our active object, we need to define a function to process the messages:
def process_message(message_to_process: LogMessage) -> None:
print(f"Processing: ({message_to_process.severity}): {message_to_process.message}")
time.sleep(0.05)
print(f"Processed: ({message_to_process.severity}): {message_to_process.message}")
All this does is print out two messages, but you could imagine it doing more, like writing to a database or a webservice.
Now we come to our active object, the ActiveLogger
:
class ActiveLogger:
_queue: queue.Queue[LogMessage] = None
_stop_event: threading.Event = None
_log_thread: threading.Thread = None
def __init__(self):
self._stop_event = threading.Event()
self._log_thread = threading.Thread(target=self._log_processor)
self._queue = queue.Queue()
def log(self, message: LogMessage) -> None:
self._queue.put(message)
def start_logging(self) -> None:
self._log_thread.start()
def stop_logging(self) -> None:
self._stop_event.set()
self._log_thread.join()
def _log_processor(self) -> None:
while not self._stop_event.is_set() or not self._queue.empty():
message_log_processor: LogMessage = self._queue.get()
process_message(message_log_processor)
Some notable points:
- The
ActiveLogger
has a queue forLogMessage
objects. - It also contains an as of yet unset
stop_event
- And a thread on which it listens to incoming
LogMessage
objects and processes them, in the_log_processor()
method. - There is the
log()
method to send messages to the queue - And of course there are some start and stop methods to switch the processor on or off.
- Note that in the
stop()
method we call thejoin()
which waits for the logging thread to finish.
Time to test
Now we can test our setup by sending message to the ActiveLogger
:
if __name__ == '__main__':
logger: ActiveLogger = ActiveLogger()
logger.start_logging()
for i in range(1, 11):
message: LogMessage = LogMessage(LogSeverityType.Info, f"Message number is {i}")
logger.log(message)
message = LogMessage(LogSeverityType.Error, "There has been an error")
logger.log(message)
logger.stop_logging()
print("Stopped the processor")
We create our processor and turn it on, and the start sending messages to the queue, which are picked up in the thread. After that is all done, we stop the processor.
Conclusion
Python’s support for multi-threading makes implementing the Active Object pattern quite straightforward. This pattern is useful for various scenarios, such as message processing, printer spooling, or chat applications, where decoupling method execution from invocation and handling tasks concurrently is beneficial.