add grpc support
This commit is contained in:
		@@ -10,7 +10,9 @@ import (
 | 
				
			|||||||
	srvhttp "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http"
 | 
						srvhttp "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (a *App) initGRPC() {
 | 
					// TODO: Implement
 | 
				
			||||||
 | 
					func (a *App) initGRPC(ctx context.Context) error {
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (a *App) initHTTP(ctx context.Context) error {
 | 
					func (a *App) initHTTP(ctx context.Context) error {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -34,6 +34,7 @@ type GRPCService struct {
 | 
				
			|||||||
	Name     string            // Descriptive name of service
 | 
						Name     string            // Descriptive name of service
 | 
				
			||||||
	Type     *grpc.ServiceDesc // Type (from protoc generated code)
 | 
						Type     *grpc.ServiceDesc // Type (from protoc generated code)
 | 
				
			||||||
	Service  any               // Implementation of GRPCService.Type (ptr)
 | 
						Service  any               // Implementation of GRPCService.Type (ptr)
 | 
				
			||||||
 | 
						grpcDone <-chan error
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type AppHTTP struct {
 | 
					type AppHTTP struct {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -23,11 +23,15 @@ type appGRPCServer struct {
 | 
				
			|||||||
	serverOpts   []grpc.ServerOption
 | 
						serverOpts   []grpc.ServerOption
 | 
				
			||||||
	logger       *zerolog.Logger
 | 
						logger       *zerolog.Logger
 | 
				
			||||||
	server       *grpc.Server
 | 
						server       *grpc.Server
 | 
				
			||||||
 | 
						shutdownFunc func(context.Context) error
 | 
				
			||||||
 | 
						doneChan     chan error
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TODO: This probably needs to pass back an error chan and a shutdown
 | 
					// Returns a shutdown func, a channel indicating done / error,
 | 
				
			||||||
// func like the http server does
 | 
					// and an up-front error if server fails to start
 | 
				
			||||||
func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) error {
 | 
					func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) (
 | 
				
			||||||
 | 
						func(context.Context) error, <-chan error, error,
 | 
				
			||||||
 | 
					) {
 | 
				
			||||||
	appGRPC := &appGRPCServer{
 | 
						appGRPC := &appGRPCServer{
 | 
				
			||||||
		ctx:        ctx,
 | 
							ctx:        ctx,
 | 
				
			||||||
		tracer:     appotel.GetTracer(ctx, "grpc"),
 | 
							tracer:     appotel.GetTracer(ctx, "grpc"),
 | 
				
			||||||
@@ -35,6 +39,7 @@ func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) error {
 | 
				
			|||||||
		opts:       opts,
 | 
							opts:       opts,
 | 
				
			||||||
		serverOpts: make([]grpc.ServerOption, 0),
 | 
							serverOpts: make([]grpc.ServerOption, 0),
 | 
				
			||||||
		logger:     zerolog.Ctx(ctx),
 | 
							logger:     zerolog.Ctx(ctx),
 | 
				
			||||||
 | 
							doneChan:   make(chan error),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ctx, span := appGRPC.tracer.Start(ctx, "appgrpc.init")
 | 
						ctx, span := appGRPC.tracer.Start(ctx, "appgrpc.init")
 | 
				
			||||||
@@ -44,23 +49,21 @@ func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) error {
 | 
				
			|||||||
	if err := appGRPC.prepGRPCServer(ctx); err != nil {
 | 
						if err := appGRPC.prepGRPCServer(ctx); err != nil {
 | 
				
			||||||
		span.RecordError(err)
 | 
							span.RecordError(err)
 | 
				
			||||||
		span.SetStatus(codes.Error, "failed to prepare GRPC Server")
 | 
							span.SetStatus(codes.Error, "failed to prepare GRPC Server")
 | 
				
			||||||
		return err
 | 
							return nil, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Load span with server info
 | 
						// Load span with server info
 | 
				
			||||||
	span.SetAttributes(appGRPC.getServerAttributes()...)
 | 
						span.SetAttributes(appGRPC.getServerAttributes()...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Run, returning a shutdown func and an error chan
 | 
					 | 
				
			||||||
	// TODO: Implement shutdown func and error chan
 | 
					 | 
				
			||||||
	err := appGRPC.runGRPCServer(ctx)
 | 
						err := appGRPC.runGRPCServer(ctx)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		span.RecordError(err)
 | 
							span.RecordError(err)
 | 
				
			||||||
		span.SetStatus(codes.Error, "failed to start GRPC Server")
 | 
							span.SetStatus(codes.Error, "failed to start GRPC Server")
 | 
				
			||||||
		return err
 | 
							return nil, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	span.SetStatus(codes.Ok, "")
 | 
						span.SetStatus(codes.Ok, "")
 | 
				
			||||||
	return nil
 | 
						return appGRPC.shutdownFunc, appGRPC.doneChan, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Convert grpc.ServiceInfo map to []attribute.KeyValue
 | 
					// Convert grpc.ServiceInfo map to []attribute.KeyValue
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2,7 +2,9 @@ package grpc
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"net"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"go.opentelemetry.io/otel/codes"
 | 
				
			||||||
	semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
 | 
						semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -17,5 +19,46 @@ func (a *appGRPCServer) runGRPCServer(spanCtx context.Context) error {
 | 
				
			|||||||
		semconv.ServerAddress(a.opts.Listen),
 | 
							semconv.ServerAddress(a.opts.Listen),
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						a.shutdownFunc = a.getShutdownFunc()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						l, err := net.Listen("tcp", a.opts.Listen)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							a.logger.Err(err).Send()
 | 
				
			||||||
 | 
							span.RecordError(err)
 | 
				
			||||||
 | 
							span.SetStatus(codes.Error, "failed to acquire net listener")
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						span.AddEvent("network listener acquired")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Launch GRPC Server
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							if err := a.server.Serve(l); err != nil {
 | 
				
			||||||
 | 
								a.logger.Err(err).Send()
 | 
				
			||||||
 | 
								a.doneChan <- err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						span.AddEvent("grpc server goroutine launched")
 | 
				
			||||||
 | 
						span.SetStatus(codes.Ok, "")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (a *appGRPCServer) getShutdownFunc() func(context.Context) error {
 | 
				
			||||||
 | 
						return func(ctx context.Context) error {
 | 
				
			||||||
 | 
							stoppedChan := make(chan any)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								a.server.GracefulStop()
 | 
				
			||||||
 | 
								stoppedChan <- nil
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case <-stoppedChan:
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							case <-ctx.Done():
 | 
				
			||||||
 | 
								return ctx.Err()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user