Spring Integration:使用聚合后再次拆分消息?
作者:互联网
在我的Spring Integration驱动项目中,我有一个分离器和有效负载路由器,用于将数据发送到各种变换器.然后将新的“已转换”对象传递回聚合器并进行处理.
现在我想分割我的聚合结果,以便它们保持正确,因为我需要将一些对象路由到单独的出站通道适配器.为实现这一目标,我在聚合器之后添加了第二个拆分器;但似乎只有聚合集合中的第一个元素被传递给路由器.
这是我目前的流程:
<splitter ref="articleContentExtractor" />
<!-- This router works exactly as expected -->
<payload-type-router>
... routing to various transformers ...
... results are sent to articleOutAggregateChannel ...
</payload-type-router>
<aggregator ref="articleAggregator" />
<splitter />
<!-- This is where it seems to go wrong, the second
splitter returns only the first object in the collection -->
<payload-type-router resolution-required="true">
<mapping type="x.y.z.AbstractContent" channel="contentOutChannel" />
<mapping type="x.y.z.Staff" channel="staffOutChannel" />
</payload-type-router>
<outbound-channel-adapter id="contentSaveService" ref="contentExporter"
method="persist" channel="contentOutChannel" />
<outbound-channel-adapter id="staffSaveService" ref="staffExporter"
method="persist" channel="staffOutChannel" />
我的聚合器代码:
@Aggregator
public List<? super BaseObject> compileArticle(List<? super BaseObject> parts) {
// Search for the required objects for referencing
Iterator<? super BaseObject> it = parts.iterator();
Article article = null;
List<Staff> authors = new ArrayList<Staff>();
while (it.hasNext()) {
Object part = it.next();
if (part instanceof Article) {
article = (Article)part;
}
else if (part instanceof Staff) {
authors.add((Staff)part);
}
}
// Apply references
article.setAuthors(authors);
return parts;
}
我究竟做错了什么?我正确使用我的聚合器吗?
注意:如果我只是完全删除聚合器和第二个分离器,那么流程的其余部分就可以完美地工作.
解决方法:
由于我没有你所有的代码,因此很难说清楚所发生的一切,但我能够按照你想要的方式使这个流程工作. baseObjectTransformer除了在通过它的对象上设置标志外什么都不做:
...
<context:component-scan base-package="net.grogscave.example" />
<channel id="inputChannel" />
<splitter ref="articleContentExtractor" input-channel="inputChannel"
output-channel="splitArticleStaff" />
<channel id="splitArticleStaff" />
<payload-type-router input-channel="splitArticleStaff">
<mapping type="net.grogscave.example.domain.Article" channel="articleChannel" />
<mapping type="net.grogscave.example.domain.Staff" channel="staffChannel" />
</payload-type-router>
<channel id="articleChannel" />
<transformer input-channel="articleChannel" output-channel="articleOutAggregateChannel"
ref="baseObjectTransformer"/>
<channel id="staffChannel" />
<transformer input-channel="staffChannel" output-channel="articleOutAggregateChannel"
ref="baseObjectTransformer"/>
<channel id="articleOutAggregateChannel" />
<aggregator ref="articleAggregator" input-channel="articleOutAggregateChannel" output-channel="splitArticleInChannel"/>
<channel id="splitArticleInChannel" />
<splitter input-channel="splitArticleInChannel" output-channel="splitArticleOutChannel" />
<channel id="splitArticleOutChannel" />
<payload-type-router resolution-required="true" input-channel="splitArticleOutChannel">
<mapping type="net.grogscave.example.domain.Article" channel="contentOutChannel" />
<mapping type="net.grogscave.example.domain.Staff" channel="staffOutChannel" />
</payload-type-router>
<channel id="contentOutChannel">
<queue capacity="10" />
</channel>
<channel id="staffOutChannel">
<queue capacity="10" />
</channel>
...
然后我在测试用例中使用以下代码运行此流程:
...
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
"/META-INF/spring/split-agg-split.xml",
SplitAggSplitTests.class);
MessageChannel inputChannel = context.getBean("inputChannel",
MessageChannel.class);
PollableChannel contentOutChannel = context.getBean("contentOutChannel",
PollableChannel.class);
PollableChannel staffOutChannel = context.getBean("staffOutChannel",
PollableChannel.class);
inputChannel.send(new GenericMessage<String>("Dewey Wins!,A. Fool, C. Lewlis"));
logger.info("==> Article recieved: "
+ contentOutChannel.receive(0).getPayload());
for(int i = 0; i < 2; i++)
{
logger.info("==> Staff recieved: "
+ staffOutChannel.receive(0).getPayload());
}
...
我的日志输出产生所有三个实体.
所有这些,你是否考虑过简单地删除额外的分离器/聚合器并简单地将setAuthors逻辑移动到第一个分离器?我不知道你的流程的所有细节,但它似乎简化了事情.
标签:spring,spring-integration,aggregator 来源: https://codeday.me/bug/20190710/1418790.html