Go併發程式設計元語以及在Tendermint/CosmosSDK中的應用

買賣虛擬貨幣

本文首先對併發程式設計模型和元語進行簡要介紹,然後結合Tendermint/Cosmos-SDK原始碼詳細介紹Go語言支援的兩種併發程式設計模型所提供的元語。本文不區分併發(Concurrency)和並行(Parallel),文中會根據情況使用這兩個詞中的一個。

併發程式設計模型 併發程式設計元語 共享記憶體模型元語 3.1 sync.Mutex 3.2 sync.RWMutex 3.3 sync.WaitGroup 3.4 sync.Cond 3.5 sync.Once 3.6 sync.Pool CSP模型元語 4.1 Channel基本用法 4.2 Channel的容量 4.3 Channel的方向 4.4 select語句

1.併發程式設計模型

併發程式設計模型可以按兩種方式分類:按照process互動方式,或者按照問題分解方式。按照process互動方式又可以進一步分為共享記憶體模型、訊息傳遞模型、隱式互動模型。很多程式語言(比如Java)提供的執行緒模型就屬於共享記憶體模型,在這種模型下,並行執行的多個執行緒透過共享記憶體來進行互動。訊息傳遞模型又可以進一步分為同步和非同步兩種模式,同步模式比較常用的有Communicating Sequential Processes(後文簡稱CSP)模型,非同步模式比較常用的有Actor模型。Actor模型被Scala等語言採用,CSP模型則被Go語言採用並且大放異彩。本文只結合Go語言討論共享記憶體和CSP這兩種模型,下面給出併發程式設計模型的分類僅供參考。

Parallel programming model:

Process interaction

Shared memory Message passing synchronous CSP asynchronous Actor Implicit interation Problem decomposition

Task parallelism Data parallelism Implicit parallelism

2.併發程式設計元語

不管某個程式語言使用哪種併發程式設計模型,都需要提供一些基本的語法或者API來供程式設計師在這種模型下程式設計,這些基本語法或API就叫做併發程式設計元語(Primitives)。大家都知道在Go語言裡可以使用go關鍵字非常方便的開啟協程(Goroutine),Go語言同時支援共享記憶體和CSP這兩種程式設計模型。共享記憶體元語主要由sync包提供,包括Mutex和Cond等。CSP元語則是內建在語言內的,主要包括Channel型別和select語句。下面是Go語言提供的(也是本文將要介紹的)併發程式設計元語。

Go concurrency primitives:

Shared memory sync.Locker sync.Mutex sync.RWMutex sync.WaitGroup sync.Cond sync.Once sync.Pool CSP channels select 這裡需要指出,雖然Go語言同時支援共享記憶體和CSP兩種程式設計模型,但通常還是鼓勵使用CSP模型。只有在真正必要時,再使用sync包。Go的口號是:

Do not communicate by sharing memory; instead, share memory by communicating.

3.共享記憶體模型元語

Go語言共享記憶體程式設計模型主要由標準庫sync包支援,這一節對sync包提供的6個元語進行介紹。

3.1 sync.Mutex

Mutex是Go語言提供的互斥鎖實現。Mutex本身的用法非常簡單,只有Lock()和Unlock()兩個方法。這兩個方法也是Locker介面僅有的兩個方法,因此Mutex實現了Locker介面:

// A Locker represents an object that can be locked and unlocked. type Locker interface { Lock() Unlock() } 在Go語言裡,使用Mutex的套路一般是:當需要進入臨界區時調先用Lock()方法加鎖,然後使用defer語句呼叫Unlock()方法解鎖,最後訪問臨界區。以Cosmos-SDK提供的cachekv.Store為例:

// Store wraps an in-memory cache around an underlying types.KVStore. type Store struct { mtx sync.Mutex cache map[string]*cValue unsortedCache map[string]struct{} sortedCache *list.List // always ascending sorted parent types.KVStore } Store的Get()、Set()、Delete()等方法均需要保證併發安全性,所以按照前面描述的套路使用Mutex。以Get()方法為例:

// Implements types.KVStore. func (store *Store) Get(key []byte) (value []byte) { store.mtx.Lock() defer store.mtx.Unlock() types.AssertValidKey(key)

  1. cacheValue, ok := store.cache[string(key)]

  2. if!ok {

  3. value = store.parent.Get(key)

  4. store.setCacheValue(key, value,false,false)

  5. }else{

  6. value = cacheValue.value

  7. }

  8. return value

