5.RDD操作综合实例
作者:互联网
一、词频统计
A. 分步骤实现
1、准备文件
下载小说或长篇新闻稿
上传到hdfs上
start-all.sh
hdfs dfs -put 666.txt
hdfs dfs -ls
2、读文件创建RDD
>>> lines = sc.textFile("/home/hadoop/666.txt")
>>> lines.foreach(print)
3、分词
>>> words =lines.flatMap(lambda line :line.split())
>>> words.collect()
4、排除大小写lower(),map()
>>> words2 = words.map(lambda word:word.lower())
>>> words2.collect()
标点符号re.split(pattern,str),flatMap(),
>>> import re
>>> words3 = words2.flatMap(lambda line:re.split('\W+',line))
>>> words3.collect()
停用词,stopwords.txt,filter(),
hdfs dfs -put /home/hadoop/virtual_share/stopwords.txt
hdfs dfs -ls
>>> words4 = words3.flatMap(lambda a:a.split())
>>> words4.collect()
>>> stopwords = sc.textFile("file:///home/hadoop/stopwords.txt").flatMap(lambda a:a.split()).collect()
>>> stopwords
>>> words5 = words4.filter(lambda a:a not in stopwords)
>>> words5.collect()
长度小于2的词filter()
>>> words6 = words5.filter(lambda a:len(a)>2)
>>> words6.collect()
5、统计词频
>>> words7 = words6.map(lambda a:(a,1))
>>> words7.collect()
>>> words7 = words7.reduceByKey(lambda a,b:a+b)
>>> words7.collect()
6、按词频排序
>>> words8 = words7.sortBy(lambda x:x[1], False)
>>> words8.collect()
7、输出到文件
>>> words8.savaAsTextFile('ad_RDD')
>>> words8.saveAsTextFile('file:///home/hadoop/ad_RDD')
8、查看结果
cd ad_RDD/
cat part-00000 | head -5
B. 一句话实现:文件入文件出
C. 和作业2的“二、Python编程练习:英文文本的词频统计 ”进行比较,理解Spark编程的特点
Spark的特性主要有以下四点:
快速
- 与 Hadoop 的 MapReduce 相比, Spark 基于内存的运算是 MapReduce 的 100 倍.基于硬盘的运算也要快 10 倍以上.
- Spark 实现了高效的 DAG 执行引擎, 可以通过基于内存来高效处理数据流
易用
- Spark 支持 Scala, Java, Python, R 和 SQL 脚本, 并提供了超过 80 种高性能的算法, 非常容易创建并行 App
- 而且 Spark 支持交互式的 Python 和 Scala 的 shell, 这意味着可以非常方便地在这些 shell 中使用 Spark 集群来验证解决问题的方法, 而不是像以前一样 需要打包, 上传集群, 验证等. 这对于原型开发非常重要.
通用
- Spark 结合了SQL, Streaming和复杂分析.
- Spark 提供了大量的类库, 包括 SQL 和 DataFrames, 机器学习(MLlib), 图计算(GraphicX), 实时流处理(Spark Streaming) .
- 可以把这些类库无缝的柔和在一个 App 中.
- 减少了开发和维护的人力成本以及部署平台的物力成本.
易融合性
- Spark 可以非常方便的与其他开源产品进行融合,比如, Spark 可以使用 Hadoop 的 YARN 和 Appache Mesos 作为它的资源管理和调度器, 并且可以处理所有 Hadoop 支持的数据, 包括 HDFS, HBase等.
二、求Top值
上传文件到hdfs
1、丢弃不合规范的行:
分词
处理空行、少数据项
处理缺失数据
>>> import re
>>> account = sc.textFile('payment.txt').flatMap(lambda a:re.split('\W+',a)).flatMap(lambda a:a.split())
>>> account.collect()
2、支付金额转换为数值型,按支付金额排序
>>> account1 = account.map(lambda a:(a,int(a)))
>>> account1.collect()
3、取出Top3
>>> account1.saveAsTextFile('top')
标签:hdfs,flatMap,collect,RDD,实例,split,操作,Spark,lambda 来源: https://www.cnblogs.com/n1254088/p/16115063.html