一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼

2021-02-22 11:14土豆拍死馬鈴薯 Java教程

本篇文章主要介紹了Java/Web調用Hadoop進行MapReduce示例代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

hadoop環(huán)境搭建詳見此文章 http://www.ythuaji.com.cn/article/140756.html 。

我們已經知道hadoop能夠通過hadoop jar ***.jar input output的形式通過命令行來調用,那么如何將其封裝成一個服務,讓java/web來調用它?使得用戶可以用方便的方式上傳文件到hadoop并進行處理,獲得結果。首先,***.jar是一個hadoop任務類的封裝,我們可以在沒有jar的情況下運行該類的main方法,將必要的參數(shù)傳遞給它。input 和output則將用戶上傳的文件使用hadoop的javaapi put到hadoop的文件系統(tǒng)中。然后再通過hadoop的javaapi 從文件系統(tǒng)中取得結果文件。

搭建javaweb工程。本文使用spring、springmvc、mybatis框架, 當然,這不是重點,就算沒有使用任何框架也能實現(xiàn)。

項目框架如下:

Java/Web調用Hadoop進行MapReduce示例代碼

項目中使用到的jar包如下:

Java/Web調用Hadoop進行MapReduce示例代碼Java/Web調用Hadoop進行MapReduce示例代碼

在spring的配置文件中,加入

?
1
2
3
4
5
<bean id="multipartresolver" class="org.springframework.web.multipart.commons.commonsmultipartresolver">
   <property name="defaultencoding" value="utf-8" />
   <property name="maxuploadsize" value="10485760000" />
   <property name="maxinmemorysize" value="40960" />
</bean>

使得項目支持文件上傳。

新建一個login.jsp 點擊登錄后進入user/login

Java/Web調用Hadoop進行MapReduce示例代碼

user/login中處理登錄,登錄成功后,【在hadoop文件系統(tǒng)中創(chuàng)建用戶文件夾】,然后跳轉到console.jsp

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.chenjie.controller;
 
import java.io.ioexception;
  
import javax.annotation.resource;
 
import javax.servlet.http.httpservletrequest;
 
import javax.servlet.http.httpservletresponse;
 
import org.apache.hadoop.conf.configuration;
 
import org.apache.hadoop.fs.filesystem;
 
import org.apache.hadoop.fs.path;
 
import org.springframework.stereotype.controller;
 
import org.springframework.web.bind.annotation.requestmapping;
 
import com.chenjie.pojo.jsonresult;
 
import com.chenjie.pojo.user;
 
import com.chenjie.service.userservice;
 
import com.chenjie.util.appconfig;
 
import com.google.gson.gson;
/**
 
 * 用戶請求控制器
 
 *
 
 * @author chen
 
 *
 
 */
 
@controller
 
// 聲明當前類為控制器
 
@requestmapping("/user")
 
// 聲明當前類的路徑
 
public class usercontroller {
 
  @resource(name = "userservice")
 
  private userservice userservice;// 由spring容器注入一個userservice實例
  /**
 
   * 登錄
 
   *
 
   * @param user
 
   *      用戶
 
   * @param request
 
   * @param response
 
   * @throws ioexception
 
   */
 
  @requestmapping("/login")
 
  // 聲明當前方法的路徑
 
  public string login(user user, httpservletrequest request,
 
      httpservletresponse response) throws ioexception {
 
    response.setcontenttype("application/json");// 設置響應內容格式為json
 
    user result = userservice.login(user);// 調用userservice的登錄方法
 
    request.getsession().setattribute("user", result);
 
    if (result != null) {
 
      createhadoopfsfolder(result);
 
      return "console";
 
    }
 
    return "login";
 
  }
 
  public void createhadoopfsfolder(user user) throws ioexception {
 
    configuration conf = new configuration();
 
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));
 
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));
 
 
 
    filesystem filesystem = filesystem.get(conf);
 
    system.out.println(filesystem.geturi());
 
 
 
    path file = new path("/user/" + user.getu_username());
 
    if (filesystem.exists(file)) {
 
      system.out.println("haddop hdfs user foler exists.");
 
      filesystem.delete(file, true);
 
      system.out.println("haddop hdfs user foler delete success.");
 
    }
 
    filesystem.mkdirs(file);
 
    system.out.println("haddop hdfs user foler creat success.");
 
  }
}

console.jsp中進行文件上傳和任務提交、

Java/Web調用Hadoop進行MapReduce示例代碼

文件上傳和任務提交:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package com.chenjie.controller;
 
import java.io.file;
import java.io.ioexception;
import java.net.inetsocketaddress;
import java.net.uri;
import java.util.arraylist;
import java.util.iterator;
import java.util.list;
 
import javax.servlet.http.httpservletrequest;
import javax.servlet.http.httpservletresponse;
 
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.fsdatainputstream;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.mapred.jobclient;
import org.apache.hadoop.mapred.jobconf;
import org.apache.hadoop.mapred.jobid;
import org.apache.hadoop.mapred.jobstatus;
import org.apache.hadoop.mapred.runningjob;
import org.springframework.stereotype.controller;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.multipart.multipartfile;
import org.springframework.web.multipart.multiparthttpservletrequest;
import org.springframework.web.multipart.commons.commonsmultipartresolver;
 
