A story about streaming unique batches in Monix

Kasper Kondzielski
SoftwareMill Tech Blog
5 min read, May 11, 2020


Photo by Erik Mclean on Unsplash

Recently I got quite a challenging task in my project, which I thought might be interesting to share. Before we jump into the details, let me briefly sketch the context.

The purpose of the service is to read withdrawals from a database table and execute them by sending them to an external system. The task is to batch these executions into packages no bigger than config.batchSize. Furthermore, if we can’t fill a batch of config.batchSize within config.timeWindow, we should send whatever we have accumulated so far, if anything. As withdrawals can come in different currencies, we should batch them only within the same currency.

Let’s start by putting the requirements into a friendlier form.

Requirements

  • periodic reads of new withdrawals from the table
  • grouping withdrawals by currency
  • batching withdrawals by time and count within a single currency

I don’t know about you, but whenever I have to execute some operation periodically, I think of reactive streams, since they are very handy in such use cases. In this post I will use Monix (whose Observable interoperates with the Reactive Streams standard), but the solution should look very similar in other implementations.

Before we dig into reactive operators, let’s first define the dependencies of our component.

Let’s dive into the code

First, let’s create a withdrawal repository to access our withdrawal table.

Then, let’s add a method that finds all withdrawals in the waiting state and returns them ordered by a serial id. That serial id is monotonic: the database assigns a new one to each withdrawal when it is inserted. Later, you will see why this is very important.
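Since the original listing is not shown, here is a minimal sketch of the model and the repository. The Withdrawal fields (serialId, currency as a plain String, amount) and the method name findAllWaiting are hypothetical, and so is the choice to return rows newest first (descending serial id). The in-memory implementation is only a stand-in for experiments.

```scala
import monix.eval.Task

// Hypothetical model: field names and types are assumptions for this sketch.
final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

trait WithdrawalRepository {
  // All withdrawals in the waiting state, ordered by the database-assigned
  // serial id, newest first (the descending order is an assumption).
  def findAllWaiting: Task[List[Withdrawal]]
}

// In-memory stand-in; a real implementation would run a query along the lines of
// SELECT ... WHERE status = 'WAITING' ORDER BY serial_id DESC.
final class InMemoryWithdrawalRepository(initial: List[Withdrawal]) extends WithdrawalRepository {
  private var rows: List[Withdrawal] = initial

  def insert(w: Withdrawal): Unit = synchronized { rows = w :: rows }

  def findAllWaiting: Task[List[Withdrawal]] =
    Task.eval(synchronized { rows.sortBy(w => -w.serialId) })
}
```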

Reading from a database is obviously a side effect, which is asynchronous and can fail, so modeling it as a Task seems reasonable. Next, we need to define the external system’s API. Let’s say it looks like this:
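A sketch of that simplified API, assuming a single method (hypothetically named execute) that takes one batch and returns Task[Unit]. The recording stub is a hypothetical helper for experiments and tests, not part of any real system.

```scala
import monix.eval.Task

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

// Simplified API: success carries no useful value, and we assume calls
// never fail. The method name `execute` is hypothetical.
trait ExternalSystem {
  def execute(batch: Seq[Withdrawal]): Task[Unit]
}

// Recording stub that remembers every batch it receives (hypothetical helper).
final class RecordingExternalSystem extends ExternalSystem {
  private var sent: Vector[Seq[Withdrawal]] = Vector.empty

  def executed: Vector[Seq[Withdrawal]] = synchronized(sent)

  def execute(batch: Seq[Withdrawal]): Task[Unit] =
    Task.eval(synchronized { sent = sent :+ batch })
}
```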

I simplified the return type for the purpose of this article. It is an asynchronous effect which doesn’t return anything useful on success, and, so as not to overcomplicate things, we won’t bother with error handling and will assume that the external system always succeeds.

Having defined all the dependencies, let’s write our WithdrawalBatchingExecutor:

First, we need to start fetching withdrawals from our repository periodically. We can do that by creating a stream that emits a new value at a specified time interval and then calling our repository on each item.
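A sketch of this first step. Config and its pollInterval field are hypothetical (the article only mentions config.batchSize and config.timeWindow), and the model and repository trait are restated so the snippet compiles on its own. mapEval runs the repository Task for each tick.

```scala
import scala.concurrent.duration._
import monix.eval.Task
import monix.reactive.Observable

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)
trait WithdrawalRepository { def findAllWaiting: Task[List[Withdrawal]] }

// Hypothetical configuration holder; pollInterval is an assumed name.
final case class Config(pollInterval: FiniteDuration, batchSize: Int, timeWindow: FiniteDuration)

final class WithdrawalBatchingExecutor(repository: WithdrawalRepository, config: Config) {
  // Each tick triggers one repository read; mapEval waits for the Task to
  // finish before acknowledging the tick.
  val snapshots: Observable[List[Withdrawal]] =
    Observable
      .intervalWithFixedDelay(config.pollInterval)
      .mapEval(_ => repository.findAllWaiting)
}
```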

I used intervalWithFixedDelay here because I wanted successive computations to be separated by a fixed amount of time. That means the delay is counted from the moment the previous computation has finished. There is also the intervalAtFixedRate operator which, as the name suggests, tries to emit items at a given rate by subtracting the time a computation took from the delay before the next one. As you can see, we have very fine-grained control over time here.
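The difference between the two operators is easy to see with a stand-in computation that takes about 50 ms. slowWork is hypothetical, and the timings in the comments are approximate, so treat this as an illustration rather than a benchmark.

```scala
import scala.concurrent.duration._
import monix.eval.Task
import monix.reactive.Observable

object Ticking {
  // Hypothetical stand-in for a computation that takes about 50 ms.
  val slowWork: Task[Unit] = Task.sleep(50.millis)

  // Fixed delay: the 100 ms are counted after slowWork has finished,
  // so successive runs start roughly 150 ms apart.
  val withFixedDelay: Observable[Long] =
    Observable.intervalWithFixedDelay(100.millis).mapEval(i => slowWork.map(_ => i))

  // Fixed rate: ticks aim to start every 100 ms, so the time spent in
  // slowWork is subtracted from the wait before the next tick.
  val atFixedRate: Observable[Long] =
    Observable.intervalAtFixedRate(100.millis).mapEval(i => slowWork.map(_ => i))
}
```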

Next, we need to group these withdrawals by currency. Since each tick gives us a plain list, the standard collections groupBy is all we need; there is no simpler operation than that.
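Assuming each snapshot is a List[Withdrawal] and the currency is a plain String (both are assumptions of this sketch), the grouping is the collections groupBy applied inside map. The stream type changes from Observable[List[Withdrawal]] to Observable[Map[String, List[Withdrawal]]].

```scala
import monix.reactive.Observable

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

object GroupingStep {
  // Plain collections grouping applied to every snapshot; the relative order
  // of withdrawals inside each currency group is preserved.
  def byCurrency(snapshots: Observable[List[Withdrawal]]): Observable[Map[String, List[Withdrawal]]] =
    snapshots.map(_.groupBy(_.currency))
}
```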

Now we have to batch withdrawals inside their respective groups. We could apply another transformation to this stream in which we would iterate over this Map and write the batching logic on our own, but we can do better.

If we look into the monix.reactive.Observable API, we will see that there is a function which fits our needs perfectly: bufferTimedAndCounted. Since it operates on the elements of a stream and not on a list (which is what we have), we first need to convert each list into a stream before we can use it.
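One way to unroll the lists is concatMap combined with Observable.fromIterable. concatMap keeps both the order of the snapshots and the order of the items within each one. The sketch works on plain lists, setting the per-currency Map aside for now, and reuses the hypothetical Withdrawal model.

```scala
import monix.reactive.Observable

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

object Unrolling {
  // Turns a stream of lists into a stream of individual withdrawals.
  def unroll(snapshots: Observable[List[Withdrawal]]): Observable[Withdrawal] =
    snapshots.concatMap(list => Observable.fromIterable(list))
}
```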

And now we can apply our magic transformation:
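A sketch of the batching step with Monix’s bufferTimedAndCounted(timespan, maxCount). The trailing filter is a defensive assumption: a window that closes without any items may produce an empty batch, and the requirement is to send what has accumulated only if there is anything.

```scala
import scala.concurrent.duration._
import monix.reactive.Observable

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

object Batching {
  // A batch is emitted as soon as batchSize items have arrived or timeWindow
  // has elapsed, whichever happens first.
  def batches(
      withdrawals: Observable[Withdrawal],
      batchSize: Int,
      timeWindow: FiniteDuration
  ): Observable[Seq[Withdrawal]] =
    withdrawals
      .bufferTimedAndCounted(timeWindow, batchSize)
      // Defensive: only forward windows that actually collected something.
      .filter(_.nonEmpty)
}
```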

Note: there are also other predefined buffering operators and I encourage you to check them out.

It looks very shiny, but will it actually work? The answer is no, it won’t :)

If we look closer, we can see that on each tick we emit everything that is currently in the repository, which means that our buffer will accumulate the same withdrawals over and over again. We need an operator that tells us only about new withdrawals.

If you know the distinctUntilChanged operator, it might be tempting to use it here, but it won’t work either.

The distinctUntilChanged operator emits a new value only if it differs from the previous one. If we apply it after unrolling the list, it won’t help, because consecutive withdrawals always differ from each other, even when the whole list is repeated.

On the other hand, if we apply it to our stream before unrolling the list, we will get a new list every time it changes. That’s good: we are getting the new items, but together with the old ones. We still need to figure out how to remove the items we have already seen.
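A sketch of distinctUntilChanged applied to whole snapshots. In Monix 3 the operator takes an implicit cats.Eq, and universal equality is adequate for case classes and lists of them. ChangedSnapshots is a hypothetical name.

```scala
import cats.Eq
import monix.reactive.Observable

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

object ChangedSnapshots {
  // Equality instance required by distinctUntilChanged.
  implicit val snapshotEq: Eq[List[Withdrawal]] = Eq.fromUniversalEquals

  // Emits a snapshot only when it differs from the one emitted right before it.
  def apply(snapshots: Observable[List[Withdrawal]]): Observable[List[Withdrawal]] =
    snapshots.distinctUntilChanged
}
```

Feeding it List(a), List(a), List(b, a), List(b, a) yields only List(a) and List(b, a). The second list still carries the old a, which is exactly the limitation described above.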

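This is where the scan operator comes to the rescue. It works like foldLeft on a collection, except that it emits the accumulated state after every element, so each incoming list can be compared against what we have seen before.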

Let’s use it to take new elements until we reach an element we have already seen. Because we only do that when the list changes, we end up with only the new items. For this solution to work, it is crucial that the order of withdrawals is guaranteed, which is why the repository returns them sorted by the monotonic serial id.
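A sketch of the scan step, assuming snapshots are sorted by serialId in descending order (newest first). Instead of remembering every withdrawal, the state keeps only the highest serial id seen so far. This works only because the ids are monotonic and the order is guaranteed. State and newItems are hypothetical names.

```scala
import monix.reactive.Observable

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

object OnlyNew {
  // Scan state: highest serial id seen so far, plus the items that were new
  // in the most recent snapshot.
  final case class State(lastSeenId: Long, fresh: List[Withdrawal])

  def newItems(snapshots: Observable[List[Withdrawal]]): Observable[List[Withdrawal]] =
    snapshots
      .scan(State(Long.MinValue, Nil)) { (state, snapshot) =>
        // Newest first: take items until we hit one we have already seen.
        val fresh = snapshot.takeWhile(_.serialId > state.lastSeenId)
        val lastSeen = fresh.headOption.fold(state.lastSeenId)(_.serialId)
        State(lastSeen, fresh)
      }
      .map(_.fresh)
}
```

An unchanged snapshot produces an empty list here, which is why pairing scan with distinctUntilChanged keeps the stream quiet between changes.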

Now we can unroll our list and apply the buffer operator, so the code looks like this:
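Putting the steps described so far together, a sketch of the executor without currency grouping might look like this. The model, Config (with its hypothetical pollInterval), the method names and the newest-first ordering are all assumptions of this sketch, not the author’s code.

```scala
import scala.concurrent.duration._
import cats.Eq
import monix.eval.Task
import monix.reactive.Observable

// Hypothetical model and dependencies (names and fields are assumptions).
final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)
trait WithdrawalRepository { def findAllWaiting: Task[List[Withdrawal]] } // newest first
trait ExternalSystem { def execute(batch: Seq[Withdrawal]): Task[Unit] }
final case class Config(pollInterval: FiniteDuration, batchSize: Int, timeWindow: FiniteDuration)

final class WithdrawalBatchingExecutor(
    repository: WithdrawalRepository,
    external: ExternalSystem,
    config: Config
) {
  // Must be defined before `batches`, which captures it at construction time.
  private implicit val snapshotEq: Eq[List[Withdrawal]] = Eq.fromUniversalEquals

  val batches: Observable[Seq[Withdrawal]] =
    Observable
      .intervalWithFixedDelay(config.pollInterval)
      .mapEval(_ => repository.findAllWaiting)        // periodic snapshot
      .distinctUntilChanged                            // skip unchanged snapshots
      .scan((Long.MinValue, List.empty[Withdrawal])) { // (last seen id, new items)
        case ((lastSeen, _), snapshot) =>
          val fresh = snapshot.takeWhile(_.serialId > lastSeen)
          (fresh.headOption.fold(lastSeen)(_.serialId), fresh)
      }
      .concatMap { case (_, fresh) => Observable.fromIterable(fresh) } // unroll
      .bufferTimedAndCounted(config.timeWindow, config.batchSize)
      .filter(_.nonEmpty)                              // drop empty windows, if any

  // Sends every batch; never completes on its own because the interval is infinite.
  val run: Task[Unit] = batches.mapEval(external.execute).completedL
}
```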

Great success! We have managed to batch our withdrawals, but we have forgotten about the grouping requirement. Luckily for us, there is an operator for that too. Unsurprisingly, it is called groupBy.

From the Scaladoc:

“Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.”

In short, this means that for every group of withdrawals a new observable is emitted. That’s exactly what we wanted, because now we can operate on streams that contain withdrawals in a single currency only.
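A sketch of grouping the unrolled stream, again with the hypothetical String currency. Each GroupedObservable exposes its key, and it emits only that key’s withdrawals in their original order.

```scala
import monix.reactive.Observable
import monix.reactive.observables.GroupedObservable

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

object Grouping {
  // One GroupedObservable per distinct currency; `key` is the currency.
  def byCurrency(withdrawals: Observable[Withdrawal]): Observable[GroupedObservable[String, Withdrawal]] =
    withdrawals.groupBy(_.currency)
}
```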

But now the main stream has the type Observable[GroupedObservable[K, Withdrawal]], where K is the currency key, which raises some questions:

  • How do we combine these forked streams back into one?
  • How do we apply our buffering logic to the inner streams?

Again, there is an operator for exactly this use case, and it is called mergeMap.

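We pass it a function from an item of the source stream to an observable, and it merges all the resulting observables back into one, subscribing to each of them as soon as it arrives. In our case, an item is a single GroupedObservable that emits withdrawals in one currency. The concurrent subscription matters here: the groups never complete while the polling stream is alive, so an operator such as concatMap, which waits for one inner stream to finish before starting the next, would get stuck on the first currency. Combining mergeMap with what we already have gives us the final solution, which does what we wanted: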
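A sketch of the complete pipeline under the same assumptions: a hypothetical model and Config, a newest-first repository, and a defensive filter for empty windows. Every currency gets its own buffer, and mergeMap merges the per-currency batches back into one stream.

```scala
import scala.concurrent.duration._
import cats.Eq
import monix.eval.Task
import monix.reactive.Observable

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)
trait WithdrawalRepository { def findAllWaiting: Task[List[Withdrawal]] } // newest first
trait ExternalSystem { def execute(batch: Seq[Withdrawal]): Task[Unit] }
final case class Config(pollInterval: FiniteDuration, batchSize: Int, timeWindow: FiniteDuration)

final class WithdrawalBatchingExecutor(
    repository: WithdrawalRepository,
    external: ExternalSystem,
    config: Config
) {
  // vals are initialized top to bottom, so this comes first.
  private implicit val snapshotEq: Eq[List[Withdrawal]] = Eq.fromUniversalEquals

  // Only withdrawals that were not present in earlier snapshots.
  private val newWithdrawals: Observable[Withdrawal] =
    Observable
      .intervalWithFixedDelay(config.pollInterval)
      .mapEval(_ => repository.findAllWaiting)
      .distinctUntilChanged
      .scan((Long.MinValue, List.empty[Withdrawal])) { case ((lastSeen, _), snapshot) =>
        val fresh = snapshot.takeWhile(_.serialId > lastSeen)
        (fresh.headOption.fold(lastSeen)(_.serialId), fresh)
      }
      .concatMap { case (_, fresh) => Observable.fromIterable(fresh) }

  // One stream per currency, batched independently, merged back into one.
  val batches: Observable[Seq[Withdrawal]] =
    newWithdrawals
      .groupBy(_.currency)
      .mergeMap { group =>
        group
          .bufferTimedAndCounted(config.timeWindow, config.batchSize)
          .filter(_.nonEmpty)
      }

  val run: Task[Unit] = batches.mapEval(external.execute).completedL
}
```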

To complete the solution, we need to mark the withdrawals as executed in our table. Depending on whether the external system can deduplicate executions, we might want to commit these changes to the database before or after the actual execution.
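The two orderings can be sketched as follows. The article does not show how withdrawals are marked, so WithdrawalStore and markAsExecuted are hypothetical. The comments spell out what each ordering risks if the process crashes between the two steps.

```scala
import monix.eval.Task

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

// Hypothetical persistence API for marking withdrawals as executed.
trait WithdrawalStore { def markAsExecuted(ids: Seq[Long]): Task[Unit] }
trait ExternalSystem { def execute(batch: Seq[Withdrawal]): Task[Unit] }

object Commit {
  // Commit first: a crash between the two steps leaves withdrawals marked as
  // executed but never sent; on the other hand, nothing is ever sent twice.
  def commitThenExecute(store: WithdrawalStore, external: ExternalSystem)(batch: Seq[Withdrawal]): Task[Unit] =
    store.markAsExecuted(batch.map(_.serialId)).flatMap(_ => external.execute(batch))

  // Execute first: a crash between the two steps means the batch is sent again
  // later, which is only safe if the external system deduplicates executions.
  def executeThenCommit(store: WithdrawalStore, external: ExternalSystem)(batch: Seq[Withdrawal]): Task[Unit] =
    external.execute(batch).flatMap(_ => store.markAsExecuted(batch.map(_.serialId)))
}
```

Once the first parameter list is applied, either function has the shape Seq[Withdrawal] => Task[Unit], so it can be passed to mapEval on the stream of batches.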

Wrap up

As you can see, we have arrived at quite an elegant solution composed entirely of Observable operators. Not only does it fulfil all the requirements, it is also reasonably easy (given the complexity of the task) to test using Monix’s TestScheduler (monix.execution.schedulers.TestScheduler), which lets a test advance time virtually instead of waiting for it.

If you would like to get more familiar with monix.reactive, I encourage you to write tests against the WithdrawalBatchingExecutor.
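As a starting point, here is a sketch that exercises only the buffering step under virtual time. VirtualTimeBatching and batchesAfter are hypothetical names. Appending Observable.never keeps the source open, so a batch can close only by reaching the count or when the window elapses, and TestScheduler.tick moves the clock forward without any real waiting. A test against the full executor could follow the same pattern.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import monix.execution.schedulers.TestScheduler
import monix.reactive.Observable

final case class Withdrawal(serialId: Long, currency: String, amount: BigDecimal)

object VirtualTimeBatching {
  // Feeds `ws` into the buffering step, advances virtual time by `advance`,
  // and returns the batches emitted so far.
  def batchesAfter(
      ws: List[Withdrawal],
      batchSize: Int,
      window: FiniteDuration,
      advance: FiniteDuration
  ): List[Seq[Withdrawal]] = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val emitted = ListBuffer.empty[Seq[Withdrawal]]
    val subscription = (Observable.fromIterable(ws) ++ Observable.never)
      .bufferTimedAndCounted(window, batchSize)
      .filter(_.nonEmpty)
      .foreach(batch => emitted += batch)
    scheduler.tick(advance) // runs everything due up to the new virtual time
    subscription.cancel()
    emitted.toList
  }
}
```

With three withdrawals, a batch size of 2 and a 1-second window, only the count-triggered batch of the first two is out after 500 virtual milliseconds. The third follows once the window has elapsed.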
