
Kafka Review (23): Transactions

Tags: kafka, linq, distributed · 2023-08-30 14:08:05 · 309 views · by 泡泡鱼

1. Producer: enabling transactions

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaTest22 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        // Transactions require idempotence to be enabled.
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "2");
        // Must be at most 5 when idempotence is enabled.
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "4");
        // Setting a transactional.id is what turns on transactional semantics.
        properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myTransaction");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>("study2024", 0, "fff", "hello sister, now is: " + new Date());

        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        try {
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            long offset = future.get().offset();
            System.out.println(offset);
            // Sleep before committing so that the consumer's behavior can be
            // observed while the transaction is still open.
            Thread.sleep(60000);
            kafkaProducer.commitTransaction();
        } catch (Exception ex) {
            kafkaProducer.abortTransaction();
        }
        kafkaProducer.close();
    }
}
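Note: if you run this against a single-broker test cluster, initTransactions() can hang or fail, because the internal transaction state log defaults to a replication factor of 3 and a minimum ISR of 2. A hedged server.properties sketch for a one-node dev setup (these are broker-side settings, not part of the producer code above):

```properties
# Allow the __transaction_state internal topic on a single broker
# (defaults are 3 and 2, which a one-node cluster cannot satisfy).
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
```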

2. Consumer: setting the isolation level to "read_committed"

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaTest23 {
    private static Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        // Only deliver records from committed transactions.
        properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        //properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
        return properties;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(getProperties());
        String topic = "study2024";
        myConsumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = myConsumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.value());
                System.out.println("record offset is: " + record.offset());
            }
        }
    }
}

3. Result: with the configuration above, the producer sends the record, obtains its offset from the Kafka broker, and then sleeps. While the producer is sleeping, the consumer receives nothing; only after the sleep ends and the transaction is committed does the consumer receive the message.
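The effect of the isolation level can be seen by flipping the commented-out line in the consumer above: a read_uncommitted consumer (which is the Kafka default) receives the record as soon as the broker has written it, even while the producer is still sleeping inside the open transaction, and it would also see records from transactions that are later aborted. A minimal, broker-free sketch of the two configurations, using string keys instead of the ConsumerConfig constants so it runs without kafka-clients on the classpath (the address is the same placeholder as above):

```java
import java.util.Properties;

public class IsolationLevelDemo {
    // Shared consumer settings; placeholder broker address as in the examples above.
    public static Properties baseConfig() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "xx.xx.xx.xx:9092");
        p.setProperty("group.id", "testGroup");
        return p;
    }

    public static void main(String[] args) {
        // KafkaTest23's setting: only records of committed transactions are
        // delivered, so KafkaTest22's record arrives after commitTransaction().
        Properties committed = baseConfig();
        committed.setProperty("isolation.level", "read_committed");

        // The default: records are delivered as soon as they are written,
        // including those of still-open (and possibly later aborted) transactions.
        Properties uncommitted = baseConfig();
        uncommitted.setProperty("isolation.level", "read_uncommitted");

        System.out.println(committed.getProperty("isolation.level"));
        System.out.println(uncommitted.getProperty("isolation.level"));
    }
}
```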

Source: https://blog.csdn.net/amadeus_liu2/article/details/132567347
