mirror of
https://github.com/linka-cloud/grpc.git
synced 2024-11-23 11:26:26 +00:00
263 lines
9.3 KiB
Go
263 lines
9.3 KiB
Go
|
// Copyright 2017 Michal Witkowski. All Rights Reserved.
|
||
|
// See LICENSE for licensing terms.
|
||
|
|
||
|
package proxy_test
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net"
|
||
|
"strings"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"github.com/stretchr/testify/assert"
|
||
|
"github.com/stretchr/testify/require"
|
||
|
"github.com/stretchr/testify/suite"
|
||
|
"google.golang.org/grpc"
|
||
|
"google.golang.org/grpc/codes"
|
||
|
"google.golang.org/grpc/metadata"
|
||
|
"google.golang.org/grpc/status"
|
||
|
"google.golang.org/protobuf/proto"
|
||
|
"google.golang.org/protobuf/types/known/emptypb"
|
||
|
|
||
|
"go.linka.cloud/grpc-toolkit/proxy"
|
||
|
pb "go.linka.cloud/grpc-toolkit/proxy/testservice"
|
||
|
)
|
||
|
|
||
|
// Fixture values shared by the asserting backend and the proxy test suite.
const (
	// pingDefaultValue is the payload PingEmpty always answers with.
	pingDefaultValue = "I like kittens."

	// countListResponses is the number of messages PingList emits and the
	// number of round trips the full-duplex stream test performs.
	countListResponses = 20
)

// Metadata keys exchanged between the test client, the proxy and the backend.
const (
	// clientMdKey is the header the client attaches to outgoing calls.
	clientMdKey = "test-client-header"
	// serverHeaderMdKey is the header the backend sends back; it has the same
	// string value as clientMdKey.
	serverHeaderMdKey = "test-client-header"
	// serverTrailerMdKey is the trailer the backend sets on its responses.
	serverTrailerMdKey = "test-client-trailer"
	// rejectingMdKey makes the proxy's director reject the call when present
	// in the incoming metadata.
	rejectingMdKey = "test-reject-rpc-if-in-context"
)
|
||
|
|
||
|
// asserting service is implemented on the server side and serves as a handler for stuff
|
||
|
type assertingService struct {
|
||
|
t *testing.T
|
||
|
pb.UnsafeTestServiceServer
|
||
|
}
|
||
|
|
||
|
var _ pb.TestServiceServer = (*assertingService)(nil)
|
||
|
|
||
|
func (s *assertingService) PingEmpty(ctx context.Context, _ *emptypb.Empty) (*pb.PingResponse, error) {
|
||
|
// Check that this call has client's metadata.
|
||
|
md, ok := metadata.FromIncomingContext(ctx)
|
||
|
assert.True(s.t, ok, "PingEmpty call must have metadata in context")
|
||
|
_, ok = md[clientMdKey]
|
||
|
assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata")
|
||
|
return &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, nil
|
||
|
}
|
||
|
|
||
|
func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) {
|
||
|
// Send user trailers and headers.
|
||
|
grpc.SendHeader(ctx, metadata.Pairs(serverHeaderMdKey, "I like turtles."))
|
||
|
grpc.SetTrailer(ctx, metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
|
||
|
return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil
|
||
|
}
|
||
|
|
||
|
func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*emptypb.Empty, error) {
|
||
|
return nil, status.Errorf(codes.FailedPrecondition, "Userspace error.")
|
||
|
}
|
||
|
|
||
|
func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error {
|
||
|
// Send user trailers and headers.
|
||
|
stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles."))
|
||
|
for i := 0; i < countListResponses; i++ {
|
||
|
stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)})
|
||
|
}
|
||
|
stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error {
|
||
|
stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles."))
|
||
|
counter := int32(0)
|
||
|
for {
|
||
|
ping, err := stream.Recv()
|
||
|
if err == io.EOF {
|
||
|
break
|
||
|
} else if err != nil {
|
||
|
require.NoError(s.t, err, "can't fail reading stream")
|
||
|
return err
|
||
|
}
|
||
|
pong := &pb.PingResponse{Value: ping.Value, Counter: counter}
|
||
|
if err := stream.Send(pong); err != nil {
|
||
|
require.NoError(s.t, err, "can't fail sending back a pong")
|
||
|
}
|
||
|
counter += 1
|
||
|
}
|
||
|
stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues.
|
||
|
type ProxyHappySuite struct {
|
||
|
suite.Suite
|
||
|
|
||
|
serverListener net.Listener
|
||
|
server *grpc.Server
|
||
|
proxyListener net.Listener
|
||
|
proxy *grpc.Server
|
||
|
serverClientConn *grpc.ClientConn
|
||
|
|
||
|
client *grpc.ClientConn
|
||
|
testClient pb.TestServiceClient
|
||
|
}
|
||
|
|
||
|
func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() {
|
||
|
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(clientMdKey, "true"))
|
||
|
out, err := s.testClient.PingEmpty(ctx, &emptypb.Empty{})
|
||
|
require.NoError(s.T(), err, "PingEmpty should succeed without errors")
|
||
|
want := &pb.PingResponse{Value: pingDefaultValue, Counter: 42}
|
||
|
require.True(s.T(), proto.Equal(want, out))
|
||
|
}
|
||
|
|
||
|
func (s *ProxyHappySuite) TestPingEmpty_StressTest() {
|
||
|
for i := 0; i < 50; i++ {
|
||
|
s.TestPingEmptyCarriesClientMetadata()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() {
|
||
|
headerMd := make(metadata.MD)
|
||
|
trailerMd := make(metadata.MD)
|
||
|
// This is an awkward calling convention... but meh.
|
||
|
out, err := s.testClient.Ping(context.Background(), &pb.PingRequest{Value: "foo"}, grpc.Header(&headerMd), grpc.Trailer(&trailerMd))
|
||
|
want := &pb.PingResponse{Value: "foo", Counter: 42}
|
||
|
require.NoError(s.T(), err, "Ping should succeed without errors")
|
||
|
require.True(s.T(), proto.Equal(want, out))
|
||
|
assert.Contains(s.T(), headerMd, serverHeaderMdKey, "server response headers must contain server data")
|
||
|
assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data")
|
||
|
}
|
||
|
|
||
|
func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() {
|
||
|
_, err := s.testClient.PingError(context.Background(), &pb.PingRequest{Value: "foo"})
|
||
|
require.Error(s.T(), err, "PingError should never succeed")
|
||
|
assert.Equal(s.T(), codes.FailedPrecondition, status.Code(err))
|
||
|
assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message())
|
||
|
}
|
||
|
|
||
|
func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() {
|
||
|
// See SetupSuite where the StreamDirector has a special case.
|
||
|
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(rejectingMdKey, "true"))
|
||
|
_, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"})
|
||
|
require.Error(s.T(), err, "Director should reject this RPC")
|
||
|
assert.Equal(s.T(), codes.PermissionDenied, status.Code(err))
|
||
|
assert.Equal(s.T(), "testing rejection", status.Convert(err).Message())
|
||
|
}
|
||
|
|
||
|
func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() {
|
||
|
stream, err := s.testClient.PingStream(context.Background())
|
||
|
require.NoError(s.T(), err, "PingStream request should be successful.")
|
||
|
|
||
|
for i := 0; i < countListResponses; i++ {
|
||
|
ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)}
|
||
|
require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail")
|
||
|
resp, err := stream.Recv()
|
||
|
if err == io.EOF {
|
||
|
break
|
||
|
}
|
||
|
if i == 0 {
|
||
|
// Check that the header arrives before all entries.
|
||
|
headerMd, err := stream.Header()
|
||
|
require.NoError(s.T(), err, "PingStream headers should not error.")
|
||
|
assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata")
|
||
|
}
|
||
|
assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id")
|
||
|
}
|
||
|
require.NoError(s.T(), stream.CloseSend(), "no error on close send")
|
||
|
_, err = stream.Recv()
|
||
|
require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK")
|
||
|
// Check that the trailer headers are here.
|
||
|
trailerMd := stream.Trailer()
|
||
|
assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata")
|
||
|
}
|
||
|
|
||
|
func (s *ProxyHappySuite) TestPingStream_StressTest() {
|
||
|
for i := 0; i < 50; i++ {
|
||
|
s.TestPingStream_FullDuplexWorks()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *ProxyHappySuite) SetupSuite() {
|
||
|
var err error
|
||
|
|
||
|
s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0")
|
||
|
require.NoError(s.T(), err, "must be able to allocate a port for proxyListener")
|
||
|
s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
|
||
|
require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
|
||
|
|
||
|
s.server = grpc.NewServer()
|
||
|
pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()})
|
||
|
|
||
|
// Setup of the proxy's Director.
|
||
|
//lint:ignore SA1019 regression test
|
||
|
s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec()))
|
||
|
require.NoError(s.T(), err, "must not error on deferred client Dial")
|
||
|
director := func(ctx context.Context, fullName string) (context.Context, grpc.ClientConnInterface, error) {
|
||
|
md, ok := metadata.FromIncomingContext(ctx)
|
||
|
if ok {
|
||
|
if _, exists := md[rejectingMdKey]; exists {
|
||
|
return ctx, nil, status.Errorf(codes.PermissionDenied, "testing rejection")
|
||
|
}
|
||
|
}
|
||
|
// Explicitly copy the metadata, otherwise the tests will fail.
|
||
|
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
|
||
|
return outCtx, s.serverClientConn, nil
|
||
|
}
|
||
|
s.proxy = grpc.NewServer(
|
||
|
//lint:ignore SA1019 regression test
|
||
|
grpc.CustomCodec(proxy.Codec()),
|
||
|
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
|
||
|
)
|
||
|
// Ping handler is handled as an explicit registration and not as a TransparentHandler.
|
||
|
proxy.RegisterService(s.proxy, director,
|
||
|
"mwitkow.testproto.TestService",
|
||
|
"Ping")
|
||
|
|
||
|
// Start the serving loops.
|
||
|
s.T().Logf("starting grpc.Server at: %v", s.serverListener.Addr().String())
|
||
|
go func() {
|
||
|
s.server.Serve(s.serverListener)
|
||
|
}()
|
||
|
s.T().Logf("starting grpc.Proxy at: %v", s.proxyListener.Addr().String())
|
||
|
go func() {
|
||
|
s.proxy.Serve(s.proxyListener)
|
||
|
}()
|
||
|
|
||
|
dCtx, ccl := context.WithTimeout(context.Background(), time.Second)
|
||
|
defer ccl()
|
||
|
clientConn, err := grpc.DialContext(dCtx, strings.Replace(s.proxyListener.Addr().String(), "127.0.0.1", "localhost", 1), grpc.WithInsecure())
|
||
|
require.NoError(s.T(), err, "must not error on deferred client Dial")
|
||
|
s.testClient = pb.NewTestServiceClient(clientConn)
|
||
|
}
|
||
|
|
||
|
func (s *ProxyHappySuite) TearDownSuite() {
|
||
|
if s.client != nil {
|
||
|
s.client.Close()
|
||
|
}
|
||
|
if s.serverClientConn != nil {
|
||
|
s.serverClientConn.Close()
|
||
|
}
|
||
|
// Close all transports so the logs don't get spammy.
|
||
|
time.Sleep(10 * time.Millisecond)
|
||
|
if s.proxy != nil {
|
||
|
s.proxy.Stop()
|
||
|
s.proxyListener.Close()
|
||
|
}
|
||
|
if s.serverListener != nil {
|
||
|
s.server.Stop()
|
||
|
s.serverListener.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestProxyHappySuite(t *testing.T) {
|
||
|
suite.Run(t, &ProxyHappySuite{})
|
||
|
}
|