
Hadoop HDFS (3)

Cosy
2 years ago · Big Data · 1,686 reads

Series Links

Hadoop HDFS (1)

Hadoop HDFS (2)

Hadoop HDFS (3)

Java API

The following sections show how to operate HDFS with the Java API.

Project

Create a Spring Boot project with Eclipse or IDEA.

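The post does not show the project's entry class. Below is a minimal sketch, assuming the com.iss.hadoop base package used by the rest of the code; the class name HadoopApplication is hypothetical:

package com.iss.hadoop;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical entry point; component scanning picks up the service and controller shown below
@SpringBootApplication
public class HadoopApplication {
    public static void main(String[] args) {
        SpringApplication.run(HadoopApplication.class, args);
    }
}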

pom.xml

After the Spring-related configuration is done in pom.xml, add the following Hadoop HDFS dependencies.

<!-- Hadoop HDFS -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.9.2</version>
</dependency>
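The service implementation below reads the NameNode address from an hdfs.uri property (see the @Value annotation), so it must be defined in application.properties. The host and port here are placeholders; point them at your own NameNode:

# application.properties
# HDFS NameNode URI (placeholder; adjust host and port to your cluster)
hdfs.uri=hdfs://localhost:9000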

API

package com.iss.hadoop.hdfs.impl;

import com.iss.hadoop.common.ApiResult;
import com.iss.hadoop.hdfs.HDFSService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

@Service
public class HDFSServiceImpl implements HDFSService {

    @Value("${hdfs.uri}")
    private String hdfsURI;

    /**
     * Get the HDFS FileSystem object
     */
    public FileSystem getFileSystem() {
        System.setProperty("HADOOP_USER_NAME", "root");
        FileSystem fs = null;
        try {
            fs = FileSystem.get(URI.create(hdfsURI), new Configuration());
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return fs;
    }

    /**
     * Create directories
     */
    public ApiResult mkdirs(String dirs) {
        boolean flag = false;
        try {
            flag = this.getFileSystem().mkdirs(new Path(dirs));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return flag ? ApiResult.success() : ApiResult.failure("Operate Failure!");
    }

    /**
     * Upload a file (written under /hdfs/ on the cluster)
     */
    public ApiResult upload(String fileName, InputStream input) {
        boolean flag = false;
        FSDataOutputStream fos = null;
        try {
            fos = this.getFileSystem().create(new Path("/hdfs/" + fileName));
            byte[] buffer = new byte[1024];
            int len;
            while ((len = input.read(buffer)) != -1) {
                fos.write(buffer, 0, len);
            }
            flag = true;
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            // Close both streams even when the copy fails
            IOUtils.closeStream(fos);
            IOUtils.closeStream(input);
        }
        return flag ? ApiResult.success() : ApiResult.failure("Operate Failure!");
    }

    /**
     * Delete a directory or file (recursive)
     */
    public ApiResult delete(String fileName) {
        boolean flag = false;
        try {
            flag = this.getFileSystem().delete(new Path(fileName), true);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return flag ? ApiResult.success() : ApiResult.failure("Operate Failure!");
    }

    /**
     * Rename a directory or file
     */
    public ApiResult rename(String oldStr, String newStr) {
        boolean flag = false;
        try {
            flag = this.getFileSystem().rename(new Path(oldStr), new Path(newStr));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return flag ? ApiResult.success() : ApiResult.failure("Operate Failure!");
    }

    /**
     * Check whether a file exists
     */
    public ApiResult exists(String fileName) {
        boolean flag = false;
        try {
            flag = this.getFileSystem().exists(new Path(fileName));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return flag ? ApiResult.success("File exists!", true) : ApiResult.success("File does not exist!", false);
    }

    /**
     * Download a file (returns the HDFS input stream)
     */
    public InputStream download(String fileName) {
        FSDataInputStream fis = null;
        try {
            fis = this.getFileSystem().open(new Path(fileName));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return fis;
    }

    /**
     * List the files under a given path
     */
    public ApiResult list(String path) {
        List<String> list = new ArrayList<String>();
        try {
            // Approach 1: recursively iterate over all files under the path
            RemoteIterator<LocatedFileStatus> iterator = this.getFileSystem().listFiles(new Path(path), true);
            while (iterator.hasNext()) {
                LocatedFileStatus ls = iterator.next();
                list.add(ls.getPath().getName());
            }
            // Approach 2: listStatus returns the immediate children (files and directories)
            FileStatus[] fileStatus = this.getFileSystem().listStatus(new Path(path));
            for (int i = 0; i < fileStatus.length; i++) {
                System.out.println(fileStatus[i].getPath().toString());
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return ApiResult.success(list);
    }

    /**
     * Find the block locations of a file in the cluster
     */
    public void getFileLocation(String fileName) {
        try {
            FileStatus fileStatus = this.getFileSystem().getFileStatus(new Path(fileName));
            BlockLocation[] blockLocation = this.getFileSystem().getFileBlockLocations(fileStatus, 0,
                    fileStatus.getLen());
            for (int i = 0; i < blockLocation.length; i++) {
                String[] hosts = blockLocation[i].getHosts();
                System.out.println("block_" + i + "_location:" + hosts[0]);
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
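The HDFSService interface that this class implements is not shown in the post; a minimal reconstruction from the methods used above would look like this:

package com.iss.hadoop.hdfs;

import com.iss.hadoop.common.ApiResult;
import java.io.InputStream;

// Reconstructed from HDFSServiceImpl above; not shown in the original post
public interface HDFSService {
    ApiResult mkdirs(String dirs);
    ApiResult upload(String fileName, InputStream input);
    ApiResult delete(String fileName);
    ApiResult rename(String oldStr, String newStr);
    ApiResult exists(String fileName);
    InputStream download(String fileName);
    ApiResult list(String path);
    void getFileLocation(String fileName);
}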

Test

package com.iss.hadoop.controller;

import com.iss.hadoop.common.ApiResult;
import com.iss.hadoop.hdfs.HDFSService;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.InputStream;
import java.net.URLEncoder;

@RestController
@RequestMapping("/hdfs")
public class HDFSController {
    @Autowired
    private HDFSService hdfsService;

    // http://localhost:8080/hdfs/mkdirs?dirs=/hdfs/a/b/c/
    @GetMapping("/mkdirs")
    public ApiResult mkdirs(@RequestParam("dirs") String dirs) {
        return this.hdfsService.mkdirs(dirs);
    }

    // http://localhost:8080/hdfs/upload
    @PostMapping("/upload")
    public ApiResult upload(@RequestParam("file") MultipartFile file) {
        ApiResult apiResult = null;
        try {
            apiResult = this.hdfsService.upload(file.getOriginalFilename(), file.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
            apiResult = ApiResult.failure("Operate Failure !");
        }
        return apiResult;
    }

    // http://localhost:8080/hdfs/delete?fileName=
    @GetMapping("/delete")
    public ApiResult delete(@RequestParam("fileName") String fileName) {
        return this.hdfsService.delete(fileName);
    }

    // http://localhost:8080/hdfs/rename?oldStr= &newStr=
    @GetMapping("/rename")
    public ApiResult rename(@RequestParam("oldStr") String oldStr, @RequestParam("newStr") String newStr) {
        return this.hdfsService.rename(oldStr, newStr);
    }

    // http://localhost:8080/hdfs/exists?fileName=
    @GetMapping("/exists")
    public ApiResult exists(@RequestParam("fileName") String fileName) {
        return this.hdfsService.exists(fileName);
    }

    // http://localhost:8080/hdfs/download?fileName=
    @GetMapping(value = "/download")
    public ResponseEntity<byte[]> download(@RequestParam("fileName") String fileName) throws Exception {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
        headers.setContentDispositionFormData("attachment", URLEncoder.encode(fileName, "UTF-8"));
        InputStream input = this.hdfsService.download(fileName);
        if (input == null) {
            // Opening the file on HDFS failed
            return new ResponseEntity<byte[]>(HttpStatus.NOT_FOUND);
        }
        try {
            return new ResponseEntity<byte[]>(IOUtils.toByteArray(input), headers, HttpStatus.OK);
        } finally {
            input.close();
        }
    }

    // http://localhost:8080/hdfs/list?path=
    @GetMapping("/list")
    public ApiResult list(@RequestParam("path") String path) {
        return this.hdfsService.list(path);
    }
}
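Once the application is running, the endpoints can be exercised with any HTTP client. A minimal sketch using Spring's RestTemplate, assuming the application listens on localhost:8080 as in the comments above:

import org.springframework.web.client.RestTemplate;

public class HdfsApiSmokeTest {
    public static void main(String[] args) {
        // Hypothetical smoke test: create a directory tree and print the JSON result
        RestTemplate rest = new RestTemplate();
        String result = rest.getForObject(
                "http://localhost:8080/hdfs/mkdirs?dirs=/hdfs/a/b/c/", String.class);
        System.out.println(result);
    }
}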
