Help, Kafka ate my data!

Grzegorz Kocur
SoftwareMill Tech Blog
7 min readMar 25, 2020

--

The bad news: if it already happened it’s not much you can do. Maybe there’s a chance the producer can re-process the data, but it strongly depends on your application. So — sorry, I can’t help. Instead of this, I will try to describe the most important configuration to prevent data loss in Kafka.

Update: The following blogpost was writen for Kafka <2.8.0. KIP-679 changes default producer config for acks from 1 to all and for enable.idempotence from false to true and is planned for Kafka 3.0.0.

Producer Acknowledgements

This is the super important configuration on the producer level. According to documentation, the acks property is:

The number of acknowledgments the producer requires the leader to have received before considering a request complete.

The possible values are: 0, 1, all and -1 (which is equivalent to all).

What does it mean?

If you set this value to “0” — it means “fire and forget”. The producer continues its work regardless the brokers were able to persist the message. Definitely not a good idea when each message is important.

Setting it to “1” means producer will continue when the leader was able to persist the message. Sounds much better, but imagine scenario when the leader goes down and messages had not been replicated to other replicas yet — these messages will be lost.

The property value “all” suggests it guarantees the message was persisted on all replicas, but things are more complicated — actually, it means it was written on all replicas which are in sync. It’s confusing and it can be a trap. More about it later. Anyway — “all” (or “-1”) is the most secure setting.

Producer retries

With retries setting one can fine-tune how many times the producer should try to send the message to a broker if sending fails. The default value is 2147483647 which is max int.

Sounds straightforward? Well, like in many other Kafka setting here is a trap, too. Two traps actually.

First — there is another setting delivery.timeout.ms that sets the timeout for all retries. So if the retries is a big number but the timeout is small the message delivery will fail anyway.

Second — if the order of the messages is important it’s critical to set them max.in.flight.requests.per.connection to 1. Setting it to value greater than 1 can cause messages reordering. More about this in this excellent article which explains the problem in details.

Replication

Kafka offers replication. It can be a “lifesaver” in case of broker failure. There are several important things to remember:

  • replication is done on partition level,
  • for each partition, there is one leader and one or more followers,
  • followers fetch data from the leader,
  • if producer acks is set to all — producer is acked when message is persisted on all replicas in sync.

There are important conclusions resulting from those facts:

  • if you have a high replication level set with brokers in multiple sites (for example in multiple availability zones) fetching messages by all followers takes some time.
  • if a leader fails after it had got the message but before this message was fetched by followers the message is lost. It’s the producer’s responsibility to replay this message. This is why the acks setting is so important.

The default replication factor for all topic can be configured with default.replication.factor — beware, the default setting is 1!

The replication factor can also be set per topic when the topic is created. If one wants to have different settings for particular topics it’s a good idea to set auto.create.topics.enable to false and create the topics with a script.

Replication — special cases

There are 2 additional settings for replication: offsets.topic.replication.factor
and
transaction.state.log.replication.factor

These are broker settings to “special” topics — the first one stores consumer offsets and the latter stores transactions details. Keep in mind those topics don’t use the default setting for regular topics.

Minimal in-sync replicas

As already said — the producer acks property determines when Kafka cluster should ack the message, and the all setting is more secure. What does the word “all” mean in this case? It means all in-sync replicas.

min.insync.replicas means:

When a producer sets acks to “all” (or “-1”), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).

Consider scenario: we set acks=all and min.insync.replicas=1 (which is the default!). There is an unstable network, and only the leader is in sync (for example other brokers lose the connection to zookeeper). Producer writes the message, and — according to min.insync.replicas this message is acknowledged. And before other brokers are back live — this node goes down because of failure. What does it mean? It means this message will never be replicated to other brokers and lost.

This scenario seems to be unrealistic, but it’s a real live example.

The minimal secure setting for min.insync.replicas is 2. This can be dangerous since the default value is 1 and it’s easy to forget to change it.

The min.insync.replicas configured on broker will be the default for all new topics (you can configure it per topic).

Again, the transaction topic doesn’t use this setting, it has its own: transaction.state.log.min.isr .

Unclean leader election

TL;DR: the default value is false and never set it to true if durability is more important than availability. It is especially dangerous when min.insync.replicas is set to 1. Why? Consider a scenario:

  1. You have 3 brokers, broker_1 is a leader.
  2. broker_3 goes offline for some reason.
  3. The broker_1 removes it from ISR list.
  4. The producer continues its work and writes several messages.
  5. The broker_1 and broker_2 goes offline at the same time.
  6. The broker_3 recovers and is online again, it becomes a leader.
  7. The broker_2 recovers and starts following broker_3 .

What does it mean? Messages stored (and acked!) when the broker_3 was offline are lost.

Setting unclean.leader.election.enable to false prevents a broker from becoming a leader if it was not in ISR list.

Consumer auto commit

When the consumer gets some messages it has to “tell” Hello, I already got this message, please don't give it to me again if I ask for new messages!! It’s done by committing the offset. It can be done manually or automatically. If the enable.auto.commit is set to true the consumer’s offset will be periodically committed in the background. So you have no control when the offset is sent, it can be sent even before the messages were processed. It’s easy to imagine what happens if the consumer fails — it won’t get messages already “older” than committed offset and such messages are lost.

(Actually, it’s a bit more complicated — the offset is stored not for particular consumer instance but for consumer group, but let’s simplify it a bit now).

What happens if you manually commit the offset and the consumer fails after processing the message and before the offset is committed? Yes, the message is processed twice leading to at most once delivery. If uniqueness is important you have to deduplicate the messages during the processing, but it’s a topic for another story.

Messages not synced to disk

What happens when all brokers ack the message? Does it mean it’s already stored on disk? Actually, no. It means it’s in the broker’s memory. Is it a problem? Yes, it can be a problem when all brokers fail at the same time. It’s possible when all brokers are in the same availability zone which is the worst practice.

Normally messages are flushed to disk when operating system decides to do so, but one can override this by setting the log.flush.interval.messages or log.flush.interval.ms on broker level or flush.messages.flush.ms on a topic level. For example, setting flush.messages=1 will cause writing every single message to disk. As you can imagine it has a big performance impact, so think twice before doing it.

Things are even worse — if you sync data to the disk it can be stored in disk cache. But the scenario when all disks in all brokers are broken at the same time is nearly impossible. On the other hand — if the cluster is poorly designed and all brokers are in the same availability zone the power failure can cause data loss.

Summary

Kafka is a distributed system and can be super durable if configured correctly. Some take-aways:

Producer settings

  • acks=all
  • retries=2147483647
  • delivery.timeout.ms=2147483647

Brokers settings

  • unclean.leader.election.enable=false
  • default.replication.factor=3
  • min.insync.replicas=2
  • offsets.topic.replication.factor=3
  • transaction.state.log.replication.factor=3
  • transaction.state.log.min.isr=2

Consumer settings

  • enable.auto.commit=false

Get “Start with Apache Kafka eBook”

We’ve gathered our lessons learned while consulting clients and using Kafka in commercial projects.

Need help with Apache Kafka?

We are a Certified Technical Partner of Confluent. Our engineering expertise with stream processing and distributed systems applications is proven in commercial projects, workshops and consulting.

Contact us!

--

--