From 5aa5dda111933f68110cdd3f2a11709fcd239959 Mon Sep 17 00:00:00 2001 From: Ryan McGuire Date: Fri, 7 Mar 2025 11:16:32 -0500 Subject: [PATCH] add grpc support --- go.mod | 1 + go.sum | 2 + pkg/app/app_types.go | 6 ++- pkg/srv/grpc/grpc.go | 53 +++++++++++++++----- pkg/srv/grpc/grpc_logger.go | 51 +++++++++++++++++++ pkg/srv/grpc/grpc_prepare.go | 96 ++++++++++++++++++++++++++++++++++++ pkg/srv/grpc/grpc_run.go | 11 +++++ 7 files changed, 205 insertions(+), 15 deletions(-) create mode 100644 pkg/srv/grpc/grpc_logger.go create mode 100644 pkg/srv/grpc/grpc_prepare.go create mode 100644 pkg/srv/grpc/grpc_run.go diff --git a/go.mod b/go.mod index 2e353a8..030b9fe 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.23.4 require ( github.com/caarlos0/env/v11 v11.3.1 + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1 github.com/prometheus/client_golang v1.20.5 github.com/rs/zerolog v1.33.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 diff --git a/go.sum b/go.sum index 03cc49f..71ee7a4 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1 h1:KcFzXwzM/kGhIRHvc8jdixfIJjVzuUJdnv+5xsPutog= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1/go.mod h1:qOchhhIlmRcqk/O9uCo/puJlyo07YINaIqdZfZG3Jkc= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 h1:VD1gqscl4nYs1YxVuSdemTrSgTKrwOWDK0FVFMqm+Cg= diff --git a/pkg/app/app_types.go b/pkg/app/app_types.go index e4c7b3b..e260ce0 100644 --- a/pkg/app/app_types.go +++ b/pkg/app/app_types.go @@ -24,8 +24,10 @@ type App struct { } type AppGRPC struct { - Services []*GRPCService - GRPCOpts []grpc.ServerOption + Services []*GRPCService + UnaryInterceptors []grpc.UnaryServerInterceptor + StreamInterceptors []grpc.StreamServerInterceptor + GRPCOpts []grpc.ServerOption } type GRPCService struct { diff --git a/pkg/srv/grpc/grpc.go b/pkg/srv/grpc/grpc.go index 4c43bdf..1314230 100644 --- a/pkg/srv/grpc/grpc.go +++ b/pkg/srv/grpc/grpc.go @@ -3,27 +3,54 @@ package grpc import ( "context" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "github.com/rs/zerolog" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" - sdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/grpc" + appotel "gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel" "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc/opts" ) -type AppGRPCServer struct { - ctx context.Context - tracer trace.Tracer - meter metric.Meter +type appGRPCServer struct { + ctx context.Context + tracer trace.Tracer + meter metric.Meter + opts *opts.GRPCOpts + serverOpts []grpc.ServerOption + logger *zerolog.Logger + server *grpc.Server } -func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) { - server := grpc.NewServer() - - stats := otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(sdktrace.T)) - - for _, service := range opts.Services { - server.RegisterService(service.Type, service.Service) +// TODO: This probably needs to pass back an error chan and a shutdown +// func like the http server does +func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) error { + appGRPC := &appGRPCServer{ + ctx: ctx, + tracer: appotel.GetTracer(ctx, "grpc"), + meter: appotel.GetMeter(ctx, "grpc"), + opts: opts, + serverOpts: make([]grpc.ServerOption, 0), + logger: zerolog.Ctx(ctx), } + + ctx, span := appGRPC.tracer.Start(ctx, "appgrpc.init") + defer span.End() + + if err := appGRPC.prepGRPCServer(ctx); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "failed to prepare GRPC Server") + return err + } + + err := appGRPC.runGRPCServer(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "failed to start GRPC Server") + return err + } + + span.SetStatus(codes.Ok, "") + return nil } diff --git a/pkg/srv/grpc/grpc_logger.go b/pkg/srv/grpc/grpc_logger.go new file mode 100644 index 0000000..ef9d304 --- /dev/null +++ b/pkg/srv/grpc/grpc_logger.go @@ -0,0 +1,51 @@ +package grpc + +import ( + "context" + + grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +// A grpc logging middleware compatible logger +// implementation using a zerolog logger, either from +// the global logger or a context logger +type GRPCLogger struct { + logger zerolog.Logger +} + +// Returns a grpcLogger using the global zerolog Logger +func NewGRPCLogger() *GRPCLogger { + return &GRPCLogger{logger: log.Logger} +} + +// Returns a grpcLogger using the zerolog logger in the provided context +func NewGRPCContextLogger(ctx context.Context) *GRPCLogger { + logger := log.Ctx(ctx) + return &GRPCLogger{logger: *logger} +} + +// Log logs the fields for given log level. We can assume users (middleware library) will put fields in pairs and +// those will be unique. +func (l *GRPCLogger) Log(ctx context.Context, level grpclogging.Level, msg string, fields ...any) { + var event *zerolog.Event + + switch level { + case grpclogging.LevelDebug: + event = l.logger.Debug() + case grpclogging.LevelInfo: + event = l.logger.Info() + case grpclogging.LevelWarn: + event = l.logger.Warn() + case grpclogging.LevelError: + event = l.logger.Error() + default: + event = l.logger.Warn(). + Int("unknowngrpcloglevel", int(level)) + } + + event.Ctx(ctx) + event.Fields(fields) + event.Msg(msg) +} diff --git a/pkg/srv/grpc/grpc_prepare.go b/pkg/srv/grpc/grpc_prepare.go new file mode 100644 index 0000000..3b1a19d --- /dev/null +++ b/pkg/srv/grpc/grpc_prepare.go @@ -0,0 +1,96 @@ +package grpc + +import ( + "context" + "errors" + "fmt" + + grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +func (a *appGRPCServer) prepGRPCServer(spanCtx context.Context) error { + ctx, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare") + defer span.End() + + if len(a.opts.Services) < 1 { + err := errors.New("refusing to create grpc server with no services") + a.logger.Err(err).Send() + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + // Prepare GRPC Server Opts + a.prepareOTEL(ctx) + a.prepareLogging(ctx) + + // Chain interceptors for unary RPCs + a.serverOpts = append(a.serverOpts, + grpc.ChainUnaryInterceptor(a.opts.UnaryInterceptors...)) + span.SetAttributes(attribute.Int("numUnaryInterceptors", + len(a.opts.UnaryInterceptors))) + + // Chain interceptors for streaming RPCs + a.serverOpts = append(a.serverOpts, + grpc.ChainStreamInterceptor(a.opts.StreamInterceptors...)) + span.SetAttributes(attribute.Int("numStreamInterceptors", + len(a.opts.StreamInterceptors))) + + // Prepare GRPC Server + a.server = grpc.NewServer(a.serverOpts...) + span.SetAttributes(attribute.Int("numServerOpts", len(a.serverOpts))) + + // Load given services into server registry + for _, service := range a.opts.Services { + span.AddEvent(fmt.Sprintf("registered %s service", service.Name)) + a.server.RegisterService(service.Type, service.Service) + } + span.SetAttributes(attribute.Int("numGRPCServices", len(a.opts.Services))) + + // Enable reflection if desired + if a.opts.EnableReflection { + reflection.Register(a.server) + } + span.SetAttributes(attribute.Bool("reflectionEnabled", a.opts.EnableReflection)) + + span.SetStatus(codes.Ok, "") + return nil +} + +func (a *appGRPCServer) prepareOTEL(spanCtx context.Context) { + _, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.otel", trace.WithAttributes( + attribute.Bool("instrumentationEnabled", a.opts.EnableInstrumentation), + attribute.String("logLevel", a.logger.GetLevel().String()))) + defer span.End() + + if a.opts.EnableInstrumentation { + a.serverOpts = append(a.serverOpts, + grpc.StatsHandler( + otelgrpc.NewServerHandler( + otelgrpc.WithTracerProvider(otel.GetTracerProvider()), + otelgrpc.WithMeterProvider(otel.GetMeterProvider()), + ))) + span.SetStatus(codes.Ok, "") + } +} + +func (a *appGRPCServer) prepareLogging(spanCtx context.Context) { + _, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.logging", trace.WithAttributes( + attribute.Bool("instrumentationEnabled", a.opts.LogRequests))) + defer span.End() + + if a.opts.LogRequests { + a.opts.UnaryInterceptors = append(a.opts.UnaryInterceptors, + grpclogging.UnaryServerInterceptor(NewGRPCContextLogger(a.ctx))) + a.opts.StreamInterceptors = append(a.opts.StreamInterceptors, + grpclogging.StreamServerInterceptor(NewGRPCContextLogger(a.ctx))) + span.SetStatus(codes.Ok, "") + } +} diff --git a/pkg/srv/grpc/grpc_run.go b/pkg/srv/grpc/grpc_run.go new file mode 100644 index 0000000..84d531c --- /dev/null +++ b/pkg/srv/grpc/grpc_run.go @@ -0,0 +1,11 @@ +package grpc + +import "context" + +// TODO: Implement +func (a *appGRPCServer) runGRPCServer(spanCtx context.Context) error { + _, span := a.tracer.Start(spanCtx, "appgrpc.init.start") + defer span.End() + + return nil +}