[wip] base options, command, service

This commit is contained in:
2020-01-22 14:02:06 +01:00
commit 8197d933f8
10 changed files with 1151 additions and 0 deletions

41
service/command.go Normal file
View File

@ -0,0 +1,41 @@
package service
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var cmd = &cobra.Command{
Short: "A gRPC micro service",
}
const (
caCert = "ca_cert"
serverAddress = "server_address"
serverCert = "server_cert"
serverKey = "server_key"
)
func init() {
viper.AutomaticEnv()
// server_address
cmd.Flags().String(serverAddress, "0.0.0.0:0", "Bind address for the server. 127.0.0.1:9090 [$SERVER_ADDRESS]")
viper.BindPFlag(serverAddress, cmd.Flags().Lookup(serverAddress))
// ca_cert
cmd.Flags().String(caCert, "", "Path to Root CA certificate [$CA_CERT]")
viper.BindPFlag(caCert, cmd.Flags().Lookup(caCert))
// server_cert
cmd.Flags().String(serverCert, "", "Path to Server certificate [$SERVER_CERT]")
viper.BindPFlag(serverCert, cmd.Flags().Lookup(serverCert))
// server_key
cmd.Flags().String(serverKey, "", "Path to Server key [$SERVER_KEY]")
viper.BindPFlag(serverKey, cmd.Flags().Lookup(serverKey))
}
func parseFlags(o *options) *options {
o.address = viper.GetString(serverAddress)
o.caCert = viper.GetString(caCert)
o.cert = viper.GetString(serverCert)
o.key = viper.GetString(serverKey)
return o
}

337
service/options.go Normal file
View File

