其他分享
首页 > 其他分享> > RabbitMq学习笔记——RabbitMQ C的使用

RabbitMq学习笔记——RabbitMQ C的使用

作者:互联网

1、需要用到的参数:

  主机名:hostname、端口号:port(示例中为5672)、交换器:exchange、路由键:routingkey(发送端使用)、绑定键:bindingkey(接收端使用)、用户名:user、密码:psw;默认用户名和密码均为guest。

2、步骤:

1)、创建连接对象,并为该连接创建TCP socket(此时尚未真正连接到服务器):

  conn = amqp_new_connection();

  socket = amqp_tcp_socket_new(conn);

2)、使用socket、主机名和端口号打开TCP连接:

  status = amqp_socket_open(socket, hostname, port);

3)、登录

  amqp_login(conn,"/",0,131072,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");

4)、打开信道:

  amqp_channel_open(conn, 1);

  amqp_get_rpc_reply(conn);

5)、开始发送或者监听消息

6)、关闭信道、关闭连接、销毁连接:

  amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);

  amqp_connection_close(conn, AMQP_REPLY_SUCCESS);

  amqp_destroy_connection(conn);

3、发送端

  pro文件

  # rabbitmq-c public headers shipped with the project under include/.
  HEADERS += \
    include/amqp.h \
    include/amqp_framing.h \
    include/amqp_tcp_socket.h

  # main.cpp includes the headers as "amqp.h", so include/ must be on the search path.
  INCLUDEPATH += $$PWD/include

  # The qmake variable is SOURCES (plural); the last entry carries no trailing
  # backslash, otherwise the LIBS line below would be swallowed into this list.
  SOURCES += \
    main.cpp

  # Import library for the rabbitmq-c DLL.
  LIBS += $$PWD/lib/librabbitmq.4.dll.a

 

main.cpp文件

  #include <QtCore/QCoreApplication>

  #include <QDebug>

  #include "amqp.h"

  #include "amqp_framing.h"

  #include "amqp_tcp_socket.h"

  int main(int argc, char *argv[])
  {
    QCoreApplication a(argc, argv);

    const char* hostname = "localhost";

    int port = 5672;

    const char* exchange = "amq.direct";

    const char* routingkey = "test";

    const char* messagebody = "Hello World!";发送"Hello World!”

    amqp_connection_state_t  conn = amqp_new_connection();

    amqp_socket_t* socket = amqp_tcp_socket_new(conn);

    if (!socket)

    {
      qDebug() << "creating TCP socket";
    }

    int status = amqp_socket_open(socket, hostname, port);
    if (status)

    {
      qDebug() << "opening TCP socket";
    }

    amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    amqp_channel_open(conn, 1);
    amqp_get_rpc_reply(conn);

    {
      amqp_basic_properties_t props;
      props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
      props.content_type = amqp_cstring_bytes("text/plain");
      props.delivery_mode = 2; /* persistent delivery mode */
      amqp_basic_publish(conn,1,amqp_cstring_bytes(exchange),amqp_cstring_bytes(routingkey),0,0,&props,amqp_cstring_bytes(messagebody));
    }

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);

    return a.exec();
  }

4、接收端

  pro文件

  # rabbitmq-c public headers shipped with the project under include/.
  HEADERS += \
    include/amqp.h \
    include/amqp_framing.h \
    include/amqp_tcp_socket.h

  # main.cpp includes the headers as "amqp.h", so include/ must be on the search path.
  INCLUDEPATH += $$PWD/include

  # The qmake variable is SOURCES (plural); the last entry carries no trailing
  # backslash, otherwise the LIBS line below would be swallowed into this list.
  # NOTE(review): the receiver's main.cpp uses QApplication and Widget; the
  # widgets module and the Widget sources are presumably configured elsewhere
  # in the full .pro file - confirm.
  SOURCES += \
    main.cpp

  # Import library for the rabbitmq-c DLL.
  LIBS += $$PWD/lib/librabbitmq.4.dll.a

 

main.cpp文件

  int main(int argc, char *argv[])

  {

    QApplication a(argc, argv);
int port = 5672;
const char* hostname = "192.168.250.223";
const char* exchange = "amq.direct";
const char* bindingkey = "test";

    amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t* socket = amqp_tcp_socket_new(conn);
    if(!socket)
{
qDebug()<<"creating TCP socket";
}

    int status = amqp_socket_open(socket,hostname,port);
if(status)
{
qDebug() << "opening TCP socket";
}
    amqp_login(conn,"/",0,131072,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");
amqp_channel_open(conn,1);
amqp_get_rpc_reply(conn);
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn,1,amqp_empty_bytes,0,0,0,1,amqp_empty_table);
amqp_get_rpc_reply(conn);
    amqp_bytes_t queuename = amqp_bytes_malloc_dup(r->queue);
if(queuename.bytes == nullptr)
{
fprintf(stderr,"Out of memory while copying queue name");
return 1;
}
    amqp_queue_bind(conn,1,queuename,amqp_cstring_bytes(exchange),amqp_cstring_bytes(bindingkey),amqp_empty_table);
amqp_get_rpc_reply(conn);
amqp_basic_consume(conn,1,queuename,amqp_empty_bytes,0,1,0,amqp_empty_table);
amqp_get_rpc_reply(conn);

while(1)
{
    amqp_maybe_release_buffers(conn);
    amqp_envelope_t envelope;
    amqp_rpc_reply_t res = amqp_consume_message(conn,&envelope,nullptr,0);
    if(AMQP_RESPONSE_NORMAL != res.reply_type)
    {
      break;
    }
    printf("Delivery %u, exchange %.*s routingkey %.*s\n",
          static_cast<uint>(envelope.delivery_tag),
          static_cast<int>(envelope.exchange.len),
          static_cast<char*>(envelope.exchange.bytes),
          static_cast<int>(envelope.routing_key.len),
          static_cast<char*>(envelope.routing_key.bytes));
    if(envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG)
    {
      printf("Content-type: %.*s\n",
            static_cast<int>(envelope.message.properties.content_type.len),
static_cast<char*>(envelope.message.properties.content_type.bytes));
    }
    amqp_dump(envelope.message.body.bytes,envelope.message.body.len);
    amqp_destroy_envelope(&envelope);
}
    amqp_channel_close(conn,1,AMQP_REPLY_SUCCESS);
amqp_connection_close(conn,AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
Widget w;
w.show();
return a.exec();
}

标签:AMQP,socket,envelope,bytes,笔记,RabbitMq,RabbitMQ,conn,amqp
来源: https://www.cnblogs.com/zhangnianyong/p/11324932.html