进程的同步与互斥

进程会对临界资源的访问权限竞争。通常临界区只支持一个进程进行读/写,因为如果两条进程对临界区的相同数据进行写操作,有可能导致两个进程的发生先后不同,导致产生不同的执行结果。

临界资源

临界资源是能被多个进程访问,但是被多个进程同时访问会产生冲突的资源。临界区是访问这一块共享资源的代码

进程对临界资源的访问分为四个部分

  • 进入区 -- 进入临界区前的检查部分
  • 临界区 -- 实际访问临界资源的代码段
  • 退出区 -- 离开临界区的释放资源的段
  • 剩余区 -- 其他部分

进程的同步

进程的同步指的是多个异步进程,需要彼此协调运行次序,而在执行流程上产生等待或者进程信息传递等制约关系。比如进程B需要进程A提供的相关数据,A在缓冲区写入信息、B在缓冲区读信息,那么进程A就是进程B的严格前序进程,这两个进程是同步的。

进程的互斥

进程的互斥指的是,对于一个缓冲区有多个进程能进行读、写操作。当一个进程进入缓冲区后,其他需要访问缓冲区的进程只能等待当前进程结束才能进入缓冲区。

避免多进程进入缓冲区的设计策略:

  • 空闲让进 -- 缓冲区空则放行
  • 忙则等待 -- 缓冲区有进程,其他进程就挂起等待
  • 有限等待 -- 挂起进程不能一直被挂起等待,应保证在有限时间内得到缓冲区资源
  • 让权等待 -- 等待期应当放弃CPU使用权,变为阻塞态,避免忙等待。具体体现在不应反复执行判断语句占用CPU资源。

实现临界区互斥的方法

实现临界区互斥的方法通常都需要基于上面四个设计策略构建,其中让权等待是节省CPU性能开销的可选择策略。

实现方法分为软件实现方法与硬件实现方法。

软件实现方法

单标志法

单标志法通过一个一个公共变量turn指示当前允许进入临界区的进程编号

#include <thread>
#include <iostream>
#include <atomic>
std::atomic<int> turn = 0;

void critical_section(int id) {
    std::cout << "thread " << id << " in critical section\n";
}

void remainder_section(int id) {
    std::cout << "thread " << id << " in remainder section\n";
}

void thread0() {
    while (true) {
        while (turn != 0) {
            // busy waiting
        }

        critical_section(0);

        turn = 1;

        remainder_section(0);
    }
}
void thread1() {
    while (true) {
        while (turn != 1) {

        }

        critical_section(1);
        turn = 0;
        remainder_section(1);
    }
}
int main() {
    std::thread t0(thread0);
    std::thread t1(thread1);

    t0.join();
    t1.join();

    return 0;
}

两个进程必须按照turn的标记交替进入临界区访问,是互锁的,假如thread0不再申请临界区, 则turn始终为0,thread1也无法再申临界区,违背空闲让进原则

双标志法

turn的基础上,添加了布尔数组flag[], 用于表示进程访问临界区的意愿,flag[i]=true表示进程i需要访问临界区。

双标志先检查法

#include<iostream>
#include<thread>
#include<atomic>

std::atomic<int> turn = 0;
std::atomic<bool> flag[2] = (false,false);

void critical_section(int id) {
    std::cout << "thread " << id << " in critical section\n";
}

void remainder_section(int id) {
    std::cout << "thread " << id << " in remainder section\n";
}
void thread0(){
    while(flag[1]) ;   // 先进行其他进程的意愿检查
    flag[0] = true;    // 再设置本进程的进入意愿
    critical_section(0);
    flag[0] = false;    // 访问完临界区后降低访问意愿
    remainder_section(0);
}

void thread1(){
    while(flag[0]) ;
    flag[1] = true;
    critical_section(1);
    flag[1] = false;
    remainder_section{1};
}
int main(){
    int main() {
        std::thread t0(thread0);
        std::thread t1(thread1);

        t0.join();
        t1.join();

        return 0;
    }
}

双标志先检查法的问题在于,如果两个进程都通过了意愿检查,可能出现两个进程同时使用临界区的问题,违背了忙则等待原则

