上篇分享我们简单介绍了 Consul 的基本原理,以及在开发模式下的启动过程,今天我们还是以开发模式为例,介绍在 Consul 底层,服务注册、删除和健康检查是如何实现的。
服务注册
假设你已经在本地环境通过 consul agent -dev
启动了 Consul 代理,然后到之前示例的微服务项目 hello
中启动微服务完成服务注册:
然后我们在启动 Consul 代理的终端窗口也可以看到服务注册日志:
关于 Go Micro 框架中启动服务时注册服务到 Consul 的底层源码我们在服务注册底层实现中已经详细介绍过,这里我们重点看下 Consul 这边做了什么。
服务可以通过服务定义文件或者是 HTTP API 的方式进行注册,基于 Go Micro 框架的 Registry 组件 ` consul 注册服务显然是通过 HTTP API 的方式,在日志中也可以看到这个 PUT 请求,对应的服务注册接口是
/v1/agent/service/register,这个逻辑可以在
github.com/hashicorp/consul/api/agent.go 的
ServiceRegister` 方法中看到:
func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {
r := a.c.newRequest("PUT", "/v1/agent/service/register")
r.obj = service
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
服务信息数据会从 github.com/micro/go-micro/registry/consul/consul.go
的 Register
方法中传过来,对应的数据结构是个 AgentServiceRegistration
类:
// AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct {
Kind ServiceKind `json:",omitempty"`
ID string `json:",omitempty"`
Name string `json:",omitempty"`
Tags []string `json:",omitempty"`
Port int `json:",omitempty"`
Address string `json:",omitempty"`
EnableTagOverride bool `json:",omitempty"`
Meta map[string]string `json:",omitempty"`
Weights *AgentWeights `json:",omitempty"`
Check *AgentServiceCheck
Checks AgentServiceChecks
// DEPRECATED (ProxyDestination) - remove this field
ProxyDestination string `json:",omitempty"`
Proxy *AgentServiceConnectProxyConfig `json:",omitempty"`
Connect *AgentServiceConnect `json:",omitempty"`
}
其中 ID 是服务节点 ID,即 go.micro.srv.greeter-eac4032b-5b26-45fe-a1bd-0e71266d042e
,Name 即服务节点名称 go.micro.srv.greeter
,Address 是服务节点的 IP 地址,Port 则是服务节点上运行微服务的进程端口,Tags 则是对服务节点的元数据 Metadata 以及所有端点(Endpoint)、版本信息进行编码后的切片:
// encode the tags
tags := encodeMetadata(node.Metadata)
tags = append(tags, encodeEndpoints(s.Endpoints)...)
tags = append(tags, encodeVersion(s.Version)...)
服务节点的 Metadata 数据结构如下:
AgentServiceRegistration 中的 Check 包含服务健康检查信息(包含 IP、端口、间隔时间)。所有这些信息组装处理后在 consul
组件的服务注册方法(Register
)中调用代理的 ServiceRegister
发送过来:
// register the service
asr := &consul.AgentServiceRegistration{
ID: node.Id,
Name: s.Name,
Tags: tags,
Port: node.Port,
Address: node.Address,
Check: check,
}
// Specify consul connect
if c.connect {
asr.Connect = &consul.AgentServiceConnect{
Native: true,
}
}
if err := c.Client.Agent().ServiceRegister(asr); err != nil {
return err
}
服务数据的结构符合 Consul 官方文档约定的格式才可以通过 HTTP API 进行注册:https://www.consul.io/docs/agent/services.html。
服务注册成功后如果是集群部署的话还会把这个服务从领导者同步到其它 Consul Server Follower 节点。
健康检查
服务注册时会带上健康检查信息,Go Micro 实现的 consul
组件支持通过 TCP 或 TTL 两种方式来实现健康检查,在 Register 方法中可以看到对应的实现:
var regTCPCheck bool
var regInterval time.Duration
var options registry.RegisterOptions
for _, o := range opts {
o(&options)
}
// 如果 consul 组件上下文不为空且包含 consul_tcp_check 配置,则设置通过 TCP 进行健康检查
if c.opts.Context != nil {
if tcpCheckInterval, ok := c.opts.Context.Value("consul_tcp_check").(time.Duration); ok {
regTCPCheck = true
regInterval = tcpCheckInterval
}
}
...
// 如果服务已经注册并匹配到则对其进行健康检查,如果通过则不再重新注册服务节点信息
if ok && v == h {
// 如果 options.TTL = 0
if options.TTL == time.Duration(0) {
// 确保服务节点没有被 Consul 删除
if time.Since(lastChecked) <= getDeregisterTTL(regInterval) {
return nil
}
// 检查服务节点是否健康,如果健康并且与传入 node.id 相匹配则表示不需要进行注册
services, _, err := c.Client.Health().Checks(s.Name, c.queryOptions)
if err == nil {
for _, v := range services {
if v.ServiceID == node.Id {
return nil
}
}
}
} else {
// 如果 options.TTL > 0,则调用 Consul 代理 API 更新健康检查 TTL,
// 调用成功则不再注册服务节点,不成功则不能确保服务状态,故而重新注册
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
}
}
}
...
var check *consul.AgentServiceCheck
if regTCPCheck {
// 如果是通过 TCP 进行健康检查,则通过 regInterval 设置 TTL
deregTTL := getDeregisterTTL(regInterval)
// 然后初始化健康检查接口数据
check = &consul.AgentServiceCheck{
TCP: fmt.Sprintf("%s:%d", node.Address, node.Port),
Interval: fmt.Sprintf("%v", regInterval),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
}
} else if options.TTL > time.Duration(0) {
// 如果 options.TTL 大于0,则通过 TTL 进行健康检查,通过该值设置 TTL
deregTTL := getDeregisterTTL(options.TTL)
// 然后初始化健康检查接口数据
check = &consul.AgentServiceCheck{
TTL: fmt.Sprintf("%v", options.TTL),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
}
}
...
// 如果 TTL = 0,直接返回
if options.TTL == time.Duration(0) {
return nil
}
// 如果 TTL > 0,则服务注册后重新设置 TTL
return c.Client.Agent().PassTTL("service:"+node.Id, "")
Consul 健康检查相关接口 API 方法和数据结构都定义在 github.com/hashicorp/consul/api/agent.go
中,可以自行去查看,在 Go Micro 中,options.TTL
可以通过启动服务时传递 MICRO_REGISTER_TTL
(过期时间) 和 MICRO_REGISTER_INTERVAL
(重新注册间隔时间)参数来设置(或者将它们放到环境变量中),或者你还可以在设置 consul_tcp_check
配置项通过 TCP 来进行健康检查。
默认情况下,既没有设置 options.TTL
,也没有设置 consul_tcp_check
,所以在本地示例微服务运行过程中,没有进行健康检查,相应的注册服务节点时对应的 check
实例值为 nil。
服务删除
当我们通过 Ctrl+C 终止服务的时候,会对 Consul 中的服务节点进行反注册操作(删除):
从 Consul 代理运行窗口的日志中也可以看到相应的操作流程,也是通过调用对应的 HTTP API 接口实现的:
对应的 API 接口是 /v1/agent/service/deregister/<node.id>
,请求方式是 PUT
。
Go Micro 框架的 consul
组件对应的方法是 Deregister
:
func (c *consulRegistry) Deregister(s *registry.Service) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
// delete our hash and time check of the service
c.Lock()
delete(c.register, s.Name)
delete(c.lastChecked, s.Name)
c.Unlock()
node := s.Nodes[0]
return c.Client.Agent().ServiceDeregister(node.Id)
}
Consul Agent 对应的调用方法实现如下:
// ServiceDeregister is used to deregister a service with
// the local agent
func (a *Agent) ServiceDeregister(serviceID string) error {
r := a.c.newRequest("PUT", "/v1/agent/service/deregister/"+serviceID)
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}