其他分享
首页 > 其他分享> > Mastering Spark for Data Science:输入格式和架构

Mastering Spark for Data Science:输入格式和架构

作者:互联网


本章的目的是演示如何将数据从其原始格式加载到不同的模式,从而使各种不同类型的下游分析能够在同一数据上运行。 在编写分析甚至更好地构建可重用软件的库时,通常必须使用固定输入类型的接口。 因此,根据目的,灵活地在模式之间转换数据的方式可以带来可观的下游价值,既可以扩展可能的分析类型,又可以重新使用现有代码。

我们的主要目标是学习Spark随附的数据格式功能,尽管我们还将通过引入行之有效的方法来深入研究数据管理的精髓,这些方法将增强您的数据处理能力并提高您的生产率。 毕竟,很可能在某个时候您将被要求正规化工作,并且在编写分析以及以后编写分析的过程中,如何避免潜在的长期陷阱是非常宝贵的。

考虑到这一点,我们将使用本章介绍传统上易于理解的数据模式领域。 我们将涵盖传统数据库建模的关键领域,并解释其中的一些基石原理如何仍适用于Spark。

此外,在磨练我们的Spark技能的同时,我们将分析GDELT数据模型,并展示如何以有效和可扩展的方式存储此大型数据集。

我们将涵盖以下主题:

尺寸建模:与Spark相关的优点和缺点
专注于GDELT模型
抬起读取模式的盖子
Avro对象模型
实木复合地板储物型号
让我们从一些最佳实践开始。

有条理的生活是美好的生活
在了解Spark和大数据的好处时,您可能听说过有关结构化数据与半结构化数据与非结构化数据的讨论。尽管Spark促进了结构化,半结构化和非结构化数据的使用,但它也为其一致的处理提供了基础。唯一的限制是它应该基于记录。如果它们是基于记录的,则无论其组织如何,都可以以相同的方式来转换,丰富和操纵数据集。

但是,值得注意的是,拥有非结构化数据并不一定要采用非结构化方法。在上一章中已经确定了探索数据集的技术之后,很容易直接进入将数据存储在可访问的位置并立即开始简单的概要分析。在现实生活中,此活动通常优先于尽职调查。再一次,我们鼓励您考虑几个关键的感兴趣领域,例如文件完整性,数据质量,计划管理,
版本管理,安全性等,然后再进行此探索。这些不容忽视,许多本身就是大话题。

因此,尽管我们已经在第2章“数据获取”中讨论了许多这些问题,并将在以后进行更多研究,例如在第13章“安全数据”,但在本章中,我们将重点关注数据输入和输出格式,我们可以采用的一些方法,以确保更好的数据处理和管理。

GDELT空间建模

由于我们已选择在本书中使用GDELT进行分析,因此,我们将介绍使用该数据集的第一个示例。 首先,让我们选择一些数据。

有两种可用的数据流:全球知识图(GKG)和事件。
在本章中,我们将使用GKG数据创建可从Spark SQL查询的时间序列数据集。 这将为我们创建一些简单的入门分析提供一个很好的起点。
在接下来的第4章“探索性数据分析”和第5章“地理分析Spark”中,我们将更详细地介绍GKG。 然后,在第7章,构建社区中,我们将通过生成自己的人员网络图并将其用于一些很酷的分析中来探索事件。

GDELT模型
GDELT已经存在20多年了,在那段时间里,它进行了一些重大的修改。 对于我们的入门示例,为了简化起见,让我们限制从2013年4月1日开始的数据范围,当时GDELT对文件结构进行了大修,引入了GKG文件。 值得注意的是,本章讨论的原理适用于所有版本的GDELT数据,但是,此日期之前的特定模式和统一资源标识符(URI)可能与所描述的不同。 我们将使用的版本是GDELT v2.1,它是撰写本文时的最新版本。 但是,再次值得注意的是,这与GDELT 2.0略有不同。

GKG数据中有两个数据轨道:
1.整个知识图及其所有字段。
2.图的子集,其中包含一组预定义的类别。
我们将看第一首曲目。

首先查看数据


我们在第2章“数据采集”中讨论了如何下载GDELT数据,因此,如果您已经配置了NiFi管道来下载GKG数据,只需确保它在HDFS中可用即可。 但是,如果您还没有完成本章,那么我们建议您首先这样做,因为它解释了为什么您应该采用结构化的方法来获取数据。

尽管我们竭尽全力阻止使用临时数据下载,但本章的范围是众所周知的,因此,如果您有兴趣按照此处显示的示例进行操作,则可以跳过使用NiFi并获得 直接获取数据(以便尽快上手)。

如果您确实希望下载示例,这里提醒您在哪里可以找到GDELT 2.1 GKG主文件列表:

记下几个匹配的最新条目。 gkg.csv.zip,使用您喜欢的HTTP工具将其复制,然后将其上传到HDFS。 例如:

http: //data. gdeltproject. org/gdeltv2/masterfilelist. txt

记下几个匹配的最新条目。 gkg.csv.zip,使用您喜欢的HTTP工具将其复制,然后将其上传到HDFS。 例如:

wget http: //data. gdeltproject. org/gdeltv2/20150218230000.gkg.csv.zip -o log. txt
unzip 20150218230000.gkg.csv.zip
hdfs dfs -put 20150218230000.gkg.csv /data/gdelt/gkg/2015/02/21/

记下几个匹配的最新条目。 gkg.csv.zip,使用您喜欢的HTTP工具将其复制,然后将其上传到HDFS。 例如:

注意
在加载到HDFS之前,实际上并不需要解压缩数据。 Spark的TextInputFormat类支持压缩类型,并将透明地解压缩。 但是,由于我们在上一章中解压缩了NiFi管道中的内容,因此在此处执行解压缩是为了保持一致性。

核心全球知识图模型

