add grpc support
This commit is contained in:
parent
5aa5dda111
commit
98fba4eac8
@ -22,7 +22,7 @@ import (
|
|||||||
"go.opentelemetry.io/otel/sdk/metric"
|
"go.opentelemetry.io/otel/sdk/metric"
|
||||||
"go.opentelemetry.io/otel/sdk/resource"
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
traceSDK "go.opentelemetry.io/otel/sdk/trace"
|
traceSDK "go.opentelemetry.io/otel/sdk/trace"
|
||||||
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
|
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
|
||||||
trace "go.opentelemetry.io/otel/trace"
|
trace "go.opentelemetry.io/otel/trace"
|
||||||
"go.opentelemetry.io/otel/trace/noop"
|
"go.opentelemetry.io/otel/trace/noop"
|
||||||
)
|
)
|
||||||
@ -170,7 +170,7 @@ func newResource(ctx context.Context) *resource.Resource {
|
|||||||
attributes := []attribute.KeyValue{
|
attributes := []attribute.KeyValue{
|
||||||
semconv.ServiceName(cfg.Name),
|
semconv.ServiceName(cfg.Name),
|
||||||
semconv.ServiceVersion(cfg.Version),
|
semconv.ServiceVersion(cfg.Version),
|
||||||
semconv.DeploymentEnvironment(cfg.Environment),
|
semconv.DeploymentEnvironmentName(cfg.Environment),
|
||||||
semconv.K8SPodName(os.Getenv("HOSTNAME")),
|
semconv.K8SPodName(os.Getenv("HOSTNAME")),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,8 +2,10 @@ package grpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
"go.opentelemetry.io/otel/metric"
|
"go.opentelemetry.io/otel/metric"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
@ -38,12 +40,18 @@ func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) error {
|
|||||||
ctx, span := appGRPC.tracer.Start(ctx, "appgrpc.init")
|
ctx, span := appGRPC.tracer.Start(ctx, "appgrpc.init")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
// Prepare grpc.Server for use
|
||||||
if err := appGRPC.prepGRPCServer(ctx); err != nil {
|
if err := appGRPC.prepGRPCServer(ctx); err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, "failed to prepare GRPC Server")
|
span.SetStatus(codes.Error, "failed to prepare GRPC Server")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load span with server info
|
||||||
|
span.SetAttributes(appGRPC.getServerAttributes()...)
|
||||||
|
|
||||||
|
// Run, returning a shutdown func and an error chan
|
||||||
|
// TODO: Implement shutdown func and error chan
|
||||||
err := appGRPC.runGRPCServer(ctx)
|
err := appGRPC.runGRPCServer(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
@ -54,3 +62,31 @@ func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) error {
|
|||||||
span.SetStatus(codes.Ok, "")
|
span.SetStatus(codes.Ok, "")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert grpc.ServiceInfo map to []attribute.KeyValue
|
||||||
|
func (a *appGRPCServer) getServerAttributes() []attribute.KeyValue {
|
||||||
|
var attrs []attribute.KeyValue
|
||||||
|
|
||||||
|
for serviceName, info := range a.server.GetServiceInfo() {
|
||||||
|
// Add the service name
|
||||||
|
attrs = append(attrs, attribute.String("grpc.service", serviceName))
|
||||||
|
|
||||||
|
// Convert methods into a comma-separated string
|
||||||
|
var methods []string
|
||||||
|
for _, method := range info.Methods {
|
||||||
|
methods = append(methods, method.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add methods if present
|
||||||
|
if len(methods) > 0 {
|
||||||
|
attrs = append(attrs, attribute.String("grpc.service.methods", strings.Join(methods, ",")))
|
||||||
|
}
|
||||||
|
|
||||||
|
// If metadata is a string, store it
|
||||||
|
if metadata, ok := info.Metadata.(string); ok && metadata != "" {
|
||||||
|
attrs = append(attrs, attribute.String("grpc.service.metadata", metadata))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return attrs
|
||||||
|
}
|
||||||
|
@ -34,31 +34,31 @@ func (a *appGRPCServer) prepGRPCServer(spanCtx context.Context) error {
|
|||||||
// Chain interceptors for unary RPCs
|
// Chain interceptors for unary RPCs
|
||||||
a.serverOpts = append(a.serverOpts,
|
a.serverOpts = append(a.serverOpts,
|
||||||
grpc.ChainUnaryInterceptor(a.opts.UnaryInterceptors...))
|
grpc.ChainUnaryInterceptor(a.opts.UnaryInterceptors...))
|
||||||
span.SetAttributes(attribute.Int("numUnaryInterceptors",
|
span.SetAttributes(attribute.Int("grpc.server.unaryinterceptors",
|
||||||
len(a.opts.UnaryInterceptors)))
|
len(a.opts.UnaryInterceptors)))
|
||||||
|
|
||||||
// Chain interceptors for streaming RPCs
|
// Chain interceptors for streaming RPCs
|
||||||
a.serverOpts = append(a.serverOpts,
|
a.serverOpts = append(a.serverOpts,
|
||||||
grpc.ChainStreamInterceptor(a.opts.StreamInterceptors...))
|
grpc.ChainStreamInterceptor(a.opts.StreamInterceptors...))
|
||||||
span.SetAttributes(attribute.Int("numStreamInterceptors",
|
span.SetAttributes(attribute.Int("grpc.server.streaminterceptors",
|
||||||
len(a.opts.StreamInterceptors)))
|
len(a.opts.StreamInterceptors)))
|
||||||
|
|
||||||
// Prepare GRPC Server
|
// Prepare GRPC Server
|
||||||
a.server = grpc.NewServer(a.serverOpts...)
|
a.server = grpc.NewServer(a.serverOpts...)
|
||||||
span.SetAttributes(attribute.Int("numServerOpts", len(a.serverOpts)))
|
span.SetAttributes(attribute.Int("grpc.server.serveropts", len(a.serverOpts)))
|
||||||
|
|
||||||
// Load given services into server registry
|
// Load given services into server registry
|
||||||
for _, service := range a.opts.Services {
|
for _, service := range a.opts.Services {
|
||||||
span.AddEvent(fmt.Sprintf("registered %s service", service.Name))
|
span.AddEvent(fmt.Sprintf("registered %s service", service.Name))
|
||||||
a.server.RegisterService(service.Type, service.Service)
|
a.server.RegisterService(service.Type, service.Service)
|
||||||
}
|
}
|
||||||
span.SetAttributes(attribute.Int("numGRPCServices", len(a.opts.Services)))
|
span.SetAttributes(attribute.Int("grpc.server.grpcservices", len(a.opts.Services)))
|
||||||
|
|
||||||
// Enable reflection if desired
|
// Enable reflection if desired
|
||||||
if a.opts.EnableReflection {
|
if a.opts.EnableReflection {
|
||||||
reflection.Register(a.server)
|
reflection.Register(a.server)
|
||||||
}
|
}
|
||||||
span.SetAttributes(attribute.Bool("reflectionEnabled", a.opts.EnableReflection))
|
span.SetAttributes(attribute.Bool("grpc.server.reflection", a.opts.EnableReflection))
|
||||||
|
|
||||||
span.SetStatus(codes.Ok, "")
|
span.SetStatus(codes.Ok, "")
|
||||||
return nil
|
return nil
|
||||||
@ -66,8 +66,7 @@ func (a *appGRPCServer) prepGRPCServer(spanCtx context.Context) error {
|
|||||||
|
|
||||||
func (a *appGRPCServer) prepareOTEL(spanCtx context.Context) {
|
func (a *appGRPCServer) prepareOTEL(spanCtx context.Context) {
|
||||||
_, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.otel", trace.WithAttributes(
|
_, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.otel", trace.WithAttributes(
|
||||||
attribute.Bool("instrumentationEnabled", a.opts.EnableInstrumentation),
|
attribute.Bool("grpc.server.instrumented", a.opts.EnableInstrumentation)))
|
||||||
attribute.String("logLevel", a.logger.GetLevel().String())))
|
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if a.opts.EnableInstrumentation {
|
if a.opts.EnableInstrumentation {
|
||||||
@ -83,7 +82,8 @@ func (a *appGRPCServer) prepareOTEL(spanCtx context.Context) {
|
|||||||
|
|
||||||
func (a *appGRPCServer) prepareLogging(spanCtx context.Context) {
|
func (a *appGRPCServer) prepareLogging(spanCtx context.Context) {
|
||||||
_, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.logging", trace.WithAttributes(
|
_, span := a.tracer.Start(spanCtx, "appgrpc.init.prepare.logging", trace.WithAttributes(
|
||||||
attribute.Bool("instrumentationEnabled", a.opts.LogRequests)))
|
attribute.Bool("grpc.server.instrumentated", a.opts.LogRequests),
|
||||||
|
attribute.String("grpc.server.loglevel", a.logger.GetLevel().String())))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if a.opts.LogRequests {
|
if a.opts.LogRequests {
|
||||||
|
@ -1,11 +1,21 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import "context"
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
|
||||||
|
)
|
||||||
|
|
||||||
// TODO: Implement
|
// TODO: Implement
|
||||||
func (a *appGRPCServer) runGRPCServer(spanCtx context.Context) error {
|
func (a *appGRPCServer) runGRPCServer(spanCtx context.Context) error {
|
||||||
_, span := a.tracer.Start(spanCtx, "appgrpc.init.start")
|
_, span := a.tracer.Start(spanCtx, "appgrpc.init.start")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
span.SetAttributes(
|
||||||
|
semconv.RPCSystemGRPC,
|
||||||
|
semconv.NetworkProtocolName("grpc"),
|
||||||
|
semconv.ServerAddress(a.opts.Listen),
|
||||||
|
)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user