从现在开始,努力学习吧!本文《消息不以魔术字节开头》主要讲解了等等相关知识点,我会在编程网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!问题
从现在开始,努力学习吧!本文《消息不以魔术字节开头》主要讲解了等等相关知识点,我会在编程网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!
问题内容我正在尝试使用 Go 中的 /linkedin/goavro 包将 avro 编码数据生成到 kafka 主题中。目标是能够使用不同的客户端使用该主题。
首先,我注册架构如下:
curl -x post -h "content-type: application/vnd.schemareGIStry.v1+JSON" --data '{"schema": "{\"name\":\"test_topic2\",\"type\":\"record\", \"fields\":[{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"passWord\",\"size\":10,\"type\":\"string\"}]}"}' Http://localhost:8081/subjects/test_topic2-value/versions
然后我创建 avro 数据,使用 go 生成和使用它。
package main
import (
"GitHub.com/shopify/sarama"
"github.com/linkedin/goavro"
"fmt"
)
const (
brokers = "localhost:9092"
topic = "test_topic2"
)
const logineventavroschema = `{"name":"test_topic2","type":"record", "fields":[{"name":"user","type":"string"},{"name":"password","size":10,"type":"string"}]}`
func main() {
// create message
codec, err := goavro.newcodec(logineventavroschema)
if err != nil {
panic(err)
}
m := map[string]interface{}{
"user": "pikachu", "password": 231231,
}
single, err := codec.singlefromnative(nil, m)
if err != nil {
panic(err)
}
// producer
config := sarama.newconfig()
config.consumer.return.errors = true
config.producer.return.successes = true
config.version = sarama.v2_4_0_0
//get broker
cluster, err := sarama.newsyncproducer(brokers, config)
if err != nil {
panic(err)
}
defer func() {
if err := cluster.close(); err != nil {
panic(err)
}
}()
msg := &sarama.producermessage{
topic: topic,
value: sarama.stringencoder(single),
}
cluster.sendmessage(msg)
// consumer
clusterconsumer, err := sarama.newconsumer(brokers, config)
if err != nil {
panic(err)
}
defer func() {
if err := clusterconsumer.close(); err != nil {
panic(err)
}
}()
msgk, _ := clusterconsumer.consumepartition(topic, 0, sarama.offsetoldest)
for {
q := <-msgk.messages()
native, _, err := codec.nativefromsingle([]byte(q.value))
if err != nil {
fmt.println(err)
}
fmt.println(native)
}
这段代码工作正常,我可以成功地在 kafka 主题中生成和使用消息。
现在我尝试使用 python avro-consumer 中的主题:
from confluent_kafka import kafkaerror
from confluent_kafka.avro import avroconsumer
from confluent_kafka.avro.serializer import serializererror
c = avroconsumer({
'bootstrap.servers': 'localhost',
'group.id': 'groupid',
'schema.registry.url': 'http://localhost:8081',
'auto.offset.reset': 'earliest'})
c.subscribe(['test_topic2'])
while true:
try:
msg = c.poll(10)
except serializererror as e:
print("message deserialization failed for {}: {}".fORMat(msg, e))
break
if msg is none:
continue
if msg.error():
print("avroconsumer error: {}".format(msg.error()))
continue
print(msg.value(), msg.key())
c.close()
但是我收到以下错误:
confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at test_topic2 [0] offset 1: message does not start with magic byte
我认为我错过了 go 制作者部分的一些内容,如果有人可以分享他/她如何解决此问题的经验,我将不胜感激。
goavro
不使用架构注册表。
另外,您正在使用 StringEncoder
,我假设它仅输出字符串切片而不是 Avro 字节
FWIW,我建议使用 kafka-avro-console-consumer 测试消费者(如果有)
理论要掌握,实操不能落!以上关于《消息不以魔术字节开头》的详细介绍,大家都掌握了吧!如果想要继续提升自己的能力,那么就来关注编程网公众号吧!
--结束END--
本文标题: 消息不以魔术字节开头
本文链接: https://lsjlt.com/news/596025.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-04-05
2024-04-05
2024-04-05
2024-04-04
2024-04-05
2024-04-05
2024-04-05
2024-04-05
2024-04-04
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0