import com.chenjie.pojo.user;
import com.chenjie.util.utils;
 
@controller
// 聲明當前類為控制器
@requestmapping("/hadoop")
// 聲明當前類的路徑
public class hadoopcontroller {
 
  @requestmapping("/upload")
  // 聲明當前方法的路徑
  //文件上傳
  public string upload(httpservletrequest request,
      httpservletresponse response) throws ioexception {
    list<string> filelist = (list<string>) request.getsession()
        .getattribute("filelist");//得到用戶已上傳文件列表
    if (filelist == null)
      filelist = new arraylist<string>();//如果文件列表為空,則新建
    user user = (user) request.getsession().getattribute("user");
    if (user == null)
      return "login";//如果用戶未登錄,則跳轉登錄頁面
    commonsmultipartresolver multipartresolver = new commonsmultipartresolver(
        request.getsession().getservletcontext());//得到在spring配置文件中注入的文件上傳組件
    if (multipartresolver.ismultipart(request)) {//如果請求是文件請求
      multiparthttpservletrequest multirequest = (multiparthttpservletrequest) request;
 
      iterator<string> iter = multirequest.getfilenames();//得到文件名迭代器
      while (iter.hasnext()) {
        multipartfile file = multirequest.getfile((string) iter.next());
        if (file != null) {
          string filename = file.getoriginalfilename();
          file folder = new file("/home/chenjie/cjhadooponline/"
              + user.getu_username());
          if (!folder.exists()) {
            folder.mkdir();//如果文件不目錄存在,則在服務器本地創(chuàng)建
          }
          string path = "/home/chenjie/cjhadooponline/"
              + user.getu_username() + "/" + filename;
 
          file localfile = new file(path);
 
          file.transferto(localfile);//將上傳文件拷貝到服務器本地目錄
          // filelist.add(path);
        }
        handleuploadfiles(user, filelist);//處理上傳文件
      }
 
    }
    request.getsession().setattribute("filelist", filelist);//將上傳文件列表保存在session中
    return "console";//返回console.jsp繼續(xù)上傳文件
  }
 
  @requestmapping("/wordcount")
  //調用hadoop進行mapreduce
  public void wordcount(httpservletrequest request,
      httpservletresponse response) {
    system.out.println("進入controller wordcount ");
    user user = (user) request.getsession().getattribute("user");
    system.out.println(user);
    // if(user == null)
    // return "login";
    wordcount c = new wordcount();//新建單詞統(tǒng)計任務
    string username = user.getu_username();
    string input = "hdfs://chenjie-virtual-machine:9000/user/" + username
        + "/wordcountinput";//指定hadoop文件系統(tǒng)的輸入文件夾
    string output = "hdfs://chenjie-virtual-machine:9000/user/" + username
        + "/wordcountoutput";//指定hadoop文件系統(tǒng)的輸出文件夾
    string reslt = output + "/part-r-00000";//默認輸出文件
    try {
      thread.sleep(3*1000);
      c.main(new string[] { input, output });//調用單詞統(tǒng)計任務
      configuration conf = new configuration();//新建hadoop配置
      conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));//添加hadoop配置,找到hadoop部署信息
      conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));//hadoop配置,找到文件系統(tǒng)
 
      filesystem filesystem = filesystem.get(conf);//得打文件系統(tǒng)
      path file = new path(reslt);//找到輸出結果文件
      fsdatainputstream instream = filesystem.open(file);//打開
      uri uri = file.touri();//得到輸出文件路徑
      system.out.println(uri);
      string data = null;
      while ((data = instream.readline()) != null) {
        //system.out.println(data);
        response.getoutputstream().println(data);//講結果文件寫回用戶網頁
      }