3.2 sync.RWMutex

RWMutex是Go語言提供的讀寫鎖實現。和Mutex一樣,RWMutex也實現了Locker介面,用於操作寫鎖。另外增加了RLock()、RUnlock()、RLocker()三個方法,用於操作讀鎖。下面是RWMutex的完整API:

type RWMutex struct { /* fields */ } func (rw *RWMutex) Lock() func (rw *RWMutex) Unlock() func (rw *RWMutex) RLock() func (rw *RWMutex) RUnlock() func (rw *RWMutex) RLocker() Locker RWMutex使用起來也很簡單:當需要進入臨界區時進行寫操作時,使用寫鎖;如果只讀,則使用讀鎖。以Tendermint裡的BlockStore為例:

// BlockStore is a simple low level store for blocks. type BlockStore struct { db dbm.DB

  1. mtx sync.RWMutex

  2. height int64

} Height()方法只讀取塊的高度,因此使用讀鎖即可:

// Height returns the last known contiguous block height. func (bs *BlockStore) Height() int64 { bs.mtx.RLock() defer bs.mtx.RUnlock() return bs.height } SaveBlock()方法要寫高度,因此使用寫鎖:

func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { ...

  1. // Done!

  2. bs.mtx.Lock()

  3. bs.height = height

  4. bs.mtx.Unlock()

  5. // Flush

  6. bs.db.SetSync(nil,nil)

}

3.3 sync.WaitGroup

如果要等待n個Goroutine結束,那麼可以使用WaitGroup(類似Java提供的CountDownLatch)。WaitGroup內部維護了一個計數器:Add()方法可以對計數器增加任意值(包括負值);Done()方法對計數器減一;Wait()方法會導致Goroutine被阻塞,直到計數器變為0。下面是WaitGroup的完整API:

type WaitGroup struct { /* fields */ } func (wg *WaitGroup) Add(delta int) func (wg *WaitGroup) Done() func (wg *WaitGroup) Wait() 其實Done()方法只是對Add()方法的簡單封裝而已:

// Done decrements the WaitGroup counter by one. func (wg *WaitGroup) Done() { wg.Add(-1) } 使用WaitGroup的一般套路是:第一步,建立WaitGroup例項;第二步,呼叫Add()方法設定需要等待的Goroutine數量;第三步,啟動Goroutine幹活兒,並且在Goroutine內部呼叫Done()方法;第四步,呼叫Wait()方法等待全部Goroutine結束。以Tendermint裡的p2p.Switch#Broadcast()方法為例:

// Broadcast runs a go routine for each attempted send, which will block trying // to send for defaultSendTimeoutSeconds. Returns a channel which receives // success values for each attempted send (false if times out). Channel will be // closed once msg bytes are sent to all peers (or time out). // // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool { sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))

  1. peers := sw.peers.List()

  2. var wg sync.WaitGroup// step#1

  3. wg.Add(len(peers))// step#2

  4. successChan := make(chan bool, len(peers))

  5. for _, peer := range peers {

  6. go func(p Peer){

  7. defer wg.Done()// step#3

  8. success := p.Send(chID, msgBytes)

  9. successChan <- success

  10. }(peer)

  11. }

  12. go func(){

  13. wg.Wait()// step#4

  14. close(successChan)

  15. }()

  16. return successChan

3.4 sync.Cond

如果想等待某種條件(Condition)發生,或者等待某種訊號發出,那麼可以使用Cond。Cond總是和一個Locker相關聯,呼叫Wait()方法之前需要先鎖住這個Locker,Wait()方法內部會自動解鎖Locker。所有呼叫Wait()方法的Goroutine會進入一個等待佇列並暫停執行(同時釋放鎖),呼叫Signal()方法可以恢復等待佇列中的某個Goroutine,呼叫Broadcast()方法則可以恢復等待佇列中的全部Goroutine。Wait()方法返回後,相應的Goroutine會繼續持有鎖。下面是Cond的完整API:

type Cond struct {  L Locker // L is held while observing or changing the condition // other fields } func NewCond(l Locker) *Cond func (c *Cond) Wait() func (c *Cond) Signal() func (c *Cond) Broadcast() 由於Tendermint/Cosmos-SDK並沒有用到Cond,所以我們看一下Tendermint依賴的gRPC。gRPC定義了一個Server結構體:

// Server is a gRPC server to serve RPC requests. type Server struct { opts options

  1. mu sync.Mutex// guards following

  2. lis map[net.Listener]bool

  3. conns map[io.Closer]bool

  4. serve bool

  5. drain bool

  6. cv *sync.Cond// signaled when connections close for GracefulStop

  7. m map[string]*service // service name -> service info

  8. events trace.EventLog

  9. ...// other fields

} Server例項由NewServer()函式建立,這個函式里會呼叫NewCond()函式初始化cv欄位:

// NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { opts := defaultServerOptions for _, o := range opt { o(&opts) } s := &Server{ ... } s.cv = sync.NewCond(&s.mu) // <--- if EnableTracing { ... } if channelz.IsOn() { ... } return s } 呼叫Wait()方法的套路一般是這樣:先持有鎖,然後迴圈判斷條件並在迴圈內呼叫Wait()方法等待,迴圈退出後條件滿足進行操作,最後解鎖。下面是這種套路的虛擬碼:

c.L.Lock() for !condition() { c.Wait() } ... make use of condition ... c.L.Unlock() Server的GracefulStop()方法需要“優雅”的關閉服務,所以要等待所有連線都關閉。該方法就是按照套路行事:

// GracefulStop stops the gRPC server gracefully. It stops the server from // accepting new connections and RPCs and blocks until all the pending RPCs are // finished. func (s *Server) GracefulStop() { ... s.mu.Lock()

  1. for len(s.conns)!=0{

  2. s.cv.Wait()// wait in a loop

  3. }

  4. s.conns =nil// make use of condition

  5. if s.events !=nil{//

  6. s.events.Finish()//

  7. s.events =nil//

  8. }//

  9. s.mu.Unlock()

} Server的removeConn()方法在連線關閉之後會呼叫Broadcast()方法:

