微服务架构系列 - 框架篇:基于 RPC 和 HTTP 请求的客户端服务发现是如何实现的

- 6 mins

微服务架构系列(九)

介绍了基于 Consul 作为注册中心的 Go Micro 服务注册底层实现原理,今天我们来看看 Go Micro 中客户端服务发现是如何实现的。

客户端服务发现要复杂一些,涉及到服务发现 Registry 和节点选择 Selector 两部分。

所谓服务发现指的是当我们从客户端向指定服务发起请求时,可以通过名字识别服务,然后通过服务发现获取到包含 IP 地址和端口号的对应远程服务实例,远程服务会在启动时向注册中心注册,退出时注销,客户端无需关心这些细节,由 Go Micro 的 Registry 组件统一处理服务注册与发现逻辑(这里,我们基于 Consul 作为 Registry 的具体实现插件)。

而节点选择指的是,远程服务实例通常部署在多个节点上,通过指定的服务名称可以获取到一个地址列表,节点选择要做的事情是通过某种策略从列表中获取指定的 IP 进行访问,这就是 Go Micro 中 Selector 组件发挥作用的地方,它基于 Registry 组件实现,提供了负载均衡策略,比如轮询或随机,以及过滤、缓存和黑名单的功能。

下面我们还是通过分析客户端调用底层源码来看下 Go Micro 框架中服务发现与节点选择的具体实现。首先打开 ~/go/hello/src/hello/client.go 文件,在 main 函数中,通过一系列的初始化操作后,真正发起服务调用的代码是 greeter.Hello 函数调用:

    func main() {
        // Create a new service. Optionally include some options here.
        service := micro.NewService(micro.Name("go.micro.cli.greeter"))
        service.Init()
    
        // Create new greeter client
        greeter := proto.NewGreeterService("go.micro.srv.greeter", service.Client())
    
        // Call the greeter
        rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "学院君"})
        if err != nil {
            fmt.Println(err)
        }
    
        // Print response
        fmt.Println(rsp.Greeting)
    }

greeter.Hello 函数定义在 ~/go/hello/src/hello/proto/hello.micro.go 中:

    func (c *greeterService) Hello(ctx context.Context, in *HelloRequest, opts ...client.CallOption) (*HelloResponse, error) {
        req := c.c.NewRequest(c.name, "Greeter.Hello", in)
        out := new(HelloResponse)
        err := c.c.Call(ctx, req, out, opts...)
        if err != nil {
            return nil, err
        }
        return out, nil
    }

在这个函数中,通过 NewRequest 初始化了请求实例,包含服务名称及请求端点、参数信息,然后初始化了响应实例,接下来是调用 ClientCall 函数,所有请求调用处理的核心逻辑(服务发现、节点选择、请求处理、超时、重试、编码)都在这个函数里,最终源码对应 ~/go/hello/src/github.com/micro/go-micro/client/rpc_client.goCall 函数,这个函数代码量较大,我们选取一些关键片段进行解读。

首先会通过 rpcClient 类的 next 方法获取远程服务节点,在未设置系统环境变量 MICRO_PROXY_ADDRESS 的情况下会执行 Selector 的 Select 方法获取服务节点,默认的 Selector 初始化操作位于 ~/go/hello/src/github.com/micro/go-micro/client/options.gonewOptions 方法(该方法会在 client.go 的初始化操作中调用):

    if opts.Selector == nil {
        opts.Selector = selector.NewSelector(
            selector.Registry(opts.Registry),
        )
    }

这里我们可以看到 Selector 依赖于 Registry 组件,如果没有额外设置的话,基于系统默认的 Registry 实现(这里是 Consul),NewSelector 方法源码如下:

    func NewSelector(opts ...Option) Selector {
        sopts := Options{
            Strategy: Random,
        }
        
        for _, opt := range opts {
            opt(&sopts)
        }
        
        if sopts.Registry == nil {
            sopts.Registry = registry.DefaultRegistry
        }
        
        s := &registrySelector{
            so: sopts,
        }
        
        s.rc = s.newCache()
        
        return s
    }

