返回顶部
首页 > 资讯 > 后端开发 > Python >详解Spark Sql在UDF中如何引用外部数据
  • 769
分享到

详解Spark Sql在UDF中如何引用外部数据

Spark Sql UDF引用外部数据Spark Sql UDF 2023-02-01 12:02:44 769人浏览 薄情痞子

Python 官方文档:入门教程 => 点击学习

摘要

目录前言场景介绍方法一 Driver端加载方法二 Excutor端加载小结前言 spark sql可以通过UDF来对DataFrame的Column进行自定义操作。在特定场景下定义

前言

spark sql可以通过UDF来对DataFrame的Column进行自定义操作。在特定场景下定义UDF可能需要用到Spark Context以外的资源或数据。比如从List或Map中取值,或是通过连接池从外部的数据源中读取数据,然后再参与Column的运算。

Excutor中每个task的工作线程都会对UDF的call进行调用,外部资源的使用发生在Excutor端,而资源加载既能发生在Driver端,也可以发生在Excutor端。如果外部资源对象能序列化,我们可以在Driver端进行初始化,然后广播(broadcast)到Excutor端参与运算。对于不能进行序列化的对象,如JedisPool(Redis连接池),只能在Excutor端进行初始化。

因此,在UDF中引用外部资源有以下两类方法:

  • 能序列化:在Driver端进行初始化,然后通过spark的broadcast方法广播到Excutor上进行使用;
  • 不能序列化:在Excutor端进行初始化然后使用。

下面我们将用一个实际例子对上述两种方法进行详细介绍。

本文使用环境:Spark-2.3.0,Java 8。

场景介绍

我们以一个DataFrame(两个字段node_1、node_2)作为原始数据;一棵二叉搜索树(BST)作为Spark外部被引用数据;目标是定义一个UDF来判断:BST中是否刚好存在一个父节点,它的左右子节点值与node_1、node_2两个字段值相同。然后将判断结果输出到新列is_bro。其中DataFrame:

BST:

输出DataFrame:

二叉树的定义与判断是否为父节点的左右子节点的逻辑如下:

import java.io.Serializable;

public class TreeNode implements Serializable{
    private Integer val;
    private TreeNode left;
    private TreeNode right;
    public TreeNode() {
    }
    public TreeNode(Integer val) {
        this.val = val;
    }
    public TreeNode(Integer val, TreeNode left, TreeNode right) {
        this.val = val;
        this.left = left;
        this.right = right;
    }
    public Integer getVal() {
        return val;
    }
    public void setVal(Integer val) {
        this.val = val;
    }
    public TreeNode getLeft() {
        return left;
    }
    public void setLeft(TreeNode left) {
        this.left = left;
    }
    public TreeNode getRight() {
        return right;
    }
    public void setRight(TreeNode right) {
        this.right = right;
    }
    
    public Boolean isBro( Integer num1, Integer num2) {
        if (null == getLeft()||null == getRight()) {
            return false;
        }
        if (getLeft().getVal().compareTo(num1)==0 && getRight().getVal().compareTo(num2)==0) {
            return true;
        }
        return getLeft().isBro(num1, num2) || getRight().isBro(num1, num2);
    }
}

生成上图所示BST的方法createTree()如下:

public static TreeNode createTree(){
    TreeNode[] treeNodes = new TreeNode[8];
    for(int i=1; i<=7; i++){
        treeNodes[i] =  new TreeNode(i);
    }
    treeNodes[2].setLeft(treeNodes[1]);
    treeNodes[2].setRight(treeNodes[3]);
    treeNodes[6].setLeft(treeNodes[5]);
    treeNodes[6].setRight(treeNodes[7]);
    treeNodes[4].setLeft(treeNodes[2]);
    treeNodes[4].setRight(treeNodes[6]);
    return treeNodes[4];
}

方法一 Driver端加载

在Driver端完成初始化并定义UDF

JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
//  初始化树
TreeNode tree = createTree();
//  broadcast
Broadcast<TreeNode> broadcastTree = javaSparkContext.broadcast(tree);
//  lambda表达式定义udf
UserDefinedFunction udf = functions.udf((Integer num1, Integer num2) -> {
    return broadcastTree.getValue().isBro(num1,num2);
}, BooleanType);
//  注册udf
spark.udf().reGISter("isBro",udf);
//  使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));

