针对这个资源池管理的一步步都实现了,而且做了详细的讲解,下面就看下整个示例代码,方便理解。package commonimport ( "
针对这个资源池管理的一步步都实现了,而且做了详细的讲解,下面就看下整个示例代码,方便理解。
package commonimport (
"errors"
"io"
"sync"
"log")//一个安全的资源池,被管理的资源必须都实现io.Close接口type Pool struct {
m sync.Mutex
res chan io.Closer
factory func() (io.Closer, error)
closed bool}var ErrPoolClosed = errors.New("资源池已经被关闭。")//创建一个资源池func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size的值太小了。")
}
return &Pool{
factory: fn,
res: make(chan io.Closer, size),
}, nil}//从资源池里获取一个资源func (p *Pool) Acquire() (io.Closer,error) {
select {
case r,ok := <-p.res:
log.Println("Acquire:共享资源")
if !ok {
return nil,ErrPoolClosed
}
return r,nil
default:
log.Println("Acquire:新生成资源")
return p.factory()
}}
//关闭资源池,释放资源func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
//关闭通道,不让写入了
close(p.res) //关闭通道里的资源
for r:=range p.res {
r.Close()
}}func (p *Pool) Release(r io.Closer){
//保证该操作和Close方法的操作是安全的
p.m.Lock()
defer p.m.Unlock()
//资源池都关闭了,就省这一个没有释放的资源了,释放即可
if p.closed {
r.Close()
return
}
select {
case p.res <- r:
log.Println("资源释放到池子里了")
default:
log.Println("资源池满了,释放这个资源吧")
r.Close()
}
}
好了,资源池管理写好了,也知道资源池是如何实现的啦,现在我们看看如何使用这个资源池,模拟一个数据库连接池吧。
package mainimport (
"flysnow.org/hello/common"
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"time")const (
//模拟的最大Goroutine
maxGoroutine = 5
//资源池的大小
poolRes = 2)func main() {
//等待任务完成
var wg sync.WaitGroup
wg.Add(maxGoroutine)
p, err := common.New(createConnection, poolRes)
if err != nil {
log.Println(err)
return
}
//模拟好几个goroutine同时使用资源池查询数据
for query := 0; query < maxGoroutine; query++ {
go func(q int) {
dbQuery(q, p)
wg.Done()
}(query)
}
wg.Wait()
log.Println("开始关闭资源池")
p.Close()}//模拟数据库查询func dbQuery(query int, pool *common.Pool) {
conn, err := pool.Acquire()
if err != nil {
log.Println(err)
return
}
defer pool.Release(conn)
//模拟查询
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID)}//数据库连接type dbConnection struct {
ID int32//连接的标志}//实现io.Closer接口func (db *dbConnection) Close() error {
log.Println("关闭连接", db.ID)
return nil}var idCounter int32//生成数据库连接的方法,以供资源池使用func createConnection() (io.Closer, error) {
//并发安全,给数据库连接生成唯一标志
id := atomic.AddInt32(&idCounter, 1)
return &dbConnection{id}, nil
}
这时我们测试使用资源池的例子,首先定义了一个结构体dbConnection,它只有一个字段,用来做唯一标记。然后dbConnection实现了io.Closer接口,这样才可以使用我们的资源池。
createConnection函数对应的是资源池中的factory字段,用来创建数据库连接dbConnection的,同时为其赋予了一个为止的标志。
接着我们就同时开了 5 个goroutine,模拟并发的数据库查询dbQuery,查询方法里,先从资源池获取可用的数据库连接,用完后再释放。
这里我们会创建 5 个数据库连接,但是我们设置的资源池大小只有 2 ,所以再释放了 2 个连接后,后面的 3 个连接会因为资源池满了而释放不了,一会我们看下输出的打印信息就可以看到。
最后这个资源连接池使用完之后,我们要关闭资源池,使用资源池的Close方法即可。
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 第2个查询,使用的是ID为4的数据库连接
2017/04/17 22:25:08 资源释放到池子里了
2017/04/17 22:25:08 第4个查询,使用的是ID为1的数据库连接
2017/04/17 22:25:08 资源释放到池子里了
2017/04/17 22:25:08 第3个查询,使用的是ID为5的数据库连接
2017/04/17 22:25:08 资源池满了,释放这个资源吧
2017/04/17 22:25:08 关闭连接 5
2017/04/17 22:25:09 第1个查询,使用的是ID为3的数据库连接
2017/04/17 22:25:09 资源池满了,释放这个资源吧
2017/04/17 22:25:09 关闭连接 3
2017/04/17 22:25:09 第0个查询,使用的是ID为2的数据库连接
2017/04/17 22:25:09 资源池满了,释放这个资源吧
2017/04/17 22:25:09 关闭连接 2
2017/04/17 22:25:09 开始关闭资源池
2017/04/17 22:25:09 关闭连接 4
2017/04/17 22:25:09 关闭连接 1
到这里,我们已经完成了一个资源池的管理,并且进行了使用测试。
资源对象池的使用比较频繁,因为我们想把一些对象缓存起来,以便使用,这样就会比较高效,而且不会经常调用GC,为此Go为我们提供了原生的资源池管理,防止我们重复造轮子,这就是sync.Pool,我们看下刚刚我们的例子,如果用sync.Pool实现。
package mainimport (
"log"
"math/rand"
"sync"
"sync/atomic"
"time")const (
//模拟的最大goroutine
maxGoroutine = 5)func main() {
//等待任务完成
var wg sync.WaitGroup
wg.Add(maxGoroutine)
p:=&sync.Pool{
New:createConnection,
}
//模拟好几个goroutine同时使用资源池查询数据
for query := 0; query < maxGoroutine; query++ {
go func(q int) {
dbQuery(q, p)
wg.Done()
}(query)
}
wg.Wait()}//模拟数据库查询
func dbQuery(query int, pool *sync.Pool) {
conn:=pool.Get().(*dbConnection)
defer pool.Put(conn)
//模拟查询
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.ID)}//数据库连接
type dbConnection struct {
ID int32//连接的标志}//实现io.Closer接口
func (db *dbConnection) Close() error {
log.Println("关闭连接", db.ID)
return nil}var idCounter int32//生成数据库连接的方法,以供资源池使用
func createConnection() interface{} {
//并发安全,给数据库连接生成唯一标志
id := atomic.AddInt32(&idCounter, 1)
return &dbConnection{ID:id}
}
进行微小的改变即可,因为系统库没有提供New这类的工厂函数,所以我们使用字面量创建了一个sync.Pool,注意里面的New字段,这是一个返回任意对象的方法,类似我们自己实现的资源池中的factory字段,意思都是一样的,都是当没有可用资源的时候,生成一个。
这里我们留意到系统的资源池是没有大小限制的,也就是说默认情况下是无上限的,受内存大小限制。
资源的获取和释放对应的方法是Get和Put,也很简洁,返回任意对象interface{}。
2017/04/17 22:42:43 第0个查询,使用的是ID为2的数据库连接
2017/04/17 22:42:43 第2个查询,使用的是ID为5的数据库连接
2017/04/17 22:42:43 第4个查询,使用的是ID为1的数据库连接
2017/04/17 22:42:44 第3个查询,使用的是ID为4的数据库连接
2017/04/17 22:42:44 第1个查询,使用的是ID为3的数据库连接
关于系统的资源池,我们需要注意的是它缓存的对象都是临时的,也就说下一次GC的时候,这些存放的对象都会被清除掉。
--结束END--
本文标题: Go语言之并发示例-Pool(二)
本文链接: https://lsjlt.com/news/42345.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0