implement grpc gateway and addl handler support

This commit is contained in:
2025-03-24 16:24:03 -04:00
parent 4e014d5ea0
commit 262f6a4232
12 changed files with 241 additions and 165 deletions

View File

@ -1,90 +0,0 @@
package app
import (
"errors"
"sync"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
)
func (a *App) Done() <-chan any {
return a.appDone
}
func (a *App) MustRun() {
if a.cfg != nil {
panic(errors.New("already ran app trying to run"))
}
// Set up app
a.cfg = config.MustFromCtx(a.AppContext)
a.l = zerolog.Ctx(a.AppContext)
a.shutdownFuncs = make([]shutdownFunc, 0)
a.appDone = make(chan any)
a.HTTP.HTTPDone = make(chan any)
if !a.cfg.HTTPEnabled() && !a.cfg.GRPCEnabled() {
panic(errors.New("neither http nor grpc enabled, nothing to do"))
}
if len(a.HTTP.Funcs) < 1 {
a.l.Warn().Msg("no http funcs provided, only serving health and metrics")
}
// Start OTEL
// Registers a NO-OP provider if not enabled
a.initOTEL()
ctx, initSpan := a.tracer.Start(a.AppContext, "init")
defer initSpan.End()
var serverWG sync.WaitGroup
// TODO: Figure out where to merge GRPC Gateway Serve Mux
// into HTTP serve mux
// Start HTTP (does not block)
if a.cfg.HTTPEnabled() {
serverWG.Add(1)
go func() {
defer serverWG.Done()
if err := a.initHTTP(ctx); err != nil {
initSpan.RecordError(err)
initSpan.SetStatus(codes.Error, err.Error())
}
initSpan.AddEvent("http server started")
initSpan.SetAttributes(attribute.Int("http.handlers", len(a.HTTP.Funcs)))
}()
}
// Start GRPC (does not block)
if a.cfg.GRPCEnabled() {
serverWG.Add(1)
go func() {
defer serverWG.Done()
if err := a.initGRPC(ctx); err != nil {
initSpan.RecordError(err)
initSpan.SetStatus(codes.Error, err.Error())
}
initSpan.AddEvent("grpc server started")
initSpan.SetAttributes(attribute.Int("grpc.services", len(a.GRPC.Services)))
}()
}
serverWG.Wait()
// Monitor app lifecycle
go a.run()
// Startup Complete
a.l.Info().
Str("name", a.cfg.Name).
Str("version", a.cfg.Version).
Str("logLevel", a.cfg.Logging.Level).
Msg("app initialized")
initSpan.SetStatus(codes.Ok, "")
}

72
pkg/app/monitor.go Normal file
View File

@ -0,0 +1,72 @@
package app
import (
"context"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)
// Watches contexts and channels for the
// app to be finished and calls shutdown once
// the app is done
func (a *App) monitor() {
select {
case <-a.AppContext.Done():
a.l.Warn().Str("reason", a.AppContext.Err().Error()).
Msg("shutting down on context done")
case <-a.HTTP.HTTPDone: // TODO: return error on this channel
a.l.Warn().Msg("shutting down early on http server done")
case err := <-a.GRPC.GRPCDone:
a.l.Warn().Err(err).Msg("shutting down early on grpc server done")
}
a.Shutdown() // Run through all shutdown funcs
a.appDone <- nil // Notify app
}
// Typically invoked when AppContext is done
// or Server has exited. Not intended to be called
// manually
func (a *App) Shutdown() {
now := time.Now()
doneCtx, cncl := context.WithTimeout(context.Background(), 15*time.Second)
defer func() {
if doneCtx.Err() == context.DeadlineExceeded {
a.l.Err(doneCtx.Err()).
Dur("shutdownTime", time.Since(now)).
Msg("app shutdown aborted")
} else {
a.l.Info().
Int("shutdownFuncsCalled", len(a.shutdownFuncs)).
Dur("shutdownTime", time.Since(now)).
Msg("app shutdown normally")
}
cncl()
}()
doneCtx, span := a.tracer.Start(doneCtx, "shutdown")
defer span.End()
span.SetAttributes(attribute.Int("shutdown.funcs", len(a.shutdownFuncs)))
var wg sync.WaitGroup
wg.Add(len(a.shutdownFuncs))
for _, f := range a.shutdownFuncs {
go func() {
defer wg.Done()
err := f(doneCtx)
if err != nil {
span.SetStatus(codes.Error, "shutdown failed")
span.RecordError(err)
a.l.Err(err).Send()
}
}()
}
wg.Wait()
}

View File

