最码农 最码农
  • 首页
  • 动态广场
  • 精选栏目
  • 闲言碎语
  • 左邻右里
  • 注册
  • 登录
首页 › 大数据 › Zookeeper API应用

Zookeeper API应用

Cosy
1年前大数据阅读 486

目录

Zookeeper

IDEA环境搭建

1)创建一个Maven Module

2)添加pom文件


<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.5.7</version>
		</dependency>
</dependencies>

3)拷贝log4j.properties文件到项目根目录

需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。


log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

初始化ZooKeeper客户端


public class Zookeeper {

    private String connectString;
    private int sessionTimeout;
    private ZooKeeper zkClient;

    @Before // 获取客户端对象
    public void init() throws IOException {

        connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
        int sessionTimeout = 10000;

        // 参数解读 1集群连接字符串 2连接超时时间 单位:毫秒 3当前客户端默认的监控器
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
            }
        });
    }

    @After // 关闭客户端对象
    public void close() throws InterruptedException {
        zkClient.close();
    }
}

获取子节点列表,不监听


@Test
public void lsAndWatch() throws KeeperException, InterruptedException {
    List<String> children = zkClient.getChildren("/hadoop", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event);
        }
    });
    System.out.println(children);
    //因为设置了监听,所以当前线程不能结束
    Thread.sleep(Long.MAX_VALUE);
}

创建子节点


@Test
public void create() throws KeeperException, InterruptedException {
    //参数解读 1节点路径  2节点存储的数据  
    //3节点的权限(使用Ids选个OPEN即可) 4节点类型 短暂 持久 短暂带序号 持久带序号
    String path = zkClient.create("/hadoop", "shanguigu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    //创建临时节点
    //String path = zkClient.create("/hadoop2", "shanguigu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    System.out.println(path);
    //创建临时节点的话,需要线程阻塞
    //Thread.sleep(10000);
}

判断Znode是否存在


@Test
public void exist() throws Exception {
    Stat stat = zkClient.exists("/hadoop", false);
	  System.out.println(stat == null ? "not exist" : "exist");
}

获取子节点存储的数据,不监听


@Test
public void get() throws KeeperException, InterruptedException {
    //判断节点是否存在
    Stat stat = zkClient.exists("/hadoop", false);
    if (stat == null) {
        System.out.println("节点不存在...");
        return;
    }
    byte[] data = zkClient.getData("/hadoop", false, stat);
    System.out.println(new String(data));
}

获取子节点存储的数据,并监听


@Test
public void getAndWatch() throws KeeperException, InterruptedException {
    //判断节点是否存在
    Stat stat = zkClient.exists("/hadoop", false);
    if (stat == null) {
        System.out.println("节点不存在...");
        return;
    }

    byte[] data = zkClient.getData("/hadoop", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event);
        }
    }, stat);
    System.out.println(new String(data));
    //线程阻塞
    Thread.sleep(Long.MAX_VALUE);
}

设置节点的值


@Test
public void set() throws KeeperException, InterruptedException {
    //判断节点是否存在
    Stat stat = zkClient.exists("/hadoop", false);
    if (stat == null) {
        System.out.println("节点不存在...");
        return;
    }
    //参数解读 1节点路径 2节点的值 3版本号
    zkClient.setData("/hadoop", "sgg".getBytes(), stat.getVersion());
}

删除空节点


@Test
public void delete() throws KeeperException, InterruptedException {
    //判断节点是否存在
    Stat stat = zkClient.exists("/aaa", false);
    if (stat == null) {
        System.out.println("节点不存在...");
        return;
    }
    zkClient.delete("/aaa", stat.getVersion());
}

删除非空节点,递归实现


//封装一个方法,方便递归调用
public void deleteAll(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
    //判断节点是否存在
    Stat stat = zkClient.exists(path, false);
    if (stat == null) {
        System.out.println("节点不存在...");
        return;
    }
    //先获取当前传入节点下的所有子节点
    List<String> children = zk.getChildren(path, false);
    if (children.isEmpty()) {
        //说明传入的节点没有子节点,可以直接删除
        zk.delete(path, stat.getVersion());
    } else {
        //如果传入的节点有子节点,循环所有子节点
        for (String child : children) {
            //删除子节点,但是不知道子节点下面还有没有子节点,所以递归调用
            deleteAll(path + "/" + child, zk);  
        }
        //删除完所有子节点以后,记得删除传入的节点
        zk.delete(path, stat.getVersion());
    }
}
//测试deleteAll
@Test
public void testDeleteAll() throws KeeperException, InterruptedException {
    deleteAll("/hadoop",zkClient);
}
Zookeeper 教程
赞(0) 收藏(0)
Zookeeper 客户端命令行操作
上一篇
Zookeeper 内部原理
下一篇
再想想
暂无评论
随 机 推 荐
从Hadoop框架讨论大数据生态
MapReduce框架原理-计数器应用及数据清洗
关于Xshell6和Xftp6必须更新否则打不开的问题
Hive 数据库的相关操作
Flume 概述
Flume 自定义Interceptor
线性回归实例——预测二手乐高价格
MapReduce 作业提交全过程
  • 0
  • 0
介绍

我们致力于打造一个原创的计算机相关技术的博客网站,旨在为访客提供一个优质的计算机技术教程交流平台。网站开辟了很多于计算机相关的栏目,并且收集了不少实用资源,同时也鼓励欢迎访客一起分享、交流、学习。

灵魂推荐
Veer图库 数码荔枝
栏目标题
首页 动态广场 精选栏目 闲言碎语 左邻右里
Copyright © 2021-2023 最码农. 苏ICP备20033168号
  • 首页
  • 动态广场
  • 精选栏目
  • 闲言碎语
  • 左邻右里
# 教程 # # Hadoop # # Hive # # Flume # # 人工智能 #
Cosy
即使世界毁灭,也总有回光返照的那一刻
117
文章
3
评论
434
喜欢