双标志后检查法

/* 其他的设置与 双标志先检查法 相同*/
void thread0(){
    flag[0] = true;      // 先设置本进程访问意愿
    while(flag[1]);      // 再检查其他进程的访问优先级
    critical_section(0);
    flag[0] = false;    // 访问完临界区后降低访问意愿
    remainder_section(0);
}


void thread1(){
    flag[1] = true;
    while(flag[0]) ;
    critical_section(1);
    flag[1] = false;
    remainder_section{1};
}
int main(){
    int main() {
        std::thread t0(thread0);
        std::thread t1(thread1);

        t0.join();
        t1.join();

        return 0;
    }
}

双标志后检查法解决了双标志先检查法的忙则等待问题,但是双标记后检查法可能遇到两个进程都想访问临界区,都进行等待,导致临界区空闲,违背了空闲让进有限等待准则。

Peterson 算法

Peterson算法融合了turnflag的优点,通过turn管理互斥访问,通过flag管理饥饿问题

#include <array>
#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

using namespace std::chrono_literals;

std::array<std::atomic<bool>, 2> flag = {false, false};
std::atomic<int> turn{0};

std::atomic<int> shared_counter{0};
std::atomic<int> active_in_critical{0};
std::mutex output_mutex;

void log(int process_id, const std::string& message) {
    std::lock_guard<std::mutex> lock(output_mutex);
    std::cout << "P" << process_id << " | " << message << '\n';
}

void peterson_lock(int process_id) {
    const int other = 1 - process_id;

    flag[process_id].store(true);
    turn.store(other);

    log(process_id, "wants to enter: flag[" + std::to_string(process_id) +
                        "]=true, turn=" + std::to_string(other));

    bool printed_wait_message = false;
    while (flag[other].load() && turn.load() == other) {
        if (!printed_wait_message) {
            log(process_id, "waits because flag[" + std::to_string(other) +
                                "]=true and turn=" + std::to_string(other));
            printed_wait_message = true;
        }
        std::this_thread::sleep_for(20ms);
    }

    if (printed_wait_message) {
        log(process_id, "can enter now");
    }
}

void peterson_unlock(int process_id) {
    flag[process_id].store(false);
    log(process_id, "leaves: flag[" + std::to_string(process_id) + "]=false");
}

void critical_section(int process_id, int round) {
    const int already_inside = active_in_critical.fetch_add(1);
    if (already_inside != 0) {
        log(process_id, "ERROR: mutual exclusion was broken");
    }

    log(process_id, "enters critical section, round " + std::to_string(round));

    const int before = shared_counter.load();
    std::this_thread::sleep_for(80ms);
    shared_counter.store(before + 1);

    log(process_id, "updates shared_counter: " + std::to_string(before) +
                        " -> " + std::to_string(before + 1));
    log(process_id, "exits critical section, round " + std::to_string(round));

    active_in_critical.fetch_sub(1);
}

void process(int process_id, int rounds) {
    for (int round = 1; round <= rounds; ++round) {
        peterson_lock(process_id);
        critical_section(process_id, round);
        peterson_unlock(process_id);

        std::this_thread::sleep_for(process_id == 0 ? 35ms : 55ms);
    }
}

int main() {
    constexpr int rounds = 2;

    std::cout << "Peterson algorithm demo: two threads, one critical section\n";
    std::cout << "----------------------------------------------------------\n";

    std::thread p0(process, 0, rounds);
    std::thread p1(process, 1, rounds);

    p0.join();
    p1.join();

    std::cout << "----------------------------------------------------------\n";
    std::cout << "Final shared_counter = " << shared_counter.load()
              << " (expected " << rounds * 2 << ")\n";
    return 0;
}

Peterson算法的while循环会持续检测是否具有进入临界区的权限(对于进程0,当对方进程进入的时候挂起,flag[1]=true+ turn =1), 因此没有遵循让权等待原则,但是这已经是纯软件互斥算法中最完善的方案。

硬件实现算法

硬件实现算法相当于给临界区资源添加锁

