0%

前言

源码:https://github.com/cameron314/concurrentqueue

原文
Solving the ABA Problem for Lock-Free Free Lists

共识

  • CAS 足以保证线程安全;
  • 我们要做的是消除 CAS 的 ABA 问题。

常见的解决方案

Tagged / versioned 指针

每次修改 head 的同时也变更一个版本号(tag/version),把 (pointer, version) 当作 CAS 的对象。即使指针
回到同一个 A,只要版本号不同,CAS 就会失败,防止 ABA。

缺点包括:在某些机器上需要双字宽度(两个 word)CAS,如果 硬件 / 编译器 / 平台 不支持就只能 模拟 / 锁
住;或者要压缩 pointer 或 tag 大小,可能限制可管理的节点数量 / 地址空间。

LL/SC(Load-Linked / Store-Conditional)原语

在支持 LL/SC 的架构上,它自然就可以阻止 ABA:因为 store-conditional 会检测在 load 和 store 之间地址
是否被“写过”,哪怕写了后来改回原来的值也不行。缺点是很多主流架构(尤其 x86)不支持或者支持有限。

作者的方法:以引用计数+“should be on freelist”标志(标志 + 引用计数)

作者提出了一个适用于 free list 的通用方法来避免 ABA,且保持 lock‐free 特性。

提示:该代码应该从 try_get() 开始阅读,然后回到 add() ,这样才能理解设计引用计数的意图,否则很容
易迷惑。

  • 引用计数是状态标志:
    • 表示当前有多少线程正在操作节点;
    • 空闲链表本身也持有一个对节点的引用计数
  • 任何线程尝试摘取空闲链表的节点(总是从头部开始)时(try_get()):
    • 必须先增加 head 的引用计数(不同线程可能并发地将节点的引用计数增加到某个值);→ (3)
    • next = head->next;
    • 然后 CAS 竞争,尝试将 freeListHead 调整为 next;
      • 如果 CAS 竞争成功,则成功将 freeListHead 调整为 next,该节点被摘下;引用计数减 2(自己增加 的
        引用计数 + 链表本身持有的引用计数),我们称该线程为线程 1。→ (1)
      • 如果 CAS 竞争失败,则回退:将自己所增加的引用计数减去;用 CAS 返回的新的 head 重试(下一次获取
        新的 head 节点),这是线程 2;→ (2)
      • 注意:这两步之间以及每一步自身内,对引用计数的操作都有中间状态,但是此时引用计数必然大于 0 (
        重点! )。
  • ABA 问题:如果成功摘取节点的线程 1完成节点的使用,将节点又添加回空闲链表(add()):
    • 它会发现某个线程正在第 (2) 步的回退过程中(还没有将自身增加的引用计数减回去),所以此时引用计
      数还是大于 0(也就是说线程 1使用该节点的整个过程,该节点的引用计数都是大于 0 的,但是不影响)
    • 此时它可以选择自旋地等待引用计数降为 0 (即所有之前在 try_get() 中 CAS 竞争失败的线程(
      线程 2)回退成功)。因为只有引用计数降为 0 了,才能说明自己是唯一一个尝试在链表上操作该节点
      的线程。但是这从技术上说,就是一个 lock。
    • 作者设计了一个更聪明的办法:add() 设置一个 SHOULD_BE_ON_FREELIST 标志,然后直接放弃
      add()的任务已经完成了)。
    • 让处于回退中的 线程 2,在回退结束的时候,检查 SHOULD_BE_ON_FREELIST 标志,如果有该标志,并且
      自己就是最后一个操作该节点的线程的话(即此时的引用计数已经降为 0 了),就帮助 add() 把该节点添
      加到 free list。

引用计数

现在我们回过头来看引用计数的作用:

  • 如果 refs > 0

    • (1) 要么该节点在链表上;
    • (2) 要么有线程正在操作该节点(准确来讲,是在 try_get());
    • (3) 要么以上两种情况同时存在。
    • 注:(1)(2) 可以只存在一种,见上面的 ABA 问题的描述。
  • 如果 refs == 0

    • 节点已经被从空闲链表上取下;
    • 并且其他线程都已经从 try_get() 成功回退了。

如果第三个线程,与线程 1线程 2一起进入上面的 try_get() 竞争,但是当线程 3即将执行步
(3)时,发现 refs == 0,它就不能够再增加 head 的引用计数了,因为节点已经被成功取下。否则当它成
功增加引用计数,再去拿取 head->next 的时候,是未定义行为。

正是基于此,在步骤(3)增加引用计数的时候,我们需要判断 refs == 0 ? 和使用 CAS 增加引用计数,如果
不符合预期,则放弃增加,进入下一轮重试。

1
2
3
4
if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire)) {
head = freeListHead.load(std::memory_order_acquire);
continue;
}

ABA 问题

在典型的无锁栈/队列里,指针会被反复复用(比如 pop 出再 push 回去)。如果只用 compare_exchange 比较指
针,那么:

  • T1 读到 A
  • T2 把 A → B → A 改一圈
  • T1 CAS 时看到还是 A,以为结构没变,但其实已经发生变化

这就是 ABA 问题。

解决方案

版本标记指针方案

‌ 核心思想 ‌:为每个指针附加一个版本号标记,每次修改时递增版本号。

‌ 实现要点 ‌:

  • 需要双字 CAS(DCAS)硬件支持
  • 每次修改头指针时递增标记值
  • 即使指针地址相同,标记值不同也会使 CAS 失败
  • 适用于支持 DCAS 的平台(如某些 ARM 架构)
  • 在非 DCAS 平台也可用,前提是将 data + version 放入一个机器字中,以维持操作的原子性

‌ 限制 ‌:

  • 在某些机器上需要双字宽度(两个 word)CAS;
  • 如果硬件/编译器/平台不支持就只能模拟/锁住;或者要压缩 pointer 或 tag 大小,可能限制可管理的节点数量/地址空间。

引用计数方案

‌ 核心思想 ‌:为每个节点维护引用计数,防止节点在被使用时被重新添加到列表。

‌ 关键实现细节 ‌:

每个节点包含 freeListRefs(引用计数)和 freeListNext(下一节点指针) 引用计数高位用作 “应返回自由列表”
标志位 try_get()操作前确保引用计数不为零 add()操作使用原子操作管理引用计数和标志位 ‌ 优势 ‌:

完全通用的解决方案,不依赖特定硬件特性保持真正的锁无关性质正确处理并发场景下的各种竞争条件

Hazard Pointer(危险指针)

思路:每个线程在访问共享指针之前,把自己正在访问的指针写到一个全局可见的“hazard pointer”里。

作用:其他线程在想要回收这个节点内存时,必须检查所有线程的 hazard pointers,如果发现有人还在用这个节
点,就不能释放。

优点:简单直接,内存可以安全回收。

缺点:维护 hazard pointers 有一定开销,每次回收都要检查所有线程。

Epoch-Based Reclamation(基于世代的回收,简称 Epoch GC)

思路:把时间切分成 epoch(世代)。线程进入临界区时声明自己在某个 epoch。当一个节点被删除后,先放到一
个“延迟回收队列”,等到所有线程都离开这个 epoch 之后,才能真正释放这些节点。

作用:保证没有线程会在旧 epoch 中访问到已经释放的节点。

优点:比 hazard pointer 更高效(不用逐个检查指针)。

缺点:需要所有线程都周期性地报告自己活跃的 epoch,否则内存可能迟迟回收不了。

🚩 为什么会和 ABA 有关?

像 Michael-Scott 队列这种链表结构,节点被 pop 出队后地址可能被重用。如果没有安全的内存回收,另一个线
程可能 CAS 成功指向了一个“已经被释放并重用的地址”,这就是 ABA 的根源。所以 hazard pointer 或 epoch
GC 是在链表队列里用来避免这种 悬空引用 + ABA 的。

而 moodycamel:: ConcurrentQueue 因为用的是 环形 buffer + sequence number,节点不会反复 malloc/free,
所以根本就不需要 hazard pointer 或 epoch GC。

TBB 的特殊性

本文的法则是以线程为调度单位的。

如果你使用的是 TBB ,那么请将 “线程” 对应为 “任务”,而将本文的“任务” 对应为“载荷”( payloads)。

因为 TBB 是以任务为调度单位的:

  • 每个 “任务” 是并发运行的最小单位,必须保证数据独立或线程安全。
  • TBB 采用 “任务窃取” 算法来保证线程的复杂均衡,所以若干任务可能被同一线程或不同线程运行。

经验法则

以 tasksPerThread 按需分配线程

  • tasksPerThread:均匀性。如果每个线程分配的任务数不均匀,那么任务数最多的线程就会成为瓶颈。
  • 按需分配线程:如果任务可以很快完成,那么没有必要开启过多的线程,否则调度开销也不可小觑。

以空间换时间

为了任务能并发运行,进行任务分隔时,必须尽可能减少数据共享。
所以不要吝惜空间,为每个任务单独开辟内存(不论是为输入还是输出目的)都是值得且必要的。

长临界区使用 lock + 条件变量

如果一个任务的临界区比较大,意味着该任务执行时,其他线程在短时间内无法进入临界区。
与其让这些线程忙等,不如释放 CPU 进入阻塞 / 休眠。

前言

这是阅读 Cameron Desrochers 的
A Fast General Purpose Lock-Free Queue for C++
源码的笔记。

系统概览

MPMC 队列由一系列 SPMC 队列组成。消费者使用启发式 (heuristic) 来决定消费哪个 SPMC 队列。允许批量入列
和出列,只需要很小的额外开销。

producer 需要一些 thread-local 数据; consumer 也可以用一些可选的 thread-local 数据来加速;这些
thread-local 数据可以与用户分配的 tokens 关联;如果用户没有为生产者提供 tokens ,则使用无锁哈希表(
以当前线程 ID 为键)来查找线程本地生产者队列:每个 SPMC 队列都使用一个预分配的 token (或隐式分配的
token,如果没有提供的话)来创建。由于 token 包含相当于线程特定的数据,因此它们不应该同时在多个线程中
使用(尽管可以将 token 的所有权转移给另一个线程;特别是,这允许在线程池任务中使用令牌,即使运行任务
的线程在中途发生变化)。

所有生产者队列都以无锁链表的形式连接在一起。当显式生产者不再有元素被添加时(即其令牌被销毁),它会被
标记为与任何生产者都无关联,但它会保留在链表中,且其内存不会被释放;下一个新生产者会重用旧生产者的内
存(这样,无锁生产者列表就只能添加)。隐式生产者永远不会被销毁(直到高层队列本身被销毁),因为无法知
道给定线程是否已完成对数据结构的使用。需要注意的是,最坏情况下的出队速度取决于生产者队列的数量,即使
它们都为空。

