Compare commits

...

3 Commits

Author SHA1 Message Date
6e1ec0b5c1 implement redis and noop recorders
All checks were successful
Build and Publish / check-chart (push) Successful in 20s
Build and Publish / helm-release (push) Has been skipped
Build and Publish / release (push) Successful in 4m12s
2025-03-22 14:29:24 -04:00
46a213f314 support json schema
All checks were successful
Build and Publish / release (push) Has been skipped
Build and Publish / check-chart (push) Successful in 10s
Build and Publish / helm-release (push) Has been skipped
2025-03-22 14:04:20 -04:00
b483fc22a3 recorder implementations
All checks were successful
Build and Publish / release (push) Has been skipped
Build and Publish / check-chart (push) Successful in 11s
Build and Publish / helm-release (push) Has been skipped
2025-03-22 12:05:25 -04:00
21 changed files with 880 additions and 144 deletions

View File

@ -33,6 +33,9 @@ build: test
done
go build -ldflags "-X $(VER_PKG)=$(VERSION)" -o bin/${CMD_NAME}
schema:
go run . -schema > contrib/schema.json
docker:
@echo "Building Docker image $(DOCKER_IMG):$(VERSION)"
docker build \

View File

@ -1,10 +1,11 @@
# TODO
- [ ] Add json schema for config
- [ ] Finish implementing weather GRPC
- [ ] Update README
- [ ] Add Grafana dashboard
- [ ] Add new spans
## Done
- [x] Add new spans
- [x] Helm Chart
- [x] Add proxy to upstream support
- [x] Fix wunderground 401

61
app.go Normal file
View File

@ -0,0 +1,61 @@
package main
import (
"context"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/app"
grpcopts "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc/opts"
httpopts "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http/opts"
weatherpb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/ambienthttp"
weathergrpc "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/grpc"
)
func prepareApp(ctx context.Context, aw *ambient.AmbientWeather) *app.App {
// Load ambient routes into app
awApp := &app.App{
AppContext: ctx,
// HTTP Endpoints for Ambient Weather Stations
HTTP: &httpopts.AppHTTP{
Funcs: []httpopts.HTTPFunc{
{
Path: "/weatherstation/updateweatherstation.php",
HandlerFunc: aw.GetWundergroundHandlerFunc(ctx),
},
{
Path: "/data/report",
HandlerFunc: aw.GetAWNHandlerFunc(ctx),
},
},
// HTTP Listener that fixes broken requests generated by
// some versions of awn firmware
CustomListener: ambienthttp.NewAWNMutatingListener(ctx,
aw.Config.HTTP.Listen), // Necessary to fix certain bad AWN firmware
// Health check funcs
HealthChecks: []httpopts.HealthCheckFunc{
// TODO: Implement
func(ctx context.Context) error {
return nil
},
},
},
// GRPC Service for retrieving weather
GRPC: &grpcopts.AppGRPC{
Services: []*grpcopts.GRPCService{
{
Name: "Weather Service",
Type: &weatherpb.AmbientLocalWeatherService_ServiceDesc,
Service: weathergrpc.NewGRPCWeather(ctx, aw.GetRecorder()),
},
},
},
}
return awApp
}

226
contrib/schema.json Normal file
View File

