在云服务器中构建Spark数据管道 - 新手必备
作者:互联网
批量数据管道 101
提取、转换、加载
批量数据管道通常执行一个或多个 ETL 步骤。
每一步都遵循以下模式:
Extract — 从某个位置加载数据(例如 S3)
转换——执行聚合、过滤、应用 UDF 等。
加载 — 将输出写入某个位置(例如 S3 上的另一条路径)
有时,管道被建模为此类步骤的 DAG。
增量运行和时间概念
大多数批处理管道旨在增量处理数据——即:
阅读“新数据”
处理它
将输出与之前运行的结果合并
定义“新数据”
大多数(但不是全部)批处理作业按计划运行,例如每天运行。因此,大多数团队首先将“新”数据定义为:
“每天,对前一天的数据运行作业”。
换句话说,管道的每次运行都负责处理一个(固定的)时间窗口的数据。
这种方法被称为“滚动窗口”:具有固定大小且它们之间没有重叠的连续时间框架。
挂钟时间与事件时间
一个天真的解决方案不起作用,用于处理带有管道的时间窗口是:
触发管道每 24 小时运行一次
让管道代码使用“ wall time ”——例如now() 的值减去 24 小时作为它需要处理的时间窗口
这不起作用的原因是例如管道在周末发生故障并且现在您需要对 72 小时前的数据运行它的情况。
此外,大多数分析关注的是事件在实践中发生的时间,而不是我们收到事件发生的消息或开始对事件进行推理的时间(尽管在某些情况下,这些也很重要)。
因此,更好的方法是根据“事件时间”定义每次运行的时间窗口——即造成数据变化的真实事件发生的时间。
这意味着作业的每次运行都分配了一个固定的时间窗口,该时间窗口不直接依赖于作业实际运行的时间。
(注意——在某些用例中,例如机器学习,这种方法可能需要改进,因为它会产生训练测试偏差)。
使用调度程序创建每个时间窗口的管道运行
在时间窗口上运行管道的一种常见方法是使用通用调度程序(例如 Airflow、Prefect 等)
调度程序配置为触发管道,例如每天某个时间
每个触发器在逻辑上与需要处理的不同事件时间窗口相关联(例如,前一个日历日期)。
每次调度程序触发管道运行时,它应该将触发时间传递给它
该代码使用触发时间来确定它需要处理的时间窗口。
如果我们的调度程序比预期晚醒来,比如在 0401,它仍然应该将“正确”的触发时间传递给管道
如果触发器运行失败,则应使用确切的原始触发器时间作为参数重新运行管道
如果调度器本身宕机了,它应该注意创建当它宕机时遗漏的触发器
因此,调度程序和管道代码之间存在很多微妙之处和隐式契约,以正确管理时间窗口。
此外,每个调度程序对这些合约的解释都略有不同。
以下是来自Airflow和Prefect以及Databricks的一些示例。
其中一个结论是流水线需要能够接受一个表示调度器触发时间的参数,并根据该值计算出时间窗口。
延迟数据和水印
不幸的是,数据有迟到的令人恼火的趋势。
假设我们想在整个日历日的“真实世界”数据上运行我们的批处理。
如果我们的调度程序恰好在午夜触发,则可能前一天的一些数据仍在写入,并且会被遗漏。
一种简单的方法是为我们的调度时间添加一个“缓冲区”——例如,将触发器设置为在 0400 触发,以便处理前一个日期的数据。
缓冲区大小可以猜测甚至凭经验确定。
有几点值得注意:
固定缓冲区并不能保证避免延迟数据,它只是一种风险管理技术
我们采用的缓冲区大小(即数据完整性的置信度)与结果的新鲜度之间存在明显的权衡
如果您的管道处理另一个批处理过程的输出,您通常更容易受到延迟(例如,当上游批处理失败时)
碰巧晚于缓冲区到达的数据很难被注意到,除非你主动采取一些措施
事实证明,管理延迟数据和决定何时以最小风险处理数据是安全的是一个相当复杂的问题——尤其是在结果需要最新的系统中。
为了解决新鲜度和完整性之间的权衡,您需要考虑水印的概念——即我们可以假设的大概最晚时间已经足够完整以便进行处理——以及如何检测和响应确实发生较旧数据的情况以比预期更大的延迟到达。
总而言之——对于大多数不经常运行(每天/每周)的批处理作业,在计划中使用缓冲区是一种简单的启发式方法,用于处理数据中的某些延迟。
提取、转换、加载
批量数据管道通常执行一个或多个 ETL 步骤。
每一步都遵循以下模式:
Extract — 从某个位置加载数据(例如 S3)
转换——执行聚合、过滤、应用 UDF 等。
加载 — 将输出写入某个位置(例如 S3 上的另一条路径)
有时,管道被建模为此类步骤的 DAG。
增量运行和时间概念
大多数批处理管道旨在增量处理数据——即:
阅读“新数据”
处理它
将输出与之前运行的结果合并
定义“新数据”
大多数(但不是全部)批处理作业按计划运行,例如每天运行。因此,大多数团队首先将“新”数据定义为:
“每天,对前一天的数据运行作业”。
换句话说,管道的每次运行都负责处理一个(固定的)时间窗口的数据。
这种方法被称为“滚动窗口”:具有固定大小且它们之间没有重叠的连续时间框架。
挂钟时间与事件时间
一个天真的解决方案不起作用,用于处理带有管道的时间窗口是:
触发管道每 24 小时运行一次
让管道代码使用“ wall time ”——例如now() 的值减去 24 小时作为它需要处理的时间窗口
这不起作用的原因是例如管道在周末发生故障并且现在您需要对 72 小时前的数据运行它的情况。
此外,大多数分析关注的是事件在实践中发生的时间,而不是我们收到事件发生的消息或开始对事件进行推理的时间(尽管在某些情况下,这些也很重要)。
因此,更好的方法是根据“事件时间”定义每次运行的时间窗口——即造成数据变化的真实事件发生的时间。
这意味着作业的每次运行都分配了一个固定的时间窗口,该时间窗口不直接依赖于作业实际运行的时间。
(注意——在某些用例中,例如机器学习,这种方法可能需要改进,因为它会产生训练测试偏差)。
使用调度程序创建每个时间窗口的管道运行
在时间窗口上运行管道的一种常见方法是使用通用调度程序(例如 Airflow、Prefect 等)
调度程序配置为触发管道,例如每天某个时间
每个触发器在逻辑上与需要处理的不同事件时间窗口相关联(例如,前一个日历日期)。
每次调度程序触发管道运行时,它应该将触发时间传递给它
该代码使用触发时间来确定它需要处理的时间窗口。
如果我们的调度程序比预期晚醒来,比如在 0401,它仍然应该将“正确”的触发时间传递给管道
如果触发器运行失败,则应使用确切的原始触发器时间作为参数重新运行管道
如果调度器本身宕机了,它应该注意创建当它宕机时遗漏的触发器
因此,调度程序和管道代码之间存在很多微妙之处和隐式契约,以正确管理时间窗口。
此外,每个调度程序对这些合约的解释都略有不同。
以下是来自Airflow和Prefect以及Databricks的一些示例。
其中一个结论是流水线需要能够接受一个表示调度器触发时间的参数,并根据该值计算出时间窗口。
延迟数据和水印
不幸的是,数据有迟到的令人恼火的趋势。
假设我们想在整个日历日的“真实世界”数据上运行我们的批处理。
如果我们的调度程序恰好在午夜触发,则可能前一天的一些数据仍在写入,并且会被遗漏。
一种简单的方法是为我们的调度时间添加一个“缓冲区”——例如,将触发器设置为在 0400 触发,以便处理前一个日期的数据。
缓冲区大小可以猜测甚至凭经验确定。
有几点值得注意:
固定缓冲区并不能保证避免延迟数据,它只是一种风险管理技术
我们采用的缓冲区大小(即数据完整性的置信度)与结果的新鲜度之间存在明显的权衡
如果您的管道处理另一个批处理过程的输出,您通常更容易受到延迟(例如,当上游批处理失败时)
碰巧晚于缓冲区到达的数据很难被注意到,除非你主动采取一些措施
事实证明,管理延迟数据和决定何时以最小风险处理数据是安全的是一个相当复杂的问题——尤其是在结果需要最新的系统中。
为了解决新鲜度和完整性之间的权衡,您需要考虑水印的概念——即我们可以假设的大概最晚时间已经足够完整以便进行处理——以及如何检测和响应确实发生较旧数据的情况以比预期更大的延迟到达。
总而言之——对于大多数不经常运行(每天/每周)的批处理作业,在计划中使用缓冲区是一种简单的启发式方法,用于处理数据中的某些延迟。