其他分享
首页 > 其他分享> > absl教程(五):Synchronization library

absl教程(五):Synchronization library

作者:互联网

同步库包括用于跨不同线程管理任务的抽象和原语。该库包含以下头文件:

Abseilbase库还包括许多与并发相关的头文件:

本文档将涵盖上述所有内容。

同步概述

在顺序(即单线程)系统中,我们通常认为事件以特定的总顺序发生:对于任何操作 A 和 B,要么 A 发生在 B 之前,要么 B 发生在 A 之前。在并发系统中,这不再是情况:对于某些操作对,可能无法说哪个更早发生(即事件只是部分排序的),在这种情况下,我们说它们同时发生。请注意,此定义与它们是否“实际上”同时发生无关,而仅与我们是否可以保证它们不会发生有关。

如果在多线程环境中没有正确使用(或设计)并发操作可能会发生冲突,从而导致以下问题:

在任何一种情况下,对共享资源缺乏控制或对操作顺序缺乏控制都可能导致竞争条件。该库中并发抽象的目的是解决这些问题并避免此类竞争条件。

内存访问问题

内存访问问题通常通过多种方式解决,包括:

锁定对共享资源的访问通常通过称为互斥锁的互斥锁来解决Mutex 为此,Abseil 提供了自己的类;类似地,C++ 标准库提供了一个std::mutex用于相同目的的 类。(互斥体设计说明Mutex中讨论了我们实现自己的类的原因 。)

无论操作的顺序、调度或交错如何,行为正确的类型都被称为线程安全的。在大多数情况下,此类类型在幕后使用互斥锁和原子操作来保护对对象内部状态的访问。

有关更多信息,请参阅下面的互斥锁

同步操作

除了简单的内存访问问题之外,同步问题通常更复杂,需要专门构建的抽象来解决底层问题,(同样,通常通过互斥锁和原子操作,正确实现非常棘手)。同步操作旨在控制不同线程中事件的顺序。

请记住,对“线程安全”类型的操作不一定是同步操作。当您读取另一个线程写入的值时,您不能假设写入发生在读取之前;它们可能同时发生。

例如:

foo::counter first, second;

void thread1() {
  first.Add(1);    // (a)
  second.Add(1);   // (b)
}

void thread2() {
  while (second.value() == 0) {
    sleep(10);
  }
  CHECK(first.value() == 1);   // ERROR
}

即使 foo::counter 是线程安全的(并且您不需要担心数据竞争),您可能会认为它CHECK()会成功,因为第 (a) 行发生在第 (b) 行之前,并且CHECK()不可能是到达,直到行 (b) 已执行。但是,除非Add()value()也是同步操作,线程 1 中的任何操作都不需要在线程 2 中的任何操作之前发生,这CHECK()可能会失败。

Abseil 提供了几个同步抽象。有关更多信息,请参阅 同步操作

互斥体

主要原语并发任务内的使用是互斥,这是一个 MUT uallyclusive锁可用于防止多线程访问和/或写入到共享资源。

absl::Mutexstd::mutex

Abseil 提供了自己的Mutex类,在 Google 中,我们使用这个类而不是类似的std::mutex. Mutex提供了 的大部分功能,std::mutex但增加了以下附加功能:

此外,absl::Mutex可以作为读写锁(像 std::shared_mutex)带特殊ReaderLock()ReaderUnlock()功能。(请参阅下面的读写锁。)

我们发现这些功能对于维护庞大而复杂的代码库至关重要。我们不一定要与std::mutex自己竞争 ;如果您发现代码库中可用的功能,请考虑absl::Mutex及其实用程序。

就像std::mutex,Abseil 的互斥锁是不可重入的(也称为非递归)。同样,它在短期内不提供严格的 FIFO 行为或公平性;这样做需要大量的开销。然而,从长远来看,它往往是大致公平的。

互斥基础