显式生产者队列和隐式生产者队列的生命周期存在根本区别:显式生产者队列的生产生命周期有限,与令牌的生命
周期绑定;而隐式生产者队列的生产生命周期不受限制,且与高级队列本身的生命周期相同。因此,为了最大化速
度和内存利用率,我们使用了两种略有不同的 SPMC 算法。通常,显式生产者队列设计得更快,占用的内存也更多
;而隐式生产者队列设计得更慢,但会将更多内存回收到高级队列的全局池中。为了获得最佳速度,请始终使用显
式令牌(除非您觉得它太不方便)。

任何分配的内存只有在高级队列被销毁时才会释放(尽管存在一些重用机制)。内存分配可以预先完成,如果内存
不足,操作就会失败(而不是分配更多内存)。如果需要,用户可以覆盖各种默认大小参数(以及队列使用的内存
分配函数)。

Full API (pseudocode)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Allocates more memory if necessary
enqueue(item) : bool
enqueue(prod_token, item) : bool
enqueue_bulk(item_first, count) : bool
enqueue_bulk(prod_token, item_first, count) : bool

# Fails if not enough memory to enqueue
try_enqueue(item) : bool
try_enqueue(prod_token, item) : bool
try_enqueue_bulk(item_first, count) : bool
try_enqueue_bulk(prod_token, item_first, count) : bool

# Attempts to dequeue from the queue (never allocates)
try_dequeue(item&) : bool
try_dequeue(cons_token, item&) : bool
try_dequeue_bulk(item_first, max) : size_t
try_dequeue_bulk(cons_token, item_first, max) : size_t

# If you happen to know which producer you want to dequeue from
try_dequeue_from_producer(prod_token, item&) : bool
try_dequeue_bulk_from_producer(prod_token, item_first, max) : size_t

# A not-necessarily-accurate count of the total number of elements
size_approx() : size_t

Producer Queue (SPMC) Design

隐式和显式版本的共享设计

生产者队列由块组成(显式和隐式生产者队列使用相同的块对象,以实现更好的内存共享)。初始状态下,它没有
块。每个块可以容纳固定数量的元素(所有块的容量相同,均为 2 的幂)。此外,块包含一个标志,指示已填充
的槽位是否已被完全消耗(显式版本使用此标志来判断块何时为空),以及一个原子计数器,用于计数已完全出队
的元素数量(隐式版本使用此标志来判断块何时为空)。

为了实现无锁操作,生产者队列可以被认为是一个抽象的无限数组。尾部索引指示生产者下一个可用的槽位;它同
时也是已入队元素数量的两倍( 入队计数 (enqueue count) )。尾部索引仅由生产者写入,并且始终递增(除
非溢出并回绕,但就我们的目的而言,这种情况仍被视为“递增”)。由于只有一个线程在更新相关变量,因此生产
一个元素的过程非常简单。头索引指示下一个可以被消费的元素。头索引由消费者原子地递增,可能并发进行。为
了防止头索引达到/超过感知到的尾部索引,我们使用了一个额外的原子计数器: 出队计数 (dequeue count)
。出队计数是乐观的,即当消费者推测有元素需要出队时,它会递增。如果出队计数在递增后的值小于入队计数(
尾部),则保证至少有一个元素要出队(即使考虑到并发性),并且可以安全地递增头部索引,因为知道之后它会
小于尾部索引。另一方面,如果出队计数在递增后超过(或等于)尾部,则出队操作失败,并且出队计数在逻辑上
会递减(以使其最终与入队计数保持一致):这可以通过直接递减出队计数来实现,但是(为了增加并行性并使所
有相关变量单调递增),改为递增**出队过量提交计数器 (dequeue overcommit counter)**。

1
出队计数的逻辑值 = 出队计数变量 - 出队过量提交值

在消费时,一旦如上所述确定了有效索引,仍然需要将其映射到一个块以及该块中的偏移量;为此会使用某种索引
数据结构(具体使用哪种结构取决于它是隐式队列还是显式队列)。最后,可以将元素移出,并更新某种状态,以
便最终知道该块何时完全消费。下文将分别在隐式和显式队列的各个部分中对这些机制进行完整描述。

如前所述,尾部和头部的索引/计数最终会溢出。这是预料之中的,并且已被考虑在内。因此,索引/计数被视为存
在于一个与最大整数值大小相同的圆上(类似于 360 度的圆,其中 359 在 1 之前)。为了检查一个索引/计数(
例如 a)是否位于另一个索引/计数(例如 b)之前(即逻辑小于),我们必须确定 a 是否沿着圆上的顺时针圆弧
更接近 b。使用以下”环形小于”算法(32 位版本):a < b 变为 a - b > (1U << 31U)a <= b 变为
a - b - 1ULL > (1ULL << 31ULL)。请注意,环形减法“仅适用于”普通无符号整数(假设为二进制补码)。需要
注意的是,尾部索引的增量不会超过头部索引(这会破坏队列)。请注意,尽管如此,从技术上讲仍然存在竞争条
件,即消费者(或生产者)看到的索引值过于陈旧,几乎比当前值落后一整圈(甚至更多!),从而导致队列的内
部状态损坏。但在实践中,这不是问题,因为遍历 2^31 个值(对于 32 位索引类型)需要一段时间,而其他核心
到那时会看到更新的值。实际上,许多无锁算法都基于相关的标签指针习语(tag-pointer idiom),其中前 16
位用于重复递增的标签,后 16 位用于指针值;这依赖于类似的假设,即一个核心不能将标签递增超过 2^15 次,
而其他核心却不知道。尽管如此,队列的默认索引类型是 64 位宽(如果 16 位看起来就足够了,那么理论上应该
可以避免任何潜在的竞争)。

内存分配失败也会得到妥善处理,不会损坏队列(只会报告失败)。此外,队列元素本身在操作时也应确保不会抛
出异常。

Block Pools

有两种不同的块池可供使用:首先,有一个初始的预分配块数组。一旦使用完毕,该池将永远保持为空。这简化了
其无等待(wait-free)实现,只需一条 fetch-and-add 原子指令(用于获取空闲块的下一个索引)并进行检查(
以确保该索引在范围内)。其次,有一个无锁(但非无等待)的全局空闲列表(“全局”是指对高级队列而言是全局
的),其中包含已用完且可重复使用的块,该列表实现为一个无锁单链表:头指针最初指向空(null)。要将块添
加到空闲列表,需要将块的下一个指针设置为头指针,然后使用比较并交换 (CAS) 更新头指针,使其指向该块,
前提是头指针未发生更改;如果发生更改,则重复该过程(这是一个经典的无锁 CAS 循环设计模式)。要从空闲
列表中移除一个块,可以使用类似的算法:读取头部块的下一个指针,然后将头部设置为该下一个指针(使用
CAS),前提是在此期间头部块没有发生变化。为了避免 ABA 问题,每个块都有一个引用计数,在执行 CAS 移除
块之前会递增,之后会递减;如果在块的引用计数大于 0 的情况下尝试将其重新添加到空闲列表中,则会设置一
个标志,指示该块应该在空闲列表中,并且下一个线程在完成最后一个引用的持有后会检查此标志,并将该块添加
到列表中(这种方法有效,因为我们不关心顺序)。我
另一篇博文中更
详细地描述了这个无锁空闲列表的具体设计和实现。当生产者队列需要新块时,它首先检查初始块池,然后检查全
局空闲列表,只有当它在那里找不到空闲块时,它才会在堆上分配一个新块(如果不允许内存分配,则失败)。

基准测试

Ticket System

BlockQueue(只用分块):使用分块内存布局,但不使用 ticket 分发机制。

TicketQueue_benchmark.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
constexpr size_t BLOCK_SIZE = 64;
constexpr int N = 1'000'000;
constexpr int NUM_PRODUCERS = 8;
constexpr int NUM_CONSUMERS = 8;
constexpr int ITEMS_PER_PRODUCER = N / NUM_PRODUCERS;

struct Slot {
std::atomic<bool> ready;
int data;
};

struct Block {
Slot slots[BLOCK_SIZE];
};

// BlockQueue: 分块但无 ticket
class BlockQueue {
public:
BlockQueue() : head(0), tail(0) {
}

void enqueue(int value) {
size_t index = head.fetch_add(1) % BLOCK_SIZE;
block.slots[index].data = value;
block.slots[index].ready.store(true, std::memory_order_release);
}

bool try_dequeue(int& value) {
size_t index = tail.fetch_add(1) % BLOCK_SIZE;
if (!block.slots[index].ready.load(std::memory_order_acquire))
return false;
value = block.slots[index].data;
return true;
}

private:
std::atomic<size_t> head;
std::atomic<size_t> tail;
Block block;
};

TicketQueue(分块 + ticket):模拟 moodycamel 的 ticket 分发方式。

TicketQueue_benchmark.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// TicketQueue: 分块 + ticket
class TicketQueue {
public:
TicketQueue() : head(0), tail(0) {
}

void enqueue(int value) {
size_t ticket = head.fetch_add(1);
size_t index = ticket % BLOCK_SIZE;
block.slots[index].data = value;
block.slots[index].ready.store(true, std::memory_order_release);
}

bool try_dequeue(int& value) {
size_t ticket = tail.fetch_add(1);
size_t index = ticket % BLOCK_SIZE;
while (!block.slots[index].ready.load(std::memory_order_acquire)) {
// 自旋等待
}
value = block.slots[index].data;
return true;
}

private:
std::atomic<size_t> head;
std::atomic<size_t> tail;
Block block;
};

Benchmark 代码:

TicketQueue_benchmark.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// 基准测试函数
template <typename QueueType>
double benchmark(const std::string& name, double& opsPerSec) {
QueueType queue;
std::atomic<int> totalConsumed{0};
std::map<int, double> threadWaitTimes;

auto start = std::chrono::high_resolution_clock::now();

// 启动生产者线程
std::vector<std::thread> producers;
for (int p = 0; p < NUM_PRODUCERS; ++p) {
producers.emplace_back([&queue, p]() {
for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
queue.enqueue(i + p * ITEMS_PER_PRODUCER);
}
});
}

// 启动消费者线程
std::vector<std::thread> consumers;
for (int c = 0; c < NUM_CONSUMERS; ++c) {
consumers.emplace_back([&queue, &totalConsumed, c, &threadWaitTimes]() {
int item;
auto localStart = std::chrono::high_resolution_clock::now();
while (true) {
auto t0 = std::chrono::high_resolution_clock::now();
while (!queue.try_dequeue(item)) {
// busy wait
}
auto t1 = std::chrono::high_resolution_clock::now();
threadWaitTimes[c] +=
std::chrono::duration<double>(t1 - t0).count();

if (++totalConsumed >= N)
break;
}
auto localEnd = std::chrono::high_resolution_clock::now();
double threadTime =
std::chrono::duration<double>(localEnd - localStart).count();
std::cout << "Consumer " << c << " finished in " << threadTime
<< "s, wait time: " << threadWaitTimes[c] << "s\n";
});
}

