2.2.1 流水线作业

流水作业通常与流数据的处理有关,如监视传送带上对象的相机拍摄的图像(比较 图 2.4)。在这种情况下,可以在处理对象 2 的图像的同时获取对象 3 的图像,并同时评估对象 1 的图像处理结果。

图 2.4:可能产生流式数据的设置。

2.2.1.1 流水线的基本设计

各个处理步骤,如图像采集、图像处理和结果评估,由不同的阶段执行,这些阶段由消息队列连接,一个阶段的输出是下一个阶段的输入(见 图 2.5)。因此,一个流水线由(通常)多个生产者/消费者对组成。

图 2.5:由图像采集、图像处理和结果评估三个阶段组成的简单流水线。

图 2.5 中的流水线包括三个阶段,由两个消息队列连接。所有三个阶段都在独立的线程中并行运行。图像采集阶段获取图像。每幅图像都被放入一条消息,然后在消息队列 1中入队列。消息队列作为 FIFO(先进先出)缓冲器,将连续的阶段连接起来。如果图像处理阶段已准备好处理下一幅图像,而消息队列 1 中又有消息可用,那么图像处理阶段就会从消息队列 1 的首部出队列一个消息。它处理消息中包含的图像,并将处理结果添加到消息中。然后,它将该消息入队列消息队列 2。最后,结果评估阶段将消息从消息队列 2 的首部出队列,并对结果进行评估,例如,启动某些操作,如吹掉有缺陷的产品。

由于线程并行运行,第一幅图像的结果评估可与第二幅图像的处理并行执行,也可与第三幅图像的采集并行执行。通常情况下,不同阶段的处理时间是不相等的。例如,在 图 2.6 中,图像处理阶段所需的时间是结果评估阶段的两倍。

流水线的周期时间是完成连续任务之间的时间间隔。流水线的吞吐量是其周期时间的倒数。请注意,流水线的吞吐量受其最慢吞吐量的限制。

图 2.6:简单流水线的调度图。

由于流水线的各个阶段是并行执行的,其吞吐量通常高于按顺序执行各个阶段的程序的吞吐量(见 图 2.7)。

图 2.7:顺序程序的调度图。

流水线实施示例

流水线的实施遵循以下模式:

  1. 将各阶段的功能放入单个函数中。
  2. 创建和配置消息队列。
  3. 为彻底终止流水线准备数据结构。
  4. 在单个线程中启动函数。

HDevelop 示例程序 hdevelop/System/Multithreading/pipeline_one_thread_per_stage.hdev 显示了与 图 2.5图 2.6 所示类似的流水线实施。

首先,必须将不同阶段的功能放入单个函数中。这里使用的是 acquire_images(获取图像)process_images(处理图像)evaluate_results(评估结果)三个过程。必须从一个阶段传递到下一个阶段的数据将通过消息队列传递。由于我们有三个阶段,因此需要两个消息队列来连接这些阶段。它们是通过 create_message_queue 算子创建的:

create_message_queue (QueueOriginalImage)
create_message_queue (QueueImageProcessingResult)

通常情况下,应限制消息队列的大小,以控制程序的内存使用量。这可以通过以下调用来实施:

MessageQueueMaxMessageNum := 200
set_message_queue_param (QueueOriginalImage, 'max_message_num', \
                         MessageQueueMaxMessageNum)
set_message_queue_param (QueueImageProcessingResult, 'max_message_num', \
                         MessageQueueMaxMessageNum)

需要注意的是,消息队列的大小应选择得足够大,以便能够处理短期的峰值负载。例如,如果在短时间内采集图像的速度超过了处理速度,则会在消息队列中缓冲。

为了能够停止图像采集,我们在程序 acquire_images 中使用了一个监控事件。

create_event ('', '', StopAcq)

此外,我们还通过相应的消息来传播图像采集已停止的消息。一旦图像采集停止,图像采集阶段就会生成该消息。它表示接下来不会再有其他消息。为了识别这条消息,我们定义了一个特定的消息密钥:

TerminationKey := 'terminate'

然后,我们就可以在单个线程中启动函数了:

par_start<AcquisitionThread> : acquire_images (MeanAcquisitionInterval, \
                                               StopAcq, TerminationKey, \
                                               QueueOriginalImage)
par_start<ImageProcessingThread> : process_images (QueueOriginalImage, \
                                                   NumAOPThreadsForImageProcessing, \
                                                   TerminationKey, \
                                                   QueueImageProcessingResult)
par_start<ResultEvaluationThread> : evaluate_results (QueueImageProcessingResult, \
                                                      TerminationKey, \
                                                      WindowHandle)

在示例程序中,流水线只运行了几秒钟,然后通过信号事件 StopAcq 终止:

signal_event (StopAcq)

现在,监控事件 StopAcq 状态的图像采集阶段会终止并向后续阶段发送一条消息,后续阶段在评估这条消息后会立即自行终止。

最后,我们必须等待所有线程结束。

par_join ([AcquisitionThread,ImageProcessingThread,ResultEvaluationThread])

图像采集阶段实施示例

第一阶段(在此以函数 acquire_images 表示)的执行遵循以下模式:

  1. 获取数据。
  2. 创建新消息。
  3. 为消息添加数据和元信息。
  4. 将消息入队列到输出消息队列。
  5. 检查并处理标志着阶段终止的事件。

图像采集阶段在一个循环中采集图像,直到发出终止事件信号终止图像采集,或者连接图像采集阶段和图像处理阶段的消息队列达到极限时才会退出。

在我们的示例中,图像采集是通过从预读图像数组中随机选取一幅图像并等待一段时间来模拟两幅图像采集之间的实际时间间隔。在实际应用中,必须替换这两行才能获取真实图像。

repeat
    wait_to_simulate_non_trivial_acquisition_time (MeanAcquisitionInterval, \
                                                   ImageId, \
                                                   LastAcquisitionTime, \
                                                   LastAcquisitionTime)
    acquire_random_image (Images, Image)

对于每幅图像,我们都会创建一条消息,并将图像、图像 ID 和时间戳一起存储在这条消息中。

    create_message (MessageImg)
    set_message_obj (Image, MessageImg, 'image')
    set_message_tuple (MessageImg, 'image_id', ImageId)
    set_message_tuple (MessageImg, 'acquisition_time', AcquisitionTime)

最后,消息会被入队列连接图像采集阶段和图像处理阶段的消息队列。由于消息队列的大小是有限的,因此我们必须考虑到消息队列已经包含了最大允许消息数的情况。为此,我们使用了一个函数,该函数可根据需要进行配置,以处理这种情况。

    enqueue_message_to_limited_queue (MessageImg, QueueOut, \
                                      'throw_exception')

在这里,我们希望在消息队列达到极限时抛出异常。在实际应用中,这种情况必须单独谨慎处理。它表明接下来的某个阶段无法以所需的速度处理图像。因为降低图像采集速率往往不是理想的解决方案,必须提高流水线的速度。参见 "加快流水线速度" 一节 了解加快流水线速度的可能方法。

最后,我们通过检查事件 StopAcq 的状态来确定是否应终止图像采集。只要该事件没有发出信号,图像采集阶段就会继续采集图像。

    try_wait_event (StopAcq, Busy)
until (not Busy)

如果事件 StopAcq 发出信号,图像采集循环就会结束,整个流水线就可以终止。为确保所有已获取的图像仍在处理中,我们需要创建一条包含上述定义的终止键的消息,并将其作为最后一条消息在消息队列中排队。在这种情况下,我们必须确保消息入队列到消息队列中。因此,如果消息队列已达到极限,我们不会抛出异常,而是等待消息入队列。

create_message (TerminationMessage)
set_message_tuple (TerminationMessage, TerminationKey, 'true')
enqueue_message_to_limited_queue (TerminationMessage, QueueOut, 'wait')

图像处理阶段实施示例

