Hadoop環境搭建詳見此文章 http://www.ythuaji.com.cn/article/140756.html 。
我們已經知道hadoop能夠通過hadoop jar ***.jar input output的形式通過命令行來調用,那麼如何將其封裝成一個服務,讓java/web來調用它?使得用戶可以用方便的方式上傳文件到hadoop並進行處理,獲得結果。首先,***.jar是一個hadoop任務類的封裝,我們可以在沒有jar的情況下運行該類的main方法,將必要的參數傳遞給它。input和output則將用戶上傳的文件使用hadoop的Java API put到hadoop的文件系統中。然後再通過hadoop的Java API從文件系統中取得結果文件。
搭建javaweb工程。本文使用spring、springmvc、mybatis框架,當然,這不是重點,就算沒有使用任何框架也能實現。
項目框架如下:
項目中使用到的jar包如下:
在spring的配置文件中,加入
1
2
3
4
5
|
<!-- Enables multipart (file upload) support. DispatcherServlet looks this bean up by the
     exact id "multipartResolver"; the id, class name and property names are case-sensitive. -->
<bean id="multipartResolver"
      class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
    <property name="defaultEncoding" value="utf-8" />
    <!-- Upper bound for a whole upload request, in bytes. -->
    <property name="maxUploadSize" value="10485760000" />
    <!-- Uploads larger than this many bytes are spooled to disk instead of held in memory. -->
    <property name="maxInMemorySize" value="40960" />
</bean>
使得項目支持文件上傳。
新建一個login.jsp 點擊登錄后進入user/login
user/login中處理登錄,登錄成功後,【在hadoop文件系統中創建用戶文件夾】,然後跳轉到console.jsp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
package com.chenjie.controller;

import java.io.IOException;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import com.chenjie.pojo.JsonResult;
import com.chenjie.pojo.User;
import com.chenjie.service.UserService;
import com.chenjie.util.AppConfig;
import com.google.gson.Gson;

/**
 * Controller for user requests, mapped under {@code /user}.
 *
 * <p>NOTE(review): the casing of project-local names used here ({@code JsonResult},
 * {@code AppConfig}, {@code UserService}, {@code User#getU_username()}) was reconstructed
 * from an all-lowercase listing; confirm against the real project sources.
 *
 * @author chen
 */
@Controller
@RequestMapping("/user")
public class UserController {

    /** Hadoop client configuration file that is added first to the configuration. */
    private static final String CORE_SITE = "/opt/hadoop-1.2.1/conf/core-site.xml";

    /** Hadoop client configuration file that is added second to the configuration. */
    private static final String HDFS_SITE = "/opt/hadoop-1.2.1/conf/hdfs-site.xml";

    /** Injected by the Spring container. NOTE(review): confirm the bean name's casing. */
    @Resource(name = "userservice")
    private UserService userService;

    /**
     * Logs a user in and, on success, prepares the user's HDFS working folder.
     *
     * @param user     credentials bound from the request
     * @param request  current request; its session receives the login result under "user"
     * @param response current response (not written to; the return value is a view name)
     * @return the view name {@code "console"} on success, {@code "login"} otherwise
     * @throws IOException if the HDFS folder cannot be prepared
     */
    @RequestMapping("/login")
    public String login(User user, HttpServletRequest request, HttpServletResponse response)
            throws IOException {
        User result = userService.login(user);
        // Stored even when null, so a failed login also clears any earlier session user.
        request.getSession().setAttribute("user", result);
        if (result == null) {
            return "login";
        }
        createHadoopFSFolder(result);
        return "console";
    }

    /**
     * Recreates the HDFS folder {@code /user/<username>} for the given user.
     *
     * <p>An existing folder is deleted recursively first, so everything the user stored
     * there is lost on each call.
     *
     * @param user the logged-in user whose folder is reset
     * @throws IOException if the file system cannot be reached or modified
     */
    public void createHadoopFSFolder(User user) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path(CORE_SITE));
        conf.addResource(new Path(HDFS_SITE));

        FileSystem fileSystem = FileSystem.get(conf);
        System.out.println(fileSystem.getUri());

        Path userFolder = new Path("/user/" + user.getU_username());
        if (fileSystem.exists(userFolder)) {
            System.out.println("haddop hdfs user foler exists.");
            fileSystem.delete(userFolder, true);
            System.out.println("haddop hdfs user foler delete success.");
        }
        fileSystem.mkdirs(userFolder);
        System.out.println("haddop hdfs user foler creat success.");
    }
}
console.jsp中進行文件上傳和任務提交。
文件上傳和任務提交:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
|
package com.chenjie.controller;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
import org.springframework.web.multipart.commons.CommonsMultipartResolver;

import com.chenjie.pojo.User;
import com.chenjie.util.Utils;

