返回顶部
首页 > 资讯 > 后端开发 > GO >深入理解Golangchannel的应用
  • 812
分享到

深入理解Golangchannel的应用

2024-04-02 19:04:59 812人浏览 安东尼
摘要

目录前言整体结构创建发送接收关闭前言 channel是用于 Goroutine 之间的同步、通信的数据结构 channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了

前言

channel是用于 Goroutine 之间的同步、通信的数据结构

channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率

channel的用途包括但不限于以下几点:

  • 协程间通信,同步
  • 定时任务:和timer结合
  • 解耦生产方和消费方,实现阻塞队列
  • 控制并发数

本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑

整体结构

Go channel的数据结构如下所示:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock mutex
}

qcount:已经存储了多少个元素

dataqsie:最多存储多少个元素,即缓冲区容量

buf:指向缓冲区的位置,实际上是一个数组

elemsize:每个元素占多大空间

closed:channel能够关闭,这里记录其关闭状态

elemtype:保存数据的类型信息,用于go运行时使用

sendx,recvx:

  • 记录下一个要发送到的位置,下一次从哪里还是接收
  • 这里用数组模拟队列,这两个变量即表示队列的队头,队尾
  • 因此channel的缓冲也被称为环形缓冲区

recvq,sendq:

当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送

lock:channel支持协程间并发访问,因此需要一把来保护

创建

创建channel会被编译器编译为调用makechan函数

// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值

可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针

func makechan(t *chantype, size int) *hchan {
   elem := t.elem
    
   // mem:缓冲区大小 
   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError( "makechan: size out of range" ))
   }

   var c *hchan
   switch {
   // 缓冲区大小为空,只申请hchanSize大小的内存
   case mem == 0:
       c = (*hchan)(mallocGC(hchanSize, nil, true))
       c.buf = c.raceaddr()
   // 元素类型不包含指针,一次性分配hchanSize+mem大小的内存
   case elem.ptrdata == 0:
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
       c.buf = add(unsafe.Pointer(c), hchanSize)
   // 否则就是带缓存,且有指针,分配两次内存
   default:
      // Elements contain pointers.
       c = new(hchan)
       c.buf = mallocgc(mem, elem, true)
   }
    
   // 保存元素类型,元素大小,容量
   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)
   
   return c
}

发送

执行以下代码时:

ch <- 3

编译器会转化为对chansend的调用

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 如果channel是空
   if c == nil {
      // 非阻塞,直接返回
      if !block {
         return  false
      }
      // 否则阻塞当前协程
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,没有关闭,且容量满了,无法发送,直接返回
   if !block && c.closed == 0 && full(c) {
      return  false
   }

   // 加锁
   lock(&c.lock)

   // 如果已经关闭,无法发送,直接panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "send on closed channel" ))
   }

   // 从接收队列弹出一个协程的包装结构sudog
   if sg := c.recvq.dequeue(); sg != nil {
      // 如果能弹出,即有等到接收的协程,说明:
      // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
      // 将要发送的数据拷贝到该协程的接收指针上
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true
}

   // 缓冲区还有空间
   if c.qcount < c.dataqsiz {
      // qp:计算要发送到的位置的地址
      qp := chanbuf(c, c.sendx)
      // 将数据从ep拷贝到qp
      typedmemmove(c.elemtype, qp, ep)
      // 待发送位置移动
      c.sendx++
      // 由于是数组模拟队列,sendx到顶了需要归零
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      // 缓冲区数量++
      c.qcount++
      unlock(&c.lock)
      return  true
}

   // 往下就是缓冲区无数据,也没有等到接收协程的情况了
   
   // 如果是非阻塞模式,直接返回
   if !block {
      unlock(&c.lock)
      return  false
    }

   // 将当前协程包装成sudog,阻塞到channel上
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
  
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   
   // 当前协程进入发送等待队列
   c.sendq.enqueue(mysg)
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   
 // 被唤醒后从这里开始执行
   
   KeepAlive(ep)

   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   // 被唤醒后发现channel关闭了,panic
   if closed {
      if c.closed == 0 {
         throw( "chansend: spurious wakeup" )
      }
      panic(plainError( "send on closed channel" ))
   }
   return  true
}

整体流程为:

如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回

从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:

  • 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
  • 将要发送的数据拷贝到该协程的接收指针上,返回
  • 这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝

否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回

接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上

将协程阻塞到channel的等待队列时,将其包装成了sudog结构:

type sudog struct {
   // 协程
   g *g
   // 前一个,后一个指针
   next *sudog
   prev *sudog
   // 等到发送的数据在哪,等待从哪个位置接收数据
   elem unsafe.Pointer
   acquiretime int64
   releasetime int64
   ticket      uint32
   isSelect bool
   success bool

   parent   *sudog // semaRoot binary tree
   waitlink *sudog // g.waiting list or semaRoot
   waittail *sudog // semaRoot
   // 在哪个channel上等待
   c        *hchan // channel
}

其目的是:

  • g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列
  • elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝

来看看一些子函数:

1.判断channel是否是满的