中间阶段的实施(此处由函数 process_images 表示)遵循以下模式:

  1. 从输入消息队列中出队列一个消息。
  2. 处理消息中的数据。
  3. 将处理结果添加到消息中。
  4. 将消息入队列到输出消息队列。

所有这些步骤都在一个循环中进行,如果流水线终止,循环就会退出。

首先,从输入消息队列中出队列一条消息:

while (1)
    dequeue_message (QueueIn, [], [], MessageHandle)

然后,我们检查图像采集是否已终止,因此必须终止流水线:

    get_message_param (MessageHandle, 'key_exists', TerminationKey, \
                       Terminate)

如果我们必须终止流水线,则需要以下两个步骤:

  1. 将终止消息入队列 到输出队列,以便向后续阶段通报终止消息。
  2. 退出循环。

    if (Terminate)
        enqueue_message_to_limited_queue (MessageHandle, QueueOut, 'wait')
        break
    endif

如果没有要求终止,我们可以从报文中提取输入数据(此处:图像),执行某些操作(此处:图像处理),并将结果(此处:缺陷数量)添加到消息中:

    get_message_obj (Image, MessageHandle, 'image')
    image_processing (Image, ImageReduced, RegionUnion, RingSize, \
                      PolarResolution, SmoothX, Number)
    set_message_tuple (MessageHandle, 'num_defects', Number)

最后,我们将消息入队列到输出队列:

    enqueue_message_to_limited_queue (MessageHandle, QueueOut, 'wait')
endwhile

在这里,如果输出队列已达到极限,则无需抛出异常。我们只需等待,直到可以将消息入队列。只要我们还在等待,就不能从输入队列中删除其他消息。如果等待时间过长,输入队列就会达到极限,图像采集阶段就会抛出相应的异常。

结果评估阶段实施示例

最后阶段(此处由函数 evalu_results 表示)的实施遵循以下模式:

  1. 从输入消息队列中出队列消息。
  2. 处理消息中的数据。

所有这些步骤都在一个循环中进行,直到流水线结束时才退出循环。

首先,从输入消息队列中出队列一条消息:

while (1)
    dequeue_message (QueueIn, [], [], MessageHandle)

然后,我们检查图像采集是否已终止,因此必须终止流水线:

    get_message_param (MessageHandle, 'key_exists', TerminationKey, \
                       Terminate)
    if (Terminate)
        break
    endif
endwhile

现在,我们可以评估结果了。在示例中,我们只需计算响应时间、处理时间和平均采集间隔,并将数据可视化。在实际应用中,这些结果可用于控制缺陷部件的剔除。

运行情况

图 2.8 显示了上述 HDevelop 示例程序中实施的简单流水线的运行情况。横轴上绘制的是最近 720 幅图像。橙色线表示采集间隔,即连续采集两幅图像之间的时间间隔。采集间隔会出现一些变化,具体是由图像采集函数引起的。洋红色线表示图像处理阶段所需的时间,该时间在所有图像中基本保持不变。蓝线表示响应时间,即从获取图像到完成结果评估之间的时间。

(a) (b)

(c)

图 2.8:简单流水线的运行情况: (a) 近似恒定的采集间隔;(b) 变化的采集间隔;(c) 变化的采集间隔导致短期峰值负载。

图 2.8(a) 中,图像采集的时间间隔大致相等,均为 6 毫秒。请注意,图像采集的速度不能快于流水线后续阶段中最慢的阶段。在我们的示例中,这个阶段就是图像处理阶段,大约需要 4.5 毫秒。在上述 6 毫秒的图像采集间隔下,我们可以在响应时间几乎不变的情况下实施 \nicefrac1(\Unit6ms) ≈ \Unit160\nicefrac[\textnormal]imagessec 的吞吐量。

