Java API實現向Hive批量導入數據
Java程序中產生的數據,如果導入oracle或者mysql庫,可以通過jdbc連接insert批量操作完成,但是當前版本的hive并不支持批量insert操作,因為需要先將結果數據寫入hdfs文件,然后插入Hive表中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package com.enn.idcard; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; /** * <p>Description: </p> * @author kangkaia * @date 2017年12月26日 下午1:42:24 */ public class HiveJdbc { public static void main(String[] args) throws IOException { List<List> argList = new ArrayList<List>(); List<String> arg = new ArrayList<String>(); arg.add( "12345" ); arg.add( "m" ); argList.add(arg); arg = new ArrayList<String>(); arg.add( "54321" ); arg.add( "f" ); argList.add(arg); // System.out.println(argList.toString()); String dst = "/test/kk.txt" ; createFile(dst,argList); loadData2Hive(dst); } /** * 將數據插入hdfs中,用于load到hive表中,默認分隔符是"\001" * @param dst * @param contents * @throws IOException */ public static void createFile(String dst , List<List> argList) throws IOException{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path dstPath = new Path(dst); //目標路徑 //打開一個輸出流 FSDataOutputStream outputStream = fs.create(dstPath); StringBuffer sb = new StringBuffer(); for (List<String> arg:argList){ for (String value:arg){ sb.append(value).append( "\001" ); } sb.deleteCharAt(sb.length() - 4 ); //去掉最后一個分隔符 sb.append( "\n" ); } sb.deleteCharAt(sb.length() - 2 ); //去掉最后一個換行符 byte [] contents = sb.toString().getBytes(); outputStream.write(contents); outputStream.close(); fs.close(); System.out.println( "文件創建成功!" ); } /** * 將HDFS文件load到hive表中 * @param dst */ public static void loadData2Hive(String dst) { String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver" ; String CONNECTION_URL = "jdbc:hive2://server-13:10000/default;auth=noSasl" ; String username = "admin" ; String password = "admin" ; Connection con = null ; try { Class.forName(JDBC_DRIVER); con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password); Statement stmt = con.createStatement(); String sql = " load data inpath '" +dst+ "' into table population.population_information " ; stmt.execute(sql); System.out.println( "loadData到Hive表成功!" ); } catch (SQLException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } finally { // 關閉rs、ps和con if (con != null ){ try { con.close(); } catch (SQLException e) { e.printStackTrace(); } } } } } |
注意:
本例使用mvn搭建,conf配置文件放在src/main/resources目錄下。
Hive提供的默認文件存儲格式有textfile、sequencefile、rcfile等。用戶也可以通過實現接口來自定義輸入輸的文件格式。
在實際應用中,textfile由于無壓縮,磁盤及解析的開銷都很大,一般很少使用。Sequencefile以鍵值對的形式存儲的二進制的格式,其支持針對記錄級別和塊級別的壓縮。rcfile是一種行列結合的存儲方式(text file和sequencefile都是行表[row table]),其保證同一條記錄在同一個hdfs塊中,塊以列式存儲。一般而言,對于OLTP而言,行表優勢大于列表,對于OLAP而言,列表的優勢大于行表,特別容易想到當做聚合操作時,列表的復雜度將會比行表小的多,雖然單獨rcfile的列運算不一定總是存在的,但是rcfile的高壓縮率確實減少文件大小,因此實際應用中,rcfile總是成為不二的選擇,達觀數據平臺在選擇文件存儲格式時也大量選擇了rcfile方案。
通過hdfs導入hive的表默認是textfile格式的,因此可以改變存儲格式,具體方法是先創建sequencefile、rcfile等格式的空表,然后重新插入數據即可。
1
2
3
|
insert overwrite table seqfile_table select * from textfile_table; …… insert overwrite table rcfile_table select * from textfile_table; |
java 批量插入hive中轉在HDFS
稍微修改了下,這文章是通過將數據存盤后,加載到HIVE.
模擬數據放到HDFS然后加載到HIVE,請大家記得添加HIVE JDBC依賴否則會報錯。
加載前的數據表最好用外部表,否則會drop表的時候元數據會一起刪除!
1
2
3
4
5
|
< dependency > < groupId >org.apache.hive</ groupId > < artifactId >hive-jdbc</ artifactId > < version >1.1.0</ version > </ dependency > |
代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class Demo { public static void main(String[] args) throws Exception { List<List> argList = new ArrayList<List>(); List<String> arg = new ArrayList<String>(); arg.add( "12345" ); arg.add( "m" ); argList.add(arg); arg = new ArrayList<String>(); arg.add( "54321" ); arg.add( "f" ); argList.add(arg); // System.out.println(argList.toString()); String dst = "/test/kk.txt" ; createFile(dst,argList); // loadData2Hive(dst); } /** * 將數據插入hdfs中,用于load到hive表中,默認分隔符是"|" * @param dst * @param contents * @throws IOException * @throws Exception * @throws InterruptedException */ public static void createFile(String dst , List<List> argList) throws IOException, InterruptedException, Exception{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get( new URI( "hdfs://hadoop:9000" ),conf, "root" ); Path dstPath = new Path(dst); //目標路徑 //打開一個輸出流 FSDataOutputStream outputStream = fs.create(dstPath); StringBuffer sb = new StringBuffer(); for (List<String> arg:argList){ for (String value:arg){ sb.append(value).append( "|" ); } sb.deleteCharAt(sb.length() - 1 ); //去掉最后一個分隔符 sb.append( "\n" ); } byte [] contents = sb.toString().getBytes(); outputStream.write(contents); outputStream.flush();; outputStream.close(); fs.close(); System.out.println( "文件創建成功!" ); } /** * 將HDFS文件load到hive表中 * @param dst */ public static void loadData2Hive(String dst) { String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver" ; String CONNECTION_URL = "jdbc:hive2://hadoop:10000/default" ; String username = "root" ; String password = "root" ; Connection con = null ; try { Class.forName(JDBC_DRIVER); con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password); Statement stmt = con.createStatement(); String sql = " load data inpath '" +dst+ "' into table test " ; //test 為插入的表 stmt.execute(sql); System.out.println( "loadData到Hive表成功!" ); } catch (SQLException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } finally { // 關閉rs、ps和con if (con != null ){ try { con.close(); } catch (SQLException e) { e.printStackTrace(); } } } } } |
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/kangkangwanwan/article/details/78915134