Khipu專案 - 交易並行執行和儲存效能最佳化

買賣虛擬貨幣
最近在Github上發現一個剛上線的專案 - Khipu,涉及到兩個對以太坊的改進點:1)交易並行執行 2)儲存效能最佳化。這兩點也正是Dora網路的部分技術特點和優勢。很高興看到更多的專案對區塊鏈效能做最佳化。本文介紹Khipu專案內容,實現方法以及目前狀態。

首先看看Khipu官方github的介紹(來自https://github.com/khipu-io )

就是說Khipu是一個基於Scala/Akka實現的Ethereum協議,目前正在開發中,已經上線的alpha版本的主要feature包括:

1. 儘可能並行的執行block內部的交易。目前80%的block內部的交易都可以並行執行。
2. 為blockchain專門設計實現了一個儲存引擎(Kesque),基於Kafka的log引擎開發,對於99%以上的隨機讀只需要1次disk io。

再來看看專案進展


目前Khipu實現了3個大的模組:

* 節點發現(訪問)
* 快速同步(同步到近期的狀態資料快照和所有的區塊)
* 常規同步(同步區塊並執行區塊內包含的交易)

後續待開發的feature就不細說了。簡單來說就是目前這個0.1.0-alpha版本實現了一個不完全節點(不能說輕節點因為不是光同步區塊頭),或者說資料備份節點的功能。它可以同步區塊,執行並驗證區塊交易的合法性。但不能作為一個完整的節點存在因為還不能接受,或者產生交易,也不能出塊因為挖礦共識這塊還沒有實現。雖然協議實現還不完全,但專案設計和實現上都有不少值得借鑑的地方,也能看出開發者做了一番深入的思考。

Khipu設計思路

和github上的介紹一致,Khipu的設計思路是比較清晰的,如圖所示:


整個service由兩大塊組成:Akka部分主要負責處理區塊(內部的交易),Kafka部分完成一個database的功能,用於儲存區塊和各種狀態資料。首先我們看看Khipu如何處理區塊。

區塊處理

由於Khipu實現基於Akka,先簡單介紹一下akka和actor模型。

什麼是akka

Akka是一個用scala編寫的庫,用於簡化編寫可容錯的,可擴充套件的,高併發應用。akka使用actor模型來提升抽象能力,提供更好的平臺來構建可擴充套件的,彈性的應用。對於比較難處理的錯誤,akka採用“let it crash”模型來處理,這種模式可以使得一個任務的處理失敗不會導致整個應用的crash,使你的系統擁有強大的自愈能力,也不需要重啟來恢復系統。同時akka的分散式部署更加簡單透明。

actor模型

actor模型並非什麼新鮮事務,它早在20世紀70年代就被提出了,主要目的是為了解決分散式程式設計中的一系列問題。actor模型具有以下優點:1.更簡單的、高度抽象的併發處理。2.非同步的,非阻塞的,高效能的事件驅動程式設計模型。3.非常輕量級的事件驅動處理。

基於上述兩個模組,Khipu的區塊處理邏輯不復雜,其主要的工作都是在並行執行區塊交易上,這段並行處理的邏輯就是一套map reduce,(極端)簡化的流程如下圖:(要感謝Scala的map reduce特性,換Java來估計程式碼量翻好多倍)

這裡Map task容易理解,分開區塊內的交易到多個並行的世界裡面執行各自的交易並改變各自的狀態,主要的問題在於Reduce階段,如何合併這一系列的世界狀態。在合併這一塊,Khipu採用了衝突檢測的辦法,定義了兩種不同的衝突模式(並行的狀態衝突和世界狀態衝突)。

object ProgramState {
trait ParallelRace
case object OnAccount extends ParallelRace
case object OnError extends ParallelRace
}

object BlockWorldState {
sealed trait RaceCondition
case object OnAddress extends RaceCondition
case object OnAccount extends RaceCondition
case object OnStorage extends RaceCondition
case object OnCode extends RaceCondition
}
衝突檢測的基本的邏輯很清楚,比如兩個世界狀態訪問了同一個Account肯定有衝突,或者在並行執行時產生了Error則很有可能是因為並行執行引起的。而不論檢測到什麼衝突,結果都一樣是回退到序列化執行模式,也就是和以太坊區塊打包交易一致的順序序列執行。個人以為這裡定義衝突,解決衝突的思路沒有問題,但是衝突型別的定義和檢測還不夠全面,不過考慮到alpha版本只是一個資料節點,後續這塊肯定有加強。資料儲存Khipu設計了一個基於Kafka Log引擎的資料讀寫引擎Kesque,試圖用類似streaming message queue的方式來重新定義區塊鏈資料儲存。
簡單介紹一下KafkaApache Kafka™是一個分散式流平臺,一般認為流處理平臺(a streaming platform)有以下三個關鍵的能力:1.它允許你釋出和訂閱流記錄(streams of records)。從這個角度上說,它接近於一個訊息佇列或者企業訊息系統。 2.它允許你以容錯的方式儲存流記錄。 3.它允許你即時處理訊息流。Kafka的優勢體現在兩大類應用上: 1.構建實時流資料管道,在應用系統間可靠穩定地獲取資料。 2.構建實時流應用,響應資料流(react to the streams of data)。
Kafka工作模式: 1.Kafka以叢集的方式執行在一臺或多臺伺服器上。2.Kafka儲存流記錄是以topic進行分類的。 3.每條記錄包含一個key、一個value和一個timestamp。為什麼Khipu選擇KafkaKhipu採用Kafka應該是看中了它持久化流式資料的能力以及持久化實現裡面的儲存的組織方式(topic/index等)。這一塊和區塊鏈應用貼合也很緊密,對於以太坊上的資料來說,相當一部分資料,特別是MPT,accounts相關的資料都是追加寫隨機讀,也就是說增改刪查只用到了增和查,基本不改也不刪。而資料需要儲存的型別卻有很多種。所以Khipu定義了很多topic table 用來區分不同的資料,也可以算做儲存並行化的一種手段。建立Topic Table的(部分)程式碼如下:
lazy val kesque = new Kesque(kafkaProps)
log.info(s"Kesque started using config file: $kafkaConfigFile")
private val futureTables = Future.sequence(List(
Future(kesque.getTable(Array(KesqueDataSource.account))),
Future(kesque.getTable(Array(KesqueDataSource.storage))),
Future(kesque.getTable(Array(KesqueDataSource.evmcode))),
Future(kesque.getTimedTable(Array(
KesqueDataSource.header,
KesqueDataSource.body,
KesqueDataSource.receipts
), 1024000))
))
private val List(accountTable, storageTable, evmcodeTable, blockTable) = Await.result(futureTables, Duration.Inf)
//private val headerTable = kesque.getTimedTable(Array(KesqueDataSource.header), 1024000)
//private val bodyTable = kesque.getTable(Array(KesqueDataSource.body), 1024000)
//private val receiptTable = kesque.getTable(Array(KesqueDataSource.receipts), 1024000)

lazy val accountNodeDataSource = new KesqueDataSource(accountTable, KesqueDataSource.account)
lazy val storageNodeDataSource = new KesqueDataSource(storageTable, KesqueDataSource.storage)
lazy val evmCodeDataSource = new KesqueDataSource(evmcodeTable, KesqueDataSource.evmcode)

lazy val blockHeadersDataSource = new KesqueDataSource(blockTable, KesqueDataSource.header)
lazy val blockBodiesDataSource = new KesqueDataSource(blockTable, KesqueDataSource.body)
lazy val receiptsDataSource = new KesqueDataSource(blockTable, KesqueDataSource.receipts)
以上看到的是Khipu用KesqueDataSource儲存的一些topic tables,可以看到主要是賬戶相關的資料,其餘的資料還是會儲存到levelDB裡。另外值得一提的是Khipu在隨機讀上做了一個index offset的hashOffsets map table,透過這個table可以直接訪問到Log File裡面對應的Value,如果沒有從Cache裡面直接得到的話。個人以為這就是它號稱99%隨機讀僅僅需要一次IO的原因。read部分的程式碼如下:
def read(key: Array[Byte], topic: String): Option[TVal] = {
try {
readLock.lock

val valueIndex = topicIndex(topic)
caches(valueIndex).get(Hash(key)) match {
case None =>
val hash = Hash(key)
hashOffsets.get(hash.hashCode, valueIndex) match {
case IntIntsMap.NO_VALUE => None
case offsets =>
var foundValue: Option[TVal] = None
var foundOffset = Int.MinValue
var i = offsets.length - 1 // loop backward to find newest one
while (i >= 0 && foundValue.isEmpty) {
val offset = offsets(i)
val (topicPartition, result) = db.read(topic, offset, fetchMaxBytes).head
val recs = result.info.records.records.iterator
while (recs.hasNext) { // NOTE: the records are offset resversed !!
val rec = recs.next
if (rec.offset == offset && java.util.Arrays.equals(db.getBytes(rec.key), key)) {
foundOffset = offset
foundValue = if (rec.hasValue) Some(TVal(db.getBytes(rec.value), rec.timestamp)) else None
}
}
i -= 1
}

foundValue foreach { x =>
caches(valueIndex).put(hash, (x, foundOffset))
}

foundValue
}
case Some((value, offset)) => Some(value)
}
} finally {
readLock.unlock()
}
}
這裡有個小小的疑問,感覺這個hashOffsets table可能對memory產生一些壓力,大概是因為當前資料量還不夠大,所以Khipu似乎沒有考慮這個table本身可能需要spill到磁碟的問題。總之,Khipu在資料儲存這個方向上的嘗試是非常值得肯定的,雖然目前以太坊的實現上(go版本)單機的leveldb效能和空間都能滿足要求,但是隨著區塊鏈技術進一步發展,必然提高對儲存的要求,那麼區塊鏈技術和大資料(至少儲存方面)框架融合應該會是一個很有潛力的方向。總結Khipu是一個用Scala語言,結合了Scala語言特性,Akka,Kafka等框架實現的以太坊資料節點協議。在交易執行和資料儲存上做出了一些有意義的嘗試,程式碼裡體現的設計和思考,包括Scala語言,Akka/Kafka框架的選擇對其他基於以太坊開發的專案也有一定的啟發。


更多區塊鏈資訊:www.qukuaiwang.com.cn/news

免責聲明:

  1. 本文版權歸原作者所有,僅代表作者本人觀點,不代表鏈報觀點或立場。
  2. 如發現文章、圖片等侵權行爲,侵權責任將由作者本人承擔。
  3. 鏈報僅提供相關項目信息,不構成任何投資建議

推荐阅读

;