start implementing gw serve mux support
This commit is contained in:
@ -44,6 +44,9 @@ func (a *App) MustRun() {
|
||||
|
||||
var serverWG sync.WaitGroup
|
||||
|
||||
// TODO: Figure out where to merge GRPC Gateway Serve Mux
|
||||
// into HTTP serve mux
|
||||
|
||||
// Start HTTP (does not block)
|
||||
if a.cfg.HTTPEnabled() {
|
||||
serverWG.Add(1)
|
||||
|
43
pkg/srv/grpc/gateway.go
Normal file
43
pkg/srv/grpc/gateway.go
Normal file
@ -0,0 +1,43 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/grpc-ecosystem/grpc-gateway/runtime"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc/opts"
|
||||
)
|
||||
|
||||
const API_PATH = "/api" // TODO: Make Configurable
|
||||
|
||||
// TODO: Add GetClientConn method to appGRPCServer that supports TLS, auth, etc..
|
||||
// Will probably have to convert server opts to client opts
|
||||
func (a *appGRPCServer) registerServiceGatewayHandlers(ctx context.Context, service *opts.GRPCService) {
|
||||
if len(service.GwRegistrationFuncs) < 1 {
|
||||
return
|
||||
} else if a.gatewayMux == nil {
|
||||
a.gatewayMux = runtime.NewServeMux()
|
||||
}
|
||||
|
||||
ctx, span := a.tracer.Start(ctx, "appgrpc.init.prepare.service.gwHandlers", trace.WithAttributes(
|
||||
attribute.String("service", service.Name),
|
||||
attribute.Int("gwHandlers", len(service.GwRegistrationFuncs)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
// TODO: move to GetClientConn method which doesn't exist yet
|
||||
clientConn, _ := grpc.NewClient(a.opts.Listen)
|
||||
|
||||
for _, registerGW := range service.GwRegistrationFuncs {
|
||||
registerGW(ctx, a.gatewayMux, clientConn)
|
||||
}
|
||||
}
|
||||
|
||||
// Add our gateway mux to an existing http.ServeMux
|
||||
func (a *appGRPCServer) mergeGatewayMuxWithHTTPMux(mux *http.ServeMux) {
|
||||
mux.Handle(API_PATH, http.StripPrefix(API_PATH, a.gatewayMux))
|
||||
}
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/grpc-ecosystem/grpc-gateway/runtime"
|
||||
"github.com/rs/zerolog"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
@ -23,10 +24,15 @@ type appGRPCServer struct {
|
||||
serverOpts []grpc.ServerOption
|
||||
logger *zerolog.Logger
|
||||
server *grpc.Server
|
||||
gatewayMux *runtime.ServeMux
|
||||
shutdownFunc func(context.Context) error
|
||||
doneChan chan error
|
||||
}
|
||||
|
||||
func (g *appGRPCServer) GetMux() *runtime.ServeMux {
|
||||
return g.gatewayMux
|
||||
}
|
||||
|
||||
// Returns a shutdown func, a channel indicating done / error,
|
||||
// and an up-front error if server fails to start
|
||||
func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) (
|
||||
|
30
pkg/srv/grpc/opts/config.go
Normal file
30
pkg/srv/grpc/opts/config.go
Normal file
@ -0,0 +1,30 @@
|
||||
package opts
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grpc-ecosystem/grpc-gateway/runtime"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
|
||||
)
|
||||
|
||||
type GRPCOpts struct {
|
||||
*config.GRPCConfig
|
||||
*AppGRPC
|
||||
}
|
||||
|
||||
type AppGRPC struct {
|
||||
Services []*GRPCService
|
||||
UnaryInterceptors []grpc.UnaryServerInterceptor
|
||||
StreamInterceptors []grpc.StreamServerInterceptor
|
||||
GRPCOpts []grpc.ServerOption
|
||||
GRPCDone <-chan error
|
||||
}
|
||||
|
||||
type GRPCService struct {
|
||||
Name string // Descriptive name of service
|
||||
Type *grpc.ServiceDesc // Type (from protoc generated code)
|
||||
Service any // Implementation of GRPCService.Type (ptr)
|
||||
GwRegistrationFuncs []func(context.Context, *runtime.ServeMux, *grpc.ClientConn) error // Gateway regustration handler funcs
|
||||
}
|
@ -1,26 +0,0 @@
|
||||
package opts
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
|
||||
)
|
||||
|
||||
type GRPCOpts struct {
|
||||
*config.GRPCConfig
|
||||
*AppGRPC
|
||||
}
|
||||
|
||||
type AppGRPC struct {
|
||||
Services []*GRPCService
|
||||
UnaryInterceptors []grpc.UnaryServerInterceptor
|
||||
StreamInterceptors []grpc.StreamServerInterceptor
|
||||
GRPCOpts []grpc.ServerOption
|
||||
GRPCDone <-chan error
|
||||
}
|
||||
|
||||
type GRPCService struct {
|
||||
Name string // Descriptive name of service
|
||||
Type *grpc.ServiceDesc // Type (from protoc generated code)
|
||||
Service any // Implementation of GRPCService.Type (ptr)
|
||||
}
|
@ -3,7 +3,6 @@ package grpc
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
@ -48,10 +47,7 @@ func (a *appGRPCServer) prepGRPCServer(spanCtx context.Context) error {
|
||||
span.SetAttributes(attribute.Int("grpc.server.serveropts", len(a.serverOpts)))
|
||||
|
||||
// Load given services into server registry
|
||||
for _, service := range a.opts.Services {
|
||||
span.AddEvent(fmt.Sprintf("registered %s service", service.Name))
|
||||
a.server.RegisterService(service.Type, service.Service)
|
||||
}
|
||||
a.registerGRPCServices(ctx)
|
||||
span.SetAttributes(attribute.Int("grpc.server.grpcservices", len(a.opts.Services)))
|
||||
|
||||
// Enable reflection if desired
|
33
pkg/srv/grpc/services.go
Normal file
33
pkg/srv/grpc/services.go
Normal file
@ -0,0 +1,33 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc/opts"
|
||||
)
|
||||
|
||||
func (a *appGRPCServer) registerGRPCServices(ctx context.Context) {
|
||||
ctx, span := a.tracer.Start(ctx, "appgrpc.init.prepare.services", trace.WithAttributes(
|
||||
attribute.Int("numServices", len(a.opts.Services)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
for _, service := range a.opts.Services {
|
||||
a.registerGRPCService(ctx, service)
|
||||
a.registerServiceGatewayHandlers(ctx, service)
|
||||
}
|
||||
|
||||
span.SetStatus(codes.Ok, "")
|
||||
}
|
||||
|
||||
func (a *appGRPCServer) registerGRPCService(ctx context.Context, service *opts.GRPCService) {
|
||||
span := trace.SpanFromContext(ctx)
|
||||
span.AddEvent(fmt.Sprintf("registered %s service", service.Name))
|
||||
|
||||
a.server.RegisterService(service.Type, service.Service)
|
||||
}
|
Reference in New Issue
Block a user