有一些重要的原则需要理解,无论从计算还是在人工方面,从长远来看肯定可以节省时间。 像许多CSV一样,此文件隐藏了一些复杂性,如果在此阶段无法很好地理解,则可能在稍后的大规模分析中成为一个真正的问题。 GDELT文档描述了数据。 可以在这里找到:http://data.gdeltproject.org/documentation/GDELT-Global_Knowledge_Graph_Codebook-V2.1.pdf。 它表示每条CSV行都是换行符分隔符,其结构如图1所示:

从表面上看,这似乎是一个很好的简单模型,通过该模型,我们可以简单地查询字段并使用封闭的数据,就像每天导入和导出到Microsoft Excel的CSV文件一样。 但是,如果我们更详细地检查这些字段,那么很明显,某些字段实际上是对外部源的引用,而其他字段是扁平化的数据,实际上是由其他表表示的。

隐藏的复杂性
GKG核心模型中的扁平化数据结构表示隐藏的复杂性。 例如,查看文档中的字段V2GCAM,它概述了以下概念:这是一系列逗号分隔的块,其中包含冒号分隔的键值对,代表GCAM变量的对及其各自的计数。 像这样:

如果我们参考GCAM规范http://data.gdeltproject.org/documentation/GCAM-MASTERCODEBOOK.TXT,我们可以将其翻译为:

还有其他以相同方式工作的字段,例如V2Locations,V2Persons,V2Organizations等。 那么,这到底是怎么回事? 这些嵌套结构都是什么?为什么选择这种方式表示数据? 实际上,事实证明,这是折叠维模型的一种便捷方法,这样它可以在单行记录中表示,而不会丢失任何数据或交叉引用。 实际上,这是一种常用的技术,称为非规范化。

标准化模型

传统上,维模型是一种包含许多事实和维表的数据库表结构。由于它们在实体关系图中的出现,它们通常被称为具有星型或雪花型方案。在这样的模型中,事实是可以被计数或求和的值,并且通常在给定的时间点提供度量。由于它们通常基于交易或重复事件,因此事实的数量倾向于变得非常大。另一方面,维度是信息的逻辑分组,其目的是限定事实或将事实上下文化。它们通常为通过分组或汇总来解释事实提供一个切入点。同样,维度可以是分层的,一个维度可以引用另一个维度。我们可以在图2中看到扩展的GKG尺寸结构图。
在我们的GCAM示例中,事实是在上表中找到的条目,而维度是GCAM参考本身。尽管这看起来像是一个简单的逻辑抽象,但这确实意味着我们有一个值得关注的重要领域:维度建模对于传统的数据库非常有用,在传统的数据库中,数据可以拆分成表格-在这种情况下,是GKG和GCAM表-这些类型的数据库,就其本质而言,已针对该结构进行了优化。例如,用于查找值或汇总事实的操作是本地可用的。但是,在使用Spark时,我们认为理所当然的某些操作可能会非常昂贵。例如,如果我们想对数百万个条目的所有GCAM字段取平均值,那么我们将需要执行非常大的计算。我们将在下图中更详细地讨论这一点:

复杂数据带来的挑战

探索了GKG数据模式后,我们现在知道分类法是典型的星形模式,其中单个事实表引用了多个维度表。 有了这种层次结构,如果我们需要以传统数据库所允许的相同方式对数据进行切片和切块,那么我们一定会很费力。

但是,是什么使得在Spark上进行处理如此困难? 让我们看一下这种类型的组织固有的三个不同问题。

问题1-上下文信息丢失

首先,在数据集的每个记录中使用的各种数组都有问题。 例如,V1Locations,V1Organizations和V1Persons字段都包含一个0或更多对象的列表。 因为我们没有用于获取此信息的文本的原始正文(尽管如果源是WEB,JSTOR等,我们有时可以获取它,因为它们将包含到源文档的链接),所以我们失去了上下文 实体之间的关系。
例如,如果我们的数据中包含[巴拉克·奥巴马,戴维·卡梅伦,弗朗索瓦·奥朗德,美国,法国,GB,德士古,埃索,壳牌],那么我们可以假设源文章与各国元首之间的会议有关 石油危机的国家。 但是,这仅是一个假设,可能并非如此,如果我们确实是客观的,我们可以同样地认为该文章与拥有著名雇员的公司有关。

为了帮助我们推断实体之间的这些关系,我们可以开发一个时间序列模型,该模型在一定时间段内采用GDELT字段的所有单个内容,并执行扩展联接。 因此,在一个简单的水平上,经常看到的那些对实际上更可能彼此相关,我们可以开始做出一些更具体的假设。 例如,如果我们在时间序列中看到[美国巴拉克·奥巴马] 100,000次,[法国巴拉克·奥巴马]仅5000次,那么很可能第一对之间存在紧密的关系,而第二对之间存在次要关系 第二。 换句话说,我们可以识别脆弱的关系,并在需要时将其删除。 可以大规模使用此方法来识别显然无关的实体之间的关系。 在第7章,建立社区中,我们使用此原理来识别一些极不可能的人之间的关系!

问题2:重新建立尺寸

使用任何非规范化数据,应该可以重构或扩大原始尺寸模型。 考虑到这一点,让我们看一个有用的Spark函数,它将帮助我们扩展数组并产生平坦的结果。 它称为DataFrame.explode,这是一个说明性示例:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{udf, array, explode, col}
object reestablish {
  case class Grouped(locations:Array[String],people:Array[String])
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("myChart")
      .config("spark.sql.warehouse.dir", ".")
      .getOrCreate()
    val group = Grouped(Array("USA","France","GB"),Array("Barack Obama","David Cameron","Francois Hollande"))
    import spark.implicits._
    val ds = Seq(group).toDS()
    //ds.show()
  //val ds = Seq(1,2,3).toDS()
    ds.show()
    val frame: DataFrame = ds.withColumn("locations", explode($"locations")).withColumn("people",explode($"people"))
    frame.show()


  }
}

