Skip to content

Commit

Permalink
feat: add ?meta=eof for trailling metadata dag-json
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 10, 2023
1 parent 7ca8f48 commit 27bae29
Show file tree
Hide file tree
Showing 8 changed files with 420 additions and 48 deletions.
32 changes: 20 additions & 12 deletions carstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,23 @@ var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser)

// StreamCar streams a DAG in CARv1 format to the given writer, using the given
// selector.
func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) error {
func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) (int64, int64, error) {
sel, err := selector.CompileSelector(selNode)
if err != nil {
return fmt.Errorf("failed to compile selector: %w", err)
return 0, 0, fmt.Errorf("failed to compile selector: %w", err)
}

carWriter, err := carstorage.NewWritable(out, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(duplicates))
if err != nil {
return fmt.Errorf("failed to create car writer: %w", err)
return 0, 0, fmt.Errorf("failed to create car writer: %w", err)
}

erro := &errorRecordingReadOpener{ctx, requestLsys.StorageReadOpener, carWriter, nil}
erro := newErrorRecordingReadOpener(ctx, requestLsys.StorageReadOpener, carWriter)
requestLsys.StorageReadOpener = erro.StorageReadOpener

rootNode, err := loadNode(ctx, rootCid, requestLsys)
if err != nil {
return fmt.Errorf("failed to load root node: %w", err)
return 0, 0, fmt.Errorf("failed to load root node: %w", err)
}

progress := traversal.Progress{Cfg: &traversal.Config{
Expand All @@ -54,20 +54,26 @@ func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.
LinkTargetNodePrototypeChooser: protoChooser,
}}
if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil {
return fmt.Errorf("failed to complete traversal: %w", err)
return 0, 0, fmt.Errorf("failed to complete traversal: %w", err)
}
if erro.err != nil {
return fmt.Errorf("block load failed during traversal: %w", erro.err)
return 0, 0, fmt.Errorf("block load failed during traversal: %w", erro.err)
}

return nil
return erro.byteCount, erro.blockCount, nil
}

type errorRecordingReadOpener struct {
ctx context.Context
orig linking.BlockReadOpener
car carstorage.WritableCar
err error
ctx context.Context
orig linking.BlockReadOpener
car carstorage.WritableCar
err error
byteCount int64
blockCount int64
}

func newErrorRecordingReadOpener(ctx context.Context, orig linking.BlockReadOpener, car carstorage.WritableCar) *errorRecordingReadOpener {
return &errorRecordingReadOpener{ctx, orig, car, nil, 0, 0}
}

func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
Expand All @@ -84,6 +90,8 @@ func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext,
if err != nil {
return nil, err
}
erro.byteCount += int64(len(byts))
erro.blockCount++
return bytes.NewReader(byts), nil
}

Expand Down
96 changes: 70 additions & 26 deletions carstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,61 +50,73 @@ func TestStreamCar(t *testing.T) {
}

testCases := []struct {
name string
selector datamodel.Node
root cid.Cid
lsys linking.LinkSystem
validate func(t *testing.T, r io.Reader)
name string
selector datamodel.Node
root cid.Cid
lsys linking.LinkSystem
expectedBytes int64
expectedBlocks int64
validate func(t *testing.T, r io.Reader)
}{
{
name: "chain: all blocks",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
name: "chain: all blocks",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
expectedBytes: sizeOf(allChainBlocks),
expectedBlocks: 100,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
require.Equal(t, allChainBlocks, blks)
},
},
{
name: "chain: just root",
selector: selectorparse.CommonSelector_MatchPoint,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
name: "chain: just root",
selector: selectorparse.CommonSelector_MatchPoint,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
expectedBytes: sizeOf(allChainBlocks[:1]),
expectedBlocks: 1,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
require.Equal(t, []blocks.Block{allChainBlocks[0]}, blks)
},
},
{
name: "unixfs file",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: fileEnt.Root,
lsys: fileLsys,
name: "unixfs file",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: fileEnt.Root,
lsys: fileLsys,
expectedBytes: sizeOfDirEnt(fileEnt, fileLsys),
expectedBlocks: int64(len(fileEnt.SelfCids)),
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, fileEnt.Root, root)
require.ElementsMatch(t, fileEnt.SelfCids, blkCids(blks))
},
},
{
name: "unixfs directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: dirEnt.Root,
lsys: dirLsys,
name: "unixfs directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: dirEnt.Root,
lsys: dirLsys,
expectedBytes: sizeOfDirEnt(dirEnt, dirLsys),
expectedBlocks: blocksInDirEnt(dirEnt),
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, dirEnt.Root, root)
require.ElementsMatch(t, entCids(dirEnt), blkCids(blks))
},
},
{
name: "unixfs sharded directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
name: "unixfs sharded directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
expectedBytes: sizeOfDirEnt(shardedDirEnt, shardedDirLsys),
expectedBlocks: blocksInDirEnt(shardedDirEnt),
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, shardedDirEnt.Root, root)
Expand All @@ -118,8 +130,10 @@ func TestStreamCar(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
req := require.New(t)
var buf bytes.Buffer
err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false)
byts, blks, err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false)
req.NoError(err)
req.Equal(tc.expectedBytes, byts)
req.Equal(tc.expectedBlocks, blks)
tc.validate(t, &buf)
})
}
Expand Down Expand Up @@ -200,3 +214,33 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
}
}
}

