Serializing Java objects to HDFS from Spark
Abstract: A requirement that comes up regularly in Spark applications is to serialize a Java object and store it on HDFS, especially models computed with MLlib, so that a model can be reused later. The example below reads data from HBase in a Spark environment, trains a Word2Vec model, and stores it on HDFS.
Enough preamble, here is the code. Spark 1.4 + HBase 0.98.
import scala.collection.JavaConverters._
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{FilterList, PageFilter, RegexStringComparator, SingleColumnValueFilter}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.{SparkConf, SparkContext}

object Word2VecDemo {

  // TableInputFormat expects the Scan as a Base64-encoded protobuf string
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")

    val sc = new SparkContext(sparkConf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    // Only keep rows whose data:article column holds at least 1500 characters,
    // and read at most 100 rows
    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")
    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )
    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))
    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val articlesRDD = crawledRDD.filter {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
    }

    // Segment each article into words with the ansj analyzer
    val wordsInDoc = articlesRDD.map {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)

    // ---------------------------- The key part ----------------------------
    // Store the model on HDFS. Note that fileSystem.create already returns an
    // FSDataOutputStream, so it can feed ObjectOutputStream directly.
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    val oos = new ObjectOutputStream(fileSystem.create(path))
    oos.writeObject(model)
    oos.close()

    // Another program can deserialize the model straight from HDFS and use it
    val ois = new ObjectInputStream(fileSystem.open(path))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
    ois.close()
    // -----------------------------------------------------------------------
  }
}
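One caveat worth noting: plain Java serialization ties the stored bytes to the exact class version, so a model written under one Spark version may fail to deserialize under another. Since Spark 1.4, Word2VecModel also offers built-in persistence through MLlib's Saveable API, which writes a versioned, Parquet-backed format. A sketch, assuming Spark 1.4+ and an illustrative path:

// Alternative: MLlib's built-in model persistence
model.save(sc, "/user/hadoop/data/mllib/word2vec-native")
val reloaded = Word2VecModel.load(sc, "/user/hadoop/data/mllib/word2vec-native")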