使用此方法,我们可以轻松地扩展数组,然后执行我们选择的分组。 扩展后,可以使用DataFrame方法轻松聚合数据,甚至可以使用SparkSQL完成数据。 在我们存储库中的Zeppelin笔记本中可以找到一个示例。

重要的是要理解,尽管此函数易于实现,但不一定具有高性能,并且可能隐藏所需的基础处理复杂性。 实际上,本章随附一个示例,其中使用Zeppelin笔记本中的GKG数据使用爆炸功能,因此,如果爆炸功能的作用域不合理,则该功能将在内存不足时返回堆空间问题。

此功能不能解决消耗大量系统资源的内在问题,因此在使用它时仍应小心。 虽然无法解决此一般问题,但可以通过仅执行必要的分组和联接,或者通过提前计算它们并确保在可用资源内完成对它们进行管理。 您甚至可能希望编写一种算法,该算法可拆分数据集并按顺序执行分组,每次都保持不变。 在第14章,可扩展算法中,我们探索了可帮助解决此问题和其他常见处理问题的方法。

问题3:包含参考数据
对于此问题,让我们看一下在图3中进行了扩展的GDELT事件数据:

这种图形表示形式引起人们对数据关系的关注,并表明了我们可能希望如何对其进行充气。 在这里,我们看到了许多字段,它们只是代码,需要翻译回其原始描述才能显示任何有意义的内容。 例如,为了解释Actor1CountryCode(GDELT事件),我们需要将事件数据与一个或多个提供翻译文本的独立参考数据集结合起来。 在这种情况下,文档告诉我们引用位于此处的CAMEO数据集:
http://data.gdeltproject.org/documentation/CAMEO.Manual.1.1b3.pdf。

这种连接一直在数据规模上带来严重的问题,并且根据给定的方案有多种处理方法-在此阶段,准确了解数据的使用方式非常重要,这可能需要立即进行连接, 并且可能会推迟到将来的某个时候。

如果我们选择在处理之前完全对数据进行非规范化或扁平化,那么先进行连接是有意义的。 在这种情况下,后续的分析肯定会更高效,因为相关的联接已经完成:

因此,在我们的示例中:

wc: 125, c2. 21: 4, c10. 1: 40, v10. 1: 3. 21111111

对于记录中的每个代码,都有一个到相应引用表的联接,整个记录变为:

WordCount: 125, General_Inquirer_Bodypt: 4, SentiWordNet: 40, SentiWordNet average:v10. 1: 3. 21111111

这是一个简单的更改,但是如果跨大量行执行,则它将占用大量磁盘空间。折衷方案是必须在某个时刻执行连接,可能是在摄取时执行,或者在摄取后将其作为常规批处理作业。完全按原样摄取数据并在方便用户的时候对数据集进行展平是完全合理的。无论如何,任何分析都可以使用扁平化的数据,并且数据分析人员无需担心此潜在的隐藏问题。另一方面,通常,将联接推迟到处理的后期进行,这意味着要与之联接的记录较少,因为管道中可能已有聚合步骤。在这种情况下,在最后一次机会加入表是有回报的,因为参考表或尺寸表通常足够小,可以进行广播连接或地图侧连接。由于这是一个非常重要的主题,因此我们将在本书中继续探讨处理联结方案的不同方法。

加载数据

正如我们在前几章中概述的那样,传统的系统工程通常采用一种模式将数据从其源移动到其目的地(即ETL),而Spark则倾向于依赖读取模式。 了解这些概念与模式和输入格式之间的关系非常重要,让我们更详细地描述此方面:

获取源数据->提取有用的位->转换这些位->加载到最终存储库

从表面上看,ETL方法似乎是明智的,实际上,几乎每个存储和处理数据的组织都已实施了ETL方法。 有一些非常受欢迎的,功能丰富的产品可以很好地执行ETL任务-更不用说Apache的开源产品Apache Camel http://camel.apache.org/etl-example.html。

但是,这种看似直接的方法掩盖了实现甚至简单数据管道所需的真正工作。 这是因为我们必须确保所有数据都符合固定的架构,然后才能使用它。 例如,如果我们想从起始目录中提取一些数据,则最小的工作如下:

1.确保我们一直在查看提取目录。
2.数据到达后,收集它。
3.确保数据没有丢失任何内容,并根据预定义的规则集进行验证。
4.根据预定义的规则集,提取我们感兴趣的数据部分。
5.根据预定义的方案转换这些选定的零件。
6.使用正确的版本化架构将数据加载到存储库(例如,数据库)中。
7.处理任何失败的记录。

我们可以立即在此处看到许多必须解决的格式问题:

1.我们有一个预定义的规则集,因此,它必须是版本控制的。任何错误都将意味着最终数据库中的不良数据,并通过ETL流程重新记录该数据以进行更正(这既浪费时间又浪费资源)。对入站数据集格式的任何更改,都必须更改此规则集。
2.对目标架构的任何更改都将需要非常仔细的管理。至少,ETL中的版本控制发生了更改,甚至可能重新处理了部分或全部先前的数据(这可能是非常耗时且昂贵的回程)。
3.对最终存储库的任何更改都将至少导致版本控制架构更改,甚至可能导致新的ETL模块更改(同样,这非常耗费时间和资源)。
4.不可避免地,会有一些不良数据使其进入数据库。因此,管理员将需要设置规则来监视表的引用完整性,以确保将损坏保持在最低水平,并安排重新输入任何损坏的数据。

