其他分享
首页 > 其他分享> > go使用rabbitmq

go使用rabbitmq

作者:互联网

rabbitmq是一款消息中间件,采用erlang语言编写。基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。消息的消费者被动拉取(rabbitMQ 推送消息给消费者)。

基本概念包括:vhost,producer,exchange,queue(设定routingkey来绑定exchange和queue),consumer。

  vhost是rabbitmq虚拟的“服务器”,比如“/”,“/user”。是单独的,互不干扰。   topic模式下,定义routingkey规则:  发送到topic类型交换机的消息的routing_key不能随便设置–它必须是多个单词组成,用点分割。单词可以是任意的,但它们通常指定连接到该消息的某些功能。例如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由关键字可包含任意多的单词,但最高限制是255字节。

     绑定的关键字必须有相同的格式。topic交换机和direct交换的逻辑是相似的–拥有特定的路由关键字的消息将被发送到所有匹配关键字的队列。然而,绑定关键字有两个特殊的情况:
(1)* (星号) 可以代替一个完整的单词.
(2)# (井号) 可以代替零个或多个单词.

  以下是go使用rabbitmq基本操作:  
package rabbitmq

import (
    "errors"
    "github.com/streadway/amqp"
    "log"
    "strings"
)

//定义全局mqmap
var RabbitMqMap = make(map[string]*rabbitMQ)

// RabbitMQ 用于管理和维护rabbitmq的对象
type rabbitMQ struct {
    //wg           sync.WaitGroup
    channel *amqp.Channel
    mqConn  *amqp.Connection
}

//连接mq,config中可配置channel连接的容量和心跳时长
//默认为:
/**
maxChannelMax = (2 << 15) - 1
defaultHeartbeat         = 10 * time.Second
defaultConnectionTimeout = 30 * time.Second
defaultProduct           = "https://github.com/streadway/amqp"
defaultVersion           = "β"
// Safer default that makes channel leaks a lot easier to spot
// before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
defaultChannelMax = (2 << 10) - 1
*/
func (mq *rabbitMQ) connToMq(url string, config *amqp.Config) (rabbitMq *rabbitMQ, err error) {
    mq.mqConn, err = amqp.DialConfig(url, *config)
    if err != nil {
        return
    }
    mq.channel, err = mq.mqConn.Channel()
    mq.mqConn.Channel()
    if err != nil {
        return
    }
    return mq, nil
}

//直接初始化队列
func (mq *rabbitMQ) PrepareQueue(queueName string) (queue amqp.Queue, err error) {
    if queueName == "" {
        return queue, errors.New("queueName为空")
    }
    queue, err = mq.channel.QueueDeclare(
        queueName, //name
        true,      //durable,是否持久化,默认持久需要根据情况选择
        false,     //delete when unused
        false,     //exclusive
        false,     //no-wait
        nil,       //arguments
    )
    return
}

// prepareExchange 准备rabbitmq的Exchange
func (mq *rabbitMQ) PrepareExchange(exchangeName, exchangeType string) error {
    if exchangeName == "" {
        return errors.New("exchangeName为空")
    }
    err := mq.channel.ExchangeDeclare(
        exchangeName, // exchange
        exchangeType, // type
        true,         // durable 是否持久化,默认持久需要根据情况选择
        false,        // autoDelete
        false,        // internal
        false,        // noWait
        nil,          // args
    )

    if nil != err {
        return err
    }

    return nil
}

//通过exchange发送消息
func (mq *rabbitMQ) ExchangeSend(exchangeName, routingKey string, publishing amqp.Publishing) error {

    return mq.channel.Publish(
        exchangeName, //exchangeName
        routingKey,   //routing key
        true,         //mandatory
        false,        //immediate
        publishing,
    )
}

//通过队列发送消息
func (mq *rabbitMQ) QueueSend(queueName string, publishing amqp.Publishing) error {

    return mq.channel.Publish(
        "",        //exchangeName
        queueName, //queue name
        false,     //mandatory
        false,     //immediate
        publishing,
    )

}

//消费队列,内部方法会阻塞,使用时需要单独启用一个线程处理,常驻后台执行
func (mq *rabbitMQ) QueueConsume(queueName, consumer string) (delivery <-chan amqp.Delivery, err error) {
    err = mq.channel.Qos(1, 0, true)
    if err != nil {
        log.Fatal("Queue Consume: ", err.Error())
        return nil, err
    }
    //后期可调整参数
    delivery, err = mq.channel.Consume(
        queueName, // queue
        consumer, // consumer
        false, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,   // args
    )
    if err != nil {
        log.Fatal("Queue Consume: ", err.Error())
        return nil, err
    }
    return delivery, nil
}

//队列绑定exchange
func (mq *rabbitMQ) QueueBindExchange(queueName, routingKey, exchangeName string) error {
    return mq.channel.QueueBind(queueName, routingKey, exchangeName, false, nil)
}

//关闭连接
func Close() {
    for k := range RabbitMqMap {
        RabbitMqMap[k].channel.Close()
        RabbitMqMap[k].mqConn.Close()
    }
}

