详解Spark Sql在UDF中如何引用外部数据

作者:KYs_Daddy 时间:2021-08-17 14:51:17 

前言

Spark Sql可以通过UDF来对DataFrame的Column进行自定义操作。在特定场景下定义UDF可能需要用到Spark Context以外的资源或数据。比如从List或Map中取值,或是通过连接池从外部的数据源中读取数据,然后再参与Column的运算。

Excutor中每个task的工作线程都会对UDF的call进行调用,外部资源的使用发生在Excutor端,而资源加载既能发生在Driver端,也可以发生在Excutor端。如果外部资源对象能序列化,我们可以在Driver端进行初始化,然后广播(broadcast)到Excutor端参与运算。对于不能进行序列化的对象,如JedisPool(redis连接池),只能在Excutor端进行初始化。

因此,在UDF中引用外部资源有以下两类方法:

  • 能序列化:在Driver端进行初始化,然后通过spark的broadcast方法广播到Excutor上进行使用;

  • 不能序列化:在Excutor端进行初始化然后使用。

下面我们将用一个实际例子对上述两种方法进行详细介绍。

本文使用环境:Spark-2.3.0,Java 8。

场景介绍

我们以一个DataFrame(两个字段node_1、node_2)作为原始数据;一棵二叉搜索树(BST)作为Spark外部被引用数据;目标是定义一个UDF来判断:BST中是否刚好存在一个父节点,它的左右子节点值与node_1、node_2两个字段值相同。然后将判断结果输出到新列is_bro。其中DataFrame:

详解Spark Sql在UDF中如何引用外部数据

BST:

详解Spark Sql在UDF中如何引用外部数据

输出DataFrame:

详解Spark Sql在UDF中如何引用外部数据

二叉树的定义与判断是否为父节点的左右子节点的逻辑如下:

import java.io.Serializable;
/**
* @author wangjiahui
* @create 2021-03-14-10:57
*/
public class TreeNode implements Serializable{
   private Integer val;
   private TreeNode left;
   private TreeNode right;
   public TreeNode() {
   }
   public TreeNode(Integer val) {
       this.val = val;
   }
   public TreeNode(Integer val, TreeNode left, TreeNode right) {
       this.val = val;
       this.left = left;
       this.right = right;
   }
   public Integer getVal() {
       return val;
   }
   public void setVal(Integer val) {
       this.val = val;
   }
   public TreeNode getLeft() {
       return left;
   }
   public void setLeft(TreeNode left) {
       this.left = left;
   }
   public TreeNode getRight() {
       return right;
   }
   public void setRight(TreeNode right) {
       this.right = right;
   }
   /**
    * 判断是否刚好有一个父节点的左、右子节点值与num1、num2相同
    * @param num1
    * @param num2
    * @return
    */
   public Boolean isBro( Integer num1, Integer num2) {
       if (null == getLeft()||null == getRight()) {
           return false;
       }
       if (getLeft().getVal().compareTo(num1)==0 && getRight().getVal().compareTo(num2)==0) {
           return true;
       }
       return getLeft().isBro(num1, num2) || getRight().isBro(num1, num2);
   }
}

生成上图所示BST的方法createTree()如下:

public static TreeNode createTree(){
   TreeNode[] treeNodes = new TreeNode[8];
   for(int i=1; i<=7; i++){
       treeNodes[i] =  new TreeNode(i);
   }
   treeNodes[2].setLeft(treeNodes[1]);
   treeNodes[2].setRight(treeNodes[3]);
   treeNodes[6].setLeft(treeNodes[5]);
   treeNodes[6].setRight(treeNodes[7]);
   treeNodes[4].setLeft(treeNodes[2]);
   treeNodes[4].setRight(treeNodes[6]);
   return treeNodes[4];
}

方法一 Driver端加载

在Driver端完成初始化并定义UDF

JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
//  初始化树
TreeNode tree = createTree();
//  broadcast
Broadcast<TreeNode> broadcastTree = javaSparkContext.broadcast(tree);
//  lambda表达式定义udf
UserDefinedFunction udf = functions.udf((Integer num1, Integer num2) -> {
   return broadcastTree.getValue().isBro(num1,num2);
}, BooleanType);
//  注册udf
spark.udf().register("isBro",udf);
//  使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));

方法二 Excutor端加载

如果我们直接在call中进行初始化会存在问题:由于多个task的线程会在同一时刻对UDF中的call进行调用,导致资源对象在同一时刻被初始化多次,造成Excutor内存资源浪费。此外,如果外部资源为连接池对象,在同一时刻初始化多次会建立多个连接,增加外部数据源的访问压力。

