具有共享全域性佇列的多個 producerconsumer 示例
下面的程式碼展示了多個 Producer / Consumer 程式。Producer 和 Consumer 執行緒共享相同的全域性佇列。
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerWithES {
public static void main(String args[]) {
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
ces.submit(new Consumer(sharedQueue, 1));
ces.submit(new Consumer(sharedQueue, 2));
pes.shutdown();
ces.shutdown();
}
}
/* Different producers produces a stream of integers continuously to a shared queue,
which is shared between all Producers and consumers */
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
private Random random = new Random();
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
// Producer produces a continuous stream of numbers for every 200 milli seconds
while (true) {
try {
int number = random.nextInt(1000);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
Thread.sleep(200);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
// Consumer consumes numbers generated from Producer threads continuously
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
輸出:
Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1
等等 …………….
說明:
sharedQueue
,它是一個LinkedBlockingQueue
,在所有 Producer 和 Consumer 執行緒之間共享。- 生產者執行緒連續每 200 毫秒生成一個整數,並將其附加到
sharedQueue
Consumer
thread 連續消耗sharedQueue
的整數。- 該程式是在沒有明確的
synchronized
或Lock
結構的情況下實現的。 BlockingQueue 是實現它的關鍵。
BlockingQueue 實現主要用於生產者 - 消費者佇列。
BlockingQueue 實現是執行緒安全的。所有排隊方法都使用內部鎖或其他形式的併發控制以原子方式實現其效果。