Author: jiyf
Original source: https://tidb.net/blog/9722d003
[Original work] Yes [First published on] TiDB Community
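Deadlock detection leader
Every TiKV node runs a deadlock detector. A detector can switch between two roles, leader and follower, and starts as a follower.
Leader role: maintains the wait-for graph (a DAG) of locks, accepts detection requests, and searches the graph to decide whether a deadlock exists.
Follower role: forwards detection requests to the leader over gRPC and receives the results.
The deadlock detection leader is determined by the region that contains the empty-string key, that is, the first region of the cluster in key order. The store that hosts the leader of that region becomes the deadlock detection leader.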
    const LEADER_KEY: &[u8] = b"";

    fn is_leader_region(region: &Region) -> bool {
        // The key range of a new created region is empty which misleads the leader
        // of the deadlock detector stepping down.
        // If the peers of a region is not empty, the region info is complete.
        is_region_initialized(region)
            && region.get_start_key() <= LEADER_KEY
            && (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key())
    }
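The range test in is_leader_region can be tried in isolation. The following is a minimal sketch, assuming a simplified `KeyRange` type (hypothetical, not a TiKV type) in place of `Region`, and leaving out the `is_region_initialized` check:

```rust
/// Hypothetical, simplified stand-in for a region's key range.
/// An empty `end_key` means the range is unbounded on the right.
pub struct KeyRange {
    pub start_key: Vec<u8>,
    pub end_key: Vec<u8>,
}

/// The key whose owning region decides the deadlock detection leader.
pub const LEADER_KEY: &[u8] = b"";

/// Returns true if `range` contains `LEADER_KEY`.
/// Byte slices compare lexicographically, so only a range that starts
/// at the empty key can satisfy the first condition.
pub fn contains_leader_key(range: &KeyRange) -> bool {
    range.start_key.as_slice() <= LEADER_KEY
        && (range.end_key.is_empty() || LEADER_KEY < range.end_key.as_slice())
}
```

Because no key sorts before the empty string, only the first region of the key space passes this test.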
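The detector subscribes to region change events (region creation, region update, and role changes) to track role changes of the leader region, and switches its own deadlock detection role accordingly.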
    pub(crate) fn register(self, host: &mut CoprocessorHost<RocksEngine>) {
        // Called when a peer's role changes.
        host.registry
            .register_role_observer(1, BoxRoleObserver::new(self.clone()));
        // Called when a region changes.
        host.registry
            .register_region_change_observer(1, BoxRegionChangeObserver::new(self));
    }
Detection entry point
When the detector is the leader, the entry point calls the local handler directly. When it is a follower, it queries the leader over gRPC.
    /// Handles detect requests of itself.
    fn handle_detect(&mut self, tp: DetectType, txn_ts: TimeStamp, lock: Lock) {
        if self.is_leader() {
            self.handle_detect_locally(tp, txn_ts, lock);
        } else {
            // A follower tries to send the request to the leader at most twice.
            for _ in 0..2 {
                if self.send_request_to_leader(tp, txn_ts, lock) {
                    return;
                }
            }
        }
    }
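The dispatch in handle_detect can be modeled without any networking. This is only a sketch: `Role`, `Outcome`, and `dispatch` are illustrative names, the `send` closure stands in for the gRPC call to the leader, and reporting the request as dropped after two failed attempts is an assumption, since the excerpt does not show what happens after the loop.

```rust
/// Role of a detector instance (illustrative names, not TiKV types).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Role {
    Leader,
    Follower,
}

/// What happened to a detect request in this sketch.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    HandledLocally,
    Forwarded { attempts: u32 },
    Dropped,
}

/// `send` stands in for the gRPC call to the leader and returns true on success.
pub fn dispatch<F: FnMut() -> bool>(role: Role, mut send: F) -> Outcome {
    if role == Role::Leader {
        // The leader owns the wait-for table and answers directly.
        return Outcome::HandledLocally;
    }
    // A follower tries at most twice, like the `0..2` loop in the excerpt.
    for attempt in 1..=2 {
        if send() {
            return Outcome::Forwarded { attempts: attempt };
        }
    }
    // Assumption: after two failures the request is given up.
    Outcome::Dropped
}
```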
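The entry point handles three request types:
DetectType::Detect: the deadlock check itself. When a pessimistic transaction runs into a lock, it uses this request to check whether waiting would create a deadlock.
DetectType::CleanUpWaitFor: removes one lock that the transaction was waiting for, because the transaction no longer waits for it.
DetectType::CleanUp: removes every lock the transaction is waiting for. For example, when the transaction is rolled back, all of its wait-for information is cleared.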
    fn handle_detect_locally(&self, tp: DetectType, txn_ts: TimeStamp, lock: Lock) {
        // Mutably borrow the detect table.
        let detect_table = &mut self.inner.borrow_mut().detect_table;
        match tp {
            DetectType::Detect => {
                // Check whether waiting for this lock causes a deadlock.
                if let Some(deadlock_key_hash) = detect_table.detect(txn_ts, lock.ts, lock.hash) {
                    // Report the deadlock to the waiter manager.
                    self.waiter_mgr_scheduler
                        .deadlock(txn_ts, lock, deadlock_key_hash);
                }
            }
            DetectType::CleanUpWaitFor => {
                // Remove a single wait-for entry.
                detect_table.clean_up_wait_for(txn_ts, lock.ts, lock.hash)
            }
            // Remove all wait-for entries of this transaction.
            DetectType::CleanUp => detect_table.clean_up(txn_ts),
        }
    }
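The difference between the two cleanup requests is easier to see on a toy table. The sketch below assumes the table is a nested map from waiting transaction to lock owner to lock hashes, with plain `u64` timestamps; `ToyTable` and its methods are hypothetical and only mirror the behavior described above.

```rust
use std::collections::HashMap;

/// Toy table: waiting txn ts -> (lock owner ts -> lock hashes).
#[derive(Default)]
pub struct ToyTable {
    wait_for: HashMap<u64, HashMap<u64, Vec<u64>>>,
}

impl ToyTable {
    /// Records that `txn_ts` waits for lock `hash` held by `lock_ts`.
    pub fn add(&mut self, txn_ts: u64, lock_ts: u64, hash: u64) {
        self.wait_for
            .entry(txn_ts)
            .or_default()
            .entry(lock_ts)
            .or_default()
            .push(hash);
    }

    /// CleanUpWaitFor: `txn_ts` no longer waits for this one lock.
    pub fn clean_up_wait_for(&mut self, txn_ts: u64, lock_ts: u64, hash: u64) {
        let mut txn_done = false;
        if let Some(owners) = self.wait_for.get_mut(&txn_ts) {
            let emptied = match owners.get_mut(&lock_ts) {
                Some(hashes) => {
                    hashes.retain(|h| *h != hash);
                    hashes.is_empty()
                }
                None => false,
            };
            if emptied {
                owners.remove(&lock_ts);
            }
            txn_done = owners.is_empty();
        }
        if txn_done {
            self.wait_for.remove(&txn_ts);
        }
    }

    /// CleanUp: drop every wait-for entry of `txn_ts`.
    pub fn clean_up(&mut self, txn_ts: u64) {
        self.wait_for.remove(&txn_ts);
    }

    /// Number of (waiting txn, lock owner) edges currently stored.
    pub fn edge_count(&self) -> usize {
        self.wait_for.values().map(|owners| owners.len()).sum()
    }
}
```

CleanUpWaitFor removes a single edge once its last lock hash is gone, while CleanUp removes all outgoing edges of the transaction in one step.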
Deadlock detection algorithm
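When a pessimistic transaction tries to lock a key and finds the key already locked, it calls the deadlock detection interface. Suppose the current transaction is txn_1 and the transaction holding the lock is txn_lock. The check answers one question: if txn_1 starts waiting for txn_lock's lock, will there be a deadlock?
The detector maintains the wait-for relationships as a DAG (directed acyclic graph). If the wait-for relationships that currently exist in the cluster already contain a path from txn_lock to txn_1, then adding the edge from txn_1 to txn_lock would close a cycle, which means a deadlock.

    /// `Locks` is a set of locks belonging to one transaction.
    struct Locks {
        ts: TimeStamp, // transaction timestamp
        hashes: Vec<u64>,
        last_detect_time: Instant,
    }

    /// Used to detect the deadlock of wait-for-lock in the cluster.
    pub struct DetectTable {
        /// Keeps the DAG of wait-for-lock. Every edge from `txn_ts` to `lock_ts` has a survival time -- `ttl`.
        /// When checking the deadlock, if the ttl has elapsed, the corresponding edge will be removed.
        /// `last_detect_time` is the start time of the edge. `Detect` requests will refresh it.
        // txn_ts => (lock_ts => Locks)
        wait_for_map: HashMap<TimeStamp, HashMap<TimeStamp, Locks>>,

        /// The ttl of every edge.
        ttl: Duration,

        /// The time of last `active_expire`.
        last_active_expire: Instant,

        now: Instant,
    }

wait_for_map is a two-level hash map. The first-level key is the waiting transaction txn_ts, the second-level key is the transaction being waited for (txn_lock), and the second-level value is the list of locks held by txn_lock that txn_ts is waiting for.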
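wait_for_map describes the lock wait relationships between transactions in the cluster. Given a waiting transaction's txn_ts, it tells which transactions that transaction is waiting for and which of their locks.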
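To make the two-level map and the edge TTL concrete, here is a small sketch. It assumes plain `u64` timestamps and uses `std::time::Instant` instead of TiKV's own time types; `WaitForMap`, `register`, and `waits_for` are names chosen for this illustration and are not the real `DetectTable` API.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Locks that one transaction waits for on a single lock owner.
pub struct Locks {
    pub hashes: Vec<u64>,
    pub last_detect_time: Instant,
}

impl Locks {
    /// An edge expires once `ttl` has passed since the last detect request.
    pub fn is_expired(&self, now: Instant, ttl: Duration) -> bool {
        now.saturating_duration_since(self.last_detect_time) >= ttl
    }
}

/// waiting txn ts -> (lock owner ts -> Locks).
#[derive(Default)]
pub struct WaitForMap {
    map: HashMap<u64, HashMap<u64, Locks>>,
}

impl WaitForMap {
    /// Adds the edge `txn_ts -> lock_ts` for `lock_hash`, or refreshes it.
    pub fn register(&mut self, txn_ts: u64, lock_ts: u64, lock_hash: u64, now: Instant) {
        let locks = self
            .map
            .entry(txn_ts)
            .or_default()
            .entry(lock_ts)
            .or_insert_with(|| Locks { hashes: Vec::new(), last_detect_time: now });
        if !locks.hashes.contains(&lock_hash) {
            locks.hashes.push(lock_hash);
        }
        // A repeated detect request refreshes the start time of the edge.
        locks.last_detect_time = now;
    }

    /// Lock owners that `txn_ts` still waits for at time `now`, sorted.
    pub fn waits_for(&self, txn_ts: u64, now: Instant, ttl: Duration) -> Vec<u64> {
        let mut owners: Vec<u64> = match self.map.get(&txn_ts) {
            Some(m) => m
                .iter()
                .filter(|(_, locks)| !locks.is_expired(now, ttl))
                .map(|(owner, _)| *owner)
                .collect(),
            None => Vec::new(),
        };
        owners.sort_unstable();
        owners
    }
}
```

The lookup starts from the waiting transaction, which is why the first-level key is txn_ts and not the lock owner.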
    /// Returns the key hash which causes deadlock.
    pub fn detect(&mut self, txn_ts: TimeStamp, lock_ts: TimeStamp, lock_hash: u64) -> Option<u64> {
        let _timer = DETECT_DURATION_HISTOGRAM.start_coarse_timer();
        TASK_COUNTER_METRICS.detect.inc();

        self.now = Instant::now_coarse();
        // Clean up expired edges.
        self.active_expire();

        // If `txn_ts` is waiting for `lock_ts`, it won't cause deadlock.
        // The edge txn_ts -> lock_ts already exists, so lock_ts cannot be
        // waiting for txn_ts and there is no deadlock.
        if self.register_if_existed(txn_ts, lock_ts, lock_hash) {
            return None;
        }

        if let Some(deadlock_key_hash) = self.do_detect(txn_ts, lock_ts) {
            ERROR_COUNTER_METRICS.deadlock.inc();
            // A deadlock exists.
            return Some(deadlock_key_hash);
        }
        self.register(txn_ts, lock_ts, lock_hash);
        None
    }
Algorithm flow:
1. Clean up expired locks. This step rarely runs: it is executed only when the number of waiting transactions reaches 100000 and at least 1 hour has passed since the last cleanup.
2. Check whether txn_ts is already waiting for a lock of txn_lock. If so, there cannot be a path from txn_lock to txn_ts, so no deadlock is possible; record the new lock and return.
3. Call do_detect, which traverses the DAG to check whether a path from txn_lock to txn_ts exists. If it does, there is a deadlock.
4. If there is no deadlock, txn_ts can wait for txn_lock safely, so the wait-for information from txn_ts to txn_lock is added to the table.

    /// Checks if there is an edge from `wait_for_ts` to `txn_ts`.
    fn do_detect(&mut self, txn_ts: TimeStamp, wait_for_ts: TimeStamp) -> Option<u64> {
        let now = self.now;
        let ttl = self.ttl;

        let mut stack = vec![wait_for_ts];
        // Memorize the pushed vertexes to avoid duplicate search.
        let mut pushed: HashSet<TimeStamp> = HashSet::default();
        pushed.insert(wait_for_ts);
        while let Some(wait_for_ts) = stack.pop() {
            if let Some(wait_for) = self.wait_for_map.get_mut(&wait_for_ts) {
                // Remove expired edges.
                wait_for.retain(|_, locks| !locks.is_expired(now, ttl));
                if wait_for.is_empty() {
                    // Drop the entry once it has no edges left.
                    self.wait_for_map.remove(&wait_for_ts);
                } else {
                    for (lock_ts, locks) in wait_for {
                        if *lock_ts == txn_ts {
                            return Some(locks.hashes[0]);
                        }
                        if !pushed.contains(lock_ts) {
                            stack.push(*lock_ts);
                            pushed.insert(*lock_ts);
                        }
                    }
                }
            }
        }
        None
    }
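do_detect explores every path that starts from wait_for_ts (txn_lock) and checks whether any of them reaches txn_ts. If one does, it returns the hash of a lock on that path to signal that a deadlock exists.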
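The whole check can be reduced to a reachability search on a small graph. The following sketch keeps only the wait-for edges and leaves out lock hashes, TTLs, and metrics; `WaitGraph` is a hypothetical type, and its `detect` returns a plain `bool` instead of the key hash.

```rust
use std::collections::{HashMap, HashSet};

/// Minimal wait-for graph: waiting txn -> set of txns it waits for.
#[derive(Default)]
pub struct WaitGraph {
    edges: HashMap<u64, HashSet<u64>>,
}

impl WaitGraph {
    /// Returns true if `target` is reachable from `start` (iterative DFS).
    fn reaches(&self, start: u64, target: u64) -> bool {
        let mut stack = vec![start];
        // Remember pushed vertexes so each one is expanded only once.
        let mut pushed: HashSet<u64> = HashSet::new();
        pushed.insert(start);
        while let Some(ts) = stack.pop() {
            if let Some(next) = self.edges.get(&ts) {
                for &n in next {
                    if n == target {
                        return true;
                    }
                    if pushed.insert(n) {
                        stack.push(n);
                    }
                }
            }
        }
        false
    }

    /// Tries to add the edge `txn_ts -> lock_ts`.
    /// Returns true, and adds nothing, if the edge would close a cycle.
    pub fn detect(&mut self, txn_ts: u64, lock_ts: u64) -> bool {
        // An edge that already exists passed this check before.
        let known = self
            .edges
            .get(&txn_ts)
            .map_or(false, |owners| owners.contains(&lock_ts));
        if !known && self.reaches(lock_ts, txn_ts) {
            return true;
        }
        self.edges.entry(txn_ts).or_default().insert(lock_ts);
        false
    }
}
```

With edges 1 -> 2 and 2 -> 3 registered, a request for 3 -> 1 is rejected because 3 is already reachable from 1. Since a rejected edge is never stored, the graph stays acyclic.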
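Waking up lock waiters
The deadlock detector keeps information that maps a transaction to all the locks it is waiting for. When a lock is released, a wait times out, or a deadlock is found, the transactions waiting for the lock must be woken up. That requires the reverse lookup: from a lock ID to the transactions waiting for it.
Lock wait information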
    /// If a pessimistic transaction meets a lock, it will wait for the lock
    /// released in `WaiterManager`.
    ///
    /// `Waiter` contains the context of the pessimistic transaction. Each `Waiter`
    /// has a timeout. Transaction will be notified when the lock is released
    /// or the corresponding waiter times out.
    pub(crate) struct Waiter {
        pub(crate) start_ts: TimeStamp,
        pub(crate) cb: StorageCallback,
        /// The result of `Command::AcquirePessimisticLock`.
        ///
        /// It contains a `KeyIsLocked` error at the beginning. It will be changed
        /// to `WriteConflict` error if the lock is released or `Deadlock` error if
        /// it causes deadlock.
        pub(crate) pr: ProcessResult,
        pub(crate) lock: Lock,
        delay: Delay,
        _lifetime_timer: HistogramTimer,
    }
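start_ts: the ts of the transaction waiting for the lock
lock: the lock being waited for
pr: the result of waiting for the lock, for example a write conflict or a deadlock
delay: the wait timeout
cb: the callback used for wake-up; it returns the wait result pr to the transaction waiting for the lock

    // NOTE: Now we assume `Waiters` is not very long.
    // Maybe needs to use `BinaryHeap` or sorted `VecDeque` instead.
    type Waiters = Vec<Waiter>;

    struct WaitTable {
        // Map lock hash to waiters.
        wait_table: HashMap<u64, Waiters>,
        waiter_count: Arc<AtomicUsize>,
    }

wait_table maintains the list of all transactions waiting for a given lock: the key is the lock's hash and the value is every Waiter waiting for that lock.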
With this table, when the lock on a key is released, all Waiters are looked up by the lock's hash, and the one that has waited longest is woken up directly.
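The lookup by lock hash and the choice of the oldest waiter can be sketched as follows. `SketchWaiter` and `SketchWaitTable` are hypothetical reductions of `Waiter` and `WaitTable`, and treating the waiter with the smallest start_ts as the oldest one is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Illustrative waiter: only the field needed to pick the oldest one.
#[derive(Debug, PartialEq)]
pub struct SketchWaiter {
    pub start_ts: u64,
}

/// lock hash -> transactions waiting for that lock.
#[derive(Default)]
pub struct SketchWaitTable {
    table: HashMap<u64, Vec<SketchWaiter>>,
}

impl SketchWaitTable {
    /// Registers a transaction as waiting for the lock with `lock_hash`.
    pub fn add(&mut self, lock_hash: u64, start_ts: u64) {
        self.table
            .entry(lock_hash)
            .or_default()
            .push(SketchWaiter { start_ts });
    }

    /// Removes and returns the waiter with the smallest start_ts.
    pub fn remove_oldest(&mut self, lock_hash: u64) -> Option<SketchWaiter> {
        let waiters = self.table.get_mut(&lock_hash)?;
        let idx = waiters
            .iter()
            .enumerate()
            .min_by_key(|(_, w)| w.start_ts)
            .map(|(i, _)| i)?;
        let oldest = waiters.swap_remove(idx);
        let now_empty = waiters.is_empty();
        if now_empty {
            // Do not keep empty entries in the table.
            self.table.remove(&lock_hash);
        }
        Some(oldest)
    }

    /// Number of waiters still queued for `lock_hash`.
    pub fn remaining(&self, lock_hash: u64) -> usize {
        self.table.get(&lock_hash).map_or(0, |w| w.len())
    }
}
```

A linear scan is enough here because, as the NOTE in the source says, the waiter list for one lock is assumed to be short.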
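After a transaction commits or rolls back, the locks it held are released. For each of those locks, a wake-up is performed on its Waiters, and only the Waiter that has waited longest is woken up immediately.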
    fn handle_wake_up(&mut self, lock_ts: TimeStamp, hashes: Vec<u64>, commit_ts: TimeStamp) {
        // `wait_table` and `new_timeout` are used below but their setup is not
        // part of this excerpt.
        for hash in hashes {
            // Perform the wake-up for every lock of the transaction.
            let lock = Lock { ts: lock_ts, hash };
            // Find the oldest waiter and wake it up.
            if let Some((mut oldest, others)) = wait_table.remove_oldest_waiter(lock) {
                // Notify the oldest one immediately.
                self.detector_scheduler
                    .clean_up_wait_for(oldest.start_ts, oldest.lock);
                oldest.conflict_with(lock_ts, commit_ts);
                oldest.notify();
                // Others will be waked up after `wake_up_delay_duration`.
                //
                // NOTE: Actually these waiters are waiting for an unknown transaction.
                // If there is a deadlock between them, it will be detected after timeout.
                if others.is_empty() {
                    // Remove the empty entry here.
                    wait_table.remove(lock);
                } else {
                    others.iter_mut().for_each(|waiter| {
                        waiter.conflict_with(lock_ts, commit_ts);
                        waiter.reset_timeout(new_timeout);
                    });
                }
            }
        }
    }
The lock_ts parameter is the transaction that held the locks, and hashes are the hashes of the locks it held.
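For each lock, the wake-up proceeds as follows:
1. Find the Waiter that has waited longest and remove it from the Waiter list.
2. Remove the wait-for information from txn_ts to txn_lock that the deadlock detector maintains.
3. Build the pr result for the wake-up and call the wake-up callback.
4. For the other waiters (all except the one that has waited longest), build the wake-up pr and let them wake up through the timeout mechanism.
Lock management interface

    /// `LockManager` has two components working in two threads:
    ///   * One is the `WaiterManager` which manages transactions waiting for locks.
    ///   * The other one is the `Detector` which detects deadlocks between transactions.
    pub struct LockManager {
        waiter_mgr_worker: Option<FutureWorker<waiter_manager::Task>>,
        detector_worker: Option<FutureWorker<deadlock::Task>>,

        waiter_mgr_scheduler: WaiterMgrScheduler,
        detector_scheduler: DetectorScheduler,

        waiter_count: Arc<AtomicUsize>,

        /// Record transactions which have sent requests to detect deadlock.
        detected: Arc<[CachePadded<Mutex<HashSet<TimeStamp>>>]>,

        pipelined: Arc<AtomicBool>,
    }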
    impl LockManagerTrait for LockManager {
        fn wait_for(
            &self,
            start_ts: TimeStamp,
            cb: StorageCallback,
            pr: ProcessResult,
            lock: Lock,
            is_first_lock: bool,
            timeout: Option<WaitTimeout>,
        ) {
            let timeout = match timeout {
                Some(t) => t,
                None => {
                    cb.execute(pr);
                    return;
                }
            };

            // Increase `waiter_count` here to prevent there is an on-the-fly WaitFor msg
            // but the waiter_mgr haven't processed it, subsequent WakeUp msgs may be lost.
            self.waiter_count.fetch_add(1, Ordering::SeqCst);
            self.waiter_mgr_scheduler
                .wait_for(start_ts, cb, pr, lock, timeout);

            // If it is the first lock the transaction tries to lock, it won't cause deadlock.
            // In that case no request is sent to the detector, so this wait-for
            // edge is not recorded there either.
            if !is_first_lock {
                self.add_to_detected(start_ts);
                self.detector_scheduler.detect(start_ts, lock);
            }
        }

        fn wake_up(
            &self,
            lock_ts: TimeStamp,
            hashes: Vec<u64>,
            commit_ts: TimeStamp,
            is_pessimistic_txn: bool,
        ) {
            // If `hashes` is some, there may be some waiters waiting for these locks.
            // Try to wake up them.
            if !hashes.is_empty() && self.has_waiter() {
                self.waiter_mgr_scheduler
                    .wake_up(lock_ts, hashes, commit_ts);
            }
            // If a pessimistic transaction is committed or rolled back and it once sent requests to
            // detect deadlock, clean up its wait-for entries in the deadlock detector.
            if is_pessimistic_txn && self.remove_from_detected(lock_ts) {
                self.detector_scheduler.clean_up(lock_ts);
            }
        }

        fn has_waiter(&self) -> bool {
            self.waiter_count.load(Ordering::SeqCst) > 0
        }
    }
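The role of waiter_count as a cheap gate in front of the waiter manager can be shown with a small model. This is a sketch under assumptions: `Gate` is a hypothetical type, the `forwarded_wake_ups` log replaces the real scheduler, and `waiter_done` assumes that the count is decreased once a waiter has been notified or has timed out, which the excerpt does not show.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical reduction of the lock manager to its waiter counter.
pub struct Gate {
    waiter_count: Arc<AtomicUsize>,
    /// lock_ts values whose wake-up was forwarded to the waiter manager.
    pub forwarded_wake_ups: Vec<u64>,
}

impl Gate {
    pub fn new() -> Self {
        Gate {
            waiter_count: Arc::new(AtomicUsize::new(0)),
            forwarded_wake_ups: Vec::new(),
        }
    }

    /// Counts the waiter before the wait-for message is handed over, so a
    /// wake-up that arrives right afterwards is not skipped.
    pub fn wait_for(&self) {
        self.waiter_count.fetch_add(1, Ordering::SeqCst);
    }

    /// Assumption: called once a waiter has been notified or has timed out.
    pub fn waiter_done(&self) {
        self.waiter_count.fetch_sub(1, Ordering::SeqCst);
    }

    pub fn has_waiter(&self) -> bool {
        self.waiter_count.load(Ordering::SeqCst) > 0
    }

    /// Forwards a wake-up only if there are hashes and at least one waiter.
    pub fn wake_up(&mut self, lock_ts: u64, hashes: &[u64]) -> bool {
        if !hashes.is_empty() && self.has_waiter() {
            self.forwarded_wake_ups.push(lock_ts);
            true
        } else {
            false
        }
    }
}
```

Checking the counter first means that committing a transaction nobody waits for costs one atomic load instead of a message to the waiter manager.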