//     inputstream in = filesystem.open(file);
//     outputstream out = new fileoutputstream("result.txt");
//     ioutils.copybytes(in, out, 4096, true);
      instream.close();
    } catch (exception e) {
      system.err.println(e.getmessage());
    }
  }
 
  @requestmapping("/mapreducestates")
  //得到mapreduce的狀態(tài)
  public void mapreduce(httpservletrequest request,
      httpservletresponse response) {
    float[] progress=new float[2];
    try {
      configuration conf1=new configuration();
      conf1.set("mapred.job.tracker", utils.jobtracker);
       
      jobstatus jobstatus = utils.getjobstatus(conf1);
//     while(!jobstatus.isjobcomplete()){
//       progress = utils.getmapreduceprogess(jobstatus);
//       response.getoutputstream().println("map:" + progress[0] + "reduce:" + progress[1]);
//       thread.sleep(1000);
//     }
      jobconf jc = new jobconf(conf1);
       
      jobclient jobclient = new jobclient(jc);
      jobstatus[] jobsstatus = jobclient.getalljobs(); 
      //這樣就得到了一個jobstatus數(shù)組,隨便取出一個元素取名叫jobstatus 
      jobstatus = jobsstatus[0]; 
      jobid jobid = jobstatus.getjobid(); //通過jobstatus獲取jobid 
      runningjob runningjob = jobclient.getjob(jobid); //通過jobid得到runningjob對象 
      runningjob.getjobstate();//可以獲取作業(yè)狀態(tài),狀態(tài)有五種,為jobstatus.failed 、jobstatus.killed、jobstatus.prep、jobstatus.running、jobstatus.succeeded 
      jobstatus.getusername();//可以獲取運行作業(yè)的用戶名。 
      runningjob.getjobname();//可以獲取作業(yè)名。 
      jobstatus.getstarttime();//可以獲取作業(yè)的開始時間,為utc毫秒數(shù)。 
      float map = runningjob.mapprogress();//可以獲取map階段完成的比例,0~1, 
      system.out.println("map=" + map);
      float reduce = runningjob.reduceprogress();//可以獲取reduce階段完成的比例。
      system.out.println("reduce="+reduce);
      runningjob.getfailureinfo();//可以獲取失敗信息。 
      runningjob.getcounters();//可以獲取作業(yè)相關的計數(shù)器,計數(shù)器的內容和作業(yè)監(jiān)控頁面上看到的計數(shù)器的值一樣。 
       
       
    } catch (ioexception e) {
      progress[0] = 0;
      progress[1] = 0;
    }
   
    request.getsession().setattribute("map", progress[0]);
    request.getsession().setattribute("reduce", progress[1]);
  }
   
  //處理文件上傳
  public void handleuploadfiles(user user, list<string> filelist) {
    file folder = new file("/home/chenjie/cjhadooponline/"
        + user.getu_username());
    if (!folder.exists())
      return;
    if (folder.isdirectory()) {
      file[] files = folder.listfiles();
      for (file file : files) {
        system.out.println(file.getname());
        try {
          putfiletohadoopfsfolder(user, file, filelist);//將單個文件上傳到hadoop文件系統(tǒng)
        } catch (ioexception e) {
          system.err.println(e.getmessage());
        }
      }
    }
  }
 
  //將單個文件上傳到hadoop文件系統(tǒng)
  private void putfiletohadoopfsfolder(user user, file file,
      list<string> filelist) throws ioexception {
    configuration conf = new configuration();
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));
 
    filesystem filesystem = filesystem.get(conf);
    system.out.println(filesystem.geturi());
 
    path localfile = new path(file.getabsolutepath());
    path foler = new path("/user/" + user.getu_username()
        + "/wordcountinput");
    if (!filesystem.exists(foler)) {
      filesystem.mkdirs(foler);
    }
     
    path hadoopfile = new path("/user/" + user.getu_username()
        + "/wordcountinput/" + file.getname());
//   if (filesystem.exists(hadoopfile)) {
//     system.out.println("file exists.");
//   } else {
//     filesystem.mkdirs(hadoopfile);
//   }
    filesystem.copyfromlocalfile(true, true, localfile, hadoopfile);
    filelist.add(hadoopfile.touri().tostring());
 
  }
 
}

啟動hadoop:

Java/Web調用Hadoop進行MapReduce示例代碼

運行結果:

可以在任意平臺下,登錄該項目地址,上傳文件,得到結果。

Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼


Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼

運行成功。

源代碼:https://github.com/tudoupaisimalingshu/cjhadooponline

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://blog.csdn.net/csj941227/article/details/71786040

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 91香蕉小视频 | 国产aⅴ一区二区三区 | 色偷偷亚洲综合网亚洲 | 日日草视频| 我的青梅竹马是消防员2季未增删免费 | 精品久久亚洲 | 国产普通话对白露脸流出 | 国产情侣啪啪 | 99精品久久精品一区二区 | 特级老女人淫片高清视频 | 欧美日韩在线观看精品 | 国产欧美在线播放 | free性泰国女人hd | 视频一区在线观看 | 国产精品久久久久aaaa | 色狠狠色狠狠综合天天 | 欧美精品黑人巨大在线播放 | 亚洲第一免费播放区 | 欧美另类videos另类粗暴 | xxxx俄罗斯大白屁股 | 瘦老汉gay| 国产一级在线免费观看 | 精品国产福利一区二区在线 | 国产精品视频人人做人人爱 | 精品久久久久免费极品大片 | 日韩一级片在线观看 | 三级理论在线观看 | 亚洲va久久久久综合 | 99久视频| 午夜福利合集1000在线 | 美女的隐私无遮挡撒尿 | 日韩一区二区三区免费 | 免费看隐私男生网站 | 亚洲久草视频 | 亚洲精品九色在线网站 | 翁熄性放纵交换300章 | 亚洲人成影院午夜网站 | 北条麻妃黑人 | 日本福利片国产午夜久久 | 日产国产精品亚洲系列 | 免费午夜影片在线观看影院 |