0%

TBB入门

1. 控制线程数

  • Method 1: Use the environment variable TBB_NUM_THREADS for the gloabl setting.
1
export TBB_NUM_THREADS=4

TODO: It doesn’t seem to work!

  • Method 2: Use tbb::task_arena or tbb::task_scheduler_init (Deprecated).

TBB will use this setting locally within the scope of the tbb::task_arena.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <tbb/pipeline.h>
// Deprecated:
// #include <tbb/task_scheduler_init.h>
#include <tbb/task_arena.h>

// Define your pipeline body
class MyPipeline {
public:
void operator() (tbb::flow_control& fc) const {
// Your pipeline logic here
// ...
// Inform the pipeline that there is no more data
fc.stop();
}
};

int main() {
// Deprecated: tbb::task_scheduler_init init(1);
tbb::task_arena arena(4); // 4 threads
// Do some tasks:
tbb::parallel_pipeline(/* max_number_of_live_tokens */ 4, MyPipeline); // FIXME: 似乎需要放入 arena 的 execute 函数中

return 0;
}

2. parallel_for

API: parallel_for

  1. my_parallel_for 模拟 parallel_for 的实现:
my_parallel_for.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <tbb/tbb.h>
#include <vector>
#include <iostream>

// 模拟 parallel_for 的内部实现
void my_parallel_for(const tbb::blocked_range<size_t>& range, const std::function<void(const tbb::blocked_range<size_t>&)>& body) {
if (range.is_divisible()) {
// 分割范围
tbb::blocked_range<size_t> left(range.begin(), range.begin() + (range.end() - range.begin()) / 2);
tbb::blocked_range<size_t> right(left.end(), range.end());

// 递归调用
tbb::parallel_invoke(
[&] { my_parallel_for(left, body); },
[&] { my_parallel_for(right, body); }
);
} else {
// 处理当前范围
body(range);
}
}

int main() {
std::vector<int> data(100);
for (int i = 0; i < 100; ++i) {
data[i] = i;
}

// 使用自定义的 parallel_for 进行并行处理
my_parallel_for(tbb::blocked_range<size_t>(0, data.size()), [&](const tbb::blocked_range<size_t>& r) {
for (size_t i = r.begin(); i != r.end(); ++i) {
data[i] *= 2; // 示例操作:将每个元素乘以2
}
});

// 输出结果
for (const auto& val : data) {
std::cout << val << " ";
}
std::cout << std::endl;

return 0;
}
  1. 发出任务的线程也会成为工作线程之一,并参与任务的执行,测试代码如下:
test_parallel_for.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#include <tbb/tbb.h>

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;
using namespace tbb;

static std::atomic<int> total_blocks(0); // Atomic counter to track the number of tasks processed
static std::atomic<int> total_blocks2(0); // Atomic counter to track the number of tasks processed

// Function to process each data element
void process_data(int i, std::mutex& mtx) {
// std::this_thread::sleep_for(std::chrono::milliseconds(1)); // Simulate some processing time
for (int i = 0; i < 100; ++i)
;
}

