Spark SQL的自定义函数UDF使用

作者:CarveStone 时间:2022-07-31 04:19:47 

Spark_SQL的UDF使用

用户自定义函数,也叫UDF,可以让我们使用Python/Java/Scala注册自定义函数,并在SQL中调用。这种方法很常用,通常用来给机构内的SQL用户们提供高级功能支持,这样这些用户就可以直接调用注册的函数而无需自己去通过编程来实现了。

  • 在Spark SQL中,编写UDF 尤为简单。Spark SQL不仅有自己的UDF接口,也支持已有的Apache Hive UDF。我们可以使用Spark支持的编程语言编写好函数,然后通过Spark SQL内建的方法传递进来,非常便捷地注册我们自己的UDF。

  • 在Scala和Python中,可以利用语言原生的函数和lambda语法的支持,而在Java中,则需要扩展对应的UDF类。UDF能够支持各种数据类型,返回类型也可以与调用时的参数类型完全不一样。

UDF简单使用

首先通过代码建立一个测试的DataFrame数据,通过RDD产生,再转换成DataFrame格式,通过写简单的UDF函数,对数据进行操作并输出,例如:

import org.apache.spark.sql.Row
import org.apache.spark.rdd._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
// 通过RDD创建测试数据
val rdd: RDD[Row] = sc.parallelize(List("Michael,male, 29",            
"Andy,female, 30",
"Justin,male, 19",
"Dela,female, 25",
"Magi,male, 20",
"Pule,male,21"))
.map(_.split(",")).map(p => Row(p(0),p(1),p(2).trim.toInt))
// 创建Schema
val schema = StructType( Array( StructField("name",StringType, true),StructField("sex",StringType, true),StructField("age",IntegerType,true)))
// 转换DataFrame  
val peopleDF = spark.sqlContext.createDataFrame(rdd,schema)
// 注册UDF函数    
spark.udf.register("strlen",(x:String)=>x.length)
// 创建临时表      
peopleDF.registerTempTable("people")                  
// 选择输出语句,(选择输出列:名字,名字长度,性别从表people中)
spark.sql("select name, strlen(name) as strlen,sex from people").show()

创建 DataFrame

scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

注册 UDF

scala> spark.udf.register("addName",(x:String)=> "Name:"+x)
res9: org.apache.spark.sql.expressions.UserDefinedFunction =
UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

创建临时表

scala> df.createOrReplaceTempView("people")

应用 UDF

scala> spark.sql("Select addName(name),age from people").show()

来源:https://blog.csdn.net/weixin_44018458/article/details/128800313

标签:Spark,SQL,UDF,自定义函数
0
投稿

猜你喜欢

  • Spring Cloud 的 Hystrix.功能及实践详解

    2023-11-19 06:40:46
  • C#启动进程的几种常用方法

    2023-06-18 04:13:48
  • 一文搞懂c# await,async执行流

    2023-07-14 01:22:35
  • SSM如何实现在Controller中添加事务管理

    2023-11-29 07:23:18
  • 解析spring cloud ouath2中的Eureka

    2023-10-12 04:07:54
  • 如何将javaweb项目部署到linux下

    2023-11-11 11:45:11
  • Mybatis下的SQL注入漏洞原理及防护方法解析

    2022-06-30 18:38:29
  • Android Java try catch 失效问题及解决

    2023-06-17 17:07:33
  • Mybatis中resultMap的Colum和property属性详解

    2023-09-16 11:17:33
  • Java线程池大小的设置方法实例

    2022-10-04 04:20:19
  • Java压缩文件ZIP实例代码

    2022-03-25 04:08:27
  • Java面试题冲刺第五天--基础篇2

    2023-10-07 13:17:04
  • Java 根据网址查询DNS/IP地址的方法

    2023-06-21 15:31:54
  • Java面向对象基础知识之委托和lambda

    2022-07-28 16:51:11
  • Java中常见的陷阱题及答案

    2021-08-10 16:32:11
  • SpringBoot在Controller层接收参数的n种姿势(超详细)

    2023-01-28 00:54:39
  • Java非侵入式API接口文档工具apigcc用法详解

    2023-11-24 10:01:00
  • Java基础将Bean属性值放入Map中的实例

    2023-10-11 13:57:40
  • 解决springmvc关于前台日期作为实体类对象参数类型转换错误的问题

    2023-11-28 20:53:42
  • java分页工具类的使用方法

    2023-08-17 02:00:14
  • asp之家 软件编程 m.aspxhome.com