diff --git a/.gitignore b/.gitignore index e9c7ff9..83ff4e8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ .idea .bin -transport diff --git a/service/service.go b/service/service.go index ce55745..dee22cc 100644 --- a/service/service.go +++ b/service/service.go @@ -220,7 +220,7 @@ func (s *service) run() error { logrus.Warnf("received %v", sig) return s.Close() case err := <-errs: - if err != nil { + if err != nil && !ignoreMuxError(err){ logrus.Error(err) return err } diff --git a/transport/grpc/grpc.go b/transport/grpc/grpc.go new file mode 100644 index 0000000..6c9cda3 --- /dev/null +++ b/transport/grpc/grpc.go @@ -0,0 +1,15 @@ +package grpc + +import ( + "google.golang.org/grpc" + + "go.linka.cloud/grpc/transport" +) + +var ( + _ transport.Transport = &grpc.Server{} +) + +func New() transport.Transport { + return grpc.NewServer() +} diff --git a/transport/inproc/inproc.go b/transport/inproc/inproc.go new file mode 100644 index 0000000..fef51ef --- /dev/null +++ b/transport/inproc/inproc.go @@ -0,0 +1,53 @@ +package inproc + +import ( + "errors" + "net" + "sync" + + "github.com/fullstorydev/grpchan/inprocgrpc" + + "go.linka.cloud/grpc/transport" +) + +var ( + _ transport.Transport = &InProc{} +) + +func New() transport.Transport { + return &InProc{stop: make(chan struct{})} +} + +type InProc struct { + inprocgrpc.Channel + stop chan struct{} + running bool + mu sync.RWMutex +} + +func (i *InProc) Serve(_ net.Listener) error { + i.mu.RLock() + running := i.running + i.mu.RUnlock() + if running { + return errors.New("already running") + } + i.mu.Lock() + i.running = true + i.mu.Unlock() + <-i.stop + return nil +} + +func (i *InProc) Stop() { + i.mu.RLock() + running := i.running + i.mu.RUnlock() + if running { + i.stop <- struct{}{} + } +} + +func (i *InProc) GracefulStop() { + i.Stop() +} diff --git a/transport/transport.go b/transport/transport.go new file mode 100644 index 0000000..e8b4082 --- /dev/null +++ b/transport/transport.go @@ -0,0 +1,14 @@ +package transport + +import ( + "net" + + "google.golang.org/grpc" +) + +type Transport interface { + grpc.ServiceRegistrar + Serve(lis net.Listener) error + Stop() + GracefulStop() +}