In this Kafka series we are taking a deep look at Apache Kafka and Consumer Lag monitoring. In Part 1 of this series we discussed the basics of Kafka Consumer Lag. In Part 2, we discussed how some open-source alternatives monitor Consumer Lag, and their limitations. In this post, we will discuss how OpsClarity monitors Consumer Lag.

Given all the complexity and challenges of monitoring lag, how do we do it at OpsClarity? Instead of modeling lag as an acceleration problem, OpsClarity models lag as a queue size problem and alerts you when the queue is abnormally large. Instead of running Kafka clients to pull all messages or consume offset topics in real time, we use Kafka APIs to periodically measure offsets, with no additional overhead between measurement periods. Instead of requiring you to configure the version of the Consumer Group, OpsClarity figures out where the offsets are stored automatically. And instead of running a Kafka monitor in isolation, it collects and visualizes issues up and down the service stack and across the data center, so that if Zookeeper has a host issue that causes a service issue that in turn causes a Kafka issue, you can see that correlation visually and trace the Kafka application issue back to the upstream problem.

Detecting Queue Size Anomalies with OpsClarity

At OpsClarity, instead of looking at lag as acceleration, we prefer to model lag as a Queue. The unread messages in a Consumer Group represent a queue of work to do. A queue might be very stable and close to zero, or it may have cyclic patterns where it bursts up and is then drained back down. We use a machine learning model to detect Queue Size anomalies, which adapts to each system's queue behavior. In this way we can alert on lag problems for any type of Consumer Group application, without having to set thresholds.
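OpsClarity's actual model is not described here, but as a rough illustration of the idea, the toy sketch below flags a lag sample as anomalous when it sits far outside a rolling window of that queue's own recent samples. The window size, threshold, and sample values are illustrative assumptions, not OpsClarity parameters.

```python
from collections import deque
from statistics import mean, stdev

def make_lag_detector(window=120, z_threshold=4.0):
    """Toy rolling-statistics detector -- not OpsClarity's model, just an
    illustration of adapting to each queue's own recent behavior."""
    history = deque(maxlen=window)

    def observe(lag):
        anomalous = False
        if len(history) >= 5:                      # need a little history first
            mu, sigma = mean(history), stdev(history)
            if sigma > 0 and (lag - mu) / sigma > z_threshold:
                anomalous = True                   # far above recent behavior
        history.append(lag)
        return anomalous

    return observe

detect = make_lag_detector()
for sample in [10, 12, 9, 11, 10, 13, 9000]:
    print(sample, detect(sample))                  # only the 9000 spike flags
```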

[Figure: Kafka-anomaly]

[Figure: ConsumerLag]

Detecting Lag with OpsClarity Queue Latency Anomaly Detection Models

OpsClarity also offers another derived metric, called lag_ms (lag measured in milliseconds instead of messages). This metric comes from the same data points: at a given point in time, the Producer had committed message number N. Using a table of Producer offsets over time, we can interpolate or extrapolate the time of any arbitrary offset, and then look up consumer offsets in terms of when the Producer wrote those messages. Now, instead of saying a consumer is 4,000 messages behind, you can say it is 45 seconds behind, while another consumer that is also 4,000 messages behind its Producer might only be 5 seconds behind. This way, instead of counting messages, which can be quite abstract and hard to compare between applications, you can think of lag as a simple latency problem: how far behind real time is the application? You might have SLAs or performance goals in terms of latency. For example, you might want an application to never fall more than 5 minutes behind. With the lag_ms metric, you can set simple threshold alerts on your SLAs and performance goals. You can also use the OpsClarity Latency Model, which learns what latencies are normal and alerts you on anomalies.
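The implementation details are not public, but the idea can be sketched as follows: keep a table of (timestamp, producer offset) samples, interpolate the time at which the Producer wrote the consumer's current offset, and report the difference from now. The sample table and function name below are illustrative assumptions.

```python
import bisect

# Hypothetical sample table of (unix_seconds, producer_offset), oldest first.
producer_history = [(1000.0, 50_000), (1060.0, 54_000), (1120.0, 58_000)]

def lag_ms(consumer_offset, history, now):
    """Estimate how far behind real time a consumer is, in milliseconds,
    by interpolating when the producer wrote `consumer_offset`."""
    times = [t for t, _ in history]
    offsets = [o for _, o in history]
    if consumer_offset >= offsets[-1]:
        return 0.0                               # caught up with the last sample
    i = bisect.bisect_left(offsets, consumer_offset)
    if i == 0:
        written_at = times[0]                    # older than our history; clamp
    else:
        t0, t1 = times[i - 1], times[i]
        o0, o1 = offsets[i - 1], offsets[i]
        written_at = t0 + (t1 - t0) * (consumer_offset - o0) / (o1 - o0)
    return (now - written_at) * 1000.0

print(lag_ms(56_000, producer_history, now=1125.0))   # ~35,000 ms (35 s) behind
```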

Collecting Data

To get all of this lag data, OpsClarity builds a list of consumer groups, where each group has a list of topics and each topic has a list of partitions. Every partition has one producer offset, which is the latest message written, and one consumer offset per group, which is the last message that group has read. Lag for a partition is then the difference between those offsets.
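As a sketch, the resulting data model is just a nested map from group to topic to partition, with a producer offset and a consumer offset per partition and lag as their difference. The group and topic names here are made up for illustration.

```python
# Hypothetical shape of the collected data: group -> topic -> partition -> offsets.
offsets_by_group = {
    "billing-consumers": {
        "payments": {
            0: {"producer": 120_450, "consumer": 118_900},
            1: {"producer": 119_980, "consumer": 119_980},
        }
    }
}

for group, topics in offsets_by_group.items():
    for topic, partitions in topics.items():
        for partition, o in partitions.items():
            lag = o["producer"] - o["consumer"]
            print(f"{group} {topic}[{partition}] lag={lag}")
```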

Zookeeper

To get the list of all consumer groups, OpsClarity looks in Zookeeper (for older-style Consumer Groups) and at the Kafka Coordinators (for newer-style Consumer Groups). Zookeeper will at a minimum have a list of brokers. There is an optional chroot, which one could use to handle multiple Kafka clusters in one Zookeeper, and then a brokers/ folder. The Kafka data stored in Zookeeper is in JSON strings, so when you read a key like brokers/ids/1 you get a string back, which you can unpack as JSON to find fields like "host" and "port". This is one way a Kafka application can bootstrap, as well as keep track of brokers as they go up and down.
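For example, using the kazoo Zookeeper client, listing brokers looks roughly like the sketch below; the connection string (and any chroot) are assumptions for your environment.

```python
import json
from kazoo.client import KazooClient

# Adjust hosts (and append a chroot, e.g. "localhost:2181/kafka") for your setup.
zk = KazooClient(hosts="localhost:2181")
zk.start()

for broker_id in zk.get_children("/brokers/ids"):
    data, _stat = zk.get("/brokers/ids/" + broker_id)
    broker = json.loads(data)               # JSON string with "host", "port", ...
    print(broker_id, broker.get("host"), broker.get("port"))

zk.stop()
```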

Zookeeper might hold a bunch of other metadata, most importantly under a "consumers" folder. The minimum we need is the list of Zookeeper-style consumer groups under this folder, then the topics and partitions under those.
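Walking the consumers folder is the same kind of traversal. A minimal sketch, reusing the `zk` client from the previous snippet and assuming the standard old-consumer layout of /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;:

```python
# Old-style (Zookeeper-based) groups: /consumers/<group>/offsets/<topic>/<partition>
for group in zk.get_children("/consumers"):
    offsets_path = "/consumers/{}/offsets".format(group)
    if not zk.exists(offsets_path):
        continue                              # group has no committed offsets
    for topic in zk.get_children(offsets_path):
        for partition in zk.get_children(offsets_path + "/" + topic):
            value, _stat = zk.get("{}/{}/{}".format(offsets_path, topic, partition))
            print(group, topic, partition, int(value.decode()))
```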

Those are the pieces we need to track for each Consumer Group. We don't yet need the details of how the group has balanced this load across its members.

Kafka Coordinator

Newer-generation Consumer Groups don't store their metadata in Zookeeper, aside from the list of brokers. Instead, each group uses one of the Brokers as its Coordinator. In this case, we can use the ListGroups and DescribeGroups APIs to get all the groups, topics, and partitions, just as you would have done with Zookeeper. Kafka API calls are made by sending and receiving binary request and response structures over a TCP connection to a broker. So, to ask a broker to ListGroups, you send it an encoded ListGroupsRequest and you get back a ListGroupsResponse with an array of group names. Brokers typically provide APIs only for the things they directly manage, so in this case you have to ask each Broker for its groups in turn.
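With the kafka-python client, for example, those two calls are exposed through the admin client. A minimal sketch, assuming a broker at localhost:9092 and the return shapes of kafka-python's list_consumer_groups and describe_consumer_groups:

```python
from kafka import KafkaAdminClient

# Ask a broker for the groups it coordinates, then describe each one.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

groups = admin.list_consumer_groups()          # list of (group_id, protocol_type)
group_ids = [group_id for group_id, _protocol in groups]

for description in admin.describe_consumer_groups(group_ids):
    print(description.group, description.state, len(description.members))

admin.close()
```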

Offsets

Now that we have the list of topics and partitions for each group, we need a way to look up all the offsets. Instead of tracking these continuously, we periodically use two Kafka APIs, FetchOffset and GetAvailableOffsets. The FetchOffset call takes a Consumer Group along with a list of all the partitions you want, and returns the consumer offset for each partition. You have to tell this API whether the metadata is stored in Zookeeper or the Coordinator, but this is a way to let Kafka do the work of looking things up in Zookeeper so you don't have to reinvent that mechanism.
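In kafka-python this kind of lookup is available as list_consumer_group_offsets on the admin client. A sketch, with an assumed broker address and a made-up group name:

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# Committed (consumer) offsets for one group: {TopicPartition: OffsetAndMetadata}
committed = admin.list_consumer_group_offsets("billing-consumers")
for tp, meta in committed.items():
    print(tp.topic, tp.partition, meta.offset)

admin.close()
```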

GetAvailableOffsets is an API that takes a list of partitions and returns the newest offset for each. With it, you can make one API call to get all the producer offsets available on that broker. Note that these are provided by the partition leader, which may be different from the consumer group coordinator, so each broker needs to be asked for the newest offsets of the partitions it leads.
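kafka-python exposes the same information through KafkaConsumer.end_offsets. A sketch, again with an assumed broker address and a made-up topic name (and assuming the topic exists):

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")

# Newest (producer) offsets for every partition of a topic.
partitions = [TopicPartition("payments", p)
              for p in consumer.partitions_for_topic("payments")]
end = consumer.end_offsets(partitions)         # {TopicPartition: newest offset}
for tp, offset in sorted(end.items()):
    print(tp.topic, tp.partition, offset)

consumer.close()
```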

Putting It All Together

Now we have a nice set of offsets for every partition of interest: one write offset representing where the producer has synchronized, and one read offset for each group. You can have multiple things reading from a partition, and even if you had some unusual situation with multiple producers writing to one partition, it would in the end look like one writer. Think of it this way: when anything writes to a partition, the write offset moves for everyone; when an application reads something, that only matters to that application, and other applications pay no attention. So an application that catches up once an hour will have a different offset than an application that reads continuously, but they all share the same underlying data and see the same write offset.

Following the logical organization of a Consumer Group, we can get a list of partitions for each group, and calculate lag for each of those partitions.

Note that the Producer and Consumer offset samples may be taken out of phase, so lag can appear negative. This just means the next Producer sample will show that the Producer is indeed ahead of the consumer, and in the meantime we can treat lag as close to zero. If they're that close to each other, we can safely assume the consumer is in good shape and keeping up easily.
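Putting the two offset lookups together, lag per partition is just the difference, clamped at zero to absorb those out-of-phase samples. A sketch reusing the hypothetical `committed` and `end` maps from the earlier snippets:

```python
# Lag per partition: producer (end) offset minus committed consumer offset,
# clamped at zero so out-of-phase samples don't report negative lag.
for tp, newest in end.items():
    consumer_offset = committed[tp].offset if tp in committed else 0
    lag = max(0, newest - consumer_offset)
    print(tp.topic, tp.partition, lag)
```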

Scenarios

Now that you’re monitoring lag, you can catch many scenarios that are hard to catch otherwise. If one application node is having problems processing messages quickly, this won’t necessarily be easy to spot in Kafka Broker metrics. You might see a small dip in consumer messages on those topics, but it would typically be hard to spot clearly. But that node will immediately have a large jump in lag, and clearly signal that something is going wrong. Consumer Groups also have scenarios where rebalancing results in some nodes doing more work than before. If that extra work is absorbed by existing capacity of the node, then lag will be normal. But if that extra work causes small slow downs in the application message loop, then lag will start to rise above the other nodes.

Correlated Metrics