
Typical TBB Scenarios

IO + CPU-intensive + IO

The scenario: read data chunks from the network (blocking IO), compress them (CPU-intensive work), and write the results to a file (blocking IO). The helpers below simulate these three steps.

tasks

tasks.hpp
/**
 * Simulated IO and CPU tasks
 */

#include <unistd.h>

#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <vector>

struct DataChunk {
  std::vector<char> data;
  DataChunk() = default;
  explicit DataChunk(size_t size) : data(size) {}
};

struct CompressedChunk {
  std::vector<char> data;
  CompressedChunk() = default;
  explicit CompressedChunk(size_t size) : data(size) {}
};

// Simulated network-read function
bool read_from_network(DataChunk& chunk) {
  sleep(3);  // simulate IO latency

  chunk.data.resize(100);  // each chunk carries 100 bytes

  static int count = 0;
  if (count++ >= 10) return false;  // stop after reading 10 chunks

  std::generate(chunk.data.begin(), chunk.data.end(),
                []() { return static_cast<char>(std::rand() % 256); });
  return true;
}

// Simulated compression function
char compress_byte(char byte) {
  for (int i = 0; i < 10'000; ++i)
    ;                 // simulate CPU-bound busy work
  return byte % 128;  // toy "compression"
}

// Simulated file-write function
void write_to_file(const CompressedChunk& chunk) {
  sleep(3);  // simulate IO latency
  std::cout << "Writing chunk of size " << chunk.data.size() << "\n";
}
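
For comparison, a purely sequential baseline simply chains the three steps in one loop; every 3-second read and 3-second write is serialized with the compression work, which is exactly what the three TBB-based solutions below try to overlap.

// Sequential baseline (for reference only): no overlap between IO and CPU work.
#include "tasks.hpp"

int main() {
  DataChunk chunk;
  while (read_from_network(chunk)) {  // ~3 s blocking read
    CompressedChunk compressed(chunk.data.size());
    for (size_t i = 0; i < chunk.data.size(); ++i)
      compressed.data[i] = compress_byte(chunk.data[i]);  // CPU-bound loop
    write_to_file(compressed);  // ~3 s blocking write
  }
  return 0;
}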

Solution 1

1_message_queue.cpp
/**
 * Solution 1: asynchronous queue + TBB parallel_for
 */

#include <tbb/tbb.h>

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

#include "tasks.hpp"

// Global queue shared by the reader and the compressor
std::queue<std::vector<char>> readQueue;
std::mutex mtx;
std::condition_variable cv;
std::atomic_bool stop = false;

// Reader thread
void networkReader() {
  while (!stop) {
    DataChunk chunk;

    if (!read_from_network(chunk)) {  // blocking I/O
      {
        std::lock_guard<std::mutex> lock(mtx);
        stop = true;
      }
      cv.notify_all();  // wake the consumer so it observes `stop`
      return;
    }

    {
      std::lock_guard<std::mutex> lock(mtx);
      readQueue.push(std::move(chunk.data));
    }
    cv.notify_one();
  }
}

// Compression task
void compressor() {
  while (true) {
    DataChunk chunk;
    {
      std::unique_lock<std::mutex> lock(mtx);
      cv.wait(lock, [] { return stop || !readQueue.empty(); });
      if (readQueue.empty()) return;  // stopped and nothing left to drain
      chunk.data = std::move(readQueue.front());
      readQueue.pop();
    }

    // CPU-intensive work, parallelized with TBB
    CompressedChunk compressed(chunk.data.size());
    tbb::parallel_for(size_t(0), chunk.data.size(), [&](size_t i) {
      compressed.data[i] = compress_byte(chunk.data[i]);  // per-byte "compression"
    });

    write_to_file(compressed);  // could also be made asynchronous
  }
}

int main() {
  std::thread reader(networkReader);
  std::thread worker(compressor);

  reader.join();
  worker.join();
}
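
The hand-rolled mutex/condition-variable queue can also be replaced by TBB's own tbb::concurrent_bounded_queue, whose blocking pop() removes the manual synchronization and whose capacity bounds how many chunks are in flight. A minimal sketch of the same handoff follows; the empty-chunk sentinel is an assumption introduced here for illustration, not something tasks.hpp defines.

// Sketch: producer/consumer handoff via tbb::concurrent_bounded_queue.
#include <tbb/concurrent_queue.h>
#include <tbb/tbb.h>

#include <thread>
#include <vector>

#include "tasks.hpp"

int main() {
  tbb::concurrent_bounded_queue<std::vector<char>> queue;
  queue.set_capacity(8);  // bounds the number of in-flight chunks

  std::thread reader([&] {
    DataChunk chunk;
    while (read_from_network(chunk))
      queue.push(std::move(chunk.data));  // blocks while the queue is full
    queue.push(std::vector<char>{});      // empty chunk signals end-of-stream
  });

  std::thread worker([&] {
    for (;;) {
      std::vector<char> data;
      queue.pop(data);          // blocks until an item is available
      if (data.empty()) break;  // sentinel from the reader: we are done

      CompressedChunk compressed(data.size());
      tbb::parallel_for(size_t(0), data.size(), [&](size_t i) {
        compressed.data[i] = compress_byte(data[i]);
      });
      write_to_file(compressed);
    }
  });

  reader.join();
  worker.join();
}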

Solution 2

2_flow_graph.cpp
/**
 * Solution 2: TBB Flow Graph
 */

#include <tbb/flow_graph.h>
#include <tbb/tbb.h>

#include <iostream>
#include <vector>

#include "tasks.hpp"

using namespace tbb;
using namespace tbb::flow;

int main() {
  graph g;

  // 1. Reader node (serial source)
  input_node<DataChunk> reader(
      g,
      [](flow_control& fc) -> DataChunk {
        DataChunk chunk(1024);  // 1 KB chunk
        if (!read_from_network(chunk)) {
          fc.stop();
          return DataChunk();
        }
        return chunk;
      });

  // 2. Processing node (unlimited concurrency)
  function_node<DataChunk, CompressedChunk> processor(
      g, unlimited,
      [](const DataChunk& input) -> CompressedChunk {
        CompressedChunk output(input.data.size());

        tbb::parallel_for(
            tbb::blocked_range<size_t>(0, input.data.size()),
            [&](const tbb::blocked_range<size_t>& r) {
              for (size_t i = r.begin(); i != r.end(); ++i) {
                output.data[i] = compress_byte(input.data[i]);
              }
            });

        return output;
      });

  // 3. Writer node (serial: one write at a time; note that with an unlimited
  //    processor upstream, chunks may still arrive out of order)
  function_node<CompressedChunk> writer(
      g, serial,
      [](const CompressedChunk& output) {
        write_to_file(output);
      });

  // Build the dataflow pipeline
  make_edge(reader, processor);
  make_edge(processor, writer);

  // Start the pipeline
  reader.activate();
  g.wait_for_all();

  return 0;
}
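
As noted in the writer comment, a serial function_node only guarantees one invocation at a time, not the original chunk order, because the unlimited processor can finish chunks out of order. If write order matters, a sequencer_node can reorder chunks by a tag assigned at the serial source. A minimal sketch follows; the std::pair tagging and the idx counter are assumptions introduced for illustration (tasks.hpp defines no such index).

// Sketch: restoring write order with tbb::flow::sequencer_node.
#include <tbb/flow_graph.h>
#include <tbb/tbb.h>

#include <utility>

#include "tasks.hpp"

using namespace tbb::flow;

using TaggedChunk = std::pair<size_t, CompressedChunk>;

int main() {
  graph g;

  // Serial source: tag each chunk with its arrival index.
  input_node<std::pair<size_t, DataChunk>> reader(
      g, [idx = size_t(0)](tbb::flow_control& fc) mutable {
        DataChunk chunk;
        if (!read_from_network(chunk)) {
          fc.stop();
          return std::pair<size_t, DataChunk>{};  // discarded after stop()
        }
        return std::make_pair(idx++, std::move(chunk));
      });

  // Parallel compression: chunks may complete out of order.
  function_node<std::pair<size_t, DataChunk>, TaggedChunk> processor(
      g, unlimited, [](const std::pair<size_t, DataChunk>& in) {
        CompressedChunk out(in.second.data.size());
        tbb::parallel_for(size_t(0), in.second.data.size(), [&](size_t i) {
          out.data[i] = compress_byte(in.second.data[i]);
        });
        return TaggedChunk{in.first, std::move(out)};
      });

  // Buffer and re-emit chunks in tag order (0, 1, 2, ...).
  sequencer_node<TaggedChunk> reorder(
      g, [](const TaggedChunk& t) { return t.first; });

  function_node<TaggedChunk> writer(
      g, serial, [](const TaggedChunk& t) { write_to_file(t.second); });

  make_edge(reader, processor);
  make_edge(processor, reorder);
  make_edge(reorder, writer);

  reader.activate();
  g.wait_for_all();
  return 0;
}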

Solution 3

3_pipeline.cpp
/**
 * Solution 3: TBB parallel_pipeline
 */

#include <tbb/tbb.h>

#include <iostream>
#include <vector>

#include "tasks.hpp"

int main() {
  tbb::parallel_pipeline(
      /* max_number_of_live_tokens */ 4,
      // Stage 1: read data from the network
      tbb::make_filter<void, DataChunk>(
          tbb::filter_mode::serial_in_order,
          [](tbb::flow_control& fc) -> DataChunk {
            DataChunk chunk;
            if (!read_from_network(chunk)) {  // stop when it returns false
              fc.stop();
            }
            return chunk;
          }) &
      // Stage 2: CPU-intensive compression
      tbb::make_filter<DataChunk, CompressedChunk>(
          tbb::filter_mode::parallel,
          [](DataChunk chunk) -> CompressedChunk {
            CompressedChunk out(chunk.data.size());
            tbb::parallel_for(size_t(0), chunk.data.size(), [&](size_t i) {
              out.data[i] = compress_byte(chunk.data[i]);
            });
            return out;
          }) &
      // Stage 3: write to the file
      tbb::make_filter<CompressedChunk, void>(
          tbb::filter_mode::serial_in_order,
          [](CompressedChunk out) {
            write_to_file(out);  // could also be made asynchronous
          }));

  return 0;
}
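
A note on the token count: max_number_of_live_tokens caps how many chunks can be in flight across all stages, so it determines how much the serial read and write stages can overlap with the parallel compression stage. Instead of hard-coding 4, the count can be derived from the available parallelism; a minimal sketch is below, where the factor of 2 is an assumption chosen for illustration, not a TBB recommendation.

// Sketch: derive the token count from the current arena's concurrency.
#include <tbb/tbb.h>

#include <cstddef>

inline std::size_t pipeline_tokens() {
  return 2 * static_cast<std::size_t>(tbb::this_task_arena::max_concurrency());
}

The result would simply replace the literal 4 in the parallel_pipeline call above.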