improve redis recorder
All checks were successful
Build and Publish / check-chart (push) Successful in 44s
Build and Publish / helm-release (push) Has been skipped
Build and Publish / release (push) Successful in 3m17s

This commit is contained in:
Ryan McGuire 2025-03-23 11:16:54 -04:00
parent 2d683e61b0
commit 3b0748da25
5 changed files with 134 additions and 9 deletions

89
contrib/wu_sample.json Normal file
View File

@ -0,0 +1,89 @@
{
"PASSKEY": [
"DE:AD:BE:EF:BE:EF"
],
"baromabsin": [
"29.025"
],
"baromrelin": [
"30.005"
],
"batt1": [
"1"
],
"battin": [
"1"
],
"battout": [
"1"
],
"battrain": [
"1"
],
"dailyrainin": [
"0.000"
],
"dateutc": [
"2025-03-23 14:07:59"
],
"eventrainin": [
"0.000"
],
"hourlyrainin": [
"0.000"
],
"humidity": [
"55"
],
"humidity1": [
"53"
],
"humidityin": [
"36"
],
"maxdailygust": [
"20.36"
],
"monthlyrainin": [
"1.969"
],
"solarradiation": [
"142.15"
],
"stationtype": [
"WeatherHub_V1.0.2"
],
"temp1f": [
"-2.74"
],
"tempf": [
"33.98"
],
"tempinf": [
"70.52"
],
"totalrainin": [
"2.421"
],
"uv": [
"1"
],
"weeklyrainin": [
"0.000"
],
"winddir": [
"117"
],
"winddir_avg10m": [
"127"
],
"windgustmph": [
"17.22"
],
"windspeedmph": [
"10.29"
],
"yearlyrainin": [
"2.421"
]
}

9
contrib/wu_test_sample.sh Executable file
View File

@ -0,0 +1,9 @@
#!env sh
SCRIPT_DIR=$( dirname $0 )
JSON_FILE="${SCRIPT_DIR}/wu_sample.json"
BASE_URL="http://127.0.0.1:8080/data/report"
# Convert JSON to query parameters
QUERY_STRING=$(jq -r 'to_entries | map("\(.key)=\(.value[0] | @uri)") | join("&")' "$JSON_FILE")
curl -G --data "$QUERY_STRING" "$BASE_URL"

View File

@ -6,16 +6,28 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
)
func (w *WeatherRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error {
if u.StationConfig == nil {
u.StationConfig = &config.WeatherStation{
Name: "unregistered",
Equipment: "unknown",
ProxyToAWN: false,
ProxyToWunderground: false,
KeepMetrics: []string{},
}
}
ctx, span := w.tracer.Start(ctx, "setRecorderUpdate", trace.WithAttributes(
attribute.String("stationName", u.StationConfig.Name),
attribute.String("stationType", util.DerefStr(u.StationType)),
attribute.String("stationEquipment", u.StationConfig.Equipment),
))
defer span.End()
return w.recorder.Set(ctx, u)
}

View File

@ -38,8 +38,9 @@ func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) (
}
span.SetAttributes(attribute.Int("limit", limit))
r.log.Debug().Int("limit", limit).Msg("getting updates from redis")
datas, err := r.redis.LRange(ctx, r.Key(), 0, int64(limit)).Result()
datas, err := r.redis.LRange(ctx, r.Key(), 0, int64(limit)-1).Result()
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
@ -48,6 +49,7 @@ func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) (
}
span.AddEvent("redis queried")
r.log.Debug().Int("results", len(datas)).Msg("redis queried")
updates, err := jsonDatasToUpdates(datas)
if err != nil {
@ -87,7 +89,7 @@ func jsonDatasToUpdates(datas []string) ([]*weather.WeatherUpdate, error) {
err := json.Unmarshal([]byte(data), update)
errs = errors.Join(errs, err)
if err != nil {
if err == nil {
updates = append(updates, update)
}
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
@ -20,7 +21,7 @@ func (r *RedisRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error
defer span.End()
r.Lock()
defer r.RUnlock()
defer r.Unlock()
// First ensure we can prepare our payload
data, err := json.Marshal(u)
@ -42,11 +43,14 @@ func (r *RedisRecorder) set(ctx context.Context, data []byte) error {
defer span.End()
// Atomic, push and trim
tx := r.redis.TxPipeline()
tx.LPush(ctx, r.Key(), data)
tx.LTrim(ctx, r.Key(), 0, int64(r.keep)-1)
if rErr, err := tx.Exec(ctx); err != nil {
var count *redis.IntCmd
rErr, err := r.redis.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.LPush(ctx, r.Key(), data)
pipe.LTrim(ctx, r.Key(), 0, int64(r.keep)-1)
count = pipe.LLen(ctx, r.Key())
return nil
})
if err != nil {
for _, cmd := range rErr {
span.RecordError(cmd.Err())
}
@ -55,7 +59,16 @@ func (r *RedisRecorder) set(ctx context.Context, data []byte) error {
return err
}
span.SetAttributes(attribute.Int("updateCount", r.count(ctx)))
// Get new update count
r.log.Debug().
Int("updateBytes", len(data)).
Int64("updateCount", count.Val()).
Str("redis", r.redis.String()).
Str("key", r.Key()).
Msg("pushed update to redis")
span.SetAttributes(attribute.Int64("updateCount", count.Val()))
span.SetStatus(codes.Ok, "")
return nil
}