diff --git a/README.md b/README.md index 51c0a35..e644002 100644 --- a/README.md +++ b/README.md @@ -39,8 +39,8 @@ and validated against real hardware. lives under `/readsb/` on `:8088` and under `/data/` on the tar1090 root `:8504`. - **Composable aircraft filters** — position, altitude, range, squawk, emergency, callsign, category, MLAT, signal strength, and more, AND/OR‑composable. -- **Streaming built in.** Any endpoint becomes a `<-chan Update[T]` with a - configurable poll interval and buffer. +- **Streaming built in.** Any endpoint becomes a Go 1.23 iterator + (`iter.Seq2[*T, error]`) you can `range` over at a configurable interval. - **Bring your own everything** — `context.Context` on every call, pluggable `*http.Client`, per‑endpoint TLS, custom ports/schemes/paths. - **Zero non‑stdlib dependencies**, including a small Prometheus text parser for @@ -152,24 +152,47 @@ plus the `Not` and `Any` combinators. ### Streaming -Every endpoint has a `Stream*` variant. It emits immediately, then on each tick, -and closes the channel when the context is cancelled. Each `Update[T]` carries -either a value or the error from that poll. +Every endpoint has a `Poll*` variant that returns a Go 1.23 iterator +(`iter.Seq2[*T, error]`). It yields a fresh result immediately, then once per +interval, until the context is cancelled or you `break`. Each step yields either +a value or the error from that poll. ```go ctx, cancel := context.WithCancel(context.Background()) defer cancel() -for u := range c.StreamAircraft(ctx, 2*time.Second, readsb.WithPosition()) { - if u.Err != nil { - log.Printf("poll failed: %v", u.Err) +for report, err := range c.PollAircraft(ctx, 2*time.Second, readsb.WithPosition()) { + if err != nil { + log.Printf("poll failed: %v", err) continue } - log.Printf("%d aircraft in view", len(u.Value.Aircraft)) + log.Printf("%d aircraft in view", len(report.Aircraft)) } ``` -Set the channel buffer with `client.WithStreamBufferSize(n)`. +Because the iterator runs in your goroutine, breaking the loop stops polling +immediately — there is no background goroutine to leak and no buffer to tune. +Need a channel for fan-out or `select`? Wrap the iterator at the call site. + +The `Poll*` methods are thin wrappers over the exported generic engine, +`client.Poll`, so you never re-implement the interval loop — apply the same +cadence to any custom or composed fetch: + +```go +for v, err := range client.Poll(ctx, time.Second, func(ctx context.Context) (int, error) { + r, err := c.Aircraft(ctx, readsb.InEmergency()) + if err != nil { + return 0, err + } + return len(r.Aircraft), nil +}) { + log.Printf("emergencies: %d (err=%v)", v, err) +} +``` + +Filtering exposes an iterator too: `report.All(filters...)` yields matching +aircraft lazily (handy with `break` or `slices.Collect`), while +`report.Filter(...)` is the eager slice form. ### Station health & feeder telemetry @@ -245,7 +268,6 @@ c, err := client.New( Host: "station.example", Port: 8504, DataPath: "data", }), client.WithHTTPClient(myHTTPClient), // optional; TLS above is ignored if set - client.WithStreamBufferSize(8), client.WithUserAgent("my-app/1.0"), ) ``` diff --git a/cmd/wingbits/main.go b/cmd/wingbits/main.go index 5c685e7..decd32b 100644 --- a/cmd/wingbits/main.go +++ b/cmd/wingbits/main.go @@ -152,32 +152,36 @@ func summarize(v any) string { return "ok" } -// printOnce runs a single query under a timeout and prints the result as JSON. -func printOnce(ctx context.Context, timeout time.Duration, q query) error { +// callOnce runs a single query under a fresh timeout derived from ctx. +func callOnce(ctx context.Context, timeout time.Duration, q query) (any, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - v, err := q(ctx) + return q(ctx) +} + +// printOnce runs a single query and prints the result as JSON. +func printOnce(ctx context.Context, timeout time.Duration, q query) error { + v, err := callOnce(ctx, timeout, q) if err != nil { return err } return printJSON(v) } -// pollLoop prints a fresh result immediately and then once per interval until -// the context is cancelled (Ctrl-C). Per-poll errors are logged, not fatal. +// pollLoop ranges over client.Poll — the same interval engine the typed Poll* +// methods use — printing each result as JSON until the context is cancelled +// (Ctrl-C). Per-poll errors are logged, not fatal. func pollLoop(ctx context.Context, cfg config, q query) error { - tick := time.NewTicker(cfg.interval) - defer tick.Stop() - for { - if err := printOnce(ctx, cfg.timeout, q); err != nil && ctx.Err() == nil { + fetch := func(ctx context.Context) (any, error) { return callOnce(ctx, cfg.timeout, q) } + for v, err := range client.Poll(ctx, cfg.interval, fetch) { + switch { + case err != nil && ctx.Err() == nil: fmt.Fprintln(os.Stderr, "wingbits:", err) - } - select { - case <-ctx.Done(): - return nil - case <-tick.C: + case err == nil: + printJSON(v) } } + return nil } func printJSON(v any) error { diff --git a/pkg/client/client.go b/pkg/client/client.go index 3783010..e43d164 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -88,7 +88,6 @@ type Client struct { wingbits Endpoint tar1090 *Endpoint http *http.Client - streamBuf int userAgent string } @@ -118,16 +117,6 @@ func WithHTTPClient(h *http.Client) Option { return func(c *Client) { c.http = h } } -// WithStreamBufferSize sets the buffer size of channels returned by the Stream* -// methods. The default is 1. -func WithStreamBufferSize(n int) Option { - return func(c *Client) { - if n >= 0 { - c.streamBuf = n - } - } -} - // WithUserAgent overrides the User-Agent header sent with requests. func WithUserAgent(ua string) Option { return func(c *Client) { c.userAgent = ua } @@ -137,7 +126,6 @@ func WithUserAgent(ua string) Option { // Host must be supplied via WithWingbitsEndpoint. func New(opts ...Option) (*Client, error) { c := &Client{ - streamBuf: 1, userAgent: "wingbits-go/1.0", } for _, opt := range opts { diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index aafb1da..453f92a 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -144,21 +144,21 @@ func TestClientWingbitsEndpoints(t *testing.T) { } } -func TestStreamAircraft(t *testing.T) { +func TestPollAircraft(t *testing.T) { c := newTestClient(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := t.Context() - ch := c.StreamAircraft(ctx, time.Hour) - select { - case u := <-ch: - if u.Err != nil || u.Value == nil || len(u.Value.Aircraft) == 0 { - t.Fatalf("first update bad: %v", u.Err) + // A long interval means the iterator only yields its immediate first sample; + // breaking out must stop polling cleanly (no goroutine, no hang). + var got int + for report, err := range c.PollAircraft(ctx, time.Hour, readsb.WithPosition()) { + if err != nil || report == nil || len(report.Aircraft) == 0 { + t.Fatalf("first poll bad: %v", err) } - case <-time.After(5 * time.Second): - t.Fatal("no first update") + got++ + break } - cancel() - for range ch { // channel must drain and close after cancellation + if got != 1 { + t.Fatalf("expected exactly one sample before break, got %d", got) } } diff --git a/pkg/client/poll.go b/pkg/client/poll.go new file mode 100644 index 0000000..f9f42f9 --- /dev/null +++ b/pkg/client/poll.go @@ -0,0 +1,73 @@ +package client + +import ( + "context" + "iter" + "time" + + "gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/readsb" + "gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/wingbits" +) + +// Poll turns any fetch into a pull-based stream: an iter.Seq2[R, error] that +// yields a fresh result immediately, then once per interval, until ctx is +// cancelled or the consumer breaks out of the range. Each step yields exactly +// one of a value or the error from that poll. +// +// It is the engine behind every Poll* method, exported so the same cadence can +// be applied to a custom or composed fetch without re-implementing the loop. +// Because the iterator runs in the consumer's goroutine, breaking the range +// stops polling immediately — there is no background goroutine to leak. +func Poll[R any](ctx context.Context, interval time.Duration, fetch func(context.Context) (R, error)) iter.Seq2[R, error] { + return func(yield func(R, error) bool) { + tick := time.NewTicker(interval) + defer tick.Stop() + for { + if !yield(fetch(ctx)) { + return + } + select { + case <-ctx.Done(): + return + case <-tick.C: + } + } + } +} + +// PollAircraft polls aircraft.json every interval, applying the given filters. +func (c *Client) PollAircraft(ctx context.Context, interval time.Duration, filters ...readsb.AircraftFilter) iter.Seq2[*readsb.AircraftReport, error] { + return Poll(ctx, interval, func(ctx context.Context) (*readsb.AircraftReport, error) { + return c.Aircraft(ctx, filters...) + }) +} + +// PollStats polls stats.json every interval. +func (c *Client) PollStats(ctx context.Context, interval time.Duration) iter.Seq2[*readsb.Stats, error] { + return Poll(ctx, interval, c.Stats) +} + +// PollReceiver polls receiver.json every interval. +func (c *Client) PollReceiver(ctx context.Context, interval time.Duration) iter.Seq2[*readsb.Receiver, error] { + return Poll(ctx, interval, c.Receiver) +} + +// PollOutline polls outline.json every interval. +func (c *Client) PollOutline(ctx context.Context, interval time.Duration) iter.Seq2[*readsb.Outline, error] { + return Poll(ctx, interval, c.Outline) +} + +// PollDiagnostics polls /network/diagnostics every interval. +func (c *Client) PollDiagnostics(ctx context.Context, interval time.Duration) iter.Seq2[*wingbits.Diagnostics, error] { + return Poll(ctx, interval, c.Diagnostics) +} + +// PollMetrics polls /metrics every interval. +func (c *Client) PollMetrics(ctx context.Context, interval time.Duration) iter.Seq2[*wingbits.Metrics, error] { + return Poll(ctx, interval, c.Metrics) +} + +// PollStatus polls the Tailscale status every interval. +func (c *Client) PollStatus(ctx context.Context, interval time.Duration) iter.Seq2[*wingbits.TailscaleStatus, error] { + return Poll(ctx, interval, c.Status) +} diff --git a/pkg/client/stream.go b/pkg/client/stream.go deleted file mode 100644 index c23a0a9..0000000 --- a/pkg/client/stream.go +++ /dev/null @@ -1,85 +0,0 @@ -package client - -import ( - "context" - "time" - - "gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/readsb" - "gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/wingbits" -) - -// Update carries one streamed sample of T, or the error from the fetch that -// produced it. Exactly one of Value and Err is meaningful per send. -type Update[T any] struct { - Value *T - Err error -} - -// StreamAircraft polls aircraft.json every interval and sends each decoded, -// filtered report on the returned channel until ctx is cancelled. The first -// sample is sent immediately. The channel is closed when ctx ends. -func (c *Client) StreamAircraft(ctx context.Context, interval time.Duration, filters ...readsb.AircraftFilter) <-chan Update[readsb.AircraftReport] { - return stream(ctx, c.streamBuf, interval, func(ctx context.Context) (*readsb.AircraftReport, error) { - return c.Aircraft(ctx, filters...) - }) -} - -// StreamStats polls stats.json every interval. -func (c *Client) StreamStats(ctx context.Context, interval time.Duration) <-chan Update[readsb.Stats] { - return stream(ctx, c.streamBuf, interval, c.Stats) -} - -// StreamReceiver polls receiver.json every interval. -func (c *Client) StreamReceiver(ctx context.Context, interval time.Duration) <-chan Update[readsb.Receiver] { - return stream(ctx, c.streamBuf, interval, c.Receiver) -} - -// StreamOutline polls outline.json every interval. -func (c *Client) StreamOutline(ctx context.Context, interval time.Duration) <-chan Update[readsb.Outline] { - return stream(ctx, c.streamBuf, interval, c.Outline) -} - -// StreamDiagnostics polls /network/diagnostics every interval. -func (c *Client) StreamDiagnostics(ctx context.Context, interval time.Duration) <-chan Update[wingbits.Diagnostics] { - return stream(ctx, c.streamBuf, interval, c.Diagnostics) -} - -// StreamMetrics polls /metrics every interval. -func (c *Client) StreamMetrics(ctx context.Context, interval time.Duration) <-chan Update[wingbits.Metrics] { - return stream(ctx, c.streamBuf, interval, c.Metrics) -} - -// StreamStatus polls the Tailscale status every interval. -func (c *Client) StreamStatus(ctx context.Context, interval time.Duration) <-chan Update[wingbits.TailscaleStatus] { - return stream(ctx, c.streamBuf, interval, c.Status) -} - -// stream drives a generic poll loop: it calls fetch immediately, then once per -// interval tick, forwarding each result as an Update on a buffered channel. -func stream[T any](ctx context.Context, buf int, interval time.Duration, fetch func(context.Context) (*T, error)) <-chan Update[T] { - ch := make(chan Update[T], buf) - go func() { - defer close(ch) - tick := time.NewTicker(interval) - defer tick.Stop() - for { - send(ctx, ch, fetch) - select { - case <-ctx.Done(): - return - case <-tick.C: - } - } - }() - return ch -} - -// send runs one fetch and delivers the result, respecting cancellation so a -// full channel cannot wedge the loop past ctx's lifetime. -func send[T any](ctx context.Context, ch chan<- Update[T], fetch func(context.Context) (*T, error)) { - v, err := fetch(ctx) - select { - case ch <- Update[T]{Value: v, Err: err}: - case <-ctx.Done(): - } -} diff --git a/pkg/types/readsb/filter.go b/pkg/types/readsb/filter.go index 49d5cc6..bc55911 100644 --- a/pkg/types/readsb/filter.go +++ b/pkg/types/readsb/filter.go @@ -1,6 +1,8 @@ package readsb import ( + "iter" + "slices" "strings" "time" ) @@ -9,19 +11,26 @@ import ( // an aircraft is kept only if every filter passes (logical AND). type AircraftFilter func(*Aircraft) bool -// Filter returns the aircraft that satisfy all of the supplied filters. With no -// filters it returns every aircraft. The receiver report is never mutated. +// All yields each aircraft that satisfies every supplied filter, lazily and in +// order. With no filters it yields every aircraft. Ranging may break early; the +// receiver report is never mutated. +func (r *AircraftReport) All(filters ...AircraftFilter) iter.Seq[Aircraft] { + return func(yield func(Aircraft) bool) { + for i := range r.Aircraft { + if keep(&r.Aircraft[i], filters) && !yield(r.Aircraft[i]) { + return + } + } + } +} + +// Filter is the eager form of All: it collects the matching aircraft into a +// slice. With no filters it returns the report's own slice without copying. func (r *AircraftReport) Filter(filters ...AircraftFilter) []Aircraft { if len(filters) == 0 { return r.Aircraft } - out := make([]Aircraft, 0, len(r.Aircraft)) - for i := range r.Aircraft { - if keep(&r.Aircraft[i], filters) { - out = append(out, r.Aircraft[i]) - } - } - return out + return slices.Collect(r.All(filters...)) } func keep(a *Aircraft, filters []AircraftFilter) bool {