返回顶部
首页 > 资讯 > 后端开发 > GO >Golang如何将日志以Json格式输出到Kafka
  • 787
分享到

Golang如何将日志以Json格式输出到Kafka

2024-04-02 19:04:59 787人浏览 八月长安
摘要

目录格式化接口普通文本格式化器JSON文本格式化器写日志接口写日志到文件写日志到kafka接口的组装如何提高日志处理的吞吐量在上一篇文章中我实现了一个支持Debug、Info、Err

在上一篇文章中我实现了一个支持Debug、Info、Error等多个级别的日志库,并将日志写到了磁盘文件中,代码比较简单,适合练手。有兴趣的可以通过这个链接前往:https://GitHub.com/bosima/ylog/releases/tag/v1.0.1

工程实践中,我们往往还需要对日志进行采集,将日志归集到一起,然后用于各种处理分析,比如生产环境上的错误分析、异常告警等等。在日志消息系统领域,Kafka久负盛名,这篇文章就以将日志发送到Kafka来实现日志的采集;同时考虑到日志分析时对结构化数据的需求,这篇文章还会提供一种输出json格式日志的方法。

这个升级版的日志库还要保持向前兼容,即还能够使用普通文本格式,以及写日志到磁盘文件,这两个特性和要新增的两个功能分别属于同类处理,因此我这里对它们进行抽象,形成两个接口:格式化接口、写日志接口。

格式化接口

所谓格式化,就是日志的格式处理。这个日志库目前要支持两种格式:普通文本和Json。

为了在不同格式之上提供一个统一的抽象,ylog中定义 logEntry 来代表一条日志:

type logEntry struct {
	Ts    time.Time `json:"ts"`
	File  string    `json:"file"`
	Line  int       `json:"line"`
	Level LogLevel  `json:"level"`
	Msg   string    `json:"msg"`
}

格式化接口的能力就是将日志从logEntry格式转化为其它某种数据格式。ylog中对它的定义是:

type LoggerFORMatter interface {
	Format(*logEntry, *[]byte) error
}

第1个参数是一个logEntry实例,也就是要被格式化的日志,第2个参数是日志格式化之后要写入的容器

普通文本格式化器

其实现是这样的:

type textFormatter struct {
}
func NewTextFormatter() *textFormatter {
	return &textFormatter{}
}
func (f *textFormatter) Format(entry *logEntry, buf *[]byte) error {
	formatTime(buf, entry.Ts)
	*buf = append(*buf, ' ')
	file := toShort(entry.File)
	*buf = append(*buf, file...)
	*buf = append(*buf, ':')
	itoa(buf, entry.Line, -1)
	*buf = append(*buf, ' ')
	*buf = append(*buf, levelNames[entry.Level]...)
	*buf = append(*buf, ' ')
	*buf = append(*buf, entry.Msg...)
	return nil
}

可以看到它的主要功能就是将logEntry中的各个字段按照某种顺序平铺开来,中间用空格分隔。

其中的很多数据处理方法参考了golang标准日志库中的数据格式化处理代码,有兴趣的可以去github中详细查看。

这里对日期时间格式化为字符串做了特别的优化,在标准日志库中为了将年、月、日、时、分、秒、毫秒、微秒等格式化指定长度的字符串,使用了一个函数:

func itoa(buf *[]byte, i int, wid int) {
	// Assemble decimal in reverse order.
	var b [20]byte
	bp := len(b) - 1
	for i >= 10 || wid > 1 {
		wid--
		q := i / 10
		b[bp] = byte('0' + i - q*10)
		bp--
		i = q
	}
	// i < 10
	b[bp] = byte('0' + i)
	*buf = append(*buf, b[bp:]...)
}

其逻辑大概就是将数字中的每一位转换为字符并存入byte中,注意这里初始化byte数组的时候是20位,这是int64最大的数字位数。

其实时间字符串中的每个部分位数都是固定的,比如年是4位、月日时分秒都是2位,根本不需要20位,所以这个空间可以节省;还有这里用了循环,这对于CPU的分支预测可能有那么点影响,所以我这里分别对不同位数写了专门的格式化方法,以2位数为例:

func itoa2(buf *[]byte, i int) {
	q := i / 10
	s := byte('0' + i - q*10)
	f := byte('0' + q)
	*buf = append(*buf, f, s)
}

Json文本格式化器

其实现是这样的:

type jsonFormatter struct {
}
func NewJsonFormatter() *jsonFormatter {
	return &jsonFormatter{}
}
func (f *jsonFormatter) Format(entry *logEntry, buf *[]byte) (err error) {
	entry.File = toShortFile(entry.File)
	jsonBuf, err := json.Marshal(entry)
	*buf = append(*buf, jsonBuf...)
	return
}