func sizeOf(blks []blocks.Block) int64 {
var size int64
for _, blk := range blks {
size += int64(len(blk.RawData()))
}
return size
}
func sizeOfDirEnt(dirEnt unixfs.DirEntry, ls linking.LinkSystem) int64 {
var size int64
for _, c := range dirEnt.SelfCids {
blk, err := ls.LoadRaw(linking.LinkContext{}, cidlink.Link{Cid: c})
if err != nil {
panic(err)
}
size += int64(len(blk))
}
for _, c := range dirEnt.Children {
size += sizeOfDirEnt(c, ls)
}
return size
}

func blocksInDirEnt(dirEnt unixfs.DirEntry) int64 {
size := int64(len(dirEnt.SelfCids))
for _, c := range dirEnt.Children {
size += blocksInDirEnt(c)
}
return size
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-unixfsnode v1.7.3
github.com/ipld/go-car/v2 v2.10.1
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55
github.com/ipld/go-ipld-prime v0.21.1-0.20230810111002-bdf990edcdeb
github.com/ipni/go-libipni v0.3.4
github.com/ipni/index-provider v0.13.5
github.com/libp2p/go-libp2p v0.29.2
Expand All @@ -22,6 +22,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
golang.org/x/term v0.10.0
lukechampine.com/blake3 v1.2.1
)

require (
Expand Down Expand Up @@ -172,5 +173,4 @@ require (
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)
6 changes: 3 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gammazero/channelqueue v0.2.1 h1:AcK6wnLrj8koTTn3RxjRCyfmS677TjhIZb1FSMi14qc=
github.com/gammazero/channelqueue v0.2.1/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4=
Expand Down Expand Up @@ -327,8 +327,8 @@ github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6
github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s=
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 h1:QAI/Ridj0+foHD6epbxmB4ugxz9B4vmNdYSmQLGa05E=
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0/go.mod h1:odxGcpiQZLzP5+yGu84Ljo8y3EzCvNAQKEodHNsHLXA=
github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55 h1:D1JUX6l0+ugD3PE99l/NmN/97jz9YNP0uZZRLAGZQhs=
github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55/go.mod h1:PRQpXNcJypaPiiSdarsrJABPkYrBvafwDl0B9HjujZ8=
github.com/ipld/go-ipld-prime v0.21.1-0.20230810111002-bdf990edcdeb h1:frdDF5813yKhOaEHAdEhgArLVk+uBQwygVtfg2jAGR4=
github.com/ipld/go-ipld-prime v0.21.1-0.20230810111002-bdf990edcdeb/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo=
github.com/ipni/go-libipni v0.3.4 h1:ZYgCE2TOZt/QJJcBZb+R63FaBLlA2suZGP2IH1fKv4A=
github.com/ipni/go-libipni v0.3.4/go.mod h1:6EIUhN83pd1i6q7SCSCIuuUC3XgR7D/gjKkEnVyIQWE=
Expand Down
Loading

0 comments on commit 27bae29

Please sign in to comment.