From a5abbbec1f464c753463bae4c2169de6b5ec0fcf Mon Sep 17 00:00:00 2001 From: Ryan D McGuire Date: Fri, 21 Mar 2025 15:54:28 -0400 Subject: [PATCH] move recorder to interface --- main.go | 2 +- pkg/ambient/ambient.go | 23 +++--- pkg/util/util.go | 17 +++++ pkg/weather/grpc/mapupdate.go | 35 +++------ pkg/weather/grpc/weather.go | 2 +- pkg/weather/recorder/recorder.go | 33 +++++--- pkg/weather/recorder/recorder_get.go | 68 +++++------------ pkg/weather/recorder/recorder_set.go | 28 ++----- pkg/weather/recorder/recorders/memory/get.go | 75 +++++++++++++++++++ .../recorder/recorders/memory/memory.go | 35 +++++++++ pkg/weather/recorder/recorders/memory/set.go | 38 ++++++++++ pkg/weather/recorder/recorders/recorders.go | 20 +++++ 12 files changed, 256 insertions(+), 120 deletions(-) create mode 100644 pkg/util/util.go create mode 100644 pkg/weather/recorder/recorders/memory/get.go create mode 100644 pkg/weather/recorder/recorders/memory/memory.go create mode 100644 pkg/weather/recorder/recorders/memory/set.go create mode 100644 pkg/weather/recorder/recorders/recorders.go diff --git a/main.go b/main.go index ceb8212..8aa24ff 100644 --- a/main.go +++ b/main.go @@ -82,7 +82,7 @@ func prepareApp(ctx context.Context, aw *ambient.AmbientWeather) *app.App { { Name: "Weather Service", Type: &weatherpb.AmbientLocalWeatherService_ServiceDesc, - Service: weathergrpc.NewGRPCWeather(ctx, aw.GetState()), + Service: weathergrpc.NewGRPCWeather(ctx, aw.GetRecorder()), }, }, }, diff --git a/pkg/ambient/ambient.go b/pkg/ambient/ambient.go index 99cd7bc..b93ee67 100644 --- a/pkg/ambient/ambient.go +++ b/pkg/ambient/ambient.go @@ -30,13 +30,13 @@ type AmbientWeather struct { // when either "AmbientWeather" or "Wunderground" are selected // in the "Custom" section of the AWNet app, or the web UI // of an Ambient WeatherHub - Config *config.AmbientLocalExporterConfig - awnProvider provider.AmbientProvider - wuProvider provider.AmbientProvider - weatherState *recorder.WeatherRecorder - appCtx context.Context - metrics *weather.WeatherMetrics - l *zerolog.Logger + Config *config.AmbientLocalExporterConfig + awnProvider provider.AmbientProvider + wuProvider provider.AmbientProvider + weatherRecorder *recorder.WeatherRecorder + appCtx context.Context + metrics *weather.WeatherMetrics + l *zerolog.Logger *sync.RWMutex } @@ -70,7 +70,8 @@ func (aw *AmbientWeather) Init() *AmbientWeather { } span.SetAttributes(attribute.Int("updatesToKeep", updatesToKeep)) - aw.weatherState = recorder.NewWeatherRecorder(&recorder.Opts{ + // TODO: Support other recorders (don't rely on default) + aw.weatherRecorder = recorder.NewWeatherRecorder(&recorder.Opts{ Ctx: aw.appCtx, KeepLast: updatesToKeep, }) @@ -142,7 +143,7 @@ func (aw *AmbientWeather) handleProviderRequest( } // Record state - aw.weatherState.Set(ctx, update) + aw.weatherRecorder.Set(ctx, update) // Update metrics aw.metricsUpdate(ctx, p, update) @@ -277,9 +278,9 @@ func (aw *AmbientWeather) enrichStation(update *weather.WeatherUpdate) { } } -func (aw *AmbientWeather) GetState() *recorder.WeatherRecorder { +func (aw *AmbientWeather) GetRecorder() *recorder.WeatherRecorder { aw.RLock() defer aw.RUnlock() - return aw.weatherState + return aw.weatherRecorder } diff --git a/pkg/util/util.go b/pkg/util/util.go new file mode 100644 index 0000000..8e07e36 --- /dev/null +++ b/pkg/util/util.go @@ -0,0 +1,17 @@ +package util + +import "k8s.io/utils/ptr" + +func DerefStr(s *string) string { + if s == nil { + return "" + } + return *s +} + +func Int32ptr(i *int) *int32 { + if i == nil { + return nil + } + return ptr.To(int32(*i)) +} diff --git a/pkg/weather/grpc/mapupdate.go b/pkg/weather/grpc/mapupdate.go index e58ab30..32d2135 100644 --- a/pkg/weather/grpc/mapupdate.go +++ b/pkg/weather/grpc/mapupdate.go @@ -1,9 +1,8 @@ package grpc import ( - "k8s.io/utils/ptr" - pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" ) @@ -18,18 +17,18 @@ func UpdatesToPbUpdates(u []*weather.WeatherUpdate) []*pb.WeatherUpdate { func UpdateToPbUpdate(u *weather.WeatherUpdate) *pb.WeatherUpdate { return &pb.WeatherUpdate{ StationName: u.StationConfig.Name, - StationType: derefStr(u.StationType), - StationId: derefStr(u.StationID), + StationType: util.DerefStr(u.StationType), + StationId: util.DerefStr(u.StationID), TempOutdoorF: u.TempOutdoorF, TempIndoorF: u.TempIndoorF, - HumidityOutdoor: int32ptr(u.HumidityOudoor), - HumidityIndoor: int32ptr(u.HumidityIndoor), + HumidityOutdoor: util.Int32ptr(u.HumidityOudoor), + HumidityIndoor: util.Int32ptr(u.HumidityIndoor), WindSpeedMph: u.WindSpeedMPH, WindGustMph: u.WindGustMPH, MaxDailyGust: u.MaxDailyGust, - WindDir: int32ptr(u.WindDir), - WindDirAvg_10M: int32ptr(u.WindDirAvg10m), - Uv: int32ptr(u.UV), + WindDir: util.Int32ptr(u.WindDir), + WindDirAvg_10M: util.Int32ptr(u.WindDirAvg10m), + Uv: util.Int32ptr(u.UV), SolarRadiation: u.SolarRadiation, HourlyRainIn: u.HourlyRainIn, EventRainIn: u.EventRainIn, @@ -52,7 +51,7 @@ func batteriesToPbBatteries(batteries []weather.BatteryStatus) []*pb.BatteryStat for i, b := range batteries { pbBatteries[i] = &pb.BatteryStatus{ Component: b.Component, - Status: int32ptr(b.Status), + Status: util.Int32ptr(b.Status), } } return pbBatteries @@ -64,22 +63,8 @@ func thSensorsToPbSensors(sensors []*weather.TempHumiditySensor) []*pb.TempHumid pbSensors[i] = &pb.TempHumiditySensor{ Name: s.Name, TempF: s.TempF, - Humidity: int32ptr(s.Humidity), + Humidity: util.Int32ptr(s.Humidity), } } return pbSensors } - -func derefStr(s *string) string { - if s == nil { - return "" - } - return *s -} - -func int32ptr(i *int) *int32 { - if i == nil { - return nil - } - return ptr.To(int32(*i)) -} diff --git a/pkg/weather/grpc/weather.go b/pkg/weather/grpc/weather.go index 28c0bf6..e4545d8 100644 --- a/pkg/weather/grpc/weather.go +++ b/pkg/weather/grpc/weather.go @@ -46,7 +46,7 @@ func (w *GRPCWeather) GetWeather(ctx context.Context, req *pb.GetWeatherRequest) span.SetAttributes(attribute.Int("limit", limit)) - updates, err := w.recorder.Get(ctx, limit) + updates, err := w.recorder.Get(ctx, req) if err != nil { span.RecordError(err) span.SetStatus(otelcodes.Error, err.Error()) diff --git a/pkg/weather/recorder/recorder.go b/pkg/weather/recorder/recorder.go index 9b29d87..88b8ca5 100644 --- a/pkg/weather/recorder/recorder.go +++ b/pkg/weather/recorder/recorder.go @@ -9,20 +9,21 @@ import ( "gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" - "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/memory" ) type WeatherRecorder struct { - updates []*weather.WeatherUpdate - keep int - ctx context.Context - tracer trace.Tracer - meter metric.Meter + recorder recorders.Recorder + ctx context.Context + tracer trace.Tracer + meter metric.Meter *sync.RWMutex } type Opts struct { Ctx context.Context + Recorder recorders.Recorder // If nil, will use memory recorder KeepLast int } @@ -31,12 +32,20 @@ func NewWeatherRecorder(opts *Opts) *WeatherRecorder { opts.KeepLast = 1 } + if opts.Recorder == nil { + opts.Recorder = &memory.MemoryRecorder{} + } + + opts.Recorder.Init(opts.Ctx, &recorders.RecorderOpts{ + RetainLast: opts.KeepLast, + BaseCtx: opts.Ctx, + }) + return &WeatherRecorder{ - updates: make([]*weather.WeatherUpdate, 0, opts.KeepLast), - keep: opts.KeepLast, - ctx: opts.Ctx, - tracer: otel.GetTracer(opts.Ctx, "weatherRecorder"), - meter: otel.GetMeter(opts.Ctx, "weatherRecorder"), - RWMutex: &sync.RWMutex{}, + ctx: opts.Ctx, + recorder: opts.Recorder, + tracer: otel.GetTracer(opts.Ctx, "weatherRecorder"), + meter: otel.GetMeter(opts.Ctx, "weatherRecorder"), + RWMutex: &sync.RWMutex{}, } } diff --git a/pkg/weather/recorder/recorder_get.go b/pkg/weather/recorder/recorder_get.go index 534ec58..676e632 100644 --- a/pkg/weather/recorder/recorder_get.go +++ b/pkg/weather/recorder/recorder_get.go @@ -2,35 +2,41 @@ package recorder import ( "context" - "errors" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" + "k8s.io/utils/ptr" + pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" ) // Returns last requested number of weather updates // If negative number given, will return all weather observations -func (w *WeatherRecorder) Get(ctx context.Context, last int) ( +func (w *WeatherRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( []*weather.WeatherUpdate, error, ) { - if last < 0 { - last = w.keep - } else if last < 1 { - last = 1 + if req == nil { + req = &pb.GetWeatherRequest{Limit: ptr.To(int32(-1))} + } + + if req.Limit == nil || *req.Limit < 0 { + req.Limit = ptr.To(int32(-1)) + } else if *req.Limit == 0 { + req.Limit = ptr.To(int32(1)) } ctx, span := w.tracer.Start(ctx, "getWeatherRecorder") span.SetAttributes( - attribute.Int("last", last), - attribute.Int("keep", w.keep), + attribute.String("stationNameFilter", util.DerefStr(req.Opts.StationName)), + attribute.String("stationTypeFilter", util.DerefStr(req.Opts.StationType)), + attribute.Int("last", int(*req.Limit)), attribute.Int("currentSize", w.Count(ctx)), ) defer span.End() - updates, err := w.get(ctx, last) + updates, err := w.recorder.Get(ctx, req) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -41,50 +47,14 @@ func (w *WeatherRecorder) Get(ctx context.Context, last int) ( return updates, err } -func (w *WeatherRecorder) get(ctx context.Context, last int) ( - []*weather.WeatherUpdate, error, -) { - span := trace.SpanFromContext(ctx) - - w.RLock() - defer w.RUnlock() - - span.AddEvent("acquired lock on recorder cache") - - updates := w.updates - - if w.count() == 0 { - err := errors.New("no recorded updates to get") - span.RecordError(err) - return nil, err - } else if w.count() <= last { - span.RecordError(errors.New("requested more updates than recorded")) - } else { - updates = updates[len(updates)-last:] - } - - span.SetAttributes(attribute.Int("retrieved", len(updates))) - span.SetStatus(codes.Ok, "") - - return updates, nil -} - // Returns count of retained weather updates func (w *WeatherRecorder) Count(ctx context.Context) int { - _, span := w.tracer.Start(ctx, "countWeatherRecorder") + ctx, span := w.tracer.Start(ctx, "countWeatherRecorder") defer span.End() - count := w.count() - + count := w.recorder.Count(ctx) span.SetAttributes(attribute.Int("count", count)) - span.SetStatus(codes.Ok, "") + span.SetStatus(codes.Ok, "") return count } - -func (w *WeatherRecorder) count() int { - w.RLock() - defer w.RUnlock() - - return len(w.updates) -} diff --git a/pkg/weather/recorder/recorder_set.go b/pkg/weather/recorder/recorder_set.go index 489f4ab..b711b6a 100644 --- a/pkg/weather/recorder/recorder_set.go +++ b/pkg/weather/recorder/recorder_set.go @@ -6,30 +6,16 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" ) func (w *WeatherRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error { - ctx, span := w.tracer.Start(ctx, "recordWeatherUpdate") - span.SetAttributes( - attribute.Int("countWeatherUpdates", w.Count(ctx)), - attribute.Int("keepUpdates", w.keep), - ) + ctx, span := w.tracer.Start(ctx, "setRecorderUpdate", trace.WithAttributes( + attribute.String("stationName", u.StationConfig.Name), + attribute.String("stationType", util.DerefStr(u.StationType)), + attribute.String("stationEquipment", u.StationConfig.Equipment), + )) defer span.End() - - return w.set(span, u) -} - -func (w *WeatherRecorder) set(span trace.Span, u *weather.WeatherUpdate) error { - w.Lock() - defer w.Unlock() - - if len(w.updates) > w.keep { - w.updates = w.updates[1:] - span.AddEvent("trimmed recorded updates by 1") - } - - w.updates = append(w.updates, u) - span.AddEvent("recorded weather update") - return nil + return w.recorder.Set(ctx, u) } diff --git a/pkg/weather/recorder/recorders/memory/get.go b/pkg/weather/recorder/recorders/memory/get.go new file mode 100644 index 0000000..dffda8f --- /dev/null +++ b/pkg/weather/recorder/recorders/memory/get.go @@ -0,0 +1,75 @@ +package memory + +import ( + "context" + "errors" + "slices" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + + pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" +) + +func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( + []*weather.WeatherUpdate, error, +) { + ctx, span := r.tracer.Start(ctx, "memoryRecorder.Get") + defer span.End() + + r.RLock() + defer r.RUnlock() + + span.AddEvent("acquired lock on recorder cache") + + updates := r.updates + + if r.count() == 0 { + err := errors.New("no recorded updates to get") + span.RecordError(err) + return nil, err + } else if r.count() <= int(*req.Limit) { + span.RecordError(errors.New("requested more updates than recorded")) + } + + // Filter by Station Name if requested + if req.Opts.StationName != nil && *req.Opts.StationName != "" { + updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool { + return u.StationConfig == nil || u.StationConfig.Name != *req.Opts.StationName + }) + } + + // Filter by Station Type if requested + if req.Opts.StationType != nil && *req.Opts.StationType != "" { + updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool { + return u.StationType == nil || *u.StationType != *req.Opts.StationType + }) + } + + // Limit results + if len(updates) > int(*req.Limit) { + updates = updates[len(updates)-int(*req.Limit):] + } + + span.SetAttributes(attribute.Int("retrieved", len(updates))) + span.SetStatus(codes.Ok, "") + + return updates, nil +} + +func (r *MemoryRecorder) Count(ctx context.Context) int { + _, span := r.tracer.Start(ctx, "countWeatherRecorder") + defer span.End() + + count := r.count() + + span.SetAttributes(attribute.Int("count", count)) + span.SetStatus(codes.Ok, "") + + return count +} + +func (r *MemoryRecorder) count() int { + return len(r.updates) +} diff --git a/pkg/weather/recorder/recorders/memory/memory.go b/pkg/weather/recorder/recorders/memory/memory.go new file mode 100644 index 0000000..217c9f0 --- /dev/null +++ b/pkg/weather/recorder/recorders/memory/memory.go @@ -0,0 +1,35 @@ +package memory + +import ( + "context" + "sync" + + "go.opentelemetry.io/otel/trace" + + "gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" + + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" +) + +const defRetainLast = 120 + +type MemoryRecorder struct { + baseCtx context.Context + updates []*weather.WeatherUpdate + tracer trace.Tracer + keep int + *sync.RWMutex +} + +func (r *MemoryRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) { + if opts.RetainLast < 1 { + opts.RetainLast = defRetainLast + } + + r.updates = make([]*weather.WeatherUpdate, 0, opts.RetainLast) + r.keep = opts.RetainLast + r.baseCtx = opts.BaseCtx + r.RWMutex = &sync.RWMutex{} + r.tracer = otel.GetTracer(r.baseCtx, "memoryRecorder") +} diff --git a/pkg/weather/recorder/recorders/memory/set.go b/pkg/weather/recorder/recorders/memory/set.go new file mode 100644 index 0000000..c4cf6e2 --- /dev/null +++ b/pkg/weather/recorder/recorders/memory/set.go @@ -0,0 +1,38 @@ +package memory + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" +) + +func (r *MemoryRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error { + r.Lock() + defer r.Unlock() + + ctx, span := r.tracer.Start(ctx, "memoryRecorder.Set") + span.SetAttributes( + attribute.Int("countWeatherUpdates", r.Count(ctx)), + attribute.Int("keepUpdates", r.keep), + ) + defer span.End() + + return r.set(ctx, u) +} + +func (r *MemoryRecorder) set(ctx context.Context, u *weather.WeatherUpdate) error { + span := trace.SpanFromContext(ctx) + + if len(r.updates) > r.keep { + r.updates = r.updates[1:] + span.AddEvent("trimmed recorded updates by 1") + } + + r.updates = append(r.updates, u) + span.AddEvent("recorded weather update") + + return nil +} diff --git a/pkg/weather/recorder/recorders/recorders.go b/pkg/weather/recorder/recorders/recorders.go new file mode 100644 index 0000000..cf73f8a --- /dev/null +++ b/pkg/weather/recorder/recorders/recorders.go @@ -0,0 +1,20 @@ +package recorders + +import ( + "context" + + pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" +) + +type RecorderOpts struct { + RetainLast int + BaseCtx context.Context +} + +type Recorder interface { + Init(context.Context, *RecorderOpts) + Set(context.Context, *weather.WeatherUpdate) error + Get(context.Context, *pb.GetWeatherRequest) ([]*weather.WeatherUpdate, error) + Count(context.Context) int // Best Effort +}