其他分享
首页 > 其他分享> > Apache NiFi开发人员指南

Apache NiFi开发人员指南

作者:互联网

 

 

介绍

NiFi组件

Processor API

支持API

AbstractProcessor API

组件生命周期

组件通知

受限

州经理

报告处理器活动

记录组件

记录属性

记录关系

记录能力和关键词

记录FlowFile属性交互

高级文档

种源事件

通用处理器模式

数据入口

数据出口

基于内容的路由(一对一)

基于内容的路由(一对多)

基于内容的路由流(一对多)

基于属性的路由

拆分内容(一对多)

根据内容更新属性

丰富/修改内容

错误处理

处理器中的例外情况

回调中的异常:IOException,RuntimeException

惩罚与屈服

会话回滚

一般设计考虑因素

考虑用户

凝聚力和可重用性

命名约定

处理器行为注释

数据缓冲

控制器服务

开发ControllerService

与ControllerService交互

报告任务

开发报告任务

UI扩展

自定义处理器UI

内容浏览者

命令行工具

TLS-工具包

测试

实例化TestRunner

添加ControllerServices

设置属性值

排队FlowFiles

运行处理器

验证输出

模拟外部资源

附加测试功能

NiFi档案馆(NAR)

每实例类加载

弃用组件

如何为Apache NiFi做出贡献

技术

从哪儿开始?

提供捐款

联系我们


介绍

本开发人员指南旨在为读者提供了解Apache NiFi扩展如何开发所需的信息,并帮助解释开发组件背后的思维过程。它提供了用于开发扩展的API的介绍和说明。但是,它没有详细介绍API中的每个方法,因为本指南旨在补充API的JavaDoc而不是替换它们。本指南还假定读者熟悉Java 7和Apache Maven。

本指南由开发人员为开发人员编写。在阅读本指南之前,您需要对NiFi和数据流概念有基本的了解。如果没有,请参阅NiFi概述 和NiFi用户指南,以熟悉NiFi的概念。

NiFi组件

NiFi提供了几个扩展点,使开发人员能够为应用程序添加功能以满足他们的需求。以下列表提供了最常见扩展点的高级描述:

Processor API

处理器是NiFi中使用最广泛的组件。处理器是唯一可以访问以创建,删除,修改或检查FlowFiles(数据和属性)的组件。

使用Java的ServiceLoader机制加载和实例化所有处理器。这意味着所有处理器必须遵守以下规则:

虽然Processor是一个可以直接实现的接口,但这样做非常罕见,因为它org.apache.nifi.processor.AbstractProcessor是几乎所有处理器实现的基类。AbstractProcessor类提供的功能的显著,这使得开发的处理器更容易,更方便的任务。对于本文档的范围,我们将主要关注AbstractProcessor处理Processor API时的类。

并发注意事项

NiFi是一个高度并发的框架。这意味着所有扩展必须是线程安全的。如果不熟悉用Java编写并发软件,强烈建议您熟悉Java并发原理。

支持API

为了理解处理器API,我们必须首先理解 - 至少在高层次 - 几个支持类和接口,这将在下面讨论。

FlowFile

FlowFile是一种逻辑概念,它将一段数据与一组关于该数据的属性相关联。这些属性包括FlowFile的唯一标识符,以及其名称,大小和任何数量的其他特定于流的值。虽然FlowFile的内容和属性可以更改,但FlowFile对象是不可变的。ProcessSession可以对FlowFile进行修改。

FlowFiles的核心属性在org.apache.nifi.flowfile.attributes.CoreAttributes枚举中定义。您将看到的最常见属性是filename,path和uuid。引号中的字符串是CoreAttributes枚举中属性的值。

ProcessSession

ProcessSession通常简称为“会话”,它提供了一种机制,通过该机制可以创建,销毁,检查,克隆FlowFiles并将其传输到其他处理器。此外,ProcessSession还提供了通过添加或删除属性或修改FlowFile内容来创建FlowFiles的修改版本的机制。ProcessSession还公开了一种用于发布源代码事件的机制,该机制提供了跟踪FlowFile的沿袭和历史的能力。在一个或多个FlowFiles上执行操作后,可以提交或回滚ProcessSession。

ProcessContext

ProcessContext提供了处理器和框架之间的桥梁。它提供有关处理器当前如何配置的信息,并允许处理器执行特定于Framework的任务,例如产生其资源,以便框架将安排其他处理器运行而不会不必要地消耗资源。

PropertyDescriptor

PropertyDescriptor定义将由Processor,ReportingTask或ControllerService使用的属性。属性的定义包括其名称,属性的描述,可选的默认值,验证逻辑,以及关于处理器是否有效所需的属性的指示符。PropertyDescriptors是通过实例化PropertyDescriptor.Builder 类的实例,调用适当的方法来填充有关属性的详细信息,最后调用该build方法来创建的。

验证器(Validator)

PropertyDescriptor必须指定一个或多个Validator,可用于确保用户输入的属性值有效。如果Validator指示属性值无效,则在属性生效之前,将无法运行或使用Component。如果未指定Validator,则假定Component无效,NiFi将报告该属性不受支持。

ValidationContext

验证属性值时,ValidationContext可用于获取ControllerServices,创建PropertyValue对象,以及使用表达式语言编译和评估属性值。

PropertyValue

返回到处理器的所有属性值都以PropertyValue对象的形式返回。此对象具有便捷方法,用于将值从String转换为其他形式(如数字和时间段),以及提供用于评估表达式语言的API。

关联(Relationship)

关系定义FlowFile可以从处理器传输到的路由。通过实例化Relationship.Builder 类的实例,调用适当的方法来填充关系的细节,最后调用 build方法来创建关系。

StateManager

StateManager为处理器,报告任务和控制器服务提供了一种轻松存储和检索状态的机制。API类似于ConcurrentHashMap,但每个操作都需要一个Scope。范围指示是在本地检索/存储状态还是以群集范围的方式存储状态。有关更多信息,请参阅“ 状态管理器”部分。

ProcessorInitializationContext

创建处理器后,initialize将使用InitializationContext对象调用其方法。此对象向处理器公开配置,该配置在处理器的整个生命周期内不会更改,例如处理器的唯一标识符。

ComponentLog

鼓励处理器通过ComponentLog接口执行日志记录 ,而不是获取第三方记录器的直接实例。这是因为通过ComponentLog进行日志记录允许框架将超出可配置严重性级别的日志消息呈现给用户界面,从而允许在发生重要事件时通知监视数据流的人员。此外,它通过在DEBUG模式下记录堆栈跟踪并在日志消息中提供处理器的唯一标识符,为所有处理器提供一致的日志记录格式。

AbstractProcessor API

由于绝大多数处理器都是通过扩展AbstractProcessor来创建的,因此我们将在本节中讨论它的抽象类。AbstractProcessor提供了处理器开发人员感兴趣的几种方法。

处理器初始化(Processor Initialization)

创建处理器时,在调用任何其他方法之前,init将调用AbstractProcessor 的 方法。该方法采用单个参数,即类型ProcessorInitializationContext。上下文对象为Processor提供ComponentLog,Processor的唯一标识符和ControllerServiceLookup,可用于与配置的ControllerServices交互。每个这样的对象是由AbstractProcessor存储,并且可以由子类经由获得getLoggergetIdentifier和 getControllerServiceLookup方法,分别。

揭露处理器的关系(Exposing Processor’s Relationships)

为了使处理器将FlowFile传输到新目的地以进行后续处理,处理器必须首先能够向框架公开它当前支持的所有关系。这允许应用程序的用户通过在处理器之间创建连接并将适当的关系分配给这些连接来将处理器彼此连接。

处理器通过覆盖该getRelationships方法来公开有效的关系集 。这个方法没有参数,并返回SetRelationship 对象。对于大多数处理器,此Set将是静态的,但其他处理器将根据用户配置动态生成Set。对于Set为静态的那些处理器,建议在Processor的构造函数或init方法中创建一个不可变的Set并返回该值,而不是动态生成Set。这种模式有助于实现更清晰的代码和更好的性能。

公开处理器属性(Exposing Processor Properties)

大多数处理器在能够使用之前需要一些用户配置。处理器支持的属性通过该getSupportedPropertyDescriptors方法向Framework公开 。这个方法没有参数,并返回List的 PropertyDescriptor对象。List中对象的顺序很重要,因为它决定了在用户界面中呈现属性的顺序。

PropertyDescriptor目的是通过创建一个新的实例构造PropertyDescriptor.Builder对象,调用构建器的适当的方法,并最终调用build方法。

虽然此方法涵盖了大多数用例,但有时需要允许用户配置名称未知的其他属性。这可以通过覆盖该getSupportedDynamicPropertyDescriptor方法来实现 。此方法将 String唯一参数作为参数,该参数指示属性的名称。该方法返回一个PropertyDescriptor对象,该 对象可用于验证属性的名称以及值。应该构建从此方法返回的任何PropertyDescriptor,isDynamicPropertyDescriptor.Builder类中将值设置为true 。AbstractProcessor的默认行为是不允许任何动态创建的属性。

验证处理器属性( Validating Processor Properties )

如果处理器的配置无效,则无法启动处理器。可以通过在PropertyDescriptor上设置Validator或通过PropertyDescriptor.Builder的allowableValues方法或identifiesControllerService方法限制属性的允许值来验证Processor属性。

但是,有时候单独验证处理器的属性是不够的。为此,AbstractProcessor公开了一个customValidate方法。该方法采用单个参数类型ValidationContext。此方法的返回值是描述验证期间发现的任何问题Collection的 ValidationResult对象。只应返回其isValid方法返回的ValidationResult对象 false。仅当所有属性根据其关联的Validators和Allowable Values有效时,才会调用此方法。即,只有当所有属性本身都有效时才会调用此方法,并且此方法允许整体验证处理器的配置。

响应配置更改(Responding to Changes in Configuration)

有时希望处理器在其属性改变时急切地做出反应。该onPropertyModified 方法允许处理器执行此操作。当用户更改Processor的属性值时,onPropertyModified将为每个已修改的属性调用该 方法。该方法有三个参数:PropertyDescriptor,它指示修改了哪个属性,旧值和新值。如果属性没有先前的值,则第二个参数将是null。如果删除了属性,则第三个参数将是null。重要的是要注意,无论值是否有效,都将调用此方法。只有在实际修改了值时才会调用此方法,而不是在用户更新处理器而不更改其值时调用此方法。在调用此方法时,保证调用此方法的线程是当前在Processor中执行代码的唯一线程,除非Processor本身创建自己的线程。

执行工作(Performing the Work)

当处理器有工作要做时,它计划onTrigger通过框架调用其方法来完成。该方法有两个参数:a ProcessContext和aProcessSession。该onTrigger方法的第一步通常是通过调用getProcessSession上的一个方法来获取要在其上执行工作的FlowFile 。对于从外部源将数据提取到NiFi的处理器,将跳过此步骤。然后,处理器可以自由检查FlowFile属性; 添加,删除或修改属性; 读取或修改FlowFile内容; 并将FlowFiles传输到适当的关系。

处理器被触发时(When Processors are Triggered)

onTrigger只有在计划运行处理器并且处理器存在工作时,才会调用处理器的方法。如果满足以下任何条件,则称处理器存在工作:

有几个因素会导致onTrigger调用Processor的 方法。首先,除非用户已将处理器配置为运行,否则不会触发处理器。如果计划运行处理器,则周期性地(该周期由用户界面中的用户配置)检查处理器是否有工作,如上所述。如果是这样,框架将检查处理器的下游目的地。如果处理器的任何出站连接已满,则默认情况下,将不会安排处理器运行。

但是,@TriggerWhenAnyDestinationAvailable注释可以添加到Processor的类中。在这种情况下,需求被更改,以便只有一个下游目标必须“可用”(如果连接的队列未满,则目标被视为“可用”),而不是要求所有下游目标都可用。

与处理器调度有关的还有@TriggerSerially 注释。使用此Annotation的处理器永远不会有多个线程onTrigger同时运行该方法。但是,必须注意,执行代码的线程可能会从调用更改为调用。因此,仍然必须注意确保处理器是线程安全的!

组件生命周期(Component Lifecycle)

NiFi API通过使用Java Annotations提供生命周期支持。该org.apache.nifi.annotations.lifecycle软件包包含几个生命周期管理注释。以下注释可以应用于NiFi组件中的Java方法,以指示何时应该调用方法的框架。对于组件生命周期的讨论,我们将NiFi组件定义为Processor,ControllerServices或ReportingTask。

@OnAdded

@OnAdded注解导致要尽快一个组件被创建调用的方法。在构造组件之后,将调用组件的initialize方法(或init方法,如果是子类 AbstractProcessor),后跟注释的方法@OnAdded。如果任何带有@OnAdded抛出异常的方法抛出异常,则会向用户返回错误,并且该组件将不会添加到流中。此外,不会调用具有此Annotation的其他方法。该方法仅在组件的生命周期内调用一次。使用此Annotation的方法必须采用零参数。

@OnEnabled

所述@OnEnabled注释可以被用来指示每当使能控制器服务的方法应该被调用。每次用户启用服务时,都会调用具有此批注的任何方法。此外,每次重新启动NiFi时,如果NiFi配置为“自动恢复状态”并且启用了服务,则将调用该方法。

如果带有此批注的方法抛出Throwable,则将为该组件发出日志消息和公告。在这种情况下,服务将保持“启用”状态,不可用。然后,在延迟之后将再次调用具有此注释的所有方法。在具有此注释的所有方法返回而不丢弃任何内容之前,该服务将不可用。

使用此批注的方法必须采用0参数或单个参数类型org.apache.nifi.controller.ConfigurationContext

请注意,如果应用于ReportingTask或Processor,则将忽略此批注。对于Controller Service,启用和禁用被视为生命周期事件,因为该操作使其可用或不可用于其他组件。但是,对于处理器和报告任务,这些不是生命周期事件,而是允许在启动或停止一组组件时排除组件的机制。

@OnRemoved

@OnRemoved注释将使得前一组分从流中除去要被调用的方法。这样可以在删除组件之前清除资源。使用此批注的方法必须采用零参数。如果带有此批注的方法抛出异常,则仍将删除该组件。

@OnScheduled

此批注指示每次调度组件运行时都应调用方法。由于未调度ControllerServices,因此在ControllerService上使用此批注没有意义,也不会受到尊重。它应仅用于处理器和报告任务。如果具有此批注的任何方法抛出异常,则不会调用具有此批注的其他方法,并且将向用户显示通知。在这种情况下, @OnUnscheduled然后触发带@OnStopped注释的方法,然后是带有注释的方法 (在此状态期间,如果这些方法中的任何一个抛出异常,则忽略这些异常)。然后该组件将在一段时间内执行,称为“管理产量持续时间”。nifi.properties文件。最后,该过程将再次启动,直到所有注释的方法 @OnScheduled都返回而不抛出任何异常。具有此批注的方法可以采用零参数或可以采用单个参数。如果使用单个参数变体,则ProcessContext如果组件是处理器或ConfigurationContext组件是ReportingTask ,则参数必须是类型。

@OnUnscheduled

只要不再计划运行Processor或ReportingTask,就会调用带有此批注的方法。那时,许多线程可能仍然在处理器的onTrigger方法中处于活动状态。如果此类方法抛出异常,则将生成日志消息,否则将忽略异常,并且仍将调用具有此批注的其他方法。具有此批注的方法可以采用零参数或可以采用单个参数。如果使用单个参数变体,则ProcessContext如果组件是处理器或ConfigurationContext组件是ReportingTask ,则参数必须是类型 。

@OnStopped

当不再计划运行Processor或ReportingTask并且从onTrigger方法返回所有线程时,将调用具有此批注的方法。如果这样的方法抛出异常,将生成一条日志消息,否则将忽略异常; 仍将调用具有此批注的其他方法。允许使用此注释的方法采用0或1参数。如果使用了参数,则如果组件是ReportingTask,则它必须是ConfigurationContext类型;如果组件是Processor,则它必须是ProcessContext类型。

@OnShutdown

@OnShutdown当NiFi成功关闭时,将调用任何使用注释注释的方法。如果此类方法抛出异常,则将生成日志消息,否则将忽略异常,并且仍将调用具有此批注的其他方法。使用此批注的方法必须采用零参数。注意:虽然NiFi将尝试在使用它的所有组件上调用带有此注释的方法,但这并不总是可行的。例如,进程可能会意外终止,在这种情况下,它没有机会调用这些方法。因此,虽然使用此注释的方法可用于清理资源,但是,不应依赖它们来处理关键业务逻辑。

组件通知( Component Notification )

NiFi API通过使用Java Annotations提供通知支持。该org.apache.nifi.annotations.notification软件包包含几个用于通知管理的注释。以下注释可以应用于NiFi组件中的Java方法,以向框架指示何时应该调用方法。对于组件通知的讨论,我们将NiFi组件定义为处理器,控制器服务或报告任务。

@OnPrimaryNodeStateChange

@OnPrimaryNodeStateChange一旦群集中的主节点的状态发生更改,注释就会调用方法。带有此注释的方法应该不带参数或类型的一个参数PrimaryNodeState。该PrimaryNodeState提供什么改变,使得组件可以采取适当的行动内容。该PrimaryNodeState枚举有两个可能的值: ELECTED_PRIMARY_NODE(节点接收到这个状态已经当选NiFi集群的主节点),或PRIMARY_NODE_REVOKED(接收到这个状态的节点是主节点,但现在已经有其主节点角色撤销)。

受限( Restricted )

受限组件是可用于执行操作员通过NiFi REST API / UI提供的任意未经过抽样的代码的组件,或者可用于使用NiFi OS凭证获取或更改NiFi主机系统上的数据。这些组件可由其他授权的NiFi用户使用,超出应用程序的预期用途,升级特权,或者可能暴露有关NiFi进程或主机系统内部的数据。所有这些功能都应被视为特权,管理员应了解这些功能,并为可信用户的子集明确启用它们。

可以使用@Restricted注释标记处理器,控制器服务或报告任务。这将导致组件被视为受限制,并且需要将用户显式添加到可以访问受限组件的用户列表中。一旦允许用户访问受限制的组件,就可以允许他们创建和修改这些组件,前提是允许所有其他权限。如果不访问受限制的组件,用户仍然会知道存在这些类型的组件,但即使有足够的权限,也无法创建或修改它们。

状态管理(State Manager)

从ProcessContext,ReportingContext和ControllerServiceInitializationContext,组件能够调用该getStateManager()方法。此状态管理器负责提供用于存储和检索状态的简单API。此机制旨在使开发人员能够非常轻松地存储一组键/值对,检索这些值并以原子方式更新它们。状态可以存储在节点的本地,也可以存储在集群中的所有节点上。然而,重要的是要注意,该机制仅用于提供存储非常“简单”状态的机制。因此,API只允许aMap<String, String>存储和检索,以及原子地替换整个Map。此外,ZooKeeper支持当前支持存储群集范围状态的唯一实现。因此,序列化后,整个State Map的大小必须小于1 MB。尝试存储超过此数量将导致抛出异常。如果处理器管理状态所需的交互比这更复杂(例如,必须存储和检索大量数据,或者必须单独存储和提取单个密钥),则应使用不同的机制(例如,与之通信)外部数据库)。

范围(Scope)

与状态管理器通信时,所有方法调用都要求提供范围。本范围将是Scope.LOCALScope.CLUSTER。如果NiFi在群集中运行,则此Scope为框架提供有关如何进行操作的重要信息。

如果使用存储状态Scope.CLUSTER,则集群中的所有节点将与相同的状态存储机制进行通信。如果使用存储和检索状态Scope.LOCAL,则每个节点将看到状态的不同表示。

还值得注意的是,如果将NiFi配置为作为独立实例运行,而不是在群集中运行,Scope.LOCAL则始终使用范围。这样做是为了允许NiFi组件的开发者以一致的方式编写代码,而不用担心NiFi实例是否是群集的。开发人员应该假设该实例是集群的并相应地编写代码。

存储和检索状态(Storing and Retrieving State)

国家正在使用StateManager的存储getStatesetStatereplace,和clear方法。所有这些方法都要求提供范围。应该注意,与Local作用域一起存储的状态与使用Cluster作用域存储的状态完全不同。如果处理器使用作用域使用My Key的键存储值Scope.CLUSTER,然后尝试使用Scope.LOCAL作用域检索该值,则检索的值将是null(除非使用Scope.CLUSTER作用域同时存储了一个值)。每个处理器的状态与其他处理器的状态隔离存储。

因此,两个处理器不能共享相同的状态。但是,在某些情况下,非常有必要在两个不同类型的处理器或两个相同类型的处理器之间共享状态。这可以通过使用Controller服务来完成。通过从Controller Service存储和检索状态,多个处理器可以使用相同的Controller Service,并且可以通过Controller Service的API公开状态。

单元测试(Unit Tests)

NiFi的Mock Framework提供了大量工具来执行处理器的单元测试。处理器单元测试通常从TestRunner课程开始。因此,TestRunner该类包含getStateManager自己的方法。但是,返回的StateManager具有特定类型:MockStateManager。除了StateManager接口定义的方法之外,此实现还提供了几种方法,可帮助开发人员更轻松地开发单元测试。

首先,MockStateManager实现StateManager接口,因此可以在单元测试中检查所有状态。此外,MockStateManager公开了一些assert*方法来执行状态按预期设置的断言。MockStateManager如果针对特定情况更新状态,还提供指示单元测试应立即失败的能力 Scope

报告处理器活动(Reporting Processor Activity)

处理器负责报告其活动,以便用户能够了解其数据发生了什么。处理器应该通过ComponentLog记录事件,ComponentLog可以通过InitializationContext或通过调用getLogger方法来访问AbstractProcessor

另外,处理器应该使用ProvenanceReporter 通过ProcessSession获得的接口 getProvenanceReporter方法。ProvenanceReporter应用于指示从外部源接收内容或发送到外部位置的任何时间。ProvenanceReporter还具有报告何时克隆,分叉或修改FlowFile,以及将多个FlowFile合并到单个FlowFile以及将FlowFile与其他标识符相关联时的报告方法。但是,这些功能对于报告来说不那么重要,因为框架能够检测这些功能并代表处理器发出适当的事件。然而,处理器开发人员发布这些事件是一种最佳实践,因为它在代码中显式地发出了这些事件,并且开发人员能够为事件提供其他详细信息,例如该行动采取了有关所采取行动的相关信息。如果处理器发出事件,则框架不会发出重复事件。相反,它总是假设处理器开发人员比框架更好地了解处理器上下文中发生的事情。但是,该框架可能会发出不同的事件。例如,如果处理器修改FlowFile的内容及其属性,然后仅发出ATTRIBUTES_MODIFIED事件,则框架将发出CONTENT_MODIFIED事件。如果为该FlowFile(由处理器或框架)发出任何其他事件,框架将不会发出ATTRIBUTES_MODIFIED事件。这是因为所有的事实 它总是假设处理器开发人员比框架更好地了解处理器上下文中发生的事情。但是,该框架可能会发出不同的事件。例如,如果处理器修改FlowFile的内容及其属性,然后仅发出ATTRIBUTES_MODIFIED事件,则框架将发出CONTENT_MODIFIED事件。如果为该FlowFile(由处理器或框架)发出任何其他事件,框架将不会发出ATTRIBUTES_MODIFIED事件。这是因为所有的事实 它总是假设处理器开发人员比框架更好地了解处理器上下文中发生的事情。但是,该框架可能会发出不同的事件。例如,如果处理器修改FlowFile的内容及其属性,然后仅发出ATTRIBUTES_MODIFIED事件,则框架将发出CONTENT_MODIFIED事件。如果为该FlowFile(由处理器或框架)发出任何其他事件,框架将不会发出ATTRIBUTES_MODIFIED事件。这是因为所有的事实 如果为该FlowFile(由处理器或框架)发出任何其他事件,框架将不会发出ATTRIBUTES_MODIFIED事件。这是因为所有的事实 如果为该FlowFile(由处理器或框架)发出任何其他事件,框架将不会发出ATTRIBUTES_MODIFIED事件。这是因为所有的事实 Provenance Events了解事件发生之前FlowFile的属性以及由于FlowFile处理而发生的属性,因此ATTRIBUTES_MODIFIED通常被认为是冗余的,并且会导致FlowFile谱系的呈现非常冗长。但是,如果从处理器的角度来看事件被认为是相关的,则处理器可以与其他事件一起发出此事件。

记录组件

NiFi通过用户界面从NiFi应用程序本身向用户提供大量文档,试图使用户体验尽可能简单方便。当然,为了实现这一点,处理器开发人员必须向框架提供该文档。NiFi提供了一些不同的机制来为框架提供文档。

记录属性

可以通过调用description PropertyDescriptor构建器的方法来记录单个属性:

  1.   public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder()
  2.   .name("My Property")
  3.   .description("Description of the Property")
  4.   ...
  5.   .build();

如果属性要提供一组允许值,那么这些值将在UI的下拉字段中显示给用户。这些值中的每一个也可以给出描述:

  1.   public static final AllowableValue EXTENSIVE = new AllowableValue("Extensive", "Extensive",
  2.   "Everything will be logged - use with caution!");
  3.   public static final AllowableValue VERBOSE = new AllowableValue("Verbose", "Verbose",
  4.   "Quite a bit of logging will occur");
  5.   public static final AllowableValue REGULAR = new AllowableValue("Regular", "Regular",
  6.   "Typical logging will occur");
  7.    
  8.   public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
  9.   .name("Amount to Log")
  10.   .description("How much the Processor should log")
  11.   .allowableValues(REGULAR, VERBOSE, EXTENSIVE)
  12.   .defaultValue(REGULAR.getValue())
  13.   ...
  14.   .build();

记录关系(Documenting Relationships)

处理器关系的记录方式与属性大致相同 - 通过调用descriptionRelationship的构建器的方法:

  1.   public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
  2.   .name("My Relationship")
  3.   .description("This relationship is used only if the Processor fails to process the data.")
  4.   .build();

记录能力和关键词(Documenting Capability and Keywords)

org.apache.nifi.annotations.documentation包提供了可用于记录组件的Java注释。CapabilityDescription注释可以添加到处理器,报告任务或控制器服务中,旨在提供组件提供的功能的简要说明。标签注释有一个value被定义为字符串数组的变量。因此,它通过提供多个值作为逗号分隔的带有花括号的字符串列表来使用。然后,通过允许用户基于标签(即关键字)过滤组件,将这些值合并到UI中。此外,UI提供了一个标签云,允许用户选择他们想要过滤的标签。云中最大的标签是那些在NiFi实例中的组件上最多的标签。下面提供了使用这些注释的示例:

  1.   @Tags({"example", "documentation", "developer guide", "processor", "tags"})
  2.   @CapabilityDescription("Example Processor that provides no real functionality but is provided" +
  3.   " for an example in the Developer Guide")
  4.   public static final ExampleProcessor extends Processor {
  5.   ...
  6.   }

记录FlowFile属性交互

很多时候,处理器会期望在入站的FlowFiles中设置某些FlowFile属性,以使处理器正常运行。在其他情况下,处理器可以在出站的FlowFile上更新或创建FlowFile属性。处理器开发人员可以使用ReadsAttributeWritesAttribute文档注释记录这两种行为。这些属性用于生成文档,使用户可以更好地了解处理器如何与流进行交互。

注意:由于Java 7不支持对类型重复注释,因此您可能需要使用ReadsAttributesWritesAttributes指示处理器读取或写入多个FlowFile属性。此批注只能应用于处理器。下面列出了一个例子:

  1.   @WritesAttributes({ @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
  2.   @WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
  3.   @WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"),
  4.   @WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
  5.   @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
  6.   @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server") })
  7.   public final class InvokeHTTP extends AbstractProcessor {

通常,处理器和控制器服务彼此相关。有时它是PutFile和/中的put / get关系GetFile。有时,处理器使用类似InvokeHTTP和的ControllerService StandardSSLContextService。有时一个ControllerService使用另一个DistributedMapCacheClientServiceDistributedMapCacheServer。这些扩展点的开发者可以使用SeeAlso标签来关联这些不同的组件。此注释将文档中的这些组件链接起来。 SeeAlso可以应用于处理器,ControllerServices和ReportingTasks。下面列出了如何执行此操作的示例:

  1.   @SeeAlso(GetFile.class)
  2.   public class PutFile extends AbstractProcessor {

高级文档

当上述文档方法不足时,NiFi可以通过“使用”文档向用户公开更高级的文档。当用户右键单击处理器时,NiFi在上下文菜单中提供“使用”菜单项。此外,UI在右上角显示“帮助”链接,从中可以找到相同的使用信息。

处理器的高级文档以名为的HTML文件的形式提供additionalDetails.html。此文件应存在于名称为Processor的完全限定名称的目录中,并且应该命名此目录的父级 docs并存在于Processor的jar的根目录中。此文件将从生成的HTML文件链接,该文件将包含所有Capability,Keyword,PropertyDescription和Relationship信息,因此没有必要复制该文件。这个地方可以提供有关此处理器正在执行的操作,预期和生成的数据类型以及预期和生成的FlowFile属性的丰富说明。由于此文档采用HTML格式,因此您可以包含图像和表格以最好地描述此组件。可以使用相同的方法为Processors,ControllerServices和ReportingTasks提供高级文档。

 

源事件

源报告的不同事件类型是:

源事件描述

ADDINFO

表示用于添加其他信息(例如新链接到新URI或UUID)的起源事件

ATTRIBUTES_MODIFIED

表示以某种方式修改了FlowFile的属性。当同时报告另一个事件时,不需要此事件,因为另一个事件已包含所有FlowFile属性

CLONE

表示FlowFile与其父FlowFile完全相同

CONTENT_MODIFIED

表示以某种方式修改了FlowFile的内容。使用此事件类型时,建议您提供有关如何修改内容的详细信息

CREATE

表示FlowFile是从未从远程系统或外部进程接收的数据生成的

DOWNLOAD

表示用户或外部实体下载了FlowFile的内容

DROP

表示由于对象到期之外的某些原因导致对象生命结束的起源事件

EXPIRE

表示由于未及时处理对象而导致对象生命结束的起源事件

FETCH  
FORK

表示一个或多个FlowFiles是从父FlowFile派生的

JOIN

表示单个FlowFile是通过将多个父FlowFiles连接在一起而派生的

RECEIVE

表示从外部进程接收数据的来源事件。此事件类型应该是FlowFile的第一个事件。因此,从外部源接收数据并使用该数据替换现有FlowFile内容的处理器应使用FETCH事件类型,而不是RECEIVE事件类型

REPLAY

表示重放FlowFile的originance事件。事件的UUID指示正在重播的原始FlowFile的UUID。该事件包含一个父UUID,它也是正在重放的FlowFile的UUID,以及一个子UUID,它是新创建的FlowFile的UUID,它将被重新排队等待处理

ROUTE

表示FlowFile已路由到指定的关系,并提供有关FlowFile路由到此关系的原因的信息

SEND

表示将数据发送到外部进程的originance事件

UNKNOWN

表示原产地事件的类型未知,因为尝试访问该事件的用户无权知道该类型

通用处理器模式( Common Processor Patterns )

虽然NiFi用户可以使用许多不同的处理器,但绝大多数处理器属于几种常见的设计模式之一。下面,我们讨论这些模式,模式是否合适,我们遵循这些模式的原因,以及应用此类模式时需要注意的事项。请注意,下面讨论的模式和建议是一般指导原则,而不是强化规则。

数据入口( Data Ingress )

将数据提取到NiFi中的处理器具有一个名为的关系success。此处理器通过ProcessSession create方法生成新的FlowFiles,并且不从传入的Connections中提取FlowFiles。处理器名称以“Get”或“Listen”开头,具体取决于它是轮询外部源还是公开某些外部源可以连接的接口。名称以用于通信的协议结束。遵循这种模式的处理器包括GetFileGetSFTP, ListenHTTP,和GetHTTP

此处理器可以在使用@OnScheduled注释的方法中创建或初始化连接池 。但是,由于通信问题可能会阻止建立连接或导致连接终止,因此此时不会创建连接本身。而是在onTrigger方法中从池创建或租用连接。

onTrigger此处理器的方法首先从连接池租用连接(如果可能),或以其他方式创建与外部服务的连接。当没有来自外部源的数据时,yieldProcessContext 的方法由处理器调用,并且该方法返回,以便该处理器避免持续运行和耗尽资源而没有任何好处。否则,此处理器然后通过ProcessSession的create 方法创建FlowFile,并为FlowFile 分配适当的文件名和路径(通过添加filenamepath 属性),以及可能适当的任何其他属性。通过ProcessSession获取FlowFile内容的OutputStreamwrite方法,传递一个新的OutputStreamCallback(通常是一个匿名的内部类)。在此回调中,处理器能够写入FlowFile并将内容从外部资源流式传输到FlowFile的OutputStream。如果希望将InputStream的整个内容写入FlowFile,则importFromProcessSession 的方法可能比write方法更方便 。

当此处理器希望接收许多小文件时,建议在提交会话之前从单个会话创建多个FlowFiles。通常,这允许框架更有效地处理新创建的FlowFiles的内容。

此处理器生成一个Provenance事件,指示它已接收数据并指定数据来自何处。此处理器应记录FlowFile的创建,以便可以通过分析日志来确定FlowFile的来源(如有必要)。

此处理器确认收到数据和/或从外部源中删除数据,以防止接收重复文件。只有在创建了FlowFile的ProcessSession提交后才能执行此操作!如果不遵守此原则可能会导致数据丢失,因为在提交会话之前重新启动NiFi将导致临时文件被删除。但请注意,使用此方法可以接收重复数据,因为应用程序可以在提交会话之后以及在确认或从外部源中删除数据之前重新启动。但是,一般而言,潜在的数据重复优于潜在的数据丢失。最终返回连接或将连接添加到连接池,具体取决于连接是从连接池租用还是在onTrigger方法中创建。

如果存在通信问题,则通常会终止连接,并且不会将连接返回(或添加)到连接池。在使用注释注释的方法中拆除与远程系统的连接并关闭连接池,@OnStopped以便可以回收资源。

数据出口(Data Egress)

将数据发布到外部源的处理器有两个关系:successfailure。处理器名称以“Put”开头,后跟用于数据传输的协议。遵循这种模式的处理器包括PutEmail,, PutSFTP和 PostHTTP(请注意,该名称不以“Put”开头,因为这会导致混淆,因为PUT和POST在处理HTTP时具有特殊含义)。

此处理器可以在使用@OnScheduled注释的方法中创建或初始化连接池 。但是,由于通信问题可能会阻止建立连接或导致连接终止,因此此时不会创建连接本身。而是在onTrigger方法中从池创建或租用连接。

onTrigger方法首先通过该get方法从ProcessSession获取FlowFile 。如果没有可用的FlowFile,则该方法返回而不获取与远程资源的连接。

如果至少有一个FlowFile可用,则处理器会从连接池获取连接(如果可能),或以其他方式创建新连接。如果处理器既不能从连接池租用连接也不能创建新连接,则FlowFile将路由到failure,记录事件,并返回方法。

如果获得了连接,则处理器通过调用readProcessSession上的方法并传递InputStreamCallback(通常是匿名内部类)来获取FlowFile内容 的InputStream,并从该回调中将FlowFile的内容传输到目标。记录事件以及传输文件所花费的时间和传输文件的数据速率。通过getProvenanceReporter方法从ProcessSession获取报告器并在报告器上调用send方法,向ProvenanceReporter报告SEND事件 。连接将返回或添加到连接池,具体取决于连接是从池租用还是由onTrigger方法新创建 。

如果存在通信问题,则通常会终止连接,并且不会将连接返回(或添加)到连接池。如果将数据发送到远程资源时出现问题,则处理错误的所需方法取决于一些注意事项。如果问题与网络状况有关,则FlowFile通常会路由到failure。FlowFile不会受到惩罚,因为数据没有必要存在问题。与数据入口处理器的情况不同,我们通常不会调用yieldProcessContext。这是因为在摄取的情况下,在处理器能够执行其功能之前,FlowFile不存在。但是,对于Put Processor,DataFlow Manager可以选择路由failure到不同的处理器。这可以允许在一个系统出现问题的情况下使用“备份”系统,或者可以用于跨多个系统的负载分配。

如果出现与数据相关的问题,则应采取两种方法之一。首先,如果问题可能会解决,FlowFile会受到惩罚,然后路由到 failure。例如,当使用PutFTP时,由于文件命名冲突而无法传输FlowFile。假设最终将从目录中删除该文件,以便可以传输新文件。因此,我们会惩罚FlowFile并路由到 failure以后我们可以再试一次。在另一种情况下,如果数据存在实际问题(例如数据不符合某些要求的规范),则可以采用不同的方法。在这种情况下,将failure关系分解为a failure和a 可能是有利的 communications failure关系。这允许DataFlow Manager确定如何单独处理这些情况。在这些情况下,通过在创建关系时在“描述”中对其进行澄清,很好地记录两个关系之间的差异。

与远程系统的连接被拆除,并且连接池在注释的方法中关闭,@OnStopped以便可以回收资源。

基于内容的路由(一对一) Route Based on Content (One-to-One)

根据内容路由数据的处理器将采用以下两种形式之一:将传入的FlowFile路由到恰好一个目标,或将传入数据路由到0个或更多目标。在这里,我们将讨论第一个案例。

此处理器有两种关系:matchedunmatched。如果需要特定的数据格式,则处理器还将具有failure在输入不是预期格式时使用的关系。处理器公开一个指示路由标准的属性。

如果指定路由条件的属性需要处理(例如编译正则表达式),则此处理将在注释的方法中完成@OnScheduled,如果可能的话。然后将结果存储在标记为的成员变量中volatile

onTrigger方法获得单个FlowFile。该方法通过ProcessSession的方法读取FlowFile的内容,read 在数据流传输时评估匹配条件。然后,处理器确定FlowFile是应该路由到matched还是unmatched基于条件是否匹配,并将FlowFile路由到适当的关系。

然后,处理器发出一个Provenance ROUTE事件,指示处理器将FlowFile路由到哪个关系。

此处理器使用 包中的注释@SideEffectFree和 @SupportsBatching注释进行注释org.apache.nifi.annotations.behavior

基于内容的路由(一对多)  Route Based on Content (One-to-Many)

如果处理器将单个FlowFile路由到可能的许多关系,则此处理器将与上述基于内容的路由数据处理器略有不同。此处理器通常具有由用户动态定义的unmatched关系以及关系。

为了使用户能够另外定义Properties,getSupportedDynamicPropertyDescriptor必须重写该方法。此方法返回带有提供的名称和适用的Validator的PropertyDescriptor,以确保用户指定的匹配条件有效。

在此处理器中,getRelationships方法返回的关系集 是标记的成员变量volatile。此Set最初构造为一个名为的Relationship unmatchedonPropertyModified重写该方法,以便在添加或删除属性时,创建具有相同名称的新关系。如果处理器具有非用户定义的属性,则检查指定的属性是否是用户定义的非常重要。这可以通过调用来实现isDynamic传递给此方法的PropertyDescriptor的方法。如果此属性是动态的,则会创建一组新的关系,并将先前的一组关系复制到其中。这个新Set要么添加了新创建的关系,要么从中删除了,具体取决于是否将新属性添加到处理器或删除了属性(通过检查是否有此函数的第三个参数来检测属性删除null)。然后更新包含“关系集”的成员变量以指向此新集。

如果指定路由条件的属性需要处理(例如编译正则表达式),则此处理将在使用注释的方法中完成@OnScheduled(如果可能)。然后将结果存储在标记为的成员变量中volatile。此成员变量通常是Map键类型的类型Relationship,值的类型由处理属性值的结果定义。

onTrigger方法通过getProcessSession 的方法获得FlowFile 。如果没有FlowFile可用,则立即返回。否则,将创建一组类型关系。该方法通过ProcessSession的方法读取FlowFile的内容,read在数据流传输时评估每个匹配条件。对于匹配的任何条件,与该匹配条件关联的关系将添加到“关系集”中。

在读取FlowFile的内容后,该方法检查“关系集”是否为空。如果是这样,原始FlowFile会添加一个属性,以指示它被路由到的关系并被路由到unmatched。记录此信息,发出一个Provenance ROUTE事件,该方法返回。如果Set的大小等于1,则原始FlowFile会添加一个属性,以指示它所路由的关系,并路由到Set中条目指定的Relationship。记录此信息,为FlowFile发出一个Provenance ROUTE事件,该方法返回。

如果Set包含多个Relationship,则Processor会为每个Relationship创建一个FlowFile的副本,第一个除外。这是通过cloneProcessSession 的方法完成的。无需报告CLONE Provenance事件,因为框架将为您处理此事。原始FlowFile和每个克隆被路由到其适当的关系,其中属性指示关系的名称。为每个FlowFile发出一个Provenance ROUTE事件。记录此信息,方法返回。

此处理器使用 包中的注释@SideEffectFree和 @SupportsBatching注释进行注释org.apache.nifi.annotations.behavior

基于内容的路由流(一对多)

先前对基于内容的路由(一对多)的描述提供了用于创建非常强大的处理器的抽象。但是,它假定每个FlowFile将完整路由到零个或多个关系。如果传入的数据格式是许多不同信息的“流”,我们希望将此流的不同部分发送到不同的关系,该怎么办?例如,假设我们想要一个RouteCSV处理器,以便配置多个正则表达式。如果CSV文件中的一行与正则表达式匹配,则该行应包含在出站FlowFile中与关联关系中。如果正则表达式与关系“has-apples”相关联且正则表达式与FlowFile中的1,000行匹配,则应该有一个出站FlowFile用于“has-apples”关系,其中包含1,000行。如果不同的正则表达式与关系“has-oranges”相关联并且正则表达式与FlowFile中的50行匹配,则应该有一个出站FlowFile用于“has-oranges”关系,其中包含50行。即,一个FlowFile进来,两个FlowFiles问世。两个FlowFiles可能包含原始FlowFile中的一些相同的文本行,或者它们可能完全不同。这是我们将在本节中讨论的处理器类型。

此处理器的名称以“Route”开头,并以其路由的数据类型的名称结束。在我们的示例中,我们正在路由CSV数据,因此处理器名为RouteCSV。此处理器支持动态属性。每个用户定义的属性都有一个映射到关系名称的名称。Property的值采用“Match Criteria”所需的格式。在我们的示例中,属性的值必须是有效的正则表达式。

此处理器维护一个内部ConcurrentMap,其中键是a Relationship,值的类型取决于匹配条件的格式。在我们的例子中,我们将保持一个 ConcurrentMap<Relationship, Pattern>。此处理器会覆盖该 onPropertyModified方法。如果提供给此方法的新值(第三个参数)为null,则将从ConcurrentMap中删除名称由属性名称(第一个参数)定义的Relationship。否则,处理新值(在我们的示例中,通过调用Pattern.compile(newValue)),并将此值添加到ConcurrentMap,其中键再次为其名称由属性名称指定的Relationship。

此处理器将覆盖该customValidate方法。在此方法中,它将从中检索所有属性ValidationContext并计算动态PropertyDescriptors的数量(通过调用isDynamic() PropertyDescriptor)。如果动态PropertyDescriptors的数量为0,则表示用户尚未添加任何关系,因此处理器返回 ValidationResult指示处理器无效的因为它没有添加关系。

处理器在getRelationships调用其方法时返回用户指定的所有关系,并且还将返回unmatched关系。因为此处理器必须读取和写入内容存储库(这可能相对昂贵),如果预期此处理器将用于非常高的数据量,则添加允许用户指定是否有用的属性可能是有利的。他们是否关心与任何匹配标准不匹配的数据。

onTrigger调用该方法时,处理器获取FlowFile via ProcessSession.get。如果没有可用数据,则处理器返回。否则,处理器创建一个Map<Relationship, FlowFile>。我们将此地图称为flowFileMap。处理器通过调用读取传入的FlowFile ProcessSession.read 并提供InputStreamCallback。在回调中,处理器从FlowFile读取第一个数据。然后,处理器根据此数据评估每个匹配条件。如果特定条件(在我们的示例中,正则表达式)匹配,则处理器从中获取flowFileMap属于适当关系的FlowFile 。如果此关系的Map中尚不存在FlowFile,则处理器通过调用创建新的FlowFile session.create(incomingFlowFile),然后将新的FlowFile添加到flowFileMap。然后,处理器通过调用写入这一块数据到FlowFile的session.append 用OutputStreamCallback。从这个OutputStreamCallback中,我们可以访问新的FlowFile的OutputStream,因此我们可以将数据写入新的FlowFile。然后我们从OutputStreamCallback返回。在遍历每个匹配条件之后,如果它们都不匹配,我们对unmatched关系执行与上述相同的例程 (除非用户将我们配置为不写出不匹配的数据)。现在我们已经调用了session.append,我们有了一个新版本的FlowFile。因此,我们需要更新我们flowFileMap以将关系与新的FlowFile相关联。

如果在任何时候抛出异常,我们将需要将传入的FlowFile路由到failure。我们还需要删除每个新创建的FlowFiles,因为我们不会将它们转移到任何地方。我们可以通过调用做到这一点session.remove(flowFileMap.values())。此时,我们将记录错误并返回。

否则,如果一切都成功,我们现在可以迭代 flowFileMap并将每个FlowFile传输到相应的关系。然后删除原始FlowFile或将其路由到original关系。对于每个新创建的FlowFiles,我们还会发出一个Provenance ROUTE事件,指示FlowFile前往哪个关系。在ROUTE事件的详细信息中包含此FlowFile中包含多少条信息也很有帮助。这使得DataFlow Manager可以在查看Provenance Lineage视图时轻松查看给定输入FlowFile的每个关系有多少信息。

此外,某些处理器可能需要“分组”发送到每个关系的数据,以便发送到关系的每个FlowFile具有相同的值。在我们的示例中,我们可能希望允许正则表达式具有捕获组,并且如果CSV中的两个不同行与正则表达式匹配但捕获组具有不同的值,我们希望将它们添加到两个不同的FlowFiles。然后可以将匹配值作为属性添加到每个FlowFile。这可以通过修改来实现flowFileMap,使得其被定义为Map<Relationship, Map<T, FlowFile>>其中T是分组功能的类型(在我们的例子中,该集团将是一个String,因为它是评价一个正则表达式的捕获组的结果)。

基本路由属性 (Route Based on Attributes)

该处理器几乎与上述基于内容处理器的路由数据相同。它采用两种不同的形式:一对一和一对多,基于内容的路由处理器也是如此。但是,此处理器不会调用ProcessSession的read 方法,因为它不会读取FlowFile内容。此处理器通常非常快,因此@SupportsBatching在这种情况下注释非常重要。

拆分内容(一对多)

此处理器通常不需要用户配置,除了要创建的每个Split的大小。该onTrigger方法从其输入队列获取FlowFile。创建FlowFile类型的列表。通过ProcessSession的read方法读取原始FlowFile ,并使用InputStreamCallback。在InputStreamCallback中,读取内容直到达到FlowFile应该被拆分的点。如果不需要拆分,则返回Callback,并将原始FlowFile路由到success。在这种情况下,会发出一个Provenance ROUTE事件。通常,在将FlowFile路由到时不会发出ROUTE事件success因为这会产生一个非常冗长的血统,变得难以导航。但是,在这种情况下,事件是有用的,因为否则我们会期望FORK事件,并且没有任何事件可能会引起混淆。将记录FlowFile未拆分但转移到的事实success,并返回该方法。

如果达到需要拆分FlowFile的点,则通过ProcessSession的create(FlowFile)方法或 clone(FlowFile, long, long)方法创建新的FlowFile 。下一部分代码取决于create是使用该clone方法还是使用该方法。两种方法如下所述。必须根据具体情况确定哪种解决方案是合适的。

当数据不能直接从原始FlowFile复制到新的FlowFile时,Create Method最合适。例如,如果仅复制某些数据,或者在复制到新FlowFile之前将以某种方式修改数据,则此方法是必需的。但是,如果新FlowFile的内容将是原始FlowFile的一部分的精确副本,则克隆方法是更优选的。

创建方法 如果使用该create方法,则使用原始FlowFile作为参数调用该方法,以便新创建的FlowFile将继承原始FlowFile的属性,并且框架将创建Provenance FORK事件。

然后代码进入一个try/finally块。在finally 块中,新创建的FlowFile将添加到已创建的FlowFile列表中。这是在一个finally块内完成的,这样如果抛出异常,将适当地清理新创建的FlowFile。在try块中,回调通过write使用OutputStreamCallback 调用ProcessSession的方法来启动新的回调。然后将适当的数据从原始FlowFile的InputStream复制到新FlowFile的OutputStream。

克隆方法 如果新创建的FlowFile的内容只是原始FlowFile的字节的连续子集,则最好使用该clone(FlowFile, long, long)方法而不是create(FlowFile)ProcessSession 的 方法。在这种情况下,新FlowFile内容应该开始的原始FlwoFile的偏移量将作为方法的第二个参数传递clone。新FlowFile的长度作为方法的第三个参数传递clone 。例如,如果原始FlowFile是10,000字节并且我们调用clone(flowFile, 500, 100)了,那么返回给我们的FlowFile将与flowFile关于它的属性。但是,新创建的FlowFile的内容长度为100个字节,并且将从原始FlowFile的偏移量500开始。也就是说,新创建的FlowFile的内容与您复制原始FlowFile的字节500到599的内容相同。

创建克隆后,它将添加到FlowFiles列表中。

在适用的情况下,此方法比Create方法更受欢迎,因为不需要磁盘I / O. 该框架能够简单地创建一个新的FlowFile,它引用原始FlowFile内容的一个子集,而不是实际复制数据。但是,这并不总是可行的。例如,如果必须从原始FlowFile的开头复制标题信息并将其添加到每个Split的开头,则无法使用此方法。

两种方法 无论使用克隆方法还是创建方法,以下内容均适用:

如果在InputStreamCallback中的任何点处,达到了无法继续处理的条件(例如,输入格式错误),ProcessException则应抛出a。对ProcessSession read方法的调用包含在一个被捕获的try/catch块中ProcessException。如果捕获到异常,则会生成一条说明错误的日志消息。通过ProcessSession remove 方法删除新创建的FlowFiles列表。原始FlowFile被路由到failure

如果没有问题,则将原始FlowFile路由到,original 并更新所有新创建的FlowFiles以包含以下属性:

属性名称描述

split.parent.uuid

原始FlowFile的UUID

split.index

一个单一的数字,表示列表中的哪个FlowFile(创建的第一个FlowFile将具有值0,第二个将具有值1,等等)

split.count

已创建的拆分FlowFiles的总数

新创建的FlowFiles被路由到success; 记录此事件; 并且该方法返回。

根据内容更新属性

此处理器与上面讨论的基于内容处理器的路由非常相似。而不是将FlowFile路由到matchedunmatched,FlowFile通常路由到success或者failure 将属性添加到FlowFile中。要添加的属性的配置方式类似于基于内容的路由(一对多),用户可以定义自己的属性。属性的名称表示要添加的属性的名称。属性的值指示要应用于数据的一些匹配条件。如果匹配条件与数据匹配,则添加一个名称与Property属性相同的属性。属性的值是匹配内容的条件。

例如,评估XPath表达式的处理器可能允许输入用户定义的XPath。如果XPath与FlowFile的内容匹配,则该FlowFile将添加一个属性,该属性的名称等于Property name的名称,并且该值等于与XPath匹配的XML Element或Attribute的文本内容。failure如果在此示例中传入的FlowFile不是有效的XML,则将使用该关系。success无论是否找到任何匹配,都将使用该关系。然后,可以在适当时使用它来路由FlowFile。

此处理器发出类型为ATTRIBUTES_MODIFIED的源事件。

丰富/修改内容

Enrich / Modify Content模式非常常见且非常通用。此模式负责任何一般内容修改。对于大多数情况,此处理器标有@SideEffectFree@SupportsBatching注释。处理器具有任意数量的必需属性和可选属性,具体取决于处理器的功能。处理器通常具有successfailure关系。failure当输入文件不是预期格式时,通常使用该关系。

此处理器获取FlowFile并使用ProcessSession的write(StreamCallback)方法对其进行更新,以便它能够从FlowFile的内容中读取并写入FlowFile内容的下一个版本。如果在回调期间遇到错误,则回调将抛出一个ProcessException。对ProcessSession write方法的调用包含在一个try/catch块中,该 块捕获ProcessException 并将FlowFile路由到失败。

如果回调成功,则会发出CONTENT_MODIFIED Provenance事件。

错误处理

编写处理器时,可能会出现几种不同的意外情况。如果处理器本身不处理错误,处理器开发人员必须了解NiFi框架如何运行的机制,并且了解处理器预期的错误处理非常重要。在这里,我们将讨论处理器如何在工作过程中处理意外错误。

处理器中的例外情况

在执行onTrigger处理器的方法期间,许多事情可能会出错。常见的故障情况包括:

任何这些条件都可能导致从处理器抛出异常。从框架的角度来看,有两种类型的异常可以逃脱处理器:ProcessException以及所有其他类型的异常。

如果从处理器抛出ProcessException,框架将假定这是一个已知结果的失败。此外,尝试稍后再次处理数据的条件可能是成功的。因此,框架将回滚正在处理的会话并惩罚正在处理的FlowFiles。

但是,如果任何其他Exception转义处理器,则框架将假定它是开发人员未考虑的故障。在这种情况下,框架还将回滚会话并惩罚FlowFiles。但是,在这种情况下,我们可以进入一些非常有问题的案例。例如,处理器可能处于不良状态并且可能持续运行,耗尽系统资源,而不提供任何有用的工作。这是相当常见的,例如,当连续抛出NullPointerException时。为了避免这种情况,如果ProcessException以外的Exception能够转义处理器 onTrigger方法,框架也将“管理性地产生”处理器。这意味着处理器不会被触发再运行一段时间。时间量在nifi.properties文件中配置,但默认为10秒。

回调中的异常:IOException,RuntimeException

更通常情况下,当一个异常在处理器中发生时,它从一个回调内发生(即, InputStreamCallbackOutputStreamCallback,或StreamCallback)。也就是说,在处理FlowFile的内容时。回调被允许抛出RuntimeExceptionIOException。在RuntimeException的情况下,此异常将传播回该onTrigger方法。在an的情况下 IOException,Exception将被包装在ProcessException中,然后将从Framework抛出此ProcessException。

出于这个原因,建议使用回调的处理器在一个try/catch块内执行,并捕获与其期望回调抛出的ProcessException任何其他处理器RuntimeException。但是,不建议处理器捕获一般ExceptionThrowable案例。由于两个原因,这是不鼓励的。

首先,如果抛出了意外的RuntimeException,它可能是一个错误并允许框架回滚会话将确保没有数据丢失并确保DataFlow Manager能够通过保持排队的数据来处理他们认为合适的数据地点。

其次,当从回调中抛出IOException时,实际上有两种类型的IOExceptions:从处理器代码抛出的那些(例如,数据不是预期的格式或网络连接失败),以及从那里抛出的那些内容存储库(存储FlowFile内容的位置)。如果是后一种情况,框架将捕获此IOException并将其包装到一个FlowFileAccessException扩展中RuntimeException。这是明确完成的,因此Exception将转义onTrigger方法,框架可以适当地处理这个条件。捕获常规异常可防止这种情况发生。

Penalization vs. Yielding

当在处理过程中出现问题时,框架公开了两种方法,允许处理器开发人员避免执行不必要的工作:“惩罚”和“屈服”。对于刚接触NiFi API的开发人员来说,这两个概念可能会让人感到困惑。开发人员可以通过调用来惩罚FlowFilepenalize(FlowFile)ProcessSession的方法。这导致FlowFile本身在一段时间内无法访问下游处理器。FlowFile不可访问的时间由DataFlow Manager通过在Processor Configuration对话框中设置“Penalty Duration”设置来确定。默认值为30秒。通常,这是在处理器确定由于预期将自己排序的环境原因而无法处理数据时完成的。一个很好的例子是PutSFTP处理器,如果SFTP服务器上已存在具有相同文件名的文件,它将惩罚FlowFile。在这种情况下,处理器会惩罚FlowFile并将其路由到失败。然后,DataFlow Manager可以将故障路由回相同的PutSFTP处理器。这样,如果文件存在相同的文件名,处理器不会再次尝试发送文件30秒(或DFM配置处理器使用的任何时间段)。与此同时,它能够继续处理其他FlowFiles。

另一方面,让步允许处理器开发人员向框架指示它将在一段时间内不能执行任何有用的功能。这通常发生在与远程资源通信的处理器上。如果处理器无法连接到远程资源,或者如果远程资源需要提供数据但报告它没有,则处理器应调用yieldProcessContext对象然后返回。通过这样做,处理器告诉框架它不应该浪费资源来触发此处理器运行,因为它无法做任何事情 - 最好使用这些资源来允许其他处理器运行。

会话回滚 ( Session Rollback )

到目前为止,当我们讨论ProcessSession过时,我们通常将其简称为访问FlowFiles的机制。但是,它提供了另一个非常重要的功能,即事务性。在ProcessSession上调用的所有方法都作为事务发生。当我们决定结束交易时,我们可以通过电话 commit()或通过电话来完成rollback()。通常,这由AbstractProcessor类处理:如果onTrigger方法抛出异常,AbstractProcessor将捕获异常,调用session.rollback(),然后重新抛出异常。否则,AbstractProcessor将调用commit()ProcessSession。

但是,有时候开发人员会希望显式回滚会话。这可以通过调用rollback()rollback(boolean)方法随时完成。如果使用后者,则布尔值表示从队列中提取的那些FlowFiles(通过ProcessSession get方法)是否应该在被添加回队列之前受到惩罚。

rollback被调用时,对发生在该届会议上FlowFiles任何修改被丢弃,以既包括内容的修改和属性修改。另外,所有的种源的事件被回滚(与通过使值发射的任何SEND事件之外trueforce参数)。从输入队列中提取的FlowFile然后被传输回输入队列(并且可选地被处罚),以便可以再次处理它们。

另一方面,当commit调用该方法时,FlowFile的新状态将保留在FlowFile存储库中,并且发生的任何Provenance事件都会保留在Provenance存储库中。先前的内容被销毁(除非另一个FlowFile引用相同的内容),并且FlowFiles被传输到出站队列,以便下一个处理器可以对数据进行操作。

注意使用org.apache.nifi.annotations.behavior.SupportsBatching 注释如何影响此行为也很重要。如果处理器使用此注释,则呼叫ProcessSession.commit可能不会立即生效。相反,这些提交可以一起批处理以提供更高的吞吐量。但是,如果处理器在任何时候回滚ProcessSession,则自上次调用以来的所有更改commit都将被丢弃,所有“批处理”提交都将生效。这些“批量”提交不会回滚。

一般设计考虑因素 ( General Design Considerations  )

在设计处理器时,需要考虑一些重要的设计。“开发人员指南”的这一部分将开发人员在创建处理器时应考虑的一些想法放在首位。

考虑用户 (Consider the User)

开发处理器(或任何其他组件)时要记住的最重要的概念之一是您正在创建的用户体验。重要的是要记住,作为这样一个组件的开发者,您可能对其他人没有的上下文有重要的了解。应始终提供文档,以便不熟悉该过程的人员能够轻松使用它。

在考虑用户体验时,同样重要的是要注意一致性非常重要。最好坚持使用标准命名约定。这适用于处理器名称,属性名称和值,关系名称以及用户将体验的任何其他方面。

简单至关重要!避免添加您不希望用户理解或更改的属性。作为开发人员,我们被告知硬编码值很糟糕。但这有时会导致开发人员暴露属性,当要求澄清时,告诉用户只保留默认值。这导致混乱和复杂性。

凝聚力和可重用性 (Cohesion and Reusability)

为了制作单个,有凝聚力的单元,开发人员有时会试图将多个功能组合到一个处理器中。对于处理器期望输入数据采用格式X以便处理器可以将数据转换为格式Y并将新格式化的数据发送到某些外部服务的情况,情况确实如此。

采用这种方法格式化特定端点的数据,然后将数据发送到同一处理器内的该端点有几个缺点:

为了避免这些问题,并使处理器更具可重用性,处理器应始终坚持“做一件事,做得好”的原则。这样的处理器应该分成两个独立的处理器:一个用于将数据从格式X转换为格式Y,另一个处理器用于将数据发送到远程资源。

命名约定 (Naming Conventions)

为了向用户提供一致的外观和感觉,建议处理器遵守标准命名约定。以下是使用的标准约定列表:

处理器行为注释 (Processor Behavior Annotations)

在创建处理器时,开发人员能够向框架提供有关如何最有效地利用处理器的提示。这是通过将注释应用于Processor的类来完成的。可以应用于处理器的注释存在于三个子包中org.apache.nifi.annotations。那些在documentation子包被用于提供文档给用户。lifecycle子包中的那些指示框架应该在处理器上调用哪些方法以响应适当的生命周期事件。behavior包中的那些帮助框架理解如何在调度和一般行为方面与处理器交互。

org.apache.nifi.annotations.behavior包中的以下注释可用于修改框架处理处理器的方式:

数据缓冲(Data Buffering)

需要记住的一点是,NiFi提供了通用的数据处理功能。数据可以是任何格式。处理器通常安排有多个线程。开发人员对NiFi做出的一个常见错误是将FlowFile的所有内容缓冲到内存中。虽然有些情况需要这样做,但应尽可能避免,除非众所周知数据的格式是什么。例如,负责对XML文档执行XPath的处理器需要加载整个数据内容到内存中。这通常是可以接受的,因为预计XML不会非常大。但是,搜索特定字节序列的处理器可用于搜索数百千兆字节或更多的文件。

不是将这些数据缓冲到内存中,而是建议在从内容存储库流式传输时评估数据(即,扫描InputStream提供给回调的内容ProcessSession.read)。当然,在这种情况下,我们不希望从Content Repository读取每个字节,因此我们将使用BufferedInputStream或以某种方式缓冲一些少量数据,视情况而定。

控制器服务(Controller Services)

ControllerService接口允许开发人员以干净,一致的方式在JVM上共享功能和状态。该接口类似于Processor 接口的接口,但没有onTrigger方法,因为Controller Services未安排定期运行,并且Controller Services没有关系,因为它们没有直接集成到流中。相反,它们由处理器,报告任务和其他控制器服务使用。

开发ControllerService

与处理器接口一样,ControllerService接口公开了配置,验证和初始化的方法。这些方法都与Processor接口的initialize方法相同,只是方法是传递a ControllerServiceInitializationContext而不是a ProcessorInitializationContext

控制器服务附带了一个处理器没有的附加约束。控制器服务必须包含扩展的接口ControllerService。然后,实现只能通过其界面进行交互。例如,处理器永远不会被赋予ControllerService的具体实现,因此必须仅通过扩展的接口引用服务ControllerService

这种约束主要是因为处理器可以存在于一个NiFi存档(NAR)中,而处理器所在的控制器服务的实现可以存在于不同的NAR中。这是通过框架动态实现公开的接口来实现的,框架可以切换到适当的ClassLoader并在具体实现上调用所需的方法。但是,为了使其工作,处理器和控制器服务实现必须共享Controller Service接口的相同定义。因此,这两个NAR必须依赖于包含Controller Service接口的NAR。有关更多信息,请参阅NiFi Archives(NAR)

与ControllerService交互

ControllerServices可以通过ControllerServiceLookup或使用identifiesControllerServicePropertyDescriptor的Builder类的方法由Processor,另一个ControllerService或ReportingTask获取 。ControllerServiceLookup可以由处理器从传递给该initialize 方法的ProcessorInitializationContext获得。同样,它是由ControllerService从ControllerServiceInitializationContext获得的,也是由ReportingTask通过传递给该initialize方法的ReportingConfiguration对象获得的。

但是,对于大多数用例,使用identifiesControllerService PropertyDescriptor Builder 的方法是首选,并且是最不复杂的方法。为了使用这个方法,我们创建了一个PropertyDescriptor,它引用了一个Controller服务:

  1.   public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
  2.   .name("SSL Context Service")
  3.   .description("Specified the SSL Context Service that can be used to create secure connections")
  4.   .required(true)
  5.   .identifiesControllerService(SSLContextService.class)
  6.   .build();

使用此方法,将提示用户提供应使用的SSL上下文服务。这是通过向用户提供下拉菜单来完成的,他们可以从中选择已配置的任何SSLContextService配置,而不管实现如何。

为了使用此服务,处理器可以使用以下代码:

  1.   final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE)
  2.   .asControllerService(SSLContextService.class);

请注意,这SSLContextService是一个扩展ControllerService的接口。目前唯一的实施是StandardSSLContextService。但是,处理器开发人员无需担心此细节。

报告任务

到目前为止,我们几乎没有提到如何向外界传达NiFi及其组件的表现。系统是否能够跟上传入的数据速率?系统还能处理多少?在一天的高峰时段与一天中最不忙的时间处理多少数据?

为了回答这些问题以及更多问题,NiFi提供了通过ReportingTask 界面向外部服务报告状态,统计,指标和监控信息的功能。ReportingTasks可以访问大量信息,以确定系统的运行方式。

开发报告任务

与Processor和ControllerService接口一样,ReportingTask接口公开了配置,验证和初始化的方法。这些方法都与Processor和ControllerService接口的initialize方法相同,除了 方法是传递ReportingConfiguration 对象,而不是其他组件接收的初始化对象。ReportingTask还有一个onTrigger框架调用的方法,用于触发任务执行其任务。

在该onTrigger方法中,ReportingTask被授权访问ReportingContext,从中可以获得有关NiFi实例的配置和信息。BulletinRepository允许查询公告,并允许ReportingTask提交自己的公告,以便将信息呈现给用户。可通过Context访问的ControllerServiceLookup提供对已配置的ControllerServices的访问。但是,这种获取Controller Services的方法不是首选方法。相反,获取Controller服务的首选方法是在PropertyDescriptor中引用Controller服务,如与ControllerService交互部分中所述。

EventAccess通过ReportingContext公开的对象提供对该对象的访问ProcessGroupStatus,该对象公开有关过程组,处理器,连接和其他组件在过去五分钟内处理的数据量的统计信息。此外,EventAccess对象提供对存储在其中的ProvenanceEventRecords的访问ProvenanceEventRepository。当从外部源接收数据,发送到外部服务,从系统中删除,修改或根据所做出的决定路由时,处理器发出这些源事件事件。

每个ProvenanceEvent都具有FlowFile的ID,事件的类型,事件的创建时间,以及组件访问FlowFile时与FlowFile关联的所有FlowFile属性以及与之关联的FlowFile属性。 FlowFile是事件描述的处理结果。这为ReportingTasks提供了大量信息,允许以多种不同方式生成报告,以公开任何操作问题所需的指标和监控功能。

UI扩展

NiFi中有两个UI扩展点:

可以创建自定义UI以提供超出大多数处理器设置中可用的标准属性/值表的配置选项。具有自定义UI的处理器的示例是UpdateAttributeJoltTransformJSON

可以创建内容查看器以扩展可在NiFi中查看的数据类型。NiFi在lib目录中附带NAR,其中包含数据类型的内容查看器,如csv,xml,avro,json(standard-nar)和图像类型,如png,jpeg和gif(media-nar)。

自定义处理器UI

要将自定义UI添加到处理器:

  1. 创建您的UI。

  2. 在处理器NAR中构建并捆绑WAR。

  3. WAR需要nifi-processor-configuration在META-INF目录中包含一个文件,该文件将Custom UI与该处理器相关联。

  4. 将NAR放在lib目录中,它将在NiFi启动时被发现。

  5. 在处理器的“配置处理器”窗口中,“属性”选项卡现在应该有一个Advanced按钮,该按钮将访问自定义UI。

例如,以下是UpdateAttribute的NAR布局:

更新属性NAR布局

 
  1.   nifi-update-attribute-bundle
  2.   │
  3.   ├── nifi-update-attribute-model
  4.   │
  5.   ├── nifi-update-attribute-nar
  6.   │
  7.   ├── nifi-update-attribute-processor
  8.   │
  9.   ├── nifi-update-attribute-ui
  10.   │ ├── pom.xml
  11.   │ └── src
  12.   │ └── main
  13.   │ ├── java
  14.   │ ├── resources
  15.   │ └── webapp
  16.   │ └── css
  17.   │ └── images
  18.   │ └── js
  19.   │ └── META-INF
  20.   │ │ └── nifi-processor-configuration
  21.   │ └── WEB-INF
  22.   │
  23.   └── pom.xml

 

 

内容nifi-processor-configuration如下:

 

org.apache.nifi.processors.attributes.UpdateAttribute:${project.groupId}:nifi-update-attribute-nar:${project.version}

 

 

  还可以为Controller Services和Reporting Tasks实现自定义UI。

内容浏览者

要添加内容查看器:

  1. 在处理器NAR中构建并捆绑WAR。

  2. WAR需要nifi-content-viewer在META-INF目录中包含一个文件,该目录列出了支持的内容类型。

  3. 将NAR放在lib目录中,它将在NiFi启动时被发现。

  4. 遇到匹配的内容类型时,内容查看器将生成适当的视图。

一个很好的例子是标准内容查看器的NAR布局:

标准内容查看器NAR布局

 
  1.   nifi-standard-bundle
  2.   │
  3.   ├── nifi-jolt-transform-json-ui
  4.   │
  5.   ├── nifi-standard-content-viewer
  6.   │ ├── pom.xml
  7.   │ └── src
  8.   │ └── main
  9.   │ ├── java
  10.   │ ├── resources
  11.   │ └── webapp
  12.   │ └── css
  13.   │ └── META-INF
  14.   │ │ └── nifi-content-viewer
  15.   │ └── WEB-INF
  16.   │
  17.   ├── nifi-standard-nar
  18.   │
  19.   ├── nifi-standard-prioritizers
  20.   │
  21.   ├── nifi-standard-processors
  22.   │
  23.   ├── nifi-standard-reporting-tasks
  24.   │
  25.   ├── nifi-standard-utils
  26.   │
  27.   └── pom.xml

 

内容nifi-content-viewer如下:

  1.   application/xml
  2.   application/json
  3.   text/plain
  4.   text/csv
  5.   avro/binary
  6.   application/avro-binary
  7.   application/avro+binary

命令行工具

TLS-工具包

客户端/服务器操作模式来自于希望自动生成所需的TLS配置工件而无需在集中位置执行该生成。这简化了群集环境中的配置。由于我们不一定拥有运行生成逻辑或可信证书颁发机构的中心位置,因此使用共享密钥来相互验证客户端和服务器。

tls-toolkit使用HMAC验证CA服务器的公钥和客户端发送的CSR来防止中间人攻击。共享密钥(令牌)用作HMAC密钥。

基本流程如下:

  1. 客户端生成KeyPair。

  2. 客户端生成包含CSR和HMAC的请求json有效负载,其中令牌作为密钥,CSR的公钥指纹作为数据。

  3. 客户端连接到指定的https端口上的CA主机名,并验证CA证书的CN与主机名匹配(注意:因为此时我们不信任CA,这增加了无安全性,这只是一种方式如果可能的话,提早错误)。

  4. 服务器使用令牌作为密钥并使用CSR的公钥指纹作为数据,从客户端有效负载验证HMAC。这证明客户端知道共享密钥,并且希望对具有该公钥的CSR进行签名。(注意:中间的人可以转发此功能,但无法在不使HMAC无效的情况下更改CSR,从而无法实现目的)。

  5. 服务器对CSR进行签名,并发回包含证书的响应json有效负载和以令牌作为密钥的HMAC以及其公钥的指纹作为数据。

  6. 客户端使用令牌作为密钥以及由TLS会话提供的证书公钥的指纹来验证响应HMAC。这验证了知道共享密钥的CA是我们通过TLS与之交谈的CA.

  7. 客户端验证来自TLS会话的CA证书是否在有效负载中签署了证书。

  8. 客户端将生成的KeyPair添加到具有证书链的密钥库,并将CA证书从TLS连接添加到其信任库。

  9. 客户端写出包含密钥库,信任库密码和有关交换的其他详细信息的配置json。

测试

测试将在更大的框架内使用的组件通常非常麻烦和棘手。通过NiFi,我们努力使测试组件尽可能简单。为此,我们创建了一个nifi-mock可与JUnit结合使用的模块,以提供对组件的广泛测试。

模拟框架主要用于测试处理器,因为它们是迄今为止最常开发的扩展点。但是,该框架确实提供了测试Controller服务的能力。

通常通过创建功能测试来验证组件行为,从而对组件进行测试。这样做是因为处理器通常由少数辅助方法组成,但逻辑将主要包含在onTrigger方法中。该TestRunner接口允许我们通过将更多的“原始”的对象,如文件和字节数组到FlowFiles来测试处理器和控制器服务,并处理创建ProcessSessions和所需的处理器ProcessContexts做它的工作,以及调用在必要的生命周期方法为了确保处理器在单元测试中的行为与在生产中的行为相同。

实例化TestRunner

处理器或控制器服务的大多数单元测试都是通过创建TestRunner 类的实例来开始的。为了向处理器添加必要的类,您可以使用Maven依赖项:

  1.   <dependency>
  2.   <groupId>org.apache.nifi</groupId>
  3.   <artifactId>nifi-mock</artifactId>
  4.   <version>${nifi version}</version>
  5.   </dependency>

我们TestRunner通过调用类的一个静态newTestRunner方法TestRunners(位于org.apache.nifi.util包中)来创建一个新的。这些方法为被测处理器提供了一个参数(可以是要测试的处理器的类,也可以是处理器的实例),并允许设置处理器名称。

添加ControllerServices

在创建新的测试运行器之后,我们可以将任何控制器服务添加到我们的处理器将需要的测试运行器以执行其工作。我们通过调用该addControllerService方法并提供Controller Service的标识符和Controller Service的实例来完成此操作。

如果控制器服务需要进行配置,它的性质可以通过调用被设置setProperty(ControllerService, PropertyDescriptor, String)setProperty(ControllerService, String, String)setProperty(ControllerService, PropertyDescriptor, AllowableValue)方法。每种方法都返回一个 ValidationResult。然后可以检查此对象以确保通过调用该属性有效isValid。可以通过调用setAnnotationData(ControllerService, String)方法来设置注释数据。

我们现在可以通过调用来确保Controller Service有效assertValid(ControllerService)- 或者如果通过调用测试Controller Service本身,则确保配置的值无效 assertNotValid(ControllerService)

将Controller Service添加到Test Runner并进行配置后,现在可以通过调用该enableControllerService(ControllerService)方法启用它 。如果Controller Service无效,则此方法将抛出IllegalStateException。否则,该服务现在可以使用了。

设置属性值

配置任何必要的Controller服务后,我们需要配置我们的处理器。我们可以通过调用与Controller Services相同的方法来完成此操作,而无需指定任何Controller Service。即,我们可以打电话setProperty(PropertyDescriptor, String),等等。每个setProperty方法再次返回一个ValidationResult属性,可用于确保属性值有效。

同样,根据我们的期望,我们也可以调用assertValid()assertNotValid()确保处理器的配置是否有效。

排队FlowFiles

在触发处理器运行之前,通常需要将FlowFiles排入队列以供处理器处理。这可以通过使用该类的enqueue方法来实现TestRunner。该enqueue方法具有几个不同的覆盖,并允许在的形式被添加的数据byte[]InputStreamPath。这些方法中的每一种还支持允许Map<String, String>添加以支持FlowFile属性的变体。

此外,还有一种enqueue方法可以获取FlowFile对象的var-args。例如,这可用于获取处理器的输出,然后将其提供给处理器的输入。

运行处理器

配置Controller Services并将必要的FlowFile排入队列后,可以通过调用run方法来触发处理器运行TestRunner。如果在没有任何参数的情况下调用此方法,它将使用@OnScheduled注释调用Processor中的任何方法,调用Processor的onTrigger方法一次,然后运行@OnUnscheduledfinally @OnStopped方法。

如果希望在触发onTrigger其他事件@OnUnscheduled和 @OnStopped生命周期事件之前运行该方法的多次迭代,则该run(int)方法可用于指定现在onTrigger应该调用的许多迭代。

还有,当我们想要触发处理器上运行,但不会触发时间@OnUnscheduled@OnStopped 生命周期事件。例如,这有助于在这些事件发生之前检查处理器的状态。这可以使用run(int, boolean)和传递false作为第二个参数来实现。但是,在执行此操作后,调用@OnScheduled生命周期方法可能会导致问题。因此,我们现在可以onTrigger再次运行,而不会通过使用方法的run(int,boolean,boolean)版本runfalse作为第三个参数传递来发生这些事件。

如果测试多个线程发生的行为很有用,这也可以通过调用setThreadCount方法来实现 TestRunner。默认值为1个线程。如果使用多个线程,请务必记住,run调用TestRunner指定应触发处理器的次数,而不是每个线程应触发处理器的次数。因此,如果线程计数设置为2但 run(1)被调用,则只使用一个线程。

验证输出

处理器运行完毕后,单元测试通常需要验证FlowFiles是否符合预期要求。这可以使用TestRunnersassertAllFlowFilesTransferred和 assertTransferCount方法来实现。前一种方法将关系和整数作为参数,以指示应该将多少FlowFiles传输到该关系。除非将此数量的FlowFiles转移到给定的关系或者任何FlowFile被转移到任何其他关系,否则该方法将无法通过单元测试。该assertTransferCount方法仅验证FlowFile计数是给定关系的预期数量。

验证计数后,我们可以通过该getFlowFilesForRelationship方法获得实际的输出FlowFiles 。这个方法返回一个List<MockFlowFile>。重要的是要注意List的类型MockFlowFile,而不是FlowFile接口。这样做是因为MockFlowFile有许多方法可以验证内容。

例如,MockFlowFile有断言FlowFile属性exists(assertAttributeExists)的方法,声明其他属性不存在(assertAttributeNotExists),或者Attributes具有正确的值(assertAttributeEqualsassertAttributeNotEquals)。存在用于验证FlowFile的内容的类似方法。可以将FlowFile的内容与a byte[],和InputStream文件或String进行比较。如果预期数据是文本的,则首选String版本,因为如果输出不是预期的,它会提供更直观的错误消息。

模拟外部资源

测试连接到远程资源的NiFi处理器时最大的问题之一是我们不希望从单元测试中实际连接到某些远程资源。我们可以在单元测试中自己站起来一个简单的服务器并配置处理器与之通信,但是我们必须理解并实现特定于服务器的规范,并且可能无法正确发送错误消息等我们想要测试。

通常,这里采用的方法是在处理器中具有负责获得远程资源的连接或客户端的方法。我们通常将此方法标记为受保护。在单元测试中,我们不是TestRunner通过调用TestRunners.newTestRunner(Class)和提供Processor类来创建,而是在单元测试中创建处理器的子类并使用它:

  1.   @Test
  2.   public void testConnectionFailure() {
  3.   final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() {
  4.   protected Client getClient() {
  5.   // Return a mocked out client here.
  6.   return new Client() {
  7.   public void connect() throws IOException {
  8.   throw new IOException();
  9.   }
  10.    
  11.   // ...
  12.   // other client methods
  13.   // ...
  14.   };
  15.   }
  16.   });
  17.    
  18.   // rest of unit test.
  19.   }

这允许我们实现一个客户端,该客户端模拟所有网络通信并返回我们想要测试的不同错误结果,并确保我们的逻辑对于处理成功调用客户端是正确的。

附加测试功能

除了测试框架提供的上述功能外,TestRunner还提供了几种方便的方法来验证处理器的行为。提供了用于确保已清空处理器的输入队列的方法。单元测试能够获取ProcessContext,ProcessSessionFactory,ProvenanceReporter以及TestRunner将使用的其他特定于框架的实体。该shutdown方法提供了测试注释为仅在关闭NiFi时运行的处理器方法的能力。可以为使用自定义用户界面的处理器设置注释数据。最后,可以通过该setThreadCount(int)方法设置应该用于运行处理器的线程数。

NiFi档案馆(NAR)

当来自许多不同组织的软件都托管在同一环境中时,Java ClassLoaders很快成为一个问题。如果多个组件依赖于同一个库但每个组件依赖于不同的版本,则会出现许多问题,通常会导致出现意外行为或NoClassDefFoundError错误。为了防止这些问题成为问题,NiFi引入了NiFi存档或NAR的概念。

NAR允许将多个组件及其依赖项一起打包到单个包中。然后,NAR包与其他NAR包提供ClassLoader隔离。开发人员应始终将其NiFi组件部署为NAR包。

为此,开发人员创建了一个新的Maven工件,我们将其称为NAR工件。包装设定为nardependencies然后创建POM 的部分,以便NAR依赖于要包含在NAR中的所有NiFi组件。

为了使用包装nar,我们必须使用该nifi-nar-maven-plugin模块。通过将以下代码段添加到NAR的pom.xml中来实现此目的:

  1.   <build>
  2.   <plugins>
  3.   <plugin>
  4.   <groupId>org.apache.nifi</groupId>
  5.   <artifactId>nifi-nar-maven-plugin</artifactId>
  6.   <version>1.1.0</version>
  7.   <extensions>true</extensions>
  8.   </plugin>
  9.   </plugins>
  10.   </build>

在Apache NiFi代码库中,这存在于NiFi根POM中,其中所有其他NiFi工件(除了nifi-nar-maven-plugin本身)都继承,因此我们不需要将其包含在我们的任何其他工具中POM文件。

NAR能够具有一个类型的依赖项nar。如果指定了多个类型的依赖项 nar,那么nifi-nar-maven-plugin将会出错。如果NAR A在NAR B上添加依赖关系,这将不会导致NAR B打包NAR A的所有组件。相反,这将向 NAR A Nar-Dependency-IdMANIFEST.MF文件添加一个元素。这将导致设置NAR B的ClassLoader作为NAR A的Parent ClassLoader。在这种情况下,我们将NAR B称为NAR A 的Parent

Parent ClassLoaders的这种链接是NiFi使用的机制,以便在所有NAR之间共享Controller Services。如“ 开发 控制器服务”部分所述,必须将控制器服务分为扩展的接口ControllerService和实现该接口的实现。只要Controller Service Implementation和Processor共享Controller Service接口的相同定义,Controller Services就可以从任何处理器引用,无论它在哪个NAR中。

为了共享相同的定义,处理器的NAR和Controller服务实现的NAR必须作为父服务器服务定义的NAR。示例层次结构可能如下所示:

控制器服务NAR布局

 
  1.   root
  2.   ├── my-controller-service-api
  3.   │ ├── pom.xml
  4.   │ └── src
  5.   │ └── main
  6.   │ └── java
  7.   │ └── org
  8.   │ └── my
  9.   │ └── services
  10.   │ └── MyService.java
  11.   │
  12.   ├── my-controller-service-api-nar
  13.   │ └── pom.xml //此POM文件的类型为nar。它依赖于 nifi-standard-services-api-nar。
  14.   │
  15.   │
  16.   │
  17.   ├── my-controller-service-impl
  18.   │ ├── pom.xml //此POM文件是类型jar。它依赖于 my-controller-service-api。它不具有任何依赖 nar的工件
  19.   │ └── src
  20.   │ ├── main
  21.   │ │ ├── java
  22.   │ │ │ └── org
  23.   │ │ │ └── my
  24.   │ │ │ └── services
  25.   │ │ │ └── MyServiceImpl.java
  26.   │ │ └── resources
  27.   │ │ └── META-INF
  28.   │ │ └── services
  29.   │ │ └── org.apache.nifi.controller.ControllerService
  30.   │ └── test
  31.   │ └── java
  32.   │ └── org
  33.   │ └── my
  34.   │ └── services
  35.   │ └── TestMyServiceImpl.java
  36.   │
  37.   │
  38.   ├── my-controller-service-nar
  39.   │ └── pom.xml // 此POM文件的类型为nar。它依赖于 my-controller-service-api-nar。
  40.   │
  41.   │
  42.   └── other-processor-nar
  43.   └── pom.xml //此POM文件的类型为nar。它依赖于 my-controller-service-api-nar。

 

虽然这些看起来可能看起来非常复杂,但在创建一次或两次这样的层次结构之后,它变得复杂得多。请注意,这 my-controller-service-api-nar依赖于 nifi-standard-services-api-nar。这样做是为了使任何具有依赖性的NAR my-controller-service-api-nar也能够访问由nifi-standard-services-api-narSSLContextService 提供的所有Controller服务 。在同一个叶片中,没有必要为每个服务创建不同的“service-api”NAR。相反,通常有一个“service-api”NAR封装了许多不同的Controller服务的API,这通常是有意义的。nifi-standard-services-api-nar。通常,API不会包含广泛的依赖关系,因此,ClassLoader隔离可能不那么重要,因此将许多API工件集中到同一个NAR中通常是可以接受的。

Per-Instance ClassLoading

组件开发人员可能希望在运行时向组件的类路径添加其他资源。例如,您可能希望将JDBC驱动程序的位置提供给与关系数据库交互的处理器,从而允许处理器使用任何驱动程序,而不是尝试将驱动程序捆绑到NAR中。

这可以通过将一个或多个PropertyDescriptor实例声明 dynamicallyModifiesClasspath为true来实现。例如:

  1.   PropertyDescriptor EXTRA_RESOURCE = new PropertyDescriptor.Builder()
  2.   .name("Extra Resources")
  3.   .description("The path to one or more resources to add to the classpath.")
  4.   .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
  5.   .expressionLanguageSupported(true)
  6.   .dynamicallyModifiesClasspath(true)
  7.   .build();

在组件上设置这些属性后,框架将标识所有dynamicallyModifiesClasspath设置为true的属性 。对于每个属性,框架都尝试从属性的值中解析文件系统资源。该值可以是一个或多个目录或文件的逗号分隔列表,其中跳过任何不存在的路径。如果资源表示目录,则列出该目录,并且该目录中的所有文件都将单独添加到类路径中。

每个属性可以通过验证器对值的格式施加进一步的限制。例如,使用StandardValidators.FILE_EXISTS_VALIDATOR将属性限制为接受单个文件。使用StandardValidators.NON_EMPTY_VALIDATOR允许逗号分隔文件或目录的任意组合。

通过将资源添加到始终首先检查的内部ClassLoader,将资源添加到实例ClassLoader。只要这些属性的值发生更改,内部ClassLoader就会关闭并使用新资源重新创建。

NiFi提供了@RequiresInstanceClassLoading注释,以进一步扩展和隔离组件类路径上可用的库。您可以注释组件,@RequiresInstanceClassLoading 以指示组件的实例ClassLoader需要组件的NAR ClassLoader中所有资源的副本。如果@RequiresInstanceClassLoading不存在,实例ClassLoader只是将其父ClassLoader设置为NAR ClassLoader,而不是复制资源。

@RequiresInstanceClassLoading注释还提供了一个可选的标志`cloneAncestorResources'。如果设置为true,则实例ClassLoader将包含祖先资源,直到包含组件引用的控制器服务API的第一个ClassLoader,或者直到Jetty NAR。如果设置为false或未指定,则仅包含组件NAR中的资源。

因为@RequiresInstanceClassLoading为组件的每个实例从NAR ClassLoader复制资源,所以明智地使用此功能。如果创建了一个组件的十个实例,则组件的NAR ClassLoader中的所有类都将加载到内存中十次。当创建足够的组件实例时,这最终会显着增加内存占用。

此外,在使用Controller Services时使用@RequiresInstanceClassLoading时存在一些限制。处理器,报告任务和控制器服务可以在其属性描述符之一中引用Controller Service API。当Controller Service API与引用它的组件或Controller Service实现捆绑在同一NAR中时,可能会出现问题。如果遇到这些情况之一并且扩展需要实例类加载,则将跳过扩展并记录适当的ERROR。要解决此问题,Controller Service API应捆绑在父NAR中。引用该服务的服务实现和扩展应取决于Controller Service API NAR。请参阅中的Controller Service NAR布局NiFi档案馆(NARs)部分。只要Controller Service API与需要它的扩展捆绑在一起,即使未使用@RequiresInstanceClassLoading,也会记录一条警告以帮助避免这种不良做法。

弃用组件

有时可能需要弃用组件。每当发生这种情况时,开发人员可以使用@DeprecationNotice注释来指示组件已被弃用,允许开发人员描述弃用的原因并建议替代组件。下面是一个如何执行此操作的示例:

  1.   @DeprecationNotice(alternatives = {ListenSyslog.class}, classNames = {"org.apache.nifi.processors.standard.ListenRELP"}, reason = "Technology has been superseded", )
  2.   public class ListenOldProtocol extends AbstractProcessor {

如您所见,替代方案可用于定义替代组件和数组,而classNames可用于通过字符串数组表示类似内容。

如何为Apache NiFi做出贡献

我们总是很高兴能够从社区中获得贡献 - 尤其是来自新贡献者的贡献!我们有兴趣接受代码的贡献,以及文档甚至可以作为图标或样式应用于应用程序的艺术作品。

技术

Apache NiFi的后端是用Java编写的。Web层利用JAX-RS和JavaScript广泛用于提供用户界面。我们依赖于几个第三方JavaScript库,包括D3和JQuery等。我们为我们的构建使用Apache Maven,为我们的版本控制系统使用Git。

文档在AsciiDoc中创建。

从哪儿开始?

NiFi的JIRA页面可用于查找标记为“初学者”的票证,或者您可以深入了解创建处理器的任何票证。处理器应该是独立的,不依赖于其他外部组件(Controller Services除外),因此它们为新的NiFi开发人员提供了良好的起点。这使开发人员暴露于NiFi API,并且是数据流系统中最具扩展性的部分。

系统级和概述文档位于“<code checkout location> / nifi / nifi-docs / src / main / asciidoc”中。可以使用实时预览编辑AsciiDoc,以便于生成文档。

提供捐款

可以通过创建补丁来提供贡献:

git format-patch

并将该补丁附加到故障单,或生成Pull请求。

有关贡献的更多详细信息,请参阅贡献者指南的相关部分。

联系我们

开发人员邮件列表(dev@nifi.apache.org)受到密切监控,我们倾向于快速响应。如果您有任何疑问,请随时向我们发送电子邮件 - 我们随时为您提供帮助!不幸的是,电子邮件可能会在随机播放中丢失,所以如果您发送电子邮件并且在一两天内没有收到回复,那就是我们的错 - 不要担心打扰我们。只需再次ping邮件列表。

标签:NiFi,调用,开发人员,FlowFile,处理器,Apache,方法,属性
来源: https://www.cnblogs.com/jobbible/p/12400610.html