go使用rabbitmq
作者:互联网
rabbitmq是一款消息中间件,采用erlang语言编写。基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。消息的消费者被动拉取(rabbitMQ 推送消息给消费者)。
基本概念包括:vhost,producer,exchange,queue(设定routingkey来绑定exchange和queue),consumer。
vhost是rabbitmq虚拟的“服务器”,比如“/”,“/user”。是单独的,互不干扰。 topic模式下,定义routingkey规则: 发送到topic类型交换机的消息的routing_key不能随便设置–它必须是多个单词组成,用点分割。单词可以是任意的,但它们通常指定连接到该消息的某些功能。例如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由关键字可包含任意多的单词,但最高限制是255字节。绑定的关键字必须有相同的格式。topic交换机和direct交换的逻辑是相似的–拥有特定的路由关键字的消息将被发送到所有匹配关键字的队列。然而,绑定关键字有两个特殊的情况:
(1)* (星号) 可以代替一个完整的单词.
(2)# (井号) 可以代替零个或多个单词.
以下是go使用rabbitmq基本操作:
package rabbitmq import ( "errors" "github.com/streadway/amqp" "log" "strings" ) //定义全局mqmap var RabbitMqMap = make(map[string]*rabbitMQ) // RabbitMQ 用于管理和维护rabbitmq的对象 type rabbitMQ struct { //wg sync.WaitGroup channel *amqp.Channel mqConn *amqp.Connection } //连接mq,config中可配置channel连接的容量和心跳时长 //默认为: /** maxChannelMax = (2 << 15) - 1 defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second defaultProduct = "https://github.com/streadway/amqp" defaultVersion = "β" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. defaultChannelMax = (2 << 10) - 1 */ func (mq *rabbitMQ) connToMq(url string, config *amqp.Config) (rabbitMq *rabbitMQ, err error) { mq.mqConn, err = amqp.DialConfig(url, *config) if err != nil { return } mq.channel, err = mq.mqConn.Channel() mq.mqConn.Channel() if err != nil { return } return mq, nil } //直接初始化队列 func (mq *rabbitMQ) PrepareQueue(queueName string) (queue amqp.Queue, err error) { if queueName == "" { return queue, errors.New("queueName为空") } queue, err = mq.channel.QueueDeclare( queueName, //name true, //durable,是否持久化,默认持久需要根据情况选择 false, //delete when unused false, //exclusive false, //no-wait nil, //arguments ) return } // prepareExchange 准备rabbitmq的Exchange func (mq *rabbitMQ) PrepareExchange(exchangeName, exchangeType string) error { if exchangeName == "" { return errors.New("exchangeName为空") } err := mq.channel.ExchangeDeclare( exchangeName, // exchange exchangeType, // type true, // durable 是否持久化,默认持久需要根据情况选择 false, // autoDelete false, // internal false, // noWait nil, // args ) if nil != err { return err } return nil } //通过exchange发送消息 func (mq *rabbitMQ) ExchangeSend(exchangeName, routingKey string, publishing amqp.Publishing) error { return mq.channel.Publish( exchangeName, //exchangeName routingKey, //routing key true, //mandatory false, //immediate publishing, ) } //通过队列发送消息 func (mq *rabbitMQ) QueueSend(queueName string, publishing amqp.Publishing) error { return mq.channel.Publish( "", //exchangeName queueName, //queue name false, //mandatory false, //immediate publishing, ) } //消费队列,内部方法会阻塞,使用时需要单独启用一个线程处理,常驻后台执行 func (mq *rabbitMQ) QueueConsume(queueName, consumer string) (delivery <-chan amqp.Delivery, err error) { err = mq.channel.Qos(1, 0, true) if err != nil { log.Fatal("Queue Consume: ", err.Error()) return nil, err } //后期可调整参数 delivery, err = mq.channel.Consume( queueName, // queue consumer, // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { log.Fatal("Queue Consume: ", err.Error()) return nil, err } return delivery, nil } //队列绑定exchange func (mq *rabbitMQ) QueueBindExchange(queueName, routingKey, exchangeName string) error { return mq.channel.QueueBind(queueName, routingKey, exchangeName, false, nil) } //关闭连接 func Close() { for k := range RabbitMqMap { RabbitMqMap[k].channel.Close() RabbitMqMap[k].mqConn.Close() } } func InitRabbitMq() { var ( mq rabbitMQ config amqp.Config err error ) //此处可定义多个配置,可调整 dsn := "amqp://guest:guest@localhost:5672/" config.Vhost = "/" RabbitMqMap["mq"], err = mq.connToMq(dsn, &config)if err != nil { log.Fatal("[rabbit-mq] connect to rabbit-mq error:" + err.Error()) } else { log.Println("[rabbit-mq] connect success") } } //获取连接 func GetRabbitConn(name ...string) *rabbitMQ { rabbitName := "" if len(name) > 0 { rabbitName = strings.ToLower(name[0]) } if rabbitName == "" { return RabbitMqMap["mq"] } return RabbitMqMap[rabbitName] }
producer,生产消息:
package main import ( "fmt" "github.com/streadway/amqp" "log" "strconv" "test01/mq/rabbitmq" "time" ) func product() { rabbitmq.InitRabbitMq() mq := rabbitmq.RabbitMqMap["mq"]
// 可以定义基本的exchange类型,topic(模糊匹配),direct err := mq.PrepareExchange("topic_exchange", "topic") if err != nil { log.Fatal("准备交换机出错", err) } queue, err := mq.PrepareQueue("test-qq") if err != nil { fmt.Println("queue初始化失败", err.Error()) log.Fatal(queue.Name) } queue2, err := mq.PrepareQueue("test-qq2") if err != nil { fmt.Println("queue2初始化失败", err.Error()) log.Fatal(queue2.Name) } queue3, err := mq.PrepareQueue("test-qq3") if err != nil { fmt.Println("queue3初始化失败", err.Error()) log.Fatal(queue3.Name) } if err := mq.QueueBindExchange("test-qq", "wodekey.log.info", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错", err) } if err := mq.QueueBindExchange("test-qq2", "wodekey.log.debug", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错2", err) } if err := mq.QueueBindExchange("test-qq3", "wodekey.log.error", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错2", err) } for i := 0; i < 1000; i++ { //mq.QueueSend("test-qq", amqp.Publishing{ // AppId: "", // ContentType: "application/json", // MessageId: "你好", // Body: []byte("这是我的消息:" + strconv.Itoa(i)), //}) //fmt.Println("发送成功: test-qq ", i) //time.Sleep(2 * time.Second) //mq.QueueSend("test-qq2", amqp.Publishing{ // AppId: "", // ContentType: "application/json", // MessageId: "你好啊", // Body: []byte("这是我的消息2:" + strconv.Itoa(i)), //}) mq.ExchangeSend("topic_exchange", "wodekey.random", amqp.Publishing{ ContentType: "application/json", Body: []byte("这是我的消息哦" + strconv.Itoa(i)), }) fmt.Println("发送成功: exchange ", i) time.Sleep(1 * time.Second) } } func main() { product() time.Sleep(1000000 * time.Second) }
consumer,启动三个线程来消费:
package main import ( "fmt" "log" "test01/mq/rabbitmq" "time" ) func consumer() { rabbitmq.InitRabbitMq() mq := rabbitmq.RabbitMqMap["mq2"] if err := mq.QueueBindExchange("test-qq", "*.log.info", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错", err) } if err := mq.QueueBindExchange("test-qq2", "*.log.debug", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错2", err) } if err := mq.QueueBindExchange("test-qq3", "*.log.error", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错2", err) } go func() { resu, err := mq.QueueConsume("test-qq", "consumer1") if err != nil { fmt.Println("消费错误", err.Error()) log.Fatal("test-qq消费错误") } for d := range resu { fmt.Println(d.ConsumerTag + " test-qq消费成功:", string(d.Body)) d.Ack(true) //需手动应答 time.Sleep(2 * time.Second) } }() go func() { resu, err := mq.QueueConsume("test-qq2", "consumer002") if err != nil { fmt.Println("消费错误", err.Error()) log.Fatal("test-qq2消费错误") } for d := range resu { fmt.Println(d.ConsumerTag + " test-qq2消费成功:", string(d.Body)) d.Ack(true) //需手动应答 time.Sleep(2 * time.Second) } }() go func() { resu, err := mq.QueueConsume("test-qq3", "consumer003") if err != nil { fmt.Println("消费错误", err.Error()) log.Fatal("test-qq3消费错误") } for d := range resu { fmt.Println(d.ConsumerTag + " test-qq3消费成功:", string(d.Body)) d.Ack(true) //需手动应答 time.Sleep(2 * time.Second) } }() } func main() { consumer() time.Sleep(10000000* time.Second) }
标签:log,err,time,rabbitmq,test,mq,使用,go,Fatal 来源: https://www.cnblogs.com/ahgo/p/16360282.html