爱锋贝

 找回密码
 立即注册

只需一步,快速开始

扫一扫,极速登录

开启左侧

xv6 学习:进程管理C 互斥与同步

[复制链接]
发表于 2023-4-3 10:11:49 | 显示全部楼层 |阅读模式

一键注册,加入手机圈

您需要 登录 才可以下载或查看,没有帐号?立即注册   

x
主要参考自 《给操作系统捋条线》,加上了自己的笔记。
锁大家都很熟悉了,就是用来进程互斥、实现同步。本文首先介绍锁的实现,然后是 xv6 中锁的使用,xv6 基于锁实现了互斥和同步。
索引

  • 基本概念、基本函数
  • 自旋锁、睡眠锁的实现
  • 同步:sleep、wakeup
  • 同步:wait、exit
  • 调度切换
1. 基本概念、函数

1.1 概念

这里介绍下与锁相关的基本概念

  • 公共资源:顾名思义就是被多个任务共享的资源,可以是公共内存,也可以是公共文件等等
  • 临界区:要访问使用公共资源,肯定得通过一些代码指令去访问,这些代码指令就是临界区
  • 并发:单个 CPU 上交替处理多个任务,宏观上看就像是同时进行的一样,但微观上看任意时刻还是只有一个任务在进行
  • 并行:多个处理器同时处理多个任务,能够做到真正意义上的多个任务同时进行。
  • 互斥:也称为排他,任何时候公共资源只允许最多一个任务独享,不允许多个任务同时执行临界区的代码访问公共资源
  • 同步:多进程并发时,不同程序之间的制约关系
  • 竞争条件:竞争条件指的是多个任务以竞争的形式并行访问公共资源,公共资源的最终状态取决于这些任务的临界区代码的精确执行时序
竞争条件比如说多进程要并行读写同一段地址空间,这样很可能会破坏读写原子性,譬如读取到其他写操作的中间状态。显然竞争条件并不是我们想要的,虽然一些竞争条件出现的概率很小,但根据墨菲定律,会出错的总会出错,加之计算机的运行频率,就算出错的概率再小,在某天某时某刻那也是有可能发生的。
所以对于进入临界区访问公共资源我们要避免竞争条件,保证公共资源的互斥排他性,一般有两种大的解决方案来实现互斥

  • 忙等待:没进入临界区时一直循环,占用 CPU 资源
  • 休眠等待:没进入临界区时一直休眠,不占用 CPU,CPU 利用率较高,但如果被调度到的次数过多,有进程上下文切换的开销
xv6 就根据上面两种方案,分别实现了两个类型的锁,在后面会看到
1.2 sleep

这里介绍下 sleep、exit、wakeup 三个函数的基本作用,更深层次的,在这些函数中使用的互斥并发,将在介绍了锁后讨论。
void sleep(void *chan, struct spinlock *lk);这里的 sleep 不是那个系统调用,而是 xv6 的内核函数。chan 表示进程因为什么对象而沉睡?lk 表示管理这个对象的锁。
void sleep(void *chan, struct spinlock *lk){
    p->chan = chan;         // 休眠在chan上
    p->state = SLEEPING;    // 状态更改为SLEEPING
    sched();                // 让出CPU调度
    p->chan = 0;            // 休眠对象改为0
}sleep 大概就干了上面的事:

  • p 表示当前进程
  • 将 p->chan 设置为 chan,表示因 chan 而沉睡(在 wakeup 中会用到)
  • 将 p->state 设置为 sleeping
  • 交出 cpu,调度到其他进程
  • 等待从 sched 返回时,表示该进程被唤醒,已经重新上 cpu 了,此时设置休眠对象为 0,休眠结束
调用 sched 之后是不会返回的,而是去执行其他进程了,这就是休眠的好处,提高 cpu 的利用率。
1.3 wakeup