如果我们现在考虑这些问题并大量增加数据的数量,速度,多样性和准确性,那么很容易看出我们简单的ETL系统已迅速成长为几乎无法管理的系统。 格式,架构和业务规则的任何更改都会产生负面影响。 在某些情况下,由于需要所有处理步骤,因此可能没有足够的处理器和内存资源甚至无法跟上步伐。 在所有ETL步骤都已得到同意并就位之前,无法提取数据。 在大型公司中,甚至可能在任何实施开始之前就需要数月的时间来同意模式转换,从而导致大量积压甚至数据丢失。 所有这些都会导致系统难以更改的脆弱性。

模式敏捷性

为了克服这个问题,读取模式鼓励我们转向一个非常简单的原理:在运行时将模式应用于数据,而不是在加载时(即在摄取时)应用模式。 换句话说,在读取数据进行处理时,会将模式应用于数据。 这在某种程度上简化了ETL流程:

获取源数据->加载到最终存储库

当然,这并不意味着您完全消除了转换步骤。 您只是在推迟验证,应用业务规则,错误处理,确保引用完整性,丰富,汇总以及以其他方式夸大模型,直到准备好使用模型为止。 这样做的目的是,到那时,您应该对数据以及对您希望使用它的方式有更多的了解。 因此,您可以使用对数据的更多了解来提高加载方法的效率。 同样,这是一个折衷。 您节省的前期处理成本中,您可能会丢失重复处理和潜在的不一致之处。 但是,诸如持久性,索引编制,存储和缓存之类的技术都可以在这里提供帮助。 如上一章所述,由于处理步骤顺序的逆转,此过程通常称为ELT。

这种方法的一个好处是,它允许更大的自由度来针对任何给定用例对您表示和建模数据的方式做出适当的决策。 例如,可以采用多种方式来对数据进行结构化,格式化,存储,压缩或序列化,并且鉴于与您要解决的特定问题相关的一组特定要求,选择最合适的方法是很有意义的。

这种方法提供的最重要的机会之一是,您可以选择如何物理布局数据,即,决定保存数据的目录结构。通常不建议将所有数据存储在一个目录中,因为随着文件数量的增加,底层文件系统需要花费更长的时间才能解决它们。但是,理想情况下,我们希望能够指定最小的数据拆分以实现功能,并以所需的容量有效地存储和检索。因此,应根据所需的分析和期望接收的数据量在逻辑上对数据进行分组。换句话说,可以根据类型,子类型,日期,时间或某些其他相关属性在目录之间划分数据,但是应确保没有单个目录承担不适当的负担。在这里要意识到的另一个重要点是,一旦数据被登陆,就总是可以在以后重新格式化或重组,而在ETL范式中,这通常要困难得多。

除此之外,ELT还可以在变更管理和版本控制方面带来令人惊讶的好处。 例如,如果外部因素导致数据架构发生变化,则可以简单地将不同的数据加载到数据存储中的新目录中,并使用可支持架构演进的灵活的架构允许序列化库(例如Avro或Parquet)(我们将 请参阅本章后面的内容); 或者,如果特定作业的结果不令人满意,我们只需要在重新运行该作业之前更改该作业的内部即可。 这意味着架构更改变成可以基于每个分析而不是按每个提要进行管理的事物,并且可以更好地隔离和管理更改的影响。

顺便说一下,值得考虑使用一种混合方法,这种方法在流使用案例中特别有用,通过这种方法,可以在收集和提取期间进行某些处理,而在运行时进行其他处理。 是否使用ETL或ELT的决定不一定是二进制的。 Spark提供的功能可让您控制数据管道。 反过来,这为您提供了在必要时灵活地转换或持久化数据的灵活性,而不是采用“一刀切”的方法。

确定采用哪种方法的最佳方法是从特定数据集的实际日常使用中学习并相应地调整其处理过程,从而随着获得更多经验而确定瓶颈和脆弱性。 可能还会征收公司规则,例如病毒扫描或数据安全性,这些规则将确定特定的路由。 在本章的最后,我们将对此进行更多研究。

现实检查

与计算中的大多数事情一样,没有灵丹妙药。 ELT和读取模式无法解决您所有的数据格式问题,但它们是工具箱中的有用工具,通常来说,缺点通常胜过缺点。但是,值得注意的是,在某些情况下,如果您不小心,可能会遇到困难。

特别是,对复杂数据模型(与数据库相反)执行临时分析可能会涉及更多。例如,在提取新闻文章中提到的所有城市名称的列表的简单情况下,在SQL数据库中,您实际上可以从GKG中运行select CITY,而在Spark中,您首先需要了解数据模式,解析和验证数据,然后创建相关表并即时处理任何错误,有时是每次运行查询时。

同样,这是一个折衷。使用读取模式时,您将失去内置数据表示和固定模式的固有知识,但可以灵活地根据需要应用不同的模型或视图。像往常一样,Spark提供了旨在帮助利用这种方法的功能,例如转换,DataFrames,SparkSQL和REPL,并且在正确使用时,它们使您可以最大化读取模式的好处。我们将在继续学习中进一步了解这一点。

GKG ELT

当我们的NiFi管道将数据直接写入HDFS时,我们可以充分利用读取架构的优势,并立即开始使用它,而不必等待数据被处理。 如果您想更高级一些,则可以以可拆分和/或压缩的格式(例如bzip2(Spark的本机))加载数据。 让我们看一个简单的例子。

注意

HDFS使用块系统来存储数据。 为了以最有效的方式存储和利用数据,HDFS文件应尽可能拆分。 例如,如果使用TextOutputFormat类加载CSV GDELT文件,则大于块大小的文件将在filesize / blocksize块之间拆分。 部分块不会在磁盘上占据整个块的大小。

通过使用DataFrames,我们可以编写SQL语句来探索数据,也可以使用数据集来链接流利的方法,但是无论哪种情况,都需要一些初始准备。

