Kafka review (24): the consume-transform-produce pattern
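This post walks through Kafka's consume-transform-produce pattern: a single application consumes from source-topic, transforms each record, produces to sink-topic, and commits the consumed offsets inside the same producer transaction, so the read and the write either both take effect or neither does. Note that auto-commit must be disabled on the consumer, because offsets are committed through producer.sendOffsetsToTransaction rather than by the consumer itself. The full example follows.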
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.*;

public class KafkaTest24 {
    public static final String brokerList = "k8s-master:9092";

    public static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Manual commit is mandatory: offsets are committed through the producer's transaction.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        return props;
    }

    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
        return props;
    }

    // Consume from source-topic, then produce to sink-topic.
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
        consumer.subscribe(Collections.singletonList("source-topic"));
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties());
        // Initialize the transaction (registers transactionalId with the transaction coordinator).
        producer.initTransactions();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (!records.isEmpty()) {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                // Begin the transaction.
                producer.beginTransaction();
                try {
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            ProducerRecord<String, String> producerRecord =
                                    new ProducerRecord<>("sink-topic", record.key(), record.value());
                            producer.send(producerRecord);
                            System.out.println("sent: " + record.value());
                        }
                        // The committed offset is the last consumed offset plus one.
                        long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset + 1));
                    }
                    // Commit the consumed offsets inside the transaction.
                    // In the consume-transform-produce pattern, the group id passed here must
                    // match the group id configured on the consumer.
                    producer.sendOffsetsToTransaction(offsets, "groupId");
                    producer.commitTransaction();
                } catch (ProducerFencedException e) {
                    // Another producer with the same transactional id has fenced this one.
                    producer.abortTransaction();
                }
            }
        }
    }
}
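To verify the pipeline end to end, you can read sink-topic with a consumer whose isolation.level is set to read_committed, so records from aborted transactions are filtered out. Below is a minimal sketch under assumptions not in the original post: the class name KafkaTest24Verify and the group id verify-group are hypothetical, chosen only for this illustration.

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaTest24Verify {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "k8s-master:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-group"); // hypothetical group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Only return records from committed transactions; aborted records are skipped.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("sink-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("received: " + record.value());
                }
            }
        }
    }
}

One broker-side caveat, assumed rather than shown in the original post: on a single-broker test cluster like the one brokerList points at, initTransactions() may hang unless the transaction coordinator's internal topic settings are lowered (transaction.state.log.replication.factor=1 and transaction.state.log.min.isr=1).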
Source: https://blog.csdn.net/amadeus_liu2/article/details/132578447