for (auto& t : producers)
t.join();
for (auto& t : consumers)
t.join();

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;
opsPerSec = N / elapsed.count();

std::cout << "\n"
<< name << " completed in " << elapsed.count()
<< "s, throughput: " << opsPerSec << " ops/sec\n";

return elapsed.count();
}

int main() {
double opsBlock = 0.0, opsTicket = 0.0;
double timeBlock = benchmark<BlockQueue>("BlockQueue", opsBlock);
double timeTicket = benchmark<TicketQueue>("TicketQueue", opsTicket);
double speedup = opsTicket / opsBlock;

std::cout << std::left << "| " << std::setw(14) << "Queue Type"
<< "| " << std::setw(12) << "Time (s)"
<< "| " << std::setw(20) << "Throughput (ops/s)"
<< "| " << std::setw(10) << "Speedup"
<< "|\n";

std::cout << std::string(70, '-') << "\n";

// BlockQueue row
std::cout << std::left << "| " << std::setw(14) << "BlockQueue"
<< "| " << std::setw(12) << std::fixed << std::setprecision(6)
<< timeBlock << "| " << std::setw(20) << std::fixed
<< std::setprecision(0) << opsBlock << "| " << std::setw(10)
<< "1.00×"
<< "|\n";

// TicketQueue row
std::ostringstream speedupStream;
speedupStream << std::fixed << std::setprecision(2) << speedup << "×";

std::cout << std::left << "| " << std::setw(14) << "TicketQueue"
<< "| " << std::setw(12) << std::fixed << std::setprecision(6)
<< timeTicket << "| " << std::setw(20) << std::fixed
<< std::setprecision(0) << opsTicket << "| " << std::setw(10)
<< speedupStream.str() << "|\n";

return 0;
}

前言

这是阅读 Cameron Desrochers 的 A Fast Lock-Free Queue for C++ 源码的笔记。

仓库地址:https://github.com/cameron314/readerwriterqueue

其他参考文献:

An Introduction to Lock-Free Programming
C++ and Beyond 2012: Herb Sutter - atomic Weapons 1 of 2
C++ and Beyond 2012: Herb Sutter - atomic Weapons 2 of 2

内存屏障

约束 memory loads/stores 的顺序。

  • releaase 内存屏障:告诉 CPU,如果屏障之后的任何写入变得可见,那么屏障之前的任何写入都应该在其他核心中可见,前提是其他核心在读取 写屏障之后写入的数据 后执行读屏障。
    换句话说,如果线程 B 可以看到在另一个线程 A 上的写屏障之后写入的新值,那么在执行读屏障(在线程 B 上)之后,可以保证在线程 A 上的写屏障之前发生的所有写入在线程 B 上可见。

实现细节

  1. block: 一个连续的环形缓冲区,用来存储元素。这样可以预分配内存。
  2. 块过小(这不利于无锁)时,无需将所有现有元素复制到新的块中;多个块(大小独立)以循环链表的形式链接在一起。
  3. 当前插入的块称为 “尾块”,当前消费的块称为 “头块”。
  4. 头索引指向下一个要读取的满槽;尾索引指向下一个要插入的空槽。如果两个索引相等,则块为空(确切地说,当队列已满时,恰好有一个插槽为空,以避免在具有相同头和尾索引的满块和空块之间产生歧义)。
  5. 为了允许队列对任意线程创建 / 析构(独立于生产 / 消费线程),全内存屏障(memory_order_acq_cst)被用在析构函数地最后、析构函数的开头(这会强制所有的 CPU cores 同步 outstanding changes)。显然,在析构函数可以被安全地调用之前,生产者和消费者必须已经停止使用该队列。

Give me the codes

  1. 用户不需要管理内存。
  2. 预分配内存,在连续的块中。
  3. try_enqueue: 保证不会分配内存(队列有初始容量);
  4. enqueue: 会根据需要动态扩容。
  5. 没有使用 CAS loop;这意味者 enqueue 和 dequeue 是 O(1) 的(没有计入内存分配的时间)。
  6. 因为在 x86 平台,内存屏障是空操作,所以 enqueue 和 dequeue 是一系列简单的 loads 和 stores (and branches) 。

此代码仅仅适用于以原子方式处理 自然对齐的整型(aligned integer) 和 原生指针大小(native-pointer-size) 的 loads/stores 的 CPU 上;
幸运的是,这包括了所有的现代处理器(包括 ARM, x86/x86_64 和 PowerPC)。
它不是为在 DEC Alpha 上运行而设计的(DEC Alpha 似乎具有有史以来最弱的内存排序保证)。

注:在 x86 上,memory_order_acquire/release 通常不需要额外指令就能实现语义,但仍然能限制编译器的重排。
fetch_add 不是一个原子操作,而是三个:load, add, store. 所以不适用上述说的 “自然对齐的整型” 或“原生指针大小”的 load/store.

性能优化点

  1. 平凡析构:跳过析构,直接释放内存。
  2. MCRingBuffer paper
    1. cache line padding
    2. local control variables
      1. 减少对全局 read/write 指针的读取
    3. local block

正确性测试

  1. 定义不可预测性延时函数,用于模拟线程调度。
  2. 写线程塞入 32 M 个数据;读线程读取 32 M 次。读写线程中使用 unpredDelay() 模拟调度延迟。
  3. 测试能否顺序读取,失败则打印日志,不退出。
  4. 测试程序无限运行,每次使用一个写线程和读线程。直至手动 Ctrl C 关闭。

查看磁盘类型

1
2
3
$ lsblk -d -o name,rota,type,size,model
NAME ROTA TYPE SIZE MODEL
sda 1 disk 1.8T PERC H740P Mini

ROTA=1:这是旋转磁盘。

测试方法

顺序写吞吐测试(逼近最大写入速度)

1
fio --name=seqwrite --rw=write --bs=1M --size=5G --numjobs=4 --iodepth=32 --direct=1 --runtime=60 --group_reporting

随机读 IOPS 测试(逼近最大并发处理能力)

1
fio --name=randread --rw=randread --bs=4k --size=5G --numjobs=4 --iodepth=64 --direct=1 --runtime=60 --group_reporting

混合读写测试(模拟数据库负载)

1
fio --name=mixrw --rw=randrw --rwmixread=70 --bs=4k --size=5G --numjobs=4 --iodepth=32 --direct=1 --runtime=60 --group_reporting

磁盘的测试结果

由于是旋转磁盘,iodepth 总是 1(设成其他值不会生效)

单线程读写文件:

点击展开代码
    
fio_bs_test.shview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/bin/bash

# 测试参数
DEVICE="./testfile" # 修改为你要测试的文件或设备路径
RUNTIME=30 # 每个测试运行时间(秒)
# BLOCK_SIZES=("4k" "16k" "64k" "256k" "1M" "4M" "16M" "32M" "64M" "128M") # 测试块大小列表
BLOCK_SIZES=("128M" "64M" "32M" "16M" "4M" "1M" "256k" "64k" "16k" "4k") # 测试块大小列表
LOG_FILE="fio_bs_output.log" # 原始输出日志文件
PERFORMANCE_LOG="fio_bs_performance.log" # 性能结果日志文件

# 输出表头
printf "%-8s | %-10s | %-8s | %-10s | %-10s\n" "RW" "BlockSize" "IOPS" "BW(MiB/s)" "AvgLat(ms)" | tee "$PERFORMANCE_LOG"
echo "-------------------------------------------------------------" | tee -a "$PERFORMANCE_LOG"

echo "" > "$LOG_FILE" # 清空日志文件

# 循环测试不同块大小
for RW in read write; do
for BS in "${BLOCK_SIZES[@]}"; do
OUTPUT=$(fio --name=bs_test \
--filename="$DEVICE" \
--rw=$RW \
--bs=$BS \
--size=1G \
--time_based \
--runtime=$RUNTIME \
--numjobs=1 \
--direct=1 \
--ioengine=psync \
--group_reporting)
echo "----------------------------------------------" >> "$LOG_FILE"
echo "$OUTPUT" >> "$LOG_FILE"
echo "" >> "$LOG_FILE"

# 提取关键指标
read IOPS BW BWUNIT LAT LAT_UNIT <<< $(echo "$OUTPUT" | awk '
/IOPS=/ {match($0, /IOPS= *([0-9.]+)/, iops)}
/BW=/ {
match($0, /BW= *([0-9.]+)([KMG]iB)\/s/, bwinfo)
bwval=bwinfo[1]; bwunit=bwinfo[2]
}
/clat \(/ {match($0, /avg= *([0-9.]+),/, lat); match($0, /\(([^)]+)\)/, lat_unit)}
END {print iops[1], bwval, bwunit, lat[1], lat_unit[1]}
')

# 延迟单位换算
if [ "$LAT_UNIT" = "usec" ]; then
LAT_MS=$(awk "BEGIN {printf \"%.2f\", $LAT/1000}")
elif [ "$LAT_UNIT" = "msec" ]; then
LAT_MS=$LAT
else
LAT_MS="Unknown"
fi

# 带宽单位换算为 MiB/s
case "$BWUNIT" in
"KiB") BW_MIB=$(awk "BEGIN {printf \"%.2f\", $BW/1024}") ;;
"MiB") BW_MIB=$BW ;;
"GiB") BW_MIB=$(awk "BEGIN {printf \"%.2f\", $BW*1024}") ;;
*) BW_MIB="Unknown" ;;
esac

# 输出结果行
printf "%-8s | %-10s | %-8s | %-10s | %-10s\n" "$RW" "$BS" "$IOPS" "$BW_MIB" "$LAT_MS" | tee -a "$PERFORMANCE_LOG"
done
done

# 删除 fio 创建的测试文件
rm -f "$DEVICE"
echo "测试完成,结果已保存到 $LOG_FILE$PERFORMANCE_LOG"
1
2
3
4
5
6
7
8
9
10
11
12
RW       | BlockSize  | IOPS     | BW(MiB/s)  | AvgLat(ms)
-----------------------------------------------------------
read | 4k | 194 | 0.76 | 5.14
read | 16k | 239 | 3.74 | 4.17
read | 64k | 347 | 21.7 | 2.88
read | 256k | 271 | 67.9 | 3.68
read | 1M | 94 | 94.9 | 10.53
read | 4M | 14 | 57.2 | 69.74
read | 16M | 6 | 97.6 | 163.96
read | 32M | 3 | 111 | 288.99
read | 64M | 1 | 112 | 573.70
read | 128M | 0 | 112 | 1147.00
1
2
3
4
5
6
7
8
9
10
11
12
RW       | BlockSize  | IOPS     | BW(MiB/s)  | AvgLat(ms)
-----------------------------------------------------------
write | 4k | 1884 | 7.36 | 0.53
write | 16k | 1452 | 22.7 | 0.69
write | 64k | 793 | 49.6 | 1.26
write | 256k | 307 | 76.8 | 3.24
write | 1M | 100 | 100 | 9.96
write | 4M | 18 | 73.7 | 53.99
write | 16M | 5 | 93.8 | 169.43
write | 32M | 3 | 101 | 313.24
write | 64M | 1 | 107 | 594.46
write | 128M | 0 | 102 | 1249.35

