|
|
|
@ -9,6 +9,7 @@ import (
|
|
|
|
|
"go.opentelemetry.io/otel/codes"
|
|
|
|
|
|
|
|
|
|
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
|
|
|
|
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
|
|
|
|
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -23,8 +24,6 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
|
|
|
|
|
|
|
|
|
|
span.AddEvent("acquired lock on recorder cache")
|
|
|
|
|
|
|
|
|
|
updates := r.updates
|
|
|
|
|
|
|
|
|
|
if r.count() == 0 {
|
|
|
|
|
err := errors.New("no recorded updates to get")
|
|
|
|
|
span.RecordError(err)
|
|
|
|
@ -33,24 +32,8 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
|
|
|
|
|
span.RecordError(errors.New("requested more updates than recorded"))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Filter by Station Name if requested
|
|
|
|
|
if req.Opts.StationName != nil && *req.Opts.StationName != "" {
|
|
|
|
|
updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool {
|
|
|
|
|
return u.StationConfig == nil || u.StationConfig.Name != *req.Opts.StationName
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Filter by Station Type if requested
|
|
|
|
|
if req.Opts.StationType != nil && *req.Opts.StationType != "" {
|
|
|
|
|
updates = slices.DeleteFunc(updates, func(u *weather.WeatherUpdate) bool {
|
|
|
|
|
return u.StationType == nil || *u.StationType != *req.Opts.StationType
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Limit results
|
|
|
|
|
if len(updates) > int(*req.Limit) {
|
|
|
|
|
updates = updates[len(updates)-int(*req.Limit):]
|
|
|
|
|
}
|
|
|
|
|
updates := r.getUpdatesFromReq(req)
|
|
|
|
|
span.AddEvent("request limit/opts applied to updates")
|
|
|
|
|
|
|
|
|
|
span.SetAttributes(attribute.Int("retrieved", len(updates)))
|
|
|
|
|
span.SetStatus(codes.Ok, "")
|
|
|
|
@ -58,6 +41,49 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
|
|
|
|
|
return updates, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *MemoryRecorder) getUpdatesFromReq(req *pb.GetWeatherRequest) []*weather.WeatherUpdate {
|
|
|
|
|
if req.Opts == nil {
|
|
|
|
|
return limitUpdates(r.updates, int(req.GetLimit()))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return r.applyOptsToUpdates(r.updates, int(req.GetLimit()), req.Opts)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *MemoryRecorder) applyOptsToUpdates(updates []*weather.WeatherUpdate, limit int, opts *pb.GetWeatherOpts) []*weather.WeatherUpdate {
|
|
|
|
|
if opts == nil {
|
|
|
|
|
return updates
|
|
|
|
|
} else if opts.StationName == nil && opts.StationType == nil {
|
|
|
|
|
return updates
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
filtered := make([]*weather.WeatherUpdate, 0, limit)
|
|
|
|
|
|
|
|
|
|
for i := len(updates) - 1; i >= 0; i-- {
|
|
|
|
|
update := updates[i]
|
|
|
|
|
match := true
|
|
|
|
|
|
|
|
|
|
if opts.GetStationName() != "" {
|
|
|
|
|
if update.GetStationName() != opts.GetStationName() {
|
|
|
|
|
match = false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if opts.GetStationType() != "" {
|
|
|
|
|
if util.DerefStr(update.StationType) != opts.GetStationType() {
|
|
|
|
|
match = false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if match {
|
|
|
|
|
filtered = append(filtered, update)
|
|
|
|
|
if len(filtered) >= limit {
|
|
|
|
|
return filtered
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return slices.Clip(filtered)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *MemoryRecorder) Count(ctx context.Context) int {
|
|
|
|
|
_, span := r.tracer.Start(ctx, "countWeatherRecorder")
|
|
|
|
|
defer span.End()
|
|
|
|
@ -73,3 +99,10 @@ func (r *MemoryRecorder) Count(ctx context.Context) int {
|
|
|
|
|
func (r *MemoryRecorder) count() int {
|
|
|
|
|
return len(r.updates)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func limitUpdates(updates []*weather.WeatherUpdate, limit int) []*weather.WeatherUpdate {
|
|
|
|
|
if len(updates) > limit {
|
|
|
|
|
return updates[len(updates)-limit:]
|
|
|
|
|
}
|
|
|
|
|
return updates
|
|
|
|
|
}
|
|
|
|
|