一个来源 - 工作流水线 - 一个水槽
我们希望并行处理数据并将其推送到其他工作人员处理的行。
由于 Workers 既消费又生产数据,我们必须创建两个队列:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
第一波工人从 first_input_source
读取一个项目,处理该项目,并在 first_output_sink
中写入结果:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
第二波工人使用 first_output_sink
作为其输入源并读取,然后处理写入另一个输出接收器:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
现在 second_output_sink
是接收器,让我们将它转换为数组:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }