add weather grpc

This commit is contained in:
2025-03-07 08:42:38 -05:00
parent 075902ecd8
commit 81676c5404
9 changed files with 68 additions and 23 deletions

173
pkg/srv/http/http.go Normal file
View File

@ -0,0 +1,173 @@
package http
import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/config"
"gitea.libretechconsulting.com/rmcguire/go-app/pkg/otel"
)
var (
httpMeter metric.Meter
httpTracer trace.Tracer
defReadTimeout = 10 * time.Second
defWriteTimeout = 10 * time.Second
defIdleTimeout = 15 * time.Second
)
type HTTPFunc struct {
Path string
HandlerFunc http.HandlerFunc
}
type HTTPServerOpts struct {
Ctx context.Context
HandleFuncs []HTTPFunc
Middleware []http.Handler
HealthCheckFuncs []HealthCheckFunc
CustomListener net.Listener
}
func prepHTTPServer(opts *HTTPServerOpts) *http.Server {
var (
cfg = config.MustFromCtx(opts.Ctx)
l = zerolog.Ctx(opts.Ctx)
mux = &http.ServeMux{}
)
// NOTE: Wraps handle func with otelhttp handler and
// inserts route tag
otelHandleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request)) {
handler := otelhttp.WithRouteTag(pattern, http.HandlerFunc(handlerFunc))
mux.Handle(pattern, handler) // Associate pattern with handler
}
healthChecks := handleHealthCheckFunc(opts.Ctx, opts.HealthCheckFuncs...)
otelHandleFunc("/health", healthChecks)
otelHandleFunc("/", healthChecks)
for _, f := range opts.HandleFuncs {
otelHandleFunc(f.Path, f.HandlerFunc)
}
// Prometheus metrics endpoint
if cfg.OTEL.PrometheusEnabled {
mux.Handle(cfg.OTEL.PrometheusPath, promhttp.Handler())
l.Info().Str("prometheusPath", cfg.OTEL.PrometheusPath).
Msg("mounted prometheus metrics endpoint")
}
// Add OTEL, skip health-check spans
// NOTE: Add any other span exclusions here
handler := otelhttp.NewHandler(mux, "/",
otelhttp.WithFilter(func(r *http.Request) bool {
switch r.URL.Path {
case "/health":
return false
case cfg.OTEL.PrometheusPath:
return false
default:
return true
}
}))
// Set timeouts from defaults, override
// with config timeouts if set
readTimeout := defReadTimeout
writeTimeout := defWriteTimeout
idleTimeout := defIdleTimeout
rT, wT, iT := cfg.HTTP.Timeouts()
if rT != nil {
readTimeout = *rT
}
if wT != nil {
writeTimeout = *wT
}
if iT != nil {
idleTimeout = *iT
}
// Inject any supplied middleware
for i := len(opts.Middleware) - 1; i >= 0; i-- {
mw := opts.Middleware[i]
next := handler
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mw.ServeHTTP(w, r)
next.ServeHTTP(w, r)
})
}
// Inject logging middleware
if cfg.HTTP.LogRequests {
handler = loggingMiddleware(opts.Ctx, handler)
}
return &http.Server{
Addr: cfg.HTTP.Listen,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
IdleTimeout: idleTimeout,
Handler: handler,
ErrorLog: log.New(os.Stderr, fmt.Sprintf("Go-HTTP[%s]", cfg.Name), log.Flags()),
BaseContext: func(_ net.Listener) context.Context {
return opts.Ctx
},
}
}
// Returns a shutdown func and a done channel if the
// server aborts abnormally. Returns error on failure to start
func InitHTTPServer(opts *HTTPServerOpts) (
func(context.Context) error, <-chan any, error,
) {
l := zerolog.Ctx(opts.Ctx)
doneChan := make(chan any)
var server *http.Server
httpMeter = otel.GetMeter(opts.Ctx, "http")
httpTracer = otel.GetTracer(opts.Ctx, "http")
server = prepHTTPServer(opts)
go func() {
var err error
if opts.CustomListener != nil {
err = server.Serve(opts.CustomListener)
} else {
err = server.ListenAndServe()
}
if err != nil && err != http.ErrServerClosed {
l.Err(err).Msg("HTTP server error")
} else {
l.Info().Msg("HTTP server shut down")
}
// Notify app initiator
doneChan <- nil
}()
l.Debug().Msg("HTTP Server Started")
// Shut down http server with a deadline
return func(shutdownCtx context.Context) error {
l.Debug().Msg("stopping http server")
server.Shutdown(shutdownCtx)
return nil
}, doneChan, nil
}