static void
wakeup1(void *chan)
{
  struct proc *p;

  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;
}wakeup 的作用就是唤醒所有因 chan 而沉睡的进程,具体就是遍历一遍 pcb 数组,然后判断 sleeping && p->chan == chan。wakeup1 是没有加锁版本的 wakeup,一般是获取了锁然后调用这个函数。
1.4 exit

这里来看看 exit 函数。在进程的 main 函数执行完后将会系统调用 exit 函数,让子进程回收一部分资源,标记为僵尸态等到父进程回收剩下的资源。
1.5 wait

父进程调用 wait 就是用来负责回收子进程,流程为:

  • 首先看看遍历看看有没有孩子,没有孩子或者当前进程被杀死就直接退出
  • 如果找到僵尸子进程成功,回收他的页表、内核栈、pcb
2. Lock

简单介绍了几个函数和基本概念,接下来就是锁是怎么实现的了!前面说过,锁分为自旋锁和睡眠锁。首先介绍最简单的自旋锁
2.1 自旋锁

2.1.1 结构体

struct spinlock {

  uint locked;       // Is the lock held?

  // For debugging:
  char *name;        // Name of lock.
  struct cpu *cpu;   // The cpu holding the lock.
  uint pcs[10];      // The call stack (an array of program counters)
                     // that locked the lock.
};有关自旋锁的结构定义如上,最重要的就是 locked 元素,用来表示该锁是否已被某 cpu 取得,1表示该锁已某 cpu 取走,0 表示该锁空闲。其他三个元素用作为 debug 调试信息。
因为锁不属于信号量,所以 locked 最多只能为1,这应该很清楚了。
2.1.2 初始化

void initlock(struct spinlock *lk, char *name)
//初始化锁 lk
{
    lk->name = name; //初始化该锁的名字
    lk->locked = 0;  //初始化该锁空闲
    lk->cpu = 0;     //初始化持有该锁的CPU为空
}这部分就是初始化锁的名字、设置为空闲、没有 CPU 持有该锁
2.1.3 开关中断

void pushcli(void)
{
    int eflags;
    eflags = readeflags(); //cli之前读取eflags寄存器

    cli(); //关中断
    if(mycpu()->ncli == 0) //第一次pushcli()
        mycpu()->intena = eflags & FL_IF; //记录cli之前的中断状态

    mycpu()->ncli += 1; //关中断次数(深度)加1
}


void popcli(void)
{
    if(readeflags()&FL_IF) //如果eflags寄存器IF位为1
        panic("popcli - interruptible");
    if(--mycpu()->ncli < 0) //如果计数小于0
        panic("popcli");
    if(mycpu()->ncli == 0 && mycpu()->intena) //关中断次数为0时即开中断
        sti();
}开关中断这两个函数在请求锁、释放锁的时候会被用到。这里看下函数原理:pushcli 和 popcli 为 cli 和 sti 的封装函数,只是增加了计数,每个 popcli 与 pushcli 匹配,有多少次 pushcli 就要有多少次 popcli,只有第一次 pushcli、最后一次 popcli 才会开关中断。

  • 第一个 pushcli:关中断,并且记录关中断之前的 eflags 寄存器状态以便恢复,最后增加关中断次数
  • 之后的 pushcli:可以认为只是增加中断计数,关中断已经发生过了
  • 最后一个 popcli:此时如果关中断次数为0,并且第一次 pushcli 之前的中断状态(之前保存的 intena)为开中断,这里才会开中断
  • 之前的 popcli:可以认为只是减少了中断次数
在持有锁的过程中,进程将不会处理中断,因此后面本文的讨论中,只要拥有一个锁,此时是不会因时间片到期放弃 CPU 进入调度的。
为什么使用 pushcli() 和 popcli() 而不是使用 cli() sti() 后面详细说明。
2.1.4 检测持有锁

