其他分享
首页 > 其他分享> > [MIT 6.S081] Lab 7: Multithreading

[MIT 6.S081] Lab 7: Multithreading

作者:互联网

Lab 7: Multithreading

Uthread: switching between threads (moderate)

要点

思路

该部分实验是在用户模式模拟一个进程有多个用户线程. 通过 thread_create() 创建线程, thread_schedule() 进行线程调度. 这与 xv6 的线程调度是大同小异的.
对于此处用户多线程之间的切换, 也需要保存寄存器信息, 实际上就可以直接参考内核线程切换的 struct context 结构体, 需要保存的寄存器信息是一致的. 而每个线程会有独立的需要执行的函数和线程栈, 则需要在创建线程进行设置.

步骤

  1. 设置线程上下文结构体.
    由于此时用户多线程切换需要保存的寄存器信息和 xv6 内核线程切换需要保存的寄存器信息是一致的, 因此可以直接使用 kernel/proc.h 中定义的 struct context 结构体. 此处为了代码的清晰独立, 单独设置了 struct ctx 结构体, 其成员是和前者一致的.
// Saved registers for thread context switches. - lab7-1
struct ctx {
    uint64 ra;
    uint64 sp;

    // callee-saved
    uint64 s0;
    uint64 s1;
    uint64 s2;
    uint64 s3;
    uint64 s4;
    uint64 s5;
    uint64 s6;
    uint64 s7;
    uint64 s8;
    uint64 s9;
    uint64 s10;
    uint64 s11;
};
  1. 在线程结构体 struct thread 中添加线程上下文字段 context.
    很显然, 上文定义的线程上下文结构体 struct ctx 是和线程一一对应的, 应作为线程结构体的一个成员变量.
struct thread {
  char       stack[STACK_SIZE]; /* the thread's stack */
  int        state;             /* FREE, RUNNING, RUNNABLE */
  struct ctx context;       // thread's context - lab7-1
};
  1. 添加代码到 thread_create() 函数.
    thread_create() 函数主要进行线程的初始化操作: 其先在线程数组中找到一个状态为 FREE 即未初始化的线程, 然后设置其状态为 RUNNABLE 等进行初始化.
    这里要注意到, 传递的 thread_create() 参数 func 需要记录, 这样在线程运行时才能运行该函数, 此外线程的栈结构是独立的, 在运行函数时要在线程自己的栈上, 因此也要初始化线程的栈指针. 而在线程进行调度切换时, 同样需要保存和恢复寄存器状态, 而上述二者实际上分别对应着 rasp 寄存器, 在线程初始化进行设置, 这样在后续调度切换时便能保持其正确性.
void 
thread_create(void (*func)())
{
  struct thread *t;

  for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
    if (t->state == FREE) break;
  }
  t->state = RUNNABLE;
  // YOUR CODE HERE
  // set thread's function address and thread's stack pointer - lab7-1
  t->context.ra = (uint64) func;
  t->context.sp = (uint64) t->stack + STACK_SIZE;
}
  1. 添加代码到 thread_schedule() 函数.
    thread_schedule() 函数负责进行用户多线程间的调度. 此处是通过函数的主动调用进行的线程切换. 其主要工作就是从当前线程在线程数组的位置开始寻找一个 RUNNABLE 状态的线程进行运行. 实际上与 kernel/proc.c 中的 scheduler() 函数是很相似的. 而很明显在找到线程后就需要进行线程的切换, 调用函数 thread_switch().
    thread_switch() 根据其在 user/thread.c 中的外部声明以及指导书的要求可以推断出, 该函数应该是定义在 user/uthread_switch.S, 用汇编代码实现. 因此其功能应该与 kernel/swtch.S 中的 swtch() 函数一致, 进行线程切换时的寄存器代码的保存与恢复.
void 
thread_schedule(void)
{
  struct thread *t, *next_thread;

  /* Find another runnable thread. */
  next_thread = 0;
  t = current_thread + 1;
  for(int i = 0; i < MAX_THREAD; i++){
    if(t >= all_thread + MAX_THREAD)
      t = all_thread;
    if(t->state == RUNNABLE) {
      next_thread = t;
      break;
    }
    t = t + 1;
  }

  if (next_thread == 0) {
    printf("thread_schedule: no runnable threads\n");
    exit(-1);
  }

  if (current_thread != next_thread) {         /* switch threads?  */
    next_thread->state = RUNNING;
    t = current_thread;
    current_thread = next_thread;
    /* YOUR CODE HERE
     * Invoke thread_switch to switch from t to next_thread:
     * thread_switch(??, ??);
     */
    thread_switch(&t->context, &current_thread->context);   // switch thread - lab7-1
  } else
    next_thread = 0;
}
  1. 最后在 user/uthread_switch.S 中添加 thread_switch 的代码. 正如上文所述, 该函数实际上功能与 kernel/swtch.S 中的 swtch 函数一致, 而由于此处 struct ctx 与内核的 struct context 结构体的成员是相同的, 因此该函数可以直接复用 kernel/swtch.S 中的 swtch 代码.
	.text

	/*
     * save the old thread's registers,
     * restore the new thread's registers.
     */

	.globl thread_switch
