0%

前言

这是阅读 Cameron Desrochers 的
A Fast General Purpose Lock-Free Queue for C++
源码的笔记。

系统概览

MPMC 队列由一系列 SPMC 队列组成。消费者使用启发式 (heuristic) 来决定消费哪个 SPMC 队列。允许批量入列
和出列,只需要很小的额外开销。

producer 需要一些 thread-local 数据; consumer 也可以用一些可选的 thread-local 数据来加速;这些
thread-local 数据可以与用户分配的 tokens 关联;如果用户没有为生产者提供 tokens ,则使用无锁哈希表(
以当前线程 ID 为键)来查找线程本地生产者队列:每个 SPMC 队列都使用一个预分配的 token (或隐式分配的
token,如果没有提供的话)来创建。由于 token 包含相当于线程特定的数据,因此它们不应该同时在多个线程中
使用(尽管可以将 token 的所有权转移给另一个线程;特别是,这允许在线程池任务中使用令牌,即使运行任务
的线程在中途发生变化)。

所有生产者队列都以无锁链表的形式连接在一起。当显式生产者不再有元素被添加时(即其令牌被销毁),它会被
标记为与任何生产者都无关联,但它会保留在链表中,且其内存不会被释放;下一个新生产者会重用旧生产者的内
存(这样,无锁生产者列表就只能添加)。隐式生产者永远不会被销毁(直到高层队列本身被销毁),因为无法知
道给定线程是否已完成对数据结构的使用。需要注意的是,最坏情况下的出队速度取决于生产者队列的数量,即使
它们都为空。

显式生产者队列和隐式生产者队列的生命周期存在根本区别:显式生产者队列的生产生命周期有限,与令牌的生命
周期绑定;而隐式生产者队列的生产生命周期不受限制,且与高级队列本身的生命周期相同。因此,为了最大化速
度和内存利用率,我们使用了两种略有不同的 SPMC 算法。通常,显式生产者队列设计得更快,占用的内存也更多
;而隐式生产者队列设计得更慢,但会将更多内存回收到高级队列的全局池中。为了获得最佳速度,请始终使用显
式令牌(除非您觉得它太不方便)。

任何分配的内存只有在高级队列被销毁时才会释放(尽管存在一些重用机制)。内存分配可以预先完成,如果内存
不足,操作就会失败(而不是分配更多内存)。如果需要,用户可以覆盖各种默认大小参数(以及队列使用的内存
分配函数)。

Full API (pseudocode)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Allocates more memory if necessary
enqueue(item) : bool
enqueue(prod_token, item) : bool
enqueue_bulk(item_first, count) : bool
enqueue_bulk(prod_token, item_first, count) : bool

# Fails if not enough memory to enqueue
try_enqueue(item) : bool
try_enqueue(prod_token, item) : bool
try_enqueue_bulk(item_first, count) : bool
try_enqueue_bulk(prod_token, item_first, count) : bool

# Attempts to dequeue from the queue (never allocates)
try_dequeue(item&) : bool
try_dequeue(cons_token, item&) : bool
try_dequeue_bulk(item_first, max) : size_t
try_dequeue_bulk(cons_token, item_first, max) : size_t

# If you happen to know which producer you want to dequeue from
try_dequeue_from_producer(prod_token, item&) : bool
try_dequeue_bulk_from_producer(prod_token, item_first, max) : size_t

# A not-necessarily-accurate count of the total number of elements
size_approx() : size_t

Producer Queue (SPMC) Design

隐式和显式版本的共享设计

生产者队列由块组成(显式和隐式生产者队列使用相同的块对象,以实现更好的内存共享)。初始状态下,它没有
块。每个块可以容纳固定数量的元素(所有块的容量相同,均为 2 的幂)。此外,块包含一个标志,指示已填充
的槽位是否已被完全消耗(显式版本使用此标志来判断块何时为空),以及一个原子计数器,用于计数已完全出队
的元素数量(隐式版本使用此标志来判断块何时为空)。

为了实现无锁操作,生产者队列可以被认为是一个抽象的无限数组。尾部索引指示生产者下一个可用的槽位;它同
时也是已入队元素数量的两倍( 入队计数 (enqueue count) )。尾部索引仅由生产者写入,并且始终递增(除
非溢出并回绕,但就我们的目的而言,这种情况仍被视为“递增”)。由于只有一个线程在更新相关变量,因此生产
一个元素的过程非常简单。头索引指示下一个可以被消费的元素。头索引由消费者原子地递增,可能并发进行。为
了防止头索引达到/超过感知到的尾部索引,我们使用了一个额外的原子计数器: 出队计数 (dequeue count)
。出队计数是乐观的,即当消费者推测有元素需要出队时,它会递增。如果出队计数在递增后的值小于入队计数(
尾部),则保证至少有一个元素要出队(即使考虑到并发性),并且可以安全地递增头部索引,因为知道之后它会
小于尾部索引。另一方面,如果出队计数在递增后超过(或等于)尾部,则出队操作失败,并且出队计数在逻辑上
会递减(以使其最终与入队计数保持一致):这可以通过直接递减出队计数来实现,但是(为了增加并行性并使所
有相关变量单调递增),改为递增**出队过量提交计数器 (dequeue overcommit counter)**。

1
出队计数的逻辑值 = 出队计数变量 - 出队过量提交值

在消费时,一旦如上所述确定了有效索引,仍然需要将其映射到一个块以及该块中的偏移量;为此会使用某种索引
数据结构(具体使用哪种结构取决于它是隐式队列还是显式队列)。最后,可以将元素移出,并更新某种状态,以
便最终知道该块何时完全消费。下文将分别在隐式和显式队列的各个部分中对这些机制进行完整描述。

如前所述,尾部和头部的索引/计数最终会溢出。这是预料之中的,并且已被考虑在内。因此,索引/计数被视为存
在于一个与最大整数值大小相同的圆上(类似于 360 度的圆,其中 359 在 1 之前)。为了检查一个索引/计数(
例如 a)是否位于另一个索引/计数(例如 b)之前(即逻辑小于),我们必须确定 a 是否沿着圆上的顺时针圆弧
更接近 b。使用以下”环形小于”算法(32 位版本):a < b 变为 a - b > (1U << 31U)a <= b 变为
a - b - 1ULL > (1ULL << 31ULL)。请注意,环形减法“仅适用于”普通无符号整数(假设为二进制补码)。需要
注意的是,尾部索引的增量不会超过头部索引(这会破坏队列)。请注意,尽管如此,从技术上讲仍然存在竞争条
件,即消费者(或生产者)看到的索引值过于陈旧,几乎比当前值落后一整圈(甚至更多!),从而导致队列的内
部状态损坏。但在实践中,这不是问题,因为遍历 2^31 个值(对于 32 位索引类型)需要一段时间,而其他核心
到那时会看到更新的值。实际上,许多无锁算法都基于相关的标签指针习语(tag-pointer idiom),其中前 16
位用于重复递增的标签,后 16 位用于指针值;这依赖于类似的假设,即一个核心不能将标签递增超过 2^15 次,
而其他核心却不知道。尽管如此,队列的默认索引类型是 64 位宽(如果 16 位看起来就足够了,那么理论上应该
可以避免任何潜在的竞争)。

内存分配失败也会得到妥善处理,不会损坏队列(只会报告失败)。此外,队列元素本身在操作时也应确保不会抛
出异常。

Block Pools

有两种不同的块池可供使用:首先,有一个初始的预分配块数组。一旦使用完毕,该池将永远保持为空。这简化了
其无等待(wait-free)实现,只需一条 fetch-and-add 原子指令(用于获取空闲块的下一个索引)并进行检查(
以确保该索引在范围内)。其次,有一个无锁(但非无等待)的全局空闲列表(“全局”是指对高级队列而言是全局
的),其中包含已用完且可重复使用的块,该列表实现为一个无锁单链表:头指针最初指向空(null)。要将块添
加到空闲列表,需要将块的下一个指针设置为头指针,然后使用比较并交换 (CAS) 更新头指针,使其指向该块,
前提是头指针未发生更改;如果发生更改,则重复该过程(这是一个经典的无锁 CAS 循环设计模式)。要从空闲
列表中移除一个块,可以使用类似的算法:读取头部块的下一个指针,然后将头部设置为该下一个指针(使用
CAS),前提是在此期间头部块没有发生变化。为了避免 ABA 问题,每个块都有一个引用计数,在执行 CAS 移除
块之前会递增,之后会递减;如果在块的引用计数大于 0 的情况下尝试将其重新添加到空闲列表中,则会设置一
个标志,指示该块应该在空闲列表中,并且下一个线程在完成最后一个引用的持有后会检查此标志,并将该块添加
到列表中(这种方法有效,因为我们不关心顺序)。我
另一篇博文中更
详细地描述了这个无锁空闲列表的具体设计和实现。当生产者队列需要新块时,它首先检查初始块池,然后检查全
局空闲列表,只有当它在那里找不到空闲块时,它才会在堆上分配一个新块(如果不允许内存分配,则失败)。

基准测试

Ticket System

BlockQueue(只用分块):使用分块内存布局,但不使用 ticket 分发机制。

TicketQueue_benchmark.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
constexpr size_t BLOCK_SIZE = 64;
constexpr int N = 1'000'000;
constexpr int NUM_PRODUCERS = 8;
constexpr int NUM_CONSUMERS = 8;
constexpr int ITEMS_PER_PRODUCER = N / NUM_PRODUCERS;

struct Slot {
std::atomic<bool> ready;
int data;
};

struct Block {
Slot slots[BLOCK_SIZE];
};

// BlockQueue: 分块但无 ticket
class BlockQueue {
public:
BlockQueue() : head(0), tail(0) {
}

void enqueue(int value) {
size_t index = head.fetch_add(1) % BLOCK_SIZE;
block.slots[index].data = value;
block.slots[index].ready.store(true, std::memory_order_release);
}

bool try_dequeue(int& value) {
size_t index = tail.fetch_add(1) % BLOCK_SIZE;
if (!block.slots[index].ready.load(std::memory_order_acquire))
return false;
value = block.slots[index].data;
return true;
}

private:
std::atomic<size_t> head;
std::atomic<size_t> tail;
Block block;
};

TicketQueue(分块 + ticket):模拟 moodycamel 的 ticket 分发方式。

TicketQueue_benchmark.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// TicketQueue: 分块 + ticket
class TicketQueue {
public:
TicketQueue() : head(0), tail(0) {
}

void enqueue(int value) {
size_t ticket = head.fetch_add(1);
size_t index = ticket % BLOCK_SIZE;
block.slots[index].data = value;
block.slots[index].ready.store(true, std::memory_order_release);
}

bool try_dequeue(int& value) {
size_t ticket = tail.fetch_add(1);
size_t index = ticket % BLOCK_SIZE;
while (!block.slots[index].ready.load(std::memory_order_acquire)) {
// 自旋等待
}
value = block.slots[index].data;
return true;
}

private:
std::atomic<size_t> head;
std::atomic<size_t> tail;
Block block;
};

Benchmark 代码:

TicketQueue_benchmark.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// 基准测试函数
template <typename QueueType>
double benchmark(const std::string& name, double& opsPerSec) {
QueueType queue;
std::atomic<int> totalConsumed{0};
std::map<int, double> threadWaitTimes;

auto start = std::chrono::high_resolution_clock::now();

// 启动生产者线程
std::vector<std::thread> producers;
for (int p = 0; p < NUM_PRODUCERS; ++p) {
producers.emplace_back([&queue, p]() {
for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
queue.enqueue(i + p * ITEMS_PER_PRODUCER);
}
});
}

// 启动消费者线程
std::vector<std::thread> consumers;
for (int c = 0; c < NUM_CONSUMERS; ++c) {
consumers.emplace_back([&queue, &totalConsumed, c, &threadWaitTimes]() {
int item;
auto localStart = std::chrono::high_resolution_clock::now();
while (true) {
auto t0 = std::chrono::high_resolution_clock::now();
while (!queue.try_dequeue(item)) {
// busy wait
}
auto t1 = std::chrono::high_resolution_clock::now();
threadWaitTimes[c] +=
std::chrono::duration<double>(t1 - t0).count();

if (++totalConsumed >= N)
break;
}
auto localEnd = std::chrono::high_resolution_clock::now();
double threadTime =
std::chrono::duration<double>(localEnd - localStart).count();
std::cout << "Consumer " << c << " finished in " << threadTime
<< "s, wait time: " << threadWaitTimes[c] << "s\n";
});
}

for (auto& t : producers)
t.join();
for (auto& t : consumers)
t.join();

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;
opsPerSec = N / elapsed.count();

std::cout << "\n"
<< name << " completed in " << elapsed.count()
<< "s, throughput: " << opsPerSec << " ops/sec\n";

return elapsed.count();
}

int main() {
double opsBlock = 0.0, opsTicket = 0.0;
double timeBlock = benchmark<BlockQueue>("BlockQueue", opsBlock);
double timeTicket = benchmark<TicketQueue>("TicketQueue", opsTicket);
double speedup = opsTicket / opsBlock;

std::cout << std::left << "| " << std::setw(14) << "Queue Type"
<< "| " << std::setw(12) << "Time (s)"
<< "| " << std::setw(20) << "Throughput (ops/s)"
<< "| " << std::setw(10) << "Speedup"
<< "|\n";

std::cout << std::string(70, '-') << "\n";

// BlockQueue row
std::cout << std::left << "| " << std::setw(14) << "BlockQueue"
<< "| " << std::setw(12) << std::fixed << std::setprecision(6)
<< timeBlock << "| " << std::setw(20) << std::fixed
<< std::setprecision(0) << opsBlock << "| " << std::setw(10)
<< "1.00×"
<< "|\n";

// TicketQueue row
std::ostringstream speedupStream;
speedupStream << std::fixed << std::setprecision(2) << speedup << "×";

std::cout << std::left << "| " << std::setw(14) << "TicketQueue"
<< "| " << std::setw(12) << std::fixed << std::setprecision(6)
<< timeTicket << "| " << std::setw(20) << std::fixed
<< std::setprecision(0) << opsTicket << "| " << std::setw(10)
<< speedupStream.str() << "|\n";

return 0;
}

