implement redis and noop recorders
This commit is contained in:
		| @@ -86,12 +86,18 @@ | |||||||
|     "ConfigRecorderConfig": { |     "ConfigRecorderConfig": { | ||||||
|       "properties": { |       "properties": { | ||||||
|         "keepLast": { |         "keepLast": { | ||||||
|  |           "default": 120, | ||||||
|           "type": "integer" |           "type": "integer" | ||||||
|         }, |         }, | ||||||
|         "redisConfig": { |         "redisConfig": { | ||||||
|           "$ref": "#/definitions/ConfigRedisConfig" |           "$ref": "#/definitions/ConfigRedisConfig" | ||||||
|         }, |         }, | ||||||
|         "type": { |         "type": { | ||||||
|  |           "enum": [ | ||||||
|  |             "memory", | ||||||
|  |             "redis", | ||||||
|  |             "noop" | ||||||
|  |           ], | ||||||
|           "type": "string" |           "type": "string" | ||||||
|         } |         } | ||||||
|       }, |       }, | ||||||
|   | |||||||
| @@ -91,6 +91,7 @@ func (aw *AmbientWeather) Init() *AmbientWeather { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	aw.weatherRecorder = recorder.MustNewWeatherRecorder(&recorder.Opts{ | 	aw.weatherRecorder = recorder.MustNewWeatherRecorder(&recorder.Opts{ | ||||||
|  | 		AppConfig: aw.Config, | ||||||
| 		Ctx:       aw.appCtx, | 		Ctx:       aw.appCtx, | ||||||
| 		KeepLast:  updatesToKeep, | 		KeepLast:  updatesToKeep, | ||||||
| 		Recorder:  r, | 		Recorder:  r, | ||||||
|   | |||||||
| @@ -9,9 +9,9 @@ const ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| type RecorderConfig struct { | type RecorderConfig struct { | ||||||
| 	Type        RecorderType `yaml:"type" env:"RECORDER_TYPE" json:"type,omitempty"` // memory|redis | 	Type        RecorderType `yaml:"type" env:"RECORDER_TYPE" json:"type,omitempty" enum:"memory,redis,noop"` | ||||||
| 	KeepLast    int          `yaml:"keepLast" env:"RECORDER_KEEP_LAST" json:"keepLast,omitempty"` | 	KeepLast    int          `yaml:"keepLast" env:"RECORDER_KEEP_LAST" json:"keepLast,omitempty" default:"120"` | ||||||
| 	RedisConfig *RedisConfig `json:"redisConfig,omitempty"` | 	RedisConfig *RedisConfig `yaml:"redisConfig,omitempty" json:"redisConfig,omitempty"` | ||||||
| } | } | ||||||
|  |  | ||||||
| type RedisConfig struct { | type RedisConfig struct { | ||||||
|   | |||||||
| @@ -4,11 +4,13 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"sync" | 	"sync" | ||||||
|  |  | ||||||
|  | 	"github.com/rs/zerolog" | ||||||
| 	"go.opentelemetry.io/otel/metric" | 	"go.opentelemetry.io/otel/metric" | ||||||
| 	"go.opentelemetry.io/otel/trace" | 	"go.opentelemetry.io/otel/trace" | ||||||
|  |  | ||||||
| 	"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" | 	"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" | ||||||
|  |  | ||||||
|  | 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config" | ||||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" | 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -21,6 +23,7 @@ type WeatherRecorder struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| type Opts struct { | type Opts struct { | ||||||
|  | 	AppConfig *config.AmbientLocalExporterConfig | ||||||
| 	Ctx       context.Context | 	Ctx       context.Context | ||||||
| 	Recorder  recorders.Recorder // If nil, will use memory recorder | 	Recorder  recorders.Recorder // If nil, will use memory recorder | ||||||
| 	KeepLast  int | 	KeepLast  int | ||||||
| @@ -36,10 +39,14 @@ func MustNewWeatherRecorder(opts *Opts) *WeatherRecorder { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	opts.Recorder.Init(opts.Ctx, &recorders.RecorderOpts{ | 	opts.Recorder.Init(opts.Ctx, &recorders.RecorderOpts{ | ||||||
|  | 		AppConfig:  opts.AppConfig, | ||||||
| 		RetainLast: opts.KeepLast, | 		RetainLast: opts.KeepLast, | ||||||
| 		BaseCtx:    opts.Ctx, | 		BaseCtx:    opts.Ctx, | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
|  | 	zerolog.Ctx(opts.Ctx).Info().Str("recorderType", opts.Recorder.Name()). | ||||||
|  | 		Msg("weather update recorder ready") | ||||||
|  |  | ||||||
| 	return &WeatherRecorder{ | 	return &WeatherRecorder{ | ||||||
| 		ctx:      opts.Ctx, | 		ctx:      opts.Ctx, | ||||||
| 		recorder: opts.Recorder, | 		recorder: opts.Recorder, | ||||||
|   | |||||||
| @@ -12,7 +12,10 @@ import ( | |||||||
| 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" | 	"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const defRetainLast = 120 | const ( | ||||||
|  | 	DEF_RETAIN_LAST = 120 | ||||||
|  | 	NAME            = "memory recorder" | ||||||
|  | ) | ||||||
|  |  | ||||||
| type MemoryRecorder struct { | type MemoryRecorder struct { | ||||||
| 	baseCtx context.Context | 	baseCtx context.Context | ||||||
| @@ -24,7 +27,7 @@ type MemoryRecorder struct { | |||||||
|  |  | ||||||
| func (r *MemoryRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) { | func (r *MemoryRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) { | ||||||
| 	if opts.RetainLast < 1 { | 	if opts.RetainLast < 1 { | ||||||
| 		opts.RetainLast = defRetainLast | 		opts.RetainLast = DEF_RETAIN_LAST | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	r.updates = make([]*weather.WeatherUpdate, 0, opts.RetainLast) | 	r.updates = make([]*weather.WeatherUpdate, 0, opts.RetainLast) | ||||||
| @@ -33,3 +36,5 @@ func (r *MemoryRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) | |||||||
| 	r.RWMutex = &sync.RWMutex{} | 	r.RWMutex = &sync.RWMutex{} | ||||||
| 	r.tracer = otel.GetTracer(r.baseCtx, "memoryRecorder") | 	r.tracer = otel.GetTracer(r.baseCtx, "memoryRecorder") | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (r *MemoryRecorder) Name() string { return NAME } | ||||||
|   | |||||||
| @@ -19,3 +19,5 @@ func (n *NoopRecorder) Get(context.Context, *pb.GetWeatherRequest) ([]*weather.W | |||||||
| } | } | ||||||
|  |  | ||||||
| func (n *NoopRecorder) Count(context.Context) int { return 0 } | func (n *NoopRecorder) Count(context.Context) int { return 0 } | ||||||
|  |  | ||||||
|  | func (r *NoopRecorder) Name() string { return "no-op recorder" } | ||||||
|   | |||||||
| @@ -19,4 +19,5 @@ type Recorder interface { | |||||||
| 	Set(context.Context, *weather.WeatherUpdate) error | 	Set(context.Context, *weather.WeatherUpdate) error | ||||||
| 	Get(context.Context, *pb.GetWeatherRequest) ([]*weather.WeatherUpdate, error) | 	Get(context.Context, *pb.GetWeatherRequest) ([]*weather.WeatherUpdate, error) | ||||||
| 	Count(context.Context) int // Best Effort | 	Count(context.Context) int // Best Effort | ||||||
|  | 	Name() string | ||||||
| } | } | ||||||
|   | |||||||
| @@ -22,6 +22,7 @@ import ( | |||||||
| const ( | const ( | ||||||
| 	DEF_RETAIN  = 120 | 	DEF_RETAIN  = 120 | ||||||
| 	UPDATES_KEY = "weatherUpdates" | 	UPDATES_KEY = "weatherUpdates" | ||||||
|  | 	NAME        = "redis recorder" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type RedisRecorder struct { | type RedisRecorder struct { | ||||||
| @@ -36,11 +37,19 @@ type RedisRecorder struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (r *RedisRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) { | func (r *RedisRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) { | ||||||
|  | 	if opts.AppConfig.RecorderConfig.RedisConfig == nil { | ||||||
|  | 		panic("refusing to init redis recorder with no redisConfig") | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if opts.RetainLast < 1 { | 	if opts.RetainLast < 1 { | ||||||
| 		opts.RetainLast = DEF_RETAIN | 		opts.RetainLast = DEF_RETAIN | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	r.log = zerolog.Ctx(opts.BaseCtx) | 	r.config = opts.AppConfig | ||||||
|  | 	r.keep = opts.RetainLast | ||||||
|  | 	r.RWMutex = &sync.RWMutex{} | ||||||
|  | 	r.baseCtx = opts.BaseCtx | ||||||
|  | 	r.log = zerolog.Ctx(r.baseCtx) | ||||||
|  |  | ||||||
| 	r.tracer = otel.GetTracer(r.baseCtx, "redisRecorder") | 	r.tracer = otel.GetTracer(r.baseCtx, "redisRecorder") | ||||||
| 	ctx, span := r.tracer.Start(ctx, "redisRecorder.init", trace.WithAttributes( | 	ctx, span := r.tracer.Start(ctx, "redisRecorder.init", trace.WithAttributes( | ||||||
| @@ -51,11 +60,6 @@ func (r *RedisRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) | |||||||
| 	)) | 	)) | ||||||
| 	defer span.End() | 	defer span.End() | ||||||
|  |  | ||||||
| 	r.config = opts.AppConfig |  | ||||||
| 	r.keep = opts.RetainLast |  | ||||||
| 	r.baseCtx = opts.BaseCtx |  | ||||||
| 	r.RWMutex = &sync.RWMutex{} |  | ||||||
|  |  | ||||||
| 	// Unique key prefix for this version/env/name of exporter | 	// Unique key prefix for this version/env/name of exporter | ||||||
| 	// will be consistent across replicas, but resets on upgrade | 	// will be consistent across replicas, but resets on upgrade | ||||||
| 	// as it is using version | 	// as it is using version | ||||||
| @@ -106,3 +110,5 @@ func (r *RedisRecorder) MustInitRedis(ctx context.Context) { | |||||||
| func (r *RedisRecorder) Key() string { | func (r *RedisRecorder) Key() string { | ||||||
| 	return fmt.Sprintf("%s:%s", r.appKey, UPDATES_KEY) | 	return fmt.Sprintf("%s:%s", r.appKey, UPDATES_KEY) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (r *RedisRecorder) Name() string { return NAME } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user