implement grpc weather

This commit is contained in:
2025-03-14 23:10:01 -04:00
parent 421c246cc6
commit 69fd27916b
7 changed files with 447 additions and 169 deletions

View File

@ -0,0 +1,49 @@
package grpc
import (
"k8s.io/utils/ptr"
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
)
func UpdatesToPbUpdates(u []*weather.WeatherUpdate) []*pb.WeatherUpdate {
updates := make([]*pb.WeatherUpdate, len(u))
for i, update := range u {
updates[i] = UpdateToPbUpdate(update)
}
return updates
}
// TODO: Finish Implementing
func UpdateToPbUpdate(u *weather.WeatherUpdate) *pb.WeatherUpdate {
return &pb.WeatherUpdate{
StationName: u.StationConfig.Name,
StationType: *u.StationType,
StationId: *u.StationID,
TempOutdoorF: u.TempOutdoorF,
TempIndoorF: u.TempIndoorF,
HumidityOutdoor: ptr.To(int32(*u.HumidityOudoor)),
HumidityIndoor: ptr.To(int32(*u.HumidityIndoor)),
WindSpeedMph: new(float64),
WindGustMph: new(float64),
MaxDailyGust: new(float64),
WindDir: new(int32),
WindDirAvg_10M: new(int32),
Uv: new(int32),
SolarRadiation: new(float64),
HourlyRainIn: new(float64),
EventRainIn: new(float64),
DailyRainIn: new(float64),
WeeklyRainIn: new(float64),
MonthlyRainIn: new(float64),
YearlyRainIn: new(float64),
TotalRainIn: new(float64),
Batteries: []*pb.BatteryStatus{},
BaromRelativeIn: new(float64),
BaromAbsoluteIn: new(float64),
DewPointF: new(float64),
WindChillF: new(float64),
TempHumiditySensors: []*pb.TempHumiditySensor{},
}
}

View File

@ -3,11 +3,65 @@ package grpc
import (
"context"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
pb "gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/api/v1alpha1/weather"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather/state"
)
// TODO: Implement
type GRPCWeather struct {
ctx context.Context
ctx context.Context
state *state.WeatherState
tracer trace.Tracer
*pb.UnimplementedAmbientLocalWeatherServiceServer
}
func NewGRPCWeather(ctx context.Context, state *state.WeatherState) *GRPCWeather {
return &GRPCWeather{
ctx: ctx,
state: state,
tracer: otel.GetTracer(ctx, "grpcWeather"),
}
}
func (w *GRPCWeather) GetWeather(ctx context.Context, req *pb.GetWeatherRequest) (
*pb.GetWeatherResponse, error,
) {
ctx, span := w.tracer.Start(ctx, "getWeather")
defer span.End()
limit := 1
if req.Limit != nil {
if *req.Limit > 1 {
limit = int(*req.Limit)
}
}
span.SetAttributes(attribute.Int("limit", limit))
updates, err := w.state.Get(ctx, limit)
if err != nil {
span.RecordError(err)
span.SetStatus(otelcodes.Error, err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
} else if len(updates) < 1 {
return nil, status.Errorf(codes.OutOfRange, "no weather available")
}
span.SetStatus(otelcodes.Ok, "")
return &pb.GetWeatherResponse{
LastUpdated: &timestamppb.Timestamp{
Seconds: updates[0].DateUTC.Unix(),
},
WeatherUpdates: UpdatesToPbUpdates(updates),
}, nil
}

View File

@ -0,0 +1,41 @@
package state
import (
"context"
"sync"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
)
type WeatherState struct {
updates []*weather.WeatherUpdate
keep int
ctx context.Context
tracer trace.Tracer
meter metric.Meter
*sync.RWMutex
}
type Opts struct {
Ctx context.Context
KeepLast int
}
func NewWeatherState(opts *Opts) *WeatherState {
if opts.KeepLast < 1 {
opts.KeepLast = 1
}
return &WeatherState{
updates: make([]*weather.WeatherUpdate, 0),
keep: opts.KeepLast,
ctx: opts.Ctx,
tracer: otel.GetTracer(opts.Ctx, "weatherState"),
meter: otel.GetMeter(opts.Ctx, "weatherState"),
}
}

View File

@ -0,0 +1,87 @@
package state
import (
"context"
"errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
)
// Returns last requested number of weather updates
func (w *WeatherState) Get(ctx context.Context, last int) (
[]*weather.WeatherUpdate, error,
) {
if last < 1 {
last = 1
}
ctx, span := w.tracer.Start(ctx, "getWeatherState")
span.SetAttributes(
attribute.Int("last", last),
attribute.Int("keep", w.keep),
attribute.Int("currentSize", w.Count()),
)
defer span.End()
updates, err := w.get(ctx, last)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
} else {
span.SetStatus(codes.Ok, "")
}
return updates, err
}
func (w *WeatherState) get(ctx context.Context, last int) (
[]*weather.WeatherUpdate, error,
) {
span := trace.SpanFromContext(ctx)
w.RLock()
defer w.Unlock()
span.AddEvent("acquired lock on state cache")
updates := w.updates
if w.count() == 0 {
err := errors.New("no state to get")
span.RecordError(err)
return nil, err
} else if w.count() <= last {
span.RecordError(errors.New("requested more state than exists"))
} else {
updates = updates[len(updates)-last:]
}
span.SetAttributes(attribute.Int("retrieved", len(updates)))
span.SetStatus(codes.Ok, "")
return updates, nil
}
// Returns count of retained weather updates
func (w *WeatherState) Count() int {
_, span := w.tracer.Start(w.ctx, "countWeatherState")
defer span.End()
count := w.count()
span.SetAttributes(attribute.Int("count", count))
span.SetStatus(codes.Ok, "")
return count
}
func (w *WeatherState) count() int {
w.RLock()
defer w.RUnlock()
return len(w.updates)
}

View File

@ -0,0 +1,35 @@
package state
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"gitea.libretechconsulting.com/rmcguire/ambient-local-exporter/pkg/weather"
)
func (w *WeatherState) Set(ctx context.Context, u *weather.WeatherUpdate) error {
_, span := w.tracer.Start(ctx, "setWeatherState")
span.SetAttributes(
attribute.Int("countWeatherUpdates", w.Count()),
attribute.Int("keepUpdates", w.keep),
)
defer span.End()
return w.set(span, u)
}
func (w *WeatherState) set(span trace.Span, u *weather.WeatherUpdate) error {
w.Lock()
defer w.Unlock()
if len(w.updates) > w.keep {
w.updates = w.updates[1:]
span.AddEvent("trimmed state updates by 1")
}
w.updates = append(w.updates, u)
span.AddEvent("recorded weather state")
return nil
}