In our previous blog we talked about monitoring Kafka as a broker service, looking at ways to think about disk utilization and replication problems. But the Kafka brokers sit in the middle of an ecosystem, with Kafka producers on one side writing data, and Kafka consumers on the other side reading data. In this post, we will dive into the consumer side of this application ecosystem, which means looking closely at Kafka consumer group monitoring.
Kafka as a broker service has a very simple API, and could in practice be used with many kinds of applications and application architectures that leverage the brokers for I/O and for queueing messages. However, it turns out that there is a common architecture pattern: a group of application nodes collaborates to consume messages, often scaling out as message volume goes up, and handling the scenario where nodes crash or drop out. This pattern generally keeps the data and messages flowing reliably even as application nodes come and go. There’s also a reference implementation for this architecture, based on decades of hard-won experience with high-performance distributed systems, called the Kafka Consumer Group. This reference implementation is shipped with Apache Kafka as a JAR and is well documented, although it is possible to implement a Consumer Group application in any language.
A Consumer Group’s Relationship to Partitions
While the Consumer Group uses the broker APIs, it is more of an application pattern, or a set of behaviors embedded into your application. The Kafka brokers are an important part of the puzzle, but do not provide the Consumer Group behavior directly. A Consumer Group based application may run on several nodes, and when they start up they coordinate with each other in order to split up the work. This is slightly imperfect because the work in this case is a set of partitions defined by the Producer. Each Consumer node can read a partition and one can split up the partitions to match the number of consumer nodes as needed. If the number of Consumer Group nodes is more than the number of partitions, the excess nodes remain idle. This might be desirable to handle failover. If there are more partitions than Consumer Group nodes, then some nodes will be reading more than one partition.
Reading Multiple Partitions on one Node
There are a couple of tricky things to consider as one designs a Consumer Group. If a consumer node takes multiple partitions, or ends up taking multiple partitions on failover, those partitions will appear intermingled if viewed as a single stream of messages. So a Consumer Group application could get row #100 from partition 3, then row #90 from partition 4, then back to partition 3 for row #101. Nothing in Kafka can guarantee order across partitions, as only messages within a partition are in order. So either order should not matter to the consumer application, or the consumer application must recover per-partition order by splitting the stream back out by partition.
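As a minimal sketch of splitting an intermingled stream back out by partition, the snippet below groups records by their partition number. The record shape, a (partition, offset, value) tuple, and the function name are assumptions made for illustration and are not part of any Kafka client API.

```python
from collections import defaultdict


def split_by_partition(records):
    """Group (partition, offset, value) records into one list per partition.

    Arrival order is preserved inside each list, so per-partition order
    survives even though the combined stream is interleaved.
    """
    streams = defaultdict(list)
    for partition, offset, value in records:
        streams[partition].append((offset, value))
    return dict(streams)


# Hypothetical interleaved stream, mirroring the example in the text.
interleaved = [
    (3, 100, "a"),
    (4, 90, "b"),
    (3, 101, "c"),
    (4, 91, "d"),
]

streams = split_by_partition(interleaved)
for partition, items in sorted(streams.items()):
    offsets = [offset for offset, _ in items]
    print(partition, offsets)
# prints:
# 3 [100, 101]
# 4 [90, 91]
```

Each per-partition list can then be handed to logic that depends on ordering, while anything that does not care about order can keep consuming the combined stream.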
Multiple Topics within a Consumer Group
The other tricky design consideration is that each member of a Consumer Group may subscribe to some, but not all, of the topics being handled in the group. This makes thinking about distribution a little complex. In a simple case of a Consumer Group handling one and only one topic, all nodes would subscribe to that topic and distribution of work would be uniform. If there are two topics, and only some nodes subscribe to Topic-1, then those Topic-1 partitions will only be assigned to the subscribing nodes, and if one goes down it will be reassigned only to one of the remaining subscribing nodes, if there are any. Think of this Consumer Group design like a group of groups, where each subgroup is pooled and balanced independently.
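To make the "group of groups" idea concrete, here is a small sketch that derives the pool of eligible nodes for each topic from the subscriptions. The node names, topic names, and helper function are hypothetical. The sketch only models which nodes are candidates for a topic's partitions, not the assignment itself.

```python
def subscribers_by_topic(subscriptions):
    """Map each topic to the sorted list of nodes that subscribe to it."""
    pools = {}
    for node, topics in subscriptions.items():
        for topic in topics:
            pools.setdefault(topic, []).append(node)
    return {topic: sorted(nodes) for topic, nodes in pools.items()}


# Hypothetical group: only node-a and node-b subscribe to topic-1.
subscriptions = {
    "node-a": {"topic-1", "topic-2"},
    "node-b": {"topic-1", "topic-2"},
    "node-c": {"topic-2"},
}

pools = subscribers_by_topic(subscriptions)
print(pools["topic-1"])  # ['node-a', 'node-b']
print(pools["topic-2"])  # ['node-a', 'node-b', 'node-c']

# If node-a drops out, its topic-1 partitions can only move to node-b,
# because node-c never subscribed to topic-1.
survivors = {n: t for n, t in subscriptions.items() if n != "node-a"}
pools_after = subscribers_by_topic(survivors)
print(pools_after["topic-1"])  # ['node-b']
```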
The Rebalancing Phase
As nodes in a Consumer Group come and go, the running nodes decide how to divide up the partitions. In the reference implementation, each partition is assigned one owner in a rebalancing phase. Rebalancing triggers under different circumstances, but think of it as the phase that happens when an application scales up and down. When an application node crashes, all the well-behaved nodes stop work and unsubscribe from their partitions, making those partitions available to be reassigned. Those well-behaved nodes will then wait for all the partitions to reach this state. The less-well-behaved nodes, such as the one that suddenly crashed, will of course not unsubscribe from their partitions.
In this failure case, where some nodes are waiting patiently and some other nodes are gone, wedged, or otherwise non-responsive, two timeouts start ticking. One is a timeout for the Kafka client, which might be something like zookeeper.session.timeout.ms. This is a heartbeat window which is used for detecting that a node hasn’t reported back in a timely manner. This is tested all the time and used to evict bad nodes. The other timeout is rebalance.backoff.ms * rebalance.max.retries. This is the largest window allowed for the rebalancing phase, where clients are not reading anything from Kafka. But if this window is smaller than the Kafka client session timer, rebalancing could fail due to a crashed node and you’d have a stopped Consumer Group. And if the Kafka client session timer is too small, you could evict application nodes by mistake and trigger unnecessary rebalancing. So thinking carefully about these two timeout windows is necessary to keep your application running well.
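The relationship between the two windows can be checked with a few lines of arithmetic. The sketch below uses the three setting names from the paragraph above as dictionary keys. The numeric values are illustrative assumptions, not recommendations, and the helper functions are hypothetical.

```python
def rebalance_window_ms(config):
    """Longest time the rebalancing phase may take before giving up."""
    return config["rebalance.backoff.ms"] * config["rebalance.max.retries"]


def rebalance_outlasts_session(config):
    """True when the rebalance window is at least as long as the session
    timeout, so a crashed node can be evicted before retries run out."""
    return rebalance_window_ms(config) >= config["zookeeper.session.timeout.ms"]


# Illustrative values only (assumed for this example).
safe = {
    "zookeeper.session.timeout.ms": 6000,
    "rebalance.backoff.ms": 2000,
    "rebalance.max.retries": 4,
}
risky = dict(safe, **{"rebalance.max.retries": 2})

print(rebalance_window_ms(safe), rebalance_outlasts_session(safe))    # 8000 True
print(rebalance_window_ms(risky), rebalance_outlasts_session(risky))  # 4000 False
```

In the second configuration the group could exhaust its rebalance retries after 4 seconds, while the crashed node would not be evicted until its 6 second session expires.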
Looking a little deeper into rebalancing, one might wonder how these assignments between clients and partitions happen. This turns out to be an interesting area for Kafka’s roadmap, as you can imagine different strategies for assigning work can be quite useful to different kinds of applications. One might, for example, have specialist nodes that are better for some kinds of work within the group, and it might be nice to try to push the right data to them. Today that can be done at the Topic level, as mentioned above, essentially dividing a Consumer Group into a bunch of subgroups. Or one might want some assignment that results in uniform workloads, based on the number of messages in each partition. But until we have pluggable assignment functions, the reference implementation has a straightforward assignment strategy called Range Assignment. There is also a newer Round Robin assignor which is useful for applications like Mirror Maker, but most applications just use the default assignment algorithm.
The Range Assignor tries to land on a uniform distribution of partitions, at least within each topic, while at the same time avoiding the need to coordinate and bargain between nodes. This last goal, independent assignment, is done by each node executing a fixed algorithm: sort the partitions, sort the consumers, then for each topic take same-sized ranges of partitions for each consumer. Where the sizes cannot be the same, the consumers at the beginning of the sorted list will end up with one extra partition. With this algorithm, each application node can see the entire layout by itself, and from there take up the right assignments.
Let’s look at an example from comments in the source code:
* For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
* resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
* The assignment will be:
* C0: [t0p0, t0p1, t1p0, t1p1]
* C1: [t0p2, t1p2]
Notice that each topic is broken up into ranges regardless of the other topics, so the first application node in this case ends up with one extra partition from t0 and one extra partition from t1. This could be twice the work for our unbalanced node, as it has four partitions while the second node has only two. But if a third node were added, everything would become perfectly balanced, as each node would have one partition from each topic. And if a fourth node were added, you’d have one idle node doing nothing, because no topic has four partitions.
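The range strategy described above can be sketched in a few lines of Python. This is an illustrative reimplementation of the idea, not the code shipped with Kafka. It assumes every consumer subscribes to every topic, and the function name and input shapes are made up for the example.

```python
def range_assign(consumers, partitions_per_topic):
    """Assign contiguous partition ranges per topic to sorted consumers.

    consumers: list of consumer ids.
    partitions_per_topic: dict of topic name -> number of partitions.
    Consumers at the front of the sorted list absorb any remainder.
    """
    ordered = sorted(consumers)
    assignment = {consumer: [] for consumer in ordered}
    if not ordered:
        return assignment
    for topic in sorted(partitions_per_topic):
        count = partitions_per_topic[topic]
        base, extra = divmod(count, len(ordered))
        for i, consumer in enumerate(ordered):
            start = base * i + min(i, extra)
            length = base + (1 if i < extra else 0)
            assignment[consumer].extend(
                f"{topic}p{p}" for p in range(start, start + length)
            )
    return assignment


topics = {"t0": 3, "t1": 3}

two = range_assign(["C0", "C1"], topics)
print(two)
# {'C0': ['t0p0', 't0p1', 't1p0', 't1p1'], 'C1': ['t0p2', 't1p2']}

three = range_assign(["C0", "C1", "C2"], topics)
print(three)
# {'C0': ['t0p0', 't1p0'], 'C1': ['t0p1', 't1p1'], 'C2': ['t0p2', 't1p2']}

four = range_assign(["C0", "C1", "C2", "C3"], topics)
print(four["C3"])  # [] (the fourth consumer is idle)
```

Because the function is deterministic and needs only the sorted consumer list and the partition counts, every node can run it independently and arrive at the same layout.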
You might be wondering at this point where all of these assignment decisions are stored. In earlier versions of the Consumer Group reference implementation, Zookeeper was used to store all of this kind of metadata. Since then, newer versions of Kafka have a set of APIs to support storing Consumer Group metadata in the brokers themselves. Each Consumer Group can sync up with one of the brokers that will take on the role of Coordinator for that group. While all the decision making is still done in the application nodes, the Coordinator can fulfill a JoinGroup request and supply metadata about the Consumer Group, like assignments and offsets. This Coordinator node is also responsible for the heartbeat timer mentioned above, so if the Consumer Group application node that is leading group decisions disappears, the Coordinator could kick everyone out and essentially require the Consumer Group to be reformed by the remaining nodes. An important part of Consumer Group behavior, then, is electing leader nodes and working with the Coordinator to read and write metadata about assignments and partitions.
This is a lot of complex behavior that you “get for free” when you use a Consumer Group, so it is important to understand not just how to configure and set up your application, but also how to get operational insight into the various systems. To cover the application ecosystem end to end, you must monitor at least Zookeeper, Brokers / Coordinators, Producers, and Consumers. Zookeeper is at least used to bootstrap everything else, but often is also used to store Consumer Group assignments and offset updates. Brokers / Coordinators must be fully functional of course as every message must pass through them. It is possible to see Brokers in a degraded state while the Producers and Consumers are working correctly, but it typically cannot last this way for a long time without eventually starting to impact throughput or error rates at least on the Producers. Monitoring Producers is like monitoring a simpler Kafka application, which just wants to write to a partition. And we can see Producer behavior holistically from the Consumer’s point of view, as it is possible to tell from Broker metadata how much data is being added to each of the partitions under a Consumer Group. So even though the Producers are not necessarily coordinated or aware of a Consumer Group, the Consumer Group can naturally tell if Producers have sudden spikes or drops in traffic.
Lag as a KPI
Just by looking at the metadata of a Consumer Group, we can determine a few key metrics: how many messages are being written to the partitions within the group, how many messages are being read from those partitions, and what is the difference? The difference is called Lag, and represents how far the Consumer Group application is behind the producers. Producer offsets are kept in the Kafka Broker in charge of that partition, which can tell you the last offset in the partition. Consumer offsets are kept either in Zookeeper or the Kafka Coordinator, and tell you the most recently read offset in each partition. Note that these offsets are eventually consistent, and synchronized on different heartbeats by different application clusters, so they may not make perfect sense at all times. For example, you could counterintuitively have a Consumer offset that is greater than a Producer offset, but if you waited another heartbeat cycle or two and then updated the Producer offset, it should normally be ahead of the previous Consumer offset. In aggregate, total application lag is the sum of all the partition lags. For a normal Consumer Group, lag should be close to zero or at least somewhat flat and stable, which would mean the application is keeping up with the producers. Total lag is the number of messages behind real time. For an application that wants to be near real time it is important to monitor lag as a key performance indicator, and to drive lag down.
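The lag arithmetic can be sketched as follows. The offset maps are hypothetical inputs that would come from the brokers and from Zookeeper or the Coordinator, and both maps are assumed to cover the same partitions. Clamping negative values to zero is a choice made in this sketch to absorb the eventually consistent case described above, where a Consumer offset briefly appears ahead of the Producer offset.

```python
def partition_lags(producer_offsets, consumer_offsets):
    """Per-partition lag: last produced offset minus last consumed offset.

    Negative differences (stale producer offsets) are clamped to zero.
    """
    return {
        partition: max(producer_offsets[partition] - consumer_offsets[partition], 0)
        for partition in producer_offsets
    }


def total_lag(producer_offsets, consumer_offsets):
    """Total application lag is the sum of all the partition lags."""
    return sum(partition_lags(producer_offsets, consumer_offsets).values())


# Hypothetical snapshot for one topic with three partitions.
producer_offsets = {("t0", 0): 1500, ("t0", 1): 980, ("t0", 2): 2010}
consumer_offsets = {("t0", 0): 1450, ("t0", 1): 985, ("t0", 2): 2000}

print(partition_lags(producer_offsets, consumer_offsets))
# {('t0', 0): 50, ('t0', 1): 0, ('t0', 2): 10}
print(total_lag(producer_offsets, consumer_offsets))  # 60
```

Sampling this total on a regular interval and charting it gives the flat-or-growing signal discussed above.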
Monitoring Consumer Lag with OpsClarity
As you can see, the mechanics of consumer lag and monitoring can be complex. Most monitoring solutions offer Kafka Broker monitoring, and leave it to the user to collect application metrics around Consumer Groups. OpsClarity has automated monitoring of the entire Kafka ecosystem, from Producers to Brokers to Consumer Groups, integrated with surrounding systems critical to your application. In Part 2 of this blog, we look at how open source monitoring tools like Burrow address consumer lag. In Part 3, we discuss how the OpsClarity monitoring solution provides a more comprehensive approach to monitoring Kafka and consumer lag.