Implement Event bus using Akka

Lukasz Lenart
SoftwareMill Tech Blog
9 min readMay 22, 2020

--

You’ve probably read such posts many times and you are wondering what more I can tell you about this topic. Maybe not much but my idea for this post was inspired by a real case scenario I had implemented in my project. I will start with events going in/out with Websockets which are then propagated to Akka actors. In the following post I will add RabbitMQ to support multiple pods/instances of the app.

Photo by Joey Kyber on Unsplash

Background

In a project I’m working in we had to add support to allow users watch a folder for changes (we built a virtual file system over the top of Dgraph). Those changes then need to be propagated to the users whoever did the change — yet another user, a background running process, etc. So, user subscribes to a folder and gets notified via websocket, then we can re-fetch the folder’s data again if needed. You can see the same mechanism when working with Github Pull Requests, where you need to click a Refresh button to reload changed files.

AkkaHttp vs Websockets

AkkaHttp provides a support for server-side Websockets and it uses AkkaStreams with AkkaActors to do the job — a mix of all of the different technologies. Yet, do not be afraid of the pot I will explain you how to tie each of the pieces to implement the event bus.

Let’s start with defining a websocket endpoint using AkkaHttp:

trait WebsocketEventsRoute {

def system: ActorSystem

final def websocketEventsRoute: Route = {
path("events") {
val flow = ClientHandlerFlow.websocketFlow()
handleWebSocketMessages(flow)
}
}

}

You should be familiar with how to create an endpoint in AkkaHttp, so let’s focus on Websockets. First, we need an AkkaStream Flow which will be used to handle incoming messages and sending back responses/events — they can be propagated from the app’s internals, it doesn’t have to be a response to the incoming websocket message.

The Flow need to be passed to a directive handleWebSocketMessages() from AkkaHttp and we are ready to connect using websockets.

The flow

Now I must implement the ClientHandlerFlow#websocketFlow() function and prepare the flow used in the websocket communication. All websocket messages are passed through this flow which is backed by an actor — each websocket connection will create a dedicated proxy actor to handle the connection. User can open multiple connections and each will use a dedicated proxy actor and each can be closed separately:

def websocketFlow(): Flow[Message, Message, NotUsed] = {
val handler = system.actorOf(ClientHandlerActor.props(eventBus))

val incomingMessages: Sink[Message, NotUsed] =
Flow[Message]
.collect {
case TextMessage.Strict(text) =>
log.debug(
s"Transforming incoming msg [$text] into domain msg"
)
WebsocketMsg.Incoming(text)
}
.to(Sink.actorRef[WebsocketMsg.Incoming](handler, Terminated))
val outgoingMessages: Source[Message, NotUsed] =
Source
.actorRef[WebsocketMsg.Outgoing](10, OverflowStrategy.fail)
.mapMaterializedValue { userActor =>
handler ! WebsocketMsg.Connected(userActor)
NotUsed
}
.collect {
case outMsg: WebsocketMsg.Outgoing =>
log.debug(
s"Transforming domain msg [$outMsg] to websocket msg"
)
TextMessage(outMsg.text)
}
Flow.fromSinkAndSourceCoupled(incomingMessages, outgoingMessages)
}

First, I must create a handler which is an Akka Actor and it will be used to handle incoming messages and sending back events. I will also use this handler to handle domain messages — you can use another actor to handle the business logic but that depends on your needs.

Now, I can create a Sink for incomingMessages which collects all the Websocket TextMessages and transforms them into an intermediate form — WebsocketMsg.Incoming(String) which then can be used to create a proper domain object. Basically I didn’t want to match on String directly in my handler.

Next, I created a Source to handle outgoingMessages which has two responsibilities:

  • first notify the handler about a new connection via handler ! WebsocketMsg.Connected(ActorRef) — this is needed to properly register this communication channel with the Event bus
  • and then by collecting intermediate outgoing messages and transforming them into Websocket TextMessages. Please be aware that Source.actorRef creates a proxy actor used to handle websocket communication. This actor receives the incoming raw websocket messages and transfers them to the handler using the flow.

I think the flow is easy:

One line in this function requires additional explanation:

.to(Sink.actorRef[WebsocketMsg.Incoming](handler, Terminated))

