service: fix possible deadlock on close

Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
This commit is contained in:
Adphi 2023-08-04 18:51:17 +02:00
parent df505b58d7
commit 0e1fe17b97
Signed by: adphi
GPG Key ID: 46BE4062DB2397FF

View File

@ -61,6 +61,7 @@ type service struct {
id string id string
regSvc *registry.Service regSvc *registry.Service
o sync.Once
closed chan struct{} closed chan struct{}
} }
@ -94,6 +95,7 @@ func newService(opts ...Option) (*service, error) {
select { select {
case <-s.opts.ctx.Done(): case <-s.opts.ctx.Done():
s.Stop() s.Stop()
return
} }
} }
}() }()
@ -137,13 +139,14 @@ func (s *service) Options() Options {
return s.opts return s.opts
} }
func (s *service) run() error { func (s *service) start() (*errgroup.Group, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock()
s.closed = make(chan struct{}) s.closed = make(chan struct{})
// configure grpc web now that we are ready to go // configure grpc web now that we are ready to go
if err := s.grpcWeb(s.opts.grpcWebOpts...); err != nil { if err := s.grpcWeb(s.opts.grpcWebOpts...); err != nil {
return err return nil, err
} }
network := "tcp" network := "tcp"
@ -155,7 +158,7 @@ func (s *service) run() error {
if s.opts.lis == nil { if s.opts.lis == nil {
lis, err := net.Listen(network, s.opts.address) lis, err := net.Listen(network, s.opts.address)
if err != nil { if err != nil {
return err return nil, err
} }
if s.opts.tlsConfig != nil { if s.opts.tlsConfig != nil {
lis = tls.NewListener(lis, s.opts.tlsConfig) lis = tls.NewListener(lis, s.opts.tlsConfig)
@ -174,13 +177,12 @@ func (s *service) run() error {
for i := range s.opts.beforeStart { for i := range s.opts.beforeStart {
if err := s.opts.beforeStart[i](); err != nil { if err := s.opts.beforeStart[i](); err != nil {
s.mu.Unlock() return nil, err
return err
} }
} }
if err := s.register(); err != nil { if err := s.register(); err != nil {
return err return nil, err
} }
s.running = true s.running = true
@ -234,12 +236,17 @@ func (s *service) run() error {
} }
for i := range s.opts.afterStart { for i := range s.opts.afterStart {
if err := s.opts.afterStart[i](); err != nil { if err := s.opts.afterStart[i](); err != nil {
s.mu.Unlock() return nil, err
s.Stop()
return err
} }
} }
s.mu.Unlock() return g, nil
}
func (s *service) run() error {
g, err := s.start()
if err != nil {
return err
}
sigs := s.notify() sigs := s.notify()
errs := make(chan error, 1) errs := make(chan error, 1)
@ -265,6 +272,13 @@ func (s *service) Start() error {
} }
func (s *service) Stop() error { func (s *service) Stop() error {
var err error
s.o.Do(func() {
err = s.stop()
})
return err
}
func (s *service) stop() error {
log := logger.C(s.opts.ctx) log := logger.C(s.opts.ctx)
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -280,6 +294,8 @@ func (s *service) Stop() error {
log.Errorf("failed to deregister service: %v", err) log.Errorf("failed to deregister service: %v", err)
} }
defer close(s.closed) defer close(s.closed)
t := time.NewTimer(5 * time.Second)
defer t.Stop()
sigs := s.notify() sigs := s.notify()
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
@ -293,6 +309,9 @@ func (s *service) Stop() error {
s.server.GracefulStop() s.server.GracefulStop()
}() }()
select { select {
case <-t.C:
log.Warnf("timeout waiting for server to stop")
s.server.Stop()
case sig := <-sigs: case sig := <-sigs:
fmt.Println() fmt.Println()
log.Warnf("received %v", sig) log.Warnf("received %v", sig)
@ -300,6 +319,9 @@ func (s *service) Stop() error {
s.server.Stop() s.server.Stop()
case <-done: case <-done:
} }
if err := s.opts.lis.Close(); err != nil {
log.Errorf("failed to close listener: %v", err)
}
s.running = false s.running = false
s.cancel() s.cancel()
for i := range s.opts.afterStop { for i := range s.opts.afterStop {