update grpc, service: implements reflection.GRPCService

Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
This commit is contained in:
2022-03-11 13:42:02 +01:00
parent 08a67bf162
commit 2832f13f3c
5 changed files with 105 additions and 144 deletions

View File

@ -34,13 +34,13 @@ import (
)
type Service interface {
greflect.GRPCServer
Options() Options
DB() *gorm.DB
Start() error
Stop() error
Close() error
RegisterService(desc *grpc.ServiceDesc, impl interface{})
}
func New(opts ...Option) (Service, error) {
@ -56,7 +56,8 @@ type service struct {
running bool
// inproc Channel is used to serve grpc gateway
inproc *inprocgrpc.Channel
inproc *inprocgrpc.Channel
services map[string]*serviceInfo
id string
regSvc *registry.Service
@ -65,9 +66,10 @@ type service struct {
func newService(opts ...Option) (*service, error) {
s := &service{
opts: NewOptions(),
id: uuid.New().String(),
inproc: &inprocgrpc.Channel{},
opts: NewOptions(),
id: uuid.New().String(),
inproc: &inprocgrpc.Channel{},
services: make(map[string]*serviceInfo),
}
s.mu.Lock()
defer s.mu.Unlock()
@ -126,7 +128,7 @@ func newService(opts ...Option) (*service, error) {
greflect.Register(s.server)
}
if s.opts.health {
grpc_health_v1.RegisterHealthServer(s, health.NewServer())
s.registerService(&grpc_health_v1.Health_ServiceDesc, health.NewServer())
}
if err := s.gateway(s.opts.gatewayOpts...); err != nil {
return nil, err
@ -292,8 +294,70 @@ func (s *service) Stop() error {
}
func (s *service) RegisterService(desc *grpc.ServiceDesc, impl interface{}) {
s.server.RegisterService(desc, impl)
s.inproc.RegisterService(desc, impl)
s.mu.Lock()
defer s.mu.Unlock()
s.registerService(desc, impl)
}
// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
// Contains the implementation for the methods in this service.
serviceImpl interface{}
methods map[string]*grpc.MethodDesc
streams map[string]*grpc.StreamDesc
mdata interface{}
}
func (s *service) registerService(sd *grpc.ServiceDesc, ss interface{}) {
s.server.RegisterService(sd, ss)
s.inproc.RegisterService(sd, ss)
if _, ok := s.services[sd.ServiceName]; ok {
logrus.Fatalf("grpc: Service.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
info := &serviceInfo{
serviceImpl: ss,
methods: make(map[string]*grpc.MethodDesc),
streams: make(map[string]*grpc.StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
info.streams[d.StreamName] = d
}
s.services[sd.ServiceName] = info
}
func (s *service) GetServiceInfo() map[string]grpc.ServiceInfo {
ret := make(map[string]grpc.ServiceInfo)
for n, srv := range s.services {
methods := make([]grpc.MethodInfo, 0, len(srv.methods)+len(srv.streams))
for m := range srv.methods {
methods = append(methods, grpc.MethodInfo{
Name: m,
IsClientStream: false,
IsServerStream: false,
})
}
for m, d := range srv.streams {
methods = append(methods, grpc.MethodInfo{
Name: m,
IsClientStream: d.ClientStreams,
IsServerStream: d.ServerStreams,
})
}
ret[n] = grpc.ServiceInfo{
Methods: methods,
Metadata: srv.mdata,
}
}
return ret
}
func (s *service) Close() error {