the Terminated message will be sent to the handler once the connection is closed, I will use it to unregister the channel from the Event bus.

The handler

Now is time to implement my handler which will be responsible for receiving messages and sending them out. The handler is implemented as an Akka actor:

private class ClientActor extends Actor {

override def receive: Receive = {
...
}
}object ClientActor {
def props: Props = Props(new ClientActor)
}

Nothing extraordinary if you know how Classic Akka Actors are working, basically we must define a receive method to handle different messages. In my case I’m going to handle two kinds of messages:

  • WebsocketMsg — wrapper around raw TextMessage websocket class
  • Domain — domain specific events

In/Out

It’s time to handle incoming websocket message and outgoing one. To achieve this logic I will introduce an intermediate format, that I’ve been already using in the above Flow definition:

private sealed trait WebsocketMsg

private object WebsocketMsg {

case class Connected(userActor: ActorRef) extends WebsocketMsg
case class Incoming(text: String) extends WebsocketMsg
case class Outgoing(text: String) extends WebsocketMsg
// a helper method when you want to translate your outgoing
// messages into JSON using Circe
def outgoing[T: Encoder](data: T): Outgoing = {
Outgoing(data.asJson.noSpaces)
}
}

Having this ready, I can extend the handler to properly handle in&out:

override def receive: Receive = onMessage(Set.empty)

