Author : Lal Kiran

Published on: May 29, 2018

 

Akka framework

Akka is an open source framework for developing concurrent and distributed applications. It supports the actor-based programming model for resolving concurrency issues. Today's CPUs have multiple cores, so to improve the performance of a program we need to use those cores concurrently, which means using multiple threads. Threads that share mutable state introduce concurrency issues such as race conditions and deadlocks. The actor model is one of the programming models for resolving these issues.

How does the Actor model solve the concurrency issue?

Consider a scenario where two threads try to access the same instance at the same time, as shown below.

 

[Figure: Actor model]

In this scenario we cannot predict the result, because the instructions of the two invocations can be interleaved in an arbitrary order, so we need some kind of coordination between the two threads. We can resolve this issue with thread locks and inter-thread communication, but locking is costly: threads that wait for a lock are blocked, which hurts performance.
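The lock-based coordination mentioned above can be sketched in plain Java. This is only an illustration and does not use Akka; the class name LockedCounterSketch and its methods are made up for the example. The synchronized keyword makes each increment atomic, but every caller has to wait for the lock.

```java
// Plain-Java sketch of the lock-based approach; all names here are hypothetical.
final class LockedCounterSketch {
    private int count = 0;

    // synchronized makes the read-modify-write atomic, at the price of blocking other callers
    synchronized void increment() {
        count++;
    }

    synchronized int get() {
        return count;
    }

    // Starts `threads` workers that each call increment() `incrementsPerThread` times.
    static int run(int threads, int incrementsPerThread) {
        LockedCounterSketch counter = new LockedCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```

Without the synchronized keyword the result could be lower than 4000, because two threads may read the same old value before either writes back.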

The actor programming model resolves this issue by avoiding locks. In the actor model, instead of calling methods, actors send messages to each other. The main difference between calling a method and passing a message is that a message has no return value; the sender just delegates the work to the next actor and continues without blocking.

An actor reacts to its incoming messages sequentially, one at a time, while different actors work concurrently, so the system can process many messages simultaneously. The image below shows how the actor model works.
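To make the idea of sequential message handling concrete, here is a minimal plain-Java sketch. It is not Akka's API: a single-threaded executor stands in for the actor's mailbox, and the class CounterActorSketch and its methods are hypothetical names chosen for the illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration of the actor idea; this is NOT Akka's API.
final class CounterActorSketch {
    // State owned by the "actor"; only the mailbox thread touches it.
    private int count = 0;
    // A single-threaded executor plays the role of the mailbox:
    // submitted tasks run one at a time, in submission order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Fire-and-forget, like tell(): the caller gets no return value.
    void tell(String message) {
        mailbox.execute(() -> {
            if ("increment".equals(message)) {
                count++; // no lock needed: messages are handled sequentially
            }
        });
    }

    // Queues a final task that reads the state after all earlier messages, then stops the mailbox.
    int stopAndGetCount() {
        Future<Integer> result = mailbox.submit(() -> count);
        mailbox.shutdown();
        try {
            return result.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Sends `messagesPerSender` increments from each of `senders` threads.
    static int run(int senders, int messagesPerSender) {
        CounterActorSketch actor = new CounterActorSketch();
        Thread[] threads = new Thread[senders];
        for (int i = 0; i < senders; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < messagesPerSender; j++) {
                    actor.tell("increment");
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return actor.stopAndGetCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```

The senders never block on a lock; they only enqueue messages, and the state is modified by one thread at a time.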

[Figure: serialized_timeline_invariants]

What is an Actor in Akka:

An Actor in Akka is a class that extends the akka.actor.AbstractActor class and overrides the createReceive method, in which we define the behaviour of the actor.

The createReceive method takes no arguments and returns an AbstractActor.Receive. In it we describe how the Actor should handle the different kinds of messages.

An Actor has several life cycle methods, such as preStart(), postStop(), preRestart() and postRestart(). We can override these methods in the Actor class.

Actors handle error situations gracefully. A child actor can report an error to its parent in the actor hierarchy by passing a message, or the parent can apply a supervision strategy (for example a backoff strategy) to the child, so that when the child fails the parent is notified and tries to restart it according to that strategy.

[Figure: Parents are notified when a child actor fails]

 Akka APIs:

  1. ActorSystem:

ActorSystem is the root element of an actor structure and the starting point for creating actors. We can use the ActorSystem’s actorOf() method to create an actor instance.

We can create an ActorSystem instance with the static create() method of ActorSystem.

  2. Akka Props:

Props is a configuration class that is used to create actors. We pass a Props instance to the actorOf method to create an actor.

  3. ActorRef:

ActorRef is the handle for an actor. We can obtain an ActorRef from both ActorSystem and ActorContext, as these classes implement ActorRefFactory, which produces ActorRef objects.

final ActorRef other = getContext().actorOf(Props.create(OtherActor.class), "childName");

The ActorRef class has the methods tell() and forward() to pass messages to the next actor. tell() takes the message and the sender, while forward() takes the message and the ActorContext, so that the original sender is preserved.

other.tell(message, getSelf());

other.forward(message, getContext());

  4. ActorContext:

ActorContext provides contextual information about the actor. It offers several methods such as actorOf, become, unbecome, child, children, stop and watch.

How to create an Actor Hierarchy with an example:

Consider the example below, where we need to read a host address and port from an XML file, connect to a TCP socket using that host and port, and then print the TCP data to a log.

We can have the below Actor hierarchy.

Actor hierarchy

We can start with a Java class, AkkaExample.java, that creates the ActorSystem instance and then creates the supervisor actor from the ActorSystem.

AkkaExample.java

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

public class AkkaExample {

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("akkaExample");

        // Create the top-level supervisor actor and tell it to start
        ActorRef tcpSupervisorActor =
                system.actorOf(TCPSupervisorActor.props("tcp-supervisor"), "tcp-supervisor");
        tcpSupervisorActor.tell("Start", ActorRef.noSender());
    }
}

 We have to create the TCPSupervisorActor class.

 TCPSupervisorActor.java   

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

public class TCPSupervisorActor extends AbstractActorWithTimers {

    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    private final String name;
    private final List<ActorRef> tcpActorList = new LinkedList<>();
    private final ActorRef actorRef;

    public static Props props(String name) {
        return Props.create(TCPSupervisorActor.class, () -> new TCPSupervisorActor(name));
    }

    public TCPSupervisorActor(String name) {
        this.name = name;
        this.actorRef = getSelf();
    }

    @Override
    public void preStart() {
        log.info("Tcp supervisor started");
    }

    @Override
    public void postStop() {
        log.info("Tcp data supervisor stopped");
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, s -> s.equals("Start"), s -> {
                try {
                    // Stop the existing TCP actors
                    tcpActorList.forEach(child -> getContext().stop(child));

                    // Retrieve the TCP host and port list from the config file.
                    // loadTcpConfig() is not shown here; it is assumed to throw IOException.
                    List<TCPConfig> tcpConfigList = loadTcpConfig();
                    for (TCPConfig tcpConfig : tcpConfigList) {
                        // Create the child and watch it, so that we receive Terminated when it stops
                        ActorRef tcpActor = getContext().watch(
                                getContext().actorOf(TCPActor.props(tcpConfig.getName()), tcpConfig.getName()));
                        tcpActor.tell(tcpConfig, getSelf());
                        tcpActorList.add(tcpActor);
                    }
                } catch (IOException ioe) {
                    log.error(ioe, "Could not load the TCP configuration");
                }
            })
            .match(Terminated.class, t -> {
                // Remove the terminated actor from the list and start again
                tcpActorList.remove(t.actor());
                actorRef.tell("Start", getSelf());
            })
            .matchAny(o -> System.out.println("received unknown message"))
            .build();
    }
}

In the above example, AkkaExample passes the “Start” message to TCPSupervisorActor using the tell() method, and the supervisor receives the message in its createReceive method.

After receiving the “Start” message, TCPSupervisorActor stops any existing TCPActor references using the stop() method of ActorContext. It then iterates over the TCPConfig list to create the TCPActor references, and sends each TCPActor its corresponding tcpConfig object using tell().

We have also used another method here, getContext().watch(), which is used to monitor the child actors. The monitoring actor gets a Terminated message when any of the watched child actors terminates. When the supervisor receives a Terminated message, it restarts the process by sending a “Start” message to itself.

Now we can look at the TCPActor.

TCPActor.java

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.io.Tcp;
import akka.io.Tcp.CommandFailed;
import akka.io.Tcp.Connected;
import akka.io.Tcp.ConnectionClosed;
import akka.io.Tcp.Received;
import akka.io.TcpMessage;
import akka.pattern.Backoff;
import akka.pattern.BackoffSupervisor;
import scala.concurrent.duration.Duration;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class TCPActor extends AbstractActorWithTimers {

    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    private final String name;
    private TCPConfig lastMessage;

    // Supervision strategy for the READ actor:
    // restart the child on failure after a varying delay of 3 to 30 seconds
    private final Props supervisorProps = BackoffSupervisor.props(Backoff.onFailure(
            ReadActor.props("READ_ACTOR"), "READ_ACTOR",
            Duration.create(3, TimeUnit.SECONDS), Duration.create(30, TimeUnit.SECONDS), 0.2));

    private final ActorRef readActor = getContext().actorOf(supervisorProps);

    public static Props props(String name) {
        return Props.create(TCPActor.class, () -> new TCPActor(name));
    }

    public TCPActor(String name) {
        this.name = name;
    }

    @Override
    public void preStart() {
        log.info("TCP actor started");
    }

    @Override
    public void postStop() {
        log.info("TCP actor stopped");
    }

    // Behaviour while the actor is not yet connected
    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(TCPConfig.class, n -> {
                try {
                    lastMessage = n;
                    // Pass the instance number to the ReadActor
                    readActor.tell(n.getInstance(), getSelf());
                    final InetSocketAddress remote = new InetSocketAddress(
                            InetAddress.getByName(n.getTcpIpAddress()), n.getTcpIpPort());
                    final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
                    tcp.tell(TcpMessage.connect(remote), getSelf());
                } catch (Exception e) {
                    // Retry with the same TCPConfig message after 10 seconds
                    getTimers().startSingleTimer("TCP_RETRY", lastMessage, Duration.create(10, TimeUnit.SECONDS));
                }
            })
            .match(CommandFailed.class, msg -> {
                getTimers().startSingleTimer("TCP_RETRY", lastMessage, Duration.create(10, TimeUnit.SECONDS));
            })
            .match(Connected.class, msg -> {
                // The sender of Connected is the connection actor
                getSender().tell(TcpMessage.register(getSelf()), getSelf());
                getContext().become(connected(getSender()));
            })
            .matchAny(o -> System.out.println("received unknown message"))
            .build();
    }

    /**
     * Behaviour used once the connection is established.
     *
     * @param connection the connection actor
     * @return the Receive for the connected state
     */
    private Receive connected(final ActorRef connection) {
        return receiveBuilder()
            .match(CommandFailed.class, msg -> {
                getTimers().startSingleTimer("TCP_RETRY", lastMessage, Duration.create(10, TimeUnit.SECONDS));
            })
            .match(Received.class, msg -> {
                // Convert the ByteString to a String
                String message = msg.data().decodeString(StandardCharsets.UTF_8);
                // Pass the data from the TCP connection to the ReadActor
                readActor.tell(message, getSelf());
            })
            .matchEquals("close", msg -> {
                connection.tell(TcpMessage.close(), getSelf());
            })
            .match(ConnectionClosed.class, msg -> {
                getContext().stop(getSelf());
            })
            .build();
    }
}

TCPActor extends AbstractActorWithTimers here. AbstractActorWithTimers is the same as AbstractActor, except that it also provides scheduling options: we can schedule a message to be sent to the actor itself after a specific delay using the getTimers() method. In this example, if TCPActor is not able to connect to a given InetSocketAddress, it retries with the same TCPConfig message after 10 seconds.

getTimers().startSingleTimer("TCP_RETRY", lastMessage, Duration.create(10, TimeUnit.SECONDS));
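The retry-after-a-delay pattern can be illustrated without Akka using a ScheduledExecutorService from the JDK. This is only a sketch of the idea, not how Akka timers are implemented; the class RetryTimerSketch, its methods and the simulated "connection" that fails a fixed number of times are all assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch of "retry the same request after a fixed delay"; all names are hypothetical.
final class RetryTimerSketch {

    // One connection attempt: succeeds once more than `failuresBeforeSuccess` attempts were made,
    // otherwise schedules itself again after `retryDelayMillis` (like startSingleTimer).
    private static void attempt(ScheduledExecutorService timer, AtomicInteger attempts,
                                int failuresBeforeSuccess, long retryDelayMillis,
                                CompletableFuture<Integer> done) {
        int n = attempts.incrementAndGet();
        if (n > failuresBeforeSuccess) {
            done.complete(n); // "connected"
        } else {
            timer.schedule(
                    () -> attempt(timer, attempts, failuresBeforeSuccess, retryDelayMillis, done),
                    retryDelayMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Returns how many attempts were needed until the simulated connection succeeded.
    static int attemptsUntilSuccess(int failuresBeforeSuccess, long retryDelayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<Integer> done = new CompletableFuture<>();
        AtomicInteger attempts = new AtomicInteger();
        timer.execute(() -> attempt(timer, attempts, failuresBeforeSuccess, retryDelayMillis, done));
        try {
            return done.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilSuccess(2, 10)); // prints 3
    }
}
```

In the Akka version the timer delivers the stored TCPConfig message back to the actor, so the retry goes through the same message handler as the first attempt.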

After receiving the TCPConfig message from TCPSupervisorActor, TCPActor gets a reference to the TCP manager using the line below.

final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();

Akka handles I/O resources through manager actors. The TCP manager is an actor, and when it receives the connect message below it tries to connect to the given InetSocketAddress.

tcp.tell(TcpMessage.connect(remote), getSelf());

If the connection succeeds, the manager replies with a Connected message; if it fails, the manager replies with a CommandFailed message. In the case of a connection failure, TCPActor tries to reconnect after a few seconds using getTimers().startSingleTimer().

When the connection succeeds, TCPActor must activate it by sending a Register message, created with TcpMessage.register(), to the connection actor (the sender of the Connected message) to tell it which actor should receive the data from the socket, using the line below.

getSender().tell(TcpMessage.register(getSelf()), getSelf());

TCPActor then needs to change its state from unconnected to connected using the become method, because the messages it receives in the two states are different. The messages handled in the connected state are defined in the connected method. There, TCPActor receives the data as a ByteString, converts it to a String and passes that String message to the ReadActor.
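The effect of become, swapping the message handler when the state changes, can be sketched in plain Java with a field that holds the current behaviour. This is a simplified illustration and not Akka's implementation; the class BecomeSketch, the "Connected" string message and the returned status strings are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Plain-Java sketch of behaviour switching; all names are hypothetical.
final class BecomeSketch {
    // The current behaviour; replacing it plays the role of getContext().become(...).
    private Function<Object, String> behavior = this::unconnected;

    // Every message is handled by whatever behaviour is currently installed.
    String receive(Object message) {
        return behavior.apply(message);
    }

    // Unconnected state: only the "Connected" notification is understood.
    private String unconnected(Object message) {
        if ("Connected".equals(message)) {
            behavior = this::connected; // switch to the connected state
            return "registered";
        }
        return "unhandled";
    }

    // Connected state: raw bytes are decoded to a String.
    private String connected(Object message) {
        if (message instanceof byte[]) {
            return new String((byte[]) message, StandardCharsets.UTF_8);
        }
        return "unhandled";
    }

    public static void main(String[] args) {
        BecomeSketch actor = new BecomeSketch();
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(actor.receive(data));        // prints unhandled
        System.out.println(actor.receive("Connected")); // prints registered
        System.out.println(actor.receive(data));        // prints hello
    }
}
```

The same bytes are ignored before the switch and decoded after it, which is why the two states need separate handlers.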

The ReadActor reference in TCPActor is created using the BackoffSupervisor strategy. It restarts the child actor (here ReadActor) after any failure, with a delay that grows from 3 seconds up to a maximum of 30 seconds. The maximum only caps the delay between restarts; it does not make the supervisor stop restarting the child.
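To get a feel for the delays involved, here is a small plain-Java sketch of an exponential backoff schedule. It assumes that the delay doubles with every restart starting from the minimum, is capped at the maximum, and is then stretched by a random jitter of up to the given factor (0.2 in the example above). The class BackoffDelaySketch and its method are hypothetical, and the formula is an approximation for illustration rather than a copy of Akka's code.

```java
// Plain-Java sketch of an exponential backoff schedule; names and formula are assumptions.
final class BackoffDelaySketch {

    // restartCount: number of restarts so far (0 for the first restart)
    // jitter: extra fraction in the range [0, randomFactor], passed in to keep the sketch deterministic
    static long delayMillis(int restartCount, long minMillis, long maxMillis, double jitter) {
        double exponential = minMillis * Math.pow(2, restartCount); // 3s, 6s, 12s, ...
        double capped = Math.min(maxMillis, exponential);           // never above the maximum
        return Math.round(capped * (1.0 + jitter));
    }

    public static void main(String[] args) {
        // Without jitter this prints 3000, 6000, 12000, 24000, 30000, 30000
        for (int restart = 0; restart <= 5; restart++) {
            System.out.println(delayMillis(restart, 3000, 30000, 0.0));
        }
    }
}
```

The jitter spreads restarts out so that many failing actors do not all retry at exactly the same moment.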

The last Actor is ReadActor. This actor accepts any message of type String and logs it.

import akka.actor.AbstractActorWithTimers;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class ReadActor extends AbstractActorWithTimers {

    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    private final String name;

    public static Props props(String name) {
        return Props.create(ReadActor.class, () -> new ReadActor(name));
    }

    public ReadActor(String name) {
        this.name = name;
    }

    @Override
    public void preStart() {
        log.info("Read started");
    }

    @Override
    public void postStop() {
        log.info("Read stopped");
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, b -> {
                // Code for writing the messages to file
            })
            .matchAny(o -> System.out.println("received unknown message"))
            .build();
    }
}

 Test Class:

Akka has the akka-testkit module for test support. TestKit contains several tools that make testing actors easy. It contains an actor called testActor, which is used to examine the messages sent to it through various expectMsg assertions. Given below are some of the assertion methods in Akka TestKit.

1. public <T> T expectMsgEquals(FiniteDuration max, T msg):

            The given message should be received within the given time span.

2. public <T> T expectMsgPF(Duration max, String hint, Function<Object, T> f):

            A message should be received within the given time frame, and the given function should be defined for that message.

3. public <T> T expectMsgClass(FiniteDuration max, Class<T> c):

            An object that is an instance of the given class should be received within the given time frame.

4. public Object expectMsgAnyOf(Duration max, Object... msg):

            The received message should be equal to at least one of the passed reference objects.

5. public <T> T expectMsgAnyClassOf(FiniteDuration max, Class<? extends T>... c):

            The received message should be an instance of at least one of the passed classes.

6. public void expectNoMsg(FiniteDuration max):

            Expecting no message within the given time.

7. public List<Object> receiveN(int n, FiniteDuration max):

            Expecting n messages within the given time span.

Let’s create a simple test class for the TCPSupervisorActor.

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class NmeaProtoSupervisorTest {

    private static ActorSystem system;
    private static TestKit testKit;
    private static ActorRef subject;

    @BeforeClass
    public static void setup() {
        system = ActorSystem.create();
        testKit = new TestKit(system);
        subject = system.actorOf(Props.create(TCPSupervisorActor.class, "Test"));
    }

    /**
     * Tests the behaviour of the TCPSupervisorActor when a Start message
     * is sent to it.
     */
    @Test
    public void testCreateReceiveBonVoyage() {
        subject.tell("Start", testKit.getRef());
        testKit.expectMsgClass(TCPConfig.class);
    }

    @AfterClass
    public static void teardown() {
        TestKit.shutdownActorSystem(system);
        system = null;
    }
}

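The TestKit object is initialized in the @BeforeClass method. Here the subject actor is the TCPSupervisorActor. After the tests, we shut down the actor system in the @AfterClass method.

Here we test the behaviour of TCPSupervisorActor when it receives a “Start” message, expecting a message of type TCPConfig. Note that, as written above, the supervisor sends each TCPConfig to its child TCPActor and not to the sender of “Start”, so for this assertion to pass the TestKit actor has to take the place of the child that receives the TCPConfig.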
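The idea behind an assertion such as expectMsgClass can be sketched in plain Java: a probe collects the messages sent to it in a queue, and the assertion waits up to a timeout for the next message and checks its type. This is a simplified illustration, not the TestKit implementation; the class ProbeSketch and its methods are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java sketch of a test probe; this is NOT the Akka TestKit API.
final class ProbeSketch {
    // Messages sent to the probe wait here until an assertion consumes them.
    private final BlockingQueue<Object> inbox = new LinkedBlockingQueue<>();

    void tell(Object message) {
        inbox.add(message);
    }

    // Waits up to maxMillis for the next message and checks that it has the expected type.
    <T> T expectMsgClass(long maxMillis, Class<T> type) {
        final Object message;
        try {
            message = inbox.poll(maxMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting for " + type.getName());
        }
        if (message == null) {
            throw new AssertionError("timeout while waiting for " + type.getName());
        }
        if (!type.isInstance(message)) {
            throw new AssertionError("expected " + type.getName()
                    + " but got " + message.getClass().getName());
        }
        return type.cast(message);
    }

    public static void main(String[] args) {
        ProbeSketch probe = new ProbeSketch();
        probe.tell("Start");
        System.out.println(probe.expectMsgClass(100, String.class)); // prints Start
    }
}
```

A message of the wrong type, or no message within the timeout, makes the assertion fail, which is the behaviour a test relies on.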

Conclusion:

In this introduction we have learned how to create an Actor hierarchy and test classes in Akka. Hopefully it helps you understand the Akka framework and encourages you to read more about other Akka modules such as Cluster, Streams and HTTP.
