文章链接
Java API
下面将介绍使用 Java API 操作 HDFS。
project
使用 Eclipse 或者 IDEA 创建 SpringBoot 项目。

pom.xml
在 pom.xml 文件中配置完 Spring 相关内容后,添加以下 Hadoop HDFS 相关的 jar 包。
<!-- hadoop hdfs -->
<!-- NOTE(review): hadoop-client normally pulls in hadoop-hdfs and hadoop-common
     transitively; listing all three explicitly pins them to the same 2.9.2
     version — verify against `mvn dependency:tree` before trimming. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
API
package com.iss.hadoop.hdfs.impl;
import com.iss.hadoop.common.ApiResult;
import com.iss.hadoop.hdfs.HDFSService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@Service
public class HDFSServiceImpl implements HDFSService {

    /** HDFS NameNode URI, e.g. hdfs://host:9000, injected from application config. */
    @Value("${hdfs.uri}")
    private String hdfsURI;

    /**
     * 获取 HDFS 文件系统对象。
     *
     * @return a {@link FileSystem} for {@code hdfs.uri}, or {@code null} if it
     *         could not be obtained (callers below tolerate the resulting NPE
     *         via their own catch blocks)
     */
    public FileSystem getFileSystem() {
        // HADOOP_USER_NAME decides which user the HDFS operations run as.
        System.setProperty("HADOOP_USER_NAME", "root");
        FileSystem fs = null;
        try {
            // NOTE(review): FileSystem.get() caches instances per (scheme, authority,
            // user), so repeated calls are expected to return the same object — do not
            // close it here or every later operation would fail.
            fs = FileSystem.get(URI.create(hdfsURI), new Configuration());
        } catch (Exception e) {
            System.out.println("getFileSystem failed: " + e.getMessage());
        }
        return fs;
    }

    /**
     * 创建文件夹(递归创建所有缺失的父目录)。
     *
     * @param dirs absolute HDFS path to create, e.g. {@code /hdfs/a/b/c/}
     * @return success when HDFS reports the directories were created
     */
    public ApiResult mkdirs(String dirs) {
        boolean flag = false;
        try {
            flag = this.getFileSystem().mkdirs(new Path(dirs));
        } catch (Exception e) {
            System.out.println("mkdirs failed: " + e.getMessage());
        }
        return flag ? ApiResult.success() : ApiResult.failure("Operate Failure!");
    }

    /**
     * 上传文件:把输入流的内容写入 HDFS 的 /hdfs/ 目录下。
     *
     * @param fileName target file name under {@code /hdfs/}
     * @param input    source stream; closed by this method
     * @return success when the full stream was copied
     */
    public ApiResult upload(String fileName, InputStream input) {
        boolean flag = false;
        // Fix: try-with-resources closes BOTH streams even when the copy throws.
        // The original only closed fos on the happy path and never closed input,
        // leaking an HDFS lease and the upload stream on any failure.
        try (InputStream in = input;
             FSDataOutputStream fos = this.getFileSystem().create(new Path("/hdfs/" + fileName))) {
            byte[] buffer = new byte[1024];
            int len;
            while ((len = in.read(buffer)) != -1) {
                fos.write(buffer, 0, len);
            }
            flag = true;
        } catch (Exception e) {
            System.out.println("upload failed: " + e.getMessage());
        }
        return flag ? ApiResult.success() : ApiResult.failure("Operate Failure!");
    }

    /**
     * 删除文件夹 or 文件(recursive = true,目录会被整个删除)。
     *
     * @param fileName absolute HDFS path to delete
     */
    public ApiResult delete(String fileName) {
        boolean flag = false;
        try {
            // true => recursive delete, required for non-empty directories.
            flag = this.getFileSystem().delete(new Path(fileName), true);
        } catch (Exception e) {
            System.out.println("delete failed: " + e.getMessage());
        }
        return flag ? ApiResult.success() : ApiResult.failure("Operate Failure !");
    }

    /**
     * 重命名文件夹 or 文件。
     *
     * @param oldStr existing HDFS path
     * @param newStr new HDFS path
     */
    public ApiResult rename(String oldStr, String newStr) {
        boolean flag = false;
        try {
            flag = this.getFileSystem().rename(new Path(oldStr), new Path(newStr));
        } catch (Exception e) {
            System.out.println("rename failed: " + e.getMessage());
        }
        return flag ? ApiResult.success() : ApiResult.failure("Operate Failure !");
    }

    /**
     * 文件是否存在。Always answers success; the payload boolean carries the result.
     *
     * @param fileName absolute HDFS path to probe
     */
    public ApiResult exists(String fileName) {
        boolean flag = false;
        try {
            flag = this.getFileSystem().exists(new Path(fileName));
        } catch (Exception e) {
            System.out.println("exists failed: " + e.getMessage());
        }
        return flag ? ApiResult.success("File exists !", true) : ApiResult.success("File not exists !", false);
    }

    /**
     * 下载:打开 HDFS 文件的读取流。
     *
     * @param fileName absolute HDFS path
     * @return an open stream the CALLER must close, or {@code null} on failure
     */
    public InputStream download(String fileName) {
        FSDataInputStream fis = null;
        try {
            fis = this.getFileSystem().open(new Path(fileName));
        } catch (Exception e) {
            System.out.println("download failed: " + e.getMessage());
        }
        return fis;
    }

