177 lines
5.1 KiB
Go
177 lines
5.1 KiB
Go
// Package ambient provides a shim between the HTTP GET requests sent
// by Ambient devices and the providers that may be
// configured (awn, wunderground).
|
|
package ambient
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
|
|
"github.com/rs/zerolog"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
|
|
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/ambient/config"
|
|
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/provider"
|
|
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/provider/awn"
|
|
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/provider/wunderground"
|
|
"gitea.libretechconsulting.com/rmcguire/ambient-weather-local-exporter/pkg/weather"
|
|
)
|
|
|
|
type AmbientWeather struct {
|
|
// These providers implement support for the update sent
|
|
// when either "AmbientWeather" or "Wunderground" are selected
|
|
// in the "Custom" section of the AWNet app, or the web UI
|
|
// of an Ambient WeatherHub
|
|
config *config.AmbientLocalExporterConfig
|
|
awnProvider provider.AmbientProvider
|
|
wuProvider provider.AmbientProvider
|
|
appCtx context.Context
|
|
metrics *weather.WeatherMetrics
|
|
l *zerolog.Logger
|
|
}
|
|
|
|
func New(appCtx context.Context, awConfig *config.AmbientLocalExporterConfig) *AmbientWeather {
|
|
return &AmbientWeather{
|
|
config: awConfig,
|
|
appCtx: appCtx,
|
|
}
|
|
}
|
|
|
|
// Initialize with defaults, set logger from context
|
|
func (aw *AmbientWeather) Init() *AmbientWeather {
|
|
aw.awnProvider = &awn.AWNProvider{}
|
|
aw.wuProvider = &wunderground.WUProvider{}
|
|
aw.l = zerolog.Ctx(aw.appCtx)
|
|
|
|
aw.l.Trace().Any("awConfig", aw.config).Send()
|
|
return aw
|
|
}
|
|
|
|
func (aw *AmbientWeather) GetAWNHandlerFunc(appCtx context.Context) func(http.ResponseWriter, *http.Request) {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
aw.handleProviderRequest(aw.awnProvider, w, r)
|
|
}
|
|
}
|
|
|
|
func (aw *AmbientWeather) GetWundergroundHandlerFunc(appCtx context.Context) func(http.ResponseWriter, *http.Request) {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
aw.handleProviderRequest(aw.wuProvider, w, r)
|
|
}
|
|
}
|
|
|
|
// Takes an HTTP requests and converts it to a
|
|
// stable type. Enrich is called on the type to complete
|
|
// any missing fields as the two providers supported by Ambient
|
|
// devices (awn/wunderground) produce different fields
|
|
func (aw *AmbientWeather) handleProviderRequest(
|
|
p provider.AmbientProvider,
|
|
w http.ResponseWriter,
|
|
r *http.Request,
|
|
) {
|
|
l := zerolog.Ctx(aw.appCtx)
|
|
tracer := otel.GetTracer(aw.appCtx, p.Name()+".http.handler")
|
|
|
|
ctx, span := tracer.Start(r.Context(), p.Name()+".update")
|
|
span.SetAttributes(attribute.String("provider", p.Name()))
|
|
defer span.End()
|
|
|
|
l.Trace().Str("p", p.Name()).
|
|
Any("query", r.URL.Query()).Send()
|
|
|
|
// Convert to WeatherUpdate
|
|
update, err := p.ReqToWeather(ctx, r)
|
|
if err != nil {
|
|
l.Err(err).Send()
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error,
|
|
fmt.Sprintf("failed to handle %s update: %s",
|
|
p.Name(), err.Error()))
|
|
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
|
|
// Calculate any fields that may be missing
|
|
// such as dew point and wind chill
|
|
update.Enrich()
|
|
|
|
// Prepare metrics if this is the first update
|
|
if aw.metrics == nil {
|
|
aw.InitMetrics()
|
|
}
|
|
|
|
// Enrich station if configured
|
|
aw.enrichStation(update)
|
|
|
|
// Update metrics
|
|
aw.metrics.Update(update)
|
|
|
|
l.Debug().
|
|
Str("provider", p.Name()).
|
|
Any("update", update).
|
|
Msg("successfully handled update")
|
|
w.Write([]byte("ok"))
|
|
|
|
// Proxy update to one or both services if configured to do so
|
|
// Uses a weather update to allow awn to publish to wunderground and
|
|
// visa versa.
|
|
if station := update.StationConfig; station != nil {
|
|
// Perform proxy updates in parallel if enabled
|
|
var proxyWg sync.WaitGroup
|
|
|
|
if station.ProxyToAWN {
|
|
proxyWg.Add(1)
|
|
go func() {
|
|
defer proxyWg.Done()
|
|
err := aw.awnProvider.ProxyReq(ctx, update)
|
|
if err != nil {
|
|
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
|
|
return
|
|
}
|
|
zerolog.Ctx(aw.appCtx).Debug().
|
|
Str("station", station.Name).
|
|
Msg("proxied weather update to awn")
|
|
}()
|
|
}
|
|
|
|
if station.ProxyToWunderground {
|
|
proxyWg.Add(1)
|
|
go func() {
|
|
defer proxyWg.Done()
|
|
err := aw.wuProvider.ProxyReq(ctx, update)
|
|
if err != nil {
|
|
zerolog.Ctx(aw.appCtx).Err(err).Msg("failed to proxy to ambient weather")
|
|
return
|
|
}
|
|
zerolog.Ctx(aw.appCtx).Debug().
|
|
Str("station", station.Name).
|
|
Msg("proxied weather update to wunderground")
|
|
}()
|
|
}
|
|
|
|
proxyWg.Wait()
|
|
}
|
|
}
|
|
|
|
func (aw *AmbientWeather) InitMetrics() {
|
|
if aw.config.MetricPrefix != "" {
|
|
weather.MetricPrefix = aw.config.MetricPrefix
|
|
}
|
|
aw.metrics = weather.MustInitMetrics(aw.appCtx)
|
|
}
|
|
|
|
func (aw *AmbientWeather) enrichStation(update *weather.WeatherUpdate) {
|
|
if update != nil && update.StationID != nil && *update.StationID != "" {
|
|
for _, station := range aw.config.WeatherStations {
|
|
if *update.StationID == station.AWNPassKey || *update.StationID == station.WundergroundID {
|
|
update.StationConfig = &station
|
|
}
|
|
}
|
|
}
|
|
}
|