为此,我们可以借助单例模式中的懒汉式实现,让资源在每个Excutor中只被初始化一次。懒汉式的实现需要新建一个类(命名为IsBroUDF2)并实现UDF2<Integer, Integer, Boolean>接口,重写UDF2的call方法:

import org.apache.spark.sql.api.java.UDF2;
/**
* @author wangjiahui
* @create 2021-03-14-14:25
*/
public class IsBroUDF2 implements UDF2<Integer,Integer,Boolean> {
   // 定义静态的TreeNode成员变量
   private static volatile TreeNode treeNode;
   public IsBroUDF2() {
   }
   @Override
   public Boolean call(Integer num1, Integer num2) throws Exception {
//        懒汉式 二次判定
       if(null==treeNode){
           synchronized (IsBroUDF2.class){
               if(null==treeNode){
                   treeNode=createTree();
               }
           }
       }
       return treeNode.isBro(num1,num2);
   }
   // 辅助方法
   public static TreeNode createTree(){
       TreeNode[] treeNodes = new TreeNode[8];
       for(int i=1; i<=7; i++){
           treeNodes[i] =  new TreeNode(i);
       }
       treeNodes[2].setLeft(treeNodes[1]);
       treeNodes[2].setRight(treeNodes[3]);
       treeNodes[6].setLeft(treeNodes[5]);
       treeNodes[6].setRight(treeNodes[7]);
       treeNodes[4].setLeft(treeNodes[2]);
       treeNodes[4].setRight(treeNodes[6]);
       return treeNodes[4];
   }
}

然后注册和使用UDF

//  注册udf
spark.udf().register("isBro",new IsBroUDF2(), BooleanType);
//  使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));

在call方法中通过加锁可以实现TreeNode资源在同一个Excutor中只被初始化一次。除了上面介绍的这种懒汉式的写法之外,还可以通过静态内部类懒加载、枚举等方式实现TreeNode资源在Excutor端只被初始化一次。

小结

想要在Spark Sql的UDF中使用Spark外的资源和数据进行运算,我们既可以在Driver端预先进行初始化然后广播到各Excutor上(要求对象能序列化),也可以直接在Excutor端进行加载;如果在Excutor端加载要保证外部资源对象只被初始化一次。

来源:https://juejin.cn/post/7194498526628282427

标签:Spark,Sql,UDF,引用数据
0
投稿

猜你喜欢

  • 使用SSM+Layui+Bootstrap实现汽车维保系统的示例代码

    2023-11-28 18:30:04
  • Java泛型映射不同的值类型详解及实例代码

    2023-07-29 00:20:52
  • mybatis源码解读之executor包懒加载功能 

    2022-09-17 00:28:05
  • JavaWeb实现文件上传下载功能实例详解

    2023-05-08 19:43:51
  • SpringBoot整合Zookeeper详细教程

    2022-07-24 11:33:09
  • java读取文件字符集示例方法

    2023-11-09 12:35:39
  • Spring Security 强制退出指定用户的方法

    2022-10-04 18:13:04
  • Java求两集合中元素交集的四种方法对比分析

    2023-08-23 09:24:56
  • JavaWeb工程中集成YMP框架快速上手

    2023-11-24 12:15:12
  • SpringBoot工程打包与运行的实现详解

    2023-11-10 23:51:28
  • 如何利用JAVA实现走迷宫程序

    2022-06-23 10:52:06
  • Java程序控制逻辑—流程控制

    2023-08-28 01:51:18
  • Java Set集合及其子类HashSet与LinkedHashSet详解

    2023-11-26 11:39:35
  • Android Studio kotlin生成编辑类注释代码

    2023-06-16 12:03:20
  • Java序列化JSON丢失精度问题的解决方法(修复Long类型太长)

    2022-10-15 00:01:34
  • 分布式调度XXL-Job整合Springboot2.X实战操作过程(推荐)

    2023-11-23 09:43:38
  • java 键盘输入一个数,输出数组中指定元素的示例

    2023-11-24 20:31:14
  • IDEA 2021.2 激活教程及启动报错问题解决方法

    2023-11-14 14:10:27
  • Java 中 synchronized的用法详解(四种用法)

    2022-03-11 08:55:05
  • Java Object类中的常用API介绍

    2023-11-09 01:51:00
  • asp之家 软件编程 m.aspxhome.com