grpc/example/example.go

368 lines
10 KiB
Go

package main
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
greflectsvc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
"go.linka.cloud/grpc/client"
"go.linka.cloud/grpc/interceptors/auth"
"go.linka.cloud/grpc/interceptors/ban"
"go.linka.cloud/grpc/interceptors/defaulter"
"go.linka.cloud/grpc/interceptors/iface"
metrics2 "go.linka.cloud/grpc/interceptors/metrics"
validation2 "go.linka.cloud/grpc/interceptors/validation"
"go.linka.cloud/grpc/logger"
"go.linka.cloud/grpc/service"
)
var (
_ iface.UnaryInterceptor = (*GreeterHandler)(nil)
_ iface.StreamInterceptor = (*GreeterHandler)(nil)
)
type GreeterHandler struct {
UnimplementedGreeterServer
}
func hello(name string) string {
return fmt.Sprintf("Hello %s !", name)
}
func (g *GreeterHandler) SayHello(ctx context.Context, req *HelloRequest) (*HelloReply, error) {
return &HelloReply{Message: hello(req.Name)}, nil
}
func (g *GreeterHandler) SayHelloStream(req *HelloStreamRequest, s Greeter_SayHelloStreamServer) error {
for i := int64(0); i < req.Count; i++ {
if err := s.Send(&HelloReply{Message: fmt.Sprintf("Hello %s (%d)!", req.Name, i+1)}); err != nil {
return err
}
// time.Sleep(time.Second)
}
return nil
}
func (g *GreeterHandler) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
logger.C(ctx).Infof("called service interface unary interceptor")
return handler(ctx, req)
}
}
func (g *GreeterHandler) StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
logger.C(ss.Context()).Infof("called service interface stream interceptor")
return handler(srv, ss)
}
}
func httpLogger(next http.Handler) http.Handler {
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
start := time.Now()
log := logger.From(request.Context()).WithFields(
"method", request.Method,
"host", request.Host,
"path", request.URL.Path,
"remoteAddress", request.RemoteAddr,
)
next.ServeHTTP(writer, request)
log.WithField("duration", time.Since(start)).Info()
})
}
func main() {
f, opts := service.NewFlagSet()
cmd := &cobra.Command{
Use: "example",
Run: func(cmd *cobra.Command, args []string) {
run(opts)
},
}
cmd.Flags().AddFlagSet(f)
cmd.Execute()
}
func run(opts ...service.Option) {
name := "greeter"
version := "v0.0.1"
secure := true
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log := logger.New().WithFields("service", name)
ctx = logger.Set(ctx, log)
done := make(chan struct{})
ready := make(chan struct{})
var svc service.Service
var err error
metrics := metrics2.DefaultInterceptors()
validation := validation2.NewInterceptors(true)
defaulter := defaulter.NewInterceptors()
address := "0.0.0.0:9991"
opts = append(opts, service.WithContext(ctx),
service.WithName(name),
service.WithVersion(version),
service.WithAddress(address),
// service.WithRegistry(mdns.NewRegistry()),
service.WithReflection(true),
service.WithSecure(secure),
service.WithAfterStart(func() error {
log.Info("Server listening on", svc.Options().Address())
close(ready)
return nil
}),
service.WithAfterStop(func() error {
log.Info("Stopping server")
close(done)
return nil
}),
service.WithGateway(RegisterGreeterHandler),
service.WithGatewayPrefix("/rest"),
service.WithGRPCWeb(true),
service.WithGRPCWebPrefix("/grpc"),
service.WithMiddlewares(httpLogger),
service.WithInterceptors(metrics),
service.WithServerInterceptors(
ban.NewInterceptors(ban.WithDefaultJailDuration(time.Second), ban.WithDefaultCallback(func(action ban.Action, actor string, rule *ban.Rule) error {
log.WithFields("action", action, "actor", actor, "rule", rule.Name).Info("ban callback")
return nil
})),
auth.NewServerInterceptors(auth.WithBasicValidators(func(ctx context.Context, user, password string) (context.Context, error) {
if !auth.Equals(user, "admin") || !auth.Equals(password, "admin") {
return ctx, fmt.Errorf("invalid user or password")
}
log.Infof("request authenticated")
return ctx, nil
})),
),
service.WithInterceptors(defaulter, validation),
// enable server interface interceptor
service.WithServerInterceptors(iface.New()),
)
svc, err = service.New(opts...)
if err != nil {
panic(err)
}
RegisterGreeterServer(svc, &GreeterHandler{})
metrics.EnableHandlingTimeHistogram()
metrics.Register(svc)
go func() {
if err := svc.Start(); err != nil {
panic(err)
}
}()
go func() {
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(":9992", nil); err != nil {
panic(err)
}
}()
<-ready
copts := []client.Option{
// client.WithName(name),
// client.WithVersion(version),
client.WithAddress("localhost:9991"),
// client.WithRegistry(mdns.NewRegistry()),
client.WithSecure(secure),
client.WithUnaryInterceptors(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
logger.From(ctx).WithFields("party", "client", "method", method).Info(req)
return invoker(ctx, method, req, reply, cc, opts...)
}),
}
s, err := client.New(copts...)
if err != nil {
log.Fatal(err)
}
g := NewGreeterClient(s)
h := grpc_health_v1.NewHealthClient(s)
for i := 0; i < 5; i++ {
_, err := h.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
if err != nil {
log.Error(err)
} else {
log.Fatalf("expected error")
}
}
log.Infof("waiting for unban")
time.Sleep(time.Second)
s, err = client.New(append(copts, client.WithInterceptors(auth.NewBasicAuthClientIntereptors("admin", "admin")))...)
if err != nil {
log.Fatal(err)
}
g = NewGreeterClient(s)
h = grpc_health_v1.NewHealthClient(s)
hres, err := h.Check(ctx, &grpc_health_v1.HealthCheckRequest{Service: "helloworld.Greeter"})
if err != nil {
log.Fatal(err)
}
log.Infof("status: %v", hres.Status)
md := metadata.MD{}
res, err := g.SayHello(ctx, &HelloRequest{Name: "test"}, grpc.Header(&md))
if err != nil {
log.Fatal(err)
}
logMetadata(ctx, md)
log.Infof("received message: %s", res.Message)
md = metadata.MD{}
res, err = g.SayHello(ctx, &HelloRequest{}, grpc.Header(&md))
if err == nil {
log.Fatal("expected validation error")
}
logMetadata(ctx, md)
stream, err := g.SayHelloStream(ctx, &HelloStreamRequest{Name: "test"}, grpc.Header(&md))
if err != nil {
log.Fatal(err)
}
if md, err := stream.Header(); err == nil {
logMetadata(ctx, md)
}
for {
m, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
log.Fatal(err)
}
log.Infof("received stream message: %s", m.Message)
}
scheme := "http://"
if secure {
scheme = "https://"
}
httpc := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
req := `{"name":"test"}`
do := func(url, contentType string) {
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(req))
if err != nil {
log.Fatal(err)
}
req.Header.Set("content-type", contentType)
req.Header.Set("authorization", auth.BasicAuth("admin", "admin"))
resp, err := httpc.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
log.Info(string(b))
}
do(scheme+address+"/rest/api/v1/greeter/hello", "application/json")
do(scheme+address+"/grpc/helloworld.Greeter/SayHello", "application/grpc-web+json")
if err := readSvcs(ctx, s); err != nil {
log.Fatal(err)
}
cancel()
<-done
}
func logMetadata(ctx context.Context, md metadata.MD) {
log := logger.From(ctx)
for k, v := range md {
log.Infof("%s: %v", k, v)
}
}
func readSvcs(ctx context.Context, c client.Client) (err error) {
log := logger.From(ctx)
rc := greflectsvc.NewServerReflectionClient(c)
rstream, err := rc.ServerReflectionInfo(ctx)
if err != nil {
return err
}
defer func() {
if err2 := rstream.CloseSend(); err2 != nil && err == nil {
err = err2
}
}()
if err = rstream.Send(&greflectsvc.ServerReflectionRequest{MessageRequest: &greflectsvc.ServerReflectionRequest_ListServices{}}); err != nil {
return err
}
var rres *greflectsvc.ServerReflectionResponse
rres, err = rstream.Recv()
if err != nil {
return err
}
rlist, ok := rres.MessageResponse.(*greflectsvc.ServerReflectionResponse_ListServicesResponse)
if !ok {
return fmt.Errorf("unexpected reflection response type: %T", rres.MessageResponse)
}
for _, v := range rlist.ListServicesResponse.Service {
if v.Name == "grpc.reflection.v1alpha.ServerReflection" {
continue
}
parts := strings.Split(v.Name, ".")
if len(parts) < 2 {
return fmt.Errorf("malformed service name: %s", v.Name)
}
pkg := strings.Join(parts[:len(parts)-1], ".")
svc := parts[len(parts)-1]
if err = rstream.Send(&greflectsvc.ServerReflectionRequest{MessageRequest: &greflectsvc.ServerReflectionRequest_FileContainingSymbol{
FileContainingSymbol: v.Name,
}}); err != nil {
return err
}
rres, err = rstream.Recv()
if err != nil {
log.Fatal(err)
}
rfile, ok := rres.MessageResponse.(*greflectsvc.ServerReflectionResponse_FileDescriptorResponse)
if !ok {
return fmt.Errorf("unexpected reflection response type: %T", rres.MessageResponse)
}
fdps := make(map[string]*descriptorpb.DescriptorProto)
var sdp *descriptorpb.ServiceDescriptorProto
for _, v := range rfile.FileDescriptorResponse.FileDescriptorProto {
fdp := &descriptorpb.FileDescriptorProto{}
if err = proto.Unmarshal(v, fdp); err != nil {
return err
}
for _, s := range fdp.GetService() {
if fdp.GetPackage() == pkg && s.GetName() == svc {
if sdp != nil {
log.Warnf("service already found: %s.%s", fdp.GetPackage(), s.GetName())
continue
}
sdp = s
}
}
for _, m := range fdp.GetMessageType() {
fdps[fdp.GetPackage()+"."+m.GetName()] = m
}
}
if sdp == nil {
return fmt.Errorf("%s: service not found", v.Name)
}
for _, m := range sdp.GetMethod() {
log.Infof("%s: %s", v.Name, m.GetName())
}
}
return nil
}