mirror of
https://github.com/linka-cloud/grpc.git
synced 2024-11-24 03:46:24 +00:00
add h2c/http2 support with WithoutCmux option
Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
This commit is contained in:
parent
f455c9994c
commit
c0c19683cf
@ -6,12 +6,14 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"golang.org/x/net/http2"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/health/grpc_health_v1"
|
"google.golang.org/grpc/health/grpc_health_v1"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
@ -130,6 +132,7 @@ func run(opts ...service.Option) {
|
|||||||
close(done)
|
close(done)
|
||||||
return nil
|
return nil
|
||||||
}),
|
}),
|
||||||
|
service.WithoutCmux(),
|
||||||
service.WithGateway(RegisterGreeterHandler),
|
service.WithGateway(RegisterGreeterHandler),
|
||||||
service.WithGatewayPrefix("/rest"),
|
service.WithGatewayPrefix("/rest"),
|
||||||
service.WithGRPCWeb(true),
|
service.WithGRPCWeb(true),
|
||||||
@ -242,14 +245,26 @@ func run(opts ...service.Option) {
|
|||||||
log.Infof("received stream message: %s", m.Message)
|
log.Infof("received stream message: %s", m.Message)
|
||||||
}
|
}
|
||||||
scheme := "http://"
|
scheme := "http://"
|
||||||
|
var (
|
||||||
|
tlsConfig *tls.Config
|
||||||
|
dial func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
|
||||||
|
)
|
||||||
if secure {
|
if secure {
|
||||||
scheme = "https://"
|
scheme = "https://"
|
||||||
|
tlsConfig = &tls.Config{
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dial = func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) {
|
||||||
|
var d net.Dialer
|
||||||
|
return d.DialContext(ctx, network, addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
httpc := &http.Client{
|
httpc := &http.Client{
|
||||||
Transport: &http.Transport{
|
Transport: &http2.Transport{
|
||||||
TLSClientConfig: &tls.Config{
|
AllowHTTP: true,
|
||||||
InsecureSkipVerify: true,
|
TLSClientConfig: tlsConfig,
|
||||||
},
|
DialTLSContext: dial,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
req := `{"name":"test"}`
|
req := `{"name":"test"}`
|
||||||
|
@ -334,6 +334,13 @@ func WithReactUI(fs fs.FS, subpath string) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithoutCmux disables the use of cmux for http support to instead use grpc.Server.ServeHTTP method when http support is enabled
|
||||||
|
func WithoutCmux() Option {
|
||||||
|
return func(o *options) {
|
||||||
|
o.withoutCmux = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type options struct {
|
type options struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
name string
|
name string
|
||||||
@ -381,6 +388,7 @@ type options struct {
|
|||||||
|
|
||||||
error error
|
error error
|
||||||
gatewayPrefix string
|
gatewayPrefix string
|
||||||
|
withoutCmux bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *options) Name() string {
|
func (o *options) Name() string {
|
||||||
@ -499,6 +507,10 @@ func (o *options) GatewayOpts() []runtime.ServeMuxOption {
|
|||||||
return o.gatewayOpts
|
return o.gatewayOpts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *options) WithoutCmux() bool {
|
||||||
|
return o.withoutCmux
|
||||||
|
}
|
||||||
|
|
||||||
func (o *options) parseTLSConfig() error {
|
func (o *options) parseTLSConfig() error {
|
||||||
if o.tlsConfig != nil {
|
if o.tlsConfig != nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -21,6 +21,8 @@ import (
|
|||||||
"github.com/rs/cors"
|
"github.com/rs/cors"
|
||||||
"github.com/soheilhy/cmux"
|
"github.com/soheilhy/cmux"
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
|
"golang.org/x/net/http2"
|
||||||
|
"golang.org/x/net/http2/h2c"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/health"
|
"google.golang.org/grpc/health"
|
||||||
@ -170,12 +172,6 @@ func (s *service) start() (*errgroup.Group, error) {
|
|||||||
s.opts.address = s.opts.lis.Addr().String()
|
s.opts.address = s.opts.lis.Addr().String()
|
||||||
}
|
}
|
||||||
|
|
||||||
mux := cmux.New(s.opts.lis)
|
|
||||||
mux.SetReadTimeout(5 * time.Second)
|
|
||||||
|
|
||||||
gLis := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
|
|
||||||
hList := mux.Match(cmux.Any())
|
|
||||||
|
|
||||||
for i := range s.opts.beforeStart {
|
for i := range s.opts.beforeStart {
|
||||||
if err := s.opts.beforeStart[i](); err != nil {
|
if err := s.opts.beforeStart[i](); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -204,34 +200,23 @@ func (s *service) start() (*errgroup.Group, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
g, ctx := errgroup.WithContext(s.opts.ctx)
|
fn := s.runWithCmux
|
||||||
if s.opts.mux != nil {
|
if s.opts.withoutCmux {
|
||||||
hServer := &http.Server{
|
fn = s.runWithoutCmux
|
||||||
Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)),
|
|
||||||
}
|
|
||||||
g.Go(func() error {
|
|
||||||
defer hServer.Shutdown(ctx)
|
|
||||||
return hServer.Serve(hList)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
g.Go(func() error {
|
g, ctx := errgroup.WithContext(s.opts.ctx)
|
||||||
return s.server.Serve(gLis)
|
if err := fn(ctx, g); err != nil {
|
||||||
})
|
return nil, err
|
||||||
|
}
|
||||||
g.Go(func() error {
|
|
||||||
return ignoreMuxError(mux.Serve())
|
|
||||||
})
|
|
||||||
|
|
||||||
if s.healthServer != nil {
|
if s.healthServer != nil {
|
||||||
for k := range s.services {
|
for k := range s.services {
|
||||||
s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_SERVING)
|
s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_SERVING)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if s.healthServer != nil {
|
for k := range s.services {
|
||||||
for k := range s.services {
|
s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
|
||||||
s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -268,6 +253,63 @@ func (s *service) run() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *service) runWithoutCmux(ctx context.Context, g *errgroup.Group) error {
|
||||||
|
if s.opts.mux != nil {
|
||||||
|
handler := alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux))
|
||||||
|
hServer := &http.Server{
|
||||||
|
Handler: h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.ProtoMajor == 2 && r.Header.Get("Content-Type") == "application/grpc" {
|
||||||
|
s.server.ServeHTTP(w, r)
|
||||||
|
} else {
|
||||||
|
handler.ServeHTTP(w, r)
|
||||||
|
}
|
||||||
|
}), &http2.Server{}),
|
||||||
|
TLSConfig: s.opts.tlsConfig,
|
||||||
|
}
|
||||||
|
if err := http2.ConfigureServer(hServer, &http2.Server{}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
g.Go(func() error {
|
||||||
|
defer hServer.Shutdown(ctx)
|
||||||
|
return hServer.Serve(s.opts.lis)
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
g.Go(func() error {
|
||||||
|
return s.server.Serve(s.opts.lis)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *service) runWithCmux(ctx context.Context, g *errgroup.Group) error {
|
||||||
|
mux := cmux.New(s.opts.lis)
|
||||||
|
mux.SetReadTimeout(5 * time.Second)
|
||||||
|
|
||||||
|
gLis := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
|
||||||
|
hList := mux.Match(cmux.Any())
|
||||||
|
|
||||||
|
if s.opts.mux != nil {
|
||||||
|
hServer := &http.Server{
|
||||||
|
Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)),
|
||||||
|
TLSConfig: s.opts.tlsConfig,
|
||||||
|
}
|
||||||
|
g.Go(func() error {
|
||||||
|
defer hServer.Shutdown(ctx)
|
||||||
|
return hServer.Serve(hList)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
g.Go(func() error {
|
||||||
|
return s.server.Serve(gLis)
|
||||||
|
})
|
||||||
|
|
||||||
|
g.Go(func() error {
|
||||||
|
return ignoreMuxError(mux.Serve())
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *service) Serve(lis net.Listener) error {
|
func (s *service) Serve(lis net.Listener) error {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.opts.lis = lis
|
s.opts.lis = lis
|
||||||
|
Loading…
Reference in New Issue
Block a user