返回顶部
首页 > 资讯 > 后端开发 > GO >Golang errgroup 设计及实现原理解析
  • 484
分享到

Golang errgroup 设计及实现原理解析

2024-04-02 19:04:59 484人浏览 安东尼
摘要

目录开篇errgroup 源码拆解GroupWithContextWaitGoSetLimitTryGo使用方法结束语开篇 继上次学习了信号量 semaphore 扩展库的设计思路和

开篇

继上次学习了信号量 semaphore 扩展库的设计思路和实现之后,今天我们继续来看 golang.org/x/sync 包下的另一个经常被 Golang 开发者使用的大杀器:errgroup。

业务研发中我们经常会遇到需要调用多个下游的场景,比如加载一个商品的详情页,你可能需要访问商品服务,库存服务,券服务,用户服务等,才能从各个数据源获取到所需要的信息,经过一些鉴权逻辑后,组装成前端需要的数据格式下发。

串行调用当然可以,但这样就潜在的给各个调用预设了【顺序】,必须执行完 A,B,C 之后才能执行 D 操作。但如果我们对于顺序并没有强需求,从语义上看两个调用是完全独立可并发的,那么我们就可以让他们并发执行。

这个时候就可以使用 errgroup 来解决问题。一定意义上讲,errgroup 是基于 WaitGroup 在错误传递上进行一些优化而提供出来的能力。它不仅可以支持 context.Context 的相关控制能力,还可以将子任务的 error 向上传递。

errgroup 源码拆解

errgroup 定义在 golang.org/x/sync/errgroup,承载核心能力的结构体是 Group。

Group

type token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
	cancel func()
	wg sync.WaitGroup
	sem chan token
	errOnce sync.Once
	err     error
}

Group 就是对我们上面提到的一堆子任务执行计划的抽象。每一个子任务都会有自己对应的 goroutine 来执行。

通过这个结构我们也可以看出来,errgroup 底层实现多个 goroutine 调度,等待的能力还是基于 sync.WaitGroup。

WithContext

我们可以调用 errgroup.WithContext 函数来创建一个 Group。

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

这里可以看到,Group 的 cancel 函数本质就是为了支持 context 的 cancel 能力,初始化的 Group 只有一个 cancel 属性,其他都是默认的。一旦有一个子任务返回错误,或者是 Wait 调用返回,这个新 Context 就会被 cancel。

Wait

本质上和 WaitGroup 的 Wait 方法语义一样,既然我们是个 group task,就需要等待所有任务都执行完,这个语义就由 Wait 方法提供。如果有多个子任务返回错误,它只会返回第一个出现的错误,如果所有的子任务都执行成功,就返回 nil。


// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

Wait 的实现非常简单。一个前置的 WaitGroup Wait,结束后只做了两件事:

  • 如果对于公共的 Context 有 cancel 函数,就将其 cancel,因为事情办完了;
  • 返回公共的 Group 结构中的 err 给调用方。

Go

Group 的核心能力就在于能够并发执行多个子任务,从调用者的角度,我们只需要传入要执行的函数,签名为:func() error即可,非常通用。如果任务执行成功,就返回 nil,否则就返回 error,并且会 cancel 那个新的 Context。底层的调度逻辑由 Group 的 Go 方法实现:

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}
	g.wg.Add(1)
	go func() {
		defer g.done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}
func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}

我们重点来分析下 Go 这里发生了什么。

WaitGroup 加 1 用作计数;

启动一个新的 goroutine 执行调用方传入的 f() 函数;

  • 若 err 为 nil 说明执行正常;
  • 若 err 不为 nil,说明执行出错,此时将这个返回的 err 赋值给全局 Group 的变量 err,并将 context cancel 掉。注意,这里的处理在 once 分支中,也就是只有第一个来的错误会被处理。

在 defer 语句中调用 Group 的 done 方法,底层依赖 WaitGroup 的 Done,说明这一个子任务结束。

这里也可以看到,其实所谓 errgroup,我们并不是将所有子任务的 error 拼成一个字符串返回,而是直接在 Go 方法那里将第一个错误返回,底层依赖了 once 的能力。

SetLimit

其实看到这里,你有没有觉得 errgroup 的功能有点鸡肋?底层核心技术都是靠 WaitGroup 完成的,自己只不过是起了个 goroutine 执行方法,err 还只能保留一个。而且 Group 里面的 sem 那个 chan 是用来干什么呢?

这一节我们就来看看,Golang 对 errgroup 能力的一次扩充。