前言

这是阅读 Cameron Desrochers 的 A Fast Lock-Free Queue for C++ 源码的笔记。

仓库地址:https://github.com/cameron314/readerwriterqueue

其他参考文献:

An Introduction to Lock-Free Programming
C++ and Beyond 2012: Herb Sutter - atomic Weapons 1 of 2
C++ and Beyond 2012: Herb Sutter - atomic Weapons 2 of 2

内存屏障

约束 memory loads/stores 的顺序。

  • releaase 内存屏障:告诉 CPU,如果屏障之后的任何写入变得可见,那么屏障之前的任何写入都应该在其他核心中可见,前提是其他核心在读取 写屏障之后写入的数据 后执行读屏障。
    换句话说,如果线程 B 可以看到在另一个线程 A 上的写屏障之后写入的新值,那么在执行读屏障(在线程 B 上)之后,可以保证在线程 A 上的写屏障之前发生的所有写入在线程 B 上可见。

实现细节

  1. block: 一个连续的环形缓冲区,用来存储元素。这样可以预分配内存。
  2. 块过小(这不利于无锁)时,无需将所有现有元素复制到新的块中;多个块(大小独立)以循环链表的形式链接在一起。
  3. 当前插入的块称为 “尾块”,当前消费的块称为 “头块”。
  4. 头索引指向下一个要读取的满槽;尾索引指向下一个要插入的空槽。如果两个索引相等,则块为空(确切地说,当队列已满时,恰好有一个插槽为空,以避免在具有相同头和尾索引的满块和空块之间产生歧义)。
  5. 为了允许队列对任意线程创建 / 析构(独立于生产 / 消费线程),全内存屏障(memory_order_acq_cst)被用在析构函数地最后、析构函数的开头(这会强制所有的 CPU cores 同步 outstanding changes)。显然,在析构函数可以被安全地调用之前,生产者和消费者必须已经停止使用该队列。

Give me the codes

  1. 用户不需要管理内存。
  2. 预分配内存,在连续的块中。
  3. try_enqueue: 保证不会分配内存(队列有初始容量);
  4. enqueue: 会根据需要动态扩容。
  5. 没有使用 CAS loop;这意味者 enqueue 和 dequeue 是 O(1) 的(没有计入内存分配的时间)。
  6. 因为在 x86 平台,内存屏障是空操作,所以 enqueue 和 dequeue 是一系列简单的 loads 和 stores (and branches) 。

此代码仅仅适用于以原子方式处理 自然对齐的整型(aligned integer) 和 原生指针大小(native-pointer-size) 的 loads/stores 的 CPU 上;
幸运的是,这包括了所有的现代处理器(包括 ARM, x86/x86_64 和 PowerPC)。
它不是为在 DEC Alpha 上运行而设计的(DEC Alpha 似乎具有有史以来最弱的内存排序保证)。

注:在 x86 上,memory_order_acquire/release 通常不需要额外指令就能实现语义,但仍然能限制编译器的重排。
fetch_add 不是一个原子操作,而是三个:load, add, store. 所以不适用上述说的 “自然对齐的整型” 或“原生指针大小”的 load/store.

性能优化点

  1. 平凡析构:跳过析构,直接释放内存。
  2. MCRingBuffer paper
    1. cache line padding
    2. local control variables
      1. 减少对全局 read/write 指针的读取
    3. local block

正确性测试

  1. 定义不可预测性延时函数,用于模拟线程调度。
  2. 写线程塞入 32 M 个数据;读线程读取 32 M 次。读写线程中使用 unpredDelay() 模拟调度延迟。
  3. 测试能否顺序读取,失败则打印日志,不退出。
  4. 测试程序无限运行,每次使用一个写线程和读线程。直至手动 Ctrl C 关闭。

查看磁盘类型

1
2
3
$ lsblk -d -o name,rota,type,size,model
NAME ROTA TYPE SIZE MODEL
sda 1 disk 1.8T PERC H740P Mini

ROTA=1:这是旋转磁盘。

测试方法

顺序写吞吐测试(逼近最大写入速度)

1
fio --name=seqwrite --rw=write --bs=1M --size=5G --numjobs=4 --iodepth=32 --direct=1 --runtime=60 --group_reporting

随机读 IOPS 测试(逼近最大并发处理能力)

1
fio --name=randread --rw=randread --bs=4k --size=5G --numjobs=4 --iodepth=64 --direct=1 --runtime=60 --group_reporting

混合读写测试(模拟数据库负载)

1
fio --name=mixrw --rw=randrw --rwmixread=70 --bs=4k --size=5G --numjobs=4 --iodepth=32 --direct=1 --runtime=60 --group_reporting

磁盘的测试结果

由于是旋转磁盘,iodepth 总是 1(设成其他值不会生效)

单线程读写文件:

点击展开代码
    
fio_bs_test.shview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/bin/bash

# 测试参数
DEVICE="./testfile" # 修改为你要测试的文件或设备路径
RUNTIME=30 # 每个测试运行时间(秒)
# BLOCK_SIZES=("4k" "16k" "64k" "256k" "1M" "4M" "16M" "32M" "64M" "128M") # 测试块大小列表
BLOCK_SIZES=("128M" "64M" "32M" "16M" "4M" "1M" "256k" "64k" "16k" "4k") # 测试块大小列表
LOG_FILE="fio_bs_output.log" # 原始输出日志文件
PERFORMANCE_LOG="fio_bs_performance.log" # 性能结果日志文件

# 输出表头
printf "%-8s | %-10s | %-8s | %-10s | %-10s\n" "RW" "BlockSize" "IOPS" "BW(MiB/s)" "AvgLat(ms)" | tee "$PERFORMANCE_LOG"
echo "-------------------------------------------------------------" | tee -a "$PERFORMANCE_LOG"

echo "" > "$LOG_FILE" # 清空日志文件

# 循环测试不同块大小
for RW in read write; do
for BS in "${BLOCK_SIZES[@]}"; do
OUTPUT=$(fio --name=bs_test \
--filename="$DEVICE" \
--rw=$RW \
--bs=$BS \
--size=1G \
--time_based \
--runtime=$RUNTIME \
--numjobs=1 \
--direct=1 \
--ioengine=psync \
--group_reporting)
echo "----------------------------------------------" >> "$LOG_FILE"
echo "$OUTPUT" >> "$LOG_FILE"
echo "" >> "$LOG_FILE"

# 提取关键指标
read IOPS BW BWUNIT LAT LAT_UNIT <<< $(echo "$OUTPUT" | awk '
/IOPS=/ {match($0, /IOPS= *([0-9.]+)/, iops)}
/BW=/ {
match($0, /BW= *([0-9.]+)([KMG]iB)\/s/, bwinfo)
bwval=bwinfo[1]; bwunit=bwinfo[2]
}
/clat \(/ {match($0, /avg= *([0-9.]+),/, lat); match($0, /\(([^)]+)\)/, lat_unit)}
END {print iops[1], bwval, bwunit, lat[1], lat_unit[1]}
')

# 延迟单位换算
if [ "$LAT_UNIT" = "usec" ]; then
LAT_MS=$(awk "BEGIN {printf \"%.2f\", $LAT/1000}")
elif [ "$LAT_UNIT" = "msec" ]; then
LAT_MS=$LAT
else
LAT_MS="Unknown"
fi

# 带宽单位换算为 MiB/s
case "$BWUNIT" in
"KiB") BW_MIB=$(awk "BEGIN {printf \"%.2f\", $BW/1024}") ;;
"MiB") BW_MIB=$BW ;;
"GiB") BW_MIB=$(awk "BEGIN {printf \"%.2f\", $BW*1024}") ;;
*) BW_MIB="Unknown" ;;
esac

# 输出结果行
printf "%-8s | %-10s | %-8s | %-10s | %-10s\n" "$RW" "$BS" "$IOPS" "$BW_MIB" "$LAT_MS" | tee -a "$PERFORMANCE_LOG"
done
done

# 删除 fio 创建的测试文件
rm -f "$DEVICE"
echo "测试完成,结果已保存到 $LOG_FILE$PERFORMANCE_LOG"
1
2
3
4
5
6
7
8
9
10
11
12
RW       | BlockSize  | IOPS     | BW(MiB/s)  | AvgLat(ms)
-----------------------------------------------------------
read | 4k | 194 | 0.76 | 5.14
read | 16k | 239 | 3.74 | 4.17
read | 64k | 347 | 21.7 | 2.88
read | 256k | 271 | 67.9 | 3.68
read | 1M | 94 | 94.9 | 10.53
read | 4M | 14 | 57.2 | 69.74
read | 16M | 6 | 97.6 | 163.96
read | 32M | 3 | 111 | 288.99
read | 64M | 1 | 112 | 573.70
read | 128M | 0 | 112 | 1147.00
1
2
3
4
5
6
7
8
9
10
11
12
RW       | BlockSize  | IOPS     | BW(MiB/s)  | AvgLat(ms)
-----------------------------------------------------------
write | 4k | 1884 | 7.36 | 0.53
write | 16k | 1452 | 22.7 | 0.69
write | 64k | 793 | 49.6 | 1.26
write | 256k | 307 | 76.8 | 3.24
write | 1M | 100 | 100 | 9.96
write | 4M | 18 | 73.7 | 53.99
write | 16M | 5 | 93.8 | 169.43
write | 32M | 3 | 101 | 313.24
write | 64M | 1 | 107 | 594.46
write | 128M | 0 | 102 | 1249.35

当 BlockSize=32M 以后,写入性能基本达到顶峰(110 MiB/s),和旋转磁盘的参数基本一致。

多线程读写同一个文件,BS=64KiB:

点击展开代码
    
fio_mt_test.shview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/bin/bash

DEVICE="./mt_testfile" # 测试文件路径
RUNTIME=30 # 每个测试运行时间(秒)
THREADS=(1 2 4 8 16 32 64 72 120) # 测试线程数列表
BLOCK_SIZE="64k" # 块大小,可根据需要修改
LOG_FILE="fio_thread_output.log"
PERFORMANCE_LOG="fio_thread_performance.log"

# 输出表头
printf "%-8s | %-8s | %-10s | %-10s | %-10s\n" "RW" "Threads" "IOPS" "BW(MiB/s)" "AvgLat(ms)" | tee "$PERFORMANCE_LOG"
echo "---------------------------------------------------------------" | tee -a "$PERFORMANCE_LOG"

echo "" > "$LOG_FILE" # 清空日志文件

for RW in read write; do
for THREAD in "${THREADS[@]}"; do
OUTPUT=$(fio --name=thread_test \
--filename="$DEVICE" \
--rw=$RW \
--bs=$BLOCK_SIZE \
--size=5G \
--time_based \
--runtime=$RUNTIME \
--numjobs=$THREAD \
--direct=1 \
--ioengine=psync \
--group_reporting)
echo "---------------------------------------------------------------" >> "$LOG_FILE"
echo "$OUTPUT" >> "$LOG_FILE"
echo "" >> "$LOG_FILE"

# 提取关键指标
read IOPS BW BWUNIT LAT LAT_UNIT <<< $(echo "$OUTPUT" | awk '
/IOPS=/ {match($0, /IOPS= *([0-9.]+)/, iops)}
/BW=/ {
match($0, /BW= *([0-9.]+)([KMG]iB)\/s/, bwinfo)
bwval=bwinfo[1]; bwunit=bwinfo[2]
}
/clat \(/ {match($0, /avg= *([0-9.]+),/, lat); match($0, /\(([^)]+)\)/, lat_unit)}
END {print iops[1], bwval, bwunit, lat[1], lat_unit[1]}
')

# 延迟单位换算
if [ "$LAT_UNIT" = "usec" ]; then
LAT_MS=$(awk "BEGIN {printf \"%.2f\", $LAT/1000}")
elif [ "$LAT_UNIT" = "msec" ]; then
LAT_MS=$LAT
else
LAT_MS="Unknown"
fi

# 带宽单位换算为 MiB/s
case "$BWUNIT" in
"KiB") BW_MIB=$(awk "BEGIN {printf \"%.2f\", $BW/1024}") ;;
"MiB") BW_MIB=$BW ;;
"GiB") BW_MIB=$(awk "BEGIN {printf \"%.2f\", $BW*1024}") ;;
*) BW_MIB="Unknown" ;;
esac

# 输出结果行
printf "%-8s | %-8s | %-10s | %-10s | %-10s\n" "$RW" "$THREAD" "$IOPS" "$BW_MIB" "$LAT_MS" | tee -a "$PERFORMANCE_LOG"
done
done

