Skip to content

Commit

Permalink
add Map.Drain to iterate and remove entries
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Magnani <[email protected]>
  • Loading branch information
smagnani96 committed Jun 25, 2024
1 parent 2996c5a commit c3c539c
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 189 deletions.
1 change: 0 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ Please see our [guide on what makes a good example](https://ebpf-go.dev/contribu
* [tcp_close](tcprtt/) - Log RTT of IPv4 TCP connections using eBPF CO-RE helpers.
* XDP - Attach a program to a network interface to process incoming packets.
* [xdp](xdp/) - Print packet counts by IPv4 source address.
* Add your use case(s) here!

## How to run

Expand Down
218 changes: 111 additions & 107 deletions map.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,14 +1272,20 @@ func batchCount(keys, values any) (int, error) {
//
// It's not possible to guarantee that all keys in a map will be
// returned if there are concurrent modifications to the map.
//
// For value-only maps (Queue and Stack), multiple iterators will
// never lookup the same entry, as it is removed from the buffer by design
// (pop operation).
func (m *Map) Iterate() MapIterator {
func (m *Map) Iterate() *MapIterator {
return newMapIterator(m)
}

// Drain traverses a map while also removing entries.
//
// It's safe to create multiple drainers at the same time,
// but their respective outputs will differ.
func (m *Map) Drain() *MapIterator {
it := newMapIterator(m)
it.drain = true
return it
}

// Close the Map's underlying file descriptor, which could unload the
// Map from the kernel if it is not pinned or in use by a loaded Program.
func (m *Map) Close() error {
Expand Down Expand Up @@ -1529,103 +1535,57 @@ func marshalMap(m *Map, length int) ([]byte, error) {
return buf, nil
}

// MapIterator is the interface defining methods to iterate
// through the content of the target map.
//
// See Map.Iterate.
type MapIterator interface {
// Err returns any encountered error.
//
// The method must be called after Next returns nil.
//
// For key-value maps, returns ErrIterationAborted if
// it wasn't possible to do a full iteration.
Err() error

// Next decodes the next key and value.
//
// In case of a value-only map (Queue and Stack), the key
// parameter is set to nil in case of no errors.
//
// Returns false if there are no more entries. You must check
// the result of Err afterwards.
Next(keyOut, valueOut interface{}) bool
}

// keyValueMapIterator iterates a Map defined by key-value pairs.
// (all except Queue and Stack)
// MapIterator iterates a Map.
//
// See Map.Iterate.
type keyValueMapIterator struct {
type MapIterator struct {
target *Map
// Temporary storage to avoid allocations in Next(). This is any instead
// of []byte to avoid allocations.
cursor any
count, maxEntries uint32
done bool
err error
}

// keylessMapIterator iterates a Map defined with only values.
// (Queue and Stack)
//
// See Map.Iterate.
type keylessMapIterator struct {
target *Map
err error
done, drain bool
// Used in old kernels while Draining a map and LookupAndDelete is not supported
fallback bool
err error
}

// newMapIterator return the correct MapIterator implementation
// based on the underlying map type.
func newMapIterator(target *Map) MapIterator {
if target.typ.isQueueStack() {
return &keylessMapIterator{
target: target,
}
}
return &keyValueMapIterator{
func newMapIterator(target *Map) *MapIterator {
return &MapIterator{
target: target,
maxEntries: target.maxEntries,
}
}

// next decodes the next value from a map of type Queue or Stack.
// Next decodes the next key and value. If the iterator is created
// through the Map.Drain API, the key and value are also removed.
//
// Returns false if there are no more entries. You must check
// the result of Err afterwards.
func (mi *keylessMapIterator) next(_, valueOut interface{}) bool {
// Check whether there was a previous error
if mi.err != nil {
return false
}

err := mi.target.LookupAndDelete(nil, valueOut)
if errors.Is(err, ErrKeyNotExist) {
// For these maps this error indicates that the map is empty.
// Return false instead of the error.
return false
}
if err != nil {
// Whether any other error rather than ErrKeyNotExist occurred
mi.err = fmt.Errorf("look up next value: %w", err)
return false
}
return true
}

// next decodes the next key and value from a map with key-value pair
// (all except Queue and Stack).
// Iterating a hash map from which keys are being deleted is not
// safe. You may see the same key multiple times. Iteration may
// also abort with an error, see IsIterationAborted.
//
// Iterating a queue/stack map returns ErrIterationAborted, as only
// Map.Drain is supported.
//
// Returns false if there are no more entries. You must check
// the result of Err afterwards.
func (mi *keyValueMapIterator) next(keyOut, valueOut interface{}) bool {
//
// See Map.Get for further caveats around valueOut.
func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
if mi.err != nil || mi.done {
return false
}
if mi.drain {
return mi.nextDrain(keyOut, valueOut)
}
return mi.nextIterate(keyOut, valueOut)
}

// For array-like maps NextKey returns nil only after maxEntries
// iterations.
for mi.count <= mi.maxEntries {
func (mi *MapIterator) nextIterate(keyOut, valueOut interface{}) bool {
// For array-like maps NextKey returns nil only after maxEntries iterations.
// For maps with keySize equal to 0 (Queue/Stack) we return ErrIterationAborted,
// since NextKey returns an error. In this case, Map.Drain should be used instead.
for mi.target.keySize != 0 && mi.count <= mi.maxEntries {
if mi.cursor == nil {
// Pass nil interface to NextKey to make sure the Map's first key
// is returned. If we pass an uninitialized []byte instead, it'll see a
Expand Down Expand Up @@ -1677,41 +1637,85 @@ func (mi *keyValueMapIterator) next(keyOut, valueOut interface{}) bool {
return false
}

// Next decodes the next key and value.
//
// Iterating a hash map from which keys are being deleted is not
// safe. You may see the same key multiple times. Iteration may
// also abort with an error, see IsIterationAborted.
//
// Returns false if there are no more entries. You must check
// the result of Err afterwards.
//
// See Map.Get for further caveats around valueOut.
func (mi *keyValueMapIterator) Next(keyOut, valueOut interface{}) bool {
return mi.next(keyOut, valueOut)
}
func (mi *MapIterator) nextDrain(keyOut, valueOut interface{}) bool {
// Check for maps without a key (e.g., Queue/Stack). In this case, when there is no
// more data, ErrKeyNotExist arise, but we gently stop the retrieval with no error.
if mi.target.keySize == 0 {
mi.err = mi.target.LookupAndDelete(keyOut, valueOut)
if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
return false
}
return mi.err == nil
}

// Next decodes the next value in the Queue/Stack, ignoring the keyOut parameter.
//
// Returns false if there are no more entries (ErrKeyNotExist).
// You must check the result of Err afterwards.
func (mi *keylessMapIterator) Next(keyOut, valueOut interface{}) bool {
return mi.next(keyOut, valueOut)
// Here we allocate only once data for retrieving the next key in the map.
if mi.cursor == nil {
mi.cursor = make([]byte, mi.target.keySize)
}

// Always retrieve first key in the map. This should ensure that
// the whole map is traversed, despite concurrent insertion.
// The expected ordering might differ:
// - initial keys in map: `a -> b -> c`
// - call MapIterator.Next and retrieve key `a`
// - insert key `d` in map
// - retrieve all the remaining keys `d -> b -> c`
mi.err = mi.target.NextKey(nil, mi.cursor)
if errors.Is(mi.err, ErrKeyNotExist) {
mi.done = true
mi.err = nil
return false
} else if mi.err != nil {
mi.err = fmt.Errorf("get next key: %w", mi.err)
return false
}

// fallback to sequential Lookup -> Delete when LookupAndDelete
// is not supported (e.g., kernel < 5.14).
if mi.fallback {
mi.err = mi.target.Lookup(mi.cursor, valueOut)
if errors.Is(mi.err, ErrKeyNotExist) {
// Same as in MapIterator.nextIterate.
return mi.nextDrain(keyOut, valueOut)
} else if mi.err != nil {
mi.err = fmt.Errorf("look up next key: %w", mi.err)
return false
}
mi.err = mi.target.Delete(mi.cursor)
} else {
mi.err = mi.target.LookupAndDelete(mi.cursor, valueOut)
if errors.Is(mi.err, ErrNotSupported) {
mi.fallback = true
return mi.nextDrain(keyOut, valueOut)
}
}

if errors.Is(mi.err, ErrKeyNotExist) {
// Same as in MapIterator.nextIterate.
return mi.nextDrain(keyOut, valueOut)
} else if mi.err != nil {
mi.err = fmt.Errorf("delete next key: %w", mi.err)
return false
}

buf := mi.cursor.([]byte)
if ptr, ok := keyOut.(unsafe.Pointer); ok {
copy(unsafe.Slice((*byte)(ptr), len(buf)), buf)
} else {
mi.err = sysenc.Unmarshal(keyOut, buf)
}

return mi.err == nil
}

// Err returns any encountered error.
//
// The method must be called after Next returns nil.
//
// Returns ErrIterationAborted if it wasn't possible to do a full iteration.
func (mi *keyValueMapIterator) Err() error {
return mi.err
}

// Err returns any encountered error.
//
// The method must be called after Next returns nil.
func (mi *keylessMapIterator) Err() error {
func (mi *MapIterator) Err() error {
return mi.err
}

Expand Down
Loading

0 comments on commit c3c539c

Please sign in to comment.