const node = createNode() node.on('ready', async () => {
const file = await node.get('QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg') fs.writeFile('cat.jpg',file[0].content,(err) => { if (err) throw err;
console.log('cat saved!');
});
})
欣赏完喵星人,接下来就看代码是如何执行的。上面代码的主体是 options = options || {} pull(
self.getPullStream(ipfsPath, options),
pull.asyncMap((file, cb) => {
if (file.content) {
pull(
file.content,
pull.collect((err, buffers) => {
if (err) { return cb(err) }
file.content = Buffer.concat(buffers)
cb(null, file)
})
)
} else {
cb(null, file)
}
}),
pull.collect(callback)
)
}
QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ
,下载它的示例代码如下:
运行这段代码,就会在当前目录下生成喵星人,让我们先来欣赏下美丽的喵星人吧!const {createNode} = require('ipfs')
const fs = require('fs');
get
方法,这个方法位于 IPFS 的 core/components/files-regular/get.js
文件,它的内容如下:
这个匿名函数接收我们传递给它的图片路径,通过 pull-stream 类库的 (ipfsPath, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
pull
函数,调用 IPFS 对象的 getPullStream
方法 获取文件,并在获取到文件之后,调用 pull.asyncMap
流对获取到的文件进行处理,最后把最终的文件传递给 pull.collect
流进行处理,后者直接调用我们提供的回调函数把最终的文件交给用户来处理。
通过上面的简单分析,我们可以发现从 IPFS 网络中获取文件主要是通过 IPFS 对象的 getPullStream
方法,而这个方法是在创建 IPFS 对象的过程中在 core/index.js
文件中被注册为 IPFS 对象的一个方法,它的主要内容就是返回一个 pull-stream 类库的流,代码如下:
pull(
exporter(ipfsPath, self._ipld, options),
pull.map(file => {
file.hash = file.cid.toString()
delete file.cid
return file
})
)
上面的代码定义在 core/components/files-regular/get-pull-stream.js
文件,主体也是 pull-stream 类库的 pull
函数,第一个参数是调用 ipfs-unixfs-exporter
类库函数返回的内部流,这个内部流会从 IPFS 网络中获取我们想要的文件,第二个参数是 pull-stream 类库的 map
流,它根据获取到的文件对象的 CID 生成文件的哈希。接下来,我们开始看下 exporter
函数,它的代码位于 ipfs-unixfs-exporter 类库的 index.js
文件中。它的执行逻辑如下:
- 对传递进来的路径进行处理,返回一个对象。返回的对象中包含了基本路径和中间路径。
这里我们传递进来的路径是let dPath try { dPath = pathBaseAndRest(path) } catch (err) { return error(err) }
QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg
,所以dPath
对象的基本路径就是QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ
,中间路径的数组为空。 - 获取不包括最终名称在内的路径长度
const pathLengthToCut = join([dPath.base].concat(dPath.rest.slice(0, dPath.rest.length - 1))).length
- 根据基本路径生成 CID 对象。
CID 对象包括四个部分:multibase、版本号、multicodec、multihash。当我们传递const cid = new CID(dPath.base)
QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ
到 CID 构造函数中时,函数内部会设置版本号为 0,multicodec 为dag-pb
,同时调用多重哈希函数把 Base58 字符串转化为 multihash。 - 最后,使用
pull
函数返回 pull-stream 类库中使用的流。
返回的这个流被外面return pull( values([{ cid, name: dPath.base, path: dPath.base, pathRest: dPath.rest, depth: 0 }]), createResolver(dag, options), filter(Boolean), map((node) => { return { depth: node.depth, name: node.name, path: options.fullPath ? node.path : finalPathFor(node), size: node.size, cid: node.cid, content: node.content, type: node.type } }) )
get-pull-stream.js
文件中的pull
函数所调用,从而从 IPFS 网络中获取指定的文件。在上面的代码简单解释如下:
values
函数是 pull-stream 类库中定义的source
流,它会创建一个从数组或对象读取值,然后终止的源流,这里我们用传递进来的路径及其生成 CID 生成一个对象,并生成一个数组来做为流的内容。createResolver
函数是当前目录下resolve.js
文件中的createResolver
函数,它接收 dag 对象,并返回一个 pull-stream 流对象,这里 dag 对象是 IPFS 对象中的_ipld
对象。这里返回的流对象,从前面一个流中读取要获取的文件,然后调用 dag 对象的get
方法来获取指定的文件,具体处理下面进行分析。map
函数是 pull-stream 类库中定义的through
流,它使用用户指定的转换函数对数组中的每个元素进行转换。这里的处理函数比较简单,根据前面流返回的对象,生成并返回另一个对象。这里返回的对象,就是我们最终在示例程序中看到的对象,除了它没有hash
属性,并且cid
被删除。
createResolver
这个函数,它定义于 ipfs-unixfs-exporter 类库的 resolve.js
文件中,它的主体是生成并返回一个 pull-stream 类库中使用 pull
函数,代码具体如下:
pull(
paramap((item, cb) => {
if ((typeof item.depth) !== 'number') {
return error(new Error('no depth'))
} if (item.object) {
return cb(null, resolveItem(null, item.object, item, options))
}
waterfall([
function (done) {
dag.get(item.cid, done)
},
function (node, done) {
// node 为区块对象反序列化后的结果,可能为文件总的 Dag,也可能为某个文件(在不分块的情况下)。
done(null, resolveItem(item.cid, node.value, item, options))
}
], cb)
}),
flatten(),
filter(Boolean),
filter((node) => node.depth <= options.maxDepth)
)
上面代码简单解释如下:
- 首先,调用
paramap
函数,返回 pull-paramap 流,在这个流中使用异步类库的waterfall
方法,依次调用 IPLD 对象的get
方法,从本地或其他节点获取区块对象;在得到区块对象之后,调用resolveItem
方法,处理得到的区块对象(这里得到的区块对象,可能为一个完整的文件,也可能是文件的一个碎片,还可能是一个目录等)。pull-paramap 流是一个 pull-stream 流,它接收 3个参数,第一个参数类型为函数,函数签名为
这里(data, cb)
,在函数中执行用户自定义的业务逻辑,第二个和第三参数都是可选的。pull-paramap 并行地从前面的流中读取数据,调用第一个参数指定函数进行处理,把函数调用结果以数组的形式返回给后面的流。数组中结果的顺序与前面的流提供的源数据顺序保持一致。
paramap
函数的异步处理函数内容如下:- 检查当前对象的
depth
属性是否不是数字,如果不是数字,则返回错误。这里当前对象即为前面values
流中生成并返回的对象。 - 如果当前对象的
object
属性存在,则调用resolveItem
解释当前对象,并把结果传递给下一个函数。我们的对象没有object
属性,所以这里的代码不会执行。 - 调用异步类库的
waterfall
方法,依次调用 IPLD 对象的get
方法,从本地或其他节点获取区块对象;在得到区块对象之后,调用resolveItem
方法,处理得到的区块对象。
- 检查当前对象的
- 然后,调用 pull-stream 类库的
flatten
和filter
两个 through 流进返回的区块对象进行处理。 - 最后,调用 pull-stream 类库的
filter
流,过滤超过指定深度区块对象。
1、获取区块对象
上面的代码,我们是通过调用 dag 对象的get
方法来获取区块对象,它的执行逻辑如下:
dag 对象的类型是 ipld 类库中的 IPLDResolver
对象,在 IPFS 对象初始化时生成并设置在 IPFS 对象上面。
- 如果路径参数类型为函数,则重新设置参数。根据上面调用,我们这里的路径参数是
waterfall
提供的内部函数done
,所以会执行下面的代码重新设置下面两个变量。if (typeof path === 'function') { callback = path path = undefined }
- 如果选项参数为函数,则重新设置参数。根据上面的调用,这里没有选项参数,所以不会执行下面的代码。
if (typeof options === 'function') { callback = options options = {} }
- 处理路径参数
if (typeof path === 'string') { path = joinPath('/', path) .substr(1) .split(osPathSep) .join('/') }
- 如果路径参数为空串或没有定义,则调用内部函数
_get
进行处理,并在它的异步回调函数中返回其结果。内部函数_get
内部通过waterfall
函数来处理,具体代码如下:
在waterfall([ (cb) => this._getFormat(cid.codec, cb), (format, cb) => this.bs.get(cid, (err, block) => { if (err) return cb(err) cb(null, format, block) }), (format, block, cb) => { format.util.deserialize(block.data, (err, deserialized) => { if (err) { return cb(err) } cb(null, deserialized) }) } ], callback)
waterfall
函数内部,首先调用_getFormat
方法,根据 CID 对象来获取其所对用的格式化对象;然后调用区块服务对象的get
方法来获得区块对象;最后,使用格式化对象的工具对象的反序列化方法,反序列化区块服务对象获得的区块数据。区块服务对象位于 ipfs-block-service 类库的
index.js
文件中,它的get
方法,根据是否有 bitswap 对象决定是从 bitswap 对象获取区块对象,还是从本地仓库中获取。它的代码如下:
当系统启动过程中,在处理get (cid, callback) { if (this.hasExchange()) { this._bitswap.get(cid, callback) } else { this._repo.blocks.get(cid, callback) } }
init-docs
目录内的帮助文档时,bitswap 对象才有空,即只有这个过程才会直接从本地仓库保存/获取区块对象,其他情况都是调用 bitswap 对象的get
方法来获取区块对象。bitswap 对象的
get
方法委托自身的getMany
方法进行处理。后者的处理过程如下:- 初始化内部所用的变量:
wantList
数组为空,promptedNetwork
为假,pendingStart
为请求的所有 CID 数量。 - 生成一个从其他节点获取区块对象的函数对象
getFromOutside
。 - 调用异步类库的
map
函数,遍历每一个要请求的区块对象。针对每一个要请求的区块,使用异步类库的waterfall
函数进行处理。waterfall
函数处理如下:- 调用区块存储对象的
has
方法,检查本地是否有请求的区块; - 如果本地有请求的区块,则:如果已经处理完所有请求的 CID,那么调用 WantManager 的
wantBlocks
方法获取需要的区块;调用区块存储对象的get
方法从本地加载区块并返回。 - 如果内部变量
promptedNetwork
为假,则:设置这个变量为真(保证只有一个请求可以马上处理);调用网络对象的findAndConnect
方法,查找第一个请求的 CID。 - 调用函数对象
getFromOutside
进行处理。这个函数把指定的 CID 放入wantList
数组中,然后调用notifications
对象的wantBlock
方法通知系统,我们想要这个区块;如果已经处理完所有请求的 CID,那么调用 WantManager 的wantBlocks
方法获取需要的区块。
- 调用区块存储对象的
notifications
对象是一个内部模块,用来跟踪收到的区块、想要的区块、不想要的区块等。这个函数的第一个参数就是请求的区块 CID,第二个参数是一个函数,用来在收到一个区块后,从想要列表中取消这个区块,避免再次请求别的节点,第三个参数用来取消请求某个区块。 - 初始化内部所用的变量:
- 当路径参数不为空串,并且有值时,调用异步类库的
doUntil
函数进行处理。doUntil
函数内部的业务处理与上面基本类似,读者可以自己分析。
2、解析区块对象
当调用 dag 对象的get
方法获取到区块对象之后,是不是处理流程已经结束了?不是这样的,我们可以想像下,Unix 文件系统是一个树状的结构,从根目录 /
开始,然后是子目录,子目录下面又可以是孙目录或文件,孙目录下面又可以重孙目录或文件,子子孙孙无穷尽也。除了目录之外,当我们要获取的文件如果很大,在上传时候也会被切分类似目录树结构的结构,最顶层为文件总的 DAGNode 对象,它通过 DAGLink 连接到子碎片,子碎片又可以通过 DAGLink 连接到孙碎片,孙碎片又可以通过 DAGLink 连接到重孙碎片,子子孙孙无穷尽也。所以获取到区块对象之后,异步类库的 waterfall
函数通过调用它的第二个参数,从而调用 resolveItem
函数来解析获取到的区块对象,根据获取到的区块来获取完整的区块对象。resolveItem
函数实现为直接委托给另一个函数 resolve
进行处理,代码如下:
function resolveItem (cid, node, item, options) {
return resolve({
cid,
node,
name: item.name,
path: item.path,
pathRest: item.pathRest,
dag,
parentNode: item.parent || parent,
depth: item.depth,
options
})
}
resolve
函数处理过程如下:
- 调用函数
typeOf
检测区块对象的真实类型。
区块对象可能为 directory、hamt-sharded-directory、file、object、raw,每种类型都有特定的处理器进行解析。try { type = typeOf(node) } catch (err) { return error(err) }
- 从
resolvers
对象中获取对应类型的解析器const nodeResolver = resolvers[type]
resolvers
对象定义在文件开头,内容如下:const resolvers = { directory: require('./dir-flat'), 'hamt-sharded-directory': require('./dir-hamt-sharded'), file: require('./file'), object: require('./object'), raw: require('./raw') }
- 调用
createResolver
函数,创建resolveDeep
。 - 调用
nodeResolver
函数,解析指定的区块对象。对于不同的类型,nodeResolver
函数是不同的。当获取的区块对象类型为目录时,函数为dir-flat.js
中定义的dirExporter
函数;当获取的区块对象类型为文件时,函数为file.js
文件中定义的函数;当获取的区块对象类型为对象时,函数object.js
文件中定义的函数。当我们获取喵星人时,涉及到两种类型,即目录和文件,下面我们就以这两种类型进行分析。dirExporter
函数执行过程如下:- 设置要获取的第一个对象
const accepts = pathRest[0]
pathRest
是我们在调用get
方法时,根据提供的路径生成的路径数组,数组中不包括路径的基础部分。对于我们的例子,数组只有一个元素,即cat.jpg
。 - 生成一个代表当前目录的变量。
const dir = { name: name, depth: depth, path: path, cid, size: 0, type: 'dir' }
- 如果当前获取对象的深度超过了选项指定的最大深度,则返回 pull-stream 类库的源流
values
。if (options.maxDepth && options.maxDepth <= depth) { return values([dir]) }
- 生成一个流数组。
上面生成的流数组,在函数尾部会通过 pull-cat 类库进行级连调用,上面代码最终执行过程描述如下:遍历当前目录的所有连接;除非掉不属于当前请求的连接;把所有符合条件的连接生成对应的对象;最后,把生成的对象传递给const streams = [ pull( values(node.links), filter((item) => accepts === undefined || item.name === accepts), map((link) => ({ depth: depth + 1, size: 0, name: link.name, path: path + '/' + link.name, cid: link.cid, linkName: link.name, pathRest: pathRest.slice(1), type: 'dir' })), resolve ) ]
resolve
流,即调用createResolver
函数返回的流,这个流我们在前面已经分析过,这里不再细讲。在我们获取喵星人的例子当中,我们指定的路径为
QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg
,/
前面的路径为基本路径,它代表了一个目录,这个目录中有一个文件,这个文件即是喵星人,也即node.links
指向的是喵星人的 CID。通过 pull-cat 类库的调用,在createResolver
函数返回的流中我们会真正请求喵星人,具体文件的处理,我们在下面进行分析。 - 如果路径中除了基本路径之外没有别的路径,或者选项指定为完全路径,则使用
dir
变量生成一个values
流,并放在流数组streams
的最前面。 - 调用 pull-cat 类库进行流处理,上体处理过程见前面描述。
下面我们来看 IPFS 是如何具体文件的,正如我们在前面所提到的,每个文件在保存到 IPFS 网络中都可能进行分片,即把大的文件分成小的碎片,每个碎片有自己的哈希,根据碎片的哈希生成对应的 DAGLink,以碎片在文件中出现的顺序,使用这些 DAGLink 生成连接数组,使用连接数组生成最终的顶层 DAGNode 对象,以此来表示文件。我们的喵星人同样也被分成了两个碎片,在前面分析中,请求目录之后,通过 pull-cat 类库的调用,再次请求
createResolver
函数返回的流的过程中,我们会请求喵星人总的 DAGNode 对象,当调用nodeResolver
函数时,这次会选择file.js
文件进行请求处理,它的执行过程如下:- 设置要获取的第一个对象
这次const accepts = pathRest[0]
if (accepts !== undefined && accepts !== path) { return empty() }
pathRest
数组为空,所以这里accepts
是未定义。 - 调用
UnixFS
的静态方法unmarshal
方法,从区块对象的data
属性中解组出 Uninx 文件对象。try { file = UnixFS.unmarshal(node.data) } catch (err) { return error(err) }
- 获取文件大小、指定的长度和偏移量
const fileSize = file.fileSize()
let offset = options.offset let length = options.length
if (offset < 0) { return error(new Error('Offset must be greater than or equal to 0')) }
if (offset > fileSize) { return error(new Error('Offset must be less than the file size')) }
if (length < 0) { return error(new Error('Length must be greater than or equal to 0')) }
- 如果长度为 0,则生成并返回 pull-stream 类库的
once
流。if (length === 0) { return once({ depth: depth, content: once(Buffer.alloc(0)), name: name, path: path, cid, size: fileSize, type: 'file' }) }
- 重新计算偏移量和文件长度。
if (!offset) { offset = 0 }
if (!length || (offset + length > fileSize)) { length = fileSize - offset }
- 调用
streamBytes
函数,根据偏移量、长度及节点的连接数组,获取指定的内容。streamBytes
函数采用深度优先算法获取区块对象的所有碎片数据,它的结果是一个 pull-stream 类库的 through 流。代码如下:
pull-travers 类库中提供了深度优先、广度优先、叶子优先3种算法来遍历一颗树,这里我们使用了深度优先来遍历文件的所有碎片。if (offset === fileSize || length === 0) { return once(Buffer.alloc(0)) }
const end = offset + length
return pull( traverse.depthFirst({ node, start: 0, end: fileSize }, getChildren(dag, offset, end)), map(extractData(offset, end)), filter(Boolean) )
- 生成并返回 pull-stream 类库的
values
流。返回的流在依次被用在resolver.js
的createResolver
函数返回的流中,后者又被 ipfs-unixfs-exporter 类库中的pull
函数中的map
流所使用;ipfs-unixfs-exporter 类库中的pull
函数中的map
流又被get-pull-stream.js
文件中的pull.map
所使用,并且最终被get.js
文件中的pull.asyncMap
流的处理函数转换为 Buffer 对象,从而我们的程序从 Buffer 对象中读取出文件内容。
- 设置要获取的第一个对象