absl::Mutex类实现的一些资源互斥锁,允许线程也使用互斥,以避免该资源,这通常是一些变量或数据结构相关联的不变量的并行访问。例如,金融交易系统可能希望一次只有一个写入者访问某些数据元素。互斥体是如此常见,以至于人们创造了许多词来描述它们的操作。

每个Mutex都有两个基本操作:Mutex::Lock()Mutex::Unlock()。从概念上讲,它只有一点抽象状态:指示它是true(锁定)或false(解锁)的布尔值。当Mutex被创建,该锁定是最初false和互斥被认为是自由解锁Lock()阻塞调用者直到互斥体空闲的某个时刻,然后以原子方式将此互斥体状态从 更改falsetrue;然后互斥被调用者持有锁定。再次Unlock()设置此互斥锁状态 false

调用Lock()通常称为锁定获取互斥锁,而调用Unlock()称为解锁释放互斥锁。线程在持有互斥锁时执行的操作称为在互斥锁执行的操作 。在互斥锁下操作的数据结构及其不变量被称为互斥锁保护

的客户Mutex必须遵守这些规则:

  1. 每次线程获取 aMutex它必须稍后释放它。
  2. 线程可能不会尝试释放 a,Mutex除非它持有它。
  3. 线程可能不会尝试获取Mutex它已经持有的排他锁。

因为Lock()以原子方式改变互斥锁的状态,所以我们保证,如果遵循这些规则,在任何给定时间只有一个线程可以持有互斥锁。

必须遵循这些规则以防止并发访问受保护资源并避免死锁,在死锁中线程阻塞等待释放锁。最后一条规则防止自死锁,当互斥锁的持有者试图获取它已经持有的互斥锁时。这种互斥体被称为非递归(或非可重入)互斥体。

最好遵循这些规则,在同一过程中将具有匹配调用Lock()Unlock()互斥锁的代码区域括起来。这些代码段称为临界区。许多 Google C++ 代码使用 helper 类MutexLock,它通过 RAII 在构造时自动获取互斥锁,并在锁超出范围时释放它。

互斥体和不变量

大多数互斥锁用于确保某些不变状态只能在互斥锁被保持时以原子方式更改。要求程序员在释放互斥量之前重新建立不变量;然后,代码可以在每次获取互斥锁时假定不变量成立,即使在临界区期间临时使不变量失效的更新也是如此 。然而,不能保证不变量在不持有互斥锁的线程中为真,因为互斥锁持有者可能正在改变监控状态。

例如,假设Mutex mu保护不变量a + b == 0。这段代码是合法的:

mu.Lock();
assert(a + b == 0); // invariant assumed to hold
a++;                // invariant temporarily invalidated
b--;                // invariant restored before mu is released
mu.Unlock();

虽然这段代码是错误的:

mu.Lock();
assert(a + b == 0); // invariant assumed to hold
a++;                // invariant invalidated
mu.Unlock();        // BUG: mutex released while invariant invalid
mu.Lock();
b--;                // attempt to restore the invariant,
                    // but the damage is already done
mu.Unlock();

以下内容不会使不变量无效,但在未持有锁时错误地假设它为真:

mu.Lock();
assert(a + b == 0); // correct: invariant assumed to hold
mu.Unlock();
assert(a + b == 0); // BUG: can't assume invariant without lock

仅当对单个临界区内观察到的状态进行评估时,不变量才成立:

mu.Lock();
assert(a + b == 0);    // correct: invariant assumed to hold
temp = a;              // takes a temporary copy of "a"
mu.Unlock();
mu.Lock();
assert(temp + b == 0); // BUG: can't assume invariant on state
                       // from two distinct critical sections
mu.Unlock();

MutexLock包装

忘记获取和释放锁Mutex通常会导致您的代码出错。Abseil 并发库包括一个MutexLock包装类,使获取和发布Mutex更容易。该类使用 RAII 来获取互斥锁,并在该类超出范围时自动释放它。

例子:

Class Foo {

  Foo::Bar* Baz() {
    MutexLock l(&lock_);
    ...
    return bar;
  }

  private:
    Mutex lock_;
  };
}

条件互斥体行为

Mutex可以将Abseil配置为阻塞线程,直到特定条件发生。这种条件行为通过两种方式实现:传统的 条件变量 CondVar(类似于std::condition_variable available to std::mutex)或通过 Abseil 的互斥锁独有的机制: 条件临界区,使用Mutex::Condition.

条件临界区(使用Condition构造)通常是首选,因为使用单独的条件变量已被证明是容易出错的。在Mutex包含大量的成员函数(例如,Mutex::Await()是很难得到错误的)。通常,更喜欢使用Condition; 然而,在极少数情况下,当有多个线程在等待明显不同的条件时,一组CondVars 可能更有效。

条件临界区

Abseil’sMutex已通过添加条件临界区(条件变量的替代方案)进行了扩展。成员函数,例如 Mutex::Await()Mutex::LockWhen()使用内部Condition谓词来允许客户端在不需要条件变量的情况下等待条件;客户端不需要编写 while 循环,也不需要使用Signal().

客户可以想象互斥锁包含一个虚构的条件变量 mu.cv;有了这个假设,左边和右边的这些相应的代码片段是等效的:

条件临界区条件变量
mu.Lock(); ... // 任意代码 A mu.Await(条件(f, arg)); ... // 任意代码 B mu.Unlock(); mu.LockWhen(条件(f, arg)); ... // 任意代码 C mu.Unlock();mu.Lock(); ... // 任意代码 A 而 (!f(arg)) { mu.cv.Wait(&mu); } ... // 任意代码 B mu.Unlock(); mu.Lock(); 而 (!f(arg)) { mu.cv.Wait(&mu); } ... // 任意代码 C mu.Unlock();

使用LockWhen()和 时Await(),条件必须封装在f带有void *参数(或 lambda)的函数中(在示例中)。与条件变量一样,条件必须是受互斥锁保护的状态函数。通过消除对现在隐藏在Mutex实现中的条件变量和 while 循环的需要,需要函数的轻微不便得到了回报。

更好的是,Mutex::Unlock()将负责呼叫SignalSignalAll()唤醒条件现在为真的服务员;它的性能通常与使用条件变量实现的性能一样好或更好。因此,上一小节的例子可以写成:

bool f(bool *arg) { return *arg; }

// Waiter
mu.LockWhen(Condition(f, &cond_expr));
// cond_expr now holds
...
mu.Unlock();

// Waker
mu.Lock();
Make_cond_expr_True();
// cond_expr now holds
mu.Unlock();

在极少数情况下,当许多线程可能以许多不同且通常为假的条件等待时,最好使用多个条件变量。通常,为了简单起见,我们建议使用条件临界区。

调用mu.LockWhen(Condition(f, arg))相当于 mu.Lock(); mu.Await(Condition(f, arg))。同样,调用 mu.Await(Condition(f, arg))等效于 mu.Unlock(); mu.LockWhen(Condition(f, arg))

变体LockWhenWithTimeout()AwaitWithTimeout()允许线程等待条件变为真或等待一段时间过去。true如果条件为真,它们每个都返回:

if (mu.LockWhenWithTimeout(Condition(f, &cond_expr), 1000 /*ms*/)) {
  // mu held; cond_expr true
} else {
  // mu held; cond_expr false; 1000ms timeout expired
}
mu.Unlock();

这些调用类似于cv.WaitWithTimeout().

CondVar 条件变量

条件变量与条件临界区的用途相同;它们是一种阻塞线程直到满足某些条件的方法。通常,条件临界区更容易使用,但条件变量对于程序员来说可能更熟悉,因为它们包含在 POSIX 标准和 Java 语言中。

孤立地看,条件变量允许线程阻塞并被其他线程唤醒。但是,条件变量被设计为以特定方式使用;条件变量与互斥锁交互,以便轻松等待互斥锁保护的状态上的任意条件。

