编程语言
首页 > 编程语言> > php – RabbitMQ实现

php – RabbitMQ实现

作者:互联网

这里有一些我目前的设置.

> REST API将数据推送(队列)到队列中
> Queue有一个消费者,它始终在运行并生成到Exchange
> Exchange路由到其他几个队列(如20)
>每个(20)队列都执行特定任务(消费者总是运行)
> Cron作业运行以检查是否所有(20)任务都已完成并生成另一个队列

我不确定我喜欢消费者一直在运行,因为每个消费者使用大约300MB的Ram(我认为它是MB,现在不在我面前)而且我正在寻找另一个实现.

    M <-- Message coming from REST API
    |
    |
    +-First Queue
    |
    |
    | <-- The Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues ( 20+ )
Q1  Q2  Q3 <-- Each Queue is a task that must be completed


    | <-- CRON runs to check if all queues above have completed
    |
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

我在下面的相关问题建议使用RPC,但问题是RPC(据我所知)将有多个实例.这是一个资源紧张的过程,我认为通过添加RPC调用它只会使服务器陷入困境然后变得没有响应(如果我错了请纠正我).

另一种方法是使用聚合器模式

> http://www.eaipatterns.com/Aggregator.html

这看起来正是我需要的,但我发现文档有限.有人做过这种模式吗?

我的问题是我对它目前的实施方式不满意,我正在寻找改进流程的方法.我正在寻找摆脱CRON,实现一个新的模式,并没有让消费者一直运行.

该过程目前还仅支持每个消费者的单个实例.它可以有多个消费者但我们如何实现它我们当时只想要一个.

这是使用RabbitMQBundle在PHP,Symfony2 Framework中实现的

相关问题:

> RabbitMQ wait for multiple queues to finish

解决方法:

OldSound在这里,是RabbitMQ Bundle的创造者.

bundle本身不支持开箱即用的Aggregator模式,但你可以使用底层的php-amqplib来实现它.

要执行聚合,您需要发布具有相关ID的消息和沿处理链的id的线程.然后,聚合器将根据您必须处理该特定任务的不同工作者的数量等待X消息量.等待消息的一种方法是让一个数组保留它们,因为它们被相关id索引.

因此,无论何时有传入消息,您都可以:

$correlation_id = $msg->get('correlation_id');
$this->receivedMessages[$correlation_id]['msgs'][] = $msg;

然后你去的地方:

if ($someNumber == count($this->receivedMessages[$correlation_id]['msgs']) {
// proceed to next step
}

我现在正在为Symfony开发一个Workflow捆绑包,我计划很快就开源.该捆绑包可用于以非常简单的方式实现您呈现的用例(即,您只需要为每个任务提供服务).

现在我想知道为什么每个消费者需要300 MB的RAM?你需要用它们运行完整的堆栈框架吗?如果可能的话,为消费者应用程序创建一个新的Symfony内核,只加载你需要的内容以减少开销.

标签:php,rabbitmq,amqp,design-patterns,message-queue
来源: https://codeday.me/bug/20190901/1782173.html