This commit is contained in:
+18
-2
@@ -25,8 +25,18 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// TODO: Implement client retry configuration
|
||||
// Also support exponential backoff
|
||||
// RetryConfig controls how transient request failures (transport errors and 5xx
|
||||
// responses) are retried. The zero value disables retries.
|
||||
type RetryConfig struct {
|
||||
// MaxRetries is the number of additional attempts made after the initial
|
||||
// request. Zero means no retries.
|
||||
MaxRetries int
|
||||
// BaseDelay is the wait before the first retry. It doubles on each
|
||||
// subsequent retry (exponential backoff). Zero means retry immediately.
|
||||
BaseDelay time.Duration
|
||||
// MaxDelay caps the per-retry backoff delay. Zero means no cap.
|
||||
MaxDelay time.Duration
|
||||
}
|
||||
|
||||
// Default ports exposed by an MGW310 station.
|
||||
const (
|
||||
@@ -92,6 +102,7 @@ type Client struct {
|
||||
tar1090 *Endpoint
|
||||
http *http.Client
|
||||
userAgent string
|
||||
retry RetryConfig
|
||||
}
|
||||
|
||||
// Option customizes a Client.
|
||||
@@ -125,6 +136,11 @@ func WithUserAgent(ua string) Option {
|
||||
return func(c *Client) { c.userAgent = ua }
|
||||
}
|
||||
|
||||
// WithRetry enables retries with exponential backoff for transient failures.
|
||||
func WithRetry(cfg RetryConfig) Option {
|
||||
return func(c *Client) { c.retry = cfg }
|
||||
}
|
||||
|
||||
// New constructs a Client from options. A Wingbits endpoint with a non-empty
|
||||
// Host must be supplied via WithWingbitsEndpoint.
|
||||
func New(opts ...Option) (*Client, error) {
|
||||
|
||||
@@ -144,6 +144,61 @@ func TestClientWingbitsEndpoints(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientRetry(t *testing.T) {
|
||||
// Fail with 503 for the first two attempts, then succeed; the client must
|
||||
// retry past the failures and return the eventual body.
|
||||
var hits int
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
if hits++; hits <= 2 {
|
||||
http.Error(w, "warming up", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
w.Write([]byte(`{}`))
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
host, port := splitHostPort(t, strings.TrimPrefix(srv.URL, "http://"))
|
||||
c, err := New(
|
||||
WithWingbitsEndpoint(Endpoint{Host: host, Port: port}),
|
||||
WithRetry(RetryConfig{MaxRetries: 3, BaseDelay: time.Millisecond, MaxDelay: 5 * time.Millisecond}),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := c.Status(context.Background()); err != nil {
|
||||
t.Fatalf("status after retries: %v", err)
|
||||
}
|
||||
if hits != 3 {
|
||||
t.Fatalf("expected 3 attempts, got %d", hits)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientRetryGivesUp(t *testing.T) {
|
||||
// A 404 is not retryable: the client must fail after a single attempt even
|
||||
// with retries configured.
|
||||
var hits int
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
hits++
|
||||
http.Error(w, "nope", http.StatusNotFound)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
host, port := splitHostPort(t, strings.TrimPrefix(srv.URL, "http://"))
|
||||
c, err := New(
|
||||
WithWingbitsEndpoint(Endpoint{Host: host, Port: port}),
|
||||
WithRetry(RetryConfig{MaxRetries: 3, BaseDelay: time.Millisecond}),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := c.Status(context.Background()); err == nil {
|
||||
t.Fatal("expected error on 404")
|
||||
}
|
||||
if hits != 1 {
|
||||
t.Fatalf("expected 1 attempt for non-retryable status, got %d", hits)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPollAircraft(t *testing.T) {
|
||||
c := newTestClient(t)
|
||||
ctx := t.Context()
|
||||
|
||||
+68
-1
@@ -3,9 +3,11 @@ package client
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/readsb"
|
||||
"gitea.libretechconsulting.com/rmcguire/wingbits/pkg/types/wingbits"
|
||||
@@ -79,8 +81,33 @@ func getJSON[T any](ctx context.Context, c *Client, url string) (*T, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// statusError reports a non-200 HTTP status returned by a station.
|
||||
type statusError struct {
|
||||
url string
|
||||
status string
|
||||
code int
|
||||
}
|
||||
|
||||
func (e *statusError) Error() string {
|
||||
return fmt.Sprintf("wingbits: GET %s: unexpected status %s", e.url, e.status)
|
||||
}
|
||||
|
||||
// get performs a GET and returns the response body, which the caller must close.
|
||||
// Transient failures are retried per the client's RetryConfig.
|
||||
func (c *Client) get(ctx context.Context, url string) (io.ReadCloser, error) {
|
||||
for attempt := 0; ; attempt++ {
|
||||
body, err := c.doGet(ctx, url)
|
||||
if err == nil || attempt >= c.retry.MaxRetries || !retryable(err) {
|
||||
return body, err
|
||||
}
|
||||
if werr := c.waitBackoff(ctx, attempt); werr != nil {
|
||||
return nil, werr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// doGet performs a single GET attempt.
|
||||
func (c *Client) doGet(ctx context.Context, url string) (io.ReadCloser, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("wingbits: building request: %w", err)
|
||||
@@ -92,7 +119,47 @@ func (c *Client) get(ctx context.Context, url string) (io.ReadCloser, error) {
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("wingbits: GET %s: unexpected status %s", url, resp.Status)
|
||||
return nil, &statusError{url: url, status: resp.Status, code: resp.StatusCode}
|
||||
}
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// retryable reports whether err is a transient failure worth retrying: any
|
||||
// transport error, or a 5xx response. Context cancellation is never retried.
|
||||
func retryable(err error) bool {
|
||||
if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return false
|
||||
}
|
||||
if se, ok := errors.AsType[*statusError](err); ok {
|
||||
return se.code >= 500
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// waitBackoff sleeps for the attempt's backoff delay or until ctx is done.
|
||||
func (c *Client) waitBackoff(ctx context.Context, attempt int) error {
|
||||
t := time.NewTimer(c.backoff(attempt))
|
||||
defer t.Stop()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-t.C:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// backoff returns the delay before the given retry attempt (0-indexed),
|
||||
// doubling BaseDelay each attempt and capping at MaxDelay.
|
||||
func (c *Client) backoff(attempt int) time.Duration {
|
||||
d := c.retry.BaseDelay
|
||||
for range attempt {
|
||||
d *= 2
|
||||
if c.retry.MaxDelay > 0 && d >= c.retry.MaxDelay {
|
||||
return c.retry.MaxDelay
|
||||
}
|
||||
}
|
||||
if c.retry.MaxDelay > 0 && d > c.retry.MaxDelay {
|
||||
return c.retry.MaxDelay
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user