Top 6 insights you should know before using the Kafka Connect BigQuery sink connector

Jaroslaw Kijanowski
SoftwareMill Tech Blog
5 min readMar 18, 2019

--

Photo by Joe Green on Unsplash

In one of our projects we stream large amount of data, both from databases and Kafka Stream applications, into our Google BigQuery data warehouse. Since all our data resides in Kafka topics, we’re using the Kafka Connect BigQuery sink connector to move all the stuff around.

For POCs a quick setup works great and you don’t need much of docs. The Configuration Basics section in the README is good enough. You eventually figure out what the topics parameter is good for and rest assured how awesome this connector is. It simply works out of the box!

The next day you have tons of ideas and wonder if the connector will bear the weight. You open the docs and one minute later you’ll find yourself in the darkest parts of the internet, or wherever personalized google results will take you, while searching for details on this connector. One site I’ve found particularly useful when looking for technical insights on how the thing works:

- allows users to stream data from Kafka straight into BigQuery with sub-minute latency via the Kafka Connect framework

- if the connector fails to complete a write for any reason, it can simply announce its failure by throwing an exception and Kafka Connect will give the same data again to the connector to write, helping make sure that no data gets skipped over

- the biggest issues that have come up have had to do with their streaming data quotas

Since we encountered several issues, once we started to stream a hundreds of tables from a handful databases, here is a summary of what we’ve learned, changed, overcome and adjusted. Big Thank You to Grzegorz Kocur for brainstorming sessions that led to coming up with following ideas:

1. Same old, same old … dependency 😱

First of all, we build the connector from sources and in build.gradle tweak the versions of these dependencies:

googleCloudVersion = ‘1.65.0’
ioConfluentVersion = ‘5.1.2’
kafkaVersion = ‘2.0.1’

Furthermore the way how the google cloud client libs are built has changed and we need to replace

with

Bumping google cloud allows us to get rid of:

… which is fixed by https://github.com/googleapis/google-cloud-java/pull/3984 since 0.72.0. The version shipped with the google client libs in version 1.65. is 0.83.0-alpha.

Getting the connector up to date with the latest confluent version was also necessary. Our Kafka Connect instance is hosting multiple plugins, all of which are built against version 5.1.2. Leaving the BigQuery connector as is, with ioConfluentVersionset to 3.2.0 would introduce an old version of the avro-converter and avro-serializer :

There is an issue, how decimal value types with precision are handled (in our case a DECIMAL(12,4)) and our source connector having loaded classes from the old avro-converter was going haywire:

A proper fix was introduced recently and we rely on the latest and greatest 5.1.2.

2) How does the connector handle deletes?

We feed our connector with data produced by a source connector — debezium. There is no notion of deleting a record. In such a case the message contains an after struct with all fields set to null. The BigQuery connector simply replicates this into a table and we do the filtering with SELECT statements. This is also described in this GitHub issue.

3) Too much rain over paradise

When replicating our database in snapshot mode, tons of records are streamed into our BigQuery tables. Sooner or later you’ll be hit by the (in)famous:

This, also reported, is fixed and a backoff and retry policy should be set up with bigQueryRetry and bigQueryRetryWait.

4) Transformations gone wrong

If you want to perform some transformations with a Single Message Transformer, you may break the schema which was used to setup the BigQuery table. This is well described in this issue.
What we found useful is this pattern: when streaming data from our source connector we use an SMT to inject an empty time stamp field:

Then, in the sink connector we replace this time stamp field with a real value.

At the time the table is created, the schema is aware upfront of the new — although not yet filled — field. It’s the sink connector that populates it.

This has been very well explained on stack overflow.

5) Surprise!

Besides the business value related data we also include Kafka meta data, like topic name, offset, partition and time of insert. This is configured via includeKafkaData. Having only one partition in the source topic, we now have three options to sort our data in the BigQuery table: time stamp inserted by the SMT (topic insertion time), timestamp inserted as part of the meta data (BigQuery insertion time) and offset.
Having an offset we may also filter out or look for duplicates.

6) De-de-de-de-duplication

That’s a tricky one, not solved at all, but rather accepted in our world of big data. We know we shouldn’t loose any records, but due to failures it may occur that some records will be duplicated.
There is an open issue with an idea, how this could be approached and at least when doing a bulk load of data, this could be an interesting option. On the other hand it is not obvious, what could be used as the insertId.
So far the connector did not blow up during massive load. We did not encounter any duplicates and haven’t tackled this issue yet.

Summary

All in all, we’re very satisfied with the connector. It simply works and after applying some tweaks we plan to go live with it very soon. And if you wonder where the real documentation is, just look at the wiki page under the Connector Configuration.

Looking for Scala and Java Experts?

Contact us!

We will make technology work for your business. See the projects we have successfully delivered.

--

--

Java consultant having experience with the Kafka ecosystem, Cassandra as well as GCP and AWS cloud providers. https://pl.linkedin.com/in/jaroslawkijanowski