refactor state to recorder, finish implementing
This commit is contained in:
		
							
								
								
									
										42
									
								
								pkg/weather/recorder/recorder.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								pkg/weather/recorder/recorder.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,42 @@ | ||||
| package recorder | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/metric" | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| ) | ||||
|  | ||||
| type WeatherRecorder struct { | ||||
| 	updates []*weather.WeatherUpdate | ||||
| 	keep    int | ||||
| 	ctx     context.Context | ||||
| 	tracer  trace.Tracer | ||||
| 	meter   metric.Meter | ||||
| 	*sync.RWMutex | ||||
| } | ||||
|  | ||||
| type Opts struct { | ||||
| 	Ctx      context.Context | ||||
| 	KeepLast int | ||||
| } | ||||
|  | ||||
| func NewWeatherRecorder(opts *Opts) *WeatherRecorder { | ||||
| 	if opts.KeepLast < 1 { | ||||
| 		opts.KeepLast = 1 | ||||
| 	} | ||||
|  | ||||
| 	return &WeatherRecorder{ | ||||
| 		updates: make([]*weather.WeatherUpdate, 0, opts.KeepLast), | ||||
| 		keep:    opts.KeepLast, | ||||
| 		ctx:     opts.Ctx, | ||||
| 		tracer:  otel.GetTracer(opts.Ctx, "weatherRecorder"), | ||||
| 		meter:   otel.GetMeter(opts.Ctx, "weatherRecorder"), | ||||
| 		RWMutex: &sync.RWMutex{}, | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										90
									
								
								pkg/weather/recorder/recorder_get.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										90
									
								
								pkg/weather/recorder/recorder_get.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,90 @@ | ||||
| package recorder | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/codes" | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| ) | ||||
|  | ||||
| // Returns last requested number of weather updates | ||||
| // If negative number given, will return all weather observations | ||||
| func (w *WeatherRecorder) Get(ctx context.Context, last int) ( | ||||
| 	[]*weather.WeatherUpdate, error, | ||||
| ) { | ||||
| 	if last < 0 { | ||||
| 		last = w.keep | ||||
| 	} else if last < 1 { | ||||
| 		last = 1 | ||||
| 	} | ||||
|  | ||||
| 	ctx, span := w.tracer.Start(ctx, "getWeatherRecorder") | ||||
| 	span.SetAttributes( | ||||
| 		attribute.Int("last", last), | ||||
| 		attribute.Int("keep", w.keep), | ||||
| 		attribute.Int("currentSize", w.Count()), | ||||
| 	) | ||||
| 	defer span.End() | ||||
|  | ||||
| 	updates, err := w.get(ctx, last) | ||||
| 	if err != nil { | ||||
| 		span.RecordError(err) | ||||
| 		span.SetStatus(codes.Error, err.Error()) | ||||
| 	} else { | ||||
| 		span.SetStatus(codes.Ok, "") | ||||
| 	} | ||||
|  | ||||
| 	return updates, err | ||||
| } | ||||
|  | ||||
| func (w *WeatherRecorder) get(ctx context.Context, last int) ( | ||||
| 	[]*weather.WeatherUpdate, error, | ||||
| ) { | ||||
| 	span := trace.SpanFromContext(ctx) | ||||
|  | ||||
| 	w.RLock() | ||||
| 	defer w.Unlock() | ||||
|  | ||||
| 	span.AddEvent("acquired lock on recorder cache") | ||||
|  | ||||
| 	updates := w.updates | ||||
|  | ||||
| 	if w.count() == 0 { | ||||
| 		err := errors.New("no recorded updates to get") | ||||
| 		span.RecordError(err) | ||||
| 		return nil, err | ||||
| 	} else if w.count() <= last { | ||||
| 		span.RecordError(errors.New("requested more updates than recorded")) | ||||
| 	} else { | ||||
| 		updates = updates[len(updates)-last:] | ||||
| 	} | ||||
|  | ||||
| 	span.SetAttributes(attribute.Int("retrieved", len(updates))) | ||||
| 	span.SetStatus(codes.Ok, "") | ||||
|  | ||||
| 	return updates, nil | ||||
| } | ||||
|  | ||||
| // Returns count of retained weather updates | ||||
| func (w *WeatherRecorder) Count() int { | ||||
| 	_, span := w.tracer.Start(w.ctx, "countWeatherRecorder") | ||||
| 	defer span.End() | ||||
|  | ||||
| 	count := w.count() | ||||
|  | ||||
| 	span.SetAttributes(attribute.Int("count", count)) | ||||
| 	span.SetStatus(codes.Ok, "") | ||||
|  | ||||
| 	return count | ||||
| } | ||||
|  | ||||
| func (w *WeatherRecorder) count() int { | ||||
| 	w.RLock() | ||||
| 	defer w.RUnlock() | ||||
|  | ||||
| 	return len(w.updates) | ||||
| } | ||||
							
								
								
									
										35
									
								
								pkg/weather/recorder/recorder_set.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								pkg/weather/recorder/recorder_set.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,35 @@ | ||||
| package recorder | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
|  | ||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" | ||||
| ) | ||||
|  | ||||
| func (w *WeatherRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error { | ||||
| 	_, span := w.tracer.Start(ctx, "recordWeatherUpdate") | ||||
| 	span.SetAttributes( | ||||
| 		attribute.Int("countWeatherUpdates", w.Count()), | ||||
| 		attribute.Int("keepUpdates", w.keep), | ||||
| 	) | ||||
| 	defer span.End() | ||||
|  | ||||
| 	return w.set(span, u) | ||||
| } | ||||
|  | ||||
| func (w *WeatherRecorder) set(span trace.Span, u *weather.WeatherUpdate) error { | ||||
| 	w.Lock() | ||||
| 	defer w.Unlock() | ||||
|  | ||||
| 	if len(w.updates) > w.keep { | ||||
| 		w.updates = w.updates[1:] | ||||
| 		span.AddEvent("trimmed recorded updates by 1") | ||||
| 	} | ||||
|  | ||||
| 	w.updates = append(w.updates, u) | ||||
| 	span.AddEvent("recorded weather update") | ||||
| 	return nil | ||||
| } | ||||
		Reference in New Issue
	
	Block a user