rm -f "$DEVICE"
echo "测试完成,结果已保存到 $LOG_FILE$PERFORMANCE_LOG"
1
2
3
4
5
6
7
8
9
10
11
RW       | Threads  | IOPS       | BW(MiB/s)  | AvgLat(ms)
------------------------------------------------------------
read | 1 | 826 | 51.6 | 1.21
read | 2 | 1300 | 81.3 | 1.54
read | 4 | 1681 | 105 | 2.38
read | 8 | 1778 | 111 | 4.49
read | 16 | 1789 | 112 | 8.93
read | 32 | 1790 | 112 | 17.86
read | 64 | 1789 | 112 | 35.73
read | 72 | 1790 | 112 | 40.18
read | 120 | 1789 | 112 | 66.98
1
2
3
4
5
6
7
8
9
10
11
RW       | Threads  | IOPS       | BW(MiB/s)  | AvgLat(ms)
------------------------------------------------------------
write | 1 | 847 | 52.9 | 1.18
write | 2 | 1367 | 85.5 | 1.46
write | 4 | 1757 | 110 | 2.27
write | 8 | 1786 | 112 | 4.47
write | 16 | 1788 | 112 | 8.92
write | 32 | 1788 | 112 | 17.79
write | 64 | 1788 | 112 | 35.09
write | 72 | 1788 | 112 | 39.54
write | 120 | 1784 | 112 | 64.50

当线程数增加,IO 性能随之提高,可能原因是 64KiB 小块数据大量提交到 I/O 队列,操作系统能更好地完成读写路径优化。
但达到8线程的时候,就基本到达性能顶峰了。

注意:fio 多线程写入同一个文件是没有加锁的,如果超过 page cache (一般是 4 KB),那么可能乱序写入。

概念(以 fio 为例)

ioengine

ioengine(I/O 引擎)是 fio 提供以执行读写任务的底层接口。不同的引擎代表不同的 I/O 模型,比如同步、异步、内存映射、零拷贝等。

引擎名称 类型 特点与用途
sync 同步 默认方式,每次 I/O 都等待完成,适合简单测试
psync 同步 使用 pread/pwrite,可指定偏移,略快
libaio 异步 Linux 异步 I/O,适合高性能 SSD/NVMe
io_uring 异步 新一代 Linux 异步接口,低延迟、高并发
mmap 内存映射 将文件映射到内存,适合大文件顺序访问
splice 零拷贝 用于高效数据传输,减少 CPU 和内存开销
windowsaio 异步 Windows 原生异步 I/O,适合多线程写入
net 网络 用于网络 I/O 测试,如 socket 传输
sg SCSI 用于直接访问 SCSI 设备

每种 ioengine 都依赖操作系统提供的底层 I/O 接口。例如:

ioengine 类型 操作系统要求 是否异步 说明
sync / psync 所有系统 使用标准阻塞 I/O,几乎总是可用
libaio Linux,需安装 libaio 库 依赖 Linux 的异步 I/O 接口
io_uring Linux ≥ 5.1,推荐 ≥ 5.4 依赖新内核特性和 liburing 库
windowsaio Windows 使用 Windows 原生异步 I/O
mmap 所有主流系统 使用内存映射,适合顺序读写
posixaio POSIX 兼容系统 使用 aio_read / aio_write 接口

你可以指定任意 ioengine (默认值是 sync / psync),但它是否能运行,必须得到操作系统的支持。这包括内核版本、系统接口、库文件等。如果系统不支持,fio 会报错或自动回退。

查看支持列表:

1
fio --enghelp

这会列出当前系统上可用的 ioengine,但注意:列出来 ≠ 能用,还要看运行时是否报错。

实际测试:

1
fio --name=test --ioengine=io_uring --rw=write --size=1G --bs=1M

如果不支持,会报错,例如:

1
fio: pid=132756, err=38/file:engines/io_uring.c:1351, func=io_queue_init, error=Function not implemented

iodepth

--iodepth 是传递给内核的参数。

如果你不显式设置 --iodepth,那么 fio 会根据所选的 I/O 引擎(--ioengine) 来决定默认值

I/O 引擎 默认 iodepth
sync / psync / vsync 1(同步 I/O,只能一个一个处理)
libaio / io_uring 1,但可以设置更高以启用异步并发
mmap / pread / pwrite 1
windowsaio(Windows) 1
sg(SCSI generic) 1

fio 并不会主动维护队列,队列是内核的特性。

I/O 引擎 队列位置 是否异步 说明
psync / sync 无队列(直接调用) 每次写入调用 write(),无排队机制
libaio 内核空间 ✅ 是 使用 Linux AIO,队列在内核中,由 io_submit() 提交
io_uring 用户 + 内核共享 ✅ 是 使用环形缓冲区,用户空间提交,内核空间处理
mmap / null 用户空间 ❌ 否 模拟或跳过实际 I/O,不涉及内核队列
  • 对于支持异步 I/O 的引擎(如 libaio 或 io_uring),你可以设置更高的 iodepth(如 32、64、128)来模拟高并发负载;
  • 对 SSD 或 NVMe 设备,高 iodepth 能显著提升 IOPS 和吞吐量;
  • 对机械硬盘,提升有限,但仍可用于测试调度策略和队列行为。

但如果你的 ioengine 是 sync 或 psync,这些是同步阻塞 I/O,根本不支持高并发,所以 iodepth 实际上不会生效。

参数 说明
–name=seqwrite 定义测试任务的名称为 seqwrite,用于标识输出结果
–rw=write 设置为顺序写入模式(sequential write),数据按顺序写入磁盘
–bs=1M 每次 I/O 操作的块大小为 1MB,适合测试吞吐量
–size=5G 每个线程写入的总数据量为 5GB(不是总共,是每个 job)
–numjobs=4 启动 4 个并发线程(job),模拟多线程写入场景
–iodepth=32 每个线程的 I/O 队列深度为 32,表示最多可同时挂起 32 个 I/O 请求
本例中每个线程会发起 5G/1M=5120 个 I/O 请求
–direct=1 绕过系统缓存,直接对磁盘进行读写,更真实地反映设备性能
–runtime=60 测试持续时间为 60 秒,优先于 –size,
1. 即使数据写完也继续写更多数据直到时间结束 < br>2. 如果没有写完,则时间到就结束
–group_reporting 汇总所有线程的测试结果,输出整体性能指标而不是每个线程单独显示

可能的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <libaio.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#define FILE_PATH "testfile.bin"
#define BLOCK_SIZE 4096
#define IODEPTH 4 // 控制并发请求数量

int main() {
int fd = open(FILE_PATH, O_CREAT | O_WRONLY | O_DIRECT, 0644);
if (fd < 0) {
perror("open");
return 1;
}

// io_setup 是 libaio 的函数
io_context_t ctx = 0;
if (io_setup(IODEPTH, &ctx) < 0) {
perror("io_setup");
return 1;
}

struct iocb *iocbs[IODEPTH];
struct iocb iocb_array[IODEPTH];
char *buffers[IODEPTH];

for (int i = 0; i < IODEPTH; i++) {
// 分配对齐内存
posix_memalign((void**)&buffers[i], BLOCK_SIZE, BLOCK_SIZE);
memset(buffers[i], 'A' + i, BLOCK_SIZE);

// 初始化 iocb
io_prep_pwrite(&iocb_array[i], fd, buffers[i], BLOCK_SIZE, i * BLOCK_SIZE);
iocbs[i] = &iocb_array[i];
}

// 提交所有请求
int ret = io_submit(ctx, IODEPTH, iocbs);
if (ret < 0) {
perror("io_submit");
return 1;
}

// 等待所有请求完成
struct io_event events[IODEPTH];
io_getevents(ctx, IODEPTH, IODEPTH, events, NULL);

// 清理
for (int i = 0; i < IODEPTH; i++) {
free(buffers[i]);
}
io_destroy(ctx);
close(fd);

printf("All %d I/O requests completed.\n", IODEPTH);
return 0;
}

延迟

指标 含义 描述
slat Submission Latency 从 fio 发起 I/O 请求到内核接收该请求的时间。通常很短,单位是微秒(usec)。
clat Completion Latency 从内核接收请求到 I/O 操作完成的时间。这个是最能反映存储设备性能的部分。
lat Total Latency 总延迟,即 slat + clat,表示从 fio 发起请求到 I/O 完成的整个过程。

结果分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$ fio --name=seqwrite --rw=write --bs=1M --size=5G --numjobs=2 --direct=1 --runtime=60 --group_reporting
seqwrite: (g=0): rw=write, bs=(R) 1024KiB-1024KiB, (W) 1024KiB-1024KiB, (T) 1024KiB-1024KiB, ioengine=psync, iodepth=1
...
fio-3.41
Starting 2 processes
seqwrite: Laying out IO file (1 file / 5120MiB)
seqwrite: Laying out IO file (1 file / 5120MiB)
Jobs: 2 (f=2): [W(2)][100.0%][w=112MiB/s][w=112 IOPS][eta 00m:00s]
seqwrite: (groupid=0, jobs=2): err= 0: pid=70688: Sun Sep 7 22:37:47 2025
write: IOPS=111, BW=111MiB/s (117MB/s)(6685MiB/60016msec); 0 zone resets
clat (usec): min=9606, max=74763, avg=17922.82, stdev=1446.89
lat (usec): min=9628, max=74791, avg=17951.49, stdev=1446.80
clat percentiles (usec):
| 1.00th=[15008], 5.00th=[16319], 10.00th=[16909], 20.00th=[17433],
| 30.00th=[17695], 40.00th=[17695], 50.00th=[17957], 60.00th=[17957],
| 70.00th=[18220], 80.00th=[18482], 90.00th=[19006], 95.00th=[19268],
| 99.00th=[21365], 99.50th=[22414], 99.90th=[32375], 99.95th=[40109],
| 99.99th=[74974]
bw (KiB/s): min=96062, max=116736, per=100.00%, avg=114150.20, stdev=1061.35, samples=238
iops : min= 92, max= 114, avg=110.77, stdev= 1.18, samples=238
lat (msec) : 10=0.03%, 20=97.43%, 50=2.53%, 100=0.01%
cpu : usr=0.22%, sys=0.75%, ctx=6717, majf=0, minf=67
IO depths : 1=100.0%, 2=0.0%, 4=0.0%, 8=0.0%, 16=0.0%, 32=0.0%, >=64=0.0%
submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
issued rwts: total=0,6685,0,0 short=0,0,0,0 dropped=0,0,0,0
latency : target=0, window=0, percentile=100.00%, depth=1

Run status group 0 (all jobs):
WRITE: bw=111MiB/s (117MB/s), 111MiB/s-111MiB/s (117MB/s-117MB/s), io=6685MiB (7010MB), run=60016-60016mse

🧾 测试配置解析
bash
fio –name=seqwrite –rw=write –bs=1M –size=10G –numjobs=1 –direct=1 –runtime=60 –group_reporting
参数 含义
rw=write 顺序写入
bs=1M 每次写入块大小为 1MiB
numjobs=1 单线程写入
direct=1 使用 Direct I/O,绕过页缓存
ioengine=psync 使用同步 I/O(每次 pwrite())
iodepth=1 每次只挂起一个 I/O 请求(同步模式下默认如此)
📊 性能结果概览
指标 数值 说明
IOPS 96 每秒执行 96 次写入操作
带宽 96.2 MiB/s(101 MB/s) 每秒写入约 96 MiB 数据
总写入量 5772 MiB 在 60 秒内完成的写入总量
延迟(avg clat) 10.36 ms 每次写入的平均完成时间
CPU 使用率 usr=0.46%, sys=1.23% CPU 负载极低,瓶颈不在 CPU
⏱ 延迟分布分析
50% 的写入延迟低于 9.9 ms

95% 的写入低于 12.5 ms

99.95% 的写入延迟达到了 22.9 ms

最慢的写入高达 28.2 ms

尾部延迟略高,说明偶尔会有磁盘响应变慢的情况,可能是设备内部缓存刷新或寻址造成。

📈 带宽波动情况
平均带宽:约 96 MiB/s

最小带宽:60 MiB/s

最大带宽:104 MiB/s

标准差:6.2 MiB/s → 表明带宽相对稳定,但仍有轻微波动

🧠 深层解读
✅ 为什么 IOPS ≈ 带宽(MiB/s)?
因为你设置了 bs=1M,每次写入 1MiB 数据,所以:

Code
IOPS × Block Size = Bandwidth
96 IOPS × 1 MiB = 96 MiB/s
✅ 为什么 Direct I/O?
绕过页缓存,测试的是磁盘的真实物理性能,避免被内存加速 “欺骗”。

✅ 为什么使用 psync?
psync 是同步写入,每次调用 pwrite(),适合模拟数据库或日志系统的写入行为。但它无法并发挂起多个请求,限制了吞吐。

📌 性能瓶颈分析
磁盘类型:如果是 HDD,这个结果(96 MiB/s)非常合理;如果是 SSD,则偏低,可能受限于同步 I/O 或单线程。

IO 引擎限制:psync 是阻塞式,无法发挥磁盘的并发能力。

线程数限制:只有一个线程,磁盘可能未被充分利用。

使用共享内存优化

matrix-Mul