代码也很简单,使用标准库的json序列化方法将logEntry实例转化为Json格式的数据。

对于Json格式,后续考虑支持用户自定义Json字段,这里暂时先简单处理。

写日志接口

写日志就是将日志输出到别的目标,比如ylog要支持的输出到磁盘文件、输出到Kafka等。

前边格式化接口将格式化后的数据封装到了 []byte 中,写日志接口就是将格式化处理的输出 []byte 写到某种输出目标中。参考Golang中各种Writer的定义,ylog中对它的定义是:

type LoggerWriter interface {
	Ensure(*logEntry) error
	Write([]byte) error
	Sync() error
	Close() error
}

这里有4个方法:

  • Ensure 确保输出目标已经准备好接收数据,比如打开要写入的文件、创建Kafka连接等等。
  • Write 向输出目标写数据。
  • Sync 要求输出目标将缓存持久化,比如写数据到磁盘时,操作系统会有缓存,通过这个方法要求缓存数据写入磁盘。
  • Close 写日志结束,关闭输出目标。

写日志到文件

这里定义一个名为fileWriter的类型,它需要实现LoggerWriter的接口。

先看类型的定义:

type fileWriter struct {
	file     *os.File
	lastHour int64
	Path     string
}

包含四个字段:

  • file 要输出的文件对象。
  • lastHour 按照小时创建文件的需要。
  • Path 日志文件的根路径。

再看其实现的接口:

func (w *fileWriter) Ensure(entry *logEntry) (err error) {
	if w.file == nil {
		f, err := w.createFile(w.Path, entry.Ts)
		if err != nil {
			return err
		}
		w.lastHour = w.getTimeHour(entry.Ts)
		w.file = f
		return nil
	}
	currentHour := w.getTimeHour(entry.Ts)
	if w.lastHour != currentHour {
		_ = w.file.Close()
		f, err := w.createFile(w.Path, entry.Ts)
		if err != nil {
			return err
		}
		w.lastHour = currentHour
		w.file = f
	}
	return
}
func (w *fileWriter) Write(buf []byte) (err error) {
	buf = append(buf, '\n')
	_, err = w.file.Write(buf)
	return
}
func (w *fileWriter) Sync() error {
	return w.file.Sync()
}
func (w *fileWriter) Close() error {
	return w.file.Close()
}

Ensure 中的主要逻辑是创建当前要写入的文件对象,如果小时数变了,先把之前的关闭,再创建一个新的文件。

Write 把数据写入到文件对象,这里加了一个换行符,也就是说对于文件日志,其每条日志最后都会有一个换行符,这样比较方便阅读。

Sync 调用文件对象的Sync方法,将日志从操作系统缓存刷到磁盘。

Close 关闭当前文件对象。

写日志到Kafka

这里定义一个名为kafkaWriter的类型,它也需要实现LoggerWriter的接口。

先看其结构体定义:

type kafkaWriter struct {
	Topic     string
	Address   string
	writer    *kafka.Writer
	batchSize int
}

这里包含四个字段:

Topic 写Kafka时需要一个主题,这里默认当前Logger中所有日志使用同一个主题。

Address Kafka的访问地址。

writer 向Kafka写数据时使用的Writer,这里集成的是:github.com/segmentio/kafka-go,支持自动重试和重连。

batchSize Kafka写日志的批次大小,批量写可以提高日志的写效率。

再看其实现的接口:

func (w *kafkaWriter) Ensure(curTime time.Time) (err error) {
	if w.writer == nil {
		w.writer = &kafka.Writer{
			Addr:      kafka.tcp(w.Address),
			Topic:     w.Topic,
			BatchSize: w.batchSize,
			Async:     true,
		}
	}
	return
}
func (w *kafkaWriter) Write(buf []byte) (err error) {
	// buf will be reused by ylog when this method return,
	// with aysnc write, we need copy data to a new slice
	kbuf := append([]byte(nil), buf...)
	err = w.writer.WriteMessages(context.Background(),
		kafka.Message{Value: kbuf},
	)
	return
}
func (w *kafkaWriter) Sync() error {
	return nil
}
func (w *kafkaWriter) Close() error {
	return w.writer.Close()
}

这里采用的是异步发送到Kafka的方式,WriteMessages方法不会阻塞,因为传入的buf要被ylog重用,所以这里copy了一下。异步还会存在的一个问题就是不会返回错误,可能丢失数据,不过对于日志这种数据,没有那么严格的要求,也可以接受。

如果采用同步发送,因为批量发送比较有效率,这里可以攒几条再发,但日志比较稀疏时,可能短时间很难攒够,就会出现长时间等不到日志的情况,所以还要有个超时机制,这有点麻烦,不过我也写了一个版本,有兴趣的可以去看看:Https://github.com/bosima/ylog/blob/main/examples/kafka-writer.go

