Simulate a multithreaded producer-consumer problem using Java threads and semaphores. Implement a producer that generates items and adds them to a shared buffer, and a consumer that retrieves and processes items from the same buffer. Synchronize access to the buffer using semaphores to prevent race conditions and ensure that the producer doesn't add items to a full buffer and the consumer doesn't try to retrieve items from an empty buffer.
Specifically: use an ArrayList, or any other suitable data structure with a fixed capacity, as the shared buffer.
Your solution should demonstrate:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class ProducerConsumer {
    private static final int BUFFER_CAPACITY = 5;
    private static final List<Integer> buffer = new ArrayList<>(BUFFER_CAPACITY);
    private static final Semaphore empty = new Semaphore(BUFFER_CAPACITY);
    private static final Semaphore full = new Semaphore(0);
    private static final Semaphore mutex = new Semaphore(1);

    public static void main(String[] args) {
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    empty.acquire(); // Decrement empty slots, wait if buffer is full
                    mutex.acquire(); // Acquire lock for critical section
                    buffer.add(i); // Produce
                    System.out.println("Producer added: " + i + ", Buffer size: " + buffer.size());
                    mutex.release(); // Release lock
                    full.release(); // Increment full slots
                    Thread.sleep((int) (Math.random() * 100)); // Simulate work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    full.acquire(); // Decrement full slots, wait if buffer is empty
                    mutex.acquire(); // Acquire lock for critical section
                    int item = buffer.remove(0); // Consume
                    System.out.println("Consumer consumed: " + item + ", Buffer size: " + buffer.size());
                    mutex.release(); // Release lock
                    empty.release(); // Increment empty slots
                    Thread.sleep((int) (Math.random() * 100)); // Simulate work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println("Producer and Consumer finished.");
    }
}
An ArrayList named buffer represents the shared buffer between the producer and consumer. Its capacity is set to 5. Note that new ArrayList<>(BUFFER_CAPACITY) only sets the initial allocation; the limit of 5 items is enforced by the empty semaphore, not by the list itself.

Three semaphores coordinate the threads:
empty: Counts the number of empty slots in the buffer. Initialized to BUFFER_CAPACITY.
full: Counts the number of filled slots in the buffer. Initialized to 0.
mutex: A binary semaphore (lock) that protects access to the buffer, ensuring mutual exclusion.

The producer generates the items 0 to 9. Before adding an item, it calls empty.acquire(). This decrements the empty semaphore, indicating that one less slot is available. If empty is zero, the producer waits until a slot becomes available. It then acquires the mutex semaphore to ensure exclusive access to the buffer and adds the item. Afterwards, the mutex semaphore is released, and full.release() is called to increment the full semaphore, indicating that one more slot is filled.

The consumer retrieves 10 items. Before removing an item, it calls full.acquire(). This decrements the full semaphore, indicating that one less item is available. If full is zero, the consumer waits until an item becomes available. It then acquires the mutex semaphore to ensure exclusive access to the buffer and removes the item at the front. Afterwards, the mutex semaphore is released, and empty.release() is called to increment the empty semaphore, indicating that one more slot is available.

The empty and full semaphores ensure that the producer doesn't add items to a full buffer and the consumer doesn't try to retrieve items from an empty buffer. The mutex semaphore ensures that only one thread can access the buffer at a time, preventing race conditions.

In this example, the buffer has a capacity of 5, the producer generates 10 items, and the consumer consumes 10 items. The output shows the producer adding items and the consumer consuming items, with the buffer size being updated accordingly.
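The acquire/release ordering described above can also be packaged into a small class, so that callers cannot get the order wrong. The following is only a sketch under stated assumptions: the class name BoundedBuffer and the methods put and take are hypothetical and not part of the original solution. It also uses acquireUninterruptibly() for the mutex, which differs from the original mutex.acquire(); the idea is that an interrupt can then never leave a reserved slot unaccounted for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

// Hypothetical helper (not in the original solution): hides the buffer
// and its three semaphores behind put() and take().
class BoundedBuffer {
    private final List<Integer> items = new ArrayList<>();
    private final Semaphore empty;                     // free slots
    private final Semaphore full = new Semaphore(0);   // filled slots
    private final Semaphore mutex = new Semaphore(1);  // guards items

    BoundedBuffer(int capacity) {
        empty = new Semaphore(capacity);
    }

    // Blocks while the buffer is full.
    void put(int item) throws InterruptedException {
        empty.acquire();                 // reserve a free slot
        mutex.acquireUninterruptibly();  // short critical section
        try {
            items.add(item);
        } finally {
            mutex.release();             // released even if add() throws
        }
        full.release();                  // announce one more filled slot
    }

    // Blocks while the buffer is empty; returns the oldest item.
    int take() throws InterruptedException {
        full.acquire();                  // reserve a filled slot
        mutex.acquireUninterruptibly();
        int item;
        try {
            item = items.remove(0);
        } finally {
            mutex.release();
        }
        empty.release();                 // announce one more free slot
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(5);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        // The main thread acts as the consumer.
        for (int i = 0; i < 10; i++) {
            System.out.println("Consumed: " + buffer.take());
        }
        producer.join();
    }
}
```

With a single producer and a single consumer, items come out in the order they were put in, because add() appends at the end and remove(0) takes from the front.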
The producer loop runs n times (in our case, 10). Inside the loop, the acquire() and release() semaphore operations take O(1) time each, not counting any time spent blocked. Adding to the ArrayList with buffer.add(i) takes O(1) on average, assuming there's enough allocated capacity. Therefore, the time complexity for the producer is O(n).

The consumer loop also runs n times. Inside this loop, acquire() and release() are O(1) operations. However, buffer.remove(0) shifts all subsequent elements in the ArrayList to fill the gap, so its cost is proportional to the number of elements currently in the buffer. Because the buffer never holds more than BUFFER_CAPACITY elements, each removal costs O(BUFFER_CAPACITY), and the consumer takes O(n * BUFFER_CAPACITY) overall. That is O(n) when BUFFER_CAPACITY is treated as a constant; it would only approach O(n^2) if the capacity grew with n.

The buffer (an ArrayList) has a fixed capacity (BUFFER_CAPACITY). Therefore, the space used by the buffer is O(BUFFER_CAPACITY), which is O(1) since BUFFER_CAPACITY is a constant. The semaphores (empty, full, mutex) each take a constant amount of space, so their space usage is O(1). The loop variable i in the producer and consumer takes O(1) space.

If the buffer is full, the empty semaphore will block the producer until the consumer consumes some items and releases empty slots. If the buffer is empty, the full semaphore will block the consumer until the producer adds more items and releases filled slots. The code handles InterruptedException by restoring the thread's interrupt status; because the catch block sits outside the loop, an interrupted thread also stops producing or consuming. This ensures that the threads can be interrupted and terminated gracefully if needed. BUFFER_CAPACITY is a critical parameter. A larger capacity allows for more buffering, potentially improving throughput, but also increases memory usage. A smaller capacity reduces memory usage but may limit throughput.
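The full-buffer and empty-buffer cases can be observed directly, without blocking, by using Semaphore.tryAcquire() in place of acquire(). The sketch below also swaps the ArrayList for an ArrayDeque, whose removeFirst() does not shift elements. The class name DequeBuffer and the methods tryPut and tryTake are assumptions made for illustration; they do not appear in the original solution, which blocks rather than returning early.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Hypothetical variant (not in the original solution): same three
// semaphores, but non-blocking operations and an ArrayDeque as storage.
class DequeBuffer {
    private final Deque<Integer> items = new ArrayDeque<>();
    private final Semaphore empty;                     // free slots
    private final Semaphore full = new Semaphore(0);   // filled slots
    private final Semaphore mutex = new Semaphore(1);  // guards items

    DequeBuffer(int capacity) {
        empty = new Semaphore(capacity);
    }

    // Returns false instead of waiting when the buffer is full.
    boolean tryPut(int item) {
        if (!empty.tryAcquire()) {
            return false;
        }
        mutex.acquireUninterruptibly();
        try {
            items.addLast(item);
        } finally {
            mutex.release();
        }
        full.release();
        return true;
    }

    // Returns null instead of waiting when the buffer is empty.
    Integer tryTake() {
        if (!full.tryAcquire()) {
            return null;
        }
        mutex.acquireUninterruptibly();
        Integer item;
        try {
            item = items.removeFirst(); // no element shifting
        } finally {
            mutex.release();
        }
        empty.release();
        return item;
    }

    public static void main(String[] args) {
        DequeBuffer b = new DequeBuffer(2);
        System.out.println(b.tryTake()); // null: buffer is empty
        System.out.println(b.tryPut(1)); // true
        System.out.println(b.tryPut(2)); // true
        System.out.println(b.tryPut(3)); // false: buffer is full
        System.out.println(b.tryTake()); // 1
        System.out.println(b.tryPut(3)); // true: a slot was freed
    }
}
```

The blocking behavior of the original program corresponds to the calls that return false or null here: with acquire(), the thread would wait at that point until the other side releases a permit.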