Actor4j can also be run within a web server, which supports a basic REST API. WebSockets or Google Remote Procedure Call (gRPC) are currently used to establish a connection between Actor4j and interested clients. Message Queue Telemetry Transport (MQTT) and Advanced Message Queuing Protocol (AMQP) are also supported through a ResourceActor. Eclipse Paho is used as the MQTT client, and Eclipse Mosquitto can be used, for example, as the MQTT broker. For AMQP, RabbitMQ is used. The AMQP implementation is MQTT-compatible (MQTT topics use a topic exchange called amq.topic by default). Support for Open Platform Communications - Unified Architecture (OPC-UA) is planned, so that Actor4j is prepared for use cases in the Internet of Things (IoT) and Industrial Internet of Things (IIoT). The Constrained Application Protocol (CoAP) is already supported in this context.
A server node can be addressed via a REST API (see the current specification) or over a WebSocket connection. The servers in the cluster exchange messages via WebSocket connections. A server node can also be accessed over a client API. The WebSocket approach is similar to the REST API (see Fig. 1). A web template for a Tomcat server node can be found under actor4j-web-template.
Fig. 1: Representation of the basic calls to the Actor4j REST API
Each actor has a unique ID (UUID). Alternatively, an actor can be addressed via an alias instead of its ID, which is handy for remote access. Before a message is processed, the system checks whether the addressee (actor) is running on the local machine. If it is not, an attempt is made to determine on which host the addressee is located (1, 2). The message is then sent to the destination host (3). Addressees that have already been found are cached temporarily for fast later access (using Guava Cache [1]).
A more modern version is planned for the future.
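The lookup steps described above (check locally, ask the cluster, remember the answer) can be sketched as follows. This is a minimal illustration and not the Actor4j implementation: the class AddressResolver, its method resolve, and the remoteLookup function are hypothetical names, and a plain ConcurrentHashMap stands in for the Guava cache, so entries never expire here.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of alias resolution with a lookup cache (not Actor4j code). */
final class AddressResolver {
    /** Aliases of actors that run on this node. */
    private final Map<String, UUID> localAliases;
    /** Assumed cluster query: returns the host of an alias, or empty if no node knows it. */
    private final Function<String, Optional<String>> remoteLookup;
    /** Stand-in for the Guava cache: alias -> host, without expiry. */
    private final Map<String, String> hostCache = new ConcurrentHashMap<>();

    AddressResolver(Map<String, UUID> localAliases,
                    Function<String, Optional<String>> remoteLookup) {
        this.localAliases = localAliases;
        this.remoteLookup = remoteLookup;
    }

    /** Returns "local" for a local actor, the remote host otherwise, or empty if unknown. */
    Optional<String> resolve(String alias) {
        // Is the addressee running on the local machine?
        if (localAliases.containsKey(alias)) {
            return Optional.of("local");
        }
        // Was the addressee already found earlier?
        String cached = hostCache.get(alias);
        if (cached != null) {
            return Optional.of(cached);
        }
        // Steps (1, 2): ask the cluster which host runs the addressee.
        Optional<String> host = remoteLookup.apply(alias);
        // Remember a successful lookup so that step (3) can be taken directly next time.
        host.ifPresent(h -> hostCache.put(alias, h));
        return host;
    }
}
```

In this sketch, a second call to resolve for the same remote alias is answered from the cache and does not query the cluster again. A production cache would normally also evict or expire entries, which is what the "temporarily stored" wording above suggests.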
A gRPC-compatible server can be started easily by creating a class derived from ActorGrpcServer. The actor system can be set up in the configuration method. In the following example, a service node is registered so that messages can later be sent to the corresponding actors on that node. The server is then started within the main method.
public class PongServer extends ActorGrpcServer {
    @Override
    protected void config(ActorService service) {
        service.setParallelismMin(1);
        logger().setLevel(Level.INFO);

        service.setServiceNodeName("pong");
        service.addServiceNode(new ActorServiceNode("ping", "localhost:8080"));

        ActorGroup group = new ActorGroupSet();
        service.setAlias(service.addActor(() -> new Pong(group),
            service.getParallelismMin()), "pong");
    }

    public static void main(String[] args) {
        PongServer server = new PongServer();
        server.start("Actor4j-pong", 8081);
        server.awaitTermination();
    }
}
Messaging is also supported by Actor4j. To use MQTT, an MQTTResourceActor must be added and configured, and the callback logic must be implemented. In the callback method, a received message can be redirected to a designated actor (see example). In the example below, the ResourceActor subscribes to a specific topic so that it receives the corresponding messages itself. These messages are then redirected to the receiver actor.
UUID receiver = system.addActor(() -> new Actor("receiver") {
    @Override
    public void receive(ActorMessage<?> message) {
        if (message.tag()==PUBLISH && message.value()!=null)
            System.out.printf("Message received: %s%n", message.valueAsString());
    }
});

UUID mqtt = system.addActor(() -> new MQTTResourceActor("mqtt", "tcp://localhost:1883") {
    @Override
    public void configure(MqttConnectOptions connectOptions) {
        /*
        connectOptions.setUserName("...");
        connectOptions.setPassword("...".toCharArray());
        */
        connectOptions.setCleanSession(false);
    }

    @Override
    public MqttCallback callback() {
        return new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                // empty
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                tell(new String(message.getPayload(), "UTF-8"), PUBLISH, receiver);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // empty
            }
        };
    }

    @Override
    public UUID clientId() {
        return UUID.fromString("470ceda2-219a-4d6d-997a-88c97a501a9b");
    }
});

system.send(ActorMessage.create("MyTopic", SUBSCRIBE, receiver, mqtt));
system.send(ActorMessage.create(new MQTTPublish("MyTopic", "Hello World!".getBytes(), 2, false), PUBLISH, system.SYSTEM_ID(), mqtt));
/* Full examples under https://github.com/relvaner/actor4j/tree/master/actor4j-examples */
To use AMQP, an AMQPResourceActor must be added and configured, and the callback logic must be implemented. For further details, see the MQTT description above.
UUID receiver = system.addActor(() -> new Actor("receiver") {
    @Override
    public void receive(ActorMessage<?> message) {
        if (message.tag()==PUBLISH && message.value()!=null)
            System.out.printf("Message received: %s%n", message.valueAsString());
    }
});

UUID amqp = system.addActor(() -> new AMQPResourceActor("amqp", "localhost", 5672) {
    @Override
    public void configure(ConnectionFactory factory) {
        /*
        factory.setUsername("...");
        factory.setPassword("...");
        */
    }

    @Override
    public Consumer callback(Channel channel) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                tell(new String(body, "UTF-8"), PUBLISH, receiver);
            }
        };
    }
});

system.send(ActorMessage.create("MyTopic", SUBSCRIBE, receiver, amqp));
system.send(ActorMessage.create(new AMQPPublish("MyTopic", "Hello World!".getBytes()), PUBLISH, system.SYSTEM_ID, amqp));
/* Full examples under https://github.com/relvaner/actor4j/tree/master/actor4j-examples */
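As noted earlier, the AMQP implementation is MQTT-compatible and relies on the amq.topic topic exchange. With hierarchical topics, the two protocols use different separators and wildcards. The sketch below shows the translation convention used by the RabbitMQ MQTT plugin in its basic form ("/" becomes ".", the single-level wildcard "+" becomes "*", and "#" stays unchanged). The class and method names are hypothetical, and whether Actor4j performs such a translation itself is an assumption that the text above does not confirm.

```java
/** Hypothetical helper, not part of Actor4j: maps an MQTT topic to an AMQP routing key. */
final class MqttTopicMapping {
    private MqttTopicMapping() {
        // static helper only
    }

    /**
     * Simplified mapping following the basic RabbitMQ MQTT plugin convention:
     * level separator "/" -> ".", single-level wildcard "+" -> "*",
     * multi-level wildcard "#" is kept as is.
     */
    static String toAmqpRoutingKey(String mqttTopic) {
        return mqttTopic.replace('/', '.').replace('+', '*');
    }
}
```

A flat topic such as "MyTopic" from the examples above is unaffected by this mapping; only topics with several levels or wildcards change.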
An additional CoAP service can be set up (see example); the associated actor system must be injected. The COAPActorClient can interact with a corresponding CoAP server node, so that messages can be sent to designated actors.
COAPActorService coapService = new COAPActorService() {
    @Override
    public ActorService getService() {
        return actorService;
    }
};
coapService.start();
…
CoapClient client = COAPActorClient.createClient();
COAPActorClient.sendMessage(client, "coap://localhost", new TransferActorMessage("Hello World!", 0, UUID.randomUUID(), coap));
/* Full examples under https://github.com/relvaner/actor4j/tree/master/actor4j-examples */
[1] Google Inc (2015). Guava. Google Core Libraries for Java. CachesExplained. https://github.com/google/guava/wiki/CachesExplained