python-如何使用PySpark HashPartitioner检测大型json文件中的重复项
作者:互联网
我有一个很大的json文件,其中包含20GB以上的json结构元数据.它包含跨某些应用程序的简单用户元数据,我希望对其进行筛选以检测重复项.以下是数据外观的示例:
{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}
json文件逐行包含看起来与此非常相似的json对象.当两个json对象的“名称”字段相同时,将发生重复.因此,这是重复的:
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}
多达两个完全相同的json对象.
现在,我想遍历一个太大而无法放入内存的整个json文件,并通过使用最佳标准,找出所有重复项以及它们是什么重复项,然后执行一些逻辑-逻辑部分很简单,但是我不确定如何找到重复项.
我的想法:
>我考虑使用的第一件事是布隆过滤器.它们并不是那么令人困惑,并且可以很好,快速地工作,而且我认为它们本质上可以归结为O(n).但是,bloom过滤器不会让我知道重复的字符串是重复的字符串,这对我来说是个小问题.
>我考虑过使用外部合并排序.我基本上将文件划分为多个较小的文件,这些文件将适合内存,对每个块进行排序并搜索重复项(现在将它们聚集在一起).但是我不确定这个实现是否是我想要的.
>我遇到的下一件事情是按分区进行散列,我怀疑这是我想要的.本质上,散列是处理内存中的数据时查找重复项的最佳方法,那么为什么不将其用于不适合的数据呢?我对如何按分区散列有些困惑.我不确定这是否是我要找的东西.
因此,我认为我应该使用选项3(按分区散列),并且我知道Spark具有该功能.我希望有人可以让我知道我是否走上了正确的道路,并可能给我一些有关我是否正确的指示.从概念上讲,我有几个具体问题:
>假设我创建了100个完全适合内存的分区(因此,在我的情况下,每个分区为100MB).假设我将json文件中的前x个元素散列到一个分区中,但没有发现重复项.假设我有另一个分区,其中第二个100MB数据也没有重复.如果一次只能加载100MB的数据,该如何检查分区1和分区2彼此没有重复?要澄清的是,如果分区1的元素和分区2的元素相同,那么我怎么弄清楚呢?我想我需要将两者都加载到内存中,对吗?如果我不能…那我该怎么办?也许我误会了…
>这是我的第二个问题-似乎不是分区的工作方式,并且当您按分区进行散列时,具有相似散列或散列范围的元素将进入特定文件.因此,如果两个元素重复,我就会知道,因为该算法会尝试将其放入哈希已经存在的文件中.是这样吗
我知道我还有更多问题,我想不出来.有人有提示吗?特别是关于pyspark以及如何最好地使用它?还是pyspark不是我想要的?
解决方法:
这个问题比您想象的要简单.您确实只需要按照@Hitobat的建议按名称聚合数据.我将使用pyspark.sql.Window解决问题,以简化聚合输出.
给定以下数据是一个名为data.json的文件(也可以是文件目录,而不是单个文件)
data.json的内容
{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}
然后,pyspark代码将如下所示:
from pyspark.sql import Window
from pyspark.sql import functions as F
df = spark.read.json("data.json") # can be a directory of files as well
df.show()
输出量
+----------+----------+---------------+--------+----------------+
| created|created_at| name| type| username|
+----------+----------+---------------+--------+----------------+
|2015-08-04|2010-03-15| null| null|koleslawrulez333|
|2016-01-19|2012-05-25| arthurking231| null| null|
|2016-07-23|2011-08-27| starklord1943|Username| null|
|2015-11-08|2010-01-19|Assasinator5827| null| null|
|2016-07-23|2011-08-27|Assasinator5827|Username| null|
+----------+----------+---------------+--------+----------------+
然后分区并使用pyspark.sql.Window计数
name_partition_window = Window.partitionBy("name")
df_with_repeat_counts = df.select("*", F.count("*").over(name_partition_window).alias("name_counts"))
df_with_repeat_counts.show()
输出量
+----------+----------+---------------+--------+----------------+-----------+
| created|created_at| name| type| username|name_counts|
+----------+----------+---------------+--------+----------------+-----------+
|2016-01-19|2012-05-25| arthurking231| null| null| 1|
|2015-08-04|2010-03-15| null| null|koleslawrulez333| 1|
|2015-11-08|2010-01-19|Assasinator5827| null| null| 2|
|2016-07-23|2011-08-27|Assasinator5827|Username| null| 2|
|2016-07-23|2011-08-27| starklord1943|Username| null| 1|
+----------+----------+---------------+--------+----------------+-----------+
然后在name_count列上过滤数据框并按名称排序以进行检查
duplicates = df_with_repeat_counts.where(F.col("name_counts") > 1).orderBy("name")
duplicates.show()
输出量
+----------+----------+---------------+--------+--------+-----------+
| created|created_at| name| type|username|name_counts|
+----------+----------+---------------+--------+--------+-----------+
|2015-11-08|2010-01-19|Assasinator5827| null| null| 2|
|2016-07-23|2011-08-27|Assasinator5827|Username| null| 2|
+----------+----------+---------------+--------+--------+-----------+
此时,您可以根据用例需要分析重复的数据帧.
标签:data-partitioning,pyspark,json,hash,python 来源: https://codeday.me/bug/20191210/2104947.html