pyspark 中的rdd api 编码练习
作者:互联网
1,使用pyspark 的rdd api 进行了数据文件的处理,包括构建RDD, 统计分析RDD ,从文件中读取数据RDD,从文件中构建 rdd的模式shema.
然后通过模式,从rdd中生成dataframe。
2,代码
''' 构建sparkSession 和练习数据(RDD 和 KV rdd) ''' spark = SparkSession.builder.appName("rdd_api_test")\ .master("local[2]")\ .getOrCreate() sc = spark.sparkContext rdd1 = sc.parallelize([1, 5, 60, 'a', 9, 'c', 4, 'z', 'f']) rdd2 = sc.parallelize([('a', 6), ('a', 1), ('b', 2), ('c', 5), ('c', 8), ('c', 11)]) ''' 查看rdd元素 , 元素个数, KV对RDD中key的出现次数, 分区个数等常用api ''' print(rdd2.collect()) print (rdd2.take(2)) print('sum of elements:', rdd2.count()) print('RDD count of key:', rdd2.countByKey()) print('RDD output as map:', rdd2.collectAsMap()) print('RDD number of partitions:', rdd2.getNumPartitions()) ''' 数值型rdd ,常用统计函数, 最小,最大 ,平均 , 标准差,方差 ''' rdd5 = sc.parallelize(range(100)) print('RDD Min:', rdd5.min()) # rdd 最小值 print('RDD Max:', rdd5.max()) print('RDD Mean:', rdd5.mean()) print('RDD Standard deviation:', rdd5.stdev()) print('RDD Variance:', rdd5.variance()) ''' 从文件读取数据,并且去掉第一行列名,进行显示 ''' full_csv = sc.textFile('nba.csv') header = full_csv.first() print(full_csv.filter(lambda line: line != header).take(4)) # 去掉头行 ''' #从 文档中读出文件头部,设置rdd模式,然后把RDD转化为df ''' from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField header_list = sc.textFile('customerheaders.txt')\ .map(lambda line: line.split(":")).collect() #从文件中读取rdd的模式,后续把rdd转化为Dataframe def strToType(str): #字符串到 数据类型的转化器 if str == 'int': return IntegerType() elif str == 'double': return DoubleType() else: return StringType() schema = StructType([StructField(t[0], strToType(t[1]), True) for t in header_list]) #构造结构化类型,也就是表的schema for item in schema: print(item) ''' 对原始文件rdd中每一行进行规范化处理 ''' customers_rdd = sc.textFile('customers.txt') def parseLine(line): tokens = zip(line.split(","), header_list) parsed_tokens = [] for token in tokens: token_type = token[1][1] print('token_type = ', token[0]) if token_type == 'double': parsed_tokens.append(float(token[0])) elif token_type == 'int': parsed_tokens.append(int(token[0])) else: parsed_tokens.append(token[0]) return parsed_tokens records = customers_rdd.map(parseLine) # 解析每一行 for item in records.take(4): print(item) df = sc.createDataFrame(records, schema) # rdd --> df print (df) ''' 其他一些API介绍: rdd.foreach([FUNCTION]): 对每个元素执行函数 rdd.groupBy([CRITERA]): 分组聚合 like: ('a', 1) ('b',2) ('a', 3) --> ('a',Iterable(1,3)) ('b', 2) rdd.subtract(rdd2): 做差集计算,元素在rdd中出现,没有在rdd2中出现 rdd.subtractByKey(rdd2): 同上,适用于KV rdd rdd.sortBy([FUNCTION]): 自定义RDD元素排序 rdd.sortByKey(): 按照key 进行排序,其中key的类型必须实现了排序逻辑 rdd.join(rdd2): like : ('a', 1) ('b',2) ('a', 3) --> ('a',(1,3)) ('b', 2) '''
标签:rdd2,sc,RDD,pyspark,rdd,token,api,print 来源: https://www.cnblogs.com/gao1261828/p/16385331.html