HDFS是Hadoop生态系统的根基,也是Hadoop生态系统中的重要一员,大部分时候,我们都会使用Linux shell命令来管理HDFS,包括一些文件的创建,删除,修改,上传等等,因为使用shell命令操作HDFS的方式,相对比较简单,方便,但是有时候,我们也需要通过编程的方式来实现对文件系统的管理。
比如有如下的一个小需求,要求我们实现读取HDFS某个文件夹下所有日志,经过加工处理后在写入到HDFS上,或者存进Hbase里,或者存进其他一些存储系统。这时候使用shell的方式就有点麻烦了,所以这时候我们就可以使用编程的方式来完成这件事了 。
下面给出代码,以供参考:
001 |
package com.java.api.hdfs; |
002 |
003 |
import java.io.BufferedReader; |
004 |
import java.io.IOException; |
005 |
import java.io.InputStream; |
006 |
import java.io.InputStreamReader; |
007 |
008 |
import org.apache.hadoop.conf.Configuration; |
009 |
import org.apache.hadoop.fs.FileStatus; |
010 |
import org.apache.hadoop.fs.FileSystem; |
011 |
import org.apache.hadoop.fs.Path; |
012 |
013 |
014 |
/** |
015 |
* @author 三劫散仙 |
016 |
* Java API操作HDFS |
017 |
* 工具类 |
018 |
* |
019 |
* **/ |
020 |
public class OperaHDFS { |
021 |
|
022 |
|
023 |
public static void main(String[] args) throws Exception { |
024 |
|
025 |
//System.out.println("aaa"); |
026 |
// uploadFile(); |
027 |
//createFileOnHDFS(); |
028 |
//deleteFileOnHDFS(); |
029 |
//createDirectoryOnHDFS(); |
030 |
//deleteDirectoryOnHDFS(); |
031 |
// renameFileOrDirectoryOnHDFS(); |
032 |
//downloadFileorDirectoryOnHDFS(); |
033 |
readHDFSListAll(); |
034 |
} |
035 |
|
036 |
|
037 |
|
038 |
|
039 |
/*** |
040 |
* 加载配置文件 |
041 |
* **/ |
042 |
static Configuration conf= new Configuration(); |
043 |
|
044 |
|
045 |
|
046 |
/** |
047 |
* 重名名一个文件夹或者文件 |
048 |
* |
049 |
* **/ |
050 |
public static void renameFileOrDirectoryOnHDFS() throws Exception{ |
051 |
|
052 |
FileSystem fs=FileSystem.get(conf); |
053 |
Path p1 = new Path( "hdfs://10.2.143.5:9090/root/myfile/my.txt" ); |
054 |
Path p2 = new Path( "hdfs://10.2.143.5:9090/root/myfile/my2.txt" ); |
055 |
fs.rename(p1, p2); |
056 |
|
057 |
fs.close(); //释放资源 |
058 |
System.out.println( "重命名文件夹或文件成功....." ); |
059 |
|
060 |
} |
061 |
|
062 |
|
063 |
/*** |
064 |
* |
065 |
* 读取HDFS某个文件夹的所有 |
066 |
* 文件,并打印 |
067 |
* |
068 |
* **/ |
069 |
public static void readHDFSListAll() throws Exception{ |
070 |
|
071 |
//流读入和写入 |
072 |
InputStream in= null ; |
073 |
//获取HDFS的conf |
074 |
//读取HDFS上的文件系统 |
075 |
FileSystem hdfs=FileSystem.get(conf); |
076 |
//使用缓冲流,进行按行读取的功能 |
077 |
BufferedReader buff= null ; |
078 |
//获取日志文件的根目录 |
079 |
Path listf = new Path( "hdfs://10.2.143.5:9090/root/myfile/" ); |
080 |
//获取根目录下的所有2级子文件目录 |
081 |
FileStatus stats[]=hdfs.listStatus(listf); |
082 |
//自定义j,方便查看插入信息 |
083 |
int j= 0 ; |
084 |
for ( int i = 0 ; i < stats.length; i++){ |
085 |
//获取子目录下的文件路径 |
086 |
FileStatus temp[]=hdfs.listStatus( new Path(stats[i].getPath().toString())); |
087 |
for ( int k = 0 ; k < temp.length;k++){ |
088 |
System.out.println( "文件路径名:" +temp[k].getPath().toString()); |
089 |
//获取Path |
090 |
Path p= new Path(temp[k].getPath().toString()); |
091 |
//打开文件流 |
092 |
in=hdfs.open(p); |
093 |
//BufferedReader包装一个流 |
094 |
buff= new BufferedReader( new InputStreamReader(in)); |
095 |
String str= null ; |
096 |
while ((str=buff.readLine())!= null ){ |
097 |
|
098 |
System.out.println(str); |
099 |
} |
100 |
buff.close(); |
101 |
in.close(); |
102 |
|
103 |
|
104 |
} |
105 |
|
106 |
|
107 |
|
108 |
|
109 |
} |
110 |
|
111 |
hdfs.close(); |
112 |
|
113 |
114 |
} |
115 |
/** |
116 |
* 从HDFS上下载文件或文件夹到本地 |
117 |
* |
118 |
* **/ |
119 |
public static void downloadFileorDirectoryOnHDFS() throws Exception{ |
120 |
|
121 |
FileSystem fs=FileSystem.get(conf); |
122 |
Path p1 = new Path( "hdfs://10.2.143.5:9090/root/myfile//my2.txt" ); |
123 |
Path p2 = new Path( "D://7" ); |
124 |
fs.copyToLocalFile(p1, p2); |
125 |
fs.close(); //释放资源 |
126 |
System.out.println( "下载文件夹或文件成功....." ); |
127 |
|
128 |
} |
129 |
/** |
130 |
* 在HDFS上创建一个文件夹 |
131 |
* |
132 |
* **/ |
133 |
public static void createDirectoryOnHDFS() throws Exception{ |
134 |
|
135 |
FileSystem fs=FileSystem.get(conf); |
136 |
Path p = new Path( "hdfs://10.2.143.5:9090/root/myfile" ); |
137 |
fs.mkdirs(p); |
138 |
fs.close(); //释放资源 |
139 |
System.out.println( "创建文件夹成功....." ); |
140 |
|
141 |
} |
142 |
|
143 |
/** |
144 |
* 在HDFS上删除一个文件夹 |
145 |
* |
146 |
* **/ |
147 |
public static void deleteDirectoryOnHDFS() throws Exception{ |
148 |
|
149 |
FileSystem fs=FileSystem.get(conf); |
150 |
Path p = new Path( "hdfs://10.2.143.5:9090/root/myfile" ); |
151 |
fs.deleteOnExit(p); |
152 |
fs.close(); //释放资源 |
153 |
System.out.println( "删除文件夹成功....." ); |
154 |
|
155 |
} |
156 |
/** |
157 |
* 在HDFS上创建一个文件 |
158 |
* |
159 |
* **/ |
160 |
public static void createFileOnHDFS() throws Exception{ |
161 |
|
162 |
FileSystem fs=FileSystem.get(conf); |
163 |
Path p = new Path( "hdfs://10.2.143.5:9090/root/abc.txt" ); |
164 |
fs.createNewFile(p); |
165 |
//fs.create(p); |
166 |
fs.close(); //释放资源 |
167 |
System.out.println( "创建文件成功....." ); |
168 |
|
169 |
} |
170 |
|
171 |
/** |
172 |
* 在HDFS上删除一个文件 |
173 |
* |
174 |
* **/ |
175 |
public static void deleteFileOnHDFS() throws Exception{ |
176 |
|
177 |
FileSystem fs=FileSystem.get(conf); |
178 |
Path p = new Path( "hdfs://10.2.143.5:9090/root/abc.txt" ); |
179 |
fs.deleteOnExit(p); |
180 |
fs.close(); //释放资源 |
181 |
System.out.println( "删除成功....." ); |
182 |
|
183 |
} |
184 |
|
185 |
|
186 |
/*** |
187 |
* 上传本地文件到 |
188 |
* HDFS上 |
189 |
* |
190 |
* **/ |
191 |
public static void uploadFile() throws Exception{ |
192 |
//加载默认配置 |
193 |
FileSystem fs=FileSystem.get(conf); |
194 |
//本地文件 |
195 |
Path src = new Path( "D:\\6" ); |
196 |
//HDFS为止 |
197 |
Path dst = new Path( "hdfs://10.2.143.5:9090/root/" ); |
198 |
try { |
199 |
fs.copyFromLocalFile(src, dst); |
200 |
} catch (IOException e) { |
201 |
// TODO Auto-generated catch block |
202 |
e.printStackTrace(); |
203 |
} |
204 |
System.out.println( "上传成功........" ); |
205 |
|
206 |
fs.close(); //释放资源 |
207 |
|
208 |
|
209 |
} |
210 |
211 |
} |
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://h6:9000");
FileSystem fileSystem = FileSystem.get(conf);
1.创建文件夹:
判断是否存在
不存在再创建
if (!fileSystem.exists(new Path("/weir01"))) {
fileSystem.mkdirs(new Path("/weir01"));
}
2.创建文件:
in - InputStream to read from 原文件路径
out - OutputStream to write to hdfs 目录
the size of the buffer 缓冲大小
close - whether or not close the InputStream and OutputStream at the end. The streams are closed in the finally clause. 是否关闭流
FSDataOutputStream out =fileSystem.create(new Path("/d1"));
FileInputStream in = new FileInputStream("f:/hadoop.zip");
IOUtils.copyBytes(in, out, 1024, true);
3上传本地文件
delSrc - whether to delete the src是否删除源文件
overwrite - whether to overwrite an existing file是否覆盖已存在的文件
srcs - array of paths which are source 可以上传多个文件数组方式
dst – path 目标路径
fileSystem.copyFromLocalFile(src, dst);
fileSystem.copyFromLocalFile(delSrc, src, dst);
fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
fileSystem.copyFromLocalFile(delSrc, overwrite, srcs, dst);
4 重命名HDFS文件
fileSystem.rename(src, dst);
5.删除文件
True 表示递归删除
fileSystem.delete(new Path("/d1"), true);
6.查看目录及文件信息
FileStatus[] fs = fileSystem.listStatus(new Path("/"));
for (FileStatus f : fs) {
String dir = f.isDirectory() ? "目录":"文件";
String name = f.getPath().getName();
String path = f.getPath().toString();
System.out.println(dir+"----"+name+" path:"+path);
System.out.println(f.getAccessTime());
System.out.println(f.getBlockSize());
System.out.println(f.getGroup());
System.out.println(f.getLen());
System.out.println(f.getModificationTime());
System.out.println(f.getOwner());
System.out.println(f.getPermission());
System.out.println(f.getReplication());
System.out.println(f.getSymlink());
}
7.查找某个文件在HDFS集群的位置
FileStatus fs = fileSystem.getFileStatus(new Path("/data"));
BlockLocation[] bls=fileSystem.getFileBlockLocations(fs, 0, fs.getLen());
for (int i = 0,h=bls.length; i < h; i++) {
String[] hosts= bls[i].getHosts();
System.out.println("block_"+i+"_location: "+hosts[0]);
}
8.获取HDFS集群上所有节点名称信息
DistributedFileSystem hdfs = (DistributedFileSystem) fileSystem;
DatanodeInfo[] dns=hdfs.getDataNodeStats();
for (int i = 0,h=dns.length; i < h; i++) {
System.out.println("datanode_"+i+"_name: "+dns[i].getHostName());
}
http://www.xuehuile.com/blog/db693eb859b64c39b945d7ae333a1343.html
http://weir2009.iteye.com/blog/2082445
道友请留步 2025-03-26
道友请留步 2025-03-26
藏家708 2025-03-26
leoaim 2025-03-25
恭明惠 2025-03-26
藏家468 2025-03-26
藏家101 2025-03-25
藏家101 2025-03-25
藏家101 2025-03-25
藏家068 2025-03-25