当 BlockSize=32M 以后,写入性能基本达到顶峰(110 MiB/s),和旋转磁盘的参数基本一致。

多线程读写同一个文件,BS=64KiB:

点击展开代码
    
fio_mt_test.shview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/bin/bash

DEVICE="./mt_testfile" # 测试文件路径
RUNTIME=30 # 每个测试运行时间(秒)
THREADS=(1 2 4 8 16 32 64 72 120) # 测试线程数列表
BLOCK_SIZE="64k" # 块大小,可根据需要修改
LOG_FILE="fio_thread_output.log"
PERFORMANCE_LOG="fio_thread_performance.log"

# 输出表头
printf "%-8s | %-8s | %-10s | %-10s | %-10s\n" "RW" "Threads" "IOPS" "BW(MiB/s)" "AvgLat(ms)" | tee "$PERFORMANCE_LOG"
echo "---------------------------------------------------------------" | tee -a "$PERFORMANCE_LOG"

echo "" > "$LOG_FILE" # 清空日志文件

for RW in read write; do
for THREAD in "${THREADS[@]}"; do
OUTPUT=$(fio --name=thread_test \
--filename="$DEVICE" \
--rw=$RW \
--bs=$BLOCK_SIZE \
--size=5G \
--time_based \
--runtime=$RUNTIME \
--numjobs=$THREAD \
--direct=1 \
--ioengine=psync \
--group_reporting)
echo "---------------------------------------------------------------" >> "$LOG_FILE"
echo "$OUTPUT" >> "$LOG_FILE"
echo "" >> "$LOG_FILE"

# 提取关键指标
read IOPS BW BWUNIT LAT LAT_UNIT <<< $(echo "$OUTPUT" | awk '
/IOPS=/ {match($0, /IOPS= *([0-9.]+)/, iops)}
/BW=/ {
match($0, /BW= *([0-9.]+)([KMG]iB)\/s/, bwinfo)
bwval=bwinfo[1]; bwunit=bwinfo[2]
}
/clat \(/ {match($0, /avg= *([0-9.]+),/, lat); match($0, /\(([^)]+)\)/, lat_unit)}
END {print iops[1], bwval, bwunit, lat[1], lat_unit[1]}
')

# 延迟单位换算
if [ "$LAT_UNIT" = "usec" ]; then
LAT_MS=$(awk "BEGIN {printf \"%.2f\", $LAT/1000}")
elif [ "$LAT_UNIT" = "msec" ]; then
LAT_MS=$LAT
else
LAT_MS="Unknown"
fi

# 带宽单位换算为 MiB/s
case "$BWUNIT" in
"KiB") BW_MIB=$(awk "BEGIN {printf \"%.2f\", $BW/1024}") ;;
"MiB") BW_MIB=$BW ;;
"GiB") BW_MIB=$(awk "BEGIN {printf \"%.2f\", $BW*1024}") ;;
*) BW_MIB="Unknown" ;;
esac

# 输出结果行
printf "%-8s | %-8s | %-10s | %-10s | %-10s\n" "$RW" "$THREAD" "$IOPS" "$BW_MIB" "$LAT_MS" | tee -a "$PERFORMANCE_LOG"
done
done

rm -f "$DEVICE"
echo "测试完成,结果已保存到 $LOG_FILE$PERFORMANCE_LOG"
1
2
3
4
5
6
7
8
9
10
11
RW       | Threads  | IOPS       | BW(MiB/s)  | AvgLat(ms)
------------------------------------------------------------
read | 1 | 826 | 51.6 | 1.21
read | 2 | 1300 | 81.3 | 1.54
read | 4 | 1681 | 105 | 2.38
read | 8 | 1778 | 111 | 4.49
read | 16 | 1789 | 112 | 8.93
read | 32 | 1790 | 112 | 17.86
read | 64 | 1789 | 112 | 35.73
read | 72 | 1790 | 112 | 40.18
read | 120 | 1789 | 112 | 66.98
1
2
3
4
5
6
7
8
9
10
11
RW       | Threads  | IOPS       | BW(MiB/s)  | AvgLat(ms)
------------------------------------------------------------
write | 1 | 847 | 52.9 | 1.18
write | 2 | 1367 | 85.5 | 1.46
write | 4 | 1757 | 110 | 2.27
write | 8 | 1786 | 112 | 4.47
write | 16 | 1788 | 112 | 8.92
write | 32 | 1788 | 112 | 17.79
write | 64 | 1788 | 112 | 35.09
write | 72 | 1788 | 112 | 39.54
write | 120 | 1784 | 112 | 64.50

当线程数增加,IO 性能随之提高,可能原因是 64KiB 小块数据大量提交到 I/O 队列,操作系统能更好地完成读写路径优化。
但达到8线程的时候,就基本到达性能顶峰了。

注意:fio 多线程写入同一个文件是没有加锁的,如果超过 page cache (一般是 4 KB),那么可能乱序写入。

概念(以 fio 为例)

ioengine

ioengine(I/O 引擎)是 fio 提供以执行读写任务的底层接口。不同的引擎代表不同的 I/O 模型,比如同步、异步、内存映射、零拷贝等。

引擎名称 类型 特点与用途
sync 同步 默认方式,每次 I/O 都等待完成,适合简单测试
psync 同步 使用 pread/pwrite,可指定偏移,略快
libaio 异步 Linux 异步 I/O,适合高性能 SSD/NVMe
io_uring 异步 新一代 Linux 异步接口,低延迟、高并发
mmap 内存映射 将文件映射到内存,适合大文件顺序访问
splice 零拷贝 用于高效数据传输,减少 CPU 和内存开销
windowsaio 异步 Windows 原生异步 I/O,适合多线程写入
net 网络 用于网络 I/O 测试,如 socket 传输
sg SCSI 用于直接访问 SCSI 设备

每种 ioengine 都依赖操作系统提供的底层 I/O 接口。例如:

ioengine 类型 操作系统要求 是否异步 说明
sync / psync 所有系统 使用标准阻塞 I/O,几乎总是可用
libaio Linux,需安装 libaio 库 依赖 Linux 的异步 I/O 接口
io_uring Linux ≥ 5.1,推荐 ≥ 5.4 依赖新内核特性和 liburing 库
windowsaio Windows 使用 Windows 原生异步 I/O
mmap 所有主流系统 使用内存映射,适合顺序读写
posixaio POSIX 兼容系统 使用 aio_read / aio_write 接口

你可以指定任意 ioengine (默认值是 sync / psync),但它是否能运行,必须得到操作系统的支持。这包括内核版本、系统接口、库文件等。如果系统不支持,fio 会报错或自动回退。

查看支持列表:

1
fio --enghelp

这会列出当前系统上可用的 ioengine,但注意:列出来 ≠ 能用,还要看运行时是否报错。

实际测试:

1
fio --name=test --ioengine=io_uring --rw=write --size=1G --bs=1M

如果不支持,会报错,例如:

1
fio: pid=132756, err=38/file:engines/io_uring.c:1351, func=io_queue_init, error=Function not implemented

iodepth

--iodepth 是传递给内核的参数。

如果你不显式设置 --iodepth,那么 fio 会根据所选的 I/O 引擎(--ioengine) 来决定默认值

I/O 引擎 默认 iodepth
sync / psync / vsync 1(同步 I/O,只能一个一个处理)
libaio / io_uring 1,但可以设置更高以启用异步并发
mmap / pread / pwrite 1
windowsaio(Windows) 1
sg(SCSI generic) 1

fio 并不会主动维护队列,队列是内核的特性。

I/O 引擎 队列位置 是否异步 说明
psync / sync 无队列(直接调用) 每次写入调用 write(),无排队机制
libaio 内核空间 ✅ 是 使用 Linux AIO,队列在内核中,由 io_submit() 提交
io_uring 用户 + 内核共享 ✅ 是 使用环形缓冲区,用户空间提交,内核空间处理
mmap / null 用户空间 ❌ 否 模拟或跳过实际 I/O,不涉及内核队列
  • 对于支持异步 I/O 的引擎(如 libaio 或 io_uring),你可以设置更高的 iodepth(如 32、64、128)来模拟高并发负载;
  • 对 SSD 或 NVMe 设备,高 iodepth 能显著提升 IOPS 和吞吐量;
  • 对机械硬盘,提升有限,但仍可用于测试调度策略和队列行为。

但如果你的 ioengine 是 sync 或 psync,这些是同步阻塞 I/O,根本不支持高并发,所以 iodepth 实际上不会生效。

参数 说明
–name=seqwrite 定义测试任务的名称为 seqwrite,用于标识输出结果
–rw=write 设置为顺序写入模式(sequential write),数据按顺序写入磁盘
–bs=1M 每次 I/O 操作的块大小为 1MB,适合测试吞吐量
–size=5G 每个线程写入的总数据量为 5GB(不是总共,是每个 job)
–numjobs=4 启动 4 个并发线程(job),模拟多线程写入场景
–iodepth=32 每个线程的 I/O 队列深度为 32,表示最多可同时挂起 32 个 I/O 请求
本例中每个线程会发起 5G/1M=5120 个 I/O 请求
–direct=1 绕过系统缓存,直接对磁盘进行读写,更真实地反映设备性能
–runtime=60 测试持续时间为 60 秒,优先于 –size,
1. 即使数据写完也继续写更多数据直到时间结束 < br>2. 如果没有写完,则时间到就结束
–group_reporting 汇总所有线程的测试结果,输出整体性能指标而不是每个线程单独显示

可能的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <libaio.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#define FILE_PATH "testfile.bin"
#define BLOCK_SIZE 4096
#define IODEPTH 4 // 控制并发请求数量

int main() {
int fd = open(FILE_PATH, O_CREAT | O_WRONLY | O_DIRECT, 0644);
if (fd < 0) {
perror("open");
return 1;
}

// io_setup 是 libaio 的函数
io_context_t ctx = 0;
if (io_setup(IODEPTH, &ctx) < 0) {
perror("io_setup");
return 1;
}

struct iocb *iocbs[IODEPTH];
struct iocb iocb_array[IODEPTH];
char *buffers[IODEPTH];

for (int i = 0; i < IODEPTH; i++) {
// 分配对齐内存
posix_memalign((void**)&buffers[i], BLOCK_SIZE, BLOCK_SIZE);
memset(buffers[i], 'A' + i, BLOCK_SIZE);

// 初始化 iocb
io_prep_pwrite(&iocb_array[i], fd, buffers[i], BLOCK_SIZE, i * BLOCK_SIZE);
iocbs[i] = &iocb_array[i];
}

// 提交所有请求
int ret = io_submit(ctx, IODEPTH, iocbs);
if (ret < 0) {
perror("io_submit");
return 1;
}

// 等待所有请求完成
struct io_event events[IODEPTH];
io_getevents(ctx, IODEPTH, IODEPTH, events, NULL);

// 清理
for (int i = 0; i < IODEPTH; i++) {
free(buffers[i]);
}
io_destroy(ctx);
close(fd);

printf("All %d I/O requests completed.\n", IODEPTH);
return 0;
}

延迟

指标 含义 描述
slat Submission Latency 从 fio 发起 I/O 请求到内核接收该请求的时间。通常很短,单位是微秒(usec)。
clat Completion Latency 从内核接收请求到 I/O 操作完成的时间。这个是最能反映存储设备性能的部分。
lat Total Latency 总延迟,即 slat + clat,表示从 fio 发起请求到 I/O 完成的整个过程。

结果分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$ fio --name=seqwrite --rw=write --bs=1M --size=5G --numjobs=2 --direct=1 --runtime=60 --group_reporting
seqwrite: (g=0): rw=write, bs=(R) 1024KiB-1024KiB, (W) 1024KiB-1024KiB, (T) 1024KiB-1024KiB, ioengine=psync, iodepth=1
...
fio-3.41
Starting 2 processes
seqwrite: Laying out IO file (1 file / 5120MiB)
seqwrite: Laying out IO file (1 file / 5120MiB)
Jobs: 2 (f=2): [W(2)][100.0%][w=112MiB/s][w=112 IOPS][eta 00m:00s]
seqwrite: (groupid=0, jobs=2): err= 0: pid=70688: Sun Sep 7 22:37:47 2025
write: IOPS=111, BW=111MiB/s (117MB/s)(6685MiB/60016msec); 0 zone resets
clat (usec): min=9606, max=74763, avg=17922.82, stdev=1446.89
lat (usec): min=9628, max=74791, avg=17951.49, stdev=1446.80
clat percentiles (usec):
| 1.00th=[15008], 5.00th=[16319], 10.00th=[16909], 20.00th=[17433],
| 30.00th=[17695], 40.00th=[17695], 50.00th=[17957], 60.00th=[17957],
| 70.00th=[18220], 80.00th=[18482], 90.00th=[19006], 95.00th=[19268],
| 99.00th=[21365], 99.50th=[22414], 99.90th=[32375], 99.95th=[40109],
| 99.99th=[74974]
bw (KiB/s): min=96062, max=116736, per=100.00%, avg=114150.20, stdev=1061.35, samples=238
iops : min= 92, max= 114, avg=110.77, stdev= 1.18, samples=238
lat (msec) : 10=0.03%, 20=97.43%, 50=2.53%, 100=0.01%
cpu : usr=0.22%, sys=0.75%, ctx=6717, majf=0, minf=67
IO depths : 1=100.0%, 2=0.0%, 4=0.0%, 8=0.0%, 16=0.0%, 32=0.0%, >=64=0.0%
submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
issued rwts: total=0,6685,0,0 short=0,0,0,0 dropped=0,0,0,0
latency : target=0, window=0, percentile=100.00%, depth=1

Run status group 0 (all jobs):
WRITE: bw=111MiB/s (117MB/s), 111MiB/s-111MiB/s (117MB/s-117MB/s), io=6685MiB (7010MB), run=60016-60016mse

🧾 测试配置解析
bash
fio –name=seqwrite –rw=write –bs=1M –size=10G –numjobs=1 –direct=1 –runtime=60 –group_reporting
参数 含义
rw=write 顺序写入
bs=1M 每次写入块大小为 1MiB
numjobs=1 单线程写入
direct=1 使用 Direct I/O,绕过页缓存
ioengine=psync 使用同步 I/O(每次 pwrite())
iodepth=1 每次只挂起一个 I/O 请求(同步模式下默认如此)
📊 性能结果概览
指标 数值 说明
IOPS 96 每秒执行 96 次写入操作
带宽 96.2 MiB/s(101 MB/s) 每秒写入约 96 MiB 数据
总写入量 5772 MiB 在 60 秒内完成的写入总量
延迟(avg clat) 10.36 ms 每次写入的平均完成时间
CPU 使用率 usr=0.46%, sys=1.23% CPU 负载极低,瓶颈不在 CPU
⏱ 延迟分布分析
50% 的写入延迟低于 9.9 ms

95% 的写入低于 12.5 ms

99.95% 的写入延迟达到了 22.9 ms

最慢的写入高达 28.2 ms

尾部延迟略高,说明偶尔会有磁盘响应变慢的情况,可能是设备内部缓存刷新或寻址造成。

📈 带宽波动情况
平均带宽:约 96 MiB/s

最小带宽:60 MiB/s

最大带宽:104 MiB/s

标准差:6.2 MiB/s → 表明带宽相对稳定,但仍有轻微波动

🧠 深层解读
✅ 为什么 IOPS ≈ 带宽(MiB/s)?
因为你设置了 bs=1M,每次写入 1MiB 数据,所以:

Code
IOPS × Block Size = Bandwidth
96 IOPS × 1 MiB = 96 MiB/s
✅ 为什么 Direct I/O?
绕过页缓存,测试的是磁盘的真实物理性能,避免被内存加速 “欺骗”。

✅ 为什么使用 psync?
psync 是同步写入,每次调用 pwrite(),适合模拟数据库或日志系统的写入行为。但它无法并发挂起多个请求,限制了吞吐。

📌 性能瓶颈分析
磁盘类型:如果是 HDD,这个结果(96 MiB/s)非常合理;如果是 SSD,则偏低,可能受限于同步 I/O 或单线程。

IO 引擎限制:psync 是阻塞式,无法发挥磁盘的并发能力。

线程数限制:只有一个线程,磁盘可能未被充分利用。

使用共享内存优化

matrix-Mul

matrixMul.cppview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Matrix multiplication (CUDA Kernel) on the device: C = A * B
* wA is A's width and wB is B's width
*/
template <int BLOCK_SIZE> __global__ void MatrixMulCUDA(float *C, float *A, float *B, int wA, int wB)
{
// Block index
int bx = blockIdx.x;
int by = blockIdx.y;

// Thread index
int tx = threadIdx.x;
int ty = threadIdx.y;

// Index of the first sub-matrix of A processed by the block
int aBegin = wA * BLOCK_SIZE * by;

// Index of the last sub-matrix of A processed by the block
int aEnd = aBegin + wA - 1;

// Step size used to iterate through the sub-matrices of A
int aStep = BLOCK_SIZE;

// Index of the first sub-matrix of B processed by the block
int bBegin = BLOCK_SIZE * bx;

// Step size used to iterate through the sub-matrices of B
int bStep = BLOCK_SIZE * wB;

// Csub is used to store the element of the block sub-matrix
// that is computed by the thread
float Csub = 0;

// Loop over all the sub-matrices of A and B
// required to compute the block sub-matrix
for (int a = aBegin, b = bBegin; a <= aEnd; a += aStep, b += bStep) {
// Declaration of the shared memory array As used to
// store the sub-matrix of A
__shared__ float As[BLOCK_SIZE][BLOCK_SIZE];

// Declaration of the shared memory array Bs used to
// store the sub-matrix of B
__shared__ float Bs[BLOCK_SIZE][BLOCK_SIZE];

// Load the matrices from device memory
// to shared memory; each thread loads
// one element of each matrix
As[ty][tx] = A[a + wA * ty + tx];
Bs[ty][tx] = B[b + wB * ty + tx];

// Synchronize to make sure the matrices are loaded
__syncthreads();

// Multiply the two matrices together;
// each thread computes one element
// of the block sub-matrix
#pragma unroll

for (int k = 0; k < BLOCK_SIZE; ++k) {
Csub += As[ty][k] * Bs[k][tx];
}

// Synchronize to make sure that the preceding
// computation is done before loading two new
// sub-matrices of A and B in the next iteration
__syncthreads();
}

// Write the block sub-matrix to device memory;
// each thread writes one element
int c = wB * BLOCK_SIZE * by + BLOCK_SIZE * bx;
C[c + wB * ty + tx] = Csub;
}

术语

内存模型(Memory Model)是定义数据一致性执行顺序规则的一组规范。

关键概念:

  1. 原子性:操作不可打断;如果同时修改,硬件会串行排队。
  2. 可见性:对其他核心(即其他线程)可见,也就是 load 指令可以读到最新值。
  3. 顺序性:禁止本线程内的指令重排序。所以可以用作同步点。

所有原子变量都满足原子性。但是其他两者不一定满足,由内存序定义。

比如原子自增 fetch_add(relaxed) 是线程安全的,但是无法使用 load 指令读到最新的结果。

缓存一致性协议 (MESI)

状态 含义
M (Modified) 缓存行已被修改,只有当前 CPU 拥有,主内存未更新
E (Exclusive) 缓存行未被修改,只有当前 CPU 拥有,与主内存一致
S (Shared) 缓存行未被修改,多个 CPU 拥有,与主内存一致
I (Invalid) 缓存行无效,必须重新从主内存加载

relaxed 内存序

术语:

  1. 保证原子性:整个操作不可被打断;不一定立即回写主内存,可能暂时把结果放在 Store Buffer 中。
  2. 不保证顺序:编译器和CPU可能对指令重排序。
  3. 不保证可见性:其他核心不一定立即可见。

说人话:

  1. 原子性:原子操作是串行的(由硬件排队)
  2. 不保证可见性:
    • load 操作是并行的,并且不保证看到最新值。
    • “可见性”是针对程序员的,因为对于 relaxed ,没有任何指令可以保证 load 拿到最新值。它可能直接从自己的缓存行中把旧值返回给你。
    • 硬件本身是知道最新值的,它会根据 MESI 来保证自己当时拿到的是最新值(但是无法通过指令告知你)。
  3. 不保证顺序:不是同步点,无法保证前后指令与程序员的编写顺序一致。
层级 机制 作用
指令级 原子指令(如 LOCK XADD) 保证操作不可打断
缓存级 MESI 协议 控制缓存行访问,避免冲突
编译器级 编译器屏障 防止指令重排
CPU级 内存屏障 保证执行顺序
高级机制 事务性内存 实现复杂原子逻辑(可选)

release-acquire 语义

指令重排序

  1. 编译器指令重排:编译器屏障防止指令重排
  2. CPU 乱序执行:内存屏障保证执行顺序。

原子指令

原子指令(如 LOCK XADD):保证操作不可打断

