介绍了基于 Consul 作为注册中心的 Go Micro 服务注册底层实现原理,今天我们来看看 Go Micro 中客户端服务发现是如何实现的。
客户端服务发现要复杂一些,涉及到服务发现 Registry 和节点选择 Selector 两部分。
所谓服务发现指的是当我们从客户端向指定服务发起请求时,可以通过名字识别服务,然后通过服务发现获取到包含 IP 地址和端口号的对应远程服务实例,远程服务会在启动时向注册中心注册,退出时注销,客户端无需关心这些细节,由 Go Micro 的 Registry 组件统一处理服务注册与发现逻辑(这里,我们基于 Consul 作为 Registry 的具体实现插件)。
而节点选择指的是,远程服务实例通常部署在多个节点上,通过指定的服务名称可以获取到一个地址列表,节点选择要做的事情是通过某种策略从列表中获取指定的 IP 进行访问,这就是 Go Micro 中 Selector 组件发挥作用的地方,它基于 Registry 组件实现,提供了负载均衡策略,比如轮询或随机,以及过滤、缓存和黑名单的功能。
下面我们还是通过分析客户端调用底层源码来看下 Go Micro 框架中服务发现与节点选择的具体实现。首先打开 ~/go/hello/src/hello/client.go
文件,在 main
函数中,通过一系列的初始化操作后,真正发起服务调用的代码是 greeter.Hello
函数调用:
func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(micro.Name("go.micro.cli.greeter"))
service.Init()
// Create new greeter client
greeter := proto.NewGreeterService("go.micro.srv.greeter", service.Client())
// Call the greeter
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "学院君"})
if err != nil {
fmt.Println(err)
}
// Print response
fmt.Println(rsp.Greeting)
}
greeter.Hello
函数定义在 ~/go/hello/src/hello/proto/hello.micro.go
中:
func (c *greeterService) Hello(ctx context.Context, in *HelloRequest, opts ...client.CallOption) (*HelloResponse, error) {
req := c.c.NewRequest(c.name, "Greeter.Hello", in)
out := new(HelloResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
在这个函数中,通过 NewRequest
初始化了请求实例,包含服务名称及请求端点、参数信息,然后初始化了响应实例,接下来是调用 Client
的 Call
函数,所有请求调用处理的核心逻辑(服务发现、节点选择、请求处理、超时、重试、编码)都在这个函数里,最终源码对应 ~/go/hello/src/github.com/micro/go-micro/client/rpc_client.go
的 Call
函数,这个函数代码量较大,我们选取一些关键片段进行解读。
首先会通过 rpcClient
类的 next
方法获取远程服务节点,在未设置系统环境变量 MICRO_PROXY_ADDRESS
的情况下会执行 Selector 的 Select
方法获取服务节点,默认的 Selector 初始化操作位于 ~/go/hello/src/github.com/micro/go-micro/client/options.go
的 newOptions
方法(该方法会在 client.go
的初始化操作中调用):
if opts.Selector == nil {
opts.Selector = selector.NewSelector(
selector.Registry(opts.Registry),
)
}
这里我们可以看到 Selector 依赖于 Registry 组件,如果没有额外设置的话,基于系统默认的 Registry 实现(这里是 Consul),NewSelector
方法源码如下:
func NewSelector(opts ...Option) Selector {
sopts := Options{
Strategy: Random,
}
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
s := ®istrySelector{
so: sopts,
}
s.rc = s.newCache()
return s
}
Selector 默认的负载均衡策略使用的是随机算法(关于 Selector 支持的所有负载均衡算法后面我们还会单独介绍),并且会在本地对节点选择结果进行缓存。
回到 Selector 的 Select
函数,该函数源码定义在 ~/go/hello/src/github.com/micro/go-micro/selector/default.go
中:
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
sopts := SelectOptions{
Strategy: c.so.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// get the service
// try the cache first
// if that fails go directly to the registry
services, err := c.rc.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
通过 c.rc.GetService(service)
传入指定服务名称,再通过默认 Registry 实现 Consul 获取对应的服务实例列表并缓存(如果已缓存则直接返回提高性能,在通过 Registry 获取服务节点列表时还会单独跑一个协程去监听服务注册,如果有新节点注册进来,则加到缓存中,如果有节点故障则删除缓存中的节点信息,具体源码位于 ~/go/hello/src/github.com/micro/go-micro/registry/cache/rcache.go
),应用过滤器后最后通过默认负载均衡实现(这里是 Random 算法)返回指定节点,获取到远程服务实例节点后,就可以发起远程服务请求了,回到 rpc_client.go
的 Call
函数,对应的远程调用代码逻辑实现片段如下:
call := func(i int) error {
...
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
// make the call
err = rcall(ctx, node, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return err
...
}
上述调用 rpcClient
的 next
函数返回的并不是真正的节点而是一个匿名函数,到 node, err := next()
这里才真正调用对应的函数返回远程服务节点信息,如果返回节点成功则调用 rcall
函数(即 rpcClient
的 call
函数)发起远程网络请求(通过协程实现),如果请求处理出错则返回相应错误信息,然后对服务调用成功与否通过 Selector 的 Mark 函数进行标记(以便后续对服务进行监控和治理),最后在 Call
函数中,也是通过协程发起对上述 call
匿名函数的调用:
...
for i := 0; i <= callOpts.Retries; i++ {
go func(i int) {
ch <- call(i) // 调用上面定义的 call 匿名函数
}(i)
select {
case <-ctx.Done():
return errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {
return nil
}
retry, rerr := callOpts.Retry(ctx, request, i, err)
if rerr != nil {
return rerr
}
if !retry {
return err
}
gerr = err
}
}
...
如果服务调用失败,则进行重试或报错处理。
以上就是在 Go Micro 体系内客户端请求服务发现与节点选择的底层实现,如果是以 HTTP 方式从外部通过 Micro API 网关形式对远程服务发起请求,则 API 网关会将 HTTP 请求解析并转化为默认的服务形式,比如 /greeter/say/hello
请求会被转化为服务名为 ` go.micro.api.greeter,方法名为
Say.Hello 的请求,然后调用
go.micro.srv.greeter` 远程服务,后续处理逻辑与上面完全一致。请求处理完成后,返回处理结果给 API 网关,API 网关将其转化为 HTTP 响应返回给调用客户端。