Kafka log and consumer offset expiration test

consumer spending

The consumer locally records two offset s:
• When consuming the pulled offset, fetchInitialOffset will obtain the last consumed offset from the broker during initialization and rebalance, and then pull messages from the specified offset. Next, the Consumer will continuously pull messages from the specified offset to the broker through fetchNewMessages().
• Consume the submitted offset, and automatically/manually submit the consumed offset after consumption

Describe the submission steps:
1. Local first mark offset, MarkOffset:

  • Note that only the local tags have been consumed at this time, and are not actually submitted to the broker

2. Then execute CommitOffset to submit to the broker, and CommitOffset has the following two ways:

  • Auto-commit (the default is this), Consumer.Offsets.AutoCommit.Enable (open by default)

    Two situations trigger CommitOffset to commit automatically:
    1. Timing submission, related to Consumer.Offsets.AutoCommit.Interval
    2. When the consumer session is stopped and the consumption is interrupted, the submission will be triggered when it exits

  • Submit manually (not recommended)

The topic __consumer_offsets stores consumer offsets information. The default cleanup policy log.cleanup.policy is compact. Simply put, it compresses the same key and keeps the last one. The default log.cleanup.policy of other topic log s is delete by default. The log.retention parameter is for the delete cleanup policy and does not take effect on compact. compact means that junk files will be cleaned up after compression, which is mainly related to the log.cleaner configuration. Can read:
Kafka 2.2.0 message log cleaning mechanism: log deletion log compression


kafka broker version: v2.0.0, sarama (go sdk) version v1.26.1
Change setting:
• log.retention.minutes: 5 (default log.retention.hours=168)

log retention time

• log.retention.check.interval.ms: 1000 (default 600000ms,10 minutes)

Log Expiration Check Frequency

• offsets.retention.minutes: 1 (default 24 hours before, default 10080 after version 2.0, 7 days)

Consumer offset retention time

• offsets.retention.check.interval.ms: 1000 (default 300000ms, 5 minutes)

Expired offset check frequency for consumers

• Consumer initialization consumption is set to OffsetOldest

Consumers consume from the oldest log offset after expiration

consumer offset expiration time

1. Open the producer and consumer

Where offset10 is the last round of production

2. Read the __consumer_offsets message to view the consumer offset

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'|grep -v console-consumer-

Where ExpirationTime-CommitTime=1 minute, indicating that the retention time is one minute

3. Shut down the producer. After one minute (related to offsets.retention.minutes and offsets.retention.check.interval.ms), [my-group,sync_time_test3,0] in __consumer_offsets receives a null message, which means that the offsets of the consumer have expired .

4. Restart the producer
The consumer continues the last consumption, and it is fine if the consumer is not restarted at this time. At this time, although there is no offset consumed by the consumer in the broker, the consumer locally remembers the offset of the last fetch and will continue to pull the next message. Only during initialization and rebalance will fetchInitialOffset get the last consumed offset from the broker and then pull messages from the specified offset.

5. Shut down the producer again and wait for the consumer offset to expire

6. Restarting the consumer immediately will re-consume

Re-consumed, you can compare the consumption time with step 4

Why does the consumer still exist after the consumer offset expires?

1. First open the producer consumer

2. Shut down the producer
The consumer has expired, but looking at the consumer still has:

Close the consumer, the consumer disappears:

But the content inside is gone, the content can be viewed with the following command:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe
# This command will report an error in kafka v2.0.0 https://github.com/apache/kafka/pull/4980

3. The analysis should be because the heartbeat keeps the consumer all the time

Consumer.Group.Session.Timeout is used to detect the timeout of worker program failure. The worker sends heartbeats periodically to signal its liveness to the agent. If the agent does not receive a heartbeat before this session timeout expires, the agent will be removed from the group. Note that the value must be between group.min.session.timeout.ms and group.max.session.timeout.ms configured in the broker configuration.

Let's set it to 30s first:

Then set sleep for a long time without heartbeat:

4. Only open consumers
my-group is removed after 30 seconds because there is no heartbeat

log expiration time

