基本操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.junit.Test; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; @RunWith (JUnit4. class ) @DisplayName ( "Test using junit4" ) public class HadoopClientTest { private FileSystem fileSystem = null ; @BeforeEach public void init() throws URISyntaxException, IOException, InterruptedException { Configuration configuration = new Configuration(); configuration.set( "dfs.replication" , "1" ); configuration.set( "dfs.blocksize" , "64m" ); fileSystem = FileSystem.get( new URI( "hdfs://hd-even-01:9000" ), configuration, "root" ); } /** * 從本地復制文件到Hadoop * * @throws URISyntaxException * @throws IOException * @throws InterruptedException */ @Test public void copyFileFromLocal() throws URISyntaxException, IOException, InterruptedException { // 上傳文件 fileSystem.copyFromLocalFile( new Path( "C:\\Users\\Administrator\\Desktop\\win10激活.txt" ), new Path( "/even1" )); // 關閉流,報錯winUtils,因為使用了linux的tar包,如果windows要使用,則需要編譯好這個winUtils包才能使用 fileSystem.close(); } /** * 從Hadoop下載文件到本地,下載需要配置Hadoop環境,并添加winutils到bin目錄 * * @throws URISyntaxException * @throws IOException * @throws InterruptedException */ @Test public void copyFileToLocal() throws URISyntaxException, IOException, InterruptedException { // 下載文件 fileSystem.copyToLocalFile( new Path( "/win10激活.txt" ), new Path( "E:/" )); // 關閉流,報錯winUtils,因為使用了linux的tar包,如果windows要使用,則需要編譯好這個winUtils包才能使用 fileSystem.close(); } /** * 創建文件夾 * * @throws IOException */ @Test public void hdfsMkdir() throws IOException { // 調用創建文件夾方法 fileSystem.mkdirs( new Path( "/even1" )); // 關閉方法 fileSystem.close(); } /** * 移動文件/修改文件名 */ public void hdfsRename() throws IOException { fileSystem.rename( new Path( "" ), new Path( "" )); fileSystem.close(); } /** * 刪除文件/文件夾 * * 
@throws IOException */ @Test public void hdfsRm() throws IOException { // fileSystem.delete(new Path("")); // 第二個參數表示遞歸刪除 fileSystem.delete( new Path( "" ), true ); fileSystem.close(); } /** * 查看hdfs指定目錄的信息 * * @throws IOException */ @Test public void hdfsLs() throws IOException { // 調用方法返回遠程迭代器,第二個參數是把目錄文件夾內的文件也列出來 RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles( new Path( "/" ), true ); while (listFiles.hasNext()) { LocatedFileStatus locatedFileStatus = listFiles.next(); System.out.println( "文件路徑:" + locatedFileStatus.getPath()); System.out.println( "塊大小:" + locatedFileStatus.getBlockSize()); System.out.println( "文件長度:" + locatedFileStatus.getLen()); System.out.println( "副本數量:" + locatedFileStatus.getReplication()); System.out.println( "塊信息:" + Arrays.toString(locatedFileStatus.getBlockLocations())); } fileSystem.close(); } /** * 判斷是文件還是文件夾 */ @Test public void findHdfs() throws IOException { // 1,展示狀態信息 FileStatus[] listStatus = fileSystem.listStatus( new Path( "/" )); // 2,遍歷所有文件 for (FileStatus fileStatus : listStatus) { if (fileStatus.isFile()) System.out.println( "是文件:" + fileStatus.getPath().getName()); else if (fileStatus.isDirectory()) System.out.println( "是文件夾:" + fileStatus.getPath().getName()); } fileSystem.close(); } } |
文件讀寫
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.junit.Before; import org.junit.Test; import org.junit.jupiter.api.DisplayName; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.*; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.Arrays; @RunWith (JUnit4. class ) @DisplayName ( "this is read write test!" ) public class HadoopReadWriteTest { FileSystem fileSystem = null ; Configuration configuration = null ; @Before public void init() throws URISyntaxException, IOException, InterruptedException { // 1,加載配置 configuration = new Configuration(); // 2,構建客戶端 fileSystem = FileSystem.get( new URI( "hdfs://hd-even-01:9000/" ), configuration, "root" ); } @Test public void testReadData() throws IOException { // 1,獲取hdfs文件流 FSDataInputStream open = fileSystem.open( new Path( "/win10激活.txt" )); // 2,設置一次獲取的大小 byte [] bytes = new byte [ 1024 ]; // 3,讀取數據 while (open.read(bytes) != - 1 ) System.out.println(Arrays.toString(bytes)); open.close(); fileSystem.close(); } /** * 使用緩存流 * * @throws IOException */ @Test public void testReadData1() throws IOException { FSDataInputStream open = fileSystem.open( new Path( "/win10激活.txt" )); // 使用緩沖流會快點 BufferedReader bufferedReader = new BufferedReader( new InputStreamReader(open, StandardCharsets.UTF_8)); String line = "" ; while ((line = bufferedReader.readLine()) != null ) { System.out.println(line); } bufferedReader.close(); open.close(); fileSystem.close(); } /** * 指定偏移量來實現只讀部分內容 */ @Test public void readSomeData() throws IOException { FSDataInputStream open = fileSystem.open( new Path( "/win10激活.txt" )); // 指定開始的index open.seek( 14 ); // 指定讀的多少 byte [] bytes = new byte [ 5 ]; while (open.read(bytes) != - 1 ) System.out.println( new 
String(bytes)); open.close(); fileSystem.close(); } /** * 流方式寫數據 * @throws IOException */ @Test public void writeData() throws IOException { // 1,獲取輸出流 FSDataOutputStream out = fileSystem.create( new Path( "/win11.txt" ), false ); // 2,獲取需要寫的文件輸入流 FileInputStream in = new FileInputStream( new File( "C:\\Users\\Administrator\\Desktop\\xixi.txt" )); byte [] b = new byte [ 1024 ]; int read = 0 ; while ((read = in.read(b)) != - 1 ) { out.write(b, 0 , read); } in.close(); out.close(); fileSystem.close(); } /** * 直接寫字符串 */ @Test public void writeData1() throws IOException { // 1,創建輸出流 FSDataOutputStream out = fileSystem.create( new Path( "/aibaobao.txt" ), false ); // 2,寫數據 out.write( "wochaoaibaobao" .getBytes()); // 3,關閉流 IOUtils.closeStream(out); fileSystem.close(); } /** * IOUtils方式上傳 * * @throws IOException */ @Test public void putToHdfs() throws IOException { // 1,獲取輸入流 FileInputStream in = new FileInputStream( new File( "C:\\Users\\Administrator\\Desktop\\xixi.txt" )); // 2,獲取輸出流 FSDataOutputStream out = fileSystem.create( new Path( "/haddopPut.txt" ), false ); // 3,拷貝 IOUtils.copyBytes(in, out, configuration); // 4,關閉流 IOUtils.closeStream(in); IOUtils.closeStream(out); fileSystem.close(); } /** * IOUtils方式下載 * @throws IOException */ @Test public void getFromHdfs() throws IOException { // 1,獲取輸入流 FSDataInputStream open = fileSystem.open( new Path( "/haddopPut.txt" )); // 2,獲取輸出流 FileOutputStream out = new FileOutputStream( new File( "C:\\Users\\Administrator\\Desktop\\haddopPut.txt" )); // 3,拷貝 IOUtils.copyBytes(open, out, configuration); // 4,關閉流 IOUtils.closeStream(open); IOUtils.closeStream(out); fileSystem.close(); } } |
到此這篇關於 Java 實現對 Hadoop 的操作的文章就介紹到這了。更多相關 Java Hadoop 內容,請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持服務器之家!
原文鏈接:https://blog.csdn.net/weixin_37581297/article/details/84349916