精通IPFS:IPFS 儲存內容之中篇

買賣虛擬貨幣

在上一篇文章中,我們分析了儲存檔案/內容的整體流程,基本上知道在這個過程中檔案/內容是怎麼處理的,但是還流下了一個疑問,就是檔案是怎麼分片的,又是怎麼儲存到本地系統,這篇文章我們就來解決這幾個問題。透過上一篇文章,我們知道 ipfs-unixfs-importer 這個類庫,它實現了 IPFS 用於處理檔案的佈局和分塊機制,它的 index.js 檔案內容只有一行程式碼 require('./importer'),接下來我們直接來看這個 importer/index.js 是怎麼處理的。

  1. 把引數傳遞的選項和預設選項進行合併,生成新的選項,然後檢查選項的相關配置。
    const options = Object.assign({}, defaultOptions, _options)
    options.cidVersion = options.cidVersion || 0
    if (options.cidVersion > 0 && _options.rawLeaves === undefined) {
        options.rawLeaves = true
    }
    if (_options && _options.hash !== undefined && _options.rawLeaves === undefined) {
        options.rawLeaves = true
    }
    
    預設選項即 defaultOptions 內容如下:
    const defaultOptions = {
      chunker: 'fixed',
      rawLeaves: false,
      hashOnly: false,
      cidVersion: 0,
      hash: null,
      leafType: 'file',
      hashAlg: 'sha2-256'
    }
    
  2. 根據選項中指定的分割方式,從 IPFS 中提供的所有分割方法找到對應的分割物件。
    const Chunker = chunkers[options.chunker]
    
    chunkers 表示系統提供的所有分割方法物件,在父目錄下 chunker/index.js 檔案中定義的,預設有 fixedrabin 兩種方法,預設使用的是的前者,即固定大小。
  3. 生成一個 pull-through 的雙向流,雙向流的意思就是即可以從它讀取資料,又可以提供資料讓其它流讀取。
      const entry = {
        sink: writable(
          (nodes, callback) => {
            pending += nodes.length
            nodes.forEach((node) => entry.source.push(node))
            setImmediate(callback)
          },
          null,
          1,
          (err) => entry.source.end(err)
        ),
        source: pushable()
      }
    
    source 流是 pull-pushable 類庫提供的一個可以其它流主動 push 的 pull-stream 源流,它提供了一個 push 方法,當呼叫這個方法時,它開始呼叫回撥函式,從而把資料傳遞給後續的 through 或 sink。當時,它還提供了一個 end 方法,當資料讀取完成後,呼叫這個方法。

    sink 流是 pull-write 類庫提供的一個建立通用 pull-stream sinks 流的基礎類。它的簽名如下:(write, reduce, max, cb), 因為它是一個 sinks 流,所以它會讀取前一個流的資料,在讀取到資料之後就呼叫它的 write 方法儲存讀取到的資料,如果資料讀取完成就呼叫它的 cb 方法。

    在這裡 sink 函式從前一個流中讀取資料,然後放入 source 中。同時,source 成為下一個流的讀取函式。

  4. 生成一個 dagStream 物件,這個物件也是一個 {source,sink} 物件。
    const dagStream = DAGBuilder(Chunker, ipld, options)
    
    DAGBuilder 函式定義於父目錄下的 builder/index.js 中,接下來我們看下這個執行過程:
    • 合併選項引數和預設選項
      const options = Object.assign({}, defaultOptions, _options)
      
      預設選項如下:
      const defaultOptions = {
        strategy: 'balanced',
        highWaterMark: 100,
        reduceSingleLeafToSelf: true
      }
      
    • 根據選項指定的 reduce 策略,從系統提供的多個策略中選擇指定的策略。
      const strategyName = options.strategy
        const reducer = reducers[strategyName]
      
      系統定義的的策略如下:
      const reducers = {
        flat: require('./flat'),
        balanced: require('./balanced'),
        trickle: require('./trickle')
      }
      
      在使用者不指定具體策略的預設情況下,根據前面執行過程,最終選定的策略為 balanced
    • 呼叫 Builder 方法建立最終的策略物件。
      const createStrategy = Builder(Chunker, ipld, reducer, options)
      
      Builder 方法位於 builder.js 檔案中,它會建立一個 pull-stream 的 through 流物件。在看它的內部之前,我們首先看下的 4個引數。
      • 第 1 個引數 Chunker/createChunker,它表示具體分割內容的策略,預設情況下為 fixed,詳見第一步中的 defaultOptions 變數內容;
      • 第 2 個引數 ipld/ipld,這個是 IPFS 物件的 _ipld 屬性,在 IPFS 物件建立時生成的,表示星際接續的資料,目前它可以連線比特幣、以太坊、git、zcash 等,在 IPFS 體系中具有非常重要的位置;
      • 第 3 個引數 reducer/createReducer 是具體的 reduce 策略,預設情況為 balanced,詳見第四步中生成 reducer 變數的過程。
      • 第 4 個引數 options/_options 為選項。
      看完引數,接下來,我們看下它的執行邏輯。
      • 合併指定的選項和自身預設的選項。
        const options = extend({}, defaultOptions, _options)
        
        預設選項如下:
        const defaultOptions = {
          chunkerOptions: {
            maxChunkSize: 262144,
            avgChunkSize: 262144
          },
          rawLeaves: false,
          hashAlg: 'sha2-256',
          leafType: 'file',
          cidVersion: 0,
          progress: () => {}
        }
        
      • 返回一個函式物件。
        return function (source) {
            return function (items, cb) {
              parallel(items.map((item) => (cb) => {
                if (!item.content) {
                  return createAndStoreDir(item, (err, node) => {
                    if (err) {
                      return cb(err)
                    }
                    if (node) {
                      source.push(node)
                    }
                    cb()
                  })
                }
                createAndStoreFile(item, (err, node) => {
                  if (err) {
                    return cb(err)
                  }
                  if (node) {
                    source.push(node)
                  }
                  cb()
                })
              }), cb)
            }
        }
        
      返回的這個函式,最終成為了一個 sink 流的 write 方法。
    • 呼叫 createBuildStream 方法,生成一個雙向流物件。
      createBuildStream(createStrategy, ipld, options)
      
      createBuildStream 方法位於 create-build-stream.js 檔案中,程式碼如下:
      const source = pullPushable()
      const sink = pullWrite(
          createStrategy(source),
          null,
          options.highWaterMark,
          (err) => source.end(err)
      )
      return {
      source: source,
      sink: sink
      }
      
      在這段程式碼中,source 流是 pull-pushable 類庫提供的一個可以主動 push 到其它流的 pull-stream 源流,這個類庫在前面我們已經分析過,這裡就直接略過。

      sink 流是 pull-write 類庫提供的一個建立通用 pull-stream sinks 流的基礎類,這個類庫也在前面分析過,這裡也不細講,我們只看下它的 write 方法,這裡的 createStrategy 函式正是呼叫 Builder 方法返回的 createStrategy 函式,用 source 作為引數,呼叫它,用返回的第二層匿名函式作為 write 方法。

  5. 生成一個樹構建器流物件,並返回其雙向流物件。
    const treeBuilder = createTreeBuilder(ipld, options)
    const treeBuilderStream = treeBuilder.stream()
    
    createTreeBuilder 函式位於 tree-builder.js 檔案中,我們來看它的執行邏輯。
    • 首先,合併預設選項物件和指定的選項物件。
      const options = Object.assign({}, defaultOptions, _options)
      
      預設選擇物件如下:
      const defaultOptions = {
        wrap: false,
        shardSplitThreshold: 1000,
        onlyHash: false
      }
      
      onlyHash 表示是否不儲存檔案/內容,只計算其雜湊。
    • 建立一個佇列物件。
      const queue = createQueue(consumeQueue, 1)
      
    • 建立一個雙向流物件
      let stream = createStream()
      
      其中 sink 物件是一個 pull-write 類庫提供的流,這個已經見過多次了,它的 write 方法後面遇到時再來看,source 是一個 pull-pushable 類庫提供的流,這個也見過多次。
    • 建立一個 DirFlat 物件。
      let tree = DirFlat({
          path: '',
          root: true,
          dir: true,
          dirty: false,
          flat: true
      }, options) 
      
    • 返回特權函式構成的物件。
      return {
          flush: flushRoot,
          stream: getStream
      }
      
  6. 建立一個暫停流。這裡什麼也不做。
  7. 呼叫 pull 方法,建立一個完整的流來儲存檔案內容。
    pull(
        entry,
        pausable,
        dagStream,
        map((node) => {
          pending--
          if (!pending) {
            process.nextTick(() => {
              while (waitingPending.length) {
                waitingPending.shift()()
              }
            })
          }
          return node
        }),
        treeBuilderStream
    )
    
    pull 函式是 pull-stream 是類庫中的核心函式,在它的執行過程中,最後的 sink 流透過依次呼叫前面的 through 流,最終從最前面的 source 流中拉取資料,除了最前面的 Source 流和最後面的 Sink 流,中間的都是 through 流,它們即可以被後面的流呼叫以提供資料,也可以呼叫前面的流來讀取資料。

    當 pull 函式在呼叫某個引數從前面讀取資料時,如果當前引數是一個物件(即雙向流)時,那麼就會呼叫它的 sink 方法來讀取,同時用它的 source 方法作為後面引數的讀取方法。

    下面我們分析這段程式碼中的幾個流,它們太重要了。

    首先是 entry 流,它是一個雙向流,它的 sink 函式(型別為 pull-write 流)呼叫前一個流的 read 方法來讀取資料,並把讀取到的資料放在 source 中(型別為 pull-pushable )。

    然後是 dagStream 流,它也是一個雙向流,它的 sink 函式(型別為 pull-write 流)呼叫 entry 流的 source 方法來讀取資料。sink 函式的非同步寫函式引數為 builder.js 中返回的第二層函式,當讀取到資料之後,呼叫 builder.js 中返回的第二層函式進行處理,在第二層函式中,大致流程是把資料儲存自身的 source 中(型別為 pull-pushable )。

    dagStream 在 create-build-stream.js 中生成。為了方便理解,這裡我們再次看下它的程式碼。

    const source = pullPushable()

    const sink = pullWrite( createStrategy(source), null, options.highWaterMark, (err) => source.end(err) )

    return { source: source, sink: sink }

    最後是 treeBuilderStream 流,它也是一個雙向流,它的 sink 函式(型別為 pull-write 流)呼叫 dagStream 流的 source 方法來讀取資料,並把讀取到的資料放在 source 中(型別為 pull-pushable )。

    其他兩個流對流程沒有任何影響,讀者可以自行分析,這裡略過不提。

    在這一步中,透過 pull 函式把最重要的幾個流連線到一起,並透過下面最後一步,把它們與外部的流聯絡到一起。

  8. 最後,返回雙向流物件。
    {
        sink: entry.sink,
        source: treeBuilderStream.source,
        flush: flush
    }
    

