.NET core 实现rocketmq生产及消费
作者:互联网
生产
1 先去rocketmqredashboard中新建一个主题
主题内容
2 执行生产者代码 发送 主题
查看详情
3 附代码 需要Nuget下载NewLife.RocketMQ 包
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
//mq对象
using var mq = new Producer
{
//主题
Topic = "topic1",
//AccessKey= "DataExchange",
//SecretKey= "cd30hf9insi",
//服务地址
NameServerAddress = "127.0.0.1:9876",
};
mq.Start();
int i = 1;
//轮询发消息
while (true)
{
var content = DateTime.Now.ToString("yyyy年MM月dd日 HH:mm:ss.fff");
var message = new NewLife.RocketMQ.Protocol.Message()
{
BodyString = "测试第一条数据",
Keys = (i++).ToString(),
Tags = i % 2 == 0 ? "even" : "odd",
Flag = 0,
WaitStoreMsgOK = true
};
//发送消息(生产消息)
var sr = mq.Publish(message);
//string log = $"发送成功的消息,内容>{content},MsgId={sr.MsgId},BrokerName= {sr.Queue.BrokerName} ,QueueId={sr.Queue.QueueId},QueueOffset= {sr.QueueOffset}";
Console.WriteLine(message.ToString());
Task.Delay(TimeSpan.FromSeconds(10)).Wait();
}
}
消费
附代码
static void Main(string[] args)
{
//测试消费消息
var consumer = new NewLife.RocketMQ.Consumer
{
Topic = "topic1",
// Group = "CID_ONSAPI_OWNER",
NameServerAddress = "127.0.0.1:9876",
//设置每次接收消息只拉取一条信息
BatchSize = 1,
//FromLastOffset = true,
//SkipOverStoredMsgCount = 0,
//BatchSize = 20,
//Log = NewLife.Log.XTrace.Log,
};
consumer.OnConsume = (q, ms) =>
{
string mInfo = $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
Console.WriteLine(mInfo);
foreach (var item in ms.ToList())
{
string msg = $"消息:msgId={item.MsgId},key={item.Keys},产生时间【{item.BornTimestamp.ToDateTime()}】,内容>{item.Body.ToStr()}";
Console.WriteLine(msg);
}
// return false;//通知消息队:不消费消息
return true; //通知消息队:消费了消息
};
consumer.Start();
Console.WriteLine("消息接收测试");
Console.ReadLine();
}
标签:core,Console,string,sr,item,WriteLine,var,NET,rocketmq 来源: https://www.cnblogs.com/yunnn/p/16386810.html