func full(c *hchan) bool {
   // 无缓冲
   if c.dataqsiz == 0 {
      // 并且没有其他协程在等待
      return c.recvq.first == nil
   }
   // 有缓冲,但容量装满了
   return c.qcount == c.dataqsiz
}

2.send方法:


func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 唤醒该接收者协程
   goready(gp, skip+1)
}

接收

从channel中接收数据有几种写法:

  • 带不带ok
  • 接不接收返回值

根据带不带ok,决定用下面哪个方法

func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
}

根据接不接收返回值,决定elem是不是nil

最终都会调用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞
   if c == nil {
      if !block {
         return
   }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,并且channel为空
   if !block && empty(c) {
      // 如果还没关闭,直接返回
   if atomic.Load(&c.closed) == 0 {
      return
   }
      // 否则已经关闭,
      // 如果为空,返回该类型的零值
   if empty(c) {
     if ep != nil {
        typedmemclr(c.elemtype, ep)
     }
     return  true, false
       }
   }

   lock(&c.lock)
   
   // 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return  true, false
}
    
   // 如果有发送者正在阻塞,说明:
   // 1.无缓冲
   // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
   if sg := c.sendq.dequeue(); sg != nil {
      // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true, true
}
    
   // 如果缓存区有数据, 
   if c.qcount > 0 {
      // qp为缓冲区中下一次接收的位置
      qp := chanbuf(c, c.recvx)
      // 将数据从qp拷贝到ep
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return  true, true
}

   // 接下来就是既没有发送者在等待,也缓冲区也没数据
   if !block {
      unlock(&c.lock)
      return  false, false
}

   // 将当前协程包装成sudog,阻塞到channel中
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // 记录接收地址
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)

   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2)

   // 从这里唤醒
   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   success := mysg.success
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return  true, success
}

接收流程如为:

如果channel为nil,根据参数中是否阻塞来决定是否阻塞

如果channel已经关闭,且缓冲区没有元素,返回该类型零值

如果有发送者正在阻塞,说明:

  • 要么是无缓冲
  • 有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
  • 将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者

如果缓存区有数据, 则从缓冲区将数据复制到ep,返回

接下来就是既没有发送者在等待,也缓冲区也没数据的情况:

将当前协程包装成sudog,阻塞到channel中

来看其中的子函数recv():


func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep
   if c.dataqsiz == 0 {
      if ep != nil {
         recvDirect(c.elemtype, sg, ep)
      }
   // 接下来是有缓冲,且缓冲区满的情况   
   } else {
      // qp为channel缓冲区中,接收者下一次接收的地址
   qp := chanbuf(c, c.recvx)
      // 将数据从qp拷贝到ep
   if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
    }
    // 将发送者的数据从sg.elem拷贝到qp
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
       c.recvx = 0
    }
    // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx
    c.sendx = c.recvx 
}
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 唤醒发送者
   goready(gp, skip+1)
}

关闭

func closechan(c *hchan) {
   // 不能关闭空channel
   if c == nil {
      panic(plainError( "close of nil channel" ))
   }

   lock(&c.lock)
   // 不能重复关闭
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "close of closed channel" ))
   }

   // 修改关闭状态
   c.closed = 1

   var glist gList

   // 释放所有的接收者协程,并为它们赋予零值
 for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }

   // 释放所有的发送者协程
 for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
     }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }
   unlock(&c.lock)

   // 执行唤醒操作
 for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}

关闭的流程比较简单,可以看出:

不能关闭空channel,不能重复关闭channel

先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:

接收者:会收到该类型的零值

这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑

发送者:会被唤醒,然后panic

因此不能在有多个sender的时候贸然关闭channel

以上就是深入理解golang channel的应用的详细内容,更多关于Golang channel的资料请关注编程网其它相关文章!

您可能感兴趣的文档:

--结束END--

本文标题: 深入理解Golangchannel的应用

