From 89ebbee8dc7f53db63b0f3a30b23225a8f62315d Mon Sep 17 00:00:00 2001 From: Adphi Date: Thu, 30 Sep 2021 12:00:22 +0200 Subject: [PATCH] add missing transport folder --- .gitignore | 1 - service/service.go | 2 +- transport/grpc/grpc.go | 15 +++++++++++ transport/inproc/inproc.go | 53 ++++++++++++++++++++++++++++++++++++++ transport/transport.go | 14 ++++++++++ 5 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 transport/grpc/grpc.go create mode 100644 transport/inproc/inproc.go create mode 100644 transport/transport.go diff --git a/.gitignore b/.gitignore index e9c7ff9..83ff4e8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ .idea .bin -transport diff --git a/service/service.go b/service/service.go index ce55745..dee22cc 100644 --- a/service/service.go +++ b/service/service.go @@ -220,7 +220,7 @@ func (s *service) run() error { logrus.Warnf("received %v", sig) return s.Close() case err := <-errs: - if err != nil { + if err != nil && !ignoreMuxError(err){ logrus.Error(err) return err } diff --git a/transport/grpc/grpc.go b/transport/grpc/grpc.go new file mode 100644 index 0000000..6c9cda3 --- /dev/null +++ b/transport/grpc/grpc.go @@ -0,0 +1,15 @@ +package grpc + +import ( + "google.golang.org/grpc" + + "go.linka.cloud/grpc/transport" +) + +var ( + _ transport.Transport = &grpc.Server{} +) + +func New() transport.Transport { + return grpc.NewServer() +} diff --git a/transport/inproc/inproc.go b/transport/inproc/inproc.go new file mode 100644 index 0000000..fef51ef --- /dev/null +++ b/transport/inproc/inproc.go @@ -0,0 +1,53 @@ +package inproc + +import ( + "errors" + "net" + "sync" + + "github.com/fullstorydev/grpchan/inprocgrpc" + + "go.linka.cloud/grpc/transport" +) + +var ( + _ transport.Transport = &InProc{} +) + +func New() transport.Transport { + return &InProc{stop: make(chan struct{})} +} + +type InProc struct { + inprocgrpc.Channel + stop chan struct{} + running bool + mu sync.RWMutex +} + +func (i *InProc) Serve(_ net.Listener) error { + i.mu.RLock() + running := i.running + i.mu.RUnlock() + if running { + return errors.New("already running") + } + i.mu.Lock() + i.running = true + i.mu.Unlock() + <-i.stop + return nil +} + +func (i *InProc) Stop() { + i.mu.RLock() + running := i.running + i.mu.RUnlock() + if running { + i.stop <- struct{}{} + } +} + +func (i *InProc) GracefulStop() { + i.Stop() +} diff --git a/transport/transport.go b/transport/transport.go new file mode 100644 index 0000000..e8b4082 --- /dev/null +++ b/transport/transport.go @@ -0,0 +1,14 @@ +package transport + +import ( + "net" + + "google.golang.org/grpc" +) + +type Transport interface { + grpc.ServiceRegistrar + Serve(lis net.Listener) error + Stop() + GracefulStop() +}