其他分享
首页 > 其他分享> > Hanlp分词器(通过spark)

Hanlp分词器(通过spark)

作者:互联网

这里主要是对内容数据进行标签处理

这里我们是用分词器是HanLP

HanLP是哈工大提供的一种中文分词的工具,因为他支持Java API

这里我们使用spark + hanlp进行中文分词

1、准备工作

##1. 在hdfs创建目录用于存放hanlp的数据
[root@hadoop ~]# hdfs dfs -mkdir -p /common/nlp/data

##2. 将hanlp的工具上传到服务器的指定位置
##3. 解压到当前目录
[root@hadoop soft]# tar -zxvf hanlp.dictionary.tgz

##4. 将语料库上传到hdfs的指定位置
[root@hadoop soft]# hdfs dfs -put ./dictionary/ /common/nlp/data

##5. 将这个hanlp.properties拷贝到当前工程下的resources目录下
使用Hanlp必须继承IIOAdapter,因为是使用我们自定义的分词库
package com.fuwei.bigdata.profile.nlp.hanlp

import com.hankcs.hanlp.corpus.io.{IIOAdapter, IOUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import java.io.{FileInputStream, InputStream, OutputStream}
import java.net.URI


/**
 * 使用Hanlp必须继承IIOAdapter,因为是使用我们自定义的分词库
 * 当用户自定义语料库在HDFS上的时候,配置此IIOAdapter
 * usage:
 * 1、在HDFS创建/commoon/nlp目录
 * 2、将hanlp.directory.tgz上传到hdfs的目录下
 * 3、在当前工程中配置hanlp.properties
 * 4、在语料库.bin的文件如果存在,加载词典的时候就会直接加载,如果有新词的时候,不会直接加载,
 * 如果有新词的时候,不会直接加载,需要将bin删除,才会
 */
class HadoopFileIoAdapter extends IIOAdapter{

    /**
     * 这个主要是我们需要分词的文件的路径
     * @param s
     * @return
     */
    override def open(path: String): InputStream = {
        //1、获取操作hdfs的文件系统对象
        val configuration = new Configuration()
        val fs: FileSystem = FileSystem.get(URI.create(path), configuration)
        //2、判断路径是否存在
        if (fs.exists(new Path(path))){//此时说明存在
            fs.open(new Path(path))
        }else{
            if (IOUtil.isResource(path)){
                //判断这个资源路径是否为hanlp需要的资源路径
                IOUtil.getResourceAsStream("/"+path)
            }else{
                new FileInputStream(path)
            }
        }
    }

    /**
     * 创建一个文件,用于输出处理后的结果
     * @param s
     * @return
     */
    override def create(path: String): OutputStream = {
        val configuration = new Configuration()
        val fs: FileSystem = FileSystem.get(URI.create(path),configuration)
        fs.create(new Path(path))
    }

}

package com.fuwei.bigdata.profile

import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer
import com.qf.bigdata.profile.conf.Config
import com.qf.bigdata.profile.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

import java.util
import scala.collection.{JavaConversions, mutable}





/**
 * 对内容日志进行标签处理
 */
object NewsContentSegment {
    private val logger = LoggerFactory.getLogger(NewsContentSegment.getClass.getSimpleName)

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.WARN)
        //1、解析参数
        val params: Config = Config.parseConfig(NewsContentSegment, args)
        System.setProperty("HADOOP_USER_NAME",params.proxyUser)
        logger.warn("job is running please wait for a moment ......")

        //2、获取SparkSession
        val spark: SparkSession = SparkUtils.getSparkSession(params.env, NewsContentSegment.getClass.getSimpleName)
        import spark.implicits._

        //3、如果是做本地测试,没有必要显示所有代码,测试10行数据即可
        var limitData = ""
        if (params.env.equalsIgnoreCase("dev")){
            limitData = "limit 10"
        }

        //4、读取源数据
        val sourceArticleDataSQL =
            s"""
               |select
               |""".stripMargin

        val sourceDF: DataFrame = spark.sql(sourceArticleDataSQL)
        sourceDF.show()

        //5、分词
        val termsDF: DataFrame = sourceDF.mapPartitions(partition => {
            //5.1存放结果的集合
            var resTermList: List[(String, String)] = List[(String, String)]()

            //5.2遍历分区数据
            partition.foreach(row => {
                //5.3获取到字段信息
                val article_id: String = row.getAs("").toString
                val context: String = row.getAs("").toString

                //5.4分词
                val terms: util.List[Term] = StandardTokenizer.segment(context)
                //5.5去除停用词
                val stopTerms: util.List[Term] = CoreStopWordDictionary.apply(terms) //去除terms中的停用词

                //5.6转换为scala的buffer
                val stopTermsAsScalaBuffer: mutable.Buffer[Term] = JavaConversions.asScalaBuffer(stopTerms)

                //5.7保留名词,去除单个汉字,单词之间使用逗号隔开
                val convertTerms: String = stopTermsAsScalaBuffer.filter(term => {
                    term.nature.startsWith("n") && term.word.length != 1
                }).map(term => term.word).mkString(",")

                //5.8构建单个结果
                var res = (article_id, convertTerms)

                //5.9去除空值
                if (convertTerms.length != 0) {
                    resTermList = res :: resTermList //向结果中追加
                }
            })
            resTermList.iterator
        }).toDF("article_id", "context_terms")

        termsDF.show()

        //6、写入到hive
        termsDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwd_news.news_article_terms")

        //7、释放资源
        spark.close()
        logger.info(" job has success.....")
    }

}

spark自定义jar包测试

${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name user_profile_terms \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.NewsContentSegment \
/data/jar/user-profile.jar \
-e prod -x root

标签:String,val,--,hanlp,分词器,import,spark,Hanlp
来源: https://blog.csdn.net/li1579026891/article/details/122414432