matrixMul.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Matrix multiplication (CUDA Kernel) on the device: C = A * B
* wA is A's width and wB is B's width
*/
template <int BLOCK_SIZE> __global__ void MatrixMulCUDA(float *C, float *A, float *B, int wA, int wB)
{
// Block index
int bx = blockIdx.x;
int by = blockIdx.y;

// Thread index
int tx = threadIdx.x;
int ty = threadIdx.y;

// Index of the first sub-matrix of A processed by the block
int aBegin = wA * BLOCK_SIZE * by;

// Index of the last sub-matrix of A processed by the block
int aEnd = aBegin + wA - 1;

// Step size used to iterate through the sub-matrices of A
int aStep = BLOCK_SIZE;

// Index of the first sub-matrix of B processed by the block
int bBegin = BLOCK_SIZE * bx;

// Step size used to iterate through the sub-matrices of B
int bStep = BLOCK_SIZE * wB;

// Csub is used to store the element of the block sub-matrix
// that is computed by the thread
float Csub = 0;

// Loop over all the sub-matrices of A and B
// required to compute the block sub-matrix
for (int a = aBegin, b = bBegin; a <= aEnd; a += aStep, b += bStep) {
// Declaration of the shared memory array As used to
// store the sub-matrix of A
__shared__ float As[BLOCK_SIZE][BLOCK_SIZE];

// Declaration of the shared memory array Bs used to
// store the sub-matrix of B
__shared__ float Bs[BLOCK_SIZE][BLOCK_SIZE];

// Load the matrices from device memory
// to shared memory; each thread loads
// one element of each matrix
As[ty][tx] = A[a + wA * ty + tx];
Bs[ty][tx] = B[b + wB * ty + tx];

// Synchronize to make sure the matrices are loaded
__syncthreads();

// Multiply the two matrices together;
// each thread computes one element
// of the block sub-matrix
#pragma unroll

for (int k = 0; k < BLOCK_SIZE; ++k) {
Csub += As[ty][k] * Bs[k][tx];
}

// Synchronize to make sure that the preceding
// computation is done before loading two new
// sub-matrices of A and B in the next iteration
__syncthreads();
}

// Write the block sub-matrix to device memory;
// each thread writes one element
int c = wB * BLOCK_SIZE * by + BLOCK_SIZE * bx;
C[c + wB * ty + tx] = Csub;
}

术语

内存模型(Memory Model)是定义数据一致性执行顺序规则的一组规范。

关键概念:

  1. 原子性:操作不可打断;如果同时修改,硬件会串行排队。
  2. 可见性:对其他核心(即其他线程)可见,也就是 load 指令可以读到最新值。
  3. 顺序性:禁止本线程内的指令重排序。所以可以用作同步点。

所有原子变量都满足原子性。但是其他两者不一定满足,由内存序定义。

比如原子自增 fetch_add(relaxed) 是线程安全的,但是无法使用 load 指令读到最新的结果。

缓存一致性协议 (MESI)

状态 含义
M (Modified) 缓存行已被修改,只有当前 CPU 拥有,主内存未更新
E (Exclusive) 缓存行未被修改,只有当前 CPU 拥有,与主内存一致
S (Shared) 缓存行未被修改,多个 CPU 拥有,与主内存一致
I (Invalid) 缓存行无效,必须重新从主内存加载

relaxed 内存序

术语:

  1. 保证原子性:整个操作不可被打断;不一定立即回写主内存,可能暂时把结果放在 Store Buffer 中。
  2. 不保证顺序:编译器和CPU可能对指令重排序。
  3. 不保证可见性:其他核心不一定立即可见。

说人话:

  1. 原子性:原子操作是串行的(由硬件排队)
  2. 不保证可见性:
    • load 操作是并行的,并且不保证看到最新值。
    • “可见性”是针对程序员的,因为对于 relaxed ,没有任何指令可以保证 load 拿到最新值。它可能直接从自己的缓存行中把旧值返回给你。
    • 硬件本身是知道最新值的,它会根据 MESI 来保证自己当时拿到的是最新值(但是无法通过指令告知你)。
  3. 不保证顺序:不是同步点,无法保证前后指令与程序员的编写顺序一致。
层级 机制 作用
指令级 原子指令(如 LOCK XADD) 保证操作不可打断
缓存级 MESI 协议 控制缓存行访问,避免冲突
编译器级 编译器屏障 防止指令重排
CPU级 内存屏障 保证执行顺序
高级机制 事务性内存 实现复杂原子逻辑(可选)

release-acquire 语义

指令重排序

  1. 编译器指令重排:编译器屏障防止指令重排
  2. CPU 乱序执行:内存屏障保证执行顺序。

原子指令

原子指令(如 LOCK XADD):保证操作不可打断

Load / Store Buffer

缓存行

内存屏障

位置无关代码(PIC)

  • 位置无关代码(PIC):程序可以在内存中的任意位置运行,不需要修改代码中的绝对地址。
  • 节省空间:相比使用 64 位绝对地址,RIP 相对寻址只需要一个 32 位偏移量。
  • 更安全:支持地址随机化(ASLR),提高程序的安全性。

在 x86-64 架构中,传统的绝对地址寻址方式不再适用于位置无关代码。于是引入了 RIP(指令指针)相对寻址:

假设你有一个全局变量 int x = 42;,在汇编中访问它可能会变成:

1
2
asm
mov eax, DWORD PTR [rip + offset_to_x]

这里的 offset_to_x 是编译器计算出来的 x 相对于当前指令的偏移量。

寻址方式 描述 是否位置无关
绝对地址寻址 使用固定地址,如 [0x400123] ❌ 否
寄存器间接寻址 如 [rax],地址由寄存器决定 ✅ 是
RIP 相对寻址 如 [rip + offset],相对当前指令位置 ✅ 是

但并不是所有 PIC 都用 RIP 相对寻址,PIC 的实现方式取决于:

  • 架构:在 x86(32 位)中没有 RIP 寄存器,PIC 通常通过 call 指令获取当前地址,再加偏移量。
  • 编译器策略:有些编译器会使用全局偏移表(GOT)或过程链接表(PLT)来实现位置无关性。
  • 访问目标:访问函数地址时可能通过 PLT;访问外部变量时可能通过 GOT;访问静态数据时可能用 RIP 相对寻址。
架构 是否使用 RIP 相对寻址 是否支持位置无关代码
x86-64 ✅ 常用,尤其访问数据段 ✅ 强力支持(默认启用)
x86 (32位) ❌ 无 RIP,用其他方式实现 ✅ 但需要特殊技巧

举个 gdb 调试的例子:

1
2
3
4
(gdb) x/i $rip
=> 0x2ac084b5ec10 <poll>: cmpl $0x0,0x2d939d(%rip) # 0x2ac084e37fb4 <__libc_multiple_threads>
(gdb) p (bool)$__libc_multiple_threads
true
  • cmpl $0x0, 0x2d939d(%rip) 是一条比较指令(cmp),用于将某个内存地址中的值与立即数 0 进行比较。
  • (%rip) 表示使用 RIP 相对寻址,这是 x86-64 架构中常见的一种寻址方式。
  • 实际比较的是地址 0x2ac084e37fb4 处的值,也就是 __libc_multiple_threads 这个变量。

__libc_multiple_threads 是什么?

  • 这是 GNU C 库(glibc)中的一个内部变量,用来标记当前进程是否启用了多线程。
  • 如果这个值是 0,说明当前进程是单线程。
  • 如果是非零,说明进程中有多个线程。

所以这条指令的作用是:判断当前进程是否是多线程环境,可能用于决定是否启用线程安全的行为。

为什么使用 RIP 相对寻址?

  1. RIP 是唯一始终已知的寄存器
  • 在执行指令时,CPU总是知道当前指令的地址(即 RIP)。
  • 所以可以在编译时计算出目标数据与当前指令之间的偏移量,而不需要知道数据的绝对地址。

这就允许编译器生成位置无关代码,即使程序被加载到不同的内存地址,偏移量仍然有效。

  1. 其他寄存器值是动态的,不可预测
  • 比如 RBX、RAX、RDI 等寄存器,它们的值在运行时可能被程序修改。
  • 如果用这些寄存器做基址寻址,编译器就无法提前知道它们的值,也就无法生成稳定的偏移量。
  1. 支持共享库和地址空间布局随机化(ASLR)
  • RIP 相对寻址让代码段不依赖固定地址,可以被多个进程共享。
  • 也支持操作系统在运行时随机加载地址,提高安全性(ASLR)。
  1. 节省指令空间
  • 使用 RIP 相对寻址只需要一个 32 位偏移量。
  • 如果使用绝对地址,需要嵌入完整的 64 位地址,指令长度更长,效率更低。

为什么使用 RIP 相对寻址只需要一个 32 位偏移量

在 x86-64 架构中,RIP 相对寻址的偏移量被设计为一个有符号的 32 位整数,也就是一个 displacement(位移)字段,它在机器码中只占用 4 个字节。

  • RIP 是 64 位的指令指针,表示当前指令的地址。

  • RIP 相对寻址的目标地址是通过:

    目标地址 = 下一条指令地址(RIP) + 32 位偏移量

  • 这个偏移量是一个 有符号整数,所以它的范围是:

    从 −2³¹ 到 +2³¹−1,即 ±2GB 的寻址范围。

这意味着,当前指令附近 ±2GB 范围内的任何数据都可以通过 RIP 相对寻址访问。

优点 说明
✅ 节省空间 只用 4 字节表示偏移,比使用完整 64 位地址节省指令长度
✅ 支持位置无关代码 编译器只需计算偏移,不依赖绝对地址
✅ 高效 CPU 执行时只需加法运算,无需查表或重定位
✅ 安全 支持地址空间布局随机化(ASLR),提高安全性

为什么可以被多个进程共享?

因为代码中不再硬编码具体地址,多个进程可以:

  • 使用同一份物理内存中的代码段。
  • 每个进程有自己的数据段,但共享同一份只读代码。

这大大节省了内存,提高了系统效率。

举个例子:

进程 加载地址 使用的代码段
A 0x400000 使用共享代码段
B 0x500000 使用共享代码段

两者的代码段内容完全一样,因为里面的寻址是相对 RIP 的,不依赖于加载地址。

为什么绝对寻址不可以被多进程共享?

  • 每个进程的虚拟地址空间是独立的
    • 操作系统为每个进程分配独立的虚拟地址空间。
    • 即使两个进程都加载了同一个程序,它们的地址空间可能完全不同。
    • 如果代码中使用绝对地址,加载到不同地址空间后,这些地址就不再有效。

所以,绝对地址在一个进程中是有效的,在另一个进程中可能就指向错误的地方或根本不存在。

  • 需要重定位,无法直接共享物理页

    • 如果使用绝对地址,操作系统必须在每个进程加载时对代码进行“重定位”,修改指令中的地址。
    • 一旦修改,代码段就变成了进程私有,不能共享同一份物理内存。
    • 而位置无关代码(如使用 RIP 相对寻址)不需要修改,可以直接映射到多个进程的地址空间。
  • 违反共享库的设计原则

    • 动态链接库(如 .so.dll)的核心优势就是可以被多个进程共享。
    • 如果库中使用绝对地址,每个进程都要有自己的副本,失去了共享的意义。
    • 正确做法是使用位置无关代码(PIC),让库在任意地址都能运行。
区域 是否可共享 原因说明
代码段 ✅ 是 只读 + 位置无关,多个进程可映射同一物理页
数据段 ❌ 否 每个进程的数据不同,需独立副本
❌ 否 动态分配,地址空间不同
❌ 否 私有调用栈,不能混用
共享内存段 ✅ 是 显式创建,专门用于共享

如果你想深入了解某个进程的内存布局,可以分析 /proc/[pid]/maps 或用工具如 pmapvmmap

1. 前言

我们在调试 release 版本的程序时,由于缺乏符号信息,所以需要通过寄存器来查看函数的参数、返回值等。

2. 寄存器

2.1. 通用寄存器 (General Purpose Registers)

寄存器名 英文名称 作用
rax Accumulator 累加器,通常用于算术运算和函数返回值存储。
rbx Base 基址寄存器,常用于存储数据或指针。
rsi Source Index 源索引寄存器,常用于字符串操作中的源地址指针(函数第一个参数)。
rdi Destination Index 目标索引寄存器,常用于字符串操作中的目标地址指针或结构体指针(函数第二个参数)。
rdx Data 数据寄存器,常用于 I/O 操作或乘除法运算中的扩展数据存储(函数第三个参数)。
rcx Counter 计数器寄存器,常用于循环计数或字符串操作中的计数(函数第四个参数)。
rsp Stack Pointer 栈指针寄存器,指向当前栈顶。
rbp Base Pointer 基址指针寄存器,指向当前栈帧的基址。
r8~r15 General Purpose 通用寄存器,扩展的 64 位寄存器之一,用于存储数据或指针(r8r9 常用于保存函数第五六个参数)。

2.2. 特殊用途寄存器 (Special Purpose Registers)

寄存器名 英文名称 作用
rip Instruction Pointer 指令指针寄存器,存储当前执行指令的地址。
rflags Flags 标志寄存器,存储状态标志位(如进位、溢出、零标志等)。

2.3. 段寄存器 (Segment Registers)

寄存器名 英文名称 作用
cs Code Segment 代码段寄存器,指向当前代码段的基址。
ds Data Segment 数据段寄存器,指向当前数据段的基址。
es Extra Segment 额外段寄存器,指向额外数据段的基址。
fs FS Segment 特殊用途段寄存器,常用于线程本地存储等。
gs GS Segment 特殊用途段寄存器,常用于线程本地存储等。
ss Stack Segment 栈段寄存器,指向当前栈段的基址。

2.4. 浮点与向量寄存器 (Floating Point and Vector Registers)

寄存器名 英文名称 作用
xmm0-xmm15 SIMD Registers 用于 SSE 指令集的 128 位向量运算。
ymm0-ymm15 AVX Registers 用于 AVX 指令集的 256 位向量运算。
zmm0-zmm31 AVX-512 Registers 用于 AVX-512 指令集的 512 位向量运算。

