精通IPFS:IPFS 儲存內容之上篇

買賣虛擬貨幣

經過前面的分析,我們已經明白了 IPFS 啟動過程,從今天起,我會分析一些常見的命令或動作,希望大家喜歡。

在開始真正分析這些命令/動作之前,先要對 pull-stream 類庫進行簡單介紹,如果不熟悉這個類庫,接下來就沒辦法進行。

pull-stream 是一個新型的流庫,資料被從源中拉取到目的中,它有兩種基本型別的流:Source 源和 Sink 接收器。除此之外,有兩種複合型別的流:Through 通道流(比如轉換)和 Duplex 雙向流。

source 流,這類流返回一個匿名函式,這個匿名函式被稱為 read 函式,它被後續的 sink 流函式或 through 流函式呼叫,從而讀取 source 流中的內容。

sink 流,這類流最終都返回內部 drain.js 中的 sink 函式。這類流主要是讀取資料,並且對每一個讀取到的資料進行處理,如果流已經結束,則呼叫使用者指定結束函式進行處理。

through 流,這類流的函式會返回巢狀的匿名函式,第一層函式接收一個 source 流的 read 函式或其他 through 函式返回的第一層函式為引數,第二層函式接收最終 sink 提供的寫函式或其他 through 返回的第二層函式,第二層函式內部呼叫 read 函式,從而直接或間接從 source 中取得資料,獲取資料後直接或間接呼叫 sink 函式,從而把資料寫入到目的地址。

在 pull-streams 中,資料在流動之前,必須有一個完整的管道,這意味著一個源、零個或多個通道、一個接收器。但是仍然可以建立一個部分化的管道,這非常有用。也就是說可以建立一個完整的管道,比如 pull(source, sink) => undefined,也可以部分化的管道,比如 pull(through, sink) => sink,或者 pull(through2, through2) => through,我們在下面會大量遇到這種部分化的管道。

今天,我們看下第一個最常用的 add 命令/動作,我們使用 IPFS 就是為了把檔案儲存到 IPFS,自然少不了儲存操作,add 命令就是幹這個的,閒話少數,我們來看一段程式碼。

const {createNode} = require('ipfs')

const node = createNode({   libp2p:{     config:{       dht:{         enabled:true       }     }   } })

node.on('ready', async () => {

    const content = `我愛黑螢`;

    const filesAdded = await node.add({       content: Buffer.from(content)     },{       chunkerOptions:{         maxChunkSize:1000,         avgChunkSize:1000       }     })

    console.log('Added file:', filesAdded[0].path, filesAdded[0].hash) })

這次我們沒有完全使用預設配置,開啟了 DHT,看過我文章的讀者都知道 DHT 是什麼東東,這裡不詳細解釋。在程式中,透過呼叫 IPFS 節點的 add 方法來上傳內容,內容可以是檔案,也可以是直接的內容,兩者有稍微的區別,在講到相關程式碼時,我們指出這種區別的,這裡我們為了簡單直接上傳內容為例來說明。

add 方法位於 core/components/files-regular/add.js 檔案中,在 《精通IPFS:系統啟動之概覽》 那篇文章中,我們說過,系統會把 core/components/files-regular 目錄下的所有檔案擴充套件到 IPFS 物件上面,這其中自然包括這裡的 add.js 檔案。下面,我們直接看這個函式的執行流程。

