Scaling Event Bus

Lukasz Lenart
SoftwareMill Tech Blog
5 min read · Jun 29, 2020


In my previous post I showed how to implement an event bus using Akka and how to handle incoming and outgoing messages using WebSockets. Now it’s time to feed the event bus with some internal events and to support scaling it across multiple instances.

Photo by Patrick McManaman on Unsplash

The Publisher

Instead of depending directly on the event bus, I will introduce a publisher: an intermediate component that other components can use to publish events into the event bus.

This additional abstraction makes the dependency easy to mock in unit tests and lets me replace the underlying logic without changing the users of the component.

    class DomainEventPublisher(eventBus: DomainEventBus) {

      // Forward every event to the underlying event bus.
      def publish(events: DomainEvent*): Unit = {
        events.foreach { event =>
          eventBus.publish(event)
        }
      }

    }

As you can see, there is nothing extraordinary in this component: it just takes the events and publishes them into the event bus.
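To illustrate the testing benefit mentioned above, here is a minimal sketch. It assumes the publisher is hidden behind a small trait, which the original code does not show. The names `EventPublisher`, `RecordingPublisher` and `FolderService` are hypothetical, and `DomainEvent` is reduced to a single case with an assumed `String` identifier.

```scala
// Hypothetical, simplified stand-in for the real DomainEvent hierarchy.
sealed trait DomainEvent
object DomainEvent {
  final case class FolderUpdated(folderId: String) extends DomainEvent
}

// Assumed abstraction: callers depend on this trait, not on the event bus.
trait EventPublisher {
  def publish(events: DomainEvent*): Unit
}

// Test double that records events instead of touching a real bus.
final class RecordingPublisher extends EventPublisher {
  private var recorded = Vector.empty[DomainEvent]
  override def publish(events: DomainEvent*): Unit = recorded ++= events
  def published: Vector[DomainEvent] = recorded
}

// Hypothetical component under test; it only knows the trait.
final class FolderService(publisher: EventPublisher) {
  def rename(folderId: String): Unit =
    publisher.publish(DomainEvent.FolderUpdated(folderId))
}

object PublisherStubExample extends App {
  val publisher = new RecordingPublisher
  new FolderService(publisher).rename("folder-1")
  // The test can inspect what was published without any Akka machinery.
  assert(publisher.published == Vector(DomainEvent.FolderUpdated("folder-1")))
}
```

A mocking library would work just as well; the point is only that the component never sees the event bus directly.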

Now you can use it in your logic to spread events across the system:

    import akka.actor.Actor
    import scala.util.Success

    class CommandActor(
      copyService: CopyService,
      publisher: DomainEventPublisher
    ) extends Actor {

      override def receive: Receive = {
        case cmd: CopyEntity =>
          copyService
            .copyEntity(cmd)
            .onComplete {
              // Once the copy succeeds, announce that the destination folder changed.
              case Success(CopyDone) =>
                publisher
                  .publish(
                    DomainEvent
                      .FolderUpdated(cmd.destinationId)
                  )
              ...

In my case I notify users about changes to a folder they are watching. When another user performs a copy operation, the watchers are notified once the copy is done, and the folder view is updated automatically without refreshing the web page.

This works perfectly fine when there is just one instance of the system and all users are connected to it. The solution breaks down, however, once the system is scaled onto multiple nodes, because an event published on one instance never reaches users connected to another. These days almost every large system runs in the cloud, where scaling instances up and down is routine. To support scaling we must use a messaging system.
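The limitation can be shown with a tiny in-memory model. This is only a sketch: `Instance` is a hypothetical stand-in for one application node with its own local bus, and events are plain strings rather than real domain events.

```scala
import scala.collection.mutable

// Hypothetical model of one application instance with a purely local event bus.
final class Instance(val name: String) {
  private val subscribers = mutable.ListBuffer.empty[String => Unit]

  def subscribe(handler: String => Unit): Unit = {
    subscribers += handler
    ()
  }

  // Only handlers registered on THIS instance are called.
  def publish(event: String): Unit = subscribers.foreach(handler => handler(event))
}

object LocalBusLimitation extends App {
  val nodeA = new Instance("node-a")
  val nodeB = new Instance("node-b")

  val seenOnA = mutable.ListBuffer.empty[String]
  val seenOnB = mutable.ListBuffer.empty[String]
  nodeA.subscribe { event => seenOnA += event; () }
  nodeB.subscribe { event => seenOnB += event; () }

  // The copy operation happens to be handled by node A.
  nodeA.publish("FolderUpdated(42)")

  assert(seenOnA.toList == List("FolderUpdated(42)"))
  // A user whose WebSocket is attached to node B never hears about it.
  assert(seenOnB.isEmpty)
}
```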

The messaging system

Spreading events across multiple nodes makes a messaging system such as Apache Kafka or RabbitMQ (or any other) the obvious choice. That is true, but only partly ;-) Not every messaging system fits the requirements, and some would force you into approaches that are not recommended.

In my case, I used RabbitMQ as I was more familiar with it.

The messaging system must accept messages from many instances and deliver each of them to every instance of the same application. RabbitMQ allows this with an exchange and multiple dynamic queues. A dynamic queue is created when an instance starts (scale up), is deleted once the instance goes down (scale down), and is exclusive to that instance, so no other client can use it. The diagram below shows this clearly:

The exchange must be of type fanout, and you should consider setting the durable option to true so that it survives a broker restart.
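The fanout behaviour described above can be modelled in a few lines of plain Scala. This is an in-memory illustration only, not RabbitMQ: `FanoutExchange` and its methods are hypothetical names, and messages are plain strings.

```scala
import scala.collection.mutable

// In-memory model of a fanout exchange with one queue per instance.
final class FanoutExchange {
  private val queues = mutable.Map.empty[String, mutable.Queue[String]]

  // Scale up: an instance declares its own queue and binds it to the exchange.
  def bind(queueName: String): Unit = {
    queues.getOrElseUpdate(queueName, mutable.Queue.empty[String])
    ()
  }

  // Scale down: the queue disappears together with its instance.
  def unbind(queueName: String): Unit = {
    queues.remove(queueName)
    ()
  }

  // Fanout: every bound queue receives its own copy of the message.
  def publish(message: String): Unit =
    queues.values.foreach(queue => queue.enqueue(message))

  // Read and remove everything currently waiting in one queue.
  def drain(queueName: String): List[String] =
    queues.get(queueName).map(q => q.dequeueAll(_ => true).toList).getOrElse(Nil)
}

object FanoutExample extends App {
  val exchange = new FanoutExchange
  exchange.bind("instance-id-a")
  exchange.bind("instance-id-b")

  exchange.publish("FolderUpdated(42)")
  assert(exchange.drain("instance-id-a") == List("FolderUpdated(42)"))
  assert(exchange.drain("instance-id-b") == List("FolderUpdated(42)"))

  // Instance B goes away; later messages reach only the remaining queue.
  exchange.unbind("instance-id-b")
  exchange.publish("FolderUpdated(43)")
  assert(exchange.drain("instance-id-a") == List("FolderUpdated(43)"))
  assert(exchange.drain("instance-id-b") == Nil)
}
```

This is why a single shared queue would not fit here: with competing consumers each message would reach only one instance, whereas every instance needs its own copy.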

That’s the whole configuration on the RabbitMQ side; time to configure Alpakka.

Spreading messages

With the exchange ready, I can use it to spread messages instead of putting them directly into the Akka Event Bus as I did at the beginning. This means I need to extend the publisher and use AMQP to forward all the messages to RabbitMQ.

First, I must define a Sink[ByteString, _] and connect it to the exchange:

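    import akka.stream.alpakka.amqp._
    import akka.stream.alpakka.amqp.scaladsl.AmqpSink

    val provider = AmqpUriConnectionProvider(s"amqp://$host:$port")
    val name = "domain.events"
    // Durable fanout exchange: every bound queue gets a copy of each message.
    val exchange = ExchangeDeclaration(name, "fanout").withDurable(true)

    val sink = AmqpSink.simple(
      AmqpWriteSettings(provider)
        .withExchange(name)
        .withDeclaration(exchange)
    )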

Now I can use the sink in my publisher to route events into the exchange:

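    import akka.Done
    import akka.stream.Materializer
    import akka.stream.scaladsl.{Sink, Source}
    import akka.util.ByteString
    import io.circe.syntax._
    import org.slf4j.{Logger, LoggerFactory}
    import scala.concurrent.Future
    import scala.util.control.NonFatal

    // AssetEventCirceDecoders comes from the application code (not shown here).
    class DomainEventPublisher(sink: Sink[ByteString, Future[Done]])(
      implicit materializer: Materializer
    ) extends AssetEventCirceDecoders {

      private val log: Logger = LoggerFactory.getLogger(getClass)

      def publish(event: DomainEvent): Future[Done] = {
        import scala.concurrent.ExecutionContext.Implicits.global

        log
          .debug(s"Publishing event $event into AMQP")
        // Run a one-element stream that serialises the event and writes it to the exchange.
        Source
          .single(event)
          .map(toByteString)
          .runWith(sink)
          .recoverWith {
            // Log the failure, but keep the returned Future failed for the caller.
            case NonFatal(t) =>
              log.error(s"Cannot add event $event to queue", t)
              Future.failed(t)
          }
      }

      private[this] def toByteString(event: DomainEvent): ByteString = {
        ByteString(event.asJson.noSpaces)
      }

    }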

In the example code above I use Circe to serialise events (event.asJson...) into strings, and these strings are then published to the exchange.
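The Circe codecs themselves are not shown in this post, so here is a minimal sketch of what they might look like. Everything in it is an assumption made for illustration: the single `FolderUpdated` case with a `String` id, the `type` discriminator field, and the names `DomainEventCodecs` and `CirceRoundTrip`. It needs circe-core and circe-parser on the classpath.

```scala
import io.circe.{Decoder, DecodingFailure, Encoder, Error, Json}
import io.circe.parser.parse
import io.circe.syntax._

// Hypothetical, simplified event hierarchy.
sealed trait DomainEvent
object DomainEvent {
  final case class FolderUpdated(folderId: String) extends DomainEvent
}

// Hand-written codecs; the "type" field is an assumed discriminator.
object DomainEventCodecs {
  implicit val encoder: Encoder[DomainEvent] = Encoder.instance {
    case DomainEvent.FolderUpdated(folderId) =>
      Json.obj(
        "type"     -> Json.fromString("FolderUpdated"),
        "folderId" -> Json.fromString(folderId)
      )
  }

  implicit val decoder: Decoder[DomainEvent] = Decoder.instance { cursor =>
    cursor.get[String]("type").flatMap {
      case "FolderUpdated" =>
        cursor.get[String]("folderId").map(id => DomainEvent.FolderUpdated(id): DomainEvent)
      case other =>
        Left(DecodingFailure(s"Unknown event type: $other", cursor.history))
    }
  }
}

object CirceRoundTrip {
  import DomainEventCodecs._

  // What the publisher does before wrapping the string in a ByteString.
  def encode(event: DomainEvent): String = event.asJson.noSpaces

  // The mirror image, which any consumer of the exchange would need.
  def decode(raw: String): Either[Error, DomainEvent] =
    for {
      json  <- parse(raw)
      event <- json.as[DomainEvent]
    } yield event
}
```

Encoding an event and decoding the result should give back an equal value, while malformed input or an unknown `type` ends up in the `Left` branch instead of throwing.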

Consuming messages

Once messages have been delivered to the exchange, it’s time to consume them. As I said earlier, each instance has its own queue to read messages from. This queue is bound to the exchange so that it receives all the messages. To do this I must declare a Source[CommittableReadResult, _]:

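    import akka.NotUsed
    import akka.stream.alpakka.amqp._
    import akka.stream.alpakka.amqp.scaladsl.{AmqpSource, CommittableReadResult}
    import akka.stream.scaladsl.{RestartSource, Source}
    import scala.collection.immutable
    import scala.concurrent.duration._

    val provider = AmqpUriConnectionProvider(s"amqp://$host:$port")
    val queueName = "instance-id-<random-string>"

    // Exclusive, auto-delete queue owned by this instance only.
    val queue = QueueDeclaration(queueName)
      .withExclusive(true)
      .withAutoDelete(true)

    // Bind the queue to the fanout exchange declared by the publisher.
    val binding = BindingDeclaration(queueName, "domain.events")

    val settings = NamedQueueSourceSettings(provider, queueName)
      .withDeclarations(immutable.Seq(queue, binding))

    // Restart the source with backoff (2s min, 10s max, 10% jitter) if it fails.
    val source: Source[CommittableReadResult, NotUsed] =
      RestartSource.withBackoff(2.seconds, 10.seconds, 0.1) { () =>
        AmqpSource.committableSource(
          settings,
          bufferSize = 10
        )
      }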

First of all, each queue must have a unique name; I’m using a prefix plus a random string (it can be a UUID). The queue must also be exclusive and auto-delete, as mentioned at the beginning. It is also important to declare the binding, which connects the queue to the exchange.
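One possible way to build such a name is a prefix plus a random UUID. The helper below is a hypothetical sketch, not code from the application; only the `instance-id` prefix is taken from the example above.

```scala
import java.util.UUID

object QueueNames {
  // Hypothetical helper: a fresh name on every call, so every instance
  // that starts up gets its own queue.
  def forInstance(prefix: String = "instance-id"): String =
    s"$prefix-${UUID.randomUUID()}"
}

object QueueNamesExample extends App {
  val first  = QueueNames.forInstance()
  val second = QueueNames.forInstance()
  assert(first.startsWith("instance-id-"))
  assert(first != second)
}
```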

With the source declared, I can start consuming messages and pushing them into the Akka Event Bus again:

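    import akka.{Done, NotUsed}
    import akka.stream.Materializer
    import akka.stream.alpakka.amqp.scaladsl.CommittableReadResult
    import akka.stream.scaladsl.{Sink, Source}
    import io.circe.Error
    import io.circe.parser.parse
    import org.slf4j.{Logger, LoggerFactory}
    import scala.concurrent.Future

    class EventPropagator private (
      eventbus: DomainEventBus,
      eventSource: Source[CommittableReadResult, NotUsed]
    )(implicit materializer: Materializer)
        extends DomainEventCirceDecoders {

      private val log: Logger = LoggerFactory.getLogger(getClass)

      def run(): Future[Done] = {
        eventSource
          // ack() and nack() return a Future, so mapAsync waits for the broker
          // confirmation instead of discarding it as a plain map would.
          .mapAsync(parallelism = 1) { message =>
            decodeMessage(message) match {
              case Left(error) =>
                log.error(
                  s"Got error trying to decode message: $message",
                  error
                )
                // An undecodable message will not improve on retry; drop it.
                message.nack(requeue = false)

              case Right(event) =>
                log.debug(s"Publishing event into eventbus: $event")
                eventbus.publish(event)
                message.ack()
            }

          }
          .runWith(Sink.ignore)
      }

      // Parse the raw payload as JSON and decode it into a domain event.
      private[this] def decodeMessage(
        readResult: CommittableReadResult
      ): Either[Error, DomainEvent] = {
        val message = readResult.message.bytes.utf8String
        for {
          json <- parse(message)
          event <- json.as[DomainEvent]
        } yield event
      }

    }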

Again I use Circe, this time to decode the incoming string into a proper domain object. With the event decoded, I can publish it into the Event Bus to notify all handlers.

In short, this solution adds two connectors: one propagates messages to RabbitMQ, the other reads messages from a queue and publishes them into the Event Bus. Now, when the application is scaled up or down, all instances are notified about the events. It doesn’t matter where an event originated or which instance a user is connected to: everything is delivered as if there were just one instance.


OSS enthusiast, ASF committer, Apache Struts lead, developer, husband and father and biker :-)