Skip to content

Commit

Permalink
refactored Map.Drain API and tests
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Magnani <[email protected]>
  • Loading branch information
smagnani96 committed Jul 29, 2024
1 parent 57a6e30 commit 39b3749
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 224 deletions.
197 changes: 107 additions & 90 deletions map.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,8 +1291,8 @@ func batchCount(keys, values any) (int, error) {
// safe. You may see the same key multiple times. Iteration may
// also abort with an error, see IsIterationAborted.
//
// Iterating a queue/stack map returns an error (NextKey).
// Map.Drain should be used instead.
// Iterating a queue/stack map returns an error (NextKey) as the.
// Map.Drain API should be used instead.
func (m *Map) Iterate() *MapIterator {
return newMapIterator(m)
}
Expand Down Expand Up @@ -1595,25 +1595,21 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
}

func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
// For array-like maps NextKey returns nil only after maxEntries
// iterations.
var key interface{}

// For array-like maps NextKey returns nil only after maxEntries iterations.
for mi.count <= mi.maxEntries {
if mi.cursor == nil {
// Pass nil interface to NextKey to make sure the Map's first key
// is returned. If we pass an uninitialized []byte instead, it'll see a
// non-nil interface and try to marshal it.
mi.cursor = make([]byte, mi.target.keySize)
mi.err = mi.target.NextKey(nil, mi.cursor)
key = nil
} else {
mi.err = mi.target.NextKey(mi.cursor, mi.cursor)
key = mi.cursor
}

if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
return false
} else if mi.err != nil {
mi.err = fmt.Errorf("get next key: %w", mi.err)
if !mi.fetchNextKey(key) {
return false
}

Expand All @@ -1635,110 +1631,131 @@ func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
return false
}

buf := mi.cursor.([]byte)
if ptr, ok := keyOut.(unsafe.Pointer); ok {
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
} else {
mi.err = sysenc.Unmarshal(keyOut, buf)
}

return mi.err == nil
return mi.copyCursorToKeyOut(keyOut)
}

mi.err = fmt.Errorf("%w", ErrIterationAborted)
return false
}