func InitRabbitMq() {
    var (
        mq     rabbitMQ
        config amqp.Config
        err    error
    )
   //此处可定义多个配置,可调整
        dsn := "amqp://guest:guest@localhost:5672/"
        config.Vhost = "/"
        RabbitMqMap["mq"], err = mq.connToMq(dsn, &config)if err != nil {
            log.Fatal("[rabbit-mq] connect to rabbit-mq error:" + err.Error())
        } else {
            log.Println("[rabbit-mq] connect success")
        }

}

//获取连接
func GetRabbitConn(name ...string) *rabbitMQ {
    rabbitName := ""
    if len(name) > 0 {
        rabbitName = strings.ToLower(name[0])
    }
    if rabbitName == "" {
        return RabbitMqMap["mq"]
    }
    return RabbitMqMap[rabbitName]
}

 

producer,生产消息:

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "strconv"
    "test01/mq/rabbitmq"
    "time"
)


func product() {
    rabbitmq.InitRabbitMq()
    mq := rabbitmq.RabbitMqMap["mq"]
// 可以定义基本的exchange类型,topic(模糊匹配),direct err := mq.PrepareExchange("topic_exchange", "topic") if err != nil { log.Fatal("准备交换机出错", err) } queue, err := mq.PrepareQueue("test-qq") if err != nil { fmt.Println("queue初始化失败", err.Error()) log.Fatal(queue.Name) } queue2, err := mq.PrepareQueue("test-qq2") if err != nil { fmt.Println("queue2初始化失败", err.Error()) log.Fatal(queue2.Name) } queue3, err := mq.PrepareQueue("test-qq3") if err != nil { fmt.Println("queue3初始化失败", err.Error()) log.Fatal(queue3.Name) } if err := mq.QueueBindExchange("test-qq", "wodekey.log.info", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错", err) } if err := mq.QueueBindExchange("test-qq2", "wodekey.log.debug", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错2", err) } if err := mq.QueueBindExchange("test-qq3", "wodekey.log.error", "topic_exchange"); err != nil { log.Fatal("队列绑定交换机出错2", err) } for i := 0; i < 1000; i++ { //mq.QueueSend("test-qq", amqp.Publishing{ // AppId: "", // ContentType: "application/json", // MessageId: "你好", // Body: []byte("这是我的消息:" + strconv.Itoa(i)), //}) //fmt.Println("发送成功: test-qq ", i) //time.Sleep(2 * time.Second) //mq.QueueSend("test-qq2", amqp.Publishing{ // AppId: "", // ContentType: "application/json", // MessageId: "你好啊", // Body: []byte("这是我的消息2:" + strconv.Itoa(i)), //}) mq.ExchangeSend("topic_exchange", "wodekey.random", amqp.Publishing{ ContentType: "application/json", Body: []byte("这是我的消息哦" + strconv.Itoa(i)), }) fmt.Println("发送成功: exchange ", i) time.Sleep(1 * time.Second) } } func main() { product() time.Sleep(1000000 * time.Second) }

 

consumer,启动三个线程来消费:

package main

import (
    "fmt"
    "log"
    "test01/mq/rabbitmq"
    "time"
)

func consumer() {
    rabbitmq.InitRabbitMq()
    mq := rabbitmq.RabbitMqMap["mq2"]
    if err := mq.QueueBindExchange("test-qq", "*.log.info", "topic_exchange"); err != nil {
        log.Fatal("队列绑定交换机出错", err)
    }
    if err := mq.QueueBindExchange("test-qq2", "*.log.debug", "topic_exchange"); err != nil {
        log.Fatal("队列绑定交换机出错2", err)
    }
    if err := mq.QueueBindExchange("test-qq3", "*.log.error", "topic_exchange"); err != nil {
        log.Fatal("队列绑定交换机出错2", err)
    }
    go func() {
        resu, err := mq.QueueConsume("test-qq", "consumer1")
        if err != nil {
            fmt.Println("消费错误", err.Error())
            log.Fatal("test-qq消费错误")
        }
        for d := range resu {
            fmt.Println(d.ConsumerTag + " test-qq消费成功:", string(d.Body))

            d.Ack(true) //需手动应答
            time.Sleep(2 * time.Second)
        }
    }()

    go func() {
        resu, err := mq.QueueConsume("test-qq2", "consumer002")
        if err != nil {
            fmt.Println("消费错误", err.Error())
            log.Fatal("test-qq2消费错误")
        }
        for d := range resu {
            fmt.Println(d.ConsumerTag + " test-qq2消费成功:", string(d.Body))
            d.Ack(true) //需手动应答
            time.Sleep(2 * time.Second)
        }
    }()

    go func() {
        resu, err := mq.QueueConsume("test-qq3", "consumer003")
        if err != nil {
            fmt.Println("消费错误", err.Error())
            log.Fatal("test-qq3消费错误")
        }
        for d := range resu {
            fmt.Println(d.ConsumerTag + " test-qq3消费成功:", string(d.Body))
            d.Ack(true) //需手动应答
            time.Sleep(2 * time.Second)
        }
    }()
}

func main() {
    consumer()

    time.Sleep(10000000* time.Second)
}

 

 


标签:log,err,time,rabbitmq,test,mq,使用,go,Fatal
来源: https://www.cnblogs.com/ahgo/p/16360282.html