service: replace cmd with NewFlagSet, add internal service version metadata interceptors

client: add NewFlagSet, add missing Options interface methods

Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
This commit is contained in:
2022-03-10 12:46:36 +01:00
parent e578d62a29
commit c0e79d8834
10 changed files with 597 additions and 159 deletions

View File

@ -1,58 +0,0 @@
package service
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var cmd = &cobra.Command{
Short: "A gRPC micro service",
SilenceUsage: true,
FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true},
}
const (
serverAddress = "server_address"
secure = "secure"
reflection = "reflection"
caCert = "ca_cert"
serverCert = "server_cert"
serverKey = "server_key"
)
func init() {
viper.AutomaticEnv()
// server_address
cmd.Flags().String(serverAddress, "0.0.0.0:0", "Bind address for the server. 127.0.0.1:9090 [$SERVER_ADDRESS]")
viper.BindPFlag(serverAddress, cmd.Flags().Lookup(serverAddress))
// secure
cmd.Flags().Bool(secure, true, "Generate self signed certificate if none provided [$SECURE]")
viper.BindPFlag(secure, cmd.Flags().Lookup(secure))
// reflection
cmd.Flags().Bool(reflection, false, "Enable gRPC reflection server [$REFLECT]")
viper.BindPFlag(reflection, cmd.Flags().Lookup(reflection))
// ca_cert
cmd.Flags().String(caCert, "", "Path to Root CA certificate [$CA_CERT]")
viper.BindPFlag(caCert, cmd.Flags().Lookup(caCert))
// server_cert
cmd.Flags().String(serverCert, "", "Path to Server certificate [$SERVER_CERT]")
viper.BindPFlag(serverCert, cmd.Flags().Lookup(serverCert))
// server_key
cmd.Flags().String(serverKey, "", "Path to Server key [$SERVER_KEY]")
viper.BindPFlag(serverKey, cmd.Flags().Lookup(serverKey))
}
func parseFlags(o *options) *options {
o.address = viper.GetString(serverAddress)
o.secure = viper.GetBool(secure)
o.reflection = viper.GetBool(reflection)
o.caCert = viper.GetString(caCert)
o.cert = viper.GetString(serverCert)
o.key = viper.GetString(serverKey)
return o
}

52
service/flags.go Normal file
View File

@ -0,0 +1,52 @@
package service
import (
"fmt"
"strings"
env "github.com/caitlinelfring/go-env-default"
"github.com/spf13/pflag"
)
const (
serverAddress = "address"
insecure = "insecure"
reflection = "reflection"
caCert = "ca-cert"
serverCert = "server-cert"
serverKey = "server-key"
)
var u = strings.ToUpper
func NewFlagSet() (*pflag.FlagSet, Option) {
var (
optAddress string
optInsecure bool
optReflection bool
optCACert string
optCert string
optKey string
)
flags := pflag.NewFlagSet("gRPC", pflag.ContinueOnError)
flags.StringVarP(&optAddress, serverAddress, "a", env.GetDefault(u(serverAddress), "0.0.0.0:0"), "Bind address for the server, e.g. 127.0.0.1:9090"+flagEnv(serverAddress))
flags.BoolVar(&optInsecure, insecure, env.GetBoolDefault(u(insecure), false), "Do not generate self signed certificate if none provided"+flagEnv(insecure))
flags.BoolVar(&optReflection, reflection, env.GetBoolDefault(u(reflection), false), "Enable gRPC reflection server"+flagEnv(reflection))
flags.StringVar(&optCACert, caCert, "", "Path to Root CA certificate"+flagEnv(caCert))
flags.StringVar(&optCert, serverCert, "", "Path to Server certificate"+flagEnv(serverCert))
flags.StringVar(&optKey, serverKey, "", "Path to Server key"+flagEnv(serverKey))
return flags, func(o *options) {
o.address = optAddress
o.secure = !optInsecure
o.reflection = optReflection
// o.caCert = optCACert
// o.cert = optCert
// o.key = optKey
}
}
func flagEnv(name string) string {
return fmt.Sprintf(" [$%s]", strings.Replace(u(name), "-", "_", -1))
}

View File

@ -0,0 +1,48 @@
package service
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
type mdInterceptors struct {
k, v string
}
func (i mdInterceptors) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if err := grpc.SetHeader(ctx, metadata.Pairs(i.k, i.v)); err != nil {
return nil, err
}
return handler(ctx, req)
}
}
func (i mdInterceptors) StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := grpc.SetHeader(ss.Context(), metadata.Pairs(i.k, i.v)); err != nil {
return err
}
return handler(srv, ss)
}
}
func (i mdInterceptors) UnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if err := grpc.SetHeader(ctx, metadata.Pairs(i.k, i.v)); err != nil {
return err
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}
func (i mdInterceptors) StreamClientInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
if err := grpc.SetHeader(ctx, metadata.Pairs(i.k, i.v)); err != nil {
return nil, err
}
return streamer(ctx, desc, cc, method, opts...)
}
}

View File

