Add OTEL
This commit is contained in:
		
							
								
								
									
										188
									
								
								pkg/observability/otel.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										188
									
								
								pkg/observability/otel.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,188 @@
 | 
			
		||||
package observability
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	opentelemetry "go.opentelemetry.io/otel"
 | 
			
		||||
	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
 | 
			
		||||
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
 | 
			
		||||
	"go.opentelemetry.io/otel/exporters/prometheus"
 | 
			
		||||
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
 | 
			
		||||
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
 | 
			
		||||
 | 
			
		||||
	"go.opentelemetry.io/otel/propagation"
 | 
			
		||||
	"go.opentelemetry.io/otel/sdk/metric"
 | 
			
		||||
	"go.opentelemetry.io/otel/sdk/resource"
 | 
			
		||||
	traceSDK "go.opentelemetry.io/otel/sdk/trace"
 | 
			
		||||
	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
 | 
			
		||||
	trace "go.opentelemetry.io/otel/trace"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const defaultMetricExportInterval = 15 * time.Second
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	EnableStdoutExporter Option = enableStdoutExporter{}
 | 
			
		||||
	// Overide the default metric export interval
 | 
			
		||||
	WithMetricExportInterval = func(interval time.Duration) Option {
 | 
			
		||||
		return exportInterval{interval: interval}
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Init(ctx context.Context, options ...Option) (shutdown func(context.Context) error) {
 | 
			
		||||
	var (
 | 
			
		||||
		shutdownFuncs []func(context.Context) error
 | 
			
		||||
		s             = &settings{
 | 
			
		||||
			MetricExportInterval: defaultMetricExportInterval,
 | 
			
		||||
		}
 | 
			
		||||
	)
 | 
			
		||||
	for _, opt := range options {
 | 
			
		||||
		opt.apply(s)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// shutdown calls cleanup functions registered via shutdownFuncs.
 | 
			
		||||
	// The errors from the calls are joined.
 | 
			
		||||
	// Each registered cleanup will be invoked once.
 | 
			
		||||
	shutdown = func(ctx context.Context) error {
 | 
			
		||||
		var err error
 | 
			
		||||
		for _, fn := range shutdownFuncs {
 | 
			
		||||
			err = errors.Join(err, fn(ctx))
 | 
			
		||||
		}
 | 
			
		||||
		shutdownFuncs = nil
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// handleErr calls shutdown for cleanup and makes sure that all errors are returned.
 | 
			
		||||
	handleErr := func(inErr error) {
 | 
			
		||||
		if err := errors.Join(inErr, shutdown(ctx)); err != nil {
 | 
			
		||||
			fmt.Fprintln(os.Stderr, "OTEL Error:", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Set up meter provider.
 | 
			
		||||
	meterProvider, err := s.newMeterProvider(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		handleErr(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	shutdownFuncs = append(shutdownFuncs, meterProvider.Shutdown)
 | 
			
		||||
	opentelemetry.SetMeterProvider(meterProvider)
 | 
			
		||||
 | 
			
		||||
	// Set up tracing
 | 
			
		||||
	opentelemetry.SetTextMapPropagator(newPropagator())
 | 
			
		||||
	var tracerProvider *traceSDK.TracerProvider
 | 
			
		||||
	tracerProvider, err = s.newTracerProvider(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		handleErr(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
 | 
			
		||||
	opentelemetry.SetTracerProvider(tracerProvider)
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newPropagator() propagation.TextMapPropagator {
 | 
			
		||||
	return propagation.NewCompositeTextMapPropagator(
 | 
			
		||||
		propagation.TraceContext{},
 | 
			
		||||
		propagation.Baggage{},
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *settings) newTracerProvider(ctx context.Context) (traceProvider *traceSDK.TracerProvider, err error) {
 | 
			
		||||
	traceOpts := []traceSDK.TracerProviderOption{
 | 
			
		||||
		traceSDK.WithResource(newResource()),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	host, set := os.LookupEnv("OTEL_EXPORTER_OTLP_ENDPOINT")
 | 
			
		||||
	if set && host != "" {
 | 
			
		||||
		exporter, err := otlptracegrpc.New(ctx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		traceOpts = append(traceOpts, traceSDK.WithBatcher(exporter))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if s.EnableStdoutExporter {
 | 
			
		||||
		stdoutExporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		traceOpts = append(traceOpts, traceSDK.WithBatcher(stdoutExporter))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	traceProvider = traceSDK.NewTracerProvider(traceOpts...)
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newResource() *resource.Resource {
 | 
			
		||||
	return resource.NewWithAttributes(semconv.SchemaURL)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Configures meter provider
 | 
			
		||||
// Always provides a prometheus metrics exporter
 | 
			
		||||
// Conditionally provides an OTLP metrics exporter
 | 
			
		||||
func (s *settings) newMeterProvider(ctx context.Context) (*metric.MeterProvider, error) {
 | 
			
		||||
	// OTEL Prometheus Exporter
 | 
			
		||||
	exporter, err := prometheus.New()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	metricOptions := make([]metric.Option, 0, 5)
 | 
			
		||||
	if s.EnableStdoutExporter {
 | 
			
		||||
		stdoutMetricExporter, err := stdoutmetric.New()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		metricOptions = append(metricOptions,
 | 
			
		||||
			metric.WithReader(metric.NewPeriodicReader(stdoutMetricExporter)),
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	host, set := os.LookupEnv("OTEL_EXPORTER_OTLP_ENDPOINT")
 | 
			
		||||
	var otlpExporter *otlpmetricgrpc.Exporter
 | 
			
		||||
	if set && host != "" {
 | 
			
		||||
		if exp, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithInsecure()); err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("otlpmetricgrpc.New: %w", err)
 | 
			
		||||
		} else {
 | 
			
		||||
			otlpExporter = exp
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var meterProvider *metric.MeterProvider
 | 
			
		||||
	if otlpExporter != nil {
 | 
			
		||||
		metricOptions = append(metricOptions,
 | 
			
		||||
			metric.WithReader(exporter),
 | 
			
		||||
			metric.WithReader(
 | 
			
		||||
				metric.NewPeriodicReader(
 | 
			
		||||
					otlpExporter,
 | 
			
		||||
					metric.WithInterval(defaultMetricExportInterval),
 | 
			
		||||
				),
 | 
			
		||||
			),
 | 
			
		||||
			metric.WithResource(newResource()),
 | 
			
		||||
		)
 | 
			
		||||
	} else {
 | 
			
		||||
		metricOptions = append(metricOptions,
 | 
			
		||||
			metric.WithReader(exporter),
 | 
			
		||||
			metric.WithResource(newResource()),
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	meterProvider = metric.NewMeterProvider(metricOptions...)
 | 
			
		||||
 | 
			
		||||
	return meterProvider, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Creates a new tracer from the global opentelemetry provider
 | 
			
		||||
func NewTracer(options ...trace.TracerOption) trace.Tracer {
 | 
			
		||||
	return opentelemetry.GetTracerProvider().Tracer(
 | 
			
		||||
		os.Getenv("OTEL_SERVICE_NAME"),
 | 
			
		||||
		options...,
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										29
									
								
								pkg/observability/settings.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										29
									
								
								pkg/observability/settings.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,29 @@
 | 
			
		||||
package observability
 | 
			
		||||
 | 
			
		||||
import "time"
 | 
			
		||||
 | 
			
		||||
type settings struct {
 | 
			
		||||
	EnableStdoutExporter bool
 | 
			
		||||
	MetricExportInterval time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Option interface {
 | 
			
		||||
	apply(*settings)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type enableStdoutExporter struct {
 | 
			
		||||
	Option
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (setting enableStdoutExporter) apply(o *settings) {
 | 
			
		||||
	o.EnableStdoutExporter = true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type exportInterval struct {
 | 
			
		||||
	Option
 | 
			
		||||
	interval time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (setting exportInterval) apply(o *settings) {
 | 
			
		||||
	o.MetricExportInterval = setting.interval
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user