好消息是,通常这可以完全由Spark来完成,因为它支持通过案例类,使用编码器将数据透明加载到Datasets中,因此在大多数情况下,您无需太担心内部工作原理。 确实,当您拥有一个相对简单的数据模型时,通常足以定义一个案例类,将数据映射到该案例类并使用toDS方法转换为数据集。 但是,在大多数现实世界中,数据模型更加复杂,您将需要编写自己的自定义解析器。 自定义解析器在数据工程中并不是什么新鲜事物,但是在读取模式设置中,数据科学家通常需要使用它们,因为数据的解释是在运行时完成的,而不是在加载时完成的。 下面是在我们的存储库中使用自定义GKG解析器的示例:

package test2

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}


object GKGExaples {
  def GkgSchema = StructType(Array(
    StructField("GkgRecordId", StringType, true),                          //$1
    StructField("V21Date", StringType, true),                                       //$2
    StructField("V2SrcCollectionId" , StringType, true),                          //$3
    StructField("V2SrcCmnName"      , StringType, true),                          //$4
    StructField("V2DocId"           , StringType, true),                          //$5
    StructField("V1Counts"          , StringType, true),            //$6
    StructField("V21Counts"         , StringType, true),           //$7
    StructField("V1Themes"          , StringType, true),               //$8
    StructField("V2EnhancedThemes"  , StringType, true),    //$9
    StructField("V1Locations"       , StringType, true),         //$10
    StructField("V2Locations"       , StringType, true), //$11
    StructField("V1Persons"         , StringType, true),               //$12
    StructField("V2Persons"         , StringType, true),   //$13
    StructField("V1Orgs"            , StringType, true),               //$14
    StructField("V2Orgs"            , StringType, true),      //$15
    StructField("V1Stone"           , StringType, true),                       //$16
    StructField("V21Dates"          , StringType, true),    //$17
    StructField("V2GCAM"            , StringType, true),             //$18
    StructField("V21ShareImg"       , StringType, true),                          //$19
    StructField("V21RelImg"         , StringType, true),               //$20
    StructField("V21SocImage"       , StringType, true),               //$21
    StructField("V21SocVideo"       , StringType, true),               //$22
    StructField("V21Quotations"     , StringType, true),       //$23
    StructField("V21AllNames"       , StringType, true),            //$24
    StructField("V21Amounts"        , StringType, true),          //$25
    StructField("V21TransInfo"      , StringType, true),            //$26
    StructField("V2ExtrasXML"       , StringType, true)                           //$27
  ))

  def main(args: Array[String]): Unit = {
    val filename="F:\\code\\reestablib\\data\\20160101020000.gkg.csv"
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("GKGExample")
      .config("spark.sql.warehouse.dir", ".")
      .getOrCreate()

    val df: DataFrame = spark.read.option("header","false").schema(GkgSchema).option("delimiter",",").csv(filename)
    df.createOrReplaceTempView("GKG")
    spark.sql("select V2GCAM from gkg   ").show()
    //spark.sql("SELECT AVG(GOLDSTEIN) AS GOLDSTEIN FROM GKG WHERE GOLDSTEIN IS NOT NULL").show()
  }



}

在此之前,您可以看到,一旦数据被解析,就可以在各种Spark API中使用。
如果您更喜欢使用SQL,则可以定义自己的架构,注册表并使用SparkSQL。 在这两种方法中,您都可以根据将如何使用数据来选择如何加载数据,从而可以更加灵活地花费时间进行解析。 例如,加载GKG的最基本模式是将每个字段都视为String,如下所示:

使用这种方法,您可以立即开始分析数据,这对于许多数据工程任务很有用。 准备就绪后,您可以选择GKG记录的其他元素以进行扩展。 在下一章中,我们将详细介绍。 拥有DataFrame之后,您可以通过定义case类并进行强制转换将其转换为Dataset,如下所示:

职位事项

在这里值得注意的是,当从CSV加载数据时,Spark的模式匹配完全在位置上。 这意味着,当Spark基于给定的分隔符对记录进行标记化时,即使存在标头,它也会使用其位置将每个标记分配给架构中的字段。 因此,如果架构定义中省略了某个列,或者由于数据漂移或数据版本控制而导致数据集随时间变化,则可能会出现Spark不一定会警告您的错位!

因此,我们建议您定期进行基本的数据分析和数据质量检查,以缓解这些情况。 您可以使用DataFrameStatFunctions中的内置函数来辅助此操作。 一些示例如下所示:

 df.describe("V1Themes").show()

会记录这行的平均数,中位数,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max等。

注意:spark 里面的数据探索函数的使用

接下来,让我们解释一种在我们周围放置一些结构的好方法
代码,并通过使用Avro或Parquet减少编写的代码量。

Avro 
我们已经看到,不需要任何传统的ETL工具就可以轻松地提取一些数据并使用Spark进行分析。 虽然在几乎所有模式都被忽略的环境中工作非常有用,但这在商业世界中并不现实。 但是,有一个良好的中间立场,这为我们提供了优于ETL和无限制数据处理-Avro的巨大优势。
Apache Avro是序列化技术,其目的类似于Google protocol buffers。 与许多其他序列化技术一样,Avro使用模式描述数据,但其用途的关键在于它提供以下功能:

它将模式与数据一起存储。 由于架构仅在文件顶部存储一次,因此可以实现高效存储。 这也意味着即使原始类文件不再可用,也可以读取数据。
它支持读取架构和架构演进。 这意味着它可以为读取和写入数据实现不同的模式,从而提供了模式版本控制的优点,而没有每次我们希望进行数据修改时都会产生大量管理开销的缺点。
它与语言无关。 因此,它可以与允许自定义序列化框架的任何工具或技术一起使用。 例如,对于直接写入Hive尤其有用。