2.5. 函数调用时的参数传递

在 x86_64 架构中,函数调用时的参数传递遵循 System V AMD64 ABI(Linux/Unix 系统的标准调用约定)。

前六个整数或指针类型的参数依次存储在以下寄存器中:

  1. rdi - 第一个参数
  2. rsi - 第二个参数
  3. rdx - 第三个参数
  4. rcx - 第四个参数
  5. r8 - 第五个参数
  6. r9 - 第六个参数

对于浮点类型的参数(如 floatdouble),前八个参数存储在以下 SSE 寄存器 中:

  1. xmm0 - 第一个浮点参数
  2. xmm1 - 第二个浮点参数
  3. xmm2 - 第三个浮点参数
  4. xmm3 - 第四个浮点参数
  5. xmm4 - 第五个浮点参数
  6. xmm5 - 第六个浮点参数
  7. xmm6 - 第七个浮点参数
  8. xmm7 - 第八个浮点参数

溢出参数(超过寄存器数量)会依次存储在 中:

  • 超过寄存器数量(整数参数超过 6 个,浮点参数超过 8 个)的参数会依次压入栈中。
  • 栈需要保持 16 字节对齐,可能会插入填充字节。
  • 可以通过访问栈指针(rsp)或基址指针(rbp)来找到栈上的参数。
    • 使用 rsp(栈指针)
      • 在函数入口时,rsp 指向栈顶(即返回地址的下一个位置)。
      • 栈上的第一个参数位于 [rsp + 8](跳过返回地址)。
      • 第二个参数位于 [rsp + 16],依此类推。
    • 使用 rbp(基址指针)
      • 如果函数使用了帧指针(rbp),rbp 通常指向调用者的栈帧基址。
      • 栈上的第一个参数位于 [rbp + 16](跳过返回地址和保存的 rbp)。
      • 第二个参数位于 [rbp + 24],依此类推。

3. 栈

3.1. 理解栈布局

在函数调用时,栈的布局通常如下(从高地址到低地址):

  1. 返回地址:调用函数时,call 指令会将返回地址(下一条指令的地址)压入栈中。
  2. 溢出参数:如果参数超过寄存器数量,多余的参数会依次压入栈中。
  3. 栈对齐填充:为了满足 16 字节对齐要求,可能会有额外的填充字节。
  4. 局部变量和保存的寄存器:函数内部可能会在栈上分配空间用于局部变量或保存调用者的寄存器。

3.2. 函数调用时的压栈过程

在 x86_64 架构中,函数调用时会涉及到栈的操作,包括压栈和出栈。这些操作主要用于保存调用者的上下文(如
返回地址、寄存器值)以及为被调用函数分配栈帧。

3.2.1. 调用者(Caller)的操作

  1. 压入返回地址 当调用者使用 call 指令调用函数时,CPU 会自动将返回地址(下一条指令的地址)压入
    栈中。此时,rsp(栈指针)会减少 8 字节(64 位系统)。

    1
    2
    3
    4
    call function
    # 等价于:
    push rip ; 将返回地址压入栈
    jmp function
  2. 压入溢出参数(如果有) 如果函数的参数超过了寄存器数量(整数参数超过 6 个,浮点参数超过 8 个)
    ,多余的参数会从右到左依次压入栈中。rsp 会随着每个参数的压入减少。

  3. 对齐栈 为了满足 16 字节对齐 的要求,调用者可能会插入额外的填充字节,使得 rsp 在调用函数
    前保持 16 字节对齐。

3.2.2. 被调用者(Callee)的操作

  1. 保存调用者的栈帧基址 被调用者通常会保存调用者的栈帧基址(rbp),以便在函数返回时恢复调用者
    的栈帧。

    1
    2
    push rbp       ; 保存调用者的 rbp
    mov rbp, rsp ; 设置当前函数的栈帧基址
  2. 分配栈空间 被调用者会根据函数内部局部变量的需求,在栈上分配空间。rsp 会减少相应的字节数。

    1
    sub rsp, <size>  ; 为局部变量分配栈空间

3.3. 2. 函数返回时的出栈过程

3.3.1. 被调用者(Callee)的操作

  1. 释放局部变量的栈空间 被调用者在返回前会释放为局部变量分配的栈空间。

    1
    add rsp, <size>  ; 恢复 rsp
  2. 恢复调用者的栈帧基址 被调用者会恢复调用者的 rbp,以确保调用者的栈帧完整。

    1
    pop rbp  ; 恢复调用者的 rbp
  3. 返回到调用者 被调用者使用 ret 指令从栈中弹出返回地址,并跳转到该地址。

    1
    ret  ; 等价于:pop rip

3.3.2. 调用者(Caller)的操作

  1. 清理栈上的参数(如果需要) 如果调用约定要求调用者清理栈上的参数(如 cdecl 调用约定),调用
    者会调整 rsp
    1
    add rsp, <size>  ; 清理栈上的参数

3.4. 3. 栈指针(rsp)和基址指针(rbp)的变化

以下是一个函数调用的栈布局示例:

C 代码

1
2
3
4
5
6
7
8
void example(int a, int b) {
int x = a + b;
}

int main() {
example(1, 2);
return 0;
}

汇编代码(简化版)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# main 函数
main:
sub rsp, 16 ; 对齐栈
mov edi, 1 ; 第一个参数 -> rdi
mov esi, 2 ; 第二个参数 -> rsi
call example ; 调用 example 函数
add rsp, 16 ; 恢复栈
ret

# example 函数
example:
push rbp ; 保存调用者的 rbp
mov rbp, rsp ; 设置当前栈帧基址
sub rsp, 16 ; 为局部变量分配栈空间
mov eax, edi ; a -> eax
add eax, esi ; a + b
leave ; 恢复栈帧(等价于:mov rsp, rbp; pop rbp)
ret ; 返回调用者

栈布局变化 | 操作 | rsp 变化 | 栈内容(从高地址到低地址) | | ————– | ———– |
————————– | | call example | rsp -= 8 | 返回地址 | | push rbp | rsp -= 8 |
保存调用者的 rbp | | sub rsp, 16 | rsp -= 16 | 为局部变量分配空间 | | leave | rsp += 16 |
释放局部变量空间 | | ret | rsp += 8 | 弹出返回地址 |

3.5. 总结

  1. 函数调用的栈操作
    • 调用者负责压入返回地址和溢出参数。
    • 被调用者负责保存 rbp 和分配局部变量空间。
    • 函数返回时,释放局部变量空间并恢复调用者的栈帧。
  2. rsprbp 的变化
    • rsp 指向栈顶,动态变化。
    • rbp 指向栈帧基址,通常固定不变。

4. 使用 gdb 查看寄存器

1
2
3
4
5
6
7
8
9
10
11
12
13
# 查看所有寄存器
info registers

# 查看指定寄存器
info registers rdi
# 或简写
i r rdi

# 查看十六进制
p/x $rdx # 十六进制
p/d $rdx # 十进制
# 或简写
p $rdx

其中,info registers 会打印三列:

  • 第一列:寄存器名称
  • 第二列:寄存器的值(十六进制)
  • 第三列:寄存器的值(十进制;也可能是十六进制,用 0x 开头)

info registers rdip $rdi 效果相同。

从寄存器查到的内存地址,可以用 x (examinze)命令来查看内存的值:

1
2
3
4
5
6
7
8
9
10
# 查看指令
x/i $rip
# 查看栈顶
x/16x $rsp

# 查看内存
p $rdi
x/2gx $rdi
# 或先用 $rdi 查出内存地址,直接用地址访问
x/2gx 47926411878160

x 命令的说明:

1
x/FMT ADDRESS

其中:

  • x:表示“examine memory”(查看内存)
  • 2:数字,表示要查看的单元数
  • g:表示每个单元的 size,有 b(byte), h(halfword), w(word), g(giant, 8 bytes)
  • x:表示值的格式,有 o(octal), x(hex), d(decimal), u(unsigned decimal), t(binary), f(float),
    a(address), i(instruction), c(char), s(string) and z(hex, zero padded on the left).

在 gdb 命令行中使用 help 命令,可以查看命令的说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(gdb) help x
Examine memory: x/FMT ADDRESS.
ADDRESS is an expression for the memory address to examine.
FMT is a repeat count followed by a format letter and a size letter.
Format letters are o(octal), x(hex), d(decimal), u(unsigned decimal),
t(binary), f(float), a(address), i(instruction), c(char), s(string)
and z(hex, zero padded on the left).
Size letters are b(byte), h(halfword), w(word), g(giant, 8 bytes).
The specified number of objects of the specified size are printed
according to the format. If a negative number is specified, memory is
examined backward from the address.

Defaults for format and size letters are those previously used.
Default count is 1. Default address is following last thing printed
with this command or "print".

5. 使用 gdb 查看栈

  • bt
  • frame
  • args
  • locals
  • x

TDODO

5.1. frame 与寄存器的值

  • GDB 中的寄存器值(如 $rax, $rdi, $rsp 等)是当前 CPU 执行上下文的快照。
  • 当你切换到 frame 0(最内层栈帧)时,寄存器值是最真实的,因为这是程序当前正在执行的地方。
  • 当你切换到 外层栈帧(frame 1, 2, …)时,GDB 会尝试还原当时的寄存器状态,但这依赖于:
    • 编译器是否保存了寄存器值(如 callee-saved)
    • 是否有调试符号或 unwind 信息
    • GDB 是否能推断出寄存器的保存位置

寄存器值可能出现的情况

情况 表现
寄存器是 caller-saved(如 rdi, rsi, rax) 可能显示 或错误值
寄存器是 callee-saved(如 rbx, rbp, r12~r15) 通常能正确还原
没有调试信息或优化严重 GDB 无法还原,显示当前值或

建议

  • 如果你要分析寄存器状态,最好在 frame 0 或断点处进行。
  • 如果你在分析 core dump 或栈破坏问题,寄存器值只能作为参考,不要完全依赖外层 frame 的寄存器快照。
  • 使用 info args 和 info locals 更可靠地查看参数和局部变量(如果有符号信息)。

6. 在特定线程中设置断点

6.1. 断点只作用于某线程

1
2
3
4
5
6
7
8
9
10
11
12
13
# 查看所有线程 ID 和当前线程 ID(gdb 中会使用 * 标注当前线程)
(gdb) info threads
# 切换当前上下文到指定线程
(gdb) thread <THREAD_ID>
# 通过查看当前堆栈是不是自己要断点的线程
(gdb) bt
(gdb) break LOCATION thread THREADNUM
# 条件断点
(gdb) break source.c:123 thread 5 if fds[0].fd == 7
# 如果没有 debug 符号,可以利用函数返回值寄存器断点
(gdb) break poll thread 2 if $rdx > 0
# 完整格式
break [PROBE_MODIFIER] [LOCATION] [thread THREADNUM] [if CONDITION]

6.2. 锁定调度器,只让当前线程运行

默认情况下,GDB 会让所有线程一起运行(比如你执行 continue 时)。如果你只想让当前线程运行,其它线程保
持暂停,可以使用:

1
(gdb) set scheduler-locking on

这表示:只有当前线程会执行,其他线程全部暂停。

其中模式还有:

模式 说明
off 默认值,所有线程都可以运行
on 只有当前线程运行,其他线程暂停
step 单步调试时只运行当前线程,continue 时其他线程也会运行

你可以随时切换:

1
(gdb) set scheduler-locking step
  • 如果你在调试死锁、竞态或线程间通信问题,锁定调度器是非常有效的方式。
  • 如果你在调试某个 poll() 或 epoll_wait() 调用,只想观察某个线程的行为,可以结合 catch syscall 和
    thread 命令一起使用。

7. 查看汇编代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 查看汇编代码,其中 "=>" 标记的是当前执行位置
(gdb) disassemble
# 反汇编指定地址范围
# 这会显示从当前指令开始的 32 字节范围内的汇编代码。
(gdb) disassemble $rip, $rip+32

# 查看当前指令(x86)
(gdb) x/i $pc
# 或在 x86-64 架构下:
(gdb) x/i $rip

# Bonus: 默认 GDB 使用 AT&T 风格(如 %rax),你可以切换为 Intel 风格
# 这样输出会更接近你在汇编教材或 IDA Pro 中看到的格式
(gdb) set disassembly-flavor intel

8. 位置无关代码(PIC)

8.1. 什么是 PIC

  • PIC(Position Independent Code,位置无关代码)是一种编译方式,使得生成的代码可以在内存中的任意
    位置运行,而无需硬编码绝对地址。
  • 在动态链接库(shared libraries)中,通常需要使用 PIC,以便库可以被加载到任意内存地址。
  • 节省空间:相比使用 64 位绝对地址,RIP 相对寻址只需要一个 32 位偏移量。
  • 更安全:支持地址随机化(ASLR),提高程序的安全性。

8.2. PIC 的实现

  1. 访问全局变量 在 PIC 模式下,代码通过 全局偏移表(GOT, Global Offset Table)过程链接
    表(PLT, Procedure Linkage Table)
    访问全局变量和函数地址。

  2. 寄存器 rip 的使用 x86_64 支持基于 rip(指令指针)的寻址方式,PIC 会利用 rip 相对寻址来
    访问全局变量或函数地址,而不是使用绝对地址。

在 x86-64 架构中,传统的绝对地址寻址方式不再适用于位置无关代码。于是引入了 RIP(指令指针)相对寻址:

假设你有一个全局变量 int x = 42;,在汇编中访问它可能会变成:

