From 84bdb4c2b6343e07ba594d89a1c01f3f333d0c96 Mon Sep 17 00:00:00 2001 From: Aidan Carson Date: Tue, 23 May 2023 10:09:49 -0700 Subject: [PATCH 1/4] Refactor load --- pkg/cli/internal/commands/run/run.go | 50 ++-- pkg/loader/loader.go | 334 +++------------------------ pkg/loader/mapwatcher/receiver.go | 20 ++ pkg/loader/mapwatcher/watcher.go | 246 ++++++++++++++++++++ pkg/loader/util/util.go | 48 ++++ pkg/loader/watcher.go | 38 --- pkg/tui/filter.go | 6 +- pkg/tui/tui.go | 33 +-- 8 files changed, 402 insertions(+), 373 deletions(-) create mode 100644 pkg/loader/mapwatcher/receiver.go create mode 100644 pkg/loader/mapwatcher/watcher.go create mode 100644 pkg/loader/util/util.go delete mode 100644 pkg/loader/watcher.go diff --git a/pkg/cli/internal/commands/run/run.go b/pkg/cli/internal/commands/run/run.go index 632076a..53c2e11 100644 --- a/pkg/cli/internal/commands/run/run.go +++ b/pkg/cli/internal/commands/run/run.go @@ -12,16 +12,18 @@ import ( "github.com/cilium/ebpf/rlimit" "github.com/pkg/errors" "github.com/pterm/pterm" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "go.uber.org/zap" + "github.com/solo-io/bumblebee/pkg/cli/internal/options" "github.com/solo-io/bumblebee/pkg/decoder" "github.com/solo-io/bumblebee/pkg/loader" + "github.com/solo-io/bumblebee/pkg/loader/mapwatcher" "github.com/solo-io/bumblebee/pkg/spec" "github.com/solo-io/bumblebee/pkg/stats" "github.com/solo-io/bumblebee/pkg/tui" "github.com/solo-io/go-utils/contextutils" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "go.uber.org/zap" ) type runOptions struct { @@ -109,22 +111,18 @@ func run(cmd *cobra.Command, args []string, opts *runOptions) error { return err } - progLoader := loader.NewLoader( - decoder.NewDecoderFactory(), - promProvider, - ) - parsedELF, err := progLoader.Parse(ctx, progReader) + + parsedELF, err := loader.Parse(ctx, progReader) if err != nil { return fmt.Errorf("could not parse BPF program: %w", err) } - tuiApp, err := buildTuiApp(&progLoader, progLocation, opts.filter, parsedELF) + tuiApp, err := buildTuiApp(progLocation, opts.filter, parsedELF) if err != nil { return err } - loaderOpts := loader.LoadOptions{ + loaderOpts := &loader.LoadOptions{ ParsedELF: parsedELF, - Watcher: tuiApp, PinMaps: opts.pinMaps, PinProgs: opts.pinProgs, } @@ -134,20 +132,38 @@ func run(cmd *cobra.Command, args []string, opts *runOptions) error { contextutils.LoggerFrom(ctx).Info("before calling tui.Run() context is done") return ctx.Err() } - if opts.notty { - fmt.Println("Calling Load...") - loaderOpts.Watcher = loader.NewNoopWatcher() - err = progLoader.Load(ctx, &loaderOpts) + + contextutils.LoggerFrom(ctx).Info("calling Load()") + progLink, loadedMaps, err := loader.Load(ctx, loaderOpts) + contextutils.LoggerFrom(ctx).Info("returned from Load()") + if err != nil { return err + } + + // Close our loaded program only if there's nothing set to explicitly extend our program's lifetime. + if opts.pinMaps == "" && opts.pinProgs == "" { + defer progLink.Close() + } + + if opts.notty { + fmt.Println("Running in non-interactive mode. Hit Ctrl-C to exit.") + <-ctx.Done() } else { + mapWatcher := mapwatcher.New(parsedELF.WatchedMaps, loadedMaps, decoder.NewDecoderFactory(), promProvider) contextutils.LoggerFrom(ctx).Info("calling tui run()") - err = tuiApp.Run(ctx, progLoader, &loaderOpts) + err = tuiApp.Run(ctx, mapWatcher) contextutils.LoggerFrom(ctx).Info("after tui run()") return err } + + return nil } -func buildTuiApp(loader *loader.Loader, progLocation string, filterString []string, parsedELF *loader.ParsedELF) (*tui.App, error) { +func buildTuiApp( + progLocation string, + filterString []string, + parsedELF *loader.ParsedELF, +) (*tui.App, error) { // TODO: add filter to UI filter, err := tui.BuildFilter(filterString, parsedELF.WatchedMaps) if err != nil { diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go index 6413283..ec50acd 100644 --- a/pkg/loader/loader.go +++ b/pkg/loader/loader.go @@ -5,90 +5,36 @@ import ( "errors" "fmt" "io" - "log" "os" "path/filepath" "strings" - "time" "github.com/cilium/ebpf" "github.com/cilium/ebpf/btf" "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/ringbuf" - "golang.org/x/sync/errgroup" - "github.com/solo-io/bumblebee/pkg/decoder" - "github.com/solo-io/bumblebee/pkg/stats" + "github.com/solo-io/bumblebee/pkg/loader/mapwatcher" + "github.com/solo-io/bumblebee/pkg/loader/util" "github.com/solo-io/go-utils/contextutils" ) type ParsedELF struct { Spec *ebpf.CollectionSpec - WatchedMaps map[string]WatchedMap + WatchedMaps map[string]mapwatcher.WatchedMap } type LoadOptions struct { ParsedELF *ParsedELF - Watcher MapWatcher PinMaps string PinProgs string } type Loader interface { Parse(ctx context.Context, reader io.ReaderAt) (*ParsedELF, error) - Load(ctx context.Context, opts *LoadOptions) error - WatchMaps(ctx context.Context, watchedMaps map[string]WatchedMap, coll map[string]*ebpf.Map, watcher MapWatcher) error + Load(ctx context.Context, opts *LoadOptions) (link.Link, map[string]*ebpf.Map, error) } -type WatchedMap struct { - Name string - Labels []string - - btf *btf.Map - mapType ebpf.MapType - mapSpec *ebpf.MapSpec - - valueStruct *btf.Struct -} - -type loader struct { - decoderFactory decoder.DecoderFactory - metricsProvider stats.MetricsProvider -} - -func NewLoader( - decoderFactory decoder.DecoderFactory, - metricsProvider stats.MetricsProvider, -) Loader { - return &loader{ - decoderFactory: decoderFactory, - metricsProvider: metricsProvider, - } -} - -const ( - counterMapType = "counter" - gaugeMapType = "gauge" - printMapType = "print" -) - -func isPrintMap(spec *ebpf.MapSpec) bool { - return strings.Contains(spec.SectionName, printMapType) -} - -func isGaugeMap(spec *ebpf.MapSpec) bool { - return strings.Contains(spec.SectionName, gaugeMapType) -} - -func isCounterMap(spec *ebpf.MapSpec) bool { - return strings.Contains(spec.SectionName, counterMapType) -} - -func isTrackedMap(spec *ebpf.MapSpec) bool { - return isCounterMap(spec) || isGaugeMap(spec) || isPrintMap(spec) -} - -func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, error) { +func Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, error) { spec, err := ebpf.LoadCollectionSpecFromReader(progReader) if err != nil { return nil, err @@ -100,21 +46,21 @@ func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, } } - watchedMaps := make(map[string]WatchedMap) + watchedMaps := make(map[string]mapwatcher.WatchedMap) for name, mapSpec := range spec.Maps { - if !isTrackedMap(mapSpec) { + if !util.IsTrackedMap(mapSpec) { continue } - watchedMap := WatchedMap{ + watchedMap := mapwatcher.WatchedMap{ Name: name, - btf: mapSpec.BTF, - mapType: mapSpec.Type, - mapSpec: mapSpec, + BTF: mapSpec.BTF, + MapType: mapSpec.Type, + MapSpec: mapSpec, } // TODO: Delete Hack if possible - if watchedMap.mapType == ebpf.RingBuf || watchedMap.mapType == ebpf.PerfEventArray { + if watchedMap.MapType == ebpf.RingBuf || watchedMap.MapType == ebpf.PerfEventArray { if _, ok := mapSpec.BTF.Value.(*btf.Struct); !ok { return nil, fmt.Errorf("the `value` member for map '%v' must be set to struct you will be submitting to the ringbuf/eventarray", name) } @@ -124,13 +70,13 @@ func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, switch mapSpec.Type { case ebpf.RingBuf: - structType := watchedMap.btf.Value.(*btf.Struct) - watchedMap.valueStruct = structType - labelKeys := getLabelsForBtfStruct(structType) + structType := watchedMap.BTF.Value.(*btf.Struct) + watchedMap.ValueStruct = structType + labelKeys := util.GetLabelsForBtfStruct(structType) watchedMap.Labels = labelKeys case ebpf.Hash: - labelKeys, err := getLabelsForHashMapKey(mapSpec) + labelKeys, err := util.GetLabelsForHashMapKey(mapSpec) if err != nil { return nil, err } @@ -150,16 +96,14 @@ func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, return &loadOptions, nil } -func (l *loader) Load(ctx context.Context, opts *LoadOptions) error { +func Load(ctx context.Context, opts *LoadOptions) (link.Link, map[string]*ebpf.Map, error) { // TODO: add invariant checks on opts contextutils.LoggerFrom(ctx).Info("enter Load()") - // on shutdown notify watcher we have no more entries to send - defer opts.Watcher.Close() // bail out before loading stuff into kernel if context canceled if ctx.Err() != nil { contextutils.LoggerFrom(ctx).Info("load entrypoint context is done") - return ctx.Err() + return nil, nil, ctx.Err() } if opts.PinMaps != "" { @@ -183,274 +127,64 @@ func (l *loader) Load(ctx context.Context, opts *LoadOptions) error { }, }) if err != nil { - return err + return nil, nil, err } defer coll.Close() + var progLink link.Link // For each program, add kprope/tracepoint for name, prog := range spec.Programs { select { case <-ctx.Done(): contextutils.LoggerFrom(ctx).Info("while loading progs context is done") - return ctx.Err() + return nil, nil, ctx.Err() default: switch prog.Type { case ebpf.Kprobe: - var kp link.Link var err error if strings.HasPrefix(prog.SectionName, "kretprobe/") { - kp, err = link.Kretprobe(prog.AttachTo, coll.Programs[name]) + progLink, err = link.Kretprobe(prog.AttachTo, coll.Programs[name]) if err != nil { - return fmt.Errorf("error attaching kretprobe '%v': %w", prog.Name, err) + return nil, nil, fmt.Errorf("error attaching kretprobe '%v': %w", prog.Name, err) } } else { - kp, err = link.Kprobe(prog.AttachTo, coll.Programs[name]) + progLink, err = link.Kprobe(prog.AttachTo, coll.Programs[name]) if err != nil { - return fmt.Errorf("error attaching kprobe '%v': %w", prog.Name, err) + return nil, nil, fmt.Errorf("error attaching kprobe '%v': %w", prog.Name, err) } } - defer kp.Close() case ebpf.TracePoint: - var tp link.Link var err error if strings.HasPrefix(prog.SectionName, "tracepoint/") { tokens := strings.Split(prog.AttachTo, "/") if len(tokens) != 2 { - return fmt.Errorf("unexpected tracepoint section '%v'", prog.AttachTo) + return nil, nil, fmt.Errorf("unexpected tracepoint section '%v'", prog.AttachTo) } - tp, err = link.Tracepoint(tokens[0], tokens[1], coll.Programs[name]) + progLink, err = link.Tracepoint(tokens[0], tokens[1], coll.Programs[name]) if err != nil { - return fmt.Errorf("error attaching to tracepoint '%v': %w", prog.Name, err) + return nil, nil, fmt.Errorf("error attaching to tracepoint '%v': %w", prog.Name, err) } } - defer tp.Close() default: - return errors.New("only kprobe programs supported") + return nil, nil, errors.New("only kprobe programs supported") } + if opts.PinProgs != "" { if err := createDir(ctx, opts.PinProgs, 0700); err != nil { - return err + return nil, nil, err } pinFile := filepath.Join(opts.PinProgs, prog.Name) if err := coll.Programs[name].Pin(pinFile); err != nil { - return fmt.Errorf("could not pin program '%s': %v", prog.Name, err) + progLink.Close() + return nil, nil, fmt.Errorf("could not pin program '%s': %v", prog.Name, err) } fmt.Printf("Successfully pinned program '%v'\n", pinFile) } } } - return l.WatchMaps(ctx, opts.ParsedELF.WatchedMaps, coll.Maps, opts.Watcher) -} - -func (l *loader) WatchMaps( - ctx context.Context, - watchedMaps map[string]WatchedMap, - maps map[string]*ebpf.Map, - watcher MapWatcher, -) error { - contextutils.LoggerFrom(ctx).Info("enter watchMaps()") - eg, ctx := errgroup.WithContext(ctx) - for name, bpfMap := range watchedMaps { - name := name - bpfMap := bpfMap - - switch bpfMap.mapType { - case ebpf.RingBuf: - var increment stats.IncrementInstrument - if isCounterMap(bpfMap.mapSpec) { - increment = l.metricsProvider.NewIncrementCounter(name, bpfMap.Labels) - } else if isPrintMap(bpfMap.mapSpec) { - increment = &noop{} - } - eg.Go(func() error { - watcher.NewRingBuf(name, bpfMap.Labels) - return l.startRingBuf(ctx, bpfMap.valueStruct, maps[name], increment, name, watcher) - }) - case ebpf.Array: - fallthrough - case ebpf.Hash: - labelKeys := bpfMap.Labels - var instrument stats.SetInstrument - if isCounterMap(bpfMap.mapSpec) { - instrument = l.metricsProvider.NewSetCounter(bpfMap.Name, labelKeys) - } else if isGaugeMap(bpfMap.mapSpec) { - instrument = l.metricsProvider.NewGauge(bpfMap.Name, labelKeys) - } else { - instrument = &noop{} - } - eg.Go(func() error { - // TODO: output type of instrument in UI? - watcher.NewHashMap(name, labelKeys) - return l.startHashMap(ctx, bpfMap.mapSpec, maps[name], instrument, name, watcher) - }) - default: - // TODO: Support more map types - return errors.New("unsupported map type") - } - } - - err := eg.Wait() - contextutils.LoggerFrom(ctx).Info("after waitgroup") - return err -} - -func (l *loader) startRingBuf( - ctx context.Context, - valueStruct *btf.Struct, - liveMap *ebpf.Map, - incrementInstrument stats.IncrementInstrument, - name string, - watcher MapWatcher, -) error { - // Initialize decoder - d := l.decoderFactory() - logger := contextutils.LoggerFrom(ctx) - - // Open a ringbuf reader from userspace RINGBUF map described in the - // eBPF C program. - rd, err := ringbuf.NewReader(liveMap) - if err != nil { - return fmt.Errorf("opening ringbuf reader: %v", err) - } - defer rd.Close() - // Close the reader when the process receives a signal, which will exit - // the read loop. - go func() { - <-ctx.Done() - logger.Info("in ringbuf watcher, got done...") - if err := rd.Close(); err != nil { - logger.Infof("error while closing ringbuf '%s' reader: %s", name, err) - } - logger.Info("after reader.Close()") - }() - - for { - record, err := rd.Read() - if err != nil { - if errors.Is(err, ringbuf.ErrClosed) { - logger.Info("ringbuf closed...") - return nil - } - logger.Infof("error while reading from ringbuf '%s' reader: %s", name, err) - continue - } - result, err := d.DecodeBtfBinary(ctx, valueStruct, record.RawSample) - if err != nil { - return err - } - - stringLabels := stringify(result) - incrementInstrument.Increment(ctx, stringLabels) - watcher.SendEntry(MapEntry{ - Name: name, - Entry: KvPair{ - Key: stringLabels, - }, - }) - } -} - -func (l *loader) startHashMap( - ctx context.Context, - mapSpec *ebpf.MapSpec, - liveMap *ebpf.Map, - instrument stats.SetInstrument, - name string, - watcher MapWatcher, -) error { - d := l.decoderFactory() - - ticker := time.NewTicker(1 * time.Second) - for { - select { - case <-ticker.C: - mapIter := liveMap.Iterate() - for { - // Use generic key,value so we can decode ourselves - var ( - key, value []byte - ) - if !mapIter.Next(&key, &value) { - break - } - if err := mapIter.Err(); err != nil { - return err - } - decodedKey, err := d.DecodeBtfBinary(ctx, mapSpec.BTF.Key, key) - if err != nil { - return fmt.Errorf("error decoding key: %w", err) - } - - decodedValue, err := d.DecodeBtfBinary(ctx, mapSpec.BTF.Value, value) - if err != nil { - return fmt.Errorf("error decoding value: %w", err) - } - - // TODO: Check this information at load time - if len(decodedValue) > 1 { - log.Fatal("only 1 value allowed") - } - intVal, ok := decodedValue[""].(uint64) - if !ok { - log.Fatal("only uint64 allowed") - } - stringLabels := stringify(decodedKey) - instrument.Set(ctx, int64(intVal), stringLabels) - thisKvPair := KvPair{Key: stringLabels, Value: fmt.Sprint(intVal)} - watcher.SendEntry(MapEntry{ - Name: name, - Entry: thisKvPair, - }) - } - - case <-ctx.Done(): - // fmt.Println("got done in hashmap loop, returning") - return nil - } - } -} - -func stringify(decodedBinary map[string]interface{}) map[string]string { - keyMap := map[string]string{} - for k, v := range decodedBinary { - valAsStr := fmt.Sprint(v) - keyMap[k] = valAsStr - } - return keyMap -} - -func getLabelsForHashMapKey(mapSpec *ebpf.MapSpec) ([]string, error) { - structKey, ok := mapSpec.BTF.Key.(*btf.Struct) - if !ok { - return nil, fmt.Errorf("hash map keys can only be a struct, found %s", mapSpec.BTF.Value.String()) - } - - return getLabelsForBtfStruct(structKey), nil -} - -func getLabelsForBtfStruct(structKey *btf.Struct) []string { - keys := make([]string, 0, len(structKey.Members)) - for _, v := range structKey.Members { - keys = append(keys, v.Name) - } - return keys -} - -type noop struct{} - -func (n *noop) Increment( - ctx context.Context, - decodedKey map[string]string, -) { -} - -func (n *noop) Set( - ctx context.Context, - val int64, - labels map[string]string, -) { + return progLink, coll.Maps, nil } func createDir(ctx context.Context, path string, perm os.FileMode) error { diff --git a/pkg/loader/mapwatcher/receiver.go b/pkg/loader/mapwatcher/receiver.go new file mode 100644 index 0000000..2672b2c --- /dev/null +++ b/pkg/loader/mapwatcher/receiver.go @@ -0,0 +1,20 @@ +package mapwatcher + +type KvPair struct { + Key map[string]string + Value string + Hash uint64 +} + +type MapEntry struct { + Name string + Entry KvPair +} + +// MapEventReceiver provides a receiver that handles various map events. +type MapEventReceiver interface { + NewRingBuf(name string, keys []string) + NewHashMap(name string, keys []string) + SendEntry(entry MapEntry) + Close() +} diff --git a/pkg/loader/mapwatcher/watcher.go b/pkg/loader/mapwatcher/watcher.go new file mode 100644 index 0000000..8d3ed3b --- /dev/null +++ b/pkg/loader/mapwatcher/watcher.go @@ -0,0 +1,246 @@ +package mapwatcher + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/btf" + "github.com/cilium/ebpf/ringbuf" + "golang.org/x/sync/errgroup" + + "github.com/solo-io/bumblebee/pkg/decoder" + "github.com/solo-io/bumblebee/pkg/loader/util" + "github.com/solo-io/bumblebee/pkg/stats" + "github.com/solo-io/go-utils/contextutils" +) + +type WatchedMap struct { + Name string + Labels []string + + BTF *btf.Map + MapType ebpf.MapType + MapSpec *ebpf.MapSpec + + ValueStruct *btf.Struct +} + +type Watcher interface { + // WatchMaps watches the loaded maps and notifies the given receiver of events. + WatchMaps(ctx context.Context, receiver MapEventReceiver) error + // Maps returns the list of WatchedMaps + Maps() map[string]WatchedMap +} + +func New( + watchedMaps map[string]WatchedMap, + loadedMaps map[string]*ebpf.Map, + decoderFactory decoder.DecoderFactory, + provider stats.MetricsProvider, +) Watcher { + return &watcher{ + watchedMaps: watchedMaps, + loadedMaps: loadedMaps, + decoderFactory: decoderFactory, + metricsProvider: provider, + } +} + +type watcher struct { + // watchedMaps represent the maps we care to watch. + watchedMaps map[string]WatchedMap + // loadedMaps represent the maps currently loaded into the kernel. + loadedMaps map[string]*ebpf.Map + // decoderFactory provides a mechanism to decode various BTF types. + decoderFactory decoder.DecoderFactory + // metricsProvider provides Prometheus metrics. + metricsProvider stats.MetricsProvider +} + +func (w *watcher) WatchMaps(ctx context.Context, receiver MapEventReceiver) error { + // on shutdown notify receiver we have no more entries to send. + defer receiver.Close() + contextutils.LoggerFrom(ctx).Info("enter watchMaps()") + eg, ctx := errgroup.WithContext(ctx) + for name, bpfMap := range w.watchedMaps { + name := name + bpfMap := bpfMap + + switch bpfMap.MapType { + case ebpf.RingBuf: + var increment stats.IncrementInstrument + if util.IsCounterMap(bpfMap.MapSpec) { + increment = w.metricsProvider.NewIncrementCounter(name, bpfMap.Labels) + } else if util.IsPrintMap(bpfMap.MapSpec) { + increment = &noop{} + } + eg.Go(func() error { + receiver.NewRingBuf(name, bpfMap.Labels) + return w.startRingBuf(ctx, bpfMap.ValueStruct, w.loadedMaps[name], increment, name, receiver) + }) + case ebpf.Array: + fallthrough + case ebpf.Hash, ebpf.LRUHash: + labelKeys := bpfMap.Labels + var instrument stats.SetInstrument + if util.IsCounterMap(bpfMap.MapSpec) { + instrument = w.metricsProvider.NewSetCounter(bpfMap.Name, labelKeys) + } else if util.IsGaugeMap(bpfMap.MapSpec) { + instrument = w.metricsProvider.NewGauge(bpfMap.Name, labelKeys) + } else { + instrument = &noop{} + } + eg.Go(func() error { + // TODO: output type of instrument in UI? + receiver.NewHashMap(name, labelKeys) + return w.startHashMap(ctx, bpfMap.MapSpec, w.loadedMaps[name], instrument, name, receiver) + }) + default: + // TODO: Support more map types + return errors.New("unsupported map type") + } + } + + err := eg.Wait() + contextutils.LoggerFrom(ctx).Info("after waitgroup") + return err +} + +func (w *watcher) Maps() map[string]WatchedMap { + return w.watchedMaps +} + +func (w *watcher) startRingBuf( + ctx context.Context, + valueStruct *btf.Struct, + liveMap *ebpf.Map, + incrementInstrument stats.IncrementInstrument, + name string, + watcher MapEventReceiver, +) error { + // Initialize decoder + d := w.decoderFactory() + logger := contextutils.LoggerFrom(ctx) + + // Open a ringbuf reader from userspace RINGBUF map described in the + // eBPF C program. + rd, err := ringbuf.NewReader(liveMap) + if err != nil { + return fmt.Errorf("opening ringbuf reader: %v", err) + } + defer rd.Close() + // Close the reader when the process receives a signal, which will exit + // the read loop. + go func() { + <-ctx.Done() + logger.Info("in ringbuf watcher, got done...") + if err := rd.Close(); err != nil { + logger.Infof("error while closing ringbuf '%s' reader: %s", name, err) + } + logger.Info("after reader.Close()") + }() + + for { + record, err := rd.Read() + if err != nil { + if errors.Is(err, ringbuf.ErrClosed) { + logger.Info("ringbuf closed...") + return nil + } + logger.Infof("error while reading from ringbuf '%s' reader: %s", name, err) + continue + } + result, err := d.DecodeBtfBinary(ctx, valueStruct, record.RawSample) + if err != nil { + return err + } + + stringLabels := stringify(result) + incrementInstrument.Increment(ctx, stringLabels) + watcher.SendEntry(MapEntry{ + Name: name, + Entry: KvPair{ + Key: stringLabels, + }, + }) + } +} + +func (w *watcher) startHashMap( + ctx context.Context, + mapSpec *ebpf.MapSpec, + liveMap *ebpf.Map, + instrument stats.SetInstrument, + name string, + watcher MapEventReceiver, +) error { + d := w.decoderFactory() + + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ticker.C: + mapIter := liveMap.Iterate() + for { + // Use generic key,value so we can decode ourselves + var ( + key, value []byte + ) + if !mapIter.Next(&key, &value) { + break + } + if err := mapIter.Err(); err != nil { + return err + } + decodedKey, err := d.DecodeBtfBinary(ctx, mapSpec.BTF.Key, key) + if err != nil { + return fmt.Errorf("error decoding key: %w", err) + } + + decodedValue, err := d.DecodeBtfBinary(ctx, mapSpec.BTF.Value, value) + if err != nil { + return fmt.Errorf("error decoding value: %w", err) + } + + // TODO: Check this information at load time + if len(decodedValue) > 1 { + log.Fatal("only 1 value allowed") + } + intVal, ok := decodedValue[""].(uint64) + if !ok { + log.Fatal("only uint64 allowed") + } + stringLabels := stringify(decodedKey) + instrument.Set(ctx, int64(intVal), stringLabels) + thisKvPair := KvPair{Key: stringLabels, Value: fmt.Sprint(intVal)} + watcher.SendEntry(MapEntry{ + Name: name, + Entry: thisKvPair, + }) + } + + case <-ctx.Done(): + // fmt.Println("got done in hashmap loop, returning") + return nil + } + } +} + +func stringify(decodedBinary map[string]interface{}) map[string]string { + keyMap := map[string]string{} + for k, v := range decodedBinary { + valAsStr := fmt.Sprint(v) + keyMap[k] = valAsStr + } + return keyMap +} + +type noop struct{} + +func (n *noop) Increment(_ context.Context, _ map[string]string) {} + +func (n *noop) Set(_ context.Context, _ int64, _ map[string]string) {} diff --git a/pkg/loader/util/util.go b/pkg/loader/util/util.go new file mode 100644 index 0000000..ef2ead0 --- /dev/null +++ b/pkg/loader/util/util.go @@ -0,0 +1,48 @@ +package util + +import ( + "fmt" + "strings" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/btf" +) + +const ( + counterMapType = "counter" + gaugeMapType = "gauge" + printMapType = "print" +) + +func IsPrintMap(spec *ebpf.MapSpec) bool { + return strings.Contains(spec.SectionName, printMapType) +} + +func IsGaugeMap(spec *ebpf.MapSpec) bool { + return strings.Contains(spec.SectionName, gaugeMapType) +} + +func IsCounterMap(spec *ebpf.MapSpec) bool { + return strings.Contains(spec.SectionName, counterMapType) +} + +func IsTrackedMap(spec *ebpf.MapSpec) bool { + return IsCounterMap(spec) || IsGaugeMap(spec) || IsPrintMap(spec) +} + +func GetLabelsForBtfStruct(structKey *btf.Struct) []string { + keys := make([]string, 0, len(structKey.Members)) + for _, v := range structKey.Members { + keys = append(keys, v.Name) + } + return keys +} + +func GetLabelsForHashMapKey(mapSpec *ebpf.MapSpec) ([]string, error) { + structKey, ok := mapSpec.BTF.Key.(*btf.Struct) + if !ok { + return nil, fmt.Errorf("hash map keys can only be a struct, found %s", mapSpec.BTF.Value.String()) + } + + return GetLabelsForBtfStruct(structKey), nil +} diff --git a/pkg/loader/watcher.go b/pkg/loader/watcher.go deleted file mode 100644 index 13efad5..0000000 --- a/pkg/loader/watcher.go +++ /dev/null @@ -1,38 +0,0 @@ -package loader - -type KvPair struct { - Key map[string]string - Value string - Hash uint64 -} - -type MapEntry struct { - Name string - Entry KvPair -} - -type MapWatcher interface { - NewRingBuf(name string, keys []string) - NewHashMap(name string, keys []string) - SendEntry(entry MapEntry) - Close() -} - -type noopWatcher struct{} - -func (w *noopWatcher) NewRingBuf(name string, keys []string) { - // noop -} -func (w *noopWatcher) NewHashMap(name string, keys []string) { - // noop -} -func (w *noopWatcher) SendEntry(entry MapEntry) { - // noop -} -func (w *noopWatcher) Close() { - // noop -} - -func NewNoopWatcher() *noopWatcher { - return &noopWatcher{} -} diff --git a/pkg/tui/filter.go b/pkg/tui/filter.go index d33b4b0..6edd7ef 100644 --- a/pkg/tui/filter.go +++ b/pkg/tui/filter.go @@ -4,10 +4,10 @@ import ( "fmt" "regexp" - "github.com/solo-io/bumblebee/pkg/loader" + "github.com/solo-io/bumblebee/pkg/loader/mapwatcher" ) -func (a *App) filterMatch(entry loader.MapEntry) bool { +func (a *App) filterMatch(entry mapwatcher.MapEntry) bool { if a.filter == nil { // no filters defined, allow entry return true @@ -27,7 +27,7 @@ func (a *App) filterMatch(entry loader.MapEntry) bool { return false } -func BuildFilter(filterString []string, watchedMaps map[string]loader.WatchedMap) (map[string]Filter, error) { +func BuildFilter(filterString []string, watchedMaps map[string]mapwatcher.WatchedMap) (map[string]Filter, error) { if len(filterString) == 0 { return nil, nil } diff --git a/pkg/tui/tui.go b/pkg/tui/tui.go index ae74c1b..6c4ae4c 100644 --- a/pkg/tui/tui.go +++ b/pkg/tui/tui.go @@ -11,10 +11,12 @@ import ( "github.com/gdamore/tcell/v2" "github.com/mitchellh/hashstructure/v2" "github.com/rivo/tview" - "github.com/solo-io/bumblebee/pkg/loader" - "github.com/solo-io/go-utils/contextutils" "go.uber.org/zap" "golang.org/x/sync/errgroup" + + "github.com/solo-io/bumblebee/pkg/loader" + "github.com/solo-io/bumblebee/pkg/loader/mapwatcher" + "github.com/solo-io/go-utils/contextutils" ) const titleText = `[aqua] __ @@ -39,7 +41,7 @@ type Filter struct { type MapValue struct { Hash uint64 - Entries []loader.KvPair + Entries []mapwatcher.KvPair Table *tview.Table Index int Type ebpf.MapType @@ -53,7 +55,7 @@ type AppOpts struct { } type App struct { - Entries chan loader.MapEntry + Entries chan mapwatcher.MapEntry tviewApp *tview.Application flex *tview.Flex @@ -73,7 +75,10 @@ var mapOfMaps = make(map[string]MapValue) var mapMutex = sync.RWMutex{} var currentIndex int -func buildTView(logger *zap.SugaredLogger, cancel context.CancelFunc, progLocation string) (*tview.Application, *tview.Flex) { +func buildTView(logger *zap.SugaredLogger, cancel context.CancelFunc, progLocation string) ( + *tview.Application, + *tview.Flex, +) { app := tview.NewApplication() app.SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey { if event.Key() == tcell.KeyCtrlC || (event.Key() == tcell.KeyRune && event.Rune() == 'q') { @@ -127,14 +132,14 @@ func (a *App) Close() { close(a.Entries) } -func (a *App) Run(ctx context.Context, progLoader loader.Loader, loaderOpts *loader.LoadOptions) error { +func (a *App) Run(ctx context.Context, mapWatcher mapwatcher.Watcher) error { logger := contextutils.LoggerFrom(ctx) ctx, cancel := context.WithCancel(ctx) app, flex := buildTView(logger, cancel, a.progLocation) a.tviewApp = app a.flex = flex - a.Entries = make(chan loader.MapEntry, 20) + a.Entries = make(chan mapwatcher.MapEntry, 20) eg := errgroup.Group{} eg.Go(func() error { @@ -152,10 +157,8 @@ func (a *App) Run(ctx context.Context, progLoader loader.Loader, loaderOpts *loa }) eg.Go(func() error { - logger.Info("calling Load()") - err := progLoader.Load(ctx, loaderOpts) - logger.Info("returned from Load()") - return err + logger.Info("calling WatchMaps()") + return mapWatcher.WatchMaps(ctx, a) }) err := eg.Wait() @@ -180,7 +183,7 @@ func (a *App) watch(ctx context.Context) { logger.Info("no more entries, returning from Watch()") } -func (a *App) renderRingBuf(ctx context.Context, incoming loader.MapEntry) { +func (a *App) renderRingBuf(ctx context.Context, incoming mapwatcher.MapEntry) { current := mapOfMaps[incoming.Name] current.Entries = append(current.Entries, incoming.Entry) @@ -209,7 +212,7 @@ func (a *App) renderRingBuf(ctx context.Context, incoming loader.MapEntry) { } } -func (a *App) renderHash(ctx context.Context, incoming loader.MapEntry) { +func (a *App) renderHash(ctx context.Context, incoming mapwatcher.MapEntry) { logger := contextutils.LoggerFrom(ctx) current := mapOfMaps[incoming.Name] if len(current.Entries) == 0 { @@ -282,7 +285,7 @@ func (a *App) NewHashMap(name string, keys []string) { a.makeMapValue(name, keys, ebpf.Hash) } -func (a *App) SendEntry(entry loader.MapEntry) { +func (a *App) SendEntry(entry mapwatcher.MapEntry) { if a.filterMatch(entry) { a.Entries <- entry } @@ -295,7 +298,7 @@ func (a *App) makeMapValue(name string, keys []string, mapType ebpf.MapType) { sort.Strings(keysCopy) // create the array for containing the entries - entries := make([]loader.KvPair, 0, 10) + entries := make([]mapwatcher.KvPair, 0, 10) table := tview.NewTable().SetFixed(1, 0) table.SetBorder(true).SetTitle(name) From cc18942f3739a1d692c8e32e7ec9310b05cc85a0 Mon Sep 17 00:00:00 2001 From: Aidan Carson Date: Tue, 23 May 2023 13:13:42 -0700 Subject: [PATCH 2/4] Move coll.Close --- pkg/cli/internal/commands/run/run.go | 4 ++-- pkg/loader/loader.go | 12 ++++++++---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/cli/internal/commands/run/run.go b/pkg/cli/internal/commands/run/run.go index 53c2e11..dd74e0b 100644 --- a/pkg/cli/internal/commands/run/run.go +++ b/pkg/cli/internal/commands/run/run.go @@ -134,7 +134,7 @@ func run(cmd *cobra.Command, args []string, opts *runOptions) error { } contextutils.LoggerFrom(ctx).Info("calling Load()") - progLink, loadedMaps, err := loader.Load(ctx, loaderOpts) + loadedMaps, closeLifecycle, err := loader.Load(ctx, loaderOpts) contextutils.LoggerFrom(ctx).Info("returned from Load()") if err != nil { return err @@ -142,7 +142,7 @@ func run(cmd *cobra.Command, args []string, opts *runOptions) error { // Close our loaded program only if there's nothing set to explicitly extend our program's lifetime. if opts.pinMaps == "" && opts.pinProgs == "" { - defer progLink.Close() + defer closeLifecycle() } if opts.notty { diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go index ec50acd..371f762 100644 --- a/pkg/loader/loader.go +++ b/pkg/loader/loader.go @@ -31,7 +31,7 @@ type LoadOptions struct { type Loader interface { Parse(ctx context.Context, reader io.ReaderAt) (*ParsedELF, error) - Load(ctx context.Context, opts *LoadOptions) (link.Link, map[string]*ebpf.Map, error) + Load(ctx context.Context, opts *LoadOptions) (map[string]*ebpf.Map, func(), error) } func Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, error) { @@ -96,7 +96,7 @@ func Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF, error) { return &loadOptions, nil } -func Load(ctx context.Context, opts *LoadOptions) (link.Link, map[string]*ebpf.Map, error) { +func Load(ctx context.Context, opts *LoadOptions) (map[string]*ebpf.Map, func(), error) { // TODO: add invariant checks on opts contextutils.LoggerFrom(ctx).Info("enter Load()") @@ -129,7 +129,6 @@ func Load(ctx context.Context, opts *LoadOptions) (link.Link, map[string]*ebpf.M if err != nil { return nil, nil, err } - defer coll.Close() var progLink link.Link // For each program, add kprope/tracepoint @@ -184,7 +183,12 @@ func Load(ctx context.Context, opts *LoadOptions) (link.Link, map[string]*ebpf.M } } - return progLink, coll.Maps, nil + closeLifecycle := func() { + coll.Close() + progLink.Close() + } + + return coll.Maps, closeLifecycle, nil } func createDir(ctx context.Context, path string, perm os.FileMode) error { From 4fd22dbbccd2162a53dac711828f532a8453c377 Mon Sep 17 00:00:00 2001 From: Aidan Carson Date: Tue, 23 May 2023 13:52:25 -0700 Subject: [PATCH 3/4] Add array.o back --- pkg/spec/array.o | Bin 0 -> 6904 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 pkg/spec/array.o diff --git a/pkg/spec/array.o b/pkg/spec/array.o new file mode 100644 index 0000000000000000000000000000000000000000..7082edd994afca0c46a9699cae99c87be3395d4a GIT binary patch literal 6904 zcmcIoe~esJ6~1qN?7rFA-FCYz>=wb-BCudH-K8s33T0Sm>B6>M+JbE%yw1+P-JN!J zcAS|l+XA5m8;JeGikj9aX#+7rqNW;2qsFk9U^GF4(SVAHV4_G6Q)3JoiR<^>chAh* zy954VJlT8C_nmX@k9XgF=ib>zwv23zIgS!>)Vr#|#H!TXss_$#m{k`c^1KPouYPZV z=JPE|w}0^91hf00X=-Xy#E0~(R$3l z8NsLPwxde$*}84ujDdBv&7hM;J8IN!22(v;qPmA3us(9yc z_szXGq&KBwm}1;MiVpa96l`v$dNop|D{aat@G7pl(KS zT2k0wp~UZmPV7R}p^|H%TAY@vpxoBjCa4mMb5SF_)pNSstuS(R+~u50B6dU9&uHv=1(nnFd(^S6Kch~l6{xjvt)2bv zvhL+WcdtfYPVZ}|S2BRhx}Op}aoRmD zc=9ZECP*u?(^z?8{h@MYW-Rc7x!iQQ7-aG)qkO+u$Oolrpz>3>3g|7ncHOei-#xs4 zi@#^z=oa5srqy(=tZGNffhv@0YC1RP2c=r25LDHn;E0+nRST1)VBDkA_h&a;t5^=_ zinD<#&XguSUD;vR4XRlb%*{vlesN~z&}`WciotZkr>kq-W5K8)Le`|XJXtT*P`Pc;mxp*szdXeHOzu6FTo(T{5E)*I29$?Np zi!z4NigFBP0tE+#nnz*&g6OkA+JVQIFCIX_RU^R!`3MSi)6VPZ^vA{;I?SKP&OT-;j8$I+LaDANV}R&uS)y4sVOlT=BAPRp24F`0 zKG{O29y{%X`6ZH|JKP%Y@eTwR{<${G$AmS>H4pBeQu;yXvdf zDzo{TXP(Zx27_MhJ*Yeg*3&}<_HOd}H-(1l29bqzf`f5jop%k>T<9Z$u}Qw|yRxy0 z>%1$`_8JbQrHq>Kuh5UH)@j3g z@#!QVSL&?cH}X^Z0tkgU7C&Pa5_lZ

%lv(Vnn6XD~ZU=pkLl`B!HLzHA5M@ZlZ2 zC6&pnp9*s0L8Yo>n>LPr1m$O@r)Nr;DTTrLR;PkuIjH#K!9+DQV$N`d67O-6skSxi zTjGJ2Y;TEQnn|W^YKf0|E%6WE(Gt(ykxaJ4*Lq3k@>ncPG1790egc++`s>0H2MqBr z9W8Rbt69AMJ4l=tjQy+_B08>5m59N>xZWbo>^ymvO1z%&7W?9jn`_-c{N+7u zfW1q#)Z;vZ?$dUjgrVr%3D|Eq-z8^Nwz%}f#7)foLLui_DtBt(@{@*~=lp6Eiuf-J zDYt&fY*5c1u`~{2u)Z)Lk_#Z7kZ#eH`{9b+);r#yAp-||j zD~m!AJ{rI!U?b({qMBfWc816`N!ttl74h%aelqP0WrdT{Y&5H(p zr%mFjDYuD8T(M9JqU|OsxoH@}PKq;m>ArD*vhn+0^P1k}R0b2R<;I|oOlxhPNR12) z_4^w_b#1sB|9kx^y5O4IQZ+9`|Dz`2G@o(Z!V5oba^8p|E5Op+Pf__A$OpBTeI#wT zH=^=12+u29%KDdxQl4?}k|%qMrsc+dKQza;*WiNj%Xw?4julM37NuEx3G^5g5qtiR z!N>Ntc*giW%VEco+cq}&jWPc7zqB8EpZ34thX#4a=#VWPYcm|p*8ikg|JTjJa{U=6 z&oArW2OFo_j}n!69x{-V*24)E{Wf9*>jtJ8<)H-GWcmr&(%3iQXg2?Hci6%+4LN`2 zReS#Y-$=9ZPcRtrx~?0Q9V_e7hzQY1GyXZ_U>yH{Vvm0mHoH$YDVvSU^^JT+w3qjH5VmIHpEl!v#k3b?$MW9SZ2SXed>M!1^Ui9IzXP`T$ifWN mF_gNLO?&yS*`8vL|2x>MEw1A>a Date: Thu, 25 May 2023 07:41:59 -0700 Subject: [PATCH 4/4] Add multiple links --- pkg/loader/loader.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go index 371f762..fa514b4 100644 --- a/pkg/loader/loader.go +++ b/pkg/loader/loader.go @@ -129,10 +129,11 @@ func Load(ctx context.Context, opts *LoadOptions) (map[string]*ebpf.Map, func(), if err != nil { return nil, nil, err } - var progLink link.Link + var progLinks []link.Link // For each program, add kprope/tracepoint for name, prog := range spec.Programs { + var progLink link.Link select { case <-ctx.Done(): contextutils.LoggerFrom(ctx).Info("while loading progs context is done") @@ -167,6 +168,7 @@ func Load(ctx context.Context, opts *LoadOptions) (map[string]*ebpf.Map, func(), default: return nil, nil, errors.New("only kprobe programs supported") } + progLinks = append(progLinks, progLink) if opts.PinProgs != "" { if err := createDir(ctx, opts.PinProgs, 0700); err != nil { @@ -185,7 +187,9 @@ func Load(ctx context.Context, opts *LoadOptions) (map[string]*ebpf.Map, func(), closeLifecycle := func() { coll.Close() - progLink.Close() + for _, link := range progLinks { + link.Close() + } } return coll.Maps, closeLifecycle, nil