Eureka 内部机制及源码分析

Eureka 运作原理

Eureka-server 对外提供的是 restful 风格的服务,以http动词的形式对url资源进行操作:getpostputdelete,只要利用这些restful接口我们就能对项目实现注册和发现,只不过eureka已经帮我们使用java语言封装好了client端的代码,让开发者只需要在项目中依赖Eureka Client就能实现注册和发现。

只要能发起 Http 请求,那就可以向Eureka Server进行服务注册和发现,不管是什么语言。

服务注册

当Eureka Client项目启动时,就会向Eureka Server发送自己的元数据(原始数据),如:运行的 ip、端口 port、健康的状态监控等。使用的是HTTP/ResuFul 请求风格。Eureka Server会在自己内部保留这些元数据(JVM内存中),形成一个服务列表。注册过程中为避免网络问题,会尝试3次。Eureka提供的是ResutFul风格的HTTP请求,Eureka封装的Java客户端底层使用的是Jersey框架进行HTTP请求。

注册调用链路

核心类 com.netflix.discovery.DiscoveryClient

注册方法 | com.netflix.discovery.DiscoveryClient#register( )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 /**
* Register with the eureka service by making the appropriate REST call.
* Eureka Client注册方法入口.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
// Eureka Client向目标URL进行注册,其实就是将实例的具体信息发送至Eureka Server端
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

重试封装层 | com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient#execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) { // numberOfRetries固定值,默认为3次,当注册失败时会自动尝试3次
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}

currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}

try {
// 注册
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}

// Connection error or 5xx from the server that must be retried on another server
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}

Jersey封装层 | com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
// 底层使用Jersey框架进行RestFul风格的HTTP请求 | serviceUrl为目标Eureka Server的地址
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip") // 压缩
.type(MediaType.APPLICATION_JSON_TYPE) // json格式
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info); // POST请求
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}

当Eureka Client启动的时候,会向我们指定的 serviceUrl 发送请求,把自己节点的数据用json格式以post 请求的方式发送过到Eureka Server端。当返回的状态码为 204 的时候,表示注册成功。

服务续约

项目Eureka Client启动成功了,除了向Eureka Server注册自己成功,还会定时的向Eureka Server发送心跳请求,表示自己还活着。

服务下线 | 主动下线

当项目关闭时,会给 eureka-server 报告,说明自己要下机了。

服务剔除 | 主动剔除、被动下线

当项目超过了指定时间没有向 eureka-server 汇报自己,那么 eureka-server 就会认为此节点死掉了,会把它剔除掉,也不会放流量和请求到此节点了。

注册中心有一个容器保留各个注册的服务的信息:IP、端口号、续约时间、健康状态等等。

注册中心有一个剔除机制(定时删除下线的客户端)。

客户端可以设置续约时间(心跳)。

客户端可以从注册中心的拉取一份服务列表信息缓存到本地,但是会有脏读问题。

客户端可以设置拉取服务列表的时间间隔,时间间隔越短,脏读问题出现的概率越小,但是对客户端性能会有影响。

Eureka保护机制

正常情况下Eureka Server接收心跳失败的比率在15分钟之内,低于85%的节点,Eureka server会认为这个实例出现了网络故障,直接删除这个有问题的服务。这样在网络抖动和网络不稳定的情况下就会出现误删除有效的Eureka Client。Eureka Server保护模式主要用于一组客户端和Eureka Server之间存在网络分区场景下的保护。为了防止Eureka Client可以正常运行,但是与Eureka Server网络不畅通情况下(网络延迟等原因),在保护模式开启的情况下,Eureka Server不会立刻将Eureka Client服务剔除,以避免误删除。

Eureka保护机制的核心就是AP!

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
eureka:
server: # server配置,eureka-server既是服务端又是客户端,也就是说,它不仅可以提供客户端注册,同时本身也可以注册到其他server上.
eviction-interval-timer-in-ms: 10000 # 服务端定时剔除下线的实例信息的间隔时间/ms
renewal-percent-threshold: 0.85 # 续约百分比,超过85%的应用没有发送心跳续约,那么eureka会保护服务,不会剔除任何一个实例(eureka会认为是自己的网络问题,AP)
enable-self-preservation: true # Eureka Server的自我保护机制,避免因为网络原因造成误删除,生产环境建议打开

instance: # instance实例配置
instance-id: ${eureka.instance.hostname}:${spring.application.name}:${server.port} # 实例ID名称: 主机名:应用名:端口号
hostname: localhost # 主机名称或者服务节点IP
prefer-ip-address: true # 服务列表以IP形式展示
lease-renewal-interval-in-seconds: 5 # 服务实例的心跳续约时间间隔,需要比上面的eviction-interval-timer-in-ms配置值小
lease-expiration-duration-in-seconds: 20 # Eureka Server至上一次收到Eureka Client心跳之后,等待下一次心跳的超时时间,这个时间内若没有收到下一次心跳,就剔除该客户端实例.

client:
service-url: # 指定注册中心的地址
defaultZone: ${EUREKA_SERVER_URL:http://localhost:8761/eureka}
register-with-eureka: ${REGISTER_WITH_EUREKA:false} # 先将server自己注册自己关掉|默认是开启的(集群模式需要开启,单机一般关闭)
fetch-registry: true # 应用是否去拉取服务列表到本地

保护流程

  1. Eureka服务端会检查最近15分钟内所有Eureka 实例正常心跳占比(这个15分钟是在源码当中有个每15分钟执行一次的定时任务),如果低于85%就会触发自我保护机制。
  2. 触发了保护机制,Eureka将暂时把这些失效的服务保护起来,不让其过期,但这些服务也并不是永远不会过期(该现象可能出现在如果网络不通但是EurekaClient未出现岩机)。
  3. Eureka在启动完成后,每隔60秒会检查一次服务健康状态(这个10秒就是上面提到的Eureka Server查看心跳是否收到默认的配置:eviction-interval-timer-in-ms)
  4. 如果这些被保护起来失效的服务过一段时间后(默认90秒,这个20秒就是上面提到的心跳最大等待时间:lease-expiration-duration-in-seconds)还是没有恢复,就会把这些服务剔除。如果在此期间服务恢复了并且实例心跳占比高于85%时,就会自动关闭自我保护机制。

如果换做别的注册中心,如果一定时间内没有收到心跳会将剔除该服务,这样就出现了严重失误,因为客户端还能正常发送心跳,只是网络延迟问题,而保护机制是为了解决此问题而产生的。


参考文档: