Filecoin - Sector狀態管理邏輯

買賣虛擬貨幣
好久不看go語言的程式碼了,最近有空換換腦子,看看Sector的狀態管理,純go語言實現。看看大型專案的程式碼設計,對程式碼以及專案的設計開發有莫大的好處。Lotus的go語言部分的最新的程式碼,採用模組化設計。Lotus的程式碼也是一步步演進的,第一版的程式碼也是所有的邏輯耦合在一起,後面才逐步的模組化。Lotus Miner將使用者需要儲存的資料“打包”成一個個Sector,並對Sector進行處理。本文就講講Sector的狀態管理。1. 模組框架

Sector狀態管理相關的模組如下:

Sector的狀態管理基於狀態機。Lotus實現了通用的狀態機(statemachine),在go-statemachine包中。在StateMachine之上,進一步抽象出了StateGroup,管理多個狀態機。StateMachine只是實現狀態的轉變,具體狀態是透過go-statestore進行儲存。在StateMachine之上,定義狀態轉變的規則以及狀態對應的處理函式,這些就是在具體的業務。SectorState就是Lotus管理Sector的具體業務。在這些底層模組的基礎上,Lotus的相關程式碼呼叫就比較簡單。先從狀態管理的底層模組,StateMachine講起:

2. StateMachine

StateMachine定義在go-statemachine包中,machine.go:

 type StateMachine struct {
     planner  Planner
     eventsIn chan Event

     name      interface{}
     st        *statestore.StoredState
     stateType reflect.Type

     stageDone chan struct{}
     closing   chan struct{}
     closed    chan struct{}

     busy int32
 }

其中,planner是抽象出來的狀態機的狀態轉化函式。Planner接收Event,結合當前的狀態user,確定下一步的處理。

type Planner func(events []Event, user interface{}) (interface{}, uint64, error)

st是儲存的狀態。stateType是儲存狀態的型別。

StateMachine的核心是run函式,分成三部分:接收Event,狀態處理,下一步的呼叫。其中狀態處理是關鍵:

err := fsm.mutateUser(func(user interface{}) (err error) {
    nextStep, processed, err = fsm.planner(pendingEvents, user)
    ustate = user
    if xerrors.Is(err, ErrTerminated) {
        terminated = true
        return nil
    }
    return err
})

mutateUser就是檢視當前儲存的狀態(StoredState),並執行planner函式,並將planner處理後的狀態儲存。planner函式返回下一步的處理,已經處理的Event 的個數,有可能的出錯。run函式會啟動另外一個go routine執行nextStep。

mutateUser具體的實現是利用go的反射(reflect),實現抽象和模組化。相關的邏輯感興趣的小夥伴可以自己檢視。

3. StateGroup

往往業務需要很多StateMachine。舉個例子,Lotus的Miner儲存多個Sector,每個Sector都由一個StateMachine維護獨立的狀態。StateGroup就是實現多個“同樣”的StateMachine,定義在group.go中。

 type StateGroup struct {
     sts       *statestore.StateStore
     hnd       StateHandler
     stateType reflect.Type

     closing      chan struct{}
     initNotifier sync.Once

     lk  sync.Mutex
     sms map[datastore.Key]*StateMachine
 }

其中,sms就是StateMachine的狀態陣列。StateHandler是StateMachine的狀態處理函式介面:

 type StateHandler interface {
     Plan(events []Event, user interface{}) (interface{}, uint64, error)
 }

也就是說,在StateMachine狀態機上,需要實現StateHandler介面(Plan函式,實現狀態的轉換)。

4. StoredState

StoredState是抽象的Key-Value的儲存。相對比較簡單,Get/Put介面,非常容易理解。

5. SectorState

storage-fsm實現了和Sector狀態相關的業務邏輯。也就是狀態的定義,狀態的轉換函式都是在這個包裡實現。整個Sector的資訊定義在storage-fsm/types.go中:

type SectorInfo struct {
     State        SectorState
     SectorNumber abi.SectorNumber
     Nonce        uint64
     SectorType abi.RegisteredProof
     // Packing
     Pieces []Piece
     // PreCommit1
     TicketValue   abi.SealRandomness
     TicketEpoch   abi.ChainEpoch
     PreCommit1Out storage.PreCommit1Out
     // PreCommit2
     CommD *cid.Cid
     CommR *cid.Cid
     Proof []byte
     PreCommitMessage *cid.Cid
     // WaitSeed
     SeedValue abi.InteractiveSealRandomness
     SeedEpoch abi.ChainEpoch
     // Committing
     CommitMessage *cid.Cid
     InvalidProofs uint64
     // Faults
     FaultReportMsg *cid.Cid
     // Debug
     LastErr string
     Log []Log
 }