View File

@ -0,0 +1,67 @@
package http
import (
"context"
"errors"
"math/rand"
"net/http"
"sync"
"time"
"github.com/rs/zerolog"
)
type HealthCheckFunc func(context.Context) error
func handleHealthCheckFunc(ctx context.Context, hcFuncs ...HealthCheckFunc) func(w http.ResponseWriter, r *http.Request) {
// Return http handle func
return func(w http.ResponseWriter, r *http.Request) {
var (
healthChecksFailed bool
errs error
hcWg sync.WaitGroup
)
if len(hcFuncs) < 1 {
zerolog.Ctx(ctx).Warn().Msg("no health checks given responding with dummy 200")
hcFuncs = append(hcFuncs, dummyHealthCheck)
}
// Run all health check funcs concurrently
// log all errors
hcWg.Add(len(hcFuncs))
for _, hc := range hcFuncs {
go func() {
defer hcWg.Done()
errs = errors.Join(errs, hc(ctx))
}()
}
hcWg.Wait()
if errs != nil {
healthChecksFailed = true
}
if healthChecksFailed {
w.WriteHeader(http.StatusInternalServerError)
}
if errs != nil {
w.Write([]byte(errs.Error()))
} else {
w.Write([]byte("ok"))
}
}
}
func dummyHealthCheck(ctx context.Context) error {
workFor := rand.Intn(750)
ticker := time.NewTicker(time.Duration(time.Duration(workFor) * time.Millisecond))
select {
case <-ticker.C:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

87
pkg/srv/http/http_log.go Normal file
View File

@ -0,0 +1,87 @@
package http
import (
"bytes"
"context"
"errors"
"net/http"
"regexp"
"time"
"github.com/rs/zerolog"
)
var ExcludeFromLogging = regexp.MustCompile(`\/(ready|live|metrics)$`)
type LoggingResponseWriter struct {
http.ResponseWriter
statusCode int
body *bytes.Buffer
}
func loggingMiddleware(appCtx context.Context, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if ExcludeFromLogging.Match([]byte(r.URL.Path)) {
next.ServeHTTP(w, r)
return
}
log := zerolog.Ctx(appCtx)
start := time.Now()
lrr := newLoggingResponseWriter(w)
next.ServeHTTP(lrr, r)
log.Debug().
Str("path", r.URL.Path).
Any("query", r.URL.Query()).
Int("statusCode", lrr.statusCode).
Str("protocol", r.Proto).
Str("remote", r.RemoteAddr).
Dur("duration", time.Since(start)).
Msg("http request served")
// Log response body
trcLog := log.Trace().
Str("path", r.URL.Path).
Int("statusCode", lrr.statusCode)
// Check if it's JSON
firstByte, err := lrr.body.ReadByte()
if err != nil {
trcLog.Err(errors.New("invalid response body")).Send()
return
}
lrr.body.UnreadByte()
if firstByte == '{' {
trcLog = trcLog.RawJSON("response", lrr.body.Bytes())
} else {
trcLog = trcLog.Bytes("response", lrr.body.Bytes())
}
trcLog.Msg("response payload")
})
}
// Implement Flush to support the http.Flusher interface
func (w *LoggingResponseWriter) Flush() {
if flusher, ok := w.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
}
}
func (w *LoggingResponseWriter) WriteHeader(code int) {
w.statusCode = code
w.ResponseWriter.WriteHeader(code)
}
func (w *LoggingResponseWriter) Write(b []byte) (int, error) {
w.body.Write(b)
return w.ResponseWriter.Write(b)
}
func newLoggingResponseWriter(w http.ResponseWriter) *LoggingResponseWriter {
return &LoggingResponseWriter{
ResponseWriter: w, statusCode: http.StatusOK, body: bytes.NewBuffer(nil),
}
}