Reference implementation
This commit is contained in:
@ -57,12 +57,14 @@ const (
|
||||
// HTTP Configuration
|
||||
type HTTPConfig struct {
|
||||
Listen string `yaml:"listen" env:"APP_HTTP_LISTEN" envDefault:"127.0.0.1:8080"`
|
||||
RequestTimeout int `yaml:"request_timeout" env:"APP_HTTP_REQUEST_TIMEOUT" envDefault:"30"`
|
||||
RequestTimeout int `yaml:"requestTimeout" env:"APP_HTTP_REQUEST_TIMEOUT" envDefault:"30"`
|
||||
}
|
||||
|
||||
// OTEL Configuration
|
||||
type OTELConfig struct {
|
||||
Enabled bool `yaml:"enabled" env:"APP_OTEL_ENABLED" envDefault:"true"`
|
||||
PrometheusEnabled bool `yaml:"prometheus_enabled" env:"APP_OTEL_PROMETHEUS_ENABLED" envDefault:"true"`
|
||||
PrometheusPath string `yaml:"prometheus_path" env:"APP_OTEL_PROMETHEUS_PATH" envDefault:"/metrics"`
|
||||
Enabled bool `yaml:"enabled" env:"APP_OTEL_ENABLED" envDefault:"true"`
|
||||
PrometheusEnabled bool `yaml:"prometheusEnabled" env:"APP_OTEL_PROMETHEUS_ENABLED" envDefault:"true"`
|
||||
PrometheusPath string `yaml:"prometheusPath" env:"APP_OTEL_PROMETHEUS_PATH" envDefault:"/metrics"`
|
||||
StdoutEnabled bool `yaml:"stdoutEnabled" env:"APP_OTEL_STDOUT_ENABLED" envDefault:"false"`
|
||||
MetricIntervalSecs int `yaml:"metricIntervalSecs" env:"APP_OTEL_METRIC_INTERVAL_SECS" envDefault:"15"`
|
||||
}
|
||||
|
54
pkg/otel/ctx.go
Normal file
54
pkg/otel/ctx.go
Normal file
@ -0,0 +1,54 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
type otelCtxKey uint8
|
||||
|
||||
const (
|
||||
ctxKeyTracer otelCtxKey = iota
|
||||
ctxKeyMeter
|
||||
)
|
||||
|
||||
func MustTracerFromCtx(ctx context.Context) trace.Tracer {
|
||||
ctxData := ctx.Value(ctxKeyTracer)
|
||||
if ctxData == nil {
|
||||
panic(errors.New("no tracer found in context"))
|
||||
}
|
||||
|
||||
tracer, ok := ctxData.(trace.Tracer)
|
||||
if !ok {
|
||||
panic(errors.New("invalid tracer found in context"))
|
||||
}
|
||||
|
||||
return tracer
|
||||
}
|
||||
|
||||
func AddTracerToCtx(ctx context.Context, tracer trace.Tracer) context.Context {
|
||||
ctx = context.WithValue(ctx, ctxKeyTracer, tracer)
|
||||
return ctx
|
||||
}
|
||||
|
||||
func MustMeterFromCtx(ctx context.Context) metric.Meter {
|
||||
ctxData := ctx.Value(ctxKeyMeter)
|
||||
if ctxData == nil {
|
||||
panic(errors.New("no meter found in context"))
|
||||
}
|
||||
|
||||
meter, ok := ctxData.(metric.Meter)
|
||||
if !ok {
|
||||
panic(errors.New("invalid meter found in context"))
|
||||
}
|
||||
|
||||
return meter
|
||||
}
|
||||
|
||||
func AddMeterToCtx(ctx context.Context, meter metric.Meter) context.Context {
|
||||
ctx = context.WithValue(ctx, ctxKeyMeter, meter)
|
||||
return ctx
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package observability
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -14,31 +14,64 @@ import (
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/go-http-server-with-otel/pkg/config"
|
||||
|
||||
noopMetric "go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
traceSDK "go.opentelemetry.io/otel/sdk/trace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
|
||||
trace "go.opentelemetry.io/otel/trace"
|
||||
"go.opentelemetry.io/otel/trace/noop"
|
||||
)
|
||||
|
||||
const defaultMetricExportInterval = 15 * time.Second
|
||||
|
||||
// OTEL Options
|
||||
var (
|
||||
EnableStdoutExporter Option = enableStdoutExporter{}
|
||||
EnableStdoutExporter Option = enableStdoutExporter{}
|
||||
EnablePrometheusExporter Option = enablePrometheusExporter{}
|
||||
// Overide the default metric export interval
|
||||
WithMetricExportInterval = func(interval time.Duration) Option {
|
||||
return exportInterval{interval: interval}
|
||||
}
|
||||
)
|
||||
|
||||
func Init(ctx context.Context, options ...Option) (shutdown func(context.Context) error) {
|
||||
var (
|
||||
shutdownFuncs []func(context.Context) error
|
||||
s = &settings{
|
||||
MetricExportInterval: defaultMetricExportInterval,
|
||||
const defMetricInterval = 15 * time.Second
|
||||
|
||||
// Context must carry config.AppConfig
|
||||
func Init(ctx context.Context) (context.Context, func(context.Context) error) {
|
||||
cfg := config.MustFromCtx(ctx)
|
||||
|
||||
// Nothing to do here if not enabled
|
||||
if !cfg.OTEL.Enabled {
|
||||
opentelemetry.SetMeterProvider(noopMetric.NewMeterProvider())
|
||||
opentelemetry.SetTracerProvider(noop.NewTracerProvider())
|
||||
return ctx, func(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
metricInterval = defMetricInterval
|
||||
options = make([]Option, 0)
|
||||
s = &settings{}
|
||||
shutdownFuncs []func(context.Context) error
|
||||
)
|
||||
|
||||
// Prepare settings for OTEL from configuration
|
||||
if cfg.OTEL.StdoutEnabled {
|
||||
options = append(options, EnableStdoutExporter)
|
||||
}
|
||||
if cfg.OTEL.MetricIntervalSecs > 0 {
|
||||
metricInterval = time.Duration(cfg.OTEL.MetricIntervalSecs) * time.Second
|
||||
}
|
||||
if cfg.OTEL.PrometheusEnabled {
|
||||
options = append(options, EnablePrometheusExporter)
|
||||
}
|
||||
options = append(options,
|
||||
WithMetricExportInterval(metricInterval))
|
||||
|
||||
// Apply settings
|
||||
for _, opt := range options {
|
||||
opt.apply(s)
|
||||
}
|
||||
@ -46,7 +79,7 @@ func Init(ctx context.Context, options ...Option) (shutdown func(context.Context
|
||||
// shutdown calls cleanup functions registered via shutdownFuncs.
|
||||
// The errors from the calls are joined.
|
||||
// Each registered cleanup will be invoked once.
|
||||
shutdown = func(ctx context.Context) error {
|
||||
shutdown := func(ctx context.Context) error {
|
||||
var err error
|
||||
for _, fn := range shutdownFuncs {
|
||||
err = errors.Join(err, fn(ctx))
|
||||
@ -66,24 +99,30 @@ func Init(ctx context.Context, options ...Option) (shutdown func(context.Context
|
||||
meterProvider, err := s.newMeterProvider(ctx)
|
||||
if err != nil {
|
||||
handleErr(err)
|
||||
return
|
||||
return ctx, shutdown
|
||||
}
|
||||
|
||||
shutdownFuncs = append(shutdownFuncs, meterProvider.Shutdown)
|
||||
opentelemetry.SetMeterProvider(meterProvider)
|
||||
|
||||
meter := opentelemetry.Meter(cfg.Name)
|
||||
ctx = AddMeterToCtx(ctx, meter)
|
||||
|
||||
// Set up tracing
|
||||
opentelemetry.SetTextMapPropagator(newPropagator())
|
||||
var tracerProvider *traceSDK.TracerProvider
|
||||
tracerProvider, err = s.newTracerProvider(ctx)
|
||||
if err != nil {
|
||||
handleErr(err)
|
||||
return
|
||||
return ctx, shutdown
|
||||
}
|
||||
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
|
||||
opentelemetry.SetTracerProvider(tracerProvider)
|
||||
|
||||
return
|
||||
tracer := opentelemetry.Tracer(cfg.Name)
|
||||
ctx = AddTracerToCtx(ctx, tracer)
|
||||
|
||||
return ctx, shutdown
|
||||
}
|
||||
|
||||
func newPropagator() propagation.TextMapPropagator {
|
||||
@ -162,7 +201,7 @@ func (s *settings) newMeterProvider(ctx context.Context) (*metric.MeterProvider,
|
||||
metric.WithReader(
|
||||
metric.NewPeriodicReader(
|
||||
otlpExporter,
|
||||
metric.WithInterval(defaultMetricExportInterval),
|
||||
metric.WithInterval(s.MetricExportInterval),
|
||||
),
|
||||
),
|
||||
metric.WithResource(newResource()),
|
@ -1,10 +1,11 @@
|
||||
package observability
|
||||
package otel
|
||||
|
||||
import "time"
|
||||
|
||||
type settings struct {
|
||||
EnableStdoutExporter bool
|
||||
MetricExportInterval time.Duration
|
||||
EnableStdoutExporter bool
|
||||
EnablePrometheusExporter bool
|
||||
MetricExportInterval time.Duration
|
||||
}
|
||||
|
||||
type Option interface {
|
||||
@ -19,6 +20,14 @@ func (setting enableStdoutExporter) apply(o *settings) {
|
||||
o.EnableStdoutExporter = true
|
||||
}
|
||||
|
||||
type enablePrometheusExporter struct {
|
||||
Option
|
||||
}
|
||||
|
||||
func (setting enablePrometheusExporter) apply(o *settings) {
|
||||
o.EnablePrometheusExporter = true
|
||||
}
|
||||
|
||||
type exportInterval struct {
|
||||
Option
|
||||
interval time.Duration
|
30
pkg/otel/util.go
Normal file
30
pkg/otel/util.go
Normal file
@ -0,0 +1,30 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/go-http-server-with-otel/pkg/config"
|
||||
)
|
||||
|
||||
func GetTracer(ctx context.Context, components ...string) trace.Tracer {
|
||||
return otel.Tracer(getName(ctx, components...))
|
||||
}
|
||||
|
||||
func GetMeter(ctx context.Context, components ...string) metric.Meter {
|
||||
return otel.Meter(getName(ctx, components...))
|
||||
}
|
||||
|
||||
func getName(ctx context.Context, components ...string) string {
|
||||
cfg := config.MustFromCtx(ctx)
|
||||
|
||||
path := make([]string, 0, len(components)+1)
|
||||
path = append(path, cfg.Name)
|
||||
path = append(path, components...)
|
||||
|
||||
return strings.Join(path, ".")
|
||||
}
|
126
pkg/srv/http.go
Normal file
126
pkg/srv/http.go
Normal file
@ -0,0 +1,126 @@
|
||||
package srv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/rs/zerolog"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/go-http-server-with-otel/pkg/config"
|
||||
"gitea.libretechconsulting.com/rmcguire/go-http-server-with-otel/pkg/otel"
|
||||
)
|
||||
|
||||
var (
|
||||
httpMeter metric.Meter
|
||||
httpTracer trace.Tracer
|
||||
readTimeout = 10 * time.Second
|
||||
writeTimeout = 10 * time.Second
|
||||
idleTimeout = 15 * time.Second
|
||||
)
|
||||
|
||||
type HTTPFunc struct {
|
||||
Path string
|
||||
HandlerFunc http.HandlerFunc
|
||||
}
|
||||
|
||||
func prepHTTPServer(ctx context.Context, handleFuncs ...HTTPFunc) *http.Server {
|
||||
var (
|
||||
cfg = config.MustFromCtx(ctx)
|
||||
l = zerolog.Ctx(ctx)
|
||||
mux = &http.ServeMux{}
|
||||
)
|
||||
|
||||
// NOTE: Wraps handle func with otelhttp handler and
|
||||
// inserts route tag
|
||||
otelHandleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request)) {
|
||||
handler := otelhttp.WithRouteTag(pattern, http.HandlerFunc(handlerFunc))
|
||||
mux.Handle(pattern, handler) // Associate pattern with handler
|
||||
}
|
||||
|
||||
otelHandleFunc("/health", handleHealthCheckFunc(ctx))
|
||||
otelHandleFunc("/", handleHealthCheckFunc(ctx))
|
||||
|
||||
for _, f := range handleFuncs {
|
||||
otelHandleFunc(f.Path, f.HandlerFunc)
|
||||
}
|
||||
|
||||
// Prometheus metrics endpoint
|
||||
if cfg.OTEL.PrometheusEnabled {
|
||||
mux.Handle(cfg.OTEL.PrometheusPath, promhttp.Handler())
|
||||
l.Info().Str("prometheusPath", cfg.OTEL.PrometheusPath).
|
||||
Msg("mounted prometheus metrics endpoint")
|
||||
}
|
||||
|
||||
// Add OTEL, skip health-check spans
|
||||
// NOTE: Add any other span exclusions here
|
||||
handler := otelhttp.NewHandler(mux, "/",
|
||||
otelhttp.WithFilter(func(r *http.Request) bool {
|
||||
switch r.URL.Path {
|
||||
case "/health":
|
||||
return false
|
||||
case cfg.OTEL.PrometheusPath:
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}))
|
||||
|
||||
return &http.Server{
|
||||
Addr: cfg.HTTP.Listen,
|
||||
ReadTimeout: readTimeout,
|
||||
WriteTimeout: writeTimeout,
|
||||
IdleTimeout: idleTimeout,
|
||||
Handler: handler,
|
||||
BaseContext: func(_ net.Listener) context.Context {
|
||||
return ctx
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a shutdown func and a done channel if the
|
||||
// server aborts abnormally. Panics on error.
|
||||
func MustInitHTTPServer(ctx context.Context, funcs ...HTTPFunc) (func(context.Context) error, <-chan interface{}) {
|
||||
shutdownFunc, doneChan, err := InitHTTPServer(ctx, funcs...)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return shutdownFunc, doneChan
|
||||
}
|
||||
|
||||
// Returns a shutdown func and a done channel if the
|
||||
// server aborts abnormally. Returns error on failure to start
|
||||
func InitHTTPServer(ctx context.Context, funcs ...HTTPFunc) (func(context.Context) error, <-chan interface{}, error) {
|
||||
l := zerolog.Ctx(ctx)
|
||||
doneChan := make(chan interface{})
|
||||
|
||||
var server *http.Server
|
||||
|
||||
httpMeter = otel.GetMeter(ctx, "http")
|
||||
httpTracer = otel.GetTracer(ctx, "http")
|
||||
|
||||
server = prepHTTPServer(ctx, funcs...)
|
||||
|
||||
go func() {
|
||||
l.Debug().Msg("HTTP Server Started")
|
||||
err := server.ListenAndServe()
|
||||
if err != nil && err != http.ErrServerClosed {
|
||||
l.Err(err).Msg("HTTP server error")
|
||||
} else {
|
||||
l.Info().Msg("HTTP server shut down")
|
||||
}
|
||||
doneChan <- nil
|
||||
}()
|
||||
|
||||
// Shut down http server with a deadline
|
||||
return func(shutdownCtx context.Context) error {
|
||||
l.Debug().Msg("stopping http server")
|
||||
server.Shutdown(shutdownCtx)
|
||||
return nil
|
||||
}, doneChan, nil
|
||||
}
|
59
pkg/srv/http_health.go
Normal file
59
pkg/srv/http_health.go
Normal file
@ -0,0 +1,59 @@
|
||||
package srv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func handleHealthCheckFunc(_ context.Context) func(w http.ResponseWriter, r *http.Request) {
|
||||
// Return http handle func
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
var err error
|
||||
var healthChecksFailed bool
|
||||
|
||||
// TODO: Insert useful health checks here
|
||||
// For multiple checks, perform concurrently
|
||||
// Consider using errors.Join() for multiple checks
|
||||
var hcWg sync.WaitGroup
|
||||
for range 5 {
|
||||
hcWg.Add(1)
|
||||
go func() {
|
||||
defer hcWg.Done()
|
||||
err = errors.Join(err, dummyHealthCheck(r.Context()))
|
||||
}()
|
||||
}
|
||||
hcWg.Wait()
|
||||
if err != nil {
|
||||
healthChecksFailed = true
|
||||
}
|
||||
|
||||
// TODO: Friendly reminder...
|
||||
err = errors.New("WARNING: Unimplemented health-check")
|
||||
|
||||
if healthChecksFailed {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
w.Write([]byte(err.Error()))
|
||||
} else {
|
||||
w.Write([]byte("ok"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func dummyHealthCheck(ctx context.Context) error {
|
||||
workFor := rand.Intn(750)
|
||||
ticker := time.NewTicker(time.Duration(time.Duration(workFor) * time.Millisecond))
|
||||
|
||||
select {
|
||||
case <-ticker.C:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user