@ -0,0 +1,337 @@
package service
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"reflect"
"github.com/jinzhu/gorm"
"google.golang.org/grpc"
)
/*
GLOBAL OPTIONS:
--client value Client for go-micro; rpc [$MICRO_CLIENT]
--client_request_timeout value Sets the client request timeout. e.g 500ms, 5s, 1m. Default: 5s [$MICRO_CLIENT_REQUEST_TIMEOUT]
--client_retries value Sets the client retries. Default: 1 (default: 1) [$MICRO_CLIENT_RETRIES]
--client_pool_size value Sets the client connection pool size. Default: 1 (default: 0) [$MICRO_CLIENT_POOL_SIZE]
--client_pool_ttl value Sets the client connection pool ttl. e.g 500ms, 5s, 1m. Default: 1m [$MICRO_CLIENT_POOL_TTL]
--register_ttl value Register TTL in seconds (default: 0) [$MICRO_REGISTER_TTL]
--register_interval value Register interval in seconds (default: 0) [$MICRO_REGISTER_INTERVAL]
--server value Server for go-micro; rpc [$MICRO_SERVER]
--server_name value Name of the server. go.micro.srv.example [$MICRO_SERVER_NAME]
--server_version value Version of the server. 1.1.0 [$MICRO_SERVER_VERSION]
--server_id value Id of the server. Auto-generated if not specified [$MICRO_SERVER_ID]
--server_address value Bind address for the server. 127.0.0.1:8080 [$MICRO_SERVER_ADDRESS]
--server_advertise value Used instead of the server_address when registering with discovery. 127.0.0.1:8080 [$MICRO_SERVER_ADVERTISE]
--server_metadata value A list of key-value pairs defining metadata. version=1.0.0 [$MICRO_SERVER_METADATA]
--broker value Broker for pub/sub. http, nats, rabbitmq [$MICRO_BROKER]
--broker_address value Comma-separated list of broker addresses [$MICRO_BROKER_ADDRESS]
--registry value Registry for discovery. consul, mdns [$MICRO_REGISTRY]
--registry_address value Comma-separated list of registry addresses [$MICRO_REGISTRY_ADDRESS]
--selector value Selector used to pick nodes for querying [$MICRO_SELECTOR]
--transport value Transport mechanism used; http [$MICRO_TRANSPORT]
--transport_address value Comma-separated list of transport addresses [$MICRO_TRANSPORT_ADDRESS]
--db_path value Path to sqlite db (e.g. /data/agents.db) (default: "agents.db") [$DB_PATH]
--help, -h show help
--register_ttl REGISTER_TTL
--register_interval REGISTER_INTERVAL
--server_name SERVER_NAME
--server_version SERVER_VERSION
--server_id SERVER_ID
--server_advertise SERVER_ADVERTISE
--broker BROKER
--broker_address BROKER_ADDRESS
--registry REGISTRY
--registry_address REGISTRY_ADDRESS
--selector SELECTOR
--transport TRANSPORT
--transport_address TRANSPORT_ADDRESS
--db_path DB_PATH
--server_address SERVER_ADDRESS
--ca_cert CA_CERT
--server_cert SERVER_CERT
--server_key SERVER_KEY
*/
type Options interface {
Context() context.Context
Name() string
Address() string
CACert() string
Cert() string
Key() string
TLSConfig() tls.Config
DB() *gorm.DB
BeforeStart() []func() error
AfterStart() []func() error
BeforeStop() []func() error
AfterStop() []func() error
ServerOpts() []grpc.ServerOption
ServerInterceptors() []grpc.UnaryServerInterceptor
StreamServerInterceptors() []grpc.StreamServerInterceptor
ClientInterceptors() []grpc.UnaryClientInterceptor
StreamClientInterceptors() []grpc.StreamClientInterceptor
Defaults()
}
func NewOptions() *options {
return &options{
ctx: context.Background(),
address: ":0",
}
}
func (o *options) Defaults() {
if o.ctx == nil {
o.ctx = context.Background()
}
if o.address == "" {
o.address = "0.0.0.0:0"
}
}
type Option func(*options)
func WithName(name string) Option {
return func(o *options) {
o.name = name
}
}
// Context specifies a context for the service.
// Can be used to signal shutdown of the service.
// Can be used for extra option values.
func WithContext(ctx context.Context) Option {
return func(o *options) {
o.ctx = ctx
}
}
// Address sets the address of the server
func WithAddress(addr string) Option {
return func(o *options) {
o.address = addr
}
}
func WithGRPCServerOpts(opts ...grpc.ServerOption) Option {
return func(o *options) {
o.serverOpts = append(o.serverOpts, opts...)
}
}
func WithCACert(path string) Option {
return func(o *options) {
o.caCert = path
}
}
func WithCert(path string) Option {
return func(o *options) {
o.cert = path
}
}
func WithKey(path string) Option {
return func(o *options) {
o.key = path
}
}
func WithDB(db *gorm.DB) Option {
return func(o *options) {
o.db = db
}
}
func WithTLSConfig(conf *tls.Config) Option {
return func(o *options) {
if conf != nil {
o.tlsConfig = *conf
}
}
}
func WithBeforeStart(fn ...func() error) Option {
return func(o *options) {
o.beforeStart = append(o.beforeStart, fn...)
}
}
func WithBeforeStop(fn ...func() error) Option {
return func(o *options) {
o.beforeStop = append(o.beforeStop, fn...)
}
}
func WithAfterStart(fn ...func() error) Option {
return func(o *options) {
o.afterStart = append(o.afterStart, fn...)
}
}
func WithAfterStop(fn ...func() error) Option {
return func(o *options) {
o.afterStop = append(o.afterStop, fn...)
}
}
func WithUnaryClientInterceptor(i ...grpc.UnaryClientInterceptor) Option {
return func(o *options) {
o.clientInterceptors = append(o.clientInterceptors, i...)
}
}
// WrapHandler adds a handler Wrapper to a list of options passed into the server
func WithUnaryServerInterceptor(i ...grpc.UnaryServerInterceptor) Option {
return func(o *options) {
o.serverInterceptors = append(o.serverInterceptors, i...)
}
}
func WithStreamServerInterceptor(i ...grpc.StreamServerInterceptor) Option {
return func(o *options) {
o.streamServerInterceptors = append(o.streamServerInterceptors, i...)
}
}
func WithStreamClientInterceptor(i ...grpc.StreamClientInterceptor) Option {
return func(o *options) {
o.streamClientInterceptors = append(o.streamClientInterceptors, i...)
}
}
// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server
func WithSubscriberInterceptor(w ...interface{}) Option {
return func(o *options) {
}
}
type options struct {
ctx context.Context
name string
address string
caCert string
cert string
key string
tlsConfig tls.Config
db *gorm.DB
beforeStart []func() error
afterStart []func() error
beforeStop []func() error
afterStop []func() error
serverOpts []grpc.ServerOption
serverInterceptors []grpc.UnaryServerInterceptor
streamServerInterceptors []grpc.StreamServerInterceptor
clientInterceptors []grpc.UnaryClientInterceptor
streamClientInterceptors []grpc.StreamClientInterceptor
}
func (o *options) Name() string {
return o.name
}
func (o *options) Context() context.Context {
return o.ctx
}
func (o *options) Address() string {
return o.address
}
func (o *options) CACert() string {
return o.caCert
}
func (o *options) Cert() string {
return o.cert
}
func (o *options) Key() string {
return o.key
}
func (o *options) TLSConfig() tls.Config {
return o.tlsConfig
}
func (o *options) DB() *gorm.DB {
return o.db
}
func (o *options) BeforeStart() []func() error {
return o.beforeStart
}
func (o *options) AfterStart() []func() error {
return o.afterStart
}
func (o *options) BeforeStop() []func() error {
return o.beforeStop
}
func (o *options) AfterStop() []func() error {
return o.afterStop
}
func (o *options) ServerOpts() []grpc.ServerOption {
return o.serverOpts
}
func (o *options) ServerInterceptors() []grpc.UnaryServerInterceptor {
return o.serverInterceptors
}
func (o *options) StreamServerInterceptors() []grpc.StreamServerInterceptor {
return o.streamServerInterceptors
}
func (o *options) ClientInterceptors() []grpc.UnaryClientInterceptor {
return o.clientInterceptors
}
func (o *options) StreamClientInterceptors() []grpc.StreamClientInterceptor {
return o.streamClientInterceptors
}
func (o *options) parseTLSConfig() error {
if o.hasTLSConfig() {
return nil
}
caCert, err := ioutil.ReadFile(o.caCert)
if err != nil {
return err
}
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(caCert)
if !ok {
return fmt.Errorf("failed to load CA Cert from %s", o.caCert)
}
cert, err := tls.LoadX509KeyPair(o.cert, o.key)
if err != nil {
return err
}
o.tlsConfig = tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: caCertPool,
RootCAs: caCertPool,
Certificates: []tls.Certificate{cert},
}
return nil
}
func (o *options) hasTLSConfig() bool {
return reflect.DeepEqual(o.tlsConfig, tls.Config{})
}