到這裡,檔案已經儲存完成了。

啥?檔案已經儲存完成了?什麼都沒看到就把檔案儲存完了,不會騙我們的吧?哈,因為儲存檔案這個動作太複雜了,所以上面只是靜態的從程式碼層面進行梳理,下面我們從頭到尾從動態處理的過程來看下檔案到底是怎麼儲存在本地的。

一切要從我們在上篇寫的這個示例說起

const {createNode} = require('ipfs')

const node = createNode({   libp2p:{     config:{       dht:{         enabled:true       }     }   } })

node.on('ready', async () => {

    const content = `我愛黑螢`;

    const filesAdded = await node.add({       content: Buffer.from(content)     },{       chunkerOptions:{         maxChunkSize:1000,         avgChunkSize:1000       }     })

    console.log('Added file:', filesAdded[0].path, filesAdded[0].hash) })

上面這段程式碼,最終執行的是 core/components/files-regular/add-pull-stream.js 檔案中的函式,它的主體就是下面的這段程式碼:

pull(
    pull.map(content => normalizeContent(content, opts)),
    pull.flatten(),
    importer(self._ipld, opts), 
    pull.asyncMap((file, cb) => prepareFile(file, self, opts, cb)),
    pull.map(file => preloadFile(file, self, opts)),
    pull.asyncMap((file, cb) => pinFile(file, self, opts, cb))
)
為了便於分析理解,我們在分析過程中仍然使用推的方式,從源流推到目的流中,注意這個僅是為了理解方便,真實的過程是目的流從源流中拉取資料。