func (mi *MapIterator) nextDrain(keyOut, valueOut interface{}) bool {
// Check for maps without a key (e.g., Queue/Stack). In this case, when there is no
// more data, ErrKeyNotExist arise, but we gently stop the retrieval with no error.
if mi.target.keySize == 0 {
if keyOut != nil {
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key, must be nil instead")
return false
}
mi.err = mi.target.LookupAndDelete(keyOut, valueOut)
if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
return false
} else if mi.err != nil {
return false
}
mi.count++
return true
if mi.isKeylessMap() {
return mi.handleKeylessMap(keyOut, valueOut)
}

// Here we allocate only once data for retrieving the next key in the map.
// Allocate only once data for retrieving the next key in the map.
if mi.cursor == nil {
mi.cursor = make([]byte, mi.target.keySize)
}

for {
// Always retrieve first key in the map. This should ensure that
// the whole map is traversed, despite concurrent insertion.
// The expected ordering might differ:
// - initial keys in map: `a -> b -> c`
// - call MapIterator.Next and retrieve key `a`
// - insert key `d` in map
// - retrieve all the remaining keys `d -> b -> c`
mi.err = mi.target.NextKey(nil, mi.cursor)
if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
// Always retrieve first key in the map. This should ensure that the whole map
// is traversed, despite concurrent operations (ordering of items might differ).
for mi.err == nil && mi.fetchNextKey(nil) {
if mi.tryLookupAndDelete(valueOut) {
mi.count++
return mi.copyCursorToKeyOut(keyOut)
}
}
return false
}

func (mi *MapIterator) tryLookupAndDelete(valueOut interface{}) bool {
// Default try using the updated `Map.LookupAndDelete` API.
if !mi.fallback {
mi.err = mi.target.LookupAndDelete(mi.cursor, valueOut)
if mi.err == nil {
return true
}

switch {
case errors.Is(mi.err, ErrNotSupported) || errors.Is(mi.err, unix.EINVAL):
mi.fallback = true
case errors.Is(mi.err, ErrKeyNotExist):
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
mi.err = nil
return false
} else if mi.err != nil {
mi.err = fmt.Errorf("get next key: %w", mi.err)
default:
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
return false
}
}

// Check if LookupAndDelete is supported and not invalid args, otherwise fallback
if !mi.fallback {
mi.err = mi.target.LookupAndDelete(mi.cursor, valueOut)
if mi.err != nil && errors.Is(mi.err, ErrNotSupported) || errors.Is(mi.err, unix.EINVAL) {
// Fallback to sequential while also reusing `mi.cursor`.
mi.fallback = true
} else if errors.Is(mi.err, ErrKeyNotExist) {
// Same as in MapIterator.nextIterate.
continue
} else if mi.err != nil {
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
return false
}
// Fallback to sequential `Map.Lookup` -> `Map.Delete` when `Map.LookupAndDelete` is not supported.
mi.err = mi.target.Lookup(mi.cursor, valueOut)
if mi.err != nil {
if errors.Is(mi.err, ErrKeyNotExist) {
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
mi.err = nil
} else {
mi.err = fmt.Errorf("look up next key: %w", mi.err)
}
return false
}

// Falling back to sequential Lookup -> Delete in the case
// LookupAndDelete is not supported (e.g., kernel < 5.14).
if mi.fallback {
mi.err = mi.target.Lookup(mi.cursor, valueOut)
if errors.Is(mi.err, ErrKeyNotExist) {
// Same as in MapIterator.nextIterate.
continue
} else if mi.err != nil {
mi.err = fmt.Errorf("look up next key: %w", mi.err)
return false
}
mi.err = mi.target.Delete(mi.cursor)
if errors.Is(mi.err, ErrKeyNotExist) {
// Same as in MapIterator.nextIterate.
continue
} else if mi.err != nil {
mi.err = fmt.Errorf("delete key: %w", mi.err)
return false
}
mi.err = mi.target.Delete(mi.cursor)
if mi.err != nil {
if errors.Is(mi.err, ErrKeyNotExist) {
// Same as `MapIterator.nextIterate`: valid key but no value retrieved.
mi.err = nil
} else {
mi.err = fmt.Errorf("delete key: %w", mi.err)
}
return false
}

return true
}

func (mi *MapIterator) isKeylessMap() bool {
return mi.target.keySize == 0
}

func (mi *MapIterator) handleKeylessMap(keyOut, valueOut interface{}) bool {
if keyOut != nil {
mi.err = fmt.Errorf("non-nil keyOut provided for map without a key, must be nil instead")
return false
}

mi.err = mi.target.LookupAndDelete(nil, valueOut)
if mi.err == nil {
mi.count++
return true
}

buf := mi.cursor.([]byte)
if ptr, ok := keyOut.(unsafe.Pointer); ok {
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
} else {
mi.err = sysenc.Unmarshal(keyOut, buf)
}
if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
} else {
mi.err = fmt.Errorf("lookup_and_delete key: %w", mi.err)
}

return false
}

func (mi *MapIterator) fetchNextKey(key interface{}) bool {
mi.err = mi.target.NextKey(key, mi.cursor)
if mi.err == nil {
return true
}

if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
} else {
mi.err = fmt.Errorf("get next key: %w", mi.err)
}

return mi.err == nil
return false
}

func (mi *MapIterator) copyCursorToKeyOut(keyOut interface{}) bool {
buf := mi.cursor.([]byte)
if ptr, ok := keyOut.(unsafe.Pointer); ok {
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
} else {
mi.err = sysenc.Unmarshal(keyOut, buf)
}
return mi.err == nil
}

// Err returns any encountered error.
Expand Down
Loading

0 comments on commit 39b3749

Please sign in to comment.