thread_switch:
	/* YOUR CODE HERE */
	# same as swtch in swtch.S - lab7
    sd ra, 0(a0)
    sd sp, 8(a0)
    sd s0, 16(a0)
    sd s1, 24(a0)
    sd s2, 32(a0)
    sd s3, 40(a0)
    sd s4, 48(a0)
    sd s5, 56(a0)
    sd s6, 64(a0)
    sd s7, 72(a0)
    sd s8, 80(a0)
    sd s9, 88(a0)
    sd s10, 96(a0)
    sd s11, 104(a0)

    ld ra, 0(a1)
    ld sp, 8(a1)
    ld s0, 16(a1)
    ld s1, 24(a1)
    ld s2, 32(a1)
    ld s3, 40(a1)
    ld s4, 48(a1)
    ld s5, 56(a1)
    ld s6, 64(a1)
    ld s7, 72(a1)
    ld s8, 80(a1)
    ld s9, 88(a1)
    ld s10, 96(a1)
    ld s11, 104(a1)
	ret    /* return to ra */

测试

思考题

Using threads (moderate)

预处理

  1. 使用如下命令构建 ph 程序, 该程序包含一个线程不安全的哈希表.
$ make ph
  1. 运行 ./ph 1 即使用单线程运行该哈希表, 输出如下, 其 0 个键丢失:
    在这里插入图片描述
  2. 运行 ./ph 2 即使用两个线程运行该哈希表, 输出如下, 可以看到其 put 速度近乎先前 2 倍, 但是有 16423 个键丢失, 也说明了该哈希表非线程安全.
    在这里插入图片描述

思考题

要点

步骤

  1. 定义互斥锁数组.
    根据指导书可知, 此处主要通过加互斥锁来解决线程不安全的问题. 此处没有选择使用一个互斥锁, 这样会导致访问整个哈希表都是串行的. 而考虑到对该哈希表, 实际上只有对同一 bucket 操作时才可能造成数据的丢失, 不同 bucket 之间是互不影响的, 因此此处是构建了一个互斥锁数组, 每个 bucket 对应一个互斥锁.
pthread_mutex_t locks[NBUCKET]; // lab7-2
  1. main() 函数中对所有互斥锁进行初始化.
int
main(int argc, char *argv[])
{
  pthread_t *tha;
  void *value;
  double t1, t0;

  if (argc < 2) {
    fprintf(stderr, "Usage: %s nthreads\n", argv[0]);
    exit(-1);
  }
  nthread = atoi(argv[1]);
  tha = malloc(sizeof(pthread_t) * nthread);
  srandom(0);
  assert(NKEYS % nthread == 0);
  for (int i = 0; i < NKEYS; i++) {
    keys[i] = random();
  }
  // initialize locks - lab7-2
  for(int i = 0; i < NBUCKET; ++i) {
      pthread_mutex_init(&locks[i], NULL);
  }

  //
  // first the puts
  // ...
}
  1. put() 中加锁.
    由于线程安全问题是由于对 bucket 中的链表操作时产生的, 因此要在对链表操作的前后加锁.
    但实际上, 对于加锁的临界区可以缩小至 insert() 函数. 原因是 insert() 函数采取头插法插入 entry, 在函数的最后才使用 *p=e 修改 bucket 链表头 table[i] 的值, 也就是说, 在前面操作的同时, 并不会对 bucket 链表进行修改, 因此可以缩小临界区的方法. 实际上加锁的范围可以缩小至 *p=e 前后, 但由于需要修改 insert() 函数, 此处便未这样修改.
