其他分享
首页 > 其他分享> > TinyKV lab4完成总结

TinyKV lab4完成总结

作者:互联网

lab4是实现一个基于Percolator模型的分布式事务。

lab4a

lab4a是介绍三种Column Family的作用和协作,分别包括CfDefault, CfWrite, CfLock。
CfDefault是暂存实际数据的cf, CfWrite是写入时间的记录,CfLock是该数据的锁。数据版本(事务)由一个全局递增的ts表示,ts越大越新,数据在数据库中的排序是先按Key由小到大再按ts由大到小。
实现除GetValue的方法直接创建一个结构体写入txn.writes就行。
GetValue比较复杂,因为读事务只能看到它之前已经提交的事务。首先我们要先看有没有CfWrite的记录,如果有,我们还要看这个写入是Delete还是Put,如果是Put,那么说明有这个值,然后我们再对这个写入时间直接在CfDefault这个列族中找数据即可。

// GetValue finds the value for key, valid at the start timestamp of this transaction.
// I.e., the most recent value committed before the start of this transaction.
func (txn *MvccTxn) GetValue(key []byte) ([]byte, error) {
	// Your Code Here (4A).
	iter := txn.Reader.IterCF(engine_util.CfWrite)
	defer iter.Close()
	iter.Seek(EncodeKey(key, txn.StartTS))
	for ; iter.Valid(); iter.Next() {
		uKey := DecodeUserKey(iter.Item().Key())
		if bytes.Equal(key, uKey) {
			break
		}
	}
	if !iter.Valid() {
		return nil, nil
	}
	v, err := iter.Item().Value()
	if err != nil {
		return nil, err
	}
	w, err := ParseWrite(v)
	if err != nil {
		return nil, err
	}
	if w.Kind != WriteKindPut {
		return nil, nil
	}
	return txn.Reader.GetCF(engine_util.CfDefault, EncodeKey(key, w.StartTS))
}

MostRecentWrite和CurrentWrite就是找某个key的最新版本和当前事务是否写入,后者注意比较ts就好。

// CurrentWrite searches for a write with this transaction's start timestamp. It returns a Write from the DB and that
// write's commit timestamp, or an error.
func (txn *MvccTxn) CurrentWrite(key []byte) (*Write, uint64, error) {
	// Your Code Here (4A).
	iter := txn.Reader.IterCF(engine_util.CfWrite)
	defer iter.Close()
	for ; iter.Valid(); iter.Next() {
		rawKey := iter.Item().Key()
		uKey := DecodeUserKey(rawKey)
		if bytes.Equal(key, uKey) {
			v, err := iter.Item().Value()
			if err != nil {
				return nil, 0, err
			}
			w, err := ParseWrite(v)
			if err != nil {
				return nil, 0, err
			}
			if w.StartTS == txn.StartTS {
				return w, decodeTimestamp(rawKey), nil
			}
		}
	}
	return nil, 0, nil
}
// MostRecentWrite finds the most recent write with the given key. It returns a Write from the DB and that
// write's commit timestamp, or an error.
func (txn *MvccTxn) MostRecentWrite(key []byte) (*Write, uint64, error) {
	// Your Code Here (4A).
	iter := txn.Reader.IterCF(engine_util.CfWrite)
	defer iter.Close()
	for ; iter.Valid(); iter.Next() {
		rawKey := iter.Item().Key()
		uKey := DecodeUserKey(rawKey)
		if bytes.Equal(key, uKey) {
			v, err := iter.Item().Value()
			if err != nil {
				return nil, 0, err
			}
			w, err := ParseWrite(v)
			if err != nil {
				return nil, 0, err
			}
			return w, decodeTimestamp(rawKey), nil
		}
	}
	return nil, 0, nil
}

lab4b

lab4b是实现2PC模型。2PC模型分为两个阶段,一个是PreWrite,一个是Commit。
PreWrite的过程就是检查写冲突和检查锁冲突,如果都不冲突,再上锁。
检查写冲突的过程就是看CfWrite中有没有比这个事务更新的事务在写入这个key,如果有,显然不允许更旧的版本修改更新的版本,出现写冲突。检查锁冲突的过程就是看有没有版本更老的锁,如果有,则说明这个key被更早的事务锁住了,就不能修改别的锁占用的key。如果都没有,我们对这个key上锁。
Commit的过程就是写入数据库和解锁。Commit写入数据库和解锁必须是原子性的,我们需要用系统提供的锁保证这一点,首先检查所有的key均已经上该版本的锁,如果没有,我们不能提交没有上锁的key。然后记录写入时间,最后写入数据和解锁,再释放系统提供的原子性的锁。如果Commit出现失败,我们可以让client在未来重试成功(因为此时锁一定在更后的版本)。

