在基於Raft的共識演算法中,在Raft和Ethereum節點之間做了一對一的對應關係,每個Ethereum節點也是Raft節點,並且按照約定,Raft叢集的Leader是產生新塊的唯一Ethereum節點。這個Leader負責將交易打包成一個區塊,但不提供工作量證明(PoW)。
在這裡把Leader和產生新塊的節點繫結到一起的主要原因有兩點:第一是為了方便,因為Raft確保一次只有一個Leader,第二是為了避免從節點建立新塊到Leader的網路跳轉,所有的Raft寫入操作都必須透過該跳轉。Quorum的實現關注Raft Leader的變化——如果一個節點成為Leader,它將開始產生新塊,如果一個節點失去Leader地位,它將停止產生新塊。
在Raft的Leader轉換期間,其中有一小段時間,有多個節點可能假定自己具有產生新塊的職責,本文稍後將更詳細地描述如何保持正確性。
Quorum使用現有的Etherum P2P傳輸層來負責在節點之間的通訊,但是隻透過Raft的傳輸層來傳輸Block。這些Block是由Leader創造的,並從那裡傳輸到叢集的其餘部分,總是以相同的順序透過Raft傳輸。
當Leader建立新塊時,不像在Ethereum中,塊被寫入資料庫並立即成為鏈的新Head,只在新塊透過Raft傳輸之後才插入塊或將其設定為鏈的新Head。所有節點都會在鎖定步驟中將鏈擴充套件到新的狀態,就好像是他們在Raft中同步日誌。
從Ethereum的角度來說,Raft是透過實現 node/service.go 檔案中的 Service 介面而整合的。一個獨立的協議可以透過這個 Service 介面,註冊到節點裡面。
// quorum/cmd/geth/config.go func makeFullNode(ctx *cli.Context) *node.Node { if ctx.GlobalBool(utils.RaftModeFlag.Name) { // 在這裡判斷,如果是raft mode,則註冊raft service RegisterRaftService(stack, ctx, cfg, ethChan) } } func RegisterRaftService(stack *node.Node, ctx *cli.Context, cfg gethConfig, ethChan <-chan *eth.Ethereum) { // 在這裡把raft service註冊到node裡面去 if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { // 呼叫raft.New建立raft service,這個RaftService實現了node.Service介面 return raft.New(ctx, ethereum.ChainConfig(), myId, raftPort, joinExisting, blockTimeNanos, ethereum, peers, datadir) }); err != nil { } |
// quorum/core/events.go type NewMinedBlockEvent struct{ Block *types.Block } |
// quorum/raft/handler.go func (pm *ProtocolManager) Start(p2pServer *p2p.Server) { pm.p2pServer = p2pServer pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) pm.startRaft() go pm.minedBroadcastLoop() } // quorum/miner/worker.go func (self *worker) wait() { for { mustCommitNewWork := true for result := range self.recv { // Broadcast the block and announce chain insertion event self.mux.Post(core.NewMinedBlockEvent{Block: block}) } } } // quorum/raft/handler.go func (pm *ProtocolManager) minedBroadcastLoop() { for obj := range pm.minedBlockSub.Chan() { switch ev := obj.Data.(type) { case core.NewMinedBlockEvent: select { case pm.blockProposalC <- ev.Block: case <-pm.quitSync: return } } } } |
// quorum/raft/handler.go func (pm *ProtocolManager) serveLocalProposals() { for { select { case block, ok := <-pm.blockProposalC: size, r, err := rlp.EncodeToReader(block) var buffer = make([]byte, uint32(size)) r.Read(buffer) // blocks until accepted by the raft state machine pm.rawNode().Propose(context.TODO(), buffer) } } } |
在任意一個節點上
· 到了這個時間點,Raft協議會達成共識並且把包含新塊的日誌記錄新增到Raft日誌之中。Raft完成這一步是透過Leader傳送AppendEntries給所有的Follower,並且所有的Follower對這個訊息進行確認。一旦Leader收到了超過半數的確認訊息,它就通知每一個節點,這個新的日誌已經被永久性的寫入日誌。
· 這個新塊透過Raft傳輸到整個網路之後,到達了eventLoop,在這裡處理Raft的新日誌項。他們從Leader透過pm.transport(rafthttp.Transport的一個instance)到達。
// quorum/raft/handler.go func (pm *ProtocolManager) eventLoop() { for { select { case <-ticker.C: // when the node is first ready it gives us entries to commit and messages // to immediately publish case rd := <-pm.rawNode().Ready(): // 1: Write HardState, Entries, and Snapshot to persistent storage if they // are not empty. pm.raftStorage.Append(rd.Entries) // 2: Send all Messages to the nodes named in the To field. pm.transport.Send(rd.Messages) // 3: Apply Snapshot (if any) and CommittedEntries to the state machine. for _, entry := range pm.entriesToApply(rd.CommittedEntries) { switch entry.Type { case raftpb.EntryNormal: var block types.Block err := rlp.DecodeBytes(entry.Data, &block) if pm.blockchain.HasBlock(block.Hash(), block.NumberU64()) { } else { pm.applyNewChainHead(&block) } pm.advanceAppliedIndex(entry.Index) } case <-pm.quitSync: return } } } |
// quorum/raft/handler.go func (pm *ProtocolManager) applyNewChainHead(block *types.Block) { if !blockExtendsChain(block, pm.blockchain) { headBlock := pm.blockchain.CurrentBlock() pm.minter.invalidRaftOrderingChan <- InvalidRaftOrdering{headBlock: headBlock, invalidBlock: block} } else { if existingBlock := pm.blockchain.GetBlockByHash(block.Hash()); nil == existingBlock { if err := pm.blockchain.Validator().ValidateBody(block); err != nil { panic(fmt.Sprintf("failed to validate block %x (%v)", block.Hash(), err)) } } _, err := pm.blockchain.InsertChain([]*types.Block{block}) } } // quorum/core/blockchain.go func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { n, events, logs, err := bc.insertChain(chain) bc.PostChainEvents(events, logs) return n, err } |
// quorum/core/blockchain.go func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { for _, event := range events { switch ev := event.(type) { case ChainEvent: bc.chainFeed.Send(ev) case ChainHeadEvent: bc.chainHeadFeed.Send(ev) case ChainSideEvent: bc.chainSideFeed.Send(ev) } } } |
一旦網路分割槽癒合, 在Raft層節點1將重新提交0x90f72a, 結果序列化日誌可能看起來如下:
· 0xea097c Parent: 0xacaa - 挖礦成功
· [ 0xa12345 Parent: 0xea097c - 挖礦成功 ] (節點2; 競賽獲勝者)
· 0x69c92376 Parent: 0xa12345 - 挖礦成功
· 0xb7239ae Parent: 0x69c92376 - 挖礦成功
· [ 0x90f72a Parent: 0xa12345 - 挖礦失敗,沒有操作 ] (節點1; 競賽失敗者)
· 0x73a896c Parent: 0xb7239ae - 挖礦成功
由於被序列化後的"贏家"將會延長鏈, 所以"失敗者"將不會延長鏈, 因為它的parent(0xea097c)已經不是鏈的head了, 競賽"獲勝者"已經提前延長了同一個parent(0xa12345),然後0xb7239ae進一步延長了它。
請注意, 每個塊都被Raft接受並在日誌中序列化, 並且這個失敗者的延長被"忽略"。從Raft的角度來看, 每個日誌條目都是有效的, 但在Quorum-Raft的角度看, 將會選擇使用哪些條目作為有效條目, 並且在實際上將延長鏈。此鏈的延長邏輯是確定性的: 在群集中的每個節點上都會發生相同的精確行為, 從而保持blockchain同步。
還要注意Quorum的方法不同於Ethereum的"最長有效鏈"(LVC:Longest Valid Chain)機制。LVC用於在最終一致的網路中解決分叉問題。因為Quorum使用Raft, blockchain的狀態是保持一致的。Raft設定中不能分叉。一旦一個塊被新增為新的鏈Head, 對於整個叢集來說都是這樣的,而且它是永久性的。
建立新塊的頻率
預設情況下, 建立新塊的頻率是50ms。當新的交易來了, 將立即建立一個新塊(所以延遲時間很低), 但是新塊的建立時間至少也是上一個塊建立的50ms之後。這樣的頻率是在交易速度和延遲之間獲取一個平衡。
50ms這個頻率是可以透過引數--raftblocktime配置。
預測挖礦
Quorum的方法不同於Ethereum的方法之一,是引入了一個新的概念"預測挖礦"。對基於Raft的Quorum的共識演算法來說, 這並不是嚴格要求的, 而是一個最佳化, 它提供了降低建立新塊之間的時間延遲,或者說是更快的最終確認時間。
透過基於Raft的共識演算法,新塊可以更快的成為鏈的Head。如果在建立新塊之前,所有的節點都同步等待上一個塊成為新的鏈頭,那麼這個叢集收到的任何交易都需要更多的時間才能使其進入鏈。
在預測挖礦中,我們允許一個parent塊透過Raft進入塊鏈之前,建立一個新塊。
由於這個過程可能重複發生,這些塊(每個都有一個對其父塊的引用)可以形成一種鏈。稱之為“預測鏈”。
在預測鍊形成的過程中,Quorum會持續跟蹤交易池中的事務子集,這些事務子集已經加入到塊中,只是這些塊還沒有放入到鏈中而是在預測鏈中)。
由於競賽的存在(如我們上面所詳細描述的),有可能預測鏈的中間某些區塊最終不會進入到鏈。在這種情況下,將會觸發一個InvalidRaftOrdering事件,並且相應地清理預測鏈的狀態。
這些預測鏈的長度目前還沒有限制,但在未來可能會增加對這一點的支援。
預測鏈的狀態
· head:這是最後一個建立的預測區塊,如果最後一個建立的block已經包含在區塊鏈中,這個值可以是nil
· proposedTxes:這是一個交易的集合,這些交易已經被打包到一個block中,並且這個block已經提交到Raft協議,但是這個block還沒有加入到鏈中
· unappliedBlocks:這是一個block的佇列,這些block已經提交到Raft協議,但是這些block還沒有加入到鏈中
· 當建立一個新塊的時候,這個新塊會被新增到這個佇列的尾部
· 當一個新塊被新增到鏈中以後,accept方法會被呼叫來把這個blokc從這個佇列刪除
· 當一個InvalidRaftOrdering事件發生的時候,透過從佇列的“最新的末尾”彈出最新的塊,直到找到無效的塊來展開佇列。我們必須重複地刪除這些“新”的預測塊,因為它們都依賴於一個沒有被包括在鏈中的block。
· expectedInvalidBlockHashes:在無效塊上建立的一組塊,但尚未透過Raft傳遞。這些塊要被刪除。當這些不延伸的塊透過Raft回來時,會把它們從預測鏈中移除。在不應該去嘗試預測鏈的時候,這一套方法就成為一種保護機制。
Raft傳輸層
Quorum透過Raft(etcd實現)內建的HTTP傳輸方法來傳輸block,從理論上來說,使用Ethereum的P2P網路來作為Raft的傳輸層也是可以的。在實際的測試中,在高負載的情況下,Raft內建的HTTP傳輸方法比geth中內建的P2P網路更為可靠。
在預設情況下,Quorum監聽50400埠,這個也可以透過--raftport引數來做配置。
預設的peers數量被設定為25。最大的peers數量可以透過--maxpeers來做配置,這個數量也是整個叢集的數量。
初始化配置
當前基於Raft的共識演算法,要求所有的初始節點都要配置為把前面所有的其他節點都作為靜態節點對待。對每一個節點來說,這些靜態節點的URI必須包含在raftport引數中,比如:enode://admin@chaindaily:30400?raftport=50400
注意:所有節點的static-nodes.json檔案中,enodes的順序必須保持一致。
想要從一個叢集中刪除一個節點,那就進入JavaScript控制檯,執行命令:raft.removePeer(raftId),這個raftId就是你想要刪除的節點id。對於初始節點來說,這個id是在靜態節點列表中的索引值,這個索引值是從1開始的(不是從0開始)。一旦一個節點從叢集中刪除了,這個是永久性的刪除。這個raftId在將來也不能夠使用。如果這個節點想要再次加入叢集,那麼它必須使用一個新的raftId。
想要把一個節點加入到叢集,那就進入JavaScript控制檯,執行raft.addPeer(enodeId)命令。就像enode ID需要包含在靜態節點JSON檔案中一樣,這個enode ID也必須要包含在raftport引數中。這個命令會分配一個新的raftID,並且返回。成功執行addPeer命令之後,就可以啟動一個新的geth節點,並且新增引數 --raftjoinexisting RAFTID
小結
透過這篇文章對Quorum共識機制的介紹,我們可以看到,Quorum對於適合於自己的目標場景有著非常清晰的理解和認識,把Ethereum原生的PoW修改成Raft,從而打造出適用於企業級的聯盟鏈平臺。
來源:360區塊鏈實驗室
更多區塊鏈資訊:www.qukuaiwang.com.cn/news