假设一个线程要等待某个布尔表达式cond_expr变为true,其中关联的状态cond_expr受互斥锁保护mu。程序员会写:

// Waiter
mu.Lock();
while (!cond_expr) {
  cv.Wait(&mu);
}
// cond_expr now holds
...
mu.Unlock();

所述Wait()呼叫原子解锁mu(其中线程必须持有),并且基于条件变量块cv。当另一个线程向条件变量发出信号时,该线程将重新获取mu,并绕过强制的 while 循环来重新检查cond_expr

另一个使 cond_expr 为真的线程可能会执行:

// Waker
mu.Lock();
Make_cond_expr_True();
// cond_expr now holds
cv.Signal();
mu.Unlock();

调用Signal()唤醒至少一个等待的线程cv。在任何给定时间,许多线程可能因条件变量而被阻塞;如果唤醒多个这样的线程有意义,SignalAll()则可以使用。(该 SignalAll()功能在其他实现中通常称为广播。)

等待不同条件的线程可以使用单个条件变量。但是,在这种情况下,SignalAll()必须在任何条件变为 时使用trueCondVar否则实现无法保证唤醒正确的线程。对每个不同的条件使用一个条件变量会更有效;任何数量的条件变量都可以与单个互斥锁一起使用。

如果没有要唤醒的线程,则Signal()和两者SignalAll()都是有效的。客户端应调用Signal()SignalAll()在使条件为真的关键部分内。

该调用WaitWithTimeout()允许线程等待,直到条件为 true,或者直到经过一段时间。就像Wait()WaitWithTimeout() 总是在返回之前重新获取互斥锁。

例子:

static const int64 kMSToWait = 1000;  // we'll wait at most 1000ms
int64 ms_left_to_wait = kMSToWait;    // ms to wait at any given time
int64 deadline_ms = GetCurrentTimeMillis() + ms_left_to_wait;
mu.Lock();
while (!cond_expr && ms_left_to_wait > 0) {
  cv.WaitWithTimeout(&mu, ms_left_to_wait);
  ms_left_to_wait = deadline_ms - GetCurrentTimeMillis();
}
if (cond_expr) {
  // cond_expr true
} else {
  // cond_expr false; 1000ms timeout expired
}
mu.Unlock();

读写锁

读写器(共享独占)锁有两种锁定模式。如果锁不是空闲的,它可能被单个线程以写(也称为独占)模式持有,或者被一个或多个线程以读取(也就是共享)模式持有。

使用读写锁来保护经常读取但不经常修改的资源或数据结构是很自然的。修改受保护状态的临界区必须在写模式下获得锁,而那些仅仅读取状态的临界区可以在读模式下获得锁。

注意:读写锁在某些情况下可以减少锁争用,但大多数锁的争用程度不足以对此产生影响。当您使用共享锁时,您有责任确保阅读器临界区中的代码确实不会改变数据(逻辑上或物理上),并且此处的任何错误都可能导致微妙的竞争条件。

AnyMutex可以用作读写锁。该Lock()调用以写入模式获取锁,并且必须与对 的调用配对Unlock()。该 ReaderLock()调用以读取模式获取锁,并且必须与对 的调用配对ReaderUnlock()absl::Mutex不提供在没有首先释放锁的情况下从读锁转换为写锁或反之亦然的操作。

条件变量 和mu.Await()都可以与Mutex读模式临界区和写模式临界区一起使用。

Mutex不允许读者饿死作家,反之亦然。这导致了一个稍微令人惊讶的效果,即即使读锁已经被读取器持有,对读锁的请求也可能会阻塞。如果这不可能发生,那么两个或多个将锁永久保持在读取模式的读取器可以阻止等待的写入器取得进展,而没有任何人无限期地持有锁。

