dubbo的總體架構(gòu)如圖所示:
服務(wù)注冊
對于服務(wù)提供方,它需要發(fā)布服務(wù),而且由于應(yīng)用系統(tǒng)的復(fù)雜性,服務(wù)的數(shù)量、類型也不斷膨脹;對于服務(wù)消費(fèi)方,它最關(guān)心如何獲取到它所需要的服務(wù),而面對復(fù)雜的應(yīng)用系統(tǒng),需要管理大量的服務(wù)調(diào)用。而且,對于服務(wù)提供方和服務(wù)消費(fèi)方來說,他們還有可能兼具這兩種角色,即既需要提供服務(wù),有需要消費(fèi)服務(wù)。
通過將服務(wù)統(tǒng)一管理起來,可以有效地優(yōu)化內(nèi)部應(yīng)用對服務(wù)發(fā)布/使用的流程和管理。服務(wù)注冊中心可以通過特定協(xié)議來完成服務(wù)對外的統(tǒng)一。Dubbo提供的注冊中心有如下幾種類型可供選擇:
- Multicast注冊中心
- Zookeeper注冊中心
- Redis注冊中心
- Simple注冊中心
服務(wù)首先暴露在服務(wù)端,然后調(diào)用Registry的register方法在注冊中心(它是一個服務(wù)協(xié)調(diào)中心,dubbo以外的獨(dú)立服務(wù)端,dubbo提供了客戶端實現(xiàn))注冊服務(wù),然后用戶通過配置文件中配置的service的url去subscribe(訂閱服務(wù)),Registry接收到訂閱消息后會往url對應(yīng)的的List<NotifyListener>中塞入當(dāng)前NotifyListener,反之從這個list中移除listener就是取消訂閱。registry會調(diào)用據(jù)consumer的訂閱情況調(diào)用notify方法推送服務(wù)列表給Consumer。這里我們以Zookeeper注冊中心來說明:
ZookeeperRegistry.java的構(gòu)造函數(shù),創(chuàng)建Zookeeper客戶端:
- public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
- super(url);
- //如果provider的url是“0.0.0.0”或者在參數(shù)中帶anyHost=true則拋出異常注冊地址不存在
- if (url.isAnyHost()) {
- throw new IllegalStateException("registry address == null");
- }
- //服務(wù)分組(默認(rèn)“dubbo”)
- String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
- //在group頭補(bǔ)齊“/”
- if (! group.startsWith(Constants.PATH_SEPARATOR)) {
- group = Constants.PATH_SEPARATOR + group;
- }
- //服務(wù)分組根地址
- this.root = group;
- //創(chuàng)建Zookeeper客戶端
- zkClient = zookeeperTransporter.connect(url);
- //添加狀態(tài)監(jiān)聽器
- zkClient.addStateListener(new StateListener() {
- public void stateChanged(int state) {
- if (state == RECONNECTED) {
- try {
- recover();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
- });
- }
ZookeeperRegistry的doRegister方法:
- protected void doRegister(URL url) {
- try {
- //連接注冊中心注冊
- zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
- } catch (Throwable e) {
- throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- }
Provider初始化時會調(diào)用doRegister方法向注冊中心發(fā)起注冊。那么客戶端又是怎么subscribe在注冊中心訂閱服務(wù)的呢?答案是服務(wù)消費(fèi)者在初始化ConsumerConfig時會調(diào)用RegistryProtocol的refer方法進(jìn)一步調(diào)用RegistryDirectory的subscribe方法最終調(diào)用ZookeeperRegistry的subscribe方法向注冊中心訂閱服務(wù)。
com.alibaba.dubbo.registry.support.FailBackRegistry的subscribe方法:
- @Override
- public void subscribe(URL url, NotifyListener listener) {
- super.subscribe(url, listener);
- removeFailedSubscribed(url, listener);
- try {
- // 向服務(wù)器端發(fā)送訂閱請求
- doSubscribe(url, listener);
- } catch (Exception e) {
- Throwable t = e;
-
- List<URL> urls = getCacheUrls(url);
- if (urls != null && urls.size() > 0) {
- notify(url, listener, urls);
- logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
- } else {
- // 如果開啟了啟動時檢測,則直接拋出異常
- boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
- && url.getParameter(Constants.CHECK_KEY, true);
- boolean skipFailback = t instanceof SkipFailbackWrapperException;
- if (check || skipFailback) {
- if(skipFailback) {
- t = t.getCause();
- }
- throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
- } else {
- logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
- }
- }
-
- // 將失敗的訂閱請求記錄到失敗列表,定時重試
- addFailedSubscribed(url, listener);
- }
- }
com.alibaba.dubbo.registry.zookeeper.Zookeeper的doSubscribe方法:
- protected void doSubscribe(final URL url, final NotifyListener listener) {
- try {
- //如果provider的service的接口配置的是“*”
- if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
- //獲取服務(wù)分組根路徑
- String root = toRootPath();
- //獲取服務(wù)的NotifyListener
- ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
- if (listeners == null) {
- //如果沒有則創(chuàng)建一個
- zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
- listeners = zkListeners.get(url);
- }
- ChildListener zkListener = listeners.get(listener);
- //如果沒有子監(jiān)聽器則創(chuàng)建一個
- if (zkListener == null) {
- listeners.putIfAbsent(listener, new ChildListener() {
- public void childChanged(String parentPath, List<String> currentChilds) {
- for (String child : currentChilds) {
- child = URL.decode(child);
- if (! anyServices.contains(child)) {
- anyServices.add(child);
- subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
- Constants.CHECK_KEY, String.valueOf(false)), listener);
- }
- }
- }
- });
- zkListener = listeners.get(listener);
- }
- //向服務(wù)器訂閱服務(wù),注冊中心會調(diào)用NotifyListener的notify函數(shù)返回服務(wù)列表
- zkClient.create(root, false);
- //獲取服務(wù)地址列表
- List<String> services = zkClient.addChildListener(root, zkListener);
- if (services != null && services.size() > 0) {
- //如果存在服務(wù)
- for (String service : services) {
- service = URL.decode(service);
- anyServices.add(service);
- //如果serviceInterface是“*”則從分組根路徑遍歷service并訂閱所有服務(wù)
- subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
- Constants.CHECK_KEY, String.valueOf(false)), listener);
- }
- }
- } else {
- //如果serviceInterface不是“*”則創(chuàng)建Zookeeper客戶端索取服務(wù)列表,并通知(notify)消費(fèi)者(consumer)這些服務(wù)可以用了
- List<URL> urls = new ArrayList<URL>();
- //獲取類似于http://xxx.xxx.xxx.xxx/context/com.service.xxxService/consumer的地址
- for (String path : toCategoriesPath(url)) {
- //獲取例如com.service.xxxService對應(yīng)的NotifyListener map
- ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
- if (listeners == null) {
- zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
- listeners = zkListeners.get(url);
- }
- //獲取ChildListener
- ChildListener zkListener = listeners.get(listener);
- if (zkListener == null) {
- listeners.putIfAbsent(listener, new ChildListener() {
- public void childChanged(String parentPath, List<String> currentChilds) {
- ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
- }
- });
- zkListener = listeners.get(listener);
- }
- //創(chuàng)建Zookeeper客戶端
- zkClient.create(path, false);
- List<String> children = zkClient.addChildListener(path, zkListener);
- if (children != null) {
- urls.addAll(toUrlsWithEmpty(url, path, children));
- }
- }
- //提醒消費(fèi)者
- notify(url, listener, urls);
- }
- } catch (Throwable e) {
- throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- }
至此,Dubbo的源碼解析結(jié)束,以后還會對一些細(xì)節(jié)進(jìn)行補(bǔ)充。特別在此建議看官使用apache的開源項目Zookeeper作注冊中心,來完成分布式服務(wù)的協(xié)調(diào)調(diào)用。
本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請
點(diǎn)擊舉報。