@ -1,72 +1,68 @@
package app
import (
"context"
"sync"
"time"
"errors"
"go.opentelemetry.io/otel/attribute"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/codes"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
)
// Watches contexts and channels for the
// app to be finished and calls shutdown once
// the app is done
func (a *App) run() {
select {
case <-a.AppContext.Done():
a.l.Warn().Str("reason", a.AppContext.Err().Error()).
Msg("shutting down on context done")
case <-a.HTTP.HTTPDone: // TODO: return error on this channel
a.l.Warn().Msg("shutting down early on http server done")
case err := <-a.GRPC.GRPCDone:
a.l.Warn().Err(err).Msg("shutting down early on grpc server done")
}
// TODO: Make Configurable
const GRPC_GW_API_PATH = "/api"
a.Shutdown() // Run through all shutdown funcs
a.appDone <- nil // Notify app
func (a *App) Done() <-chan any {
return a.appDone
}
// Typically invoked when AppContext is done
// or Server has exited. Not intended to be called
// manually
func (a *App) Shutdown() {
now := time.Now()
doneCtx, cncl := context.WithTimeout(context.Background(), 15*time.Second)
defer func() {
if doneCtx.Err() == context.DeadlineExceeded {
a.l.Err(doneCtx.Err()).
Dur("shutdownTime", time.Since(now)).
Msg("app shutdown aborted")
} else {
a.l.Info().
Int("shutdownFuncsCalled", len(a.shutdownFuncs)).
Dur("shutdownTime", time.Since(now)).
Msg("app shutdown normally")
}
cncl()
}()
doneCtx, span := a.tracer.Start(doneCtx, "shutdown")
defer span.End()
span.SetAttributes(attribute.Int("shutdown.funcs", len(a.shutdownFuncs)))
var wg sync.WaitGroup
wg.Add(len(a.shutdownFuncs))
for _, f := range a.shutdownFuncs {
go func() {
defer wg.Done()
err := f(doneCtx)
if err != nil {
span.SetStatus(codes.Error, "shutdown failed")
span.RecordError(err)
a.l.Err(err).Send()
}
}()
func (a *App) MustRun() {
if a.cfg != nil {
panic(errors.New("already ran app trying to run"))
}
wg.Wait()
// Set up app
a.cfg = config.MustFromCtx(a.AppContext)
a.l = zerolog.Ctx(a.AppContext)
a.shutdownFuncs = make([]shutdownFunc, 0)
a.appDone = make(chan any)
a.HTTP.HTTPDone = make(chan any)
if !a.cfg.HTTPEnabled() && !a.cfg.GRPCEnabled() {
panic(errors.New("neither http nor grpc enabled, nothing to do"))
}
if len(a.HTTP.Funcs) < 1 {
a.l.Warn().Msg("no http funcs provided, only serving health and metrics")
}
// Start OTEL
// Registers a NO-OP provider if not enabled
a.initOTEL()
// With OTEL ready, create an init span to track startup
ctx, initSpan := a.tracer.Start(a.AppContext, "init")
defer initSpan.End()
// Prepare GRPC first. The GRPC server may update its opts
// with a prepared GRPC-Gateway runtime.ServeMux if any of its services
// have added GRPC-Gateway handlers. If present, serve under api path
if a.cfg.GRPCEnabled() {
a.runGRPC(ctx)
}
if a.cfg.HTTPEnabled() {
a.runHTTP(ctx)
}
// Monitor app lifecycle
go a.monitor()
// Startup Complete
a.l.Info().
Str("name", a.cfg.Name).
Str("version", a.cfg.Version).
Str("logLevel", a.cfg.Logging.Level).
Msg("app initialized")
initSpan.SetStatus(codes.Ok, "")
}

35
pkg/app/run_grpc.go Normal file
View File

@ -0,0 +1,35 @@
package app
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http/opts"
)
func (a *App) runGRPC(ctx context.Context) {
span := trace.SpanFromContext(ctx)
if err := a.initGRPC(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
span.AddEvent("grpc server started")
span.SetAttributes(attribute.Int("grpc.services", len(a.GRPC.Services)))
if a.GRPC.GetGatewayMux() != nil {
if a.HTTP.Handlers == nil {
a.HTTP.Handlers = make([]opts.HTTPHandler, 0, 1)
}
a.HTTP.Handlers = append(a.HTTP.Handlers, opts.HTTPHandler{
Prefix: GRPC_GW_API_PATH,
StripPrefix: true,
Handler: a.GRPC.GetGatewayMux(),
})
span.AddEvent("GRPC Gateway Mux Registered")
}
}

19
pkg/app/run_http.go Normal file
View File

@ -0,0 +1,19 @@
package app
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
func (a *App) runHTTP(ctx context.Context) {
span := trace.SpanFromContext(ctx)
if err := a.initHTTP(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
span.AddEvent("http server started")
span.SetAttributes(attribute.Int("http.handlers", len(a.HTTP.Funcs)))
}