前言
上篇文章我们介绍了Spring Cloud Ribbon中实现客户端负载均衡的一些基本脉络,了解了它的一些特点。
虽然Spring Cloud 中定义了 LoadBalancerClient
作为负载均衡器的通用接口,并且针对Ribbon实现了RibbonLoadBalancerClient
,但是它在具体实现客户端负载均衡时,是通过Ribbon的ILoadBalancer
接口实现的。
我们这篇文章来看下ILoadBalancer
接口的实现类,来了解它是如何实现客户端负载均衡的。
正文
AbstractLoadBalancer
AbstractLoadBalancer
是ILoadBalancer
接口的抽象实现。该类中定义了一个关于服务实例的分组枚举类ServerGroup
,包含三种不同的类型。
- ALL:所有服务实例。
- STATUS_UP:正常服务的实例。
- STATUS_NOT_UP:停止服务的实例。
另外还实现一个chooseServer()
函数,该函数通过调用接口的chooseServer(Object key)
实现,其中参数key
为null
,表示在选择具体服务实例时忽略key
的条件判断。
还定义了两个抽象函数:
- getServerList(ServerGroup serverGroup):根据分组类型来获取不同的服务列表。
- getLoadBalancerStats():定义获取
LoadBalancerStats
对象的方法。
PS:LoadBalancerStats
对象用来存储负载均衡器中各个服务实例当前的属性和统计信息。这些信息可以帮助我们观察负载均衡器的运行情况,制定合适的负载均衡策略。
1 | public abstract class AbstractLoadBalancer implements ILoadBalancer { |
BaseLoadBalancer
BaseLoadBalancer
类是Ribbon负载均衡器的基础实现类,该类中定义了很多有关负载均衡器的基础内容。
定义维护了两个存储服务实例Server对象列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的清单。
1
2
3
4
5
6
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());定义了用来存储负载均衡服务器各服务实例属性和统计信息的
LoadBalancerStats
对象。定义了检查服务实例是否正常的
IPing
对象,在BaseLoadBalancer
中默认为null
,需要在构造时注入它的具体实现。定义了检查服务实例操作的执行策略对象
IPingStrategy
,在BaseLoadBalancer
中默认使用了该类中定义的静态内部类SerialPingStrategy
实现。根据源码,我们可以看到该策略采用线性遍历ping服务实例的方式实现检查。该策略在当
IPing
的实现速度并不理想,或者Server
列表过大时,可能会影响系统性能,这时候需要通过实现IPingStrategy
接口并重写pingServers(IPing ping, Server[] servers)
函数去扩展ping的执行策略。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22private static class SerialPingStrategy implements IPingStrategy {
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}比如我们想改成并行处理,则需要写自己的并行ping策略,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class ParallelPingStrategy implements IPingStrategy {
private static Logger logger = LoggerFactory
.getLogger(ParallelPingStrategy.class);
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
List<Future<Boolean>> futures = new ArrayList<>();
ExecutorService ex = Executors.newFixedThreadPool(numCandidates+10);
for (Server server : servers) {
if(ping !=null){
futures.add(ex.submit(()->ping.isAlive(server)));
}
}
for (int i = 0; i < numCandidates; i++) {
results[i] = false;
try {
if(!futures.isEmpty()){
results[i] = futures.get(i).get();
}
}catch (Exception e){
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}然后添加到配置即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CustomLoadBalancer {
ILoadBalancer loadBalancer(){
return new BaseLoadBalancer(new NIWSDiscoveryPing(),new RandomRule(),new ParallelPingStrategy());
}
}
public class SakuraConsumerApplication {
//.....略
}定义了负载均衡的处理规则
IRule
对象,从BaseLoadBalancer
中的chooseServer(Object key)
源码,我们可以知道,负载均衡器实际将服务实例选择任务委托给了IRule
实例中的choose
函数来实现,而这里默认初始化了RoundRobinRule
作为IRule
的实现对象。RoundRobinRule
实现的是最基本的线性负载均衡规则。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}启动ping任务:在
BaseLoadBalancer
的默认构造函数中,会直接启动一个用于定时检查Server是否健康的任务。该任务默认的执行间隔时间为10s。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public BaseLoadBalancer() {
this.name = DEFAULT_NAME;
this.ping = null;
setRule(DEFAULT_RULE);
setupPingTask();
lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}实现了
ILoadBalancer
接口定义的负载均衡器应具备以下一系列基本操作。addServers(List
newServers):向负载均衡器中添加新的服务实例列表。 1
2
3
4
5
6
7
8
9
10
11
12
13
public void addServers(List<Server> newServers) {
if (newServers != null && newServers.size() > 0) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.addAll(newServers);
setServersList(newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
}
}
}chooseServer(Object key):挑选一个具体服务实例。代码见上面。
markServerDown(Server server):标记某个服务实例暂停服务。
1
2
3
4
5
6
7
8
9
10
11public void markServerDown(Server server) {
if (server == null || !server.isAlive()) {
return;
}
logger.error("LoadBalancer [{}]: markServerDown called on [{}]", name, server.getId());
server.setAlive(false);
// forceQuickPing();
notifyServerStatusChangeListener(singleton(server));
}getReachableServers():获取可用的服务实例列表。由于
BaseLoadBalancer
中单独维护了一个正常服务的实例清单,所以直接返回即可。1
2
3
4
public List<Server> getReachableServers() {
return Collections.unmodifiableList(upServerList);
}getAllServers():获取所有的服务实例列表。同样由于
BaseLoadBalancer
中单独维护了一个所有服务的实例清单,所以直接返回即可。1
2
3
4
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer
类继承于BaseLoadBalancer
类,它是对基础负载均衡器的扩展。在该负载均衡器中,实现了服务实例清单在运行期的动态更新能力;同时它还具备了对服务实例清单的过滤功能。
我们来看下相比BaseLoadBalancer
,该类新增了哪些内容。
ServerList
在类成员定义中,我们可以发现新增了一个关于服务列表的操作对象
ServerList
,泛型 T 根据serverListImpl DynamicServerListLoadBalancer
可知它是一个Server
的子类。ServerList
接口定义如下:1
2
3
4
5
6
7public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
public List<T> getUpdatedListOfServers();
}- getInitialListOfServers():获取初始化的服务实例清单。
- getUpdatedListOfServers():获取更新的服务实例清单。
DynamicServerListLoadBalancer
里默认是使用哪个ServerList
的实现类呢?在
EurekaRibbonClientConfiguration
类中,我们可以找到如下创建ServerList
实例的内容。1
2
3
4
5
6
7
8
9
10
11
12
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}这里我们看到它构造函数传入了
DiscoveryEnabledNIWSServerList
,我们来看下代码。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
private ServerList<DiscoveryEnabledServer> list;
private final RibbonProperties ribbon;
private boolean approximateZoneFromHostname;
public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
IClientConfig clientConfig, boolean approximateZoneFromHostname) {
this.list = list;
this.ribbon = RibbonProperties.from(clientConfig);
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
public List<DiscoveryEnabledServer> getInitialListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
.getInitialListOfServers());
return servers;
}
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
.getUpdatedListOfServers());
return servers;
}
//......
}可以看到调用了
DiscoveryEnabledNIWSServerList
的getInitialListOfServers()
方法和getUpdatedListOfServers()
方法。这两个方法通过EurekaClient
从服务中心中获取到具体的服务实例列表。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}ServerListUpdater
通过上面内容,我们已经知道Ribbon可以从Eureka Server中获取服务实例清单。那么它又是如何触发向Eureka Server去获取服务清单以及如何在获取到服务实例清单后更新本地的服务实例清单的呢?
这就要涉及到
ServerListUpdater
的内容了,我们来看一下。1
2
3
4
5
6protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
public void doUpdate() {
updateListOfServers();
}
};我们来看下
ServerListUpdater
接口内容。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public interface ServerListUpdater {
//服务更新接口
public interface UpdateAction {
void doUpdate();
}
//启动服务更新
void start(UpdateAction updateAction);
//停止服务更新器
void stop();
//获取最近的更新时间戳
String getLastUpdate();
//获取上一次更新到现在的时间间隔,单位毫秒
long getDurationSinceLastUpdateMs();
//获取错过的更新周期数
int getNumberMissedCycles();
//获取核心线程数
int getCoreThreads();
}ServerListUpdater
的实现类有两个,如下:PollingServerListUpdater:动态服务列表更新的默认策略,使用的是定时任务的方式进行服务列表的更新。
EurekaNotificationServerListUpdater:利用Eureka事件监听器来驱动服务列表的更新操作。
ServerListFilter
我们回到
UpdateAction
,看一下doUpdate
的具体调用方法,可以看到它调用了updateListOfServers
方法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}可以看到用到了前面ServerList的
getUpdatedListOfServers
方法。用于从Eureka Server中获取服务可用实例列表。同时引入了一个新对象 filter,它是通过
ServerListFilter
进行定义的。ServerListFilter
接口只定义了一个方法,getFilteredListOfServers(List
,用于实现对服务实例列表按照过滤规则进行过滤。servers) 1
2
3
4
5public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}它有四个具体实现及一个抽象实现。
AbstractServerListFilter
抽象过滤器,这里定一个一个重要依据对象
LoadBalancerStats
,用来存储关于负载均衡器的一些属性和统计信息。1
2
3
4
5
6
7
8
9
10
11
12
13public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {
private volatile LoadBalancerStats stats;
public void setLoadBalancerStats(LoadBalancerStats stats) {
this.stats = stats;
}
public LoadBalancerStats getLoadBalancerStats() {
return stats;
}
}ZoneAffinityServerListFilter
区域感知过滤器,该过滤器会基于“区域感知”方式实现服务过滤。也就是说它会根据提供服务的实例所处的区域(Zone)与消费者自身所处区域(Zone)进行比较,过滤掉不是同处于一个区域的实例。关键代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
public List<T> getFilteredListOfServers(List<T> servers) {
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}DefaultNIWSServerListFilter
该过滤器完全继承自
ZoneAffinityServerListFilter
,是默认的NIWS(Netflix Internal Web Service)过滤器。1
2public class DefaultNIWSServerListFilter<T extends Server> extends ZoneAffinityServerListFilter<T> {
}ServerListSubsetFilter
该过滤器也继承自
ZoneAffinityServerListFilter
,它非常适合用于拥有大规模服务器集群的系统。因为它可以产生一个“区域感知”结果的子集列表,同时它还能够通过比较服务实例的通信失败数量和并发连接数来判定该服务是否健康来选择性地从服务实例列表中剔除那些相对不够健康的实例。该过滤器实现主要有3步。获取“区域感知”的过滤结果,作为候选的服务实例清单。
从当前消费者维护的服务实例子集中剔除那些相对不够健康的实例(同时也将这些实例从候选清单中剔除,防止第三步的时候又被选入),不够健康的标准如下。
a. 服务实例的并发连接数超过客户端配置的值,默认0,配置参数为
. .ServerListSubsetFilter.eliminationConnectionThresold。 b. 服务实例的失败数超过客户端配置的值,默认0,配置参数为
. .ServerListSubsetFilter.eliminationFailureThresold。 c. 如果按符合上面任一规则的服务实例剔除后,剔除比例小于客户端默认配置的百分比,默认为0.1(10%),配置参数为
. .ServerListSubsetFilter.forceEliminationPercent,那么就先对剩下的实例列表进行健康排序,再从最不健康的实例进行剔除,直到达到配置剔除的百分比。 在完成剔除后,清单至少已经少了10%(默认值)的服务实例,最后通过随机的方式从候选清单中选出一批实例加入到清单中,以保持服务实例子集与原来的数量一致,默认实例子集数量为20,配置参数
. .ServerListSubsetFilter.size。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public List<T> getFilteredListOfServers(List<T> servers) {
List<T> zoneAffinityFiltered = super.getFilteredListOfServers(servers);
Set<T> candidates = Sets.newHashSet(zoneAffinityFiltered);
Set<T> newSubSet = Sets.newHashSet(currentSubset);
LoadBalancerStats lbStats = getLoadBalancerStats();
for (T server: currentSubset) {
// this server is either down or out of service
if (!candidates.contains(server)) {
newSubSet.remove(server);
} else {
ServerStats stats = lbStats.getSingleServerStat(server);
// remove the servers that do not meet health criteria
if (stats.getActiveRequestsCount() > eliminationConnectionCountThreshold.get()
|| stats.getFailureCount() > eliminationFailureCountThreshold.get()) {
newSubSet.remove(server);
// also remove from the general pool to avoid selecting them again
candidates.remove(server);
}
}
}
int targetedListSize = sizeProp.get();
int numEliminated = currentSubset.size() - newSubSet.size();
int minElimination = (int) (targetedListSize * eliminationPercent.get());
int numToForceEliminate = 0;
if (targetedListSize < newSubSet.size()) {
// size is shrinking
numToForceEliminate = newSubSet.size() - targetedListSize;
} else if (minElimination > numEliminated) {
numToForceEliminate = minElimination - numEliminated;
}
if (numToForceEliminate > newSubSet.size()) {
numToForceEliminate = newSubSet.size();
}
if (numToForceEliminate > 0) {
List<T> sortedSubSet = Lists.newArrayList(newSubSet);
Collections.sort(sortedSubSet, this);
List<T> forceEliminated = sortedSubSet.subList(0, numToForceEliminate);
newSubSet.removeAll(forceEliminated);
candidates.removeAll(forceEliminated);
}
// after forced elimination or elimination of unhealthy instances,
// the size of the set may be less than the targeted size,
// then we just randomly add servers from the big pool
if (newSubSet.size() < targetedListSize) {
int numToChoose = targetedListSize - newSubSet.size();
candidates.removeAll(newSubSet);
if (numToChoose > candidates.size()) {
// Not enough healthy instances to choose, fallback to use the
// total server pool
candidates = Sets.newHashSet(zoneAffinityFiltered);
candidates.removeAll(newSubSet);
}
List<T> chosen = randomChoose(Lists.newArrayList(candidates), numToChoose);
for (T server: chosen) {
newSubSet.add(server);
}
}
currentSubset = newSubSet;
return Lists.newArrayList(newSubSet);
}ZonePreferenceServerListFilter
Spring Cloud整合是新增的过滤器。若使用Spring Cloud整合Eureka 和 Ribbon时会默认使用该过滤器。它实现通过配置或者Eureka实例元数据的所属区域(Zone)来过滤出同区域的服务实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public List<Server> getFilteredListOfServers(List<Server> servers) {
List<Server> output = super.getFilteredListOfServers(servers);
if (this.zone != null && output.size() == servers.size()) {
List<Server> local = new ArrayList<>();
for (Server server : output) {
if (this.zone.equalsIgnoreCase(server.getZone())) {
local.add(server);
}
}
if (!local.isEmpty()) {
return local;
}
}
return output;
}
ZoneAwareLoadBalancer
ZoneAwareLoadBalancer
负载均衡器是对DynamicServerListLoadBalancer
的扩展。在DynamicServerListLoadBalancer
中,并没有重写选择具体服务实例的chooseServer
函数,所以它依然会采用BaseLoadBalancer
中实现的算法。使用RoundRobinRule
规则,以线性轮询的方式来选择调用的服务实例,该算法并没有区域(Zone)的概念。这样就会周期性的产生跨区域访问情况,由于跨区域有更高的延迟,这些区域实例主要以防止区域性故障来实现高可用的目的,而不能作为常规的访问性实例,所以在多区域部署的情况下会有一定的问题。
而ZoneAwareLoadBalancer
则可以避免这样的问题,我们来看下它是如何实现的。
我们可以看到它并没有重写setServersList
,而是重写了setServerListForZones
,我们先来看下DynamicServerListLoadBalancer
的部分代码。
1 |
|
可以看到setServersList
在最后调用了setServerListForZones
方法,而ZoneAwareLoadBalancer
重写了setServerListForZones
方法。
1 |
|
可以看到它创建了一个名为balancers
的ConcurrentHashMap
,用来存储每个Zone区域对应的负载均衡器。
再来看下它的chooseServer
方法,看它如何挑选实例。
1 |
|
由源码可知,只有当负载均衡器中维护的实例所属的Zone区域个数大于1的时候才会执行选择策略。可以看到当区域个数大于1时,使用的规则为ZoneAvoidanceRule
。
总结
通过本篇文章,我们大致了解了几种Ribbon客户端负载均衡器的一些特点。后面我们将对负载均衡的一些策略做相关整理及梳理。