编程语言
首页 > 编程语言> > java – 为集群环境创建列表

java – 为集群环境创建列表

作者:互联网

在我的应用程序中有一个列表publisherPostListenerList,它接收来自RabbitMQ队列的实时用户帖子,以发送给订阅者/消费者.该列表是ApplicationListener类的一个属性,它监听pubsub队列的事件.下面的控制器方法通过getter方法&获取列表元素.基于逻辑将帖子推送给订阅者.

流程如下

用户撰写帖子 – >帖子进入DB队列 – >来自Queue的消息被添加到listPostListenerList列表中,该列表被推送给用户的订户.

我们可以看到publisherPostListenerList是n个并发请求的公共列表,因为ApplicationListener是一个单例.对于单个实例,安装程序正常工作但在集群环境中会失败,因为每个节点都有自己的publisherPostListenerList列表.

我该如何处理这种情况?我不能使ApplicationListener类无状态我需要列表来存储从队列接收的post元素.我是否将列表放在分布式内存缓存中?或者还有其他传统方式吗?

ApplicationListener.java

@Component
public class ApplicationEventListener {

    private List<Post> publisherPostListenerList = new CopyOnWriteArrayList<Post>();

    private static final Logger logger = Logger.getLogger(ApplicationEventListener.class);

    @EventListener
    public void postSubmissionEventHandler(PostSubmissionEvent event) throws IOException {
        Post post = event.getPost();
        logger.debug("application published user post received " + post);
        publisherPostListenerList.add(post);
    }

    public List<Post> getPublisherPostListenerList() {
        return publisherPostListenerList;
    }

    public void setPublisherPostListenerList(List<Post> publisherPostListenerList) {
        this.publisherPostListenerList = publisherPostListenerList;
    }
}

用于将消息推送到订户的控制器方法

@RequestMapping(value="/getRealTimeServerPushUserPosts")
    public SseEmitter getRealTimeServerPushUserPosts(@RequestParam("userId") int userId){
        SseEmitter sseEmitter = new SseEmitter();
        CustomUserDetail myUserDetails = currentUserAccessor.getCurrentLoggedInUser();
        User loggedInUser=myUserDetails.getUser();

        List<Integer> userPublisherIDList = this.userService.loadUserPublisherIdListWhichLoggedInUserFollows(loggedInUser);
        List<Post> postList =eventListener.getPublisherPostListenerList();


        for(Integer userPublisherId : userPublisherIDList){
            for(Post post:postList){
                    if((userPublisherId.intValue()) == (post.getUser().getUserId().intValue())){
                        try {
                        sseEmitter.send(post);
                        postList.remove(post); //removes the post for all the subscribers as the list acts as a global list.
                    } catch (IOException e) {
                        logger.error(e);
                    }
                }
             }
         }
        return sseEmitter;
    }

解决方法:

您可以使用Hazelcast IList.它遵循j.u.List语义,适用于分布式/集群环境.

您可以找到文档here和示例here.
另一种选择是使用分布式地图aka IMap.

如果您对实施细节有具体问题,请与我们联系.

谢谢

标签:java,spring,rabbitmq,hazelcast,distributed-caching
来源: https://codeday.me/bug/20190701/1352016.html