SectorInfo包括了Sector狀態,Precommit1/2的資料,Committing的資料等等。其中,SectorState描述了Sector的具體狀態。Sector的所有的狀態定義在sector_state.go檔案中:

 const (
     UndefinedSectorState SectorState = ""

     // happy path
     Empty          SectorState = "Empty"
     Packing        SectorState = "Packing"       // sector not in sealStore, and not on chain
     PreCommit1     SectorState = "PreCommit1"    // do PreCommit1
     PreCommit2     SectorState = "PreCommit2"    // do PreCommit1
     PreCommitting  SectorState = "PreCommitting" // on chain pre-commit
     WaitSeed       SectorState = "WaitSeed"      // waiting for seed
     Committing     SectorState = "Committing"
     CommitWait     SectorState = "CommitWait" // waiting for message to land on chain
     FinalizeSector SectorState = "FinalizeSector"
     Proving        SectorState = "Proving"
     // error modes
     FailedUnrecoverable SectorState = "FailedUnrecoverable"
     SealFailed          SectorState = "SealFailed"
     PreCommitFailed     SectorState = "PreCommitFailed"
     ComputeProofFailed  SectorState = "ComputeProofFailed"
     CommitFailed        SectorState = "CommitFailed"
     PackingFailed       SectorState = "PackingFailed"
     Faulty              SectorState = "Faulty"        // sector is corrupted or gone for some reason
     FaultReported       SectorState = "FaultReported" // sector has been declared as a fault on chain
     FaultedFinal        SectorState = "FaultedFinal"  // fault declared on chain
 )

熟悉SDR演算法的小夥伴,會發現很多熟悉的字眼:PreCommit1,PreCommit2,Commiting等等。結合狀態處理函式,就能很清楚各個狀態的含義以及需要處理的內容。

fsm.go中Sealing結構的Plan函式是Sector狀態的處理函式:

 func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
     next, err := m.plan(events, user.(*SectorInfo))
     if err != nil || next == nil {
         return nil, uint64(len(events)), err
     }

     return func(ctx statemachine.Context, si SectorInfo) error {
         err := next(ctx, si)
         if err != nil {
             log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
             return nil
         }

         return nil
     }, uint64(len(events)), nil // TODO: This processed event count is not very correct
 }

Plan函式的實現也比較簡單,呼叫plan處理當前的狀態,並返回接下來需要處理的函式。狀態的處理又可以分成兩部分來看:1/ 狀態轉換的定義 2/ 狀態的處理。

針對Sector的狀態轉換的定義,在fsmPlanners中:

 var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
     UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
     Packing:              planOne(on(SectorPacked{}, PreCommit1)),
     PreCommit1: planOne(
         on(SectorPreCommit1{}, PreCommit2),
         on(SectorSealPreCommitFailed{}, SealFailed),
         on(SectorPackingFailed{}, PackingFailed),
     ),
     PreCommit2: planOne(
         on(SectorPreCommit2{}, PreCommitting),
         on(SectorSealPreCommitFailed{}, SealFailed),
         on(SectorPackingFailed{}, PackingFailed),
     ),
...

比如說,在PreCommit1的狀態,接收到SectorPreCommit1事件,將進入PreCommit2狀態。所有Sector的狀態轉換如下圖:

在處理好狀態好,就進入相應狀態的處理程式。以PreCommit1狀態為例,相應的處理函式是handlePreCommit1。

func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
     tok, epoch, err := m.api.ChainHead(ctx.Context())
     ...
     pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos())
     if err != nil {
         return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
     }
   ...
 }

很清楚看出,在PreCommit1的處理函式中,會透過SealPrecommit1呼叫rust-fil-proofs實現Precommit1的計算。最後總結一下,各個狀態的含義:

Empty - 空狀態 
Packing - 打包狀態,多個Piece填充到一個Sector中 
PreCommit1 - PreCommit1計算 
PreCommit2 - PreCommit2計算 
PreCommitting - 提交Precommit2的結果到鏈上 
WaitSeed - 等待隨機種子(給定10個區塊的時間,讓隨機數種子不可提前預測) 
Committing - 計算Commit1/Commit2,並將證明提交到鏈上 
CommitWait - 等待鏈上確認 FinalizeSector - Sector狀態確定

總結:

Sector的狀態管理基於狀態機。通用狀態機的實現是透過go-statemachine實現。狀態的儲存透過go-statestore實現。在這些模組的基礎上,storage-fsm實現了Sector的狀態定義以及狀態處理函式。

免責聲明:

  1. 本文版權歸原作者所有,僅代表作者本人觀點,不代表鏈報觀點或立場。
  2. 如發現文章、圖片等侵權行爲,侵權責任將由作者本人承擔。
  3. 鏈報僅提供相關項目信息,不構成任何投資建議

推荐阅读

;