由于Avro使用包含的数据存储模式,因此它是自描述的。 因此,我们不必为没有类而费力地读取数据,也不必试图猜测适用于哪个版本的模式,或者在最坏的情况下必须完全丢弃数据,我们可以简单地为该模式查询Avro文件 写入数据。

Avro还允许以附加更改或追加的形式对模式进行修改,从而可以适应这些更改,从而使特定实现与旧数据向后兼容。

由于Avro以二进制形式表示数据,因此可以更有效地进行传输和处理。 此外,由于其固有的压缩功能,它占用磁盘上的空间更少。
由于上述原因,Avro是一种非常流行的序列化格式,已被多种技术和最终系统使用,您无疑会在某个时候使用它。 因此,在下一节中,我们将演示两种不同的读取和写入Avro格式数据的方式。 第一种是一种优雅而简单的方法,它使用了第三方专门创建的库,称为spark-avro,第二种是秘密的方法,对于理解Avro的工作原理很有用。

Spark-Avro方法

为了解决实现Avro的复杂性,已经开发了spark avro库。 这可以使用maven以通常的方式导入:

com.databricksspark-avro_2.113.1.0

对于此实现,我们将使用StructType对象创建Avro模式,使用RDD转换输入数据,并从两者创建DataFrame。 最后,可以使用spark-avro库将结果以Avro格式写入文件。
StructType对象是上面和第4章“探索性数据分析”中使用的GkgCoreSchema的变体,其构造如下:

 

Parquet 和 Avro之间是优化数据及IO的优化。

摘要
在本章中,我们已经看到了为什么在进行过多的探索工作之前始终应该彻底理解数据集。我们已经讨论了结构化数据和维度建模的细节,尤其是关于如何将其应用于GDELT数据集的方面,并扩展了GKG模型以显示其潜在的复杂性。

我们已经解释了传统ETL和更新的按读取架构的ELT技术之间的区别,并涉及了数据工程师在数据存储,压缩和数据格式方面面临的一些问题-特别是Avro和Parquet的优势和实现。我们还演示了使用各种Spark API探索数据的几种方法,包括如何在Spark Shell上使用SQL的示例。

我们可以通过提及存储库中的代码将所有内容组合在一起,并且是读取原始GKG文件的完整模型来结束本章(如果需要一些数据,请使用第1章,数据获取中的Apache NiFi GDELT数据接收管道)。
在下一章中,我们将通过探索用于大规模探索和分析数据的技术来更深入地研究GKG模型。我们将看到如何使用SQL开发和丰富我们的GKG数据模型,并研究Apache Zeppelin笔记本如何提供更丰富的数据科学体验。

 

第4章探索性数据分析

在商业环境中执行的探索性数据分析(EDA)通常作为较大工作的一部分进行委托,该工作是按照可行性评估的方式组织和执行的。 这项可行性评估的目的,也是我们所谓扩展EDA的重点,是回答有关所检查数据是否适合目的并值得进一步投资的一系列问题。

在此一般职权范围内,预计数据调查将涵盖可行性的多个方面,包括在生产中使用数据的实际方面,例如数据的及时性,质量,复杂性和覆盖范围,以及是否适合预期的假设。 被测试。 尽管从数据科学的角度来看,其中某些方面可能不太有趣,但这些以数据质量为主导的调查并不比单纯的统计见解重要。 当所涉及的数据集非常大且复杂,并且为数据科学准备数据所需的投资可能很大时,尤其如此。 为了说明这一点,并使这个话题变为现实,我们介绍了对事件,语言和语气全球数据库(GDELT)项目提供的庞大而复杂的全球知识图(GKG)数据提要进行EDA的方法。 

在本章中,我们将创建和解释EDA,同时涵盖以下主题:

了解规划和构建扩展探索性数据分析的问题和设计目标
什么是数据剖析,包括示例,以及如何围绕连续数据质量监控技术形成数据质量的通用框架
如何围绕该方法构造一个通用的基于掩码的数据探查器
如何将探索性指标存储到标准架构中,以利于研究随时间推移指标中的数据漂移
如何使用Apache Zeppelin笔记本进行快速的EDA工作,以及绘制图表和图形
如何以时间序列和时空数据集的形式提取和研究GDELT中的GCAM情绪
如何扩展Apache Zeppelin以使用绘图生成自定义plot.ly。 图书馆

问题,原则和计划
在本节中,我们将探讨为什么可能需要EDA并讨论创建EDA的重要注意事项。

(EDA)数据探索

EDA项目之前的一个难题是:请您给我一份您的EDA拟议费用的估算和明细表吗?
我们如何回答这个问题最终决定了我们的EDA战略和策略。 在过去的日子里,这个问题的答案通常是这样开始的:基本上,您是按列付费的。.此经验法则是基于以下前提:存在一个可迭代的数据探索工作单元,而这些工作单元 估算工作量,从而估算执行EDA的费用。
这个想法有趣的是,工作单元是根据要研究的数据结构而不是需要编写的函数来引用的。 这样做的原因是简单的。 假定功能的数据处理管道已经存在,而不是新工作,因此提供的报价实际上是将新输入的数据结构配置到我们的标准数据处理管道以探究数据的隐含成本。

这种想法将我们带到了主要的EDA问题上,即在计划任务和估计时间方面,探索似乎很难固定下来。 推荐的方法是将探索视为配置驱动的任务。 这有助于我们更有效地组织和评估工作,并帮助围绕工作进行思考,以使配置成为主要挑战,而不是编写许多临时的即弃代码。
配置数据浏览的过程也驱使我们考虑可能需要的处理模板。 我们将需要根据我们探索的数据的形式来配置它们。 例如,我们需要用于结构化数据,文本数据,图形形状数据,图像数据,声音数据,时间序列数据和空间数据的标准探索管道。 有了这些模板后,我们只需将输入数据映射到它们,然后配置我们的提取过滤器,即可对数据进行聚焦。

设计原则

