在上一篇文章中,我們分析了儲存檔案/內容的整體流程,基本上知道在這個過程中檔案/內容是怎麼處理的,但是還流下了一個疑問,就是檔案是怎麼分片的,又是怎麼儲存到本地系統,這篇文章我們就來解決這幾個問題。透過上一篇文章,我們知道 ipfs-unixfs-importer
這個類庫,它實現了 IPFS 用於處理檔案的佈局和分塊機制,它的 index.js
檔案內容只有一行程式碼 require('./importer')
,接下來我們直接來看這個 importer/index.js
是怎麼處理的。
- 把引數傳遞的選項和預設選項進行合併,生成新的選項,然後檢查選項的相關配置。
預設選項即const options = Object.assign({}, defaultOptions, _options) options.cidVersion = options.cidVersion || 0 if (options.cidVersion > 0 && _options.rawLeaves === undefined) { options.rawLeaves = true } if (_options && _options.hash !== undefined && _options.rawLeaves === undefined) { options.rawLeaves = true }
defaultOptions
內容如下:const defaultOptions = { chunker: 'fixed', rawLeaves: false, hashOnly: false, cidVersion: 0, hash: null, leafType: 'file', hashAlg: 'sha2-256' }
- 根據選項中指定的分割方式,從 IPFS 中提供的所有分割方法找到對應的分割物件。
const Chunker = chunkers[options.chunker]
chunkers
表示系統提供的所有分割方法物件,在父目錄下chunker/index.js
檔案中定義的,預設有fixed
、rabin
兩種方法,預設使用的是的前者,即固定大小。 - 生成一個 pull-through 的雙向流,雙向流的意思就是即可以從它讀取資料,又可以提供資料讓其它流讀取。
const entry = { sink: writable( (nodes, callback) => { pending += nodes.length nodes.forEach((node) => entry.source.push(node)) setImmediate(callback) }, null, 1, (err) => entry.source.end(err) ), source: pushable() }
source
流是 pull-pushable 類庫提供的一個可以其它流主動 push 的 pull-stream 源流,它提供了一個push
方法,當呼叫這個方法時,它開始呼叫回撥函式,從而把資料傳遞給後續的 through 或 sink。當時,它還提供了一個end
方法,當資料讀取完成後,呼叫這個方法。sink
流是 pull-write 類庫提供的一個建立通用 pull-streamsinks
流的基礎類。它的簽名如下:(write, reduce, max, cb)
, 因為它是一個sinks
流,所以它會讀取前一個流的資料,在讀取到資料之後就呼叫它的write
方法儲存讀取到的資料,如果資料讀取完成就呼叫它的cb
方法。在這裡
sink
函式從前一個流中讀取資料,然後放入source
中。同時,source
成為下一個流的讀取函式。 - 生成一個
dagStream
物件,這個物件也是一個{source,sink}
物件。const dagStream = DAGBuilder(Chunker, ipld, options)
DAGBuilder
函式定義於父目錄下的builder/index.js
中,接下來我們看下這個執行過程:- 合併選項引數和預設選項
預設選項如下:const options = Object.assign({}, defaultOptions, _options)
const defaultOptions = { strategy: 'balanced', highWaterMark: 100, reduceSingleLeafToSelf: true }
- 根據選項指定的 reduce 策略,從系統提供的多個策略中選擇指定的策略。
系統定義的的策略如下:const strategyName = options.strategy const reducer = reducers[strategyName]
在使用者不指定具體策略的預設情況下,根據前面執行過程,最終選定的策略為const reducers = { flat: require('./flat'), balanced: require('./balanced'), trickle: require('./trickle') }
balanced
。 - 呼叫
Builder
方法建立最終的策略物件。const createStrategy = Builder(Chunker, ipld, reducer, options)
Builder
方法位於builder.js
檔案中,它會建立一個 pull-stream 的 through 流物件。在看它的內部之前,我們首先看下的 4個引數。- 第 1 個引數
Chunker/createChunker
,它表示具體分割內容的策略,預設情況下為fixed
,詳見第一步中的defaultOptions
變數內容; - 第 2 個引數
ipld/ipld
,這個是 IPFS 物件的_ipld
屬性,在 IPFS 物件建立時生成的,表示星際接續的資料,目前它可以連線比特幣、以太坊、git、zcash 等,在 IPFS 體系中具有非常重要的位置; - 第 3 個引數
reducer/createReducer
是具體的 reduce 策略,預設情況為balanced
,詳見第四步中生成reducer
變數的過程。 - 第 4 個引數
options/_options
為選項。
- 合併指定的選項和自身預設的選項。
預設選項如下:const options = extend({}, defaultOptions, _options)
const defaultOptions = { chunkerOptions: { maxChunkSize: 262144, avgChunkSize: 262144 }, rawLeaves: false, hashAlg: 'sha2-256', leafType: 'file', cidVersion: 0, progress: () => {} }
- 返回一個函式物件。
return function (source) { return function (items, cb) { parallel(items.map((item) => (cb) => { if (!item.content) { return createAndStoreDir(item, (err, node) => { if (err) { return cb(err) } if (node) { source.push(node) } cb() }) } createAndStoreFile(item, (err, node) => { if (err) { return cb(err) } if (node) { source.push(node) } cb() }) }), cb) } }
sink
流的write
方法。 - 第 1 個引數
- 呼叫
createBuildStream
方法,生成一個雙向流物件。createBuildStream(createStrategy, ipld, options)
createBuildStream
方法位於create-build-stream.js
檔案中,程式碼如下:
在這段程式碼中,const source = pullPushable() const sink = pullWrite( createStrategy(source), null, options.highWaterMark, (err) => source.end(err) ) return { source: source, sink: sink }
source
流是 pull-pushable 類庫提供的一個可以主動 push 到其它流的 pull-stream 源流,這個類庫在前面我們已經分析過,這裡就直接略過。sink
流是 pull-write 類庫提供的一個建立通用 pull-streamsinks
流的基礎類,這個類庫也在前面分析過,這裡也不細講,我們只看下它的write
方法,這裡的createStrategy
函式正是呼叫Builder
方法返回的createStrategy
函式,用source
作為引數,呼叫它,用返回的第二層匿名函式作為write
方法。
- 合併選項引數和預設選項
- 生成一個樹構建器流物件,並返回其雙向流物件。
const treeBuilder = createTreeBuilder(ipld, options) const treeBuilderStream = treeBuilder.stream()
createTreeBuilder
函式位於tree-builder.js
檔案中,我們來看它的執行邏輯。- 首先,合併預設選項物件和指定的選項物件。
預設選擇物件如下:const options = Object.assign({}, defaultOptions, _options)
const defaultOptions = { wrap: false, shardSplitThreshold: 1000, onlyHash: false }
onlyHash
表示是否不儲存檔案/內容,只計算其雜湊。 - 建立一個佇列物件。
const queue = createQueue(consumeQueue, 1)
- 建立一個雙向流物件
其中let stream = createStream()
sink
物件是一個pull-write
類庫提供的流,這個已經見過多次了,它的write
方法後面遇到時再來看,source
是一個pull-pushable
類庫提供的流,這個也見過多次。 - 建立一個
DirFlat
物件。let tree = DirFlat({ path: '', root: true, dir: true, dirty: false, flat: true }, options)
- 返回特權函式構成的物件。
return { flush: flushRoot, stream: getStream }
- 首先,合併預設選項物件和指定的選項物件。
- 建立一個暫停流。這裡什麼也不做。
- 呼叫
pull
方法,建立一個完整的流來儲存檔案內容。pull( entry, pausable, dagStream, map((node) => { pending-- if (!pending) { process.nextTick(() => { while (waitingPending.length) { waitingPending.shift()() } }) } return node }), treeBuilderStream )
下面我們分析這段程式碼中的幾個流,它們太重要了。pull
函式是 pull-stream 是類庫中的核心函式,在它的執行過程中,最後的 sink 流透過依次呼叫前面的 through 流,最終從最前面的 source 流中拉取資料,除了最前面的 Source 流和最後面的 Sink 流,中間的都是 through 流,它們即可以被後面的流呼叫以提供資料,也可以呼叫前面的流來讀取資料。當
pull
函式在呼叫某個引數從前面讀取資料時,如果當前引數是一個物件(即雙向流)時,那麼就會呼叫它的sink
方法來讀取,同時用它的source
方法作為後面引數的讀取方法。首先是
entry
流,它是一個雙向流,它的sink
函式(型別為pull-write
流)呼叫前一個流的read
方法來讀取資料,並把讀取到的資料放在source
中(型別為 pull-pushable )。然後是
dagStream
流,它也是一個雙向流,它的sink
函式(型別為pull-write
流)呼叫entry
流的source
方法來讀取資料。sink
函式的非同步寫函式引數為builder.js
中返回的第二層函式,當讀取到資料之後,呼叫builder.js
中返回的第二層函式進行處理,在第二層函式中,大致流程是把資料儲存自身的source
中(型別為 pull-pushable )。dagStream 在
最後是create-build-stream.js
中生成。為了方便理解,這裡我們再次看下它的程式碼。const source = pullPushable()
const sink = pullWrite( createStrategy(source), null, options.highWaterMark, (err) => source.end(err) )
return { source: source, sink: sink }
treeBuilderStream
流,它也是一個雙向流,它的sink
函式(型別為pull-write
流)呼叫dagStream
流的source
方法來讀取資料,並把讀取到的資料放在source
中(型別為 pull-pushable )。其他兩個流對流程沒有任何影響,讀者可以自行分析,這裡略過不提。
在這一步中,透過
pull
函式把最重要的幾個流連線到一起,並透過下面最後一步,把它們與外部的流聯絡到一起。 - 最後,返回雙向流物件。
{ sink: entry.sink, source: treeBuilderStream.source, flush: flush }
到這裡,檔案已經儲存完成了。
啥?檔案已經儲存完成了?什麼都沒看到就把檔案儲存完了,不會騙我們的吧?哈,因為儲存檔案這個動作太複雜了,所以上面只是靜態的從程式碼層面進行梳理,下面我們從頭到尾從動態處理的過程來看下檔案到底是怎麼儲存在本地的。
一切要從我們在上篇寫的這個示例說起
const {createNode} = require('ipfs')
const node = createNode({ libp2p:{ config:{ dht:{ enabled:true } } } })
node.on('ready', async () => {
const content = `我愛黑螢`;
const filesAdded = await node.add({ content: Buffer.from(content) },{ chunkerOptions:{ maxChunkSize:1000, avgChunkSize:1000 } })
console.log('Added file:', filesAdded[0].path, filesAdded[0].hash)
})
上面這段程式碼,最終執行的是 core/components/files-regular/add-pull-stream.js
檔案中的函式,它的主體就是下面的這段程式碼:
pull(
pull.map(content => normalizeContent(content, opts)),
pull.flatten(),
importer(self._ipld, opts),
pull.asyncMap((file, cb) => prepareFile(file, self, opts, cb)),
pull.map(file => preloadFile(file, self, opts)),
pull.asyncMap((file, cb) => pinFile(file, self, opts, cb))
)
為了便於分析理解,我們在分析過程中仍然使用推的方式,從源流推到目的流中,注意這個僅是為了理解方便,真實的過程是目的流從源流中拉取資料。
下面程式碼簡單解釋如下:
- 首先,呼叫第一個
pull.map
流,對收到的檔案或內容並進行一些必要的轉換, - 呼叫
pull.flatten
流,把前一步生成的陣列進行扁平化處理。 - 呼叫
importer
流來儲存內容。 - 呼叫
pull.asyncMap
方法,對已經儲存的檔案/內容進行預處理,生成使用者看到的內容。 - 呼叫
pull.map
方法,把已經儲存到本地的檔案預載入到指定節點。 - 呼叫
pull.asyncMap
方法,把已經儲存到本地的檔案長期儲存在本地,確保不被垃圾回收。
下面我們重點看下檔案內容在 importer
流中的處理邏輯。
- 呼叫
entry.sink
函式從前面的pull.flatten
流中讀取儲存的每一個檔案/內容。 - 呼叫
dagStream.sink
函式從前面的流中讀取資料,並在讀取到資料之後,呼叫builder.js
中定義的第二層匿名函式進行處理。在這個函式中,呼叫非同步流程庫async
的parallel
方法對收到的每個要處理的檔案內容進行處理,具體處理如下:如果儲存的是目錄,那麼呼叫createAndStoreDir
方法,建立並儲存目錄;如果儲存的是檔案,那麼呼叫createAndStoreFile
方法,建立並儲存主檔案。因為我們儲存的是檔案,所以在這裡詳細看下createAndStoreFile
方法,它的過程如下:- 如果儲存的內容是 Buffer,那麼呼叫 pull-stream 的
values
方法,生成內容源流。if (Buffer.isBuffer(file.content)) { file.content = values([file.content]) }
- 呼叫
createReducer
方法,建立 reducer 物件,預設為balanced
,所以這裡建立的 reducer 物件型別為balanced/balanced-reducer.js
檔案中定義的函式。const reducer = createReducer(reduce(file, ipld, options), options)
- 呼叫
createChunker
方法,建立 chunker 物件,預設為fixed
,所以這裡建立的 chunker 物件型別為chunker/fixed-size.js
主檔案中定義的函式。chunker = createChunker(options.chunkerOptions)
- 呼叫
pull
函式進行儲存檔案。- 設定源流為
file.content
。 - 呼叫
chunker
流,對儲存的內容進行分塊。 - 呼叫
paraMap
流(型別為 pull-paramap),對每一個分塊進行處理。 - 呼叫
pullThrough
流(型別為 pull-through 流),對收到的每個資料進行處理。 - 呼叫
reducer
流,把所有生成的分塊進行 reduce 處理。如果檔案進行了多次分塊,這裡就會根據生成的分塊生成一個父塊。 - 呼叫
collect
流,呼叫回撥函式即createAndStoreFile
,把儲存檔案的結果傳遞到外部函式中。
pull
函式會進行 IPFS 特有業務,涉及到 IPFS 儲存檔案核心邏輯,這塊我們留在下一篇文章中進行分析。 - 設定源流為
- 如果儲存的內容是 Buffer,那麼呼叫 pull-stream 的
- 呼叫
treeBuilderStream.sink
函式從前面的流中讀取資料,在這裡即為儲存檔案的結果,並在讀取到儲存檔案結果之後,把結果儲存在source
中。當把儲存檔案的結果儲存到source
中之後,core/components/files-regular/add-pull-stream.js
檔案中定義的pull.asyncMap
就可以得到這個結果了。