
Kafka is a powerful and robust messaging system that is widely used in big data systems.
However, it is not very good at its own housekeeping.
Each topic in Kafka is divided into partitions, which enable parallel publishing and consuming of data, and each partition has several replicas (usually three) which are spread across different nodes of the Kafka cluster. This way, if a node is lost we still have the data and can continue working.
Unlike other systems such as HDFS, Kafka is not self-healing in case of node loss. When a node is lost, some partitions become under-replicated, and Kafka will not replicate those partitions to another node. Moreover, if we add a new, empty node to the cluster, Kafka will not automatically move any replicas to it: the node will remain unused until we create new topics, and the old topics will remain on the other nodes. I guess this is missing because the designers of Kafka wanted to make it “masterless”, where no node is different from the others and there is no master node which manages things.
So we have to do this manually, or rely on third-party products to do the job. To demonstrate, I created a four-node Kafka cluster with a couple of topics.
First, let’s examine the manual option:
Manual rebalancing
First, a note: I used Cloudera’s Kafka. For some reason Cloudera tends to take Apache software and change the names while maintaining the same functionality, so the names of the scripts may differ a little from the Apache or the Confluent versions. For instance, Apache Kafka ships kafka-reassign-partitions.sh (a wrapper around the ReassignPartitionsCommand class) while Cloudera calls it kafka-reassign-partitions, but they have the same functionality.
Manual rebalancing is a tedious task, especially if you have to rebalance many topics and if it happens often. A better option is using Cruise control that will be presented later in this post.
Run this to see the partition status:
kafka-topics --describe --zookeeper cloudera4.lan:2181
The above command will show you all the topics in the cluster. You can add the parameter --topic [topic name] to view a specific topic. For the demonstration we will focus only on one topic called “guy1”.
You can see information about the guy1 topic. For each partition you can see which broker ID is the leader, on which brokers the replicas of every partition reside, and its ISR (the list of replicas which are in sync with the leader).
Topic:guy1  PartitionCount:10  ReplicationFactor:3  Configs:cleanup.policy=delete,min.insync.replicas=1
  Topic: guy1  Partition: 0  Leader: 88  Replicas: 88,89,94  Isr: 89,88,94
  Topic: guy1  Partition: 1  Leader: 89  Replicas: 89,94,87  Isr: 89,87,94
  Topic: guy1  Partition: 2  Leader: 94  Replicas: 94,87,88  Isr: 87,88,94
  Topic: guy1  Partition: 3  Leader: 88  Replicas: 87,88,89  Isr: 89,88,87
  Topic: guy1  Partition: 4  Leader: 88  Replicas: 88,87,89  Isr: 88,87,89
  Topic: guy1  Partition: 5  Leader: 89  Replicas: 89,88,94  Isr: 89,88,94
  Topic: guy1  Partition: 6  Leader: 94  Replicas: 94,89,87  Isr: 94,89,87
  Topic: guy1  Partition: 7  Leader: 87  Replicas: 87,94,88  Isr: 87,94,88
  Topic: guy1  Partition: 8  Leader: 88  Replicas: 88,89,94  Isr: 88,89,94
  Topic: guy1  Partition: 9  Leader: 89  Replicas: 89,94,87  Isr: 89,94,87
Now let’s stop one broker, remove it from the cluster, and delete all its log dirs (in order to mimic a broker failure). We will remove the broker with ID 94.
If we run the describe command again:
Topic:guy1  PartitionCount:10  ReplicationFactor:3  Configs:cleanup.policy=delete,min.insync.replicas=1
  Topic: guy1  Partition: 0  Leader: 88  Replicas: 88,89,94  Isr: 89,88
  Topic: guy1  Partition: 1  Leader: 89  Replicas: 89,94,87  Isr: 89,87
  Topic: guy1  Partition: 2  Leader: 87  Replicas: 94,87,88  Isr: 87,88
  Topic: guy1  Partition: 3  Leader: 88  Replicas: 87,88,89  Isr: 89,88,87
  Topic: guy1  Partition: 4  Leader: 88  Replicas: 88,87,89  Isr: 88,87,89
  Topic: guy1  Partition: 5  Leader: 89  Replicas: 89,88,94  Isr: 89,88
  Topic: guy1  Partition: 6  Leader: 89  Replicas: 94,89,87  Isr: 89,87
  Topic: guy1  Partition: 7  Leader: 87  Replicas: 87,94,88  Isr: 87,88
  Topic: guy1  Partition: 8  Leader: 88  Replicas: 88,89,94  Isr: 88,89
  Topic: guy1  Partition: 9  Leader: 89  Replicas: 89,94,87  Isr: 89,87
You can see that some broker ids are missing from the ISR, and that broker 94 is not the leader of any partition anymore.
We can run this command to find the under-replicated partitions:
kafka-topics --describe --zookeeper cloudera4.lan:2181 --under-replicated-partitions
You can see that 8 partitions were affected by the crash of broker 94:
  Topic: guy1  Partition: 0  Leader: 88  Replicas: 88,89,94  Isr: 89,88
  Topic: guy1  Partition: 1  Leader: 89  Replicas: 89,94,87  Isr: 89,87
  Topic: guy1  Partition: 2  Leader: 87  Replicas: 94,87,88  Isr: 87,88
  Topic: guy1  Partition: 5  Leader: 89  Replicas: 89,88,94  Isr: 89,88
  Topic: guy1  Partition: 6  Leader: 89  Replicas: 94,89,87  Isr: 89,87
  Topic: guy1  Partition: 7  Leader: 87  Replicas: 87,94,88  Isr: 87,88
  Topic: guy1  Partition: 8  Leader: 88  Replicas: 88,89,94  Isr: 88,89
  Topic: guy1  Partition: 9  Leader: 89  Replicas: 89,94,87  Isr: 89,87
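For monitoring, counting these lines is usually enough, since each line of the --under-replicated-partitions output describes one affected partition. A minimal sketch over a captured two-line sample (the real output above would give 8):

```shell
# Two sample lines in the format of the --under-replicated-partitions output;
# in practice, pipe the real kafka-topics output instead of this variable.
urp='Topic: guy1 Partition: 0 Leader: 88 Replicas: 88,89,94 Isr: 89,88
Topic: guy1 Partition: 1 Leader: 89 Replicas: 89,94,87 Isr: 89,87'
# Each line is one under-replicated partition, so counting lines gives the total.
count=$(printf '%s\n' "$urp" | grep -c 'Partition:')
echo "$count"
```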
Note that after removing the broker and adding it back to the cluster, it was assigned ID 95, not 94.
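Because IDs can change like this, it is worth confirming which broker IDs are actually registered before reassigning anything. ZooKeeper keeps them under /brokers/ids; the sketch below parses a listing like the one zkCli.sh prints as its last line (the listing is hard-coded here for illustration, and the commented-out command uses my demo hostname):

```shell
# zkCli.sh prints the znode listing as its last line, e.g. [87, 88, 89, 95].
# In practice: zkCli.sh -server cloudera4.lan:2181 ls /brokers/ids | tail -1
ids_line='[87, 88, 89, 95]'
# Strip brackets and spaces to get a comma-separated list usable with --broker-list.
broker_list=$(printf '%s' "$ids_line" | tr -d '[] ')
echo "$broker_list"
```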
Even after I add the “new” broker back to the cluster, those partitions will still be under-replicated, so we will have to reassign the replicas to brokers manually.
First, prepare a json file listing the topics which have missing replicas, for example:
{"topics":
[{"topic": "guy1"}, {"topic": "guy2"}, {"topic": "guy3"}],
"version": 1
}
Save this file under any name you want; I chose “test.json”.
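Writing this file by hand gets tedious when many topics are involved. A small sketch that generates it from a space-separated list of topic names (the names are the demo ones):

```shell
# Build a topics-to-move JSON file from a list of topic names.
topics="guy1 guy2 guy3"
{
  printf '{"topics":\n['
  sep=''
  for t in $topics; do
    printf '%s{"topic": "%s"}' "$sep" "$t"
    sep=', '
  done
  printf '],\n"version": 1\n}\n'
} > test.json
cat test.json
```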
kafka-reassign-partitions.sh
Now run the kafka-reassign-partitions script with the --generate option. You will have to provide the list of brokers that are candidates for hosting partitions (make sure you give the correct IDs; remember the new node has a new ID). And finally, point it to the test.json file we created earlier:
kafka-reassign-partitions --zookeeper cloudera4.lan:2181 --broker-list 95,87,88,89 --generate --topics-to-move-json-file test.json
The reply will be two JSONs: one with the current partition allocation (which we know is wrong, so we just ignore it), and a proposed new allocation that tries to spread the partitions evenly across brokers:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"guy1","partition":6,"replicas":[94,89,87],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":7,"replicas":[87,94,88],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":2,"replicas":[94,87,88],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":8,"replicas":[88,89,94],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":5,"replicas":[89,88,94],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":4,"replicas":[88,87,89],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":0,"replicas":[88,89,94],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":9,"replicas":[89,94,87],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":3,"replicas":[87,88,89],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":1,"replicas":[89,94,87],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"guy1","partition":8,"replicas":[95,87,88],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":5,"replicas":[87,95,88],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":2,"replicas":[88,95,87],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":7,"replicas":[89,88,95],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":4,"replicas":[95,89,87],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":1,"replicas":[87,89,95],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":9,"replicas":[87,88,89],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":3,"replicas":[89,87,88],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":6,"replicas":[88,87,89],"log_dirs":["any","any","any"]},{"topic":"guy1","partition":0,"replicas":[95,88,89],"log_dirs":["any","any","any"]}]}
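The proposed JSON can also be cut out of the --generate output with standard tools rather than copied by hand. A sketch, assuming (as in the output above) that the proposed assignment is the last line; gen.out here is a trimmed-down stand-in for the real command's output:

```shell
# Simulated --generate output; in practice redirect the real
# kafka-reassign-partitions ... --generate output into gen.out.
cat > gen.out <<'EOF'
Current partition replica assignment
{"version":1,"partitions":[{"topic":"guy1","partition":0,"replicas":[88,89,94]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"guy1","partition":0,"replicas":[95,88,89]}]}
EOF
# The proposed JSON is the last line of the output.
tail -1 gen.out > reasign.json
cat reasign.json
```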
If you do not want to change anything, just copy the proposed partitions JSON and paste it into a file; we will call it reasign.json. Now run kafka-reassign-partitions again, this time with the --execute option. This will start moving partitions around. If you run it on many topics at the same time, each with many partitions, it can take some time to complete and can also be resource-heavy.
kafka-reassign-partitions --zookeeper cloudera4.lan:2181 --execute --reassignment-json-file reasign.json
You can view the status of the command by running the same script with the --verify option. This will tell you which tasks have completed, which are still running, and what their outcome was (success/failure):
kafka-reassign-partitions --zookeeper cloudera4.lan:2181 --verify --reassignment-json-file reasign.json
When it’s all done you can run kafka-topics --describe again and see that all ISRs have three brokers again.
Automatic rebalancing
The best tool for automated rebalancing (and many more automated housekeeping tasks) is supposed to be Cruise Control. However, so far I couldn’t get it to work properly.
It connects to my Kafka cluster, but collects no metrics. It detects offline brokers but can do nothing, because it doesn’t have enough data. After a few days I gave up on it (for now) and decided to automate the manual process instead. It won’t be as fancy as Cruise Control, but it will make my life a little easier. I will get back to Cruise Control sometime and make it work.
I tried to keep it as simple as I could, and ended up with the bash script below. It works with Cloudera; with other distributions you will need to change the names of scripts like kafka-reassign-partitions and kafka-topics according to your Kafka distribution. For convenience, you only need to set the appropriate commands in the set_commands function:
#!/usr/bin/bash
# This script balances a Kafka cluster by redistributing partitions across the available brokers.
# An unbalanced Kafka cluster is one where one or more of its brokers have been lost or newly added.
#
# There are two modes of running the balancer:
#
# Manual:
# This mode is aimed at planned maintenance, where you have just added a new broker, or you know
# you are about to remove a broker and want to move all partitions off it first. It reallocates
# ALL the partitions, and you can choose which brokers take part in the distribution by passing
# the brokers parameter, a comma-separated list of broker IDs.
#
# Auto:
# This mode is for the unplanned loss of a broker. It can be run unattended, periodically, by a
# scheduler such as cron. It checks for under-replicated partitions and, if it finds any,
# reallocates new replicas on the surviving brokers. It ignores the brokers parameter.
#
# The script is basically a wrapper around some of Kafka's built-in scripts.

function set_commands {
    KAFKA_TOPICS="kafka-topics"
    ZOOKEEPER_CLIENT="/opt/cloudera/parcels/CDH/lib/zookeeper/bin/zkCli.sh"
    REASSIGN_PARTITIONS="kafka-reassign-partitions"
}

function log {
    timestamp=$(date)
    echo "$timestamp"
    echo ""
    echo "$1"
}

function usage {
    echo "Usage: balancer.sh --zookeeper=[zookeeper host:port] --mode=[auto | manual] [--brokers=[comma delimited broker ids]]"
}

function start_manual {
    $KAFKA_TOPICS --describe --zookeeper "$zookeeper" > temp
    if [ -z "$brokers" ]; then
        # The broker ID list is the last line of zkCli's output, e.g. [87, 88, 89].
        brokers=$($ZOOKEEPER_CLIENT -server "$zookeeper" ls /brokers/ids | tail -1 | tr -d "[] ")
    fi
    common
}

function start_auto {
    $KAFKA_TOPICS --describe --zookeeper "$zookeeper" --under-replicated-partitions > temp
    brokers=$($ZOOKEEPER_CLIENT -server "$zookeeper" ls /brokers/ids | tail -1 | tr -d "[] ")
    common
}

function common {
    # Count the distinct topic names in the describe output.
    lines=$(awk -F ":" '{print $2}' temp | awk '{print $1}' | uniq | wc -l)
    if [ "$lines" -eq 0 ]; then
        log "No under replicated partitions found. Exiting."
        exit 0
    fi
    # Build the topics-to-move JSON file.
    echo "{\"topics\": [" > 1.json
    i=0
    for topic in $(awk -F ":" '{print $2}' temp | awk '{print $1}' | uniq); do
        ((i=i+1))
        if [ "$i" -eq "$lines" ]; then
            echo "{\"topic\": \"$topic\"}" >> 1.json
        else
            echo "{\"topic\": \"$topic\"}," >> 1.json
        fi
    done
    echo "]," >> 1.json
    echo "\"version\": 1 }" >> 1.json
    $REASSIGN_PARTITIONS --zookeeper "$zookeeper" --broker-list "$brokers" --generate --topics-to-move-json-file 1.json > temp
    # The proposed assignment is on lines 5-6 of the --generate output.
    sed -n 5,6p temp > 2.json
    $REASSIGN_PARTITIONS --zookeeper "$zookeeper" --execute --reassignment-json-file 2.json
    log "Rebalancing started. Run \"$REASSIGN_PARTITIONS --zookeeper $zookeeper --verify --reassignment-json-file 2.json\" to check status."
}

for var in "$@"; do
    case "$var" in
        --zookeeper=*)
            zookeeper="${var#*=}"
            ;;
        --mode=*)
            mode="${var#*=}"
            ;;
        --brokers=*)
            brokers="${var#*=}"
            ;;
        *)
            usage
            exit 1
            ;;
    esac
done

set_commands
if [ "$mode" = "auto" ]; then
    start_auto
elif [ "$mode" = "manual" ]; then
    start_manual
else
    usage
fi
The script takes three possible parameters:
- zookeeper (host:port)
- mode (see the description at the beginning of the script)
- brokers (a comma-separated list of broker IDs)
It writes all messages to standard output, not to a log file, so if you want to schedule it to run automatically you should collect and keep the output (something like “balancer.sh --mode=auto --zookeeper=cloudera4.lan:2181 > balancer.log 2>&1”).
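For scheduling the auto mode with cron, an entry along these lines does the job (the script path, schedule, and log location are placeholders to adapt):

```shell
# crontab entry: run the balancer in auto mode every hour,
# appending all output to a log file.
0 * * * * /opt/scripts/balancer.sh --mode=auto --zookeeper=cloudera4.lan:2181 >> /var/log/balancer.log 2>&1
```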
Keep your cluster (and yourself) balanced!