import threading
class BoundedBlockingQueue:
def __init__(self, capacity: int):
self.capacity = capacity
self.queue = []
self.lock = threading.Lock()
self.not_full = threading.Semaphore(capacity)
self.not_empty = threading.Semaphore(0)
def enqueue(self, element: int) -> None:
self.not_full.acquire()
with self.lock:
self.queue.append(element)
self.not_empty.release()
def dequeue(self) -> int:
self.not_empty.acquire()
with self.lock:
val = self.queue.pop(0)
self.not_full.release()
return val
def size(self) -> int:
with self.lock:
return len(self.queue)
#include <mutex>
#include <condition_variable>
#include <queue>
class BoundedBlockingQueue {
private:
std::queue<int> q;
int cap;
std::mutex mtx;
std::condition_variable not_full, not_empty;
public:
BoundedBlockingQueue(int capacity) : cap(capacity) {}
void enqueue(int element) {
std::unique_lock<std::mutex> lock(mtx);
not_full.wait(lock, [&]() { return q.size() < cap; });
q.push(element);
not_empty.notify_one();
}
int dequeue() {
std::unique_lock<std::mutex> lock(mtx);
not_empty.wait(lock, [&]() { return !q.empty(); });
int val = q.front();
q.pop();
not_full.notify_one();
return val;
}
int size() {
std::lock_guard<std::mutex> lock(mtx);
return q.size();
}
};
import java.util.*;
import java.util.concurrent.locks.*;
class BoundedBlockingQueue {
private Queue<Integer> queue;
private int capacity;
private Lock lock;
private Condition notFull, notEmpty;
public BoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.queue = new LinkedList<>();
this.lock = new ReentrantLock();
this.notFull = lock.newCondition();
this.notEmpty = lock.newCondition();
}
public void enqueue(int element) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await();
}
queue.offer(element);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public int dequeue() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await();
}
int val = queue.poll();
notFull.signal();
return val;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
class Semaphore {
constructor(count) {
this.count = count;
this.waiters = [];
}
async acquire() {
if (this.count > 0) {
this.count--;
} else {
await new Promise(resolve => this.waiters.push(resolve));
}
}
release() {
if (this.waiters.length > 0) {
this.waiters.shift()();
} else {
this.count++;
}
}
}
class BoundedBlockingQueue {
constructor(capacity) {
this.capacity = capacity;
this.queue = [];
this.notFull = new Semaphore(capacity);
this.notEmpty = new Semaphore(0);
this.lock = Promise.resolve();
}
async enqueue(element) {
await this.notFull.acquire();
await this.lock;
let unlock;
this.lock = new Promise(resolve => unlock = resolve);
this.queue.push(element);
unlock();
this.notEmpty.release();
}
async dequeue() {
await this.notEmpty.acquire();
await this.lock;
let unlock;
this.lock = new Promise(resolve => unlock = resolve);
const val = this.queue.shift();
unlock();
this.notFull.release();
return val;
}
size() {
return this.queue.length;
}
}
The "Design Bounded Blocking Queue" problem asks you to implement a thread-safe queue that supports blocking operations with a fixed maximum capacity. You must provide three methods:
enqueue(element)
: Adds element
to the queue. If the queue is full, this operation should block until space becomes available.dequeue()
: Removes and returns the front element from the queue. If the queue is empty, this operation should block until an item is available.size()
: Returns the current number of elements in the queue.
The queue needs to be safe for use by multiple threads concurrently. The capacity is fixed (bounded), so you must ensure that enqueue
blocks when full and dequeue
blocks when empty. You cannot use built-in thread-safe queue classes; you must implement synchronization yourself.
The key challenge is to ensure that multiple threads can safely add to and remove from the queue without causing race conditions or data corruption. We need to coordinate threads so that:
capacity
elements are ever in the queue.A naive approach would be to use a simple list or array, but this would not be safe in a multi-threaded environment. We need to use synchronization primitives (like locks, semaphores, or condition variables) to manage access and control blocking behavior.
The brute-force idea is to use a lock for every operation, but that doesn't provide the required blocking behavior. Instead, we need a way for threads to wait (block) until the condition they need is true (e.g., space available for enqueue, items available for dequeue).
To design a bounded blocking queue, we combine three main concepts:
Here’s how each method works:
The synchronization primitives (semaphores, mutexes, or conditions) ensure that threads block and wake up appropriately, and that only one thread modifies the queue at a time.
This design ensures:
Suppose the queue has a capacity of 2
.
enqueue(1)
. The queue is empty, so it adds 1
and returns. Queue: [1]
enqueue(2)
. The queue has room, so it adds 2
. Queue: [1, 2]
enqueue(3)
. The queue is full (size == 2), so Thread C blocks and waits.
dequeue()
. The queue is not empty, so it removes 1
and returns it. Queue: [2]
3
. Queue: [2, 3]
dequeue()
. Removes 2
and returns it. Queue: [3]
At each step, blocking and waking up are handled automatically by the synchronization primitives.
Brute-force approach: If we used busy-waiting (constantly checking if the queue is full/empty), time complexity would be poor and CPU usage high.
Optimized approach (with proper blocking):
The key improvement is efficient waiting (threads are blocked, not spinning), so the queue is both fast and fair.
The bounded blocking queue problem demonstrates how to combine data structures with concurrency control. The elegant solution uses semaphores or condition variables for blocking, and locks for mutual exclusion, ensuring that all operations are thread-safe and efficient. The design is modular, easy to reason about, and avoids busy-waiting, making it suitable for real-world multi-threaded applications.