到目前为止,errgroup 是可以做到一开始人们对它的期望的,即并发执行子任务。但问题在于,这里是每一个子任务都开了个goroutine,如果是在高并发的环境里,频繁创建大量goroutine 这样的用法很容易对资源负载产生影响。开发者们提出,希望有办法限制 errgroup 创建的 goroutine 数量,参照这个 proposal: #27837

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}

SetLimit 的参数 n 就是我们对这个 Group 期望的最大 goroutine 数量,这里其实除去校验逻辑,只干了一件事:g.sem = make(chan token, n),即创建一个长度为 n 的 channel,赋值给 sem。

回忆一下我们在 Go 方法 defer 调用 done 中的表现,是不是清晰了很多?我们来理一下:

首先,Group 结构体就包含了 sem 变量,只作为通信,元素是空结构体,不包含实际意义:

type Group struct {
	cancel func()
	wg sync.WaitGroup
	sem chan token
	errOnce sync.Once
	err     error
}

如果你对整个 Group 的 Limit 没有要求,which is fine,你就直接忽略这个 SetLimit,errgroup 的原有能力就可以支持你的诉求。

但是,如果你希望保持 errgroup 的 goroutine 在一个上限之内,请在调用 Go 方法前,先 SetLimit,这样 Group 的 sem 就赋值为一个长度为 n 的 channel。

那么,当你调用 Go 方法时,注意下面的框架代码,此时 g.sem 不为 nil,所以我们会塞一个 token 进去,作为占坑,语义上表示我申请一个 goroutine 用。

func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}
	g.wg.Add(1)
	go func() {
		defer g.done()
                ...
	}()
}

当然,如果此时 goroutine 数量已经达到上限,这里就会 block 住,直到别的 goroutine 干完活,sem 输出了一个 token之后,才能继续往里面塞。

在每个 goroutine 执行完毕后 defer 的 g.done 方法,则是完成了这一点:

func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}

这样就把 sem 的用法串起来了。我们通过创建一个定长的channel来实现对于 goroutine 数量的管控,对于channel实际包含的元素并不关心,所以用一个空结构体省一省空间,这是非常优秀的设计,大家平常也可以参考。

TryGo

TryGo 和 SetLimit 这套体系其实都是不久前欧长坤大佬提交到 errgroup 的能力。

一如既往,所有带 TryXXX的函数,都不会阻塞。 其实办的事情非常简单,和 Go 方法一样,传进来一个 func() error来执行。

Go 方法的区别在于,如果判断 limit 已经不够了,此时不再阻塞,而是直接 return false,代表执行失败。其他的部分完全一样。

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}:
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		}
	}
	g.wg.Add(1)
	go func() {
		defer g.done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
	return true
}

使用方法

这里我们先看一个最常见的用法,针对一组任务,每一个都单独起 goroutine 执行,最后获取 Wait 返回的 err 作为整个 Group 的 err。


package main
import (
    "errors"
    "fmt"
    "time"
    "golang.org/x/sync/errgroup"
)
func main() {
    var g errgroup.Group
    // 启动第一个子任务,它执行成功
    g.Go(func() error {
        time.Sleep(5 * time.Second)
        fmt.Println("exec #1")
        return nil
    })
    // 启动第二个子任务,它执行失败
    g.Go(func() error {
        time.Sleep(10 * time.Second)
        fmt.Println("exec #2")
        return errors.New("failed to exec #2")
    })
    // 启动第三个子任务,它执行成功
    g.Go(func() error {
        time.Sleep(15 * time.Second)
        fmt.Println("exec #3")
        return nil
    })
    // 等待三个任务都完成
    if err := g.Wait(); err == nil {
        fmt.Println("Successfully exec all")
    } else {
        fmt.Println("failed:", err)
    }
}

你会发现,最后 err 打印出来就是第二个子任务的 err。

当然,上面这个 case 是我们正好只有一个报错,但是,如果实际有多个任务都挂了呢?

从完备性来考虑,有没有什么办法能够将多个任务的错误都返回呢?

这一点其实 errgroup 库并没有提供非常好的支持,需要开发者自行做一些改造。因为 Group 中只有一个 err 变量,我们不可能基于 Group 来实现这一点。

通常情况下,我们会创建一个 slice 来存储 f() 执行的 err。