图 2.8(b)和(c)中,图像采集间隔随时间变化,导致短期峰值负载。只要流水线(更准确地说:图像采集阶段之后最慢的阶段)足够快,就不会对响应时间产生负面影响。从 图 2.8(b)中可以看出,图像采集速度总是慢于处理速度。这自然限制了可实施的吞吐量,因为在任何时候,图像采集速度都不能超过后续阶段的速度。在我们示例的这种配置中,只能处理 \nicefrac1(\Unit12ms) ≈ \Unit80\nicefrac[\textnormal]imagessec ,而且响应时间几乎不变。

如果响应时间不那么重要,在有限的时间内,图像采集间隔甚至可以低于处理一幅图像所需的时间(见 图 2.8(c))。只要图像处理的速度快于图像采集的速度(洋红色线低于橙色线),响应时间就会略高于图像处理时间。在这种情况下,消息队列通常最多包含一条消息。一旦采集间隔短于处理一幅图像所需的时间,采集的图像就必须在图像处理阶段的输入消息队列中缓冲。这些必须在消息队列中等待处理的图像的响应时间会急剧增加。一旦图像获取速度再次慢于图像处理速度,缓冲图像的数量就会减少,新获取图像的响应时间也会随之减少。请注意,消息队列必须配置得足够大,以便能够缓冲等待处理的图像。在我们示例的这种配置中,同样可以处理 \nicefrac1(\Unit6ms) ≈ \Unit160\nicefrac[\textnormal]imagessec ,但响应时间可能会大大降低。

2.2.1.2 加快流水线速度

提高流水线速度有两种可能的方法:

  1. 将最慢的阶段分成多个阶段。
  2. 在多个线程上并行执行最慢的阶段。

这两种方法都试图减少流水线最慢阶段所需的平均处理时间,因为这限制了整个流水线的速度。

虽然将速度最慢的阶段拆分成多个阶段是一种简单直接的方法,但其有效性取决于能否将该阶段拆分成大小(运行时间)大致相同的部分。如果做不到这一点,各个阶段的速度就会再次出现差异,流水线的速度也就无法达到最佳状态。

图 2.9 显示了将图像处理阶段分成两个阶段的流水线调度图。在这里,无法将图像处理阶段分成大小相等的两个阶段。相反,第一部分比第二部分大三倍。与 图 2.6 中描述的简单流水线相比,流水线的周期时间仅略有缩短。"图像处理 1" 阶段仍然明显慢于其他阶段,因此限制了流水线的吞吐量。

图 2.9: 有两个大小不等的图像处理阶段的流水线调度图。

第二种方法是在多个线程上并行执行最慢的阶段,这种方法需要更多的实施工作,但通常更容易使该阶段的线程达到更相似的工作量。图 2.10 展示了这样一个流水线,其中图像处理阶段在两个线程上并行运行。

图 2.10:图像处理阶段在两个线程上并行运行的流水线。注意多路复用器,它能确保结果以正确的顺序出现。

根据图像内容,最后启动的图像处理任务可能会在最先启动的任务之前完成。

如果需要按照任务启动的顺序显示结果,则必须在并行阶段之后插入一个多路复用器进入流水线。多路复用器可确保任务以与开始时相同的顺序返回。多路复用器将并行阶段的输出消息队列合并为一个单一的消息队列,其中按正确顺序包含任务。

如果结果的顺序无关紧要,例如,如果结果被写入数据库,则不需要多路复用器。在这种情况下,仍然可以通过图像 ID 来识别图像。

图 2.11 显示了上述流水线的调度图。当第一幅图像仍在处理时,第二幅图像的图像处理已经开始。与 图 2.6 相比,图像处理阶段的并行化(几乎)将流水线的吞吐量提高了一倍。

图 2.11:图像处理阶段在两个线程上并行运行的流水线调度图。

流水线实施示例

HDevelop 示例程序 hdevelop/System/Multithreading/pipeline_multiple_threads_per_stage.hdev 展示了图像处理阶段在多线程上并行运行的流水线的实施,即与 图 2.10图 2.11 类似的流水线。

下面仅介绍将 "流水线的基本设计" 一节 中描述的简单流水线转换为在多个线程上运行图像处理阶段的流水线所需的改动。

