Java: serializing objects to HDFS in Spark

Author: 小水熊  Date: 2022-09-17 04:06:18


Abstract: A common requirement in Spark applications is to serialize a Java object and store it on HDFS, in particular models computed with MLlib, so that they can be reused later. The example below reads data from HBase inside a Spark job, trains a Word2Vec model, and stores the model on HDFS.
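Independently of HBase and MLlib, the core of the technique is simply to wrap the stream returned by FileSystem.create in a java.io.ObjectOutputStream (and, symmetrically, FileSystem.open in an ObjectInputStream when reading back). A minimal sketch, assuming an HDFS NameNode reachable at hdfs://myhadoop:9000, with a placeholder path and a throwaway Map as the serialized object:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://myhadoop:9000/") // assumed NameNode address
val fs = FileSystem.get(conf)
val path = new Path("/tmp/serialized-object-demo") // placeholder path

// write: any java.io.Serializable object can be stored this way
val oos = new ObjectOutputStream(fs.create(path))
oos.writeObject(Map("spark" -> 1, "hdfs" -> 2))
oos.close()

// read it back and cast to the expected type
val ois = new ObjectInputStream(fs.open(path))
val restored = ois.readObject.asInstanceOf[Map[String, Int]]
ois.close()

The full example below uses exactly this pattern on a Word2VecModel.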

Without further ado, here is the full code. Spark 1.4 + HBase 0.98.


import scala.collection.JavaConverters._
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.{FilterList, PageFilter, RegexStringComparator, SingleColumnValueFilter}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

object Word2VecDemo {

  // Serialize the HBase Scan into the Base64 string expected by TableInputFormat.SCAN
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")

    val sc = new SparkContext(sparkConf)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)

    // keep only rows whose "data:article" cell contains at least 1500 characters
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")

    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )

    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))

    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    // keep only rows that actually contain an article body
    val articlesRDD = crawledRDD.filter {
      case (_, result) => {
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
      }
    }

    // tokenize every article with ansj word segmentation
    val wordsInDoc = articlesRDD.map {
      case (_, result) => {
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
      }
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    // train the Word2Vec model on the tokenized documents
    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)

    // --------------------------------------- The key part ---------------------------------------
    // Store the model trained above on HDFS
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    // fileSystem.create already returns an FSDataOutputStream, which ObjectOutputStream can wrap directly
    val oos = new ObjectOutputStream(fileSystem.create(path))
    oos.writeObject(model)
    oos.close()

    // Example of another program reading the serialized object straight from HDFS and using the model
    val ois = new ObjectInputStream(fileSystem.open(path))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
    ois.close()

    /*
     * // You can also copy the serialized file from HDFS to the local disk and use the model from a plain Scala program:
     * import java.io._
     * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
     * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
     * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
     * ois.close()
     */
    // ---------------------------------------------------------------------------------------------
  }
}
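Once deserialized, sample_model behaves like any other Word2VecModel. As a small illustration (the query word is only a placeholder and must exist in the trained vocabulary, otherwise findSynonyms throws an exception):

// query the 10 words closest to the given word in the learned vector space
val synonyms = sample_model.findSynonyms("数据", 10)
synonyms.foreach { case (word, cosineSimilarity) =>
  println(s"$word : $cosineSimilarity")
}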

Thanks for reading, and I hope this helps. Thank you for supporting this site!

Source: https://my.oschina.net/waterbear/blog/525347

Tags: java, Spark, object serialization