int holding(struct spinlock *lock)
{
    int r;
    pushcli();
    r = lock->locked && lock->cpu == mycpu(); //检验锁lock是否被某CPU锁持有且上锁
    popcli();
    return r;
}holding 函数是在关中断下检查锁是否被某 CPU 取走,仔细看检查是否持有锁的条件为两个:一是锁是否被取走,二锁是否由当前 CPU 取走。
由于不一定持有锁,所以为了保证获取信息这个操作的原子性,需要关中断下执行,避免被调度。
2.1.5 取锁解锁

void acquire(struct spinlock *lk)
{
    pushcli(); // disable interrupts to avoid deadlock.
    if(holding(lk)) // 如果已经取了锁
        panic("acquire");
    while(xchg(&lk->locked, 1) != 0) //原子赋值
        ;
    __sync_synchronize(); //发出一个full barrier

    // 调试信息
    lk->cpu = mycpu(); //记录当前取锁的CPU
    getcallerpcs(&lk, lk->pcs); //获取调用栈信息
}关中断下进行取锁的操作,以避免死锁,原因见后面 FAQ
acquire 检查当前 CPU 是否已经持有锁,如果已持有,则 panic(),也就是说 xv6 不允许同一个 CPU 对同一个锁重复上锁。
上锁的语句为 xchg(&lk->locked, 1),xchg 函数可以看作一个原子赋值函数(本是交换,与 1 交换就相当于赋值)

  • 将 lk->locked 赋值为 1,返回 locked 的旧值
  • 也就是说如果该锁空闲没有 CPU 持有,那么当前 CPU 将其赋值为 1 表示取得该锁,xchg 返回旧值 0,跳出 while 循环
  • 如果该锁已经被某 CPU 持有,那么 xchg 对其赋值为 1,但返回值也是 1,不满足循环跳出条件,所以一直循环等待某 CPU 释放该锁,当前 CPU 因为是关中断只能一直忙等。
因取锁可能需要一直循环等待,所以名为自旋锁。
__sync_synchronize() 是发出一个 full barrier,简单来说就是不允许将这条语句之前的内存读写指令放在这条之后,也不允许将这条语句之后的内存读写指令放在这条指令之前。
由于编译器翻译成汇编代码是乱序的,在 xchg 获得锁之前,后面关于 debug 的语句可能提前执行,这条指令就能保证需要等到获取锁之后,才会执行 debug 相关的汇编指令。
void release(struct spinlock *lk)
{
    if(!holding(lk))
        panic("release");
    lk->pcs[0] = 0;

    //清除调试信息
    lk->cpu = 0;
    __sync_synchronize(); //发出一个full barrier
    asm volatile("movl $0, %0" : "+m" (lk->locked) : ); //lk->locked=0
    popcli();
}释放锁主要就是将 lk->locked 设置为 0,其他进程被调度到时如果检测到锁为空闲,将会重新上锁,访问互斥资源。
2.1.6 faq

