在 Actor 模型中,消息是唯一的沟通方式,你要做任何事情都需要发送消息,不能直接更改 Actor 内部的状态或者使用其内部的资源。
Actor 模型简介 1、Actor 是 actor 系统中的最小单位,就类似对象是 OOP 系统中的最小单位一样。 2、也正如一个对象,一个 actor 的状态和行为也是被封装起来的。你不能直接去访问或执行一个 actor 的方法或者是成员对象(field),而只能是向这个 actor 发送一个要求访问其内部状态的消息,正如你不可能有读心术去了解别人心里是怎么想的,而只能问对方你的感受如何。 3、每一个 actor 都有一个邮箱用来接收消息,而它们的日常工作就是处理这些消息。 4、你与每个 actor 交互的方式就是向其发送一条不可变的消息,这些消息会进入到这个 actor 的邮箱中。 5、actor 也有自己的层级关系,类似一个公司的组织结构。一个 actor 会有父 actor 或者子 actor 或者兄弟 actor,正如一个高管会有董事长管他,会有其他高管与之一起合作,也会有自己的下属。这些组织结构就是为了实现一个目的:“代理委派”,让所有 actor 合作起来使系统正常运转。 6、最后一点就是 actor 模型的异常处理,当 actor 处理消息时,某些地方可能会出错,可能会抛出一些异常,那么此时怎么处理呢?通常做法是当前 actor 会停止其本身和其子 actor 的所有操作,然后向其父 actor 发送一条通知错误信息的消息,然后父节点根据不同情况做出处理,一般处理方式有以下几种:
恢复子 actor,所有状态保持不变
重启子 actor,所有状态清空,从零开始
终止子 actor (当一个 actor 终止的时候,其邮箱內的消息会进入 dead letter 邮箱)。
把错误抛给上一级 actor 处理
Actor 之间如何沟通?
一个基于 actor 的系统当然会包含很多 actor,那么它们之间是怎么沟通的呢,让通过一个 ping-pong 的例子来了解一下。
object extends App { val system = ActorSystem("PingPongSystem") val pong = system.actorOf(Props[Pong], name = "pong") val ping = system.actorOf(Props(new Ping(pong)), name = "ping") ping ! StartMessage }
case object PingMessage case object PongMessage case object StartMessage case object StopMessage
class Ping(pong: ActorRef) extends Actor { var count = 0 def incrementAndPrint{ count += 1; println("ping") } def receive = { case StartMessage => incrementAndPrint pong ! PingMessage case PongMessage => incrementAndPrint if (count > 99) { sender ! StopMessage println("ping stopped") context.stop(self) } else { sender ! PingMessage } case _ => println("Ping got something unexpected.") } }
class Pong extends Actor { def receive = { case PingMessage => println(" pong") sender ! PongMessage case StopMessage => println("pong stopped") context.stop(self) case _ => println("Pong got something unexpected.") } }
override def receive: Receive = { case ForceRestart => throw new Exception("Boom!") case _ => println("Kenny received a message") } }
输出如下:
entered the Kenny constructor sending kenny a simple String message Kenny: preStart Kenny received a message make kenny restart Kenny: preRestart MESSAGE: ForceRestart REASON: Boom! Kenny: postStop entered the Kenny constructor Kenny: postRestart REASON: Boom! Kenny: preStart [ERROR] [05/21/2018 23:21:39.109] [LifecycleDemo-akka.actor.default-dispatcher-4] [akka: java.lang.Exception: Boom! at ink.baixin.akka.example.Kenny$$anonfun$receive$1.applyOrElse(LifecycleDemo.scala:45) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at ink.baixin.akka.example.Kenny.aroundReceive(LifecycleDemo.scala:26) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:590) at akka.actor.ActorCell.invoke(ActorCell.scala:559) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
stopping kenny shutting down system Kenny: postStop
让我们来分析一下整个过程: 首先创建一个 actor,进入其构造器创建一个 actor 实例;然后启动这个 actor,启动之前会先调用该实例的 preStart 方法;之后我们向这个 actor 发送了一条 String 类型的消息,actor 调用其 receive 方法进行处理;然后我们向这个 actor 发送了 ForceRestart 的消息用于重启,该消息会抛出一个异常,当 actor 检测到异常时,就会启动自动重启机制,调用实例自身的 preRestart 方法,此时会终止该 actor 实例及其所以子 actor 然后调用 postStop 方法;这个时候之前的 actor 实例的生命周期就彻底结束了,会创建一个新的 actor 实例,创建之后调用这个新实例的 postRestart 方法,然后再调用其 preStart 方法,之后新的 actor 就启动了,最后我们通过 ActorSystem 停止了这个 actor 实例,调用该实例的 postStop 方法,这就是上面 actor 经历的完整生命周期。
可以看到,一个 actor restart 前后实际上是两个不同的实例。
Start and Stop an Actor
启动 actor
使用 ActorSystem.actorOf 启动 可以使用 ActorSystem 创建其他普通 actor
val system = ActorSystem("HelloSystem") // the actor is created and started here 大专栏