本文链接: https://lsjlt.com/news/121326.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 深入理解Golangchannel的应用
    目录前言整体结构创建发送接收关闭前言 channel是用于 goroutine 之间的同步、通信的数据结构 channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了...
    99+
    2024-04-02
  • Spring中Controller应用深入理解
    目录概述1. 添加依赖2. 关于异常总结概述 Controller是Spring接受并处理网页请求的组件,是整个应用的入口,因此学会Controller的常用注解对理解一个应用是重中...
    99+
    2022-12-08
    Spring Controller Spring Controller原理 Spring Controller注解
  • 深入理解Python中\n的作用及应用
    在Python中,我们经常会见到`这个符号,它代表着换行符,用于表示文本中的换行。在本文中,我们将深入理解Python中`的作用及应用,并通过具体的代码示例来展示它的使用方式。 首先,...
    99+
    2024-04-02
  • 深入理解Vue3响应式原理
    目录响应式原理手写实现1、实现Reactive2、实现依赖的收集和触发effect影响函数收集/添加依赖触发依赖3、移除/停止依赖衍生类型1、实现readonly2、实现shallo...
    99+
    2022-12-19
    vue3响应式原理精讲 vue3 响应式 vue 响应式原理
  • 深入理解 PHP Session 跨域的应用场景
    引言:在 Web 开发中,会经常遇到需要在不同域名下进行数据共享的场景。而 PHP Session 是一种常用的实现方式,用于在不同页面之间传递用户会话数据。然而,由于浏览器的同源策略,Session 数据在跨域情况下的传递会受到限制。本文...
    99+
    2023-10-21
    应用场景 PHP Session 跨域
  • 深入解析Golang中锁的原理和应用
    Golang中锁的原理及应用解析引言在并发编程中,常常会遇到多个 goroutine 同时访问共享资源的情况,一旦多个 goroutine 同时对共享资源进行读写操作,可能导致数据不一致性或者产生竞态条件。为了解决这个问题,Golang 提...
    99+
    2023-12-28
    Golang中锁的原理:锁机制 Golang中锁的应用:并发编程 Golang中锁的解析:互斥锁
  • Java反射的应用之动态代理深入理解
    目录一、代理模式的引入二、动态代理一、代理模式的引入 静态代理举例 特点:代理类和被代理类在编译期间,就确定下来了。 interface ClothFactory{ ...
    99+
    2024-04-02
  • 深入理解Vue的数据响应式
    目录1. ES语法的getter和setter2. ES语法的 defineProperty3. Vue对数据的代理和监听4. Vue的数据响应式1. ES语法的getter和set...
    99+
    2024-04-02
  • Flask模板继承深入理解与应用
    目录什么叫模板继承呢模板页完整代码什么叫模板继承呢 在我的理解就是:在前端页面中肯定有很多页面中有很多相同的地方,比如页面顶部的导航栏,底部的页脚等部分,这时候如果每一个页面都重写一...
    99+
    2024-04-02
  • 深入理解vue的使用
    目录理解vue的核心理念探讨vue的双向绑定原理及实现vue双向绑定原理实现过程理解vue的核心理念 使用vue会让人感到身心愉悦,它同时具备angular和react的优点,轻量级...
    99+
    2024-04-02
  • 深入理解Vuex的作用
    目录概述组件之间共享数据的方式Vuex原理简介Vuex是实现组件全局状态(数据)管理的一种机制什么样的数据适合存储到Vuex中Vuex的基本使用1.安装Vuex依赖包2.导入Vuex...
    99+
    2024-04-02
  • 深入理解golangchan的使用
    目录前言见真身结构体发送数据接收数据上手定义发送与接收前言 之前在看golang多线程通信的时候, 看到了go 的管道. 当时就觉得这玩意很神奇, 因为之前接触过的不管是php, j...
    99+
    2024-04-02
  • 深入理解粘性定位的应用和功能
    粘性定位是一种在网页设计中常用的技术,它能够使网页元素保持在页面的固定位置,即使用户滚动页面时也不会发生改变。粘性定位具有很强的功能性和实用性,在网页设计和用户体验中发挥着重要作用。本文将探讨粘性定位的功能和应用。 一、功能 ...
    99+
    2024-01-29
    应用 定位 粘性 粘性定位功能
  • 深入理解MySQL存储过程的应用场景
    深入理解MySQL存储过程的应用场景 MySQL是一种常用的关系型数据库管理系统,广泛应用于各种Web应用和企业信息系统中。存储过程是MySQL中一种重要的数据库对象,它是一组预先编译...
    99+
    2024-03-14
    mysql 应用场景 存储过程 sql语句
  • 深入理解z-index的工作原理和应用技巧
    目录前言1、z-index2、层叠上下文3、层叠水平4、层叠顺序5、创建层叠上下文小结前言 最近参与某前端项目架构改造,发现项目中滥用z-index,设置的值有几十种并且不统一。在对...
    99+
    2023-05-19
    z-index属性 z-index的用法 网页制作中层的z-index
  • GosyncWaitGroup使用深入理解
    目录基本介绍使用源码分析AddDoneWait注意事项基本介绍 WaitGroup是go用来做任务编排的一个并发原语,它要解决的就是并发 - 等待的问题: 当有一个 goroutin...
    99+
    2024-04-02
  • 什么是JWT?深入理解JWT从原理到应用
    🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是Java方文山,一个在CSDN分享笔记的博主。📚📚 🌟推荐给...
    99+
    2023-10-08
    jwt jjwt-root session
  • 深入解析UUID及其应用
    讨论UUID的定义、分类、应用及生成工具。 什么是UUID? UUID是Universally Unique Identifier的缩写,它是在一定的范围内(从特定的名字空间到全球)唯一的机器生成的标识...
    99+
    2024-04-02
  • 深入分析Android ViewStub的应用详解
    在开发应用程序的时候,经常会遇到这样的情况,会在运行时动态根据条件来决定显示哪个View或某个布局。那么最通常的想法就是把可能用到的View都写在上面,先把它们的可见性都设为V...
    99+
    2022-06-06
    viewstub Android
  • 深入讲解SPI 在 Spring 中的应用
    目录一、概述二、Java SPI2.1 Java SPI2.2 源码分析三、Dubbo SPI3.1 基本概念3.2 Dubbo SPI3.3 源码分析四、Spring SPI4.1...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作