From fb6941e6bd00638bd86bdc4db33977db96fae113 Mon Sep 17 00:00:00 2001 From: Ryan D McGuire Date: Fri, 21 Mar 2025 16:26:23 -0400 Subject: [PATCH] fix recorder npe, implement filters --- pkg/weather/recorder/recorder_get.go | 11 ++- pkg/weather/recorder/recorders/memory/get.go | 73 ++++++++++++++------ 2 files changed, 61 insertions(+), 23 deletions(-) diff --git a/pkg/weather/recorder/recorder_get.go b/pkg/weather/recorder/recorder_get.go index 676e632..4f2536c 100644 --- a/pkg/weather/recorder/recorder_get.go +++ b/pkg/weather/recorder/recorder_get.go @@ -8,7 +8,6 @@ import ( "k8s.io/utils/ptr" pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" - "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" ) @@ -27,10 +26,16 @@ func (w *WeatherRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( req.Limit = ptr.To(int32(1)) } + var filterSN, filterST string + if req.Opts != nil { + filterSN = req.Opts.GetStationName() + filterST = req.Opts.GetStationType() + } + ctx, span := w.tracer.Start(ctx, "getWeatherRecorder") span.SetAttributes( - attribute.String("stationNameFilter", util.DerefStr(req.Opts.StationName)), - attribute.String("stationTypeFilter", util.DerefStr(req.Opts.StationType)), + attribute.String("stationNameFilter", filterSN), + attribute.String("stationTypeFilter", filterST), attribute.Int("last", int(*req.Limit)), attribute.Int("currentSize", w.Count(ctx)), ) diff --git a/pkg/weather/recorder/recorders/memory/get.go b/pkg/weather/recorder/recorders/memory/get.go index dffda8f..9831a6f 100644 --- a/pkg/weather/recorder/recorders/memory/get.go +++ b/pkg/weather/recorder/recorders/memory/get.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/codes" pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" ) @@ -23,8 +24,6 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( span.AddEvent("acquired lock on recorder cache") - updates := r.updates - if r.count() == 0 { err := errors.New("no recorded updates to get") span.RecordError(err) @@ -33,24 +32,8 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( span.RecordError(errors.New("requested more updates than recorded")) } - // Filter by Station Name if requested - if req.Opts.StationName != nil && *req.Opts.StationName != "" { - updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool { - return u.StationConfig == nil || u.StationConfig.Name != *req.Opts.StationName - }) - } - - // Filter by Station Type if requested - if req.Opts.StationType != nil && *req.Opts.StationType != "" { - updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool { - return u.StationType == nil || *u.StationType != *req.Opts.StationType - }) - } - - // Limit results - if len(updates) > int(*req.Limit) { - updates = updates[len(updates)-int(*req.Limit):] - } + updates := r.getUpdatesFromReq(req) + span.AddEvent("request limit/opts applied to updates") span.SetAttributes(attribute.Int("retrieved", len(updates))) span.SetStatus(codes.Ok, "") @@ -58,6 +41,49 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( return updates, nil } +func (r *MemoryRecorder) getUpdatesFromReq(req *pb.GetWeatherRequest) []*weather.WeatherUpdate { + if req.Opts == nil { + return limitUpdates(r.updates, int(req.GetLimit())) + } + + return r.applyOptsToUpdates(r.updates, int(req.GetLimit()), req.Opts) +} + +func (r *MemoryRecorder) applyOptsToUpdates(updates []*weather.WeatherUpdate, limit int, opts *pb.GetWeatherOpts) []*weather.WeatherUpdate { + if opts == nil { + return updates + } else if opts.StationName == nil && opts.StationType == nil { + return updates + } + + filtered := make([]*weather.WeatherUpdate, 0, limit) + + for i := len(updates) - 1; i >= 0; i-- { + update := updates[i] + match := true + + if opts.GetStationName() != "" { + if update.GetStationName() != opts.GetStationName() { + match = false + } + } + if opts.GetStationType() != "" { + if util.DerefStr(update.StationType) != opts.GetStationType() { + match = false + } + } + + if match { + filtered = append(filtered, update) + if len(filtered) >= limit { + return filtered + } + } + } + + return slices.Clip(filtered) +} + func (r *MemoryRecorder) Count(ctx context.Context) int { _, span := r.tracer.Start(ctx, "countWeatherRecorder") defer span.End() @@ -73,3 +99,10 @@ func (r *MemoryRecorder) Count(ctx context.Context) int { func (r *MemoryRecorder) count() int { return len(r.updates) } + +func limitUpdates(updates []*weather.WeatherUpdate, limit int) []*weather.WeatherUpdate { + if len(updates) > limit { + return updates[len(updates)-limit:] + } + return updates +}