int main(int argc, char* argv[]) {
if (argc < 3) {
std::cerr << "Usage: " << argv[0] << " <number_of_elements>"
<< "<grain_size>" << std::endl;
return 1;
}

int num_elements = std::stoi(argv[1]);
int grain_size = std::stoi(argv[2]);

std::mutex mtx;
std::vector<int> data(num_elements);
for (int i = 0; i < num_elements; ++i) {
data[i] = i;
}

// {
// std::lock_guard<std::mutex> lock(mtx);
// std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;
// }

tbb::concurrent_unordered_map<std::thread::id, tbb::concurrent_vector<int>> thread_task_counts; // To store task counts for each thread
tbb::concurrent_unordered_map<std::thread::id, tbb::concurrent_vector<int>> thread_task_counts2; // To store task counts for each thread

tbb::parallel_for(0, static_cast<int>((data.size() + grain_size - 1) / grain_size), [&](int i) {
total_blocks++;
int cnt = 0; // Thread-local variable to avoid data
for (int j = i * grain_size; j < std::min(static_cast<int>(data.size()), (i + 1) * grain_size); ++j) {
process_data(i, mtx);
++cnt;
}
thread_task_counts[std::this_thread::get_id()].push_back(cnt);
});

tbb::parallel_for(blocked_range<int>(0u, data.size(), grain_size), [&](const blocked_range<int>& r) {
total_blocks2++;
int cnt = 0; // Thread-local variable to avoid data
for (int i = r.begin(); i < r.end(); ++i) {
process_data(i, mtx);
++cnt;
}
thread_task_counts2[std::this_thread::get_id()].push_back(cnt);
});

std::cout << "Total blocks processed: " << total_blocks.load() << std::endl;
for (const auto& pair : thread_task_counts) {
std::cout << "Thread " << pair.first << " processed: ";
for (const auto& task_count : pair.second)
std::cout << task_count << ", ";
std::cout << std::endl;
}

std::cout << "Total blocks2 processed: " << total_blocks2.load() << std::endl;
for (const auto& pair : thread_task_counts2) {
std::cout << "Thread " << pair.first << " processed: ";
for (const auto& task_count : pair.second)
std::cout << task_count << ", ";
std::cout << std::endl;
}

return 0;
}

测试结果:

1
2
3
4
5
6
7
$ ./test_parallel_for 
Main thread ID: 140220582070080
Processing data: 2 on thread 140220582070080
Processing data: 6 on thread 140220557755968
Processing data: 4 on thread 140220574795328
Processing data: 8 on thread 140220566275648
Processing data: 10 on thread 140220562015808

可见,data 2 是由主线程处理的。也就是说,parallel_for 虽然被称为 a blocking parallel construt,但线程等待所有任务完成期间是非阻塞的,它还可以充当工作线程执行任务池中的任务。

代码模拟 parallel_forwait

my_task_scheduler.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include <iostream>
#include <vector>
#include <thread>
#include <functional>
#include <condition_variable>
#include <queue>

class TaskScheduler {
public:
TaskScheduler(size_t numThreads);
~TaskScheduler();
void enqueue(std::function<void()> task);
void wait();

private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
std::condition_variable finished;
bool stop;
size_t activeTasks;

void workerThread();
void executeTask();
};

TaskScheduler::TaskScheduler(size_t numThreads) : stop(false), activeTasks(0) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back(&TaskScheduler::workerThread, this);
}
}

TaskScheduler::~TaskScheduler() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers) {
worker.join();
}
}

void TaskScheduler::enqueue(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.push(std::move(task));
}
condition.notify_one();
}

void TaskScheduler::wait() {
std::unique_lock<std::mutex> lock(queueMutex);
while (!tasks.empty() || activeTasks > 0) {
// 如果还有任务,执行一个任务,避免当前线程被阻塞
if (!tasks.empty()) {
executeTask();
} else {
finished.wait(lock);
}
}
}

void TaskScheduler::workerThread() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
++activeTasks;
}
task();
{
std::unique_lock<std::mutex> lock(queueMutex);
--activeTasks;
if (tasks.empty() && activeTasks == 0) {
finished.notify_all();
}
}
}
}

void TaskScheduler::executeTask() {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
if (tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
++activeTasks;
}
task();
{
std::unique_lock<std::mutex> lock(queueMutex);
--activeTasks;
if (tasks.empty() && activeTasks == 0) {
finished.notify_all();
}
}
}

void parallel_for(int start, int end, std::function<void(int)> func) {
static TaskScheduler scheduler(std::thread::hardware_concurrency());
for (int i = start; i < end; ++i) {
scheduler.enqueue([i, &func] { func(i); });
}
scheduler.wait();
}

int main() {
const int N1 = 100;
const int N2 = 100;

// The first parallel loop.
parallel_for(0, N1, [&](int i) {
// The second parallel loop.
parallel_for(0, N2, [&](int j) {
// Some work
});
// 线程发出 parallel_for 之后,需要等待内部所有 parallel loop 的任务完成
// 在此期间允许继续拿取外部的 parallel loop 的任务执行
});

return 0;
}