接口的组装

有了格式化接口和写日志接口,下一步就是将它们组装起来,以实现相应的处理能力。

首先是创建它们,因为我这里也没有动态配置的需求,所以就放到创建Logger实例的时候了,这样比较简单。

func NewYesLogger(opts ...Option) (logger *YesLogger) {
	logger = &YesLogger{}
	...
	logger.writer = NewFileWriter("logs")
	logger.formatter = NewTextFormatter()

	for _, opt := range opts {
		opt(logger)
	}
	...
	return
}

可以看到默认的formatter是textFormatter,默认的writer是fileWriter。这个函数传入的Option其实是个函数,在下边的opt(logger)中会执行它们,所以使用其它的Formatter或者Writer可以这样做:

logger := ylog.NewYesLogger(
		...
		ylog.Writer(ylog.NewKafkaWriter(address, topic, writeBatchSize)),
		ylog.Formatter(ylog.NewJsonFormatter()),
)

这里 ylog.Writer 和 ylog.Formatter 就是符合Option类型的函数,调用它们可以设置不同的Formatter和Writer。

然后怎么使用它们呢?

...
l.formatter.Format(entry, &buf)
l.writer.Ensure(entry)
err := l.writer.Write(buf)
...

当 logEntry 进入消息处理环节后,首先调用formatter的Format方法格式化logEntry;然后调用了writer的Ensure方法确保writer已经准备好,最后调用writer的Write方法将格式化之后的数据输出到对应的目标。

为什么不将Ensure方法放到Write中呢?这是因为目前写文本日志的时候需要根据logEntry中的日志时间创建日志文件,这样就需要给Writer传递两个参数,有点别扭,所以这里将它们分开了。

如何提高日志处理的吞吐量

Kafka的吞吐量是很高的,那么如果放到ylog自身来说,如何提高它的吞吐量呢?

首先想到的就是Channel,可以使用有缓冲的Channel模拟一个队列,生产者不停的向Channel发送数据,如果Writer可以一直在缓冲被填满之前将数据取走,那么理论上说生产者就是非阻塞的,相比同步输出到某个Writer,没有直接磁盘IO、网络IO,日志处理的吞吐量必将大幅提升。

定义一个Channel,其容量默认为当前机器逻辑处理器的数量:

logger.pipe = make(chan *logEntry, runtime.NumCPU())

发送数据的代码:

entry := &logEntry{
		Level: level,
		Msg:   s,
		File:  file,
		Line:  line,
		Ts:    now,
	}
	l.pipe <- entry

接收数据的代码:

for {
		select {
		case entry := <-l.pipe:
			// reuse the slice memory
			buf = buf[:0]
			l.formatter.Format(entry, &buf)
			l.writer.Ensure(entry.Ts)
			err := l.writer.Write(buf)
		...
		}
	}

实际效果怎么样呢?看下Benchmark:

goos: darwin
goarch: amd64
pkg: github.com/bosima/ylog
cpu: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
BenchmarkInfo-8   	 1332333	       871.6 ns/op	     328 B/op	       4 allocs/op

这个结果可以和zerolog、zap等高性能日志库一较高下了,当然目前可以做的事情要比它们简单很多。

如果对Java有所了解的同学应该听说过log4j,在log4j2中引入了一个名为Disruptor的组件,它让日志处理飞快了起来,受到很多Java开发者的追捧。Disruptor之所以这么厉害,是因为它使用了无并发、环形队列、缓存行填充等多种高级技术。

相比之下,Golang的Channel虽然也使用了环形缓冲,但是还是使用了锁,作为队列来说性能并不是最优的。

Golang中有没有类似的东西呢?最近出来的ZenQ可能是一个不错的选择,不过看似还不太稳定,过段时间再尝试下。有兴趣的可以去看看:https://github.com/alphadose/ZenQ 。

好了,以上就是本文的主要内容。关于ylog的介绍也告一段落了,后续会在Github上持续更新,增加更多有用的功能,并不断优化处理性能,欢迎关注:https://github.com/bosima/ylog 。

到此这篇关于Golang:将日志以Json格式输出到Kafka的文章就介绍到这了,更多相关Golang日志输出到Kafka内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

您可能感兴趣的文档:

--结束END--

本文标题: Golang如何将日志以Json格式输出到Kafka