/**
 * Controller for file upload and MapReduce job submission, mapped under {@code /hadoop}.
 *
 * <p>NOTE(review): the casing of project-local names ({@code WordCount},
 * {@code Utils.JOBTRACKER}, {@code User#getU_username()}) was reconstructed from an
 * all-lowercase listing; confirm against the real project sources.
 */
@Controller
@RequestMapping("/hadoop")
public class HadoopController {

    /** Hadoop client configuration file that is added first to the configuration. */
    private static final String CORE_SITE = "/opt/hadoop-1.2.1/conf/core-site.xml";

    /** Hadoop client configuration file that is added second to the configuration. */
    private static final String HDFS_SITE = "/opt/hadoop-1.2.1/conf/hdfs-site.xml";

    /** Server-local staging directory; one sub-folder per user name. */
    private static final String LOCAL_UPLOAD_ROOT = "/home/chenjie/cjhadooponline/";

    /** Fully qualified prefix of the per-user HDFS folders. */
    private static final String HDFS_USER_ROOT = "hdfs://chenjie-virtual-machine:9000/user/";

    /**
     * Stores every uploaded file in the user's local staging folder, then pushes the
     * staged files to HDFS.
     *
     * @param request  current request; expected to be a multipart request
     * @param response current response (unused)
     * @return {@code "login"} when no user is in the session, otherwise {@code "console"}
     * @throws IOException if an uploaded file cannot be written locally
     */
    @RequestMapping("/upload")
    public String upload(HttpServletRequest request, HttpServletResponse response)
            throws IOException {
        User user = (User) request.getSession().getAttribute("user");
        if (user == null) {
            return "login";
        }

        @SuppressWarnings("unchecked") // the attribute is only ever written by this method
        List<String> fileList = (List<String>) request.getSession().getAttribute("filelist");
        if (fileList == null) {
            fileList = new ArrayList<String>();
        }

        // Checking the type (instead of re-parsing headers) makes the cast below safe even
        // when no multipart resolver wrapped the request.
        if (request instanceof MultipartHttpServletRequest) {
            MultipartHttpServletRequest multiRequest = (MultipartHttpServletRequest) request;
            File folder = new File(LOCAL_UPLOAD_ROOT + user.getU_username());

            Iterator<String> fieldNames = multiRequest.getFileNames();
            while (fieldNames.hasNext()) {
                MultipartFile file = multiRequest.getFile(fieldNames.next());
                if (file == null) {
                    continue;
                }
                String fileName = safeFileName(file.getOriginalFilename());
                if (fileName == null) {
                    continue; // empty file input or an unusable name
                }
                if (!folder.isDirectory() && !folder.mkdirs()) {
                    throw new IOException("cannot create upload folder " + folder);
                }
                file.transferTo(new File(folder, fileName));
            }
            // One pass is enough: it picks up everything currently staged for the user.
            handleUploadFiles(user, fileList);
        }

        request.getSession().setAttribute("filelist", fileList);
        return "console";
    }

