fabric 2.0, orderer, creating a new chain
作者:互联网
This walkthrough uses the Solo consensus as the example.
Broadcast service
The orderer’s Broadcast service receives transactions over RPC.
Solo consenter
The Solo worker waits for new messages and packs them into a block. At the end, it invokes ConsenterSupport.WriteConfigBlock to create the new chain, and then runs ‘go WriteBlock()’.
In my case, the block file is located at: /var/hyperledger/production/orderer/chains/test5/blockfile_000000
In addBlock(), the block data is appended to the block file on disk, and the index data for the block is written to LevelDB under four indices, as shown below:
// Index 1: block hash -> flpBytes (the same bytes stored as BlkLocation below).
// Written only when the block-hash attribute is enabled for this index.
if index.isAttributeIndexed(blkstorage.IndexableAttrBlockHash) {
batch.Put(constructBlockHashKey(blkHash), flpBytes)
}
// Index 2: block number -> flpBytes.
// Written only when the block-number attribute is enabled for this index.
if index.isAttributeIndexed(blkstorage.IndexableAttrBlockNum) {
batch.Put(constructBlockNumKey(blkNum), flpBytes)
}
// Index 3: used to find a transaction by its transaction ID.
// Key:   built from (txID, block number, position of the tx in txOffsets).
// Value: a marshaled TxIDIndexValProto carrying the block location, the
//        transaction location and the transaction's validation code.
if index.isAttributeIndexed(blkstorage.IndexableAttrTxID) {
for i, txoffset := range txOffsets {
// Location pointer for this transaction, built from the block's file
// suffix number and offset plus the transaction's own loc.
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to txid-index", txFlp, txoffset.txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
indexVal := &msgs.TxIDIndexValProto{
BlkLocation: flpBytes,
TxLocation: txFlpBytes,
// Validation flag for the i-th transaction, taken from txsfltr.
TxValidationCode: int32(txsfltr.Flag(i)),
}
indexValBytes, err := proto.Marshal(indexVal)
if err != nil {
return errors.Wrap(err, "unexpected error while marshaling TxIDIndexValProto message")
}
batch.Put(
constructTxIDKey(txoffset.txID, blkNum, uint64(i)),
indexValBytes,
)
}
}
// Index 4: (block number, transaction number) -> transaction location.
// Per the original note, this index is used to query history data.
if index.isAttributeIndexed(blkstorage.IndexableAttrBlockNumTranNum) {
for i, txoffset := range txOffsets {
// Same location pointer construction as in index 3.
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, i, txoffset.txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
batch.Put(constructBlockNumTranNumKey(blkNum, uint64(i)), txFlpBytes)
}
}
call stack
This is an example of creating a new channel; the call stack is similar for normal transaction messages:
github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage.(*blockfileMgr).addBlock at blockfile_mgr.go:298
github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage.(*fsBlockStore).AddBlock at fs_blockstore.go:54
github.com/hyperledger/fabric/common/ledger/blockledger/fileledger.(*FileLedger).Append at impl.go:107
github.com/hyperledger/fabric/orderer/common/multichannel.(*Registrar).newChain at registrar.go:336
github.com/hyperledger/fabric/orderer/common/multichannel.(*BlockWriter).WriteConfigBlock at blockwriter.go:118
github.com/hyperledger/fabric/orderer/consensus/solo.(*chain).main at consensus.go:158
runtime.goexit at asm_amd64.s:1357
- Async stack trace
github.com/hyperledger/fabric/orderer/consensus/solo.(*chain).Start at consensus.go:56
newChain
newChain() creates a new chain, updates the Registrar.chains map, and starts the chain worker.
// newChain creates the chain described by configtx, starts it, and publishes
// it in r.chains.
//
// The registrar lock is held for the whole call. If the ledger backing the
// channel is empty (height 0), a first block containing configtx is created
// and appended to it. A failure to append that block is fatal: this function
// has no error return, and continuing would start a chain whose ledger has no
// genesis block, so the error is reported through logger.Panicf instead of
// being silently dropped.
func (r *Registrar) newChain(configtx *cb.Envelope) {
	r.lock.Lock()
	defer r.lock.Unlock()

	ledgerResources := r.newLedgerResources(configtx)

	// If we have no blocks, we need to create the genesis block ourselves.
	if ledgerResources.Height() == 0 {
		genesisBlock := blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})
		if err := ledgerResources.Append(genesisBlock); err != nil {
			logger.Panicf("Error appending genesis block to ledger: %s", err)
		}
	}

	// Build the updated map as a copy instead of mutating r.chains in place.
	// Per the original comment, this allows concurrent reads from
	// broadcast/deliver while the new chainSupport is being set up; the copy
	// only replaces r.chains once the new chain has been started.
	newChains := make(map[string]*ChainSupport, len(r.chains)+1)
	for key, value := range r.chains {
		newChains[key] = value
	}

	cs := newChainSupport(r, ledgerResources, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
	chainID := ledgerResources.ConfigtxValidator().ChannelID()

	logger.Infof("Created and starting new channel %s", chainID)

	newChains[string(chainID)] = cs
	cs.start()

	r.chains = newChains
}
blockfileMgr addBlock():
// addBlock appends block to the current block file and records it in the
// index.
//
// Steps, in order:
//  1. Reject the block unless its number equals the current chain height and
//     its PreviousHash equals the hash of the current last block.
//  2. Serialize the block and compute its header hash.
//  3. Move to the next block file if the length prefix plus block bytes would
//     exceed the configured maximum file size.
//  4. Append the varint-encoded length followed by the block bytes. If either
//     append fails, truncate the file back to the checkpointed size.
//  5. Persist the new checkpoint info. If that fails, truncate the file back
//     to the previous checkpointed size.
//  6. Index the block and its transaction offsets.
//  7. Update the in-memory checkpoint and blockchain info.
//
// It returns a non-nil error when validation, serialization, appending,
// checkpoint persistence or indexing fails. It panics when the file cannot be
// truncated back to its last known size after a failed write, because the
// file contents would then no longer match the checkpoint.
func (mgr *blockfileMgr) addBlock(block *common.Block) error {
	bcInfo := mgr.getBlockchainInfo()
	if block.Header.Number != bcInfo.Height {
		// Report the same snapshot that was compared above, not a second read.
		return errors.Errorf(
			"block number should have been %d but was %d",
			bcInfo.Height, block.Header.Number,
		)
	}

	// Add the previous hash check - Though, not essential but may not be a bad idea to
	// verify the field `block.Header.PreviousHash` present in the block.
	// This check is a simple bytes comparison and hence does not cause any observable performance penalty
	// and may help in detecting a rare scenario if there is any bug in the ordering service.
	if !bytes.Equal(block.Header.PreviousHash, bcInfo.CurrentBlockHash) {
		return errors.Errorf(
			"unexpected Previous block hash. Expected PreviousHash = [%x], PreviousHash referred in the latest block= [%x]",
			bcInfo.CurrentBlockHash, block.Header.PreviousHash,
		)
	}

	blockBytes, info, err := serializeBlock(block)
	if err != nil {
		return errors.WithMessage(err, "error serializing block")
	}
	blockHash := protoutil.BlockHeaderHash(block.Header)

	// Get the location / offset where each transaction starts in the block and where the block ends
	txOffsets := info.txOffsets
	currentOffset := mgr.cpInfo.latestFileChunksize

	// On disk each block is stored as <varint length><block bytes>.
	blockBytesLen := len(blockBytes)
	blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
	totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)

	// Determine if we need to start a new file since the size of this block
	// exceeds the amount of space left in the current file
	if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
		mgr.moveToNextFile()
		currentOffset = 0
	}

	// Append blockBytesEncodedLen to the file.
	// NOTE(review): the boolean argument presumably controls syncing to disk — confirm.
	err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
	if err == nil {
		// Append the actual block bytes to the file.
		err = mgr.currentFileWriter.append(blockBytes, true)
	}
	if err != nil {
		// Roll the file back to the last checkpointed size so that a partial
		// write is not left behind.
		truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
		if truncateErr != nil {
			// Include the truncate error itself; it is the reason for the panic.
			panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s (truncate error: %s)", err, truncateErr))
		}
		return errors.WithMessage(err, "error appending block to file")
	}

	// Update the checkpoint info with the results of adding the new block
	currentCPInfo := mgr.cpInfo
	newCPInfo := &checkpointInfo{
		latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
		latestFileChunksize:      currentCPInfo.latestFileChunksize + totalBytesToAppend,
		isChainEmpty:             false,
		lastBlockNumber:          block.Header.Number}

	// Save the checkpoint information in the database
	if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
		truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
		if truncateErr != nil {
			// Include the truncate error itself; it is the reason for the panic.
			panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s (truncate error: %s)", err, truncateErr))
		}
		return errors.WithMessage(err, "error saving current file info to db")
	}

	// Index block file location pointer updated with file suffix and offset for the new block
	blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
	blockFLP.offset = currentOffset

	// Shift the txoffset because we prepend length of bytes before block bytes.
	// NOTE(review): this in-place update only sticks if the elements of
	// txOffsets (or their loc fields) are pointers — confirm against the type.
	for _, txOffset := range txOffsets {
		txOffset.loc.offset += len(blockBytesEncodedLen)
	}

	// Save the index in the database
	if err = mgr.index.indexBlock(&blockIdxInfo{
		blockNum: block.Header.Number, blockHash: blockHash,
		flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata}); err != nil {
		return err
	}

	// Update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
	mgr.updateCheckpoint(newCPInfo)
	mgr.updateBlockchainInfo(blockHash, block)
	return nil
}
The new chain’s genesis block is also appended to the system channel
Note: at the end of WriteConfigBlock, the new channel’s genesis block is written to the system channel as well.
bw.WriteBlock(block, encodedMetadataValue)
标签:fabric,err,nil,mgr,orderer,file,new,chain,block 来源: https://blog.csdn.net/m0_37889044/article/details/104677720