将这些想法现代化以用于基于Apache Spark的EDA处理意味着我们需要设计一些可配置的EDA函数和代码时要牢记一些通用原则:
易于重用的功能/特性:我们需要定义功能以通用方式在通用数据结构上工作,以便它们产生良好的探索性功能并以最小化为新数据集配置功能所需的方式交付它们
最小化中间数据结构:我们需要避免激增中间模式,帮助最小化中间配置,并在可能的情况下创建可重用的数据结构。数据驱动的配置:在可能的情况下,我们需要具有可从元数据生成的配置,以减少手动样板工作
模板化可视化:从通用输入模式和元数据驱动的常规可重用可视化

最后,尽管这本身并不是严格的原则,但我们需要构建一种探索性工具,该工具应足够灵活以发现数据结构,而不是依赖于严格的预定义配置。 当出现问题时,这可以帮助我们对文件内容,编码或文件定义中可能存在的错误进行反向工程,以帮助他们解决问题。

探索总计划
所有EDA工作的早期阶段始终都是基于确定数据是否质量良好的简单目标。 如果我们专注于此,以创建一个广泛适用的通用入门计划,那么我们可以制定一组通用任务。
这些任务创建了拟议的EDA项目计划的总体形状,如下所示:

准备源工具,获取我们的输入数据集,查看文档,等等。 必要时检查数据的安全性。
获取,解密和暂存HDFS中的数据; 收集非功能需求(NFR)进行规划。
对文件内容运行代码点级别的频率报告。
对文件字段中丢失的数据量进行填充检查。
运行低粒度格式探查器以检查文件中的高基数字段。
对文件中的格式控制字段运行高粒度格式探查器。
在适当的地方运行参照完整性检查。
运行词典检查,以验证外部尺寸。
运行数字数据的基本数字和统计探索。
对感兴趣的关键数据进行更多基于可视化的探索。

注意

在字符编码术语中,代码点或代码位置是组成代码空间的任何数值。 许多代码点表示单个字符,但是它们也可以具有其他含义,例如用于格式化。

准备

既然我们已经制定了总体行动计划,那么在探索数据之前,我们必须首先投资构建可重复使用的工具,以进行探索管道中世俗的早期部分,从而帮助我们验证数据; 然后第二步调查GDELT的内容。

快速浏览新型数据的一种简单有效的方法是利用基于掩码的数据配置文件。 在这种情况下,掩码是字符串的转换函数,该字符串将数据项概括为一个特征,作为一组掩码,其基数比研究领域中的原始值低。
当将一列数据汇总为掩码频率计数时,该过程通常称为数据分析,它可以快速了解字符串的通用结构和内容,从而揭示原始数据是如何编码的。 考虑以下掩码以浏览数据:

将大写字母转换为A
将小写字母翻译成
将数字0到9转换为9

乍一看似乎很简单。 例如,让我们将此掩码应用于数据的高基数字段,例如GDELT GKG文件的V2.1 Source Common Name字段。 该文档建议它记录正在研究的新闻文章来源的通用名称,通常是从中检索新闻文章的网站的名称。 我们期望它包含域名,例如nytimes.com。

在Spark中实施生产解决方案之前,让我们在Unix命令行上对探查器进行原型设计,以提供一个可以在任何地方运行的示例:

$ cat 20150218230000. gkg. csv | gawk -F"\t"
'{print $4}' | \
sed "s/[0-9]/9/g; s/[a-z]/a/g;s/[A-Z]/A/g" |
sort | \
uniq -c | sort -r -n | head -20


232 aaaa. aaa
195 aaaaaaaaaa. aaa
186 aaaaaa. aaa
182 aaaaaaaa. aaa
168 aaaaaaa. aaa
167 aaaaaaaaaaaa. aaa
167 aaaaa. aaa
153 aaaaaaaaaaaaa. aaa
147 aaaaaaaaaaa. aaa
120 aaaaaaaaaaaaaa. aaa

输出是在“源公用名称”列中找到的记录的排序计数,以及由正则表达式(regex)生成的掩码。 查看此配置文件数据的结果应该很清楚,该字段包含域名-还是? 由于我们只查看了最常见的掩码(在本例中为前20个),因此排序列表另一端的掩码的长尾巴可能以较低的频率存在潜在的数据质量问题。

我们可以引入微妙的变化来提高蒙版功能的泛化能力,而不仅仅是查看前20个蒙版甚至不见底20个蒙版。 通过使正则表达式将多个相邻的小写字母折叠成一个字符,可以减小掩码的基数,而不会真正降低我们解释结果的能力。 我们可以对正则表达式进行很小的更改就可以对此改进进行原型设计,并希望在输出的一页中查看所有掩码:

 hdfs dfs -cat 20150218230000. gkg. csv |
\
gawk -F"\t" ' {print $4}' | sed "s/[0-9]/9/g;
s/[A-Z]/A/g; \
s/[a-z]/a/g; s/a*a/a/g"| sort | uniq -c | sort
-r -n

2356 a. a
508 a. a. a
83 a-a. a
58 a99. a
36 a999. a
24 a-9. a
21 99a. a
21 9-a. a
15 a9. a
15 999a. a
12 a9a. a
11 a99a. a
8 a-a. a. a
7 9a. a
3 a-a-a. a
2 AAA Aa <---note here the pattern that
stands out
2 9a99a. a
2 9a. a. a
1 a9. a. a
1 a. 99a. a
1 9a9a. a
1 9999a. a