请注意,未来的维护者可能会意外地向“读者”临界区添加突变,从而引入错误。例如,自我优化的数据结构(如伸展树或 LRU 缓存)可能会在每次读取时修改数据结构。即使是简单的数据结构也可以跟踪使用统计。因此,读者锁应谨慎使用,其使用方法应易于开发人员识别。它可以帮助禁止使用指向const数据结构的指针进行修改,但由于 C++ 的mutable关键字,仍然必须小心谨慎。

我们建议Mutex最初使用简单的互斥锁;仅当您知道存在锁争用并且知道写入不频繁时才引入读取器锁。

线程注释

的主要缺点absl::Mutex是任何互斥类型的缺点:您必须记住在进入临界区之前锁定它,离开时必须记住解锁它,并且必须避免死锁。

为了帮助解决这些问题,Abseil 提供了线程安全注解 (in base/thread_annotations.h) 来指定哪些变量由哪些互斥锁保护,在调用哪些函数时应该保留哪些互斥锁,应该以什么顺序获取互斥锁,等等。然后在编译时检查这些约束,虽然这种机制不是万无一失的,但它确实捕获了许多常见的Mutex使用错误。除了作为代码文档的一部分之外,编译器或分析工具还可以使用注释来识别和警告潜在的线程安全问题。

注释指南

每个需要互斥锁保护的数据对象(无论是命名空间范围内的全局变量还是类范围内的数据成员)都应该有一个注释, GUARDED_BY指示哪个互斥锁保护它:

int accesses_ GUARDED_BY(mu_); // count of accesses

每个互斥体都应该有一个补充注释,指示它保护哪些变量以及任何不明显的不变量:

Mutex mu_;       // protects accesses_, list_, count_
                 // invariant: count_ == number of elements in linked-list list_

每当一个线程可以同时持有两个互斥锁时,其中一个(或两个)互斥锁应该用ACQUIRED_BEFORE或进行注释,ACQUIRED_AFTER以指示必须首先获取哪个互斥锁:

Mutex mu0_ ACQUIRED_BEFORE(mu1_); // protects foo_

如果互斥量获取顺序不一致,可能会导致死锁。有关并发库中用于检测死锁的实用程序,请参阅 死锁检测

每个例程都应该被注释或有一个注释,指出哪些互斥体必须和不能在进入时保留。这些注释允许实现者在不检查调用站点的情况下编辑例程,并允许客户端在不读取它们的主体的情况下使用例程。

注释EXCLUSIVE_LOCKS_REQUIREDSHARED_LOCKS_REQUIREDLOCKS_EXCLUDED用于记录此类信息。由于我们目前使用 GCC 的“属性”来实现注解,因此它们只能应用于函数声明,而不能应用于定义(除非它们在类定义中)。

如果一个例程获得了mu,我们必须用以下注解它的声明 LOCKS_EXCLUDED(mu)

// Function declaration with an annotation
void CountAccesses() LOCKS_EXCLUDED(mu_);

// Function definition
void CountAccesses() {
  this->mu_.Lock();
  this->accesses_++;
  this->mu_.Unlock();
}

如果一个例程希望被muhold住,我们必须用EXCLUSIVE_LOCKS_REQUIRED(mu)SHARED_LOCKS_REQUIRED(mu)取决于例程是否需要写入器或读取器锁来注释它 :

// Function declaration with an annotation
void CountAccessesUnlocked() EXCLUSIVE_LOCKS_REQUIRED(mu_);

// Function definition
void CountAccessesUnlocked() {
  this->accesses_++;
}

如果没有这样的注释/评论,使用互斥锁会困难得多。我们强烈推荐使用它们。

僵局

当一项活动 试图获取已耗尽且无法补充的有限资源时,就会发生死锁(有时称为致命拥抱),除非该活动取得进展。

在考虑仅涉及互斥锁的死锁时,每个活动通常由一个线程表示,资源是互斥锁,持有时耗尽,释放时补充。

最简单的互斥锁相关死锁是自死锁

mu.Lock();
mu.Lock();      // BUG: deadlock: thread already holds mu

