recorder implementations
This commit is contained in:
parent
fb6941e6bd
commit
b483fc22a3
3
TODO.md
3
TODO.md
@ -1,10 +1,11 @@
|
|||||||
# TODO
|
# TODO
|
||||||
|
- [ ] Add json schema for config
|
||||||
- [ ] Finish implementing weather GRPC
|
- [ ] Finish implementing weather GRPC
|
||||||
- [ ] Update README
|
- [ ] Update README
|
||||||
- [ ] Add Grafana dashboard
|
- [ ] Add Grafana dashboard
|
||||||
- [ ] Add new spans
|
|
||||||
|
|
||||||
## Done
|
## Done
|
||||||
|
- [x] Add new spans
|
||||||
- [x] Helm Chart
|
- [x] Helm Chart
|
||||||
- [x] Add proxy to upstream support
|
- [x] Add proxy to upstream support
|
||||||
- [x] Fix wunderground 401
|
- [x] Fix wunderground 401
|
||||||
|
2
go.mod
2
go.mod
@ -21,6 +21,7 @@ require (
|
|||||||
github.com/caarlos0/env/v11 v11.3.1 // indirect
|
github.com/caarlos0/env/v11 v11.3.1 // indirect
|
||||||
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||||
github.com/go-logr/logr v1.4.2 // indirect
|
github.com/go-logr/logr v1.4.2 // indirect
|
||||||
github.com/go-logr/stdr v1.2.2 // indirect
|
github.com/go-logr/stdr v1.2.2 // indirect
|
||||||
@ -35,6 +36,7 @@ require (
|
|||||||
github.com/prometheus/client_model v0.6.1 // indirect
|
github.com/prometheus/client_model v0.6.1 // indirect
|
||||||
github.com/prometheus/common v0.62.0 // indirect
|
github.com/prometheus/common v0.62.0 // indirect
|
||||||
github.com/prometheus/procfs v0.15.1 // indirect
|
github.com/prometheus/procfs v0.15.1 // indirect
|
||||||
|
github.com/redis/go-redis/v9 v9.7.3 // indirect
|
||||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
|
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
|
||||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
|
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
|
||||||
|
4
go.sum
4
go.sum
@ -15,6 +15,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
|
|||||||
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||||
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
|
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
|
||||||
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
|
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
|
||||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||||
@ -65,6 +67,8 @@ github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ
|
|||||||
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
|
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
|
||||||
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
|
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
|
||||||
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
|
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
|
||||||
|
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
|
||||||
|
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
|
||||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||||
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||||
|
@ -21,6 +21,10 @@ import (
|
|||||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/provider/wunderground"
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/provider/wunderground"
|
||||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
||||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder"
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/memory"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/noop"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
const defUpdatesToKeep = 120
|
const defUpdatesToKeep = 120
|
||||||
@ -70,10 +74,22 @@ func (aw *AmbientWeather) Init() *AmbientWeather {
|
|||||||
}
|
}
|
||||||
span.SetAttributes(attribute.Int("updatesToKeep", updatesToKeep))
|
span.SetAttributes(attribute.Int("updatesToKeep", updatesToKeep))
|
||||||
|
|
||||||
// TODO: Support other recorders (don't rely on default)
|
// Choose weather recorder for grpc / api requests,
|
||||||
aw.weatherRecorder = recorder.NewWeatherRecorder(&recorder.Opts{
|
// default is memory recorder
|
||||||
|
var r recorders.Recorder
|
||||||
|
switch aw.Config.RecorderConfig.Type {
|
||||||
|
case config.TypeMemory:
|
||||||
|
r = &memory.MemoryRecorder{}
|
||||||
|
case config.TypeRedis:
|
||||||
|
r = &redis.RedisRecorder{}
|
||||||
|
case config.TypeNoop:
|
||||||
|
r = &noop.NoopRecorder{}
|
||||||
|
}
|
||||||
|
|
||||||
|
aw.weatherRecorder = recorder.MustNewWeatherRecorder(&recorder.Opts{
|
||||||
Ctx: aw.appCtx,
|
Ctx: aw.appCtx,
|
||||||
KeepLast: updatesToKeep,
|
KeepLast: updatesToKeep,
|
||||||
|
Recorder: r,
|
||||||
})
|
})
|
||||||
|
|
||||||
aw.l.Trace().Any("awConfig", aw.Config).Send()
|
aw.l.Trace().Any("awConfig", aw.Config).Send()
|
||||||
|
@ -8,7 +8,8 @@ import (
|
|||||||
type AmbientLocalExporterConfig struct {
|
type AmbientLocalExporterConfig struct {
|
||||||
MetricPrefix string `yaml:"metricPrefix" default:"weather" env:"AMBIENT_METRIC_PREFIX"`
|
MetricPrefix string `yaml:"metricPrefix" default:"weather" env:"AMBIENT_METRIC_PREFIX"`
|
||||||
UpdatesToKeep *int `yaml:"updatesToKeep" default:"1" env:"AMBIENT_UPDATES_TO_KEEP"`
|
UpdatesToKeep *int `yaml:"updatesToKeep" default:"1" env:"AMBIENT_UPDATES_TO_KEEP"`
|
||||||
WeatherStations []WeatherStation `yaml:"weatherStations" env:"weatherStations"` // No env, too complex, not worth the time
|
WeatherStations []WeatherStation `yaml:"weatherStations"` // No env, too complex, not worth the time
|
||||||
|
RecorderConfig *RecorderConfig `yaml:"recorderConfig"`
|
||||||
*config.AppConfig // Extends app config
|
*config.AppConfig // Extends app config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
25
pkg/ambient/config/config_recorder.go
Normal file
25
pkg/ambient/config/config_recorder.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
type RecorderType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
TypeMemory RecorderType = "memory" // Stores weather updates in memory
|
||||||
|
TypeRedis RecorderType = "redis" // Required for replicas > 1
|
||||||
|
TypeNoop RecorderType = "noop" // No-op implementation
|
||||||
|
)
|
||||||
|
|
||||||
|
type RecorderConfig struct {
|
||||||
|
Type RecorderType `yaml:"type" env:"RECORDER_TYPE"` // memory|redis
|
||||||
|
KeepLast int `yaml:"keepLast" env:"RECORDER_KEEP_LAST"`
|
||||||
|
RedisConfig *RedisConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
type RedisConfig struct {
|
||||||
|
RedisHost string `yaml:"redisHost" env:"REDIS_HOST" default:"127.0.0.1"`
|
||||||
|
RedisPort int `yaml:"redisPort" env:"REDIS_PORT" default:"6379"`
|
||||||
|
RedisUser string `yaml:"redisUser" env:"REDIS_USER"`
|
||||||
|
RedisPassword string `yaml:"redisPassword" env:"REDIS_PASSWORD"`
|
||||||
|
RedisDB int `yaml:"redisDB" env:"REDIS_DB" default:"0"`
|
||||||
|
RedisTLS bool `yaml:"redisTLS" env:"REDIS_TLS" default:"false"`
|
||||||
|
RedisTLSInsecure bool `yaml:"redisTLSInsecure" env:"REDIS_TLS_INSECURE" default:"false"`
|
||||||
|
}
|
@ -1,6 +1,19 @@
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import "k8s.io/utils/ptr"
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"slices"
|
||||||
|
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
|
||||||
|
"k8s.io/utils/ptr"
|
||||||
|
|
||||||
|
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
||||||
|
)
|
||||||
|
|
||||||
|
const defaultLimit = -1
|
||||||
|
|
||||||
func DerefStr(s *string) string {
|
func DerefStr(s *string) string {
|
||||||
if s == nil {
|
if s == nil {
|
||||||
@ -15,3 +28,77 @@ func Int32ptr(i *int) *int32 {
|
|||||||
}
|
}
|
||||||
return ptr.To(int32(*i))
|
return ptr.To(int32(*i))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Generates a hash that will be consistent
|
||||||
|
// across all running replicas
|
||||||
|
func GetAppHash(conf *config.AppConfig) string {
|
||||||
|
hashName := fmt.Sprintf("%s-%s-%s",
|
||||||
|
conf.Name,
|
||||||
|
conf.Environment,
|
||||||
|
conf.Version,
|
||||||
|
)
|
||||||
|
|
||||||
|
hash := md5.Sum([]byte(hashName))
|
||||||
|
return hex.EncodeToString(hash[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get a limit from req, applying a default if unset or not sane
|
||||||
|
func GetLimitFromReq(req *pb.GetWeatherRequest) int {
|
||||||
|
if req == nil || req.Limit == nil {
|
||||||
|
return defaultLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.GetLimit() == 0 {
|
||||||
|
return defaultLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
return int(req.GetLimit())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simple helper to trim a list of updates
|
||||||
|
func LimitUpdates(updates []*weather.WeatherUpdate, limit int) []*weather.WeatherUpdate {
|
||||||
|
if limit < 0 {
|
||||||
|
return updates // No limit
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(updates) > limit {
|
||||||
|
return updates[len(updates)-limit:] // Trim to limit
|
||||||
|
}
|
||||||
|
|
||||||
|
return updates // Within limit
|
||||||
|
}
|
||||||
|
|
||||||
|
func ApplyOptsToUpdates(updates []*weather.WeatherUpdate, limit int, opts *pb.GetWeatherOpts) []*weather.WeatherUpdate {
|
||||||
|
if opts == nil {
|
||||||
|
return updates
|
||||||
|
} else if opts.StationName == nil && opts.StationType == nil {
|
||||||
|
return updates
|
||||||
|
}
|
||||||
|
|
||||||
|
filtered := make([]*weather.WeatherUpdate, 0, limit)
|
||||||
|
|
||||||
|
for i := len(updates) - 1; i >= 0; i-- {
|
||||||
|
update := updates[i]
|
||||||
|
match := true
|
||||||
|
|
||||||
|
if opts.GetStationName() != "" {
|
||||||
|
if update.GetStationName() != opts.GetStationName() {
|
||||||
|
match = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if opts.GetStationType() != "" {
|
||||||
|
if DerefStr(update.StationType) != opts.GetStationType() {
|
||||||
|
match = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if match {
|
||||||
|
filtered = append(filtered, update)
|
||||||
|
if limit > 0 && len(filtered) >= limit {
|
||||||
|
return slices.Clip(filtered)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return slices.Clip(filtered)
|
||||||
|
}
|
||||||
|
@ -10,7 +10,6 @@ import (
|
|||||||
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
|
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
|
||||||
|
|
||||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
|
||||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/memory"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type WeatherRecorder struct {
|
type WeatherRecorder struct {
|
||||||
@ -27,13 +26,13 @@ type Opts struct {
|
|||||||
KeepLast int
|
KeepLast int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWeatherRecorder(opts *Opts) *WeatherRecorder {
|
func MustNewWeatherRecorder(opts *Opts) *WeatherRecorder {
|
||||||
if opts.KeepLast < 1 {
|
if opts.KeepLast < 1 {
|
||||||
opts.KeepLast = 1
|
opts.KeepLast = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.Recorder == nil {
|
if opts.Recorder == nil {
|
||||||
opts.Recorder = &memory.MemoryRecorder{}
|
panic("no recorder provided")
|
||||||
}
|
}
|
||||||
|
|
||||||
opts.Recorder.Init(opts.Ctx, &recorders.RecorderOpts{
|
opts.Recorder.Init(opts.Ctx, &recorders.RecorderOpts{
|
||||||
|
24
pkg/weather/recorder/recorders/memory/count.go
Normal file
24
pkg/weather/recorder/recorders/memory/count.go
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
package memory
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (r *MemoryRecorder) Count(ctx context.Context) int {
|
||||||
|
_, span := r.tracer.Start(ctx, "countWeatherRecorder")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
count := r.count()
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.Int("count", count))
|
||||||
|
span.SetStatus(codes.Ok, "")
|
||||||
|
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MemoryRecorder) count() int {
|
||||||
|
return len(r.updates)
|
||||||
|
}
|
@ -3,7 +3,6 @@ package memory
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"slices"
|
|
||||||
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
@ -16,7 +15,7 @@ import (
|
|||||||
func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
|
func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
|
||||||
[]*weather.WeatherUpdate, error,
|
[]*weather.WeatherUpdate, error,
|
||||||
) {
|
) {
|
||||||
ctx, span := r.tracer.Start(ctx, "memoryRecorder.Get")
|
_, span := r.tracer.Start(ctx, "memoryRecorder.Get")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
r.RLock()
|
r.RLock()
|
||||||
@ -24,11 +23,12 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
|
|||||||
|
|
||||||
span.AddEvent("acquired lock on recorder cache")
|
span.AddEvent("acquired lock on recorder cache")
|
||||||
|
|
||||||
|
limit := util.GetLimitFromReq(req)
|
||||||
if r.count() == 0 {
|
if r.count() == 0 {
|
||||||
err := errors.New("no recorded updates to get")
|
err := errors.New("no recorded updates to get")
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if r.count() <= int(*req.Limit) {
|
} else if limit > 0 && r.count() <= limit {
|
||||||
span.RecordError(errors.New("requested more updates than recorded"))
|
span.RecordError(errors.New("requested more updates than recorded"))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,66 +43,8 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
|
|||||||
|
|
||||||
func (r *MemoryRecorder) getUpdatesFromReq(req *pb.GetWeatherRequest) []*weather.WeatherUpdate {
|
func (r *MemoryRecorder) getUpdatesFromReq(req *pb.GetWeatherRequest) []*weather.WeatherUpdate {
|
||||||
if req.Opts == nil {
|
if req.Opts == nil {
|
||||||
return limitUpdates(r.updates, int(req.GetLimit()))
|
return util.LimitUpdates(r.updates, util.GetLimitFromReq(req))
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.applyOptsToUpdates(r.updates, int(req.GetLimit()), req.Opts)
|
return util.ApplyOptsToUpdates(r.updates, util.GetLimitFromReq(req), req.Opts)
|
||||||
}
|
|
||||||
|
|
||||||
func (r *MemoryRecorder) applyOptsToUpdates(updates []*weather.WeatherUpdate, limit int, opts *pb.GetWeatherOpts) []*weather.WeatherUpdate {
|
|
||||||
if opts == nil {
|
|
||||||
return updates
|
|
||||||
} else if opts.StationName == nil && opts.StationType == nil {
|
|
||||||
return updates
|
|
||||||
}
|
|
||||||
|
|
||||||
filtered := make([]*weather.WeatherUpdate, 0, limit)
|
|
||||||
|
|
||||||
for i := len(updates) - 1; i >= 0; i-- {
|
|
||||||
update := updates[i]
|
|
||||||
match := true
|
|
||||||
|
|
||||||
if opts.GetStationName() != "" {
|
|
||||||
if update.GetStationName() != opts.GetStationName() {
|
|
||||||
match = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if opts.GetStationType() != "" {
|
|
||||||
if util.DerefStr(update.StationType) != opts.GetStationType() {
|
|
||||||
match = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if match {
|
|
||||||
filtered = append(filtered, update)
|
|
||||||
if len(filtered) >= limit {
|
|
||||||
return filtered
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return slices.Clip(filtered)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *MemoryRecorder) Count(ctx context.Context) int {
|
|
||||||
_, span := r.tracer.Start(ctx, "countWeatherRecorder")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
count := r.count()
|
|
||||||
|
|
||||||
span.SetAttributes(attribute.Int("count", count))
|
|
||||||
span.SetStatus(codes.Ok, "")
|
|
||||||
|
|
||||||
return count
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *MemoryRecorder) count() int {
|
|
||||||
return len(r.updates)
|
|
||||||
}
|
|
||||||
|
|
||||||
func limitUpdates(updates []*weather.WeatherUpdate, limit int) []*weather.WeatherUpdate {
|
|
||||||
if len(updates) > limit {
|
|
||||||
return updates[len(updates)-limit:]
|
|
||||||
}
|
|
||||||
return updates
|
|
||||||
}
|
}
|
||||||
|
21
pkg/weather/recorder/recorders/noop/noop.go
Normal file
21
pkg/weather/recorder/recorders/noop/noop.go
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
package noop
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NoopRecorder struct{}
|
||||||
|
|
||||||
|
func (n *NoopRecorder) Init(context.Context, *recorders.RecorderOpts) {}
|
||||||
|
|
||||||
|
func (n *NoopRecorder) Set(context.Context, *weather.WeatherUpdate) error { return nil }
|
||||||
|
|
||||||
|
func (n *NoopRecorder) Get(context.Context, *pb.GetWeatherRequest) ([]*weather.WeatherUpdate, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NoopRecorder) Count(context.Context) int { return 0 }
|
@ -4,12 +4,14 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
|
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config"
|
||||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RecorderOpts struct {
|
type RecorderOpts struct {
|
||||||
RetainLast int
|
RetainLast int
|
||||||
BaseCtx context.Context
|
BaseCtx context.Context
|
||||||
|
AppConfig *config.AmbientLocalExporterConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
type Recorder interface {
|
type Recorder interface {
|
||||||
|
38
pkg/weather/recorder/recorders/redis/count.go
Normal file
38
pkg/weather/recorder/recorders/redis/count.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (r *RedisRecorder) Count(ctx context.Context) int {
|
||||||
|
ctx, span := r.tracer.Start(ctx, "redisRecorder.count")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
r.RLock()
|
||||||
|
defer r.RUnlock()
|
||||||
|
|
||||||
|
return r.count(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RedisRecorder) count(ctx context.Context) int {
|
||||||
|
ctx, span := r.tracer.Start(ctx, "redisRecorder.count.redis", trace.WithAttributes(
|
||||||
|
attribute.String("updatesKey", r.Key())))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
count, err := r.redis.LLen(ctx, r.Key()).Result()
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
r.log.Err(err).Send()
|
||||||
|
return int(count)
|
||||||
|
}
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.Int64("updatesCount", count))
|
||||||
|
span.SetStatus(codes.Ok, "")
|
||||||
|
|
||||||
|
return int(count)
|
||||||
|
}
|
96
pkg/weather/recorder/recorders/redis/get.go
Normal file
96
pkg/weather/recorder/recorders/redis/get.go
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"slices"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
|
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (r *RedisRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
|
||||||
|
[]*weather.WeatherUpdate, error,
|
||||||
|
) {
|
||||||
|
ctx, span := r.tracer.Start(ctx, "redisRecorder.get", trace.WithAttributes(
|
||||||
|
attribute.Int("limit", util.GetLimitFromReq(req)),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
return r.get(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) (
|
||||||
|
[]*weather.WeatherUpdate, error,
|
||||||
|
) {
|
||||||
|
ctx, span := r.tracer.Start(ctx, "redisRecorder.get.redis")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
limit := util.GetLimitFromReq(req)
|
||||||
|
if limit < 1 {
|
||||||
|
limit = r.keep
|
||||||
|
}
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.Int("limit", limit))
|
||||||
|
|
||||||
|
datas, err := r.redis.LRange(ctx, r.Key(), 0, int64(limit)).Result()
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
r.log.Err(err).Send()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
span.AddEvent("redis queried")
|
||||||
|
|
||||||
|
updates, err := jsonDatasToUpdates(datas)
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
r.log.Err(err).Send()
|
||||||
|
} else {
|
||||||
|
span.SetStatus(codes.Ok, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
span.AddEvent("results unmarshalled")
|
||||||
|
span.SetAttributes(attribute.Int("results", len(updates)))
|
||||||
|
|
||||||
|
filtered := util.ApplyOptsToUpdates(updates, limit, req.Opts)
|
||||||
|
|
||||||
|
span.AddEvent("results filtered")
|
||||||
|
span.SetAttributes(
|
||||||
|
attribute.Int("filteredResults", len(filtered)),
|
||||||
|
attribute.Int("resultsFiltered", len(updates)-len(filtered)),
|
||||||
|
)
|
||||||
|
|
||||||
|
r.log.Debug().
|
||||||
|
Int("updatesRetrieved", len(updates)).
|
||||||
|
Int("updatesAfterFiltering", len(filtered)).
|
||||||
|
Int("updatesFiltered", len(updates)-len(filtered)).
|
||||||
|
Msg("updates retrieved from redis")
|
||||||
|
|
||||||
|
return updates, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func jsonDatasToUpdates(datas []string) ([]*weather.WeatherUpdate, error) {
|
||||||
|
var errs error
|
||||||
|
updates := make([]*weather.WeatherUpdate, 0, len(datas))
|
||||||
|
|
||||||
|
for _, data := range datas {
|
||||||
|
update := new(weather.WeatherUpdate)
|
||||||
|
err := json.Unmarshal([]byte(data), update)
|
||||||
|
errs = errors.Join(errs, err)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
updates = append(updates, update)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return slices.Clip(updates), errs
|
||||||
|
}
|
108
pkg/weather/recorder/recorders/redis/redis.go
Normal file
108
pkg/weather/recorder/recorders/redis/redis.go
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
|
redis "github.com/redis/go-redis/v9"
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DEF_RETAIN = 120
|
||||||
|
UPDATES_KEY = "weatherUpdates"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RedisRecorder struct {
|
||||||
|
baseCtx context.Context
|
||||||
|
tracer trace.Tracer
|
||||||
|
redis *redis.Client
|
||||||
|
config *config.AmbientLocalExporterConfig
|
||||||
|
log *zerolog.Logger
|
||||||
|
appKey string // prefix for redis keys, uses app name, environment, and version
|
||||||
|
keep int
|
||||||
|
*sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RedisRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) {
|
||||||
|
if opts.RetainLast < 1 {
|
||||||
|
opts.RetainLast = DEF_RETAIN
|
||||||
|
}
|
||||||
|
|
||||||
|
r.log = zerolog.Ctx(opts.BaseCtx)
|
||||||
|
|
||||||
|
r.tracer = otel.GetTracer(r.baseCtx, "redisRecorder")
|
||||||
|
ctx, span := r.tracer.Start(ctx, "redisRecorder.init", trace.WithAttributes(
|
||||||
|
attribute.String("redisHost", opts.AppConfig.RecorderConfig.RedisConfig.RedisHost),
|
||||||
|
attribute.Int("retainLast", opts.RetainLast),
|
||||||
|
attribute.Int("redisPort", opts.AppConfig.RecorderConfig.RedisConfig.RedisPort),
|
||||||
|
attribute.Bool("tls", opts.AppConfig.RecorderConfig.RedisConfig.RedisTLS),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
r.config = opts.AppConfig
|
||||||
|
r.keep = opts.RetainLast
|
||||||
|
r.baseCtx = opts.BaseCtx
|
||||||
|
r.RWMutex = &sync.RWMutex{}
|
||||||
|
|
||||||
|
// Unique key prefix for this version/env/name of exporter
|
||||||
|
// will be consistent across replicas, but resets on upgrade
|
||||||
|
// as it is using version
|
||||||
|
r.appKey = util.GetAppHash(r.config.AppConfig)
|
||||||
|
|
||||||
|
r.MustInitRedis(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RedisRecorder) MustInitRedis(ctx context.Context) {
|
||||||
|
ctx, span := r.tracer.Start(ctx, "redisRecorder.init.redis")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
rc := r.config.RecorderConfig.RedisConfig
|
||||||
|
|
||||||
|
var tlsConfig *tls.Config
|
||||||
|
if rc.RedisTLS {
|
||||||
|
tlsConfig = &tls.Config{
|
||||||
|
ServerName: rc.RedisHost,
|
||||||
|
InsecureSkipVerify: rc.RedisTLSInsecure,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r.redis = redis.NewClient(&redis.Options{
|
||||||
|
Addr: fmt.Sprintf("%s:%d", rc.RedisHost, rc.RedisPort),
|
||||||
|
ClientName: fmt.Sprintf("%s-%s", r.config.Name, r.config.Environment),
|
||||||
|
Username: rc.RedisUser,
|
||||||
|
Password: rc.RedisPassword,
|
||||||
|
DB: rc.RedisDB,
|
||||||
|
TLSConfig: tlsConfig,
|
||||||
|
})
|
||||||
|
|
||||||
|
span.AddEvent("redis client ready")
|
||||||
|
|
||||||
|
resp := r.redis.Ping(ctx)
|
||||||
|
if resp.Err() != nil {
|
||||||
|
span.RecordError(resp.Err())
|
||||||
|
span.SetStatus(codes.Error, resp.Err().Error())
|
||||||
|
r.log.Fatal().Err(resp.Err()).Msg("failed to ping redis")
|
||||||
|
}
|
||||||
|
|
||||||
|
span.AddEvent("redis client ping ok")
|
||||||
|
span.SetStatus(codes.Ok, "")
|
||||||
|
|
||||||
|
r.log.Info().Str("appKey", r.appKey).
|
||||||
|
Msg("redis ping ok, client ready")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RedisRecorder) Key() string {
|
||||||
|
return fmt.Sprintf("%s:%s", r.appKey, UPDATES_KEY)
|
||||||
|
}
|
61
pkg/weather/recorder/recorders/redis/set.go
Normal file
61
pkg/weather/recorder/recorders/redis/set.go
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
|
||||||
|
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (r *RedisRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error {
|
||||||
|
ctx, span := r.tracer.Start(ctx, "redisRecorder.set", trace.WithAttributes(
|
||||||
|
attribute.String("stationName", u.GetStationName()),
|
||||||
|
attribute.String("stationType", util.DerefStr(u.StationType)),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
r.Lock()
|
||||||
|
defer r.RUnlock()
|
||||||
|
|
||||||
|
// First ensure we can prepare our payload
|
||||||
|
data, err := json.Marshal(u)
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
|
||||||
|
r.log.Err(err).Send()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.set(ctx, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RedisRecorder) set(ctx context.Context, data []byte) error {
|
||||||
|
ctx, span := r.tracer.Start(ctx, "redisRecorder.set.push", trace.WithAttributes(
|
||||||
|
attribute.Int("updateBytes", len(data)),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
// Atomic, push and trim
|
||||||
|
tx := r.redis.TxPipeline()
|
||||||
|
tx.LPush(ctx, r.Key(), data)
|
||||||
|
tx.LTrim(ctx, r.Key(), 0, int64(r.keep)-1)
|
||||||
|
|
||||||
|
if rErr, err := tx.Exec(ctx); err != nil {
|
||||||
|
for _, cmd := range rErr {
|
||||||
|
span.RecordError(cmd.Err())
|
||||||
|
}
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
r.log.Err(err).Send()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.Int("updateCount", r.count(ctx)))
|
||||||
|
span.SetStatus(codes.Ok, "")
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user