add grpc support
This commit is contained in:
parent
81676c5404
commit
5aa5dda111
1
go.mod
1
go.mod
@ -4,6 +4,7 @@ go 1.23.4
|
||||
|
||||
require (
|
||||
github.com/caarlos0/env/v11 v11.3.1
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1
|
||||
github.com/prometheus/client_golang v1.20.5
|
||||
github.com/rs/zerolog v1.33.0
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0
|
||||
|
2
go.sum
2
go.sum
@ -23,6 +23,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1 h1:KcFzXwzM/kGhIRHvc8jdixfIJjVzuUJdnv+5xsPutog=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1/go.mod h1:qOchhhIlmRcqk/O9uCo/puJlyo07YINaIqdZfZG3Jkc=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 h1:VD1gqscl4nYs1YxVuSdemTrSgTKrwOWDK0FVFMqm+Cg=
|
||||
|
@ -25,6 +25,8 @@ type App struct {
|
||||
|
||||
type AppGRPC struct {
|
||||
Services []*GRPCService
|
||||
UnaryInterceptors []grpc.UnaryServerInterceptor
|
||||
StreamInterceptors []grpc.StreamServerInterceptor
|
||||
GRPCOpts []grpc.ServerOption
|
||||
}
|
||||
|
||||
|
@ -3,27 +3,54 @@ package grpc
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"github.com/rs/zerolog"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
appotel "gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
|
||||
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc/opts"
|
||||
)
|
||||
|
||||
type AppGRPCServer struct {
|
||||
type appGRPCServer struct {
|
||||
ctx context.Context
|
||||
tracer trace.Tracer
|
||||
meter metric.Meter
|
||||
opts *opts.GRPCOpts
|
||||
serverOpts []grpc.ServerOption
|
||||
logger *zerolog.Logger
|
||||
server *grpc.Server
|
||||
}
|
||||
|
||||
func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) {
|
||||
server := grpc.NewServer()
|
||||
|
||||
stats := otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(sdktrace.T))
|
||||
|
||||
for _, service := range opts.Services {
|
||||
server.RegisterService(service.Type, service.Service)
|
||||
// TODO: This probably needs to pass back an error chan and a shutdown
|
||||
// func like the http server does
|
||||
func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) error {
|
||||
appGRPC := &appGRPCServer{
|
||||
ctx: ctx,
|
||||
tracer: appotel.GetTracer(ctx, "grpc"),
|
||||
meter: appotel.GetMeter(ctx, "grpc"),
|
||||
opts: opts,
|
||||
serverOpts: make([]grpc.ServerOption, 0),
|
||||
logger: zerolog.Ctx(ctx),
|
||||
}
|
||||
|
||||
ctx, span := appGRPC.tracer.Start(ctx, "appgrpc.init")
|
||||
defer span.End()
|
||||
|
||||
if err := appGRPC.prepGRPCServer(ctx); err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, "failed to prepare GRPC Server")
|
||||
return err
|
||||
}
|
||||
|
||||
err := appGRPC.runGRPCServer(ctx)
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, "failed to start GRPC Server")
|
||||
return err
|
||||
}
|
||||
|
||||
span.SetStatus(codes.Ok, "")
|
||||
return nil
|
||||
}
|
||||
|
51
pkg/srv/grpc/grpc_logger.go
Normal file
51
pkg/srv/grpc/grpc_logger.go
Normal file
@ -0,0 +1,51 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// A grpc logging middleware compatible logger
|
||||
// implementation using a zerolog logger, either from
|
||||
// the global logger or a context logger
|
||||
type GRPCLogger struct {
|
||||
logger zerolog.Logger
|
||||
}
|
||||
|
||||
// Returns a grpcLogger using the global zerolog Logger
|
||||
func NewGRPCLogger() *GRPCLogger {
|
||||
return &GRPCLogger{logger: log.Logger}
|
||||
}
|
||||
|
||||
// Returns a grpcLogger using the zerolog logger in the provided context
|
||||
func NewGRPCContextLogger(ctx context.Context) *GRPCLogger {
|
||||
logger := log.Ctx(ctx)
|
||||
return &GRPCLogger{logger: *logger}
|
||||
}
|
||||
|
||||
// Log logs the fields for given log level. We can assume users (middleware library) will put fields in pairs and
|
||||
// those will be unique.
|
||||
func (l *GRPCLogger) Log(ctx context.Context, level grpclogging.Level, msg string, fields ...any) {
|
||||
var event *zerolog.Event
|
||||
|
||||
switch level {
|
||||
case grpclogging.LevelDebug:
|
||||
event = l.logger.Debug()
|
||||
case grpclogging.LevelInfo:
|
||||
event = l.logger.Info()
|
||||
case grpclogging.LevelWarn:
|
||||
event = l.logger.Warn()
|
||||
case grpclogging.LevelError:
|
||||
event = l.logger.Error()
|
||||
default:
|
||||
event = l.logger.Warn().
|
||||
Int("unknowngrpcloglevel", int(level))
|
||||
}
|
||||
|
||||
event.Ctx(ctx)
|
||||
event.Fields(fields)
|
||||
event.Msg(msg)
|
||||
}
|
96
pkg/srv/grpc/grpc_prepare.go
Normal file
96
pkg/srv/grpc/grpc_prepare.go
Normal file
@ -0,0 +1,96 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/reflection"
|
||||
)
|
||||
|
||||
func (a *appGRPCServer) prepGRPCServer(spanCtx context.Context) error {
|
||||
ctx, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare")
|
||||
defer span.End()
|
||||
|
||||
if len(a.opts.Services) < 1 {
|
||||
err := errors.New("refusing to create grpc server with no services")
|
||||
a.logger.Err(err).Send()
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// Prepare GRPC Server Opts
|
||||
a.prepareOTEL(ctx)
|
||||
a.prepareLogging(ctx)
|
||||
|
||||
// Chain interceptors for unary RPCs
|
||||
a.serverOpts = append(a.serverOpts,
|
||||
grpc.ChainUnaryInterceptor(a.opts.UnaryInterceptors...))
|
||||
span.SetAttributes(attribute.Int("numUnaryInterceptors",
|
||||
len(a.opts.UnaryInterceptors)))
|
||||
|
||||
// Chain interceptors for streaming RPCs
|
||||
a.serverOpts = append(a.serverOpts,
|
||||
grpc.ChainStreamInterceptor(a.opts.StreamInterceptors...))
|
||||
span.SetAttributes(attribute.Int("numStreamInterceptors",
|
||||
len(a.opts.StreamInterceptors)))
|
||||
|
||||
// Prepare GRPC Server
|
||||
a.server = grpc.NewServer(a.serverOpts...)
|
||||
span.SetAttributes(attribute.Int("numServerOpts", len(a.serverOpts)))
|
||||
|
||||
// Load given services into server registry
|
||||
for _, service := range a.opts.Services {
|
||||
span.AddEvent(fmt.Sprintf("registered %s service", service.Name))
|
||||
a.server.RegisterService(service.Type, service.Service)
|
||||
}
|
||||
span.SetAttributes(attribute.Int("numGRPCServices", len(a.opts.Services)))
|
||||
|
||||
// Enable reflection if desired
|
||||
if a.opts.EnableReflection {
|
||||
reflection.Register(a.server)
|
||||
}
|
||||
span.SetAttributes(attribute.Bool("reflectionEnabled", a.opts.EnableReflection))
|
||||
|
||||
span.SetStatus(codes.Ok, "")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *appGRPCServer) prepareOTEL(spanCtx context.Context) {
|
||||
_, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.otel", trace.WithAttributes(
|
||||
attribute.Bool("instrumentationEnabled", a.opts.EnableInstrumentation),
|
||||
attribute.String("logLevel", a.logger.GetLevel().String())))
|
||||
defer span.End()
|
||||
|
||||
if a.opts.EnableInstrumentation {
|
||||
a.serverOpts = append(a.serverOpts,
|
||||
grpc.StatsHandler(
|
||||
otelgrpc.NewServerHandler(
|
||||
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
|
||||
otelgrpc.WithMeterProvider(otel.GetMeterProvider()),
|
||||
)))
|
||||
span.SetStatus(codes.Ok, "")
|
||||
}
|
||||
}
|
||||
|
||||
func (a *appGRPCServer) prepareLogging(spanCtx context.Context) {
|
||||
_, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.logging", trace.WithAttributes(
|
||||
attribute.Bool("instrumentationEnabled", a.opts.LogRequests)))
|
||||
defer span.End()
|
||||
|
||||
if a.opts.LogRequests {
|
||||
a.opts.UnaryInterceptors = append(a.opts.UnaryInterceptors,
|
||||
grpclogging.UnaryServerInterceptor(NewGRPCContextLogger(a.ctx)))
|
||||
a.opts.StreamInterceptors = append(a.opts.StreamInterceptors,
|
||||
grpclogging.StreamServerInterceptor(NewGRPCContextLogger(a.ctx)))
|
||||
span.SetStatus(codes.Ok, "")
|
||||
}
|
||||
}
|
11
pkg/srv/grpc/grpc_run.go
Normal file
11
pkg/srv/grpc/grpc_run.go
Normal file
@ -0,0 +1,11 @@
|
||||
package grpc
|
||||
|
||||
import "context"
|
||||
|
||||
// TODO: Implement
|
||||
func (a *appGRPCServer) runGRPCServer(spanCtx context.Context) error {
|
||||
_, span := a.tracer.Start(spanCtx, "appgrpc.init.start")
|
||||
defer span.End()
|
||||
|
||||
return nil
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user