template <typename Index, typename Function, typename Partitioner>
void parallel_for_impl(Index first, Index last, Index step, const Function& f, Partitioner& partitioner) {
    if (step <= 0)
        internal::throw_exception(internal::eid_nonpositive_step); // throws std::invalid_argument
    else if (last > first) {
        // Above "else" avoids "potential divide by zero" warning on some platforms
        Index end = (last - first - Index(1)) / step + Index(1);
        tbb::blocked_range<Index> range(static_cast<Index>(0), end); // construct the tbb::blocked_range object
        internal::parallel_for_body<Function, Index> body(f, first, step); // construct the parallel_for_body struct; it is the concrete implementation of the Body template concept, and each algorithm provides its own
        tbb::parallel_for(range, body, partitioner); // call the internal parallel_for overload to run the computation in parallel
    }
}
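To see the index arithmetic above in action: a call with first=0, last=10, step=3 yields end = (10 - 0 - 1)/3 + 1 = 4, that is, a blocked_range over the four block indices {0, 1, 2, 3}, which map back to the user indices {0, 3, 6, 9}. A minimal usage sketch of the step overload:

#include <tbb/parallel_for.h>
#include <cstdio>

int main() {
    // The step form shown above: TBB computes end = (last - first - 1) / step + 1
    // internally and maps each block index i back to k = first + i * step.
    tbb::parallel_for(0, 10, 3, [](int k) {
        std::printf("visiting index %d\n", k); // k takes the values 0, 3, 6, 9
    });
    return 0;
}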
From the definition of parallel_for_body we can see that its execution is no different from a serial loop:
parallel_for_body( const Function& _func, Index& _begin, Index& _step )
    : my_func(_func), my_begin(_begin), my_step(_step) {}

void operator()( const tbb::blocked_range<Index>& r ) const {
    // A set of local variables to help the compiler with vectorization of the following loop.
    Index b = r.begin();
    Index e = r.end();
    Index ms = my_step;
    Index k = my_begin + b*ms;
    ...
    for ( Index i = b; i < e; ++i, k += ms ) {
        my_func( k );
    }
}
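The same serial-style traversal can be reproduced by writing a Body functor by hand. The sketch below (my own example, not TBB code) mirrors how parallel_for_body recovers the user index k = begin + i * step from block indices:

#include <tbb/parallel_for.h>
#include <tbb/blocked_range.h>
#include <vector>

struct StridedFill {
    std::vector<int>* out;
    int begin, step;
    void operator()(const tbb::blocked_range<int>& r) const {
        // Same pattern as parallel_for_body: walk the block indices serially,
        // maintaining the user-visible index k alongside.
        int k = begin + r.begin() * step;
        for (int i = r.begin(); i < r.end(); ++i, k += step)
            (*out)[i] = k;
    }
};

int main() {
    std::vector<int> v(4);
    StridedFill body{&v, /*begin=*/0, /*step=*/3};
    tbb::parallel_for(tbb::blocked_range<int>(0, 4), body); // v becomes {0, 3, 6, 9}
    return 0;
}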
template<typename SchedulerTraits>
void custom_scheduler<SchedulerTraits>::local_wait_for_all( task& parent, task* child ) {
    ...
    task* t = child;
    ...
#if TBB_USE_EXCEPTIONS // when exceptions are enabled, conditional compilation wraps the loops in a try..catch
    // Infinite safeguard EH loop
    for (;;) {
    try {
#endif /* TBB_USE_EXCEPTIONS */
    // The original comments below explain it well: the outer loop obtains tasks from the
    // global environment, the middle loop fetches tasks from the local task pool, and the
    // inner loop executes them.
    // Outer loop receives tasks from global environment (via mailbox, FIFO queue(s),
    // and by stealing from other threads' task pools).
    // All exit points from the dispatch loop are located in its immediate scope.
    for(;;) {
        // Middle loop retrieves tasks from the local task pool.
        for(;;) {
            // Inner loop evaluates tasks coming from nesting loops and those returned
            // by just executed tasks (bypassing spawn or enqueue calls).
            if ( !process_bypass_loop( context_guard, __TBB_ISOLATION_ARG(t, isolation) ) ) { // process_bypass_loop is the core work function here
                ...
            }
            // Check "normal" exit condition when parent's work is done.
            if ( parent.prefix().ref_count == 1 ) {
                __TBB_ASSERT( !cleanup, NULL );
                __TBB_control_consistency_helper(); // on ref_count
                ITT_NOTIFY( sync_acquired, &parent.prefix().ref_count );
                goto done;
            }
            ...
            // Retrieve the task from local task pool.
            ...
            t = is_task_pool_published() ? get_task( __TBB_ISOLATION_EXPR( isolation ) ) : NULL;
            ...
            if ( !t ) // No tasks in the local task pool. Go to stealing loop.
                break;
        }; // end of local task pool retrieval loop
        ...
        t = receive_or_steal_task( __TBB_ISOLATION_ARG( parent.prefix().ref_count, isolation ) ); // task stealing and load balancing are done here
        if ( !t ) {
            ...
            goto done;
        }
    } // end of infinite stealing loop
#if TBB_USE_EXCEPTIONS
    __TBB_ASSERT( false, "Must never get here" );
    } // end of try-block
    TbbCatchAll( my_innermost_running_task->prefix().context );
    ...
#endif /* TBB_USE_EXCEPTIONS */
done: // a worker arriving here has finished its share of the arena's work and can exit
    ...
    if ( !ConcurrentWaitsEnabled(parent) ) {
        if ( parent.prefix().ref_count != 1) {
            // This is a worker that was revoked by the market.
            __TBB_ASSERT( worker_outermost_level(), "Worker thread exits nested dispatch loop prematurely" );
            return;
        }
        parent.prefix().ref_count = 0;
    }
    ...
}
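The parent.prefix().ref_count == 1 exit condition is visible from user code through the classic low-level task API of this TBB generation: a parent's reference count is set to the number of children plus one for the wait itself, and the wait returns exactly when the count drops back to 1. A minimal sketch, assuming the (since deprecated) tbb::task interface:

#include <tbb/task.h>
#include <cstdio>

struct HelloTask : tbb::task {
    tbb::task* execute() override {
        std::printf("child running\n");
        return nullptr; // no bypass task returned
    }
};

int main() {
    // ref_count = 2: one for the child, one for the wait itself.
    tbb::empty_task& root = *new( tbb::task::allocate_root() ) tbb::empty_task;
    root.set_ref_count(2);
    tbb::task& child = *new( root.allocate_child() ) HelloTask;
    // local_wait_for_all(root, &child) runs underneath and returns
    // once root's ref_count has dropped back to 1.
    root.spawn_and_wait_for_all(child);
    tbb::task::destroy(root);
    return 0;
}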
template<typename StartType, typename Range>
void execute(StartType &start, Range &range) {
    // The algorithm in a few words ([]-denotes calls to decision methods of partitioner):
    // [If this task is stolen, adjust depth and divisions if necessary, set flag].
    // If range is divisible {
    //    Spread the work while [initial divisions left];
    //    Create trap task [if necessary];
    // }
    // If not divisible or [max depth is reached], execute, else do the range pool part
    if ( range.is_divisible() ) { // check whether the range object can still be split
        if ( self().is_divisible() ) { // check whether the partitioner still wants more splits
            do { // split until is divisible
                typename Partition::split_type split_obj = self().template get_split<Range>();
                start.offer_work( split_obj ); // call the algorithm-specific offer_work to spawn a new task for the split-off part
            } while ( range.is_divisible() && self().is_divisible() );
        }
    }
    self().work_balance(start, range); // call work_balance to balance the load
}

// implemented in include/tbb/parallel_for.h
//! spawn right task, serves as callback for partitioner
void offer_work(typename Partitioner::split_type& split_obj) {
    spawn( *new( allocate_sibling(static_cast<task*>(this), sizeof(start_for)) ) start_for(*this, split_obj) );
}
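The splitting itself rests on the Range concept: each call to the splitting constructor carves off roughly half of the range. The do/while in execute() can be pictured with a plain blocked_range (a standalone sketch, not the partitioner's actual split_type):

#include <tbb/blocked_range.h>
#include <cstdio>

int main() {
    tbb::blocked_range<int> r(0, 1000, /*grainsize=*/100);
    // Mimic the loop in execute(): keep splitting while the range allows it.
    while (r.is_divisible()) {
        tbb::blocked_range<int> rhs(r, tbb::split()); // r keeps the left half, rhs gets the right half
        std::printf("offer_work would spawn a task for [%d, %d)\n", rhs.begin(), rhs.end());
        // a real start_for would hand rhs to offer_work / spawn here
    }
    std::printf("remaining range [%d, %d) executed locally\n", r.begin(), r.end());
    return 0;
}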
template<typename StartType, typename Range>
void work_balance(StartType &start, Range &range) {
    if( !range.is_divisible() || !self().max_depth() ) {
        start.run_body( range ); // simple partitioner goes always here
    } else { // do range pool
        internal::range_vector<Range, range_pool_size> range_pool(range);
        do { // keep splitting and executing subranges until all work is done
            range_pool.split_to_fill(self().max_depth()); // fill range pool
            if( self().check_for_demand( start ) ) { // check_for_demand checks, among other things, whether chunks run longer than the configured budget of about 40000 clock cycles; if a chunk finishes too quickly, the range is not split further, avoiding wasted scheduling overhead, since such short work runs faster serially
                if( range_pool.size() > 1 ) {
                    start.offer_work( range_pool.front(), range_pool.front_depth() );
                    range_pool.pop_front();
                    continue;
                }
                if( range_pool.is_divisible(self().max_depth()) ) // was not enough depth to fork a task
                    continue; // note: next split_to_fill() should split range at least once
            }
            start.run_body( range_pool.back() ); // call the algorithm's run_body to execute the actual work
            range_pool.pop_back();
        } while( !range_pool.empty() && !start.is_cancelled() );
    }
}

bool check_for_demand( task &t ) {
    if( pass == my_delay ) {
        if( self().my_divisor > 1 ) // produce affinitized tasks while they have slot in array
            return true; // do not do my_max_depth++ here, but be sure range_pool is splittable once more
        else if( self().my_divisor && my_max_depth ) { // make balancing task
            self().my_divisor = 0; // once for each task; depth will be decreased in align_depth()
            return true;
        }
        else if( flag_task::is_peer_stolen(t) ) {
            my_max_depth += __TBB_DEMAND_DEPTH_ADD; // __TBB_DEMAND_DEPTH_ADD is 1
            return true;
        }
    } else if( begin == my_delay ) {
        ...
        my_dst_tsc = __TBB_time_stamp() + __TBB_task_duration(); // __TBB_task_duration() supplies the roughly 40000-cycle budget mentioned above
        my_delay = run;
    } else if( run == my_delay ) {
        if( __TBB_time_stamp() < my_dst_tsc ) {
            __TBB_ASSERT(my_max_depth > 0, NULL);
            my_max_depth--; // increase granularity since tasks seem having too small work
            return false;
        }
        my_delay = pass;
        return true;
        ...
    }
    return false;
}

//! Run body for range, serves as callback for partitioner
void run_body( Range &r ) {
    fgt_alg_begin_body( tbb::internal::PARALLEL_FOR_TASK, (void *)const_cast<Body*>(&(this->my_body)), (void*)this );
    my_body( r ); // invoke parallel_for_body to actually execute the work
    fgt_alg_end_body( (void *)const_cast<Body*>(&(this->my_body)) );
}
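At the user level, the two branches of work_balance correspond to the choice of partitioner: with simple_partitioner the range is split all the way down to the grainsize and always reaches run_body directly, while auto_partitioner (the default) goes through the range pool and the demand/timing heuristic above. A usage sketch contrasting the two:

#include <tbb/parallel_for.h>
#include <tbb/blocked_range.h>
#include <tbb/partitioner.h>
#include <cstddef>

void demo(float* a, std::size_t n) {
    auto body = [a](const tbb::blocked_range<std::size_t>& r) {
        for (std::size_t i = r.begin(); i != r.end(); ++i)
            a[i] *= 2.0f;
    };
    // simple_partitioner: split down to the grainsize (here 1024), so
    // work_balance() always takes the start.run_body(range) branch.
    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, n, 1024), body,
                      tbb::simple_partitioner());
    // auto_partitioner: chunk sizes adapt via the range pool and
    // check_for_demand(), so very short iterations are not over-split.
    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, n), body,
                      tbb::auto_partitioner());
}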
// located in src/tbb/market.cpp
void market::process( job& j ) {
    generic_scheduler& s = static_cast<generic_scheduler&>(j);
    // s.my_arena can be dead. Don't access it until arena_in_need is called
    arena *a = s.my_arena;
    ...
    for (int i = 0; i < 2; ++i) {
        while ( (a = arena_in_need(a)) ) {
            a->process(s); // when an arena needs workers, call its process method; s is the worker's scheduler
            a = NULL; // to avoid double checks in arena_in_need(arena*) for the same priority level
        }
        ...
    }
    ...
}
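Arenas are not only created implicitly per master thread; user code can also create them explicitly with tbb::task_arena, and the market's workers migrate into whichever arenas need threads via exactly this market::process / arena_in_need path. A small sketch:

#include <tbb/task_arena.h>
#include <tbb/parallel_for.h>

int main() {
    // An explicit arena limited to 2 slots; the market lends workers to it
    // through market::process() -> arena_in_need() -> arena::process().
    tbb::task_arena arena(2);
    arena.execute([] {
        tbb::parallel_for(0, 100, [](int i) {
            (void)i; // at most 2 threads cooperate on this loop
        });
    });
    return 0;
}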
// located in src/tbb/arena.cpp
void arena::process( generic_scheduler& s ) {
    ...
    size_t index = occupy_free_slot</*as_worker*/true>( s );
    ...
    if ( index == out_of_arena )
        goto quit; // if all of the arena's slots are taken, the worker does not join it
    ...
    s.attach_arena( this, index, /*is_master*/false );
    ...
    // Task pool can be marked as non-empty if the worker occupies the slot left by a master.
    if ( s.my_arena_slot->task_pool != EmptyTaskPool ) {
        ...
        s.local_wait_for_all( *s.my_dummy_task, NULL ); // enter the scheduler's local_wait_for_all; the logic is the same as for the master thread
        ...
    }
    for ( ;; ) {
        ...
        if ( is_recall_requested() )
            break;
        // Try to steal a task.
        // Passing reference count is technically unnecessary in this context,
        // but omitting it here would add checks inside the function.
        task* t = s.receive_or_steal_task( __TBB_ISOLATION_ARG( s.my_dummy_task->prefix().ref_count, no_isolation ) ); // while unfinished work remains elsewhere, steal other threads' tasks and help run them until there is nothing left to do
        if (t) {
            ...
            s.local_wait_for_all(*s.my_dummy_task, t);
        }
    }
    ...
quit:
    ...
    on_thread_leaving<ref_worker>(); // leave the arena
}
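One way to observe this stealing loop from the outside is to give iterations wildly uneven costs and record which threads execute them; thanks to stealing, idle workers take subranges from loaded ones instead of sitting idle. A toy sketch:

#include <tbb/parallel_for.h>
#include <thread>
#include <mutex>
#include <set>
#include <cstdio>

int main() {
    std::mutex m;
    std::set<std::thread::id> workers;
    tbb::parallel_for(0, 64, [&](int i) {
        // Uneven work: high indices spin much longer than low ones, so idle
        // threads pick up subranges via receive_or_steal_task() underneath.
        volatile long sink = 0;
        for (long k = 0; k < i * 100000L; ++k) sink += k;
        std::lock_guard<std::mutex> lock(m);
        workers.insert(std::this_thread::get_id());
    });
    std::printf("%zu distinct threads participated\n", workers.size());
    return 0;
}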