目录常规用法创建链接池创建链接池接口实现链接池接口关闭链接扩缩容性能测试常规用法 grpc 四种基本使用 请求响应模式客户端数据流模式服务端数据流模式双向流模式 常见的grpc调用
grpc
四种基本使用
常见的grpc
调用写法
func main(){
//... some code
// 链接grpc服务
conn , err := grpc.Dial(":8000",grpc.WithInsecure)
if err != nil {
//...log
}
defer conn.Close()
//...some code
存在的问题:面临高并发的情况,性能问题很容易就会出现,例如我们在做性能测试的时候,就会发现,打一会性能测试,客户端请求服务端的时候就会报错:
rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused
实际去查看问题的时候,很明显,这是 gRPC 的连接数被打满了,很多连接都还未完全释放。[#本文来源:janrs.com#]
gRPC
的通信本质上也是 TCP
的连接,那么一次连接就需要三次握手,和四次挥手,每一次建立连接和释放连接的时候,都需要走这么一个过程,如果我们频繁的建立和释放连接,这对于资源和性能其实都是一个大大的浪费。
在服务端,gRPC
服务端的链接管理不用我们操心,但是 gRPC
客户端的链接管理非常有必要关心,要实现复用客户端的连接。
创建链接池需要考虑的问题:
type Pool interface {
// 获取一个新的连接 , 当关闭连接的时候,会将该连接放入到池子中
Get() (Conn, error)
// 关闭连接池,自然连接池子中的连接也不再可用
Close() error
//[#本文来源:janrs.com#]
Status() string
}
创建链接池代码
func New(address string, option Options) (Pool, error) {
if address == "" {
return nil, errors.New("invalid address settings")
}
if option.Dial == nil {
return nil, errors.New("invalid dial settings")
}
if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {
return nil, errors.New("invalid maximum settings")
}
if option.MaxConcurrentStreams <= 0 {
return nil, errors.New("invalid maximun settings")
}
p := &pool{
index: 0,
current: int32(option.MaxIdle),
ref: 0,
opt: option,
conns: make([]*conn, option.MaxActive),
address: address,
closed: 0,
}
for i := 0; i < p.opt.MaxIdle; i++ {
c, err := p.opt.Dial(address)
if err != nil {
p.Close()
return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)
}
p.conns[i] = p.wrapConn(c, false)
}
log.Printf("new pool success: %v\n", p.Status())
return p, nil
}
关于以上的代码,需要特别注意每一个连接的建立也是在 New
里面完成的,[#本文来源:janrs.com#]只要有 1
个连接未建立成功,那么咱们的连接池就算是建立失败,咱们会调用 p.Close()
将之前建立好的连接全部释放掉。
关闭链接池代码
// 关闭连接池
func (p *pool) Close() error {
atomic.StoreInt32(&p.closed, 1)
atomic.StoreUint32(&p.index, 0)
atomic.StoreInt32(&p.current, 0)
atomic.StoreInt32(&p.ref, 0)
p.deleteFrom(0)
log.Printf("[janrs.com]close pool success: %v\n", p.Status())
return nil
}
从具体位置删除链接池代码
// 清除从 指定位置开始到 MaxActive 之间的连接
func (p *pool) deleteFrom(begin int) {
for i := begin; i < p.opt.MaxActive; i++ {
p.reset(i)
}
}
销毁具体的链接代码
// 清除具体的连接
func (p *pool) reset(index int) {
conn := p.conns[index]
if conn == nil {
return
}
conn.reset()
p.conns[index] = nil
}
代码
func (c *conn) reset() error {
cc := c.cc
c.cc = nil
c.once = false
// 本文博客来源:janrs.com
if cc != nil {
return cc.Close()
}
return nil
}
func (c *conn) Close() error {
c.pool.decrRef()
if c.once {
return c.reset()
}
return nil
}
在使用连接池通过 pool.Get()
拿到具体的连接句柄 conn
之后,会使用 conn.Close()
关闭连接,实际上也是会走到上述的 Close()
实现的位置,但是并未指定当然也没有权限显示的指定将 once
置位为 false
,也就是对于调用者来说,是关闭了连接,对于连接池来说,实际上是将连接归还到连接池中。
关键代码
func (p *pool) Get() (Conn, error) {
// the first selected from the created connections
nextRef := p.incrRef()
p.RLock()
current := atomic.LoadInt32(&p.current)
p.RUnlock()
if current == 0 {
return nil, ErrClosed
}
if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {
next := atomic.AddUint32(&p.index, 1) % uint32(current)
return p.conns[next], nil
}
// 本文博客来源:janrs.com
// the number connection of pool is reach to max active
if current == int32(p.opt.MaxActive) {
// the second if reuse is true, select from pool's connections
if p.opt.Reuse {
next := atomic.AddUint32(&p.index, 1) % uint32(current)
return p.conns[next], nil
}
// the third create one-time connection
c, err := p.opt.Dial(p.address)
return p.wrapConn(c, true), err
}
// the fourth create new connections given back to pool
p.Lock()
current = atomic.LoadInt32(&p.current)
if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {
// 2 times the incremental or the remain incremental ##janrs.com
increment := current
if current+increment > int32(p.opt.MaxActive) {
increment = int32(p.opt.MaxActive) - current
}
var i int32
var err error
for i = 0; i < increment; i++ {
c, er := p.opt.Dial(p.address)
if er != nil {
err = er
break
}
p.reset(int(current + i))
p.conns[current+i] = p.wrapConn(c, false)
}
// 本文博客来源:janrs.com
current += i
log.Printf("#janrs.com#grow pool: %d ---> %d, increment: %d, maxActive: %d\n",
p.current, current, increment, p.opt.MaxActive)
atomic.StoreInt32(&p.current, current)
if err != nil {
p.Unlock()
return nil, err
}
}
p.Unlock()
next := atomic.AddUint32(&p.index, 1) % uint32(current)
return p.conns[next], nil
}
Get
代码逻辑
current*int32(p.opt.MaxConcurrentStreams)
范围内,那么直接取连接进行使用即可。option
中的 reuse
参数是否是 true
,若是复用,则随机取出连接池中的任意连接提供使用,如果不复用,则新建一个连接。2
倍或者 1
倍的数量对连接池进行扩容了。也可以在 Get
的实现上进行缩容,具体的缩容策略可以根据实际情况来定,例如当引用计数 nextRef
只有当前活跃连接数的 10%
的时候(这只是一个例子),就可以考虑缩容了。
有关链接池的创建以及性能测试
mycodesmells.com/post/poolin…
--结束END--
本文标题: Go创建Grpc链接池实现过程详解
本文链接: https://lsjlt.com/news/198399.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-04-05
2024-04-05
2024-04-05
2024-04-04
2024-04-05
2024-04-05
2024-04-05
2024-04-05
2024-04-04
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0