其他分享
首页 > 其他分享> > pyspark 中的rdd api 编码练习

pyspark 中的rdd api 编码练习

作者:互联网

1,使用pyspark 的rdd api 进行了数据文件的处理,包括构建RDD, 统计分析RDD ,从文件中读取数据RDD,从文件中构建 rdd的模式shema. 

然后通过模式,从rdd中生成dataframe。

 

2,代码

'''
构建sparkSession 和练习数据(RDD 和 KV rdd)
'''
spark = SparkSession.builder.appName("rdd_api_test")\
    .master("local[2]")\
    .getOrCreate()
sc = spark.sparkContext
rdd1 = sc.parallelize([1, 5, 60, 'a', 9, 'c', 4, 'z', 'f'])
rdd2 = sc.parallelize([('a', 6),
                      ('a', 1),
                      ('b', 2),
                      ('c', 5),
                      ('c', 8),
                      ('c', 11)])

'''
查看rdd元素 , 元素个数, KV对RDD中key的出现次数, 分区个数等常用api
'''
print(rdd2.collect())
print (rdd2.take(2))
print('sum of elements:', rdd2.count())
print('RDD count of key:', rdd2.countByKey())
print('RDD output as map:',  rdd2.collectAsMap())
print('RDD number of partitions:', rdd2.getNumPartitions())

'''
数值型rdd ,常用统计函数, 最小,最大 ,平均 , 标准差,方差
'''
rdd5 = sc.parallelize(range(100))
print('RDD Min:', rdd5.min()) # rdd 最小值
print('RDD Max:', rdd5.max())
print('RDD Mean:', rdd5.mean())
print('RDD Standard deviation:', rdd5.stdev())
print('RDD Variance:', rdd5.variance())


'''
从文件读取数据,并且去掉第一行列名,进行显示
'''
full_csv = sc.textFile('nba.csv')
header = full_csv.first()
print(full_csv.filter(lambda line: line != header).take(4)) # 去掉头行

'''
#从 文档中读出文件头部,设置rdd模式,然后把RDD转化为df
'''
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField

header_list = sc.textFile('customerheaders.txt')\
    .map(lambda line: line.split(":")).collect()  #从文件中读取rdd的模式,后续把rdd转化为Dataframe

def strToType(str):   #字符串到 数据类型的转化器
  if str == 'int':
    return IntegerType()
  elif str == 'double':
    return DoubleType()
  else:
    return StringType()

schema = StructType([StructField(t[0], strToType(t[1]), True) for t in header_list])  #构造结构化类型,也就是表的schema
for item in schema:
  print(item)

'''
对原始文件rdd中每一行进行规范化处理
'''
customers_rdd = sc.textFile('customers.txt')
def parseLine(line):
  tokens = zip(line.split(","), header_list)
  parsed_tokens = []
  for token in tokens:
    token_type = token[1][1]
    print('token_type = ', token[0])
    if token_type == 'double':
      parsed_tokens.append(float(token[0]))
    elif token_type == 'int':
      parsed_tokens.append(int(token[0]))
    else:
      parsed_tokens.append(token[0])
  return parsed_tokens

records = customers_rdd.map(parseLine)  # 解析每一行

for item in records.take(4):
    print(item)

df = sc.createDataFrame(records, schema) # rdd --> df
print (df)

'''
其他一些API介绍:
rdd.foreach([FUNCTION]): 对每个元素执行函数
rdd.groupBy([CRITERA]):  分组聚合 like: ('a', 1) ('b',2) ('a', 3) --> ('a',Iterable(1,3)) ('b', 2) 
rdd.subtract(rdd2):  做差集计算,元素在rdd中出现,没有在rdd2中出现
rdd.subtractByKey(rdd2): 同上,适用于KV rdd
rdd.sortBy([FUNCTION]): 自定义RDD元素排序
rdd.sortByKey():  按照key 进行排序,其中key的类型必须实现了排序逻辑
rdd.join(rdd2):   like : ('a', 1) ('b',2) ('a', 3) --> ('a',(1,3)) ('b', 2)

'''

 

标签:rdd2,sc,RDD,pyspark,rdd,token,api,print
来源: https://www.cnblogs.com/gao1261828/p/16385331.html