dubbo-0x02-注册中心

为什么要先看注册中心,因为dubbo-0x01-源码阅读规划从可以看到Provider和Consumer都要跟注册中心交互,换言之先研究注册中心可以弄清楚交互枢纽以及弄明白到底注册了服务的什么维度信息。

从类图很清晰看出来这是模板方法,对于使用了模板方法的设计模式的就是在基类中看共性特征,在子类中看个性化实现。

1 注册中心定义

注册中心的抽象很简单,因为它的使用方有两类,Provider和Consumer,所以API声明了两类。

  • 注册
  • 注销
  • 订阅
  • 取消订阅

2 服务的本地缓存

2.1 registered缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 写到缓存{@link AbstractRegistry#registered}
*/
@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " + url);
}
// 入缓存
registered.add(url);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 从缓存{@link AbstractRegistry#registered}中移除
*/
@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
// 移除缓存
registered.remove(url);
}

2.2 subscribed缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 写到缓存{@link AbstractRegistry#subscribed}
*/
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
// 入缓存
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
listeners.add(listener);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 从缓存{@link AbstractRegistry#subscribed}中移除
*/
@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
// 删除缓存
listeners.remove(listener);
}
}

3 容错机制

注册中心可能面临的问题

  • 网络不可用
  • 注册中心重启
  • 注册中心读写请求过大

面对这种情况,一般都会设计一些失败补偿机制,FailbackRegistry在AbstractRegistry的基础上扩展了失败后的安全重试。

实现方式就是定时任务扫描,将处理失败的请求缓存下来,定时任务周期性扫描缓存,将请求拿出来重新处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public FailbackRegistry(URL url) {
super(url);
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 构造方法启动线程池启动定时任务去对失败的请求进行安全地补偿
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}

模板方法留给子类去关注实现的就是下面这几个方法

1
2
3
4
5
6
7
protected abstract void doRegister(URL url);

protected abstract void doUnregister(URL url);

protected abstract void doSubscribe(URL url, NotifyListener listener);

protected abstract void doUnsubscribe(URL url, NotifyListener listener);

目前为止,注册中心如图,具体的实现只要看一种就行,下面我就会看借助zk的实现

4 zk的实现

以注册为例,就是借助zk的临时节点和watch机制,所有这个时候比较有意义的是研究一下zk中node节点路径,也就是dubbo封装了URLdubbo-0x03-URL,看看这个结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void doRegister(URL url) {
try {
/**
* zk读写操作
* - 创建节点
* - path
* - /dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.0.11%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Dnative-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D41099%26side%3Dprovider%26timestamp%3D1669562090672
* - 永久/临时
* - 默认临时节点
*/
this.zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

dubbo-0x02-注册中心
https://bannirui.github.io/2025/05/13/dubbo/dubbo-0x02-注册中心/
作者
dingrui
发布于
2025年5月13日
许可协议