中断屏蔽

对于单处理器系统,一次只能执行单进程任务。因此能够通过关中断方式,保证进程在临界区执行时不进行进程的切换,保证进程访问临界区不受干扰。

硬件指令 -- TestAndSetSwap

TestAndSet原语实现了加锁功能

以CPP伪代码实现

#include <atomic>
using namespace std;

atomic<bool> TestAndSet(atomic<bool>& lock) {
    return lock.exchange(true, memory_order_acquire);
}

void Unlock(atomic<bool>& lock) {
    lock.store(false, memory_order_release);
}
int main(){
    ...
    while (TestAndSet(lock)) {
        // 忙等待
    }
    fun_();       // 上锁的临界区代码段
    Unlock(lock); // 开锁 
    fun_other();
}

Swap实现的是交换两个原子变量的值

#include<atomic>
using namespace std;

void Swap(atomic<bool> a, atomic b){
    atomic<bool> tmp = a;
    a = b;
    b = tmp;
}
int main(){
    atomic<bool> key = true;
    atomic<bool> lock = false;
    while(key != false){
        swap(lock, key);
    }
    fun_();
    lock = false;
    fun_other();
}

Swap实现的是,进程维护的Key值与临界区的Lock的原子布尔值的交换,进程持有Key = True访问临界区时,临界区的Lock变为True,此时相当于上锁。其他进程便无法访问临界区。但是通过TSSwap实现的互斥锁,在临界区外的进程需要持续检查Key的值,形成忙等待。

互斥锁

互斥锁(自旋锁)和TS/Swap指令逻辑相同,基于这两个指令实现开/关锁管理进程与临界区

atomic<bool> available = true;
void acquire(){
    while (!available);
    available = false;
}
void release(){
    available = true;
}

实际场景下的同步算法

信号量机制

信号量将互斥锁的二元占用状态推广为整数计数状态,可用于表示可用资源数量或限制并发访问数量;其 acquire/release 操作本质上是原子的计数检查、递减、递增与等待唤醒机制,而不只是普通整数比较。

信号量只能通过两个标准原语访问wait()/ signal(), 也称为P/V操作

  • 整型信号量
    通过整数S记录信号量

伪代码实现:

wait(S){
    while(S<=0){
         // 忙等待
    }
    S--;
    
}
signal(S){
    S++;
}
  • 记录型信号量

通过整数S与进程链表L的整个结构体共同实现信号量。

struct semaphore{
    int val;
    llist<process> *L;
};

void wait(semaphore S){
    S.val -- ;
    if (S.val < 0){
        // L 中插入进程
        block(S.L);
    }
}
void signal(semaphroe S){
    S.val ++;
    if (S.val < 0){
        // L 中 移除进程
        wakeup(S.L);
    }
}
信号量与互斥
semaphore S = n; // 初始化信号量
thread P1(){
    P(S);    // 加锁
    fun();   // 执行
    V(S)     // 开锁
}
信号量与同步
semaphore S = 0; // 初始化信号量

thread P1(){
    fun1();
    V(S);
}

thread P2(){
    while(S<=0);
    P(S);
    fun2();
}

进程P1执行完后,信号量才能增加到1,进程P2才能通过忙等待执行。因此通过信号量的序关系确定了进程的前驱关系

信号量与计算图

信号量可以描述程序段间的前后前驱关系,它能实现一个有向图的任务执行流程,每一个有向边对应一个同步问题。通过维护不同的信号量值,管理不同的前驱关系,从而构建一个进程前驱图。

神经网络中的计算图是张量信息流作为有向边与算子计算结点构成顶点的DAG。由于数据依赖天然诱导执行依赖,因此它可以被视为一种前驱图。若从并行调度角度实现,每条依赖边都可以抽象为一个同步约束,信号量是表达这种约束的一种机制。

生产者-消费者问题

系统中存在多个生产者与消费者,其中生产者、消费者只能在缓冲区互斥读/写

这个场景下,只需要判断缓冲区是否有“产品”与是否能访问。通过互斥信号量mutex实现互斥访问的限制,通过计数信号量empty的值表示当前缓冲区含有的产品量。