1
2
asm
mov eax, DWORD PTR [rip + offset_to_x]

这里的 offset_to_x 是编译器计算出来的 x 相对于当前指令的偏移量。

寻址方式 描述 是否位置无关
绝对地址寻址 使用固定地址,如 [0x400123] ❌ 否
寄存器间接寻址 如 [rax],地址由寄存器决定 ✅ 是
RIP 相对寻址 如 [rip + offset],相对当前指令位置 ✅ 是

8.3. PIC 的优化

  • 减少重定位:通过 rip 相对寻址,避免了加载时的重定位操作,提高了加载速度。
  • 共享内存:多个进程可以共享同一段动态库代码,而无需为每个进程生成独立的副本。

8.4. 示例

1
2
mov rax, [rip + global_var@GOTPCREL]  ; 通过 GOT 表访问全局变量
call [rip + func@PLT] ; 通过 PLT 表调用函数

举个 gdb 调试的例子:

1
2
3
4
(gdb) x/i $rip
=> 0x2ac084b5ec10 <poll>: cmpl $0x0,0x2d939d(%rip) # 0x2ac084e37fb4 <__libc_multiple_threads>
(gdb) p (bool)$__libc_multiple_threads
true
  • cmpl $0x0, 0x2d939d(%rip) 是一条比较指令(cmp),用于将某个内存地址中的值与立即数 0 进行比较
  • (%rip) 表示使用 RIP 相对寻址,这是 x86-64 架构中常见的一种寻址方式。
  • 实际比较的是地址 0x2ac084e37fb4 处的值,也就是 __libc_multiple_threads 这个变量。

__libc_multiple_threads 是什么?

  • 这是 GNU C 库(glibc)中的一个内部变量,用来标记当前进程是否启用了多线程。
  • 如果这个值是 0,说明当前进程是单线程。
  • 如果是非零,说明进程中有多个线程。

所以这条指令的作用是:判断当前进程是否是多线程环境,可能用于决定是否启用线程安全的行为。

8.5. 为什么使用 RIP 相对寻址?

  1. RIP 是唯一始终已知的寄存器
  • 在执行指令时,CPU 总是知道当前指令的地址(即 RIP)。
  • 所以可以在编译时计算出目标数据与当前指令之间的偏移量,而不需要知道数据的绝对地址。

这就允许编译器生成位置无关代码,即使程序被加载到不同的内存地址,偏移量仍然有效。

  1. 其他寄存器值是动态的,不可预测
  • 比如 RBX、RAX、RDI 等寄存器,它们的值在运行时可能被程序修改。
  • 如果用这些寄存器做基址寻址,编译器就无法提前知道它们的值,也就无法生成稳定的偏移量。
  1. 支持共享库和地址空间布局随机化(ASLR)
  • RIP 相对寻址让代码段不依赖固定地址,可以被多个进程共享。
  • 也支持操作系统在运行时随机加载地址,提高安全性(ASLR)。
  1. 节省指令空间
  • 使用 RIP 相对寻址只需要一个 32 位偏移量。
  • 如果使用绝对地址,需要嵌入完整的 64 位地址,指令长度更长,效率更低。

8.6. 为什么使用 RIP 相对寻址只需要一个 32 位偏移量

在 x86-64 架构中,RIP 相对寻址的偏移量被设计为一个有符号的 32 位整数,也就是一个 displacement(位移
)字段,它在机器码中只占用 4 个字节。

  • RIP 是 64 位的指令指针,表示当前指令的地址。

  • RIP 相对寻址的目标地址是通过:

    目标地址 = 下一条指令地址(RIP) + 32 位偏移量

  • 这个偏移量是一个 有符号整数,所以它的范围是:

    从 −2³¹ 到 +2³¹−1,即 ±2GB 的寻址范围。

这意味着,当前指令附近 ±2GB 范围内的任何数据都可以通过 RIP 相对寻址访问。

优点 说明
✅ 节省空间 只用 4 字节表示偏移,比使用完整 64 位地址节省指令长度
✅ 支持位置无关代码 编译器只需计算偏移,不依赖绝对地址
✅ 高效 CPU 执行时只需加法运算,无需查表或重定位
✅ 安全 支持地址空间布局随机化(ASLR),提高安全性

8.7. 为什么可以被多个进程共享?

因为代码中不再硬编码具体地址,多个进程可以:

  • 使用同一份物理内存中的代码段。
  • 每个进程有自己的数据段,但共享同一份只读代码。

这大大节省了内存,提高了系统效率。

举个例子:

进程 加载地址 使用的代码段
A 0x400000 使用共享代码段
B 0x500000 使用共享代码段

两者的代码段内容完全一样,因为里面的寻址是相对 RIP 的,不依赖于加载地址。

为什么绝对寻址不可以被多进程共享?

  • 每个进程的虚拟地址空间是独立的
    • 操作系统为每个进程分配独立的虚拟地址空间。
    • 即使两个进程都加载了同一个程序,它们的地址空间可能完全不同。
    • 如果代码中使用绝对地址,加载到不同地址空间后,这些地址就不再有效。

所以,绝对地址在一个进程中是有效的,在另一个进程中可能就指向错误的地方或根本不存在。

  • 需要重定位,无法直接共享物理页

    • 如果使用绝对地址,操作系统必须在每个进程加载时对代码进行“重定位”,修改指令中的地址。
    • 一旦修改,代码段就变成了进程私有,不能共享同一份物理内存。
    • 而位置无关代码(如使用 RIP 相对寻址)不需要修改,可以直接映射到多个进程的地址空间。
  • 违反共享库的设计原则

    • 动态链接库(如 .so.dll)的核心优势就是可以被多个进程共享。
    • 如果库中使用绝对地址,每个进程都要有自己的副本,失去了共享的意义。
    • 正确做法是使用位置无关代码(PIC),让库在任意地址都能运行。
区域 是否可共享 原因说明
代码段 ✅ 是 只读 + 位置无关,多个进程可映射同一物理页
数据段 ❌ 否 每个进程的数据不同,需独立副本
❌ 否 动态分配,地址空间不同
❌ 否 私有调用栈,不能混用
共享内存段 ✅ 是 显式创建,专门用于共享

如果你想深入了解某个进程的内存布局,可以分析 /proc/[pid]/maps 或用工具如 pmapvmmap

9. 实际 debug 例子:在多线程中查看 poll 的事件

先复习下 poll 函数:

1
2
3
4
5
6
7
8
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

// 第一个参数 fds 的类型
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};

FIXME:这种在汇编代码 ret 前断点,并依据 raxrdi 设置条件断点的方式不可靠,因为可能进入了
libc 层。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 查看 polll 的汇编代码
(gdb) disass poll
Dump of assembler code for function poll:
0x00002ac084b5ec10 <+0>: cmpl $0x0,0x2d939d(%rip) # 0x2ac084e37fb4 <__libc_multiple_threads>
0x00002ac084b5ec17 <+7>: jne 0x2ac084b5ec29 <poll+25>
0x00002ac084b5ec19 <+0>: mov $0x7,%eax
0x00002ac084b5ec1e <+5>: syscall
0x00002ac084b5ec20 <+7>: cmp $0xfffffffffffff001,%rax
0x00002ac084b5ec26 <+13>: jae 0x2ac084b5ec59 <poll+73>
0x00002ac084b5ec28 <+15>: ret
0x00002ac084b5ec29 <+25>: sub $0x8,%rsp
0x00002ac084b5ec2d <+29>: call 0x2ac084b77600 <__libc_enable_asynccancel>
0x00002ac084b5ec32 <+34>: mov %rax,(%rsp)
0x00002ac084b5ec36 <+38>: mov $0x7,%eax
0x00002ac084b5ec3b <+43>: syscall
0x00002ac084b5ec3d <+45>: mov (%rsp),%rdi
0x00002ac084b5ec41 <+49>: mov %rax,%rdx
0x00002ac084b5ec44 <+52>: call 0x2ac084b77660 <__libc_disable_asynccancel>
0x00002ac084b5ec49 <+57>: mov %rdx,%rax
0x00002ac084b5ec4c <+60>: add $0x8,%rsp
0x00002ac084b5ec50 <+64>: cmp $0xfffffffffffff001,%rax
0x00002ac084b5ec56 <+70>: jae 0x2ac084b5ec59 <poll+73>
=> 0x00002ac084b5ec58 <+72>: ret
0x00002ac084b5ec59 <+73>: mov 0x2d31f0(%rip),%rcx # 0x2ac084e31e50
0x00002ac084b5ec60 <+80>: neg %eax
0x00002ac084b5ec62 <+82>: mov %eax,%fs:(%rcx)
0x00002ac084b5ec65 <+85>: or $0xffffffffffffffff,%rax
0x00002ac084b5ec69 <+89>: ret

# 找到所有的 ret 指令,设置条件断点
# 注意:最好是在 ret 指令之前的指令上也加上断点,
# 因为 ret 的时候,可能已经把当前栈(除 rsp / rbp 外)都弹出了,寄存器中将看不到当前栈的信息
#
# $rax 是返回值寄存器,也就是返回大于 0 时,让进程暂停
# 这里的 * 表示取内存的值(存放的是指令),* 断不可少,不然会被认为是 Function name
#
(gdb) b *0x00002ac084b5ec26 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec28 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec56 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec58 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec65 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec69 thread 4 if $rax > 0

# 继续运行
(gdb) c

# 当 IO 事件发生,程序会被暂停

其他工具

  • addr2line

日志:

1
Thread 0 (crashed) 0 libexample.so + 0x36406
1
addr2line -e libexample.so 0x36406

或用 gdb :

1
2
gdb libexample.so
(gdb) info line *0x36406

源码

https://github.com/dmtcp/dmtcp/tree/main/jalib

malloc 的特性和局限

malloc/free 是操作系统(或 C 库)提供的通用堆分配器。

  • 它通常会采用 “内存池 + 分块 + 空闲链表” 等技术,但它为了通用性和线程安全,设计得很复杂,开销较大。
  • 在高并发 / 频繁小块分配释放的场景下,malloc 的性能和碎片控制未必理想。

malloc 很难让你:

  • 控制分配内存的位置(如 DMTCP 需要特殊内存区域)
  • 统计 / 追踪所有分配的内存块
  • 实现定制的分配策略(如无锁、分层小块池、预扩展等)
  • 轻松调试和隔离内存问题

内存碎片

malloc 的确在其实现内部也维护着自己的 “内存池”,并且会对小块内存(small bins/tcache/fast bins 等)做优化和分组管理。比如在 glibc 的 malloc(ptmalloc)中,就有针对小块内存的快速分配机制。

  1. malloc 是 “通用分配器”
    malloc 需要支持所有应用场景,包括大 / 小 / 奇异尺寸的分配、跨多线程、兼容各种系统调用和 ABI。
    为了兼容性和健壮性,malloc 实现复杂,包含很多额外的元数据和检查,导致分配 / 释放开销更大。

  2. malloc 的小块管理是 “全局的”
    malloc 管理的小块是全进程共享的,所有线程 / 模块都会竞争同一套管理结构(如 fastbin、tcache、small bin)。
    在高并发、频繁小块分配 / 释放的场景下,锁竞争和同步成本变高,可能成为性能瓶颈。

  3. 自定义分配器(如 jalib)“更窄、更专用”
    jalib 只服务 DMTCP 内部的特殊内存分配需求,只关注固定几种典型的小块尺寸(如 64/256/1024…)。
    可以用更简单、更高效的 “无锁链表 + 内存对齐块” 来管理池,分配和释放几乎都是 O(1)的原子操作。
    不需要兼容所有 malloc 的场景(如 realloc、跨模块释放等),所以能极致优化。

  4. 控制权和可观测性
    jalib 可以完全掌控池的生命周期、分配区域、分配策略(如预扩展、定制回收),还可以追踪统计、调试。
    malloc 的内部状态你无法直接控制或感知,也无法方便地和 DMTCP 的 checkpoint、回滚等功能集成。

  5. 内存碎片和确定性
    专用分配器能保证分配块 “定长、对齐”,几乎无碎片,分配和回收都是确定性的。
    malloc 需要兼容各种尺寸,碎片和内存抖动不可避免。

jalib(自定义分配器)的设计动机

性能优化

  • DMTCP 频繁地分配和释放小块内存(如元数据、临时缓存等),如果每次都用 malloc,性能损耗大。
  • jalib 采用分级固定块池,每次分配 / 释放只需操作链表和原子变量,比 malloc 更快、更少碎片。

线程安全的高效实现

  • jalib 用无锁(128 位 CAS)或轻量级互斥方案,适合高并发分配 / 释放。
  • malloc 虽然线程安全,但实现方式更重,适用范围更广,未必最优。

可控性和可追踪性

  • jalib 可以统计分配次数、追踪所有内存池区,方便调试、分析和 checkpoint 恢复。
  • 可根据实际需求预分配或批量扩展,避免运行时大规模内存抖动。

适应 DMTCP 的特殊需求

  • DMTCP 需要在 checkpoint/restore 时管理所有内存区域,jalib 可以定制分配区域、分配方式,malloc 无法满足。
  • 可实现特定平台的优化,如 mmap 固定地址分配等。

故障隔离和调试

  • jalib 可以在有 bug 或内存泄漏时,帮助定位具体的分配 / 释放流程。
  • 可以方便地记录所有 arena 信息,甚至实现特殊的调试模式。

总结

虽然 malloc 也是内存池管理,但它是为通用用途设计的,不能满足 DMTCP 这类高性能、高可控性、特殊内存管理需求场景。自定义 jalib 分配器可以更高效地管理小块内存,优化多线程性能,便于调试和适配特定需求。

