// Define your pipeline body classMyPipeline { public: voidoperator()(tbb::flow_control& fc)const{ // Your pipeline logic here // ... // Inform the pipeline that there is no more data fc.stop(); } };
// Counters of how many parallel blocks were executed, one per demo loop.
// std::atomic makes the increments from concurrent worker threads safe.
static std::atomic<int> total_blocks{0};   // blocks run by the manually-chunked loop
static std::atomic<int> total_blocks2{0};  // blocks run by the blocked_range loop
// Simulate processing of one data element.
// i   : index of the element being processed (not used by the simulated work)
// mtx : mutex shared with the caller (kept for interface stability; unused here)
// NOTE: fixed the fused token "voidprocess_data" and renamed the busy-wait
// loop counter, which shadowed the parameter `i` in the original.
void process_data(int i, std::mutex& mtx) {
    (void)i;
    (void)mtx;
    // std::this_thread::sleep_for(std::chrono::milliseconds(1)); // Simulate some processing time
    // Cheap busy-wait standing in for real per-element work.
    for (int k = 0; k < 100; ++k)
        ;
}
// Per-thread record of how many elements each task processed, keyed by thread id.
// Concurrent containers allow safe insertion from the parallel_for worker lambdas
// without extra locking; one map per demo loop.
tbb::concurrent_unordered_map<std::thread::id, tbb::concurrent_vector<int>> thread_task_counts;  // manually-chunked loop
tbb::concurrent_unordered_map<std::thread::id, tbb::concurrent_vector<int>> thread_task_counts2; // blocked_range loop
tbb::parallel_for(0, static_cast<int>((data.size() + grain_size - 1) / grain_size), [&](int i) { total_blocks++; int cnt = 0; // Thread-local variable to avoid data for (int j = i * grain_size; j < std::min(static_cast<int>(data.size()), (i + 1) * grain_size); ++j) { process_data(i, mtx); ++cnt; } thread_task_counts[std::this_thread::get_id()].push_back(cnt); });
tbb::parallel_for(blocked_range<int>(0u, data.size(), grain_size), [&](const blocked_range<int>& r) { total_blocks2++; int cnt = 0; // Thread-local variable to avoid data for (int i = r.begin(); i < r.end(); ++i) { process_data(i, mtx); ++cnt; } thread_task_counts2[std::this_thread::get_id()].push_back(cnt); });
$ mkdir build && cd build && cmake .. && make $ ./tbb_thread_pool > result.txt $ cat result.txt | grep running | sort | uniq Task arena thread 140667163379264 is running. Task arena thread 140667167639104 is running. Task arena thread 140667184678464 is running. Task arena thread 140667201848896 is running. Task group thread 140667167639104 is running. Task group thread 140667171898944 is running. Task group thread 140667176158784 is running. Task group thread 140667180418624 is running. Task group thread 140667188938304 is running. Task group thread 140667210303040 is running.
从这两行日志可以看出,arena 和 group 重用了同一个线程 ID,说明它们同属于同一个全局线程池。
1 2
Task arena thread 140667167639104 is running. Task group thread 140667167639104 is running.
// The first parallel loop. oneapi::tbb::parallel_for( 0, N1, []( int i ) { // The second parallel loop. oneapi::tbb::parallel_for( 0, N2, []( int j ) { /* Some work */ } ); } );
如果当前线程被 parallel_for “阻塞”(不是真正的阻塞,只能称为 a blocking parallel construct),那么该线程被允许拿取第一个循环的任务来执行。这会导致即使是同一个线程内,也可出现乱序执行的情况。在大多数情况下,这没有什么危害。
oneapi::tbb::enumerable_thread_specific<int> ets; oneapi::tbb::parallel_for( 0, N1, [&ets]( int i ) { // Set a thread specific value ets.local() = i; oneapi::tbb::parallel_for( 0, N2, []( int j ) { /* Some work */ } ); // While executing the above parallel_for, the thread might have run iterations // of the outer parallel_for, and so might have changed the thread specific value. assert( ets.local()==i ); // The assertion may fail! } );
oneapi::tbb::enumerable_thread_specific<int> ets; oneapi::tbb::task_arena nested; oneapi::tbb::parallel_for( 0, N1, [&]( int i ) { // Set a thread specific value ets.local() = i; nested.execute( []{ // Run the inner parallel_for in a separate arena to prevent the thread // from taking tasks of the outer parallel_for. oneapi::tbb::parallel_for( 0, N2, []( int j ) { /* Some work */ } ); } ); assert( ets.local()==i ); // Valid assertion } );
intmain(){ constint N1 = 1000, N2 = 1000; oneapi::tbb::enumerable_thread_specific<int> ets; oneapi::tbb::parallel_for( 0, N1, [&ets]( int i ) { // Set a thread specific value ets.local() = i; // Run the second parallel loop in an isolated region to prevent the current thread // from taking tasks related to the outer parallel loop. oneapi::tbb::this_task_arena::isolate( []{ oneapi::tbb::parallel_for( 0, N2, []( int j ) { /* Some work */ } ); } ); assert( ets.local()==i ); // Valid assertion } ); return0; }