148
service/service.go Normal file
View File

@ -0,0 +1,148 @@
package service
import (
"net"
"os"
"sync"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/jinzhu/gorm"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)
type Service interface {
Options() Options
Server() *grpc.Server
DB() *gorm.DB
Start() error
Stop() error
Close() error
Cmd() *cobra.Command
}
func New(opts ...Option) (Service, error) {
return newService(opts...)
}
type service struct {
cmd *cobra.Command
opts *options
server *grpc.Server
list net.Listener
mu sync.Mutex
running bool
}
func newService(opts ...Option) (*service, error) {
cmd.ParseFlags(os.Args)
s := &service{
opts: parseFlags(NewOptions()),
cmd: cmd,
}
s.mu.Lock()
defer s.mu.Unlock()
for _, f := range opts {
f(s.opts)
}
go func() {
for {
select {
case <-s.opts.ctx.Done():
s.Stop()
}
}
}()
if err := s.opts.parseTLSConfig(); err != nil {
return nil, err
}
cmd.Use = s.opts.name
cmd.RunE = func(cmd *cobra.Command, args []string) error {
if cmd.Use == "" {
cmd.Use = os.Args[0]
}
return s.run()
}
gopts := []grpc.ServerOption{grpc.UnaryInterceptor(grpcmiddleware.ChainUnaryServer(s.opts.serverInterceptors...))}
// TODO : check tls config and tls auth
// grpc.Creds(credentials.NewTLS(&s.opts.tlsConfig))
s.server = grpc.NewServer(append(gopts, s.opts.serverOpts...)...)
return s, nil
}
func (s *service) Options() Options {
return s.opts
}
func (s *service) DB() *gorm.DB {
return s.opts.db
}
func (s *service) Server() *grpc.Server {
return s.server
}
func (s *service) Cmd() *cobra.Command {
return s.cmd
}
func (s *service) run() error {
s.mu.Lock()
for i := range s.opts.beforeStart {
if err := s.opts.beforeStart[i](); err != nil {
return err
}
}
var err error
s.running = true
s.list, err = net.Listen("tcp", s.opts.address)
if err != nil {
s.mu.Unlock()
return err
}
s.opts.address = s.list.Addr().String()
errs := make(chan error)
go func() {
errs <- s.server.Serve(s.list)
}()
for i := range s.opts.afterStart {
if err := s.opts.afterStart[i](); err != nil {
s.mu.Unlock()
s.Stop()
return err
}
}
s.mu.Unlock()
return <- errs
}
func (s *service) Start() error {
return s.cmd.Execute()
}
func (s *service) Stop() error {
s.mu.Lock()
defer s.mu.Unlock()
if ! s.running {
return nil
}
for i := range s.opts.beforeStop {
if err := s.opts.beforeStop[i](); err != nil {
return err
}
}
s.server.GracefulStop()
s.running = false
for i := range s.opts.afterStop {
if err := s.opts.afterStop[i](); err != nil {
}
}
return nil
}
func (s *service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.Stop()
}