switch to iterators and client polling
Publish / release (push) Failing after 17m20s

This commit is contained in:
2026-06-22 22:20:32 -04:00
parent 24fe4258f7
commit f174c2922c
7 changed files with 154 additions and 143 deletions
-12
View File
@@ -88,7 +88,6 @@ type Client struct {
wingbits Endpoint
tar1090 *Endpoint
http *http.Client
streamBuf int
userAgent string
}
@@ -118,16 +117,6 @@ func WithHTTPClient(h *http.Client) Option {
return func(c *Client) { c.http = h }
}
// WithStreamBufferSize sets the buffer size of channels returned by the Stream*
// methods. The default is 1.
func WithStreamBufferSize(n int) Option {
return func(c *Client) {
if n >= 0 {
c.streamBuf = n
}
}
}
// WithUserAgent overrides the User-Agent header sent with requests.
func WithUserAgent(ua string) Option {
return func(c *Client) { c.userAgent = ua }
@@ -137,7 +126,6 @@ func WithUserAgent(ua string) Option {
// Host must be supplied via WithWingbitsEndpoint.
func New(opts ...Option) (*Client, error) {
c := &Client{
streamBuf: 1,
userAgent: "wingbits-go/1.0",
}
for _, opt := range opts {
+12 -12
View File
@@ -144,21 +144,21 @@ func TestClientWingbitsEndpoints(t *testing.T) {
}
}
func TestStreamAircraft(t *testing.T) {
func TestPollAircraft(t *testing.T) {
c := newTestClient(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()
ch := c.StreamAircraft(ctx, time.Hour)
select {
case u := <-ch:
if u.Err != nil || u.Value == nil || len(u.Value.Aircraft) == 0 {
t.Fatalf("first update bad: %v", u.Err)
// A long interval means the iterator only yields its immediate first sample;
// breaking out must stop polling cleanly (no goroutine, no hang).
var got int
for report, err := range c.PollAircraft(ctx, time.Hour, readsb.WithPosition()) {
if err != nil || report == nil || len(report.Aircraft) == 0 {
t.Fatalf("first poll bad: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatal("no first update")
got++
break
}
cancel()
for range ch { // channel must drain and close after cancellation
if got != 1 {
t.Fatalf("expected exactly one sample before break, got %d", got)
}
}
+73
View File
@@ -0,0 +1,73 @@
package client
import (
"context"
"iter"
"time"
"gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/readsb"
"gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/wingbits"
)
// Poll turns any fetch into a pull-based stream: an iter.Seq2[R, error] that
// yields a fresh result immediately, then once per interval, until ctx is
// cancelled or the consumer breaks out of the range. Each step yields exactly
// one of a value or the error from that poll.
//
// It is the engine behind every Poll* method, exported so the same cadence can
// be applied to a custom or composed fetch without re-implementing the loop.
// Because the iterator runs in the consumer's goroutine, breaking the range
// stops polling immediately — there is no background goroutine to leak.
func Poll[R any](ctx context.Context, interval time.Duration, fetch func(context.Context) (R, error)) iter.Seq2[R, error] {
return func(yield func(R, error) bool) {
tick := time.NewTicker(interval)
defer tick.Stop()
for {
if !yield(fetch(ctx)) {
return
}
select {
case <-ctx.Done():
return
case <-tick.C:
}
}
}
}
// PollAircraft polls aircraft.json every interval, applying the given filters.
func (c *Client) PollAircraft(ctx context.Context, interval time.Duration, filters ...readsb.AircraftFilter) iter.Seq2[*readsb.AircraftReport, error] {
return Poll(ctx, interval, func(ctx context.Context) (*readsb.AircraftReport, error) {
return c.Aircraft(ctx, filters...)
})
}
// PollStats polls stats.json every interval.
func (c *Client) PollStats(ctx context.Context, interval time.Duration) iter.Seq2[*readsb.Stats, error] {
return Poll(ctx, interval, c.Stats)
}
// PollReceiver polls receiver.json every interval.
func (c *Client) PollReceiver(ctx context.Context, interval time.Duration) iter.Seq2[*readsb.Receiver, error] {
return Poll(ctx, interval, c.Receiver)
}
// PollOutline polls outline.json every interval.
func (c *Client) PollOutline(ctx context.Context, interval time.Duration) iter.Seq2[*readsb.Outline, error] {
return Poll(ctx, interval, c.Outline)
}
// PollDiagnostics polls /network/diagnostics every interval.
func (c *Client) PollDiagnostics(ctx context.Context, interval time.Duration) iter.Seq2[*wingbits.Diagnostics, error] {
return Poll(ctx, interval, c.Diagnostics)
}
// PollMetrics polls /metrics every interval.
func (c *Client) PollMetrics(ctx context.Context, interval time.Duration) iter.Seq2[*wingbits.Metrics, error] {
return Poll(ctx, interval, c.Metrics)
}
// PollStatus polls the Tailscale status every interval.
func (c *Client) PollStatus(ctx context.Context, interval time.Duration) iter.Seq2[*wingbits.TailscaleStatus, error] {
return Poll(ctx, interval, c.Status)
}
-85
View File
@@ -1,85 +0,0 @@
package client
import (
"context"
"time"
"gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/readsb"
"gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/wingbits"
)
// Update carries one streamed sample of T, or the error from the fetch that
// produced it. Exactly one of Value and Err is meaningful per send.
type Update[T any] struct {
Value *T
Err error
}
// StreamAircraft polls aircraft.json every interval and sends each decoded,
// filtered report on the returned channel until ctx is cancelled. The first
// sample is sent immediately. The channel is closed when ctx ends.
func (c *Client) StreamAircraft(ctx context.Context, interval time.Duration, filters ...readsb.AircraftFilter) <-chan Update[readsb.AircraftReport] {
return stream(ctx, c.streamBuf, interval, func(ctx context.Context) (*readsb.AircraftReport, error) {
return c.Aircraft(ctx, filters...)
})
}
// StreamStats polls stats.json every interval.
func (c *Client) StreamStats(ctx context.Context, interval time.Duration) <-chan Update[readsb.Stats] {
return stream(ctx, c.streamBuf, interval, c.Stats)
}
// StreamReceiver polls receiver.json every interval.
func (c *Client) StreamReceiver(ctx context.Context, interval time.Duration) <-chan Update[readsb.Receiver] {
return stream(ctx, c.streamBuf, interval, c.Receiver)
}
// StreamOutline polls outline.json every interval.
func (c *Client) StreamOutline(ctx context.Context, interval time.Duration) <-chan Update[readsb.Outline] {
return stream(ctx, c.streamBuf, interval, c.Outline)
}
// StreamDiagnostics polls /network/diagnostics every interval.
func (c *Client) StreamDiagnostics(ctx context.Context, interval time.Duration) <-chan Update[wingbits.Diagnostics] {
return stream(ctx, c.streamBuf, interval, c.Diagnostics)
}
// StreamMetrics polls /metrics every interval.
func (c *Client) StreamMetrics(ctx context.Context, interval time.Duration) <-chan Update[wingbits.Metrics] {
return stream(ctx, c.streamBuf, interval, c.Metrics)
}
// StreamStatus polls the Tailscale status every interval.
func (c *Client) StreamStatus(ctx context.Context, interval time.Duration) <-chan Update[wingbits.TailscaleStatus] {
return stream(ctx, c.streamBuf, interval, c.Status)
}
// stream drives a generic poll loop: it calls fetch immediately, then once per
// interval tick, forwarding each result as an Update on a buffered channel.
func stream[T any](ctx context.Context, buf int, interval time.Duration, fetch func(context.Context) (*T, error)) <-chan Update[T] {
ch := make(chan Update[T], buf)
go func() {
defer close(ch)
tick := time.NewTicker(interval)
defer tick.Stop()
for {
send(ctx, ch, fetch)
select {
case <-ctx.Done():
return
case <-tick.C:
}
}
}()
return ch
}
// send runs one fetch and delivers the result, respecting cancellation so a
// full channel cannot wedge the loop past ctx's lifetime.
func send[T any](ctx context.Context, ch chan<- Update[T], fetch func(context.Context) (*T, error)) {
v, err := fetch(ctx)
select {
case ch <- Update[T]{Value: v, Err: err}:
case <-ctx.Done():
}
}