diff --git a/contrib/wu_sample.json b/contrib/wu_sample.json new file mode 100644 index 0000000..ff00b03 --- /dev/null +++ b/contrib/wu_sample.json @@ -0,0 +1,89 @@ +{ + "PASSKEY": [ + "DE:AD:BE:EF:BE:EF" + ], + "baromabsin": [ + "29.025" + ], + "baromrelin": [ + "30.005" + ], + "batt1": [ + "1" + ], + "battin": [ + "1" + ], + "battout": [ + "1" + ], + "battrain": [ + "1" + ], + "dailyrainin": [ + "0.000" + ], + "dateutc": [ + "2025-03-23 14:07:59" + ], + "eventrainin": [ + "0.000" + ], + "hourlyrainin": [ + "0.000" + ], + "humidity": [ + "55" + ], + "humidity1": [ + "53" + ], + "humidityin": [ + "36" + ], + "maxdailygust": [ + "20.36" + ], + "monthlyrainin": [ + "1.969" + ], + "solarradiation": [ + "142.15" + ], + "stationtype": [ + "WeatherHub_V1.0.2" + ], + "temp1f": [ + "-2.74" + ], + "tempf": [ + "33.98" + ], + "tempinf": [ + "70.52" + ], + "totalrainin": [ + "2.421" + ], + "uv": [ + "1" + ], + "weeklyrainin": [ + "0.000" + ], + "winddir": [ + "117" + ], + "winddir_avg10m": [ + "127" + ], + "windgustmph": [ + "17.22" + ], + "windspeedmph": [ + "10.29" + ], + "yearlyrainin": [ + "2.421" + ] +} diff --git a/contrib/wu_test_sample.sh b/contrib/wu_test_sample.sh new file mode 100755 index 0000000..d44506f --- /dev/null +++ b/contrib/wu_test_sample.sh @@ -0,0 +1,9 @@ +#!env sh +SCRIPT_DIR=$( dirname $0 ) +JSON_FILE="${SCRIPT_DIR}/wu_sample.json" +BASE_URL="http://127.0.0.1:8080/data/report" + +# Convert JSON to query parameters +QUERY_STRING=$(jq -r 'to_entries | map("\(.key)=\(.value[0] | @uri)") | join("&")' "$JSON_FILE") + +curl -G --data "$QUERY_STRING" "$BASE_URL" diff --git a/pkg/weather/recorder/recorder_set.go b/pkg/weather/recorder/recorder_set.go index b711b6a..39fdb3c 100644 --- a/pkg/weather/recorder/recorder_set.go +++ b/pkg/weather/recorder/recorder_set.go @@ -6,16 +6,28 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util" "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather" ) func (w *WeatherRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error { + if u.StationConfig == nil { + u.StationConfig = &config.WeatherStation{ + Name: "unregistered", + Equipment: "unknown", + ProxyToAWN: false, + ProxyToWunderground: false, + KeepMetrics: []string{}, + } + } + ctx, span := w.tracer.Start(ctx, "setRecorderUpdate", trace.WithAttributes( attribute.String("stationName", u.StationConfig.Name), attribute.String("stationType", util.DerefStr(u.StationType)), attribute.String("stationEquipment", u.StationConfig.Equipment), )) defer span.End() + return w.recorder.Set(ctx, u) } diff --git a/pkg/weather/recorder/recorders/redis/get.go b/pkg/weather/recorder/recorders/redis/get.go index 2c28f0b..7f3af16 100644 --- a/pkg/weather/recorder/recorders/redis/get.go +++ b/pkg/weather/recorder/recorders/redis/get.go @@ -38,8 +38,9 @@ func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) ( } span.SetAttributes(attribute.Int("limit", limit)) + r.log.Debug().Int("limit", limit).Msg("getting updates from redis") - datas, err := r.redis.LRange(ctx, r.Key(), 0, int64(limit)).Result() + datas, err := r.redis.LRange(ctx, r.Key(), 0, int64(limit)-1).Result() if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -48,6 +49,7 @@ func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) ( } span.AddEvent("redis queried") + r.log.Debug().Int("results", len(datas)).Msg("redis queried") updates, err := jsonDatasToUpdates(datas) if err != nil { @@ -87,7 +89,7 @@ func jsonDatasToUpdates(datas []string) ([]*weather.WeatherUpdate, error) { err := json.Unmarshal([]byte(data), update) errs = errors.Join(errs, err) - if err != nil { + if err == nil { updates = append(updates, update) } } diff --git a/pkg/weather/recorder/recorders/redis/set.go b/pkg/weather/recorder/recorders/redis/set.go index 21ab231..d94bbb7 100644 --- a/pkg/weather/recorder/recorders/redis/set.go +++ b/pkg/weather/recorder/recorders/redis/set.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" + "github.com/redis/go-redis/v9" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -20,7 +21,7 @@ func (r *RedisRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error defer span.End() r.Lock() - defer r.RUnlock() + defer r.Unlock() // First ensure we can prepare our payload data, err := json.Marshal(u) @@ -42,11 +43,14 @@ func (r *RedisRecorder) set(ctx context.Context, data []byte) error { defer span.End() // Atomic, push and trim - tx := r.redis.TxPipeline() - tx.LPush(ctx, r.Key(), data) - tx.LTrim(ctx, r.Key(), 0, int64(r.keep)-1) - - if rErr, err := tx.Exec(ctx); err != nil { + var count *redis.IntCmd + rErr, err := r.redis.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.LPush(ctx, r.Key(), data) + pipe.LTrim(ctx, r.Key(), 0, int64(r.keep)-1) + count = pipe.LLen(ctx, r.Key()) + return nil + }) + if err != nil { for _, cmd := range rErr { span.RecordError(cmd.Err()) } @@ -55,7 +59,16 @@ func (r *RedisRecorder) set(ctx context.Context, data []byte) error { return err } - span.SetAttributes(attribute.Int("updateCount", r.count(ctx))) + // Get new update count + + r.log.Debug(). + Int("updateBytes", len(data)). + Int64("updateCount", count.Val()). + Str("redis", r.redis.String()). + Str("key", r.Key()). + Msg("pushed update to redis") + + span.SetAttributes(attribute.Int64("updateCount", count.Val())) span.SetStatus(codes.Ok, "") return nil }