可以归纳为三点:

  • 性能更高,碎片更少
  • 更好地适应 DMTCP 的需求
  • 更易于调试和控制

jalloc 设计思路

多层级固定大小块分配(层级分配器)

  • 设计了 5 个分配层级(lvl1~lvl5),每层负责不同大小的定长内存块(如 64、256、1024、4096、16384 字节)。
  • 小于等于这 5 个等级的分配请求,会被分配到各自的层级。
  • 超过最大层级的请求,则直接调用 _alloc_raw(通常是 mmap)。

优点:

  • 小块内存可以复用,减少系统调用和碎片。
  • 大块内存仍可直接用系统接口,兼顾通用性。

固定块分配器 JFixedAllocStack

每个层级对应一个 JFixedAllocStack<N>,其核心是无锁栈式管理:

  • 内部维护一个空闲块栈(LIFO 链表)。
  • allocate 时从栈取出一个空闲块,栈空时调用 expand 申请一批新块。
  • deallocate 时将块归还到栈顶。

核心技术点

  • 原子双字比较交换(128 位 CAS)

    为了线程安全,栈顶指针 _top 需要原子更新。这里用到了 128 位 CAS(Compare-And-Swap),保证 node 指针和计数器同时更新,避免 ABA 问题。

  • CAS 不可用时的降级方案

    对于不支持 128 位原子操作的平台,采用 futex+memcpy 的方式手动实现互斥和原子性。

线程安全设计

分配和释放都用原子操作保护,无需锁,性能高。
多线程环境下不会出现竞争条件或内存破坏。

Arena 记录和调试

分配的内存区域(arena)可以记录到全局数组中,方便调试和统计。
通过 JAlloc::getAllocArenas() 可获得分配区域列表。

全局 new/delete 重载(可选)

如果定义了 OVERRIDE_GLOBAL_ALLOCATOR,会重载 operator newoperator delete,让全局 new/delete 也用这个分配器。

灵活切换

可以通过宏 JALIB_ALLOCATOR 切换:

  • 启用时用自定义分配器
  • 否则回退为标准 malloc/free

总结

本内存分配器的设计核心在于:

  • 采用多级固定块内存池 + 无锁算法,高效服务于小块高频分配 / 释放;
  • 通过 128 位原子操作或 futex 确保并发安全,适用多平台;
  • 提供 arena 管理和统计,方便调试与维护;
  • 兼容传统分配方式,易于集成和切换。

这种设计非常适合像 DMTCP 这样对性能和内存管理有特殊要求的系统级软件。

Memory

Memory = 存储 + 访问逻辑

存储

  • 在仿真里,memory 本质上就是一个数组(Array)或者向量(Vector),每个元素对应一个存储单元(bit/byte/word)。
  • 例:一个 8 位 × 1024 深度的 RAM,可以在仿真里用 uint8_t mem[1024]; 表示。

访问逻辑

  • 读(Read):根据地址返回对应的数据。
  • 写(Write):根据地址和写使能信号,将数据写入存储单元。
  • 可能涉及 时序:同步(clock 边沿写入)或异步(立即生效)。

时序和延迟

  • 在硬件里,memory 访问不是瞬间的:存在 读延迟、写延迟。
  • 仿真时,可以用 延时事件 或 clock 边沿触发 来模拟。

仿真代码

功能说明

  • 多端口读写:支持同时多个端口访问 memory
  • 写冲突仲裁:写优先策略或延迟写,可扩展读优先 / 轮询
  • 读延迟 pipeline:延迟由 read_delay 控制
  • Burst / wrap-around:访问超出末尾自动回绕
  • 简单 Cache/Tag:模拟命中 / 未命中
  • 异步端口:不同端口调用 read/write 可在不同 tick,模拟异步时钟
  • 随机 bit flip / SEU:1% 概率错误注入
  • 统计与功耗估算基础:记录读写次数、命中数、平均延迟
EdaMemoryFull.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include <random>
#include <map>
#include <algorithm>
#include <fstream>

struct Event {
int time;
std::function<void()> action;
bool operator<(const Event& other) const { return time > other.time; }
};

class EdaMemoryFull {
std::vector<uint8_t> mem;
std::priority_queue<Event> event_queue;
int sim_time = 0;
int read_delay;
std::mt19937 gen;

int cache_size;
struct CacheLine { bool valid=false; int tag=-1; };
std::vector<CacheLine> cache;

int max_bus_access_per_cycle = 2;
int current_cycle_access = 0;

// 统计信息
std::map<int,int> read_count, write_count, read_hits, write_hits;
std::map<int,int> read_delay_total;
int dynamic_power = 0; // 动态功耗
int static_power; // 静态功耗
std::vector<int> dynamic_power_per_cycle; // 每周期动态功耗

// 动态功耗公式(单位:动态功耗单位 = 每bit切换一次算1)
//
// P_dyn = α * C * V^2 * f
// 其中,α 开关活动因子(switching activity)
// C 负载电容
// V 电源电压
// f 时钟频率
// 简化为每次 bit 改变增加一个单位动态功耗
//
// 静态功耗公式(单位:静态功耗单位 = 每8bit存储单元算1)
//
// P_static = I_leak * V * N
// 简化为每8bit存储单元增加1单位静态功耗
//
// 总功耗
//
// P_total = P_dyn + P_static

public:
enum WritePriority { WRITE_FIRST, READ_FIRST, ROUND_ROBIN } write_prio;

EdaMemoryFull(size_t size, int delay, int c_size, WritePriority prio=WRITE_FIRST)
: mem(size,0), read_delay(delay), gen(std::random_device{}()),
cache_size(c_size), cache(c_size), static_power(size/8), write_prio(prio)
{
std::uniform_int_distribution<> dis(0,255);
for(auto &v: mem) v = dis(gen);
}

void write(int port,int addr,const std::vector<uint8_t>& data){
write_count[port]++;
if(current_cycle_access>=max_bus_access_per_cycle){
event_queue.push({sim_time+1,[this,port,addr,data](){ write(port,addr,data); }});
return;
}
current_cycle_access++;
if(write_prio==WRITE_FIRST){
apply_write(addr,data);
} else {
event_queue.push({sim_time+1,[this,addr,data](){ apply_write(addr,data); }});
}
update_cache(port, addr,data,true);
}

void read(int port,int addr,size_t length,std::function<void(std::vector<uint8_t>)> callback){
read_count[port]++;
if(current_cycle_access>=max_bus_access_per_cycle){
event_queue.push({sim_time+1,[this,port,addr,length,callback](){ read(port,addr,length,callback); }});
return;
}
current_cycle_access++;
int trigger_time = sim_time+read_delay;
bool hit = check_cache(port, addr,length);
if(hit) read_hits[port]++;
event_queue.push({trigger_time,[this,addr,length,callback,port,trigger_time]() {
std::vector<uint8_t> data;
for(size_t i=0;i<length;i++){
uint8_t val = mem[(addr+i)%mem.size()];
if(random_bit_flip()) val ^= (1<<(gen()%8));
dynamic_power += count_bit_changes(val, mem[(addr+i)%mem.size()]);
data.push_back(val);
}
read_delay_total[port] += (trigger_time - sim_time);
callback(data);
}});
}

void tick(){
sim_time++;
current_cycle_access = 0;
int cycle_dyn_power = 0;

while(!event_queue.empty() && event_queue.top().time <= sim_time){
auto e = event_queue.top(); event_queue.pop();
int before = dynamic_power;
e.action();
cycle_dyn_power += (dynamic_power - before);
}

dynamic_power_per_cycle.push_back(cycle_dyn_power);

// ASCII 可视化每周期动态功耗
int scale = 50;
int bar_len = *std::max_element(dynamic_power_per_cycle.begin(), dynamic_power_per_cycle.end())>0 ?
cycle_dyn_power*scale/(*std::max_element(dynamic_power_per_cycle.begin(), dynamic_power_per_cycle.end())) : 0;
std::cout << "Cycle " << sim_time << " dyn power: " << cycle_dyn_power
<< " total power: " << dynamic_power + static_power << " ";
for(int i=0;i<bar_len;i++) std::cout<<"#";
std::cout << std::endl;
}

void print_stats() const {
std::cout<<"Simulation stats:\n";
for(auto& [port,cnt]: read_count)
std::cout<<"Port "<<port<<" read count: "<<cnt
<<", hits: "<<read_hits.at(port)
<<", avg delay: "<<(cnt?read_delay_total.at(port)/cnt:0)<<"\n";
for(auto& [port,cnt]: write_count)
std::cout<<"Port "<<port<<" write count: "<<cnt
<<", hits: "<<write_hits.at(port)<<"\n";
std::cout<<"Dynamic power units: "<<dynamic_power<<"\n";
std::cout<<"Static power units: "<<static_power<<"\n";
std::cout<<"Total power units: "<<dynamic_power + static_power<<"\n";
}

void export_power_csv(const std::string &filename) const {
std::ofstream ofs(filename);
if(!ofs.is_open()) {
std::cerr << "Failed to open file: " << filename << std::endl;
return;
}
ofs << "Cycle,DynamicPower,StaticPower,TotalPower\n";
for(size_t i=0;i<dynamic_power_per_cycle.size();i++){
int dyn = dynamic_power_per_cycle[i];
int total = dyn + static_power;
ofs << (i+1) << "," << dyn << "," << static_power << "," << total << "\n";
}
ofs.close();
std::cout << "Power data exported to " << filename << std::endl;
}

private:
bool random_bit_flip(){
std::uniform_real_distribution<> dis(0.0,1.0);
return dis(gen)<0.01;
}

int count_bit_changes(uint8_t a,uint8_t b){
uint8_t diff = a^b;
int count=0;
while(diff){ count+=diff&1; diff>>=1; }
return count;
}

bool check_cache(int port, int addr,size_t length){
int line = addr % cache_size;
int tag = addr / cache_size;
return cache[line].valid && cache[line].tag==tag;
}

void update_cache(int port, int addr,const std::vector<uint8_t>& data,bool write=false){
int line = addr % cache_size;
int tag = addr / cache_size;
cache[line].valid=true;
cache[line].tag=tag;
if(write) write_hits[port]++;
}

void apply_write(int addr,const std::vector<uint8_t>& data){
for(size_t i=0;i<data.size();i++) mem[(addr+i)%mem.size()]=data[i];
}
};

// 示例主程序
int main(){
EdaMemoryFull mem(1024,2,16,EdaMemoryFull::WRITE_FIRST);

mem.write(0,10,{42,43,44});
mem.write(1,11,{99});
mem.read(0,10,3,[](std::vector<uint8_t> data){
std::cout<<"Port0 read burst: ";
for(auto v:data) std::cout<<(int)v<<" ";
std::cout<<std::endl;
});
mem.read(1,11,1,[](std::vector<uint8_t> data){
std::cout<<"Port1 read: "<<(int)data[0]<<std::endl;
});

for(int i=0;i<10;i++) mem.tick();
// 导出功耗数据
mem.export_power_csv("memory_power.csv");

mem.print_stats();
return 0;
}

输出的 memory_power.csv 文件内容示例:

1
2
3
4
5
6
7
8
9
10
11
Cycle,DynamicPower,StaticPower,TotalPower
1,3,128,131
2,0,128,128
3,5,128,133
4,2,128,130
5,7,128,135
6,1,128,129
7,4,128,132
8,0,128,128
9,2,128,130
10,3,128,131

每列含义:

  • Cycle:周期号
  • DynamicPower:每周期动态功耗单位
  • StaticPower:静态功耗单位(固定)
  • TotalPower:总功耗单位

功耗分析

report_power.pyview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import pandas as pd
import matplotlib.pyplot as plt

# 读取 CSV
df = pd.read_csv('memory_power.csv')

# 找到动态功耗峰值周期
peak_cycle = df['DynamicPower'].idxmax() + 1
peak_value = df['DynamicPower'].max()

plt.figure(figsize=(12,6))

# 绘制条形背景(ASCII风格效果)
for i, val in enumerate(df['DynamicPower']):
bar_len = int(val / peak_value * 50) # 50字符最大长度
plt.text(i+1, 0, '#' * bar_len, fontsize=8, color='grey', va='bottom')

# 绘制曲线
plt.plot(df['Cycle'], df['DynamicPower'], label='Dynamic Power', marker='o', color='blue')
plt.plot(df['Cycle'], df['StaticPower'], label='Static Power', linestyle='--', color='orange')
plt.plot(df['Cycle'], df['TotalPower'], label='Total Power', linestyle='-.', color='green')

# 高亮峰值
plt.scatter(peak_cycle, peak_value, color='red', s=100, label='Peak Dynamic Power')

plt.title('Memory Power Simulation with Peak Highlight & ASCII-style Bars')
plt.xlabel('Cycle')
plt.ylabel('Power Units')
plt.grid(True, linestyle=':')
plt.legend()
plt.tight_layout()
plt.show()

burst/multi-port 总线冲突

在多端口系统中,尤其是在使用总线结构的系统中,总线冲突(Bus contention)是一个常见的问题。总线冲突通常发生在多个设备尝试同时访问总线上的同一资源时。这种情况可能会导致数据损坏、系统性能下降或甚至系统崩溃。下面是一些解决和缓解总线冲突的策略:

  1. 仲裁机制
    仲裁是解决总线冲突的一种常用方法。它通过一个仲裁器(Arbiter)来决定哪个设备可以访问总线。常见的仲裁策略有:

