0%

TBB并发库代码学习

TBB并发库代码学习

基本介绍

TBB是Intel开发的一个用于并行计算的C++组件库, 现已开源为OneTBB. TBB的全称为Thread Building Blocks. 他有一个优势就是即使你不是写并发线程的高手, 也可以轻松地通过其完成并行任务的执行. 其引用领域适合HPC等CPU密集的计算场景.

简单使用

举个使用的例子最容易理解, 以其最常用的方法 parallel_for 为例:

如果我有一个数组矩阵, 想并行地对每个元素进行计算(纯CPU计算). 串行的执行方式可能如下

1
2
3
for(int i=0;i < taskSize; ++i) {
do_task(task[i]);
}

这种方式在数据量小的时候可以被接受, 但是一旦taskSize很大了, 则会非常影响执行效率并且浪费多处理的CPU资源. 大家一般都会考虑多线程的方式完成.
如果使用TBB的话, 代码可以这么写:

1
2
3
tbb::parallel_for(0, taskSize, [&](int i) {
do_task(task[i]);
});

这样就非常容易地将执行任务分发到多个CPU上进行, 并且其负载是动态均衡的. 由于在工作中用到它, 并且要搞清楚其内部的一些机制(怎么做负载均衡,怎么分配和调度任务), 因而有了本文的一些学习总结.

代码学习

架构原理理解

学习新的代码首先肯定是要搞清楚其设计的架构,有哪些概念, 以及总体的代码框架大概是什么样子. 我这里学习的代码是TBB 2020的版本.

首先在Intel的TBB专家的建议下, 我阅读了Pro TBB这本书. 从书中大致了解的其设计结构和一些概念名称. 这里也推荐想学习的人先阅读下这本书(参考中给出了下载地址) . TBB的工作线程worker是会在第一次使用时自动启动的, 并且在使用结束后可以继续被后续并发计算请求复用. 它有一个global worker pool/market的概念, 用于存放哪些空闲的没事可做的线程. 而当我们的代码执行到tbb::parallel_for这里时, 就会生成一个task_arena, 这里发起这个并发任务的线程被成为master thread/main thread, 空闲的worker就会被调度到这个task_arena中去获取任务并完成计算任务. task_arena中的可以容纳的worker数量(slot)为master thread 可用的CPU资源的数量减1, 而master thread自己也在其中占一个slot. 当一个worker发现在这个arena中无任务可做时就会离开返回global thead pool中睡眠直到有新的arena再次召唤. 在执行时每个线程都会将任务做适当的切分成多个小的任务块放到自己的本地任务队列中待依次执行, 而新进入的thread则会从其他的thread的任务队列中偷取一部分过来, 完成后再尝试偷取直到没有任务可做.

下载源代码后可见其使用的是cmake工具进行编译的. 在build中存在一个build.py的脚本可以快速编译lib库. 其中主要头文件都在include中, 实现代码都在src下.

parallel_for

由于我们的使用场景没有涉及到flow, 这里我重点看下最基本的算法组件parallel_for的实现.

头文件是include/tbb/parallel_for.h, 就从这里看起, 看下我们调用tbb::parallel_for后都发生了什么. 头文件中定义了大量的tbb::parallel_for入口函数的模板, 例如最简单的入口:

1
2
3
4
5
6
7
8
9
template <typename Index, typename Function>
void parallel_for(Index first, Index last, const Function& f) {
parallel_for_impl<Index,Function,const auto_partitioner>(first, last, static_cast<Index>(1), f, auto_partitioner()); // 这里会默认step为1, 分割器为auto_partitioner
}

template <typename Index, typename Function>
void parallel_for(Index first, Index last, Index step, const Function& f) {
parallel_for_impl<Index,Function,const auto_partitioner>(first, last, step, f, auto_partitioner());
}

如果第四个参数不指定TBB的调度器的任务分割器, 就会默认使用auto_partitioner, 其他可以使用的分割器有:simple_partitioner, static_partitioner, affinity_partitioner.分割器代表着如何进行任务拆分和分配, 是负载均衡的保证, 后面会单独进行代码分析.

所有的tbb::parallel_for最终都会调用parallel_for_impl, 这里由于支持task_group会存在编译时选择而有两个parallel_for_impl, 我们暂时忽略task_group的版本.

