mirror of
https://github.com/linka-cloud/grpc.git
synced 2025-01-11 02:27:20 +00:00
add minimal config interface and file implementation
This commit is contained in:
parent
1bad45b563
commit
8e6fde19b5
10
config/config.go
Normal file
10
config/config.go
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config interface {
|
||||||
|
Read() ([]byte, error)
|
||||||
|
Watch(ctx context.Context, updates chan<- []byte) error
|
||||||
|
}
|
99
config/file/config.go
Normal file
99
config/file/config.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
// inspired by / taken from github.com/spf13/viper
|
||||||
|
|
||||||
|
package file
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/fsnotify/fsnotify"
|
||||||
|
|
||||||
|
"go.linka.cloud/grpc/config"
|
||||||
|
"go.linka.cloud/grpc/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewConfig(path string) (config.Config, error) {
|
||||||
|
if _, err := os.Stat(path); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &file{path: path}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type file struct {
|
||||||
|
path string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *file) Read() ([]byte, error) {
|
||||||
|
return ioutil.ReadFile(c.path)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watch listen for config changes and send updated content to the updates channel
|
||||||
|
func (c *file) Watch(ctx context.Context, updates chan<- []byte) error {
|
||||||
|
log := logger.From(ctx)
|
||||||
|
errs := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
watcher, err := fsnotify.NewWatcher()
|
||||||
|
if err != nil {
|
||||||
|
errs <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer watcher.Close()
|
||||||
|
// we have to watch the entire directory to pick up renames/atomic saves in a cross-platform way
|
||||||
|
configFile := filepath.Clean(c.path)
|
||||||
|
configDir, _ := filepath.Split(configFile)
|
||||||
|
realConfigFile, _ := filepath.EvalSymlinks(c.path)
|
||||||
|
|
||||||
|
eventsWG := sync.WaitGroup{}
|
||||||
|
eventsWG.Add(1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event, ok := <-watcher.Events:
|
||||||
|
if !ok { // 'Events' channel is closed
|
||||||
|
eventsWG.Done()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
currentConfigFile, _ := filepath.EvalSymlinks(c.path)
|
||||||
|
// we only care about the config file with the following cases:
|
||||||
|
// 1 - if the config file was modified or created
|
||||||
|
// 2 - if the real path to the config file changed (eg: k8s ConfigMap replacement)
|
||||||
|
const writeOrCreateMask = fsnotify.Write | fsnotify.Create
|
||||||
|
if (filepath.Clean(event.Name) == configFile &&
|
||||||
|
event.Op&writeOrCreateMask != 0) ||
|
||||||
|
(currentConfigFile != "" && currentConfigFile != realConfigFile) {
|
||||||
|
realConfigFile = currentConfigFile
|
||||||
|
b, err := c.Read()
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("failed to read config")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
out := make([]byte, len(b))
|
||||||
|
copy(out, b)
|
||||||
|
updates <- out
|
||||||
|
} else if filepath.Clean(event.Name) == configFile &&
|
||||||
|
event.Op&fsnotify.Remove&fsnotify.Remove != 0 {
|
||||||
|
eventsWG.Done()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
case err, ok := <-watcher.Errors:
|
||||||
|
if ok { // 'Errors' channel is not closed
|
||||||
|
log.WithError(err).Error("watcher failed")
|
||||||
|
}
|
||||||
|
eventsWG.Done()
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
errs <- watcher.Add(configDir) // done initializing the watch in this go routine, so the parent routine can move on...
|
||||||
|
eventsWG.Wait() // now, wait for event loop to end in this go-routine...
|
||||||
|
}()
|
||||||
|
// initWG.Wait() // make sure that the go routine above fully ended before returning
|
||||||
|
return <-errs
|
||||||
|
}
|
98
config/file/config_test.go
Normal file
98
config/file/config_test.go
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
// inspired by / taken from github.com/spf13/viper
|
||||||
|
|
||||||
|
package file
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"go.linka.cloud/grpc/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newConfigFile(t *testing.T) (config.Config, string, func()){
|
||||||
|
path := filepath.Join(os.TempDir(), "config.yaml")
|
||||||
|
if err := ioutil.WriteFile(path, []byte("ok"), os.ModePerm); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
cleanUp := func() {
|
||||||
|
if err := os.Remove(path); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &file{path: path}, path, cleanUp
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSymlinkedConfigFile(t *testing.T) (config.Config, string, string, func()) {
|
||||||
|
watchDir, err := ioutil.TempDir("", "")
|
||||||
|
require.Nil(t, err)
|
||||||
|
dataDir1 := path.Join(watchDir, "data1")
|
||||||
|
err = os.Mkdir(dataDir1, 0o777)
|
||||||
|
require.Nil(t, err)
|
||||||
|
realConfigFile := path.Join(dataDir1, "config.yaml")
|
||||||
|
t.Logf("Real config file location: %s\n", realConfigFile)
|
||||||
|
err = ioutil.WriteFile(realConfigFile, []byte("foo: bar\n"), 0o640)
|
||||||
|
require.Nil(t, err)
|
||||||
|
cleanup := func() {
|
||||||
|
os.RemoveAll(watchDir)
|
||||||
|
}
|
||||||
|
// now, symlink the tm `data1` dir to `data` in the baseDir
|
||||||
|
os.Symlink(dataDir1, path.Join(watchDir, "data"))
|
||||||
|
// and link the `<watchdir>/datadir1/config.yaml` to `<watchdir>/config.yaml`
|
||||||
|
configFile := path.Join(watchDir, "config.yaml")
|
||||||
|
os.Symlink(path.Join(watchDir, "data", "config.yaml"), configFile)
|
||||||
|
path := path.Join(watchDir, "config.yaml")
|
||||||
|
t.Logf("Config file location: %s\n", path)
|
||||||
|
return &file{path: path}, watchDir, configFile, cleanup
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatch(t *testing.T) {
|
||||||
|
t.Run("file content changed", func(t *testing.T) {
|
||||||
|
// given a `config.yaml` file being watched
|
||||||
|
v, cpath, cleanup := newConfigFile(t)
|
||||||
|
defer cleanup()
|
||||||
|
updates := make(chan []byte, 1)
|
||||||
|
if err := v.Watch(context.Background(), updates); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// when overwriting the file and waiting for the custom change notification handler to be triggered
|
||||||
|
err := ioutil.WriteFile(cpath, []byte("foo: baz\n"), 0o640)
|
||||||
|
b := <- updates
|
||||||
|
// then the config value should have changed
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, []byte("foo: baz\n"), b)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("link to real file changed (à la Kubernetes)", func(t *testing.T) {
|
||||||
|
// skip if not executed on Linux
|
||||||
|
if runtime.GOOS != "linux" {
|
||||||
|
t.Skipf("Skipping test as symlink replacements don't work on non-linux environment...")
|
||||||
|
}
|
||||||
|
v, watchDir, _, cleanup := newSymlinkedConfigFile(t)
|
||||||
|
defer cleanup()
|
||||||
|
updates := make(chan []byte, 1)
|
||||||
|
if err := v.Watch(context.Background(), updates); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// when link to another `config.yaml` file
|
||||||
|
dataDir2 := path.Join(watchDir, "data2")
|
||||||
|
err := os.MkdirAll(dataDir2, 0o777)
|
||||||
|
require.NoError(t, err)
|
||||||
|
configFile2 := path.Join(dataDir2, "config.yaml")
|
||||||
|
err = ioutil.WriteFile(configFile2, []byte("foo: baz\n"), 0o640)
|
||||||
|
require.NoError(t, err)
|
||||||
|
// change the symlink using the `ln -sfn` command
|
||||||
|
err = exec.Command("ln", "-sfn", dataDir2, path.Join(watchDir, "data")).Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
b := <-updates
|
||||||
|
assert.Equal(t, []byte("foo: baz\n"), b)
|
||||||
|
})
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user