Sector狀態管理相關的模組如下:
Sector的狀態管理基於狀態機。Lotus實現了通用的狀態機(statemachine),在go-statemachine包中。在StateMachine之上,進一步抽象出了StateGroup,管理多個狀態機。StateMachine只是實現狀態的轉變,具體狀態是透過go-statestore進行儲存。在StateMachine之上,定義狀態轉變的規則以及狀態對應的處理函式,這些就是在具體的業務。SectorState就是Lotus管理Sector的具體業務。在這些底層模組的基礎上,Lotus的相關程式碼呼叫就比較簡單。先從狀態管理的底層模組,StateMachine講起:
2. StateMachine
StateMachine定義在go-statemachine包中,machine.go:
type StateMachine struct {
planner Planner
eventsIn chan Event
name interface{}
st *statestore.StoredState
stateType reflect.Type
stageDone chan struct{}
closing chan struct{}
closed chan struct{}
busy int32
}
其中,planner是抽象出來的狀態機的狀態轉化函式。Planner接收Event,結合當前的狀態user,確定下一步的處理。
type Planner func(events []Event, user interface{}) (interface{}, uint64, error)
st是儲存的狀態。stateType是儲存狀態的型別。
StateMachine的核心是run函式,分成三部分:接收Event,狀態處理,下一步的呼叫。其中狀態處理是關鍵:
err := fsm.mutateUser(func(user interface{}) (err error) {
nextStep, processed, err = fsm.planner(pendingEvents, user)
ustate = user
if xerrors.Is(err, ErrTerminated) {
terminated = true
return nil
}
return err
})
mutateUser就是檢視當前儲存的狀態(StoredState),並執行planner函式,並將planner處理後的狀態儲存。planner函式返回下一步的處理,已經處理的Event 的個數,有可能的出錯。run函式會啟動另外一個go routine執行nextStep。
mutateUser具體的實現是利用go的反射(reflect),實現抽象和模組化。相關的邏輯感興趣的小夥伴可以自己檢視。
3. StateGroup
往往業務需要很多StateMachine。舉個例子,Lotus的Miner儲存多個Sector,每個Sector都由一個StateMachine維護獨立的狀態。StateGroup就是實現多個“同樣”的StateMachine,定義在group.go中。
type StateGroup struct {
sts *statestore.StateStore
hnd StateHandler
stateType reflect.Type
closing chan struct{}
initNotifier sync.Once
lk sync.Mutex
sms map[datastore.Key]*StateMachine
}
其中,sms就是StateMachine的狀態陣列。StateHandler是StateMachine的狀態處理函式介面:
type StateHandler interface {
Plan(events []Event, user interface{}) (interface{}, uint64, error)
}
也就是說,在StateMachine狀態機上,需要實現StateHandler介面(Plan函式,實現狀態的轉換)。
4. StoredState
StoredState是抽象的Key-Value的儲存。相對比較簡單,Get/Put介面,非常容易理解。
5. SectorState
storage-fsm實現了和Sector狀態相關的業務邏輯。也就是狀態的定義,狀態的轉換函式都是在這個包裡實現。整個Sector的資訊定義在storage-fsm/types.go中:
type SectorInfo struct {
State SectorState
SectorNumber abi.SectorNumber
Nonce uint64
SectorType abi.RegisteredProof
// Packing
Pieces []Piece
// PreCommit1
TicketValue abi.SealRandomness
TicketEpoch abi.ChainEpoch
PreCommit1Out storage.PreCommit1Out
// PreCommit2
CommD *cid.Cid
CommR *cid.Cid
Proof []byte
PreCommitMessage *cid.Cid
// WaitSeed
SeedValue abi.InteractiveSealRandomness
SeedEpoch abi.ChainEpoch
// Committing
CommitMessage *cid.Cid
InvalidProofs uint64
// Faults
FaultReportMsg *cid.Cid
// Debug
LastErr string
Log []Log
}
SectorInfo包括了Sector狀態,Precommit1/2的資料,Committing的資料等等。其中,SectorState描述了Sector的具體狀態。Sector的所有的狀態定義在sector_state.go檔案中:
const (
UndefinedSectorState SectorState = ""
// happy path
Empty SectorState = "Empty"
Packing SectorState = "Packing" // sector not in sealStore, and not on chain
PreCommit1 SectorState = "PreCommit1" // do PreCommit1
PreCommit2 SectorState = "PreCommit2" // do PreCommit1
PreCommitting SectorState = "PreCommitting" // on chain pre-commit
WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing"
CommitWait SectorState = "CommitWait" // waiting for message to land on chain
FinalizeSector SectorState = "FinalizeSector"
Proving SectorState = "Proving"
// error modes
FailedUnrecoverable SectorState = "FailedUnrecoverable"
SealFailed SectorState = "SealFailed"
PreCommitFailed SectorState = "PreCommitFailed"
ComputeProofFailed SectorState = "ComputeProofFailed"
CommitFailed SectorState = "CommitFailed"
PackingFailed SectorState = "PackingFailed"
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
FaultedFinal SectorState = "FaultedFinal" // fault declared on chain
)
熟悉SDR演算法的小夥伴,會發現很多熟悉的字眼:PreCommit1,PreCommit2,Commiting等等。結合狀態處理函式,就能很清楚各個狀態的含義以及需要處理的內容。
fsm.go中Sealing結構的Plan函式是Sector狀態的處理函式:
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
next, err := m.plan(events, user.(*SectorInfo))
if err != nil || next == nil {
return nil, uint64(len(events)), err
}
return func(ctx statemachine.Context, si SectorInfo) error {
err := next(ctx, si)
if err != nil {
log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
return nil
}
return nil
}, uint64(len(events)), nil // TODO: This processed event count is not very correct
}
Plan函式的實現也比較簡單,呼叫plan處理當前的狀態,並返回接下來需要處理的函式。狀態的處理又可以分成兩部分來看:1/ 狀態轉換的定義 2/ 狀態的處理。
針對Sector的狀態轉換的定義,在fsmPlanners中:
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
Packing: planOne(on(SectorPacked{}, PreCommit1)),
PreCommit1: planOne(
on(SectorPreCommit1{}, PreCommit2),
on(SectorSealPreCommitFailed{}, SealFailed),
on(SectorPackingFailed{}, PackingFailed),
),
PreCommit2: planOne(
on(SectorPreCommit2{}, PreCommitting),
on(SectorSealPreCommitFailed{}, SealFailed),
on(SectorPackingFailed{}, PackingFailed),
),
...
比如說,在PreCommit1的狀態,接收到SectorPreCommit1事件,將進入PreCommit2狀態。所有Sector的狀態轉換如下圖:
在處理好狀態好,就進入相應狀態的處理程式。以PreCommit1狀態為例,相應的處理函式是handlePreCommit1。
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
tok, epoch, err := m.api.ChainHead(ctx.Context())
...
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
}
...
}
很清楚看出,在PreCommit1的處理函式中,會透過SealPrecommit1呼叫rust-fil-proofs實現Precommit1的計算。最後總結一下,各個狀態的含義:
Empty - 空狀態
Packing - 打包狀態,多個Piece填充到一個Sector中
PreCommit1 - PreCommit1計算
PreCommit2 - PreCommit2計算
PreCommitting - 提交Precommit2的結果到鏈上
WaitSeed - 等待隨機種子(給定10個區塊的時間,讓隨機數種子不可提前預測)
Committing - 計算Commit1/Commit2,並將證明提交到鏈上
CommitWait - 等待鏈上確認 FinalizeSector - Sector狀態確定
總結:
Sector的狀態管理基於狀態機。通用狀態機的實現是透過go-statemachine實現。狀態的儲存透過go-statestore實現。在這些模組的基礎上,storage-fsm實現了Sector的狀態定義以及狀態處理函式。