Load / Store Buffer

缓存行

内存屏障

位置无关代码(PIC)

  • 位置无关代码(PIC):程序可以在内存中的任意位置运行,不需要修改代码中的绝对地址。
  • 节省空间:相比使用 64 位绝对地址,RIP 相对寻址只需要一个 32 位偏移量。
  • 更安全:支持地址随机化(ASLR),提高程序的安全性。

在 x86-64 架构中,传统的绝对地址寻址方式不再适用于位置无关代码。于是引入了 RIP(指令指针)相对寻址:

假设你有一个全局变量 int x = 42;,在汇编中访问它可能会变成:

1
2
asm
mov eax, DWORD PTR [rip + offset_to_x]

这里的 offset_to_x 是编译器计算出来的 x 相对于当前指令的偏移量。

寻址方式 描述 是否位置无关
绝对地址寻址 使用固定地址,如 [0x400123] ❌ 否
寄存器间接寻址 如 [rax],地址由寄存器决定 ✅ 是
RIP 相对寻址 如 [rip + offset],相对当前指令位置 ✅ 是

但并不是所有 PIC 都用 RIP 相对寻址,PIC 的实现方式取决于:

  • 架构:在 x86(32 位)中没有 RIP 寄存器,PIC 通常通过 call 指令获取当前地址,再加偏移量。
  • 编译器策略:有些编译器会使用全局偏移表(GOT)或过程链接表(PLT)来实现位置无关性。
  • 访问目标:访问函数地址时可能通过 PLT;访问外部变量时可能通过 GOT;访问静态数据时可能用 RIP 相对寻址。
架构 是否使用 RIP 相对寻址 是否支持位置无关代码
x86-64 ✅ 常用,尤其访问数据段 ✅ 强力支持(默认启用)
x86 (32位) ❌ 无 RIP,用其他方式实现 ✅ 但需要特殊技巧

举个 gdb 调试的例子:

1
2
3
4
(gdb) x/i $rip
=> 0x2ac084b5ec10 <poll>: cmpl $0x0,0x2d939d(%rip) # 0x2ac084e37fb4 <__libc_multiple_threads>
(gdb) p (bool)$__libc_multiple_threads
true
  • cmpl $0x0, 0x2d939d(%rip) 是一条比较指令(cmp),用于将某个内存地址中的值与立即数 0 进行比较。
  • (%rip) 表示使用 RIP 相对寻址,这是 x86-64 架构中常见的一种寻址方式。
  • 实际比较的是地址 0x2ac084e37fb4 处的值,也就是 __libc_multiple_threads 这个变量。

__libc_multiple_threads 是什么?

  • 这是 GNU C 库(glibc)中的一个内部变量,用来标记当前进程是否启用了多线程。
  • 如果这个值是 0,说明当前进程是单线程。
  • 如果是非零,说明进程中有多个线程。

所以这条指令的作用是:判断当前进程是否是多线程环境,可能用于决定是否启用线程安全的行为。

为什么使用 RIP 相对寻址?

  1. RIP 是唯一始终已知的寄存器
  • 在执行指令时,CPU总是知道当前指令的地址(即 RIP)。
  • 所以可以在编译时计算出目标数据与当前指令之间的偏移量,而不需要知道数据的绝对地址。

这就允许编译器生成位置无关代码,即使程序被加载到不同的内存地址,偏移量仍然有效。

  1. 其他寄存器值是动态的,不可预测
  • 比如 RBX、RAX、RDI 等寄存器,它们的值在运行时可能被程序修改。
  • 如果用这些寄存器做基址寻址,编译器就无法提前知道它们的值,也就无法生成稳定的偏移量。
  1. 支持共享库和地址空间布局随机化(ASLR)
  • RIP 相对寻址让代码段不依赖固定地址,可以被多个进程共享。
  • 也支持操作系统在运行时随机加载地址,提高安全性(ASLR)。
  1. 节省指令空间
  • 使用 RIP 相对寻址只需要一个 32 位偏移量。
  • 如果使用绝对地址,需要嵌入完整的 64 位地址,指令长度更长,效率更低。

为什么使用 RIP 相对寻址只需要一个 32 位偏移量

在 x86-64 架构中,RIP 相对寻址的偏移量被设计为一个有符号的 32 位整数,也就是一个 displacement(位移)字段,它在机器码中只占用 4 个字节。

  • RIP 是 64 位的指令指针,表示当前指令的地址。

  • RIP 相对寻址的目标地址是通过:

    目标地址 = 下一条指令地址(RIP) + 32 位偏移量

  • 这个偏移量是一个 有符号整数,所以它的范围是:

    从 −2³¹ 到 +2³¹−1,即 ±2GB 的寻址范围。

这意味着,当前指令附近 ±2GB 范围内的任何数据都可以通过 RIP 相对寻址访问。

优点 说明
✅ 节省空间 只用 4 字节表示偏移,比使用完整 64 位地址节省指令长度
✅ 支持位置无关代码 编译器只需计算偏移,不依赖绝对地址
✅ 高效 CPU 执行时只需加法运算,无需查表或重定位
✅ 安全 支持地址空间布局随机化(ASLR),提高安全性

为什么可以被多个进程共享?

因为代码中不再硬编码具体地址,多个进程可以:

  • 使用同一份物理内存中的代码段。
  • 每个进程有自己的数据段,但共享同一份只读代码。

这大大节省了内存,提高了系统效率。

举个例子:

进程 加载地址 使用的代码段
A 0x400000 使用共享代码段
B 0x500000 使用共享代码段

两者的代码段内容完全一样,因为里面的寻址是相对 RIP 的,不依赖于加载地址。

为什么绝对寻址不可以被多进程共享?

  • 每个进程的虚拟地址空间是独立的
    • 操作系统为每个进程分配独立的虚拟地址空间。
    • 即使两个进程都加载了同一个程序,它们的地址空间可能完全不同。
    • 如果代码中使用绝对地址,加载到不同地址空间后,这些地址就不再有效。

所以,绝对地址在一个进程中是有效的,在另一个进程中可能就指向错误的地方或根本不存在。

  • 需要重定位,无法直接共享物理页

    • 如果使用绝对地址,操作系统必须在每个进程加载时对代码进行“重定位”,修改指令中的地址。
    • 一旦修改,代码段就变成了进程私有,不能共享同一份物理内存。
    • 而位置无关代码(如使用 RIP 相对寻址)不需要修改,可以直接映射到多个进程的地址空间。
  • 违反共享库的设计原则

    • 动态链接库(如 .so.dll)的核心优势就是可以被多个进程共享。
    • 如果库中使用绝对地址,每个进程都要有自己的副本,失去了共享的意义。
    • 正确做法是使用位置无关代码(PIC),让库在任意地址都能运行。
区域 是否可共享 原因说明
代码段 ✅ 是 只读 + 位置无关,多个进程可映射同一物理页
数据段 ❌ 否 每个进程的数据不同,需独立副本
❌ 否 动态分配,地址空间不同
❌ 否 私有调用栈,不能混用
共享内存段 ✅ 是 显式创建,专门用于共享

如果你想深入了解某个进程的内存布局,可以分析 /proc/[pid]/maps 或用工具如 pmapvmmap

1. 前言

我们在调试 release 版本的程序时,由于缺乏符号信息,所以需要通过寄存器来查看函数的参数、返回值等。

2. 寄存器

2.1. 通用寄存器 (General Purpose Registers)

寄存器名 英文名称 作用
rax Accumulator 累加器,通常用于算术运算和函数返回值存储。
rbx Base 基址寄存器,常用于存储数据或指针。
rsi Source Index 源索引寄存器,常用于字符串操作中的源地址指针(函数第一个参数)。
rdi Destination Index 目标索引寄存器,常用于字符串操作中的目标地址指针或结构体指针(函数第二个参数)。
rdx Data 数据寄存器,常用于 I/O 操作或乘除法运算中的扩展数据存储(函数第三个参数)。
rcx Counter 计数器寄存器,常用于循环计数或字符串操作中的计数(函数第四个参数)。
rsp Stack Pointer 栈指针寄存器,指向当前栈顶。
rbp Base Pointer 基址指针寄存器,指向当前栈帧的基址。
r8~r15 General Purpose 通用寄存器,扩展的 64 位寄存器之一,用于存储数据或指针(r8r9 常用于保存函数第五六个参数)。

2.2. 特殊用途寄存器 (Special Purpose Registers)

寄存器名 英文名称 作用
rip Instruction Pointer 指令指针寄存器,存储当前执行指令的地址。
rflags Flags 标志寄存器,存储状态标志位(如进位、溢出、零标志等)。

2.3. 段寄存器 (Segment Registers)

寄存器名 英文名称 作用
cs Code Segment 代码段寄存器,指向当前代码段的基址。
ds Data Segment 数据段寄存器,指向当前数据段的基址。
es Extra Segment 额外段寄存器,指向额外数据段的基址。
fs FS Segment 特殊用途段寄存器,常用于线程本地存储等。
gs GS Segment 特殊用途段寄存器,常用于线程本地存储等。
ss Stack Segment 栈段寄存器,指向当前栈段的基址。

2.4. 浮点与向量寄存器 (Floating Point and Vector Registers)

寄存器名 英文名称 作用
xmm0-xmm15 SIMD Registers 用于 SSE 指令集的 128 位向量运算。
ymm0-ymm15 AVX Registers 用于 AVX 指令集的 256 位向量运算。
zmm0-zmm31 AVX-512 Registers 用于 AVX-512 指令集的 512 位向量运算。

2.5. 函数调用时的参数传递

在 x86_64 架构中,函数调用时的参数传递遵循 System V AMD64 ABI(Linux/Unix 系统的标准调用约定)。

前六个整数或指针类型的参数依次存储在以下寄存器中:

  1. rdi - 第一个参数
  2. rsi - 第二个参数
  3. rdx - 第三个参数
  4. rcx - 第四个参数
  5. r8 - 第五个参数
  6. r9 - 第六个参数

对于浮点类型的参数(如 floatdouble),前八个参数存储在以下 SSE 寄存器 中:

  1. xmm0 - 第一个浮点参数
  2. xmm1 - 第二个浮点参数
  3. xmm2 - 第三个浮点参数
  4. xmm3 - 第四个浮点参数
  5. xmm4 - 第五个浮点参数
  6. xmm5 - 第六个浮点参数
  7. xmm6 - 第七个浮点参数
  8. xmm7 - 第八个浮点参数

