Hanlp分词器(通过spark)
作者:互联网
这里主要是对内容数据进行标签处理
这里我们是用分词器是HanLP
HanLP是哈工大提供的一种中文分词的工具,因为他支持Java API
这里我们使用spark + hanlp进行中文分词
1、准备工作
##1. 在hdfs创建目录用于存放hanlp的数据
[root@hadoop ~]# hdfs dfs -mkdir -p /common/nlp/data
##2. 将hanlp的工具上传到服务器的指定位置
##3. 解压到当前目录
[root@hadoop soft]# tar -zxvf hanlp.dictionary.tgz
##4. 将语料库上传到hdfs的指定位置
[root@hadoop soft]# hdfs dfs -put ./dictionary/ /common/nlp/data
##5. 将这个hanlp.properties拷贝到当前工程下的resources目录下
使用Hanlp必须继承IIOAdapter,因为是使用我们自定义的分词库
package com.fuwei.bigdata.profile.nlp.hanlp
import com.hankcs.hanlp.corpus.io.{IIOAdapter, IOUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.{FileInputStream, InputStream, OutputStream}
import java.net.URI
/**
* 使用Hanlp必须继承IIOAdapter,因为是使用我们自定义的分词库
* 当用户自定义语料库在HDFS上的时候,配置此IIOAdapter
* usage:
* 1、在HDFS创建/commoon/nlp目录
* 2、将hanlp.directory.tgz上传到hdfs的目录下
* 3、在当前工程中配置hanlp.properties
* 4、在语料库.bin的文件如果存在,加载词典的时候就会直接加载,如果有新词的时候,不会直接加载,
* 如果有新词的时候,不会直接加载,需要将bin删除,才会
*/
class HadoopFileIoAdapter extends IIOAdapter{
/**
* 这个主要是我们需要分词的文件的路径
* @param s
* @return
*/
override def open(path: String): InputStream = {
//1、获取操作hdfs的文件系统对象
val configuration = new Configuration()
val fs: FileSystem = FileSystem.get(URI.create(path), configuration)
//2、判断路径是否存在
if (fs.exists(new Path(path))){//此时说明存在
fs.open(new Path(path))
}else{
if (IOUtil.isResource(path)){
//判断这个资源路径是否为hanlp需要的资源路径
IOUtil.getResourceAsStream("/"+path)
}else{
new FileInputStream(path)
}
}
}
/**
* 创建一个文件,用于输出处理后的结果
* @param s
* @return
*/
override def create(path: String): OutputStream = {
val configuration = new Configuration()
val fs: FileSystem = FileSystem.get(URI.create(path),configuration)
fs.create(new Path(path))
}
}
package com.fuwei.bigdata.profile
import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer
import com.qf.bigdata.profile.conf.Config
import com.qf.bigdata.profile.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import java.util
import scala.collection.{JavaConversions, mutable}
/**
* 对内容日志进行标签处理
*/
object NewsContentSegment {
private val logger = LoggerFactory.getLogger(NewsContentSegment.getClass.getSimpleName)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
//1、解析参数
val params: Config = Config.parseConfig(NewsContentSegment, args)
System.setProperty("HADOOP_USER_NAME",params.proxyUser)
logger.warn("job is running please wait for a moment ......")
//2、获取SparkSession
val spark: SparkSession = SparkUtils.getSparkSession(params.env, NewsContentSegment.getClass.getSimpleName)
import spark.implicits._
//3、如果是做本地测试,没有必要显示所有代码,测试10行数据即可
var limitData = ""
if (params.env.equalsIgnoreCase("dev")){
limitData = "limit 10"
}
//4、读取源数据
val sourceArticleDataSQL =
s"""
|select
|""".stripMargin
val sourceDF: DataFrame = spark.sql(sourceArticleDataSQL)
sourceDF.show()
//5、分词
val termsDF: DataFrame = sourceDF.mapPartitions(partition => {
//5.1存放结果的集合
var resTermList: List[(String, String)] = List[(String, String)]()
//5.2遍历分区数据
partition.foreach(row => {
//5.3获取到字段信息
val article_id: String = row.getAs("").toString
val context: String = row.getAs("").toString
//5.4分词
val terms: util.List[Term] = StandardTokenizer.segment(context)
//5.5去除停用词
val stopTerms: util.List[Term] = CoreStopWordDictionary.apply(terms) //去除terms中的停用词
//5.6转换为scala的buffer
val stopTermsAsScalaBuffer: mutable.Buffer[Term] = JavaConversions.asScalaBuffer(stopTerms)
//5.7保留名词,去除单个汉字,单词之间使用逗号隔开
val convertTerms: String = stopTermsAsScalaBuffer.filter(term => {
term.nature.startsWith("n") && term.word.length != 1
}).map(term => term.word).mkString(",")
//5.8构建单个结果
var res = (article_id, convertTerms)
//5.9去除空值
if (convertTerms.length != 0) {
resTermList = res :: resTermList //向结果中追加
}
})
resTermList.iterator
}).toDF("article_id", "context_terms")
termsDF.show()
//6、写入到hive
termsDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwd_news.news_article_terms")
//7、释放资源
spark.close()
logger.info(" job has success.....")
}
}
spark自定义jar包测试
${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name user_profile_terms \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.NewsContentSegment \
/data/jar/user-profile.jar \
-e prod -x root
标签:String,val,--,hanlp,分词器,import,spark,Hanlp 来源: https://blog.csdn.net/li1579026891/article/details/122414432