我们先回顾下通过客户端请求服务端部署的服务:
// Create a new service. Optionally include some options here.
service := micro.NewService(micro.Name("go.micro.cli.greeter"))
service.Init()
// Create new greeter client
greeter := proto.NewGreeterService("go.micro.srv.greeter", service.Client())
// Call the greeter
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "学院君"})
if err != nil {
fmt.Println(err)
}
// Print response
fmt.Println(rsp.Greeting)
需要指定调用的服务名称(go.micro.srv.greeter
),端点信息(Greeter.Hello
),已经请求参数(可选),如果区分版本的话还要指定服务的版本,我们在终端运行这个客户端调用,就会打印出服务端返回的数据:
然后在运行 Consul 代理的终端窗口,可以看到如下这条日志,表示有请求过来查询可用的服务节点信息:
下面我们分析下 Go Micro 中客户端如何从 Consul Server 中查询服务名称对应的节点信息,已经如何根据获取到的可用节点发起服务请求。关于这一块和我们前面介绍的服务发现实现原理类似,只不过这一次更加底层,更加彻底。
根据前面的介绍我们知道,客户端服务调用最终会走到 github.com/micro/go-micro/client/rpc_client.go
的 Call
方法,在该方法中会通过 next()
方法获取部署微服务的某个节点,然后针对该节点发起请求调用:
node, err := next()
...
// make the call
err = rcall(ctx, node, request, response, callOpts)
我们先来看看服务节点的查询,进入 next
方法源码:
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
service := request.Service()
...
// get next nodes from the selector
next, err := r.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", "service %s: %v", service, err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", service, err.Error())
}
return next, nil
}
request
对象实例数据如下:
service
即我们要获取的服务名称 go.micro.srv.greeter
,Go Micro 框架在 Registry 之上封装了 Selector 组件来对服务节点进行负载均衡,所以服务节点的信息通过 Selector 组件来获取,默认的 Selector 类是 registrySelector
,对应源码位于 github.com/micro/go-micro/selector/default.go
,我们来看其中的 Select
方法是如何实现服务节点查询的:
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
sopts := SelectOptions{
Strategy: c.so.Strategy,
}
for _, opt := range opts {
opt(&sopts)
}
// get the service
// try the cache first
// if that fails go directly to the registry
services, err := c.rc.GetService(service)
if err != nil {
return nil, err
}
// apply the filters
for _, filter := range sopts.Filters {
services = filter(services)
}
// if there's nothing left, return
if len(services) == 0 {
return nil, ErrNoneAvailable
}
return sopts.Strategy(services), nil
}
c.so
属性在 Selector 的构造函数中初始化:
func NewSelector(opts ...Option) Selector {
sopts := Options{
Strategy: Random,
}
for _, opt := range opts {
opt(&sopts)
}
if sopts.Registry == nil {
sopts.Registry = registry.DefaultRegistry
}
s := ®istrySelector{
so: sopts,
}
s.rc = s.newCache()
return s
}
默认的负载均衡策略是随机算法,即从所有可用服务节点中随机选择一个,默认的注册中心属性即 Go Micro 框架默认的注册中心,此外还提供了缓存层对注册中心查询结果进行缓存(基于 Registry 组件中的 Cache 类,源码位于 github.com/micro/go-micro/registry/cache
目录)。
回到 Select
方法,设置了节点选择策略后,我们通过 c.rc.getService(service)
从缓存层获取 指定服务名称对应的所有节点,该方法源码定义在 github.com/micro/go-micro/registry/cache/rcache.go
中:
func (c *cache) GetService(service string) ([]*registry.Service, error) {
// get the service
services, err := c.get(service)
...
// return services
return services, nil
}
核心逻辑定义在同一个类中的 get
方法:
func (c *cache) get(service string) ([]*registry.Service, error) {
// read lock
c.RLock()
// check the cache first
services := c.cache[service]
// get cache ttl
ttl := c.ttls[service]
// got services && within ttl so return cache
if c.isValid(services, ttl) {
// make a copy
cp := c.cp(services)
// unlock the read
c.RUnlock()
// return servics
return cp, nil
}
// get does the actual request for a service and cache it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.Lock()
c.set(service, c.cp(services))
c.Unlock()
return services, nil
}
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
}
// unlock the read lock
c.RUnlock()
// get and return services
return get(service)
}
在该方法中,先判断缓存中是否有记录且有效,如果有效的话直接返回,否则通过 c.Registry.GetService(service)
从注册中心中查询对应的服务节点,c.Registry
在构造 Cache 类的时候已经被初始化为和 Selector 组件一致的默认注册中心实例,也就是 consul
,然后我们调用 consulRegistry
的 GetService
方法获取对应服务节点,源码定义在 github.com/micro/go-micro/registry/consul/consul.go
文件中:
func (c *consulRegistry) GetService(name string) ([]*registry.Service, error) {
var rsp []*consul.ServiceEntry
var err error
// if we're connect enabled only get connect services
if c.connect {
rsp, _, err = c.Client.Health().Connect(name, "", false, c.queryOptions)
} else {
rsp, _, err = c.Client.Health().Service(name, "", false, c.queryOptions)
}
if err != nil {
return nil, err
}
serviceMap := map[string]*registry.Service{}
for _, s := range rsp {
if s.Service.Service != name {
continue
}
// version is now a tag
version, _ := decodeVersion(s.Service.Tags)
// service ID is now the node id
id := s.Service.ID
// key is always the version
key := version
// address is service address
address := s.Service.Address
// use node address
if len(address) == 0 {
address = s.Node.Address
}
svc, ok := serviceMap[key]
if !ok {
svc = ®istry.Service{
Endpoints: decodeEndpoints(s.Service.Tags),
Name: s.Service.Service,
Version: version,
}
serviceMap[key] = svc
}
var del bool
for _, check := range s.Checks {
// delete the node if the status is critical
if check.Status == "critical" {
del = true
break
}
}
// if delete then skip the node
if del {
continue
}
svc.Nodes = append(svc.Nodes, ®istry.Node{
Id: id,
Address: address,
Port: s.Service.Port,
Metadata: decodeMetadata(s.Service.Tags),
})
}
var services []*registry.Service
for _, service := range serviceMap {
services = append(services, service)
}
return services, nil
}
在这个方法中,无论 c.connect
是否为 true,代码都会执行到 Health
类的 Service
方法,然后调用 service
方法执行核心逻辑,源码位于 github.com/hashicorp/consul/api/health.go
:
func (h *Health) service(service string, tags []string, passingOnly bool, q *QueryOptions, connect bool) ([]*ServiceEntry, *QueryMeta, error) {
path := "/v1/health/service/" + service
if connect {
path = "/v1/health/connect/" + service
}
r := h.c.newRequest("GET", path)
r.setQueryOptions(q)
if len(tags) > 0 {
for _, tag := range tags {
r.params.Add("tag", tag)
}
}
if passingOnly {
r.params.Set(HealthPassing, "1")
}
rtt, resp, err := requireOK(h.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*ServiceEntry
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}
在这里,我们正式组装出针对 Consul 提供的 HTTP API 调用 URL 和参数并将其设置到请求对象 r
上,最后通过 h.c.doRequest(r)
发起请求查询指定服务名称下所有有效的服务节点,h.c
实例对应的类是定义在 github.com/hashicorp/consul/api/api.go
中的 Client
类,对应的请求组装、发起方法源码都在这个类中,对应的请求日志正是我们开头看到的 Consul 终端输出的那个 GET 请求。
如果调用过程中没有错误,则会对响应实体进行解码后返回给 Go Micro 中的 consul
组件,回到 GetService
方法,对响应结果进行遍历,然后将有效的服务节点信息添加到 serviceMap
字典并将其返回给上一层的缓存方法缓存起来。
缓存方法中还有一段监听该服务的代码:
go c.run(service)
这段代码通过协程异步启动,最终调用的是 consul.go
的 Watch
方法,其用途是监听到指定服务名称下有新的节点加入将其添加到缓存,如果有节点删除或出现故障,则将起从缓存中删除,这样一来,客户端和注册中心、微服务节点完全解耦,同时还能保证服务的可用性。
接下来,我们回到 registrySelector
类的 Select
方法,获取到从缓存层缓存的节点信息后,如果设置了过滤器的话还会运行过滤器对服务节点进行过滤(比如 IP 黑名单之类的),然后运行指定的负载均衡策略算法从服务节点中选取一个进行通信。
在 Go Micro 框架中,最终针对实际部署微服务的节点发起请求并获取响应结果的逻辑位于 github.com/micro/go-micro/client/rpc_client.go
的 call
方法:
func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
...
go func() {
...
// send request
if err := stream.Send(req.Body()); err != nil {
ch <- err
return
}
// recv request
if err := stream.Recv(resp); err != nil {
ch <- err
return
}
// success
ch <- nil
}()
...
}
stream.Send(req.Body())
方法对应源码位于 github.com/micro/go-micro/client/rpc_stream.go
文件中,req
实例和上面截图数据一致:
func (r *rpcStream) Send(msg interface{}) error {
r.Lock()
defer r.Unlock()
if r.isClosed() {
r.err = errShutdown
return errShutdown
}
req := codec.Message{
Id: r.id,
Target: r.request.Service(),
Method: r.request.Method(),
Endpoint: r.request.Endpoint(),
Type: codec.Request,
}
if err := r.codec.Write(&req, msg); err != nil {
r.err = err
return err
}
return nil
}
r.codec
在上述 rpc_client.go
文件的 call
方法中有设置:
codec := newRpcCodec(msg, c, cf)
对应的 Write()
方法定义在 github.com/micro/go-micro/client/rpc_codec.go
中:
func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
...
setHeaders(m)
if body != nil {
b, ok := body.(*raw.Frame)
if ok {
// set body
m.Body = b.Data
body = nil
}
}
if len(m.Body) == 0 {
// write to codec
if err := c.codec.Write(m, body); err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
}
// set body
m.Body = c.buf.wbuf.Bytes()
}
// create new transport message
msg := transport.Message{
Header: m.Header,
Body: m.Body,
}
// send the request
if err := c.client.Send(&msg); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
return nil
}
在这个方法中,先通过 setHeaders(m)
设置请求头(包含服务名、端点、版本),c.codec
对应的是默认 app lication/protobuf
类型对应的 Codec 组件 proro
,相应源码位于 github.com/micro/go-micro/codec/proto/proto.go
,通过编码器提供的方法对请求实体(req.Body()
返回数据)进行编码,然后经过 transport.Message
转化后再通过 c.client.Send()
方法发送,c.client
是从连接池中取出的 poolConn
指针,对应数据结构定义在 github.com/micro/go-micro/client/rpc_pool.go
中:
type poolConn struct {
transport.Client
created int64
}
Send
方法即定义在这个类中,Go Micro 框架默认的 transport
传输协议是 HTTP,所以对应的 Client 是 httpTransportClient
,源码位于 github.com/micro/go-micro/transport/http_transport.go
,最终对应的 Send()
方法定义如下:
func (h *httpTransportClient) Send(m *Message) error {
header := make(http.Header)
for k, v := range m.Header {
header.Set(k, v)
}
reqB := bytes.NewBuffer(m.Body)
defer reqB.Reset()
buf := &buffer{
reqB,
}
req := &http.Request{
Method: "POST",
URL: &url.URL{
Scheme: "http",
Host: h.addr,
},
Header: header,
Body: buf,
ContentLength: int64(reqB.Len()),
Host: h.addr,
}
h.Lock()
h.bl = append(h.bl, req)
select {
case h.r <- h.bl[0]:
h.bl = h.bl[1:]
default:
}
h.Unlock()
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return req.Write(h.conn)
}
在这里会构建 HTTP 请求,并发送这个请求到部署微服务的指定节点(之前从通过 Selector 组件从注册中心获取到的服务节点)。
响应的接收和请求是分离的,不过分析思路类似,这个与服务节点查询和请求处理关系不是很大,这里就不展开了。