    /**
     * 遍历指定路径下的文件,返回文件名列表。
     *
     * @param path HDFS directory to scan (listFiles recurses into subdirectories)
     */
    public ApiResult list(String path) {
        List<String> list = new ArrayList<String>();
        try {
            // Fix: fetch the FileSystem once and reuse it for both listings
            // instead of calling getFileSystem() twice.
            FileSystem fs = this.getFileSystem();
            // 方式一: recursive file iterator — names collected into the result.
            RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(path), true);
            while (iterator.hasNext()) {
                LocatedFileStatus ls = iterator.next();
                list.add(ls.getPath().getName());
            }
            // 方式二: non-recursive listing — demonstration only, printed to stdout.
            FileStatus[] fileStatus = fs.listStatus(new Path(path));
            for (int i = 0; i < fileStatus.length; i++) {
                System.out.println(fileStatus[i].getPath().toString());
            }
        } catch (Exception e) {
            System.out.println("list failed: " + e.getMessage());
        }
        return ApiResult.success(list);
    }

    /**
     * 查找某个文件在集群中的位置,打印每个 block 的首个宿主机名。
     *
     * @param fileName absolute HDFS path of the file to inspect
     */
    public void getFileLocation(String fileName) {
        try {
            // Fix: reuse one FileSystem reference for both calls.
            FileSystem fs = this.getFileSystem();
            FileStatus fileStatus = fs.getFileStatus(new Path(fileName));
            BlockLocation[] blockLocation = fs.getFileBlockLocations(fileStatus, 0,
                    fileStatus.getLen());
            for (int i = 0; i < blockLocation.length; i++) {
                String[] hosts = blockLocation[i].getHosts();
                // Fix: a block can report zero hosts (e.g. replicas offline);
                // guard before indexing to avoid ArrayIndexOutOfBoundsException.
                if (hosts.length > 0) {
                    System.out.println("block_" + i + "_location:" + hosts[0]);
                }
            }
        } catch (Exception e) {
            System.out.println("getFileLocation failed: " + e.getMessage());
        }
    }
}
Test
package com.iss.hadoop.controller;
import com.iss.hadoop.common.ApiResult;
import com.iss.hadoop.hdfs.HDFSService;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URLEncoder;
@RestController
@RequestMapping("/hdfs")
public class HDFSController {

    /** Delegate performing the actual HDFS operations. */
    @Autowired
    private HDFSService hdfsService;

    // http://localhost:8080/hdfs/mkdirs?dirs=/hdfs/a/b/c/
    /** 创建文件夹 — thin pass-through to the service. */
    @GetMapping("/mkdirs")
    public ApiResult mkdirs(@RequestParam("dirs") String dirs) {
        return this.hdfsService.mkdirs(dirs);
    }

    // http://localhost:8080/hdfs/upload
    /** 上传文件 — streams the multipart payload into HDFS. */
    @PostMapping("/upload")
    public ApiResult upload(@RequestParam("file") MultipartFile file) {
        ApiResult apiResult = null;
        try {
            // getInputStream() is closed by the service's upload implementation.
            apiResult = this.hdfsService.upload(file.getOriginalFilename(), file.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
            apiResult = ApiResult.failure("Operate Failure !");
        }
        return apiResult;
    }

    // http://localhost:8080/hdfs/delete?fileName=
    /** 删除文件夹 or 文件。 */
    @GetMapping("/delete")
    public ApiResult delete(@RequestParam("fileName") String fileName) {
        return this.hdfsService.delete(fileName);
    }

    // http://localhost:8080/hdfs/rename?oldStr= &newStr=
    /** 重命名文件夹 or 文件。 */
    @GetMapping("/rename")
    public ApiResult rename(@RequestParam("oldStr") String oldStr, @RequestParam("newStr") String newStr) {
        return this.hdfsService.rename(oldStr, newStr);
    }

    // http://localhost:8080/hdfs/exists?fileName=
    /** 文件是否存在。 */
    @GetMapping("/exists")
    public ApiResult exists(@RequestParam("fileName") String fileName) {
        return this.hdfsService.exists(fileName);
    }

    // http://localhost:8080/hdfs/download?fileName=
    /**
     * 下载文件,以附件形式返回字节内容。
     *
     * @param fileName absolute HDFS path of the file to download
     * @return 200 with the file bytes, or 404 when the file cannot be opened
     */
    @GetMapping(value = "/download")
    public ResponseEntity<byte[]> download(@RequestParam("fileName") String fileName) throws Exception {
        InputStream input = this.hdfsService.download(fileName);
        // Fix: the service returns null when the file cannot be opened; the
        // original fed that null into IOUtils.toByteArray and answered a 500 NPE.
        if (input == null) {
            return new ResponseEntity<byte[]>(HttpStatus.NOT_FOUND);
        }
        try {
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
            // Encode the name so non-ASCII file names survive the header.
            headers.setContentDispositionFormData("attachment", URLEncoder.encode(fileName, "UTF-8"));
            return new ResponseEntity<byte[]>(IOUtils.toByteArray(input), headers, HttpStatus.OK);
        } finally {
            // Fix: the HDFS input stream was never closed before — resource leak.
            IOUtils.closeQuietly(input);
        }
    }

    // http://localhost:8080/hdfs/list?path=
    /** 遍历指定路径下的文件。 */
    @GetMapping("/list")
    public ApiResult list(@RequestParam("path") String path) {
        return this.hdfsService.list(path);
    }
}