java – 使用AmazonSQSClient消息消息缓慢
作者:互联网
因此,我在春季jms 50-100使用并发,允许最多连接高达200.一切都按预期工作但如果我尝试从队列中检索100k消息,我的意思是我的sqs上有100k消息,我通过弹簧读取它们jms正常的方法.
@JmsListener
Public void process (String message) {
count++;
Println (count);
//code
}
我在控制台中看到了所有日志,但是在大约17k之后,它开始抛出异常
像:aws sdk异常:端口已经在使用中.
为什么我会看到这个例外,怎么做.我摆脱它?
我试着在互联网上寻找它.找不到任何东西.
我的设定:
并发50-100
为每个任务设置消息:50
客户承认
timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect
更新:我找到了问题,似乎正在创建新的套接字,直到每个套接字都耗尽.
我的春季jms版本是4.3.10
要复制此问题,只需执行以上配置,最大连接为200,货币设置为50-100,并将大约40k消息推送到sqs队列.可以使用https://github.com/adamw/elasticmq作为本地堆栈服务器复制Amazon sqs ..之后在这里完成.注释jms监听器并使用soap ui加载测试并调用send消息来触发许多消息.仅仅因为你评论了@jmslistener注释,它就不会消耗来自队列的消息.一旦您看到已发送40k消息,请停止.取消注释@jmslistener并重新启动服务器.
更新:
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setErrorHandler(Throwable::printStackTrace);
factory.setConcurrency("50-100");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
更新:
SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
更新:
客户配置详情:
Protocol : HTTP
Max connections : 200
更新:
我似乎使用了缓存连接工厂类.我读了堆栈溢出和他们的官方文档,不使用缓存连接工厂类和默认的jms监听器容器工厂.
https://stackoverflow.com/a/21989895/5871514
它给出了我之前得到的相同错误.
更新
我的目标是获得500 tps,即我应该能够消耗那么多..所以我尝试了这种方法,似乎我可以达到100-200,但不超过那个..加上这个东西是高阻挡并发..如果你使用它..如果你有更好的解决方案来实现它..我都是耳朵.
**更新 **
我正在使用amazonsqsclient
解决方法:
对消费者的饥饿
JMS客户端倾向于实现的一种可能的优化是消息消耗缓冲区或“预取”.此缓冲区有时可通过消息数或缓冲区大小(以字节为单位)进行调整.
目的是防止消费者每次收到消息时都去服务器,而不是批量提取多个消息.
在您拥有许多“快速消费者”(这是这些库可能采用的观点视图)的环境中,此预取设置为稍高的默认值,以便最小化这些往返.
但是,在消息使用缓慢的环境中,此预取可能是一个问题.缓慢的消费者正在阻止来自更快消费者的那些预取消息的消息消费.在高度并发的环境中,这可能会导致饥饿.
在这种情况下,SQSConnectionFactory有一个property for this:
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
sqsConnectionFactory.setNumberOfMessagesToPrefetch(0);
制片人的饥饿(即通过JmsTemplate)
这些JMS实现期望通过某个中介与代理接口是很常见的.这些中介实际上缓存和重用连接或使用池化机制来重用它们.在Java EE世界中,这通常由Java EE服务器上的JCA适配器或其他方法处理.
由于Spring JMS的工作方式,它期望ConnectionFactory的中间委托存在来执行此缓存/池化.否则,当Spring JMS想要连接到代理时,它会在每次想要对代理执行某些操作时尝试打开新的连接和会话(!).
为了解决这个问题,Spring提供了一些选择.最简单的是CachingConnectionFactory
,它缓存单个Connection,并允许在该Connection上打开许多Sessions.将此添加到上面的@Configuration的简单方法是:
@Bean
public ConnectionFactory connectionFactory(AmazonSQSClient amazonSQSclient) {
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);
// Doing the following is key!
CachingConnectionFactory connectionfactory = new CachingConnectionFactory();
connectionfactory.setTargetConnectionFactory(sqsConnectionFactory);
// Set the #connectionfactory properties to your liking here...
return connectionFactory;
}
如果你想要一些更奇特的东西作为JMS池解决方案(除了多个Sessions之外还会为你集合Connections和MessageProducers),你可以从他们的库中使用相当新的PooledJMS project的JmsPoolConnectionFactory等.
标签:java,spring-boot-2,amazon-sqs,spring-jms 来源: https://codeday.me/bug/20190522/1152501.html