From 6e8612094339d9a19951d3c037f11a70ec4d2f95 Mon Sep 17 00:00:00 2001 From: Adphi Date: Fri, 15 Jul 2022 15:48:09 +0200 Subject: [PATCH] add server interface interceptors Signed-off-by: Adphi --- example/example.go | 47 +++++++++++++++++++++++++----- interceptors/iface/interceptors.go | 41 ++++++++++++++++++++++++++ interceptors/noop/interceptors.go | 39 +++++++++++++++++++++++++ service/interceptors.go | 20 +++++++++++++ service/service.go | 22 +++++--------- 5 files changed, 147 insertions(+), 22 deletions(-) create mode 100644 interceptors/iface/interceptors.go create mode 100644 interceptors/noop/interceptors.go create mode 100644 service/interceptors.go diff --git a/example/example.go b/example/example.go index a86d30b..18caf86 100644 --- a/example/example.go +++ b/example/example.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/metadata" greflectsvc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" "google.golang.org/protobuf/proto" @@ -22,12 +23,18 @@ import ( "go.linka.cloud/grpc/client" "go.linka.cloud/grpc/interceptors/auth" "go.linka.cloud/grpc/interceptors/defaulter" + "go.linka.cloud/grpc/interceptors/iface" metrics2 "go.linka.cloud/grpc/interceptors/metrics" validation2 "go.linka.cloud/grpc/interceptors/validation" "go.linka.cloud/grpc/logger" "go.linka.cloud/grpc/service" ) +var ( + _ iface.UnaryInterceptor = (*GreeterHandler)(nil) + _ iface.StreamInterceptor = (*GreeterHandler)(nil) +) + type GreeterHandler struct { UnimplementedGreeterServer } @@ -50,6 +57,20 @@ func (g *GreeterHandler) SayHelloStream(req *HelloStreamRequest, s Greeter_SayHe return nil } +func (g *GreeterHandler) UnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + logger.C(ctx).Infof("called service interface unary interceptor") + return handler(ctx, req) + } +} + +func (g *GreeterHandler) StreamServerInterceptor() grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + logger.C(ss.Context()).Infof("called service interface stream interceptor") + return handler(srv, ss) + } +} + func httpLogger(next http.Handler) http.Handler { return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { start := time.Now() @@ -115,14 +136,18 @@ func run(opts ...service.Option) { service.WithGRPCWebPrefix("/grpc"), service.WithMiddlewares(httpLogger), service.WithInterceptors(metrics), - service.WithServerInterceptors(auth.NewServerInterceptors(auth.WithBasicValidators(func(ctx context.Context, user, password string) (context.Context, error) { - if !auth.Equals(user, "admin") || !auth.Equals(password, "admin") { - return ctx, fmt.Errorf("invalid user or password") - } - log.Infof("request authenticated") - return ctx, nil - }))), + service.WithServerInterceptors( + auth.NewServerInterceptors(auth.WithBasicValidators(func(ctx context.Context, user, password string) (context.Context, error) { + if !auth.Equals(user, "admin") || !auth.Equals(password, "admin") { + return ctx, fmt.Errorf("invalid user or password") + } + log.Infof("request authenticated") + return ctx, nil + })), + ), service.WithInterceptors(defaulter, validation), + // enable server interface interceptor + service.WithServerInterceptors(iface.New()), ) svc, err = service.New(opts...) if err != nil { @@ -158,7 +183,13 @@ func run(opts ...service.Option) { log.Fatal(err) } g := NewGreeterClient(s) - defer cancel() + h := grpc_health_v1.NewHealthClient(s) + hres, err := h.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + if err != nil { + log.Fatal(err) + } + log.Infof("status: %v", hres.Status) + md := metadata.MD{} res, err := g.SayHello(ctx, &HelloRequest{Name: "test"}, grpc.Header(&md)) if err != nil { diff --git a/interceptors/iface/interceptors.go b/interceptors/iface/interceptors.go new file mode 100644 index 0000000..2c1f8c7 --- /dev/null +++ b/interceptors/iface/interceptors.go @@ -0,0 +1,41 @@ +package iface + +import ( + "context" + + "google.golang.org/grpc" + + "go.linka.cloud/grpc/interceptors" +) + +type UnaryInterceptor interface { + UnaryServerInterceptor() grpc.UnaryServerInterceptor +} + +type StreamInterceptor interface { + StreamServerInterceptor() grpc.StreamServerInterceptor +} + +type iface struct{} + +func New() interceptors.ServerInterceptors { + return &iface{} +} + +func (s iface) UnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + if i, ok := info.Server.(UnaryInterceptor); ok { + return i.UnaryServerInterceptor()(ctx, req, info, handler) + } + return handler(ctx, req) + } +} + +func (s iface) StreamServerInterceptor() grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if i, ok := srv.(StreamInterceptor); ok { + return i.StreamServerInterceptor()(srv, ss, info, handler) + } + return handler(srv, ss) + } +} diff --git a/interceptors/noop/interceptors.go b/interceptors/noop/interceptors.go new file mode 100644 index 0000000..0fd5876 --- /dev/null +++ b/interceptors/noop/interceptors.go @@ -0,0 +1,39 @@ +package noop + +import ( + "context" + + "google.golang.org/grpc" + + "go.linka.cloud/grpc/interceptors" +) + +func New() interceptors.Interceptors { + return &noopInterceptor{} +} + +type noopInterceptor struct{} + +func (m *noopInterceptor) UnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return handler(ctx, req) + } +} + +func (m *noopInterceptor) StreamServerInterceptor() grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, ss) + } +} + +func (m *noopInterceptor) UnaryClientInterceptor() grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +func (m *noopInterceptor) StreamClientInterceptor() grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return streamer(ctx, desc, cc, method, opts...) + } +} diff --git a/service/interceptors.go b/service/interceptors.go new file mode 100644 index 0000000..e4bf849 --- /dev/null +++ b/service/interceptors.go @@ -0,0 +1,20 @@ +package service + +import ( + "go.linka.cloud/grpc/interceptors" + "go.linka.cloud/grpc/interceptors/metadata" +) + +func md(opts *options) interceptors.Interceptors { + var pairs []string + if opts.name != "" { + pairs = append(pairs, "grpc-service-name", opts.name) + } + if opts.version != "" { + pairs = append(pairs, "grpc-service-version", opts.version) + } + if len(pairs) != 0 { + return metadata.NewInterceptors(pairs...) + } + return nil +} diff --git a/service/service.go b/service/service.go index e132976..80db11e 100644 --- a/service/service.go +++ b/service/service.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" greflect "google.golang.org/grpc/reflection" - "go.linka.cloud/grpc/interceptors/metadata" "go.linka.cloud/grpc/logger" "go.linka.cloud/grpc/registry" "go.linka.cloud/grpc/registry/noop" @@ -76,20 +75,15 @@ func newService(opts ...Option) (*service, error) { for _, f := range opts { f(s.opts) } - if s.opts.name != "" { - i := metadata.NewInterceptors("grpc-service-name", s.opts.name) - s.opts.unaryServerInterceptors = append([]grpc.UnaryServerInterceptor{i.UnaryServerInterceptor()}, s.opts.unaryServerInterceptors...) - s.opts.unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{i.UnaryClientInterceptor()}, s.opts.unaryClientInterceptors...) - s.opts.streamServerInterceptors = append([]grpc.StreamServerInterceptor{i.StreamServerInterceptor()}, s.opts.streamServerInterceptors...) - s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{i.StreamClientInterceptor()}, s.opts.streamClientInterceptors...) - } - if s.opts.version != "" { - i := metadata.NewInterceptors("grpc-service-version", s.opts.version) - s.opts.unaryServerInterceptors = append([]grpc.UnaryServerInterceptor{i.UnaryServerInterceptor()}, s.opts.unaryServerInterceptors...) - s.opts.unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{i.UnaryClientInterceptor()}, s.opts.unaryClientInterceptors...) - s.opts.streamServerInterceptors = append([]grpc.StreamServerInterceptor{i.StreamServerInterceptor()}, s.opts.streamServerInterceptors...) - s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{i.StreamClientInterceptor()}, s.opts.streamClientInterceptors...) + + md := md(s.opts) + if md != nil { + s.opts.unaryServerInterceptors = append([]grpc.UnaryServerInterceptor{md.UnaryServerInterceptor()}, s.opts.unaryServerInterceptors...) + s.opts.unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{md.UnaryClientInterceptor()}, s.opts.unaryClientInterceptors...) + s.opts.streamServerInterceptors = append([]grpc.StreamServerInterceptor{md.StreamServerInterceptor()}, s.opts.streamServerInterceptors...) + s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{md.StreamClientInterceptor()}, s.opts.streamClientInterceptors...) } + if s.opts.mux == nil { s.opts.mux = http.NewServeMux() }