c#-使用MassTransit重新传递RabbitMq消息时保留标头
作者:互联网
目的:重新传递消息时,我需要跟踪标题.
组态:
> RabbitMQ 3.7.9
> Erlang 21.2
>大众运输5.1.5
> Quartz数据库的MySql 8.0
我尝试过的没有成功的事情:
第一次尝试:
await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
if (consumeCtx.Headers.TryGetHeader("SenderApp", out object sender))
{
sendCtx.Headers.Set("SenderApp", sender);
}
}).ConfigureAwait(false);
第二次尝试:
protected Task ScheduleSend(Uri rabbitUri, double delay)
{
return GetBus().ScheduleSend<IProcessOrganisationUpdate>(
rabbitUri,
TimeSpan.FromSeconds(delay),
_Data,
new HeaderPipe(_SenderApp, 0));
}
public class HeaderPipe : IPipe<SendContext>
{
private readonly byte _Priority;
private readonly string _SenderApp;
public HeaderPipe (byte priority)
{
_Priority = priority;
_SenderApp = Assembly.GetEntryAssembly()?.GetName()?.Name ?? "Default";
}
public HeaderPipe (string senderApp, byte priority)
{
_Priority = priority;
_SenderApp = senderApp;
}
public void Probe (ProbeContext context)
{ }
public Task Send (SendContext context)
{
context.Headers.Set("SenderApp", _SenderApp);
context.SetPriority(_Priority);
return Task.CompletedTask;
}
}
预期:FinQuest.Robot.DBProcess
结果:null
我用我的SenderApp登录Consume方法.第一次看起来像这样
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: FinQuest.Robot.DBProcess, modif: LinkedIn)
重新交付后看起来像这样
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: , modif: LinkedIn)
我做错了什么?由于最大重试次数,我不想使用重试功能(我不想受到限制).
提前致谢.
解决方法:
可能需要使用重新交付过滤器使用的一种方法:
https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/SendContextExtensions.cs#L90
public static void TransferConsumeContextHeaders(this SendContext sendContext, ConsumeContext consumeContext)
在您的代码中,您将使用它:
await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
sendCtx.TransferConsumeContextHeaders(consumeCtx);
});
标签:quartz-net,rabbitmq,masstransit,c 来源: https://codeday.me/bug/20191108/2005296.html