Ⅰ xv6 内核 scheduler 的开中断
在 proc.c 中可以看到
for(;;){
    // Enable interrupts on this processor.
    sti();
   
    // Loop over process table looking for process to run.
    acquire(&ptable.lock);执行 scheduler 的时候是开中断的,内核中只有 scheduler 函数会短暂的开中断的,其他时候都是关中断的。而如果做过 jos 就知道,jos 在内核态下是全程关中断的。
造成两者区别的原因是:

  • xv6 scheduler 是一个无限循环,每一次都会请求锁,遍历一遍数组想要找到一个空闲的进程运行
  • jos scheduler 只运行一遍,如果找不到进程运行就直接让 cpu 沉睡、开中断,等待中断唤醒它
因此,xv6 的 scheduler 要定时开中断,如果 scheduler 一直无限运行下去,就不能执行中断处理程序。
Ⅱ xv6 加锁为什么要使用 xchg()
我们知道 xchg 是硬件提供的原子指令,xchg(&lk->locked, 1) 具体执行了下面两件事:
old = lk->locked;
lk->locked = 1;
return old;假如这个语句不是原子的,那么同时可能有多个 cpu 标记为 lk->locked = 1,然后返回。最后,有多个 cpu 访问互斥资源,显然是错误的。
Ⅲ acquire() 加锁函数的关中断
acquire(struct spinlock *lk)
{
  pushcli(); // disable interrupts to avoid deadlock.为什么要关中断呢?由于 scheduler 在 acquire 之前临时开中断,所以现在内核态下,是可以收到中断然后运行中断处理程序的。假设加锁时没有没有关中断,这里划分为几个时间点下产生中断:

  • 在 acquire 中 xchg 指令前
  • 在 acquire 中 xchg 指令后
  • 在 release 函数前
逐一分析。假设在 xchg 指令前发生中断,中断处理程序中可能会获取到和 acquire 相同的锁,因为此时内核代码还没有获取锁,所以不会发生死锁;假设在 xchg 指令后发生中断,中断处理程序中可能会请求和 acquire 相同的锁,由于先前该 CPU 的  scheduler 已经获取了锁,中断处理程序将自旋或者沉睡,发生死锁;假设在 release 函数前发生中断,和上面同理,将会发生死锁;
因此,我们需要在 xchg 前关中断,可以回答下面几个问题:

  • 能不能直接不关中断:不行、会死锁
  • 能不能先上锁再关中断:不行、会死锁
  • 关中断持续的时间:持续到 release
release() 函数先原子赋值释放锁再开中断,也就同理了,如果两者交换先开中断,那么在释放锁之前可能发生中断,而中断处理程序刚好需要该锁,那么发生死锁。
Ⅳ xv6 的竞争条件有哪些?
可能的情况如下,下面逐一分析:

  • 多个 CPU 并行执行
  • 单个 CPU 上,中断处理程序和内核代码并发执行
  • 单个 CPU 上,不同进程并发执行
xv6 是个支持多处理器的系统,各个 CPU 之间可以并行执行,所以可能会出现同时访问公共资源的情况,是一个竞争条件。
在单个 CPU 上,内核态下,scheduler 是开中断的,所以内核代码可能会和中断处理其程序并发执行,并发的进入临界区。前面提到过,取锁时关中断,内核代码就不会切换到 scheduler,所以不是竞争条件。
xv6 不支持线程,而各个进程之间内存是不共享的,加之内核进入临界区访问公共资源的时候是关了中断的,关了中断除了自己休眠是不会让出 CPU 的,所以运行在单个处理器上的各个进程之间的并发其实并不会产生竞争条件。
Ⅴ 关中断开中断为什么要使用 pushcli() 和 popcli() 而不直接使用 cli() 和 sti()?
为什么要使用 pushcli() 和 popcli(),其实也简单,那是因为某个函数中可能要取多个锁,比如先后取了锁 1 锁 2,那么释放锁 2 之后能开中断吗?显然不能,必须等锁 1 也释放的时候才能开中断。所以使用增加了计数功能的 pushcli() 和 popcli() 来实现最后一个锁释放的时候才开中断。
Ⅵ 内存乱序问题
现今的指令的执行都有流水线的技术,其实还有乱序执行。乱序执行指的是在 CPU 运行中的指令不按照代码既定的顺序执行,而是按照一定的策略打乱后顺序执行,以此来提高性能。
在一些时候,我们不希望指令顺序被打乱,为避免这种情况,我们设置了屏障,禁止这个屏障前后的指令顺序打乱
2.2 休眠锁

2.2.1 结构体

xv6 里面还提供了另一种锁,休眠锁,它在自旋锁的基础之上实现,定义如下:
struct sleeplock {
    uint locked;
    // Is the lock held? 已锁?
    struct spinlock lk; // spinlock protecting this sleep lock 自旋锁
    // For debugging:
    char *name;
    // Name of lock. 名字
    int pid;
    // Process holding lock 哪个进程持有?
};休眠锁配了一把自旋锁来保护,原因见后。休眠锁的意思是某个进程为了取这个休眠锁不得而休眠,所以有个 pid 来记录进程号。
2.2.2 取锁

休眠锁的初始化,检验是否持有锁等都类似,就不赘述了,再这里主要看看取锁和解锁的区别:
void acquiresleep(struct sleeplock *lk)
{
    acquire(&lk->lk); // 优先级:->高于&
    while (lk->locked) { // 当锁已被其他进程取走
        sleep(lk, &lk->lk); // 休眠
    }
    lk->locked = 1; // 上锁
    lk->pid = myproc()->pid; // 取锁进程的pid
    release(&lk->lk);
}睡眠锁需要保证 检测锁状态、上锁操作、休眠操作 这几个操作是原子的,在自旋锁中由于只需要保证前两个操作,所以使用 xchg 指令就可以实现。
为什么要上锁?一个原因是原子性,另一个原因就是为了防止 lost wakeup,防止死锁。可以看看后面的 同步:sleep & wakeup。
原子性即代表当前 CPU 不发生调度、其他 CPU 执行 acquiresleep 需要互斥。因此睡眠锁使用 xchg 很难保证原子性,这里 xv6 使用自旋锁,就能很好的保证睡眠锁的原子性。
这里使用自旋锁的 acquire、release 就保证了原子性。在获取自旋锁后,如果此时休眠锁上锁,将会调用 sleep 函数放弃自旋锁并陷入沉睡。如果此时休眠锁空闲,将设置 lk->locked 表示上锁,并且记录进程的 pid。
2.2.3 解锁

void releasesleep(struct sleeplock *lk)
{
    acquire(&lk->lk); //取自旋锁
    lk->locked = 0;
    lk->pid = 0;
    wakeup(lk); //唤醒
    release(&lk->lk);
}解锁操作基本上就是上锁的逆操作,注意一点,可能有其他进程休眠在休眠锁上,所以当前进程解锁后需要唤醒休眠在休眠锁上的进程,wakeup 将会请求表锁,查找因为休眠锁沉睡的进程。
3. 同步

锁同步的问题一直是操作系统里面最为复杂的问题之一,,xv6 的锁设计本身不难,难得是锁的使用,这里就根据进程这一块使用锁的地方来简单聊一聊,进程中与锁的有关地方主要有休眠,唤醒,等待,退出,调度,切换。
3.1 休眠唤醒

休眠是休眠在某个对象上,唤醒是唤醒休眠在某个对象上的进程,所以想当然的可以这样来声明 sleep 和 wakeup:
void sleep(void *obj);
void wakeup(void *obj);那这样声明对不对呢?来看个用户态下,简单的变种生产者消费者的例子
3.1.1 消费者生产者

Send:
    while(obj != NULL) //对象还在
    ;                  //循环
    obj = 1;           //制作对象
    wakeup(obj);       //唤醒休眠在其上的消费者

Recv:
    while(obj == NULL) //没有对象可消费
        sleep(obj);    //休眠
    obj = NULL;        //消耗对象乍一看感觉没什么问题,但是由于我们不能保证进程被调度到的顺序。如果 wakeup 发生在 sleep 之前就会引起死锁:

xv6 学习:进程管理C 互斥与同步-1.jpg

IMG

比如上面这种在单处理器上并发的情况,指令执行顺序如下:

  • 消费者首先检测是否有 obj,由于 obj=NULL,准备休眠
  • 在 sleep 前发生时间中断,调度到生产者
  • 生产者设置 obj=1,调用 wakeup 函数,唤醒因为 obj 沉睡的进程
  • 切换回消费者,消费者此时执行 sleep 陷入沉睡
综上,由于 sleep 发生在 wakeup 之前,陷入死锁。就算不是中断,因为是多处理器,两者运行的时间是不确定的,完全也可能出现上述的情况。
避免这种前后感知的信息不一致的办法就是加锁,来看第二个版本:
Send:
    lock(obj_lock);
    while(obj != NULL) // 对象还在
        sleep(obj);    // 休眠
    obj = 1;           // 制作对象
    wakeup(obj);       // 唤醒休眠在其上的消费者
    unlock(obj_lock);

Recv:
    lock(obj_lock);
    while(obj == NULL) // 没有对象可消费
        sleep(obj);    // 休眠
    obj = NULL;        // 消耗对象
    wakeup(obj)
    unlock(obj_lock);第二种版本做了一个修改,保证生产者和消费者操作的原子性,生产者、消费者读取对象、使用对象、唤醒、沉睡是一个原子操作,在解锁前不会被调度和抢占。
回到上面的问题,此时不会再出现消费者在 sleep 前被调度,lost wakeup 的情况。recv 在准备 sleep 之前获得了 obj_lock,send 没有 obj_lock,是不可能 wakeup 的,所以休眠就不会错过唤醒。
是否可以检测完对象之后再获取 obj_lock 呢?这是不正确的,将会导致进程的操作不是原子的,出现多个进程进入缓冲区操作同一个 obj 的情况。
3.1.2 sleep

这个问题倒是解决了,又有一个新问题,如果进程带着 obj_lock 休眠不释放的话,同样死锁。
因此将 sleep 的接口定义为:
void sleep(void *obj, void *obj_lock)这个锁作为 sleep 的参数,在进程进入休眠状态时释放该锁,在进程返回时重新获取该锁。
也就是说,在 sleep 函数中,修改进程状态前要先释放锁,然后才修改为 SLEEPING,让出 CPU。为了保证在释放锁后,不会出现 sleep 错过 wakeup 的情况,修改进程状态和释放锁需要是一个原子操作。
假设刚释放锁,虽然此时可能不会发生中断,但是其他 cpu 并行时会重新获取锁,然后快速执行完 wakeup,因此出现了 lost wakeup
由于涉及到了进程状态的修改,所以 xv6 选择用 ptable.lock 来保证上面操作的原子性。
void
sleep(void *chan, struct spinlock *lk)
{

  struct proc *p = myproc();

  if(p == 0)
    panic("sleep");

  if(lk == 0)
    panic("sleep without lk");


  if(lk != &ptable.lock){  //DOC: sleeplock0
    acquire(&ptable.lock);  //DOC: sleeplock1
    release(lk);
  }

  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;
  // Reacquire original lock.
  
  if(lk != &ptable.lock){  //DOC: sleeplock2
    release(&ptable.lock);
    acquire(lk);
  }

}在放弃 CPU 前:

  • 获取表锁,保证 lk 的释放和状态修改是一个原子操作
  • 释放 lk
  • 修改休眠对象和进程状态,交出 CPU
sched 将会交出 CPU,等到该进程被 wakeup,重新获取 CPU。在重新拥有 CPU 后:

  • 修改休眠对象为 NULL
  • 被调度到时,sched 中获取了表锁,放弃表锁重新获取 lk
进程变为 SLEEPING 之后将不会被调度到,因此 sleeplock 减少了 CPU 的负担,不会再让一个CPU忙等,而是合理放弃 CPU。
特殊情况在于,如果 lk = 表锁,那么放弃 cpu 前不请求表锁,拥有 cpu 后不放弃表锁。休眠对象是一个进程,在 xv6 中,这种情况只发生在 wait 函数中,父进程等待子进程退出,这个时候它休眠在自己身上。
3.1.3 wakeup

static void wakeup1(void *chan)
{
    struct proc *p;
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
        if(p->state == SLEEPING && p->chan == chan)
            p->state = RUNNABLE;
}
void wakeup(void *chan)
{
    acquire(&ptable.lock);
    wakeup1(chan);
    release(&ptable.lock);
}然后是 wakeup 函数,分为两个部分:不加锁的 wakeup1 和加锁版本的 wakeup。这么划分是因为,在有些情况下已经获取了表锁,直接调用 wakeup 会死锁,详见下面的 wait 函数。
3.3 exit

这里来看看 exit 函数
void exit(void){
    struct proc *curproc = myproc();
    if(curproc == initproc)
        panic("init exiting");首先第一个进程是不能退出的,如果退出直接报错。他需要用来回收子进程和孤儿进程的资源
// Close all open files.

  for(fd = 0; fd < NOFILE; fd++){
    if(curproc->ofile[fd]){
      fileclose(curproc->ofile[fd]);
      curproc->ofile[fd] = 0;
    }
  }子进程会自己回收掉一部分资源。比如打开文件描述符表,这一部分关闭所有文件,如果减到 0,再释放该文件的 inode,如果文件的链接数和引用数都为 0 了,就删除该文件。
acquire(&ptable.lock);

  // Parent might be sleeping in wait().
  wakeup1(curproc->parent);

  // Pass abandoned children to init.
  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
    if(p->parent == curproc){
      p->parent = initproc;
      if(p->state == ZOMBIE)
        wakeup1(initproc);
    }
  }

  // Jump into the scheduler, never to return.
  curproc->state = ZOMBIE;
  sched();
  panic("zombie exit");和前文对应,由于 exit 函数已经拥有了表锁,为了避免死锁,使用不加锁版本的 wakeup1。
然后是要唤醒父进程,通知父进程来回收自己。这里还会抛弃当前进程的所有子进程,子进程成为孤儿进程,将孤儿进程的父进程修改为 init 进程,让 init 进程负责回收资源。最后将自己的状态设置为 zombie,父进程将在 wait 调用中回收 zombie 的子进程。
因为这个操作会修改进程状态,所以执行前要获取表锁。提一嘴,虽然这里是获得了锁,sched 函数需要当前进程持有锁,后面会看到设计的原因。
为什么要父进程负责回收呢?私以为,因为子进程在内核中运行时还要使用到页表、内核栈、进程结构体,如果直接回收,执行下一条内核代码将会报错,影响系统执行,所以这些资源等到父进程回收为好。
3.4 wait

acquire(&ptable.lock);

for(;;){
    // Scan through table looking for exited children.

    havekids = 0;
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
      if(p->parent != curproc)
        continue;

      havekids = 1;

      if(p->state == ZOMBIE) { // Found one.
        pid = p->pid;
        kfree(p->kstack);
        p->kstack = 0;
        freevm(p->pgdir);
        p->pid = 0;
        p->parent = 0;
        p->name[0] = 0;
        p->killed = 0;
        p->state = UNUSED;
        release(&ptable.lock);
        return pid;
      }
    }

    // No point waiting if we don't have any children.
    if(!havekids || curproc->killed){
      release(&ptable.lock);
      return -1;
    }

    // Wait for children to exit.  (See wakeup1 call in proc_exit.)
    sleep(curproc, &ptable.lock);  //DOC: wait-sleep
}父进程调用 wait 就是用来负责回收子进程,流程为:

  • 首先遍历看看有没有孩子,没有孩子或者当前进程被杀死就直接退出
  • 有孩子进程就一定会 wait 到一个孩子变为僵尸进程为止,每次扫描一轮没有发生有子进程为僵尸进程就陷入睡眠,子进程调用 exit 后会唤醒父进程
  • 如果找到僵尸子进程成功,回收他的页表、内核栈、pcb
exit 和 wait 如何保证原子性和防止死锁(没有 lost wakeup)?因为他们使用表锁进行互斥,正好符合上面所说的生产者消费者模型
一般而言,父进程可能不会sleep两次?
4.进程调度

切换进程的流程是:进程 A --> scheduler --> 进程 B。进程 A 切换到调度进程是使用 sched 函数,sched 函数是 swtch 函数的封装。当切换回 scheduler 时,除了返回对应的切换上下文外,此时 scheduler 还需要保证关中断和拥有 ptable.lock。
在 swtch 和  scheduler 执行的过程中,必须要持有表锁。要知道执行 swtch 是会切换进程的,因为这个锁在一个进程中获取,在另一个进程中释放。这样使用锁很不常见,但这里是必须的。
也就是说,sched 是进程 A 取锁,进程 B 放锁。即使是 sched 之后的语句一般会来个放锁操作,新进程也是如此。
因此在进入 sched 前一定要获取 ptable.lock,进入调度有多种情况,我们逐一分析及验证:

  • yield 因时间片放弃 CPU
  • sleep 因阻塞放弃 CPU
  • exit 因死亡放弃 CPU
4.1 yield

4.1.1 时间中断

// Force process to give up CPU on clock tick.
  // If interrupts were on while locks held, would need to check nlock.
  if(myproc() && myproc()->state == RUNNING &&
     tf->trapno == T_IRQ0+IRQ_TIMER)
    yield();在 trap.c 中,当进程因为时间中断进入中断处理时,将会 yield 放弃 CPU。
4.1.2 yield

void
yield(void)
{
  acquire(&ptable.lock);  // DOC: yieldlock
  myproc()->state = RUNNABLE;
  sched();
  release(&ptable.lock);
}yield 对应于前文,首先要获取 ptable.lock 这把锁,更改状态为 RUNNABLE。
如果在 swtch 没有持有表锁会怎么样?例如:CPU 0 调用 yield 放弃进程A,然后 yield 中设置进程状态为 RUNNABLE。CPU0 在执行 sched 之前另一个 CPu1 会调度进程 A,在这短暂的时间内,这样的话一个进程运行在两个 CPU 上,内核栈将会发生错误。
void
sched(void)
{

  int intena;
  struct proc *p = myproc();

  if(!holding(&ptable.lock))
    panic("sched ptable.lock");

  if(mycpu()->ncli != 1)
    panic("sched locks");
   
  if(p->state == RUNNING)
    panic("sched running");

  if(readeflags()&FL_IF)
    panic("sched interruptible");

  intena = mycpu()->intena;
  swtch(&p->context, mycpu()->scheduler);
  mycpu()->intena = intena;
}sched 函数首先执行了一些检查:

  • 不可中断
  • 非运行态
  • 持有锁
intena 这里表示 pushcli 之前 CPU 允许中断的情况。这个值保存下来等待进程被调度到时,用来恢复 CPU 的中断情况,保证在同一个进程里中断允许情况一致。比如说在进程 A 放弃CPU之前是开中断,进程 A 恢复执行后,在 yield 中将会 popcli 恢复开中断。
xv6 下系统调用是异常门,所以内核下是可以开中断的。
4.1.3 进程B解锁

在 scheduler 中依旧还是关中断,等到调度到进程 B 后。
intena = mycpu()->intena;
  swtch(&p->context, mycpu()->scheduler);
  mycpu()->intena = intena;

}进程 B 在这里恢复 CPU 的中断情况
void
yield(void)
{

  acquire(&ptable.lock);  //DOC: yieldlock
  myproc()->state = RUNNABLE;
  sched();
  release(&ptable.lock);
}跳转回到 yield 函数,释放表锁,成功对应上文:进程A上锁,进程B解锁。从 yield 返回后就是跑到 trapret 恢复进程执行了,这里就不再多说。
假如是调度到一个新进程,新进程的第一条指令(不是用户指令啊!)是 forkret,在这里将会释放表锁,完成”进程B解锁“。
4.2  sleep

sleep 前面也看过了
// Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;
  sched();在 sched 函数前会获取表锁,sched 函数后释放表锁,也就保证了前面说的。进程 A 上锁,进程B解锁。进入 sched 的状态不是 RUNNING!!
sleep 什么时候被使用到呢?

  • sleeplock
  • exit&wait
  • 外设驱动:系统调用 read、write 时 sleep,中断处理程序时 wakeup
4.3 exit

// Jump into the scheduler, never to return.
  curproc->state = ZOMBIE;
  sched();
  panic("zombie exit");exit 也是在 sched 前获取锁,然后设置为僵尸态。

-----------------------------
精选高品质二手iPhone,上爱锋贝APP
您需要登录后才可以回帖 登录 | 立即注册   

本版积分规则

快速回复 返回顶部 返回列表