這個函式返回了一個內部定義的函式,在這個內部定義的函式中對引數做了一些處理,然後就呼叫內部的 add函式,後者才是主體,它的邏輯如下:

  1. 首先,檢查選項物件是否為函式,如果是,則重新生成相關的變數。
    if (typeof options === 'function') {
      callback = options
      options = {}
    }
    
  2. 定義檢測內容的工具函式來檢測我們要上傳的內容。
    const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj)
    const isContentObject = obj => {
      if (typeof obj !== 'object') return false
      if (obj.content) return isBufferOrStream(obj.content)
      return Boolean(obj.path) && typeof obj.path === 'string'
    }

    const isInput = obj => isBufferOrStream(obj) || isContentObject(obj) const ok = isInput(data) || (Array.isArray(data) && data.every(isInput))

    if (!ok) {   return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects')) }

  3. 接下來,執行 pull-stream 類庫提供的 pull 函式。我們來看 pull 函式的主要內容。它的第一個引數是 pull.values 函式執行的結果,這個 values 函式就是一個 source 流,它返回一個稱為 read 的函式來讀取我們提供的資料。這個 read 函式從陣列中讀取當前索引位置的值,以此值為引數,呼叫它之後的 through 函式第二層函式內部定義的回撥函式或最終的 sink 函式內部定義的回撥函式。如果陣列已經讀取完成,則直接以 true 為引數進行呼叫。

    第二個引數是 IPFS 物件的 addPullStream 方法,這個方法也是在啟動時候使用同樣的方法擴充套件到 IPFS 物件,它的主體是當前目錄的 add-pull-stream.js 檔案中的函式。接下來,我們會詳細看這個函式,現在我們只需要知道這個函式返回了一個部分化的管道。

    第三個引數是 pull-sort 中定義的函式,這是一個依賴於 pull-stream 的庫,根據一定規則來排序,這個函式我們不用管。

    最後一個引數是 pull.collect 函式執行的結果,這個 collect 函式就是一個 sink 流。它把最終的結果放入一個陣列中,然後呼叫回撥函式。我們在前面程式碼中看到的 filesAdded 之所以是一個陣列就是拜這個函式所賜。

    上面邏輯的程式碼如下:

    pull(
      pull.values([data]),
      self.addPullStream(options),
      sort((a, b) => {
        if (a.path < b.path) return 1
        if (a.path > b.path) return -1
        return 0
      }),
      pull.collect(callback)
    )
    
    在上面的程式碼中,我們把要儲存的內容構成一個陣列,具體原因下面解釋。

現在,我們來看 addPullStream 方法,這個方法是儲存內容的主體,add 方法是隻開胃小菜。addPullStream方法執行邏輯如下:

  1. 呼叫 parseChunkerString 函式,處理內容分塊相關的選項。這個函式位於相同目錄下的 utils.js 檔案中,它檢查使用者指定的分塊演算法。如果使用者沒有指定,則使用固定分塊演算法,大小為系統預設的 262144;如果指定了大小,則使用固定分塊演算法,但大小為使用者指定大小;如果指定為 rabin 類分割法,即變長分割法,則呼叫內部函式來生成對應的分割選項。上面邏輯程式碼如下:
    parseChunkerString = (chunker) => {
      if (!chunker) {
        return {
          chunker: 'fixed'
        }
      } else if (chunker.startsWith('size-')) {
        const sizeStr = chunker.split('-')[1]
        const size = parseInt(sizeStr)
        if (isNaN(size)) {
          throw new Error('Chunker parameter size must be an integer')
        }
        return {
          chunker: 'fixed',
          chunkerOptions: {
            maxChunkSize: size
          }
        }
      } else if (chunker.startsWith('rabin')) {
        return {
          chunker: 'rabin',
          chunkerOptions: parseRabinString(chunker)
        }
      } else {
        throw new Error(Unrecognized chunker option: ${chunker})   } } 
    注意:我們也可以透過重寫這個函式來增加自己的分割演算法。
  2. 合併整理選項變數。
    const opts = Object.assign({}, {
      shardSplitThreshold: self._options.EXPERIMENTAL.sharding
        ? 1000
        : Infinity
    }, options, chunkerOptions)
    
  3. 設定預設的 CID 版本號。如果指定了 Hash 演算法,但是 CID 版本又不是 1,則強制設為 1。CID 是分散式系統的自描述內容定址識別符號,目前有兩個版本 0 和 1,版本 0 是一個向後相容的版本,只支援 sha256 雜湊演算法,並且不能指定。
    if (opts.hashAlg && opts.cidVersion !== 1) {
      opts.cidVersion = 1
    }
    
  4. 設定進度處理函式,預設空實現。
    const prog = opts.progress || noop
    const progress = (bytes) => {
      total += bytes
      prog(total)
    }

    opts.progress = progress

  5. 用 pull 函式返回一個部分化的 pull-stream 流。這個部分化的 pull-stream 流是處理檔案/內容儲存的關鍵,我們仔細研究下。
    1. 首先呼叫 pull.map 方法對儲存的內容進行處理。pull.map 方法是 pull-stream 流中的一個 source 流,它對陣列中的每個元素使用指定的處理函式進行處理。這就是我們在 add 函式中把需要儲存的內容轉化為陣列的原因。在這裡,對每個陣列元素進行處理的函式是 normalizeContent。這個函式定義在同一個檔案中,它首先檢查儲存的內容是否為陣列,如果不是則轉化為陣列;然後,對陣列中的每一個元素進行處理,具體如下:
      • 如果儲存的內容是 Buffer 物件,則把要儲存的內容轉化為路徑為空字串,內容為 pull-stream 流的物件。
        if (Buffer.isBuffer(data)) {
          data = { path: '', content: pull.values([data]) }
        }
        
      • 如果儲存的內容是一個 Node.js 可讀流,比如檔案,則把要儲存的轉化為路徑為空字串,內容使用 stream-to-pull-stream 類的 source 方法庫把 Node.js 可讀流轉化為 pull-stream 的 source 流物件。
        if (isStream.readable(data)) {
          data = { path: '', content: toPull.source(data) }
        }
        
      • 如果儲存的內容是 pull-stream 的 source 流,則把要儲存的內容轉化為路徑為空字串,內容不變的物件。
        if (isSource(data)) {
          data = { path: '', content: data }
        }
        
      • 如果要儲存的內容是一個物件,並且 content 屬性存在,且不是函式,則進行如下處理:
        if (data && data.content && typeof data.content !== 'function') {
          if (Buffer.isBuffer(data.content)) {
            data.content = pull.values([data.content])
          }

          if (isStream.readable(data.content)) {     data.content = toPull.source(data.content)   } }

      • 如果指定的是路徑,則進行下面的處理。
        if (opts.wrapWithDirectory && !data.path) {
          throw new Error('Must provide a path when wrapping with a directory')
        }

        if (opts.wrapWithDirectory) {   data.path = WRAPPER + data.path }

      • 返回最終生成的要儲存的內容。
    2. 呼叫 pull.flatten() 方法,把前上步生成的陣列進行扁平化處理。flatten 方法是一個 through 流,主要是把多個流或陣列流轉換為一個流,比如把多個陣列轉換成一個陣列,比如:
      [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
      ]
      
      這樣的陣列使用這個方法處理後,最終會變成下面的陣列
      [1, 2, 3, 4, 5, 6, 7, 8, 9]
      
    3. 呼叫 importer 函式來儲存內容。這個函式定義在 ipfs-unixfs-importer 類庫中,這個類庫是 IPFS 用於處理檔案的佈局和分塊機制的 JavaScript 實現,具體如何儲存內容,如何分塊我們將在下篇文章中進行詳細分析。
    4. 呼叫 pull.asyncMap 方法,對已經儲存的檔案/內容進行預處理,生成使用者看到的內容。當程式執行到這裡時,我們要儲存的檔案或內容已經儲存在本地 IPFS 倉庫,已經可以用使用 catgetls等命令來 API 來檢視我們儲存的內容或檔案了。asyncMap 方法是一個 through 流,類似於 map流,但是有更好的效能。它會對每一個陣列元素進行處理,這裡處理函式為 prepareFile

      這個函式定義在同一個檔案中,它的處理具體如下:

      • 使用已經生成檔案的 multihash 內容生成 CID 物件。
        let cid = new CID(file.multihash)
        
        CID 構造方法會檢查傳入的引數,如果是 CID 物件,則直接從物件中取出版本號、編碼方式、多雜湊等屬性;如果是字串,則又分為是否被 multibase 編碼過,如果是則需要先解碼,然後再分離出各種屬性,如果沒有經過 multibase 編碼,那麼肯定是 base58 字串,則設定版本為0,編碼方式為 dag-pb,再從 base58 串中獲取多雜湊值;如果是緩衝物件,則取得第一個位元組,並按十六進位制轉化成整數,如果第一個位元組是 0或1,則生成各自屬性,否則為多雜湊,則設定版本為0,編碼方式為 dag-pb
      • 如果使用者指定 CID 版本為 1,則生成 CID 物件到版本1.
        if (opts.cidVersion === 1) {
            cid = cid.toV1()
        }
        
      • 接下來,呼叫 waterfall 方法,順序處理它指定的函式。第一個函式,檢查配置選項是否指定了 onlyHash,即不實際地上傳檔案到IFS網路,僅僅計算一下這個檔案的 HASH,那麼直接呼叫第二個函式,否則,呼叫 IPFS 物件的 object.get 方法來獲取指定檔案在倉庫中儲存的節點資訊。這個方法我們後面會專門講解,這裡略去不講。第二個函式,生成最終返回給使用者的物件,這個物件包括了:path、size、hash 等。

        上面程式碼如下,比較簡單,可自己閱讀。

          waterfall([
            (cb) => opts.onlyHash
              ? cb(null, file)
              : self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb),
            (node, cb) => {
              const b58Hash = cid.toBaseEncodedString()
          let size = node.size

          if (Buffer.isBuffer(node)) {     size = node.length   }

          cb(null, {     path: opts.wrapWithDirectory       ? file.path.substring(WRAPPER.length)       : (file.path || b58Hash),     hash: b58Hash,     size   }) }

          ], callback)
        
    5. 呼叫 pull.map 方法,把已經儲存到本地的檔案預載入到指定節點。map 是一個 through 流,它會對每一個陣列元素進行處理,這裡處理函式為 preloadFile。這個函式定義在同一個檔案中,這會把已經儲存的檔案預載入到指定的節點,具體儲存在哪些節點,可以參考《精通IPFS:系統啟動之概覽》篇中 preload.addresses,也可以手動指定。
    6. 呼叫 pull.asyncMap 方法,把已經儲存到本地的檔案長期儲存在本地,確保不被垃圾回收。asyncMap 方法是一個 through 流,這裡處理函式為 pinFile。pin 操作後面我們會詳細分析,這裡略過不提,讀者可以自行閱讀相關程式碼。

免責聲明:

  1. 本文版權歸原作者所有,僅代表作者本人觀點,不代表鏈報觀點或立場。
  2. 如發現文章、圖片等侵權行爲,侵權責任將由作者本人承擔。
  3. 鏈報僅提供相關項目信息,不構成任何投資建議

推荐阅读

;