package main
import (
    "errors"
    "fmt"
    "time"
    "golang.org/x/sync/errgroup"
)
func main() {
    var g errgroup.Group
    var result = make([]error, 3)
    // 启动第一个子任务,它执行成功
    g.Go(func() error {
        time.Sleep(5 * time.Second)
        fmt.Println("exec #1")
        result[0] = nil // 保存成功或者失败的结果
        return nil
    })
    // 启动第二个子任务,它执行失败
    g.Go(func() error {
        time.Sleep(10 * time.Second)
        fmt.Println("exec #2")
        result[1] = errors.New("failed to exec #2") // 保存成功或者失败的结果
        return result[1]
    })
    // 启动第三个子任务,它执行成功
    g.Go(func() error {
        time.Sleep(15 * time.Second)
        fmt.Println("exec #3")
        result[2] = nil // 保存成功或者失败的结果
        return nil
    })
    if err := g.Wait(); err == nil {
        fmt.Printf("Successfully exec all. result: %v\n", result)
    } else {
        fmt.Printf("failed: %v\n", result)
    }
}

可以看到,我们声明了一个 result slice,长度为 3。这里不用担心并发问题,因为每个 goroutine 读写的位置是确定唯一的。

本质上可以理解为,我们把 f() 返回的 err 不仅仅给了 Group 一份,还自己存了一份,当出错的时候,Wait 返回的错误我们不一定真的用,而是直接用自己错的这一个 error slice。

Go 官方文档中的利用 errgroup 实现 pipeline 的示例也很有参考意义:由一个子任务遍历文件夹下的文件,然后把遍历出的文件交给 20 个 goroutine,让这些 goroutine 并行计算文件的 md5。

这里贴出来简化代码学习一下.

package main
import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"golang.org/x/sync/errgroup"
)
// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}
	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}
type result struct {
	path string
	sum  [md5.Size]byte
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)
	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})
	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()
	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}

其实本质上还是 channel发挥了至关重要的作用,这里建议大家有时间尽量看看源文章:pkg.go.dev/golang.org/…

对于用 errgroup 实现 pipeline 模式有很大帮助。

结束语

今天我们学习了 errgroup 的源码已经新增的 SetLimit 能力,其实看过了 sync 相关的这些库,整体用到的能力其实大差不差,很多设计思想都是非常精巧的。比如 errgroup 中利用定长 channel 实现控制 goroutine 数量,空结构体节省内存空间。

并且 sync 包的实现一般都还是非常简洁的,比如 once,singleflight,semaphore 等。建议大家有空的话自己过一遍,对并发和设计模式的理解会更上一个台阶。

errgroup 本身并不复杂,业界也有很多封装实现,大家可以对照源码再思考一下还有什么可以改进的地方。

以上就是Golang errgroup 设计及实现原理解析的详细内容,更多关于Golang errgroup 设计原理的资料请关注编程网其它相关文章!

您可能感兴趣的文档:

--结束END--

本文标题: Golang errgroup 设计及实现原理解析

