互斥与同步的实现和使用
互斥与同步的实现和使用
在进程/线程并发执行的过程中,进程/线程之间存在协作的关系,例如有互斥、同步的关系。
为了实现进程/线程间正确的协作,操作系统必须提供实现进程协作的措施和方法,主要的方法有两种:
- 锁:加锁、解锁操作(Test-and-Set 原子指令)
- 信号量:P、V 操作(P、V 原子指令)
这两个都可以方便地实现进程/线程互斥,而信号量比锁的功能更强一些,它还可以方便地实现进程/线程同步。
锁
使用加锁操作和解锁操作可以解决并发线程/进程的互斥问题。
任何想进入临界区的线程,必须先执行加锁操作。若加锁操作顺利通过,则线程可进入临界区;在完成对临界资源的访问后再执行解锁操作,以释放该临界资源。
根据锁的实现不同,可以分为「忙等待锁」和「无忙等待锁」。
原子操作、测试和置位
我们先来看看「忙等待锁」的实现
在说明「忙等待锁」的实现之前,先介绍现代 CPU 体系结构提供的特殊 原子操作指令 —— 测试和置位(Test-and-Set)指令。
如果用 C 代码表示 Test-and-Set 指令,形式如下:
/**
* (原子性) 测试和置位
*
* 函数作用:
* - 把 `old_ptr` 更新为 `new` 的新值
* - 返回 `old_ptr` 的旧值;
*/
int TestAndSet(int* old_ptr, int new) {
int old = *old_ptr;
*old_ptr = new;
return old;
}
当然,关键是这些代码是原子执行。因为既可以测试旧值,又可以设置新值,所以我们把这条指令叫作「测试并设置」。
那什么是原子操作呢?原子操作就是要么全部执行,要么都不执行,不能出现执行到一半的中间状态
(1) 忙等待锁 (也叫 自旋锁)
我们可以运用 Test-and-Set 指令来实现「忙等待锁」,代码如下:
typedef struct lock_t {
int flag;
} lock_t;
void init(lock t *lock){
lock->flag = 0;
}
void lock(lock_t *lock){
while(TestAndset(&lock->flag, 1) == 1)
;// do noting
}
void unlock(lock_t *lock){
lock->flag = 0;
}
我们来确保理解为什么这个锁能工作:
- 第一个场景是,首先假设一个线程在运行,调用
lock()
,没有其他线程持有锁,所以flag
是 0。当调用TestAndSet(flag, 1)
方法,返回 0,线程会跳出 while 循环,获取锁。同时也会原子的设置 flag 为1,标志锁已经被持有。当线程离开临界区,调用unlock()
将flag
清理为 0。 - 第二种场景是,当某一个线程已经持有锁(即
flag
为1)。本线程调用lock()
,然后调用TestAndSet(flag, 1)
,这一次返回 1。只要另一个线程一直持有锁,TestAndSet()
会重复返回 1,本线程会一直忙等。当flag
终于被改为 0,本线程会调用TestAndSet()
,返回 0 并且原子地设置为 1,从而获得锁,进入临界区。
很明显,当获取不到锁时,线程就会一直 while 循环,不做任何事情,所以就被称为「忙等待锁」,也被称为 自旋锁(spin lock)。
这是最简单的一种锁,一直自旋,利用 CPU 周期,直到锁可用。在单处理器上,需要抢占式的调度器(即不断通过时钟中断一个线程,运行其他线程)。否则,自旋锁在单 CPU 上无法使用,因为一个自旋的线程永远不会放弃 CPU。
(2) 无等待锁
无等待锁顾明思议就是获取不到锁的时候,不用自旋。
既然不想自旋,那当没获取到锁的时候,就把当前线程放入到 锁的等待队列,然后执行调度程序,把 CPU 让给其他线程执行。
type struct lock t{
int flag;
queue_t *q; // 等待队列
} lock_t;
void init(lock t *lock) {
lock->flag = 0;
queue_init(lock->q);
}
void lock(lock_t *lock) {
while(TestAndSet(&lock->flag, 1) == 1) {
保存现在运行线程 TCB;
将现在运行的线程 TCB 插入到等待队列;
设置该线程为等待状态;
调度程序;
}
}
void unlock(lock_t *lock) {
if(lock->q != NULL) {
移出等待队列的队头元素;
将该线程的 TCB 插入到就绪队列;
设置该线程为就绪状态;
}
lock->flag = 0;
}
本次只是提出了两种简单锁的实现方式。当然,在具体操作系统实现中,会更复杂,但也离不开本例子两个基本元素。
如果你想要对锁的更进一步理解,推荐大家可以看《操作系统导论》第 28 章锁的内容,这本书在「微信读书」就可以免费看。
信号量
概念
信号量是操作系统提供的一种协调共享资源访问的方法。
通常信号量表示资源的数量,对应的变量是一个整型(sem
)变量。
另外,还有两个原子操作的系统调用函数来控制信号量的,分别是:
- P 操作:将
sem
减1
,相减后,如果sem < 0
,则进程/线程进入阻塞等待,否则继续,表明 P 操作可能会阻塞; - V 操作:将
sem
加1
,相加后,如果sem <= 0
,唤醒一个等待中的进程/线程,表明 V 操作不会阻塞;
TIP
很多人问,V 操作 中 sem <= 0 的判断是不是写反了?
没写反,我举个例子,如果 sem = 1,有三个线程进行了 P 操作:
- 第一个线程 P 操作后,sem = 0;
- 第二个线程 P 操作后,sem = -1;
- 第三个线程 P 操作后,sem = -2;
这时,第一个线程执行 V 操作后, sem 是 -1,因为 sem <= 0,所以要唤醒第二或第三个线程。
P 操作是用在进入临界区之前,V 操作是用在离开临界区之后,这两个操作是必须成对出现的。
信号量初值2,奇怪情况
举个类比,2 个资源的信号量,相当于 2 条火车轨道,PV 操作如下图过程:
PV 操作的系统实现
信号量数据结构与 PV 操作的算法描述如下图:
// 信号量数据结构
type struct sem_t {
int sem; // 资源个数
queue_t *q; // 等待队列
} sem_t;
// 初始化信号量
void init(sem t *s,int sem) {
s->sem = sem;
queue init(s->q);
}
// P 操作 (原子性)
void P(sem_t *s) {
s->sem--;
if(s->sem < 0) {
1. 保留调用线程 CPU 现场;
2. 将该线程的 TCB 插入到s的等待队列;
3. 设置该线程为等待状态;
4. 执行调度程序;
}
}
// V 操作 (原子性)
void V(sem_t *s) {
s->sem++;
if(s->sem <= 0) {
1.移出s等待队列首元素;
2.将该线程的 TCB 插入就绪队列;
3.设置该线程为「就绪」状态;
}
}
PV 操作的函数是由操作系统管理和实现的,所以操作系统已经使得执行 PV 函数时是具有原子性的。
PV 操作的使用
信号量不仅可以实现临界区的互斥访问控制,还可以线程间的事件同步。
我们先来说说如何使用信号量实现临界区的互斥访问。
为每类共享资源设置一个信号量 s
,其初值为 1
,表示该临界资源未被占用。
只要把进入临界区的操作置于 P(s)
和 V(s)
之间,即可实现进程/线程互斥:
此时,任何想进入临界区的线程,必先在互斥信号量上执行 P 操作,在完成对临界资源的访问后再执行 V 操作。由于互斥信号量的初始值为 1,故在第一个线程执行 P 操作后 s 值变为 0,表示临界资源为空闲,可分配给该线程,使之进入临界区。
若此时又有第二个线程想进入临界区,也应先执行 P 操作,结果使 s 变为负值,这就意味着临界资源已被占用,因此,第二个线程被阻塞。
并且,直到第一个线程执行 V 操作,释放临界资源而恢复 s 值为 0 后,才唤醒第二个线程,使之进入临界区,待它完成临界资源的访问后,又执行 V 操作,使 s 恢复到初始值 1。
对于两个并发线程,互斥信号量的值仅取 1、0 和 -1 三个值,分别表示:
- 如果互斥信号量为 1,表示没有线程进入临界区;
- 如果互斥信号量为 0,表示有一个线程进入临界区;
- 如果互斥信号量为 -1,表示一个线程进入临界区,另一个线程等待进入。
通过互斥信号量的方式,就能保证临界区任何时刻只有一个线程在执行,就达到了互斥的效果。
信号量初值0,实现事件同步
再来,我们说说如何使用 信号量实现事件同步。
同步的方式是设置一个信号量,其初值为 0
。
我们把前面的「吃饭-做饭」同步的例子,用代码的方式实现一下:
semaphore sl = 0; // 表示不需要吃饭
semaphore s = 0; // 表示饭还没做完
// 儿子线程函数
void son() {
while(TRUE) {
肚子饿;
V(s1); // 叫妈妈做饭
P(s2); // 等待妈妈做完饭
吃饭;
}
}
// 妈妈线程函数
void mon() {
while(TRUE) {
P(s1); // 询问需不需要做饭
做饭;
V(s2); // 做完饭,通知儿子吃饭
}
}
妈妈一开始询问儿子要不要做饭时,执行的是 P(s1)
,相当于询问儿子需不需要吃饭,由于 s1
初始值为 0,此时 s1
变成 -1,表明儿子不需要吃饭,所以妈妈线程就进入等待状态。
当儿子肚子饿时,执行了 V(s1)
,使得 s1
信号量从 -1 变成 0,表明此时儿子需要吃饭了,于是就唤醒了阻塞中的妈妈线程,妈妈线程就开始做饭。
接着,儿子线程执行了 P(s2)
,相当于询问妈妈饭做完了吗,由于 s2
初始值是 0,则此时 s2
变成 -1,说明妈妈还没做完饭,儿子线程就等待状态。
最后,妈妈终于做完饭了,于是执行 V(s2)
,s2
信号量从 -1 变回了 0,于是就唤醒等待中的儿子线程,唤醒后,儿子线程就可以进行吃饭了。
生产者-消费者问题
问题描述与分析 —— 需要互斥与同步
生产者-消费者问题描述:
- 生产者在生成数据后,放在一个缓冲区中;
- 消费者从缓冲区取出数据处理;
- 任何时刻,只能有一个生产者或消费者可以访问缓冲区;
我们对问题分析可以得出:
- 任何时刻只能有一个线程操作缓冲区,说明操作缓冲区是临界代码,需要互斥;
- 缓冲区空时,消费者必须等待生产者生成数据;缓冲区满时,生产者必须等待消费者取出数据。说明生产者和消费者需要同步。
解决方案
那么我们需要三个信号量,分别是:
- 互斥信号量
mutex
:用于互斥访问缓冲区,初始化值为 1; - 资源信号量
fullBuffers
:用于消费者询问缓冲区是否有数据,有数据则读取数据,初始化值为 0(表明缓冲区一开始为空); - 资源信号量
emptyBuffers
:用于生产者询问缓冲区是否有空位,有空位则生成数据,初始化值为 n (缓冲区大小);
具体的实现代码:
#define N 100
semaphore mutex = 1; // 互斥信号量,用于临界区的互斥访问
semaphore emptyBuffers = N; // 表示缓冲区「空槽」的个数
semaphore fullBuffers = 0; // 表示缓冲区「满槽」的个数
// 生产者线程函数
void producer() {
while(TRUE) {
P(emptyBuffers); // 将空槽的个数- 1
P(mutex); // 进入临界区
将生成的数据放到缓冲区中;
V(mutex); // 离开临界区
V(fullBuffers); // 将满槽的个数 + 1
}
}
// 消费者线程函数
void consumer() {
while(TRUE) {
P(fullBuffers); // 将满槽的个数- 1
P(mutex); // 进入临界区
从缓冲区里读取数据;
V(mutex); // 离开临界区
V(emptyBuffers); // 将空槽的个数 + 1
}
}
如果消费者线程一开始执行 P(fullBuffers)
,由于信号量 fullBuffers
初始值为 0,则此时 fullBuffers
的值从 0 变为 -1,说明缓冲区里没有数据,消费者只能等待。
接着,轮到生产者执行 P(emptyBuffers)
,表示减少 1 个空槽,如果当前没有其他生产者线程在临界区执行代码,那么该生产者线程就可以把数据放到缓冲区,放完后,执行 V(fullBuffers)
,信号量 fullBuffers
从 -1 变成 0,表明有「消费者」线程正在阻塞等待数据,于是阻塞等待的消费者线程会被唤醒。
消费者线程被唤醒后,如果此时没有其他消费者线程在读数据,那么就可以直接进入临界区,从缓冲区读取数据。最后,离开临界区后,把空槽的个数 + 1。