From e89119a68d5483605292447b3d58f224c760fd2e Mon Sep 17 00:00:00 2001 From: Adphi Date: Wed, 16 Oct 2024 14:53:10 +0200 Subject: [PATCH] add pm service wrapper and child through re-exec Signed-off-by: Adphi --- go.mod | 9 ++- go.sum | 20 +++++ process/child.go | 177 ++++++++++++++++++++++++++++++++++++++++++ process/child_test.go | 54 +++++++++++++ process/service.go | 56 +++++++++++++ service/service.go | 2 +- 6 files changed, 314 insertions(+), 4 deletions(-) create mode 100644 process/child.go create mode 100644 process/child_test.go create mode 100644 process/service.go diff --git a/go.mod b/go.mod index 6731b87..321c778 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,13 @@ toolchain go1.23.2 require ( github.com/alta/protopatch v0.5.3 - github.com/bombsimon/logrusr/v4 v4.0.0 + github.com/bombsimon/logrusr/v4 v4.1.0 github.com/caitlinelfring/go-env-default v1.1.0 github.com/envoyproxy/protoc-gen-validate v1.1.0 github.com/fatih/color v1.13.0 github.com/fsnotify/fsnotify v1.5.4 github.com/fullstorydev/grpchan v1.1.1 - github.com/go-logr/logr v1.3.0 + github.com/go-logr/logr v1.4.2 github.com/golang/protobuf v1.5.4 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 @@ -29,7 +29,7 @@ require ( github.com/planetscale/vtprotobuf v0.6.1-0.20240917153116-6f2963f01587 github.com/prometheus/client_golang v1.20.4 github.com/rs/cors v1.7.0 - github.com/sirupsen/logrus v1.9.0 + github.com/sirupsen/logrus v1.9.3 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.3.0 github.com/spf13/pflag v1.0.5 @@ -72,6 +72,9 @@ require ( github.com/prometheus/common v0.60.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/spf13/afero v1.11.0 // indirect + github.com/thejerf/suture/v4 v4.0.5 // indirect + go.linka.cloud/pm v0.0.0-20241010170247-3b07e9ded7e5 // indirect + go.linka.cloud/pubsub v0.0.0-20220728154114-8213058139f3 // indirect go.uber.org/atomic v1.7.0 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/sys v0.26.0 // indirect diff --git a/go.sum b/go.sum index 798835b..75e1a17 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bombsimon/logrusr/v4 v4.0.0 h1:Pm0InGphX0wMhPqC02t31onlq9OVyJ98eP/Vh63t1Oo= github.com/bombsimon/logrusr/v4 v4.0.0/go.mod h1:pjfHC5e59CvjTBIU3V3sGhFWFAnsnhOR03TRc6im0l8= +github.com/bombsimon/logrusr/v4 v4.1.0 h1:uZNPbwusB0eUXlO8hIUwStE6Lr5bLN6IgYgG+75kuh4= +github.com/bombsimon/logrusr/v4 v4.1.0/go.mod h1:pjfHC5e59CvjTBIU3V3sGhFWFAnsnhOR03TRc6im0l8= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= @@ -195,6 +197,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= @@ -591,6 +595,8 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= @@ -632,6 +638,8 @@ github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/thejerf/suture/v4 v4.0.5 h1:F1E/4FZwXWqvlWDKEUo6/ndLtxGAUzMmNqkrMknZbAA= +github.com/thejerf/suture/v4 v4.0.5/go.mod h1:gu9Y4dXNUWFrByqRt30Rm9/UZ0wzRSt9AJS6xu/ZGxU= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -654,12 +662,24 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mI go.etcd.io/etcd/api/v3 v3.5.1/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= go.etcd.io/etcd/client/pkg/v3 v3.5.1/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v2 v2.305.1/go.mod h1:pMEacxZW7o8pg4CrFE7pquyCJJzZvkvdD2RibOCCCGs= +go.linka.cloud/pm v0.0.0-20241010161437-832985f01fbe h1:j3eUzAMZflkoJUn86QUfJw+ZyEWGFV6dvmI5nF8t2m0= +go.linka.cloud/pm v0.0.0-20241010161437-832985f01fbe/go.mod h1:yrOr60M3ecN7cxy9XRv0OzFSghptmjqb2vK816BVEPM= +go.linka.cloud/pm v0.0.0-20241010163939-5155fc3d6e22 h1:ID203ue46rFphUTjrHglPznzbiwNjKfxXf8ixaq5/AI= +go.linka.cloud/pm v0.0.0-20241010163939-5155fc3d6e22/go.mod h1:yrOr60M3ecN7cxy9XRv0OzFSghptmjqb2vK816BVEPM= +go.linka.cloud/pm v0.0.0-20241010170247-3b07e9ded7e5 h1:AtdK5M4q6w/iL9Z4RxmQxevaRTDcPpoF+wNti2+eLvM= +go.linka.cloud/pm v0.0.0-20241010170247-3b07e9ded7e5/go.mod h1:yrOr60M3ecN7cxy9XRv0OzFSghptmjqb2vK816BVEPM= +go.linka.cloud/pm v0.1.0 h1:igP0fUzxzgjPhXBFoC4l35HMV56tIibusq2qynR8J1o= +go.linka.cloud/pm v0.1.0/go.mod h1:yrOr60M3ecN7cxy9XRv0OzFSghptmjqb2vK816BVEPM= +go.linka.cloud/pm v0.2.0 h1:PGA5eQq8L/dwh5WiStoMikpT65zqBbFld5cVpfgiPM4= +go.linka.cloud/pm v0.2.0/go.mod h1:yrOr60M3ecN7cxy9XRv0OzFSghptmjqb2vK816BVEPM= go.linka.cloud/protoc-gen-defaults v0.4.0 h1:ekcfTTY74AhKBGMF9usz+xkUFxLaPVAu6xmQvwmjbfc= go.linka.cloud/protoc-gen-defaults v0.4.0/go.mod h1:IJcTbM/oraQvdE/mz0vxhoBmJHE+rb4vF2IXJztcadY= go.linka.cloud/protoc-gen-go-fields v0.4.0 h1:jSfDhnQ42VAr8iNeDjljrWMeGDKGXEBWzmUIK690wow= go.linka.cloud/protoc-gen-go-fields v0.4.0/go.mod h1:Rd9hJ8aB9zHeWLtpmMgzNFjnpjh82sj9BMHwFLE9Fbs= go.linka.cloud/protofilters v0.8.1 h1:fvo95k2i1yIGg3QA655qg2Yqdeq9/cB1JWsGwmKydrI= go.linka.cloud/protofilters v0.8.1/go.mod h1:8gMkhSFLQzuaY8SN0g5XZ03JHj6CdjUQNL5Vy1Cc3ig= +go.linka.cloud/pubsub v0.0.0-20220728154114-8213058139f3 h1:8qfogYXX5OrCrDa8CI9w6K1NnEYCz27n32bDveCoCvM= +go.linka.cloud/pubsub v0.0.0-20220728154114-8213058139f3/go.mod h1:n1VFgi7RahshaOFJYKThBv/LwaMxKoBTfymxVWRJHJk= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= diff --git a/process/child.go b/process/child.go new file mode 100644 index 0000000..0aee87a --- /dev/null +++ b/process/child.go @@ -0,0 +1,177 @@ +package process + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "os/exec" + "path/filepath" + "regexp" + "strings" + "sync" + + "go.linka.cloud/pm" + "go.linka.cloud/pm/reexec" + "go.uber.org/multierr" + "google.golang.org/grpc" + + "go.linka.cloud/grpc-toolkit/logger" + "go.linka.cloud/grpc-toolkit/service" + "go.linka.cloud/grpc-toolkit/signals" +) + +var _ pm.Service = (*Child)(nil) + +var serviceRegx = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`) + +type Childs []*Child + +func (c Childs) Register() { + for _, c := range c { + c.Register() + } +} + +func (c Childs) WithOpts(opts ...pm.CmdOpt) Childs { + for _, c := range c { + c.WithOpts(opts...) + } + return c +} + +func (c Childs) Close() error { + var err error + for _, c := range c { + err = multierr.Append(err, c.Close()) + } + return err +} + +func NewChild(name string, opts ...service.Option) (*Child, error) { + if !serviceRegx.MatchString(name) { + return nil, errors.New("invalid name") + } + return &Child{name: name, o: opts}, nil +} + +type Child struct { + name string + o []service.Option + co []pm.CmdOpt + c *pm.Cmd + m sync.RWMutex +} + +func (c *Child) WithOpts(opts ...pm.CmdOpt) *Child { + c.co = append(c.co, opts...) + return c +} + +func (c *Child) Serve(ctx context.Context) error { + c.m.RLock() + if c.c != nil { + c.m.RUnlock() + return pm.ErrAlreadyRunning + } + c.m.RUnlock() + s := c.socket() + _ = os.Remove(s) + lis, err := net.ListenUnix("unix", &net.UnixAddr{Name: s}) + if err != nil { + return err + } + defer lis.Close() + defer os.Remove(s) + if os.Getenv("PM_NO_FORK") == "1" { + return c.serve(ctx, lis) + } + f, err := lis.File() + if err != nil { + return err + } + c.m.Lock() + c.c = pm.ReExec(c.name).WithOpts(pm.WithExtraFiles(f), pm.WithCancel(func(cmd *exec.Cmd) error { + return cmd.Process.Signal(os.Interrupt) + })) + c.m.Unlock() + defer func() { + c.m.Lock() + c.c = nil + c.m.Unlock() + }() + return c.c.Serve(ctx) +} + +func (c *Child) Register() { + reexec.Register(c.name, func() { + ctx := signals.SetupSignalHandler() + ctx = logger.Set(ctx, logger.C(ctx).WithField("service", c.String())) + if err := c.run(ctx); err != nil { + logger.C(ctx).Fatal(err) + } + }) +} + +func (c *Child) Dial(ctx context.Context, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { + conn, err := grpc.DialContext(ctx, "", append(opts, grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + c.m.RLock() + defer c.m.RUnlock() + if c.c == nil { + return nil, errors.New("not running") + } + return net.Dial("unix", c.socket()) + }))...) + return conn, err +} + +func (c *Child) Close() error { + c.m.Lock() + defer c.m.Unlock() + if c.c != nil { + return c.c.Signal(os.Interrupt) + } + return nil +} + +func (c *Child) String() string { + return c.name +} + +func (c *Child) socket() string { + name := strings.NewReplacer("/", "-", ":", "-", " ", "-").Replace(c.name) + dir := "/tmp" + if d := os.Getenv("TMPDIR"); d != "" { + dir = d + } + return filepath.Join(dir, name+".sock") +} + +func (c *Child) run(ctx context.Context) error { + f := os.NewFile(3, "conn") + if f == nil { + return errors.New("invalid connection file descriptor") + } + lis, err := net.FileListener(f) + if err != nil { + return err + } + defer lis.Close() + return c.serve(ctx, lis) +} + +func (c *Child) serve(ctx context.Context, lis net.Listener) error { + logger.C(ctx).Infof("starting service") + s, err := service.New(append( + c.o, + service.WithContext(ctx), + service.WithListener(lis), + )...) + if err != nil { + return fmt.Errorf("failed to create service: %w", err) + } + defer s.Close() + return s.Start() + return nil +} diff --git a/process/child_test.go b/process/child_test.go new file mode 100644 index 0000000..5837910 --- /dev/null +++ b/process/child_test.go @@ -0,0 +1,54 @@ +package process_test + +import ( + "context" + "os" + "testing" + + "go.linka.cloud/pm/reexec" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health/grpc_health_v1" + + "go.linka.cloud/grpc-toolkit/logger" + "go.linka.cloud/grpc-toolkit/process" + "go.linka.cloud/grpc-toolkit/signals" +) + +func TestChild(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c, err := process.NewChild("test-child") + if err != nil { + t.Fatal(err) + } + c.Register() + + if reexec.Init() { + os.Exit(0) + } + + ctx = signals.SetupSignalHandlerWithContext(ctx) + logger.C(ctx).Infof("starting host: %v", os.Args) + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + return c.Serve(ctx) + }) + g.Go(func() error { + conn, err := c.Dial(ctx, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return err + } + res, err := grpc_health_v1.NewHealthClient(conn).Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + if err != nil { + return err + } + logger.C(ctx).Infof("health check: %v", res) + return c.Close() + }) + if err := g.Wait(); err != nil { + t.Error(err) + } +} diff --git a/process/service.go b/process/service.go new file mode 100644 index 0000000..b471ab7 --- /dev/null +++ b/process/service.go @@ -0,0 +1,56 @@ +package process + +import ( + "context" + + "go.linka.cloud/pm" + + "go.linka.cloud/grpc-toolkit/service" +) + +var _ pm.Service = (*Service)(nil) + +var ( + Notify = pm.Notify +) + +func NewService(name string, opts ...service.Option) *Service { + return &Service{name: name, o: opts} +} + +type Service struct { + name string + o []service.Option +} + +func (s *Service) Serve(ctx context.Context) error { + svc, err := service.New(s.opts(ctx)...) + if err != nil { + return err + } + defer svc.Close() + pm.Notify(ctx, pm.StatusStarting) + defer func() { + pm.Notify(ctx, pm.StatusStopped) + }() + return svc.Start() +} + +func (s *Service) String() string { + return s.name +} + +func (s *Service) opts(ctx context.Context) []service.Option { + return append(s.o, + service.WithName(s.name), + service.WithContext(ctx), + service.WithAfterStart(func() error { + pm.Notify(ctx, pm.StatusRunning) + return nil + }), + service.WithBeforeStop(func() error { + pm.Notify(ctx, pm.StatusStopping) + return nil + }), + ) +} diff --git a/service/service.go b/service/service.go index d4e4067..e23f313 100644 --- a/service/service.go +++ b/service/service.go @@ -274,7 +274,7 @@ func (s *service) run() error { logger.C(s.opts.ctx).Warnf("received %v", sig) return s.Close() case err := <-errs: - if !isMuxError(err) { + if err != nil && !isMuxError(err) { logger.C(s.opts.ctx).Error(err) return err }