fix recorder npe, implement filters
This commit is contained in:
		| @@ -8,7 +8,6 @@ import ( | |||||||
| 	"k8s.io/utils/ptr" | 	"k8s.io/utils/ptr" | ||||||
|  |  | ||||||
| 	pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" | 	pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" | ||||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" |  | ||||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -27,10 +26,16 @@ func (w *WeatherRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( | |||||||
| 		req.Limit = ptr.To(int32(1)) | 		req.Limit = ptr.To(int32(1)) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	var filterSN, filterST string | ||||||
|  | 	if req.Opts != nil { | ||||||
|  | 		filterSN = req.Opts.GetStationName() | ||||||
|  | 		filterST = req.Opts.GetStationType() | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	ctx, span := w.tracer.Start(ctx, "getWeatherRecorder") | 	ctx, span := w.tracer.Start(ctx, "getWeatherRecorder") | ||||||
| 	span.SetAttributes( | 	span.SetAttributes( | ||||||
| 		attribute.String("stationNameFilter", util.DerefStr(req.Opts.StationName)), | 		attribute.String("stationNameFilter", filterSN), | ||||||
| 		attribute.String("stationTypeFilter", util.DerefStr(req.Opts.StationType)), | 		attribute.String("stationTypeFilter", filterST), | ||||||
| 		attribute.Int("last", int(*req.Limit)), | 		attribute.Int("last", int(*req.Limit)), | ||||||
| 		attribute.Int("currentSize", w.Count(ctx)), | 		attribute.Int("currentSize", w.Count(ctx)), | ||||||
| 	) | 	) | ||||||
|   | |||||||
| @@ -9,6 +9,7 @@ import ( | |||||||
| 	"go.opentelemetry.io/otel/codes" | 	"go.opentelemetry.io/otel/codes" | ||||||
|  |  | ||||||
| 	pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" | 	pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" | ||||||
|  | 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" | ||||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -23,8 +24,6 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( | |||||||
|  |  | ||||||
| 	span.AddEvent("acquired lock on recorder cache") | 	span.AddEvent("acquired lock on recorder cache") | ||||||
|  |  | ||||||
| 	updates := r.updates |  | ||||||
|  |  | ||||||
| 	if r.count() == 0 { | 	if r.count() == 0 { | ||||||
| 		err := errors.New("no recorded updates to get") | 		err := errors.New("no recorded updates to get") | ||||||
| 		span.RecordError(err) | 		span.RecordError(err) | ||||||
| @@ -33,24 +32,8 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( | |||||||
| 		span.RecordError(errors.New("requested more updates than recorded")) | 		span.RecordError(errors.New("requested more updates than recorded")) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Filter by Station Name if requested | 	updates := r.getUpdatesFromReq(req) | ||||||
| 	if req.Opts.StationName != nil && *req.Opts.StationName != "" { | 	span.AddEvent("request limit/opts applied to updates") | ||||||
| 		updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool { |  | ||||||
| 			return u.StationConfig == nil || u.StationConfig.Name != *req.Opts.StationName |  | ||||||
| 		}) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Filter by Station Type if requested |  | ||||||
| 	if req.Opts.StationType != nil && *req.Opts.StationType != "" { |  | ||||||
| 		updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool { |  | ||||||
| 			return u.StationType == nil || *u.StationType != *req.Opts.StationType |  | ||||||
| 		}) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Limit results |  | ||||||
| 	if len(updates) > int(*req.Limit) { |  | ||||||
| 		updates = updates[len(updates)-int(*req.Limit):] |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	span.SetAttributes(attribute.Int("retrieved", len(updates))) | 	span.SetAttributes(attribute.Int("retrieved", len(updates))) | ||||||
| 	span.SetStatus(codes.Ok, "") | 	span.SetStatus(codes.Ok, "") | ||||||
| @@ -58,6 +41,49 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( | |||||||
| 	return updates, nil | 	return updates, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (r *MemoryRecorder) getUpdatesFromReq(req *pb.GetWeatherRequest) []*weather.WeatherUpdate { | ||||||
|  | 	if req.Opts == nil { | ||||||
|  | 		return limitUpdates(r.updates, int(req.GetLimit())) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return r.applyOptsToUpdates(r.updates, int(req.GetLimit()), req.Opts) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (r *MemoryRecorder) applyOptsToUpdates(updates []*weather.WeatherUpdate, limit int, opts *pb.GetWeatherOpts) []*weather.WeatherUpdate { | ||||||
|  | 	if opts == nil { | ||||||
|  | 		return updates | ||||||
|  | 	} else if opts.StationName == nil && opts.StationType == nil { | ||||||
|  | 		return updates | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	filtered := make([]*weather.WeatherUpdate, 0, limit) | ||||||
|  |  | ||||||
|  | 	for i := len(updates) - 1; i >= 0; i-- { | ||||||
|  | 		update := updates[i] | ||||||
|  | 		match := true | ||||||
|  |  | ||||||
|  | 		if opts.GetStationName() != "" { | ||||||
|  | 			if update.GetStationName() != opts.GetStationName() { | ||||||
|  | 				match = false | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		if opts.GetStationType() != "" { | ||||||
|  | 			if util.DerefStr(update.StationType) != opts.GetStationType() { | ||||||
|  | 				match = false | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if match { | ||||||
|  | 			filtered = append(filtered, update) | ||||||
|  | 			if len(filtered) >= limit { | ||||||
|  | 				return filtered | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return slices.Clip(filtered) | ||||||
|  | } | ||||||
|  |  | ||||||
| func (r *MemoryRecorder) Count(ctx context.Context) int { | func (r *MemoryRecorder) Count(ctx context.Context) int { | ||||||
| 	_, span := r.tracer.Start(ctx, "countWeatherRecorder") | 	_, span := r.tracer.Start(ctx, "countWeatherRecorder") | ||||||
| 	defer span.End() | 	defer span.End() | ||||||
| @@ -73,3 +99,10 @@ func (r *MemoryRecorder) Count(ctx context.Context) int { | |||||||
| func (r *MemoryRecorder) count() int { | func (r *MemoryRecorder) count() int { | ||||||
| 	return len(r.updates) | 	return len(r.updates) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func limitUpdates(updates []*weather.WeatherUpdate, limit int) []*weather.WeatherUpdate { | ||||||
|  | 	if len(updates) > limit { | ||||||
|  | 		return updates[len(updates)-limit:] | ||||||
|  | 	} | ||||||
|  | 	return updates | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user