diff --git a/TODO.md b/TODO.md index d6f655b..c1e7a41 100644 --- a/TODO.md +++ b/TODO.md @@ -1,10 +1,11 @@ # TODO +- [ ] Add json schema for config - [ ] Finish implementing weather GRPC - [ ] Update README - [ ] Add Grafana dashboard -- [ ] Add new spans ## Done +- [x] Add new spans - [x] Helm Chart - [x] Add proxy to upstream support - [x] Fix wunderground 401 diff --git a/go.mod b/go.mod index 7526413..f2fd345 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/caarlos0/env/v11 v11.3.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -35,6 +36,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/redis/go-redis/v9 v9.7.3 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect diff --git a/go.sum b/go.sum index be9160a..408872d 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -65,6 +67,8 @@ github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= +github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= diff --git a/pkg/ambient/ambient.go b/pkg/ambient/ambient.go index b93ee67..b060df3 100644 --- a/pkg/ambient/ambient.go +++ b/pkg/ambient/ambient.go @@ -21,6 +21,10 @@ import ( "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/provider/wunderground" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/memory" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/noop" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/redis" ) const defUpdatesToKeep = 120 @@ -70,10 +74,22 @@ func (aw *AmbientWeather) Init() *AmbientWeather { } span.SetAttributes(attribute.Int("updatesToKeep", updatesToKeep)) - // TODO: Support other recorders (don't rely on default) - aw.weatherRecorder = recorder.NewWeatherRecorder(&recorder.Opts{ + // Choose weather recorder for grpc / api requests, + // default is memory recorder + var r recorders.Recorder + switch aw.Config.RecorderConfig.Type { + case config.TypeMemory: + r = &memory.MemoryRecorder{} + case config.TypeRedis: + r = &redis.RedisRecorder{} + case config.TypeNoop: + r = &noop.NoopRecorder{} + } + + aw.weatherRecorder = recorder.MustNewWeatherRecorder(&recorder.Opts{ Ctx: aw.appCtx, KeepLast: updatesToKeep, + Recorder: r, }) aw.l.Trace().Any("awConfig", aw.Config).Send() diff --git a/pkg/ambient/config/config.go b/pkg/ambient/config/config.go index 1f9e697..4384c99 100644 --- a/pkg/ambient/config/config.go +++ b/pkg/ambient/config/config.go @@ -8,7 +8,8 @@ import ( type AmbientLocalExporterConfig struct { MetricPrefix string `yaml:"metricPrefix" default:"weather" env:"AMBIENT_METRIC_PREFIX"` UpdatesToKeep *int `yaml:"updatesToKeep" default:"1" env:"AMBIENT_UPDATES_TO_KEEP"` - WeatherStations []WeatherStation `yaml:"weatherStations" env:"weatherStations"` // No env, too complex, not worth the time + WeatherStations []WeatherStation `yaml:"weatherStations"` // No env, too complex, not worth the time + RecorderConfig *RecorderConfig `yaml:"recorderConfig"` *config.AppConfig // Extends app config } diff --git a/pkg/ambient/config/config_recorder.go b/pkg/ambient/config/config_recorder.go new file mode 100644 index 0000000..7967def --- /dev/null +++ b/pkg/ambient/config/config_recorder.go @@ -0,0 +1,25 @@ +package config + +type RecorderType string + +const ( + TypeMemory RecorderType = "memory" // Stores weather updates in memory + TypeRedis RecorderType = "redis" // Required for replicas > 1 + TypeNoop RecorderType = "noop" // No-op implementation +) + +type RecorderConfig struct { + Type RecorderType `yaml:"type" env:"RECORDER_TYPE"` // memory|redis + KeepLast int `yaml:"keepLast" env:"RECORDER_KEEP_LAST"` + RedisConfig *RedisConfig +} + +type RedisConfig struct { + RedisHost string `yaml:"redisHost" env:"REDIS_HOST" default:"127.0.0.1"` + RedisPort int `yaml:"redisPort" env:"REDIS_PORT" default:"6379"` + RedisUser string `yaml:"redisUser" env:"REDIS_USER"` + RedisPassword string `yaml:"redisPassword" env:"REDIS_PASSWORD"` + RedisDB int `yaml:"redisDB" env:"REDIS_DB" default:"0"` + RedisTLS bool `yaml:"redisTLS" env:"REDIS_TLS" default:"false"` + RedisTLSInsecure bool `yaml:"redisTLSInsecure" env:"REDIS_TLS_INSECURE" default:"false"` +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 8e07e36..2d64a28 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -1,6 +1,19 @@ package util -import "k8s.io/utils/ptr" +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "slices" + + "gitea.libretechconsulting.com/rmcguire/go-app/pkg/config" + "k8s.io/utils/ptr" + + pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" +) + +const defaultLimit = -1 func DerefStr(s *string) string { if s == nil { @@ -15,3 +28,77 @@ func Int32ptr(i *int) *int32 { } return ptr.To(int32(*i)) } + +// Generates a hash that will be consistent +// across all running replicas +func GetAppHash(conf *config.AppConfig) string { + hashName := fmt.Sprintf("%s-%s-%s", + conf.Name, + conf.Environment, + conf.Version, + ) + + hash := md5.Sum([]byte(hashName)) + return hex.EncodeToString(hash[:]) +} + +// Get a limit from req, applying a default if unset or not sane +func GetLimitFromReq(req *pb.GetWeatherRequest) int { + if req == nil || req.Limit == nil { + return defaultLimit + } + + if req.GetLimit() == 0 { + return defaultLimit + } + + return int(req.GetLimit()) +} + +// Simple helper to trim a list of updates +func LimitUpdates(updates []*weather.WeatherUpdate, limit int) []*weather.WeatherUpdate { + if limit < 0 { + return updates // No limit + } + + if len(updates) > limit { + return updates[len(updates)-limit:] // Trim to limit + } + + return updates // Within limit +} + +func ApplyOptsToUpdates(updates []*weather.WeatherUpdate, limit int, opts *pb.GetWeatherOpts) []*weather.WeatherUpdate { + if opts == nil { + return updates + } else if opts.StationName == nil && opts.StationType == nil { + return updates + } + + filtered := make([]*weather.WeatherUpdate, 0, limit) + + for i := len(updates) - 1; i >= 0; i-- { + update := updates[i] + match := true + + if opts.GetStationName() != "" { + if update.GetStationName() != opts.GetStationName() { + match = false + } + } + if opts.GetStationType() != "" { + if DerefStr(update.StationType) != opts.GetStationType() { + match = false + } + } + + if match { + filtered = append(filtered, update) + if limit > 0 && len(filtered) >= limit { + return slices.Clip(filtered) + } + } + } + + return slices.Clip(filtered) +} diff --git a/pkg/weather/recorder/recorder.go b/pkg/weather/recorder/recorder.go index 88b8ca5..0aed4f6 100644 --- a/pkg/weather/recorder/recorder.go +++ b/pkg/weather/recorder/recorder.go @@ -10,7 +10,6 @@ import ( "gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" - "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/memory" ) type WeatherRecorder struct { @@ -27,13 +26,13 @@ type Opts struct { KeepLast int } -func NewWeatherRecorder(opts *Opts) *WeatherRecorder { +func MustNewWeatherRecorder(opts *Opts) *WeatherRecorder { if opts.KeepLast < 1 { opts.KeepLast = 1 } if opts.Recorder == nil { - opts.Recorder = &memory.MemoryRecorder{} + panic("no recorder provided") } opts.Recorder.Init(opts.Ctx, &recorders.RecorderOpts{ diff --git a/pkg/weather/recorder/recorders/memory/count.go b/pkg/weather/recorder/recorders/memory/count.go new file mode 100644 index 0000000..6dbf2bf --- /dev/null +++ b/pkg/weather/recorder/recorders/memory/count.go @@ -0,0 +1,24 @@ +package memory + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" +) + +func (r *MemoryRecorder) Count(ctx context.Context) int { + _, span := r.tracer.Start(ctx, "countWeatherRecorder") + defer span.End() + + count := r.count() + + span.SetAttributes(attribute.Int("count", count)) + span.SetStatus(codes.Ok, "") + + return count +} + +func (r *MemoryRecorder) count() int { + return len(r.updates) +} diff --git a/pkg/weather/recorder/recorders/memory/get.go b/pkg/weather/recorder/recorders/memory/get.go index 9831a6f..1859bbf 100644 --- a/pkg/weather/recorder/recorders/memory/get.go +++ b/pkg/weather/recorder/recorders/memory/get.go @@ -3,7 +3,6 @@ package memory import ( "context" "errors" - "slices" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -16,7 +15,7 @@ import ( func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( []*weather.WeatherUpdate, error, ) { - ctx, span := r.tracer.Start(ctx, "memoryRecorder.Get") + _, span := r.tracer.Start(ctx, "memoryRecorder.Get") defer span.End() r.RLock() @@ -24,11 +23,12 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( span.AddEvent("acquired lock on recorder cache") + limit := util.GetLimitFromReq(req) if r.count() == 0 { err := errors.New("no recorded updates to get") span.RecordError(err) return nil, err - } else if r.count() <= int(*req.Limit) { + } else if limit > 0 && r.count() <= limit { span.RecordError(errors.New("requested more updates than recorded")) } @@ -43,66 +43,8 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( func (r *MemoryRecorder) getUpdatesFromReq(req *pb.GetWeatherRequest) []*weather.WeatherUpdate { if req.Opts == nil { - return limitUpdates(r.updates, int(req.GetLimit())) + return util.LimitUpdates(r.updates, util.GetLimitFromReq(req)) } - return r.applyOptsToUpdates(r.updates, int(req.GetLimit()), req.Opts) -} - -func (r *MemoryRecorder) applyOptsToUpdates(updates []*weather.WeatherUpdate, limit int, opts *pb.GetWeatherOpts) []*weather.WeatherUpdate { - if opts == nil { - return updates - } else if opts.StationName == nil && opts.StationType == nil { - return updates - } - - filtered := make([]*weather.WeatherUpdate, 0, limit) - - for i := len(updates) - 1; i >= 0; i-- { - update := updates[i] - match := true - - if opts.GetStationName() != "" { - if update.GetStationName() != opts.GetStationName() { - match = false - } - } - if opts.GetStationType() != "" { - if util.DerefStr(update.StationType) != opts.GetStationType() { - match = false - } - } - - if match { - filtered = append(filtered, update) - if len(filtered) >= limit { - return filtered - } - } - } - - return slices.Clip(filtered) -} - -func (r *MemoryRecorder) Count(ctx context.Context) int { - _, span := r.tracer.Start(ctx, "countWeatherRecorder") - defer span.End() - - count := r.count() - - span.SetAttributes(attribute.Int("count", count)) - span.SetStatus(codes.Ok, "") - - return count -} - -func (r *MemoryRecorder) count() int { - return len(r.updates) -} - -func limitUpdates(updates []*weather.WeatherUpdate, limit int) []*weather.WeatherUpdate { - if len(updates) > limit { - return updates[len(updates)-limit:] - } - return updates + return util.ApplyOptsToUpdates(r.updates, util.GetLimitFromReq(req), req.Opts) } diff --git a/pkg/weather/recorder/recorders/noop/noop.go b/pkg/weather/recorder/recorders/noop/noop.go new file mode 100644 index 0000000..2e04106 --- /dev/null +++ b/pkg/weather/recorder/recorders/noop/noop.go @@ -0,0 +1,21 @@ +package noop + +import ( + "context" + + pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" +) + +type NoopRecorder struct{} + +func (n *NoopRecorder) Init(context.Context, *recorders.RecorderOpts) {} + +func (n *NoopRecorder) Set(context.Context, *weather.WeatherUpdate) error { return nil } + +func (n *NoopRecorder) Get(context.Context, *pb.GetWeatherRequest) ([]*weather.WeatherUpdate, error) { + return nil, nil +} + +func (n *NoopRecorder) Count(context.Context) int { return 0 } diff --git a/pkg/weather/recorder/recorders/recorders.go b/pkg/weather/recorder/recorders/recorders.go index cf73f8a..d2d870e 100644 --- a/pkg/weather/recorder/recorders/recorders.go +++ b/pkg/weather/recorder/recorders/recorders.go @@ -4,12 +4,14 @@ import ( "context" pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" ) type RecorderOpts struct { RetainLast int BaseCtx context.Context + AppConfig *config.AmbientLocalExporterConfig } type Recorder interface { diff --git a/pkg/weather/recorder/recorders/redis/count.go b/pkg/weather/recorder/recorders/redis/count.go new file mode 100644 index 0000000..47dd7d6 --- /dev/null +++ b/pkg/weather/recorder/recorders/redis/count.go @@ -0,0 +1,38 @@ +package redis + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +func (r *RedisRecorder) Count(ctx context.Context) int { + ctx, span := r.tracer.Start(ctx, "redisRecorder.count") + defer span.End() + + r.RLock() + defer r.RUnlock() + + return r.count(ctx) +} + +func (r *RedisRecorder) count(ctx context.Context) int { + ctx, span := r.tracer.Start(ctx, "redisRecorder.count.redis", trace.WithAttributes( + attribute.String("updatesKey", r.Key()))) + defer span.End() + + count, err := r.redis.LLen(ctx, r.Key()).Result() + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + r.log.Err(err).Send() + return int(count) + } + + span.SetAttributes(attribute.Int64("updatesCount", count)) + span.SetStatus(codes.Ok, "") + + return int(count) +} diff --git a/pkg/weather/recorder/recorders/redis/get.go b/pkg/weather/recorder/recorders/redis/get.go new file mode 100644 index 0000000..2c28f0b --- /dev/null +++ b/pkg/weather/recorder/recorders/redis/get.go @@ -0,0 +1,96 @@ +package redis + +import ( + "context" + "encoding/json" + "errors" + "slices" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" +) + +func (r *RedisRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) ( + []*weather.WeatherUpdate, error, +) { + ctx, span := r.tracer.Start(ctx, "redisRecorder.get", trace.WithAttributes( + attribute.Int("limit", util.GetLimitFromReq(req)), + )) + defer span.End() + + return r.get(ctx, req) +} + +func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) ( + []*weather.WeatherUpdate, error, +) { + ctx, span := r.tracer.Start(ctx, "redisRecorder.get.redis") + defer span.End() + + limit := util.GetLimitFromReq(req) + if limit < 1 { + limit = r.keep + } + + span.SetAttributes(attribute.Int("limit", limit)) + + datas, err := r.redis.LRange(ctx, r.Key(), 0, int64(limit)).Result() + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + r.log.Err(err).Send() + return nil, err + } + + span.AddEvent("redis queried") + + updates, err := jsonDatasToUpdates(datas) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + r.log.Err(err).Send() + } else { + span.SetStatus(codes.Ok, "") + } + + span.AddEvent("results unmarshalled") + span.SetAttributes(attribute.Int("results", len(updates))) + + filtered := util.ApplyOptsToUpdates(updates, limit, req.Opts) + + span.AddEvent("results filtered") + span.SetAttributes( + attribute.Int("filteredResults", len(filtered)), + attribute.Int("resultsFiltered", len(updates)-len(filtered)), + ) + + r.log.Debug(). + Int("updatesRetrieved", len(updates)). + Int("updatesAfterFiltering", len(filtered)). + Int("updatesFiltered", len(updates)-len(filtered)). + Msg("updates retrieved from redis") + + return updates, err +} + +func jsonDatasToUpdates(datas []string) ([]*weather.WeatherUpdate, error) { + var errs error + updates := make([]*weather.WeatherUpdate, 0, len(datas)) + + for _, data := range datas { + update := new(weather.WeatherUpdate) + err := json.Unmarshal([]byte(data), update) + errs = errors.Join(errs, err) + + if err != nil { + updates = append(updates, update) + } + } + + return slices.Clip(updates), errs +} diff --git a/pkg/weather/recorder/recorders/redis/redis.go b/pkg/weather/recorder/recorders/redis/redis.go new file mode 100644 index 0000000..b112ab2 --- /dev/null +++ b/pkg/weather/recorder/recorders/redis/redis.go @@ -0,0 +1,108 @@ +package redis + +import ( + "context" + "crypto/tls" + "fmt" + "sync" + + "gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + redis "github.com/redis/go-redis/v9" + "github.com/rs/zerolog" + + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" +) + +const ( + DEF_RETAIN = 120 + UPDATES_KEY = "weatherUpdates" +) + +type RedisRecorder struct { + baseCtx context.Context + tracer trace.Tracer + redis *redis.Client + config *config.AmbientLocalExporterConfig + log *zerolog.Logger + appKey string // prefix for redis keys, uses app name, environment, and version + keep int + *sync.RWMutex +} + +func (r *RedisRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) { + if opts.RetainLast < 1 { + opts.RetainLast = DEF_RETAIN + } + + r.log = zerolog.Ctx(opts.BaseCtx) + + r.tracer = otel.GetTracer(r.baseCtx, "redisRecorder") + ctx, span := r.tracer.Start(ctx, "redisRecorder.init", trace.WithAttributes( + attribute.String("redisHost", opts.AppConfig.RecorderConfig.RedisConfig.RedisHost), + attribute.Int("retainLast", opts.RetainLast), + attribute.Int("redisPort", opts.AppConfig.RecorderConfig.RedisConfig.RedisPort), + attribute.Bool("tls", opts.AppConfig.RecorderConfig.RedisConfig.RedisTLS), + )) + defer span.End() + + r.config = opts.AppConfig + r.keep = opts.RetainLast + r.baseCtx = opts.BaseCtx + r.RWMutex = &sync.RWMutex{} + + // Unique key prefix for this version/env/name of exporter + // will be consistent across replicas, but resets on upgrade + // as it is using version + r.appKey = util.GetAppHash(r.config.AppConfig) + + r.MustInitRedis(ctx) +} + +func (r *RedisRecorder) MustInitRedis(ctx context.Context) { + ctx, span := r.tracer.Start(ctx, "redisRecorder.init.redis") + defer span.End() + + rc := r.config.RecorderConfig.RedisConfig + + var tlsConfig *tls.Config + if rc.RedisTLS { + tlsConfig = &tls.Config{ + ServerName: rc.RedisHost, + InsecureSkipVerify: rc.RedisTLSInsecure, + } + } + + r.redis = redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:%d", rc.RedisHost, rc.RedisPort), + ClientName: fmt.Sprintf("%s-%s", r.config.Name, r.config.Environment), + Username: rc.RedisUser, + Password: rc.RedisPassword, + DB: rc.RedisDB, + TLSConfig: tlsConfig, + }) + + span.AddEvent("redis client ready") + + resp := r.redis.Ping(ctx) + if resp.Err() != nil { + span.RecordError(resp.Err()) + span.SetStatus(codes.Error, resp.Err().Error()) + r.log.Fatal().Err(resp.Err()).Msg("failed to ping redis") + } + + span.AddEvent("redis client ping ok") + span.SetStatus(codes.Ok, "") + + r.log.Info().Str("appKey", r.appKey). + Msg("redis ping ok, client ready") +} + +func (r *RedisRecorder) Key() string { + return fmt.Sprintf("%s:%s", r.appKey, UPDATES_KEY) +} diff --git a/pkg/weather/recorder/recorders/redis/set.go b/pkg/weather/recorder/recorders/redis/set.go new file mode 100644 index 0000000..21ab231 --- /dev/null +++ b/pkg/weather/recorder/recorders/redis/set.go @@ -0,0 +1,61 @@ +package redis + +import ( + "context" + "encoding/json" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" +) + +func (r *RedisRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error { + ctx, span := r.tracer.Start(ctx, "redisRecorder.set", trace.WithAttributes( + attribute.String("stationName", u.GetStationName()), + attribute.String("stationType", util.DerefStr(u.StationType)), + )) + defer span.End() + + r.Lock() + defer r.RUnlock() + + // First ensure we can prepare our payload + data, err := json.Marshal(u) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + r.log.Err(err).Send() + return err + } + + return r.set(ctx, data) +} + +func (r *RedisRecorder) set(ctx context.Context, data []byte) error { + ctx, span := r.tracer.Start(ctx, "redisRecorder.set.push", trace.WithAttributes( + attribute.Int("updateBytes", len(data)), + )) + defer span.End() + + // Atomic, push and trim + tx := r.redis.TxPipeline() + tx.LPush(ctx, r.Key(), data) + tx.LTrim(ctx, r.Key(), 0, int64(r.keep)-1) + + if rErr, err := tx.Exec(ctx); err != nil { + for _, cmd := range rErr { + span.RecordError(cmd.Err()) + } + span.SetStatus(codes.Error, err.Error()) + r.log.Err(err).Send() + return err + } + + span.SetAttributes(attribute.Int("updateCount", r.count(ctx))) + span.SetStatus(codes.Ok, "") + return nil +}