涉及两个资源(例如互斥锁和有限大小的线程池)的死锁也很容易产生,但涉及三个或更多资源的死锁不太常见。当线程 T0 在持有 M0 的同时尝试获取 M1 的同时,线程 T1 在持有 M1 的同时尝试获取 M0 时,会导致双互斥锁死锁;每个线程将无限期地等待另一个。

死锁检测

幸运的是,死锁是最容易调试和避免的错误之一。调试通常很容易,因为地址空间正好在错误发生的地方停止。线程的堆栈跟踪通常是查看线程正在等待什么以及它们持有什么资源所需的全部内容。

此外,absl::MutexAPI 还提供了额外的死锁检测。仅当应用程序在调试模式下编译且标志-synch_deadlock_detection非零时,才会启用此类检测。启用后,API 会检测另外两种类型的死锁情况:

如果-synch_deadlock_detection=1,则为每个锁定顺序错误打印一条消息。如果-synch_deadlock_detection=2,第一个锁顺序错误导致进程中止。

不建议将以下调用用于生产代码,但在启用死锁检测时很有用:

请注意,死锁检测会引入大量开销;不应在生产二进制文件中启用它。

其他同步操作

大多数不限于内存访问问题的并发问题都属于“同步”操作的大类。在并发系统中,同步操作通常包含必须确保严格排序的操作。

Abseil 包含许多同步抽象:

call_once()

absl::call_once()是 C++ 标准库的快速实现 std::call_once(),用于在所有线程中最多调用一次给定函数。您将三个参数传递给call_once(): aonce_flag以协调和识别唯一调用者,一个要调用的函数,以及要使用该函数调用的参数。

call_once()使用特定once_flag参数(不引发异常)的第一次调用将使用提供的参数运行传递的函数;具有相同once_flag参数的其他调用将不会运行该函数,但会等待提供的函数完成运行(如果它仍在运行)。

这种机制为多线程进程中的一次性初始化提供了一种安全、简单、快速的机制:

class MyInitClass {
  public:
  ...
  absl::once_flag once_;

  MyInitClass* Init() {
    absl::call_once(once_, &MyInitClass::Init, this);
    return ptr_;
  }
}

Notification

ANotification允许线程接收单个事件发生的通知。线程使用通知的WaitForNotification*()成员函数之一注册通知。 Notification::Notify()用于通知那些等待的线程事件已经发生,并且对于任何给定的通知只能调用一次。

例子:

// Create the notification
Notification notification_;

// Client waits for notification
void Foo() {
  notification_.WaitForNotification();
  // Do something based on that notification
}

//
void Bar() {
  // Do a bunch of stuff that needs to be done before notification
  notification_.Notify();
}

障碍

Anabsl::Barrier阻塞线程,直到线程的预先指定阈值利用屏障。一个线程Barrier通过调用Block()屏障来利用,这将阻塞该线程;Block()在指定数量的线程调用它之前,不会返回任何调用。

例子:

Barrier *barrier_ = new Barrier(num_active_threads);

void Foo() {
  if (barrier_->Block()) {
    delete barrier_;  // This thread is responsible for destroying barrier_;
  }
  // Do something now that the Barrier has been reached.
}

阻塞计数器

Anabsl::BlockingCounter阻止所有线程执行预先指定数量的操作。线程调用Wait()阻塞计数器以阻塞直到发生指定数量的事件;工作线程DecrementCount()在完成工作后调用计数器。一旦计数器的内部“计数”达到零,被阻塞的线程就会解除阻塞。

例子:

void Foo() {
  BlockingCounter things_to_process(things.size());
  Process(&things_to_process)
  things_to_process.Wait();
}

void Process(BlockingCounter* things) {
  // Do stuff
  things->DecrementCount();
  return;
}

标签:Unlock,library,mu,互斥,死锁,Synchronization,线程,absl,Mutex
来源: https://blog.csdn.net/yao_hou/article/details/120402340