方法二 Excutor端加载

如果我们直接在call中进行初始化会存在问题:由于多个task的线程会在同一时刻对UDF中的call进行调用,导致资源对象在同一时刻被初始化多次,造成Excutor内存资源浪费。此外,如果外部资源为连接池对象,在同一时刻初始化多次会建立多个连接,增加外部数据源的访问压力。

为此,我们可以借助单例模式中的懒汉式实现,让资源在每个Excutor中只被初始化一次。懒汉式的实现需要新建一个类(命名为IsBroUDF2)并实现UDF2<Integer, Integer, Boolean>接口,重写UDF2的call方法:

import org.apache.spark.sql.api.java.UDF2;

public class IsBroUDF2 implements UDF2<Integer,Integer,Boolean> {
    // 定义静态的TreeNode成员变量
    private static volatile TreeNode treeNode;
    public IsBroUDF2() {
    }
    @Override
    public Boolean call(Integer num1, Integer num2) throws Exception {
//        懒汉式 二次判定
        if(null==treeNode){
            synchronized (IsBroUDF2.class){
                if(null==treeNode){
                    treeNode=createTree();
                }
            }
        }
        return treeNode.isBro(num1,num2);
    }
    // 辅助方法
    public static TreeNode createTree(){
        TreeNode[] treeNodes = new TreeNode[8];
        for(int i=1; i<=7; i++){
            treeNodes[i] =  new TreeNode(i);
        }
        treeNodes[2].setLeft(treeNodes[1]);
        treeNodes[2].setRight(treeNodes[3]);
        treeNodes[6].setLeft(treeNodes[5]);
        treeNodes[6].setRight(treeNodes[7]);
        treeNodes[4].setLeft(treeNodes[2]);
        treeNodes[4].setRight(treeNodes[6]);
        return treeNodes[4];
    }
}

然后注册和使用UDF

//  注册udf
spark.udf().register("isBro",new IsBroUDF2(), BooleanType);
//  使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));

在call方法中通过加可以实现TreeNode资源在同一个Excutor中只被初始化一次。除了上面介绍的这种懒汉式的写法之外,还可以通过静态内部类懒加载、枚举等方式实现TreeNode资源在Excutor端只被初始化一次。

小结

想要在Spark Sql的UDF中使用Spark外的资源和数据进行运算,我们既可以在Driver端预先进行初始化然后广播到各Excutor上(要求对象能序列化),也可以直接在Excutor端进行加载;如果在Excutor端加载要保证外部资源对象只被初始化一次。

以上就是详解Spark Sql在UDF中如何引用外部数据的详细内容,更多关于Spark Sql UDF引用外部数据的资料请关注编程网其它相关文章!

--结束END--

本文标题: 详解Spark Sql在UDF中如何引用外部数据

