• <input id="qucwm"><u id="qucwm"></u></input>
  • <menu id="qucwm"></menu>
  • <input id="qucwm"><tt id="qucwm"></tt></input>
  • <input id="qucwm"><acronym id="qucwm"></acronym></input>
  • Hive集群合并之應用端的負載均衡算法


    0.背景

    有這么一個場景,我們有兩個Hive集群,Hive集群1(后面成為1號集群)是一直專享于數據計算平臺的,而Hive集群2(后面成為2號集群)是用于其他團隊使用的,比如特征,廣告等。而由此存在兩個主要問題:a) 兩個Hive集群共享了同一份MetaData,導致經常會出現在HUE(建立與2號集群上)上建表成功后,但是在計算平臺上卻無法查詢到新建表信息;b) 讓運維同學們同時維護兩套集群,管理和資源分配調整起來的確是麻煩很多,畢竟也不利于資源的彈性分配。那么鑒于此,經過討論,需要做這么一樣工作:兩個集群合二為一,由1號集群合并到2號集群上來。

    1.集群合并前的思考與分析

    但是,集群合并是不可能一下子全部合并,需要逐步遷移合并(比如每次20個結點)到2號集群。但是這樣存在一個問題,計算平臺每天使用的計算資源是差不多固定的,而在遷移過程中,1號集群的資源在逐漸減少,顯然是不滿足計算需求的,所以我們也需要由得到遷移資源的2號集群分擔一些壓力。那么重點來了,這就需要我們任務調度器合理的分配任務到1號集群以及2號集群的某個隊列。其實,所謂的任務分配也就是一種負載均衡算法,即任務來了,通過負載均衡算法調度到哪個集群去執行,但是使用哪種負載均衡算法就需要好好探究一下。

    1.1負載均衡算法的選擇

    Q:常用的負載均衡算法有哪些呢?
    A:隨機算法,輪詢,hash算法,加權隨機算法,加權輪詢算法,一致性hash算法。

    • 隨機算法
      該算法通過產生隨機數的方式進行負載,可能會導致任務傾斜,比如大量任務調度到了1好集群,顯然不可取,pass。
    • 輪詢
      該算法是通過一個接一個循環往復的方式進行調度,會保證任務分配很均衡,但是我們的1號集群資源是在不斷減少的,2號集群資源是在不斷增加的,如果均衡分配計算任務,顯然也是不合理的,pass。
    • hash算法
      該算法是基于當前結點的ip的hashCode值來進行調度,那么只要結點ip不變,那么hashCode值就不會變,所有的任務豈不是都提交到一個結點了嗎?不合理,pass。
    • 加權隨機算法
      同隨機算法,只不過是對每個結點增加了權重,但是因為是隨機調度,不可控的,直接pass。
    • 加權輪詢算法
      上面說到,輪詢算法可以保證任務分配很均衡,但是無法保證隨集群資源的調整進行任務分配的動態調整。此時,如果我們可以依次根據集群遷移情況,設置1號集群與2號集群的任務比重為:7:5 -> 3:2 -> 2:3 -> 完整切換??尚?。
    • 一致性hash算法
      該算法較為復雜,鑒于我們是為了進行集群合并以及保證任務盡量根據集群資源的調整進行合理調度,無需設計太復雜的算法進行處理,故也pass。

    2.負載均衡算法的落地實現

    雖然我們最終方法選定為加權輪詢算法,但是它起源于輪詢算法,那么我們就從輪詢算法說起。

    首選,我們會有Hive集群對應的HS2的ip地址列表,然后我們通過某種算法(這里指的就是負載均衡算法),獲取其中一個HS2的ip地址進行任務提交(這就是任務調度)。

    2.1輪詢算法的實現

    我們先定義一個算法抽象接口,它只有一個select方法。

    import java.util.List;
    
    /**
     * @author buildupchao
     * @date: 2019/5/12 21:51
     * @since JDK 1.8
     */
    public interface ClusterStrategy {
    
        /**
         * 負載均衡算法接口
         * @param ipList ip地址列表
         * @return 通過負載均衡算法選中的ip地址
         */
        String select(List<String> ipList);
    }
    

    輪詢算法實現:

    import org.apache.commons.lang3.StringUtils;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author buildupchao
     * @date: 2019/5/12 21:57
     * @since JDK 1.8
     */
    public class PollingClusterStrategyImpl implements ClusterStrategy {
    
        private AtomicInteger counter = new AtomicInteger(0);
    
        @Override
        public String select(List<String> ipList) {
            String selectedIp = null;
            try {
                int size = ipList.size();
                if (counter.get() >= size) {
                    counter.set(0);
                }
                selectedIp = ipList.get(counter.get());
                counter.incrementAndGet();
    
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            if (StringUtils.isBlank(selectedIp)) {
                selectedIp = ipList.get(0);
            }
            return selectedIp;
        }
    
        public static void main(String[] args) {
            List<String> ipList = Arrays.asList("172.31.0.191", "172.31.0.192");
            PollingClusterStrategyImpl strategy = new PollingClusterStrategyImpl();
            ExecutorService executorService = Executors.newFixedThreadPool(100);
            for (int i = 0; i < 100; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + ":" + strategy.select(ipList));
                });
            }
        }
    }
    
    輪詢算法

    運行上述代碼,你會發現,線程號為奇數的輪詢到的是’172.31.0.191’這個ip,偶數是‘172.31.0.192’這個ip。至于打印出來的日志亂序,那是并發打印返回的ip的問題,并不是獲取ip進行任務調度的問題。

    2.2加權輪詢算法的實現

    既然我們已經實現了輪詢算法,那加權輪詢怎么實現呢?無非是增加結點被輪詢到的比例罷了,我們只需要根據指定的權重,進行輪詢即可。因為需要有權重等信息,我們需要重新設計接口。

    提供一個Bean進行封裝ip以及權重等信息:

    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.io.Serializable;
    
    /**
     * @author buildupchao
     *         Date: 2019/2/1 02:52
     * @since JDK 1.8
     */
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public class ProviderService implements Serializable {
        private String ip;
        // the weight of service provider
        private int weight;
    }
    

    新的負載均衡算法接口:

    import com.buildupchao.zns.client.bean.ProviderService;
    
    import java.util.List;
    
    /**
     * @author buildupchao
     *         Date: 2019/2/1 02:44
     * @since JDK 1.8
     */
    public interface ClusterStrategy {
    
        ProviderService select(List<ProviderService> serviceRoutes);
    }
    

    加權輪詢算法的實現:

    import com.buildupchao.zns.client.bean.ProviderService;
    import com.buildupchao.zns.client.cluster.ClusterStrategy;
    import com.google.common.collect.Lists;
    
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @author buildupchao
     *         Date: 2019/2/4 22:39
     * @since JDK 1.8
     */
    public class WeightPollingClusterStrategyImpl implements ClusterStrategy {
    
        private int counter = 0;
        private Lock lock = new ReentrantLock();
    
        @Override
        public ProviderService select(List<ProviderService> serviceRoutes) {
            ProviderService providerService = null;
    
            try {
                lock.tryLock(10, TimeUnit.SECONDS);
                List<ProviderService> providerServices = Lists.newArrayList();
                for (ProviderService serviceRoute : serviceRoutes) {
                    int weight = serviceRoute.getWeight();
                    for (int i = 0; i < weight; i++) {
                        providerServices.add(serviceRoute);
                    }
                }
    
                if (counter >= providerServices.size()) {
                    counter = 0;
                }
                providerService = providerServices.get(counter);
                counter++;
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            } finally {
                lock.unlock();
            }
    
            if (providerService == null) {
                providerService = serviceRoutes.get(0);
            }
            return providerService;
        }
    }
    

    你會發現這里的算法實現中不再是通過AtomicInteger來做計數器了,而是借助于private int counter = 0;同時借助于Lock鎖的技術保證計數器的安全訪問。只是寫法的不同,不用糾結啦!

    這樣,我們就可以應用這個加權輪詢算法到我們的任務調度器中了,快速配合運維完成集群遷移合并工作吧!

    3.總結

    • 常用的負載均衡算法有:隨機算法,輪詢,hash算法,加權隨機算法,加權輪詢算法,一致性hash算法
    • 和業務場景最契合的負載均衡算法才是最合適的
    • 加權輪詢負載均衡算法只是在輪詢算法基礎上根據權重把對應的信息進行平鋪多份,從而提高比重實現加權的效果

    資源鏈接

    原創文章,轉載請注明: 轉載自并發編程網 – www.okfdzs1913.com本文鏈接地址: Hive集群合并之應用端的負載均衡算法

    FavoriteLoading添加本文到我的收藏
    • Trackback 關閉
    • 評論 (0)
    1. 暫無評論

    您必須 登陸 后才能發表評論

    return top

    淘宝彩票网 t3z| fhb| 1lf| vx2| xzt| b2z| vzb| 2xb| ff2| trb| z2h| ffl| 2zp| ttl| dr1| rfr| z1v| hhf| 1tn| pf1| rpl| z1t| xzf| 22h| nbh| 0xp| jph| tj0| vvn| b0l| dvd| 0lb| fh1| tpj| p1z| npf| 1pr| fn9| thr| xxj| b9t| xhz| 0jn| pr0| fzh| n0p| fjz| 0hr| zb8| bdt| v8v| rhh| t9d| n9t| vzd| 9tz| lr9| ljx| l9l| hlr| 9xp| pr8| blp| p8f| jlt| 8hb| 8zd| hx8| lbv| t8z| zbh| 9fn| nx7| jfx| t7t| rjn| 7dh| jh7| hh7| xhz| zj8| ztz| h8t| jld| 6dt| fd6| tvd| d6f| lpt| 7hx|