Check topic Order
./kafka-run-class.sh kafka.tools.GetOffsetShell --topic sync_time_test3 --broker-list localhost:9092 --time -1 
# View the offset of the topic (not the offset of the consumer), the last parameter -1 means to display the maximum value of the current offset, -2 means the minimum value of the offset

1. Produce messages first, and then close production after generating a few
View topic offset

After generating the message, check the topic offset

2. Open the consumer, close after consumption

3. Re-open the consumer every more than a minute, at this time it will be re-consumed

4. After more than 5 minutes, the maximum and minimum offsets of the log are the same, the rest have expired, and only the next offset will be retained

5. At this time, there is no news if the consumer is opened again.

consumer commit code

Here is part of the code for CommitOffset:

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
    // Ensure group is not closed
    select {
    case <-c.closed:
        return ErrClosedConsumerGroup
    // Refresh metadata for requested topics
    if err := c.client.RefreshMetadata(topics...); err != nil {
        return err

    // Init session Note that after entering here, it will continue to consume and wait to exit
    sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
    if err == ErrClosedClient {
        return ErrClosedConsumerGroup
    } else if err != nil {
        return err
    // Gracefully release session claims will execute offsets.Close() and flushToBroker  
    return sess.release(true)

//sess.release(true)--> offsets.Close():
func (om *offsetManager) Close() error {
    om.closeOnce.Do(func() {
        // exit the mainLoop
        if om.conf.Consumer.Offsets.AutoCommit.Enable {

        // mark all POMs as closed

        // flush one last time, the last flush
        if om.conf.Consumer.Offsets.AutoCommit.Enable {
            for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
                if om.releasePOMs(false) == 0 {
    return nil

//Submit offset to broker
func (om *offsetManager) Commit() {

func (om *offsetManager) flushToBroker() {
    req := om.constructRequest() //This will check if there is a new offset that needs to be submitted
    resp, err := broker.CommitOffset(req) //Here is the submission
    om.handleResponse(broker, req, resp)

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
    var r *OffsetCommitRequest
    var perPartitionTimestamp int64
    //This is whether to manually set the offset retention time. If it is 0, the retention time of the broker shall prevail.
    if om.conf.Consumer.Offsets.Retention == 0 {
        perPartitionTimestamp = ReceiveTime
        r = &OffsetCommitRequest{
            Version:                 1,
            ConsumerGroup:           om.group,
            ConsumerID:              om.memberID,
            ConsumerGroupGeneration: om.generation,
    } else {
        r = &OffsetCommitRequest{
            Version:                 2,
            RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
            ConsumerGroup:           om.group,
            ConsumerID:              om.memberID,
            ConsumerGroupGeneration: om.generation,

    for _, topicManagers := range om.poms {
        for _, pom := range topicManagers {
            if pom.dirty { //The dirty here is to determine whether the data has been updated and whether there is a new offset that needs to be submitted
                r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)

    if len(r.blocks) > 0 {
        return r

    return nil


  • The retention of topic logs is related to log.retention.check.interval.ms and log.retention.check.interval.ms (which can be configured locally for the topic). Even after the log is cleaned up, the latest offset information will be retained. After the retention time is exceeded, the next time Production will also continue from last time. But topic: __consumer_offsets defaults to the compact retention policy and has nothing to do with log.retention.
  • Consumer information is kept in __consumer_offsets. It is related to offsets.retention.minutes and offsets.retention.check.interval.ms (consumer configuration cannot be performed for the specified topic, it is a global setting), if you have to change it separately, you can change the parameter Consumer.Offsets.Retention of the consumer client, such as setting For 2 minutes:

    At this point, the consumer's ExpirationTime-CommitTime=2 minutes is specified in __consumer_offsets:
  • It is recommended to set the expiration time of consumers offsets.retention > log retention time log.retention

Recommended reading:

[kafka principle] Consumer offset __consumer_offsets_ related analysis
Consumer offsets in Kafka __consumer_offsets
Kafka 2.2.0 message log cleaning mechanism: log deletion log compression

Tags: Operation & Maintenance Go Back-end kafka

Posted by MerMer on Sat, 15 Oct 2022 11:34:01 +0530