Selector 默认的负载均衡策略使用的是随机算法(关于 Selector 支持的所有负载均衡算法后面我们还会单独介绍),并且会在本地对节点选择结果进行缓存。

回到 Selector 的 Select 函数,该函数源码定义在 ~/go/hello/src/github.com/micro/go-micro/selector/default.go 中:

    func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
        sopts := SelectOptions{
            Strategy: c.so.Strategy,
        }
    
        for _, opt := range opts {
            opt(&sopts)
        }
    
        // get the service
        // try the cache first
        // if that fails go directly to the registry
        services, err := c.rc.GetService(service)
        if err != nil {
            return nil, err
        }
    
        // apply the filters
        for _, filter := range sopts.Filters {
            services = filter(services)
        }
    
        // if there's nothing left, return
        if len(services) == 0 {
            return nil, ErrNoneAvailable
        }
    
        return sopts.Strategy(services), nil
    }

通过 c.rc.GetService(service) 传入指定服务名称,再通过默认 Registry 实现 Consul 获取对应的服务实例列表并缓存(如果已缓存则直接返回提高性能,在通过 Registry 获取服务节点列表时还会单独跑一个协程去监听服务注册,如果有新节点注册进来,则加到缓存中,如果有节点故障则删除缓存中的节点信息,具体源码位于 ~/go/hello/src/github.com/micro/go-micro/registry/cache/rcache.go),应用过滤器后最后通过默认负载均衡实现(这里是 Random 算法)返回指定节点,获取到远程服务实例节点后,就可以发起远程服务请求了,回到 rpc_client.goCall 函数,对应的远程调用代码逻辑实现片段如下:

    call := func(i int) error {
      ...
      
      // select next node
        node, err := next()
        if err != nil && err == selector.ErrNotFound {
            return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
        } else if err != nil {
            return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
        }


        // make the call
        err = rcall(ctx, node, request, response, callOpts)
        r.opts.Selector.Mark(request.Service(), node, err)
        return err
       
       ...
      }

上述调用 rpcClientnext 函数返回的并不是真正的节点而是一个匿名函数,到 node, err := next() 这里才真正调用对应的函数返回远程服务节点信息,如果返回节点成功则调用 rcall 函数(即 rpcClientcall 函数)发起远程网络请求(通过协程实现),如果请求处理出错则返回相应错误信息,然后对服务调用成功与否通过 Selector 的 Mark 函数进行标记(以便后续对服务进行监控和治理),最后在 Call 函数中,也是通过协程发起对上述 call 匿名函数的调用:

    ...
    
    for i := 0; i <= callOpts.Retries; i++ {
        go func(i int) {
            ch <- call(i)  // 调用上面定义的 call 匿名函数
        }(i)


        select {
        case <-ctx.Done():
            return errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
        case err := <-ch:
            // if the call succeeded lets bail early
            if err == nil {
                return nil
            }


            retry, rerr := callOpts.Retry(ctx, request, i, err)
            if rerr != nil {
                return rerr
            }


            if !retry {
                return err
            }


            gerr = err
        }
    }
    
    ...

如果服务调用失败,则进行重试或报错处理。

以上就是在 Go Micro 体系内客户端请求服务发现与节点选择的底层实现,如果是以 HTTP 方式从外部通过 Micro API 网关形式对远程服务发起请求,则 API 网关会将 HTTP 请求解析并转化为默认的服务形式,比如 /greeter/say/hello 请求会被转化为服务名为 ` go.micro.api.greeter,方法名为 Say.Hello 的请求,然后调用 go.micro.srv.greeter` 远程服务,后续处理逻辑与上面完全一致。请求处理完成后,返回处理结果给 API 网关,API 网关将其转化为 HTTP 响应返回给调用客户端。

rss facebook twitter github gitlab youtube mail spotify lastfm instagram linkedin google google-plus pinterest medium vimeo stackoverflow reddit quora quora