Description
The Trustless Gateway spec is being productionized by Saturn, Daghaus and possibly others. It's important for DAG traversal clients to be able to consume CARs with blocks in DFS order, as this is optimal for streaming and incremental verification. While DFS isn't required by the spec, it appears to be the preferred traversal order: it is the only order the spec mentions explicitly, the other option being "unknown".
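For reference, this is the block order a DFS-consuming client expects. A minimal sketch, assuming hypothetical `getBlock` and `linksOf` helpers that stand in for a blockstore read and a codec-specific link extractor (neither is part of this package):

```ts
import type { CID } from 'multiformats/cid'

interface Block {
  cid: CID
  bytes: Uint8Array
}

// hypothetical helper shapes assumed for this sketch
type GetBlock = (cid: CID) => Promise<Block>
type LinksOf = (block: Block) => CID[]

/**
 * Yield blocks in depth-first (pre-order) order: a parent block is emitted
 * before any of its children, and a child's whole subtree is emitted before
 * the next sibling. A CAR streamed in this order can be verified
 * incrementally by the consumer as it arrives.
 */
export async function * walkDFS (root: CID, getBlock: GetBlock, linksOf: LinksOf): AsyncGenerator<Block> {
  const block = await getBlock(root)
  yield block

  for (const link of linksOf(block)) {
    // descend into this child's entire subtree before its next sibling
    yield * walkDFS(link, getBlock, linksOf)
  }
}
```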
This is the section that performs BFS, though there may be more. A depth-first sketch for contrast follows the excerpt.
js-ipfs-unixfs/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
Lines 53 to 127 in 8a271b2
```ts
for (let i = 0; i < node.Links.length; i++) {
  const childLink = node.Links[i]
  const childStart = streamPosition // inclusive
  const childEnd = childStart + file.blockSizes[i] // exclusive

  if ((start >= childStart && start < childEnd) || // child has offset byte
      (end >= childStart && end <= childEnd) || // child has end byte
      (start < childStart && end > childEnd)) { // child is between offset and end bytes
    childOps.push({
      link: childLink,
      blockStart: streamPosition
    })
  }

  streamPosition = childEnd

  if (streamPosition > end) {
    break
  }
}

await pipe(
  childOps,
  (source) => map(source, (op) => {
    return async () => {
      const block = await blockstore.get(op.link.Hash, options)

      return {
        ...op,
        block
      }
    }
  }),
  (source) => parallel(source, {
    ordered: true
  }),
  async (source) => {
    for await (const { link, block, blockStart } of source) {
      let child: dagPb.PBNode | Uint8Array

      switch (link.Hash.code) {
        case dagPb.code:
          child = dagPb.decode(block)
          break
        case raw.code:
          child = block
          break
        default:
          queue.end(errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS'))
          return
      }

      // create a queue for this child - we use a queue instead of recursion
      // to avoid overflowing the stack
      const childQueue = new PQueue({
        concurrency: 1
      })
      // if any of the child jobs error, end the read queue with the error
      childQueue.on('error', error => {
        queue.end(error)
      })
      // if the job rejects the 'error' event will be emitted on the child queue
      void childQueue.add(async () => {
        options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
          cid: link.Hash
        }))

        await walkDAG(blockstore, child, queue, blockStart, start, end, options)
      })
      // wait for this child to complete before moving on to the next
      await childQueue.onIdle()
    }
  }
)
```
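In the excerpt, all qualifying sibling blocks are fetched in parallel (the `map` + `parallel` pipeline) before any of them is descended into, which is what produces the breadth-first request order. For contrast, here is a hedged sketch of a strictly depth-first walk. It is not a patch against `file.ts`: it drops the byte-range bookkeeping, parallel prefetch and progress events, and the `walkBlocksDFS` name and `Blockstore` shape are assumptions made only to illustrate the ordering difference.

```ts
import * as dagPb from '@ipld/dag-pb'
import * as raw from 'multiformats/codecs/raw'
import type { CID } from 'multiformats/cid'

// minimal blockstore shape assumed for this sketch
interface Blockstore {
  get(cid: CID): Promise<Uint8Array>
}

/**
 * Strictly depth-first traversal of a UnixFS file DAG: each child block is
 * fetched and its whole subtree walked before the next sibling is requested,
 * so the blockstore - and any CAR built from those requests - sees blocks
 * in DFS order.
 */
async function * walkBlocksDFS (cid: CID, blockstore: Blockstore): AsyncGenerator<Uint8Array> {
  const bytes = await blockstore.get(cid)
  yield bytes

  if (cid.code === raw.code) {
    // raw leaves have no links
    return
  }

  if (cid.code !== dagPb.code) {
    throw new Error(`Unsupported codec: ${cid.code}`)
  }

  const node = dagPb.decode(bytes)

  for (const link of node.Links) {
    // finish this child's entire subtree before touching the next sibling
    yield * walkBlocksDFS(link.Hash, blockstore)
  }
}
```

In practice the exporter avoids recursion (see the `PQueue` comment in the excerpt) and still needs the offset/length and progress handling shown above, so this is only a sketch of the intended block order, not a proposed implementation.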