Supported Web Communication Protocols

Actor4j can also run within a web server, where it exposes a basic REST API. WebSockets or Google Remote Procedure Call (gRPC) are currently used to connect Actor4j with interested clients. Message Queue Telemetry Transport (MQTT) and Advanced Message Queuing Protocol (AMQP) are also supported through a ResourceActor. Eclipse Paho is used as the MQTT client, and Eclipse Mosquitto, for example, can serve as the MQTT broker. For AMQP, RabbitMQ is used. The AMQP implementation is MQTT-compatible (MQTT topics use a topic exchange called amq.topic by default). Support for Open Platform Communications - Unified Architecture (OPC-UA) is planned, which should prepare Actor4j for use cases in the Internet of Things (IoT) and Industrial Internet of Things (IIoT) area. The Constrained Application Protocol (CoAP) is already supported in this context.

REST and WebSocket

A server node can be addressed via a REST API (see the current specification) or over a WebSocket connection. The servers in the cluster exchange messages via WebSocket connections. Of course, access to a server node is also possible over a client API. The WebSocket approach is similar to the REST API (see Fig. 1). A web template for a Tomcat server node can be found under: actor4j-web-template.

Fig. 1: Representation of the basic calls to the actor4j REST API

Each actor has a unique ID (UUID). Alternatively, an actor can be addressed via an alias instead of its ID, which is particularly handy for remote access. Before a message is processed, the system checks whether the addressee (actor) is running on the local machine. If it is not, an attempt is made to determine on which host the addressee is located (1, 2). The message is then sent to that destination host (3). Addressees that have already been found are cached temporarily so that later lookups are fast (using Guava Cache [1]).
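The lookup order described above (local check, cache, remote query) can be sketched roughly as follows. This is a minimal illustration, not Actor4j's actual implementation: the class AddresseeResolver, its methods, and the host string "host-b:8080" are hypothetical, and a plain ConcurrentHashMap stands in for the Guava Cache that Actor4j uses (a real cache would also expire or bound its entries).

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper, not part of Actor4j: resolves the host an actor runs on.
final class AddresseeResolver {
	static final String LOCAL = "local";

	private final Set<UUID> localActors;                          // actors running on this node
	private final Function<UUID, Optional<String>> remoteLookup;  // asks the other nodes (steps 1, 2)
	private final Map<UUID, String> hostCache = new ConcurrentHashMap<>(); // stand-in for Guava Cache
	private final AtomicInteger remoteQueries = new AtomicInteger();

	AddresseeResolver(Set<UUID> localActors, Function<UUID, Optional<String>> remoteLookup) {
		this.localActors = localActors;
		this.remoteLookup = remoteLookup;
	}

	// Returns LOCAL, a remote host, or empty if the addressee is unknown.
	Optional<String> resolve(UUID actorId) {
		if (localActors.contains(actorId))
			return Optional.of(LOCAL);
		String cached = hostCache.get(actorId);
		if (cached != null)
			return Optional.of(cached);
		remoteQueries.incrementAndGet();
		Optional<String> host = remoteLookup.apply(actorId);
		host.ifPresent(h -> hostCache.put(actorId, h)); // remember found addressees only
		return host;
	}

	int remoteQueries() {
		return remoteQueries.get();
	}

	public static void main(String[] args) {
		UUID local = UUID.randomUUID();
		UUID remote = UUID.randomUUID();
		AddresseeResolver resolver = new AddresseeResolver(
			Set.of(local),
			id -> id.equals(remote) ? Optional.of("host-b:8080") : Optional.empty());

		System.out.println(resolver.resolve(local));   // resolved locally, no remote query
		System.out.println(resolver.resolve(remote));  // first call queries the other nodes
		System.out.println(resolver.resolve(remote));  // second call is served from the cache
		System.out.println(resolver.remoteQueries());  // only one remote query was needed
	}
}
```

Unknown addressees are deliberately not cached in this sketch, so an actor that is created later on another node can still be found on a subsequent attempt.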

A more modern version is planned for the future.

gRPC

A gRPC-compatible server can be started easily by creating a class derived from ActorGrpcServer. The actor system is set up in the config method. In the following example, a service node is registered so that messages can later be sent to the corresponding actors on that node. The server is then started from the main method.

Example for usage of gRPC

public class PongServer extends ActorGrpcServer {
	@Override
	protected void config(ActorService service) {
		service.setParallelismMin(1);
		logger().setLevel(Level.INFO);
		
		service.setServiceNodeName("pong");
		service.addServiceNode(new ActorServiceNode("ping", "localhost:8080"));
		
		ActorGroup group = new ActorGroupSet();
		service.setAlias(service.addActor(() -> new Pong(group), 			
			service.getParallelismMin()), "pong");
	}
	
	public static void main(String[] args) {
		PongServer server = new PongServer();
		server.start("Actor4j-pong", 8081);
		server.awaitTermination();
	}

}

MQTT

Actor4j also supports messaging. To use MQTT, an MQTTResourceActor must be configured and its callback logic implemented. In the callback method, a received message can be redirected to a designated actor (see example). In the example below, the ResourceActor subscribes to a specific topic so that it receives the corresponding messages itself. Those messages are then redirected to the receiver actor.

Example for usage of MQTT

UUID receiver = system.addActor(() -> new Actor("receiver") {
	@Override
	public void receive(ActorMessage<?> message) {
		if (message.tag()==PUBLISH && message.value()!=null)
			System.out.printf("Message received: %s%n", message.valueAsString());
	}
});
UUID mqtt = system.addActor(() -> new MQTTResourceActor("mqtt", "tcp://localhost:1883") {
	@Override
	public void configure(MqttConnectOptions connectOptions) {
		/*
		connectOptions.setUserName("...");
		connectOptions.setPassword("...".toCharArray());
		*/
		connectOptions.setCleanSession(false);
	}

	@Override
	public MqttCallback callback() {
		return new MqttCallback() {
			@Override
			public void connectionLost(Throwable cause) {
				// empty
			}

			@Override
			public void messageArrived(String topic, MqttMessage message) throws Exception {
				tell(new String(message.getPayload(), "UTF-8"), PUBLISH, receiver);
			}

			@Override
			public void deliveryComplete(IMqttDeliveryToken token) {
				// empty
			}
		};
	}

	@Override
	public UUID clientId() {
		return UUID.fromString("470ceda2-219a-4d6d-997a-88c97a501a9b");
	}
});
		
system.send(ActorMessage.create("MyTopic", SUBSCRIBE, receiver, mqtt));
system.send(ActorMessage.create(new MQTTPublish("MyTopic", "Hello World!".getBytes(), 2, false), PUBLISH, system.SYSTEM_ID(), mqtt));

/* Full examples under https://github.com/relvaner/actor4j/tree/master/actor4j-examples */

AMQP

To use AMQP, an AMQPResourceActor must be configured and its callback logic implemented. For further details, see the MQTT section above.

Example for usage of AMQP

UUID receiver = system.addActor(() -> new Actor("receiver") {
	@Override
	public void receive(ActorMessage<?> message) {
		if (message.tag()==PUBLISH && message.value()!=null)
			System.out.printf("Message received: %s%n", message.valueAsString());
	}
});
UUID amqp = system.addActor(() -> new AMQPResourceActor("amqp", "localhost", 5672) {
	@Override
	public void configure(ConnectionFactory factory) {
		/*
		factory.setUsername("...");
		factory.setPassword("...");
		*/
	}

	@Override
	public Consumer callback(Channel channel) {
		return new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
				tell(new String(body, "UTF-8"), PUBLISH, receiver);
			}
		};
	}
});
		
system.send(ActorMessage.create("MyTopic", SUBSCRIBE, receiver, amqp));
system.send(ActorMessage.create(new AMQPPublish("MyTopic", "Hello World!".getBytes()), PUBLISH, system.SYSTEM_ID, amqp));

/* Full examples under https://github.com/relvaner/actor4j/tree/master/actor4j-examples */
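
Because MQTT topics are mapped onto the amq.topic exchange, it can help to see how an MQTT topic relates to an AMQP routing key. The following sketch assumes the convention used by RabbitMQ's MQTT plugin, as far as it is documented: the level separator "/" becomes ".", the single-level wildcard "+" becomes "*", and "#" stays the same. The class TopicMapping and its methods are hypothetical and only illustrate that convention; whether Actor4j performs such a translation itself is not stated here.

```java
// Hypothetical helper, not part of Actor4j or RabbitMQ: illustrates how MQTT
// topics and AMQP topic-exchange routing keys correspond to each other.
final class TopicMapping {
	private TopicMapping() {
	}

	// "sensors/+/temperature" -> "sensors.*.temperature"
	static String mqttToAmqp(String mqttTopic) {
		return mqttTopic.replace('/', '.').replace('+', '*');
	}

	// "sensors.*.temperature" -> "sensors/+/temperature"
	static String amqpToMqtt(String routingKey) {
		return routingKey.replace('.', '/').replace('*', '+');
	}

	public static void main(String[] args) {
		System.out.println(mqttToAmqp("sensors/+/temperature"));
		System.out.println(mqttToAmqp("sensors/#"));
		System.out.println(amqpToMqtt("sensors.*.temperature"));
	}
}
```

A single-level topic such as "MyTopic" is unchanged by this mapping. Topics that contain dots on the MQTT side, or slashes on the AMQP side, do not survive a round trip and are best avoided.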

CoAP

An additional CoAP service can be set up (see example); the associated actor system must be injected into it. The COAPActorClient can then interact with an appropriate CoAP server node, and messages can be sent to designated actors.

Example for usage of CoAP

COAPActorService coapService = new COAPActorService() {
	@Override
	public ActorService getService() {
		return actorService;
	}
};
coapService.start();

CoapClient client = COAPActorClient.createClient();
COAPActorClient.sendMessage(client, "coap://localhost", new TransferActorMessage("Hello World!", 0, UUID.randomUUID(), coap));

/* Full examples under https://github.com/relvaner/actor4j/tree/master/actor4j-examples */

References

[1] Google Inc. (2015). Guava: Google Core Libraries for Java. CachesExplained. https://github.com/google/guava/wiki/CachesExplained