move recorder to interface
This commit is contained in:
		| @@ -9,20 +9,21 @@ import ( | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/memory" | ||||
| ) | ||||
|  | ||||
| type WeatherRecorder struct { | ||||
| 	updates []*weather.WeatherUpdate | ||||
| 	keep    int | ||||
| 	ctx     context.Context | ||||
| 	tracer  trace.Tracer | ||||
| 	meter   metric.Meter | ||||
| 	recorder recorders.Recorder | ||||
| 	ctx      context.Context | ||||
| 	tracer   trace.Tracer | ||||
| 	meter    metric.Meter | ||||
| 	*sync.RWMutex | ||||
| } | ||||
|  | ||||
| type Opts struct { | ||||
| 	Ctx      context.Context | ||||
| 	Recorder recorders.Recorder // If nil, will use memory recorder | ||||
| 	KeepLast int | ||||
| } | ||||
|  | ||||
| @@ -31,12 +32,20 @@ func NewWeatherRecorder(opts *Opts) *WeatherRecorder { | ||||
| 		opts.KeepLast = 1 | ||||
| 	} | ||||
|  | ||||
| 	if opts.Recorder == nil { | ||||
| 		opts.Recorder = &memory.MemoryRecorder{} | ||||
| 	} | ||||
|  | ||||
| 	opts.Recorder.Init(opts.Ctx, &recorders.RecorderOpts{ | ||||
| 		RetainLast: opts.KeepLast, | ||||
| 		BaseCtx:    opts.Ctx, | ||||
| 	}) | ||||
|  | ||||
| 	return &WeatherRecorder{ | ||||
| 		updates: make([]*weather.WeatherUpdate, 0, opts.KeepLast), | ||||
| 		keep:    opts.KeepLast, | ||||
| 		ctx:     opts.Ctx, | ||||
| 		tracer:  otel.GetTracer(opts.Ctx, "weatherRecorder"), | ||||
| 		meter:   otel.GetMeter(opts.Ctx, "weatherRecorder"), | ||||
| 		RWMutex: &sync.RWMutex{}, | ||||
| 		ctx:      opts.Ctx, | ||||
| 		recorder: opts.Recorder, | ||||
| 		tracer:   otel.GetTracer(opts.Ctx, "weatherRecorder"), | ||||
| 		meter:    otel.GetMeter(opts.Ctx, "weatherRecorder"), | ||||
| 		RWMutex:  &sync.RWMutex{}, | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -2,35 +2,41 @@ package recorder | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/codes" | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
| 	"k8s.io/utils/ptr" | ||||
|  | ||||
| 	pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| ) | ||||
|  | ||||
| // Returns last requested number of weather updates | ||||
| // If negative number given, will return all weather observations | ||||
| func (w *WeatherRecorder) Get(ctx context.Context, last int) ( | ||||
| func (w *WeatherRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( | ||||
| 	[]*weather.WeatherUpdate, error, | ||||
| ) { | ||||
| 	if last < 0 { | ||||
| 		last = w.keep | ||||
| 	} else if last < 1 { | ||||
| 		last = 1 | ||||
| 	if req == nil { | ||||
| 		req = &pb.GetWeatherRequest{Limit: ptr.To(int32(-1))} | ||||
| 	} | ||||
|  | ||||
| 	if req.Limit == nil || *req.Limit < 0 { | ||||
| 		req.Limit = ptr.To(int32(-1)) | ||||
| 	} else if *req.Limit == 0 { | ||||
| 		req.Limit = ptr.To(int32(1)) | ||||
| 	} | ||||
|  | ||||
| 	ctx, span := w.tracer.Start(ctx, "getWeatherRecorder") | ||||
| 	span.SetAttributes( | ||||
| 		attribute.Int("last", last), | ||||
| 		attribute.Int("keep", w.keep), | ||||
| 		attribute.String("stationNameFilter", util.DerefStr(req.Opts.StationName)), | ||||
| 		attribute.String("stationTypeFilter", util.DerefStr(req.Opts.StationType)), | ||||
| 		attribute.Int("last", int(*req.Limit)), | ||||
| 		attribute.Int("currentSize", w.Count(ctx)), | ||||
| 	) | ||||
| 	defer span.End() | ||||
|  | ||||
| 	updates, err := w.get(ctx, last) | ||||
| 	updates, err := w.recorder.Get(ctx, req) | ||||
| 	if err != nil { | ||||
| 		span.RecordError(err) | ||||
| 		span.SetStatus(codes.Error, err.Error()) | ||||
| @@ -41,50 +47,14 @@ func (w *WeatherRecorder) Get(ctx context.Context, last int) ( | ||||
| 	return updates, err | ||||
| } | ||||
|  | ||||
| func (w *WeatherRecorder) get(ctx context.Context, last int) ( | ||||
| 	[]*weather.WeatherUpdate, error, | ||||
| ) { | ||||
| 	span := trace.SpanFromContext(ctx) | ||||
|  | ||||
| 	w.RLock() | ||||
| 	defer w.RUnlock() | ||||
|  | ||||
| 	span.AddEvent("acquired lock on recorder cache") | ||||
|  | ||||
| 	updates := w.updates | ||||
|  | ||||
| 	if w.count() == 0 { | ||||
| 		err := errors.New("no recorded updates to get") | ||||
| 		span.RecordError(err) | ||||
| 		return nil, err | ||||
| 	} else if w.count() <= last { | ||||
| 		span.RecordError(errors.New("requested more updates than recorded")) | ||||
| 	} else { | ||||
| 		updates = updates[len(updates)-last:] | ||||
| 	} | ||||
|  | ||||
| 	span.SetAttributes(attribute.Int("retrieved", len(updates))) | ||||
| 	span.SetStatus(codes.Ok, "") | ||||
|  | ||||
| 	return updates, nil | ||||
| } | ||||
|  | ||||
| // Returns count of retained weather updates | ||||
| func (w *WeatherRecorder) Count(ctx context.Context) int { | ||||
| 	_, span := w.tracer.Start(ctx, "countWeatherRecorder") | ||||
| 	ctx, span := w.tracer.Start(ctx, "countWeatherRecorder") | ||||
| 	defer span.End() | ||||
|  | ||||
| 	count := w.count() | ||||
|  | ||||
| 	count := w.recorder.Count(ctx) | ||||
| 	span.SetAttributes(attribute.Int("count", count)) | ||||
| 	span.SetStatus(codes.Ok, "") | ||||
|  | ||||
| 	span.SetStatus(codes.Ok, "") | ||||
| 	return count | ||||
| } | ||||
|  | ||||
| func (w *WeatherRecorder) count() int { | ||||
| 	w.RLock() | ||||
| 	defer w.RUnlock() | ||||
|  | ||||
| 	return len(w.updates) | ||||
| } | ||||
|   | ||||
| @@ -6,30 +6,16 @@ import ( | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| ) | ||||
|  | ||||
| func (w *WeatherRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error { | ||||
| 	ctx, span := w.tracer.Start(ctx, "recordWeatherUpdate") | ||||
| 	span.SetAttributes( | ||||
| 		attribute.Int("countWeatherUpdates", w.Count(ctx)), | ||||
| 		attribute.Int("keepUpdates", w.keep), | ||||
| 	) | ||||
| 	ctx, span := w.tracer.Start(ctx, "setRecorderUpdate", trace.WithAttributes( | ||||
| 		attribute.String("stationName", u.StationConfig.Name), | ||||
| 		attribute.String("stationType", util.DerefStr(u.StationType)), | ||||
| 		attribute.String("stationEquipment", u.StationConfig.Equipment), | ||||
| 	)) | ||||
| 	defer span.End() | ||||
|  | ||||
| 	return w.set(span, u) | ||||
| } | ||||
|  | ||||
| func (w *WeatherRecorder) set(span trace.Span, u *weather.WeatherUpdate) error { | ||||
| 	w.Lock() | ||||
| 	defer w.Unlock() | ||||
|  | ||||
| 	if len(w.updates) > w.keep { | ||||
| 		w.updates = w.updates[1:] | ||||
| 		span.AddEvent("trimmed recorded updates by 1") | ||||
| 	} | ||||
|  | ||||
| 	w.updates = append(w.updates, u) | ||||
| 	span.AddEvent("recorded weather update") | ||||
| 	return nil | ||||
| 	return w.recorder.Set(ctx, u) | ||||
| } | ||||
|   | ||||
							
								
								
									
										75
									
								
								pkg/weather/recorder/recorders/memory/get.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								pkg/weather/recorder/recorders/memory/get.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,75 @@ | ||||
| package memory | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"slices" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/codes" | ||||
|  | ||||
| 	pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| ) | ||||
|  | ||||
| func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( | ||||
| 	[]*weather.WeatherUpdate, error, | ||||
| ) { | ||||
| 	ctx, span := r.tracer.Start(ctx, "memoryRecorder.Get") | ||||
| 	defer span.End() | ||||
|  | ||||
| 	r.RLock() | ||||
| 	defer r.RUnlock() | ||||
|  | ||||
| 	span.AddEvent("acquired lock on recorder cache") | ||||
|  | ||||
| 	updates := r.updates | ||||
|  | ||||
| 	if r.count() == 0 { | ||||
| 		err := errors.New("no recorded updates to get") | ||||
| 		span.RecordError(err) | ||||
| 		return nil, err | ||||
| 	} else if r.count() <= int(*req.Limit) { | ||||
| 		span.RecordError(errors.New("requested more updates than recorded")) | ||||
| 	} | ||||
|  | ||||
| 	// Filter by Station Name if requested | ||||
| 	if req.Opts.StationName != nil && *req.Opts.StationName != "" { | ||||
| 		updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool { | ||||
| 			return u.StationConfig == nil || u.StationConfig.Name != *req.Opts.StationName | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	// Filter by Station Type if requested | ||||
| 	if req.Opts.StationType != nil && *req.Opts.StationType != "" { | ||||
| 		updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool { | ||||
| 			return u.StationType == nil || *u.StationType != *req.Opts.StationType | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	// Limit results | ||||
| 	if len(updates) > int(*req.Limit) { | ||||
| 		updates = updates[len(updates)-int(*req.Limit):] | ||||
| 	} | ||||
|  | ||||
| 	span.SetAttributes(attribute.Int("retrieved", len(updates))) | ||||
| 	span.SetStatus(codes.Ok, "") | ||||
|  | ||||
| 	return updates, nil | ||||
| } | ||||
|  | ||||
| func (r *MemoryRecorder) Count(ctx context.Context) int { | ||||
| 	_, span := r.tracer.Start(ctx, "countWeatherRecorder") | ||||
| 	defer span.End() | ||||
|  | ||||
| 	count := r.count() | ||||
|  | ||||
| 	span.SetAttributes(attribute.Int("count", count)) | ||||
| 	span.SetStatus(codes.Ok, "") | ||||
|  | ||||
| 	return count | ||||
| } | ||||
|  | ||||
| func (r *MemoryRecorder) count() int { | ||||
| 	return len(r.updates) | ||||
| } | ||||
							
								
								
									
										35
									
								
								pkg/weather/recorder/recorders/memory/memory.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								pkg/weather/recorder/recorders/memory/memory.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,35 @@ | ||||
| package memory | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" | ||||
| ) | ||||
|  | ||||
| const defRetainLast = 120 | ||||
|  | ||||
| type MemoryRecorder struct { | ||||
| 	baseCtx context.Context | ||||
| 	updates []*weather.WeatherUpdate | ||||
| 	tracer  trace.Tracer | ||||
| 	keep    int | ||||
| 	*sync.RWMutex | ||||
| } | ||||
|  | ||||
| func (r *MemoryRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) { | ||||
| 	if opts.RetainLast < 1 { | ||||
| 		opts.RetainLast = defRetainLast | ||||
| 	} | ||||
|  | ||||
| 	r.updates = make([]*weather.WeatherUpdate, 0, opts.RetainLast) | ||||
| 	r.keep = opts.RetainLast | ||||
| 	r.baseCtx = opts.BaseCtx | ||||
| 	r.RWMutex = &sync.RWMutex{} | ||||
| 	r.tracer = otel.GetTracer(r.baseCtx, "memoryRecorder") | ||||
| } | ||||
							
								
								
									
										38
									
								
								pkg/weather/recorder/recorders/memory/set.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								pkg/weather/recorder/recorders/memory/set.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,38 @@ | ||||
| package memory | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| ) | ||||
|  | ||||
| func (r *MemoryRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error { | ||||
| 	r.Lock() | ||||
| 	defer r.Unlock() | ||||
|  | ||||
| 	ctx, span := r.tracer.Start(ctx, "memoryRecorder.Set") | ||||
| 	span.SetAttributes( | ||||
| 		attribute.Int("countWeatherUpdates", r.Count(ctx)), | ||||
| 		attribute.Int("keepUpdates", r.keep), | ||||
| 	) | ||||
| 	defer span.End() | ||||
|  | ||||
| 	return r.set(ctx, u) | ||||
| } | ||||
|  | ||||
| func (r *MemoryRecorder) set(ctx context.Context, u *weather.WeatherUpdate) error { | ||||
| 	span := trace.SpanFromContext(ctx) | ||||
|  | ||||
| 	if len(r.updates) > r.keep { | ||||
| 		r.updates = r.updates[1:] | ||||
| 		span.AddEvent("trimmed recorded updates by 1") | ||||
| 	} | ||||
|  | ||||
| 	r.updates = append(r.updates, u) | ||||
| 	span.AddEvent("recorded weather update") | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
							
								
								
									
										20
									
								
								pkg/weather/recorder/recorders/recorders.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								pkg/weather/recorder/recorders/recorders.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,20 @@ | ||||
| package recorders | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
|  | ||||
| 	pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| ) | ||||
|  | ||||
| type RecorderOpts struct { | ||||
| 	RetainLast int | ||||
| 	BaseCtx    context.Context | ||||
| } | ||||
|  | ||||
| type Recorder interface { | ||||
| 	Init(context.Context, *RecorderOpts) | ||||
| 	Set(context.Context, *weather.WeatherUpdate) error | ||||
| 	Get(context.Context, *pb.GetWeatherRequest) ([]*weather.WeatherUpdate, error) | ||||
| 	Count(context.Context) int // Best Effort | ||||
| } | ||||
		Reference in New Issue
	
	Block a user