add h2c/http2 support with WithoutCmux option

Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
This commit is contained in:
Adphi 2023-07-10 13:34:40 +02:00
parent df505b58d7
commit 9a645d3f44
Signed by: adphi
GPG Key ID: 46BE4062DB2397FF
3 changed files with 99 additions and 31 deletions

View File

@ -6,12 +6,14 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net"
"net/http" "net/http"
"strings" "strings"
"time" "time"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"golang.org/x/net/http2"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
@ -130,6 +132,7 @@ func run(opts ...service.Option) {
close(done) close(done)
return nil return nil
}), }),
service.WithoutCmux(),
service.WithGateway(RegisterGreeterHandler), service.WithGateway(RegisterGreeterHandler),
service.WithGatewayPrefix("/rest"), service.WithGatewayPrefix("/rest"),
service.WithGRPCWeb(true), service.WithGRPCWeb(true),
@ -242,14 +245,26 @@ func run(opts ...service.Option) {
log.Infof("received stream message: %s", m.Message) log.Infof("received stream message: %s", m.Message)
} }
scheme := "http://" scheme := "http://"
var (
tlsConfig *tls.Config
dial func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
)
if secure { if secure {
scheme = "https://" scheme = "https://"
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
} else {
dial = func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, network, addr)
}
} }
httpc := &http.Client{ httpc := &http.Client{
Transport: &http.Transport{ Transport: &http2.Transport{
TLSClientConfig: &tls.Config{ AllowHTTP: true,
InsecureSkipVerify: true, TLSClientConfig: tlsConfig,
}, DialTLSContext: dial,
}, },
} }
req := `{"name":"test"}` req := `{"name":"test"}`

View File

@ -334,6 +334,13 @@ func WithReactUI(fs fs.FS, subpath string) Option {
} }
} }
// WithoutCmux disables the use of cmux for http support to instead use grpc.Server.ServeHTTP method when http support is enabled
func WithoutCmux() Option {
return func(o *options) {
o.withoutCmux = true
}
}
type options struct { type options struct {
ctx context.Context ctx context.Context
name string name string
@ -381,6 +388,7 @@ type options struct {
error error error error
gatewayPrefix string gatewayPrefix string
withoutCmux bool
} }
func (o *options) Name() string { func (o *options) Name() string {
@ -499,6 +507,10 @@ func (o *options) GatewayOpts() []runtime.ServeMuxOption {
return o.gatewayOpts return o.gatewayOpts
} }
func (o *options) WithoutCmux() bool {
return o.withoutCmux
}
func (o *options) parseTLSConfig() error { func (o *options) parseTLSConfig() error {
if o.tlsConfig != nil { if o.tlsConfig != nil {
return nil return nil

View File

@ -21,6 +21,8 @@ import (
"github.com/rs/cors" "github.com/rs/cors"
"github.com/soheilhy/cmux" "github.com/soheilhy/cmux"
"go.uber.org/multierr" "go.uber.org/multierr"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/health" "google.golang.org/grpc/health"
@ -166,12 +168,6 @@ func (s *service) run() error {
s.opts.address = s.opts.lis.Addr().String() s.opts.address = s.opts.lis.Addr().String()
} }
mux := cmux.New(s.opts.lis)
mux.SetReadTimeout(5 * time.Second)
gLis := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
hList := mux.Match(cmux.Any())
for i := range s.opts.beforeStart { for i := range s.opts.beforeStart {
if err := s.opts.beforeStart[i](); err != nil { if err := s.opts.beforeStart[i](); err != nil {
s.mu.Unlock() s.mu.Unlock()
@ -201,35 +197,23 @@ func (s *service) run() error {
} }
} }
fn := s.runWithCmux
if s.opts.withoutCmux {
fn = s.runWithoutCmux
}
g, ctx := errgroup.WithContext(s.opts.ctx) g, ctx := errgroup.WithContext(s.opts.ctx)
if s.opts.mux != nil { if err := fn(ctx, g); err != nil {
hServer := &http.Server{ return err
Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)),
} }
g.Go(func() error {
defer hServer.Shutdown(ctx)
return hServer.Serve(hList)
})
}
g.Go(func() error {
return s.server.Serve(gLis)
})
g.Go(func() error {
return ignoreMuxError(mux.Serve())
})
if s.healthServer != nil { if s.healthServer != nil {
for k := range s.services { for k := range s.services {
s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_SERVING) s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_SERVING)
} }
defer func() { defer func() {
if s.healthServer != nil {
for k := range s.services { for k := range s.services {
s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_NOT_SERVING) s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
} }
}
}() }()
} }
for i := range s.opts.afterStart { for i := range s.opts.afterStart {
@ -260,6 +244,63 @@ func (s *service) run() error {
} }
} }
func (s *service) runWithoutCmux(ctx context.Context, g *errgroup.Group) error {
if s.opts.mux != nil {
handler := alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux))
hServer := &http.Server{
Handler: h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor == 2 && r.Header.Get("Content-Type") == "application/grpc" {
s.server.ServeHTTP(w, r)
} else {
handler.ServeHTTP(w, r)
}
}), &http2.Server{}),
TLSConfig: s.opts.tlsConfig,
}
if err := http2.ConfigureServer(hServer, &http2.Server{}); err != nil {
return err
}
g.Go(func() error {
defer hServer.Shutdown(ctx)
return hServer.Serve(s.opts.lis)
})
} else {
g.Go(func() error {
return s.server.Serve(s.opts.lis)
})
}
return nil
}
func (s *service) runWithCmux(ctx context.Context, g *errgroup.Group) error {
mux := cmux.New(s.opts.lis)
mux.SetReadTimeout(5 * time.Second)
gLis := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
hList := mux.Match(cmux.Any())
if s.opts.mux != nil {
hServer := &http.Server{
Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)),
TLSConfig: s.opts.tlsConfig,
}
g.Go(func() error {
defer hServer.Shutdown(ctx)
return hServer.Serve(hList)
})
}
g.Go(func() error {
return s.server.Serve(gLis)
})
g.Go(func() error {
return ignoreMuxError(mux.Serve())
})
return nil
}
func (s *service) Start() error { func (s *service) Start() error {
return s.run() return s.run()
} }