Kafka中commitAsync的使用与实例解析

news/2025/2/24 16:45:09

在使用Apache Kafka进行消息处理时,正确管理偏移量(offset)是确保数据一致性和可靠性的重要环节。Kafka提供了多种方式来提交偏移量,其中commitAsync()方法是一种高效且灵活的选择。本文将通过一个完整的实例,详细介绍如何在Kafka中使用commitAsync()方法来异步提交偏移量。

  1. 为什么需要异步提交偏移量?
    在Kafka中,偏移量用于记录消费者消费消息的位置。默认情况下,Kafka消费者会自动提交偏移量,但这种方式可能会导致数据丢失或重复消费。通过将enable.auto.commit设置为false,并手动调用commitAsync()方法,我们可以更精确地控制偏移量的提交时机,从而提高系统的可靠性和性能。
  2. 示例项目配置
    在开始之前,我们需要配置Kafka的生产者和消费者属性。以下是示例代码中的配置类ExampleConfig,它为生产者和消费者提供了基本的配置参数。
    java复制
    package com.logicbig.example;

import java.util.Properties;

public class ExampleConfig {
public static final String BROKERS = “localhost:9092”;

public static Properties getProducerProps() {
    Properties props = new Properties();
    props.put("bootstrap.servers", BROKERS);
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}

public static Properties getConsumerProps() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", BROKERS);
    props.setProperty("group.id", "testGroup");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
}

}
3. 创建Kafka主题
在运行消费者和生产者之前,我们需要创建一个Kafka主题。以下代码展示了如何使用AdminClient创建一个名为example-topic-2020-5-28的主题,并设置其分区数为1。
java复制
package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicCreator {
public static void main(String[] args) throws Exception {
createTopic(“example-topic-2020-5-28”, 1);
}

private static void createTopic(String topicName, int numPartitions) throws Exception {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleConfig.BROKERS);
    AdminClient admin = AdminClient.create(config);

    boolean alreadyExists = admin.listTopics().names().get().stream()
            .anyMatch(existingTopicName -> existingTopicName.equals(topicName));

    if (alreadyExists) {
        System.out.printf("topic already exits: %s%n", topicName);
    } else {
        System.out.printf("creating topic: %s%n", topicName);
        NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
        admin.createTopics(Collections.singleton(newTopic)).all().get();
    }

    System.out.println("-- describing topic --");
    admin.describeTopics(Collections.singleton(topicName)).all().get()
            .forEach((topic, desc) -> {
                System.out.println("Topic: " + topic);
                System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                        desc.partitions()
                                .stream()
                                .map(p -> Integer.toString(p.partition()))
                                .collect(Collectors.joining(",")));
            });

    admin.close();
}

}
运行上述代码后,将创建一个名为example-topic-2020-5-28的主题,分区数为1。
4. 使用commitAsync()提交偏移量
接下来,我们将通过一个完整的消费者和生产者示例,展示如何使用commitAsync()方法异步提交偏移量。
生产者代码
生产者代码将向example-topic-2020-5-28主题发送4条消息。
java复制
package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CommitAsyncExample {
private static String TOPIC_NAME = “example-topic-2020-5-28”;

private static void sendMessages() {
    Properties producerProps = ExampleConfig.getProducerProps();
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

    for (int i = 0; i < 4; i++) {
        String value = "message-" + i;
        System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
        producer.send(new ProducerRecord<>(TOPIC_NAME, value));
    }

    producer.flush();
    producer.close();
}

public static void main(String[] args) throws Exception {
    sendMessages();
}

}
消费者代码
消费者代码将订阅example-topic-2020-5-28主题,并使用commitAsync()方法异步提交偏移量。
java复制
package com.logicbig.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

public class CommitAsyncExample {
private static String TOPIC_NAME = “example-topic-2020-5-28”;
private static KafkaConsumer<String, String> consumer;
private static TopicPartition topicPartition;

public static void main(String[] args) throws Exception {
    Properties consumerProps = ExampleConfig.getConsumerProps();
    consumer = new KafkaConsumer<>(consumerProps);
    topicPartition = new TopicPartition(TOPIC_NAME, 0);
    consumer.assign(Collections.singleton(topicPartition));

    printOffsets("before consumer loop", consumer, topicPartition);
    sendMessages();
    startConsumer();
}

private static void startConsumer() {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",
                    record.key(), record.value(), record.partition(), record.offset());
        }

        if (records.isEmpty()) {
            System.out.println("-- terminating consumer --");
            break;
        }

        printOffsets("before commitAsync() call", consumer, topicPartition);
        consumer.commitAsync();
        printOffsets("after commitAsync() call", consumer, topicPartition);
    }

    printOffsets("after consumer loop", consumer, topicPartition);
}

private static void printOffsets(String message, KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
    Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(new HashSet<>(Arrays.asList(topicPartition)));
    OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
    long position = consumer.position(topicPartition);
    System.out.printf("Offset info %s, Committed: %s, current position %s%n", message,
            offsetAndMetadata == null ? null : offsetAndMetadata.offset(), position);
}

private static void sendMessages() {
    Properties producerProps = ExampleConfig.getProducerProps();
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

    for (int i = 0; i < 4; i++) {
        String value = "message-" + i;
        System.out.printf("Sending message topic: %s, value: %s%n", TOPIC_NAME, value);
        producer.send(new ProducerRecord<>(TOPIC_NAME, value));
    }

    producer.flush();
    producer.close();
}

}
5. 运行结果分析
运行上述消费者代码后,输出结果如下:
复制
Offset info before consumer loop, Committed: null, current position 0
Sending message topic: example-topic-2020-5-28, value: message-0
Sending message topic: example-topic-2020-5-28, value: message-1
Sending message topic: example-topic-2020-5-28, value: message-2
Sending message topic: example-topic-2020-5-28, value: message-3
consumed: key = null, value = message-0, partition id= 0, offset = 0
consumed: key = null, value = message-1, partition id= 0, offset = 1
consumed: key = null, value = message-2, partition id= 0, offset = 2
consumed: key = null, value = message-3, partition id= 0, offset = 3
Offset info before commitAsync() call, Committed


http://www.niftyadmin.cn/n/5864600.html

相关文章

Lua | 每日一练 (3)

&#x1f4a2;欢迎来到张胤尘的技术站 &#x1f4a5;技术如江河&#xff0c;汇聚众志成。代码似星辰&#xff0c;照亮行征程。开源精神长&#xff0c;传承永不忘。携手共前行&#xff0c;未来更辉煌&#x1f4a5; 文章目录 Lua | 每日一练 (3)题目参考答案减少查找次数预分配表…

【设计模式】【创建型模式】单例模式(Singleton)

&#x1f44b;hi&#xff0c;我不是一名外包公司的员工&#xff0c;也不会偷吃茶水间的零食&#xff0c;我的梦想是能写高端CRUD &#x1f525; 2025本人正在沉淀中… 博客更新速度 &#x1f44d; 欢迎点赞、收藏、关注&#xff0c;跟上我的更新节奏 &#x1f3b5; 当你的天空突…

centos7中Open-Webui的部署

前期中部署了ollama 及deepseek-r1,为了有web界面访问&#xff0c;需要部署open-webui 系统要求是python3.11以上版本&#xff0c; 一、先升级openssl 1.安装依赖 yum install -y gcc gcc-c autoconf automake zlib zlib-devel pcre-devel 2.下载源码包并解压 wget https:/…

基于springboot+vue的考研互助平台

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

基于跨学科任务图谱和大模型微调的智能体设计

技术方案:基于跨学科任务图谱和大模型微调的智能体设计 1. 跨学科任务图谱生成 1.1 降噪与补齐 降噪 跨学科任务图谱的生成首先要对原始数据进行降噪处理,以消除不必要的干扰信息。假设原始数据为 ( D ),降噪后的数据为 ( D’ ),降噪过程可以通过以下公式表示: [ D’…

2016年下半年试题二:论软件设计模式及其应用

论文库链接&#xff1a;系统架构设计师论文 论文题目 软件设计模式(Software DesignPatter)是一套被反复使用的、多数人知晓的、经过分类编目的代码设计经验的总结。使用设计模式是为了重用代码以提高编码效率增加代码的可理解性、保证代码的可靠性。软件设计模式是软件开发中的…

案例自定义tabBar

1.通过Vannt Weapp组件库&#xff0c;引用一个tabBar组件实现自定义tabBar基本模板 2.实现自定义图标 3.渲染tabBar上的数字徽标 下面这张图可以看到设置的样式并没有生效 根据文档可知需要 这一步需要把store中的sum值给绑到徽标上 4.实现tabBar的切换效果 解决底部选中项索引…

JavaScript函数-函数的返回值

在JavaScript编程语言中&#xff0c;函数是构建复杂逻辑和实现代码复用的关键组件。而函数的返回值则是这些功能的重要组成部分&#xff0c;它允许我们将数据从一个函数传递到另一个地方&#xff0c;从而使得函数更加通用和灵活。本文将深入探讨JavaScript函数返回值的各种特性…