编程语言
首页 > 编程语言> > java-如何在Play Framework 2应用程序中存储Akka演员列表?

java-如何在Play Framework 2应用程序中存储Akka演员列表?

作者:互联网

我有一个Play框架2应用程序,可以接收数据并将其通过WebSockets发送到多个客户端.就像在this documentation中一样,我使用Akka actor来处理WebSocket.我还有一个WebSocketRouter类,该类扩展了UntypedActor并包含路由逻辑(确定客户端将系统接收的数据传递给哪个客户端).我知道我可以使用Akka的路由器功能,但这对我而言不是问题.问题是我必须存储所有活动客户端的列表.现在,我将其存储在WebSocketRouter类的静态列表中.这是编写概念验证原型的最快方法,但是它不是线程安全的,而且似乎也不是“ Akka方法”.
下面是一个简化的代码示例:

WebSocketController:

//This controller handles the creation of WebSockets.
public class WebSocketController extends Controller {
    public static WebSocket<String> index() {
        return WebSocket.withActor(new F.Function<ActorRef, Props>() {
            public Props apply(ActorRef out) throws Throwable {
                return MessageSender.props(out);
            }
        });
    }
}

MessageSender:

//Hold a reference to the auto-created Actor that handles WebSockets 
//and also registers and unregisters itself in the router.
public class  MessageSender extends UntypedActor {

    public static Props props(ActorRef out) {
        return Props.create(MessageSender.class, out);
    }

    private final ActorRef out;

    public MessageSender(ActorRef out) {
        this.out = out;
    }

    @Override
    public void preStart() {
        WebSocketRouter.addSender(getSelf());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            out.tell(message, getSelf());
        }
        else {
            unhandled(message);
        }
    }

    public void postStop() {
        WebSocketRouter.removeSender(getSelf());
    }
}

WebSocketRouter:

public class WebSocketRouter extends UntypedActor {
    private static ArrayList<ActorRef> senders;
    static {
        senders = new ArrayList<>();
    }

    public static void addSender(ActorRef actorRef){
        senders.add(actorRef);
    }

    public static void removeSender(ActorRef actorRef){
        senders.remove(actorRef);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            for (ActorRef sender : senders) {
                sender.tell(message, getSelf());
            }
        }
    }
}

再一次,我知道这是一个不好的解决方案,我正在寻找更好的解决方案.我已经考虑过创建一个线程安全的单例类,该类可以容纳当前连接.我还考虑过将当前连接的列表保存在某个Akka actor的实例中,并通过Akka消息修改该列表,但是为了这种方式,我必须静态地将ActorRef存储到该Actor,以便可以对其进行访问来自不同的ActorSystem.

解决我的问题最适合Akka意识形态的最佳方法是什么?

解决方法:

为何不静态引用Actor(在您的情况下为WebSocketRouter),为什么不提出一些消息来发送它呢?这样,参与者可以以一致的方式维护自己的内部状态.通过消息进行状态更改是Actor模型的主要优点之一.

在我开始编写代码之前,很抱歉,如果这不是100%准确的,我只使用了Scala版本的Akka,并基于对Akka Documentation的快速扫描.

因此,在您的情况下,我将定义一些对象以表示Join / Leave …

public class JoinMessage { } 
public class ExitMessage { } 

请注意,只有在您打算保持WebSocket打开并让用户停止收听路由器时,才真正需要ExitMessage.否则,路由器可以检测到Actor何时终止.

然后,您将更改MessageSender actor,使其在加入或离开聊天室时发送这些消息.

public class MessageSender extends UntypedActor {

    public static Props props(ActorRef out) {
        return Props.create(MessageSender.class, out);
    }

    private final ActorRef out;
    private final ActorRef router;

    public MessageSender(ActorRef out) {
        this.out = out;
        this.router= getContext().actorSelection("/Path/To/WebSocketRouter");
    }

    @Override
    public void preStart() {
        router.tell(new JoinMessage(), getSelf());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            out.tell(message, getSelf());
        } else {
            unhandled(message);
        }
    }    
}

然后,您的路由器可以更改为在内部管理状态,而不是在Actor上公开内部方法(您知道这不好).

public class WebSocketRouter extends UntypedActor {
    private final Set<ActorRef> senders = new HashSet<>();

    private void addSender(ActorRef actorRef){
        senders.add(actorRef);
    }

    private void removeSender(ActorRef actorRef){
        senders.remove(actorRef);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof JoinMessage) {
            addSender(sender);
            getContext().watch(sender); // Watch sender so we can detect when they die.
        } else if (message instanceof Terminated) {
            // One of our watched senders has died.
            removeSender(sender);
        } else if (message instanceof String) {
            for (ActorRef sender : senders) {
                sender.tell(message, getSelf());
            }
        }
    }
}

再次,此代码使您可以了解如何通过使用Actor模型来完成此任务.很抱歉,如果Java并非100%准确,但是希望您能按照我的意图进行.

标签:playframework,akka,java
来源: https://codeday.me/bug/20191121/2051704.html