Thursday, December 13, 2012

How to Solve Producer Consumer Problem in Java

Change the preJava5 flag to true if you are running Java 4 and below. Change preJava5 flag to true if you are running Java 5 and above.
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer {
    public static final int MAX_SIZE = 5;
    public static final int SLEEP_TIME = 5 * 100;
    
    public static class PreJava5 {
        public static class Consumer {
            private Random random = new Random();
            
            public void consume(LinkedList<Integer> queue) {
                while (true) {
                    try {
                        synchronized (queue) {
                            if (queue.isEmpty()) {
                                System.out.println("Queue is empty");
                                queue.wait();
                            } else {
                                System.out.println("Consuming something, size="
                                    + queue.size());
                                queue.removeFirst();
                                queue.notifyAll();
                            }
                        }
                        // to simulate performing some busy tasks
                        Thread.sleep(random.nextInt(SLEEP_TIME));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
        
        public static class Producer {
            private Random random = new Random();
            
            public void produce(LinkedList<Integer> queue) {
                while (true) {
                    try {
                        synchronized (queue) {
                            if (queue.size() == MAX_SIZE) {
                                System.out.println("Queue is full");
                                queue.wait();
                            } else {
                                System.out.println("Producing something, size="
                                    + queue.size());
                                queue.addLast(0);
                                queue.notifyAll();
                            }
                        }
                        // to simulate performing some busy tasks
                        Thread.sleep(random.nextInt(SLEEP_TIME));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    
    public static class Java5AndAbove {
        public static class Consumer {
            private Random random = new Random();
            
            public void consume(BlockingQueue<Integer> queue) {
                while (true) {
                    try {
                        System.out.println("Consuming something, size="
                            + queue.size());
                        // will block if the queue is empty
                        queue.take();
                        // to simulate performing some busy tasks
                        Thread.sleep(random.nextInt(SLEEP_TIME));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
        
        public static class Producer {
            private Random random = new Random();
            
            public void produce(BlockingQueue<Integer> queue) {
                while (true) {
                    try {
                        System.out.println("Producing something, size="
                            + queue.size());
                        // will block if the queue is full
                        queue.put(0);
                        // to simulate performing some busy tasks
                        Thread.sleep(random.nextInt(SLEEP_TIME));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    
    public static void main(String[] args) {
        boolean preJava5 = false;
        
        if (preJava5) {
            final LinkedList<Integer> queue = new LinkedList<>();
            final PreJava5.Producer producer = new PreJava5.Producer();
            final PreJava5.Consumer consumer = new PreJava5.Consumer();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    producer.produce(queue);
                }
            });
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    consumer.consume(queue);
                }
            });
            t1.start();
            t2.start();
        } else {
            final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX_SIZE);
            final Java5AndAbove.Producer producer = new Java5AndAbove.Producer();
            final Java5AndAbove.Consumer consumer = new Java5AndAbove.Consumer();
            
            ExecutorService es1 = Executors.newSingleThreadExecutor();
            ExecutorService es2 = Executors.newSingleThreadExecutor();
            
            es1.execute(new Runnable() {
                @Override
                public void run() {
                    producer.produce(queue);
                }
            });
            
            es2.execute(new Runnable() {
                @Override
                public void run() {
                    consumer.consume(queue);
                }
            });
        }
    }
}

No comments:

Post a Comment