From 1fa30d97068402a25a9683e0bdfce47674c79b3f Mon Sep 17 00:00:00 2001 From: Adphi Date: Sat, 8 Jul 2023 01:29:18 +0200 Subject: [PATCH] mux: run http server if gateway | grpcWeb | react | mux is defined goroutines: use errgroup Signed-off-by: Adphi --- go.mod | 1 + go.sum | 2 +- service/gateway.go | 4 +-- service/service.go | 77 +++++++++++++++++++++++++++------------------- service/web.go | 6 ++-- 5 files changed, 52 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index 9fcb8b7..4d5daee 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( go.linka.cloud/protofilters v0.5.0 go.uber.org/multierr v1.7.0 golang.org/x/net v0.8.0 + golang.org/x/sync v0.1.0 google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 google.golang.org/grpc v1.53.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 diff --git a/go.sum b/go.sum index 8e884f0..4c173f7 100644 --- a/go.sum +++ b/go.sum @@ -333,7 +333,6 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.2.2/go.mod h1:EaizFBKfUKtMIF5iaD github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= -github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= @@ -886,6 +885,7 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/service/gateway.go b/service/gateway.go index b6c1fcc..4a19638 100644 --- a/service/gateway.go +++ b/service/gateway.go @@ -22,9 +22,9 @@ func (s *service) gateway(opts ...runtime.ServeMuxOption) error { return err } if s.opts.gatewayPrefix != "" { - s.opts.mux.Handle(s.opts.gatewayPrefix+"/", http.StripPrefix(s.opts.gatewayPrefix, wsproxy.WebsocketProxy(mux))) + s.lazyMux().Handle(s.opts.gatewayPrefix+"/", http.StripPrefix(s.opts.gatewayPrefix, wsproxy.WebsocketProxy(mux))) } else { - s.opts.mux.Handle("/", wsproxy.WebsocketProxy(mux)) + s.lazyMux().Handle("/", wsproxy.WebsocketProxy(mux)) } return nil } diff --git a/service/service.go b/service/service.go index ba74dc4..d4b8a1d 100644 --- a/service/service.go +++ b/service/service.go @@ -21,6 +21,7 @@ import ( "github.com/rs/cors" "github.com/soheilhy/cmux" "go.uber.org/multierr" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" @@ -84,9 +85,6 @@ func newService(opts ...Option) (*service, error) { s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{md.StreamClientInterceptor()}, s.opts.streamClientInterceptors...) } - if s.opts.mux == nil { - s.opts.mux = http.NewServeMux() - } if s.opts.error != nil { return nil, s.opts.error } @@ -186,8 +184,6 @@ func (s *service) run() error { } s.running = true - errs := make(chan error, 3) - if reflect.DeepEqual(s.opts.cors, cors.Options{}) { s.opts.cors = cors.Options{ AllowedHeaders: []string{"*"}, @@ -204,31 +200,26 @@ func (s *service) run() error { AllowCredentials: true, } } - hServer := &http.Server{ - Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)), - } - if s.opts.Gateway() || s.opts.grpcWeb || s.opts.hasReactUI { - go func() { - errs <- hServer.Serve(hList) - hServer.Shutdown(s.opts.ctx) - }() - } - go func() { - errs <- s.server.Serve(gLis) - }() - go func() { - if err := mux.Serve(); err != nil { - // TODO(adphi): find more elegant solution - if ignoreMuxError(err) { - errs <- nil - return - } - errs <- err - return + g, ctx := errgroup.WithContext(s.opts.ctx) + if s.opts.mux != nil { + hServer := &http.Server{ + Handler: alice.New(s.opts.middlewares...).Then(cors.New(s.opts.cors).Handler(s.opts.mux)), } - errs <- nil - }() + g.Go(func() error { + defer hServer.Shutdown(ctx) + return hServer.Serve(hList) + }) + } + + g.Go(func() error { + return s.server.Serve(gLis) + }) + + g.Go(func() error { + return ignoreMuxError(mux.Serve()) + }) + if s.healthServer != nil { for k := range s.services { s.healthServer.SetServingStatus(k, grpc_health_v1.HealthCheckResponse_SERVING) @@ -250,13 +241,18 @@ func (s *service) run() error { } s.mu.Unlock() sigs := s.notify() + + errs := make(chan error, 1) + go func() { + errs <- g.Wait() + }() select { case sig := <-sigs: fmt.Println() logger.C(s.opts.ctx).Warnf("received %v", sig) return s.Close() case err := <-errs: - if err != nil && !ignoreMuxError(err) { + if !isMuxError(err) { logger.C(s.opts.ctx).Error(err) return err } @@ -394,10 +390,27 @@ func (s *service) notify() <-chan os.Signal { return sigs } -func ignoreMuxError(err error) bool { +func (s *service) lazyMux() ServeMux { + if s.opts.mux == nil { + s.opts.mux = http.NewServeMux() + } + return s.opts.mux +} + +func ignoreMuxError(err error) error { + if !isMuxError(err) { + return err + } + return nil +} + +func isMuxError(err error) bool { if err == nil { + return false + } + if strings.Contains(err.Error(), "use of closed network connection") || + strings.Contains(err.Error(), "mux: server closed") { return true } - return strings.Contains(err.Error(), "use of closed network connection") || - strings.Contains(err.Error(), "mux: server closed") + return false } diff --git a/service/web.go b/service/web.go index 4dd600b..6a4bb85 100644 --- a/service/web.go +++ b/service/web.go @@ -28,9 +28,9 @@ func (s *service) grpcWeb(opts ...grpcweb.Option) error { h := grpcweb.WrapServer(s.server, append(defaultWebOptions, opts...)...) for _, v := range grpcweb.ListGRPCResources(s.server) { if s.opts.grpcWebPrefix != "" { - s.opts.mux.Handle(s.opts.grpcWebPrefix+v, http.StripPrefix(s.opts.grpcWebPrefix, h)) + s.lazyMux().Handle(s.opts.grpcWebPrefix+v, http.StripPrefix(s.opts.grpcWebPrefix, h)) } else { - s.opts.mux.Handle(v, h) + s.lazyMux().Handle(v, h) } } return nil @@ -44,6 +44,6 @@ func (s *service) reactApp() error { if err != nil { return err } - s.opts.mux.Handle("/", h) + s.lazyMux().Handle("/", h) return nil }