mux: run http server if gateway | grpcWeb | react | mux is defined

goroutines: use errgroup
Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
This commit is contained in:
Adphi 2023-07-08 01:29:18 +02:00
parent 36f3c5bc81
commit 1fa30d9706
Signed by: adphi
GPG Key ID: 46BE4062DB2397FF
5 changed files with 52 additions and 38 deletions

1
go.mod
View File

@ -36,6 +36,7 @@ require (
go.linka.cloud/protofilters v0.5.0 go.linka.cloud/protofilters v0.5.0
go.uber.org/multierr v1.7.0 go.uber.org/multierr v1.7.0
golang.org/x/net v0.8.0 golang.org/x/net v0.8.0
golang.org/x/sync v0.1.0
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4
google.golang.org/grpc v1.53.0 google.golang.org/grpc v1.53.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0

2
go.sum
View File

@ -333,7 +333,6 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.2.2/go.mod h1:EaizFBKfUKtMIF5iaD
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
@ -886,6 +885,7 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -22,9 +22,9 @@ func (s *service) gateway(opts ...runtime.ServeMuxOption) error {
return err return err
} }
if s.opts.gatewayPrefix != "" { if s.opts.gatewayPrefix != "" {
s.opts.mux.Handle(s.opts.gatewayPrefix+"/", http.StripPrefix(s.opts.gatewayPrefix, wsproxy.WebsocketProxy(mux))) s.lazyMux().Handle(s.opts.gatewayPrefix+"/", http.StripPrefix(s.opts.gatewayPrefix, wsproxy.WebsocketProxy(mux)))
} else { } else {
s.opts.mux.Handle("/", wsproxy.WebsocketProxy(mux)) s.lazyMux().Handle("/", wsproxy.WebsocketProxy(mux))
} }
return nil return nil
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/rs/cors" "github.com/rs/cors"
"github.com/soheilhy/cmux" "github.com/soheilhy/cmux"
"go.uber.org/multierr" "go.uber.org/multierr"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/health" "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/health/grpc_health_v1"
@ -84,9 +85,6 @@ func newService(opts ...Option) (*service, error) {
s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{md.StreamClientInterceptor()}, s.opts.streamClientInterceptors...) s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{md.StreamClientInterceptor()}, s.opts.streamClientInterceptors...)
} }
if s.opts.mux == nil {
s.opts.mux = http.NewServeMux()
}
if s.opts.error != nil { if s.opts.error != nil {
return nil, s.opts.error return nil, s.opts.error
} }
@ -186,8 +184,6 @@ func (s *service) run() error {
} }
s.running = true s.running = true
errs := make(chan error, 3)
if reflect.DeepEqual(s.opts.cors, cors.Options{}) { if reflect.DeepEqual(s.opts.cors, cors.Options{}) {
s.opts.cors = cors.Options{ s.opts.cors = cors.Options{
AllowedHeaders: []string{"*"}, AllowedHeaders: []string{"*"},
@ -204,31 +200,26 @@ func (s *service) run() error {
AllowCredentials: true, AllowCredentials: true,
} }
} }
hServer := &http.Server{
Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)),
}
if s.opts.Gateway() || s.opts.grpcWeb || s.opts.hasReactUI {
go func() {
errs <- hServer.Serve(hList)
hServer.Shutdown(s.opts.ctx)
}()
}
go func() {
errs <- s.server.Serve(gLis)
}()
go func() { g, ctx := errgroup.WithContext(s.opts.ctx)
if err := mux.Serve(); err != nil { if s.opts.mux != nil {
// TODO(adphi): find more elegant solution hServer := &http.Server{
if ignoreMuxError(err) { Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)),
errs <- nil
return
}
errs <- err
return
} }
errs <- nil g.Go(func() error {
}() defer hServer.Shutdown(ctx)
return hServer.Serve(hList)
})
}
g.Go(func() error {
return s.server.Serve(gLis)
})
g.Go(func() error {
return ignoreMuxError(mux.Serve())
})
if s.healthServer != nil { if s.healthServer != nil {
for k := range s.services { for k := range s.services {
s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_SERVING) s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_SERVING)
@ -250,13 +241,18 @@ func (s *service) run() error {
} }
s.mu.Unlock() s.mu.Unlock()
sigs := s.notify() sigs := s.notify()
errs := make(chan error, 1)
go func() {
errs <- g.Wait()
}()
select { select {
case sig := <-sigs: case sig := <-sigs:
fmt.Println() fmt.Println()
logger.C(s.opts.ctx).Warnf("received %v", sig) logger.C(s.opts.ctx).Warnf("received %v", sig)
return s.Close() return s.Close()
case err := <-errs: case err := <-errs:
if err != nil && !ignoreMuxError(err) { if !isMuxError(err) {
logger.C(s.opts.ctx).Error(err) logger.C(s.opts.ctx).Error(err)
return err return err
} }
@ -394,10 +390,27 @@ func (s *service) notify() <-chan os.Signal {
return sigs return sigs
} }
func ignoreMuxError(err error) bool { func (s *service) lazyMux() ServeMux {
if s.opts.mux == nil {
s.opts.mux = http.NewServeMux()
}
return s.opts.mux
}
func ignoreMuxError(err error) error {
if !isMuxError(err) {
return err
}
return nil
}
func isMuxError(err error) bool {
if err == nil { if err == nil {
return false
}
if strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "mux: server closed") {
return true return true
} }
return strings.Contains(err.Error(), "use of closed network connection") || return false
strings.Contains(err.Error(), "mux: server closed")
} }

View File

@ -28,9 +28,9 @@ func (s *service) grpcWeb(opts ...grpcweb.Option) error {
h := grpcweb.WrapServer(s.server, append(defaultWebOptions, opts...)...) h := grpcweb.WrapServer(s.server, append(defaultWebOptions, opts...)...)
for _, v := range grpcweb.ListGRPCResources(s.server) { for _, v := range grpcweb.ListGRPCResources(s.server) {
if s.opts.grpcWebPrefix != "" { if s.opts.grpcWebPrefix != "" {
s.opts.mux.Handle(s.opts.grpcWebPrefix+v, http.StripPrefix(s.opts.grpcWebPrefix, h)) s.lazyMux().Handle(s.opts.grpcWebPrefix+v, http.StripPrefix(s.opts.grpcWebPrefix, h))
} else { } else {
s.opts.mux.Handle(v, h) s.lazyMux().Handle(v, h)
} }
} }
return nil return nil
@ -44,6 +44,6 @@ func (s *service) reactApp() error {
if err != nil { if err != nil {
return err return err
} }
s.opts.mux.Handle("/", h) s.lazyMux().Handle("/", h)
return nil return nil
} }