In actor4j, the following important configuration options are available.
ActorSystemConfig config = ActorSystemConfig.builder()
    .parallelism(1)
    .parallelismFactor(1)
    // .parkMode() [default] or .sleepMode() or .yieldMode()
    // ...
    .build();
ActorSystem system = ActorSystem.create(ActorRuntime.factory(), config); // using factory method
ActorSystem system = ActorRuntime.create(config);
or
ActorSystem system = ActorSystem.create(ActorRuntime.factory());
ActorSystem system = ActorRuntime.create(); // with default config
The number of threads is set with parallelism and the scaling factor with parallelismFactor:
Number of threads = parallelism * parallelismFactor
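As a small worked example of the formula above, the following sketch computes the resulting thread count. The names ThreadCountSketch and threadCount are hypothetical and not part of actor4j; only the multiplication itself is taken from the text.

```java
// Hypothetical helper, not part of actor4j: illustrates the formula only.
final class ThreadCountSketch {
    // Number of threads = parallelism * parallelismFactor
    static int threadCount(int parallelism, int parallelismFactor) {
        return parallelism * parallelismFactor;
    }

    public static void main(String[] args) {
        // Assumption for illustration: one base thread per available core, scaled by 2.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(threadCount(cores, 2));
    }
}
```

With parallelism(4) and parallelismFactor(2), for instance, the formula yields eight threads.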
In addition, the configuration defines the mode in which the threads operate (park, sleep, or yield) when temporarily no messages are received. The actor system is started with the method call:
system.start();
The actor system can be terminated with or without a controlled shutdown of all actors. With a controlled shutdown, a stop directive is sent internally to all actors. A parameter determines whether the calling thread waits until the shutdown of the actor system has completed.
system.shutdown(); // normal shutdown
system.shutdown(true); // shutdown and wait
system.shutdownWithActors(); // shutdown with actors
system.shutdownWithActors(true);
Actor4j performs a controlled shutdown by sending a termination message to the user actor (the parent node of all actors in the tree structure), which causes the subordinate actors to terminate in a cascade. The actors themselves are responsible for the orderly handling of their termination.
There are two ways to add actors to the actor system: by specifying the class and its constructor arguments (the actor is then instantiated by reflection) or via a factory method. Both variants are passed to a dependency injection container, which can instantiate the actors accordingly. Actors can be created outside the actor system; these are automatically subordinated to the user actor (the parent of all user-generated actors). They can also be created within an actor, in which case they become child actors of that actor. After instantiation, the function returns a UUID that uniquely identifies the actor.
// via reflection
system.addActor(MyActor.class, "MyActor", ...);
// or using a factory method
system.addActor(new ActorFactory() {
    @Override
    public Actor create() {
        return new MyActor();
    }
});
// or in the context of an actor
addChild(MyActor.class, "MyActor", ...);
// or
UUID myActor = addChild( () -> new MyActor() );
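The two instantiation variants described above can be illustrated without actor4j. The following is a minimal sketch in plain Java; DemoActor, InstantiationSketch, byReflection, and byFactory are hypothetical names, and how actor4j's dependency injection container actually resolves constructors is not shown here.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for an actor class; not an actor4j type.
class DemoActor {
    final String name;
    DemoActor() { this("default"); }
    DemoActor(String name) { this.name = name; }
}

final class InstantiationSketch {
    // Variant 1: class plus constructor arguments, instantiated by reflection.
    // Simplification: the runtime types of the arguments must equal the
    // declared parameter types exactly (no primitives, no subtypes).
    static <T> T byReflection(Class<T> type, Object... args) {
        Class<?>[] paramTypes = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            paramTypes[i] = args[i].getClass();
        }
        try {
            return type.getDeclaredConstructor(paramTypes).newInstance(args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + type.getName(), e);
        }
    }

    // Variant 2: a factory decides how the instance is built.
    static <T> T byFactory(Supplier<T> factory) {
        return factory.get();
    }
}
```

The reflection variant keeps the call site short, whereas the factory variant lets the caller control construction and is checked at compile time.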
Actors must derive from the class Actor and implement the receive method. In the example below, MyActor waits for a message containing a String and outputs it via a logger. Subsequently, the message is sent back to the sender. When a different message is received, a warning (unhandled message) is output if debugUnhandled has been set in the actor system. [1]
public class MyActor extends Actor {
    @Override
    public void receive(ActorMessage<?> message) {
        if (message.value() instanceof String) {
            logger().info(String.format(
                "Received String message: %s", message.valueAsString()));
            send(message, message.source());
        }
        else
            unhandled(message);
    }
}
/* Adapted for actor4j according to [1] */
Messages can be sent using the send method. The following methods are available. tell offers a syntax similar to Akka. A message can also be forwarded (forward). Aliases are available, among other things, to simplify access to a remote actor. Messages can also be sent with priority (they are then delivered preferentially).
send(ActorMessage<?> message)
send(ActorMessage<?> message, UUID dest)
sendViaPath(ActorMessage<?> message, String path)
sendViaAlias(ActorMessage<?> message, String alias)
tell(T value, int tag, UUID dest)
tell(T value, int tag, UUID dest, String domain)
tell(T value, int tag, UUID dest, UUID interaction)
tell(T value, int tag, UUID dest, UUID interaction, String protocol)
tell(T value, int tag, UUID dest, UUID interaction, String protocol, String domain)
tell(T value, int tag, String alias)
tell(T value, int tag, String alias, String domain)
tell(T value, int tag, String alias, UUID interaction)
tell(T value, int tag, String alias, UUID interaction, String protocol)
tell(T value, int tag, String alias, UUID interaction, String protocol, String domain)
forward(ActorMessage<?> message, UUID dest)
priority(ActorMessage<?> message, UUID dest)
Messages that can be exchanged between the actors are of the ActorMessage<T> type. A message (see Table 1) consists of the object to be transferred (payload of the message), a tag that helps differentiate between messages, the sender address of the message, the receiver address of the message, an interaction ID for messages related to an interaction protocol (conversation), the interaction protocol, and the associated domain of the message. The methods listed above can be called within an actor to send messages.
The structure of ActorMessage<T> is as follows:
Type | Name | Description
---|---|---
T | value | payload of the message
int | tag | tag for differentiating between messages
UUID | source | source of the message
UUID | dest | destination of the message
UUID | interaction | associated interaction
String | protocol | interaction protocol
String | domain | domain of the message
Tab. 1: Structure of the ActorMessage<T>
Pattern matching can be used with the ActorMessageMatcher class to receive messages. This class was inspired by pattern matching in Scala [2]. A message can be checked for a match on its tag, its source, or the class of the passed object (value). If a match succeeds, an action is triggered. The following example is based on the previous MyActor example, except that it is expressed with the means of the class ActorMessageMatcher. Tags serve as a simple means of communication; ACK would be such a tag.
public class MyActor extends Actor {
    protected ActorMessageMatcher matcher;
    protected final int ACK = 1;
    @Override
    public void preStart() {
        matcher = new ActorMessageMatcher();
        matcher
            .match(String.class,
                msg -> logger().info(String.format(
                    "Received String message: %s", msg.valueAsString())))
            .match(ACK,
                msg -> logger().info("ACK tag received"))
            .matchAny(
                // send the message back to its sender, as in the previous example
                msg -> send(msg, msg.source()))
            .matchElse(
                msg -> unhandled(msg));
    }
    @Override
    public void receive(ActorMessage<?> message) {
        matcher.apply(message);
    }
}
matchAny is always triggered, regardless of the message received. If no match is found, matchElse is fired.
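To make the interplay of these rules concrete, here is a minimal sketch of such a matcher in plain Java. MatcherSketch is a hypothetical class and not the actor4j ActorMessageMatcher; in particular, it assumes that the "any" action does not count as a match when deciding whether the "else" action fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, simplified matcher; not the actor4j ActorMessageMatcher.
final class MatcherSketch<M> {
    private final List<Predicate<M>> predicates = new ArrayList<>();
    private final List<Consumer<M>> actions = new ArrayList<>();
    private Consumer<M> anyAction;
    private Consumer<M> elseAction;

    MatcherSketch<M> match(Predicate<M> predicate, Consumer<M> action) {
        predicates.add(predicate);
        actions.add(action);
        return this;
    }

    MatcherSketch<M> matchAny(Consumer<M> action) {
        anyAction = action;
        return this;
    }

    MatcherSketch<M> matchElse(Consumer<M> action) {
        elseAction = action;
        return this;
    }

    // Returns true if at least one match(...) rule fired.
    boolean apply(M message) {
        boolean matched = false;
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(message)) {
                actions.get(i).accept(message);
                matched = true;
            }
        }
        if (anyAction != null) {
            anyAction.accept(message); // runs for every message
        }
        if (!matched && elseAction != null) {
            elseAction.accept(message); // assumption: only when no match(...) rule fired
        }
        return matched;
    }
}
```

A rule such as match(o -> o instanceof String, ...) then corresponds to matching on the class of the value, and a predicate on a tag field would correspond to matching on a tag.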
The message processing method receive of an actor can be replaced by another method at runtime (compare HotSwap in Akka [3]). In the example below, the actor's behavior is changed on receipt of the tag SWAP. Upon receipt of the next message, information about the received message is output. Finally, unbecome restores the original receive method.
public class MyActor extends Actor {
    protected final int SWAP = 22;
    @Override
    public void receive(ActorMessage<?> message) {
        if (message.tag() == SWAP)
            become(msg -> {
                logger().info(String.format(
                    "Received message: %s", msg));
                unbecome();
            }, false); // false -> pushed onto the stack
        else
            unhandled(message);
    }
}
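The become/unbecome mechanism can be pictured as a stack of behaviors on top of the original receive method. The following plain-Java sketch illustrates that idea under stated assumptions: SwappableSketch and deliver are hypothetical names, messages are reduced to an int tag, and the meaning of the boolean parameter (true replaces the top behavior, false pushes a new one) is an assumption derived from the comment in the example above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a behavior stack; not the actor4j implementation.
class SwappableSketch {
    static final int SWAP = 22;
    final List<String> log = new ArrayList<>();
    private final Deque<Consumer<Integer>> behaviors = new ArrayDeque<>();

    // Original behavior: reacts to SWAP by installing a one-shot behavior.
    void receive(int tag) {
        if (tag == SWAP) {
            become(t -> {
                log.add("swapped:" + t);
                unbecome();
            }, false);
        } else {
            log.add("unhandled:" + tag);
        }
    }

    // Assumption: replace == true swaps the top behavior, false pushes on top of it.
    void become(Consumer<Integer> behavior, boolean replace) {
        if (replace && !behaviors.isEmpty()) {
            behaviors.pop();
        }
        behaviors.push(behavior);
    }

    void unbecome() {
        if (!behaviors.isEmpty()) {
            behaviors.pop();
        }
    }

    // Dispatches to the top of the stack, or to receive if the stack is empty.
    void deliver(int tag) {
        Consumer<Integer> top = behaviors.peek();
        if (top != null) {
            top.accept(tag);
        } else {
            receive(tag);
        }
    }
}
```

After a SWAP, exactly one message is handled by the temporary behavior; the message after that reaches receive again.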
// Initialize the actor system
ActorSystem system = ActorSystem.create(ActorRuntime.factory(), "Example");
// Creation of actor pong
UUID pong = system.addActor(() -> new Actor() {
    @Override
    public void receive(ActorMessage<?> message) {
        // Receives message from ping
        System.out.println(message.valueAsString());
        // Sends message "pong" to ping
        tell("pong", 0, message.source());
    }
});
// Creation of actor ping
UUID ping = system.addActor(() -> new Actor() {
    @Override
    public void receive(ActorMessage<?> message) {
        if (message.value() != null)
            // Receives message from pong
            System.out.println(message.valueAsString());
        // Sends message "ping" to pong
        tell("ping", 0, pong);
    }
});
// Starts the actor system
system.start();
// Sends a message to ping to start the ping-pong process
system.send(ActorMessage.create(null, 0, system.SYSTEM_ID(), ping));
// Lifetime for the ping-pong process
try {
    Thread.sleep(2000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
// Wait until all actors are shut down
system.shutdownWithActors(true);
Fig. 1: Representation of the life cycle of an actor (adapted for actor4j according to [1])
As already mentioned, actors are instantiated either via system.addActor(...) or parentActor.addChild(...). Actors then receive a randomly generated (or persistent) UUID as a unique identifier, with which they can communicate with other actors (sending messages). An actor can also have an alternative identifier, the alias (for better legibility or when the UUID is not known in advance). The preStart method is called when the actor first awakens and is used for the initial setup of the actor. If the actor is a PersistentActor, the recover method is also called, which then restores the state of the actor. An actor can also be restarted, usually triggered by an exception (see chapter Supervision). In this case, preRestart is first called on the old instance. Then a new instance is generated with the dependency injection container. The new instance replaces the old instance, and postRestart is called on the new instance. The preRestart and postRestart methods allow the actor to react appropriately to the restart. The identifier (UUID) of the original actor is retained, which guarantees that references from other actors to this actor stay valid. An actor can be stopped by calling the stop method or by receiving the STOP or POISONPILL message. [1][4]
An actor can also monitor another actor to find out when it terminates. If the observed actor terminates, a TERMINATED message is sent to the observer. The observer can identify the terminated actor via message.source, which corresponds to the sender's UUID. With watch, an observer registers with an actor, and with unwatch it de-registers. [1]
watch(UUID dest)
unwatch(UUID dest)
The life cycle and monitoring are largely similar to Akka's approach. In Akka, however, an ActorRef rather than a UUID is returned when an actor is instantiated.
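The bookkeeping behind watch and unwatch can be sketched as a map from watched actors to their observers. The following plain-Java example is an illustration only: WatchSketch and its terminated method are hypothetical, and instead of actually sending TERMINATED messages it merely returns the observers that would be notified.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical registry illustrating watch/unwatch; not actor4j code.
final class WatchSketch {
    // watched actor -> observers, in registration order
    private final Map<UUID, Set<UUID>> observers = new HashMap<>();

    void watch(UUID observer, UUID dest) {
        observers.computeIfAbsent(dest, k -> new LinkedHashSet<>()).add(observer);
    }

    void unwatch(UUID observer, UUID dest) {
        Set<UUID> set = observers.get(dest);
        if (set != null) {
            set.remove(observer);
        }
    }

    // Called when an actor terminates. Returns the observers that would
    // receive a TERMINATED message whose source is the terminated actor.
    List<UUID> terminated(UUID actor) {
        Set<UUID> set = observers.remove(actor);
        if (set == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(set);
    }
}
```

Because the terminated actor's UUID is the message source, an observer watching several actors can tell which one has gone away.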
Fig. 2: OneForOne-Strategy and OneForAll-Strategy (cp. [5][6])
The supervisor actor monitors its child actors. In the event of an error, the supervisor resumes, restarts, or stops them. Two strategies are provided (see Fig. 2). In the OneForOne-Strategy, only the affected actor is considered. In the OneForAll-Strategy, on the other hand, the affected actor and its sibling actors (below the same supervisor actor) are considered. [5][6] The default strategy for actor4j is a OneForOne-Strategy [7] and is specified as:
public SupervisorStrategyDirective apply(Exception e) {
    if (
        e instanceof ActorInitializationException ||
        e instanceof ActorKilledException)
        return STOP;
    else
        return RESTART;
}
An ActorInitializationException is thrown if an error occurs during the instantiation of an actor. An ActorKilledException is triggered by an incoming KILL message at the actor. In this case, an exception is deliberately provoked to activate the supervisor and its error-handling strategy. By default, the actor is stopped for these two exceptions and restarted for every other exception. The default strategy can be changed by overriding the supervisorStrategy method. [1]
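The decision logic of a supervisor strategy is simply a mapping from an exception to a directive. The following self-contained sketch restates the default rule and adds one custom variant; all type names (InitFailedSketch, KilledSketch, DirectiveSketch, StrategySketch) are hypothetical stand-ins for the actor4j types named in the text, and the lenient variant is an invented example of what an overriding supervisor might choose.

```java
// Hypothetical stand-ins for the exception types named in the text.
class InitFailedSketch extends RuntimeException {}
class KilledSketch extends RuntimeException {}

// Subset of the directives, sufficient for this illustration.
enum DirectiveSketch { RESUME, STOP, RESTART, ESCALATE }

final class StrategySketch {
    // Mirrors the default rule: stop on initialization or kill failures,
    // restart on everything else.
    static DirectiveSketch defaultStrategy(Exception e) {
        if (e instanceof InitFailedSketch || e instanceof KilledSketch) {
            return DirectiveSketch.STOP;
        }
        return DirectiveSketch.RESTART;
    }

    // Invented custom strategy: tolerate bad arguments by resuming,
    // otherwise fall back to the default rule.
    static DirectiveSketch lenientStrategy(Exception e) {
        if (e instanceof IllegalArgumentException) {
            return DirectiveSketch.RESUME;
        }
        return defaultStrategy(e);
    }
}
```

Keeping the strategy a pure function of the exception makes it straightforward to check in isolation before wiring it into a supervisor.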
Fig. 3: Extended representation of the life cycle of an actor (cp. [8])
Actor4j currently supports eight directives: RESUME, STOP, TERMINATED, RESTART, ESCALATE, RECOVER, ACTIVATE and DEACTIVATE (see also Fig. 3). Stopping and restarting of the actors is asynchronous.
- RESUME: In this case, the supervisor remains passive. The actor can continue its activities undisturbed [8].
- STOP:
  - STOP is sent to all children (recursive process, if the children also have children) so they can terminate. watch is used to observe that all children have terminated.
  - postStop is called.
- TERMINATED: The actor is stopped.
- RESTART:
  - preRestart is called at the current instance.
  - STOP is sent to all children (recursive process, if the children also have children) so they can terminate. watch is used to observe that all children have terminated.
  - postStop is called at the current instance after all children have finished and confirmed this with the TERMINATED message.
  - A new instance is created; the UUID is maintained.
  - postRestart (with preStart and, optionally, recover) is called for the new instance.
- ESCALATE: If a supervisor is unclear about the correct strategy in the event of a specific error, it can pass the error on to its superior supervisor for clarification.
- RECOVER: The actor is recovered to its last state. New events can lead to an update of the actor's state.
- ACTIVATE and DEACTIVATE: Activates or deactivates the actor (messages will be processed or no longer be processed). The directives explained so far remain deliverable, even when the actor is deactivated.

Four important actors, derived from the class Actor, are presented next. The class Actor is abstract.
The use of the ActorGroupMember interface signals to the ActorSystem that the correspondingly implemented actor is a member of a group. This is taken into account when distributing the actors to the threads: actors belonging to a group are kept together on one thread. The basic idea behind this is explained in our paper (see the chapter on results and conclusion in [9]): communication between threads is more expensive than communication within the same thread.
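One way to picture group-aware placement is a round-robin distribution that remembers the thread chosen for each group. The sketch below is an assumption-laden illustration, not actor4j's balancing algorithm: GroupPlacementSketch is hypothetical, groups are identified by a UUID, and ungrouped actors are represented by a null group id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical placement helper; actor4j's real balancing may differ.
final class GroupPlacementSketch {
    private final int threadCount;
    private final Map<UUID, Integer> groupToThread = new HashMap<>();
    private int next = 0;

    GroupPlacementSketch(int threadCount) {
        this.threadCount = threadCount;
    }

    // Ungrouped actors (groupId == null) are spread round-robin. All members
    // of a group land on the thread chosen for the group's first member.
    int assign(UUID groupId) {
        if (groupId == null) {
            return nextThread();
        }
        Integer thread = groupToThread.get(groupId);
        if (thread == null) {
            thread = nextThread();
            groupToThread.put(groupId, thread);
        }
        return thread;
    }

    private int nextThread() {
        int thread = next;
        next = (next + 1) % threadCount;
        return thread;
    }
}
```

Messages between members of the same group then never cross a thread boundary, which is the cost the grouping is meant to avoid.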
This class implements the interface ActorGroupMember.
Workload tasks should not be performed within the ActorSystem because they block the reactive system, which is then no longer responsive. Therefore, the class ResourceActor is provided. These special actors are executed in a separate thread pool, thus avoiding disturbances within the ActorSystem. A distinction is made between stateless (@Stateless) and stateful (@Stateful) actors. The advantage of this distinction is that stateless actors can be executed in parallel.
A PseudoActor is a mediator between the outside world and the ActorSystem. It allows communication with the actors within the actor system from outside. Unlike the other actors, the PseudoActor has its own message queue, in which the ActorSystem can store messages from different actors. The class PseudoActor is derived from the class ActorWithRxStash. To process received messages, the run method must be started manually.
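The separation between enqueueing by the actor system and manual processing by the outside world can be sketched with a plain queue. MailboxSketch, deliver, and processed are hypothetical names chosen for this illustration; the real PseudoActor API is not reproduced here, and messages are reduced to strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical mediator with its own inbox; not the actor4j PseudoActor.
final class MailboxSketch {
    private final Queue<String> inbox = new ConcurrentLinkedQueue<>();
    final List<String> processed = new ArrayList<>();

    // Called from the actor-system side: only enqueues, never processes.
    void deliver(String message) {
        inbox.offer(message);
    }

    // Called manually by the outside world: drains the inbox and processes
    // each queued message. Returns the number of messages handled.
    int run() {
        int count = 0;
        String message;
        while ((message = inbox.poll()) != null) {
            processed.add(message);
            count++;
        }
        return count;
    }
}
```

Nothing is processed until run is invoked, which is why the outside caller decides when, and on which thread, received messages are handled.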
Now, the core components of actor4j are presented from the user and developer perspectives. In particular, connections between the core components will be clarified.
The most important core components of actor4j can be seen in the overview (Fig. 4). The average user of actor4j will mainly work with the class ActorSystem and the different actor classes (Actor, ActorGroupMember, ActorWithRxStash, ResourceActor). The user may also change the supervisor's strategy. A timer (ActorTimer) should also be valuable in different situations. The ActorService class is used for a potential server (see chapter Cluster Configuration).
For a potential contributor or interested reader, further details are helpful. The class ActorSystem is a wrapper of the class ActorSystemImpl. ActorSystemImpl internally maintains a map of ActorCell instances. The class Actor is likewise a wrapper of the class ActorCell. ActorSystemImpl uses the ActorExecutorService to generate the ActorThreads. ActorThreads execute the actors when they have received a message. The ActorMessageDispatcher acts as a link between the actors during message passing. It places each new message in the appropriate queue of the ActorThread. In the case of an actor error, the ActorThread calls the auxiliary class ActorStrategyOnFailure, which then executes the defined strategy of the supervisor (OneForOneSupervisorStrategy or OneForAllSupervisorStrategy). Supervisor strategies can trigger the RestartProtocol and the StopProtocol. These protocols can also be triggered by a corresponding message to the actor (RESTART, STOP or POISONPILL). PseudoActor has its own ActorCell, which is called PseudoActorCell. This also includes a separate queue for communication between the actor system and the outside world (the PseudoActor). Last but not least, back to the ActorMessageDispatcher: the ActorBalancingOnCreation class is used to distribute the actors across the ActorThreads when the actor system is started. At runtime, the class ActorBalancingOnRuntime is used.
Fig. 4: Class diagram of the core components of actor4j
Enclosed are two rough class diagrams made with yuml.me. Have fun!
[1] Lightbend (2015). Actors. UntypedActor API. http://doc.akka.io/docs/akka/2.4/java/untyped-actors.html
[2] EPFL (2015). Pattern Matching. http://docs.scala-lang.org/tutorials/tour/pattern-matching.html
[3] Lightbend (2015). Actors. HotSwap. http://doc.akka.io/docs/akka/2.4/java/untyped-actors.html#untypedactor-hotswap
[4] Lightbend (2015). Persistence. http://doc.akka.io/docs/akka/2.4/java/persistence.html
[5] Joe Armstrong (2013). Programming Erlang. Software for a Concurrent World (Pragmatic Programmers). Pragmatic Bookshelf. Pages 398-399
[6] Lightbend (2015). Supervision and Monitoring. http://doc.akka.io/docs/akka/2.4/general/supervision.html
[7] Lightbend (2015). Fault tolerance. http://doc.akka.io/docs/akka/2.4/java/fault-tolerance.html
[8] Derek Wyatt (2013). AKKA Concurrency. Artima Inc. Pages 160-164.
[9] D. A. Bauer and J. Mäkiö (2018). Actor4j: A Software Framework for the Actor Model Focusing on the Optimization of Message Passing. AICT 2018: The Fourteenth Advanced International Conference on Telecommunications, IARIA, Barcelona, Spain. Pages 125-134. http://www.thinkmind.org/articles/aict_2018_8_10_10087.pdf