1
2
3
4
5
6
7
8
9
10
11
12
template <typename Index, typename Function, typename Partitioner>
void parallel_for_impl(Index first, Index last, Index step, const Function& f, Partitioner& partitioner) {
if (step <= 0 )
internal::throw_exception(internal::eid_nonpositive_step); // throws std::invalid_argument
else if (last > first) {
// Above "else" avoids "potential divide by zero" warning on some platforms
Index end = (last - first - Index(1)) / step + Index(1);
tbb::blocked_range<Index> range(static_cast<Index>(0), end); // 创建tbb::blocked_range对象
internal::parallel_for_body<Function, Index> body(f, first, step); // 创建parallel_for_body结构提, 这个是对模板对象Body的具体实现, 每个算法都会自行实现一个.
tbb::parallel_for(range, body, partitioner); //调用算法内部的parallel_for方法执行算法的并行计算
}
}

parallel_for_body的定义中可以看到其执行过程和串行无差别:

1
2
3
4
5
6
7
8
9
10
11
12
13
parallel_for_body( const Function& _func, Index& _begin, Index& _step )
: my_func(_func), my_begin(_begin), my_step(_step) {}
void operator()( const tbb::blocked_range<Index>& r ) const {
// A set of local variables to help the compiler with vectorization of the following loop.
Index b = r.begin();
Index e = r.end();
Index ms = my_step;
Index k = my_begin + b*ms;
...
for ( Index i = b; i < e; ++i, k += ms ) {
my_func( k );
}
}

而内部实现的parallel_for则是类似下面的那样的定义, 即调用start_forrun完成并发启动.

1
2
3
4
template<typename Range, typename Body>
void parallel_for( const Range& range, const Body& body, const auto_partitioner& partitioner ) {
internal::start_for<Range,Body,const auto_partitioner>::run(range,body,partitioner);
}

查看start_for的定义, 其继承自task, run的内部实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void run(  const Range& range, const Body& body, Partitioner& partitioner ) {
if( !range.empty() ) {
#if !__TBB_TASK_GROUP_CONTEXT || TBB_JOIN_OUTER_TASK_GROUP
start_for& a = *new(task::allocate_root()) start_for(range,body,partitioner); // 在arena中生成一个task任务,
#else
// Bound context prevents exceptions from body to affect nesting or sibling algorithms,
// and allows users to handle exceptions safely by wrapping parallel_for in the try-block.
task_group_context context(PARALLEL_FOR);
start_for& a = *new(task::allocate_root(context)) start_for(range,body,partitioner);
#endif /* __TBB_TASK_GROUP_CONTEXT && !TBB_JOIN_OUTER_TASK_GROUP */
// REGION BEGIN
fgt_begin_algorithm( tbb::internal::PARALLEL_FOR_TASK, (void*)&context ); // intel的trace工具断点
task::spawn_root_and_wait(a); // 分派任务arena, 触发其他线程的scheduler加入来完成并发任务.
fgt_end_algorithm( (void*)&context );
// REGION END
}
}

在这个头文件中还会定义void run_body( Range &r ), void offer_work(typename Partitioner::split_type& split_obj)task* start_for<Range,Body,Partitioner>::execute()这些方法, 这些方法会在调度器和分割器中被调用到,后续用到再回来看.

schedule和partition

task::spawn_root_and_wait定义在include/tbb/task.h中, 内部会调用tbb::internal::generic_scheduler::spawn_root_and_wait并最终调用scheduler的local_spawn_root_and_wait. 其实现在src/tbb/scheduler.cpp中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void generic_scheduler::local_spawn_root_and_wait( task* first, task*& next ) {
...
auto_empty_task dummy( __TBB_CONTEXT_ARG(this, first->prefix().context) ); //dummy即是task
internal::reference_count n = 0;
for( task* t=first; ; t=t->prefix().next ) {
++n;
...
t->prefix().parent = &dummy;
if( &t->prefix().next==&next ) break;
...
}
dummy.prefix().ref_count = n+1; // 如果这个引用计数为1则任务完成
if( n>1 ) // 对于单个parallel_for任务, 这里n == 1.
local_spawn( first->prefix().next, next );
local_wait_for_all( dummy, first ); // 这里调用后, 其他worker就会加入到arena中, 并也调用其scheduler的local_spawn_root_and_wait方法来执行task任务.
}

