目录
- 1.介绍
- 2.安装
-
- 3.AMQP-CPP 简单使用
-
- 4.类与接口
-
- 5.使用
-
- 1.publish.cc
- 2.consume.cc
- 3.makefile
1.介绍
RabbitMQ
:消息队列组件,实现两个客户端主机之间消息传输的功能(发布&订阅)
- 核心概念:交换机、队列、绑定、消息
- 交换机类型:
- 广播交换:当交换机收到消息,则将消息发布到所有绑定的队列中
- 直接交换:根据消息中的
bkey
与绑定的rkey
对比,一致则放入队列
- 主题交换:使用
bkey
与绑定的rkey
进行规则匹配,成功则放入队列
2.安装
1.RabbitMq
2.客户端库
- C语言库
- C++库
sudo apt install libev-dev git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.gitcd AMQP-CPP/makemake install
- 如果安装时出现以下报错,则表示
ssl
版本出现问题
/usr/include/openssl/macros.h:147:4: error: #error \"OPENSSL_API_COMPAT expresses an impossible API compatibility level\" 147 | # error \"OPENSSL_API_COMPAT expresses an impossible API compatibility level\" | ^~~~~ In file included from /usr/include/openssl/ssl.h:18, from linux_tcp/openssl.h:20, from linux_tcp/openssl.cpp:12: /usr/include/openssl/bio.h:687:1: error: expected constructor, destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’ 687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, unsigned short *port_ptr))
- 解决方案:卸载当前的
ssl
库,重新进行修复安装
dpkg -l | grep sslsudo dpkg -P --force-all libevent-openssl-2.1-7sudo dpkg -P --force-all opensslsudo dpkg -P --force-all libssl-devsudo apt --fix-broken install
3.AMQP-CPP 简单使用
1.介绍
AMQP-CPP
是用于与RabbitMq
消息中间件通信的C++库
- 它能解析从
RabbitMq
服务发送来的数据,也可以生成发向RabbitMq
的数据包
AMQP-CPP
库不会向RabbitMq
建立网络连接,所有的网络IO由用户完成
AMQP-CPP
提供了可选的网络层接口,它预定义了TCP
模块,用户就不用自己实现网络IO,
- 也可以选择
libevent、libev、libuv、asio
等异步通信组件, 需要手动安装对应的组件
AMQP-CPP
完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中
- 注意:它需要C++17的支持
2.使用
AMQP-CPP
的使用有两种模式:
- 使用默认的
TCP
模块进行网络通信
- 使用扩展的
libevent、libev、libuv、asio
异步通信组件进行通信
- 此处以
libev
为例,不需要自己实现monitor
函数,可以直接使用AMQP::LibEvHandler
4.类与接口
1.Channel
channel
是一个虚拟连接,一个连接上可以建立多个通道
- 并且所有的
RabbitMq
指令都是通过channel
传输
- 因为所有操作是异步的,所以在
channel
上执行指令的返回值并不能作为操作执行结果
- 实际上它返回的是
Deferred
类,可以使用它安装处理函数
namespace AMQP { using SuccessCallback = std::function<void()>; using ErrorCallback = std::function<void(const char *message)>; using FinalizeCallback = std::function<void()>; using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>; using DeleteCallback = std::function<void(uint32_t deletedmessages)>; using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>; using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>; using PublishAckCallback = std::function<void()>; using PublishNackCallback = std::function<void()>; using PublishLostCallback = std::function<void()>; class Channel { Channel(Connection *connection); bool connected(); Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments); DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments); Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments); bool publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0); DeferredConsumer &consume(const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments); bool ack(uint64_t deliveryTag, int flags=0); }; class DeferredConsumer { DeferredConsumer &onSuccess(const ConsumeCallback& callback); DeferredConsumer &onReceived(const MessageCallback& callback); DeferredConsumer &onMessage(const MessageCallback& callback); DeferredConsumer &onCancelled(const CancelCallback& callback); }; class Message : public Envelope { const std::string &exchange(); const std::string &routingkey(); }; class Envelope : public MetaData { const char *body(); uint64_t bodySize(); };}
2.ev
typedef struct ev_async { EV_WATCHER (ev_async); EV_ATOMIC_T sent; }ev_async; enum { EVBREAK_CANCEL = 0, EVBREAK_ONE = 1, EVBREAK_ALL = 2 }; struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0));# define EV_DEFAULT ev_default_loop (0) int ev_run (struct ev_loop *loop);void ev_break (struct ev_loop *loop, int32_t break_type) ; void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents); void ev_async_init(ev_async *w, callback cb);void ev_async_start(struct ev_loop *loop, ev_async *w); void ev_async_send(struct ev_loop *loop, ev_async *w);
5.使用
1.publish.cc
#include #include #include #include #include int main(){ auto *loop = EV_DEFAULT; AMQP::LibEvHandler handler(loop); AMQP::Address address(\"amqp://root:SnowK8989@127.0.0.1:5672/\"); AMQP::TcpConnection connection(&handler, address); AMQP::TcpChannel channel(&connection); channel.declareExchange(\"test-exchange\", AMQP::ExchangeType::direct) .onError([](const char *message) { std::cout << \"声明交换机失败: \" << message << std::endl; }) .onSuccess([]() { std::cout << \"test-exchange 交换机创建成功\" << std::endl; }); channel.declareQueue(\"test-queue\") .onError([](const char *message) { std::cout << \"声明队列失败: \" << message << std::endl; }) .onSuccess([]() { std::cout << \"test-queue 队列创建成功\" << std::endl; }); channel.bindQueue(\"test-exchange\", \"test-queue\", \"test-queue-key\") .onError([](const char *message) { std::cout << \"test-exchange - test-queue 绑定失败: \" \\ << message << std::endl; }) .onSuccess([]() { std::cout << \"test-exchange - test-queue 绑定成功\" << std::endl; }); for (int i = 0; i < 5; ++i) { std::string msg = \"Hello SnowK-\" + std::to_string(i); if(channel.publish(\"test-exchange\", \"test-queue-key\", msg) == false) { std::cout << \"publish 失败\" << std::endl; } } ev_run(loop, 0); return 0;}
2.consume.cc
#include #include #include #include #include void MessageCB(AMQP::TcpChannel* channel, const AMQP::Message& message, uint64_t deliveryTag, bool redelivered){ std::string msg; msg.assign(message.body(), message.bodySize()); std::cout << msg << std::endl; channel->ack(deliveryTag);}int main(){ auto *loop = EV_DEFAULT; AMQP::LibEvHandler handler(loop); AMQP::Address address(\"amqp://root:SnowK8989@127.0.0.1:5672/\"); AMQP::TcpConnection connection(&handler, address); AMQP::TcpChannel channel(&connection); channel.declareExchange(\"test-exchange\", AMQP::ExchangeType::direct) .onError([](const char *message) { std::cout << \"声明交换机失败: \" << message << std::endl; }) .onSuccess([]() { std::cout << \"test-exchange 交换机创建成功\" << std::endl; }); channel.declareQueue(\"test-queue\") .onError([](const char *message) { std::cout << \"声明队列失败: \" << message << std::endl; }) .onSuccess([]() { std::cout << \"test-queue 队列创建成功\" << std::endl; }); channel.bindQueue(\"test-exchange\", \"test-queue\", \"test-queue-key\") .onError([](const char *message) { std::cout << \"test-exchange - test-queue 绑定失败: \" \\ << message << std::endl; }) .onSuccess([]() { std::cout << \"test-exchange - test-queue 绑定成功\"; }); auto callback = std::bind(MessageCB, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); channel.consume(\"test-queue\", \"consume-tag\") .onReceived(callback) .onError([](const char *message) { std::cout << \"订阅 test-queue 队列消息失败: \" << message << std::endl; exit(0); }); ev_run(loop, 0); return 0;}
3.makefile
all: publish consumepublish: publish.ccg++ -o $@ $^ -lamqpcpp -lev -std=c++17consume: consume.ccg++ -o $@ $^ -lamqpcpp -lev -std=c++17.PHONY:cleanclean:rm publish consume