目录
HDFS准备 Windows 关于 Hadoop 的开发环境
1)下载Hadoop的 Windows 依赖
选择 Hadoop-3.1.0,解压并拷贝到其他地方(比如 E:)。

2)配置 HADOOP_HOME 环境变量

3)配置 Path 环境变量,然后重启电脑

4)创建一个 Maven 工程 HdfsClientDemo,并导入相应的依赖坐标+日志添加
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
<Appenders>
<!-- 类型名为 Console,名称为必须属性 -->
<Appender type="Console" name="STDOUT">
<!-- 布局为 PatternLayout 的方式,
输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here
-->
<Layout type="PatternLayout" pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
</Appender>
</Appenders>
<Loggers>
<!-- 可加性为 false -->
<Logger name="test" level="info" additivity="false">
<AppenderRef ref="STDOUT" />
</Logger>
<!-- root loggerConfig 设置 -->
<Root level="info">
<AppenderRef ref="STDOUT" />
</Root>
</Loggers>
</Configuration>
5)创建包名:com.cosyblogs.hdfs
6)创建 HdfsClient 类
public class HdfsClient {
/**
* 获取HDFS的户客端连接对象
* * @param uri HFDS的访问路径 hdfs://hadoop102:9820
* * @param conf 配置对象
* * @param user 操作的用户(用哪个用户操作HDFS)
*/
@Test
public void testCreateHdfsClient() throws IOException, InterruptedException {
// HFDS的访问路径 hdfs://hadoop102:9820
URI uri = URI.create("hdfs://hadoop102:9820");
// conf 配置对象
Configuration conf = new Configuration();
// 操作的用户(用哪个用户操作HDFS)
String user = "hadoop";
// 获取HDFS的客户端连接对象(文件系统对象)
FileSystem fileSystem = FileSystem.get(uri, conf, user);
System.out.println(fileSystem.getClass().getName());
// 关闭资源
fileSystem.close();
}
}
7)执行程序
HDFS 的 API 操作
HDFS 文件上传(测试参数优先级)
1)编写源代码
@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
// 1 获取文件系统
Configuration configuration = new Configuration();
configuration.set("dfs.replication", "2");
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "hadoop");
// 2 上传文件
fs.copyFromLocalFile(new Path("e:/banzhang.txt"), new Path("/banzhang.txt"));
// 3 关闭资源
fs.close();
System.out.println("over");
}
2)将 hdfs-site.xml 拷贝到项目的根目录下
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
3)参数优先级
参数优先级排序:(1)客户端代码中设置的值 >(2)ClassPath 下的用户自定义配置文件 >(3)然后是服务器的自定义配置(xxx-site.xml) >(4)服务器的默认配置(xxx-default.xml)
HDFS 文件下载
@Test
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException{
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "hadoop");
// 2 执行下载操作
// boolean delSrc 指是否将原文件删除
// Path src 指要下载的文件路径
// Path dst 指将文件下载到的路径
// boolean useRawLocalFileSystem 是否开启文件校验
fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("e:/banhua.txt"), true);
// 3 关闭资源
fs.close();
}
HDFS 删除文件和目录
@Test
public void testDelete() throws IOException, InterruptedException, URISyntaxException{
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "hadoop");
// 2 执行删除
fs.delete(new Path("/0508/"), true);
// 3 关闭资源
fs.close();
}
HDFS 文件更名和移动
@Test
public void testRename() throws IOException, InterruptedException, URISyntaxException{
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "hadoop");
// 2 修改文件名称
fs.rename(new Path("/banzhang.txt"), new Path("/banhua.txt"));
// 3 关闭资源
fs.close();
}
HDFS 文件详情查看
查看文件名称、权限、长度、块信息
@Test
public void testListFiles() throws IOException, InterruptedException, URISyntaxException{
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "hadoop");
// 2 获取文件详情
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while(listFiles.hasNext()){
LocatedFileStatus status = listFiles.next();
// 输出详情
// 文件名称
System.out.println(status.getPath().getName());
// 长度
System.out.println(status.getLen());
// 权限
System.out.println(status.getPermission());
// 分组
System.out.println(status.getGroup());
// 获取存储的块信息
BlockLocation[] blockLocations = status.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// 获取块存储的主机节点
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("-----------分割线----------");
}
// 3 关闭资源
fs.close();
}
HDFS 文件和文件夹判断
@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException{
// 1 获取文件配置信息
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "hadoop");
// 2 判断是文件还是文件夹
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
// 如果是文件
if (fileStatus.isFile()) {
System.out.println("f:"+fileStatus.getPath().getName());
}else {
System.out.println("d:"+fileStatus.getPath().getName());
}
}
// 3 关闭资源
fs.close();
}
汇总
public class HdfsClientTest {
private FileSystem fs;
/**
* 判断是文件还是目录
* @throws IOException
*/
@Test
public void testListStatus() throws IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus status : listStatus) {
if(status.isDirectory()){
System.out.println("DIR:" + status.getPath().getName());
}else {
System.out.println("FILE:" + status.getPath().getName());
}
}
}
/**
* 查看文件详情
* @throws IOException
*/
@Test
public void testListFiles() throws IOException {
RemoteIterator<LocatedFileStatus> listFiles =
fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()){
LocatedFileStatus fileStatus = listFiles.next();
System.out.println("文件名:" + fileStatus.getPath().getName());
System.out.println("块大小:" + fileStatus.getBlockSize());
System.out.println("副本数:" + fileStatus.getReplication());
System.out.println("权限信息:" + fileStatus.getPermission());
}
}
/**
* 文件的更名或者移动
* @throws IOException
*/
@Test
public void testRename() throws IOException {
// 移动文件
// fs.rename(new Path("/sanguo/liubei.txt"), new Path("/client_test"));
fs.rename(new Path("/client_test/liubei.txt"),
new Path("/client_test/xiaoqiao.txt"));
}
/**
* 删除文件和目录
* @throws IOException
*/
@Test
public void testDelete() throws IOException {
fs.delete(new Path("/client_test/hello.txt"), true);
}
/**
* 下载文件
*/
@Test
public void testCopyToLoacl() throws IOException {
fs.copyToLocalFile(false,
new Path("/client_test/hello.txt"),
new Path("F:\\uploadFile")
,true);
}
/**
* 上传文件
* 测试配置的优先级 Configuration > hdfs-site.xml > hdfs-default.xml
*/
@Test
public void testCopyFromLoacl() throws IOException {
fs.copyFromLocalFile(false, true,
new Path("F:\\in\\wcinput\\hello.txt"),
new Path("/client_test"));
}
/**
* 获取FileSystem对象
* @throws IOException
* @throws InterruptedException
*/
@Before
public void init() throws IOException, InterruptedException {
// HFDS的访问路径 hdfs://hadoop102:9820
URI uri = URI.create("hdfs://hadoop102:9820");
// conf 配置对象
Configuration conf = new Configuration();
// 操作的用户(用哪个用户操作HDFS)
String user = "hadoop";
// 获取HDFS的客户端连接对象(文件系统对象)
fs = FileSystem.get(uri, conf, user);
}
/**
* 关闭资源
* @throws IOException
*/
@After
public void close() throws IOException {
fs.close();
}
/**
* 获取HDFS的户客端连接对象
* * @param uri HFDS的访问路径 hdfs://hadoop102:9820
* * @param conf 配置对象
* * @param user 操作的用户(用哪个用户操作HDFS)
*/
@Test
public void testCreateHdfsClient() throws IOException, InterruptedException {
// HFDS的访问路径 hdfs://hadoop102:9820
URI uri = URI.create("hdfs://hadoop102:9820");
// conf 配置对象
Configuration conf = new Configuration();
// 操作的用户(用哪个用户操作HDFS)
String user = "hadoop";
// 获取HDFS的客户端连接对象(文件系统对象)
FileSystem fileSystem = FileSystem.get(uri, conf, user);
System.out.println(fileSystem.getClass().getName());
// 关闭资源
fileSystem.close();
}
}