diff --git a/array.go b/array.go index ddf3e95b..f033f940 100644 --- a/array.go +++ b/array.go @@ -2727,3 +2727,205 @@ func nextLevelArraySlabs(storage SlabStorage, address Address, slabs []ArraySlab return slabs[:nextLevelSlabsIndex], nil } + +type arrayLoadedElementIterator struct { + storage SlabStorage + slab *ArrayDataSlab + index int +} + +func (i *arrayLoadedElementIterator) next() (Value, error) { + // Iterate loaded elements in data slab. + for i.index < len(i.slab.elements) { + element := i.slab.elements[i.index] + i.index++ + + v, err := getLoadedValue(i.storage, element) + if err != nil { + // Don't need to wrap error as external error because err is already categorized by getLoadedValue. + return nil, err + } + if v == nil { + // Skip this element because it references unloaded slab. + // Try next element. + continue + } + + return v, nil + } + + // Reach end of elements + return nil, nil +} + +type arrayLoadedSlabIterator struct { + storage SlabStorage + slab *ArrayMetaDataSlab + index int +} + +func (i *arrayLoadedSlabIterator) next() Slab { + // Iterate loaded slabs in meta data slab. + for i.index < len(i.slab.childrenHeaders) { + header := i.slab.childrenHeaders[i.index] + i.index++ + + childSlab := i.storage.RetrieveIfLoaded(header.slabID) + if childSlab == nil { + // Skip this child because it references unloaded slab. + // Try next child. + continue + } + + return childSlab + } + + // Reach end of children. + return nil +} + +// ArrayLoadedValueIterator is used to iterate over loaded array elements. +type ArrayLoadedValueIterator struct { + storage SlabStorage + parents []*arrayLoadedSlabIterator // LIFO stack for parents of dataIterator + dataIterator *arrayLoadedElementIterator +} + +func (i *ArrayLoadedValueIterator) nextDataIterator() (*arrayLoadedElementIterator, error) { + + // Iterate parents (LIFO) to find next loaded array data slab. + for len(i.parents) > 0 { + lastParent := i.parents[len(i.parents)-1] + + nextChildSlab := lastParent.next() + + switch slab := nextChildSlab.(type) { + case *ArrayDataSlab: + // Create data iterator + return &arrayLoadedElementIterator{ + storage: i.storage, + slab: slab, + }, nil + + case *ArrayMetaDataSlab: + // Push new parent to parents queue + newParent := &arrayLoadedSlabIterator{ + storage: i.storage, + slab: slab, + } + i.parents = append(i.parents, newParent) + + case nil: + // Reach end of last parent. + // Reset last parent to nil and pop last parent from parents stack. + lastParentIndex := len(i.parents) - 1 + i.parents[lastParentIndex] = nil + i.parents = i.parents[:lastParentIndex] + + default: + return nil, NewSlabDataErrorf("slab %s isn't ArraySlab", nextChildSlab.SlabID()) + } + } + + // Reach end of parents stack. + return nil, nil +} + +// Next iterates and returns next loaded element. +// It returns nil Value at end of loaded elements. +func (i *ArrayLoadedValueIterator) Next() (Value, error) { + if i.dataIterator != nil { + element, err := i.dataIterator.next() + if err != nil { + // Don't need to wrap error as external error because err is already categorized by arrayLoadedElementIterator.next(). + return nil, err + } + if element != nil { + return element, nil + } + + // Reach end of element in current data slab. + i.dataIterator = nil + } + + // Get next data iterator. + var err error + i.dataIterator, err = i.nextDataIterator() + if err != nil { + // Don't need to wrap error as external error because err is already categorized by arrayLoadedValueIterator.nextDataIterator(). 
+ return nil, err + } + if i.dataIterator != nil { + return i.Next() + } + + // Reach end of loaded value iterator + return nil, nil +} + +// LoadedValueIterator returns iterator to iterate loaded array elements. +func (a *Array) LoadedValueIterator() (*ArrayLoadedValueIterator, error) { + switch slab := a.root.(type) { + + case *ArrayDataSlab: + // Create a data iterator from root slab. + dataIterator := &arrayLoadedElementIterator{ + storage: a.Storage, + slab: slab, + } + + // Create iterator with data iterator (no parents). + iterator := &ArrayLoadedValueIterator{ + storage: a.Storage, + dataIterator: dataIterator, + } + + return iterator, nil + + case *ArrayMetaDataSlab: + // Create a slab iterator from root slab. + slabIterator := &arrayLoadedSlabIterator{ + storage: a.Storage, + slab: slab, + } + + // Create iterator with parent (data iterater is uninitialized). + iterator := &ArrayLoadedValueIterator{ + storage: a.Storage, + parents: []*arrayLoadedSlabIterator{slabIterator}, + } + + return iterator, nil + + default: + return nil, NewSlabDataErrorf("slab %s isn't ArraySlab", slab.SlabID()) + } +} + +// IterateLoadedValues iterates loaded array values. +func (a *Array) IterateLoadedValues(fn ArrayIterationFunc) error { + iterator, err := a.LoadedValueIterator() + if err != nil { + // Don't need to wrap error as external error because err is already categorized by Array.LoadedValueIterator(). + return err + } + + for { + value, err := iterator.Next() + if err != nil { + // Don't need to wrap error as external error because err is already categorized by ArrayLoadedValueIterator.Next(). + return err + } + if value == nil { + return nil + } + resume, err := fn(value) + if err != nil { + // Wrap err as external error (if needed) because err is returned by ArrayIterationFunc callback. 
+ return wrapErrorAsExternalErrorIfNeeded(err) + } + if !resume { + return nil + } + } +} diff --git a/array_test.go b/array_test.go index 66b06d08..f3c20f06 100644 --- a/array_test.go +++ b/array_test.go @@ -2590,6 +2590,850 @@ func errorCategorizationCount(err error) int { return count } +func TestArrayLoadedValueIterator(t *testing.T) { + + SetThreshold(256) + defer SetThreshold(1024) + + typeInfo := testTypeInfo{42} + address := Address{1, 2, 3, 4, 5, 6, 7, 8} + + t.Run("empty", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + array, err := NewArray(storage, address, typeInfo) + require.NoError(t, err) + + // parent array: 1 root data slab + require.Equal(t, 1, len(storage.deltas)) + require.Equal(t, 0, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, nil) + }) + + t.Run("root data slab with simple values", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 3 + array, values := createArrayWithSimpleValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root data slab + require.Equal(t, 1, len(storage.deltas)) + require.Equal(t, 0, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + }) + + t.Run("root data slab with composite values", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 3 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root data slab + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+arraySize, len(storage.deltas)) + require.Equal(t, 0, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + }) + + t.Run("root data slab with composite values, unload composite element from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 3 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root data slab + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+arraySize, len(storage.deltas)) + require.Equal(t, 0, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + // Unload composite element from front to back + for i := 0; i < len(values); i++ { + v := values[i] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[i+1:] + verifyArrayLoadedElements(t, array, expectedValues) + } + }) + + t.Run("root data slab with composite values, unload composite element from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 3 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root data slab + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+arraySize, len(storage.deltas)) + require.Equal(t, 0, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + // Unload composite element from back to front + for i := len(values) - 1; i >= 0; i-- { + v := values[i] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[:i] + verifyArrayLoadedElements(t, array, expectedValues) + } + }) + + t.Run("root data slab with composite values, unload composite element in the middle", func(t 
*testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 3 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root data slab + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+arraySize, len(storage.deltas)) + require.Equal(t, 0, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + // Unload composite element in the middle + unloadValueIndex := 1 + + v := values[unloadValueIndex] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + copy(values[unloadValueIndex:], values[unloadValueIndex+1:]) + values = values[:len(values)-1] + + verifyArrayLoadedElements(t, array, values) + }) + + t.Run("root data slab with composite values, unload composite elements during iteration", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 3 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root data slab + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+arraySize, len(storage.deltas)) + require.Equal(t, 0, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + i := 0 + err := array.IterateLoadedValues(func(v Value) (bool, error) { + // At this point, iterator returned first element (v). + + // Remove all other nested composite elements (except first element) from storage. + for _, value := range values[1:] { + nestedArray, ok := value.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + } + + require.Equal(t, 0, i) + valueEqual(t, typeInfoComparator, values[0], v) + i++ + return true, nil + }) + + require.NoError(t, err) + require.Equal(t, 1, i) // Only first element is iterated because other elements are remove during iteration. 
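For reference, a minimal usage sketch of the Array.IterateLoadedValues API exercised by these tests, assuming it lives in the same package as Array (the helper name countLoadedValues is hypothetical and only for illustration): the callback is invoked only for elements whose slabs are currently loaded, and returning false stops iteration early.

func countLoadedValues(a *Array) (int, error) {
	count := 0
	err := a.IterateLoadedValues(func(v Value) (bool, error) {
		// The callback sees only values whose slabs are loaded;
		// elements referencing unloaded slabs are skipped without
		// touching underlying storage.
		count++
		return true, nil // resume iteration
	})
	return count, err
}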
+ }) + + t.Run("root data slab with simple and composite values, unload composite element", func(t *testing.T) { + const arraySize = 3 + + // Create an array with nested composite value at specified index + for nestedCompositeIndex := 0; nestedCompositeIndex < arraySize; nestedCompositeIndex++ { + storage := newTestPersistentStorage(t) + + array, values := createArrayWithSimpleAndCompositeValues(t, storage, address, typeInfo, arraySize, nestedCompositeIndex) + + // parent array: 1 root data slab + // nested composite element: 1 root data slab + require.Equal(t, 2, len(storage.deltas)) + require.Equal(t, 0, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + // Unload composite element + v := values[nestedCompositeIndex].(*Array) + + err := storage.Remove(v.SlabID()) + require.NoError(t, err) + + copy(values[nestedCompositeIndex:], values[nestedCompositeIndex+1:]) + values = values[:len(values)-1] + + verifyArrayLoadedElements(t, array, values) + } + }) + + t.Run("root metadata slab with simple values", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 20 + array, values := createArrayWithSimpleValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root metadata slab, 2 data slabs + require.Equal(t, 3, len(storage.deltas)) + require.Equal(t, 1, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + }) + + t.Run("root metadata slab with composite values", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 20 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root metadata slab, 2 data slabs + // nested composite value element: 1 root data slab for each + require.Equal(t, 3+arraySize, len(storage.deltas)) + require.Equal(t, 1, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + }) + + t.Run("root metadata slab with composite values, unload composite element from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 20 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root metadata slab, 2 data slabs + // nested composite value element: 1 root data slab for each + require.Equal(t, 3+arraySize, len(storage.deltas)) + require.Equal(t, 1, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + // Unload composite element from front to back + for i := 0; i < len(values); i++ { + v := values[i] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[i+1:] + verifyArrayLoadedElements(t, array, expectedValues) + } + }) + + t.Run("root metadata slab with composite values, unload composite element from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 20 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root metadata slab, 2 data slabs + // nested composite value element: 1 root data slab for each + require.Equal(t, 3+arraySize, len(storage.deltas)) + require.Equal(t, 1, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + // Unload composite element from back to front + for i := len(values) - 1; i >= 0; i-- { + v := values[i] + + nestedArray, ok := v.(*Array) + 
require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[:i] + verifyArrayLoadedElements(t, array, expectedValues) + } + }) + + t.Run("root metadata slab with composite values, unload composite element in the middle", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 20 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array: 1 root metadata slab, 2 data slabs + // nested composite value element: 1 root data slab for each + require.Equal(t, 3+arraySize, len(storage.deltas)) + require.Equal(t, 1, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + // Unload composite element in the middle + for _, index := range []int{4, 14} { + + v := values[index] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + copy(values[index:], values[index+1:]) + values = values[:len(values)-1] + + verifyArrayLoadedElements(t, array, values) + } + }) + + t.Run("root metadata slab with simple and composite values, unload composite element", func(t *testing.T) { + const arraySize = 20 + + // Create an array with composite value at specified index. + for nestedCompositeIndex := 0; nestedCompositeIndex < arraySize; nestedCompositeIndex++ { + storage := newTestPersistentStorage(t) + + array, values := createArrayWithSimpleAndCompositeValues(t, storage, address, typeInfo, arraySize, nestedCompositeIndex) + + // parent array: 1 root metadata slab, 2 data slabs + // nested composite value element: 1 root data slab for each + require.Equal(t, 3+1, len(storage.deltas)) + require.Equal(t, 1, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + // Unload composite value + v := values[nestedCompositeIndex].(*Array) + + err := storage.Remove(v.SlabID()) + require.NoError(t, err) + + copy(values[nestedCompositeIndex:], values[nestedCompositeIndex+1:]) + values = values[:len(values)-1] + + verifyArrayLoadedElements(t, array, values) + } + }) + + t.Run("root metadata slab, unload data slab from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 30 + array, values := createArrayWithSimpleValues(t, storage, address, typeInfo, arraySize) + + // parent array (2 levels): 1 root metadata slab, 3 data slabs + require.Equal(t, 4, len(storage.deltas)) + require.Equal(t, 1, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + metaDataSlab, ok := array.root.(*ArrayMetaDataSlab) + require.True(t, ok) + + // Unload data slabs from front to back + for i := 0; i < len(metaDataSlab.childrenHeaders); i++ { + + childHeader := metaDataSlab.childrenHeaders[i] + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + values = values[childHeader.count:] + + verifyArrayLoadedElements(t, array, values) + } + }) + + t.Run("root metadata slab, unload data slab from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 30 + array, values := createArrayWithSimpleValues(t, storage, address, typeInfo, arraySize) + + // parent array (2 levels): 1 root metadata slab, 3 data slabs + require.Equal(t, 4, len(storage.deltas)) + require.Equal(t, 1, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + metaDataSlab, ok := array.root.(*ArrayMetaDataSlab) + require.True(t, ok) + + // 
Unload data slabs from back to front + for i := len(metaDataSlab.childrenHeaders) - 1; i >= 0; i-- { + + childHeader := metaDataSlab.childrenHeaders[i] + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + values = values[:len(values)-int(childHeader.count)] + + verifyArrayLoadedElements(t, array, values) + } + }) + + t.Run("root metadata slab, unload data slab in the middle", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 30 + array, values := createArrayWithSimpleValues(t, storage, address, typeInfo, arraySize) + + // parent array (2 levels): 1 root metadata slab, 3 data slabs + require.Equal(t, 4, len(storage.deltas)) + require.Equal(t, 1, getArrayMetaDataSlabCount(storage)) + + verifyArrayLoadedElements(t, array, values) + + metaDataSlab, ok := array.root.(*ArrayMetaDataSlab) + require.True(t, ok) + + require.True(t, len(metaDataSlab.childrenHeaders) > 2) + + index := 1 + childHeader := metaDataSlab.childrenHeaders[index] + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + copy(values[metaDataSlab.childrenCountSum[index-1]:], values[metaDataSlab.childrenCountSum[index]:]) + values = values[:array.Count()-uint64(childHeader.count)] + + verifyArrayLoadedElements(t, array, values) + }) + + t.Run("root metadata slab, unload non-root metadata slab from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 200 + array, values := createArrayWithSimpleValues(t, storage, address, typeInfo, arraySize) + + // parent array (3 levels): 1 root metadata slab, 2 non-root metadata slabs, n data slabs + require.Equal(t, 3, getArrayMetaDataSlabCount(storage)) + + rootMetaDataSlab, ok := array.root.(*ArrayMetaDataSlab) + require.True(t, ok) + + // Unload non-root metadata slabs from front to back + for i := 0; i < len(rootMetaDataSlab.childrenHeaders); i++ { + + childHeader := rootMetaDataSlab.childrenHeaders[i] + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + values = values[childHeader.count:] + + verifyArrayLoadedElements(t, array, values) + } + }) + + t.Run("root metadata slab, unload non-root metadata slab from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const arraySize = 200 + array, values := createArrayWithSimpleValues(t, storage, address, typeInfo, arraySize) + + // parent array (3 levels): 1 root metadata slab, 2 child metadata slabs, n data slabs + require.Equal(t, 3, getArrayMetaDataSlabCount(storage)) + + rootMetaDataSlab, ok := array.root.(*ArrayMetaDataSlab) + require.True(t, ok) + + // Unload non-root metadata slabs from back to front + for i := len(rootMetaDataSlab.childrenHeaders) - 1; i >= 0; i-- { + + childHeader := rootMetaDataSlab.childrenHeaders[i] + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + values = values[childHeader.count:] + + verifyArrayLoadedElements(t, array, values) + } + }) + + t.Run("root metadata slab with composite values, unload random composite value", func(t *testing.T) { + + storage := newTestPersistentStorage(t) + + const arraySize = 500 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array (3 levels): 1 root metadata slab, n non-root metadata slabs, n data slabs + // nested composite elements: 1 root data slab for each + require.True(t, len(storage.deltas) > 1+arraySize) + require.True(t, getArrayMetaDataSlabCount(storage) > 1) + + verifyArrayLoadedElements(t, array, values) + + r := 
newRand(t) + + // Unload random composite element + for len(values) > 0 { + + i := r.Intn(len(values)) + + v := values[i] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + copy(values[i:], values[i+1:]) + values = values[:len(values)-1] + + verifyArrayLoadedElements(t, array, values) + } + }) + + t.Run("root metadata slab with composite values, unload random data slab", func(t *testing.T) { + + storage := newTestPersistentStorage(t) + + const arraySize = 500 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array (3 levels): 1 root metadata slab, n non-root metadata slabs, n data slabs + // nested composite elements: 1 root data slab for each + require.True(t, len(storage.deltas) > 1+arraySize) + require.True(t, getArrayMetaDataSlabCount(storage) > 1) + + verifyArrayLoadedElements(t, array, values) + + rootMetaDataSlab, ok := array.root.(*ArrayMetaDataSlab) + require.True(t, ok) + + type slabInfo struct { + id SlabID + startIndex int + count int + } + + count := 0 + var dataSlabInfos []*slabInfo + for _, mheader := range rootMetaDataSlab.childrenHeaders { + nonrootMetaDataSlab, ok := storage.deltas[mheader.slabID].(*ArrayMetaDataSlab) + require.True(t, ok) + + for _, h := range nonrootMetaDataSlab.childrenHeaders { + dataSlabInfo := &slabInfo{id: h.slabID, startIndex: count, count: int(h.count)} + dataSlabInfos = append(dataSlabInfos, dataSlabInfo) + count += int(h.count) + } + } + + r := newRand(t) + + // Unload random data slab. + for len(dataSlabInfos) > 0 { + indexToUnload := r.Intn(len(dataSlabInfos)) + + slabInfoToUnload := dataSlabInfos[indexToUnload] + + // Update startIndex for all data slabs after indexToUnload. + for i := indexToUnload + 1; i < len(dataSlabInfos); i++ { + dataSlabInfos[i].startIndex -= slabInfoToUnload.count + } + + // Remove slabInfo to be unloaded from dataSlabInfos. 
+ copy(dataSlabInfos[indexToUnload:], dataSlabInfos[indexToUnload+1:]) + dataSlabInfos = dataSlabInfos[:len(dataSlabInfos)-1] + + err := storage.Remove(slabInfoToUnload.id) + require.NoError(t, err) + + copy(values[slabInfoToUnload.startIndex:], values[slabInfoToUnload.startIndex+slabInfoToUnload.count:]) + values = values[:len(values)-slabInfoToUnload.count] + + verifyArrayLoadedElements(t, array, values) + } + + require.Equal(t, 0, len(values)) + }) + + t.Run("root metadata slab with composite values, unload random slab", func(t *testing.T) { + + storage := newTestPersistentStorage(t) + + const arraySize = 500 + array, values := createArrayWithCompositeValues(t, storage, address, typeInfo, arraySize) + + // parent array (3 levels): 1 root metadata slab, n non-root metadata slabs, n data slabs + // nested composite elements: 1 root data slab for each + require.True(t, len(storage.deltas) > 1+arraySize) + require.True(t, getArrayMetaDataSlabCount(storage) > 1) + + verifyArrayLoadedElements(t, array, values) + + type slabInfo struct { + id SlabID + startIndex int + count int + children []*slabInfo + } + + rootMetaDataSlab, ok := array.root.(*ArrayMetaDataSlab) + require.True(t, ok) + + var dataSlabCount, metadataSlabCount int + nonrootMetadataSlabInfos := make([]*slabInfo, len(rootMetaDataSlab.childrenHeaders)) + for i, mheader := range rootMetaDataSlab.childrenHeaders { + + nonrootMetadataSlabInfo := &slabInfo{ + id: mheader.slabID, + startIndex: metadataSlabCount, + count: int(mheader.count), + } + metadataSlabCount += int(mheader.count) + + nonrootMetadataSlab, ok := storage.deltas[mheader.slabID].(*ArrayMetaDataSlab) + require.True(t, ok) + + children := make([]*slabInfo, len(nonrootMetadataSlab.childrenHeaders)) + for i, h := range nonrootMetadataSlab.childrenHeaders { + children[i] = &slabInfo{ + id: h.slabID, + startIndex: dataSlabCount, + count: int(h.count), + } + dataSlabCount += int(h.count) + } + + nonrootMetadataSlabInfo.children = children + nonrootMetadataSlabInfos[i] = nonrootMetadataSlabInfo + } + + r := newRand(t) + + const ( + metadataSlabType int = iota + dataSlabType + maxSlabType + ) + + for len(nonrootMetadataSlabInfos) > 0 { + + var slabInfoToBeRemoved *slabInfo + var isLastSlab bool + + // Unload random metadata or data slab. + switch r.Intn(maxSlabType) { + + case metadataSlabType: + // Unload metadata slab at random index. + metadataSlabIndex := r.Intn(len(nonrootMetadataSlabInfos)) + + isLastSlab = metadataSlabIndex == len(nonrootMetadataSlabInfos)-1 + + slabInfoToBeRemoved = nonrootMetadataSlabInfos[metadataSlabIndex] + + count := slabInfoToBeRemoved.count + + // Update startIndex for subsequence metadata and data slabs. + for i := metadataSlabIndex + 1; i < len(nonrootMetadataSlabInfos); i++ { + nonrootMetadataSlabInfos[i].startIndex -= count + + for j := 0; j < len(nonrootMetadataSlabInfos[i].children); j++ { + nonrootMetadataSlabInfos[i].children[j].startIndex -= count + } + } + + copy(nonrootMetadataSlabInfos[metadataSlabIndex:], nonrootMetadataSlabInfos[metadataSlabIndex+1:]) + nonrootMetadataSlabInfos = nonrootMetadataSlabInfos[:len(nonrootMetadataSlabInfos)-1] + + case dataSlabType: + // Unload data slab at randome index. 
+ metadataSlabIndex := r.Intn(len(nonrootMetadataSlabInfos)) + + metaSlabInfo := nonrootMetadataSlabInfos[metadataSlabIndex] + + dataSlabIndex := r.Intn(len(metaSlabInfo.children)) + + slabInfoToBeRemoved = metaSlabInfo.children[dataSlabIndex] + + isLastSlab = (metadataSlabIndex == len(nonrootMetadataSlabInfos)-1) && + (dataSlabIndex == len(metaSlabInfo.children)-1) + + count := slabInfoToBeRemoved.count + + // Update startIndex for subsequence data slabs. + for i := dataSlabIndex + 1; i < len(metaSlabInfo.children); i++ { + metaSlabInfo.children[i].startIndex -= count + } + + copy(metaSlabInfo.children[dataSlabIndex:], metaSlabInfo.children[dataSlabIndex+1:]) + metaSlabInfo.children = metaSlabInfo.children[:len(metaSlabInfo.children)-1] + + metaSlabInfo.count -= count + + // Update startIndex for all subsequence metadata slabs. + for i := metadataSlabIndex + 1; i < len(nonrootMetadataSlabInfos); i++ { + nonrootMetadataSlabInfos[i].startIndex -= count + + for j := 0; j < len(nonrootMetadataSlabInfos[i].children); j++ { + nonrootMetadataSlabInfos[i].children[j].startIndex -= count + } + } + + if len(metaSlabInfo.children) == 0 { + copy(nonrootMetadataSlabInfos[metadataSlabIndex:], nonrootMetadataSlabInfos[metadataSlabIndex+1:]) + nonrootMetadataSlabInfos = nonrootMetadataSlabInfos[:len(nonrootMetadataSlabInfos)-1] + } + } + + err := storage.Remove(slabInfoToBeRemoved.id) + require.NoError(t, err) + + if isLastSlab { + values = values[:slabInfoToBeRemoved.startIndex] + } else { + copy(values[slabInfoToBeRemoved.startIndex:], values[slabInfoToBeRemoved.startIndex+slabInfoToBeRemoved.count:]) + values = values[:len(values)-slabInfoToBeRemoved.count] + } + + verifyArrayLoadedElements(t, array, values) + } + + require.Equal(t, 0, len(values)) + }) +} + +func createArrayWithSimpleValues( + t *testing.T, + storage SlabStorage, + address Address, + typeInfo TypeInfo, + arraySize int, +) (*Array, []Value) { + + // Create parent array + array, err := NewArray(storage, address, typeInfo) + require.NoError(t, err) + + values := make([]Value, arraySize) + r := rune('a') + for i := 0; i < arraySize; i++ { + values[i] = NewStringValue(strings.Repeat(string(r), 20)) + + err := array.Append(values[i]) + require.NoError(t, err) + } + + return array, values +} + +func createArrayWithCompositeValues( + t *testing.T, + storage SlabStorage, + address Address, + typeInfo TypeInfo, + arraySize int, +) (*Array, []Value) { + + // Create parent array + array, err := NewArray(storage, address, typeInfo) + require.NoError(t, err) + + expectedValues := make([]Value, arraySize) + for i := 0; i < arraySize; i++ { + // Create nested array + nested, err := NewArray(storage, address, typeInfo) + require.NoError(t, err) + + err = nested.Append(Uint64Value(i)) + require.NoError(t, err) + + expectedValues[i] = nested + + // Append nested array to parent + err = array.Append(nested) + require.NoError(t, err) + } + + return array, expectedValues +} + +func createArrayWithSimpleAndCompositeValues( + t *testing.T, + storage SlabStorage, + address Address, + typeInfo TypeInfo, + arraySize int, + compositeValueIndex int, +) (*Array, []Value) { + require.True(t, compositeValueIndex < arraySize) + + array, err := NewArray(storage, address, typeInfo) + require.NoError(t, err) + + values := make([]Value, arraySize) + r := 'a' + for i := 0; i < arraySize; i++ { + + if compositeValueIndex == i { + // Create nested array with one element + a, err := NewArray(storage, address, typeInfo) + require.NoError(t, err) + + err = 
a.Append(Uint64Value(i)) + require.NoError(t, err) + + values[i] = a + } else { + values[i] = NewStringValue(strings.Repeat(string(r), 20)) + r++ + } + + err = array.Append(values[i]) + require.NoError(t, err) + } + + return array, values +} + +func verifyArrayLoadedElements(t *testing.T, array *Array, expectedValues []Value) { + i := 0 + err := array.IterateLoadedValues(func(v Value) (bool, error) { + require.True(t, i < len(expectedValues)) + valueEqual(t, typeInfoComparator, expectedValues[i], v) + i++ + return true, nil + }) + require.NoError(t, err) + require.Equal(t, len(expectedValues), i) +} + +func getArrayMetaDataSlabCount(storage *PersistentSlabStorage) int { + var counter int + for _, slab := range storage.deltas { + if _, ok := slab.(*ArrayMetaDataSlab); ok { + counter++ + } + } + return counter +} + func TestArrayID(t *testing.T) { typeInfo := testTypeInfo{42} storage := newTestPersistentStorage(t) diff --git a/map.go b/map.go index 26d72b59..288d80b0 100644 --- a/map.go +++ b/map.go @@ -4635,3 +4635,283 @@ func nextLevelMapSlabs(storage SlabStorage, address Address, slabs []MapSlab) ([ return slabs[:nextLevelSlabsIndex], nil } + +type mapLoadedElementIterator struct { + storage SlabStorage + elements elements + index int + collisionGroupIterator *mapLoadedElementIterator +} + +func (i *mapLoadedElementIterator) next() (key Value, value Value, err error) { + // Iterate loaded elements in data slab (including elements in collision groups). + for i.index < int(i.elements.Count()) || i.collisionGroupIterator != nil { + + // Iterate elements in collision group. + if i.collisionGroupIterator != nil { + key, value, err = i.collisionGroupIterator.next() + if err != nil { + // Don't need to wrap error as external error because err is already categorized by mapLoadedElementIterator.next(). + return nil, nil, err + } + if key != nil { + return key, value, nil + } + + // Reach end of collision group. + i.collisionGroupIterator = nil + continue + } + + element, err := i.elements.Element(i.index) + if err != nil { + // Don't need to wrap error as external error because err is already categorized by elements.Element(). + return nil, nil, err + } + + i.index++ + + switch e := element.(type) { + case *singleElement: + + keyValue, err := getLoadedValue(i.storage, e.key) + if err != nil { + // Don't need to wrap error as external error because err is already categorized by getLoadedValue. + return nil, nil, err + } + if keyValue == nil { + // Skip this element because element key references unloaded slab. + // Try next element. + continue + } + + valueValue, err := getLoadedValue(i.storage, e.value) + if err != nil { + // Don't need to wrap error as external error because err is already categorized by getLoadedValue. + return nil, nil, err + } + if valueValue == nil { + // Skip this element because element value references unloaded slab. + // Try next element. + continue + } + + return keyValue, valueValue, nil + + case *inlineCollisionGroup: + elems, err := e.Elements(i.storage) + if err != nil { + // Don't need to wrap error as external error because err is already categorized by elementGroup.Elements(). + return nil, nil, err + } + + i.collisionGroupIterator = &mapLoadedElementIterator{ + storage: i.storage, + elements: elems, + } + + // Continue to iterate elements in collision group using collisionGroupIterator. 
+ continue + + case *externalCollisionGroup: + externalSlab := i.storage.RetrieveIfLoaded(e.slabID) + if externalSlab == nil { + // Skip this collsion group because external slab isn't loaded. + // Try next element. + continue + } + + dataSlab, ok := externalSlab.(*MapDataSlab) + if !ok { + return nil, nil, NewSlabDataErrorf("slab %s isn't MapDataSlab", e.slabID) + } + + i.collisionGroupIterator = &mapLoadedElementIterator{ + storage: i.storage, + elements: dataSlab.elements, + } + + // Continue to iterate elements in collision group using collisionGroupIterator. + continue + + default: + return nil, nil, NewSlabDataError(fmt.Errorf("unexpected element type %T during map iteration", element)) + } + } + + // Reach end of map data slab. + return nil, nil, nil +} + +type mapLoadedSlabIterator struct { + storage SlabStorage + slab *MapMetaDataSlab + index int +} + +func (i *mapLoadedSlabIterator) next() Slab { + // Iterate loaded slabs in meta data slab. + for i.index < len(i.slab.childrenHeaders) { + header := i.slab.childrenHeaders[i.index] + i.index++ + + childSlab := i.storage.RetrieveIfLoaded(header.slabID) + if childSlab == nil { + // Skip this child because it references unloaded slab. + // Try next child. + continue + } + + return childSlab + } + + // Reach end of children. + return nil +} + +// MapLoadedValueIterator is used to iterate loaded map elements. +type MapLoadedValueIterator struct { + storage SlabStorage + parents []*mapLoadedSlabIterator // LIFO stack for parents of dataIterator + dataIterator *mapLoadedElementIterator +} + +func (i *MapLoadedValueIterator) nextDataIterator() (*mapLoadedElementIterator, error) { + + // Iterate parents (LIFO) to find next loaded map data slab. + for len(i.parents) > 0 { + lastParent := i.parents[len(i.parents)-1] + + nextChildSlab := lastParent.next() + + switch slab := nextChildSlab.(type) { + case *MapDataSlab: + // Create data iterator + return &mapLoadedElementIterator{ + storage: i.storage, + elements: slab.elements, + }, nil + + case *MapMetaDataSlab: + // Push new parent to parents queue + newParent := &mapLoadedSlabIterator{ + storage: i.storage, + slab: slab, + } + i.parents = append(i.parents, newParent) + + case nil: + // Reach end of last parent. + // Reset last parent to nil and pop last parent from parents stack. + lastParentIndex := len(i.parents) - 1 + i.parents[lastParentIndex] = nil + i.parents = i.parents[:lastParentIndex] + + default: + return nil, NewSlabDataErrorf("slab %s isn't MapSlab", nextChildSlab.SlabID()) + } + } + + // Reach end of parents stack. + return nil, nil +} + +// Next iterates and returns next loaded element. +// It returns nil Value at end of loaded elements. +func (i *MapLoadedValueIterator) Next() (Value, Value, error) { + if i.dataIterator != nil { + key, value, err := i.dataIterator.next() + if err != nil { + // Don't need to wrap error as external error because err is already categorized by mapLoadedElementIterator.next(). + return nil, nil, err + } + if key != nil { + return key, value, nil + } + + // Reach end of element in current data slab. + i.dataIterator = nil + } + + // Get next data iterator. + var err error + i.dataIterator, err = i.nextDataIterator() + if err != nil { + // Don't need to wrap error as external error because err is already categorized by MapLoadedValueIterator.nextDataIterator(). 
+ return nil, nil, err + } + if i.dataIterator != nil { + return i.Next() + } + + // Reach end of loaded value iterator + return nil, nil, nil +} + +// LoadedValueIterator returns iterator to iterate loaded map elements. +func (m *OrderedMap) LoadedValueIterator() (*MapLoadedValueIterator, error) { + switch slab := m.root.(type) { + + case *MapDataSlab: + // Create a data iterator from root slab. + dataIterator := &mapLoadedElementIterator{ + storage: m.Storage, + elements: slab.elements, + } + + // Create iterator with data iterator (no parents). + iterator := &MapLoadedValueIterator{ + storage: m.Storage, + dataIterator: dataIterator, + } + + return iterator, nil + + case *MapMetaDataSlab: + // Create a slab iterator from root slab. + slabIterator := &mapLoadedSlabIterator{ + storage: m.Storage, + slab: slab, + } + + // Create iterator with parent (data iterater is uninitialized). + iterator := &MapLoadedValueIterator{ + storage: m.Storage, + parents: []*mapLoadedSlabIterator{slabIterator}, + } + + return iterator, nil + + default: + return nil, NewSlabDataErrorf("slab %s isn't MapSlab", slab.SlabID()) + } +} + +// IterateLoadedValues iterates loaded map values. +func (m *OrderedMap) IterateLoadedValues(fn MapEntryIterationFunc) error { + iterator, err := m.LoadedValueIterator() + if err != nil { + // Don't need to wrap error as external error because err is already categorized by OrderedMap.LoadedValueIterator(). + return err + } + + var key, value Value + for { + key, value, err = iterator.Next() + if err != nil { + // Don't need to wrap error as external error because err is already categorized by MapLoadedValueIterator.Next(). + return err + } + if key == nil { + return nil + } + resume, err := fn(key, value) + if err != nil { + // Wrap err as external error (if needed) because err is returned by MapEntryIterationFunc callback. 
+ return wrapErrorAsExternalErrorIfNeeded(err) + } + if !resume { + return nil + } + } +} diff --git a/map_test.go b/map_test.go index d31f1172..0850ac12 100644 --- a/map_test.go +++ b/map_test.go @@ -4099,6 +4099,1674 @@ func TestMaxCollisionLimitPerDigest(t *testing.T) { }) } +func TestMapLoadedValueIterator(t *testing.T) { + + SetThreshold(256) + defer SetThreshold(1024) + + typeInfo := testTypeInfo{42} + address := Address{1, 2, 3, 4, 5, 6, 7, 8} + + t.Run("empty", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + digesterBuilder := &mockDigesterBuilder{} + + m, err := NewMap(storage, address, digesterBuilder, typeInfo) + require.NoError(t, err) + + // parent map: 1 root data slab + require.Equal(t, 1, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, nil) + }) + + t.Run("root data slab with simple values", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 3 + m, values := createMapWithSimpleValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map: 1 root data slab + require.Equal(t, 1, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root data slab with composite values", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 3 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map: 1 root data slab + // composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root data slab with composite values in collision group", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 collision groups, 2 elements in each group. + const mapSize = 6 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 2), Digest(i)} }, + ) + + // parent map: 1 root data slab + // composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root data slab with composite values in external collision group", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 external collision group, 3 elements in the group. 
+ const mapSize = 9 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 3), Digest(i)} }, + ) + + // parent map: 1 root data slab, 3 external collision group + // composite elements: 1 root data slab for each + require.Equal(t, 1+3+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root data slab with composite values, unload value from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 3 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map: 1 root data slab + // composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element from front to back. + for i := 0; i < len(values); i++ { + v := values[i][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[i+1:] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with long string keys, unload key from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 3 + m, values := createMapWithLongStringKey(t, storage, address, typeInfo, mapSize) + + // parent map: 1 root data slab + // long string keys: 1 storable slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload external key from front to back. + for i := 0; i < len(values); i++ { + k := values[i][0] + + s, ok := k.(StringValue) + require.True(t, ok) + + // Find storage id for StringValue s. + var keyID SlabID + for id, slab := range storage.deltas { + if sslab, ok := slab.(*StorableSlab); ok { + if other, ok := sslab.storable.(StringValue); ok { + if s.str == other.str { + keyID = id + break + } + } + } + } + + require.NoError(t, keyID.Valid()) + + err := storage.Remove(keyID) + require.NoError(t, err) + + expectedValues := values[i+1:] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with composite values in collision group, unload value from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 collision groups, 2 elements in each group. + const mapSize = 6 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 2), Digest(i)} }, + ) + + // parent map: 1 root data slab + // composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element from front to back. 
+ for i := 0; i < len(values); i++ { + v := values[i][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[i+1:] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with composite values in external collision group, unload value from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 external collision groups, 3 elements in the group. + const mapSize = 9 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 3), Digest(i)} }, + ) + + // parent map: 1 root data slab, 3 external collision group + // composite elements: 1 root data slab for each + require.Equal(t, 1+3+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element from front to back + for i := 0; i < len(values); i++ { + v := values[i][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[i+1:] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with composite values in external collision group, unload external slab from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 external collision groups, 3 elements in the group. + const mapSize = 9 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 3), Digest(i)} }, + ) + + // parent map: 1 root data slab, 3 external collision group + // composite elements: 1 root data slab for each + require.Equal(t, 1+3+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload external collision group slab from front to back + + var externalCollisionSlabIDs []SlabID + for id, slab := range storage.deltas { + if dataSlab, ok := slab.(*MapDataSlab); ok { + if dataSlab.collisionGroup { + externalCollisionSlabIDs = append(externalCollisionSlabIDs, id) + } + } + } + require.Equal(t, 3, len(externalCollisionSlabIDs)) + + sort.Slice(externalCollisionSlabIDs, func(i, j int) bool { + a := externalCollisionSlabIDs[i] + b := externalCollisionSlabIDs[j] + if a.address == b.address { + return a.IndexAsUint64() < b.IndexAsUint64() + } + return a.AddressAsUint64() < b.AddressAsUint64() + }) + + for i, id := range externalCollisionSlabIDs { + err := storage.Remove(id) + require.NoError(t, err) + + expectedValues := values[i*3+3:] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with composite values, unload composite value from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 3 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map: 1 root data slab + // composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element from back to front. 
+ for i := len(values) - 1; i >= 0; i-- { + v := values[i][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[:i] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with long string key, unload key from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 3 + m, values := createMapWithLongStringKey(t, storage, address, typeInfo, mapSize) + + // parent map: 1 root data slab + // long string keys: 1 storable slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element from front to back. + for i := len(values) - 1; i >= 0; i-- { + k := values[i][0] + + s, ok := k.(StringValue) + require.True(t, ok) + + // Find storage id for StringValue s. + var keyID SlabID + for id, slab := range storage.deltas { + if sslab, ok := slab.(*StorableSlab); ok { + if other, ok := sslab.storable.(StringValue); ok { + if s.str == other.str { + keyID = id + break + } + } + } + } + + require.NoError(t, keyID.Valid()) + + err := storage.Remove(keyID) + require.NoError(t, err) + + expectedValues := values[:i] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with composite values in collision group, unload value from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 collision groups, 2 elements in each group. + const mapSize = 6 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 2), Digest(i)} }, + ) + + // parent map: 1 root data slab + // composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element from back to front + for i := len(values) - 1; i >= 0; i-- { + v := values[i][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[:i] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with composite values in external collision group, unload value from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 external collision groups, 3 elements in the group. 
+ const mapSize = 9 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 3), Digest(i)} }, + ) + + // parent map: 1 root data slab, 3 external collision group + // composite elements: 1 root data slab for each + require.Equal(t, 1+3+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element from back to front + for i := len(values) - 1; i >= 0; i-- { + v := values[i][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[:i] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with composite values in external collision group, unload external slab from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 external collision groups, 3 elements in the group. + const mapSize = 9 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 3), Digest(i)} }, + ) + + // parent map: 1 root data slab, 3 external collision group + // composite elements: 1 root data slab for each + require.Equal(t, 1+3+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload external slabs from back to front + var externalCollisionSlabIDs []SlabID + for id, slab := range storage.deltas { + if dataSlab, ok := slab.(*MapDataSlab); ok { + if dataSlab.collisionGroup { + externalCollisionSlabIDs = append(externalCollisionSlabIDs, id) + } + } + } + require.Equal(t, 3, len(externalCollisionSlabIDs)) + + sort.Slice(externalCollisionSlabIDs, func(i, j int) bool { + a := externalCollisionSlabIDs[i] + b := externalCollisionSlabIDs[j] + if a.address == b.address { + return a.IndexAsUint64() < b.IndexAsUint64() + } + return a.AddressAsUint64() < b.AddressAsUint64() + }) + + for i := len(externalCollisionSlabIDs) - 1; i >= 0; i-- { + err := storage.Remove(externalCollisionSlabIDs[i]) + require.NoError(t, err) + + expectedValues := values[:i*3] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root data slab with composite values, unload value in the middle", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 3 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map: 1 root data slab + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload value in the middle + unloadValueIndex := 1 + + v := values[unloadValueIndex][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + copy(values[unloadValueIndex:], values[unloadValueIndex+1:]) + values = values[:len(values)-1] + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root data slab with long string key, unload key in the middle", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 3 + m, values := createMapWithLongStringKey(t, storage, address, typeInfo, mapSize) + + // parent map: 1 root data 
slab + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload key in the middle. + unloadValueIndex := 1 + + k := values[unloadValueIndex][0] + + s, ok := k.(StringValue) + require.True(t, ok) + + // Find storage id for StringValue s. + var keyID SlabID + for id, slab := range storage.deltas { + if sslab, ok := slab.(*StorableSlab); ok { + if other, ok := sslab.storable.(StringValue); ok { + if s.str == other.str { + keyID = id + break + } + } + } + } + + require.NoError(t, keyID.Valid()) + + err := storage.Remove(keyID) + require.NoError(t, err) + + copy(values[unloadValueIndex:], values[unloadValueIndex+1:]) + values = values[:len(values)-1] + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root data slab with composite values in collision group, unload value in the middle", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 collision groups, 2 elements in each group. + const mapSize = 6 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 2), Digest(i)} }, + ) + + // parent map: 1 root data slab + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element in the middle + for _, unloadValueIndex := range []int{1, 3, 5} { + v := values[unloadValueIndex][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + } + + expectedValues := [][2]Value{ + values[0], + values[2], + values[4], + } + verifyMapLoadedElements(t, m, expectedValues) + }) + + t.Run("root data slab with composite values in external collision group, unload value in the middle", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 external collision groups, 3 elements in the group. + const mapSize = 9 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 3), Digest(i)} }, + ) + + // parent map: 1 root data slab, 3 external collision group + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+3+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite value in the middle. + for _, unloadValueIndex := range []int{1, 3, 5, 7} { + v := values[unloadValueIndex][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + } + + expectedValues := [][2]Value{ + values[0], + values[2], + values[4], + values[6], + values[8], + } + verifyMapLoadedElements(t, m, expectedValues) + }) + + t.Run("root data slab with composite values in external collision group, unload external slab in the middle", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + // Create parent map with 3 external collision groups, 3 elements in the group. 
+ const mapSize = 9 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i / 3), Digest(i)} }, + ) + + // parent map: 1 root data slab, 3 external collision groups + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+3+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload external slabs in the middle. + var externalCollisionSlabIDs []SlabID + for id, slab := range storage.deltas { + if dataSlab, ok := slab.(*MapDataSlab); ok { + if dataSlab.collisionGroup { + externalCollisionSlabIDs = append(externalCollisionSlabIDs, id) + } + } + } + require.Equal(t, 3, len(externalCollisionSlabIDs)) + + sort.Slice(externalCollisionSlabIDs, func(i, j int) bool { + a := externalCollisionSlabIDs[i] + b := externalCollisionSlabIDs[j] + if a.address == b.address { + return a.IndexAsUint64() < b.IndexAsUint64() + } + return a.AddressAsUint64() < b.AddressAsUint64() + }) + + id := externalCollisionSlabIDs[1] + err := storage.Remove(id) + require.NoError(t, err) + + copy(values[3:], values[6:]) + values = values[:6] + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root data slab with composite values, unload composite elements during iteration", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 3 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map: 1 root data slab + // nested composite elements: 1 root data slab for each + require.Equal(t, 1+mapSize, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + i := 0 + err := m.IterateLoadedValues(func(k Value, v Value) (bool, error) { + // At this point, the iterator has returned the first element (v). + + // Remove all nested composite elements except the first one from storage. + for _, element := range values[1:] { + value := element[1] + nestedArray, ok := value.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + } + + require.Equal(t, 0, i) + valueEqual(t, typeInfoComparator, values[0][0], k) + valueEqual(t, typeInfoComparator, values[0][1], v) + i++ + return true, nil + }) + + require.NoError(t, err) + require.Equal(t, 1, i) // Only first element is iterated because the other elements were removed during iteration.
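The callback API exercised above is the same one callers outside the tests would use; a minimal sketch of counting whatever happens to be loaded at call time (countLoadedValues is an illustrative name, assuming only the OrderedMap.IterateLoadedValues signature shown in this file):

func countLoadedValues(m *OrderedMap) (int, error) {
	n := 0
	err := m.IterateLoadedValues(func(k Value, v Value) (bool, error) {
		// Each visited pair is loaded at the moment the callback runs;
		// returning false here would stop the iteration early.
		n++
		return true, nil
	})
	return n, err
}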
+ }) + + t.Run("root data slab with simple and composite values, unloading composite value", func(t *testing.T) { + const mapSize = 3 + + // Create a map with nested composite value at specified index + for nestedCompositeIndex := 0; nestedCompositeIndex < mapSize; nestedCompositeIndex++ { + storage := newTestPersistentStorage(t) + + m, values := createMapWithSimpleAndCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + nestedCompositeIndex, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map: 1 root data slab + // composite element: 1 root data slab + require.Equal(t, 2, len(storage.deltas)) + require.Equal(t, 0, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite value + v := values[nestedCompositeIndex][1].(*Array) + + err := storage.Remove(v.SlabID()) + require.NoError(t, err) + + copy(values[nestedCompositeIndex:], values[nestedCompositeIndex+1:]) + values = values[:len(values)-1] + + verifyMapLoadedElements(t, m, values) + } + }) + + t.Run("root metadata slab with simple values", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 20 + m, values := createMapWithSimpleValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (2 levels): 1 root metadata slab, 3 data slabs + require.Equal(t, 4, len(storage.deltas)) + require.Equal(t, 1, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root metadata slab with composite values", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 20 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (2 levels): 1 root metadata slab, 3 data slabs + // composite values: 1 root data slab for each + require.Equal(t, 4+mapSize, len(storage.deltas)) + require.Equal(t, 1, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root metadata slab with composite values, unload value from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 20 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (2 levels): 1 root metadata slab, 3 data slabs + // composite values : 1 root data slab for each + require.Equal(t, 4+mapSize, len(storage.deltas)) + require.Equal(t, 1, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element from front to back + for i := 0; i < len(values); i++ { + v := values[i][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[i+1:] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root metadata slab with composite values, unload values from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 20 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (2 levels): 1 root metadata slab, 3 data slabs + // composite values: 1 root data slab for each + require.Equal(t, 4+mapSize, len(storage.deltas)) + require.Equal(t, 1, 
getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element from back to front + for i := len(values) - 1; i >= 0; i-- { + v := values[i][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + expectedValues := values[:i] + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root metadata slab with composite values, unload value in the middle", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 20 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (2 levels): 1 root metadata slab, 3 data slabs + // composite values: 1 root data slab for each + require.Equal(t, 4+mapSize, len(storage.deltas)) + require.Equal(t, 1, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + // Unload composite element in the middle + for _, index := range []int{4, 14} { + + v := values[index][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + copy(values[index:], values[index+1:]) + values = values[:len(values)-1] + + verifyMapLoadedElements(t, m, values) + } + }) + + t.Run("root metadata slab with simple and composite values, unload composite value", func(t *testing.T) { + const mapSize = 20 + + // Create a map with nested composite value at specified index + for nestedCompositeIndex := 0; nestedCompositeIndex < mapSize; nestedCompositeIndex++ { + storage := newTestPersistentStorage(t) + + m, values := createMapWithSimpleAndCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + nestedCompositeIndex, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (2 levels): 1 root metadata slab, 3 data slabs + // composite values: 1 root data slab for each + require.Equal(t, 5, len(storage.deltas)) + require.Equal(t, 1, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + v := values[nestedCompositeIndex][1].(*Array) + + err := storage.Remove(v.SlabID()) + require.NoError(t, err) + + copy(values[nestedCompositeIndex:], values[nestedCompositeIndex+1:]) + values = values[:len(values)-1] + + verifyMapLoadedElements(t, m, values) + } + }) + + t.Run("root metadata slab, unload data slab from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 20 + + m, values := createMapWithSimpleValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (2 levels): 1 root metadata slab, 3 data slabs + require.Equal(t, 4, len(storage.deltas)) + require.Equal(t, 1, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + rootMetaDataSlab, ok := m.root.(*MapMetaDataSlab) + require.True(t, ok) + + // Unload data slabs from front to back + for i := 0; i < len(rootMetaDataSlab.childrenHeaders); i++ { + + childHeader := rootMetaDataSlab.childrenHeaders[i] + + // Get data slab element count before unload it from storage. + // Element count isn't in the header. 
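As the comment above notes, the child header carries firstKey rather than an element count, so the count has to be read off the loaded slab itself. A sketch of the same lookup expressed as a helper over the test storage's deltas (loadedMapElementCount is an illustrative name):

func loadedMapElementCount(storage *PersistentSlabStorage) int {
	count := 0
	for _, slab := range storage.deltas {
		// Only map data slabs hold elements; metadata slabs only hold child headers.
		if dataSlab, ok := slab.(*MapDataSlab); ok {
			count += int(dataSlab.elements.Count())
		}
	}
	return count
}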
+ mapDataSlab, ok := storage.deltas[childHeader.slabID].(*MapDataSlab) + require.True(t, ok) + + count := mapDataSlab.elements.Count() + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + values = values[count:] + + verifyMapLoadedElements(t, m, values) + } + }) + + t.Run("root metadata slab, unload data slab from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 20 + + m, values := createMapWithSimpleValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (2 levels): 1 root metadata slab, 3 data slabs + require.Equal(t, 4, len(storage.deltas)) + require.Equal(t, 1, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + rootMetaDataSlab, ok := m.root.(*MapMetaDataSlab) + require.True(t, ok) + + // Unload data slabs from back to front + for i := len(rootMetaDataSlab.childrenHeaders) - 1; i >= 0; i-- { + + childHeader := rootMetaDataSlab.childrenHeaders[i] + + // Get data slab element count before unload it from storage + // Element count isn't in the header. + mapDataSlab, ok := storage.deltas[childHeader.slabID].(*MapDataSlab) + require.True(t, ok) + + count := mapDataSlab.elements.Count() + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + values = values[:len(values)-int(count)] + + verifyMapLoadedElements(t, m, values) + } + }) + + t.Run("root metadata slab, unload data slab in the middle", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 20 + + m, values := createMapWithSimpleValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (2 levels): 1 root metadata slab, 3 data slabs + require.Equal(t, 4, len(storage.deltas)) + require.Equal(t, 1, getMapMetaDataSlabCount(storage)) + + verifyMapLoadedElements(t, m, values) + + rootMetaDataSlab, ok := m.root.(*MapMetaDataSlab) + require.True(t, ok) + + require.True(t, len(rootMetaDataSlab.childrenHeaders) > 2) + + index := 1 + childHeader := rootMetaDataSlab.childrenHeaders[index] + + // Get element count from previous data slab + mapDataSlab, ok := storage.deltas[rootMetaDataSlab.childrenHeaders[0].slabID].(*MapDataSlab) + require.True(t, ok) + + countAtIndex0 := mapDataSlab.elements.Count() + + // Get element count from slab to be unloaded + mapDataSlab, ok = storage.deltas[rootMetaDataSlab.childrenHeaders[index].slabID].(*MapDataSlab) + require.True(t, ok) + + countAtIndex1 := mapDataSlab.elements.Count() + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + copy(values[countAtIndex0:], values[countAtIndex0+countAtIndex1:]) + values = values[:m.Count()-uint64(countAtIndex1)] + + verifyMapLoadedElements(t, m, values) + }) + + t.Run("root metadata slab, unload non-root metadata slab from front to back", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 130 + + m, values := createMapWithSimpleValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (3 levels): 1 root metadata slab, 3 child metadata slabs, n data slabs + require.Equal(t, 4, getMapMetaDataSlabCount(storage)) + + rootMetaDataSlab, ok := m.root.(*MapMetaDataSlab) + require.True(t, ok) + + // Unload non-root metadata slabs from front to back. 
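These maps use Uint64Value keys whose digests equal the key, so a child header's firstKey doubles as the index of its first element in values; the loop below leans on that to compute which elements should still be reported as loaded. A sketch of the front-to-back case (the helper name is illustrative, and it assumes that identity digest function):

// After unloading children 0..i, only elements under the remaining siblings stay
// loaded, i.e. everything at or past the next sibling's firstKey.
func expectedAfterUnloadingChildrenUpTo(values [][2]Value, nextSiblingFirstKey Digest) [][2]Value {
	return values[int(nextSiblingFirstKey):]
}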
+ for i := 0; i < len(rootMetaDataSlab.childrenHeaders); i++ { + + childHeader := rootMetaDataSlab.childrenHeaders[i] + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + // Use firstKey to deduce number of elements in slab. + var expectedValues [][2]Value + if i < len(rootMetaDataSlab.childrenHeaders)-1 { + nextChildHeader := rootMetaDataSlab.childrenHeaders[i+1] + expectedValues = values[int(nextChildHeader.firstKey):] + } + + verifyMapLoadedElements(t, m, expectedValues) + } + }) + + t.Run("root metadata slab, unload non-root metadata slab from back to front", func(t *testing.T) { + storage := newTestPersistentStorage(t) + + const mapSize = 130 + + m, values := createMapWithSimpleValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (3 levels): 1 root metadata slab, 3 child metadata slabs, n data slabs + require.Equal(t, 4, getMapMetaDataSlabCount(storage)) + + rootMetaDataSlab, ok := m.root.(*MapMetaDataSlab) + require.True(t, ok) + + // Unload non-root metadata slabs from back to front. + for i := len(rootMetaDataSlab.childrenHeaders) - 1; i >= 0; i-- { + + childHeader := rootMetaDataSlab.childrenHeaders[i] + + err := storage.Remove(childHeader.slabID) + require.NoError(t, err) + + // Use firstKey to deduce number of elements in slabs. + values = values[:childHeader.firstKey] + + verifyMapLoadedElements(t, m, values) + } + }) + + t.Run("root metadata slab with composite values, unload composite value at random index", func(t *testing.T) { + + storage := newTestPersistentStorage(t) + + const mapSize = 500 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (3 levels): 1 root metadata slab, n non-root metadata slabs, n data slabs + // nested composite elements: 1 root data slab for each + require.True(t, len(storage.deltas) > 1+mapSize) + require.True(t, getMapMetaDataSlabCount(storage) > 1) + + verifyMapLoadedElements(t, m, values) + + r := newRand(t) + + // Unload composite element in random position + for len(values) > 0 { + + i := r.Intn(len(values)) + + v := values[i][1] + + nestedArray, ok := v.(*Array) + require.True(t, ok) + + err := storage.Remove(nestedArray.SlabID()) + require.NoError(t, err) + + copy(values[i:], values[i+1:]) + values = values[:len(values)-1] + + verifyMapLoadedElements(t, m, values) + } + }) + + t.Run("root metadata slab with composite values, unload random data slab", func(t *testing.T) { + + storage := newTestPersistentStorage(t) + + const mapSize = 500 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (3 levels): 1 root metadata slab, n non-root metadata slabs, n data slabs + // composite values: 1 root data slab for each + require.True(t, len(storage.deltas) > 1+mapSize) + require.True(t, getMapMetaDataSlabCount(storage) > 1) + + verifyMapLoadedElements(t, m, values) + + rootMetaDataSlab, ok := m.root.(*MapMetaDataSlab) + require.True(t, ok) + + type slabInfo struct { + id SlabID + startIndex int + count int + } + + var dataSlabInfos []*slabInfo + for _, mheader := range rootMetaDataSlab.childrenHeaders { + + nonRootMetaDataSlab, ok := storage.deltas[mheader.slabID].(*MapMetaDataSlab) + require.True(t, ok) + + for i := 0; i < len(nonRootMetaDataSlab.childrenHeaders); i++ { + h := 
nonRootMetaDataSlab.childrenHeaders[i] + + if len(dataSlabInfos) > 0 { + // Update previous slabInfo.count + dataSlabInfos[len(dataSlabInfos)-1].count = int(h.firstKey) - dataSlabInfos[len(dataSlabInfos)-1].startIndex + } + + dataSlabInfos = append(dataSlabInfos, &slabInfo{id: h.slabID, startIndex: int(h.firstKey)}) + } + } + + r := newRand(t) + + for len(dataSlabInfos) > 0 { + index := r.Intn(len(dataSlabInfos)) + + slabToBeRemoved := dataSlabInfos[index] + + // Update startIndex for all subsequence data slabs + for i := index + 1; i < len(dataSlabInfos); i++ { + dataSlabInfos[i].startIndex -= slabToBeRemoved.count + } + + err := storage.Remove(slabToBeRemoved.id) + require.NoError(t, err) + + if index == len(dataSlabInfos)-1 { + values = values[:slabToBeRemoved.startIndex] + } else { + copy(values[slabToBeRemoved.startIndex:], values[slabToBeRemoved.startIndex+slabToBeRemoved.count:]) + values = values[:len(values)-slabToBeRemoved.count] + } + + copy(dataSlabInfos[index:], dataSlabInfos[index+1:]) + dataSlabInfos = dataSlabInfos[:len(dataSlabInfos)-1] + + verifyMapLoadedElements(t, m, values) + } + + require.Equal(t, 0, len(values)) + }) + + t.Run("root metadata slab with composite values, unload random slab", func(t *testing.T) { + + storage := newTestPersistentStorage(t) + + const mapSize = 500 + m, values := createMapWithCompositeValues( + t, + storage, + address, + typeInfo, + mapSize, + func(i int) []Digest { return []Digest{Digest(i)} }, + ) + + // parent map (3 levels): 1 root metadata slab, n non-root metadata slabs, n data slabs + // composite values: 1 root data slab for each + require.True(t, len(storage.deltas) > 1+mapSize) + require.True(t, getMapMetaDataSlabCount(storage) > 1) + + verifyMapLoadedElements(t, m, values) + + type slabInfo struct { + id SlabID + startIndex int + count int + children []*slabInfo + } + + rootMetaDataSlab, ok := m.root.(*MapMetaDataSlab) + require.True(t, ok) + + metadataSlabInfos := make([]*slabInfo, len(rootMetaDataSlab.childrenHeaders)) + for i, mheader := range rootMetaDataSlab.childrenHeaders { + + if i > 0 { + prevMetaDataSlabInfo := metadataSlabInfos[i-1] + prevDataSlabInfo := prevMetaDataSlabInfo.children[len(prevMetaDataSlabInfo.children)-1] + + // Update previous metadata slab count + prevMetaDataSlabInfo.count = int(mheader.firstKey) - prevMetaDataSlabInfo.startIndex + + // Update previous data slab count + prevDataSlabInfo.count = int(mheader.firstKey) - prevDataSlabInfo.startIndex + } + + metadataSlabInfo := &slabInfo{ + id: mheader.slabID, + startIndex: int(mheader.firstKey), + } + + nonRootMetadataSlab, ok := storage.deltas[mheader.slabID].(*MapMetaDataSlab) + require.True(t, ok) + + children := make([]*slabInfo, len(nonRootMetadataSlab.childrenHeaders)) + for i, h := range nonRootMetadataSlab.childrenHeaders { + children[i] = &slabInfo{ + id: h.slabID, + startIndex: int(h.firstKey), + } + if i > 0 { + children[i-1].count = int(h.firstKey) - children[i-1].startIndex + } + } + + metadataSlabInfo.children = children + metadataSlabInfos[i] = metadataSlabInfo + } + + const ( + metadataSlabType int = iota + dataSlabType + maxSlabType + ) + + r := newRand(t) + + for len(metadataSlabInfos) > 0 { + + var slabInfoToBeRemoved *slabInfo + var isLastSlab bool + + switch r.Intn(maxSlabType) { + + case metadataSlabType: + + metadataSlabIndex := r.Intn(len(metadataSlabInfos)) + + isLastSlab = metadataSlabIndex == len(metadataSlabInfos)-1 + + slabInfoToBeRemoved = metadataSlabInfos[metadataSlabIndex] + + count := slabInfoToBeRemoved.count + + // 
Update startIndex for subsequent metadata slabs + for i := metadataSlabIndex + 1; i < len(metadataSlabInfos); i++ { + metadataSlabInfos[i].startIndex -= count + + for j := 0; j < len(metadataSlabInfos[i].children); j++ { + metadataSlabInfos[i].children[j].startIndex -= count + } + } + + copy(metadataSlabInfos[metadataSlabIndex:], metadataSlabInfos[metadataSlabIndex+1:]) + metadataSlabInfos = metadataSlabInfos[:len(metadataSlabInfos)-1] + + case dataSlabType: + + metadataSlabIndex := r.Intn(len(metadataSlabInfos)) + + metadataSlabInfo := metadataSlabInfos[metadataSlabIndex] + + dataSlabIndex := r.Intn(len(metadataSlabInfo.children)) + + isLastSlab = (metadataSlabIndex == len(metadataSlabInfos)-1) && + (dataSlabIndex == len(metadataSlabInfo.children)-1) + + slabInfoToBeRemoved = metadataSlabInfo.children[dataSlabIndex] + + count := slabInfoToBeRemoved.count + + // Update startIndex for all subsequent data slabs in this metadata slab info + for i := dataSlabIndex + 1; i < len(metadataSlabInfo.children); i++ { + metadataSlabInfo.children[i].startIndex -= count + } + + copy(metadataSlabInfo.children[dataSlabIndex:], metadataSlabInfo.children[dataSlabIndex+1:]) + metadataSlabInfo.children = metadataSlabInfo.children[:len(metadataSlabInfo.children)-1] + + metadataSlabInfo.count -= count + + // Update startIndex for all subsequent metadata slabs. + for i := metadataSlabIndex + 1; i < len(metadataSlabInfos); i++ { + metadataSlabInfos[i].startIndex -= count + + for j := 0; j < len(metadataSlabInfos[i].children); j++ { + metadataSlabInfos[i].children[j].startIndex -= count + } + } + + if len(metadataSlabInfo.children) == 0 { + copy(metadataSlabInfos[metadataSlabIndex:], metadataSlabInfos[metadataSlabIndex+1:]) + metadataSlabInfos = metadataSlabInfos[:len(metadataSlabInfos)-1] + } + } + + err := storage.Remove(slabInfoToBeRemoved.id) + require.NoError(t, err) + + if isLastSlab { + values = values[:slabInfoToBeRemoved.startIndex] + } else { + copy(values[slabInfoToBeRemoved.startIndex:], values[slabInfoToBeRemoved.startIndex+slabInfoToBeRemoved.count:]) + values = values[:len(values)-slabInfoToBeRemoved.count] + } + + verifyMapLoadedElements(t, m, values) + } + + require.Equal(t, 0, len(values)) + }) +} + +func createMapWithLongStringKey( + t *testing.T, + storage SlabStorage, + address Address, + typeInfo TypeInfo, + size int, +) (*OrderedMap, [][2]Value) { + + digesterBuilder := &mockDigesterBuilder{} + + // Create parent map.
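Each key appended a few lines below repeats a single rune maxInlineMapElementSize times, so it is too large to store inline and lands in its own StorableSlab; that is what lets the earlier "unload key in the middle" case evict a single key by removing that slab. A sketch of how such a key's slab can be located in the test storage (findStringKeySlabID is an illustrative name):

func findStringKeySlabID(storage *PersistentSlabStorage, key StringValue) (SlabID, bool) {
	for id, slab := range storage.deltas {
		if sslab, ok := slab.(*StorableSlab); ok {
			if s, ok := sslab.storable.(StringValue); ok && s.str == key.str {
				return id, true
			}
		}
	}
	return SlabID{}, false
}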
+ m, err := NewMap(storage, address, digesterBuilder, typeInfo) + require.NoError(t, err) + + expectedValues := make([][2]Value, size) + r := 'a' + for i := 0; i < size; i++ { + s := strings.Repeat(string(r), int(maxInlineMapElementSize)) + + k := NewStringValue(s) + v := Uint64Value(i) + + expectedValues[i] = [2]Value{k, v} + + digests := []Digest{Digest(i)} + digesterBuilder.On("Digest", k).Return(mockDigester{digests}) + + existingStorable, err := m.Set(compare, hashInputProvider, k, v) + require.NoError(t, err) + require.Nil(t, existingStorable) + + r++ + } + + return m, expectedValues +} + +func createMapWithSimpleValues( + t *testing.T, + storage SlabStorage, + address Address, + typeInfo TypeInfo, + size int, + newDigests func(i int) []Digest, +) (*OrderedMap, [][2]Value) { + + digesterBuilder := &mockDigesterBuilder{} + + m, err := NewMap(storage, address, digesterBuilder, typeInfo) + require.NoError(t, err) + + expectedValues := make([][2]Value, size) + r := rune('a') + for i := 0; i < size; i++ { + k := Uint64Value(i) + v := NewStringValue(strings.Repeat(string(r), 20)) + + digests := newDigests(i) + digesterBuilder.On("Digest", k).Return(mockDigester{digests}) + + expectedValues[i] = [2]Value{k, v} + + existingStorable, err := m.Set(compare, hashInputProvider, expectedValues[i][0], expectedValues[i][1]) + require.NoError(t, err) + require.Nil(t, existingStorable) + } + + return m, expectedValues +} + +func createMapWithCompositeValues( + t *testing.T, + storage SlabStorage, + address Address, + typeInfo TypeInfo, + size int, + newDigests func(i int) []Digest, +) (*OrderedMap, [][2]Value) { + + // Use mockDigesterBuilder to guarantee element order. + digesterBuilder := &mockDigesterBuilder{} + + // Create parent map + m, err := NewMap(storage, address, digesterBuilder, typeInfo) + require.NoError(t, err) + + expectedValues := make([][2]Value, size) + for i := 0; i < size; i++ { + // Create nested array + nested, err := NewArray(storage, address, typeInfo) + require.NoError(t, err) + + err = nested.Append(Uint64Value(i)) + require.NoError(t, err) + + k := Uint64Value(i) + v := nested + + expectedValues[i] = [2]Value{k, v} + + //digests := []Digest{Digest(i)} + digests := newDigests(i) + digesterBuilder.On("Digest", k).Return(mockDigester{digests}) + + // Set nested array to parent + existingStorable, err := m.Set(compare, hashInputProvider, k, v) + require.NoError(t, err) + require.Nil(t, existingStorable) + } + + return m, expectedValues +} + +func createMapWithSimpleAndCompositeValues( + t *testing.T, + storage SlabStorage, + address Address, + typeInfo TypeInfo, + size int, + compositeValueIndex int, + newDigests func(i int) []Digest, +) (*OrderedMap, [][2]Value) { + + digesterBuilder := &mockDigesterBuilder{} + + // Create parent map + m, err := NewMap(storage, address, digesterBuilder, typeInfo) + require.NoError(t, err) + + values := make([][2]Value, size) + r := 'a' + for i := 0; i < size; i++ { + + k := Uint64Value(i) + + digests := newDigests(i) + digesterBuilder.On("Digest", k).Return(mockDigester{digests}) + + if compositeValueIndex == i { + // Create nested array with one element + a, err := NewArray(storage, address, typeInfo) + require.NoError(t, err) + + err = a.Append(Uint64Value(i)) + require.NoError(t, err) + + values[i] = [2]Value{k, a} + } else { + values[i] = [2]Value{k, NewStringValue(strings.Repeat(string(r), 18))} + } + + existingStorable, err := m.Set(compare, hashInputProvider, values[i][0], values[i][1]) + require.NoError(t, err) + require.Nil(t, 
existingStorable) + } + + return m, values +} + +func verifyMapLoadedElements(t *testing.T, m *OrderedMap, expectedValues [][2]Value) { + i := 0 + err := m.IterateLoadedValues(func(k Value, v Value) (bool, error) { + require.True(t, i < len(expectedValues)) + valueEqual(t, typeInfoComparator, expectedValues[i][0], k) + valueEqual(t, typeInfoComparator, expectedValues[i][1], v) + i++ + return true, nil + }) + require.NoError(t, err) + require.Equal(t, len(expectedValues), i) +} + +func getMapMetaDataSlabCount(storage *PersistentSlabStorage) int { + var counter int + for _, slab := range storage.deltas { + if _, ok := slab.(*MapMetaDataSlab); ok { + counter++ + } + } + return counter +} + func TestMaxInlineMapValueSize(t *testing.T) { t.Run("small key", func(t *testing.T) { diff --git a/storable.go b/storable.go index 1ee513a9..fdf0278d 100644 --- a/storable.go +++ b/storable.go @@ -143,3 +143,31 @@ func DecodeSlabIDStorable(dec *cbor.StreamDecoder) (Storable, error) { return SlabIDStorable(id), nil } + +func getLoadedValue(storage SlabStorage, storable Storable) (Value, error) { + switch storable := storable.(type) { + case SlabIDStorable: + slab := storage.RetrieveIfLoaded(SlabID(storable)) + if slab == nil { + // Skip because it references unloaded slab. + return nil, nil + } + + v, err := slab.StoredValue(storage) + if err != nil { + // Wrap err as external error (if needed) because err is returned by Storable interface. + return nil, wrapErrorfAsExternalErrorIfNeeded(err, "failed to get storable's stored value") + } + + return v, nil + + default: + v, err := storable.StoredValue(storage) + if err != nil { + // Wrap err as external error (if needed) because err is returned by Storable interface. + return nil, wrapErrorfAsExternalErrorIfNeeded(err, "failed to get storable's stored value") + } + + return v, nil + } +} diff --git a/storage.go b/storage.go index d36d0237..005e69fd 100644 --- a/storage.go +++ b/storage.go @@ -282,6 +282,7 @@ type SlabIterator func() (SlabID, Slab) type SlabStorage interface { Store(SlabID, Slab) error Retrieve(SlabID) (Slab, bool, error) + RetrieveIfLoaded(SlabID) Slab Remove(SlabID) error GenerateSlabID(address Address) (SlabID, error) Count() int @@ -323,6 +324,10 @@ func (s *BasicSlabStorage) GenerateSlabID(address Address) (SlabID, error) { return NewSlabID(address, nextIndex), nil } +func (s *BasicSlabStorage) RetrieveIfLoaded(id SlabID) Slab { + return s.Slabs[id] +} + func (s *BasicSlabStorage) Retrieve(id SlabID) (Slab, bool, error) { slab, ok := s.Slabs[id] return slab, ok, nil @@ -958,6 +963,21 @@ func (s *PersistentSlabStorage) RetrieveIgnoringDeltas(id SlabID) (Slab, bool, e return slab, ok, nil } +func (s *PersistentSlabStorage) RetrieveIfLoaded(id SlabID) Slab { + // check deltas first. + if slab, ok := s.deltas[id]; ok { + return slab + } + + // check the read cache next. + if slab, ok := s.cache[id]; ok { + return slab + } + + // Don't fetch from base storage. + return nil +} + func (s *PersistentSlabStorage) Retrieve(id SlabID) (Slab, bool, error) { // check deltas first if slab, ok := s.deltas[id]; ok {