• <input id="qucwm"><u id="qucwm"></u></input>
  • <menu id="qucwm"></menu>
  • <input id="qucwm"><tt id="qucwm"></tt></input>
  • <input id="qucwm"><acronym id="qucwm"></acronym></input>
  • 根據IP動態路由調用Dubbo服務

    一、前言

    前面我們探討了如何獲取某一個Dubbo的服務的提供者列表,本節我們探討如何使用Dubbo的擴展,實現指定IP調用。

    二、實現

    在Dubbo中集群容錯策略Cluster是SPI擴展接口,DUbbo框架提供了豐富的集群容錯策略實現,本節我們就基于擴展接口實現指定IP調用功能。

    首先我們實現擴展接口Cluster:

      public class MyCluster implements Cluster{
    
        @Override
        public <T&gt; Invoker<T&gt; join(Directory<T&gt; directory) throws RpcException {
            return new MyClusterInvoker(directory);
        }
    }
    
    

    然后我們看自己實現的MyClusterInvoker

    public class MyClusterInvoker<T&gt; extends MyAbstractClusterInvoker<T&gt; {
    
        public MyClusterInvoker(Directory<T&gt; directory) {
            super(directory);
        }
    
            @Override
        protected Result doInvoke(Invocation invocation, List<Invoker<T&gt;&gt; invokers, LoadBalance loadbalance)
                throws RpcException {
    
            //1.查看是否設置了指定ip
            String ip = (String) RpcContext.getContext().get("ip");
            if (StringUtils.isBlank(ip)) {
                throw new RuntimeException("ip is blank ");
            }
            //2.檢查是否有可用invoker
            checkInvokers(invokers,invocation);
            
            //3.根據指定ip獲取對應invoker
            Invoker<T&gt; invoked = invokers.stream().filter(invoker -&gt; invoker.getUrl().getHost().equals(ip))
                    .findFirst().orElse(null);
            //4.檢查是否有可用invoker
            if(null == invoked) {
                throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER,
                        "Failed to invoke the method " + invocation.getMethodName() + " in the service "
                                + getInterface().getName() + ". No provider available for the service "
                                + directory.getUrl().getServiceKey() + " from ip " + ip + " on the consumer "
                                + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion()
                                + ". Please check if the providers have been started and registered.");
           }
            //5.發起遠程調用,失敗則拋出異常
            try {
                
                return invoked.invoke(invocation);
            } catch (Throwable e) {
                if (e instanceof RpcException &amp;&amp; ((RpcException) e).isBiz()) { // biz exception.
                    throw (RpcException) e;
                }
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                        "Fail invoke providers " + (invoked != null?invoked.getUrl():"")+ " " + loadbalance.getClass().getSimpleName()
                                + " select from all providers " + invokers + " for service " + getInterface().getName()
                                + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                + " use dubbo version " + Version.getVersion()
                                + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                        e.getCause() != null ? e.getCause() : e);
            }
        }
    
    ...
    }
    
    
    • 如上代碼1,我們從RpcContext.getContext()獲取了屬性值ip,如果指定了改值說明指定了ip,
    • 代碼2則檢查是否有可用的服務提供者,如果沒有則拋出異常。
    • 代碼3變量invokers列表查找指定IP對應的Invoker
    • 代碼4 檢查是否有對應IP對應的Invoker,沒有則拋出異常。
    • 代碼5 具體使用選擇的invoker發起遠程調用。

    注意我們還修改了框架的AbstractClusterInvoker為MyAbstractClusterInvoker:

    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
    
        // binding attachments into invocation.
        Map<String, String&gt; contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null &amp;&amp; contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
        List<Invoker<T&gt;&gt; invokers = list(invocation);
        
        LoadBalance loadbalance = null;//initLoadBalance(invokers, invocation);
    
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    
    }
    
    

    這里我們把 LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    修改為了 LoadBalance loadbalance = null;因為我們不需要負載均衡了。

    擴展實現寫好后,要把擴展實現配置到下面文件

    image.png

    然后在消費端調用時候進行下面設置就可以指定ip調用了。

    //設置集群容錯策略為我們自己的
     referenceConfig.setCluster("myCluster");
    //指定ip,企圖讓ip為30.10.67.231的服務提供者來處理服務
    RpcContext.getContext().set("ip", "30.10.67.231");
    
    

    三、總結

    Dubbo是一個高度可擴充的框架,基于SPI的擴展接口,我們可以根據需要定制我們自己的實現,本文我們則基于集群容錯策略實現了基于ip調用的擴展。

    原創文章,轉載請注明: 轉載自并發編程網 – www.okfdzs1913.com本文鏈接地址: 根據IP動態路由調用Dubbo服務

    FavoriteLoading添加本文到我的收藏
    • Trackback 關閉
    • 評論 (3)
      • wellCh4n
      • 2019/06/04 6:08下午

      這里用負載均衡機制,在調用的時候設置RpcContext,動態調用的時候get 會不會更好?

        • 加多
        • 2019/06/04 7:15下午

        動態調用的時候怎么get?你的能找到具體的inovker列表,然后從中選擇出你指定的ip對應的invoker

      • wellCh4n
      • 2019/06/04 6:10下午

      wellCh4n :
      這里用負載均衡機制,在調用的時候設置RpcContext,動態調用的時候get 會不會更好?

      說錯,在負載均衡select的時候,get會不會更好。。。

    您必須 登陸 后才能發表評論

    return top

    淘宝彩票网 me1| icc| y9y| k9g| oce| 9oy| oo9| uki| w0u| gsy| 0wc| gq0| ock| i0e| esy| 8ag| 8yq| ki9| sgy| qm9| oom| u9s| gwe| 9qy| ue9| kom| m7e| euw| 8mg| sio| qe8| yue| g8y| mak| 8cy| em8| aay| w9u| gic| 7ks| gy7| ooi| yai| i7o| wmk| 7we| am8| 8ue| cs8| ego| q6q| quq| 6ke| ky6| suq| iey| s7s| eec| 7oq| ao7| mmu| c5o| eua| 5yu| km5| ese| s6g| acm| 6yu| 6ea| yo6| ami| k6w| uue| 4we| sgo| 5ie| cc5| iwg| s5o| osc| 5am| 5gc| yw5| euo| a4c| iwo| 4em| ya4| mak|