Help, Kafka ate my data!
The bad news: if it has already happened, there's not much you can do. Maybe the producer can re-process the data, but that strongly depends on your application. So, sorry, I can't help with that. What I can do instead is describe the most important configuration options for preventing data loss in Kafka.
Update: the following blog post was written for Kafka <2.8.0. KIP-679 changes the default producer config for acks from 1 to all and for enable.idempotence from false to true, and is planned for Kafka 3.0.0.
Producer Acknowledgements
This is a super important configuration on the producer level. According to the documentation, the acks property is:
The number of acknowledgments the producer requires the leader to have received before considering a request complete.
The possible values are: 0, 1, all and -1 (which is equivalent to all).
What does it mean?
If you set this value to "0" it means "fire and forget". The producer continues its work regardless of whether the brokers were able to persist the message. Definitely not a good idea when every message is important.
Setting it to "1" means the producer continues once the leader has persisted the message. Sounds much better, but imagine a scenario where the leader goes down before the messages have been replicated to the other replicas: those messages are lost.
The value "all" suggests the message is guaranteed to be persisted on all replicas, but things are more complicated: it actually means the message was written to all replicas that are in sync. This is confusing and can be a trap; more about it later. In any case, "all" (or "-1") is the most secure setting.
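As a sketch, a durability-oriented producer configuration can be written as a plain property map. The property names below are the standard Kafka producer config keys; the broker address is a placeholder.

```python
# Sketch of a durable producer configuration. "acks" uses the standard
# Kafka producer property name; the server address is a placeholder.
durable_producer_conf = {
    "bootstrap.servers": "localhost:9092",  # placeholder address
    "acks": "all",  # wait for all *in-sync* replicas, not literally all replicas
}

# "0" = fire and forget, "1" = leader only, "all"/"-1" = all in-sync replicas
assert durable_producer_conf["acks"] in ("0", "1", "all", "-1")
```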
Producer retries
With the retries setting one can fine-tune how many times the producer should try to send a message to a broker if sending fails. The default value is 2147483647, which is the maximum int.
Sounds straightforward? Well, as with many other Kafka settings, there is a trap here too. Two traps, actually.
First, there is another setting, delivery.timeout.ms, that sets an upper bound on the total time for all retries. So even if retries is a big number, message delivery will still fail once that timeout elapses.
Second, if the order of the messages is important, it's critical to set max.in.flight.requests.per.connection to 1. Setting it to a value greater than 1 can cause message reordering. More about this in this excellent article, which explains the problem in detail.
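Putting the retry-related settings together, a sketch of a producer config for at-least-once delivery with stable ordering might look like this (property names are the standard Kafka producer keys; the values are illustrative, not prescriptive):

```python
# Sketch: producer settings for at-least-once delivery with stable ordering.
ordered_durable_conf = {
    "acks": "all",
    "retries": 2147483647,            # the default: effectively "retry forever"
    "delivery.timeout.ms": 120000,    # caps the total time spent on all retries
    "max.in.flight.requests.per.connection": 1,  # prevents reordering on retry
}

# The trap: a huge retries value is useless if delivery.timeout.ms is small,
# because the timeout wins.
assert ordered_durable_conf["retries"] == 2**31 - 1
```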
Replication
Kafka offers replication, which can be a lifesaver in case of broker failure. There are several important things to remember:
- replication is done on the partition level,
- for each partition, there is one leader and one or more followers,
- followers fetch data from the leader,
- if the producer's acks is set to all, the producer is acked when the message is persisted on all in-sync replicas.
There are important conclusions resulting from those facts:
- if you have a high replication factor with brokers in multiple sites (for example in multiple availability zones), fetching messages by all followers takes some time,
- if a leader fails after it has received a message but before the followers have fetched it, the message is lost. It's the producer's responsibility to replay that message. This is why the acks setting is so important.
The default replication factor for all topics can be configured with default.replication.factor. Beware: the default setting is 1!
The replication factor can also be set per topic when the topic is created. If you want different settings for particular topics, it's a good idea to set auto.create.topics.enable to false and create the topics with a script.
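With auto-creation disabled, topics can be created explicitly, for example with the kafka-topics.sh tool that ships with Kafka. The topic name, partition count and broker address below are placeholders:

```shell
# Create a topic with an explicit replication factor instead of relying on
# default.replication.factor (which defaults to 1).
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic payments \
  --partitions 6 \
  --replication-factor 3
```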
Replication — special cases
There are two additional replication settings: offsets.topic.replication.factor and transaction.state.log.replication.factor.
These are broker settings for "special" topics: the first stores consumer offsets and the latter stores transaction details. Keep in mind that these topics do not use the default setting for regular topics.
Minimal in-sync replicas
As already said, the producer acks property determines when the Kafka cluster should ack a message, and all is the most secure setting. What does the word "all" mean in this case? It means all in-sync replicas.
The documentation describes min.insync.replicas as follows:
When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
Consider a scenario: we set acks=all and min.insync.replicas=1 (which is the default!). The network is unstable and only the leader is in sync (for example, the other brokers lose their connection to ZooKeeper). The producer writes a message and, according to min.insync.replicas, the message is acknowledged. Then, before the other brokers come back, the leader goes down because of a failure. What does that mean? The message will never be replicated to the other brokers, and it is lost.
This scenario may seem unrealistic, but it's a real-life example.
The minimal secure setting for min.insync.replicas is 2. The default value of 1 is dangerous, and it's easy to forget to change it.
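The interaction between acks=all and min.insync.replicas can be modeled with a toy function (the parameter names mirror the broker config, everything else is illustrative):

```python
# Toy model of the broker-side check when a producer uses acks=all:
# the write is acknowledged only if the current in-sync replica set
# is at least min.insync.replicas.
def write_accepted(in_sync_replicas: int, min_insync_replicas: int) -> bool:
    """Return True if an acks=all produce request would be acknowledged."""
    return in_sync_replicas >= min_insync_replicas

# The trap from the scenario above: with min.insync.replicas=1 a write is
# acked even when only the leader is in sync, so it can still be lost.
assert write_accepted(in_sync_replicas=1, min_insync_replicas=1)

# With min.insync.replicas=2 the same write is rejected (NotEnoughReplicas),
# forcing the producer to retry once more replicas catch up.
assert not write_accepted(in_sync_replicas=1, min_insync_replicas=2)
```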
The min.insync.replicas configured on the broker is the default for all new topics (it can also be configured per topic).
Again, the transaction topic doesn't use this setting; it has its own: transaction.state.log.min.isr.
Unclean leader election
TL;DR: the default value is false, and never set it to true if durability is more important than availability. It is especially dangerous when min.insync.replicas is set to 1. Why? Consider a scenario:
- You have 3 brokers; broker_1 is the leader.
- broker_3 goes offline for some reason, and broker_1 removes it from the ISR list.
- The producer continues its work and writes several messages.
- broker_1 and broker_2 go offline at the same time.
- broker_3 recovers, is online again, and becomes the leader.
- broker_2 recovers and starts following broker_3.
What does this mean? Messages stored (and acked!) while broker_3 was offline are lost.
Setting unclean.leader.election.enable to false prevents a broker that was not in the ISR list from becoming the leader.
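The eligibility rule can be sketched as a toy election function (the broker names match the scenario above; the function itself is illustrative, not Kafka's actual implementation):

```python
# Toy model of leader election: with unclean election disabled, only
# replicas that are still in the ISR may become the leader.
def elect_leader(candidates, isr, unclean_allowed=False):
    """Return the first eligible leader among candidates, or None."""
    for broker in candidates:
        if broker in isr or unclean_allowed:
            return broker
    return None

# Scenario above: broker_3 fell out of the ISR, then broker_1 and
# broker_2 died, leaving broker_3 as the only live candidate.
isr = {"broker_1", "broker_2"}

# Default (unclean election disabled): broker_3 cannot take over; the
# partition is unavailable, but no acknowledged messages are discarded.
assert elect_leader(["broker_3"], isr) is None

# Unclean election enabled: broker_3 becomes the leader, and the
# acknowledged messages it never fetched are lost.
assert elect_leader(["broker_3"], isr, unclean_allowed=True) == "broker_3"
```

This is the availability-versus-durability trade-off in miniature: enabling unclean election restores availability at the cost of acknowledged data.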
Consumer auto commit
When the consumer gets some messages, it has to "tell" the broker: "Hello, I already got this message, please don't give it to me again when I ask for new messages!"
This is done by committing the offset, either manually or automatically. If enable.auto.commit is set to true, the consumer's offset is committed periodically in the background. So you have no control over when the offset is sent; it can even be sent before the messages are processed. It's easy to imagine what happens if the consumer fails: it won't get back messages older than the committed offset, so those messages are lost.
(Actually, it's a bit more complicated: the offset is stored not per consumer instance but per consumer group, but let's keep it simple for now.)
What happens if you commit the offset manually and the consumer fails after processing a message but before the offset is committed? Yes, the message is processed twice, which gives you at-least-once delivery. If uniqueness is important you have to deduplicate messages during processing, but that's a topic for another story.
Messages not synced to disk
What happens when all brokers ack a message? Does it mean the message is already stored on disk? Actually, no: it means it's in the brokers' memory. Is that a problem? It can be, when all brokers fail at the same time. This is possible when all brokers are in the same availability zone, which is the worst practice.
Normally messages are flushed to disk when the operating system decides to do so, but you can override this by setting log.flush.interval.messages or log.flush.interval.ms on the broker level, or flush.messages and flush.ms on the topic level. For example, setting flush.messages=1 forces every single message to be written to disk. As you can imagine, this has a big performance impact, so think twice before doing it.
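As a sketch, the topic-level overrides might be collected like this. The property names are the real topic configs mentioned above; the values are illustrative, and the performance caveat applies:

```python
# Topic-level overrides that force an fsync per message: maximally
# durable, but with a significant throughput cost.
paranoid_topic_conf = {
    "flush.messages": "1",  # fsync after every single message
    "flush.ms": "1000",     # ...and at least once per second regardless
}
```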
Things are even worse: even if you sync data to disk, it can sit in the disk cache. But the scenario where all disks in all brokers fail at the same time is nearly impossible. On the other hand, if the cluster is poorly designed and all brokers are in the same availability zone, a single power failure can cause data loss.
Summary
Kafka is a distributed system and can be super durable if configured correctly. Some takeaways:
Producer settings
acks=all
retries=2147483647
delivery.timeout.ms=2147483647
Brokers settings
unclean.leader.election.enable=false
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
Consumer settings
enable.auto.commit=false