优先级仲裁:根据预先设定的优先级顺序决定哪个设备可以访问总线。

轮询仲裁:轮流让每个设备访问总线。

基于请求的仲裁(如请求共享(Request-for-Shared, RFS)和请求独占(Request-for-Exclusive, RFE)):设备首先请求对资源的访问,然后根据请求的类型(共享或独占)来决定访问权限。

  1. 分时复用
    通过时间分割(Time Division Multiplexing, TDM)或频率分割(Frequency Division Multiplexing, FDM),可以允许多个设备在不同的时间或频率上使用总线,从而减少冲突。例如,可以使用时分多路复用将总线的不同时间段分配给不同的设备。

  2. 编码和解码技术
    使用特殊的编码和解码技术,如霍纳编码(Hornar code)或格雷码(Gray code),可以减少在总线上传输数据时的错误,并帮助检测和纠正数据冲突。

  3. 总线锁定
    在访问总线期间,通过总线锁定机制确保没有其他设备可以访问总线。这可以通过在总线上设置一个锁定信号来实现,该信号在访问期间保持激活状态。

  4. 缓存和缓冲
    为每个设备提供局部缓存或缓冲机制,可以减少对总线的直接访问次数,从而降低冲突的可能性。当一个设备需要与总线上的另一个设备通信时,它可以先将数据写入自己的缓存,然后再由缓存同步到总线上。

  5. 使用更宽的总线
    增加总线的宽度可以允许在同一时间内传输更多的数据,从而减少对总线的需求,降低冲突的可能性。

实施步骤
评估系统需求:确定哪些类型的设备将使用总线,以及它们对带宽的需求。

选择仲裁策略:根据设备的优先级和带宽需求选择合适的仲裁策略。

设计硬件:根据选定的策略设计硬件,包括添加仲裁器、缓存和适当的控制逻辑。

测试和优化:实施后进行系统测试,根据测试结果调整策略或硬件设计。

通过上述方法,可以有效管理和减少多端口系统中的总线冲突问题,提高系统的稳定性和性能。

Cache Tag(缓存标记)

Cache Tag(缓存标记)是高速缓存(Cache)中的关键组成部分,用于存储数据在主存中的地址信息,以便快速定位数据位置。 ‌

核心功能
Tag字段存储了主存中数据的地址信息,当CPU访问主存时,首先通过Tag字段判断数据是否存在于Cache中。若存在,则直接从Cache读取;若不存在,则访问主存。 ‌

结构组成

  • ‌Tag‌:记录数据在主存的地址信息。
  • ‌Data‌:存储实际数据。
  • ‌Valid Bit‌:标记数据是否有效。
  • ‌Dir‌:目录信息,用于区分不同数据块。 ‌

应用场景

现代处理器通常采用多级Cache结构(如L1、L2、L3),其中Tag与Data共同构成Cache Line,用于快速访问和存储数据。例如,ARMv8-A架构的处理器包含独立的I-Cache和D-Cache,分别存储指令和数据。

Cache Tag 仿真代码

FIXME: 该代码会 coredump 。

cache_simulator.hview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#ifndef CACHE_SIMULATOR_H
#define CACHE_SIMULATOR_H

#include <vector>
#include <unordered_map>
#include <mutex>
#include <atomic>

// 缓存行结构体
struct CacheLine {
bool valid = false;
bool dirty = false;
uint32_t tag = 0;
uint64_t last_used = 0; // 用于LRU替换策略
std::vector<uint8_t> data;
};

// 端口访问请求结构体
struct PortRequest {
uint32_t port_id;
bool is_write;
uint32_t addr;
uint8_t* data_ptr;
size_t data_size;
};

class CacheSimulator {
public:
CacheSimulator(uint32_t line_size, uint32_t num_lines, uint32_t num_ports);

// 多端口访问接口
void process_request(const PortRequest& req);

// 缓存配置
void set_write_policy(bool write_back);
void set_replacement_policy(int policy); // 0:LRU, 1:FIFO, 2:Random

private:
// 内部缓存操作
bool access_cache(uint32_t port_id, uint32_t addr, bool is_write, uint8_t* data, size_t size);
void handle_miss(uint32_t port_id, uint32_t addr);
void evict_line(uint32_t set_idx, uint32_t way_idx);

// 多端口同步
std::atomic<uint64_t> global_counter_{0}; // 原子计数器
std::vector<std::mutex> port_locks_;

// 缓存结构
uint32_t line_size_;
uint32_t num_sets_;
std::vector<std::vector<CacheLine>> cache_;

// 策略配置
bool write_back_;
int replacement_policy_;
};
#endif
cache_simulator.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include "cache_simulator.h"
#include <algorithm>
#include <random>
#include <cstring>

CacheSimulator::CacheSimulator(uint32_t line_size, uint32_t num_lines, uint32_t num_ports)
: port_locks_(num_ports), line_size_(line_size) {
num_sets_ = num_lines; // 简单实现,可扩展为组相联
cache_.resize(num_sets_, std::vector<CacheLine>(1)); // 直接映射
}

void CacheSimulator::process_request(const PortRequest& req) {
std::lock_guard<std::mutex> lock(port_locks_[req.port_id]);
access_cache(req.port_id, req.addr, req.is_write, req.data_ptr, req.data_size);
}

bool CacheSimulator::access_cache(uint32_t port_id, uint32_t addr, bool is_write,
uint8_t* data, size_t size) {
uint32_t tag = addr / line_size_;
uint32_t set_idx = tag % num_sets_;

// 查找命中
for (auto& line : cache_[set_idx]) {
if (line.valid && line.tag == tag) {
line.last_used = ++global_counter_;
if (is_write) {
memcpy(line.data.data(), data, size);
line.dirty = true;
} else {
memcpy(data, line.data.data(), size);
}
return true;
}
}

// 未命中处理
handle_miss(port_id, addr);
return false;
}

void CacheSimulator::handle_miss(uint32_t port_id, uint32_t addr) {
uint32_t tag = addr / line_size_;
uint32_t set_idx = tag % num_sets_;

// 查找可替换行
auto& lines = cache_[set_idx];
auto victim = std::min_element(lines.begin(), lines.end(),
[](const CacheLine& a, const CacheLine& b) {
return a.last_used < b.last_used; // LRU策略
});

// 写回脏数据
if (write_back_ && victim->dirty) {
// 模拟写回主存操作
}

// 加载新数据
victim->valid = true;
victim->tag = tag;
victim->dirty = false;
victim->last_used = ++global_counter_;
// 模拟从主存加载数据
victim->data.resize(line_size_);
}
main.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include "cache_simulator.h"
#include <thread>
#include <iostream>

void port_thread(CacheSimulator& cache, uint32_t port_id) {
for (int i = 0; i < 1000; ++i) {
PortRequest req;
req.port_id = port_id;
req.is_write = (i % 3 == 0);
req.addr = rand() % 0xFFFF;
uint8_t data[64] = {0};
req.data_ptr = data;
req.data_size = sizeof(data);

cache.process_request(req);
}
}

int main() {
CacheSimulator cache(64, 1024, 4); // 64B行, 1024行, 4端口

std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back(port_thread, std::ref(cache), i);
}

for (auto& t : threads) {
t.join();
}

std::cout << "Cache simulation completed" << std::endl;
return 0;
}
Makefileview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 编译器配置
CXX := g++
CXXFLAGS := -std=c++17 -Wall -Wextra -O3 -pthread
LDFLAGS := -pthread

# 项目结构
SRC_DIR := .
BUILD_DIR := build
TARGET := $(BUILD_DIR)/cache_simulator

# 源文件列表
SRCS := $(wildcard $(SRC_DIR)/*.cpp)
OBJS := $(patsubst $(SRC_DIR)/%.cpp,$(BUILD_DIR)/%.o,$(SRCS))
DEPS := $(OBJS:.o=.d)

# 默认目标
all: $(BUILD_DIR) $(TARGET)

# 创建构建目录
$(BUILD_DIR):
mkdir -p $(BUILD_DIR)

# 主目标链接
$(TARGET): $(OBJS)
$(CXX) $(LDFLAGS) $^ -o $@

# 编译规则
$(BUILD_DIR)/%.o: $(SRC_DIR)/%.cpp
$(CXX) $(CXXFLAGS) -MMD -c $< -o $@

# 包含依赖关系
-include $(DEPS)

# 清理
clean:
rm -rf $(BUILD_DIR)

.PHONY: all clean

IO + CPU 密集 + IO

tasks

tasks.hppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 模拟 IO 和 CPU 任务
*/

#include <unistd.h>

#include <iostream>
#include <vector>

struct DataChunk {
std::vector<char> data;
DataChunk() = default;
explicit DataChunk(size_t size) : data(size) {}
};

struct CompressedChunk {
std::vector<char> data;
CompressedChunk() = default;
explicit CompressedChunk(size_t size) : data(size) {}
};

// 模拟数据读取函数
bool read_from_network(DataChunk& chunk) {
sleep(3); // 模拟 IO 延迟

chunk.data.resize(100); // 模拟每个数据块有 100 个字节

static int count = 0;
if (count++ >= 10) return false; // 模拟读取 10 个数据块后结束

std::generate(chunk.data.begin(), chunk.data.end(),
[]() { return rand() % 256; });
return true;
}

// 模拟压缩函数
char compress_byte(char byte) {
for (int i = 0; i < 10'000ll; ++i)
; // 模拟 CPU busy
return byte % 128; // 简单压缩算法示例
}

// 模拟写入函数
void write_to_file(const CompressedChunk& chunk) {
sleep(3); // 模拟 IO 延迟
std::cout << "Writing chunk of size " << chunk.data.size() << "\n";
}

方案一

1_message_queue.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 方案 1:异步队列 + TBB 流水线
*/

#include <tbb/tbb.h>

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <atomic>

#include "tasks.hpp"

// 全局队列
std::queue<std::vector<char>> readQueue;
std::mutex mtx;
std::condition_variable cv;
std::atomic_bool stop = false;

// 读线程
void networkReader() {
while (!stop) {
DataChunk chunk;

if (!read_from_network(chunk)) // 阻塞 I/O
{
stop.store(true, memory_order::releaxed);
return;
}

{
std::lock_guard<std::mutex> lock(mtx);
readQueue.push(std::move(chunk.data));
}
cv.notify_one();
}
}

// 压缩任务
void compressor() {
while (!stop) {
DataChunk chunk;
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !readQueue.empty(); });
chunk.data = std::move(readQueue.front());
readQueue.pop();
}

// CPU 密集计算,使用 TBB 并行
CompressedChunk compressed(chunk.data.size());
tbb::parallel_for(size_t(0), chunk.data.size(), [&](size_t i) {
compressed.data[i] = compress_byte(chunk.data[i]); // 假设单字节压缩
});

write_to_file(compressed); // 可以异步
}
}

int main() {
std::thread reader(networkReader);
std::thread worker(compressor);

reader.join();
worker.join();
}

方案二

2_flow_graph.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 方案 2:TBB Flow Graph
*/

#include <tbb/flow_graph.h>
#include <tbb/tbb.h>

#include <iostream>
#include <vector>

#include "tasks.hpp"

using namespace tbb;
using namespace tbb::flow;

int main() {
graph g;

// 1. 读取节点(串行)
input_node<DataChunk> reader(
g,
[](flow_control& fc) -> DataChunk {
DataChunk chunk(1024); // 1KB数据块
if (!read_from_network(chunk)) {
fc.stop();
return DataChunk();
}
return chunk;
});

// 2. 并行处理节点(无限制并发)
function_node<DataChunk, CompressedChunk> processor(
g, unlimited,
[](const DataChunk& input) -> CompressedChunk {
CompressedChunk output(input.data.size());

tbb::parallel_for(
tbb::blocked_range<size_t>(0, input.data.size()),
[&](const tbb::blocked_range<size_t>& r) {
for (size_t i = r.begin(); i != r.end(); ++i) {
output.data[i] = compress_byte(input.data[i]);
}
});

return output;
});

// 3. 写入节点(串行保证写入顺序)
function_node<CompressedChunk> writer(
g, serial,
[](const CompressedChunk& output) {
write_to_file(output);
});

// 构建数据流管道
make_edge(reader, processor);
make_edge(processor, writer);

// 启动管道
reader.activate();
g.wait_for_all();

return 0;
}

方案三

3_pipeline.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 方案 3: TBB 流水线
*/

#include <tbb/tbb.h>

#include <iostream>
#include <vector>

#include "tasks.hpp"

int main() {
tbb::parallel_pipeline(
/* max_number_of_live_token */ 4,
// Stage 1: 读网络数据
tbb::make_filter<void, DataChunk>(
tbb::filter_mode::serial_in_order,
[](tbb::flow_control& fc) -> DataChunk {
DataChunk chunk;
if (!read_from_network(chunk)) { // 返回 false 时结束
fc.stop();
}
return chunk;
}) &
// Stage 2: CPU 压缩
tbb::make_filter<DataChunk, CompressedChunk>(
tbb::filter_mode::parallel,
[](DataChunk chunk) -> CompressedChunk {
CompressedChunk out(chunk.data.size());
tbb::parallel_for(size_t(0), chunk.data.size(), [&](size_t i) {
out.data[i] = compress_byte(chunk.data[i]);
});
return out;
}) &
// Stage 3: 写文件
tbb::make_filter<CompressedChunk, void>(
tbb::filter_mode::serial_in_order,
[](CompressedChunk out) {
write_to_file(out); // 可以异步
}));

return 0;
}