go-app/pkg/srv/grpc/grpc_prepare.go
2025-03-07 11:44:50 -05:00

97 lines
3.1 KiB
Go

package grpc
import (
"context"
"errors"
"fmt"
grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
func (a *appGRPCServer) prepGRPCServer(spanCtx context.Context) error {
ctx, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare")
defer span.End()
if len(a.opts.Services) < 1 {
err := errors.New("refusing to create grpc server with no services")
a.logger.Err(err).Send()
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
// Prepare GRPC Server Opts
a.prepareOTEL(ctx)
a.prepareLogging(ctx)
// Chain interceptors for unary RPCs
a.serverOpts = append(a.serverOpts,
grpc.ChainUnaryInterceptor(a.opts.UnaryInterceptors...))
span.SetAttributes(attribute.Int("grpc.server.unaryinterceptors",
len(a.opts.UnaryInterceptors)))
// Chain interceptors for streaming RPCs
a.serverOpts = append(a.serverOpts,
grpc.ChainStreamInterceptor(a.opts.StreamInterceptors...))
span.SetAttributes(attribute.Int("grpc.server.streaminterceptors",
len(a.opts.StreamInterceptors)))
// Prepare GRPC Server
a.server = grpc.NewServer(a.serverOpts...)
span.SetAttributes(attribute.Int("grpc.server.serveropts", len(a.serverOpts)))
// Load given services into server registry
for _, service := range a.opts.Services {
span.AddEvent(fmt.Sprintf("registered %s service", service.Name))
a.server.RegisterService(service.Type, service.Service)
}
span.SetAttributes(attribute.Int("grpc.server.grpcservices", len(a.opts.Services)))
// Enable reflection if desired
if a.opts.EnableReflection {
reflection.Register(a.server)
}
span.SetAttributes(attribute.Bool("grpc.server.reflection", a.opts.EnableReflection))
span.SetStatus(codes.Ok, "")
return nil
}
func (a *appGRPCServer) prepareOTEL(spanCtx context.Context) {
_, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.otel", trace.WithAttributes(
attribute.Bool("grpc.server.instrumented", a.opts.EnableInstrumentation)))
defer span.End()
if a.opts.EnableInstrumentation {
a.serverOpts = append(a.serverOpts,
grpc.StatsHandler(
otelgrpc.NewServerHandler(
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
otelgrpc.WithMeterProvider(otel.GetMeterProvider()),
)))
span.SetStatus(codes.Ok, "")
}
}
func (a *appGRPCServer) prepareLogging(spanCtx context.Context) {
_, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.logging", trace.WithAttributes(
attribute.Bool("grpc.server.instrumentated", a.opts.LogRequests),
attribute.String("grpc.server.loglevel", a.logger.GetLevel().String())))
defer span.End()
if a.opts.LogRequests {
a.opts.UnaryInterceptors = append(a.opts.UnaryInterceptors,
grpclogging.UnaryServerInterceptor(NewGRPCContextLogger(a.ctx)))
a.opts.StreamInterceptors = append(a.opts.StreamInterceptors,
grpclogging.StreamServerInterceptor(NewGRPCContextLogger(a.ctx)))
span.SetStatus(codes.Ok, "")
}
}