static 
void put(int key, int value)
{
  int i = key % NBUCKET;

  // is the key already present?
  struct entry *e = 0;
  for (e = table[i]; e != 0; e = e->next) {
    if (e->key == key)
      break;
  }
  if(e){
    // update the existing key.
    e->value = value;
  } else {
    pthread_mutex_lock(&locks[i]);    // lock - lab7-2
    // the new is new.
    insert(key, value, &table[i], table[i]);
    pthread_mutex_unlock(&locks[i]);  // unlock - lab7-2
  }
}
  1. 不需要在 get() 中加锁.
    get() 函数主要是遍历 bucket 链表找寻对应的 entry, 并不会对 bucket 链表进行修改, 实际上只是读操作, 因此无需加锁.
  2. 修改 NBUCKET 避免并发写入内存重叠.
    之所以在修改前两个线程运行有并发问题, 是由于 b%NBUCKET==0, 因此两个同时运行的线程, put(keys[b*n+i],n)时, 实际上都是在操作 i%NBUCKET 这一 bucket, 也就很可能同时修改 bucket 的链表头, 即指导书中所说的写入同一物理内存, 自然容易发生丢失修改, 而且加锁后因为会同时争用锁, 也会影响并发性能.
    可以通过修改 NBUCKET 使得 b%NBUCKET!=0, 这样两个同时运行的线程进行 put() 时便大概率不会对同一个 bucket 进行操作, 自然也就减少了锁的争用, 能够一定程度上挺高并发性能.
    此处选择 NBUCKET=7, 在两个线程时, b%NBUCKET=50000%7!=0.
#define NBUCKET 7   // lab7

测试

  1. 修改 NBUCKET 前测试:
    • ./ph 2 测试: 可以加锁后两个线程运行不会有 key 丢失, 同时可以看到 put 的性能虽然相比加锁之前有所减小, 但实际影响不大.
      在这里插入图片描述
    • ./grade-lab-thread ph_safe单项测试:
      在这里插入图片描述
    • ./grade-lab-thread ph_fast 单项测试: 实际上此处在未修改 NBUCKET 前就可以通过 ph_fast 的测试.
      在这里插入图片描述
  2. 修改 NBUCKET=7 后:
    • ./ph 2 测试: 可以看到修改后的 put 和 get 性能达到了 35000 左右每秒, 达到甚至超过了最初不加锁的性能.
      在这里插入图片描述
    • ./grade-lab-thread ph_fast 单项测试: 修改后, 该测试的完成时间减少到 19.1s, 相比修改前的 22.3s 也有一定的提升.
      在这里插入图片描述

Barrier(moderate)

预处理

  1. 运行如下命令构建 barrier 程序, 该程序要求多线程同时执行到同一位置后再继续运行, 即多线程同步问题.
$ make barrier
  1. 运行 ./barrier 2 即使用两个线程运行该程序, 输出如下, 即最初版本不满足该性质, 会致使运行失败.
    在这里插入图片描述

要点

思路

此处主要涉及互斥锁和条件变量配合达到线程同步.
首先条件变量的操作需要在互斥锁锁定的临界区内.
然后进行条件判断, 此处即判断是否所有的线程都进入了 barrier() 函数, 若不满足则使用 pthread_cond_wait() 将当前线程休眠, 等待唤醒; 若全部线程都已进入 barrier() 函数, 则最后进入的线程会调用 pthread_cond_broadcast() 唤醒其他由条件变量休眠的线程继续运行.
需要注意的是, 对于 pthread_cond_wait() 涉及三个操作: 原子的释放拥有的锁并阻塞当前线程, 这两个操作是原子的; 第三个操作是由条件变量唤醒后会再次获取锁.

步骤

根据上述思路, 在 barrier() 函数中添加如下代码.
注意对于变量 bstate.roundbstate.nthread 的设置需要在 pthread_cond_broadcast() 唤醒其它线程之前, 否则其他线程进入下一轮循环时可能这两个字段的值还未得到修改.

static void 
barrier()
{
  // YOUR CODE HERE
  //
  // Block until all threads have called barrier() and
  // then increment bstate.round.
  //
  // lab7-3
  pthread_mutex_lock(&bstate.barrier_mutex);
  // judge whether all threads reach the barrier
  if(++bstate.nthread != nthread)  {    // not all threads reach    
    pthread_cond_wait(&bstate.barrier_cond,&bstate.barrier_mutex);  // wait other threads
  } else {  // all threads reach
    bstate.nthread = 0; // reset nthread
    ++bstate.round; // increase round
    pthread_cond_broadcast(&bstate.barrier_cond);   // wake up all sleeping threads
  }
  pthread_mutex_unlock(&bstate.barrier_mutex);
}

测试

标签:NBUCKET,函数,thread,bucket,switch,线程,Multithreading,S081,MIT
来源: https://blog.csdn.net/LostUnravel/article/details/121430791