add pm service wrapper and child through re-exec

Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
This commit is contained in:
2024-10-16 14:53:10 +02:00
parent ccf44285f9
commit e89119a68d
6 changed files with 314 additions and 4 deletions

177
process/child.go Normal file
View File

@ -0,0 +1,177 @@
package process
import (
"context"
"errors"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"go.linka.cloud/pm"
"go.linka.cloud/pm/reexec"
"go.uber.org/multierr"
"google.golang.org/grpc"
"go.linka.cloud/grpc-toolkit/logger"
"go.linka.cloud/grpc-toolkit/service"
"go.linka.cloud/grpc-toolkit/signals"
)
var _ pm.Service = (*Child)(nil)
var serviceRegx = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)
type Childs []*Child
func (c Childs) Register() {
for _, c := range c {
c.Register()
}
}
func (c Childs) WithOpts(opts ...pm.CmdOpt) Childs {
for _, c := range c {
c.WithOpts(opts...)
}
return c
}
func (c Childs) Close() error {
var err error
for _, c := range c {
err = multierr.Append(err, c.Close())
}
return err
}
func NewChild(name string, opts ...service.Option) (*Child, error) {
if !serviceRegx.MatchString(name) {
return nil, errors.New("invalid name")
}
return &Child{name: name, o: opts}, nil
}
type Child struct {
name string
o []service.Option
co []pm.CmdOpt
c *pm.Cmd
m sync.RWMutex
}
func (c *Child) WithOpts(opts ...pm.CmdOpt) *Child {
c.co = append(c.co, opts...)
return c
}
func (c *Child) Serve(ctx context.Context) error {
c.m.RLock()
if c.c != nil {
c.m.RUnlock()
return pm.ErrAlreadyRunning
}
c.m.RUnlock()
s := c.socket()
_ = os.Remove(s)
lis, err := net.ListenUnix("unix", &net.UnixAddr{Name: s})
if err != nil {
return err
}
defer lis.Close()
defer os.Remove(s)
if os.Getenv("PM_NO_FORK") == "1" {
return c.serve(ctx, lis)
}
f, err := lis.File()
if err != nil {
return err
}
c.m.Lock()
c.c = pm.ReExec(c.name).WithOpts(pm.WithExtraFiles(f), pm.WithCancel(func(cmd *exec.Cmd) error {
return cmd.Process.Signal(os.Interrupt)
}))
c.m.Unlock()
defer func() {
c.m.Lock()
c.c = nil
c.m.Unlock()
}()
return c.c.Serve(ctx)
}
func (c *Child) Register() {
reexec.Register(c.name, func() {
ctx := signals.SetupSignalHandler()
ctx = logger.Set(ctx, logger.C(ctx).WithField("service", c.String()))
if err := c.run(ctx); err != nil {
logger.C(ctx).Fatal(err)
}
})
}
func (c *Child) Dial(ctx context.Context, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) {
conn, err := grpc.DialContext(ctx, "", append(opts, grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
c.m.RLock()
defer c.m.RUnlock()
if c.c == nil {
return nil, errors.New("not running")
}
return net.Dial("unix", c.socket())
}))...)
return conn, err
}
func (c *Child) Close() error {
c.m.Lock()
defer c.m.Unlock()
if c.c != nil {
return c.c.Signal(os.Interrupt)
}
return nil
}
func (c *Child) String() string {
return c.name
}
func (c *Child) socket() string {
name := strings.NewReplacer("/", "-", ":", "-", " ", "-").Replace(c.name)
dir := "/tmp"
if d := os.Getenv("TMPDIR"); d != "" {
dir = d
}
return filepath.Join(dir, name+".sock")
}
func (c *Child) run(ctx context.Context) error {
f := os.NewFile(3, "conn")
if f == nil {
return errors.New("invalid connection file descriptor")
}
lis, err := net.FileListener(f)
if err != nil {
return err
}
defer lis.Close()
return c.serve(ctx, lis)
}
func (c *Child) serve(ctx context.Context, lis net.Listener) error {
logger.C(ctx).Infof("starting service")
s, err := service.New(append(
c.o,
service.WithContext(ctx),
service.WithListener(lis),
)...)
if err != nil {
return fmt.Errorf("failed to create service: %w", err)
}
defer s.Close()
return s.Start()
return nil
}

54
process/child_test.go Normal file
View File

@ -0,0 +1,54 @@
package process_test
import (
"context"
"os"
"testing"
"go.linka.cloud/pm/reexec"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"
"go.linka.cloud/grpc-toolkit/logger"
"go.linka.cloud/grpc-toolkit/process"
"go.linka.cloud/grpc-toolkit/signals"
)
func TestChild(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := process.NewChild("test-child")
if err != nil {
t.Fatal(err)
}
c.Register()
if reexec.Init() {
os.Exit(0)
}
ctx = signals.SetupSignalHandlerWithContext(ctx)
logger.C(ctx).Infof("starting host: %v", os.Args)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return c.Serve(ctx)
})
g.Go(func() error {
conn, err := c.Dial(ctx, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
res, err := grpc_health_v1.NewHealthClient(conn).Check(ctx, &grpc_health_v1.HealthCheckRequest{})
if err != nil {
return err
}
logger.C(ctx).Infof("health check: %v", res)
return c.Close()
})
if err := g.Wait(); err != nil {
t.Error(err)
}
}

56
process/service.go Normal file
View File

@ -0,0 +1,56 @@
package process
import (
"context"
"go.linka.cloud/pm"
"go.linka.cloud/grpc-toolkit/service"
)
var _ pm.Service = (*Service)(nil)
var (
Notify = pm.Notify
)
func NewService(name string, opts ...service.Option) *Service {
return &Service{name: name, o: opts}
}
type Service struct {
name string
o []service.Option
}
func (s *Service) Serve(ctx context.Context) error {
svc, err := service.New(s.opts(ctx)...)
if err != nil {
return err
}
defer svc.Close()
pm.Notify(ctx, pm.StatusStarting)
defer func() {
pm.Notify(ctx, pm.StatusStopped)
}()
return svc.Start()
}
func (s *Service) String() string {
return s.name
}
func (s *Service) opts(ctx context.Context) []service.Option {
return append(s.o,
service.WithName(s.name),
service.WithContext(ctx),
service.WithAfterStart(func() error {
pm.Notify(ctx, pm.StatusRunning)
return nil
}),
service.WithBeforeStop(func() error {
pm.Notify(ctx, pm.StatusStopping)
return nil
}),
)
}