add grpc support

This commit is contained in:
Ryan McGuire 2025-03-07 15:19:05 -05:00
parent 2cf15a4837
commit 00c8e2e4fc
8 changed files with 95 additions and 68 deletions

View File

@ -4,6 +4,7 @@ import (
"errors"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
@ -23,7 +24,7 @@ func (a *App) MustRun() {
a.l = zerolog.Ctx(a.AppContext)
a.shutdownFuncs = make([]shutdownFunc, 0)
a.appDone = make(chan any)
a.HTTP.httpDone = make(chan any)
a.HTTP.HTTPDone = make(chan any)
if len(a.HTTP.Funcs) < 1 {
a.l.Warn().Msg("no http funcs provided, only serving health and metrics")
@ -34,14 +35,21 @@ func (a *App) MustRun() {
ctx, initSpan := a.tracer.Start(a.AppContext, "init")
defer initSpan.End()
// Start HTTP
// Start HTTP (does not block)
if err := a.initHTTP(ctx); err != nil {
initSpan.RecordError(err)
initSpan.SetStatus(codes.Error, err.Error())
}
initSpan.AddEvent("http server started")
initSpan.SetAttributes(attribute.Int("http.handlers", len(a.HTTP.Funcs)))
// Start GRPC
a.initGRPC()
// Start GRPC (does not block)
if err := a.initGRPC(ctx); err != nil {
initSpan.RecordError(err)
initSpan.SetStatus(codes.Error, err.Error())
}
initSpan.AddEvent("grpc server started")
initSpan.SetAttributes(attribute.Int("grpc.services", len(a.GRPC.Services)))
// Monitor app lifecycle
go a.run()

View File

@ -7,11 +7,30 @@ import (
"go.opentelemetry.io/otel/codes"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
srvgrpc "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc"
grpcopts "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc/opts"
srvhttp "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http"
)
// TODO: Implement
func (a *App) initGRPC(ctx context.Context) error {
ctx, span := a.tracer.Start(ctx, "init.grpc")
defer span.End()
shutdown, doneChan, err := srvgrpc.InitGRPCServer(ctx,
&grpcopts.GRPCOpts{
GRPCConfig: a.cfg.GRPC,
AppGRPC: a.GRPC,
})
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
a.shutdownFuncs = append(a.shutdownFuncs, shutdown)
a.GRPC.GRPCDone = doneChan
span.SetStatus(codes.Ok, "")
return nil
}
@ -28,14 +47,8 @@ func (a *App) initHTTP(ctx context.Context) error {
attribute.Int("numHTTPHealthChecks", len(a.HTTP.HealthChecks)),
)
httpShutdown, a.HTTP.httpDone, err = srvhttp.InitHTTPServer(
&srvhttp.HTTPServerOpts{
Ctx: a.AppContext,
HandleFuncs: a.HTTP.Funcs,
Middleware: a.HTTP.Middleware,
HealthCheckFuncs: a.HTTP.HealthChecks,
},
)
a.HTTP.Ctx = a.AppContext
httpShutdown, a.HTTP.HTTPDone, err = srvhttp.InitHTTPServer(a.HTTP)
a.shutdownFuncs = append(a.shutdownFuncs, httpShutdown)

View File

@ -2,20 +2,19 @@ package app
import (
"context"
"net/http"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
srvhttp "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http"
grpcopts "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc/opts"
httpopts "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http/opts"
)
type App struct {
AppContext context.Context
HTTP *AppHTTP
GRPC *AppGRPC
HTTP *httpopts.AppHTTP
GRPC *grpcopts.AppGRPC
cfg *config.AppConfig
l *zerolog.Logger
tracer trace.Tracer
@ -23,28 +22,4 @@ type App struct {
appDone chan any
}
type AppGRPC struct {
Services []*GRPCService
UnaryInterceptors []grpc.UnaryServerInterceptor
StreamInterceptors []grpc.StreamServerInterceptor
GRPCOpts []grpc.ServerOption
}
type GRPCService struct {
Name string // Descriptive name of service
Type *grpc.ServiceDesc // Type (from protoc generated code)
Service any // Implementation of GRPCService.Type (ptr)
grpcDone <-chan error
}
type AppHTTP struct {
Funcs []srvhttp.HTTPFunc
Middleware []http.Handler
HealthChecks []srvhttp.HealthCheckFunc
httpDone <-chan any
}
type (
healthCheckFunc func(context.Context) error
shutdownFunc func(context.Context) error
)
type shutdownFunc func(context.Context) error

View File

@ -5,6 +5,7 @@ import (
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)
@ -16,12 +17,14 @@ func (a *App) run() {
case <-a.AppContext.Done():
a.l.Warn().Str("reason", a.AppContext.Err().Error()).
Msg("shutting down on context done")
case <-a.HTTP.httpDone:
case <-a.HTTP.HTTPDone: // TODO: return error on this channel
a.l.Warn().Msg("shutting down early on http server done")
case err := <-a.GRPC.GRPCDone:
a.l.Warn().Err(err).Msg("shutting down early on grpc server done")
}
a.Shutdown()
a.appDone <- nil
a.Shutdown() // Run through all shutdown funcs
a.appDone <- nil // Notify app
}
// Typically invoked when AppContext is done
@ -48,6 +51,8 @@ func (a *App) Shutdown() {
doneCtx, span := a.tracer.Start(doneCtx, "shutdown")
defer span.End()
span.SetAttributes(attribute.Int("shutdown.funcs", len(a.shutdownFuncs)))
var wg sync.WaitGroup
wg.Add(len(a.shutdownFuncs))

View File

@ -1,11 +1,26 @@
package opts
import (
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/app"
"google.golang.org/grpc"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
)
type GRPCOpts struct {
*config.GRPCConfig
*app.AppGRPC
*AppGRPC
}
type AppGRPC struct {
Services []*GRPCService
UnaryInterceptors []grpc.UnaryServerInterceptor
StreamInterceptors []grpc.StreamServerInterceptor
GRPCOpts []grpc.ServerOption
GRPCDone <-chan error
}
type GRPCService struct {
Name string // Descriptive name of service
Type *grpc.ServiceDesc // Type (from protoc generated code)
Service any // Implementation of GRPCService.Type (ptr)
}

View File

@ -17,6 +17,7 @@ import (
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http/opts"
)
var (
@ -27,20 +28,7 @@ var (
defIdleTimeout = 15 * time.Second
)
type HTTPFunc struct {
Path string
HandlerFunc http.HandlerFunc
}
type HTTPServerOpts struct {
Ctx context.Context
HandleFuncs []HTTPFunc
Middleware []http.Handler
HealthCheckFuncs []HealthCheckFunc
CustomListener net.Listener
}
func prepHTTPServer(opts *HTTPServerOpts) *http.Server {
func prepHTTPServer(opts *opts.AppHTTP) *http.Server {
var (
cfg = config.MustFromCtx(opts.Ctx)
l = zerolog.Ctx(opts.Ctx)
@ -54,11 +42,11 @@ func prepHTTPServer(opts *HTTPServerOpts) *http.Server {
mux.Handle(pattern, handler) // Associate pattern with handler
}
healthChecks := handleHealthCheckFunc(opts.Ctx, opts.HealthCheckFuncs...)
healthChecks := handleHealthCheckFunc(opts.Ctx, opts.HealthChecks...)
otelHandleFunc("/health", healthChecks)
otelHandleFunc("/", healthChecks)
for _, f := range opts.HandleFuncs {
for _, f := range opts.Funcs {
otelHandleFunc(f.Path, f.HandlerFunc)
}
@ -130,7 +118,7 @@ func prepHTTPServer(opts *HTTPServerOpts) *http.Server {
// Returns a shutdown func and a done channel if the
// server aborts abnormally. Returns error on failure to start
func InitHTTPServer(opts *HTTPServerOpts) (
func InitHTTPServer(opts *opts.AppHTTP) (
func(context.Context) error, <-chan any, error,
) {
l := zerolog.Ctx(opts.Ctx)

View File

@ -9,11 +9,11 @@ import (
"time"
"github.com/rs/zerolog"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http/opts"
)
type HealthCheckFunc func(context.Context) error
func handleHealthCheckFunc(ctx context.Context, hcFuncs ...HealthCheckFunc) func(w http.ResponseWriter, r *http.Request) {
func handleHealthCheckFunc(ctx context.Context, hcFuncs ...opts.HealthCheckFunc) func(w http.ResponseWriter, r *http.Request) {
// Return http handle func
return func(w http.ResponseWriter, r *http.Request) {
var (

View File

@ -0,0 +1,23 @@
package opts
import (
"context"
"net"
"net/http"
)
type AppHTTP struct {
Ctx context.Context
Funcs []HTTPFunc
Middleware []http.Handler
HealthChecks []HealthCheckFunc
CustomListener net.Listener
HTTPDone <-chan any
}
type HTTPFunc struct {
Path string
HandlerFunc http.HandlerFunc
}
type HealthCheckFunc func(context.Context) error