溢出参数(超过寄存器数量)会依次存储在 中:

  • 超过寄存器数量(整数参数超过 6 个,浮点参数超过 8 个)的参数会依次压入栈中。
  • 栈需要保持 16 字节对齐,可能会插入填充字节。
  • 可以通过访问栈指针(rsp)或基址指针(rbp)来找到栈上的参数。
    • 使用 rsp(栈指针)
      • 在函数入口时,rsp 指向栈顶(即返回地址的下一个位置)。
      • 栈上的第一个参数位于 [rsp + 8](跳过返回地址)。
      • 第二个参数位于 [rsp + 16],依此类推。
    • 使用 rbp(基址指针)
      • 如果函数使用了帧指针(rbp),rbp 通常指向调用者的栈帧基址。
      • 栈上的第一个参数位于 [rbp + 16](跳过返回地址和保存的 rbp)。
      • 第二个参数位于 [rbp + 24],依此类推。

3. 栈

3.1. 理解栈布局

在函数调用时,栈的布局通常如下(从高地址到低地址):

  1. 返回地址:调用函数时,call 指令会将返回地址(下一条指令的地址)压入栈中。
  2. 溢出参数:如果参数超过寄存器数量,多余的参数会依次压入栈中。
  3. 栈对齐填充:为了满足 16 字节对齐要求,可能会有额外的填充字节。
  4. 局部变量和保存的寄存器:函数内部可能会在栈上分配空间用于局部变量或保存调用者的寄存器。

3.2. 函数调用时的压栈过程

在x86_64架构中,函数调用时会涉及到栈的操作,包括压栈和出栈。这些操作主要用于保存调用者的上下文(如返回地址、寄存器值)以及为被调用函数分配栈帧。

3.2.1. 调用者(Caller)的操作

  1. 压入返回地址
    当调用者使用 call 指令调用函数时,CPU会自动将返回地址(下一条指令的地址)压入栈中。此时,rsp(栈指针)会减少8字节(64位系统)。

    1
    2
    3
    4
    call function
    # 等价于:
    push rip ; 将返回地址压入栈
    jmp function
  2. 压入溢出参数(如果有)
    如果函数的参数超过了寄存器数量(整数参数超过6个,浮点参数超过8个),多余的参数会从右到左依次压入栈中。rsp 会随着每个参数的压入减少。

  3. 对齐栈
    为了满足 16字节对齐 的要求,调用者可能会插入额外的填充字节,使得 rsp 在调用函数前保持16字节对齐。

3.2.2. 被调用者(Callee)的操作

  1. 保存调用者的栈帧基址
    被调用者通常会保存调用者的栈帧基址(rbp),以便在函数返回时恢复调用者的栈帧。

    1
    2
    push rbp       ; 保存调用者的 rbp
    mov rbp, rsp ; 设置当前函数的栈帧基址
  2. 分配栈空间
    被调用者会根据函数内部局部变量的需求,在栈上分配空间。rsp 会减少相应的字节数。

    1
    sub rsp, <size>  ; 为局部变量分配栈空间

3.3. 2. 函数返回时的出栈过程

3.3.1. 被调用者(Callee)的操作

  1. 释放局部变量的栈空间
    被调用者在返回前会释放为局部变量分配的栈空间。

    1
    add rsp, <size>  ; 恢复 rsp
  2. 恢复调用者的栈帧基址
    被调用者会恢复调用者的 rbp,以确保调用者的栈帧完整。

    1
    pop rbp  ; 恢复调用者的 rbp
  3. 返回到调用者
    被调用者使用 ret 指令从栈中弹出返回地址,并跳转到该地址。

    1
    ret  ; 等价于:pop rip

3.3.2. 调用者(Caller)的操作

  1. 清理栈上的参数(如果需要)
    如果调用约定要求调用者清理栈上的参数(如 cdecl 调用约定),调用者会调整 rsp
    1
    add rsp, <size>  ; 清理栈上的参数

3.4. 3. 栈指针(rsp)和基址指针(rbp)的变化

以下是一个函数调用的栈布局示例:

C代码

1
2
3
4
5
6
7
8
void example(int a, int b) {
int x = a + b;
}

int main() {
example(1, 2);
return 0;
}

汇编代码(简化版)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# main 函数
main:
sub rsp, 16 ; 对齐栈
mov edi, 1 ; 第一个参数 -> rdi
mov esi, 2 ; 第二个参数 -> rsi
call example ; 调用 example 函数
add rsp, 16 ; 恢复栈
ret

# example 函数
example:
push rbp ; 保存调用者的 rbp
mov rbp, rsp ; 设置当前栈帧基址
sub rsp, 16 ; 为局部变量分配栈空间
mov eax, edi ; a -> eax
add eax, esi ; a + b
leave ; 恢复栈帧(等价于:mov rsp, rbp; pop rbp)
ret ; 返回调用者

栈布局变化

操作 rsp 变化 栈内容(从高地址到低地址)
call example rsp -= 8 返回地址
push rbp rsp -= 8 保存调用者的 rbp
sub rsp, 16 rsp -= 16 为局部变量分配空间
leave rsp += 16 释放局部变量空间
ret rsp += 8 弹出返回地址

3.5. 总结

  1. 函数调用的栈操作
    • 调用者负责压入返回地址和溢出参数。
    • 被调用者负责保存 rbp 和分配局部变量空间。
    • 函数返回时,释放局部变量空间并恢复调用者的栈帧。
  2. rsprbp 的变化
    • rsp 指向栈顶,动态变化。
    • rbp 指向栈帧基址,通常固定不变。

4. 使用 gdb 查看寄存器

1
2
3
4
5
6
7
8
9
10
11
12
13
# 查看所有寄存器
info registers

# 查看指定寄存器
info registers rdi
# 或简写
i r rdi

# 查看十六进制
p/x $rdx # 十六进制
p/d $rdx # 十进制
# 或简写
p $rdx

其中,info registers 会打印三列:

  • 第一列:寄存器名称
  • 第二列:寄存器的值(十六进制)
  • 第三列:寄存器的值(十进制;也可能是十六进制,用 0x 开头)

info registers rdip $rdi 效果相同。

从寄存器查到的内存地址,可以用 x (examinze)命令来查看内存的值:

1
2
3
4
5
6
7
8
9
10
# 查看指令
x/i $rip
# 查看栈顶
x/16x $rsp

# 查看内存
p $rdi
x/2gx $rdi
# 或先用 $rdi 查出内存地址,直接用地址访问
x/2gx 47926411878160

x 命令的说明:

1
x/FMT ADDRESS

其中:

  • x:表示“examine memory”(查看内存)
  • 2:数字,表示要查看的单元数
  • g:表示每个单元的 size,有 b(byte), h(halfword), w(word), g(giant, 8 bytes)
  • x:表示值的格式,有 o(octal), x(hex), d(decimal), u(unsigned decimal), t(binary), f(float), a(address), i(instruction), c(char), s(string)
    and z(hex, zero padded on the left).

在 gdb 命令行中使用 help 命令,可以查看命令的说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(gdb) help x
Examine memory: x/FMT ADDRESS.
ADDRESS is an expression for the memory address to examine.
FMT is a repeat count followed by a format letter and a size letter.
Format letters are o(octal), x(hex), d(decimal), u(unsigned decimal),
t(binary), f(float), a(address), i(instruction), c(char), s(string)
and z(hex, zero padded on the left).
Size letters are b(byte), h(halfword), w(word), g(giant, 8 bytes).
The specified number of objects of the specified size are printed
according to the format. If a negative number is specified, memory is
examined backward from the address.

Defaults for format and size letters are those previously used.
Default count is 1. Default address is following last thing printed
with this command or "print".

5. 使用 gdb 查看栈

  • bt
  • frame
  • args
  • locals
  • x

TDODO

5.1. frame 与寄存器的值

  • GDB 中的寄存器值(如 $rax, $rdi, $rsp 等)是当前 CPU 执行上下文的快照。
  • 当你切换到 frame 0(最内层栈帧)时,寄存器值是最真实的,因为这是程序当前正在执行的地方。
  • 当你切换到 外层栈帧(frame 1, 2, …)时,GDB 会尝试还原当时的寄存器状态,但这依赖于:
    • 编译器是否保存了寄存器值(如 callee-saved)
    • 是否有调试符号或 unwind 信息
    • GDB 是否能推断出寄存器的保存位置

寄存器值可能出现的情况

情况 表现
寄存器是 caller-saved(如 rdi, rsi, rax) 可能显示 或错误值
寄存器是 callee-saved(如 rbx, rbp, r12~r15) 通常能正确还原
没有调试信息或优化严重 GDB 无法还原,显示当前值或

建议

  • 如果你要分析寄存器状态,最好在 frame 0 或断点处进行。
  • 如果你在分析 core dump 或栈破坏问题,寄存器值只能作为参考,不要完全依赖外层 frame 的寄存器快照。
  • 使用 info args 和 info locals 更可靠地查看参数和局部变量(如果有符号信息)。

6. 在特定线程中设置断点

6.1. 断点只作用于某线程

1
2
3
4
5
6
7
8
9
10
11
12
13
# 查看所有线程 ID 和当前线程 ID(gdb 中会使用 * 标注当前线程)
(gdb) info threads
# 切换当前上下文到指定线程
(gdb) thread <THREAD_ID>
# 通过查看当前堆栈是不是自己要断点的线程
(gdb) bt
(gdb) break LOCATION thread THREADNUM
# 条件断点
(gdb) break source.c:123 thread 5 if fds[0].fd == 7
# 如果没有 debug 符号,可以利用函数返回值寄存器断点
(gdb) break poll thread 2 if $rdx > 0
# 完整格式
break [PROBE_MODIFIER] [LOCATION] [thread THREADNUM] [if CONDITION]

6.2. 锁定调度器,只让当前线程运行

默认情况下,GDB 会让所有线程一起运行(比如你执行 continue 时)。如果你只想让当前线程运行,其它线程保持暂停,可以使用:

1
(gdb) set scheduler-locking on

这表示:只有当前线程会执行,其他线程全部暂停。

其中模式还有:

模式 说明
off 默认值,所有线程都可以运行
on 只有当前线程运行,其他线程暂停
step 单步调试时只运行当前线程,continue 时其他线程也会运行

你可以随时切换:

1
(gdb) set scheduler-locking step
  • 如果你在调试死锁、竞态或线程间通信问题,锁定调度器是非常有效的方式。
  • 如果你在调试某个 poll() 或 epoll_wait() 调用,只想观察某个线程的行为,可以结合 catch syscall 和 thread 命令一起使用。

7. 查看汇编代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 查看汇编代码,其中 "=>" 标记的是当前执行位置
(gdb) disassemble
# 反汇编指定地址范围
# 这会显示从当前指令开始的 32 字节范围内的汇编代码。
(gdb) disassemble $rip, $rip+32

# 查看当前指令(x86)
(gdb) x/i $pc
# 或在 x86-64 架构下:
(gdb) x/i $rip

# Bonus: 默认 GDB 使用 AT&T 风格(如 %rax),你可以切换为 Intel 风格
# 这样输出会更接近你在汇编教材或 IDA Pro 中看到的格式
(gdb) set disassembly-flavor intel