很快,我们就制作了一个面具原型,将三千个左右的原始值减少到很短的22个值列表中,这些值很容易用肉眼检查。 由于长尾巴现在是短得多的尾巴,因此我们可以轻松地在此数据字段中发现任何可能代表质量问题或特殊情况的异常值。 这种检查虽然是手动的,但功能非常强大。
注意,例如,在输出中有一个特定的掩码AAA Aa,其中没有点,就像我们在域名中所期望的那样。 我们将此发现解释为意味着我们发现了两行原始数据,这些数据不是有效的域名,但可能是通用描述符。 也许这是一个错误,或者是不合逻辑的字段使用的示例,这意味着可能会有其他值滑入此列,这些值可能在逻辑上应该移到其他地方。

这值得研究,并且很容易检查这两个记录。 我们通过在原始数据旁边生成掩码,然后对有问题的掩码进行过滤以找到原始字符串以进行手动检查来实现。
与其在命令行上编写一个很长的划线员,不如使用一个用awk编写的称为bytefreq(字节频率的缩写)的遗留数据分析器检查这些记录。 它具有用于生成格式化报告,数据库就绪指标的开关,以及用于并排输出掩码和数据的开关。 我们已经为本书的读者专门提供了bytefreq的开源软件,建议您使用它来真正了解该技术的实用性:
https://bitbucket.org/bytesumo/bytefreq。

当我们检查奇数掩码A Aa时,可以看到发现的令人讨厌的文本是BBC Monitoring,并且在重新阅读GDELT文档时,我们将看到这不是错误,而是已知的特殊情况。这意味着在使用此字段时,我们必须记住要处理这种特殊情况。解决该问题的一种方法是通过包含一个更正规则,以将该字符串值交换为更有效的值,例如,有效域名www.monitor.bbc.co.uk,即文本所指向的数据源。字符串引用。
我们在这里介绍的想法是,掩码可以用作检索特定字段中违规记录的键。这种逻辑将我们带到基于掩码的配置文件的下一个主要优点:输出掩码是数据质量错误代码的一种形式。这些错误代码可以分为两类:良好掩码的白名单和用于查找质量较差数据的不良掩码的黑名单。考虑到这种方式,掩码就构成了搜索和检索数据清除方法,或者引发警报或拒绝记录的基础。
课程是,我们可以创建处理函数,以补救使用针对特定字段中的数据计算出的特定掩码发现的原始字符串。这种想法得出以下结论:我们可以围绕基于掩码的配置文件创建一个通用框架,以便在我们在数据读取管道中读取数据时进行数据质量控制和补救。这具有一些真正有利的解决方案属性:

生成数据质量掩码是一个读取过程。我们可以接受新的原始数据并将其写入磁盘,然后在读取时仅在查询时需要时才生成掩码-因此数据清理可以是一个动态过程。
然后,可以将处理功能动态地应用于针对性的修复工作,这些工作有助于在读取时清理我们的数据。
由于以前看不见的字符串被概括为掩码,因此即使以前从未见过确切的字符串,也可以将新字符串标记为存在质量问题。这种通用性有助于我们降低复杂性,简化流程并创建可重复使用的智能解决方案-甚至跨主题领域。
创建掩膜的数据项不会落入掩膜白名单,修复列表或黑名单中,可能会被隔离以引起注意;人类分析人员可以检查记录并将其列入白名单,或者创建新的处理功能,以帮助将数据从隔离区移回生产环境。
数据隔离可以简单地实现为读取过滤器,并且当创建新的补救功能以清理或修复数据时,在读取时进行的动态处理将自动将校正后的数据释放给用户,而不会造成长时间延迟。
最终,将创建一个随时间稳定的数据质量处理库。新工作主要是通过将现有处理方法映射并应用于新数据来完成的。例如,电话号码重新格式化处理功能可以在许多数据集和项目中广泛使用。

现在说明了方法和体系结构的好处,构建基于通用蒙版的探查器的要求应该更加清楚。 请注意,掩码生成过程是经典的Hadoop MapReduce过程:将输入的数据映射到掩码,并将这些掩码减少回总频率计数。 还要注意,即使在这个简短的示例中,我们也已经使用了两种类型的掩码,每种掩码都是由基础转换的管道组成的。 它建议我们需要一种工具,该工具支持预定义的遮罩库,并允许用户定义的遮罩,这些遮罩可以快速且按需创建。 它还建议应该有一些方法可以将这些遮罩堆叠起来,以将它们构建成复杂的管道。

可能还不那么明显的是,以这种方式完成的所有数据分析都可以将探查器指标写入通用的输出格式。 通过简化配置文件数据的记录,存储,检索和使用,这有助于提高代码的可重用性。
作为示例,我们应该能够使用以下模式报告所有基于掩码的探查器指标:

Metric Descriptor
Source Studied
IngestTime
MaskType
FieldName
Occurrence Count
KeyCount
MaskCount
Description

一旦以这种单一模式格式捕获了指标,我们就可以使用用户界面(例如Zeppelin笔记本)构建辅助报告。
在我们逐步实现这些功能之前,需要对字符类掩码进行介绍,因为它们与常规配置掩码略有不同。

引入角色类面具
我们还可以应用另一种简单的数据配置文件类型,以帮助进行文件检查。它涉及分析构成整个文件的实际字节。这是一种古老的方法,最初是来自密码学的一种方法,该方法使用文本中字母的频率分析来获得解密替换码的优势。
尽管在当今的数据科学界还不是一种普遍的技术,但是在需要时字节级分析却非常有用。过去,数据编码是一个大问题。文件通过ASCII和EBCDIC标准的一系列代码页进行编码。字节频率报告通常对于发现文件中使用的实际编码,定界符和行尾至关重要。回到那时,可以创建文件但不能从技术上描述文件的人数令人惊讶。如今,随着世界越来越多地使用基于Unicode的字符编码,这些旧方法需要更新。在Unicode中,字节的概念被现代化为多字节代码点,
可以使用以下功能在Scala中显示:

 

 

标签:Science,可以,使用,Mastering,掩码,Spark,数据,我们
来源: https://blog.51cto.com/u_13887992/2733340