@ -0,0 +1,226 @@
{
"definitions": {
"ConfigGRPCConfig": {
"properties": {
"enableInstrumentation": {
"type": "boolean"
},
"enableReflection": {
"type": "boolean"
},
"enabled": {
"type": "boolean"
},
"listen": {
"type": "string"
},
"logRequests": {
"type": "boolean"
}
},
"type": "object"
},
"ConfigHTTPConfig": {
"properties": {
"enabled": {
"type": "boolean"
},
"idleTimeout": {
"type": "string"
},
"listen": {
"type": "string"
},
"logRequests": {
"type": "boolean"
},
"readTimeout": {
"type": "string"
},
"writeTimeout": {
"type": "string"
}
},
"type": "object"
},
"ConfigLogConfig": {
"properties": {
"enabled": {
"type": "boolean"
},
"format": {
"type": "string"
},
"level": {
"type": "string"
},
"output": {
"type": "string"
},
"timeFormat": {
"type": "string"
}
},
"type": "object"
},
"ConfigOTELConfig": {
"properties": {
"enabled": {
"type": "boolean"
},
"metricIntervalSecs": {
"type": "integer"
},
"prometheusEnabled": {
"type": "boolean"
},
"prometheusPath": {
"type": "string"
},
"stdoutEnabled": {
"type": "boolean"
}
},
"type": "object"
},
"ConfigRecorderConfig": {
"properties": {
"keepLast": {
"default": 120,
"type": "integer"
},
"redisConfig": {
"$ref": "#/definitions/ConfigRedisConfig"
},
"type": {
"enum": [
"memory",
"redis",
"noop"
],
"type": "string"
}
},
"type": "object"
},
"ConfigRedisConfig": {
"properties": {
"redisDB": {
"default": 0,
"type": "integer"
},
"redisHost": {
"default": "127.0.0.1",
"type": "string"
},
"redisPassword": {
"type": "string"
},
"redisPort": {
"default": 6379,
"type": "integer"
},
"redisTLS": {
"default": false,
"type": "boolean"
},
"redisTLSInsecure": {
"default": false,
"type": "boolean"
},
"redisUser": {
"type": "string"
}
},
"type": "object"
},
"ConfigWeatherStation": {
"properties": {
"awnPassKey": {
"type": "string"
},
"dropMetrics": {
"items": {
"type": "string"
},
"type": "array"
},
"equipment": {
"type": "string"
},
"keepMetrics": {
"items": {
"type": "string"
},
"type": "array"
},
"name": {
"type": "string"
},
"proxyToAWN": {
"type": "boolean"
},
"proxyToWunderground": {
"type": "boolean"
},
"sensorMappings": {
"additionalProperties": {
"type": "string"
},
"type": "object"
},
"wundergroundID": {
"type": "string"
},
"wundergroundPassword": {
"type": "string"
}
},
"type": "object"
}
},
"properties": {
"environment": {
"type": "string"
},
"grpc": {
"$ref": "#/definitions/ConfigGRPCConfig"
},
"http": {
"$ref": "#/definitions/ConfigHTTPConfig"
},
"logging": {
"$ref": "#/definitions/ConfigLogConfig"
},
"metricPrefix": {
"default": "weather",
"type": "string"
},
"name": {
"type": "string"
},
"otel": {
"$ref": "#/definitions/ConfigOTELConfig"
},
"recorderConfig": {
"$ref": "#/definitions/ConfigRecorderConfig"
},
"updatesToKeep": {
"default": 1,
"type": [
"null",
"integer"
]
},
"version": {
"type": "string"
},
"weatherStations": {
"items": {
"$ref": "#/definitions/ConfigWeatherStation"
},
"type": "array"
}
},
"type": "object"
}

18
go.mod
View File

@ -3,17 +3,18 @@ module gitea.libretechconsulting.com/rmcguire/ambient-local-exporter
go 1.23.4
require (
gitea.libretechconsulting.com/rmcguire/go-app v0.6.3
gitea.libretechconsulting.com/rmcguire/go-app v0.7.0
github.com/go-resty/resty/v2 v2.16.5
github.com/gorilla/schema v1.4.1
github.com/rs/zerolog v1.33.0
github.com/redis/go-redis/v9 v9.7.3
github.com/rs/zerolog v1.34.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/otel/metric v1.35.0
go.opentelemetry.io/otel/trace v1.35.0
golang.org/x/sys v0.31.0
google.golang.org/grpc v1.71.0
google.golang.org/protobuf v1.36.5
k8s.io/utils v0.0.0-20241210054802-24370beab758
k8s.io/utils v0.0.0-20250321185631-1f6e0b77f77e
)
require (
@ -21,6 +22,7 @@ require (
github.com/caarlos0/env/v11 v11.3.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@ -33,8 +35,10 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_golang v1.21.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/prometheus/common v0.63.0 // indirect
github.com/prometheus/procfs v0.16.0 // indirect
github.com/swaggest/jsonschema-go v0.3.73 // indirect
github.com/swaggest/refl v1.3.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
@ -49,7 +53,7 @@ require (
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
golang.org/x/net v0.37.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250313205543-e70fdf4c4cb4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

23
go.sum
View File

@ -4,6 +4,8 @@ gitea.libretechconsulting.com/rmcguire/go-app v0.6.2 h1:vpEdZu7WI8qIil5NLf6OUF/T
gitea.libretechconsulting.com/rmcguire/go-app v0.6.2/go.mod h1:S3/vdMEiRWWIdD0Fr+tjJc627VzxNzO4Ia2HgTBXe+g=
gitea.libretechconsulting.com/rmcguire/go-app v0.6.3 h1:dXYHJxK/1vmWBj1wqbqEUncFt3O92agy9gNWoa9NpA0=
gitea.libretechconsulting.com/rmcguire/go-app v0.6.3/go.mod h1:S3/vdMEiRWWIdD0Fr+tjJc627VzxNzO4Ia2HgTBXe+g=
gitea.libretechconsulting.com/rmcguire/go-app v0.7.0 h1:yhRRwV/dxN4Bey1Qv8/rHHV5QvALrZuDJFI+zFPc7sU=
gitea.libretechconsulting.com/rmcguire/go-app v0.7.0/go.mod h1:EM3Z9QcRD+b7UlKGA9y37ppfUBC0Reyf5sYEC6vFZcY=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA=
@ -15,6 +17,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@ -63,15 +67,28 @@ github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k=
github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2bbsM=
github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeMXnCqhDthZg=
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/swaggest/jsonschema-go v0.3.73 h1:gU1pBzF3pkZ1GDD3dRMdQoCjrA0sldJ+QcM7aSSPgvc=
github.com/swaggest/jsonschema-go v0.3.73/go.mod h1:qp+Ym2DIXHlHzch3HKz50gPf2wJhKOrAB/VYqLS2oJU=
github.com/swaggest/refl v1.3.1 h1:XGplEkYftR7p9cz1lsiwXMM2yzmOymTE9vneVVpaOh4=
github.com/swaggest/refl v1.3.1/go.mod h1:4uUVFVfPJ0NSX9FPwMPspeHos9wPFlCMGoPRllUbpvA=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 h1:x7wzEgXfnzJcHDwStJT+mxOz4etr2EcexjqhBvmoakw=
@ -117,8 +134,12 @@ golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb h1:p31xT4yrYrSM/G4Sn2+TNUkVhFCbG9y8itM2S6Th950=
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:jbe3Bkdp+Dh2IrslsFCklNhweNTBgSYanP1UXhJDhKg=
google.golang.org/genproto/googleapis/api v0.0.0-20250313205543-e70fdf4c4cb4 h1:IFnXJq3UPB3oBREOodn1v1aGQeZYQclEmvWRMN0PSsY=
google.golang.org/genproto/googleapis/api v0.0.0-20250313205543-e70fdf4c4cb4/go.mod h1:c8q6Z6OCqnfVIqUFJkCzKcrj8eCvUrz+K4KRzSTuANg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4 h1:iK2jbkWL86DXjEx0qiHcRE9dE4/Ahua5k6V8OWFb//c=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I=
google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg=
google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
@ -130,3 +151,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0=
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
k8s.io/utils v0.0.0-20250321185631-1f6e0b77f77e h1:KqK5c/ghOm8xkHYhlodbp6i6+r+ChV2vuAuVRdFbLro=
k8s.io/utils v0.0.0-20250321185631-1f6e0b77f77e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=

66
main.go
View File

@ -2,25 +2,24 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/app"
grpcopts "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc/opts"
httpopts "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http/opts"
"golang.org/x/sys/unix"
weatherpb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/ambienthttp"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config"
weathergrpc "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/grpc"
)
const (
defaultMetricPrefix = "weather"
)
var schema bool
func main() {
ctx, cncl := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt, unix.SIGTERM)
defer cncl()
@ -36,6 +35,12 @@ func main() {
// and set up logging, tracing, etc..
aw := ambient.New(ctx, awConfig).Init()
// Just show schema if that's all we were asked to do
if schema {
printSchema(awConfig)
os.Exit(0)
}
// Load http and grpc routes, prepare the app
awApp := prepareApp(ctx, aw)
@ -44,49 +49,16 @@ func main() {
<-awApp.Done()
}
func prepareApp(ctx context.Context, aw *ambient.AmbientWeather) *app.App {
// Load ambient routes into app
awApp := &app.App{
AppContext: ctx,
// flag.Parse will be called by go-app
func init() {
flag.BoolVar(&schema, "schema", false, "generate json schema and exit")
}
// HTTP Endpoints for Ambient Weather Stations
HTTP: &httpopts.AppHTTP{
Funcs: []httpopts.HTTPFunc{
{
Path: "/weatherstation/updateweatherstation.php",
HandlerFunc: aw.GetWundergroundHandlerFunc(ctx),
},
{
Path: "/data/report",
HandlerFunc: aw.GetAWNHandlerFunc(ctx),
},
},
// HTTP Listener that fixes broken requests generated by
// some versions of awn firmware
CustomListener: ambienthttp.NewAWNMutatingListener(ctx,
aw.Config.HTTP.Listen), // Necessary to fix certain bad AWN firmware
// Health check funcs
HealthChecks: []httpopts.HealthCheckFunc{
// TODO: Implement
func(ctx context.Context) error {
return nil
},
},
},
// GRPC Service for retrieving weather
GRPC: &grpcopts.AppGRPC{
Services: []*grpcopts.GRPCService{
{
Name: "Weather Service",
Type: &weatherpb.AmbientLocalWeatherService_ServiceDesc,
Service: weathergrpc.NewGRPCWeather(ctx, aw.GetRecorder()),
},
},
},
func printSchema(config *config.AmbientLocalExporterConfig) {
bytes, err := app.CustomSchema(config)
if err != nil {
panic(err)
}
return awApp
fmt.Println(string(bytes))
}

View File

@ -21,6 +21,10 @@ import (
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/provider/wunderground"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/memory"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/noop"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/redis"
)
const defUpdatesToKeep = 120
@ -70,10 +74,27 @@ func (aw *AmbientWeather) Init() *AmbientWeather {
}
span.SetAttributes(attribute.Int("updatesToKeep", updatesToKeep))
// TODO: Support other recorders (don't rely on default)
aw.weatherRecorder = recorder.NewWeatherRecorder(&recorder.Opts{
Ctx: aw.appCtx,
KeepLast: updatesToKeep,
// Choose weather recorder for grpc / api requests,
// default is memory recorder
var r recorders.Recorder
if aw.Config == nil || aw.Config.RecorderConfig == nil {
r = &memory.MemoryRecorder{}
} else {
switch aw.Config.RecorderConfig.Type {
case config.TypeMemory:
r = &memory.MemoryRecorder{}
case config.TypeRedis:
r = &redis.RedisRecorder{}
case config.TypeNoop:
r = &noop.NoopRecorder{}
}
}
aw.weatherRecorder = recorder.MustNewWeatherRecorder(&recorder.Opts{
AppConfig: aw.Config,
Ctx: aw.appCtx,
KeepLast: updatesToKeep,
Recorder: r,
})
aw.l.Trace().Any("awConfig", aw.Config).Send()

View File

@ -6,24 +6,25 @@ import (
// This configuration includes all config from go-app/config.AppConfig
type AmbientLocalExporterConfig struct {
MetricPrefix string `yaml:"metricPrefix" default:"weather" env:"AMBIENT_METRIC_PREFIX"`
UpdatesToKeep *int `yaml:"updatesToKeep" default:"1" env:"AMBIENT_UPDATES_TO_KEEP"`
WeatherStations []WeatherStation `yaml:"weatherStations" env:"weatherStations"` // No env, too complex, not worth the time
MetricPrefix string `yaml:"metricPrefix" default:"weather" env:"AMBIENT_METRIC_PREFIX" json:"metricPrefix,omitempty"`
UpdatesToKeep *int `yaml:"updatesToKeep" default:"1" env:"AMBIENT_UPDATES_TO_KEEP" json:"updatesToKeep,omitempty"`
WeatherStations []WeatherStation `yaml:"weatherStations" json:"weatherStations,omitempty"` // No env, too complex, not worth the time
RecorderConfig *RecorderConfig `yaml:"recorderConfig" json:"recorderConfig,omitempty"`
*config.AppConfig // Extends app config
}
type WeatherStation struct {
Name string `yaml:"name"` // Human Friendly Name (e.g. Back Yard Weather)
Equipment string `yaml:"equipment"` // Equipment Type (e.g. WS-5000)
Name string `yaml:"name" json:"name,omitempty"` // Human Friendly Name (e.g. Back Yard Weather)
Equipment string `yaml:"equipment" json:"equipment,omitempty"` // Equipment Type (e.g. WS-5000)
// Required if proxying to awn/wu is enabled
WundergroundID string `yaml:"wundergroundID"`
WundergroundPassword string `yaml:"wundergroundPassword"`
AWNPassKey string `yaml:"awnPassKey"`
WundergroundID string `yaml:"wundergroundID" json:"wundergroundID,omitempty"`
WundergroundPassword string `yaml:"wundergroundPassword" json:"wundergroundPassword,omitempty"`
AWNPassKey string `yaml:"awnPassKey" json:"awnPassKey,omitempty"`
// Proxy updates to AWN or Wunderground
ProxyToAWN bool `yaml:"proxyToAWN"`
ProxyToWunderground bool `yaml:"proxyToWunderground"`
ProxyToAWN bool `yaml:"proxyToAWN" json:"proxyToAWN,omitempty"`
ProxyToWunderground bool `yaml:"proxyToWunderground" json:"proxyToWunderground,omitempty"`
// Unreliable / unwanted metrics by name of WeatherUpdate Field
// will be excluded if present in discardMetrics
@ -32,8 +33,8 @@ type WeatherStation struct {
// ignoring discardMetrics.
//
// Check weather.WeatherUpdateField for options
KeepMetrics []string `yaml:"keepMetrics"`
DropMetrics []string `yaml:"dropMetrics"`
KeepMetrics []string `yaml:"keepMetrics" json:"keepMetrics,omitempty"`
DropMetrics []string `yaml:"dropMetrics" json:"dropMetrics,omitempty"`
// Relabels battery and sensor names
// Temp+Humidity Sensors:
@ -43,5 +44,5 @@ type WeatherStation struct {
// - OutdoorSensor
// - RainSensor
// - CO2Sensor
SensorMappings map[string]string `yaml:"sensorMappings"`
SensorMappings map[string]string `yaml:"sensorMappings" json:"sensorMappings,omitempty"`
}

View File

@ -0,0 +1,25 @@
package config
type RecorderType string
const (
TypeMemory RecorderType = "memory" // Stores weather updates in memory
TypeRedis RecorderType = "redis" // Required for replicas > 1
TypeNoop RecorderType = "noop" // No-op implementation
)
type RecorderConfig struct {
Type RecorderType `yaml:"type" env:"RECORDER_TYPE" json:"type,omitempty" enum:"memory,redis,noop"`
KeepLast int `yaml:"keepLast" env:"RECORDER_KEEP_LAST" json:"keepLast,omitempty" default:"120"`
RedisConfig *RedisConfig `yaml:"redisConfig,omitempty" json:"redisConfig,omitempty"`
}
type RedisConfig struct {
RedisHost string `yaml:"redisHost" env:"REDIS_HOST" default:"127.0.0.1" json:"redisHost,omitempty"`
RedisPort int `yaml:"redisPort" env:"REDIS_PORT" default:"6379" json:"redisPort,omitempty"`
RedisUser string `yaml:"redisUser" env:"REDIS_USER" json:"redisUser,omitempty"`
RedisPassword string `yaml:"redisPassword" env:"REDIS_PASSWORD" json:"redisPassword,omitempty"`
RedisDB int `yaml:"redisDB" env:"REDIS_DB" default:"0" json:"redisDB,omitempty"`
RedisTLS bool `yaml:"redisTLS" env:"REDIS_TLS" default:"false" json:"redisTLS,omitempty"`
RedisTLSInsecure bool `yaml:"redisTLSInsecure" env:"REDIS_TLS_INSECURE" default:"false" json:"redisTLSInsecure,omitempty"`
}

View File

@ -1,6 +1,19 @@
package util
import "k8s.io/utils/ptr"
import (
"crypto/md5"
"encoding/hex"
"fmt"
"slices"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
"k8s.io/utils/ptr"
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
)
const defaultLimit = -1
func DerefStr(s *string) string {
if s == nil {
@ -15,3 +28,77 @@ func Int32ptr(i *int) *int32 {
}
return ptr.To(int32(*i))
}
// Generates a hash that will be consistent
// across all running replicas
func GetAppHash(conf *config.AppConfig) string {
hashName := fmt.Sprintf("%s-%s-%s",
conf.Name,
conf.Environment,
conf.Version,
)
hash := md5.Sum([]byte(hashName))
return hex.EncodeToString(hash[:])
}
// Get a limit from req, applying a default if unset or not sane
func GetLimitFromReq(req *pb.GetWeatherRequest) int {
if req == nil || req.Limit == nil {
return defaultLimit
}
if req.GetLimit() == 0 {
return defaultLimit
}
return int(req.GetLimit())
}
// Simple helper to trim a list of updates
func LimitUpdates(updates []*weather.WeatherUpdate, limit int) []*weather.WeatherUpdate {
if limit < 0 {
return updates // No limit
}
if len(updates) > limit {
return updates[len(updates)-limit:] // Trim to limit
}
return updates // Within limit
}
func ApplyOptsToUpdates(updates []*weather.WeatherUpdate, limit int, opts *pb.GetWeatherOpts) []*weather.WeatherUpdate {
if opts == nil {
return updates
} else if opts.StationName == nil && opts.StationType == nil {
return updates
}
filtered := make([]*weather.WeatherUpdate, 0, limit)
for i := len(updates) - 1; i >= 0; i-- {
update := updates[i]
match := true
if opts.GetStationName() != "" {
if update.GetStationName() != opts.GetStationName() {
match = false
}
}
if opts.GetStationType() != "" {
if DerefStr(update.StationType) != opts.GetStationType() {
match = false
}
}
if match {
filtered = append(filtered, update)
if limit > 0 && len(filtered) >= limit {
return slices.Clip(filtered)
}
}
}
return slices.Clip(filtered)
}

View File

@ -4,13 +4,14 @@ import (
"context"
"sync"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/memory"
)
type WeatherRecorder struct {
@ -22,25 +23,30 @@ type WeatherRecorder struct {
}
type Opts struct {
Ctx context.Context
Recorder recorders.Recorder // If nil, will use memory recorder
KeepLast int
AppConfig *config.AmbientLocalExporterConfig
Ctx context.Context
Recorder recorders.Recorder // If nil, will use memory recorder
KeepLast int
}
func NewWeatherRecorder(opts *Opts) *WeatherRecorder {
func MustNewWeatherRecorder(opts *Opts) *WeatherRecorder {
if opts.KeepLast < 1 {
opts.KeepLast = 1
}
if opts.Recorder == nil {
opts.Recorder = &memory.MemoryRecorder{}
panic("no recorder provided")
}
opts.Recorder.Init(opts.Ctx, &recorders.RecorderOpts{
AppConfig: opts.AppConfig,
RetainLast: opts.KeepLast,
BaseCtx: opts.Ctx,
})
zerolog.Ctx(opts.Ctx).Info().Str("recorderType", opts.Recorder.Name()).
Msg("weather update recorder ready")
return &WeatherRecorder{
ctx: opts.Ctx,
recorder: opts.Recorder,

View File

@ -0,0 +1,24 @@
package memory
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)
func (r *MemoryRecorder) Count(ctx context.Context) int {
_, span := r.tracer.Start(ctx, "countWeatherRecorder")
defer span.End()
count := r.count()
span.SetAttributes(attribute.Int("count", count))
span.SetStatus(codes.Ok, "")
return count
}
func (r *MemoryRecorder) count() int {
return len(r.updates)
}

View File

@ -3,7 +3,6 @@ package memory
import (
"context"
"errors"
"slices"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
@ -16,7 +15,7 @@ import (
func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
[]*weather.WeatherUpdate, error,
) {
ctx, span := r.tracer.Start(ctx, "memoryRecorder.Get")
_, span := r.tracer.Start(ctx, "memoryRecorder.Get")
defer span.End()
r.RLock()
@ -24,11 +23,12 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
span.AddEvent("acquired lock on recorder cache")
limit := util.GetLimitFromReq(req)
if r.count() == 0 {
err := errors.New("no recorded updates to get")
span.RecordError(err)
return nil, err
} else if r.count() <= int(*req.Limit) {
} else if limit > 0 && r.count() <= limit {
span.RecordError(errors.New("requested more updates than recorded"))
}
@ -43,66 +43,8 @@ func (r *MemoryRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
func (r *MemoryRecorder) getUpdatesFromReq(req *pb.GetWeatherRequest) []*weather.WeatherUpdate {
if req.Opts == nil {
return limitUpdates(r.updates, int(req.GetLimit()))
return util.LimitUpdates(r.updates, util.GetLimitFromReq(req))
}
return r.applyOptsToUpdates(r.updates, int(req.GetLimit()), req.Opts)
}
func (r *MemoryRecorder) applyOptsToUpdates(updates []*weather.WeatherUpdate, limit int, opts *pb.GetWeatherOpts) []*weather.WeatherUpdate {
if opts == nil {
return updates
} else if opts.StationName == nil && opts.StationType == nil {
return updates
}
filtered := make([]*weather.WeatherUpdate, 0, limit)
for i := len(updates) - 1; i >= 0; i-- {
update := updates[i]
match := true
if opts.GetStationName() != "" {
if update.GetStationName() != opts.GetStationName() {
match = false
}
}
if opts.GetStationType() != "" {
if util.DerefStr(update.StationType) != opts.GetStationType() {
match = false
}
}
if match {
filtered = append(filtered, update)
if len(filtered) >= limit {
return filtered
}
}
}
return slices.Clip(filtered)
}
func (r *MemoryRecorder) Count(ctx context.Context) int {
_, span := r.tracer.Start(ctx, "countWeatherRecorder")
defer span.End()
count := r.count()
span.SetAttributes(attribute.Int("count", count))
span.SetStatus(codes.Ok, "")
return count
}
func (r *MemoryRecorder) count() int {
return len(r.updates)
}
func limitUpdates(updates []*weather.WeatherUpdate, limit int) []*weather.WeatherUpdate {
if len(updates) > limit {
return updates[len(updates)-limit:]
}
return updates
return util.ApplyOptsToUpdates(r.updates, util.GetLimitFromReq(req), req.Opts)
}

View File

@ -12,7 +12,10 @@ import (
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
)
const defRetainLast = 120
const (
DEF_RETAIN_LAST = 120
NAME = "memory recorder"
)
type MemoryRecorder struct {
baseCtx context.Context
@ -24,7 +27,7 @@ type MemoryRecorder struct {
func (r *MemoryRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) {
if opts.RetainLast < 1 {
opts.RetainLast = defRetainLast
opts.RetainLast = DEF_RETAIN_LAST
}
r.updates = make([]*weather.WeatherUpdate, 0, opts.RetainLast)
@ -33,3 +36,5 @@ func (r *MemoryRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts)
r.RWMutex = &sync.RWMutex{}
r.tracer = otel.GetTracer(r.baseCtx, "memoryRecorder")
}
func (r *MemoryRecorder) Name() string { return NAME }

View File

@ -0,0 +1,23 @@
package noop
import (
"context"
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
)
type NoopRecorder struct{}
func (n *NoopRecorder) Init(context.Context, *recorders.RecorderOpts) {}
func (n *NoopRecorder) Set(context.Context, *weather.WeatherUpdate) error { return nil }
func (n *NoopRecorder) Get(context.Context, *pb.GetWeatherRequest) ([]*weather.WeatherUpdate, error) {
return nil, nil
}
func (n *NoopRecorder) Count(context.Context) int { return 0 }
func (r *NoopRecorder) Name() string { return "no-op recorder" }

View File

@ -4,12 +4,14 @@ import (
"context"
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
)
type RecorderOpts struct {
RetainLast int
BaseCtx context.Context
AppConfig *config.AmbientLocalExporterConfig
}
type Recorder interface {
@ -17,4 +19,5 @@ type Recorder interface {
Set(context.Context, *weather.WeatherUpdate) error
Get(context.Context, *pb.GetWeatherRequest) ([]*weather.WeatherUpdate, error)
Count(context.Context) int // Best Effort
Name() string
}

View File

@ -0,0 +1,38 @@
package redis
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
func (r *RedisRecorder) Count(ctx context.Context) int {
ctx, span := r.tracer.Start(ctx, "redisRecorder.count")
defer span.End()
r.RLock()
defer r.RUnlock()
return r.count(ctx)
}
func (r *RedisRecorder) count(ctx context.Context) int {
ctx, span := r.tracer.Start(ctx, "redisRecorder.count.redis", trace.WithAttributes(
attribute.String("updatesKey", r.Key())))
defer span.End()
count, err := r.redis.LLen(ctx, r.Key()).Result()
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
r.log.Err(err).Send()
return int(count)
}
span.SetAttributes(attribute.Int64("updatesCount", count))
span.SetStatus(codes.Ok, "")
return int(count)
}

View File

@ -0,0 +1,96 @@
package redis
import (
"context"
"encoding/json"
"errors"
"slices"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
)
func (r *RedisRecorder) Get(ctx context.Context, req *pb.GetWeatherRequest) (
[]*weather.WeatherUpdate, error,
) {
ctx, span := r.tracer.Start(ctx, "redisRecorder.get", trace.WithAttributes(
attribute.Int("limit", util.GetLimitFromReq(req)),
))
defer span.End()
return r.get(ctx, req)
}
func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) (
[]*weather.WeatherUpdate, error,
) {
ctx, span := r.tracer.Start(ctx, "redisRecorder.get.redis")
defer span.End()
limit := util.GetLimitFromReq(req)
if limit < 1 {
limit = r.keep
}
span.SetAttributes(attribute.Int("limit", limit))
datas, err := r.redis.LRange(ctx, r.Key(), 0, int64(limit)).Result()
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
r.log.Err(err).Send()
return nil, err
}
span.AddEvent("redis queried")
updates, err := jsonDatasToUpdates(datas)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
r.log.Err(err).Send()
} else {
span.SetStatus(codes.Ok, "")
}
span.AddEvent("results unmarshalled")
span.SetAttributes(attribute.Int("results", len(updates)))
filtered := util.ApplyOptsToUpdates(updates, limit, req.Opts)
span.AddEvent("results filtered")
span.SetAttributes(
attribute.Int("filteredResults", len(filtered)),
attribute.Int("resultsFiltered", len(updates)-len(filtered)),
)
r.log.Debug().
Int("updatesRetrieved", len(updates)).
Int("updatesAfterFiltering", len(filtered)).
Int("updatesFiltered", len(updates)-len(filtered)).
Msg("updates retrieved from redis")
return updates, err
}
func jsonDatasToUpdates(datas []string) ([]*weather.WeatherUpdate, error) {
var errs error
updates := make([]*weather.WeatherUpdate, 0, len(datas))
for _, data := range datas {
update := new(weather.WeatherUpdate)
err := json.Unmarshal([]byte(data), update)
errs = errors.Join(errs, err)
if err != nil {
updates = append(updates, update)
}
}
return slices.Clip(updates), errs
}

View File

@ -0,0 +1,114 @@
package redis
import (
"context"
"crypto/tls"
"fmt"
"sync"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
redis "github.com/redis/go-redis/v9"
"github.com/rs/zerolog"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
)
const (
DEF_RETAIN = 120
UPDATES_KEY = "weatherUpdates"
NAME = "redis recorder"
)
type RedisRecorder struct {
baseCtx context.Context
tracer trace.Tracer
redis *redis.Client
config *config.AmbientLocalExporterConfig
log *zerolog.Logger
appKey string // prefix for redis keys, uses app name, environment, and version
keep int
*sync.RWMutex
}
func (r *RedisRecorder) Init(ctx context.Context, opts *recorders.RecorderOpts) {
if opts.AppConfig.RecorderConfig.RedisConfig == nil {
panic("refusing to init redis recorder with no redisConfig")
}
if opts.RetainLast < 1 {
opts.RetainLast = DEF_RETAIN
}
r.config = opts.AppConfig
r.keep = opts.RetainLast
r.RWMutex = &sync.RWMutex{}
r.baseCtx = opts.BaseCtx
r.log = zerolog.Ctx(r.baseCtx)
r.tracer = otel.GetTracer(r.baseCtx, "redisRecorder")
ctx, span := r.tracer.Start(ctx, "redisRecorder.init", trace.WithAttributes(
attribute.String("redisHost", opts.AppConfig.RecorderConfig.RedisConfig.RedisHost),
attribute.Int("retainLast", opts.RetainLast),
attribute.Int("redisPort", opts.AppConfig.RecorderConfig.RedisConfig.RedisPort),
attribute.Bool("tls", opts.AppConfig.RecorderConfig.RedisConfig.RedisTLS),
))
defer span.End()
// Unique key prefix for this version/env/name of exporter
// will be consistent across replicas, but resets on upgrade
// as it is using version
r.appKey = util.GetAppHash(r.config.AppConfig)
r.MustInitRedis(ctx)
}
func (r *RedisRecorder) MustInitRedis(ctx context.Context) {
ctx, span := r.tracer.Start(ctx, "redisRecorder.init.redis")
defer span.End()
rc := r.config.RecorderConfig.RedisConfig
var tlsConfig *tls.Config
if rc.RedisTLS {
tlsConfig = &tls.Config{
ServerName: rc.RedisHost,
InsecureSkipVerify: rc.RedisTLSInsecure,
}
}
r.redis = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", rc.RedisHost, rc.RedisPort),
ClientName: fmt.Sprintf("%s-%s", r.config.Name, r.config.Environment),
Username: rc.RedisUser,
Password: rc.RedisPassword,
DB: rc.RedisDB,
TLSConfig: tlsConfig,
})
span.AddEvent("redis client ready")
resp := r.redis.Ping(ctx)
if resp.Err() != nil {
span.RecordError(resp.Err())
span.SetStatus(codes.Error, resp.Err().Error())
r.log.Fatal().Err(resp.Err()).Msg("failed to ping redis")
}
span.AddEvent("redis client ping ok")
span.SetStatus(codes.Ok, "")
r.log.Info().Str("appKey", r.appKey).
Msg("redis ping ok, client ready")
}
func (r *RedisRecorder) Key() string {
return fmt.Sprintf("%s:%s", r.appKey, UPDATES_KEY)
}
func (r *RedisRecorder) Name() string { return NAME }

View File

@ -0,0 +1,61 @@
package redis
import (
"context"
"encoding/json"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
)
func (r *RedisRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error {
ctx, span := r.tracer.Start(ctx, "redisRecorder.set", trace.WithAttributes(
attribute.String("stationName", u.GetStationName()),
attribute.String("stationType", util.DerefStr(u.StationType)),
))
defer span.End()
r.Lock()
defer r.RUnlock()
// First ensure we can prepare our payload
data, err := json.Marshal(u)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
r.log.Err(err).Send()
return err
}
return r.set(ctx, data)
}
func (r *RedisRecorder) set(ctx context.Context, data []byte) error {
ctx, span := r.tracer.Start(ctx, "redisRecorder.set.push", trace.WithAttributes(
attribute.Int("updateBytes", len(data)),
))
defer span.End()
// Atomic, push and trim
tx := r.redis.TxPipeline()
tx.LPush(ctx, r.Key(), data)
tx.LTrim(ctx, r.Key(), 0, int64(r.keep)-1)
if rErr, err := tx.Exec(ctx); err != nil {
for _, cmd := range rErr {
span.RecordError(cmd.Err())
}
span.SetStatus(codes.Error, err.Error())
r.log.Err(err).Send()
return err
}
span.SetAttributes(attribute.Int("updateCount", r.count(ctx)))
span.SetStatus(codes.Ok, "")
return nil
}