func (server *Server) KvPrewrite(_ context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
	// Your Code Here (4B).
	reader, err := server.storage.Reader(req.Context)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.PrewriteResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	defer reader.Close()
	resp := &kvrpcpb.PrewriteResponse{}
	txn := mvcc.NewMvccTxn(reader, req.StartVersion)

	for _, mutation := range req.Mutations {
		w, ts, err := txn.MostRecentWrite(mutation.Key)
		if err != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.PrewriteResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		if w != nil && ts >= req.StartVersion {
			resp.Errors = append(resp.Errors, &kvrpcpb.KeyError{
				Conflict: &kvrpcpb.WriteConflict{
					StartTs:    req.StartVersion,
					ConflictTs: ts,
					Key:        mutation.Key,
					Primary:    req.PrimaryLock,
				},
			})
			continue
		}
		lock, err := txn.GetLock(mutation.Key)
		if err != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.PrewriteResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		if lock != nil && lock.Ts < req.StartVersion {
			resp.Errors = append(resp.Errors, &kvrpcpb.KeyError{
				Locked: &kvrpcpb.LockInfo{
					PrimaryLock: req.PrimaryLock,
					LockVersion: lock.Ts,
					Key:         mutation.Key,
					LockTtl:     lock.Ttl,
				},
			})
			continue
		}
		switch mutation.Op {
		case kvrpcpb.Op_Put:
			txn.PutValue(mutation.Key, mutation.Value)
			txn.PutLock(mutation.Key, &mvcc.Lock{
				Primary: req.PrimaryLock,
				Ts:      req.StartVersion,
				Ttl:     req.LockTtl,
				Kind:    mvcc.WriteKindPut,
			})
		case kvrpcpb.Op_Del:
			txn.DeleteValue(mutation.Key)
			txn.PutLock(mutation.Key, &mvcc.Lock{
				Primary: req.PrimaryLock,
				Ts:      req.StartVersion,
				Ttl:     req.LockTtl,
				Kind:    mvcc.WriteKindDelete,
			})
		}
	}
	if len(resp.Errors) != 0 {
		return resp, nil
	}
	err = server.storage.Write(req.Context, txn.Writes())
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.PrewriteResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	return resp, nil
}

func (server *Server) KvCommit(_ context.Context, req *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) {
	// Your Code Here (4B).
	reader, err := server.storage.Reader(req.Context)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CommitResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	defer reader.Close()
	txn := mvcc.NewMvccTxn(reader, req.StartVersion)
	server.Latches.WaitForLatches(req.Keys)
	defer server.Latches.ReleaseLatches(req.Keys)
	for _, key := range req.Keys {
		lock, err := txn.GetLock(key)
		if err != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.CommitResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		if lock == nil {
			return &kvrpcpb.CommitResponse{}, nil
		}
		if lock.Ts != req.StartVersion {
			return &kvrpcpb.CommitResponse{
				Error: &kvrpcpb.KeyError{
					Retryable: "true",
				},
			}, nil
		}
		txn.PutWrite(key, req.CommitVersion, &mvcc.Write{
			StartTS: req.StartVersion,
			Kind:    lock.Kind,
		})
		txn.DeleteLock(key)
	}
	err = server.storage.Write(req.Context, txn.Writes())
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CommitResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	return &kvrpcpb.CommitResponse{}, nil
}

lab4c

lab4c主要实现失败之后是回滚还是继续提交。
checkTxnStatus函数检查事务状态和回滚失效的锁,事务状态直接通过CurrentWrite检查,如果不存在,说明没有写入,事务已经回滚,我们就回收锁,锁有可能是已经被清除了,或者是过期了,我们要针对这两种情况分别处理并写入回滚时间标记,如果存在,说明已经提交,不需要处理,在返回中填上写入版本即可。

func (txn *MvccTxn) RollBack(key []byte, Locked bool) {
	txn.PutWrite(key, txn.StartTS, &Write{
		StartTS: txn.StartTS,
		Kind:    WriteKindRollback,
	})
	txn.DeleteValue(key)
	if Locked {
		txn.DeleteLock(key)
	}
}
func (server *Server) KvCheckTxnStatus(_ context.Context, req *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) {
	// Your Code Here (4C).
	reader, err := server.storage.Reader(req.Context)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	defer reader.Close()
	txn := mvcc.NewMvccTxn(reader, req.LockTs)
	w, ts, err := txn.CurrentWrite(req.PrimaryKey)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	if w != nil {
		resp := &kvrpcpb.CheckTxnStatusResponse{}
		if w.Kind != mvcc.WriteKindRollback {
			resp.CommitVersion = ts
		}
		return resp, nil
	}
	lock, err := txn.GetLock(req.PrimaryKey)
	if err != nil {
		if regionErr, ok := err.(*raft_storage.RegionError); ok {
			return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
		}
		return nil, err
	}
	resp := &kvrpcpb.CheckTxnStatusResponse{}
	if lock == nil {
		txn.RollBack(req.PrimaryKey, false)
		resp.LockTtl = 0
		resp.CommitVersion = 0
		resp.Action = kvrpcpb.Action_LockNotExistRollback
		if server.storage.Write(req.Context, txn.Writes()) != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		return resp, nil
	} else if mvcc.PhysicalTime(lock.Ts)+lock.Ttl < mvcc.PhysicalTime(req.CurrentTs) {
		txn.RollBack(req.PrimaryKey, true)
		resp.LockTtl = 0
		resp.CommitVersion = 0
		resp.Action = kvrpcpb.Action_TTLExpireRollback
		if server.storage.Write(req.Context, txn.Writes()) != nil {
			if regionErr, ok := err.(*raft_storage.RegionError); ok {
				return &kvrpcpb.CheckTxnStatusResponse{RegionError: regionErr.RequestErr}, err
			}
			return nil, err
		}
		return resp, nil
	}
	resp.LockTtl = lock.Ttl
	return resp, nil
}

batchRollback是回滚一批key的函数。如果发现某个key已经提交,则回滚失败,否则删除锁和kv。
resolveLock是根据CommitVersion对某个事务的key要么提交要么回滚。我们只需要检查锁的ts然后把满足条件的key放入一个数组然后判断CommitVersion即可。
一般来说,事务的发起者都是Client,多个Client有可能同时操作,这几个函数的关系是:一个Client进行PreWrite,这时因为有可能Client需要做其他操作或者已经宕机,或者有其他事务插队,所以执行CheckTxnStatus检查状态,然后得到CommitVersion,调用ResolveLock进行最后的提交或者回滚(作者自己的想法,欢迎指正)。

标签:总结,txn,return,err,nil,kvrpcpb,req,TinyKV,lab4
来源: https://www.cnblogs.com/mchxyz/p/16026636.html