func (s *Server) removeConn(c io.Closer) { s.mu.Lock() defer s.mu.Unlock() if s.conns != nil { delete(s.conns, c) s.cv.Broadcast() // <--- } } 需要指出的是,呼叫Singal()和Broadcast()並不一定要持有鎖。不過上面的removeConn()方法因為要改變共享狀態(conns欄位),所以才需要持有鎖。

3.5 sync.Once

如果想在併發情況下保證某操作只被執行一次,那麼可以使用Once。Once非常簡單,只有一個Do()方法,用於提交操作。下面是Once的完整API:

type Once struct { /* fields */ } func (o *Once) Do(f func()) Tendermint/Cosmos-SDK也沒有使用Once,前面介紹gRPC的Server結構體時省略了一些欄位,下面給出該結構體的完整定義:

// Server is a gRPC server to serve RPC requests. type Server struct { opts options

  1. mu sync.Mutex// guards following

  2. lis map[net.Listener]bool

  3. conns map[io.Closer]bool

  4. serve bool

  5. drain bool

  6. cv *sync.Cond// signaled when connections close for GracefulStop

  7. m map[string]*service // service name -> service info

  8. events trace.EventLog

  9. quit chan struct{}

  10. done chan struct{}

  11. quitOnce sync.Once

  12. doneOnce sync.Once

  13. channelzRemoveOnce sync.Once

  14. serveWG sync.WaitGroup// counts active Serve goroutines for GracefulStop

  15. channelzID int64 // channelz unique identification number

  16. czData *channelzData

} 可以看到,Server結構體包含了三個Once型別的欄位。以Stop()方法為例:

func (s *Server) Stop() { s.quitOnce.Do(func() { // <--- close(s.quit) })

  1. defer func(){

  2. s.serveWG.Wait()

  3. s.doneOnce.Do(func(){// <---

  4. close(s.done)

  5. })

  6. }()

  7. ...

} Once的使用保證了quit和done這兩個channel僅被關閉一次。

3.6 sync.Pool

雖然Go語言有自動垃圾回(GC)收機制,但是如果有一些物件建立和銷燬代價比較大,則使用一個物件池來重複利用這些物件也是不錯的主意。Pool就是來做這件事的,並且保證併發安全性。Pool只有兩個方法:Get()從池子裡取一個物件,Put()把物件歸還給池子。Pool還有一個可選的New欄位,型別是函式,當池子空的時候,Get()方法會呼叫New()函式建立新物件。下面是Pool的完整API:

type Pool struct { ... // other fields

  1. // New optionally specifies a function to generate

  2. // a value when Get would otherwise return nil.

  3. // It may not be changed concurrently with calls to Get.

  4. New func()interface{}

} func (p *Pool) Put(x interface{}) func (p *Pool) Get() interface{} Tendermint的tmfmtLogger使用Pool來快取tmfmtEncoder例項:

var tmfmtEncoderPool = sync.Pool{ New: func() interface{} { var enc tmfmtEncoder enc.Encoder = logfmt.NewEncoder(&enc.buf) return &enc }, } Log()方法先呼叫Get()方法從池子裡取出一個encoder,接著使用defer語句呼叫Put()歸還encoder,然後使用encoder。這也是使用Pool的一般套路,程式碼如下所示:

func (l tmfmtLogger) Log(keyvals ...interface{}) error { enc := tmfmtEncoderPool.Get().(*tmfmtEncoder) enc.Reset() defer tmfmtEncoderPool.Put(enc) ... // use enc }

  1. CSP模型元語

這一節圍繞Go語言提供的chan型別以及相關語法介紹CSP程式設計模型。

4.1 Channel基本用法

和共享記憶體模型不同,Go語言從語法上對CSP模型進行支援。從這一點也可以看出,用Go語言程式設計時,使用CSP模型才更自然一些。CSP模型最重要的概念就是訊息傳遞,為此Go語言提供了特殊的channel型別。顧名思義,channel就是通道,訊息可以經由通道在Goroutine之間流通。而在通道里流通的訊息,則可以是任意型別(包括channel)。必須使用chan關鍵字來宣告channel型別。由於channel是強型別的,因此chan的後面要跟上允許在通道里流動的訊息的型別,例如:

var mychan chan string // channel of string 通道的例項由內建函式make()建立,例如:

mychan = make(chan string) 要想給channel傳送訊息,或者從channel裡讀取訊息,可以使用<-運算子。如果<-在channel的左邊則表示讀取訊息,如果在右邊則表示傳送訊息:

s := <-mychan // read from channel mychan <- s // write to channel 還可以使用for range語句迴圈從channel裡讀取訊息:

for s := range mychan { fmt.Println(s) } 如果channel不再有用,可以使用內建函式close()把它關閉:

close(mychan) Channel非常像其他語言(比如Java)裡的阻塞佇列(Blocking Queue),只不過容量為0(後面會進一步解釋)。如果一個Goroutine試圖從channel裡讀取訊息會被阻塞,直到有其他Goroutine往裡面傳送訊息為止。反之,如果一個Goroutine試圖從channel裡傳送訊息也會被阻塞,直到有其他Goroutine從裡面拿走這個訊息為止。Tendermint/Cosmos-SDK程式碼中大量使用了channel,以Cosmos-SDK提供的iavl包為例:

// Implements types.Iterator. type iavlIterator struct { tree *iavl.ImmutableTree // Underlying store start, end []byte // Domain ascending bool // Iteration order iterCh chan cmn.KVPair // Channel to push iteration values. quitCh chan struct{} // Close this to release goroutine. initCh chan struct{} // Close this to signal that state is initialized. ... // other fields } 上面給出了iavlIterator結構體的程式碼,可以看到,這個結構體包含了三個chan型別的欄位。下面是newIAVLIterator()函式的程式碼,從中可以看到make()函式的使用:

// newIAVLIterator will create a new iavlIterator. // CONTRACT: Caller must release the iavlIterator, as each one creates a new // goroutine. func newIAVLIterator(tree *iavl.ImmutableTree, start, end []byte, ascending bool) *iavlIterator { iter := &iavlIterator{ tree: tree, start: types.Cp(start), end: types.Cp(end), ascending: ascending, iterCh: make(chan cmn.KVPair), // Set capacity > 0? quitCh: make(chan struct{}), initCh: make(chan struct{}), } go iter.iterateRoutine() go iter.initRoutine() return iter } iterateRoutine()函式則展示了<-運算子以及close()函式的使用:

// Run this to funnel items from the tree to iterCh. func (iter *iavlIterator) iterateRoutine() { iter.tree.IterateRange( iter.start, iter.end, iter.ascending, func(key, value []byte) bool { select { case <-iter.quitCh: return true // done with iteration. case iter.iterCh <- cmn.KVPair{Key: key, Value: value}: return false // yay. } }, ) close(iter.iterCh) // done. } 

4.2 Channel的容量

前面說了,channel類似於其他語言中的阻塞佇列,只不過容量(Capacity)為0。其實,channel的容量可以是任意大小的,只需要在呼叫make()函式時透過第二個引數指定即可(注意:容量不屬於channel型別,因此無法在宣告channel時指定):

mychan = make(chan string, 100) 如果建立channel時不指定容量,那麼我們建立的就是一個unbuffered channel,否則建立的就是buffered channel。當往buffered channel裡寫入訊息時,只有buffer滿才會導致Goroutine被阻塞。同理,當從buffered channel裡讀取訊息時,只有buffer空才會導致Goroutine被阻塞。Tendermint也大量使用了buffered channel,這裡就不舉例說明了,下面列出一些用到的地方供讀者參考:

$ cd tendermint $ grep -nr make(chan.,.) . | grep -v test | grep -v .md ./consensus/state.go:156: peerMsgQueue: make(chan msgInfo, msgQueueSize), ./consensus/state.go:157: internalMsgQueue: make(chan msgInfo, msgQueueSize), ./consensus/state.go:159: statsMsgQueue: make(chan msgInfo, msgQueueSize), ./consensus/ticker.go:43: tickChan: make(chan timeoutInfo, tickTockBufferSize), ./consensus/ticker.go:44: tockChan: make(chan timeoutInfo, tickTockBufferSize), ./abci/server/socketserver.go:115: closeConn := make(chan error, 2) // Push to signal connection closed ./abci/server/socketserver.go:116: responses := make(chan *types.Response, 1000) // A channel to buffer responses ./abci/client/socketclient.go:45: reqQueue: make(chan *ReqRes, reqQueueSize), ./tools/tm-signer-harness/internal/testharness.go:131: c := make(chan os.Signal, 1) ./tools/tm-monitor/monitor/monitor.go:85: blockCh := make(chan tmtypes.Header, 10) ./tools/tm-monitor/monitor/monitor.go:87: blockLatencyCh := make(chan float64, 10) ./tools/tm-monitor/monitor/monitor.go:89: disconnectCh := make(chan bool, 10) ./libs/autofile/autofile.go:72: af.hupc = make(chan os.Signal, 1) ./libs/common/os.go:18: c := make(chan os.Signal, 1) ./libs/common/async.go:127: var taskDoneCh = make(chan bool, len(tasks)) // A "wait group" channel, early abort if any true received. ./libs/common/async.go:135: var taskResultCh = make(chan TaskResult, 1) // Capacity for 1 result. ./libs/pubsub/subscription.go:33: out: make(chan Message, outCapacity), ./blockchain/v1/reactor.go:88: eventsFromFSMCh := make(chan bcFsmMessage, capacity) ./blockchain/v1/reactor.go:89: messagesForFSMCh := make(chan bcReactorMessage, capacity) ./blockchain/v1/reactor.go:90: errorsForFSMCh := make(chan bcReactorMessage, capacity) ./blockchain/v1/reactor.go:280: doProcessBlockCh := make(chan struct{}, 1) ./blockchain/v1/reactor.go:336: stopProcessing := make(chan struct{}, 1) ./blockchain/v0/reactor.go:81: requestsCh := make(chan BlockRequest, maxTotalRequesters) ./blockchain/v0/reactor.go:84: errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock ./blockchain/v0/reactor.go:225: didProcessCh := make(chan struct{}, 1) ./blockchain/v0/pool.go:512: gotBlockCh: make(chan struct{}, 1), ./blockchain/v0/pool.go:513: redoCh: make(chan p2p.ID, 1), ./p2p/transport.go:332: errc := make(chan error, len(mt.connFilters)) ./p2p/transport.go:495: errc = make(chan error, 2) ./p2p/switch.go:264: successChan := make(chan bool, len(peers)) ./p2p/switch.go:719: errc := make(chan error, len(sw.peerFilters)) ./p2p/conn/connection.go:166: send: make(chan struct{}, 1), ./p2p/conn/connection.go:167: pong: make(chan struct{}, 1), ./p2p/conn/connection.go:208: c.pongTimeoutCh = make(chan bool, 1) ./p2p/conn/connection.go:739: sendQueue: make(chan []byte, desc.SendQueueCapacity), ./mempool/clistmempool.go:108: mem.txsAvailable = make(chan struct{}, 1) ./rpc/core/mempool.go:137: resCh := make(chan *abci.Response, 1) ./rpc/core/mempool.go:237: checkTxResCh := make(chan *abci.Response, 1) ./rpc/lib/server/handlers.go:543: wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) ./rpc/lib/server/handlers.go:712: pongs := make(chan string, 1) ./rpc/lib/client/wsclient.go:166: c.reconnectAfter = make(chan error, 1) ./rpc/lib/client/wsclient.go:169: c.backlog = make(chan types.RPCRequest, 1) ./rpc/client/httpclient.go:410: outc := make(chan ctypes.ResultEvent, outCap) ./rpc/client/localclient.go:179: outc := make(chan ctypes.ResultEvent, outCap) 4.3 Channel的方向

雖然宣告channel時無法指定容量,但卻能指定方向。像前面那樣宣告的channel實際是雙向(Bidirectional)通道,我們還可以宣告單向(Unidirectional)通道,只要在申明channel時帶上<-即可。如果<-在chan關鍵字左邊表示只讀通道,右邊則表示只寫通道,例如:

var mychan <-chan string // readonly var mychan chan<- string // write-only Tendermint也大量使用了單向通道,這裡也不舉例說明了,下面列出一些使用只讀通道的地方:

$ grep -nr \<-chan . | grep -v test | grep -v .md ./consensus/state.go:62: TxsAvailable() <-chan struct{} ./consensus/ticker.go:20: Chan() <-chan timeoutInfo // on which to receive a timeout ./consensus/ticker.go:66:func (t *timeoutTicker) Chan() <-chan timeoutInfo { ./abci/server/socketserver.go:218:func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) { ./tools/tm-monitor/monitor/monitor.go:168:func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) { ./types/eventbus.go:25: Out() <-chan tmpubsub.Message ./types/eventbus.go:26: Cancelled() <-chan struct{} ./evidence/pool.go:47:func (evpool *EvidencePool) EvidenceWaitChan() <-chan struct{} { ./mock/mempool.go:38:func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } ./mock/mempool.go:43:func (Mempool) TxsWaitChan() <-chan struct{} { return nil } ./libs/clist/clist.go:97:func (e *CElement) PrevWaitChan() <-chan struct{} { ./libs/clist/clist.go:106:func (e *CElement) NextWaitChan() <-chan struct{} { ./libs/clist/clist.go:308:func (l *CList) WaitChan() <-chan struct{} { ./libs/common/service.go:46: Quit() <-chan struct{} ./libs/common/service.go:217:func (bs *BaseService) Quit() <-chan struct{} { ./libs/common/async.go:20:type TaskResultCh <-chan TaskResult ./libs/pubsub/subscription.go:41:func (s *Subscription) Out() <-chan Message { ./libs/pubsub/subscription.go:47:func (s *Subscription) Cancelled() <-chan struct{} { ./blockchain/v0/reactor.go:68: requestsCh <-chan BlockRequest ./blockchain/v0/reactor.go:69: errorsCh <-chan peerError ./p2p/trust/ticker.go:13: GetChannel() <-chan time.Time ./p2p/trust/ticker.go:33:func (t *TestTicker) GetChannel() <-chan time.Time { ./p2p/trust/ticker.go:60:func (t *Ticker) GetChannel() <-chan time.Time { ./p2p/fuzz.go:18: start <-chan time.Time ./p2p/fuzz.go:34: start: make(<-chan time.Time), ./mempool/clistmempool.go:203:func (mem *CListMempool) TxsWaitChan() <-chan struct{} { ./mempool/clistmempool.go:444:func (mem *CListMempool) TxsAvailable() <-chan struct{} { ./mempool/mempool.go:58: TxsAvailable() <-chan struct{} ./rpc/lib/client/ws_client.go:367: ticker = &time.Ticker{C: make(<-chan time.Time)} ./rpc/client/httpclient.go:399: outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { ./rpc/client/interface.go:103: Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) ./rpc/client/localclient.go:164:func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { 下面是一些使用只寫通道的地方:

$ grep -nr chan\<- . | grep -v test | grep -v .md ./consensus/walgenerator.go:151: signalWhenStopsTo chan<- struct{} ./consensus/walgenerator.go:159:func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan<- struct{}) *byteBufferWAL { ./abci/server/socketserver.go:147:func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) { ./abci/server/socketserver.go:179:func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) { ./tools/tm-monitor/monitor/node.go:38: blockCh chan<- tmtypes.Header ./tools/tm-monitor/monitor/node.go:39: blockLatencyCh chan<- float64 ./tools/tm-monitor/monitor/node.go:40: disconnectCh chan<- bool ./tools/tm-monitor/monitor/node.go:82:func (n *Node) SendBlocksTo(ch chan<- tmtypes.Header) { ./tools/tm-monitor/monitor/node.go:86:func (n *Node) SendBlockLatenciesTo(ch chan<- float64) { ./tools/tm-monitor/monitor/node.go:90:func (n *Node) NotifyAboutDisconnects(ch chan<- bool) { ./types/eventbus.go:218:func (NopEventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { ./blockchain/v0/pool.go:78: requestsCh chan<- BlockRequest ./blockchain/v0/pool.go:79: errorsCh chan<- peerError ./blockchain/v0/pool.go:84:func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool { ./p2p/transport.go:335: go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) { ./p2p/transport.go:501: go func(errc chan<- error, c net.Conn) { ./p2p/transport.go:505: go func(errc chan<- error, c net.Conn) { ./p2p/switch.go:722: go func(f PeerFilterFunc, p Peer, errc chan<- error) { ./rpc/client/localclient.go:185:func (c *Local) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { 4.4 select語句

如果想同時操作多個channel,那麼可以使用select語句(語法和switch語句有點像)。如果有多個channel可用,那麼select語句會隨機選擇一個可用channel進行讀或者寫。如果沒有channel可用,並且有default分支,則會執行此分支。如果沒有channel可用也沒有提供default分支,那麼整個select語句會被阻塞。下面是select語言的一般形式:

select { case <-c1: // do somethine case <-c2: // do somethine case c3 <- x: // do somethine case c4 <- y: // do somethine default: // do somethine } Tendermint/Cosmos-SDK程式碼中也是大量使用了select語句,前面介紹的iterateRoutine()方法就是一個例子。下面再從Temdermint p2p包中選取一個方法,以供參考:

// Accept implements Transport. func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) { select { // This case should never have any side-effectful/blocking operations to // ensure that quality peers are ready to be used. case a := <-mt.acceptc: if a.err != nil { return nil, a.err } cfg.outbound = false return mt.wrapPeer(a.conn, a.nodeInfo, cfg, a.netAddr), nil case <-mt.closec: return nil, ErrTransportClosed{} } } 由於select語句的case和default分支均可省略,所以一個慣用法就是透過空的select {}來“永久”阻塞Goroutine。以Tendermint提供的node命令為例:

// NewRunNodeCmd returns the command that allows the CLI to start a node. // It can be used with a custom PrivValidator and in-process ABCI application. func NewRunNodeCmd(nodeProvider nm.NodeProvider) *cobra.Command { cmd := &cobra.Command{ Use: "node", Short: "Run the tendermint node", RunE: func(cmd *cobra.Command, args []string) error { n, err := nodeProvider(config, logger) if err != nil { return fmt.Errorf("Failed to create node: %v", err) }

  1. // Stop upon receiving SIGTERM or CTRL-C.

  2. cmn.TrapSignal(logger, func(){

  3. if n.IsRunning(){ n.Stop()}

  4. })

  5. if err := n.Start(); err !=nil{

  6. return fmt.Errorf("Failed to start node: %v", err)

  7. }

  8. logger.Info("Started node","nodeInfo", n.Switch().NodeInfo())

  9. select{}// Run forever.

  10. },

  11. }

  12. AddNodeFlags(cmd)

  13. return cmd

本文由CoinEx Chain團隊Chase寫作,轉載無需授權。

免責聲明:

  1. 本文版權歸原作者所有,僅代表作者本人觀點,不代表鏈報觀點或立場。
  2. 如發現文章、圖片等侵權行爲,侵權責任將由作者本人承擔。
  3. 鏈報僅提供相關項目信息,不構成任何投資建議

推荐阅读

;