本文链接: https://lsjlt.com/news/148935.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Golang如何将日志以Json格式输出到Kafka
    目录格式化接口普通文本格式化器Json文本格式化器写日志接口写日志到文件写日志到Kafka接口的组装如何提高日志处理的吞吐量在上一篇文章中我实现了一个支持Debug、Info、Err...
    99+
    2024-04-02
  • Slf4j+logback实现JSON格式日志输出方式
    目录Slf4j+logback实现JSON格式日志输出依赖logback 记录JSON日志Slf4j+logback实现JSON格式日志输出 依赖 <dependency&...
    99+
    2024-04-02
  • Slf4j+logback实现JSON格式日志输出方式是什么
    Slf4j+logback实现JSON格式日志输出方式是什么,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。Slf4j+logback实现JSON格式日志输出依赖<de...
    99+
    2023-06-22
  • docker日志如何输出到文件
    在Docker中,可以使用以下两种方式将日志输出到文件:1. 使用Docker日志驱动(Logging Driver):Docker...
    99+
    2023-09-28
    docker
  • 如何在Linux中格式化输出 JSON
    这篇文章将为大家详细讲解有关如何在Linux中格式化输出 JSON,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。 JSON 是一种轻量级且与语言无关的数据存储格式,易于与大多数编程...
    99+
    2023-06-09
  • Tomcat的支持log4j及日志输出为json格式是怎样的
    今天就跟大家聊聊有关Tomcat的支持log4j及日志输出为json格式是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。1.下载apache-tomcat-7.0.42及解压[...
    99+
    2023-06-03
  • linux如何把日志输出到文本
    在Linux中,可以通过重定向操作符将日志输出到文本文件中。你可以使用以下命令将命令的输出重定向到文件:```command > l...
    99+
    2023-10-09
    linux
  • PHP如何将数据库查询结果输出为json格式
    直接上实例代码 <php header("Content-type:text/html;charset=utf-8");//字符编码设置 $servername = "...
    99+
    2024-04-02
  • golang中如何将数据转为json格式
    本文小编为大家详细介绍“golang中如何将数据转为json格式”,内容详细,步骤清晰,细节处理妥当,希望这篇“golang中如何将数据转为json格式”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。使用 Gola...
    99+
    2023-07-05
  • PHP以 WBMP 格式将图像输出到浏览器或文件
    ...
    99+
    2024-04-02
  • PHP以 PNG 格式将图像输出到浏览器或文件
    ...
    99+
    2024-04-02
  • Golang GinWeb之自定义日志格式和输出方式/启禁日志颜色的方法是什么
    这篇文章主要介绍“Golang GinWeb之自定义日志格式和输出方式/启禁日志颜色的方法是什么”,在日常操作中,相信很多人在Golang GinWeb之自定义日志格式和输出方式/启禁日志颜色的方法是什么问...
    99+
    2024-04-02
  • maven如何使用slf4j输出日志到文件
    目录使用slf4j输出日志到文件log4j.propertiesslf4j将部分日志打印在其他文件中logback.xml配置中新增一个FileAppender在需要额外打印日志的地...
    99+
    2024-04-02
  • 如何使用shell将脚本输出结果记录到日志文件
    这篇文章将为大家详细讲解有关如何使用shell将脚本输出结果记录到日志文件,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。使用tee命令:sh portal/main.sh |tee log.txt获取脚本...
    99+
    2023-06-09
  • MySQL 如何以垂直格式而不是表格格式生成输出?
    通过在 MySQL 语句末尾使用 \G,它以垂直格式而不是表格格式返回输出。考虑下面的例子 -mysql> Select curdate(); +------------+ | curdate() | +------------+ ...
    99+
    2023-10-22
  • Python如何将控制台输出另存为日志文件
    目录Python将控制台输出另存为日志文件需求  方法一:使用 Logger 类(推荐)方法二:仅使用 sys方法三:使用 logging 模块Python记录日...
    99+
    2023-05-19
    Python控制台 Python日志文件 Python控制台输出
  • 如何进行Docker安装ELK并实现JSON格式日志
    本篇文章给大家分享的是有关如何进行Docker安装ELK并实现JSON格式日志,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。ELK是什么ELK是elastic公司提供的一套完整...
    99+
    2023-06-16
  • C#如何实现日期时间的格式化输出
    这篇文章主要讲解了“C#如何实现日期时间的格式化输出”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“C#如何实现日期时间的格式化输出”吧!DateTime被放在System命名空间下,在顶级语...
    99+
    2023-07-05
  • 如何以批处理模式获取MySQL交互式输出格式?
    我们可以借助-t选项以批处理方式获取MySQL输出格式。例如,在使用 –t 选项以批处理模式运行相同的查询后,我们将获得类似交互式格式的输出。示例C:\Program Files\MySQL\bin>mysql -u root -p ...
    99+
    2023-10-22
  • 如何使用自定义Json注解实现输出日志字段脱敏
    这篇文章给大家分享的是有关如何使用自定义Json注解实现输出日志字段脱敏的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。自定义Json注解实现输出日志字段脱敏背景在日志输出的时候,有时会输出一些用户的敏感信息,如手...
    99+
    2023-06-22
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作