    /**
     * Runs the word-count job on the user's HDFS input folder and writes the result file
     * back to the response, line by line.
     *
     * @param request  current request; the session must hold the logged-in user
     * @param response receives the result lines, or an error status on failure
     */
    @SuppressWarnings("deprecation") // readLine(): kept so the emitted bytes stay unchanged
    @RequestMapping("/wordcount")
    public void wordcount(HttpServletRequest request, HttpServletResponse response) {
        System.out.println("進入controller wordcount ");
        User user = (User) request.getSession().getAttribute("user");
        System.out.println(user);
        if (user == null) {
            sendErrorQuietly(response, HttpServletResponse.SC_UNAUTHORIZED, "not logged in");
            return;
        }

        String userName = user.getU_username();
        String input = HDFS_USER_ROOT + userName + "/wordcountinput";
        String output = HDFS_USER_ROOT + userName + "/wordcountoutput";
        String resultFile = output + "/part-r-00000"; // default name of the reducer output

        FSDataInputStream inStream = null;
        try {
            // NOTE(review): purpose of this pause is not visible here; kept as in the original.
            Thread.sleep(3 * 1000);
            WordCount.main(new String[] { input, output });

            FileSystem fileSystem = FileSystem.get(newHadoopConf());
            Path file = new Path(resultFile);
            inStream = fileSystem.open(file);
            URI uri = file.toUri();
            System.out.println(uri);

            String data;
            while ((data = inStream.readLine()) != null) {
                response.getOutputStream().println(data);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the container
            System.err.println(e.getMessage());
            sendErrorQuietly(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                    "word count interrupted");
        } catch (Exception e) {
            System.err.println(e.getMessage());
            sendErrorQuietly(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                    "word count failed");
        } finally {
            if (inStream != null) {
                try {
                    inStream.close();
                } catch (IOException e) {
                    System.err.println(e.getMessage());
                }
            }
        }
    }

    /**
     * Reads the map and reduce progress of the first job reported by the job tracker and
     * stores both values (0..1) in the session under "map" and "reduce".
     *
     * <p>Both values are 0 when no job is known or the job tracker cannot be reached.
     * Besides progress, {@code RunningJob} also exposes {@code getJobState()},
     * {@code getJobName()}, {@code getFailureInfo()} and {@code getCounters()}, and
     * {@code JobStatus} exposes {@code getUsername()} and {@code getStartTime()}.
     *
     * @param request  current request; its session receives the progress values
     * @param response current response (unused)
     */
    @RequestMapping("/mapreducestates")
    public void mapreduce(HttpServletRequest request, HttpServletResponse response) {
        float[] progress = new float[2];
        JobClient jobClient = null;
        try {
            Configuration conf = new Configuration();
            // NOTE(review): constant name casing reconstructed; confirm against Utils.
            conf.set("mapred.job.tracker", Utils.JOBTRACKER);
            jobClient = new JobClient(new JobConf(conf));

            JobStatus[] allJobs = jobClient.getAllJobs();
            if (allJobs != null && allJobs.length > 0) {
                JobStatus jobStatus = allJobs[0];
                JobID jobId = jobStatus.getJobID();
                RunningJob runningJob = jobClient.getJob(jobId);
                if (runningJob != null) {
                    progress[0] = runningJob.mapProgress();
                    System.out.println("map=" + progress[0]);
                    progress[1] = runningJob.reduceProgress();
                    System.out.println("reduce=" + progress[1]);
                }
            }
        } catch (IOException e) {
            progress[0] = 0;
            progress[1] = 0;
        } finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                } catch (IOException e) {
                    System.err.println(e.getMessage());
                }
            }
        }
        request.getSession().setAttribute("map", progress[0]);
        request.getSession().setAttribute("reduce", progress[1]);
    }

    /**
     * Pushes every entry of the user's local staging folder to HDFS.
     *
     * <p>A failure on one file is reported on stderr and does not stop the others.
     *
     * @param user     owner of the staging folder
     * @param fileList receives the HDFS URI of each file that was copied
     */
    public void handleUploadFiles(User user, List<String> fileList) {
        File folder = new File(LOCAL_UPLOAD_ROOT + user.getU_username());
        File[] staged = folder.listFiles(); // null when missing, not a directory, or unreadable
        if (staged == null) {
            return;
        }
        for (File file : staged) {
            System.out.println(file.getName());
            try {
                putFileToHadoopFSFolder(user, file, fileList);
            } catch (IOException e) {
                System.err.println(e.getMessage());
            }
        }
    }

    /**
     * Moves one local file into the user's {@code wordcountinput} folder on HDFS,
     * overwriting a file of the same name and deleting the local source.
     *
     * @param user     owner of the target folder
     * @param file     local file to move
     * @param fileList receives the HDFS URI of the copied file
     * @throws IOException if the file system cannot be reached or the copy fails
     */
    private void putFileToHadoopFSFolder(User user, File file, List<String> fileList)
            throws IOException {
        FileSystem fileSystem = FileSystem.get(newHadoopConf());
        System.out.println(fileSystem.getUri());

        Path inputFolder = new Path("/user/" + user.getU_username() + "/wordcountinput");
        if (!fileSystem.exists(inputFolder)) {
            fileSystem.mkdirs(inputFolder);
        }

        Path localFile = new Path(file.getAbsolutePath());
        Path hadoopFile = new Path("/user/" + user.getU_username() + "/wordcountinput/"
                + file.getName());
        // delSrc = true, overwrite = true
        fileSystem.copyFromLocalFile(true, true, localFile, hadoopFile);
        fileList.add(hadoopFile.toUri().toString());
    }

    /** Builds a Hadoop configuration from the two configured resource files. */
    private static Configuration newHadoopConf() {
        Configuration conf = new Configuration();
        conf.addResource(new Path(CORE_SITE));
        conf.addResource(new Path(HDFS_SITE));
        return conf;
    }

    /**
     * Reduces a client-supplied file name to its last path segment so it cannot escape the
     * staging folder; returns {@code null} when nothing usable remains.
     */
    private static String safeFileName(String original) {
        if (original == null) {
            return null;
        }
        int cut = Math.max(original.lastIndexOf('/'), original.lastIndexOf('\\'));
        String name = original.substring(cut + 1).trim();
        if (name.isEmpty() || ".".equals(name) || "..".equals(name)) {
            return null;
        }
        return name;
    }

    /** Sends an error status unless the response is already committed; never throws. */
    private static void sendErrorQuietly(HttpServletResponse response, int status,
            String message) {
        if (response.isCommitted()) {
            return;
        }
        try {
            response.sendError(status, message);
        } catch (IOException e) {
            System.err.println(e.getMessage());
        }
    }
}
啟動hadoop:
運行結果:
可以在任意平臺下,登錄該項目地址,上傳文件,得到結果。
運行成功。
源代碼:https://github.com/tudoupaisimalingshu/cjhadooponline
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://blog.csdn.net/csj941227/article/details/71786040