mirror of
https://github.com/linka-cloud/grpc.git
synced 2024-11-24 03:46:24 +00:00
add server interface interceptors
Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
This commit is contained in:
parent
bb7e4b124b
commit
6e86120943
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/health/grpc_health_v1"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
greflectsvc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
|
greflectsvc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
@ -22,12 +23,18 @@ import (
|
|||||||
"go.linka.cloud/grpc/client"
|
"go.linka.cloud/grpc/client"
|
||||||
"go.linka.cloud/grpc/interceptors/auth"
|
"go.linka.cloud/grpc/interceptors/auth"
|
||||||
"go.linka.cloud/grpc/interceptors/defaulter"
|
"go.linka.cloud/grpc/interceptors/defaulter"
|
||||||
|
"go.linka.cloud/grpc/interceptors/iface"
|
||||||
metrics2 "go.linka.cloud/grpc/interceptors/metrics"
|
metrics2 "go.linka.cloud/grpc/interceptors/metrics"
|
||||||
validation2 "go.linka.cloud/grpc/interceptors/validation"
|
validation2 "go.linka.cloud/grpc/interceptors/validation"
|
||||||
"go.linka.cloud/grpc/logger"
|
"go.linka.cloud/grpc/logger"
|
||||||
"go.linka.cloud/grpc/service"
|
"go.linka.cloud/grpc/service"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ iface.UnaryInterceptor = (*GreeterHandler)(nil)
|
||||||
|
_ iface.StreamInterceptor = (*GreeterHandler)(nil)
|
||||||
|
)
|
||||||
|
|
||||||
type GreeterHandler struct {
|
type GreeterHandler struct {
|
||||||
UnimplementedGreeterServer
|
UnimplementedGreeterServer
|
||||||
}
|
}
|
||||||
@ -50,6 +57,20 @@ func (g *GreeterHandler) SayHelloStream(req *HelloStreamRequest, s Greeter_SayHe
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *GreeterHandler) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||||
|
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||||
|
logger.C(ctx).Infof("called service interface unary interceptor")
|
||||||
|
return handler(ctx, req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GreeterHandler) StreamServerInterceptor() grpc.StreamServerInterceptor {
|
||||||
|
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||||
|
logger.C(ss.Context()).Infof("called service interface stream interceptor")
|
||||||
|
return handler(srv, ss)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func httpLogger(next http.Handler) http.Handler {
|
func httpLogger(next http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
@ -115,14 +136,18 @@ func run(opts ...service.Option) {
|
|||||||
service.WithGRPCWebPrefix("/grpc"),
|
service.WithGRPCWebPrefix("/grpc"),
|
||||||
service.WithMiddlewares(httpLogger),
|
service.WithMiddlewares(httpLogger),
|
||||||
service.WithInterceptors(metrics),
|
service.WithInterceptors(metrics),
|
||||||
service.WithServerInterceptors(auth.NewServerInterceptors(auth.WithBasicValidators(func(ctx context.Context, user, password string) (context.Context, error) {
|
service.WithServerInterceptors(
|
||||||
if !auth.Equals(user, "admin") || !auth.Equals(password, "admin") {
|
auth.NewServerInterceptors(auth.WithBasicValidators(func(ctx context.Context, user, password string) (context.Context, error) {
|
||||||
return ctx, fmt.Errorf("invalid user or password")
|
if !auth.Equals(user, "admin") || !auth.Equals(password, "admin") {
|
||||||
}
|
return ctx, fmt.Errorf("invalid user or password")
|
||||||
log.Infof("request authenticated")
|
}
|
||||||
return ctx, nil
|
log.Infof("request authenticated")
|
||||||
}))),
|
return ctx, nil
|
||||||
|
})),
|
||||||
|
),
|
||||||
service.WithInterceptors(defaulter, validation),
|
service.WithInterceptors(defaulter, validation),
|
||||||
|
// enable server interface interceptor
|
||||||
|
service.WithServerInterceptors(iface.New()),
|
||||||
)
|
)
|
||||||
svc, err = service.New(opts...)
|
svc, err = service.New(opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -158,7 +183,13 @@ func run(opts ...service.Option) {
|
|||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
g := NewGreeterClient(s)
|
g := NewGreeterClient(s)
|
||||||
defer cancel()
|
h := grpc_health_v1.NewHealthClient(s)
|
||||||
|
hres, err := h.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
log.Infof("status: %v", hres.Status)
|
||||||
|
|
||||||
md := metadata.MD{}
|
md := metadata.MD{}
|
||||||
res, err := g.SayHello(ctx, &HelloRequest{Name: "test"}, grpc.Header(&md))
|
res, err := g.SayHello(ctx, &HelloRequest{Name: "test"}, grpc.Header(&md))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
41
interceptors/iface/interceptors.go
Normal file
41
interceptors/iface/interceptors.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
package iface
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"go.linka.cloud/grpc/interceptors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type UnaryInterceptor interface {
|
||||||
|
UnaryServerInterceptor() grpc.UnaryServerInterceptor
|
||||||
|
}
|
||||||
|
|
||||||
|
type StreamInterceptor interface {
|
||||||
|
StreamServerInterceptor() grpc.StreamServerInterceptor
|
||||||
|
}
|
||||||
|
|
||||||
|
type iface struct{}
|
||||||
|
|
||||||
|
func New() interceptors.ServerInterceptors {
|
||||||
|
return &iface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s iface) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||||
|
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||||
|
if i, ok := info.Server.(UnaryInterceptor); ok {
|
||||||
|
return i.UnaryServerInterceptor()(ctx, req, info, handler)
|
||||||
|
}
|
||||||
|
return handler(ctx, req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s iface) StreamServerInterceptor() grpc.StreamServerInterceptor {
|
||||||
|
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||||
|
if i, ok := srv.(StreamInterceptor); ok {
|
||||||
|
return i.StreamServerInterceptor()(srv, ss, info, handler)
|
||||||
|
}
|
||||||
|
return handler(srv, ss)
|
||||||
|
}
|
||||||
|
}
|
39
interceptors/noop/interceptors.go
Normal file
39
interceptors/noop/interceptors.go
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
package noop
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"go.linka.cloud/grpc/interceptors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func New() interceptors.Interceptors {
|
||||||
|
return &noopInterceptor{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopInterceptor struct{}
|
||||||
|
|
||||||
|
func (m *noopInterceptor) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||||
|
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||||
|
return handler(ctx, req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *noopInterceptor) StreamServerInterceptor() grpc.StreamServerInterceptor {
|
||||||
|
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||||
|
return handler(srv, ss)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *noopInterceptor) UnaryClientInterceptor() grpc.UnaryClientInterceptor {
|
||||||
|
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||||
|
return invoker(ctx, method, req, reply, cc, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *noopInterceptor) StreamClientInterceptor() grpc.StreamClientInterceptor {
|
||||||
|
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||||
|
return streamer(ctx, desc, cc, method, opts...)
|
||||||
|
}
|
||||||
|
}
|
20
service/interceptors.go
Normal file
20
service/interceptors.go
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go.linka.cloud/grpc/interceptors"
|
||||||
|
"go.linka.cloud/grpc/interceptors/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
func md(opts *options) interceptors.Interceptors {
|
||||||
|
var pairs []string
|
||||||
|
if opts.name != "" {
|
||||||
|
pairs = append(pairs, "grpc-service-name", opts.name)
|
||||||
|
}
|
||||||
|
if opts.version != "" {
|
||||||
|
pairs = append(pairs, "grpc-service-version", opts.version)
|
||||||
|
}
|
||||||
|
if len(pairs) != 0 {
|
||||||
|
return metadata.NewInterceptors(pairs...)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -27,7 +27,6 @@ import (
|
|||||||
"google.golang.org/grpc/health/grpc_health_v1"
|
"google.golang.org/grpc/health/grpc_health_v1"
|
||||||
greflect "google.golang.org/grpc/reflection"
|
greflect "google.golang.org/grpc/reflection"
|
||||||
|
|
||||||
"go.linka.cloud/grpc/interceptors/metadata"
|
|
||||||
"go.linka.cloud/grpc/logger"
|
"go.linka.cloud/grpc/logger"
|
||||||
"go.linka.cloud/grpc/registry"
|
"go.linka.cloud/grpc/registry"
|
||||||
"go.linka.cloud/grpc/registry/noop"
|
"go.linka.cloud/grpc/registry/noop"
|
||||||
@ -76,20 +75,15 @@ func newService(opts ...Option) (*service, error) {
|
|||||||
for _, f := range opts {
|
for _, f := range opts {
|
||||||
f(s.opts)
|
f(s.opts)
|
||||||
}
|
}
|
||||||
if s.opts.name != "" {
|
|
||||||
i := metadata.NewInterceptors("grpc-service-name", s.opts.name)
|
md := md(s.opts)
|
||||||
s.opts.unaryServerInterceptors = append([]grpc.UnaryServerInterceptor{i.UnaryServerInterceptor()}, s.opts.unaryServerInterceptors...)
|
if md != nil {
|
||||||
s.opts.unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{i.UnaryClientInterceptor()}, s.opts.unaryClientInterceptors...)
|
s.opts.unaryServerInterceptors = append([]grpc.UnaryServerInterceptor{md.UnaryServerInterceptor()}, s.opts.unaryServerInterceptors...)
|
||||||
s.opts.streamServerInterceptors = append([]grpc.StreamServerInterceptor{i.StreamServerInterceptor()}, s.opts.streamServerInterceptors...)
|
s.opts.unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{md.UnaryClientInterceptor()}, s.opts.unaryClientInterceptors...)
|
||||||
s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{i.StreamClientInterceptor()}, s.opts.streamClientInterceptors...)
|
s.opts.streamServerInterceptors = append([]grpc.StreamServerInterceptor{md.StreamServerInterceptor()}, s.opts.streamServerInterceptors...)
|
||||||
}
|
s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{md.StreamClientInterceptor()}, s.opts.streamClientInterceptors...)
|
||||||
if s.opts.version != "" {
|
|
||||||
i := metadata.NewInterceptors("grpc-service-version", s.opts.version)
|
|
||||||
s.opts.unaryServerInterceptors = append([]grpc.UnaryServerInterceptor{i.UnaryServerInterceptor()}, s.opts.unaryServerInterceptors...)
|
|
||||||
s.opts.unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{i.UnaryClientInterceptor()}, s.opts.unaryClientInterceptors...)
|
|
||||||
s.opts.streamServerInterceptors = append([]grpc.StreamServerInterceptor{i.StreamServerInterceptor()}, s.opts.streamServerInterceptors...)
|
|
||||||
s.opts.streamClientInterceptors = append([]grpc.StreamClientInterceptor{i.StreamClientInterceptor()}, s.opts.streamClientInterceptors...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.opts.mux == nil {
|
if s.opts.mux == nil {
|
||||||
s.opts.mux = http.NewServeMux()
|
s.opts.mux = http.NewServeMux()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user