下面程式碼簡單解釋如下:

  1. 首先,呼叫第一個 pull.map 流,對收到的檔案或內容並進行一些必要的轉換,
  2. 呼叫 pull.flatten 流,把前一步生成的陣列進行扁平化處理。
  3. 呼叫 importer 流來儲存內容。
  4. 呼叫 pull.asyncMap 方法,對已經儲存的檔案/內容進行預處理,生成使用者看到的內容。
  5. 呼叫 pull.map 方法,把已經儲存到本地的檔案預載入到指定節點。
  6. 呼叫 pull.asyncMap 方法,把已經儲存到本地的檔案長期儲存在本地,確保不被垃圾回收。

下面我們重點看下檔案內容在 importer 流中的處理邏輯。

  1. 呼叫 entry.sink 函式從前面的 pull.flatten 流中讀取儲存的每一個檔案/內容。
  2. 呼叫 dagStream.sink 函式從前面的流中讀取資料,並在讀取到資料之後,呼叫 builder.js 中定義的第二層匿名函式進行處理。在這個函式中,呼叫非同步流程庫 async 的 parallel 方法對收到的每個要處理的檔案內容進行處理,具體處理如下:如果儲存的是目錄,那麼呼叫 createAndStoreDir 方法,建立並儲存目錄;如果儲存的是檔案,那麼呼叫 createAndStoreFile 方法,建立並儲存主檔案。因為我們儲存的是檔案,所以在這裡詳細看下 createAndStoreFile 方法,它的過程如下:
    • 如果儲存的內容是 Buffer,那麼呼叫 pull-stream 的 values 方法,生成內容源流。
      if (Buffer.isBuffer(file.content)) {
        file.content = values([file.content])
      }
      
    • 呼叫 createReducer 方法,建立 reducer 物件,預設為 balanced,所以這裡建立的 reducer 物件型別為 balanced/balanced-reducer.js 檔案中定義的函式。
      const reducer = createReducer(reduce(file, ipld, options), options)
      
    • 呼叫 createChunker 方法,建立 chunker 物件,預設為 fixed,所以這裡建立的 chunker 物件型別為 chunker/fixed-size.js 主檔案中定義的函式。
      chunker = createChunker(options.chunkerOptions)
      
    • 呼叫 pull 函式進行儲存檔案。
      • 設定源流為 file.content
      • 呼叫 chunker 流,對儲存的內容進行分塊。
      • 呼叫 paraMap 流(型別為 pull-paramap),對每一個分塊進行處理。
      • 呼叫 pullThrough 流(型別為 pull-through 流),對收到的每個資料進行處理。
      • 呼叫 reducer 流,把所有生成的分塊進行 reduce 處理。如果檔案進行了多次分塊,這裡就會根據生成的分塊生成一個父塊。
      • 呼叫 collect 流,呼叫回撥函式即 createAndStoreFile ,把儲存檔案的結果傳遞到外部函式中。
      這個 pull 函式會進行 IPFS 特有業務,涉及到 IPFS 儲存檔案核心邏輯,這塊我們留在下一篇文章中進行分析。
  3. 呼叫 treeBuilderStream.sink 函式從前面的流中讀取資料,在這裡即為儲存檔案的結果,並在讀取到儲存檔案結果之後,把結果儲存在 source 中。當把儲存檔案的結果儲存到 source 中之後,core/components/files-regular/add-pull-stream.js 檔案中定義的 pull.asyncMap 就可以得到這個結果了。

免責聲明:

  1. 本文版權歸原作者所有,僅代表作者本人觀點,不代表鏈報觀點或立場。
  2. 如發現文章、圖片等侵權行爲,侵權責任將由作者本人承擔。
  3. 鏈報僅提供相關項目信息,不構成任何投資建議

推荐阅读

;