CPP伪代码为


semaphore mutex = 1;  // 互斥缓冲区 -- 占用进程缓冲区P后不允许进程访问
semaphore empty = n;  // 空缓冲区值 -- 产品缓冲区的存量上限
semaphore full = 0;

void producer(){
    while(true){
        produce();        // 生产产品
        P(empty);         // 申请空缓冲区 -- 容量 -1
        P(mutex);         // 申请进程互斥缓冲区 -- 禁止其他进程访问
        put_product();    // 产品放入缓冲区 
        V(mutex);         // 释放进程互斥缓冲区 
        V(full);          // 满缓冲区 有效产品 +1
    }
}

void consumer(){
    while(true){
        P(full);          // 申请满缓冲区 有效产品 -1
        P(mutex);         // 申请进程互斥缓冲区
        use_product();    // 消耗产品
        V(mutex);         // 释放进程互斥缓冲区
        V(empty);         // 释放空缓冲区 -- 容量 +1
    }
}

读者-写者问题

读者-写者问题是生产者-消费者问题的变式。写者互斥写、读者并行读,但是读者读和写者写不能同时发生。

读者-写者问题分为两种权限优先

  • 读者优先 -- 读者可抢占行夺取缓冲区资源
  • 写者优先 -- 写者非抢占性优先,读者读取时写者需等待
读者优先

写者的信号量权限为 rw = 1, 读者侧维护信号量account 表示当前在读的人数,如果读者归零,则写者能进入缓冲区写

CPP伪代码实现

int count = 0;
semaphore mutex = 1; 
semaphore rw = 1;

void reader(){
    while(true){
        P(mutex);   // 申请进程缓冲区
        if (count == 0){
            P(rw);  // 关闭 写者访问
        }
        count ++;
        V(mutex);   // 释放进程缓冲区
        read();
        P(mutex);
        count -- ;
        if(count == 0){
            V(rw);
        }
        V(mutex);
    }
}


void writer(){
    while(true){
        P(rw);      // 申请rw锁,如果存在读者,则无法访问
        write();
        V(rw);
    }
}
写者优先

写者优逻辑中,还需要维护一个新的信号量w, 用于限制写者的唯一访问

CPP伪代码

int count = 0;       // 读者数
semaphore mutex = 1; // count锁
semaphore rw = 1;    // 写者优先权
semaphore w = 1;     // 文件的独占写信号量


void writer(){
    while(true){
        P(rw);      // 阻止读者读
        P(w);       // 写者单独写
        write();
        V(w);
        P(rw);
    }
}


void reader(){
    while(true){
        P(rw);      // 申请读权限,如果有写者则无法读
        P(mutex);   // 申请进程缓冲区访问
        if (count == 0){
            P(w);   // 阻止写者访问,非抢占性优先
        }
        count ++;
        V(mutex);
        V(rw);
        read();
        P(mutex);
        count -- ;
        if(count == 0){
            V(w);
        }
        V(mutex);
    }
}

哲学家进餐问题

圆形餐桌的每一个哲学家左右手各有一根筷子,进餐时哲学家需要拿起两根筷子进食,如果没有足够的筷子就进行等待。

选定一个哲学家为00 号哲学家,且逆时针编号,则第ii 个哲学家的左手边的筷子与右手边的筷子的编号分别为

{i左手边筷子(i+1)%5右手边筷子\begin{dcases} \quad i & \text{左手边筷子}\\ (i+1)\% 5 &\text{右手边筷子} \end{dcases}

对于 nn 个哲学家与 nn 根筷子,会出现一个人有一只筷子的死锁情况: 每一个人都只有左手边筷子或者右手边筷子,此时会发生死锁。

如果限制每一次只能有 n1n-1 个哲学家能吃饭,那么根据容斥原理,总有一个哲学家有两只筷子,此时这个哲学家吃完饭释放筷子资源后,其余哲学家也能继续进餐。因此基于这个想法,进餐人数的信号量初始为 n1n-1 时能避免死锁。