@ -61,6 +61,7 @@ type Options interface {
Address() string
Reflection() bool
Health() bool
CACert() string
Cert() string
@ -103,6 +104,7 @@ func NewOptions() *options {
return &options{
ctx: context.Background(),
address: ":0",
health: true,
}
}
@ -161,6 +163,12 @@ func WithReflection(r bool) Option {
}
}
func WithHealth(h bool) Option {
return func(o *options) {
o.health = h
}
}
func WithSecure(s bool) Option {
return func(o *options) {
o.secure = s
@ -351,6 +359,7 @@ type options struct {
address string
reflection bool
health bool
secure bool
caCert string
@ -413,6 +422,10 @@ func (o *options) Reflection() bool {
return o.reflection
}
func (o *options) Health() bool {
return o.health
}
func (o *options) CACert() string {
return o.caCert
}

View File

@ -22,9 +22,10 @@ import (
"github.com/rs/cors"
"github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"go.uber.org/multierr"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
greflect "google.golang.org/grpc/reflection"
"go.linka.cloud/grpc/registry"
@ -37,7 +38,6 @@ type Service interface {
Start() error
Stop() error
Close() error
Cmd() *cobra.Command
RegisterService(desc *grpc.ServiceDesc, impl interface{})
}
@ -47,7 +47,6 @@ func New(opts ...Option) (Service, error) {
}
type service struct {
cmd *cobra.Command
opts *options
cancel context.CancelFunc
@ -64,12 +63,8 @@ type service struct {
}
func newService(opts ...Option) (*service, error) {
if err := cmd.ParseFlags(os.Args); err != nil {
return nil, err
}
s := &service{
opts: parseFlags(NewOptions()),
cmd: cmd,
opts: NewOptions(),
id: uuid.New().String(),
inproc: &inprocgrpc.Channel{},
}
@ -78,6 +73,34 @@ func newService(opts ...Option) (*service, error) {
for _, f := range opts {
f(s.opts)
}
if s.opts.name != "" {
s.opts.unaryServerInterceptors = append([]grpc.UnaryServerInterceptor{mdInterceptors{
k: "grpc-service-name", v: s.opts.name,
}.UnaryServerInterceptor()}, s.opts.unaryServerInterceptors...)
s.opts.unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{mdInterceptors{
k: "grpc-service-name", v: s.opts.name,
}.UnaryClientInterceptor()}, s.opts.unaryClientInterceptors...)
s.opts.streamServerInterceptors = append([]grpc.StreamServerInterceptor{mdInterceptors{
k: "grpc-service-name", v: s.opts.name,
}.StreamServerInterceptor()}, s.opts.streamServerInterceptors...)
s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{mdInterceptors{
k: "grpc-service-name", v: s.opts.name,
}.StreamClientInterceptor()}, s.opts.streamClientInterceptors...)
}
if s.opts.version != "" {
s.opts.unaryServerInterceptors = append([]grpc.UnaryServerInterceptor{mdInterceptors{
k: "grpc-service-version", v: s.opts.version,
}.UnaryServerInterceptor()}, s.opts.unaryServerInterceptors...)
s.opts.unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{mdInterceptors{
k: "grpc-service-version", v: s.opts.version,
}.UnaryClientInterceptor()}, s.opts.unaryClientInterceptors...)
s.opts.streamServerInterceptors = append([]grpc.StreamServerInterceptor{mdInterceptors{
k: "grpc-service-version", v: s.opts.version,
}.StreamServerInterceptor()}, s.opts.streamServerInterceptors...)
s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{mdInterceptors{
k: "grpc-service-version", v: s.opts.version,
}.StreamClientInterceptor()}, s.opts.streamClientInterceptors...)
}
if s.opts.mux == nil {
s.opts.mux = http.NewServeMux()
}
@ -100,17 +123,11 @@ func newService(opts ...Option) (*service, error) {
if err := s.opts.parseTLSConfig(); err != nil {
return nil, err
}
cmd.Use = s.opts.name
cmd.RunE = func(cmd *cobra.Command, args []string) error {
if cmd.Use == "" {
cmd.Use = os.Args[0]
}
return s.run()
}
ui := grpcmiddleware.ChainUnaryServer(s.opts.unaryServerInterceptors...)
s.inproc = s.inproc.WithServerUnaryInterceptor(ui)
si := grpcmiddleware.ChainStreamServer(s.opts.streamServerInterceptors... )
si := grpcmiddleware.ChainStreamServer(s.opts.streamServerInterceptors...)
s.inproc = s.inproc.WithServerStreamInterceptor(si)
gopts := []grpc.ServerOption{
@ -121,6 +138,9 @@ func newService(opts ...Option) (*service, error) {
if s.opts.reflection {
greflect.Register(s.server)
}
if s.opts.health {
grpc_health_v1.RegisterHealthServer(s, health.NewServer())
}
if err := s.gateway(s.opts.gatewayOpts...); err != nil {
return nil, err
}
@ -136,10 +156,6 @@ func (s *service) DB() *gorm.DB {
return s.opts.db
}
func (s *service) Cmd() *cobra.Command {
return s.cmd
}
func (s *service) run() error {
s.mu.Lock()
s.closed = make(chan struct{})
@ -244,7 +260,7 @@ func (s *service) run() error {
}
func (s *service) Start() error {
return s.cmd.Execute()
return s.run()
}
func (s *service) Stop() error {