ambient-local-exporter/pkg/ambient/ambient.go

177 lines
5.1 KiB
Go

// This provides a shim between HTTP GET requests sent
// by ambient devices, and the providers that may be
// configured (awn, wunderground)
package ambient
import (
"context"
"fmt"
"net/http"
"sync"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/ambient/config"
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/provider"
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/provider/awn"
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/provider/wunderground"
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/weather"
)
type AmbientWeather struct {
// These providers implement support for the update sent
// when either "AmbientWeather" or "Wunderground" are selected
// in the "Custom" section of the AWNet app, or the web UI
// of an Ambient WeatherHub
config *config.AmbientLocalExporterConfig
awnProvider provider.AmbientProvider
wuProvider provider.AmbientProvider
appCtx context.Context
metrics *weather.WeatherMetrics
l *zerolog.Logger
}
func New(appCtx context.Context, awConfig *config.AmbientLocalExporterConfig) *AmbientWeather {
return &AmbientWeather{
config: awConfig,
appCtx: appCtx,
}
}
// Initialize with defaults, set logger from context
func (aw *AmbientWeather) Init() *AmbientWeather {
aw.awnProvider = &awn.AWNProvider{}
aw.wuProvider = &wunderground.WUProvider{}
aw.l = zerolog.Ctx(aw.appCtx)
aw.l.Trace().Any("awConfig", aw.config).Send()
return aw
}
func (aw *AmbientWeather) GetAWNHandlerFunc(appCtx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
aw.handleProviderRequest(aw.awnProvider, w, r)
}
}
func (aw *AmbientWeather) GetWundergroundHandlerFunc(appCtx context.Context) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
aw.handleProviderRequest(aw.wuProvider, w, r)
}
}
// Takes an HTTP requests and converts it to a
// stable type. Enrich is called on the type to complete
// any missing fields as the two providers supported by Ambient
// devices (awn/wunderground) produce different fields
func (aw *AmbientWeather) handleProviderRequest(
p provider.AmbientProvider,
w http.ResponseWriter,
r *http.Request,
) {
l := zerolog.Ctx(aw.appCtx)
tracer := otel.GetTracer(aw.appCtx, p.Name()+".http.handler")
ctx, span := tracer.Start(r.Context(), p.Name()+".update")
span.SetAttributes(attribute.String("provider", p.Name()))
defer span.End()
l.Trace().Str("p", p.Name()).
Any("query", r.URL.Query()).Send()
// Convert to WeatherUpdate
update, err := p.ReqToWeather(ctx, r)
if err != nil {
l.Err(err).Send()
span.RecordError(err)
span.SetStatus(codes.Error,
fmt.Sprintf("failed to handle %s update: %s",
p.Name(), err.Error()))
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
// Calculate any fields that may be missing
// such as dew point and wind chill
update.Enrich()
// Prepare metrics if this is the first update
if aw.metrics == nil {
aw.InitMetrics()
}
// Enrich station if configured
aw.enrichStation(update)
// Update metrics
aw.metrics.Update(update)
l.Debug().
Str("provider", p.Name()).
Any("update", update).
Msg("successfully handled update")
w.Write([]byte("ok"))
// Proxy update to one or both services if configured to do so
// Uses a weather update to allow awn to publish to wunderground and
// visa versa.
if station := update.StationConfig; station != nil {
// Perform proxy updates in parallel if enabled
var proxyWg sync.WaitGroup
if station.ProxyToAWN {
proxyWg.Add(1)
go func() {
defer proxyWg.Done()
err := aw.awnProvider.ProxyReq(ctx, update)
if err != nil {
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
return
}
zerolog.Ctx(aw.appCtx).Debug().
Str("station", station.Name).
Msg("proxied weather update to awn")
}()
}
if station.ProxyToWunderground {
proxyWg.Add(1)
go func() {
defer proxyWg.Done()
err := aw.wuProvider.ProxyReq(ctx, update)
if err != nil {
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
return
}
zerolog.Ctx(aw.appCtx).Debug().
Str("station", station.Name).
Msg("proxied weather update to wunderground")
}()
}
proxyWg.Wait()
}
}
func (aw *AmbientWeather) InitMetrics() {
if aw.config.MetricPrefix != "" {
weather.MetricPrefix = aw.config.MetricPrefix
}
aw.metrics = weather.MustInitMetrics(aw.appCtx)
}
func (aw *AmbientWeather) enrichStation(update *weather.WeatherUpdate) {
if update != nil && update.StationID != nil && *update.StationID != "" {
for _, station := range aw.config.WeatherStations {
if *update.StationID == station.AWNPassKey || *update.StationID == station.WundergroundID {
update.StationConfig = &station
}
}
}
}