3. TBB线程池

TBB似乎总是使用同一个全局线程池。测试代码如下:

tbb_thread_pool.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <tbb/tbb.h>
#include <unistd.h>

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

using namespace std;

std::mutex mtx;

void task_group_function() {
tbb::task_group tg;
int max_concurrency = tbb::this_task_arena::max_concurrency();
{
std::lock_guard<std::mutex> lock(mtx);
cout << "Task group max concurrency: " << max_concurrency << endl;
}
for (int i = 0; i < 16; ++i) {
tg.run([i] {
{
std::lock_guard<std::mutex> lock(mtx);
std::cout << "Task group thread " << std::this_thread::get_id() << " is running." << std::endl;
}
sleep(1);
});
}
tg.wait();
}

void task_arena_function() {
tbb::task_arena arena(4);
int max_concurrency = arena.max_concurrency();
{
std::lock_guard<std::mutex> lock(mtx);
cout << "Task arena max concurrency: " << max_concurrency << endl;
}
arena.execute([max_concurrency] {
tbb::parallel_for(0, 16, [](int i) {
{
std::lock_guard<std::mutex> lock(mtx);
std::cout << "Task arena thread " << std::this_thread::get_id() << " is running." << std::endl;
}
sleep(2);
});
});
}

int main() {
// 获取默认task_arena的最大并发线程数
int arena_max_concurrency = tbb::this_task_arena::max_concurrency();
std::cout << "Default task_arena max concurrency: " << arena_max_concurrency << std::endl;

// 创建两个线程
std::thread tg_thread(task_group_function);
std::thread ta_thread(task_arena_function);

// 等待两个线程完成
tg_thread.join();
ta_thread.join();

return 0;
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ mkdir build && cd build && cmake .. && make
$ ./tbb_thread_pool > result.txt
$ cat result.txt | grep running | sort | uniq
Task arena thread 140667163379264 is running.
Task arena thread 140667167639104 is running.
Task arena thread 140667184678464 is running.
Task arena thread 140667201848896 is running.
Task group thread 140667167639104 is running.
Task group thread 140667171898944 is running.
Task group thread 140667176158784 is running.
Task group thread 140667180418624 is running.
Task group thread 140667188938304 is running.
Task group thread 140667210303040 is running.

从这两行日志可以看出,arena 和 group 重用了同一个线程 ID ,说明它们同属于同一个全局线程池。

1
2
Task arena thread 140667167639104 is running.
Task group thread 140667167639104 is running.

进一步,我们发现全局线程池中的线程总数是自适应的,比如本例就是 10 个,既不是 task_group8 个,
也不是 task_arena4 个:

TODO

1
2
$ cat result.txt | grep running | sort | uniq | wc -l
10

4. 任务调度器(Task Scheduler)

The Task Scheduler

4.1. 基于任务编程(Task-Based Programming)

当追求性能时,推荐以逻辑任务(logical tasks)而不是线程(threads)来编程,有以下原因:

  • 将并行性与可用资源匹配
  • 更快的任务启动和关闭
  • 更有效的评估顺序
  • 改进负载均衡
  • 更高层的思考

TODO

4.2. 任务调度器(Task Scheduler)如何工作

How Task Scheduler Works

4.2.1. 深度优先(depth-first)

每个线程都有自己的双端队列,头部称为 top (也称顶部),尾部称为 bottom (也称底部)。
队列的底部是队列的最深处(最末处),底部任务是最新的,顶部任务是最旧的。

深度优先有以下好处:

  • 热点缓存命中:最新的任务的缓存是最热的,所以优先执行新任务。
  • 最小化空间:广度优先会同时创建指数级数量的共存节点,而深度优先虽然也会创建相同数量的节点,但是只有线性数目的节点会同时共存,因为它创建了其他就绪任务的栈。

生产:当线程产生一个任务时,将其放置到线程自身所有的 deque 的尾部。

消费:当线程执行任务时,根据以下规则顺序选取一个任务:

  • 规则1:获取上一个任务返回的任务,如果有;
  • 规则2:从线程自己所有的 deque 尾部选取一个任务(即深度优先),如果有;
  • 规则3:随机选择一个其他线程的 deque ,从其头部窃取一个任务(即广度优先)。如果被选 deque 为空,则重复本条规则直至成功。

规则1 被称为“任务调度绕行(Task Scheduler Bypass)”。

规则2 是深度优先,这使得当前线程得以不断执行最新的任务直至其完成所有工作。

规则3 是临时的广度优先,它将潜在的并行转化为实际的并行。

4.2.2. 任务调度绕行(Task Scheduler Bypass)技术

一个任务从产生到被执行涉及以下步骤:

  • 将新任务加入线程的 deque 。
  • 执行当前任务直至完成。
  • 从线程 deque 获取一个任务执行,除非该任务被其他线程窃取走了。

其中,步骤1 和 步骤3 会引入不必要的 deque 操作,甚至更糟的是,允许窃取会损害局部性而不会增加显著的并行性。
任务调度器绕行技术可以直接指向下一个要被执行的任务,而不是生产该任务,从而避免了上述问题。
因为根据“规则1”,上一个任务产生的新任务会称为第一个备选任务。
此外,该技术几乎保证了该新任务被当前线程执行,而不是其他线程。

注意:当前唯一能使用该优化技术的是使用 tbb::task_group

4.3. 指导任务调度器的执行(Guiding Task Scheduler Execution)

Guiding Task Scheduler Execution

默认情况下,任务计划程序会尝试使用所有可用的计算资源。在某些情况下,您可能希望将任务计划程序配置为仅使用其中的一些资源。

注意:指导任务调度程序的执行可能会导致可组合性问题。

TBB 提供 task_arena 接口,通过以下方式指导任务在 arena (竞技场)内被执行:

  • 设置首选计算单元;
  • 限制部分计算单元。

4.4. 工作隔离(Work Isolation)

Work Isolation

work_isolation_eg1.cppview raw
1
2
3
4
5
// The first parallel loop.
oneapi::tbb::parallel_for( 0, N1, []( int i ) {
// The second parallel loop.
oneapi::tbb::parallel_for( 0, N2, []( int j ) { /* Some work */ } );
} );

如果当前线程被 parallel_for “阻塞”(不是真正的阻塞,只能称为 a blocking parallel construct),那么该线程被允许拿取第一个循环的任务来执行。这会导致即使是同一个线程内,也可出现乱序执行的情况。在大多数情况下,这没有什么危害。

但是少数情况可能出现错误,例如一个 thread-local 变量可能会在嵌套并行构造之外意外被更改:

work_isolation_eg2.cppview raw
1
2
3
4
5
6
7
8
9
oneapi::tbb::enumerable_thread_specific<int> ets;
oneapi::tbb::parallel_for( 0, N1, [&ets]( int i ) {
// Set a thread specific value
ets.local() = i;
oneapi::tbb::parallel_for( 0, N2, []( int j ) { /* Some work */ } );
// While executing the above parallel_for, the thread might have run iterations
// of the outer parallel_for, and so might have changed the thread specific value.
assert( ets.local()==i ); // The assertion may fail!
} );

在其它场景下,这种行为可能会导致死锁或其他问题。在这些情况下,需要更有力地保证线程内的执行次序。为此,TBB 提供了一些隔离并行构造的执行的方法,以使其任务不会干扰其他同时运行的任务。

其中一种方法是在单独的 task_arena 中执行内层循环:

work_isolation_eg3.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
oneapi::tbb::enumerable_thread_specific<int> ets;
oneapi::tbb::task_arena nested;
oneapi::tbb::parallel_for( 0, N1, [&]( int i ) {
// Set a thread specific value
ets.local() = i;
nested.execute( []{
// Run the inner parallel_for in a separate arena to prevent the thread
// from taking tasks of the outer parallel_for.
oneapi::tbb::parallel_for( 0, N2, []( int j ) { /* Some work */ } );
} );
assert( ets.local()==i ); // Valid assertion
} );

然而,使用单独的 arena 进行工作隔离并不总是方便的,并且可能会产生明显的开销。为了解决这些缺点,TBB 提供 this_task_arena::isolate 函数,通过限制调用线程仅处理在函数对象范围内(也称为隔离区域)安排的任务,来隔离地运行一个用户提供的函数对象。

当一个线程进入一个任务等待调用或(等待)在一个隔离区域内的阻塞并行结构时,该线程只能执行在该隔离区域内生成的任务及其由其他线程生成的子任务(换句话说,即使子任务是由其他线程生成的,只要属于当前隔离区域,当前线程也可以执行这些任务)。线程被禁止执行任何外层任务或属于其他隔离区域的任务。

下面的示例展示了 this_task_arena::isolate 的使用,以保证在嵌套的并行结构调用时, thread-local 变量不会被意外修改:

work_isolation_eg4.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include "oneapi/tbb/task_arena.h"
#include "oneapi/tbb/parallel_for.h"
#include "oneapi/tbb/enumerable_thread_specific.h"
#include <cassert>


int main() {
const int N1 = 1000, N2 = 1000;
oneapi::tbb::enumerable_thread_specific<int> ets;
oneapi::tbb::parallel_for( 0, N1, [&ets]( int i ) {
// Set a thread specific value
ets.local() = i;
// Run the second parallel loop in an isolated region to prevent the current thread
// from taking tasks related to the outer parallel loop.
oneapi::tbb::this_task_arena::isolate( []{
oneapi::tbb::parallel_for( 0, N2, []( int j ) { /* Some work */ } );
} );
assert( ets.local()==i ); // Valid assertion
} );
return 0;
}

补充:让我们通过一个简单的例子来说明隔离区域内其他线程如何生成子任务,并且这些子任务可以由当前线程执行。

假设我们有一个隔离区域,其中有两个线程:线程A和线程B。我们在这个隔离区域内生成了一些任务,并且这些任务可能会生成子任务。

work_isolation_eg5.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <tbb/tbb.h>
#include <iostream>

void taskA() {
std::cout << "Task A executed by thread " << tbb::this_task_arena::current_thread_index() << std::endl;
tbb::parallel_invoke(
{
std::cout << "Subtask A1 executed by thread " << tbb::this_task_arena::current_thread_index() << std::endl;
},
{
std::cout << "Subtask A2 executed by thread " << tbb::this_task_arena::current_thread_index() << std::endl;
}
);
}

void taskB() {
std::cout << "Task B executed by thread " << tbb::this_task_arena::current_thread_index() << std::endl;
tbb::parallel_invoke(
{
std::cout << "Subtask B1 executed by thread " << tbb::this_task_arena::current_thread_index() << std::endl;
},
{
std::cout << "Subtask B2 executed by thread " << tbb::this_task_arena::current_thread_index() << std::endl;
}
);
}

int main() {
tbb::task_arena arena;
arena.execute([&] {
tbb::this_task_arena::isolate([&] {
tbb::parallel_invoke(taskA, taskB);
});
});
return 0;
}

在这个例子中:

taskA 和 taskB 是在隔离区域内生成的任务。
taskA 生成了两个子任务 Subtask A1 和 Subtask A2。
taskB 生成了两个子任务 Subtask B1 和 Subtask B2。
假设线程A执行了 taskA,线程B执行了 taskB。在隔离区域内,线程A和线程B可以执行彼此生成的子任务。例如,线程A可以执行 Subtask B1 或 Subtask B2,而线程B可以执行 Subtask A1 或 Subtask A2,只要这些子任务属于同一个隔离区域。

5. 推荐阅读

5.1. 书籍

  1. Intel Building Blocks 编程指南. James Reinders.
  2. Patterns for Parallel Pragramming. Timothy Mattson 等.
  3. 设计模式:Design Patterns of Reusable Object-Oriented Software (Addison Wesley). Gamma, Helm, Johnson 和 Vlissides.