From c0c19683cf154fe259430547a53293ff2ecf7cfa Mon Sep 17 00:00:00 2001 From: Adphi Date: Sat, 16 Sep 2023 16:21:51 +0200 Subject: [PATCH] add h2c/http2 support with WithoutCmux option Signed-off-by: Adphi --- example/example.go | 23 ++++++++++-- service/options.go | 12 ++++++ service/service.go | 94 +++++++++++++++++++++++++++++++++------------- 3 files changed, 99 insertions(+), 30 deletions(-) diff --git a/example/example.go b/example/example.go index b21487f..664f202 100644 --- a/example/example.go +++ b/example/example.go @@ -6,12 +6,14 @@ import ( "errors" "fmt" "io" + "net" "net/http" "strings" "time" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" + "golang.org/x/net/http2" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/metadata" @@ -130,6 +132,7 @@ func run(opts ...service.Option) { close(done) return nil }), + service.WithoutCmux(), service.WithGateway(RegisterGreeterHandler), service.WithGatewayPrefix("/rest"), service.WithGRPCWeb(true), @@ -242,14 +245,26 @@ func run(opts ...service.Option) { log.Infof("received stream message: %s", m.Message) } scheme := "http://" + var ( + tlsConfig *tls.Config + dial func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) + ) if secure { scheme = "https://" + tlsConfig = &tls.Config{ + InsecureSkipVerify: true, + } + } else { + dial = func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, network, addr) + } } httpc := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, + Transport: &http2.Transport{ + AllowHTTP: true, + TLSClientConfig: tlsConfig, + DialTLSContext: dial, }, } req := `{"name":"test"}` diff --git a/service/options.go b/service/options.go index 0be453c..2da6175 100644 --- a/service/options.go +++ b/service/options.go @@ -334,6 +334,13 @@ func WithReactUI(fs fs.FS, subpath string) Option { } } +// WithoutCmux disables the use of cmux for http support to instead use grpc.Server.ServeHTTP method when http support is enabled +func WithoutCmux() Option { + return func(o *options) { + o.withoutCmux = true + } +} + type options struct { ctx context.Context name string @@ -381,6 +388,7 @@ type options struct { error error gatewayPrefix string + withoutCmux bool } func (o *options) Name() string { @@ -499,6 +507,10 @@ func (o *options) GatewayOpts() []runtime.ServeMuxOption { return o.gatewayOpts } +func (o *options) WithoutCmux() bool { + return o.withoutCmux +} + func (o *options) parseTLSConfig() error { if o.tlsConfig != nil { return nil diff --git a/service/service.go b/service/service.go index dd7bee8..c5d55f7 100644 --- a/service/service.go +++ b/service/service.go @@ -21,6 +21,8 @@ import ( "github.com/rs/cors" "github.com/soheilhy/cmux" "go.uber.org/multierr" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -170,12 +172,6 @@ func (s *service) start() (*errgroup.Group, error) { s.opts.address = s.opts.lis.Addr().String() } - mux := cmux.New(s.opts.lis) - mux.SetReadTimeout(5 * time.Second) - - gLis := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) - hList := mux.Match(cmux.Any()) - for i := range s.opts.beforeStart { if err := s.opts.beforeStart[i](); err != nil { return nil, err @@ -204,34 +200,23 @@ func (s *service) start() (*errgroup.Group, error) { } } - g, ctx := errgroup.WithContext(s.opts.ctx) - if s.opts.mux != nil { - hServer := &http.Server{ - Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)), - } - g.Go(func() error { - defer hServer.Shutdown(ctx) - return hServer.Serve(hList) - }) + fn := s.runWithCmux + if s.opts.withoutCmux { + fn = s.runWithoutCmux } - g.Go(func() error { - return s.server.Serve(gLis) - }) - - g.Go(func() error { - return ignoreMuxError(mux.Serve()) - }) + g, ctx := errgroup.WithContext(s.opts.ctx) + if err := fn(ctx, g); err != nil { + return nil, err + } if s.healthServer != nil { for k := range s.services { s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_SERVING) } defer func() { - if s.healthServer != nil { - for k := range s.services { - s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_NOT_SERVING) - } + for k := range s.services { + s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_NOT_SERVING) } }() } @@ -268,6 +253,63 @@ func (s *service) run() error { } } +func (s *service) runWithoutCmux(ctx context.Context, g *errgroup.Group) error { + if s.opts.mux != nil { + handler := alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)) + hServer := &http.Server{ + Handler: h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.ProtoMajor == 2 && r.Header.Get("Content-Type") == "application/grpc" { + s.server.ServeHTTP(w, r) + } else { + handler.ServeHTTP(w, r) + } + }), &http2.Server{}), + TLSConfig: s.opts.tlsConfig, + } + if err := http2.ConfigureServer(hServer, &http2.Server{}); err != nil { + return err + } + g.Go(func() error { + defer hServer.Shutdown(ctx) + return hServer.Serve(s.opts.lis) + }) + } else { + g.Go(func() error { + return s.server.Serve(s.opts.lis) + }) + } + return nil +} + +func (s *service) runWithCmux(ctx context.Context, g *errgroup.Group) error { + mux := cmux.New(s.opts.lis) + mux.SetReadTimeout(5 * time.Second) + + gLis := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + hList := mux.Match(cmux.Any()) + + if s.opts.mux != nil { + hServer := &http.Server{ + Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)), + TLSConfig: s.opts.tlsConfig, + } + g.Go(func() error { + defer hServer.Shutdown(ctx) + return hServer.Serve(hList) + }) + } + + g.Go(func() error { + return s.server.Serve(gLis) + }) + + g.Go(func() error { + return ignoreMuxError(mux.Serve()) + }) + + return nil +} + func (s *service) Serve(lis net.Listener) error { s.mu.Lock() s.opts.lis = lis