为什么要先看注册中心,因为dubbo-0x01-源码阅读规划从可以看到Provider和Consumer都要跟注册中心交互,换言之先研究注册中心可以弄清楚交互枢纽以及弄明白到底注册了服务的什么维度信息。
从类图很清晰看出来这是模板方法,对于使用了模板方法的设计模式的就是在基类中看共性特征,在子类中看个性化实现。
1 注册中心定义
注册中心的抽象很简单,因为它的使用方有两类,Provider和Consumer,所以API声明了两类。
2 服务的本地缓存
2.1 registered缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
@Override public void register(URL url) { if (url == null) { throw new IllegalArgumentException("register url == null"); } if (logger.isInfoEnabled()) { logger.info("Register: " + url); } registered.add(url); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
@Override public void unregister(URL url) { if (url == null) { throw new IllegalArgumentException("unregister url == null"); } if (logger.isInfoEnabled()) { logger.info("Unregister: " + url); } registered.remove(url); }
|
2.2 subscribed缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
@Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } Set<NotifyListener> listeners = subscribed.get(url); if (listeners == null) { subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>()); listeners = subscribed.get(url); } listeners.add(listener); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@Override public void unsubscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("unsubscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("unsubscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Unsubscribe: " + url); } Set<NotifyListener> listeners = subscribed.get(url); if (listeners != null) { listeners.remove(listener); } }
|
3 容错机制
注册中心可能面临的问题
面对这种情况,一般都会设计一些失败补偿机制,FailbackRegistry在AbstractRegistry的基础上扩展了失败后的安全重试。
实现方式就是定时任务扫描,将处理失败的请求缓存下来,定时任务周期性扫描缓存,将请求拿出来重新处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public FailbackRegistry(URL url) { super(url); this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { retry(); } catch (Throwable t) { logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }
|
模板方法留给子类去关注实现的就是下面这几个方法
1 2 3 4 5 6 7
| protected abstract void doRegister(URL url);
protected abstract void doUnregister(URL url);
protected abstract void doSubscribe(URL url, NotifyListener listener);
protected abstract void doUnsubscribe(URL url, NotifyListener listener);
|
目前为止,注册中心如图,具体的实现只要看一种就行,下面我就会看借助zk的实现
4 zk的实现
以注册为例,就是借助zk的临时节点和watch机制,所有这个时候比较有意义的是研究一下zk中node节点路径,也就是dubbo封装了URLdubbo-0x03-URL,看看这个结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| protected void doRegister(URL url) { try {
this.zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
|