private def onMessage(userActors: Set[ActorRef]): Receive = {
case WebsocketMsg.Connected(userActor) =>
context.watch(userActor)
context.become(onMessage(userActors + userActor))

case Terminated(userActor) =>
val openUserActors = userActors - userActor
if (openUserActors.isEmpty) {
self ! PoisonPill
} else {
context.become(onMessage(openUserActors))
}

case WebsocketMsg.Incoming(text) =>
// an example how to use Circe to convert incoming message
// into a domain object
parse(text) match {
case Left(error) =>
self ! Domain.Error(s"Cannot parse $text as JSON")

case Right(value) =>
value.as[Domain] match {
case Left(error) =>
self ! Domain.Error(s"Cannot parse $value as Domain")

case Right(event) =>
self ! event
}
}

case outgoing: WebsocketMsg.Outgoing =>
userActors.foreach(_ ! outgoing)
case ...: Domain =>
// do the business logic

This looks complicated but to be honest is very easy once you understood the flow.

First, I have used Become/Unbecome approach to avoid keeping state in the actor. State, which contains open user websocket channels, is passed to the handler each time it has changed. And the state can change in two cases:

  • user opened a websocket connection — thus is represented by WebsocketMsg.Connected,
  • and when user closed the connection (or it was caused by a network failure, etc.), thus is represented by Terminated. You can notice that I have used the same messages when I was defining the flow.

So now, once a user opened a new connection the handler will receive a message with a reference to the proxy actor created by AkkaHttp to handle the connection. Now I must observe this actor to get notified once it will be terminated by using context.watch() and I’m adding it to the state as a downstream channel for sending messages back to the user.

When the handler receives Terminated(ActorRef) it removes that channel from the state to avoid sending messages to the closed connection. Also if this was the last open channel for that user, I also kill the handler to keep memory clean and avoid leaks — self ! PoisonPill.

What’s left is the part to handle in/out messages, this is happening with two messages also defined in the flow: WebsocketMsg.Incoming and .Outgoing.

I added an example how to transfer an incoming text into a Domain object using Circe — you can use a JSON library of your choice. Please notice that I’m checking if the message is a proper JSON and next if the JSON is a proper structure to map it to the Domain object. If something is wrong I’m sending back an error using Domain.Error. There is nocase for such message but it should look like this:

case error: Domain.Error =>
self ! WebsocketMsg.outgoing(error)

I’m transforming the domain object into a websocket message and send it out.

The bus

All the above logic is fine till you have one user and you want just communicate with that one user at a given time. But what to do if you have multiple users and you want to notify them all about some changes in the app?

The answer is — use the Event bus!

Classic AkkaActors provides a very neat support for implementing your own event bus. All you have to do is to implement one or maybe two traits. Also you must have your domain model to be handled by the event bus. See the example below:

class DomainEventBus extends EventBus with LookupClassification {

override type Event = Domain
override type Classifier = String
override type Subscriber = ActorRef

override protected def mapSize(): Int = 100 // use a proper hint

override protected def compareSubscribers(
a: ActorRef, b: ActorRef
): Int = a.compareTo(b)

override protected def classify(evt: Domain): Topic = evt.topic

override protected def publish(
evt: Domain, subscriber: ActorRef
): Unit = {
subscriber ! evt
}

}

As you may notice there is nothing complicated in this code. You must define a few types which are quite self-explanatory plus add a very simple logic.

Now, to use the event bus I must pass it to the handler which can be done by extending how the actor is constructed:

class ClientHandlerActor(eventbus: DomainEventBus) extends Actor {

override def receive: Receive = ...
}object ClientHandlerActor {
def props(eventbus: DomainEventBus): Props = {
Props(new ClientActor(eventbus)
}
}

And final step in using the event bus is to subscribe actors, in my case handlers, to be notified about events for a given Topic. As mentioned in the background, I had to built a system to monitor changes in a virtual file system and once there was a change propagate it to the all subscribers of the folder.

To achieve that I came up with the following domain model:

sealed trait EventType

object EventType extends Enum[EventType] with CirceEnum[EventType] {

val values: collection.immutable.IndexedSeq[EventType] =
findValues

case object FolderSubscribe extends EventType
case object FolderUnsubscribe extends EventType
case object FolderUpdated extends EventType
case object Error extends EventType
}

sealed trait Domain {
def topic: String
def eventType: EventType
}

object Domain {
case class FolderSubscribe(folderId: UUID) extends AssetEvent {
override val topic: String = IgnoreTopic
override val eventType: EventType = EventType.FolderSubscribe
}

case class FolderUnsubscribe(folderId: UUID) extends AssetEvent {
override val topic: String = IgnoreTopic
override val eventType: EventType = EventType.FolderUnsubscribe
}

case class FolderUpdated(folderId: UUID) extends AssetEvent {
override val topic: String = s"folder-$folderId"
override val eventType: EventType = EventType.FolderUpdated
}

case class Error(userId: String, message: String)
extends AssetEvent {
override val topic: String = s"user-$userId"
override val eventType: EventType = EventType.Error
}
}

First I have used Enumeratum library to define a few enums (I don’t like enum in Scala — they are basically broken) representing different types of events.

Next, I implemented my domain with message to allow user subscribe a folder or unsubscribe from watching the folder. Also I added a small class to convey errors back to the given user. Please see how different topics have been defined for each of these messages as this is the clue how events are routed by the event bus to different handlers.

Finally I can extend the handler to support sub/unsub and errors:

case event: Domain.FolderSubscribe =>
if (!eventbus.subscribe(self, event.topic) {
log.error(s"Cannot subscribe $self to topic ${event.topic}")
}

case event: Doamin.FolderUnsubscribe =>
if (!eventbus.unsubscribe(self, event.topic) {
log.error(s"Cannot unsubscribe $self from topic ${event.topic}")
}

case event: Domain =>
eventbus.publish(event)

I think there is nothing to explain, if the incoming message was FolderSubscribe I will subscribe the handler — self to receive events related to the topic represented by this message. And opposite, once FolderUnsubscribe was received I will unsubscribe handler from watching events related to that folder.

And at the end I’m handling any other events and publishing them directly to the event bus. In this case Error and FolderUpdated which will be propagated either to the given user (because of topic s”user-$userId"), or to all users watching the folder (because of topic s"folder-$folderId").

Final word

Implementing a websocket communication and propagating events using an event bus isn’t that hard. It can be a cumbersome process on the beginning, but once you get familiar with the flow it will be very easy.

There is one drawback with this approach: it works perfectly fine when you are running a one instance of your app. If you start scaling your app to a few instances (eg.: using Kubernetes and scaling onto multiple pods) your events get lost, because user can open a websocket connection to a one instance but events are happening on the other one. There is no connection between them.

To bridge the gap in the following post I will introduce RabbitMQ to solve that problem, keep tied!

--

--

OSS enthusiast, ASF committer, Apache Struts lead, developer, husband and father and biker :-)