local_wait_for_all的定义位于src/tbb/custom_scheduler.h中, 代码很多, 重点看下非条件编译的代码和核心的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
template<typename SchedulerTraits>
void custom_scheduler<SchedulerTraits>::local_wait_for_all( task& parent, task* child ) {
...
task* t = child;
...
#if TBB_USE_EXCEPTIONS // 如果使用exception会通过条件编译加上一层try..catch
// Infinite safeguard EH loop
for (;;) {
try {
#endif /* TBB_USE_EXCEPTIONS */ // 下面的注释解释地很明白了. 外层循环从全局队列中获得任务, 中间循环从本地任务队列中偷取任务, 内层执行任务.
// Outer loop receives tasks from global environment (via mailbox, FIFO queue(s),
// and by stealing from other threads' task pools).
// All exit points from the dispatch loop are located in its immediate scope.
for(;;) {
// Middle loop retrieves tasks from the local task pool.
for(;;) {
// Inner loop evaluates tasks coming from nesting loops and those returned
// by just executed tasks (bypassing spawn or enqueue calls).
if ( !process_bypass_loop( context_guard, __TBB_ISOLATION_ARG(t, isolation) ) ) { // 这里的process_bypass_loop是核心功能函数
...
// Check "normal" exit condition when parent's work is done.
if ( parent.prefix().ref_count == 1 ) {
__TBB_ASSERT( !cleanup, NULL );
__TBB_control_consistency_helper(); // on ref_count
ITT_NOTIFY( sync_acquired, &parent.prefix().ref_count );
goto done;
}
...
// Retrieve the task from local task pool.
...
t = is_task_pool_published() ? get_task( __TBB_ISOLATION_EXPR( isolation ) ) : NULL;
...
if ( !t ) // No tasks in the local task pool. Go to stealing loop.
break;
}; // end of local task pool retrieval loop
...
t = receive_or_steal_task( __TBB_ISOLATION_ARG( parent.prefix().ref_count, isolation ) ); // 通过这里来完成任务窃取及负载均衡的逻辑
if ( !t ) {
...
goto done;
}
} // end of infinite stealing loop
#if TBB_USE_EXCEPTIONS
__TBB_ASSERT( false, "Must never get here" );
} // end of try-block
TbbCatchAll( my_innermost_running_task->prefix().context );
...
#endif /* TBB_USE_EXCEPTIONS */
done: // 执行到这里的worker就会完成arena中任务的执行并退出
...
if ( !ConcurrentWaitsEnabled(parent) ) {
if ( parent.prefix().ref_count != 1) {
// This is a worker that was revoked by the market.
__TBB_ASSERT( worker_outermost_level(),
"Worker thread exits nested dispatch loop prematurely" );
return;
}
parent.prefix().ref_count = 0;
}
...
}

process_bypass_loop也定义在同一个文件中, 简化核心逻辑后代码如下, 从中可以看到其执行过程中会不断地尝试获得一个task, 并调用其execute方法执行, 并且会返回下一个执行的task任务直到没有任务可以执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template<typename SchedulerTraits>
bool custom_scheduler<SchedulerTraits>::process_bypass_loop(
context_guard_helper</*report_tasks=*/SchedulerTraits::itt_possible>& context_guard,
__TBB_ISOLATION_ARG(task* t, isolation_tag isolation) )
{
while ( t ) {
...
task* t_next = NULL;
my_innermost_running_task = t;
t->prefix().owner = this;
t->prefix().state = task::executing;
...
{
...
t_next = t->execute();
...
if (t_next) {
...
} // if there is bypassed task
}
...
t = t_next;
} // end of scheduler bypass loop
return true;
}

而这个execute方法是一个虚拟方法, 最终就是调用了parallel_forstart_forexecute方法.跳回去看这个方法则主要功能是调用分配器的execute方法完成执行动作.

1
2
3
4
5
6
7
//! execute task for parallel_for
template<typename Range, typename Body, typename Partitioner>
task* start_for<Range,Body,Partitioner>::execute() {
my_partition.check_being_stolen( *this );
my_partition.execute(*this, my_range);
return NULL;
}

所有分配器都定义在include/tbb/partitioner.h中了. 其中用了多态的实现方式, 基类是partition_type_base, 有两个execute的具体实现, 分布是simple_partition_typepartitiontype_base的. 由于simple_partition的分割功能定义就很简单所以会单独实现一个简单的执行过程. 我们关系auto_partition是如何切分任务的, 那么就直接看base中的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template<typename StartType, typename Range>
void execute(StartType &start, Range &range) {
// The algorithm in a few words ([]-denotes calls to decision methods of partitioner):
// [If this task is stolen, adjust depth and divisions if necessary, set flag].
// If range is divisible {
// Spread the work while [initial divisions left];
// Create trap task [if necessary];
// }
// If not divisible or [max depth is reached], execute, else do the range pool part
if ( range.is_divisible() ) { // 检查range对象是否还能被切分
if ( self().is_divisible() ) { // 检查是否还需要再切割任务
do { // split until is divisible
typename Partition::split_type split_obj = self().template get_split<Range>();
start.offer_work( split_obj ); //调用具体算法实现的offer_work生成一个新的task分发出去
} while ( range.is_divisible() && self().is_divisible() );
}
}
self().work_balance(start, range); // 调用work_balance进行负载的平衡
}

// 位于include/tbb/parallel_for.h中实现
//! spawn right task, serves as callback for partitioner
void offer_work(typename Partitioner::split_type& split_obj) {
spawn( *new( allocate_sibling(static_cast<task*>(this), sizeof(start_for)) ) start_for(*this, split_obj) );
}

work_balance的代码功能是在struct dynamic_grainsize_mode : Mode中实现的, 大致意图就是不断循环检查是否可以切分任务并同时执行任务, 直到所有任务完成或被取消:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
template<typename StartType, typename Range>
void work_balance(StartType &start, Range &range) {
if( !range.is_divisible() || !self().max_depth() ) {
start.run_body( range ); // simple partitioner goes always here
}
else { // do range pool
internal::range_vector<Range, range_pool_size> range_pool(range);
do { // 不断尝试切分任务并同时执行任务直到所有任务完成.
range_pool.split_to_fill(self().max_depth()); // fill range pool
if( self().check_for_demand( start ) ) { // 通过check_for_demand检查执行时间是否满足大于40000个时钟cycle的设置, 如果任务执行时间过小, 那么任务就不再进行切割以避免浪费调度开销. 这种短时任务直接串行执行性能反而会更好.
if( range_pool.size() > 1 ) {
start.offer_work( range_pool.front(), range_pool.front_depth() );
range_pool.pop_front();
continue;
}
if( range_pool.is_divisible(self().max_depth()) ) // was not enough depth to fork a task
continue; // note: next split_to_fill() should split range at least once
}
start.run_body( range_pool.back() ); // 调用算法的run_body方法执行具体任务
range_pool.pop_back();
} while( !range_pool.empty() && !start.is_cancelled() );
}
}

bool check_for_demand( task &t ) {
if( pass == my_delay ) {
if( self().my_divisor > 1 ) // produce affinitized tasks while they have slot in array
return true; // do not do my_max_depth++ here, but be sure range_pool is splittable once more
else if( self().my_divisor && my_max_depth ) { // make balancing task
self().my_divisor = 0; // once for each task; depth will be decreased in align_depth()
return true;
}
else if( flag_task::is_peer_stolen(t) ) {
my_max_depth += __TBB_DEMAND_DEPTH_ADD; // __TBB_DEMAND_DEPTH_ADD是1
return true;
}
} else if( begin == my_delay ) {
...
my_dst_tsc = __TBB_time_stamp() + __TBB_task_duration(); // 这里__TBB_time_stamp()返回的是40000
my_delay = run;
} else if( run == my_delay ) {
if( __TBB_time_stamp() < my_dst_tsc ) {
__TBB_ASSERT(my_max_depth > 0, NULL);
my_max_depth--; // increase granularity since tasks seem having too small work
return false;
}
my_delay = pass;
return true;
...
}
return false;
}

//! Run body for range, serves as callback for partitioner
void run_body( Range &r ) {
fgt_alg_begin_body( tbb::internal::PARALLEL_FOR_TASK, (void *)const_cast<Body*>(&(this->my_body)), (void*)this );
my_body( r ); // 调用parallel_for_body完成任务的执行.
fgt_alg_end_body( (void *)const_cast<Body*>(&(this->my_body)) );
}

parallel_for_each,parallel_do和task_group

parallel_for_each查看其代码发现其实现依赖了parallel_forparallel_do, 而parallel_do也是同样实现了execute,run等抽象方法, 具体逻辑类似就不再赘述.

task_group的执行大体与上面一致, 不过其使用上不同:

1
2
3
4
5
6
7
tbb::task_group task_group;
for (int i =0;i<taskSize;i++) {
task_group.run([&] {
do_task(tasks[i]);
});
}
task_group.wait();

因此其调度是通过wait: my_root->wait_for_all();, run: task::spawn( *prepare_task< internal::task_handle_task<F> >(h) );来完成的.

worker的调度

上面代码阅读中会产生一个疑问, 为何主线程执行local_wait_for_all后, 其他可用的空闲worker就会加入到arena中工作呢? 这就要看每个worker是如何实现的了. 其实每个worker都会在调用run启动后,不断的调用market::process去接收任务并加入执行, 其内部通过调用arena::process最终完成worker的调度和执行逻辑.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 代码位于src/tbb/private_server.cpp中
void private_worker::run() {
...
::rml::job& j = *my_client.create_one_job();
while( my_state!=st_quit ) {
if( my_server.my_slack>=0 ) {
my_client.process(j); // 调用 market::process
} else {
// 等待直到有任务时被唤醒
...
}
}
...
}

// 代码位于src/tbb/market.cpp中
void market::process( job& j ) {
generic_scheduler& s = static_cast<generic_scheduler&>(j);
// s.my_arena can be dead. Don't access it until arena_in_need is called
arena *a = s.my_arena;
...
for (int i = 0; i < 2; ++i) {
while ( (a = arena_in_need(a)) ) {
a->process(s); // 如果有新的arena产生, 则调用其process方法, s就是使用的scheduler类型
a = NULL; // to avoid double checks in arena_in_need(arena*) for the same priority level
}
...
}
...
}

// 代码位于src/tbb/arena.cpp中
void arena::process( generic_scheduler& s ) {
...
size_t index = occupy_free_slot</*as_worker*/true>( s );
...
if ( index == out_of_arena )
goto quit; // 超出arena的slot数量限制就不再加入其中
...
s.attach_arena( this, index, /*is_master*/false );
...

// Task pool can be marked as non-empty if the worker occupies the slot left by a master.
if ( s.my_arena_slot->task_pool != EmptyTaskPool ) {
...
s.local_wait_for_all( *s.my_dummy_task, NULL ); // 开始执行scheduler的local_wait_for_all, 其与主线程逻辑相同
...
}

for ( ;; ) {
...
if ( is_recall_requested() )
break;
// Try to steal a task.
// Passing reference count is technically unnecessary in this context,
// but omitting it here would add checks inside the function.
task* t = s.receive_or_steal_task( __TBB_ISOLATION_ARG( s.my_dummy_task->prefix().ref_count, no_isolation ) ); // 如果还有任务没完成, 则会偷取其他线程的任务帮其执行直到无事可做
if (t) {
...
s.local_wait_for_all(*s.my_dummy_task,t);
}
}
...
quit:
...
on_thread_leaving<ref_worker>(); // 离开对应的arena
}

代码修改

通过以上阅读理解, 我对代码进行了一些修改, 给Body添加了一些tracer以在调用TBB时自动记录run_bodytask的数量, 并记录每个task执行的平均时间以及整个算法执行的时间.主要修改的位置就是上面parallel_for.h中的run, run_body, offer_workexecute, 具体代码就不展示了. 最终测试发现当并发进行1000个元素的数组的单个执行时间小于1us的并发任务时, body的数量可能只有200-300, task的数量则更少100+. 这里可以理解为每个task内的body都是串行执行的, task并行执行, 而一个body内执行多个元素的串行执行. 这个现象也是很有意思的, lib库从多个层面上尽可能地平衡并行数量和单个耗时.

总结

通过阅读TBB代码发现其代码使用了大量的抽象概念, 将调度的逻辑和算法的逻辑分开, 并划分成多个执行层次. 代码的可扩展性和灵活性都很强, 但同时阅读起来的难度也相对大一些. 通过看代码也发现了其调度的时间几乎就是代码的常数执行时间, 其层层都注意进行负载的均衡处理, 尽可能地保证任务的执行时长被均匀地分给每个worker承担, 并且在多个线程同时发起tbb任务(只有这个arena中有任务worker就不走,会一直工作直到完成所有任务, 多个线程同时提交给同一个arena的任务并不存在worker层面的区分)或者嵌套发起tbb任务(子任务会被加入同一个arena被worker领取处理)时都依然可以很容易地负载均衡. 同时自动分区的算法还注重任务数量和执行时间的平衡, 如果单个body执行时间过短, 就不会再切分执行, 改成串行的方式完成以降低调度分配任务的开销. 整个这个执行过程是一个动态平衡的过程, 刚开始只有主线程自己工作, 随着其他worker的加入, 任务被不断二分到其他worker, 并且谁先完成就继续窃取别人的任务继续工作直到大家都完成.
可以说TBB库非常适合HPC计算场景下压榨CPU性能.

参考

  1. 书本Pro TBB, https://link.springer.com/book/10.1007/978-1-4842-4398-5
  2. 代码库: https://github.com/oneapi-src/oneTBB