这篇文章将为大家详细讲解有关 Spark 怎么写 HBase,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
package com.iesol.high_frequency

import java.io.BufferedInputStream
import java.io.BufferedReader
import java.io.DataInputStream // fix: was `DatainputStream` — no such class (case-sensitive)
import java.io.DataOutputStream
import java.io.File
import java.io.FileInputStream
import java.io.FileNotFoundException
import java.io.FileOutputStream
import java.io.FileReader
import java.io.IOException
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable
import java.nio.file.Path // fix: was `java.NIO.file.Path` — package names are lower-case
import java.nio.file.Paths
import java.util.List

import scala.util.Random
import scala.util.control._ // fix: was `Scala.util.control._` — capital-S package does not exist

// NOTE(review): package here is `com.isesol` while this file's own package is
// `com.iesol` — confirm which spelling the helper project actually uses.
import com.isesol.mapReduce.binFileRead_forscala

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat // fix: was `TableOutputFORMat` — wrong case
import org.apache.hadoop.hbase.spark._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object parseFile {

  /** Entry point: reads a binary high-frequency collection file via
    * `binFileRead_forscala` and bulk-writes each comma-split row into the
    * HBase table `t_high_frequently`, column family `cf`.
    *
    * args(0) = input file name
    * args(1) = application id (part of app name and rowkey)
    * args(2) = machine-tool id (rowkey prefix)
    * args(3) = business id (rowkey component)
    */
  def main(args: Array[String]) {
    val fileName = args(0)
    val appId = args(1)
    // Fix: was declared `Machine_tool` but referenced below as `machine_tool`,
    // which did not compile. Renamed consistently.
    val machineTool = args(2)
    val bizId = args(3)

    // Only one kind of high-frequency collection exists for now; colId selects
    // the matching HBase column list (and thus the expected fields per row).
    val colId = "1"

    val conf = new SparkConf()
    conf.setMaster("local").setAppName("high frequency collection " + appId)
    val sc = new SparkContext(conf)

    val hbaseCols = binFileRead_forscala.getHaseCols(colId)
    val total_colNums = hbaseCols.size()
    val getFile = binFileRead_forscala.binFileOut(fileName, total_colNums)

    // Copy the Java lists into plain Scala arrays so the Spark closure does not
    // capture the (possibly non-serializable) Java collections.
    val getData = Array.tabulate(getFile.size())(getFile.get)
    val hbaseCols_scala = Array.tabulate(hbaseCols.size())(hbaseCols.get)
    hbaseCols_scala.foreach(col => println("hbase cols is " + col))

    val bankRDD = sc.parallelize(getData).map(_.split(","))

    try {
      bankRDD.foreachPartition { rows =>
        val hbaseconf = HBaseConfiguration.create()
        // Fix: Hadoop Configuration keys are case-sensitive; the original
        // "hbase.ZooKeeper.quorum" was silently ignored by the HBase client.
        hbaseconf.set(
          "hbase.zookeeper.quorum",
          "datanode01.isesol.com,datanode02.isesol.com,datanode03.isesol.com,datanode04.isesol.com,cmserver.isesol.com")
        hbaseconf.set("hbase.zookeeper.property.clientPort", "2181")
        // NOTE(review): "6" reads as 6 ms, almost certainly too small for a
        // session timeout — confirm the intended unit/value.
        hbaseconf.set("maxSessionTimeout", "6")

        val myTable = new HTable(hbaseconf, TableName.valueOf("t_high_frequently"))
        myTable.setWriteBufferSize(3 * 1024 * 1024)
        try {
          // Fix: currentTimeMillis alone is not unique inside a tight loop —
          // rows written within the same millisecond collided on one rowkey and
          // overwrote each other. A per-partition sequence suffix disambiguates.
          var seq = 0L
          rows.foreach { fields =>
            val rowkey = System.currentTimeMillis().toString + "-" + seq
            seq += 1
            val p = new Put(Bytes.toBytes(machineTool + "-" + appId + "-" + bizId + "-" + rowkey))
            for (i <- hbaseCols_scala.indices) {
              p.add(Bytes.toBytes("cf"), Bytes.toBytes(hbaseCols_scala(i)), Bytes.toBytes(fields(i)))
            }
            myTable.put(p)
          }
          // Drain the client-side write buffer before closing.
          myTable.flushCommits()
        } finally {
          // Fix: always release the HBase connection, even when a put fails.
          myTable.close()
        }
      }
    } catch {
      // Fix: NonFatal instead of Exception so OOM/interrupts still propagate;
      // include the cause instead of swallowing it silently.
      case NonFatal(ex) => println("can not connect hbase: " + ex)
    } finally {
      // Fix: shut the SparkContext down so the app exits cleanly.
      sc.stop()
    }
  }
}
关于“Spark 怎么写 HBase”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
--结束END--
本文标题: Spark怎么写HBASE
本文链接: https://lsjlt.com/news/232414.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0