这篇文章主要讲解了“sparkStreaming的实现和使用方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“SparkStreaming的实现和使用方法”吧!一.DStream 整合RDD
这篇文章主要讲解了“sparkStreaming的实现和使用方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“SparkStreaming的实现和使用方法”吧!
生产中使用多的是一个文件中有很多域名,另一个中是黑名单,要进行剔除数据一:日志信息 DStream domain,traffic xinlang.com xinlang.com baidu.com数据二:已有的文件 黑名单 RDD domain baidu.com
package sparkstreaming02import org.apache.spark.{SparkConf, SparkContext}import Scala.collection.mutable.ListBufferobject Demo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Demo1").setMaster("local[2]") val sc = new SparkContext(conf) val input1 = new ListBuffer[(String,Long)] input1.append(("www.xinlang.com", 8888)) input1.append(("www.xinalng.com", 9999)) input1.append(("www.baidu.com", 7777)) val data1 = sc.parallelize(input1) //进行join一定要是key,value形式的 val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = sc.parallelize(input2) data1.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true }).map(x => (x._1,x._2._1)) .collect().foreach(println) }}
package sparkstreaming02import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreaminGContext}import scala.collection.mutable.ListBufferobject Streaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Streaming").setMaster("local[2]") val ssc = new StreamingContext(conf,Seconds(10)) val lines = ssc.SocketTextStream("s201",9999) // 数据二: rdd val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = ssc.sparkContext.parallelize(input2) lines.map(x=>(x.split(",")(0), x)).transfORM( rdd => { rdd.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true //注意 join之后过滤 }).map(x => (x._1,x._2._1)) } ).print() ssc.start() ssc.awaitTermination() }}
、
connect 在Driver端创建,record在executor,发过去序列化错误
解决:第一种把connect放到executor端这样弊端是每条记录会生成一个connect太耗费资源 Words.foreachRDD { rdd => rdd.foreach { record => val connection = createConnection() // executed at the driver val word = record._1 val count = record._2.toInt val sql = s"insert into wc (wc,count) values($word,$count)" connection.createStatement().execute(sql) }
package sparkstreaming02import java.sql.DriverManagerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}object MysqlStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("MysqlStreaming") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream("s201",9999) val words = lines.flatMap(x => x.split(",")).map((_,1)).reduceByKey(_+_)// words.foreachRDD { rdd =>// val connection = createConnection() // executed at the driver// rdd.foreach { record =>// val word = record._1// val count = record._2// val sql = s"insert into wc (word,count) values($word,$count)"// connection.createStatement().execute(sql)// }// }// words.foreachRDD { rdd =>// rdd.foreach { record =>// val connection = createConnection() // executed at the driver// val word = record._1// val count = record._2.toInt// val sql = s"insert into wc (wc,count) values($word,$count)"// connection.createStatement().execute(sql)// }// } //最终的写法 words.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createConnection() partitionOfRecords.foreach( record =>{ val word = record._1 val count = record._2 val sql = s"insert into wc (wc,count) values('$word',$count)" connection.createStatement().execute(sql)} ) } } ssc.start() ssc.awaitTermination() } def createConnection() = { Class.forName("com.mysql.cj.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://localhost:3306/Hive?serverTimezone=UTC&useSSL=false","root","123456") }}
错误,插入数据库时,你要插入字符串要用''例如:val sql = s"insert into wc (wc,count) values($word,$count)"word是字符串,你要不加双引号就报这个错误正确val sql = s"insert into wc (wc,count) values('$word',$count)"
感谢各位的阅读,以上就是“SparkStreaming的实现和使用方法”的内容了,经过本文的学习后,相信大家对SparkStreaming的实现和使用方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!
--结束END--
本文标题: SparkStreaming的实现和使用方法
本文链接: https://lsjlt.com/news/229960.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0