grpc/proxy/handler_test.go

263 lines
9.3 KiB
Go
Raw Normal View History

// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package proxy_test
import (
"context"
"fmt"
"io"
"net"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"go.linka.cloud/grpc-toolkit/proxy"
pb "go.linka.cloud/grpc-toolkit/proxy/testservice"
)
// Fixture values shared by the asserting backend service and the suite's tests.
const (
// pingDefaultValue is the Value that PingEmpty returns.
pingDefaultValue = "I like kittens."
// clientMdKey is the metadata key the client attaches and PingEmpty asserts on.
clientMdKey = "test-client-header"
// serverHeaderMdKey is the header key the backend handlers send back.
// NOTE(review): this is the same string as clientMdKey — confirm whether a
// distinct value (e.g. a "server" header name) was intended.
serverHeaderMdKey = "test-client-header"
// serverTrailerMdKey is the trailer key the backend handlers set.
serverTrailerMdKey = "test-client-trailer"
// rejectingMdKey, when present in incoming metadata, makes the test director
// reject the RPC with PermissionDenied.
rejectingMdKey = "test-reject-rpc-if-in-context"
// countListResponses is the number of messages PingList sends and the number
// of pings the full-duplex stream test exchanges.
countListResponses = 20
)
// assertingService is the server-side implementation of pb.TestServiceServer
// used by these tests. Its handlers run assertions against t while serving.
type assertingService struct {
// t receives the assertion results produced inside the handlers.
t *testing.T
pb.UnsafeTestServiceServer
}
// Compile-time check that *assertingService implements pb.TestServiceServer.
var _ pb.TestServiceServer = (*assertingService)(nil)
// PingEmpty verifies that the incoming context carries metadata including the
// client's custom header, then answers with the default ping value and a
// counter of 42.
func (s *assertingService) PingEmpty(ctx context.Context, _ *emptypb.Empty) (*pb.PingResponse, error) {
	// The client's metadata must have made it through to this handler.
	incoming, hasMd := metadata.FromIncomingContext(ctx)
	assert.True(s.t, hasMd, "PingEmpty call must have metadata in context")

	_, hasClientKey := incoming[clientMdKey]
	assert.True(s.t, hasClientKey, "PingEmpty call must have clients's custom headers in metadata")

	resp := &pb.PingResponse{
		Value:   pingDefaultValue,
		Counter: 42,
	}
	return resp, nil
}
// Ping sends the test header and trailer metadata, then echoes the request's
// Value back with a counter of 42.
//
// Errors from sending the header or setting the trailer are returned to the
// caller instead of being dropped, so a metadata failure surfaces as an RPC
// error rather than as a confusing missing-header assertion on the client.
func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) {
	// Send user trailers and headers.
	if err := grpc.SendHeader(ctx, metadata.Pairs(serverHeaderMdKey, "I like turtles.")); err != nil {
		return nil, err
	}
	if err := grpc.SetTrailer(ctx, metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")); err != nil {
		return nil, err
	}
	return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil
}
// PingError always fails with a FailedPrecondition status carrying the message
// "Userspace error."; it never returns a response.
func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*emptypb.Empty, error) {
	// The message has no formatting verbs, so the non-formatting constructor
	// yields the same status.
	userErr := status.Error(codes.FailedPrecondition, "Userspace error.")
	return nil, userErr
}
// PingList sends the test header, streams countListResponses responses that
// echo the request's Value with an increasing Counter (starting at 0), and
// finally sets the test trailer.
//
// It stops and returns the error as soon as sending the header or a response
// fails, rather than continuing to write to a broken stream and reporting
// success.
func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error {
	// Send user trailers and headers.
	if err := stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")); err != nil {
		return err
	}
	for i := 0; i < countListResponses; i++ {
		if err := stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)}); err != nil {
			return err
		}
	}
	stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
	return nil
}
// PingStream sends the test header, then echoes every received ping back with
// a Counter that starts at 0 and increases by one per message. When the client
// closes its send side (io.EOF) it sets the test trailer and returns nil.
//
// Failures are recorded with assert rather than require: this handler runs on
// a gRPC server goroutine, and require calls t.FailNow, which must only be
// called from the goroutine running the test. Every failure also ends the
// handler by returning the error, so the loop never continues on a broken
// stream.
func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error {
	if err := stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")); err != nil {
		assert.NoError(s.t, err, "can't fail sending headers")
		return err
	}
	var counter int32
	for {
		ping, err := stream.Recv()
		if err == io.EOF {
			// Client finished sending; finish the RPC normally.
			break
		}
		if err != nil {
			assert.NoError(s.t, err, "can't fail reading stream")
			return err
		}
		pong := &pb.PingResponse{Value: ping.Value, Counter: counter}
		if err := stream.Send(pong); err != nil {
			assert.NoError(s.t, err, "can't fail sending back a pong")
			return err
		}
		counter++
	}
	stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
	return nil
}
// ProxyHappySuite tests the "happy" path of handling: that everything works in
// the absence of connection issues. It holds the backend server, the proxy in
// front of it, and the client used to call through the proxy.
type ProxyHappySuite struct {
suite.Suite
// serverListener is the listener the backend server serves on.
serverListener net.Listener
// server is the backend gRPC server hosting the asserting test service.
server *grpc.Server
// proxyListener is the listener the proxy serves on.
proxyListener net.Listener
// proxy is the gRPC server acting as the proxy under test.
proxy *grpc.Server
// serverClientConn is the connection to the backend that the director hands out.
serverClientConn *grpc.ClientConn
// client is the client connection that TearDownSuite closes when non-nil.
client *grpc.ClientConn
// testClient is the service stub the tests use to issue RPCs.
testClient pb.TestServiceClient
}
// TestPingEmptyCarriesClientMetadata calls PingEmpty with the client metadata
// key attached and expects the default ping response. The backend handler
// asserts that the metadata actually arrived.
func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() {
	t := s.T()

	clientMd := metadata.Pairs(clientMdKey, "true")
	ctx := metadata.NewOutgoingContext(context.Background(), clientMd)

	got, err := s.testClient.PingEmpty(ctx, &emptypb.Empty{})
	require.NoError(t, err, "PingEmpty should succeed without errors")

	expected := &pb.PingResponse{Value: pingDefaultValue, Counter: 42}
	require.True(t, proto.Equal(expected, got))
}
// TestPingEmpty_StressTest repeats the PingEmpty metadata test 50 times in a row.
func (s *ProxyHappySuite) TestPingEmpty_StressTest() {
	const rounds = 50
	for remaining := rounds; remaining > 0; remaining-- {
		s.TestPingEmptyCarriesClientMetadata()
	}
}
// TestPingCarriesServerHeadersAndTrailers calls Ping and checks that the
// response echoes the request, that the server's header key is present and
// that exactly one trailer entry came back.
func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() {
	t := s.T()

	// The call options fill these maps in with the server's headers/trailers.
	headerMd := metadata.MD{}
	trailerMd := metadata.MD{}
	callOpts := []grpc.CallOption{grpc.Header(&headerMd), grpc.Trailer(&trailerMd)}

	out, err := s.testClient.Ping(context.Background(), &pb.PingRequest{Value: "foo"}, callOpts...)
	require.NoError(t, err, "Ping should succeed without errors")

	expected := &pb.PingResponse{Value: "foo", Counter: 42}
	require.True(t, proto.Equal(expected, out))

	assert.Contains(t, headerMd, serverHeaderMdKey, "server response headers must contain server data")
	assert.Len(t, trailerMd, 1, "server response trailers must contain server data")
}
// TestPingErrorPropagatesAppError checks that the backend's application error
// reaches the client with its status code and message intact.
func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() {
	t := s.T()

	req := &pb.PingRequest{Value: "foo"}
	_, err := s.testClient.PingError(context.Background(), req)
	require.Error(t, err, "PingError should never succeed")

	gotCode := status.Code(err)
	gotMsg := status.Convert(err).Message()
	assert.Equal(t, codes.FailedPrecondition, gotCode)
	assert.Equal(t, "Userspace error.", gotMsg)
}
// TestDirectorErrorIsPropagated attaches the rejecting metadata key so that
// the director set up in SetupSuite refuses the RPC, and checks that the
// client sees that rejection's code and message.
func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() {
	t := s.T()

	// See SetupSuite where the StreamDirector has a special case.
	rejectMd := metadata.Pairs(rejectingMdKey, "true")
	ctx := metadata.NewOutgoingContext(context.Background(), rejectMd)

	_, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"})
	require.Error(t, err, "Director should reject this RPC")

	gotCode := status.Code(err)
	gotMsg := status.Convert(err).Message()
	assert.Equal(t, codes.PermissionDenied, gotCode)
	assert.Equal(t, "testing rejection", gotMsg)
}
// TestPingStream_FullDuplexWorks exchanges countListResponses ping/pong pairs
// over a bidirectional stream, checking that the header is available with the
// first response, that each response carries the expected counter, that the
// stream ends with io.EOF after CloseSend, and that one trailer entry arrives.
func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() {
	stream, err := s.testClient.PingStream(context.Background())
	require.NoError(s.T(), err, "PingStream request should be successful.")
	for i := 0; i < countListResponses; i++ {
		ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)}
		require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail")
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		// Any other receive error leaves resp nil; fail here instead of
		// panicking on resp.Counter below.
		require.NoError(s.T(), err, "receiving from PingStream must not fail")
		if i == 0 {
			// Check that the header arrives before all entries.
			headerMd, err := stream.Header()
			require.NoError(s.T(), err, "PingStream headers should not error.")
			assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata")
		}
		assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id")
	}
	require.NoError(s.T(), stream.CloseSend(), "no error on close send")
	_, err = stream.Recv()
	require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK")
	// Check that the trailer headers are here.
	trailerMd := stream.Trailer()
	assert.Len(s.T(), trailerMd, 1, "PingStream trailer headers user contain metadata")
}
// TestPingStream_StressTest repeats the full-duplex stream test 50 times in a row.
func (s *ProxyHappySuite) TestPingStream_StressTest() {
	const rounds = 50
	for remaining := rounds; remaining > 0; remaining-- {
		s.TestPingStream_FullDuplexWorks()
	}
}
// SetupSuite builds the whole test topology: a backend gRPC server hosting
// assertingService, a proxy server that forwards to it through a director,
// and a client connected to the proxy.
//
// The director rejects any call whose incoming metadata contains
// rejectingMdKey with PermissionDenied; otherwise it copies the incoming
// metadata into the outgoing context and routes to the backend connection.
//
// The client connection is stored in s.client so that TearDownSuite can
// close it.
func (s *ProxyHappySuite) SetupSuite() {
	var err error

	s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0")
	require.NoError(s.T(), err, "must be able to allocate a port for proxyListener")
	s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
	require.NoError(s.T(), err, "must be able to allocate a port for serverListener")

	s.server = grpc.NewServer()
	pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()})

	// Setup of the proxy's Director.
	//lint:ignore SA1019 regression test
	s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec()))
	require.NoError(s.T(), err, "must not error on deferred client Dial")
	director := func(ctx context.Context, fullName string) (context.Context, grpc.ClientConnInterface, error) {
		md, ok := metadata.FromIncomingContext(ctx)
		if ok {
			if _, exists := md[rejectingMdKey]; exists {
				return ctx, nil, status.Errorf(codes.PermissionDenied, "testing rejection")
			}
		}
		// Explicitly copy the metadata, otherwise the tests will fail.
		outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
		return outCtx, s.serverClientConn, nil
	}
	s.proxy = grpc.NewServer(
		//lint:ignore SA1019 regression test
		grpc.CustomCodec(proxy.Codec()),
		grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
	)
	// Ping handler is handled as an explicit registration and not as a TransparentHandler.
	proxy.RegisterService(s.proxy, director,
		"mwitkow.testproto.TestService",
		"Ping")

	// Start the serving loops.
	s.T().Logf("starting grpc.Server at: %v", s.serverListener.Addr().String())
	go func() {
		s.server.Serve(s.serverListener)
	}()
	s.T().Logf("starting grpc.Proxy at: %v", s.proxyListener.Addr().String())
	go func() {
		s.proxy.Serve(s.proxyListener)
	}()

	dCtx, ccl := context.WithTimeout(context.Background(), time.Second)
	defer ccl()
	// Dial the proxy via "localhost" rather than the literal loopback address.
	proxyAddr := strings.Replace(s.proxyListener.Addr().String(), "127.0.0.1", "localhost", 1)
	s.client, err = grpc.DialContext(dCtx, proxyAddr, grpc.WithInsecure())
	require.NoError(s.T(), err, "must not error on deferred client Dial")
	s.testClient = pb.NewTestServiceClient(s.client)
}
// TearDownSuite releases everything the suite may have created: the client
// connections first, then the proxy and backend servers and their listeners.
//
// Each resource is guarded by its own nil check, so a listener is still closed
// when the server that would have served on it was never created, and a server
// is only stopped when it exists.
func (s *ProxyHappySuite) TearDownSuite() {
	if s.client != nil {
		s.client.Close()
	}
	if s.serverClientConn != nil {
		s.serverClientConn.Close()
	}
	// Close all transports so the logs don't get spammy.
	time.Sleep(10 * time.Millisecond)
	if s.proxy != nil {
		s.proxy.Stop()
	}
	if s.proxyListener != nil {
		// Close errors are deliberately ignored during teardown.
		s.proxyListener.Close()
	}
	if s.server != nil {
		s.server.Stop()
	}
	if s.serverListener != nil {
		// Close errors are deliberately ignored during teardown.
		s.serverListener.Close()
	}
}
func TestProxyHappySuite(t *testing.T) {
suite.Run(t, &ProxyHappySuite{})
}