Configuration, Starting and Stopping the Actor System

In actor4j, the following important configuration options are available.

ActorSystemConfig config = ActorSystemConfig.builder()
	.parallelism(1)
	.parallelismFactor(1)
//	.parkMode() [default] or .sleepMode() .yieldMode();
//	...
	.build();

// Variant 1: pass the runtime factory explicitly
ActorSystem system = ActorSystem.create(ActorRuntime.factory(), config);
// Variant 2: let ActorRuntime create the system
ActorSystem system = ActorRuntime.create(config);

// The same two variants with the default configuration
ActorSystem system = ActorSystem.create(ActorRuntime.factory());
ActorSystem system = ActorRuntime.create();
	

The number of threads is set with parallelism, and a scaling factor with parallelismFactor:

Number of threads = parallelism * parallelismFactor
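
The formula can be sketched in a few lines of Java. The class ThreadCount and its method of are hypothetical helpers for illustration only and are not part of actor4j; the sketch also assumes that both settings are positive integers.

```java
// Hypothetical helper, not part of actor4j: mirrors the formula above.
final class ThreadCount {
	private ThreadCount() {}

	// Returns parallelism * parallelismFactor (assumption: both are positive integers).
	static int of(int parallelism, int parallelismFactor) {
		if (parallelism < 1 || parallelismFactor < 1)
			throw new IllegalArgumentException("both values must be >= 1");
		return Math.multiplyExact(parallelism, parallelismFactor);
	}

	public static void main(String[] args) {
		System.out.println(ThreadCount.of(1, 1)); // configuration shown above: 1
		System.out.println(ThreadCount.of(4, 2)); // 4 * 2 = 8
	}
}
```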

In addition, the configuration defines the mode in which the threads operate while temporarily no messages arrive (park mode by default, alternatively sleep or yield mode). The actor system is started with the method call:

system.start();

The actor system can be terminated with or without a controlled shutdown of all actors. With a controlled shutdown, a stop directive is sent internally to all actors. A boolean parameter determines whether the calling thread waits until the shutdown of the actor system has completed.

system.shutdown(); // normal shutdown
system.shutdown(true); // shutdown and wait

system.shutdownWithActors(); // shutdown with actors
system.shutdownWithActors(true);

Actor4j implements a controlled shutdown by sending a termination message to the user actor (the parent node of all actors in the tree structure), so that the subordinate actors terminate in a cascade. The actors themselves are responsible for the orderly handling of their termination.
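
The cascade can be pictured with a small tree model. This is a minimal sketch: Node and CascadeDemo are hypothetical classes, not actor4j types, and the traversal is synchronous for simplicity, whereas stopping in actor4j is asynchronous. It assumes that a parent counts as terminated only after all of its children have terminated.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an actor in the tree; not the actor4j Actor class.
final class Node {
	final String name;
	final List<Node> children = new ArrayList<>();

	Node(String name) { this.name = name; }

	Node add(Node child) { children.add(child); return this; }

	// Passes the stop request down first, then records its own termination,
	// so every parent finishes only after all of its children have finished.
	void stop(List<String> terminated) {
		for (Node child : children)
			child.stop(terminated);
		terminated.add(name);
	}
}

final class CascadeDemo {
	public static void main(String[] args) {
		Node user = new Node("user")
			.add(new Node("a").add(new Node("a1")).add(new Node("a2")))
			.add(new Node("b"));
		List<String> order = new ArrayList<>();
		user.stop(order);
		System.out.println(order); // [a1, a2, a, b, user]
	}
}
```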

Actors, Pattern Matching, and Behaviour

There are two ways to add actors to the actor system: by specifying the class and its constructor arguments (the actor is then instantiated by reflection) or via a factory method. Both variants are passed to a dependency injection container, which instantiates the actors accordingly. Actors can be created from outside an actor, directly on the actor system; these are automatically subordinated to the user actor (the parent of all user-created actors). They can also be created within an actor, in which case they become child actors of that actor. After instantiation, the call returns a UUID that uniquely identifies the actor.

Creating Actors

// via reflection
system.addActor(MyActor.class, "MyActor", ...);
// or using a factory method
system.addActor(new ActorFactory() {
	@Override
	public Actor create() {
		return new MyActor();
	}
});

// or in the context of an actor
addChild(MyActor.class, "MyActor", ...);
// or
UUID myActor = addChild(() -> new MyActor());

Actors must derive from the class Actor and implement the receive method. In the example below, MyActor waits for a message containing a String and outputs it via a logger. Subsequently, the message is sent back to the sender. When a different message is received, a warning (unhandled message) is logged, provided that debugUnhandled has been set in the actor system. [1]

public class MyActor extends Actor {
	@Override
	public void receive(ActorMessage<?> message) {
		if (message.value() instanceof String) {
			logger().info(String.format(
				"Received String message: %s", message.valueAsString()));
			send(message, message.source());
		} 
		else
			unhandled(message);
	}
}		            
/* Adapted for actor4j according to [1] */

Messages

Messages are sent with the send method and its variants listed below. tell offers a syntax similar to Akka. A message can also be forwarded (forward). Aliases, among other things, simplify access to a remote actor. Messages can also be sent with priority (priority), in which case they are delivered preferentially.

send(ActorMessage<?> message)
send(ActorMessage<?> message, UUID dest)
sendViaPath(ActorMessage<?> message, String path)
sendViaAlias(ActorMessage<?> message, String alias)
tell(T value, int tag, UUID dest)
tell(T value, int tag, UUID dest, String domain)
tell(T value, int tag, UUID dest, UUID interaction)
tell(T value, int tag, UUID dest, UUID interaction, String protocol)
tell(T value, int tag, UUID dest, UUID interaction, String protocol, String domain)
tell(T value, int tag, String alias)
tell(T value, int tag, String alias, String domain)
tell(T value, int tag, String alias, UUID interaction)
tell(T value, int tag, String alias, UUID interaction, String protocol)
tell(T value, int tag, String alias, UUID interaction, String protocol, String domain)

forward(ActorMessage<?> message, UUID dest)
priority(ActorMessage<?> message, UUID dest)

Messages exchanged between actors are of the ActorMessage<T> type. A message (see Table 1) consists of the object to be transferred (payload of the message), a tag that helps differentiate between messages, the sender address of the message, the receiver address of the message, an interaction ID for messages related to an interaction protocol (conversation) and the associated domain of the message. The sending methods listed above can be called from within an actor.

The ActorMessage<T> has the following structure:

Type    Name         Description
T       value        payload of the message
int     tag          helps to differentiate between messages
UUID    source       source of the message
UUID    dest         destination of the message
UUID    interaction  associated interaction
String  protocol     interaction protocol
String  domain       domain of the message

Tab. 1: Structure of the ActorMessage<T>
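
To make the table concrete, the following sketch models the same seven fields as a plain Java value class. Message, its reply method and MessageDemo are hypothetical and exist only for illustration; the real ActorMessage<T> may differ in detail. The reply helper rests on the assumption that an answer swaps source and dest and keeps interaction, protocol and domain.

```java
import java.util.UUID;

// Hypothetical value class mirroring Tab. 1; the real ActorMessage<T> may differ.
final class Message<T> {
	final T value;          // payload of the message
	final int tag;          // differentiates between kinds of messages
	final UUID source;      // sender address
	final UUID dest;        // receiver address
	final UUID interaction; // associated interaction (conversation), may be null
	final String protocol;  // interaction protocol, may be null
	final String domain;    // domain of the message, may be null

	Message(T value, int tag, UUID source, UUID dest,
			UUID interaction, String protocol, String domain) {
		this.value = value;
		this.tag = tag;
		this.source = source;
		this.dest = dest;
		this.interaction = interaction;
		this.protocol = protocol;
		this.domain = domain;
	}

	// Builds an answer: source and dest are swapped, the rest is carried over.
	<R> Message<R> reply(R newValue, int newTag) {
		return new Message<>(newValue, newTag, dest, source, interaction, protocol, domain);
	}
}

final class MessageDemo {
	public static void main(String[] args) {
		UUID ping = UUID.randomUUID();
		UUID pong = UUID.randomUUID();
		Message<String> request = new Message<>("ping", 0, ping, pong, UUID.randomUUID(), null, null);
		Message<String> answer = request.reply("pong", 0);
		System.out.println(answer.dest.equals(ping)); // true
	}
}
```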

Pattern Matching

Pattern matching on received messages is available through the ActorMessageMatcher class, which was inspired by pattern matching in Scala [2]. A message can be checked against a tag, a source, or the class of the passed object (value). If a match succeeds, the associated action is triggered. The example below is based on the previous one, but is expressed with the means of the ActorMessageMatcher class. Tags serve as a simple means of communication; ACK is an example of such a tag.

public class MyActor extends Actor {
	protected ActorMessageMatcher matcher;
	protected final int ACK = 1;
	
	@Override
	public void preStart() {
		matcher = new ActorMessageMatcher();
		
		matcher
		.match(String.class, 
			msg -> logger().info(String.format(
				"Received String message: %s", msg.valueAsString())))
		.match(ACK, 
			msg -> logger().info("ACK tag received"))
		.matchAny(
			msg -> send(msg, msg.source())) // send back to the sender, as in the example above
		.matchElse(
			msg -> unhandled(msg));
	}
	
	@Override
	public void receive(ActorMessage<?> message) {
		matcher.apply(message);
	}
}                

matchAny is always triggered, regardless of the message received. If no match is found, matchElse fires.
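
The interplay of the three kinds of rules can be sketched independently of actor4j. MiniMatcher is a hypothetical class working on plain objects, not the ActorMessageMatcher itself. The sketch assumes that matchAny does not count as a match, so matchElse still fires when no specific rule applied; the real class may order or combine these steps differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical mini matcher over plain objects; not the actor4j ActorMessageMatcher.
final class MiniMatcher {
	private static final class Case {
		final Predicate<Object> test;
		final Consumer<Object> action;
		Case(Predicate<Object> test, Consumer<Object> action) {
			this.test = test;
			this.action = action;
		}
	}

	private final List<Case> cases = new ArrayList<>();
	private Consumer<Object> any;
	private Consumer<Object> otherwise;

	MiniMatcher match(Class<?> type, Consumer<Object> action) {
		cases.add(new Case(type::isInstance, action));
		return this;
	}

	MiniMatcher matchAny(Consumer<Object> action) { any = action; return this; }

	MiniMatcher matchElse(Consumer<Object> action) { otherwise = action; return this; }

	void apply(Object message) {
		boolean matched = false;
		for (Case c : cases)
			if (c.test.test(message)) {
				c.action.accept(message);
				matched = true;
			}
		if (any != null)
			any.accept(message);       // always runs
		if (!matched && otherwise != null)
			otherwise.accept(message); // runs only if no specific rule matched
	}

	public static void main(String[] args) {
		List<String> log = new ArrayList<>();
		MiniMatcher matcher = new MiniMatcher()
			.match(String.class, o -> log.add("string"))
			.matchAny(o -> log.add("any"))
			.matchElse(o -> log.add("else"));
		matcher.apply("hello");
		matcher.apply(42);
		System.out.println(log.equals(Arrays.asList("string", "any", "any", "else"))); // true
	}
}
```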

Behaviour

The message processing method receive of an actor can be replaced by another method at runtime (comparable to HotSwap in Akka [3]). In the example below, the actor’s behavior is changed on receipt of the tag SWAP. When the next message arrives, information about it is logged. Finally, unbecome restores the original receive method.

public class MyActor extends Actor {
	protected final int SWAP=22;
	
	@Override
	public void receive(ActorMessage<?> message) {
		if (message.tag() == SWAP)
			become(msg -> {
				logger().info(String.format(
					"Received message: %s", msg));
				unbecome();
			}, false); // false -> the new behaviour is pushed onto the stack
		else
			unhandled(message);
	}
}
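
The stack semantics behind become and unbecome can be modelled without the framework. SwappableActor is a hypothetical class that only reacts to integer tags; it is not the actor4j implementation. The sketch assumes that a second argument of true would replace the behaviour on top of the stack instead of pushing a new one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of become/unbecome; not the actor4j implementation.
final class SwappableActor {
	static final int SWAP = 22;
	final List<String> log = new ArrayList<>();
	private final Deque<Consumer<Integer>> behaviours = new ArrayDeque<>();

	// Original behaviour, corresponds to receive(...) in the example above.
	private void receive(int tag) {
		if (tag == SWAP)
			become(t -> {
				log.add("swapped:" + t);
				unbecome();
			}, false);
		else
			log.add("unhandled:" + tag);
	}

	// replace == false pushes the behaviour; replace == true swaps the current top (assumption).
	void become(Consumer<Integer> behaviour, boolean replace) {
		if (replace && !behaviours.isEmpty())
			behaviours.pop();
		behaviours.push(behaviour);
	}

	void unbecome() {
		if (!behaviours.isEmpty())
			behaviours.pop();
	}

	// Dispatches to the top of the stack, or to receive if the stack is empty.
	void deliver(int tag) {
		Consumer<Integer> top = behaviours.peek();
		if (top != null)
			top.accept(tag);
		else
			receive(tag);
	}

	public static void main(String[] args) {
		SwappableActor actor = new SwappableActor();
		actor.deliver(1);    // handled by receive
		actor.deliver(SWAP); // installs the temporary behaviour
		actor.deliver(7);    // handled by the temporary behaviour, which then removes itself
		actor.deliver(7);    // handled by receive again
		System.out.println(actor.log); // [unhandled:1, swapped:7, unhandled:7]
	}
}
```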

Simple Example

// Initialize the actor system
ActorSystem system = ActorSystem.create(ActorRuntime.factory(), "Example");
		
// Creation of actor pong
UUID pong = system.addActor(() -> new Actor() {
	@Override
	public void receive(ActorMessage<?> message) {
		// Receives message from ping
		System.out.println(message.valueAsString());
		// Sends message "pong" to ping
		tell("pong", 0, message.source());
	}
});
// Creation of actor ping
UUID ping = system.addActor(() -> new Actor() {
	@Override
	public void receive(ActorMessage<?> message) {
		if (message.value()!=null)
			// Receives message from pong
			System.out.println(message.valueAsString());
		// Sends message "ping" to pong
		tell("ping", 0, pong);
	}
});
		
// Starts the actor system
system.start();
		
// Sends a message to ping to start the ping-pong process
system.send(ActorMessage.create(null, 0, system.SYSTEM_ID(), ping));
		
// Lifetime for the ping-pong process
try {
	Thread.sleep(2000);
} catch (InterruptedException e) {
	e.printStackTrace();
}
// Wait until all actors are shut down
system.shutdownWithActors(true);

Life Cycle of Actors and Monitoring

Fig. 1: Representation of the life cycle of an actor (adapted for actor4j according to [1])

Life Cycle

As already mentioned, actors are instantiated either via system.addActor(...) or via parentActor.addChild(...). Each actor receives a randomly generated (or persistent) UUID as a unique identifier, which is used to address it when sending messages. An actor can also have an alternative identifier, the alias (for better legibility or when the UUID is not known in advance). The preStart method is called when the actor is first started and is intended for the initial setup of the actor. If the actor is a PersistentActor, the recover method is also called, which restores the state of the actor.

An actor can also be restarted, usually triggered by an exception (see chapter Supervision). In this case, preRestart is first called on the old instance. Then a new instance is created with the dependency injection container and replaces the old one, and postRestart is called on the new instance. The preRestart and postRestart methods allow the actor to react appropriately to the restart. The identifier (UUID) of the original actor is retained, which guarantees that references held by other actors remain valid. An actor can be stopped by calling the stop method or by sending it the STOP or POISONPILL message. [1][4]

Monitoring

An actor can also monitor another actor to detect when it terminates. If the observed actor is terminated, a TERMINATED message is sent to the observer. The observer can identify the terminated actor via message.source, which holds the UUID of that actor. An observer registers with an actor using watch and de-registers using unwatch. [1]

watch(UUID dest)
unwatch(UUID dest)
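
The bookkeeping behind watch and unwatch can be sketched as a small registry. WatchRegistry is a hypothetical class, not part of actor4j, and it takes the observer as an explicit argument, whereas inside an actor the observer is implicit. The terminate method returns the observers that would have to receive a TERMINATED message.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical registry modelling watch/unwatch; not the actor4j implementation.
final class WatchRegistry {
	// watched actor -> its observers
	private final Map<UUID, Set<UUID>> observers = new HashMap<>();

	void watch(UUID observer, UUID dest) {
		observers.computeIfAbsent(dest, key -> new LinkedHashSet<>()).add(observer);
	}

	void unwatch(UUID observer, UUID dest) {
		Set<UUID> set = observers.get(dest);
		if (set != null)
			set.remove(observer);
	}

	// Returns the observers to be notified with TERMINATED (source = terminated actor).
	Set<UUID> terminate(UUID terminated) {
		Set<UUID> set = observers.remove(terminated);
		return set == null ? Collections.<UUID>emptySet() : set;
	}

	public static void main(String[] args) {
		WatchRegistry registry = new WatchRegistry();
		UUID worker = UUID.randomUUID();
		UUID first = UUID.randomUUID();
		UUID second = UUID.randomUUID();
		registry.watch(first, worker);
		registry.watch(second, worker);
		registry.unwatch(second, worker);
		System.out.println(registry.terminate(worker).size()); // 1
	}
}
```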

Comparison to Akka

The life cycle and monitoring are largely similar to Akka’s approach. In Akka, however, an ActorRef is returned instead of a UUID when an actor is instantiated.

Supervision

Fig. 2: OneForOne-Strategy and OneForAll-Strategy (cp. [5][6])

The supervisor actor monitors its child actors. In the event of an error, the supervisor resumes, restarts, or stops them. Two strategies are provided (see Fig. 2). In the OneForOne-Strategy, only the affected actor is considered. In the OneForAll-Strategy, on the other hand, both the affected actor and its neighboring actors (the other children of the same supervisor) are considered. [5][6] The default strategy for actor4j is a OneForOne-Strategy [7] and is specified as:

public SupervisorStrategyDirective apply(Exception e) {
	if (
	     e instanceof ActorInitializationException || 
	     e instanceof ActorKilledException)
		return STOP;
	else
		return RESTART;
}

An ActorInitializationException is thrown if an error occurs during the instantiation of an actor. An ActorKilledException is triggered by an incoming KILL message at the actor; in this case, the exception is raised deliberately to activate the supervisor and its error handling strategy. By default, the actor is stopped for these two exceptions and restarted for every other exception. The default strategy can be changed by overriding the supervisorStrategy method. [1]
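
The difference between the two strategies lies in which children the chosen directive is applied to. The following sketch captures only that scope. StrategyScope and SupervisionScope are hypothetical names and do not belong to actor4j, and children are represented by plain strings.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical names; they only illustrate the scope of the two strategies.
enum StrategyScope { ONE_FOR_ONE, ONE_FOR_ALL }

final class SupervisionScope {
	private SupervisionScope() {}

	// Returns the children of the supervisor to which the directive is applied.
	static List<String> affected(StrategyScope scope, List<String> children, String failed) {
		if (!children.contains(failed))
			throw new IllegalArgumentException(failed + " is not a child of this supervisor");
		return scope == StrategyScope.ONE_FOR_ONE
			? Collections.singletonList(failed)
			: Collections.unmodifiableList(children);
	}

	public static void main(String[] args) {
		List<String> children = Arrays.asList("a", "b", "c");
		System.out.println(affected(StrategyScope.ONE_FOR_ONE, children, "b")); // [b]
		System.out.println(affected(StrategyScope.ONE_FOR_ALL, children, "b")); // [a, b, c]
	}
}
```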

Fig. 3: Extended representation of the life cycle of an actor (cp. [8])

Actor4j currently supports eight directives: RESUME, STOP, TERMINATED, RESTART, ESCALATE, RECOVER, ACTIVATE and DEACTIVATE (see also Fig. 3). Stopping and restarting of the actors is asynchronous.

  • RESUME: In this case, the supervisor remains passive. The actor can continue its activities undisturbed [8].
  • STOP:
    • The message STOP is sent to all children (recursively, if the children also have children) so they can terminate. watch is used to observe that all children have terminated.
    • Call of postStop.
  • TERMINATED: The actor is stopped.
  • RESTART:
    • preRestart is called on the current instance.
    • The message STOP is sent to all children (recursively, if the children also have children) so they can terminate. watch is used to observe that all children have terminated.
    • Call of postStop on the current instance, after all children have finished and confirmed this with the TERMINATED message.
    • A new instance is created with the dependency injection container. It is ensured that the UUID is maintained.
    • Call of postRestart (with preStart and optional recover) on the new instance.
  • ESCALATE: If a supervisor is unclear about the correct strategy in the event of a specific error, it can pass the error on to its own supervisor for clarification.
  • RECOVER: The actor is recovered to its last state. New events can lead to an update of the actor’s state.
  • ACTIVATE and DEACTIVATE: Activates or deactivates the actor (messages will be processed or will no longer be processed). The directives described here remain deliverable, even when the actor is deactivated.

[1][4][7][8]

Important Actor Types

Four important actor types are presented next. They build on the class Actor, which is abstract.

ActorGroupMember (Interface)

Implementing this interface signals to the ActorSystem that the actor is a member of a group. This is taken into account when distributing the actors to the threads: actors belonging to a group are kept together on one thread. The basic idea behind this has been explained in our paper (see chapter results and conclusion [9]). Communication between threads is more expensive than communication within the same thread.
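
One simple way to keep a group together is to derive the thread index from the group identifier, so that all members of a group map to the same thread. This is a minimal sketch under stated assumptions: GroupPlacement is hypothetical, groups are assumed to be identified by a UUID, and actor4j may use a different balancing algorithm.

```java
import java.util.UUID;

// Hypothetical placement rule; actor4j may balance groups differently.
final class GroupPlacement {
	private GroupPlacement() {}

	// Maps a group id to a thread index in [0, threadCount).
	static int threadFor(UUID groupId, int threadCount) {
		if (threadCount < 1)
			throw new IllegalArgumentException("threadCount must be >= 1");
		return Math.floorMod(groupId.hashCode(), threadCount);
	}

	public static void main(String[] args) {
		UUID group = UUID.randomUUID();
		int first = threadFor(group, 4);
		int second = threadFor(group, 4);
		System.out.println(first == second); // true: members of one group share a thread
	}
}
```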

ActorWithGroup

This class implements the interface ActorGroupMember.

ResourceActor

Workload tasks should not be performed within the ActorSystem, because they block the reactive system so that it is no longer responsive. The class ResourceActor is provided for this purpose. These special actors are executed in a separate thread pool, which avoids disturbances within the ActorSystem. A distinction is made between stateless (@Stateless) and stateful (@Stateful) actors. The advantage of this distinction is that stateless actors can be executed in parallel.
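
The underlying idea, moving blocking work to a dedicated pool so that the caller stays responsive, can be shown with the standard library alone. ResourcePoolDemo, slowLookup and lookupAsync are hypothetical names; the sketch uses java.util.concurrent rather than the actor4j ResourceActor API, and the sleep merely simulates a blocking task.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo using only the JDK; not the actor4j ResourceActor API.
final class ResourcePoolDemo {
	private ResourcePoolDemo() {}

	// Simulates a blocking task such as I/O.
	static String slowLookup(String key) {
		try {
			Thread.sleep(50);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return key.toUpperCase(Locale.ROOT);
	}

	// Runs the blocking task on the given pool and returns immediately.
	static CompletableFuture<String> lookupAsync(String key, Executor resourcePool) {
		return CompletableFuture.supplyAsync(() -> slowLookup(key), resourcePool);
	}

	public static void main(String[] args) {
		ExecutorService resourcePool = Executors.newFixedThreadPool(2);
		CompletableFuture<Void> done = lookupAsync("actor4j", resourcePool)
			.thenAccept(result -> System.out.println("result: " + result));
		System.out.println("caller is not blocked");
		done.join(); // only the demo waits here, so that the pool can be shut down
		resourcePool.shutdown();
	}
}
```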

PseudoActor

A PseudoActor is a mediator between the outside world and the ActorSystem. It allows communication with the actors within the actor system from outside. Unlike the other actors, the PseudoActor has its own message queue, in which the ActorSystem stores the messages sent by different actors. The class PseudoActor is derived from the class ActorWithRxStash. To process received messages, the run method must be called manually.
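
The pattern of a private queue that is drained only when the outside thread asks for it can be sketched as follows. OutsideMailbox is a hypothetical class with string messages, not the actor4j PseudoActor; its run method is assumed to process everything that has been queued so far and then return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical mailbox modelling the PseudoActor idea; not the actor4j class.
final class OutsideMailbox {
	private final Queue<String> queue = new ConcurrentLinkedQueue<>();
	final List<String> received = new ArrayList<>();

	// Called from the actor system side: only stores the message.
	void enqueue(String message) {
		queue.add(message);
	}

	// Called manually by the outside thread: processes everything queued so far
	// and returns the number of processed messages.
	int run() {
		int processed = 0;
		for (String message; (message = queue.poll()) != null; processed++)
			received.add(message);
		return processed;
	}

	public static void main(String[] args) {
		OutsideMailbox mailbox = new OutsideMailbox();
		mailbox.enqueue("pong-1");
		mailbox.enqueue("pong-2");
		System.out.println(mailbox.received.size()); // 0, nothing is processed before run()
		System.out.println(mailbox.run());           // 2
		System.out.println(mailbox.received);        // [pong-1, pong-2]
	}
}
```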

Class Diagrams to the Core Components

Now, the core components of actor4j are presented from the user and developer perspectives. In particular, connections between the core components will be clarified.

User’s Point of View

The most important core components of actor4j can be seen in the overview (Fig. 4). The average user of actor4j will mainly work with the class ActorSystem and the different actor classes (Actor, ActorGroupMember, ActorWithRxStash, ResourceActor). The user may also change the supervisor’s strategy. A timer (ActorTimer) is also useful in various situations. The ActorService class is used for a potential server (see chapter Cluster Configuration).

Developer’s Point of View

For potential contributors or interested readers, further details are helpful. The class ActorSystem is a wrapper around the class ActorSystemImpl. ActorSystemImpl internally maintains a map of ActorCell instances. Likewise, the class Actor is a wrapper around the class ActorCell. ActorSystemImpl uses the ActorExecutorService to create the ActorThreads, which execute the actors when they have received a message. The ActorMessageDispatcher acts as the link between the actors during message passing: it places each new message in the appropriate queue of the ActorThread. In the case of an actor error, the ActorThread calls the auxiliary class ActorStrategyOnFailure, which then executes the defined strategy of the supervisor (OneForOneSupervisorStrategy or OneForAllSupervisorStrategy). Supervisor strategies can invoke the RestartProtocol and the StopProtocol. These protocols can also be triggered by a corresponding message to the actor (RESTART, STOP or POISONPILL). PseudoActor has its own ActorCell, which is called PseudoActorCell. It also includes a separate queue for communication between the actor system and the outside world (the PseudoActor). Finally, returning to the ActorMessageDispatcher: the ActorBalancingOnCreation class is used to distribute the actors to the ActorThreads when the actor system is started. At runtime, the class ActorBalancingOnRuntime is used.

Fig. 4: Class diagram of the core components of actor4j

Note

Enclosed are two rough class diagrams made with yuml.me. Have fun!

References

[1] Lightbend (2015). Actors. UntypedActor API. http://doc.akka.io/docs/akka/2.4/java/untyped-actors.html
[2] EPFL (2015). Pattern Matching. http://docs.scala-lang.org/tutorials/tour/pattern-matching.html
[3] Lightbend (2015). Actors. HotSwap. http://doc.akka.io/docs/akka/2.4/java/untyped-actors.html#untypedactor-hotswap
[4] Lightbend (2015). Persistence. http://doc.akka.io/docs/akka/2.4/java/persistence.html
[5] Joe Armstrong (2013). Programming Erlang. Software for a Concurrent World (Pragmatic Programmers). Pragmatic Bookshelf. Pages 398-399.
[6] Lightbend (2015). Supervision and Monitoring. http://doc.akka.io/docs/akka/2.4/general/supervision.html
[7] Lightbend (2015). Fault tolerance. http://doc.akka.io/docs/akka/2.4/java/fault-tolerance.html
[8] Derek Wyatt (2013). AKKA Concurrency. Artima Inc. Pages 160-164.
[9] D. A. Bauer and J. Mäkiö, “Actor4j: A Software Framework for the Actor Model Focusing on the Optimization of Message Passing,” AICT 2018: The Fourteenth Advanced International Conference on Telecommunications, IARIA, Barcelona, Spain 2018, pp. 125-134, [Online]. Available from: http://www.thinkmind.org/articles/aict_2018_8_10_10087.pdf