`

ZooKeeper的锁、同步和队列分析

阅读更多

集群管理结构图

 


清单 3. Leader Election 关键代码

                               
 void findLeader() throws InterruptedException { 
        byte[] leader = null; 
        try { 
            leader = zk.getData(root + "/leader", true, null); 
        } catch (Exception e) { 
            logger.error(e); 
        } 
        if (leader != null) { 
            following(); 
        } else { 
            String newLeader = null; 
            try { 
                byte[] localhost = InetAddress.getLocalHost().getAddress(); 
                newLeader = zk.create(root + "/leader", localhost, 
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 
            } catch (Exception e) { 
                logger.error(e); 
            } 
            if (newLeader != null) { 
                leading(); 
            } else { 
                mutex.wait(); 
            } 
        } 
    } 

 

共享锁(Locks)

共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。

  Zookeeper 实现 Locks 的流程图

同步锁的实现代码如下,完整的代码请看源代码:


清单 4. 同步锁的关键代码

                               
 void getLock() throws KeeperException, InterruptedException{ 
        List<String> list = zk.getChildren(root, false); 
        String[] nodes = list.toArray(new String[list.size()]); 
        Arrays.sort(nodes); 
        if(myZnode.equals(root+"/"+nodes[0])){ 
            doAction(); 
        } 
        else{ 
            waitForLock(nodes[0]); 
        } 
    } 
    void waitForLock(String lower) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true); 
        if(stat != null){ 
            mutex.wait(); 
        } 
        else{ 
            getLock(); 
        } 
    } 

 

队列管理

Zookeeper 可以处理两种类型的队列:

  1. 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
  2. 队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 Zookeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 / synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start。

用下面的流程图更容易理解:

同步队列流程图

同步队列的关键代码如下,完整的代码请看附件:


清单 5. 同步队列

                               
 void addQueue() throws KeeperException, InterruptedException{ 
        zk.exists(root + "/start",true); 
        zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, 
        CreateMode.EPHEMERAL_SEQUENTIAL); 
        synchronized (mutex) { 
            List<String> list = zk.getChildren(root, false); 
            if (list.size() < size) { 
                mutex.wait(); 
            } else { 
                zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT); 
            } 
        } 
 } 

 

当队列没满是进入 wait(),然后会一直等待 Watch 的通知,Watch 的代码如下:

 public void process(WatchedEvent event) { 
        if(event.getPath().equals(root + "/start") &&
         event.getType() == Event.EventType.NodeCreated){ 
            System.out.println("得到通知"); 
            super.process(event); 
            doAction(); 
        } 
    } 

 

FIFO 队列用 Zookeeper 实现思路如下:

实现的思路也非常简单,就是在特定的目录下创建 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO

下面是生产者和消费者这种队列形式的示例代码,完整的代码请看附件:


清单 6. 生产者代码

                               
 boolean produce(int i) throws KeeperException, InterruptedException{ 
        ByteBuffer b = ByteBuffer.allocate(4); 
        byte[] value; 
        b.putInt(i); 
        value = b.array(); 
        zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT_SEQUENTIAL); 
        return true; 
    } 



清单 7. 消费者代码

                               
 int consume() throws KeeperException, InterruptedException{ 
        int retvalue = -1; 
        Stat stat = null; 
        while (true) { 
            synchronized (mutex) { 
                List<String> list = zk.getChildren(root, true); 
                if (list.size() == 0) { 
                    mutex.wait(); 
                } else { 
                    Integer min = new Integer(list.get(0).substring(7)); 
                    for(String s : list){ 
                        Integer tempValue = new Integer(s.substring(7)); 
                        if(tempValue < min) min = tempValue; 
                    } 
                    byte[] b = zk.getData(root + "/element" + min,false, stat); 
                    zk.delete(root + "/element" + min, 0); 
                    ByteBuffer buffer = ByteBuffer.wrap(b); 
                    retvalue = buffer.getInt(); 
                    return retvalue; 
                } 
            } 
        } 
 } 

 

总结

Zookeeper 作为 Hadoop 项目中的一个子项目,是 Hadoop 集群管理的一个必不可少的模块,它主要用来控制集群中的数据,如它管理 Hadoop 集群中的 NameNode,还有 Hbase 中 Master Election、Server 之间状态同步等。

分享到:
评论

相关推荐

    zookeeper 经典应用设计 示例代码

    zookeeper 经典应用设计 锁、同步和队列分析

    zookeeper-3.4.11.tar.gz

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    Zookeeper篇.pdf

    1.9 zk 的分布式锁 2.0 zk 队列管理 2.1 zk 数据复制 2.2 zk 的工作原理 2.3 zk 是如何保证事物的顺序一致性 2.4 zk 集群下 server 工作状态 2.5 zk 是如何选举 Leader 的? 2.6 zk 同步流程 2.7 分布式通知和协调 ...

    zookeeper-3.3.6.rar

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    apache-zookeeper-3.5.10-bin 环境搭配

    apache-zookeeper-3.5.10-bin 环境搭配 ...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在$zookeeper_home\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    zookeeper-3.4.12.tar.zip

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致...其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    zookeeper-3.4.5-cdh5.12.0

    ZooKeeper是一个分布式的,开放源码的...其中分布锁和队列有Java和C两个版本,选举只有Java版本。 cdh5.12.0是Cloudera公司发布的最新release版本,与apache版本3.4.5版本对应。 安装配置简单,支持单机、集群模式

    zookeeper-3.4.12

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    zookeeper-3.4.5

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    zookeeper面试专题.pdf

    12.Zookeeper 队列管理(文件系统、通知 机制) 13.Zookeeper 数据复制 14.Zookeeper 工作原理 15.zookeeper 是如何保证事务的顺序一致性 的? 16.Zookeeper 下 Server 工作状态 17.zookeeper 是如何选取主 leader ...

    zookeeper-3.4.13.rar

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    dubbo注册中心 zookeeper-3.4.8

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    最详细Zookeeper学习资料(源码)

    分布式同步:ZooKeeper提供了分布式锁和顺序节点等特性,可以帮助开发者实现复杂的分布式同步机制。 组服务:ZooKeeper支持创建临时节点,可以用于实现分布式队列、成员管理和领导者选举等功能。 ZooKeeper被广泛...

    zookeeper-3.4.6.tar.gz

    zookeeper-3.4.6.tar.gz ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口

    知识领域:JAVA 技术关键词:Dubbo 内容关键词:ZooKeeper 用途:分布式系统的可靠协调系统

    -- 此rar文件包含两个ZooKeeper版本 一个3.4.6、一个... ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    kafka_2.11-2.0.0.zip (包含zookeeper)

    Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。 Kafka专为分布式高吞吐量系统而设计。 Kafka往往工作得很好,作为一个更传统的消息代理的替代品。 与其他消息...

    分布式服务框架Zookeeper

    Zookeeper 分布式服务框架是ApacheHadoop...Zookeeper的安装和配置文件中各个配置项的意义,以及分析Zookeeper的典型的应用场景(配置文件的管理、集群管理、同步锁、Leader 选举、队列管理等),用Java实现它们并给出

    netty同步传输demo

    采用队列等待实现netty客户端同步调用,zookeeper管理分布式服务端。

    2024年java面试题-Kafka面试题

    Apache Kafka是一个分布式发布... Kafka构建在ZooKeeper同步服务之上. 它与Apache Storm 和Spark非常好地集成, 用于实时流式数据分析. 本资料主要在于介绍kafka相关的知识点,在实际工作与面试中会有一定的帮助的!!!

Global site tag (gtag.js) - Google Analytics