Pipeline in TBB

The TBB pipeline mimics a traditional assembly line to perform pipelined execution. It is useful when the work forms a linear sequence of stages. Data flows through a series of pipeline filters: each filter takes an input, processes it, and produces an output. The first filter takes no input but produces output, and the last filter takes an input but its output is discarded (Figure 1). Filters working on different items may run simultaneously.

Figure 1. A pipeline with three stages. Taken from Expressing Pipeline Parallelism Using TBB Constructs, Reed et al.

These filters can operate serially or in parallel, and serial filters can be in order or out of order. The pipeline and filter classes of TBB are used to implement this pattern. The speedup comes from receiving n chunks of data very quickly (the data can be read from a file, generated, and so on) and then processing those n chunks concurrently while the next n chunks are being received.

The classical thread-per-stage approach has several problems. First, the speedup is limited by the number of filters in the pipeline. Second, when a thread finishes its filter, it has to pass its data to the next filter’s thread. Finally, the developer is forced to handle the bookkeeping by hand. tbb::pipeline eliminates these problems.

tbb::pipeline uses the cache efficiently because it is biased toward finishing an old task before starting a new one: it automatically carries an item as deep into the pipeline as possible before switching to another item. This improves memory performance, since the item being processed is more likely to remain in the cache from one filter to the next.

tbb::pipeline also hides and takes care of most of the bookkeeping. This bookkeeping is needed to keep the system running and it includes operations such as holding items arriving earlier due to load imbalance between filters, identifying item processing order, and so on.

Since TBB takes care of this kind of low-level management for us, we can free ourselves from these issues and focus only on the actual product. This also makes the code easier to understand and maintain.

However, tbb::pipeline is not perfect. It has a fixed structure that cannot be changed at run time, so dynamically constructed pipelines are not an option with tbb::pipeline. Also, workloads that rely on oversubscription do not perform well with TBB, since oversubscription is exactly what TBB tries to avoid in order to achieve efficient parallelism.

TBB filter Class

filter is an abstract class that contains the algorithm applied to each input item. Filters are added to a tbb::pipeline in the order in which data flows through them. If we denote the filters by their position as f0, f1, f2, and so on, then execution starts with f0 and continues with f1, f2, and so on.

The mode of a filter (parallel, serial_out_of_order, or serial_in_order) is specified when it is instantiated. A parallel filter can process several items concurrently and in no particular order, so parallel mode is preferred for parallel speedup.

A serial_out_of_order filter runs serially (it processes one item at a time) in no particular order. A serial_in_order filter also runs serially, but the execution order is set implicitly by the first serial_in_order filter, and all other serial_in_order filters follow this order. If a filter must be serial, then serial_out_of_order is preferred, since it puts fewer constraints on the processing order. However, this is not always possible:

Let’s assume that you have serial input (f0) and output (f2) filters and a parallel middle filter (f1). If you need to keep the output in the same order as the input, then you need to mark the serial filters as serial_in_order. In this way, if an item arrives out of order at a serial_in_order filter, the pipeline automatically delays processing it until its predecessors have been handled.
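To make the delaying behaviour concrete, here is a minimal sketch of how an in-order stage could hold back early arrivals. It is an illustration only and not TBB's actual implementation: the class InOrderStage, its methods, and the use of a std::map as the holding area are all assumptions made for this example, and the sequence number stands for the order set by the first serial_in_order filter.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper, not part of TBB: models a serial_in_order stage that
// delays items arriving ahead of their predecessors.
class InOrderStage {
public:
    // Offer an item tagged with the sequence number it received at the
    // first in-order stage. Items may be offered in any order.
    void offer(std::size_t seq, const std::string& item) {
        pending_[seq] = item;
        // Process every item that is now ready, strictly in sequence order.
        auto it = pending_.find(next_seq_);
        while (it != pending_.end()) {
            processed_.push_back(it->second);
            pending_.erase(it);
            ++next_seq_;
            it = pending_.find(next_seq_);
        }
    }

    // Items in the order in which this stage actually processed them.
    const std::vector<std::string>& processed() const { return processed_; }

    // Number of items currently held back, waiting for a predecessor.
    std::size_t delayed_count() const { return pending_.size(); }

private:
    std::size_t next_seq_ = 0;                    // next sequence number to process
    std::map<std::size_t, std::string> pending_;  // early arrivals, keyed by sequence
    std::vector<std::string> processed_;
};
```

If items 1 and 2 are offered before item 0, both are held back, and all three are processed in order as soon as item 0 arrives.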

There are some key rules to follow in order to add our own filter classes to tbb::pipeline. First, the filter class must inherit from the tbb::filter class. Then, the virtual filter::operator() method must be overridden.

filter::operator() holds the algorithm (action) that is applied to each item. It should return a pointer to the item it operated on, so that the next filter can also process it. The first filter (f0) generates the stream and should return NULL when there are no more items in the stream. The return value of the last filter (fn) is ignored.

As mentioned earlier, filter instances are added to the pipeline instance from first to last, i.e., from f0 to fn. A given filter instance can appear only once in a pipeline, and it cannot be a member of more than one pipeline at a time.
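The contract described above can be sketched without TBB itself. The following example uses a hypothetical stand-in base class, Filter, that only mirrors the void* operator()(void*) signature of tbb::filter, together with a hypothetical run_serially driver that pushes one item at a time through the filters. It has no modes, no parallelism, and no token limit, so it shows only the calling convention: the first filter receives NULL and returns NULL to end the stream, each filter returns the pointer for the next one, and the last filter's return value is ignored. The names NumberSource, Square, and SumSink are invented for this illustration.

```cpp
#include <cstddef>
#include <vector>

// Stand-in for tbb::filter (assumption: only the operator() contract is modelled).
class Filter {
public:
    virtual ~Filter() = default;
    virtual void* operator()(void* item) = 0;
};

// f0: generates the stream 1..count and returns nullptr when it is exhausted.
class NumberSource : public Filter {
public:
    explicit NumberSource(int count) : count_(count) {}
    void* operator()(void* /*unused: always null for the first filter*/) override {
        if (produced_ == count_) return nullptr;  // end of stream
        ++produced_;
        return new int(produced_);                // freed by the last filter
    }
private:
    int count_;
    int produced_ = 0;
};

// f1: squares the item in place and passes the same pointer on.
class Square : public Filter {
public:
    void* operator()(void* item) override {
        int* value = static_cast<int*>(item);
        *value *= *value;
        return value;
    }
};

// f2: consumes the item; its return value is ignored by the driver.
class SumSink : public Filter {
public:
    void* operator()(void* item) override {
        int* value = static_cast<int*>(item);
        sum_ += *value;
        delete value;
        return nullptr;
    }
    long sum() const { return sum_; }
private:
    long sum_ = 0;
};

// Hypothetical serial driver: filters are applied in the order they were added.
void run_serially(const std::vector<Filter*>& filters) {
    if (filters.empty()) return;
    for (;;) {
        void* item = (*filters[0])(nullptr);   // first filter gets no input
        if (item == nullptr) break;            // stream finished
        for (std::size_t i = 1; i < filters.size(); ++i) {
            item = (*filters[i])(item);        // result feeds the next filter
        }
    }
}
```

With a NumberSource of four items followed by Square and SumSink, the sink ends up holding 1 + 4 + 9 + 16 = 30. In a real TBB program the three classes would derive from tbb::filter and pass their mode to its constructor instead.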

The filters of parallel_pipeline may copy the Body object they are given. Thus, Body::operator() should be const to prevent any modification of the Body object, since such a modification may or may not be visible to the thread invoking parallel_pipeline. Also, TBB 2.0 and prior treated parallel input stages as serial, but later versions can execute a parallel input stage in parallel. This is another reason why filter::operator() had better be thread safe.

You can see the methods of tbb::filter in the table below (taken from TBB filter Class):

Member Description
explicit filter( mode filter_mode )

Constructs a filter of the specified mode.

Note

Intel TBB 2.1 and prior had a similar constructor with a bool argument is_serial. That constructor exists but is deprecated (see Compatibility Features in the Appendices).

~filter()

Destroys the filter. If the filter is in a pipeline, it is automatically removed from that pipeline.

bool is_serial() const

Returns: False if filter mode is parallel; true otherwise.

bool is_ordered() const

Returns: True if filter mode is serial_in_order, false otherwise.

virtual void* operator()( void * item )

The derived filter should override this method to process an item and return a pointer to an item to be processed by the next filter. The item parameter is NULL for the first filter in the pipeline.

Returns: The first filter in a pipeline should return NULL if there are no more items to process. The result of the last filter in a pipeline is ignored.

virtual void finalize( void * item )

A pipeline can be cancelled by user demand or because of an exception. When a pipeline is cancelled, there may be items returned by a filter’s operator() that have not yet been processed by the next filter. When a pipeline is cancelled, the next filter invokes finalize() on each item instead of operator(). In contrast to operator(), method finalize() does not return an item for further processing. A derived filter should override finalize() to perform proper cleanup for an item. A pipeline will not invoke any further methods on the item.

Returns: The default definition has no effect.

If you ever need to execute a specific filter on a particular thread, you will need the abstract thread_bound_filter base class. More information can be found on the TBB thread_bound_filter Class page.

TBB pipeline Class

We need a pipeline class instance to run our filters in parallel. After we create the pipeline instance, we can add our filters with the pipeline::add_filter(filter&) method. Then, we can call the pipeline::run( size_t max_number_of_live_tokens [,task_group_context& group] ) method to start processing.

The max_number_of_live_tokens parameter sets an upper bound on the number of items (tokens) that can be in flight at once, and therefore on the number of stages that can run concurrently. Once this limit is reached, the pipeline does not let the first filter create more items until an item is destroyed at the last filter.

Selecting the right value of max_number_of_live_tokens may require some experimentation. A value that is too low limits parallelism, whereas a value that is too high may demand too many resources, such as memory, since more items will be in flight.
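The effect of the token limit can be illustrated with a toy model. The sketch below is not TBB code: the function ticks_to_drain and its parameters are hypothetical, and it assumes an idealized pipeline in which every stage takes exactly one tick and there are enough processors for all stages to work at the same time. The only rule it models is the one described above: the input stage may create a new item only while fewer than max_tokens items are alive.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Toy model (assumption: one tick per stage, unlimited processors).
// Returns the number of ticks needed to push num_items through num_stages.
std::size_t ticks_to_drain(std::size_t num_items,
                           std::size_t num_stages,
                           std::size_t max_tokens) {
    assert(num_stages > 0 && max_tokens > 0);
    std::deque<std::size_t> live;  // stages completed per live item, oldest first
    std::size_t created = 0;
    std::size_t ticks = 0;
    while (created < num_items || !live.empty()) {
        // The input stage may create a new item only while a token is free.
        if (created < num_items && live.size() < max_tokens) {
            live.push_back(0);
            ++created;
        }
        // Every live item completes one stage during this tick.
        for (std::size_t& done : live) ++done;
        // An item that has passed the last stage is destroyed, freeing its token.
        while (!live.empty() && live.front() == num_stages) live.pop_front();
        ++ticks;
    }
    return ticks;
}
```

In this model, 10 items through 3 stages take 30 ticks with a single token and 12 ticks with three tokens. Raising the limit beyond three does not help here, because there are only three stages to keep busy, while each extra token would still cost memory in a real pipeline.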

If we can provide enough processors and tokens, then the throughput of the pipeline is limited by the throughput of the slowest sequential filter. So, we should try to keep the sequential filters as fast as possible and shift the time-consuming work to the parallel filters.
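As a back-of-the-envelope check, this bound can be computed from per-item timings. The helper below is a hypothetical sketch, not part of TBB: it assumes you have measured the average time each serial filter spends on one item, and it ignores scheduling overhead and the token limit.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical helper: upper bound on pipeline throughput in items per second,
// given the measured seconds per item of every serial filter.
double throughput_bound(const std::vector<double>& serial_seconds_per_item) {
    assert(!serial_seconds_per_item.empty());
    // A serial filter handles one item at a time, so the slowest one sets the pace.
    const double slowest = *std::max_element(serial_seconds_per_item.begin(),
                                             serial_seconds_per_item.end());
    assert(slowest > 0.0);
    return 1.0 / slowest;
}
```

For example, if the serial filters need 0.125 s, 0.5 s, and 0.25 s per item, the pipeline cannot exceed 2 items per second, no matter how many threads serve the parallel filters.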

You can see the methods of tbb::pipeline in the table below (taken from TBB pipeline Class):

Member Description
pipeline()

Constructs pipeline with no filters.

~pipeline()

Removes all filters from the pipeline and destroys the pipeline.

void add_filter( filter& f )

Appends filter f to sequence of filters in the pipeline. The filter f must not already be in a pipeline.

void run( size_t max_number_of_live_tokens[, task_group_context& group] )

Runs the pipeline until the first filter returns NULL and each subsequent filter has processed all items from its predecessor. The number of items processed in parallel depends upon the structure of the pipeline and number of available threads. At most max_number_of_live_tokens are in flight at any given time.

A pipeline can be run multiple times. It is safe to add stages between runs. Concurrent invocations of run on the same instance of pipeline are prohibited.

If the group argument is specified, pipeline’s tasks are executed in this group. By default the algorithm is executed in a bound group of its own.

void clear()

Removes all filters from the pipeline.

parallel_pipeline is used for lambda-based pipeline building and running. You can find more details about parallel_pipeline on TBB’s related page.

Items passed between the filters have to be allocated and freed. Circular buffers can be used to reduce this overhead. If the first filter, which creates an item, and the last filter, which consumes it, are both serial_in_order, then the items can be allocated and freed via a circular buffer whose size is at least max_number_of_live_tokens. The first filter allocates the object, and the last filter frees it. Because both users are serial filters, straightforward serial code with an array and two pointers (for the head and the tail) suffices, without requiring atomic operations, locking, memory fences, and so on.

If the first and the last filters are not both serial_in_order, then you need to keep track of which buffers are in use, since the buffers may not be retired in the same order in which they were allocated.
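The serial_in_order case can be sketched as follows. This is a minimal illustration under stated assumptions, not code from TBB: Item and ItemRing are hypothetical names, the capacity is assumed to be at least max_number_of_live_tokens, allocate() is assumed to be called only by the first filter, and release() only by the last one. Under those assumptions the head index is touched by one filter and the tail index by the other, so neither needs atomics or locks.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical item type carried through the pipeline.
struct Item {
    int payload = 0;
};

// Hypothetical circular buffer of preallocated items.
// Assumption: capacity >= max_number_of_live_tokens, so a slot is never handed
// out again while the item that previously used it is still in flight.
class ItemRing {
public:
    explicit ItemRing(std::size_t capacity) : slots_(capacity) {
        assert(capacity > 0);
    }

    // Called only by the first (serial_in_order) filter.
    Item* allocate() {
        Item* item = &slots_[head_];
        head_ = (head_ + 1) % slots_.size();
        return item;
    }

    // Called only by the last (serial_in_order) filter.
    void release(Item* item) {
        // In-order filters retire items in exactly the order they were allocated.
        assert(item == &slots_[tail_] && "items must retire in allocation order");
        (void)item;  // silences the unused warning when asserts are disabled
        tail_ = (tail_ + 1) % slots_.size();
    }

private:
    std::vector<Item> slots_;
    std::size_t head_ = 0;  // next slot to hand out (first filter only)
    std::size_t tail_ = 0;  // next slot to retire (last filter only)
};
```

With a ring of three slots, the fourth allocation reuses the first slot, which is safe as long as the first item has already been released. The token limit is what guarantees this in a running pipeline.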

References