本文链接: https://lsjlt.com/news/121117.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Golang errgroup 设计及实现原理解析
    目录开篇errgroup 源码拆解GroupWithContextWaitGoSetLimitTryGo使用方法结束语开篇 继上次学习了信号量 semaphore 扩展库的设计思路和...
    99+
    2024-04-02
  • golang协程设计及调度原理
    目录一、协程设计-GMP模型1.工作线程M2.逻辑处理器p3.协程g4.全局调度信息schedt5.GMP详细示图二、协程调度1.调度策略获取本地运行队列获取全局运行队列协程窃取2....
    99+
    2024-04-02
  • Golangmap实践及实现原理解析
    目录Map实践以及实现原理使用实例内存模型创建maphash函数key定位和碰撞解决扩容元素访问删除迭代Map实践以及实现原理 使用实例内存模型创建maphash函数key定位和碰撞...
    99+
    2024-04-02
  • Golang实现gRPC的Proxy的原理解析
    背景 gRPC是Google开始的一个RPC服务框架, 是英文全名为Google Remote Procedure Call的简称。 广泛的应用在有RPC场景的业务系统中,一些架构中...
    99+
    2024-04-02
  • Golang的gc简介及原理解析
    标题:Golang的GC简介及原理解析 Golang(Go语言)作为一种由Google开发的开源编程语言,一直以其高效的并发模型和快速的编译速度受到广泛关注。其中,垃圾回收(Garba...
    99+
    2024-03-06
    golang 原理解析 go语言
  • Golang Facade模式的设计思想与实现原理
    Golang中的Facade模式是一种结构型设计模式,它提供了一个统一的接口,用于隐藏子系统的复杂性,使得子系统更容易使用。Faca...
    99+
    2023-10-08
    Golang
  • 学习Golang接口:实现原理与设计模式
    学习Golang接口:实现原理与设计模式 在学习Golang编程语言的过程中,接口是一个非常重要的概念。接口在Golang中扮演着非常关键的角色,它在实现多态性、解耦和组合等方面发挥着...
    99+
    2024-03-13
    接口 golang 设计模式
  • Golang WaitGroup 底层原理及源码解析
    目录0 知识背景0.1 WaitGroup0.2 信号量(Semaphore)1 WaitGroup 底层原理1.1 定义1.1.1 noCopy1.1.2 state atomic...
    99+
    2023-05-18
    Golang WaitGroup 原理 Golang WaitGroup源码
  • InnoDB MVCC实现原理及源码解析
    1、原理介绍 数据多版本(MVCC)是MySQL实现高性能的一个主要的一个主要方式,通过对普通的SELECT不加锁,直接利用MVCC读取指版本的值,避免了对数据重复加锁的过程。InnoDB支持MVCC多版本...
    99+
    2024-04-02
  • Golang 函数设计原理揭秘
    go 函数设计原理包括:函数签名声明函数名称、参数和返回值类型;可变参数允许不知晓参数数目的调用;可返回多个值,通过名字返回值简化命名;函数类型可用于创建和传递函数。实战中函数设计的示例...
    99+
    2024-04-19
    golang 函数设计
  • Hive架构设计及原理的示例分析
    这篇文章给大家分享的是有关Hive架构设计及原理的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。Hive架构设计及原理1.什么是Hive:Hive是构建在Hadoop之上的数据仓库平台,可以结构化的数据文...
    99+
    2023-06-03
  • golang匿名函数及闭包原理解析
    是的,go 中的匿名函数可用于快速定义一次性函数或立即执行函数,而闭包则用于将局部变量封锁在匿名函数中,即使后者返回也能访问这些变量。 Go 中的匿名函数和闭包理解 匿名函数是在不定义...
    99+
    2024-05-03
    函数 golang 闭包
  • OpenMPtaskconstruct实现原理及源码示例解析
    目录前言从编译器角度看 task constructTask Construct 源码分析总结前言 在本篇文章当中主要给大家介绍在 OpenMP 当中 task 的实现原理,以及他...
    99+
    2023-03-06
    OpenMP task construct原理 OpenMP task construct
  • 浅析Golang中map的实现原理
    Golang是一门支持面向对象编程的编程语言,它拥有高效的内存管理机制和灵活的语法特性,被广泛用于服务器端开发、网络编程、云计算等领域。在Golang中,map是一种非常重要的数据结构,它可以存储键值对,并提供快速的查找和插入操作。本文将介...
    99+
    2023-05-14
    go语言 Golang map
  • 深入理解Golang make和new的区别及实现原理
    目录前言new的使用底层实现make的使用底层实现总结前言 在Go语言中,有两个比较雷同的内置函数,分别是new和make方法,二者都可以用来分配内存,那他们有什么区别呢?对于初学者...
    99+
    2024-04-02
  • 详解Golang中NewTimer计时器的底层实现原理
    目录1.简介2.基本使用3.实现原理3.1 内容分析3.2 基本思路3.3 实现步骤3.4 NewTimer的实现4.总结1.简介 本文将介绍 Go 语言中的NewTimer,首先展...
    99+
    2023-05-18
    Golang NewTimer计时器原理 Golang NewTimer计时器 Golang 计时器
  • golang架构设计开闭原则手写实现
    目录缘起开闭原则场景思路ICourse.goGolangCourse.goIDiscount.goDiscountedGolangCourse.goopen_close_test.g...
    99+
    2024-04-02
  • Golang编译原理解析
    Golang编译原理解析与具体代码示例 在现代编程语言中,编译原理是一个至关重要的领域,它涉及到将高级语言代码转换为机器能够理解和执行的低级指令的过程。作为一门流行的编程语言,Gola...
    99+
    2024-03-06
    原理 编译 golang go语言
  • GolangWaitGroup实现原理解析
    原理解析 type WaitGroup struct { noCopy noCopy // 64-bit value: high 32 bits are counter,...
    99+
    2023-02-03
    Go WaitGroup Go WaitGroup实现原理
  • react diff 算法实现思路及原理解析
    目录事例分析diff 特点diff 思路实现 diff 算法修改入口文件实现 React.Fragment我们需要修改 children 对比前面几节我们学习了解了 react 的渲...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作