本文链接: https://lsjlt.com/news/193828.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 详解Spark Sql在UDF中如何引用外部数据
    目录前言场景介绍方法一 Driver端加载方法二 Excutor端加载小结前言 Spark Sql可以通过UDF来对DataFrame的Column进行自定义操作。在特定场景下定义...
    99+
    2023-02-01
    Spark Sql UDF引用外部数据 Spark Sql UDF
  • Spark SQL外部数据源的机制以及spark-sql的使用
    这篇文章主要介绍“Spark SQL外部数据源的机制以及spark-sql的使用”,在日常操作中,相信很多人在Spark SQL外部数据源的机制以及spark-sql的使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对...
    99+
    2023-06-02
  • 在html中如何引用外部css文件
    如何引用外部 css 文件?使用 <link> 标签,指定 rel="stylesheet" 和 href="css 文件路径" 属性。创建外部 css...
    99+
    2024-04-11
    css
  • 如何在SQL Server中利用Apache Spark进行大数据分析
    在SQL Server中利用Apache Spark进行大数据分析可以通过以下步骤实现: 安装Apache Spark和Hadoop集群:首先在SQL Server上安装Apache Spark和Hadoop集群。可以使用HDInsig...
    99+
    2024-06-03
    sql server
  • 在vue中如何引入外部less文件
    目录vue引入外部less文件vue引入lang=“less“报错的解决vue引入外部less文件 首先vue环境搭建成功 一、安装 less 和less-l...
    99+
    2024-04-02
  • PostgreSQL中的外部表和外部数据源如何使用
    在PostgreSQL中,外部表和外部数据源可以通过使用外部数据包装器(Foreign Data Wrapper)来实现。外部数据包...
    99+
    2024-03-14
    PostgreSQL
  • 在vue中如何引入外部的css文件
    目录vue中引入外部css文件1. 全局引入2. 局部引入相对路径3. 局部引入绝对路径css-loader导致vue中样式失效的坑问题描述环境原因方案vue中引入外部css文件 在...
    99+
    2024-04-02
  • 如何解析SparkSQL外部数据源
    这期内容当中小编将会给大家带来有关如何解析SparkSQL外部数据源,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。场景介绍:    大数据MapReduce,Hiv...
    99+
    2023-06-02
  • Spark SQL是什么,如何使用SQL语句查询数据
    Spark SQL是Apache Spark中的一个组件,用于支持结构化数据处理。它提供了一个用于执行SQL查询的接口,允许用户使用...
    99+
    2024-04-09
    Spark
  • 在CI框架中如何引入外部CSS样式表?
    CI框架中如何使用外部CSS样式,需要具体代码示例 引言:CI(CodeIgniter) 是一个轻量级的PHP开发框架,被广泛用于构建Web应用程序。在开发Web应用程序时,外部CSS样式起着至关重要的作用,可以帮助我们美化页面...
    99+
    2024-01-16
    使用方法 CI框架 外部CSS
  • 详解ObjectiveC中Block如何捕获外部值
    目录引言自动变量静态变量、静态全局变量与全局变量带 __block 的自动变量捕获对象__block 对象类型的捕获引言 Block 本质上也是一个 Objective-C 对象,它...
    99+
    2024-04-02
  • 如何在Java中引用数据类型
    如何在Java中引用数据类型?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。Java中的引用数据类型:Java作为面向对象的语言,Java中所有用class,interfac...
    99+
    2023-05-31
    数据类型 ava java
  • 如何在python中调用外部程序
    在python中调用外部程序的方法:1.使用os.system()函数调用;2.使用ShellExecute函数调用;3.使用ctypes模块调用;具体方法如下:使用os.system()函数调用python中可以使用os.system()...
    99+
    2024-04-02
  • 如何在PHP中使用外部变量
    如何在PHP中使用外部变量?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。PHP常量和变量之外部变量:外部变量:外部变量就是PHP在使用过程中规定好的一些变量;例如:我们将一...
    99+
    2023-06-15
  • 如何在C#中调用外部进程
    这篇文章给大家介绍如何在C#中调用外部进程,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。在开始正题之前,我们先来看一看网上比较常见的C#调用外部进程:privatestringRunCmd(stringcommand)...
    99+
    2023-06-18
  • sql外键引用了无效的表如何解决
    在SQL中,如果外键引用了无效的表,通常会收到一个错误消息,指示找不到或无效的引用表。为了解决这个问题,您可以按照以下步骤进行操作:...
    99+
    2024-04-09
    sql
  • 如何在MySQL数据库中使用外键
    如何在MySQL数据库中使用外键 在关系型数据库中,外键是一种非常重要的概念,它能够帮助我们建立不同表之间的关联关系,并确保数据完整性。在MySQL数据库中,要使用外键,需要遵循一定的...
    99+
    2024-04-02
  • 详解如何在Angular中引入Mock.js
    目录介绍为什么使用 Mock.js如何使用Mock.js模拟API请求安装Mock.js创建mock数据文件在Angular中使用Mock.js示例总结介绍 Mock.js是一个用于...
    99+
    2023-05-16
    Angular引入Mock.js Angular Mock.js
  • 如何使用SQL语句在MySQL中进行数据索引和优化?
    如何使用SQL语句在MySQL中进行数据索引和优化?在使用MySQL数据库时,数据索引和优化是非常重要的。适当地创建索引和优化查询语句可以大大提高数据库的性能。本文将详细介绍如何使用SQL语句在MySQL中进行数据索引和优化,并提供具体的代...
    99+
    2023-12-17
    数据索引使用 SQL索引优化 MySQL数据优化
  • docker连不上外部数据库如何解决
    如果无法连接外部数据库,可能是由于以下几个原因:1. 网络问题:确保您的网络连接正常,尝试使用ping命令测试是否能够与外部数据库服...
    99+
    2023-10-09
    docker 数据库
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作