Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
59f5bfbba1 | |||
0adab13221 | |||
3b0748da25 | |||
2d683e61b0 | |||
82bc3acfc3 |
9
TODO.md
9
TODO.md
@ -1,10 +1,15 @@
|
||||
# Issues
|
||||
- [x] Redis recorder panic
|
||||
|
||||
# TODO
|
||||
- [ ] Add json schema for config
|
||||
- [ ] Finish implementing weather GRPC
|
||||
- [ ] Stop Marshaling sensors / batteries with no data
|
||||
- [ ] Add json schema to CI and README
|
||||
- [ ] Update README
|
||||
- [ ] Add Grafana dashboard
|
||||
|
||||
## Done
|
||||
- [x] Finish implementing weather GRPC
|
||||
- [x] Add json schema for config
|
||||
- [x] Add new spans
|
||||
- [x] Helm Chart
|
||||
- [x] Add proxy to upstream support
|
||||
|
89
contrib/wu_sample.json
Normal file
89
contrib/wu_sample.json
Normal file
@ -0,0 +1,89 @@
|
||||
{
|
||||
"PASSKEY": [
|
||||
"DE:AD:BE:EF:BE:EF"
|
||||
],
|
||||
"baromabsin": [
|
||||
"29.025"
|
||||
],
|
||||
"baromrelin": [
|
||||
"30.005"
|
||||
],
|
||||
"batt1": [
|
||||
"1"
|
||||
],
|
||||
"battin": [
|
||||
"1"
|
||||
],
|
||||
"battout": [
|
||||
"1"
|
||||
],
|
||||
"battrain": [
|
||||
"1"
|
||||
],
|
||||
"dailyrainin": [
|
||||
"0.000"
|
||||
],
|
||||
"dateutc": [
|
||||
"2025-03-23 14:07:59"
|
||||
],
|
||||
"eventrainin": [
|
||||
"0.000"
|
||||
],
|
||||
"hourlyrainin": [
|
||||
"0.000"
|
||||
],
|
||||
"humidity": [
|
||||
"55"
|
||||
],
|
||||
"humidity1": [
|
||||
"53"
|
||||
],
|
||||
"humidityin": [
|
||||
"36"
|
||||
],
|
||||
"maxdailygust": [
|
||||
"20.36"
|
||||
],
|
||||
"monthlyrainin": [
|
||||
"1.969"
|
||||
],
|
||||
"solarradiation": [
|
||||
"142.15"
|
||||
],
|
||||
"stationtype": [
|
||||
"WeatherHub_V1.0.2"
|
||||
],
|
||||
"temp1f": [
|
||||
"-2.74"
|
||||
],
|
||||
"tempf": [
|
||||
"33.98"
|
||||
],
|
||||
"tempinf": [
|
||||
"70.52"
|
||||
],
|
||||
"totalrainin": [
|
||||
"2.421"
|
||||
],
|
||||
"uv": [
|
||||
"1"
|
||||
],
|
||||
"weeklyrainin": [
|
||||
"0.000"
|
||||
],
|
||||
"winddir": [
|
||||
"117"
|
||||
],
|
||||
"winddir_avg10m": [
|
||||
"127"
|
||||
],
|
||||
"windgustmph": [
|
||||
"17.22"
|
||||
],
|
||||
"windspeedmph": [
|
||||
"10.29"
|
||||
],
|
||||
"yearlyrainin": [
|
||||
"2.421"
|
||||
]
|
||||
}
|
9
contrib/wu_test_sample.sh
Executable file
9
contrib/wu_test_sample.sh
Executable file
@ -0,0 +1,9 @@
|
||||
#!env sh
|
||||
SCRIPT_DIR=$( dirname $0 )
|
||||
JSON_FILE="${SCRIPT_DIR}/wu_sample.json"
|
||||
BASE_URL="http://127.0.0.1:8080/data/report"
|
||||
|
||||
# Convert JSON to query parameters
|
||||
QUERY_STRING=$(jq -r 'to_entries | map("\(.key)=\(.value[0] | @uri)") | join("&")' "$JSON_FILE")
|
||||
|
||||
curl -G --data "$QUERY_STRING" "$BASE_URL"
|
@ -15,13 +15,13 @@ type: application
|
||||
# This is the chart version. This version number should be incremented each time you make changes
|
||||
# to the chart and its templates, including the app version.
|
||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
||||
version: 0.1.2
|
||||
version: 0.1.4
|
||||
|
||||
# This is the version number of the application being deployed. This version number should be
|
||||
# incremented each time you make changes to the application. Versions are not expected to
|
||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
||||
# It is recommended to use it with quotes.
|
||||
appVersion: "v0.10.2"
|
||||
appVersion: "v0.11.2"
|
||||
|
||||
dependencies:
|
||||
- name: hull
|
||||
|
@ -1,6 +1,8 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"slices"
|
||||
|
||||
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
|
||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
|
||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
||||
@ -47,24 +49,28 @@ func UpdateToPbUpdate(u *weather.WeatherUpdate) *pb.WeatherUpdate {
|
||||
}
|
||||
|
||||
func batteriesToPbBatteries(batteries []weather.BatteryStatus) []*pb.BatteryStatus {
|
||||
pbBatteries := make([]*pb.BatteryStatus, len(batteries))
|
||||
for i, b := range batteries {
|
||||
pbBatteries[i] = &pb.BatteryStatus{
|
||||
Component: b.Component,
|
||||
Status: util.Int32ptr(b.Status),
|
||||
pbBatteries := make([]*pb.BatteryStatus, 0, len(batteries))
|
||||
for _, b := range batteries {
|
||||
if b.Status != nil {
|
||||
pbBatteries = append(pbBatteries, &pb.BatteryStatus{
|
||||
Component: b.Component,
|
||||
Status: util.Int32ptr(b.Status),
|
||||
})
|
||||
}
|
||||
}
|
||||
return pbBatteries
|
||||
return slices.Clip(pbBatteries)
|
||||
}
|
||||
|
||||
func thSensorsToPbSensors(sensors []*weather.TempHumiditySensor) []*pb.TempHumiditySensor {
|
||||
pbSensors := make([]*pb.TempHumiditySensor, len(sensors))
|
||||
for i, s := range sensors {
|
||||
pbSensors[i] = &pb.TempHumiditySensor{
|
||||
Name: s.Name,
|
||||
TempF: s.TempF,
|
||||
Humidity: util.Int32ptr(s.Humidity),
|
||||
pbSensors := make([]*pb.TempHumiditySensor, 0, len(sensors))
|
||||
for _, s := range sensors {
|
||||
if s.TempF != nil || s.Humidity != nil {
|
||||
pbSensors = append(pbSensors, &pb.TempHumiditySensor{
|
||||
Name: s.Name,
|
||||
TempF: s.TempF,
|
||||
Humidity: util.Int32ptr(s.Humidity),
|
||||
})
|
||||
}
|
||||
}
|
||||
return pbSensors
|
||||
return slices.Clip(pbSensors)
|
||||
}
|
||||
|
@ -6,16 +6,28 @@ import (
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config"
|
||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/util"
|
||||
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
|
||||
)
|
||||
|
||||
func (w *WeatherRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error {
|
||||
if u.StationConfig == nil {
|
||||
u.StationConfig = &config.WeatherStation{
|
||||
Name: "unregistered",
|
||||
Equipment: "unknown",
|
||||
ProxyToAWN: false,
|
||||
ProxyToWunderground: false,
|
||||
KeepMetrics: []string{},
|
||||
}
|
||||
}
|
||||
|
||||
ctx, span := w.tracer.Start(ctx, "setRecorderUpdate", trace.WithAttributes(
|
||||
attribute.String("stationName", u.StationConfig.Name),
|
||||
attribute.String("stationType", util.DerefStr(u.StationType)),
|
||||
attribute.String("stationEquipment", u.StationConfig.Equipment),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
return w.recorder.Set(ctx, u)
|
||||
}
|
||||
|
@ -38,8 +38,9 @@ func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) (
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.Int("limit", limit))
|
||||
r.log.Debug().Int("limit", limit).Msg("getting updates from redis")
|
||||
|
||||
datas, err := r.redis.LRange(ctx, r.Key(), 0, int64(limit)).Result()
|
||||
datas, err := r.redis.LRange(ctx, r.Key(), 0, int64(limit)-1).Result()
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
@ -48,6 +49,7 @@ func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) (
|
||||
}
|
||||
|
||||
span.AddEvent("redis queried")
|
||||
r.log.Debug().Int("results", len(datas)).Msg("redis queried")
|
||||
|
||||
updates, err := jsonDatasToUpdates(datas)
|
||||
if err != nil {
|
||||
@ -75,7 +77,7 @@ func (r *RedisRecorder) get(ctx context.Context, req *pb.GetWeatherRequest) (
|
||||
Int("updatesFiltered", len(updates)-len(filtered)).
|
||||
Msg("updates retrieved from redis")
|
||||
|
||||
return updates, err
|
||||
return filtered, err
|
||||
}
|
||||
|
||||
func jsonDatasToUpdates(datas []string) ([]*weather.WeatherUpdate, error) {
|
||||
@ -87,7 +89,7 @@ func jsonDatasToUpdates(datas []string) ([]*weather.WeatherUpdate, error) {
|
||||
err := json.Unmarshal([]byte(data), update)
|
||||
errs = errors.Join(errs, err)
|
||||
|
||||
if err != nil {
|
||||
if err == nil {
|
||||
updates = append(updates, update)
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
@ -20,7 +21,7 @@ func (r *RedisRecorder) Set(ctx context.Context, u *weather.WeatherUpdate) error
|
||||
defer span.End()
|
||||
|
||||
r.Lock()
|
||||
defer r.RUnlock()
|
||||
defer r.Unlock()
|
||||
|
||||
// First ensure we can prepare our payload
|
||||
data, err := json.Marshal(u)
|
||||
@ -42,11 +43,14 @@ func (r *RedisRecorder) set(ctx context.Context, data []byte) error {
|
||||
defer span.End()
|
||||
|
||||
// Atomic, push and trim
|
||||
tx := r.redis.TxPipeline()
|
||||
tx.LPush(ctx, r.Key(), data)
|
||||
tx.LTrim(ctx, r.Key(), 0, int64(r.keep)-1)
|
||||
|
||||
if rErr, err := tx.Exec(ctx); err != nil {
|
||||
var count *redis.IntCmd
|
||||
rErr, err := r.redis.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
|
||||
pipe.LPush(ctx, r.Key(), data)
|
||||
pipe.LTrim(ctx, r.Key(), 0, int64(r.keep)-1)
|
||||
count = pipe.LLen(ctx, r.Key())
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
for _, cmd := range rErr {
|
||||
span.RecordError(cmd.Err())
|
||||
}
|
||||
@ -55,7 +59,16 @@ func (r *RedisRecorder) set(ctx context.Context, data []byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.Int("updateCount", r.count(ctx)))
|
||||
// Get new update count
|
||||
|
||||
r.log.Debug().
|
||||
Int("updateBytes", len(data)).
|
||||
Int64("updateCount", count.Val()).
|
||||
Str("redis", r.redis.String()).
|
||||
Str("key", r.Key()).
|
||||
Msg("pushed update to redis")
|
||||
|
||||
span.SetAttributes(attribute.Int64("updateCount", count.Val()))
|
||||
span.SetStatus(codes.Ok, "")
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user