Introduction
In multithreaded applications, it’s common for one thread to let another know when specific conditions are met, like when data is ready or a time-consuming task is finished. In Python, you can use threads, locks and conditions to achieve this. This example demonstrates implementing a GuardedGarage to park ExpensiveCar objects.
Implementation in Python
We will start by implementing the ExpensiveCar class:
class ExpensiveCar:
    _brand: str = None
    _color: str = None

    def __init__(self, brand: str, color: str):
        self._brand = brand
        self._color = color

    def __str__(self) -> str:
        return f"brand: {self._brand}, color: {self._color}"

    def __repr__(self) -> str:
        return str(self)
In our example, a car is identified by its brand and its color.
Next we come to the implementation of the GuardedGarage class. This class has three fields:
- The queue containing the parked cars. In our case we use a deque so we can easily pop the ‘oldest’ element from the left side of the queue.
- A lock to make sure that only one thread can access the queue at a time.
- A condition, used to signal other threads that a car has been added to the queue.
It also has two methods:
- The park() method acquires the lock and appends the car to the queue. Then it notifies the condition so that a waiting thread can act upon this event.
- The get() method waits for the queue to have some contents. As long as the queue is empty, it waits on the condition, which releases the lock while waiting and reacquires it on wakeup. Once there is something in the queue, it returns the left-most element of the queue.
import threading
from collections import deque


class GuardedGarage:
    _queue = None
    _lock: threading.Lock = None
    _condition: threading.Condition = None

    def __init__(self):
        self._queue = deque()
        self._lock = threading.Lock()
        # The condition shares the lock that guards the queue.
        self._condition = threading.Condition(lock=self._lock)

    def park(self, car: ExpensiveCar):
        with self._lock:
            self._queue.append(car)
            # Wake up one thread waiting in get().
            self._condition.notify()

    def get(self) -> ExpensiveCar:
        with self._lock:
            # Re-check the queue after every wakeup.
            while not self._queue:
                self._condition.wait()
            return self._queue.popleft()
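As a side note, a Condition already carries a lock of its own, so the separate lock field is optional. The following is a minimal sketch of that variant, not the implementation used in this post: CompactGarage is a hypothetical name, and wait_for() stands in for the explicit while loop.

```python
import threading
from collections import deque


class CompactGarage:
    """Hypothetical variant of GuardedGarage that relies on the Condition's own lock."""

    def __init__(self):
        self._queue = deque()
        # Without a lock argument, Condition creates its own reentrant lock.
        self._condition = threading.Condition()

    def park(self, car):
        # Entering the condition acquires its underlying lock.
        with self._condition:
            self._queue.append(car)
            self._condition.notify()

    def get(self):
        with self._condition:
            # wait_for() re-checks the predicate after every wakeup,
            # just like the explicit while loop does.
            self._condition.wait_for(lambda: len(self._queue) > 0)
            return self._queue.popleft()
```

Keeping an explicit lock, as GuardedGarage does, can still be the better choice when the same lock has to protect more state than just the queue.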
Testing time
Let’s look at the main function:
if __name__ == "__main__":
    guarded_garage = GuardedGarage()

    def producer():
        for i in range(10):
            expensive_car: ExpensiveCar = ExpensiveCar(f"Car {i}", f"Red {i}")
            print(f"Parking car {expensive_car}\n")
            guarded_garage.park(expensive_car)

    def consumer():
        for _ in range(10):
            expensive_car: ExpensiveCar = guarded_garage.get()
            print(f"Got a car: {expensive_car}")

    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()
A couple of notes:
- We create a GuardedGarage object.
- In the producer() function we create 10 cars and park them.
- Analogously, the consumer() function takes these cars out of the garage.
- Then we wrap both of those functions in threads, which we start.
- The join() method waits for the threads to finish.
Conclusion
Using the standard library, it is quite easy to achieve this form of thread synchronization in Python. The only problem I see is that the get() method blocks indefinitely if no car ever becomes available, which leaves the consumer thread stuck much like in a deadlock. This is a problem I will address in a coming post.
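One way to keep get() from waiting forever is to give it a timeout. The sketch below is only an illustration of that idea, not necessarily the approach the follow-up post will take: TimedGarage and its timeout parameter are hypothetical, and returning None on timeout is an assumption made for brevity.

```python
import threading
from collections import deque
from typing import Optional


class TimedGarage:
    """Hypothetical garage whose get() gives up after a timeout."""

    def __init__(self):
        self._queue = deque()
        self._condition = threading.Condition()

    def park(self, car) -> None:
        with self._condition:
            self._queue.append(car)
            self._condition.notify()

    def get(self, timeout: Optional[float] = None):
        with self._condition:
            # wait_for() returns the last value of the predicate,
            # which is False when the timeout expires on an empty queue.
            has_car = self._condition.wait_for(
                lambda: len(self._queue) > 0, timeout=timeout
            )
            if not has_car:
                return None  # assumption: signal "no car" with None
            return self._queue.popleft()
```

A caller could then write car = garage.get(timeout=2.0) and handle the None case, for example by retrying or shutting the consumer down. Raising an exception instead of returning None would be an equally reasonable design.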