Files
wingbits/pkg/client/stream.go
T

86 lines
3.0 KiB
Go

package client
import (
"context"
"time"
"gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/readsb"
"gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/wingbits"
)
// Update carries one streamed sample of T, or the error from the fetch that
// produced it. Exactly one of Value and Err is meaningful per send.
type Update[T any] struct {
Value *T
Err error
}
// StreamAircraft polls aircraft.json every interval and sends each decoded,
// filtered report on the returned channel until ctx is cancelled. The first
// sample is sent immediately. The channel is closed when ctx ends.
func (c *Client) StreamAircraft(ctx context.Context, interval time.Duration, filters ...readsb.AircraftFilter) <-chan Update[readsb.AircraftReport] {
return stream(ctx, c.streamBuf, interval, func(ctx context.Context) (*readsb.AircraftReport, error) {
return c.Aircraft(ctx, filters...)
})
}
// StreamStats polls stats.json every interval.
func (c *Client) StreamStats(ctx context.Context, interval time.Duration) <-chan Update[readsb.Stats] {
return stream(ctx, c.streamBuf, interval, c.Stats)
}
// StreamReceiver polls receiver.json every interval.
func (c *Client) StreamReceiver(ctx context.Context, interval time.Duration) <-chan Update[readsb.Receiver] {
return stream(ctx, c.streamBuf, interval, c.Receiver)
}
// StreamOutline polls outline.json every interval.
func (c *Client) StreamOutline(ctx context.Context, interval time.Duration) <-chan Update[readsb.Outline] {
return stream(ctx, c.streamBuf, interval, c.Outline)
}
// StreamDiagnostics polls /network/diagnostics every interval.
func (c *Client) StreamDiagnostics(ctx context.Context, interval time.Duration) <-chan Update[wingbits.Diagnostics] {
return stream(ctx, c.streamBuf, interval, c.Diagnostics)
}
// StreamMetrics polls /metrics every interval.
func (c *Client) StreamMetrics(ctx context.Context, interval time.Duration) <-chan Update[wingbits.Metrics] {
return stream(ctx, c.streamBuf, interval, c.Metrics)
}
// StreamStatus polls the Tailscale status every interval.
func (c *Client) StreamStatus(ctx context.Context, interval time.Duration) <-chan Update[wingbits.TailscaleStatus] {
return stream(ctx, c.streamBuf, interval, c.Status)
}
// stream drives a generic poll loop: it calls fetch immediately, then once per
// interval tick, forwarding each result as an Update on a buffered channel.
func stream[T any](ctx context.Context, buf int, interval time.Duration, fetch func(context.Context) (*T, error)) <-chan Update[T] {
ch := make(chan Update[T], buf)
go func() {
defer close(ch)
tick := time.NewTicker(interval)
defer tick.Stop()
for {
send(ctx, ch, fetch)
select {
case <-ctx.Done():
return
case <-tick.C:
}
}
}()
return ch
}
// send runs one fetch and delivers the result, respecting cancellation so a
// full channel cannot wedge the loop past ctx's lifetime.
func send[T any](ctx context.Context, ch chan<- Update[T], fetch func(context.Context) (*T, error)) {
v, err := fetch(ctx)
select {
case ch <- Update[T]{Value: v, Err: err}:
case <-ctx.Done():
}
}