除了将图像处理结果传送到结果评估阶段的消息队列外,我们还需要多个消息队列来连接图像处理阶段和多路复用器:

for I := 0 to NumImageProcessingThreads - 1 by 1
    create_message_queue (QueueImageProcessingResultsVector.at(I))
    set_message_queue_param (QueueImageProcessingResultsVector.at(I), \
                             'max_message_num', MessageQueueMaxMessageNum)
endfor
convert_vector_to_tuple (QueueImageProcessingResultsVector, \
                         QueuesImageProcessingResult)

我们必须启动多个图像处理线程和一个多路复用器线程,而不是在图像处理阶段只启动一个线程:

for I := 0 to NumImageProcessingThreads - 1 by 1
    par_start<ImageProcessingThreadVector.at(I)> : process_images (QueueOriginalImage, \
                                             NumAOPThreadsForImageProcessing, \
                                             TerminationKey, \
                                             QueueImageProcessingResultsVector.at(I))
endfor
convert_vector_to_tuple (ImageProcessingThreadVector, \
                         ImageProcessingThreads)
par_start<MultiplexerThread> : multiplexer (QueuesImageProcessingResult, \
                                            'image_id', TerminationKey, \
                                            QueueImageProcessingResult)

我们还必须等待其他线程结束:

par_join ([AcquisitionThread,ImageProcessingThreads,MultiplexerThread, \
         ResultEvaluationThread])

为了确保正确终止流水线,我们必须对函数 process_images 的相应部分稍作修改。除了将终止消息入队列输出队列外,我们还必须将该消息入队列输入队列,以便通知正在运行函数 process_images 的其他线程:

    get_message_param (MessageHandle, 'key_exists', TerminationKey, \
                       Terminate)
    if (Terminate)
        enqueue_message_to_limited_queue (MessageHandle, QueueIn, 'wait')
        enqueue_message_to_limited_queue (MessageHandle, QueueOut, 'wait')
        break
    endif

最后,我们必须实现多路复用器。多路复用器确保结果评估阶段以正确的顺序接收处理结果。请注意,消息不需要排序或重新排序。由于每个输入消息队列都是本地排序的,因此多路复用器只需通过同时监控所有输入消息队列的头部来查找序列中的下一条消息。多路复用器根据图像 ID 确定所需的消息顺序,而图像 ID 是由图像采集阶段按递增顺序设置的。

多路复用器的基本功能由以下几行代码提供:

NumInputQueues := |QueuesIn|
LastId := -1
Buffers := gen_tuple_const(NumInputQueues,-1)
Ids := gen_tuple_const(NumInputQueues,LastId)
while (1)
    for I := 0 to NumInputQueues - 1 by 1
            if (Buffers[I] == -1)
                dequeue_message (QueuesIn[I], [], [], MessageHandle)
                get_message_tuple (MessageHandle, SortIdKey, Id)
                Buffers[I] := MessageHandle
                Ids[I] := Id
            endif
            if (Buffers[I] != -1 and Ids[I] == LastId + 1)
                enqueue_message_to_limited_queue (Buffers[I], QueueOut, \
                                                  'wait')
                Buffers[I] := -1
                LastId := Ids[I]
            endif
    endfor
endwhile

多路复用器代码的其余部分可在 HDevelop 示例程序 hdevelop/System/Multithreading/pipeline_multiple_threads_per_stage.hdev 中找到,主要处理流水线终止时的多路复用器终止问题,以及不太可能出现的情况,即某些信息丢失,从而导致图像 ID 序列出现缺口。

运行情况

图 2.12 显示了图像处理阶段在两个线程上运行的流水线的运行情况。它看起来与每个阶段只有一个线程的简单流水线相似(见 图 2.8),但有一个重要区别,即采集间隔可能会大大缩短。因此,系统的吞吐量显著提高。

(a) (b)

(c)

图 2.12:图像处理阶段在两个线程上运行的流水线的运行情况: (a) 近似恒定的采集间隔;b) 变化的采集间隔;(c) 变化的采集间隔导致短期峰值负载。