Simplified Implementation of the Guarded Suspension Pattern in Python for Easy Concurrency

Photo by Pixabay: https://www.pexels.com/photo/vehicles-parked-inside-elevated-parking-lot-63294/

Introduction

In multithreaded applications, one thread often needs to tell another that a specific condition has been met, for example that data is ready or that a time-consuming task has finished. In Python, you can achieve this with threads, locks and conditions from the standard threading module. This example implements a GuardedGarage in which ExpensiveCar objects are parked.

Implementation in Python

We will start by implementing the ExpensiveCar class:

class ExpensiveCar:
    """A car described by its brand and its color."""

    def __init__(self, brand: str, color: str):
        self._brand = brand
        self._color = color

    def __str__(self) -> str:
        return f"brand: {self._brand}, color: {self._color}"

    def __repr__(self) -> str:
        return str(self)

In our example, a car is identified by its brand and its color.
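
As an aside, a class that only carries a brand and a color can also be written as a dataclass. The following is a minimal sketch, not the implementation used in the rest of this post; the name ExpensiveCarData is hypothetical and chosen only to avoid clashing with ExpensiveCar.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExpensiveCarData:
    # Hypothetical dataclass counterpart of ExpensiveCar
    brand: str
    color: str

    def __str__(self) -> str:
        # Same text format as ExpensiveCar.__str__
        return f"brand: {self.brand}, color: {self.color}"


car = ExpensiveCarData("Ferrari", "red")
print(car)
```

The generated __eq__ compares brand and color, so two cars with the same brand and color compare equal, which fits the idea of a car being identified by those two values.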

Next we come to the implementation of the GuardedGarage. This class has three fields:

  1. The queue containing the parked cars. In our case we use a deque so we can easily pop the ‘oldest’ element from the left side of the queue.
  2. A lock to make sure that only one thread can access the queue at a time.
  3. A condition, built on that lock, which is used to signal other threads that a car has been added to the queue.

It also has two methods:

  1. The park() method acquires the lock and appends the car to the queue. Then it notifies the condition so that a waiting thread can act upon this event.
  2. The get() method waits for the queue to have some contents. As long as the queue is empty, it waits on the condition, which releases the lock while waiting and reacquires it on wake-up. Once there is something in the queue, it removes and returns the left-most element.

from collections import deque
import threading


class GuardedGarage:
    def __init__(self):
        self._queue = deque()  # parked cars, oldest on the left
        self._lock = threading.Lock()
        # The condition shares the lock, so holding the lock is enough to wait and notify
        self._condition = threading.Condition(lock=self._lock)

    def park(self, car: ExpensiveCar):
        with self._lock:
            self._queue.append(car)
            self._condition.notify()

    def get(self) -> ExpensiveCar:
        with self._lock:
            while not self._queue:
                self._condition.wait()
            return self._queue.popleft()
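
The while loop around wait() in get() can also be expressed with Condition.wait_for(), which takes a predicate and keeps waiting until the predicate is true. The sketch below is one possible variant, not the version used in this post; the class name WaitForGarage is hypothetical, and it stores plain strings instead of ExpensiveCar objects to stay self-contained.

```python
import threading
from collections import deque


class WaitForGarage:
    # Hypothetical variant of GuardedGarage that uses Condition.wait_for()
    def __init__(self):
        self._queue = deque()
        # Without an explicit lock, Condition creates its own (an RLock)
        self._condition = threading.Condition()

    def park(self, car):
        with self._condition:
            self._queue.append(car)
            self._condition.notify()

    def get(self):
        with self._condition:
            # Blocks until the predicate returns a truthy value
            self._condition.wait_for(lambda: len(self._queue) > 0)
            return self._queue.popleft()


garage = WaitForGarage()
garage.park("Car 0")
garage.park("Car 1")
first = garage.get()
second = garage.get()
print(first, second)
```

Entering the condition with a with statement acquires its underlying lock, so a separate lock field is not strictly needed in this variant.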

Testing time

Let’s look at the main function:

if __name__ == "__main__":
    guarded_garage = GuardedGarage()


    def producer():
        for i in range(10):
            expensive_car: ExpensiveCar = ExpensiveCar(f"Car {i}", f"Red {i}")
            print(f"Parking car {expensive_car}\n")
            guarded_garage.park(expensive_car)

    def consumer():
        for _ in range(10):
            expensive_car: ExpensiveCar = guarded_garage.get()
            print(f"Got a car: {expensive_car}")

    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()

A couple of notes:

  1. We create a GuardedGarage object.
  2. In the producer() function we create 10 cars and park them.
  3. Correspondingly, the consumer() function takes these 10 cars out of the garage, blocking whenever the garage is empty.
  4. Then we wrap both of those functions in threads, which we start.
  5. The join() calls wait for both threads to finish.
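
To see the guard in action, the consumer can be started before anything is parked. The following sketch uses a stripped-down garage so that it runs on its own; the name TinyGarage, the string "cars" and the 0.2 second delay are illustrative assumptions, not part of the example in this post.

```python
import threading
import time
from collections import deque


class TinyGarage:
    # Stripped-down, hypothetical stand-in for GuardedGarage
    def __init__(self):
        self._queue = deque()
        self._condition = threading.Condition()

    def park(self, car):
        with self._condition:
            self._queue.append(car)
            self._condition.notify()

    def get(self):
        with self._condition:
            while not self._queue:
                self._condition.wait()
            return self._queue.popleft()


garage = TinyGarage()
received = []


def consumer():
    # Started first, so the first get() normally finds the garage empty and waits
    for _ in range(3):
        received.append(garage.get())


def producer():
    time.sleep(0.2)  # arbitrary delay so the consumer is already waiting
    for i in range(3):
        garage.park(f"Car {i}")


consumer_thread = threading.Thread(target=consumer)
producer_thread = threading.Thread(target=producer)

consumer_thread.start()
producer_thread.start()

consumer_thread.join()
producer_thread.join()

print(received)
```

With a single consumer, the cars come out in the order in which they were parked, because park() appends on the right and get() pops from the left.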

Conclusion

Using the standard library, it is quite easy to achieve this form of thread synchronization in Python. The only problem I see is that the get() method will block indefinitely if no car ever becomes available, because it waits without a timeout. This is a problem I will address in a coming post.
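
One possible way to bound the wait, shown here only as a sketch and not necessarily what the follow-up post will do, is to pass a timeout to Condition.wait_for(). The class name TimedGarage, the timeout parameter and the choice of raising TimeoutError are assumptions made for this illustration.

```python
import threading
from collections import deque


class TimedGarage:
    # Hypothetical variant of GuardedGarage whose get() gives up after a timeout
    def __init__(self):
        self._queue = deque()
        self._condition = threading.Condition()

    def park(self, car):
        with self._condition:
            self._queue.append(car)
            self._condition.notify()

    def get(self, timeout: float):
        with self._condition:
            # wait_for() returns the last predicate value, which is False on timeout
            has_car = self._condition.wait_for(
                lambda: len(self._queue) > 0, timeout=timeout
            )
            if not has_car:
                raise TimeoutError(f"no car parked within {timeout} seconds")
            return self._queue.popleft()


garage = TimedGarage()
garage.park("Car 0")
got = garage.get(timeout=0.1)

try:
    garage.get(timeout=0.1)  # the garage is empty now, so this times out
    timed_out = False
except TimeoutError:
    timed_out = True

print(got, timed_out)
```

Raising an exception is only one option; returning None on timeout would work as well, at the cost of the caller having to check the result.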
