support sensor name mapping
All checks were successful
Build and Publish / release (push) Successful in 4m6s
All checks were successful
Build and Publish / release (push) Successful in 4m6s
This commit is contained in:
@ -13,6 +13,7 @@ import (
|
||||
"github.com/rs/zerolog"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/ambient/config"
|
||||
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/provider"
|
||||
@ -67,6 +68,11 @@ func (aw *AmbientWeather) GetWundergroundHandlerFunc(appCtx context.Context) fun
|
||||
// stable type. Enrich is called on the type to complete
|
||||
// any missing fields as the two providers supported by Ambient
|
||||
// devices (awn/wunderground) produce different fields
|
||||
//
|
||||
// This will call Update on metrics, and will also proxy
|
||||
// requests to AWN/Wunderground if enabled
|
||||
// This is the main work performed when a weather station or
|
||||
// weather hub sends an update
|
||||
func (aw *AmbientWeather) handleProviderRequest(
|
||||
p provider.AmbientProvider,
|
||||
w http.ResponseWriter,
|
||||
@ -96,20 +102,17 @@ func (aw *AmbientWeather) handleProviderRequest(
|
||||
return
|
||||
}
|
||||
|
||||
// Calculate any fields that may be missing
|
||||
// such as dew point and wind chill
|
||||
update.Enrich()
|
||||
// Perform enrichment
|
||||
aw.enrichUpdate(ctx, p, update)
|
||||
|
||||
// Prepare metrics if this is the first update
|
||||
// Update metrics
|
||||
ctx, updateSpan := tracer.Start(ctx, p.Name()+".update.metrics")
|
||||
if aw.metrics == nil {
|
||||
aw.InitMetrics()
|
||||
}
|
||||
|
||||
// Enrich station if configured
|
||||
aw.enrichStation(update)
|
||||
|
||||
// Update metrics
|
||||
aw.metrics.Update(update)
|
||||
updateSpan.SetStatus(codes.Ok, "")
|
||||
updateSpan.End()
|
||||
|
||||
l.Debug().
|
||||
Str("provider", p.Name()).
|
||||
@ -120,44 +123,90 @@ func (aw *AmbientWeather) handleProviderRequest(
|
||||
// Proxy update to one or both services if configured to do so
|
||||
// Uses a weather update to allow awn to publish to wunderground and
|
||||
// visa versa.
|
||||
if station := update.StationConfig; station != nil {
|
||||
// Perform proxy updates in parallel if enabled
|
||||
var proxyWg sync.WaitGroup
|
||||
|
||||
if station.ProxyToAWN {
|
||||
proxyWg.Add(1)
|
||||
go func() {
|
||||
defer proxyWg.Done()
|
||||
err := aw.awnProvider.ProxyReq(ctx, update)
|
||||
if err != nil {
|
||||
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
|
||||
return
|
||||
}
|
||||
zerolog.Ctx(aw.appCtx).Debug().
|
||||
Str("station", station.Name).
|
||||
Msg("proxied weather update to awn")
|
||||
}()
|
||||
}
|
||||
|
||||
if station.ProxyToWunderground {
|
||||
proxyWg.Add(1)
|
||||
go func() {
|
||||
defer proxyWg.Done()
|
||||
err := aw.wuProvider.ProxyReq(ctx, update)
|
||||
if err != nil {
|
||||
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
|
||||
return
|
||||
}
|
||||
zerolog.Ctx(aw.appCtx).Debug().
|
||||
Str("station", station.Name).
|
||||
Msg("proxied weather update to wunderground")
|
||||
}()
|
||||
}
|
||||
|
||||
proxyWg.Wait()
|
||||
if update.StationConfig != nil {
|
||||
aw.proxyUpdate(ctx, p, update)
|
||||
}
|
||||
}
|
||||
|
||||
func (aw *AmbientWeather) enrichUpdate(
|
||||
ctx context.Context,
|
||||
p provider.AmbientProvider,
|
||||
update *weather.WeatherUpdate,
|
||||
) {
|
||||
tracer := otel.GetTracer(aw.appCtx, p.Name()+".http.handler")
|
||||
|
||||
// Calculate any fields that may be missing
|
||||
// such as dew point and wind chill
|
||||
ctx, enrichSpan := tracer.Start(ctx, p.Name()+".update.enrich")
|
||||
update.Enrich()
|
||||
|
||||
// Enrich station if configured
|
||||
aw.enrichStation(update)
|
||||
|
||||
// Map sensor names
|
||||
update.MapSensors()
|
||||
|
||||
enrichSpan.SetStatus(codes.Ok, "")
|
||||
enrichSpan.End()
|
||||
}
|
||||
|
||||
func (aw *AmbientWeather) proxyUpdate(
|
||||
ctx context.Context,
|
||||
p provider.AmbientProvider,
|
||||
update *weather.WeatherUpdate,
|
||||
) {
|
||||
var proxyWg sync.WaitGroup
|
||||
|
||||
tracer := otel.GetTracer(aw.appCtx, p.Name()+".http.handler")
|
||||
station := update.StationConfig
|
||||
|
||||
ctx, proxySpan := tracer.Start(ctx, p.Name()+".update.proxy", trace.WithAttributes(
|
||||
attribute.Bool("proxyToWunderground", station.ProxyToWunderground),
|
||||
attribute.Bool("proxyToAWN", station.ProxyToAWN),
|
||||
))
|
||||
defer proxySpan.End()
|
||||
|
||||
// Perform proxy updates in parallel if enabled
|
||||
|
||||
if station.ProxyToAWN {
|
||||
proxyWg.Add(1)
|
||||
go func() {
|
||||
defer proxyWg.Done()
|
||||
defer proxySpan.AddEvent("proxied to ambient weather network")
|
||||
err := aw.awnProvider.ProxyReq(ctx, update)
|
||||
if err != nil {
|
||||
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
|
||||
proxySpan.RecordError(err)
|
||||
proxySpan.SetStatus(codes.Error, err.Error())
|
||||
return
|
||||
}
|
||||
zerolog.Ctx(aw.appCtx).Debug().
|
||||
Str("station", station.Name).
|
||||
Msg("proxied weather update to awn")
|
||||
}()
|
||||
}
|
||||
|
||||
if station.ProxyToWunderground {
|
||||
proxyWg.Add(1)
|
||||
go func() {
|
||||
defer proxyWg.Done()
|
||||
defer proxySpan.AddEvent("proxied to wunderground")
|
||||
err := aw.wuProvider.ProxyReq(ctx, update)
|
||||
if err != nil {
|
||||
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
|
||||
proxySpan.RecordError(err)
|
||||
proxySpan.SetStatus(codes.Error, err.Error())
|
||||
return
|
||||
}
|
||||
zerolog.Ctx(aw.appCtx).Debug().
|
||||
Str("station", station.Name).
|
||||
Msg("proxied weather update to wunderground")
|
||||
}()
|
||||
}
|
||||
|
||||
proxyWg.Wait()
|
||||
}
|
||||
|
||||
func (aw *AmbientWeather) InitMetrics() {
|
||||
if aw.config.MetricPrefix != "" {
|
||||
weather.MetricPrefix = aw.config.MetricPrefix
|
||||
|
Reference in New Issue
Block a user