Ryan D McGuire f201ac1fca
All checks were successful
Build and Publish / release (push) Has been skipped
Build and Publish / check-chart (push) Successful in 10s
Build and Publish / helm-release (push) Has been skipped
support runtime.ServeMux opts for grpc-gateway
2025-03-25 17:05:59 -04:00

339 lines
9.5 KiB
Go

// This provides a shim between HTTP GET requests sent
// by ambient devices, and the providers that may be
// configured (awn, wunderground)
package ambient
import (
"context"
"fmt"
"net/http"
"sync"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/ambient/config"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/provider"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/provider/awn"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/provider/wunderground"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/memory"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/noop"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/recorder/recorders/redis"
)
const defUpdatesToKeep = 120
type AmbientWeather struct {
// These providers implement support for the update sent
// when either "AmbientWeather" or "Wunderground" are selected
// in the "Custom" section of the AWNet app, or the web UI
// of an Ambient WeatherHub
Config *config.AmbientLocalExporterConfig
awnProvider provider.AmbientProvider
wuProvider provider.AmbientProvider
weatherRecorder *recorder.WeatherRecorder
appCtx context.Context
metrics *weather.WeatherMetrics
l *zerolog.Logger
*sync.RWMutex
}
func New(appCtx context.Context, awConfig *config.AmbientLocalExporterConfig) *AmbientWeather {
return &AmbientWeather{
Config: awConfig,
appCtx: appCtx,
RWMutex: &sync.RWMutex{},
}
}
// Initialize with defaults, set logger from context
func (aw *AmbientWeather) Init() *AmbientWeather {
tracer := otel.GetTracer(aw.appCtx, "ambientWeather")
_, span := tracer.Start(aw.appCtx, "ambientWeather.init",
trace.WithAttributes(
attribute.String("name", aw.Config.Name),
attribute.Bool("grpcEnabled", aw.Config.GRPCEnabled()),
attribute.Bool("httpEnabled", aw.Config.HTTPEnabled()),
attribute.Int("weatherStations", len(aw.Config.WeatherStations)),
))
defer span.End()
aw.awnProvider = &awn.AWNProvider{}
aw.wuProvider = &wunderground.WUProvider{}
aw.l = zerolog.Ctx(aw.appCtx)
updatesToKeep := defUpdatesToKeep
if aw.Config.UpdatesToKeep != nil && *aw.Config.UpdatesToKeep > 0 {
updatesToKeep = *aw.Config.UpdatesToKeep
}
span.SetAttributes(attribute.Int("updatesToKeep", updatesToKeep))
// Choose weather recorder for grpc / api requests,
// default is memory recorder
var r recorders.Recorder
if aw.Config == nil || aw.Config.RecorderConfig == nil {
r = &memory.MemoryRecorder{}
} else {
switch aw.Config.RecorderConfig.Type {
case config.TypeMemory:
r = &memory.MemoryRecorder{}
case config.TypeRedis:
r = &redis.RedisRecorder{}
case config.TypeNoop:
r = &noop.NoopRecorder{}
}
}
aw.weatherRecorder = recorder.MustNewWeatherRecorder(&recorder.Opts{
AppConfig: aw.Config,
Ctx: aw.appCtx,
KeepLast: updatesToKeep,
Recorder: r,
})
aw.l.Trace().Any("awConfig", aw.Config).Send()
span.SetStatus(codes.Ok, "")
return aw
}
func (aw *AmbientWeather) GetAWNHandlerFunc(appCtx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
aw.handleProviderRequest(aw.awnProvider, w, r)
}
}
func (aw *AmbientWeather) GetWundergroundHandlerFunc(appCtx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
aw.handleProviderRequest(aw.wuProvider, w, r)
}
}
// Takes an HTTP requests and converts it to a
// stable type. Enrich is called on the type to complete
// any missing fields as the two providers supported by Ambient
// devices (awn/wunderground) produce different fields
//
// This will call Update on metrics, and will also proxy
// requests to AWN/Wunderground if enabled
// This is the main work performed when a weather station or
// weather hub sends an update
func (aw *AmbientWeather) handleProviderRequest(
p provider.AmbientProvider,
w http.ResponseWriter,
r *http.Request,
) {
aw.Lock()
defer aw.Unlock()
l := zerolog.Ctx(aw.appCtx)
tracer := otel.GetTracer(aw.appCtx, p.Name()+".http.handler")
ctx, updateSpan := tracer.Start(r.Context(), p.Name()+".update")
updateSpan.SetAttributes(attribute.String("provider", p.Name()))
defer updateSpan.End()
l.Trace().Str("p", p.Name()).
Any("query", r.URL.Query()).Send()
// Convert to WeatherUpdate
update, err := p.ReqToWeather(ctx, r)
if err != nil {
l.Err(err).Send()
updateSpan.RecordError(err)
updateSpan.SetStatus(codes.Error,
fmt.Sprintf("failed to handle %s update: %s",
p.Name(), err.Error()))
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
// Perform enrichment
aw.enrichUpdate(ctx, p, update)
// We may know which station this was for now
if update.StationConfig != nil {
updateSpan.SetAttributes(attribute.String("stationName", update.StationConfig.Name))
}
// Concurrently record the now enriched update
// to the configured recorder and to otel, and proxy
// to enabled downstream providers
var updateWg sync.WaitGroup
// Record state
updateWg.Add(1)
go func() {
defer updateWg.Done()
aw.weatherRecorder.Set(ctx, update)
}()
// Update metrics
updateWg.Add(1)
go func() {
defer updateWg.Done()
aw.metricsUpdate(ctx, p, update)
}()
// Proxy update to one or both services if configured to do so
// Uses a weather update to allow awn to publish to wunderground and
// visa versa.
if update.StationConfig != nil {
updateWg.Add(1)
go func() {
defer updateWg.Done()
aw.proxyUpdate(ctx, p, update)
}()
}
updateWg.Wait()
l.Debug().
Str("provider", p.Name()).
Any("update", update).
Msg("successfully handled update")
w.Write([]byte("ok"))
}
func (aw *AmbientWeather) enrichUpdate(
ctx context.Context,
p provider.AmbientProvider,
update *weather.WeatherUpdate,
) {
tracer := otel.GetTracer(aw.appCtx, p.Name()+".http.handler")
// Calculate any fields that may be missing
// such as dew point and wind chill
_, enrichSpan := tracer.Start(ctx, p.Name()+".update.enrich")
defer enrichSpan.End()
// Metric enrichment
update.Enrich()
// Enrich station if configured
aw.enrichStation(update)
// Map sensor names
update.MapSensors()
enrichSpan.SetStatus(codes.Ok, "")
}
func (aw *AmbientWeather) metricsUpdate(
ctx context.Context,
p provider.AmbientProvider,
update *weather.WeatherUpdate,
) {
tracer := otel.GetTracer(aw.appCtx, p.Name()+".http.handler")
_, metricsSpan := tracer.Start(ctx, p.Name()+".update.metrics")
if aw.metrics == nil {
aw.InitMetrics()
}
aw.metrics.Update(update)
metricsSpan.SetStatus(codes.Ok, "")
metricsSpan.End()
}
func (aw *AmbientWeather) proxyUpdate(
ctx context.Context,
p provider.AmbientProvider,
update *weather.WeatherUpdate,
) {
var proxyWg sync.WaitGroup
tracer := otel.GetTracer(aw.appCtx, p.Name()+".http.handler")
station := update.StationConfig
ctx, proxySpan := tracer.Start(ctx, p.Name()+".update.proxy", trace.WithAttributes(
attribute.Bool("proxyToWunderground", station.ProxyToWunderground),
attribute.Bool("proxyToAWN", station.ProxyToAWN),
))
defer proxySpan.End()
// Perform proxy updates in parallel if enabled
if station.ProxyToAWN {
proxyWg.Add(1)
go func() {
defer proxyWg.Done()
defer proxySpan.AddEvent("proxied to ambient weather network")
err := aw.awnProvider.ProxyReq(ctx, update)
if err != nil {
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
proxySpan.RecordError(err)
proxySpan.SetStatus(codes.Error, err.Error())
return
}
zerolog.Ctx(aw.appCtx).Debug().
Str("station", station.Name).
Msg("proxied weather update to awn")
}()
}
if station.ProxyToWunderground {
proxyWg.Add(1)
go func() {
defer proxyWg.Done()
defer proxySpan.AddEvent("proxied to wunderground")
err := aw.wuProvider.ProxyReq(ctx, update)
if err != nil {
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
proxySpan.RecordError(err)
proxySpan.SetStatus(codes.Error, err.Error())
return
}
zerolog.Ctx(aw.appCtx).Debug().
Str("station", station.Name).
Msg("proxied weather update to wunderground")
}()
}
proxyWg.Wait()
}
func (aw *AmbientWeather) InitMetrics() {
if aw.Config.MetricPrefix != "" {
weather.MetricPrefix = aw.Config.MetricPrefix
}
aw.metrics = weather.MustInitMetrics(aw.appCtx)
}
func (aw *AmbientWeather) enrichStation(update *weather.WeatherUpdate) {
if update != nil && update.StationID != nil && *update.StationID != "" {
for _, station := range aw.Config.WeatherStations {
if *update.StationID == station.AWNPassKey || *update.StationID == station.WundergroundID {
update.StationConfig = &station
}
}
}
}
func (aw *AmbientWeather) GetLogger() *zerolog.Logger {
aw.RLock()
defer aw.RUnlock()
return aw.l
}
func (aw *AmbientWeather) GetCtx() context.Context {
aw.RLock()
defer aw.RUnlock()
return aw.appCtx
}
func (aw *AmbientWeather) GetRecorder() *recorder.WeatherRecorder {
aw.RLock()
defer aw.RUnlock()
return aw.weatherRecorder
}