一個來源 - 工作流水線 - 一個水槽
我們希望並行處理資料並將其推送到其他工作人員處理的行。
由於 Workers 既消費又生產資料,我們必須建立兩個佇列:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
第一波工人從 first_input_source
讀取一個專案,處理該專案,並在 first_output_sink
中寫入結果:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
第二波工人使用 first_output_sink
作為其輸入源並讀取,然後處理寫入另一個輸出接收器:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
現在 second_output_sink
是接收器,讓我們將它轉換為陣列:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }