// Copyright 2017 Michal Witkowski. All Rights Reserved. // See LICENSE for licensing terms. package proxy_test import ( "context" "fmt" "io" "net" "strings" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" "go.linka.cloud/grpc-toolkit/proxy" pb "go.linka.cloud/grpc-toolkit/proxy/testservice" ) const ( pingDefaultValue = "I like kittens." clientMdKey = "test-client-header" serverHeaderMdKey = "test-client-header" serverTrailerMdKey = "test-client-trailer" rejectingMdKey = "test-reject-rpc-if-in-context" countListResponses = 20 ) // asserting service is implemented on the server side and serves as a handler for stuff type assertingService struct { t *testing.T pb.UnsafeTestServiceServer } var _ pb.TestServiceServer = (*assertingService)(nil) func (s *assertingService) PingEmpty(ctx context.Context, _ *emptypb.Empty) (*pb.PingResponse, error) { // Check that this call has client's metadata. md, ok := metadata.FromIncomingContext(ctx) assert.True(s.t, ok, "PingEmpty call must have metadata in context") _, ok = md[clientMdKey] assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata") return &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, nil } func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) { // Send user trailers and headers. grpc.SendHeader(ctx, metadata.Pairs(serverHeaderMdKey, "I like turtles.")) grpc.SetTrailer(ctx, metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil } func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.FailedPrecondition, "Userspace error.") } func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error { // Send user trailers and headers. stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")) for i := 0; i < countListResponses; i++ { stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)}) } stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) return nil } func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error { stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")) counter := int32(0) for { ping, err := stream.Recv() if err == io.EOF { break } else if err != nil { require.NoError(s.t, err, "can't fail reading stream") return err } pong := &pb.PingResponse{Value: ping.Value, Counter: counter} if err := stream.Send(pong); err != nil { require.NoError(s.t, err, "can't fail sending back a pong") } counter += 1 } stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) return nil } // ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues. type ProxyHappySuite struct { suite.Suite serverListener net.Listener server *grpc.Server proxyListener net.Listener proxy *grpc.Server serverClientConn *grpc.ClientConn client *grpc.ClientConn testClient pb.TestServiceClient } func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() { ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(clientMdKey, "true")) out, err := s.testClient.PingEmpty(ctx, &emptypb.Empty{}) require.NoError(s.T(), err, "PingEmpty should succeed without errors") want := &pb.PingResponse{Value: pingDefaultValue, Counter: 42} require.True(s.T(), proto.Equal(want, out)) } func (s *ProxyHappySuite) TestPingEmpty_StressTest() { for i := 0; i < 50; i++ { s.TestPingEmptyCarriesClientMetadata() } } func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() { headerMd := make(metadata.MD) trailerMd := make(metadata.MD) // This is an awkward calling convention... but meh. out, err := s.testClient.Ping(context.Background(), &pb.PingRequest{Value: "foo"}, grpc.Header(&headerMd), grpc.Trailer(&trailerMd)) want := &pb.PingResponse{Value: "foo", Counter: 42} require.NoError(s.T(), err, "Ping should succeed without errors") require.True(s.T(), proto.Equal(want, out)) assert.Contains(s.T(), headerMd, serverHeaderMdKey, "server response headers must contain server data") assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data") } func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() { _, err := s.testClient.PingError(context.Background(), &pb.PingRequest{Value: "foo"}) require.Error(s.T(), err, "PingError should never succeed") assert.Equal(s.T(), codes.FailedPrecondition, status.Code(err)) assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message()) } func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() { // See SetupSuite where the StreamDirector has a special case. ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(rejectingMdKey, "true")) _, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"}) require.Error(s.T(), err, "Director should reject this RPC") assert.Equal(s.T(), codes.PermissionDenied, status.Code(err)) assert.Equal(s.T(), "testing rejection", status.Convert(err).Message()) } func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() { stream, err := s.testClient.PingStream(context.Background()) require.NoError(s.T(), err, "PingStream request should be successful.") for i := 0; i < countListResponses; i++ { ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)} require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail") resp, err := stream.Recv() if err == io.EOF { break } if i == 0 { // Check that the header arrives before all entries. headerMd, err := stream.Header() require.NoError(s.T(), err, "PingStream headers should not error.") assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata") } assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id") } require.NoError(s.T(), stream.CloseSend(), "no error on close send") _, err = stream.Recv() require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK") // Check that the trailer headers are here. trailerMd := stream.Trailer() assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") } func (s *ProxyHappySuite) TestPingStream_StressTest() { for i := 0; i < 50; i++ { s.TestPingStream_FullDuplexWorks() } } func (s *ProxyHappySuite) SetupSuite() { var err error s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0") require.NoError(s.T(), err, "must be able to allocate a port for proxyListener") s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") require.NoError(s.T(), err, "must be able to allocate a port for serverListener") s.server = grpc.NewServer() pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) // Setup of the proxy's Director. //lint:ignore SA1019 regression test s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) require.NoError(s.T(), err, "must not error on deferred client Dial") director := func(ctx context.Context, fullName string) (context.Context, grpc.ClientConnInterface, error) { md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { return ctx, nil, status.Errorf(codes.PermissionDenied, "testing rejection") } } // Explicitly copy the metadata, otherwise the tests will fail. outCtx := metadata.NewOutgoingContext(ctx, md.Copy()) return outCtx, s.serverClientConn, nil } s.proxy = grpc.NewServer( //lint:ignore SA1019 regression test grpc.CustomCodec(proxy.Codec()), grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler. proxy.RegisterService(s.proxy, director, "mwitkow.testproto.TestService", "Ping") // Start the serving loops. s.T().Logf("starting grpc.Server at: %v", s.serverListener.Addr().String()) go func() { s.server.Serve(s.serverListener) }() s.T().Logf("starting grpc.Proxy at: %v", s.proxyListener.Addr().String()) go func() { s.proxy.Serve(s.proxyListener) }() dCtx, ccl := context.WithTimeout(context.Background(), time.Second) defer ccl() clientConn, err := grpc.DialContext(dCtx, strings.Replace(s.proxyListener.Addr().String(), "127.0.0.1", "localhost", 1), grpc.WithInsecure()) require.NoError(s.T(), err, "must not error on deferred client Dial") s.testClient = pb.NewTestServiceClient(clientConn) } func (s *ProxyHappySuite) TearDownSuite() { if s.client != nil { s.client.Close() } if s.serverClientConn != nil { s.serverClientConn.Close() } // Close all transports so the logs don't get spammy. time.Sleep(10 * time.Millisecond) if s.proxy != nil { s.proxy.Stop() s.proxyListener.Close() } if s.serverListener != nil { s.server.Stop() s.serverListener.Close() } } func TestProxyHappySuite(t *testing.T) { suite.Run(t, &ProxyHappySuite{}) }