經過前面的分析,我們已經明白了 IPFS 啟動過程,從今天起,我會分析一些常見的命令或動作,希望大家喜歡。
在開始真正分析這些命令/動作之前,先要對 pull-stream 類庫進行簡單介紹,如果不熟悉這個類庫,接下來就沒辦法進行。pull-stream 是一個新型的流庫,資料被從源中拉取到目的中,它有兩種基本型別的流:Source 源和 Sink 接收器。除此之外,有兩種複合型別的流:Through 通道流(比如轉換)和 Duplex 雙向流。
source 流,這類流返回一個匿名函式,這個匿名函式被稱為 read 函式,它被後續的 sink 流函式或 through 流函式呼叫,從而讀取 source 流中的內容。
sink 流,這類流最終都返回內部 drain.js 中的 sink 函式。這類流主要是讀取資料,並且對每一個讀取到的資料進行處理,如果流已經結束,則呼叫使用者指定結束函式進行處理。
through 流,這類流的函式會返回巢狀的匿名函式,第一層函式接收一個 source 流的 read 函式或其他 through 函式返回的第一層函式為引數,第二層函式接收最終 sink 提供的寫函式或其他 through 返回的第二層函式,第二層函式內部呼叫 read 函式,從而直接或間接從 source 中取得資料,獲取資料後直接或間接呼叫 sink 函式,從而把資料寫入到目的地址。
在 pull-streams 中,資料在流動之前,必須有一個完整的管道,這意味著一個源、零個或多個通道、一個接收器。但是仍然可以建立一個部分化的管道,這非常有用。也就是說可以建立一個完整的管道,比如
pull(source, sink) => undefined
,也可以部分化的管道,比如pull(through, sink) => sink
,或者pull(through2, through2) => through
,我們在下面會大量遇到這種部分化的管道。
今天,我們看下第一個最常用的 add
命令/動作,我們使用 IPFS 就是為了把檔案儲存到 IPFS,自然少不了儲存操作,add
命令就是幹這個的,閒話少數,我們來看一段程式碼。
const {createNode} = require('ipfs')
const node = createNode({ libp2p:{ config:{ dht:{ enabled:true } } } })
node.on('ready', async () => {
const content = `我愛黑螢`;
const filesAdded = await node.add({ content: Buffer.from(content) },{ chunkerOptions:{ maxChunkSize:1000, avgChunkSize:1000 } })
console.log('Added file:', filesAdded[0].path, filesAdded[0].hash)
})
這次我們沒有完全使用預設配置,開啟了 DHT,看過我文章的讀者都知道 DHT 是什麼東東,這裡不詳細解釋。在程式中,透過呼叫 IPFS 節點的 add
方法來上傳內容,內容可以是檔案,也可以是直接的內容,兩者有稍微的區別,在講到相關程式碼時,我們指出這種區別的,這裡我們為了簡單直接上傳內容為例來說明。
add
方法位於 core/components/files-regular/add.js
檔案中,在 《精通IPFS:系統啟動之概覽》 那篇文章中,我們說過,系統會把 core/components/files-regular
目錄下的所有檔案擴充套件到 IPFS 物件上面,這其中自然包括這裡的 add.js
檔案。下面,我們直接看這個函式的執行流程。
這個函式返回了一個內部定義的函式,在這個內部定義的函式中對引數做了一些處理,然後就呼叫內部的 add
函式,後者才是主體,它的邏輯如下:
- 首先,檢查選項物件是否為函式,如果是,則重新生成相關的變數。
if (typeof options === 'function') { callback = options options = {} }
- 定義檢測內容的工具函式來檢測我們要上傳的內容。
const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj) const isContentObject = obj => { if (typeof obj !== 'object') return false if (obj.content) return isBufferOrStream(obj.content) return Boolean(obj.path) && typeof obj.path === 'string' }
const isInput = obj => isBufferOrStream(obj) || isContentObject(obj) const ok = isInput(data) || (Array.isArray(data) && data.every(isInput))
if (!ok) { return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects')) }
- 接下來,執行 pull-stream 類庫提供的
pull
函式。我們來看pull
函式的主要內容。它的第一個引數是pull.values
函式執行的結果,這個values
函式就是一個 source 流,它返回一個稱為read
的函式來讀取我們提供的資料。這個read
函式從陣列中讀取當前索引位置的值,以此值為引數,呼叫它之後的 through 函式第二層函式內部定義的回撥函式或最終的 sink 函式內部定義的回撥函式。如果陣列已經讀取完成,則直接以 true 為引數進行呼叫。第二個引數是 IPFS 物件的
addPullStream
方法,這個方法也是在啟動時候使用同樣的方法擴充套件到 IPFS 物件,它的主體是當前目錄的add-pull-stream.js
檔案中的函式。接下來,我們會詳細看這個函式,現在我們只需要知道這個函式返回了一個部分化的管道。第三個引數是
pull-sort
中定義的函式,這是一個依賴於pull-stream
的庫,根據一定規則來排序,這個函式我們不用管。最後一個引數是
pull.collect
函式執行的結果,這個collect
函式就是一個 sink 流。它把最終的結果放入一個陣列中,然後呼叫回撥函式。我們在前面程式碼中看到的filesAdded
之所以是一個陣列就是拜這個函式所賜。上面邏輯的程式碼如下:
在上面的程式碼中,我們把要儲存的內容構成一個陣列,具體原因下面解釋。pull( pull.values([data]), self.addPullStream(options), sort((a, b) => { if (a.path < b.path) return 1 if (a.path > b.path) return -1 return 0 }), pull.collect(callback) )
現在,我們來看 addPullStream
方法,這個方法是儲存內容的主體,add
方法是隻開胃小菜。addPullStream
方法執行邏輯如下:
- 呼叫
parseChunkerString
函式,處理內容分塊相關的選項。這個函式位於相同目錄下的utils.js
檔案中,它檢查使用者指定的分塊演算法。如果使用者沒有指定,則使用固定分塊演算法,大小為系統預設的 262144;如果指定了大小,則使用固定分塊演算法,但大小為使用者指定大小;如果指定為rabin
類分割法,即變長分割法,則呼叫內部函式來生成對應的分割選項。上面邏輯程式碼如下:
注意:我們也可以透過重寫這個函式來增加自己的分割演算法。parseChunkerString = (chunker) => { if (!chunker) { return { chunker: 'fixed' } } else if (chunker.startsWith('size-')) { const sizeStr = chunker.split('-')[1] const size = parseInt(sizeStr) if (isNaN(size)) { throw new Error('Chunker parameter size must be an integer') } return { chunker: 'fixed', chunkerOptions: { maxChunkSize: size } } } else if (chunker.startsWith('rabin')) { return { chunker: 'rabin', chunkerOptions: parseRabinString(chunker) } } else { throw new Error(Unrecognized chunker option: ${chunker}) } }
- 合併整理選項變數。
const opts = Object.assign({}, { shardSplitThreshold: self._options.EXPERIMENTAL.sharding ? 1000 : Infinity }, options, chunkerOptions)
- 設定預設的 CID 版本號。如果指定了 Hash 演算法,但是 CID 版本又不是 1,則強制設為 1。CID 是分散式系統的自描述內容定址識別符號,目前有兩個版本 0 和 1,版本 0 是一個向後相容的版本,只支援 sha256 雜湊演算法,並且不能指定。
if (opts.hashAlg && opts.cidVersion !== 1) { opts.cidVersion = 1 }
- 設定進度處理函式,預設空實現。
const prog = opts.progress || noop const progress = (bytes) => { total += bytes prog(total) }
opts.progress = progress
- 用
pull
函式返回一個部分化的 pull-stream 流。這個部分化的 pull-stream 流是處理檔案/內容儲存的關鍵,我們仔細研究下。- 首先呼叫
pull.map
方法對儲存的內容進行處理。pull.map
方法是 pull-stream 流中的一個 source 流,它對陣列中的每個元素使用指定的處理函式進行處理。這就是我們在add
函式中把需要儲存的內容轉化為陣列的原因。在這裡,對每個陣列元素進行處理的函式是normalizeContent
。這個函式定義在同一個檔案中,它首先檢查儲存的內容是否為陣列,如果不是則轉化為陣列;然後,對陣列中的每一個元素進行處理,具體如下:- 如果儲存的內容是 Buffer 物件,則把要儲存的內容轉化為路徑為空字串,內容為 pull-stream 流的物件。
if (Buffer.isBuffer(data)) { data = { path: '', content: pull.values([data]) } }
- 如果儲存的內容是一個 Node.js 可讀流,比如檔案,則把要儲存的轉化為路徑為空字串,內容使用 stream-to-pull-stream 類的
source
方法庫把 Node.js 可讀流轉化為 pull-stream 的 source 流物件。if (isStream.readable(data)) { data = { path: '', content: toPull.source(data) } }
- 如果儲存的內容是 pull-stream 的 source 流,則把要儲存的內容轉化為路徑為空字串,內容不變的物件。
if (isSource(data)) { data = { path: '', content: data } }
- 如果要儲存的內容是一個物件,並且
content
屬性存在,且不是函式,則進行如下處理:if (data && data.content && typeof data.content !== 'function') { if (Buffer.isBuffer(data.content)) { data.content = pull.values([data.content]) }
if (isStream.readable(data.content)) { data.content = toPull.source(data.content) } }
- 如果指定的是路徑,則進行下面的處理。
if (opts.wrapWithDirectory && !data.path) { throw new Error('Must provide a path when wrapping with a directory') }
if (opts.wrapWithDirectory) { data.path = WRAPPER + data.path }
- 返回最終生成的要儲存的內容。
- 如果儲存的內容是 Buffer 物件,則把要儲存的內容轉化為路徑為空字串,內容為 pull-stream 流的物件。
- 呼叫
pull.flatten()
方法,把前上步生成的陣列進行扁平化處理。flatten
方法是一個 through 流,主要是把多個流或陣列流轉換為一個流,比如把多個陣列轉換成一個陣列,比如:
這樣的陣列使用這個方法處理後,最終會變成下面的陣列[ [1, 2, 3], [4, 5, 6], [7, 8, 9] ]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
- 呼叫
importer
函式來儲存內容。這個函式定義在ipfs-unixfs-importer
類庫中,這個類庫是 IPFS 用於處理檔案的佈局和分塊機制的 JavaScript 實現,具體如何儲存內容,如何分塊我們將在下篇文章中進行詳細分析。 - 呼叫
pull.asyncMap
方法,對已經儲存的檔案/內容進行預處理,生成使用者看到的內容。當程式執行到這裡時,我們要儲存的檔案或內容已經儲存在本地 IPFS 倉庫,已經可以用使用cat
、get
、ls
等命令來 API 來檢視我們儲存的內容或檔案了。asyncMap
方法是一個 through 流,類似於map
流,但是有更好的效能。它會對每一個陣列元素進行處理,這裡處理函式為prepareFile
。這個函式定義在同一個檔案中,它的處理具體如下:
- 使用已經生成檔案的
multihash
內容生成 CID 物件。
CID 構造方法會檢查傳入的引數,如果是 CID 物件,則直接從物件中取出版本號、編碼方式、多雜湊等屬性;如果是字串,則又分為是否被 multibase 編碼過,如果是則需要先解碼,然後再分離出各種屬性,如果沒有經過 multibase 編碼,那麼肯定是 base58 字串,則設定版本為0,編碼方式為let cid = new CID(file.multihash)
dag-pb
,再從 base58 串中獲取多雜湊值;如果是緩衝物件,則取得第一個位元組,並按十六進位制轉化成整數,如果第一個位元組是 0或1,則生成各自屬性,否則為多雜湊,則設定版本為0,編碼方式為dag-pb
。 - 如果使用者指定 CID 版本為 1,則生成 CID 物件到版本1.
if (opts.cidVersion === 1) { cid = cid.toV1() }
- 接下來,呼叫
waterfall
方法,順序處理它指定的函式。第一個函式,檢查配置選項是否指定了onlyHash
,即不實際地上傳檔案到IFS網路,僅僅計算一下這個檔案的 HASH,那麼直接呼叫第二個函式,否則,呼叫 IPFS 物件的object.get
方法來獲取指定檔案在倉庫中儲存的節點資訊。這個方法我們後面會專門講解,這裡略去不講。第二個函式,生成最終返回給使用者的物件,這個物件包括了:path、size、hash 等。上面程式碼如下,比較簡單,可自己閱讀。
waterfall([ (cb) => opts.onlyHash ? cb(null, file) : self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb), (node, cb) => { const b58Hash = cid.toBaseEncodedString()
let size = node.size
if (Buffer.isBuffer(node)) { size = node.length }
cb(null, { path: opts.wrapWithDirectory ? file.path.substring(WRAPPER.length) : (file.path || b58Hash), hash: b58Hash, size }) }
], callback)
- 使用已經生成檔案的
- 呼叫
pull.map
方法,把已經儲存到本地的檔案預載入到指定節點。map
是一個 through 流,它會對每一個陣列元素進行處理,這裡處理函式為preloadFile
。這個函式定義在同一個檔案中,這會把已經儲存的檔案預載入到指定的節點,具體儲存在哪些節點,可以參考《精通IPFS:系統啟動之概覽》篇中preload.addresses
,也可以手動指定。 - 呼叫
pull.asyncMap
方法,把已經儲存到本地的檔案長期儲存在本地,確保不被垃圾回收。asyncMap
方法是一個 through 流,這裡處理函式為pinFile
。pin 操作後面我們會詳細分析,這裡略過不提,讀者可以自行閱讀相關程式碼。
- 首先呼叫