一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - java 中Spark中將對象序列化存儲到hdfs

java 中Spark中將對象序列化存儲到hdfs

2020-11-12 17:35小水熊 Java教程

這篇文章主要介紹了java 中Spark中將對象序列化存儲到hdfs的相關資料,需要的朋友可以參考下

javaSpark中將對象序列化存儲到hdfs

摘要: Spark應用中經常會遇到這樣一個需求: 需要將JAVA對象序列化并存儲到HDFS, 尤其是利用MLlib計算出來的一些模型, 存儲到hdfs以便模型可以反復利用. 下面的例子演示了Spark環境下從Hbase讀取數據, 生成一個word2vec模型, 存儲到hdfs.

廢話不多說, 直接貼代碼了. spark1.4 + hbase0.98

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source
 
object Word2VecDemo {
 
 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }
 
 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  
 
  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
 
  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")
 
  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))
 
  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }
 
  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重點看這里-------------------------------------------------------------
  //將上面的模型存儲到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //這里示例另外一個程序直接從hdfs讀取序列化對象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你還可以將序列化文件從hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}

感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!

原文鏈接:https://my.oschina.net/waterbear/blog/525347

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 黑人巨大初黑人解禁作品 | 超碰成人在线播放 | 91在线精品老司机免费播放 | 免费理伦片高清在线 | 亚洲国产果果在线播放在线 | 希望影院高清免费观看视频 | 性夜夜春夜夜爽AA片A | 日本特级a禁片在线播放 | 男生同性视频twink在线 | 久久热国产在线视频 | 成人资源在线观看 | 欧美亚洲一区二区三区在线 | 亚洲免费视 | 波多野结衣在线观看中文字幕 | 青青草在观免费 | 羞羞视频动漫 | 香蕉久久高清国产精品免费 | 无套大战白嫩乌克兰美女 | 天天干天天日天天射天天操毛片 | 免费一级日本c片完整版 | 免费免费啪视频在线观播放 | 波多野结衣中文字幕乱七八糟 | 我的漂亮朋友在线观看全集免费 | 国产成人综合一区人人 | 性做久久久久久久久浪潮 | 国产探花视频在线观看 | 99久久精彩视频 | 国产高清一区二区 | 色琪琪久久草在线视频 | 翁公与小莹在客厅激情 | 韩国最新理论三级在线观看 | 国产美女在线一区二区三区 | 黑人巨大爆粗亚裔女人 | 91精品国产在线 | 99re热精品这里精品 | 日本高清免费不卡在线播放 | 国产日韩欧美在线一区二区三区 | 国内精品久久久久影院中国 | 国产三级精品三级男人的天堂 | 毛片在线播放a | 午夜爱情动作片P |