8. 位置无关代码(PIC)

8.1. 什么是 PIC

  • PIC(Position Independent Code,位置无关代码)是一种编译方式,使得生成的代码可以在内存中的任意位置运行,而无需硬编码绝对地址。
  • 在动态链接库(shared libraries)中,通常需要使用 PIC,以便库可以被加载到任意内存地址。
  • 节省空间:相比使用 64 位绝对地址,RIP 相对寻址只需要一个 32 位偏移量。
  • 更安全:支持地址随机化(ASLR),提高程序的安全性。

8.2. PIC 的实现

  1. 访问全局变量
    在 PIC 模式下,代码通过 全局偏移表(GOT, Global Offset Table)过程链接表(PLT, Procedure Linkage Table) 访问全局变量和函数地址。

  2. 寄存器 rip 的使用
    x86_64 支持基于 rip(指令指针)的寻址方式,PIC 会利用 rip 相对寻址来访问全局变量或函数地址,而不是使用绝对地址。

在 x86-64 架构中,传统的绝对地址寻址方式不再适用于位置无关代码。于是引入了 RIP(指令指针)相对寻址:

假设你有一个全局变量 int x = 42;,在汇编中访问它可能会变成:

1
2
asm
mov eax, DWORD PTR [rip + offset_to_x]

这里的 offset_to_x 是编译器计算出来的 x 相对于当前指令的偏移量。

寻址方式 描述 是否位置无关
绝对地址寻址 使用固定地址,如 [0x400123] ❌ 否
寄存器间接寻址 如 [rax],地址由寄存器决定 ✅ 是
RIP 相对寻址 如 [rip + offset],相对当前指令位置 ✅ 是

8.3. PIC 的优化

  • 减少重定位:通过 rip 相对寻址,避免了加载时的重定位操作,提高了加载速度。
  • 共享内存:多个进程可以共享同一段动态库代码,而无需为每个进程生成独立的副本。

8.4. 示例

1
2
mov rax, [rip + global_var@GOTPCREL]  ; 通过 GOT 表访问全局变量
call [rip + func@PLT] ; 通过 PLT 表调用函数

举个 gdb 调试的例子:

1
2
3
4
(gdb) x/i $rip
=> 0x2ac084b5ec10 <poll>: cmpl $0x0,0x2d939d(%rip) # 0x2ac084e37fb4 <__libc_multiple_threads>
(gdb) p (bool)$__libc_multiple_threads
true
  • cmpl $0x0, 0x2d939d(%rip) 是一条比较指令(cmp),用于将某个内存地址中的值与立即数 0 进行比较。
  • (%rip) 表示使用 RIP 相对寻址,这是 x86-64 架构中常见的一种寻址方式。
  • 实际比较的是地址 0x2ac084e37fb4 处的值,也就是 __libc_multiple_threads 这个变量。

__libc_multiple_threads 是什么?

  • 这是 GNU C 库(glibc)中的一个内部变量,用来标记当前进程是否启用了多线程。
  • 如果这个值是 0,说明当前进程是单线程。
  • 如果是非零,说明进程中有多个线程。

所以这条指令的作用是:判断当前进程是否是多线程环境,可能用于决定是否启用线程安全的行为。

8.5. 为什么使用 RIP 相对寻址?

  1. RIP 是唯一始终已知的寄存器
  • 在执行指令时,CPU 总是知道当前指令的地址(即 RIP)。
  • 所以可以在编译时计算出目标数据与当前指令之间的偏移量,而不需要知道数据的绝对地址。

这就允许编译器生成位置无关代码,即使程序被加载到不同的内存地址,偏移量仍然有效。

  1. 其他寄存器值是动态的,不可预测
  • 比如 RBX、RAX、RDI 等寄存器,它们的值在运行时可能被程序修改。
  • 如果用这些寄存器做基址寻址,编译器就无法提前知道它们的值,也就无法生成稳定的偏移量。
  1. 支持共享库和地址空间布局随机化(ASLR)
  • RIP 相对寻址让代码段不依赖固定地址,可以被多个进程共享。
  • 也支持操作系统在运行时随机加载地址,提高安全性(ASLR)。
  1. 节省指令空间
  • 使用 RIP 相对寻址只需要一个 32 位偏移量。
  • 如果使用绝对地址,需要嵌入完整的 64 位地址,指令长度更长,效率更低。

8.6. 为什么使用 RIP 相对寻址只需要一个 32 位偏移量

在 x86-64 架构中,RIP 相对寻址的偏移量被设计为一个有符号的 32 位整数,也就是一个 displacement(位移)字段,它在机器码中只占用 4 个字节。

  • RIP 是 64 位的指令指针,表示当前指令的地址。

  • RIP 相对寻址的目标地址是通过:

    目标地址 = 下一条指令地址(RIP) + 32 位偏移量

  • 这个偏移量是一个 有符号整数,所以它的范围是:

    从 −2³¹ 到 +2³¹−1,即 ±2GB 的寻址范围。

这意味着,当前指令附近 ±2GB 范围内的任何数据都可以通过 RIP 相对寻址访问。

优点 说明
✅ 节省空间 只用 4 字节表示偏移,比使用完整 64 位地址节省指令长度
✅ 支持位置无关代码 编译器只需计算偏移,不依赖绝对地址
✅ 高效 CPU 执行时只需加法运算,无需查表或重定位
✅ 安全 支持地址空间布局随机化(ASLR),提高安全性

8.7. 为什么可以被多个进程共享?

因为代码中不再硬编码具体地址,多个进程可以:

  • 使用同一份物理内存中的代码段。
  • 每个进程有自己的数据段,但共享同一份只读代码。

这大大节省了内存,提高了系统效率。

举个例子:

进程 加载地址 使用的代码段
A 0x400000 使用共享代码段
B 0x500000 使用共享代码段

两者的代码段内容完全一样,因为里面的寻址是相对 RIP 的,不依赖于加载地址。

为什么绝对寻址不可以被多进程共享?

  • 每个进程的虚拟地址空间是独立的
    • 操作系统为每个进程分配独立的虚拟地址空间。
    • 即使两个进程都加载了同一个程序,它们的地址空间可能完全不同。
    • 如果代码中使用绝对地址,加载到不同地址空间后,这些地址就不再有效。

所以,绝对地址在一个进程中是有效的,在另一个进程中可能就指向错误的地方或根本不存在。

  • 需要重定位,无法直接共享物理页

    • 如果使用绝对地址,操作系统必须在每个进程加载时对代码进行“重定位”,修改指令中的地址。
    • 一旦修改,代码段就变成了进程私有,不能共享同一份物理内存。
    • 而位置无关代码(如使用 RIP 相对寻址)不需要修改,可以直接映射到多个进程的地址空间。
  • 违反共享库的设计原则

    • 动态链接库(如 .so.dll)的核心优势就是可以被多个进程共享。
    • 如果库中使用绝对地址,每个进程都要有自己的副本,失去了共享的意义。
    • 正确做法是使用位置无关代码(PIC),让库在任意地址都能运行。
区域 是否可共享 原因说明
代码段 ✅ 是 只读 + 位置无关,多个进程可映射同一物理页
数据段 ❌ 否 每个进程的数据不同,需独立副本
❌ 否 动态分配,地址空间不同
❌ 否 私有调用栈,不能混用
共享内存段 ✅ 是 显式创建,专门用于共享

如果你想深入了解某个进程的内存布局,可以分析 /proc/[pid]/maps 或用工具如 pmapvmmap

9. 实际 debug 例子:在多线程中查看 poll 的事件

先复习下 poll 函数:

1
2
3
4
5
6
7
8
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

// 第一个参数 fds 的类型
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};

FIXME:这种在汇编代码 ret 前断点,并依据 raxrdi 设置条件断点的方式不可靠,因为可能进入了 libc 层。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 查看 polll 的汇编代码
(gdb) disass poll
Dump of assembler code for function poll:
0x00002ac084b5ec10 <+0>: cmpl $0x0,0x2d939d(%rip) # 0x2ac084e37fb4 <__libc_multiple_threads>
0x00002ac084b5ec17 <+7>: jne 0x2ac084b5ec29 <poll+25>
0x00002ac084b5ec19 <+0>: mov $0x7,%eax
0x00002ac084b5ec1e <+5>: syscall
0x00002ac084b5ec20 <+7>: cmp $0xfffffffffffff001,%rax
0x00002ac084b5ec26 <+13>: jae 0x2ac084b5ec59 <poll+73>
0x00002ac084b5ec28 <+15>: ret
0x00002ac084b5ec29 <+25>: sub $0x8,%rsp
0x00002ac084b5ec2d <+29>: call 0x2ac084b77600 <__libc_enable_asynccancel>
0x00002ac084b5ec32 <+34>: mov %rax,(%rsp)
0x00002ac084b5ec36 <+38>: mov $0x7,%eax
0x00002ac084b5ec3b <+43>: syscall
0x00002ac084b5ec3d <+45>: mov (%rsp),%rdi
0x00002ac084b5ec41 <+49>: mov %rax,%rdx
0x00002ac084b5ec44 <+52>: call 0x2ac084b77660 <__libc_disable_asynccancel>
0x00002ac084b5ec49 <+57>: mov %rdx,%rax
0x00002ac084b5ec4c <+60>: add $0x8,%rsp
0x00002ac084b5ec50 <+64>: cmp $0xfffffffffffff001,%rax
0x00002ac084b5ec56 <+70>: jae 0x2ac084b5ec59 <poll+73>
=> 0x00002ac084b5ec58 <+72>: ret
0x00002ac084b5ec59 <+73>: mov 0x2d31f0(%rip),%rcx # 0x2ac084e31e50
0x00002ac084b5ec60 <+80>: neg %eax
0x00002ac084b5ec62 <+82>: mov %eax,%fs:(%rcx)
0x00002ac084b5ec65 <+85>: or $0xffffffffffffffff,%rax
0x00002ac084b5ec69 <+89>: ret

# 找到所有的 ret 指令,设置条件断点
# 注意:最好是在 ret 指令之前的指令上也加上断点,
# 因为 ret 的时候,可能已经把当前栈(除 rsp / rbp 外)都弹出了,寄存器中将看不到当前栈的信息
#
# $rax 是返回值寄存器,也就是返回大于 0 时,让进程暂停
# 这里的 * 表示取内存的值(存放的是指令),* 断不可少,不然会被认为是 Function name
#
(gdb) b *0x00002ac084b5ec26 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec28 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec56 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec58 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec65 thread 4 if $rax > 0
(gdb) b *0x00002ac084b5ec69 thread 4 if $rax > 0

# 继续运行
(gdb) c

# 当 IO 事件发生,程序会被暂停