diff --git a/pkg/app/app.go b/pkg/app/app.go deleted file mode 100644 index 697c77b..0000000 --- a/pkg/app/app.go +++ /dev/null @@ -1,90 +0,0 @@ -package app - -import ( - "errors" - "sync" - - "github.com/rs/zerolog" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - - "gitea.libretechconsulting.com/rmcguire/go-app/pkg/config" -) - -func (a *App) Done() <-chan any { - return a.appDone -} - -func (a *App) MustRun() { - if a.cfg != nil { - panic(errors.New("already ran app trying to run")) - } - - // Set up app - a.cfg = config.MustFromCtx(a.AppContext) - a.l = zerolog.Ctx(a.AppContext) - a.shutdownFuncs = make([]shutdownFunc, 0) - a.appDone = make(chan any) - a.HTTP.HTTPDone = make(chan any) - - if !a.cfg.HTTPEnabled() && !a.cfg.GRPCEnabled() { - panic(errors.New("neither http nor grpc enabled, nothing to do")) - } - - if len(a.HTTP.Funcs) < 1 { - a.l.Warn().Msg("no http funcs provided, only serving health and metrics") - } - - // Start OTEL - // Registers a NO-OP provider if not enabled - a.initOTEL() - - ctx, initSpan := a.tracer.Start(a.AppContext, "init") - defer initSpan.End() - - var serverWG sync.WaitGroup - - // TODO: Figure out where to merge GRPC Gateway Serve Mux - // into HTTP serve mux - - // Start HTTP (does not block) - if a.cfg.HTTPEnabled() { - serverWG.Add(1) - go func() { - defer serverWG.Done() - if err := a.initHTTP(ctx); err != nil { - initSpan.RecordError(err) - initSpan.SetStatus(codes.Error, err.Error()) - } - initSpan.AddEvent("http server started") - initSpan.SetAttributes(attribute.Int("http.handlers", len(a.HTTP.Funcs))) - }() - } - - // Start GRPC (does not block) - if a.cfg.GRPCEnabled() { - serverWG.Add(1) - go func() { - defer serverWG.Done() - if err := a.initGRPC(ctx); err != nil { - initSpan.RecordError(err) - initSpan.SetStatus(codes.Error, err.Error()) - } - initSpan.AddEvent("grpc server started") - initSpan.SetAttributes(attribute.Int("grpc.services", len(a.GRPC.Services))) - }() - } - - serverWG.Wait() - - // Monitor app lifecycle - go a.run() - - // Startup Complete - a.l.Info(). - Str("name", a.cfg.Name). - Str("version", a.cfg.Version). - Str("logLevel", a.cfg.Logging.Level). - Msg("app initialized") - initSpan.SetStatus(codes.Ok, "") -} diff --git a/pkg/app/app_init.go b/pkg/app/init.go similarity index 100% rename from pkg/app/app_init.go rename to pkg/app/init.go diff --git a/pkg/app/monitor.go b/pkg/app/monitor.go new file mode 100644 index 0000000..27bfcef --- /dev/null +++ b/pkg/app/monitor.go @@ -0,0 +1,72 @@ +package app + +import ( + "context" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" +) + +// Watches contexts and channels for the +// app to be finished and calls shutdown once +// the app is done +func (a *App) monitor() { + select { + case <-a.AppContext.Done(): + a.l.Warn().Str("reason", a.AppContext.Err().Error()). + Msg("shutting down on context done") + case <-a.HTTP.HTTPDone: // TODO: return error on this channel + a.l.Warn().Msg("shutting down early on http server done") + case err := <-a.GRPC.GRPCDone: + a.l.Warn().Err(err).Msg("shutting down early on grpc server done") + } + + a.Shutdown() // Run through all shutdown funcs + a.appDone <- nil // Notify app +} + +// Typically invoked when AppContext is done +// or Server has exited. Not intended to be called +// manually +func (a *App) Shutdown() { + now := time.Now() + + doneCtx, cncl := context.WithTimeout(context.Background(), 15*time.Second) + defer func() { + if doneCtx.Err() == context.DeadlineExceeded { + a.l.Err(doneCtx.Err()). + Dur("shutdownTime", time.Since(now)). + Msg("app shutdown aborted") + } else { + a.l.Info(). + Int("shutdownFuncsCalled", len(a.shutdownFuncs)). + Dur("shutdownTime", time.Since(now)). + Msg("app shutdown normally") + } + cncl() + }() + + doneCtx, span := a.tracer.Start(doneCtx, "shutdown") + defer span.End() + + span.SetAttributes(attribute.Int("shutdown.funcs", len(a.shutdownFuncs))) + + var wg sync.WaitGroup + wg.Add(len(a.shutdownFuncs)) + + for _, f := range a.shutdownFuncs { + go func() { + defer wg.Done() + err := f(doneCtx) + if err != nil { + span.SetStatus(codes.Error, "shutdown failed") + span.RecordError(err) + a.l.Err(err).Send() + } + }() + } + + wg.Wait() +} diff --git a/pkg/app/run.go b/pkg/app/run.go index 88f591d..3f012bb 100644 --- a/pkg/app/run.go +++ b/pkg/app/run.go @@ -1,72 +1,68 @@ package app import ( - "context" - "sync" - "time" + "errors" - "go.opentelemetry.io/otel/attribute" + "github.com/rs/zerolog" "go.opentelemetry.io/otel/codes" + + "gitea.libretechconsulting.com/rmcguire/go-app/pkg/config" ) -// Watches contexts and channels for the -// app to be finished and calls shutdown once -// the app is done -func (a *App) run() { - select { - case <-a.AppContext.Done(): - a.l.Warn().Str("reason", a.AppContext.Err().Error()). - Msg("shutting down on context done") - case <-a.HTTP.HTTPDone: // TODO: return error on this channel - a.l.Warn().Msg("shutting down early on http server done") - case err := <-a.GRPC.GRPCDone: - a.l.Warn().Err(err).Msg("shutting down early on grpc server done") - } +// TODO: Make Configurable +const GRPC_GW_API_PATH = "/api" - a.Shutdown() // Run through all shutdown funcs - a.appDone <- nil // Notify app +func (a *App) Done() <-chan any { + return a.appDone } -// Typically invoked when AppContext is done -// or Server has exited. Not intended to be called -// manually -func (a *App) Shutdown() { - now := time.Now() - - doneCtx, cncl := context.WithTimeout(context.Background(), 15*time.Second) - defer func() { - if doneCtx.Err() == context.DeadlineExceeded { - a.l.Err(doneCtx.Err()). - Dur("shutdownTime", time.Since(now)). - Msg("app shutdown aborted") - } else { - a.l.Info(). - Int("shutdownFuncsCalled", len(a.shutdownFuncs)). - Dur("shutdownTime", time.Since(now)). - Msg("app shutdown normally") - } - cncl() - }() - - doneCtx, span := a.tracer.Start(doneCtx, "shutdown") - defer span.End() - - span.SetAttributes(attribute.Int("shutdown.funcs", len(a.shutdownFuncs))) - - var wg sync.WaitGroup - wg.Add(len(a.shutdownFuncs)) - - for _, f := range a.shutdownFuncs { - go func() { - defer wg.Done() - err := f(doneCtx) - if err != nil { - span.SetStatus(codes.Error, "shutdown failed") - span.RecordError(err) - a.l.Err(err).Send() - } - }() +func (a *App) MustRun() { + if a.cfg != nil { + panic(errors.New("already ran app trying to run")) } - wg.Wait() + // Set up app + a.cfg = config.MustFromCtx(a.AppContext) + a.l = zerolog.Ctx(a.AppContext) + a.shutdownFuncs = make([]shutdownFunc, 0) + a.appDone = make(chan any) + a.HTTP.HTTPDone = make(chan any) + + if !a.cfg.HTTPEnabled() && !a.cfg.GRPCEnabled() { + panic(errors.New("neither http nor grpc enabled, nothing to do")) + } + + if len(a.HTTP.Funcs) < 1 { + a.l.Warn().Msg("no http funcs provided, only serving health and metrics") + } + + // Start OTEL + // Registers a NO-OP provider if not enabled + a.initOTEL() + + // With OTEL ready, create an init span to track startup + ctx, initSpan := a.tracer.Start(a.AppContext, "init") + defer initSpan.End() + + // Prepare GRPC first. The GRPC server may update its opts + // with a prepared GRPC-Gateway runtime.ServeMux if any of its services + // have added GRPC-Gateway handlers. If present, serve under api path + if a.cfg.GRPCEnabled() { + a.runGRPC(ctx) + } + + if a.cfg.HTTPEnabled() { + a.runHTTP(ctx) + } + + // Monitor app lifecycle + go a.monitor() + + // Startup Complete + a.l.Info(). + Str("name", a.cfg.Name). + Str("version", a.cfg.Version). + Str("logLevel", a.cfg.Logging.Level). + Msg("app initialized") + initSpan.SetStatus(codes.Ok, "") } diff --git a/pkg/app/run_grpc.go b/pkg/app/run_grpc.go new file mode 100644 index 0000000..7e73832 --- /dev/null +++ b/pkg/app/run_grpc.go @@ -0,0 +1,35 @@ +package app + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/http/opts" +) + +func (a *App) runGRPC(ctx context.Context) { + span := trace.SpanFromContext(ctx) + if err := a.initGRPC(ctx); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.AddEvent("grpc server started") + span.SetAttributes(attribute.Int("grpc.services", len(a.GRPC.Services))) + + if a.GRPC.GetGatewayMux() != nil { + if a.HTTP.Handlers == nil { + a.HTTP.Handlers = make([]opts.HTTPHandler, 0, 1) + } + + a.HTTP.Handlers = append(a.HTTP.Handlers, opts.HTTPHandler{ + Prefix: GRPC_GW_API_PATH, + StripPrefix: true, + Handler: a.GRPC.GetGatewayMux(), + }) + + span.AddEvent("GRPC Gateway Mux Registered") + } +} diff --git a/pkg/app/run_http.go b/pkg/app/run_http.go new file mode 100644 index 0000000..6c90b2c --- /dev/null +++ b/pkg/app/run_http.go @@ -0,0 +1,19 @@ +package app + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +func (a *App) runHTTP(ctx context.Context) { + span := trace.SpanFromContext(ctx) + if err := a.initHTTP(ctx); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.AddEvent("http server started") + span.SetAttributes(attribute.Int("http.handlers", len(a.HTTP.Funcs))) +} diff --git a/pkg/srv/grpc/gateway.go b/pkg/srv/grpc/gateway.go index d15ec13..1c86156 100644 --- a/pkg/srv/grpc/gateway.go +++ b/pkg/srv/grpc/gateway.go @@ -2,20 +2,18 @@ package grpc import ( "context" - "net/http" + "fmt" + "net" "github.com/grpc-ecosystem/grpc-gateway/runtime" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "gitea.libretechconsulting.com/rmcguire/go-app/pkg/srv/grpc/opts" ) -const API_PATH = "/api" // TODO: Make Configurable - -// TODO: Add GetClientConn method to appGRPCServer that supports TLS, auth, etc.. -// Will probably have to convert server opts to client opts func (a *appGRPCServer) registerServiceGatewayHandlers(ctx context.Context, service *opts.GRPCService) { if len(service.GwRegistrationFuncs) < 1 { return @@ -29,15 +27,30 @@ func (a *appGRPCServer) registerServiceGatewayHandlers(ctx context.Context, serv )) defer span.End() - // TODO: move to GetClientConn method which doesn't exist yet - clientConn, _ := grpc.NewClient(a.opts.Listen) + clientConn := a.GetClientConn(ctx) for _, registerGW := range service.GwRegistrationFuncs { registerGW(ctx, a.gatewayMux, clientConn) } } -// Add our gateway mux to an existing http.ServeMux -func (a *appGRPCServer) mergeGatewayMuxWithHTTPMux(mux *http.ServeMux) { - mux.Handle(API_PATH, http.StripPrefix(API_PATH, a.gatewayMux)) +func (a *appGRPCServer) GetClientConn(ctx context.Context) *grpc.ClientConn { + span := trace.SpanFromContext(ctx) + + host, port, err := net.SplitHostPort(a.opts.Listen) + if err != nil { + panic(err) + } + + if host == "" || host == "0.0.0.0" { + host = "localhost" + } + + clientConn, err := grpc.NewClient(fmt.Sprintf("%s:%s", host, port), a.opts.GRPCDialOpts...) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return clientConn } diff --git a/pkg/srv/grpc/grpc.go b/pkg/srv/grpc/grpc.go index f8d9c09..09a7ff0 100644 --- a/pkg/srv/grpc/grpc.go +++ b/pkg/srv/grpc/grpc.go @@ -29,10 +29,6 @@ type appGRPCServer struct { doneChan chan error } -func (g *appGRPCServer) GetMux() *runtime.ServeMux { - return g.gatewayMux -} - // Returns a shutdown func, a channel indicating done / error, // and an up-front error if server fails to start func InitGRPCServer(ctx context.Context, opts *opts.GRPCOpts) ( diff --git a/pkg/srv/grpc/opts/config.go b/pkg/srv/grpc/opts/config.go index 2cc638a..efed352 100644 --- a/pkg/srv/grpc/opts/config.go +++ b/pkg/srv/grpc/opts/config.go @@ -19,7 +19,9 @@ type AppGRPC struct { UnaryInterceptors []grpc.UnaryServerInterceptor StreamInterceptors []grpc.StreamServerInterceptor GRPCOpts []grpc.ServerOption + GRPCDialOpts []grpc.DialOption // Map ServerOptions to DialOpts for GRPC Gateway support GRPCDone <-chan error + gatewayMux *runtime.ServeMux } type GRPCService struct { @@ -28,3 +30,11 @@ type GRPCService struct { Service any // Implementation of GRPCService.Type (ptr) GwRegistrationFuncs []func(context.Context, *runtime.ServeMux, *grpc.ClientConn) error // Gateway regustration handler funcs } + +func (a *AppGRPC) SetGatewayMux(mux *runtime.ServeMux) { + a.gatewayMux = mux +} + +func (a *AppGRPC) GetGatewayMux() *runtime.ServeMux { + return a.gatewayMux +} diff --git a/pkg/srv/grpc/prepare.go b/pkg/srv/grpc/prepare.go index 736f25c..774d53d 100644 --- a/pkg/srv/grpc/prepare.go +++ b/pkg/srv/grpc/prepare.go @@ -50,6 +50,15 @@ func (a *appGRPCServer) prepGRPCServer(spanCtx context.Context) error { a.registerGRPCServices(ctx) span.SetAttributes(attribute.Int("grpc.server.grpcservices", len(a.opts.Services))) + // If a grpc-gateway mux was created, store it in opts + // so it can be used by AppHTTP + if a.gatewayMux != nil { + a.opts.SetGatewayMux(a.gatewayMux) + span.SetAttributes(attribute.Bool("grpc.server.grpcgateway.enabled", true)) + } else { + span.SetAttributes(attribute.Bool("grpc.server.grpcgateway.enabled", false)) + } + // Enable reflection if desired if a.opts.EnableReflection { reflection.Register(a.server) diff --git a/pkg/srv/http/http.go b/pkg/srv/http/http.go index 5d91a18..456fb9a 100644 --- a/pkg/srv/http/http.go +++ b/pkg/srv/http/http.go @@ -57,6 +57,15 @@ func prepHTTPServer(opts *opts.AppHTTP) *http.Server { Msg("mounted prometheus metrics endpoint") } + // Inject extra handlers if given + // Used for grpc-gateway runtime.ServeMux handlers + for _, h := range opts.Handlers { + if h.StripPrefix { + h.Handler = http.StripPrefix(h.Prefix, h.Handler) + } + mux.Handle(h.Prefix, h.Handler) + } + // Add OTEL, skip health-check spans // NOTE: Add any other span exclusions here handler := otelhttp.NewHandler(mux, "/", diff --git a/pkg/srv/http/opts/http_config.go b/pkg/srv/http/opts/http_config.go index a34baf0..f18e620 100644 --- a/pkg/srv/http/opts/http_config.go +++ b/pkg/srv/http/opts/http_config.go @@ -8,8 +8,9 @@ import ( type AppHTTP struct { Ctx context.Context - Funcs []HTTPFunc - Middleware []http.Handler + Funcs []HTTPFunc // Handler funcs, will be wrapped with OTEL + Middleware []http.Handler // Middleware (e.g. request logging) + Handlers []HTTPHandler // Raw Handler/Mux to add, optional prefix stripping HealthChecks []HealthCheckFunc CustomListener net.Listener HTTPDone <-chan any @@ -20,4 +21,10 @@ type HTTPFunc struct { HandlerFunc http.HandlerFunc } +type HTTPHandler struct { + Prefix string // path prefix under which to serve this handler/mux + StripPrefix bool // strip path before sending to handler/mux + Handler http.Handler +} + type HealthCheckFunc func(context.Context) error