Part 1: Prepare the data source
Create a file named student.txt under the project directory with the following content:
1 ,zhangsan, 20
2 ,lisi, 21
3 ,wanger, 19
4 ,fangliu, 18
Part 2: Implementation
Java version:
1. First, create a Student bean that implements Serializable and overrides toString(). The code is as follows:
package com.cxd.sql;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {

    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }
    public void setSid(String sid) {
        this.sid = sid;
    }
    public String getSname() {
        return sname;
    }
    public void setSname(String sname) {
        this.sname = sname;
    }
    public int getSage() {
        return sage;
    }
    public void setSage(int sage) {
        this.sage = sage;
    }

    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
2. Do the conversion. The code is as follows:
package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("txtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);  // via Java reflection
        dynamicTransform(spark);  // dynamic conversion
    }

    /**
     * Conversion via Java reflection
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuinfo.txt").javaRDD();

        JavaRDD<Student> rowRDD = source.map(line -> {
            String parts[] = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });

        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Dynamic conversion
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuinfo.txt").javaRDD();

        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);

            return RowFactory.create(sid, sname, sage);
        });

        ArrayList<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("sid", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sname", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
        fields.add(field);

        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}
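To sanity-check the result, the short sketch below (my addition, not part of the original post) reads the "parquet.res" directory written by reflectTransform() back into a DataFrame and prints its schema and rows; it can be called from main() after the two transforms:

    private static void verifyOutput(SparkSession spark) {
        // Read the Parquet files written above back into a DataFrame.
        Dataset<Row> parquetDF = spark.read().parquet("parquet.res");
        parquetDF.printSchema();  // expected: sid and sname as string, sage as integer
        parquetDF.show();
    }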
Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

object RDD2Dataset {

  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    import spark.implicits._
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection (case class)
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student2.txt")
    // toDF() comes from the implicit conversions imported above
    val stuDF = stuRDD.map(_.split(",")).map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt)).toDF()
    //stuDF.select("id","name","age").write.text("result")  // write the specified columns to a file
    stuDF.printSchema()
    stuDF.createOrReplaceTempView("student")
    val nameDF = spark.sql("select name from student where age<20")
    //nameDF.write.text("result")  // write the query result to a file
    nameDF.show()
  }

  /**
   * Dynamic conversion
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    import spark.implicits._
    val schemaString = "id,name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDF = spark.createDataFrame(rowRDD, schema)
    stuDF.printSchema()
    stuDF.createOrReplaceTempView("student")
    val nameDF = spark.sql("select name from student where age<20")
    //nameDF.write.text("result")  // write the query result to a file
    nameDF.show()
  }
}
Notes:
1. All of the code above has been tested; the test environment was Spark 2.1.0 with JDK 1.8.
2. This code does not work on Spark versions earlier than 2.0 (see the sketch of the older entry point below).
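On Spark 1.x the usual entry point was SQLContext rather than SparkSession. The fragment below is only a rough, untested illustration of that older API, not part of the code verified above; rowRDD and schema stand for the same objects built in dynamicTransform(), and the imports come from org.apache.spark.api.java and org.apache.spark.sql:

    // Hypothetical Spark 1.x variant: build a DataFrame through SQLContext.
    SparkConf conf = new SparkConf().setAppName("txtToParquet").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    // rowRDD and schema are constructed exactly as in dynamicTransform() above;
    // on 1.x createDataFrame() returns a DataFrame instead of Dataset<Row>.
    DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
    df.write().mode(SaveMode.Append).parquet("parquet.res1");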
That concludes this summary of two ways to convert a Spark RDD to a DataFrame in Java and Scala. I hope it gives you a useful reference.
Original article: https://blog.csdn.net/u010592112/article/details/73730796