Java 数据流之Broadcast State

作者:Vicky_Tang 时间:2022-05-21 15:17:19 

一、BroadcastState 的介绍

广播状态(Broadcast State)是 Operator State 的一种特殊类型。如果我们需要将配置 、规则等低吞吐事件流广播到下游所有 Task 时,就可以使用 BroadcastState。下游的 Task 接收这些配置、规则并保存为 BroadcastState,所有Task 中的状态保持一致,作用于另一个数据流的计算中。
简单理解:一个低吞吐量流包含一组规则,我们想对来自另一个流的所有元素基于此规则进行评估。
场景:动态更新计算规则。

广播状态与其他操作符状态的区别在于:

  • 它有一个 map 格式,用于定义存储结构

  • 它仅对具有广播流和非广播流输入的特定操作符可用

  • 这样的操作符可以具有不同名称的多个广播状态

Java 数据流之Broadcast State

二、BroadcastState 操作流程

Java 数据流之Broadcast State

三、案例实现

  • 从端口读取Json数据作为事件流

  • 从Mysql读取数据作为广播流

  • 关联广播流和事件流

  • 匹配对应的用户信息


package cn.kgc.broadcast

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// (001,'tom',18,'北京',15830010002)
// 定义样例类 接受 MySQL的用户数据
case class BaseUserInfo(id:Long,name:String,age:Int,city:String,phone:Long)

// user_id、user_name、user_addrss、behaviour、url
// 输出数据类型
case class UserVisitInfo(id:Long,name:String,city:String,behaviour:String,url:String)

// 实现广播ProcessFunction
class MyBroadcastFunc extends BroadcastProcessFunction[String,(Long, BaseUserInfo),UserVisitInfo]{

lazy val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])

// 处理的是日志流中的每条数据
 override def processElement(value: String, ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#ReadOnlyContext, out: Collector[UserVisitInfo]): Unit = {
   // {"user_id":"001","ts":"2021-07-10 11:10:05","behaviour":"browse","url":"https://www.tb1.com/1.html"}
   val user_id = JSON.parseObject(value).getLong("user_id")
   val behaviour = JSON.parseObject(value).getString("behaviour")
   val url = JSON.parseObject(value).getString("url")

val mapState = ctx.getBroadcastState(mapStateDes)
   val userInfo = mapState.get(user_id)

out.collect(UserVisitInfo(user_id,userInfo.name,userInfo.city,behaviour,url))

}

// 处理的是广播流的每个值
 override def processBroadcastElement(value: (Long, BaseUserInfo), ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#Context, out: Collector[UserVisitInfo]): Unit = {
   val mapState: BroadcastState[Long, BaseUserInfo] = ctx.getBroadcastState(mapStateDes)
   mapState.put(value._1,value._2)
 }
}

class UserSourceFunc extends RichParallelSourceFunction[BaseUserInfo]{

var conn:Connection = _
 var statement: PreparedStatement = _
 var flag:Boolean = true

override def open(parameters: Configuration): Unit = {
   conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","liu911223")
   statement = conn.prepareStatement("select * from base_user")
 }

override def run(ctx: SourceFunction.SourceContext[BaseUserInfo]): Unit = {
   while (flag){
     Thread.sleep(5000)
     val resultSet = statement.executeQuery()
     while (resultSet.next()){
       val id = resultSet.getLong(1)
       val name = resultSet.getString(2)
       val age = resultSet.getInt(3)
       val city = resultSet.getString(4)
       val phone = resultSet.getLong(5)
       ctx.collect(BaseUserInfo(id,name,age,city,phone))
     }
   }
 }

override def cancel(): Unit = {
   flag = false
 }

override def close(): Unit = {
   if (statement != null) statement.close()
   if (conn != null) conn.close()
 }
}
object BroadcastDemo01 {
 def main(args: Array[String]): Unit = {
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)

// 定义为KV,一方面是为了广播的时候定义为map,另一方面是为了做关联操作
   val userBaseDS: DataStream[(Long, BaseUserInfo)] = env.addSource(new UserSourceFunc)
     .map(user => (user.id, user))
   val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
   val broadCastStream: BroadcastStream[(Long, BaseUserInfo)] = userBaseDS.broadcast(mapStateDes)

// 日志JSON数据
   val dataInfoDS: DataStream[String] = env.socketTextStream("master",1314)

dataInfoDS.connect(broadCastStream)
     .process(new MyBroadcastFunc)
     .print()

env.execute()
 }
}

来源:https://blog.csdn.net/sweet19920711/article/details/120027690

标签:Java,Broadcast,State,数据流
0
投稿

猜你喜欢

  • IDEA 中使用 Hudi的示例代码

    2021-08-27 21:51:04
  • JavaWeb如何实现禁用浏览器缓存

    2021-09-13 01:27:45
  • Java集合之Map接口的实现类精解

    2023-10-07 15:10:37
  • 基于Java8 Stream API实现数据抽取收集

    2021-10-01 03:13:03
  • Java+Swing实现医院管理系统的完整代码

    2023-03-17 00:40:21
  • 详解Kotlin的空指针处理

    2022-06-03 06:57:58
  • Java 类加载机制详细介绍

    2023-12-19 13:55:59
  • java实现代码统计小程序

    2022-03-08 23:15:24
  • MAC配置java+jmeter环境变量过程解析

    2021-09-30 00:16:23
  • MyBatis一二级缓存

    2021-07-03 13:01:59
  • 详解JAVA 线程-线程的状态有哪些?它是如何工作的?

    2023-11-27 03:33:09
  • 游戏开发之随机概率的选择算法

    2022-08-26 13:21:09
  • Java之SpringBean生命周期问题理解

    2022-11-16 14:47:35
  • SpringBoot利用限速器RateLimiter实现单机限流的示例代码

    2023-04-05 19:57:50
  • C#从命令行读取参数的方法

    2023-07-12 15:23:11
  • Android ListView的Item点击效果的定制

    2023-06-21 11:46:01
  • Java super关键字的使用详解

    2021-11-30 13:40:43
  • 解决java.lang.Error: Unresolved compilation problems:问题

    2023-02-10 05:58:08
  • IDEA创建Java项目文件并运行教程解析

    2023-01-14 15:50:47
  • 使用SpringBoot 工厂模式自动注入到Map

    2021-12-22 10:02:42
  • asp之家 软件编程 m.aspxhome.com