Sync.Pool
Sync.Pool
需要提前了解GMPhttps://www.kancloud.cn/aceld/golang/1958305#2GolangGMP_2
简单来说就是Goroutine(Go协程): Thread(线程): Process(调度器)
不在详细展开了, 只针对Pool做一个简单的分析
使用
package mainimport "sync"type Instance struct {Id string
}func main() {// new a pool and Type is *Instancepool := sync.Pool{New: func() interface{} {return &Instance{Id: ""}},}// get from empty Poolins := pool.Get().(*Instance)ins.Id = "1"// after usage, put back to poolpool.Put(ins)// check if same with var insprint(pool.Get().(*Instance).Id)
}
结构体
Pool
type Pool struct {noCopy noCopylocal unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocallocalSize uintptr // size of the local arrayvictim unsafe.Pointer // local from previous cyclevictimSize uintptr // size of victims array// New optionally specifies a function to generate// a value when Get would otherwise return nil.// It may not be changed concurrently with calls to Get.New func() any
}
Pool是我们实际接触的数据, 其中包含了
-
local
是个数组,长度为 P 的个数。其元素类型是poolLocal
。这里面存储着各个 P 对应的本地对象池。可以近似的看做[P]poolLocal
。 -
localSize
。代表 local 数组的长度。因为 P 可以在运行时通过调用 runtime.GOMAXPROCS 进行修改, 因此我们还是得通过localSize
来对应local
数组的长度。 -
New
就是用户提供的创建对象的函数。这个选项也不是必需。当不填的时候,Get 有可能返回 nil。
poolLocal
// Local per-P Pool appendix.
type poolLocalInternal struct {private any // Can be used only by the respective P.shared poolChain // Local P can pushHead/popHead; any P can popTail.
}type poolLocal struct {poolLocalInternal// Prevents false sharing on widespread platforms with// 128 mod (cache line size) = 0 .pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
poolLocal
是和每个P
绑定的一个存储
-
private
是为了能够快速处理, 尤其是类似于经常性的出现 Get->Put->Get->Put时, 减少请求双链表存储的次数. -
shared
有两个作用, 第一是作为一个大容量的存储, 第二是其他的P
窃取.
这里稍微聊一下poolLocal
的pad
主要是为了加速CPU的Cache Line,使用的是缓存行Padding (谨慎使用)
不要忘记了, poolLocal
是一个Core
独占的, 那么这个时候, 防止其他的碎片数据一起塞入缓存行就有作用了, 即不会因为碎片数据的频繁更新而刷新Cache Line, 导致出现不命中导致的其他问题.
另外需要注意, 这里的poolChain
是结构体, 而不是指针, 原因是poolChain
是一个非常短小的结构体
type poolChain struct {head *poolChainElttail *poolChainElt
}
方法
Put
func (p *Pool) Put(x any) {// 检查x是否为空if x == nil {return}/* 这部分不用看, race.Enabled是用来指示是否启用了竞态检测器。当你使用 -race 标志编译你的Go程序时,竞态检测器会被启用,而这个变量会被设置为 true。通常只在调试或测试时使用,而不在生产环境下使用。if race.Enabled {if fastrandn(4) == 0 {// Randomly drop x on floor.return}race.ReleaseMerge(poolRaceAddr(x))race.Disable()}
*/// pin函数的作用我们稍后再聊, 简单来说就是获取和`P`绑定的poolLocall, _ := p.pin()// 因为是在同一个线程下执行的// 所以没有一些互斥锁等等// 如果能够直接放入private, 则直接对private赋值if l.private == nil {l.private = x} else {// 塞入缓存队列中.l.shared.pushHead(x)}// unpin操作runtime_procUnpin()if race.Enabled {race.Enable()}
}
Put
主要有这么几步
- 绑定G -> P
- 获取P的id
- 根据pid获取p对应的poolLocal
- 优先直接放入private, 其次放入缓存列表中
Get
func (p *Pool) Get() any {
/*if race.Enabled {race.Disable()}
*/// 绑定G -> P, 并返回P的poolLocall, pid := p.pin()// 优先使用privatex := l.private// 不管最后有没有用到, private一定是nill.private = nil// 如果为nilif x == nil {// Try to pop the head of the local shard. We prefer// the head over the tail for temporal locality of// reuse.// 尝试从缓存列表中得到一个// 尝试从head弹出元素,而不是尾部,这种偏好是基于时间局部性原理。x, _ = l.shared.popHead()// 如果得不到if x == nil {// 尝试从其他P中获取, 如果没法获取, 则尝试从victim中获取x = p.getSlow(pid)}}runtime_procUnpin()
/*if race.Enabled {race.Enable()if x != nil {race.Acquire(poolRaceAddr(x))}}
*/// 如果没有缓存中没有任何对象, 但是有New函数, 那么尝试直接New一个if x == nil && p.New != nil {x = p.New()}// 返回x, 需要注意x存在为nil的可能性return x
}
总结
Get
主要有这么几步
- 绑定G -> P
- 获取P的id
- 根据pid获取p对应的poolLocal
- 优先使用private
- 其次使用自己的缓存列表
- 再次使用尝试从其他的P里的poolLocal中获取一个
- 还不行就从vivtim中复用一个
- 如果Pool里没有任何可用对象, New
Other
pin
pin将当前goroutine绑定到P,禁用抢占,并返回P的poolLocal池和P的id。
调用者在使用完池后必须调用runtime_procUnpin()。
实现
func (p *Pool) pin() (*poolLocal, int) {// 调用runtime_procPin函数将当前的goroutine固定到一个P上,并获取该P的id。pid := runtime_procPin()// 加载p.localSize和p.locals := runtime_LoadAcquintptr(&p.localSize) // load-acquirel := p.local // load-consume// 检查pid是否小于p.localSize。if uintptr(pid) < s {// 如果是,则返回对应的poolLocal和pid。return indexLocal(l, pid), pid}// 如果是,则返回对应的poolLocal和pid。return p.pinSlow()
}
这里做一些解释, pid
通常是 0 - GOMAXPROCS
的一个值, 用来标记是哪一个线程
localSize
的值一般来说就等于GOMAXPROCS
如果uintptr(pid) < s
, 就代表着此时的poolLocal
已经被初始化过, 那么此时就可以直接返回.
反之就必须要做初始化的工作.
indexLocal
这个函数的作用比较简单, 主要是根据原始指针 + pid偏移, 计算出真正的poolLocal
func indexLocal(l unsafe.Pointer, i int) *poolLocal {lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))return (*poolLocal)(lp)
}
pinSlow
func (p *Pool) pinSlow() (*poolLocal, int) {// 调用pinSlow前, 必定调用了pin, 暂时释放, 因为在pin的过程中, 无法对互斥锁进行操作runtime_procUnpin()// 初始化local, 禁止并发访问allPoolsMu.Lock()defer allPoolsMu.Unlock()// 重新pinpid := runtime_procPin()// poolCleanup 在pin的过程中, GC不会调用// double checks := p.localSizel := p.localif uintptr(pid) < s {return indexLocal(l, pid), pid}// 为了GC时能够管理所有的pool, 会将p放入管理队列中if p.local == nil {allPools = append(allPools, p)}// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.// 获取GOMAXPROCS, 表示最多有多少`P`size := runtime.GOMAXPROCS(0)// 初始化 poollocal := make([]poolLocal, size)// 之所以只用atomic, 可能是为了防止在多核环境下出现cache不一致问题.atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-releaseruntime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release// 返回该`P`所对应的poolLocal, 以及pidreturn &local[pid], pid
}
getSlow
func (p *Pool) getSlow(pid int) any {// 获取p的数量size := runtime_LoadAcquintptr(&p.localSize) // load-acquirelocals := p.local // load-consume// Try to steal one element from other procs.// 尝试从其他的P的缓存中偷取一个for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i+1)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}// Try the victim cache. We do this after attempting to steal// from all primary caches because we want objects in the// victim cache to age out if at all possible.// 尝试从victim中复用一个size = atomic.LoadUintptr(&p.victimSize)// 如果victim中也没有, 直接返回if uintptr(pid) >= size {return nil}locals = p.victiml := indexLocal(locals, pid)if x := l.private; x != nil {l.private = nilreturn x}for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}// Mark the victim cache as empty for future gets don't bother// with it.// 如果victim中也没有, 直接将其标记为0, 防止其他的P又一次的遍历vivtimatomic.StoreUintptr(&p.victimSize, 0)return nil
}
拓展阅读 poolChain
结构体
poolChain
type poolChain struct {// head is the poolDequeue to push to. This is only accessed// by the producer, so doesn't need to be synchronized.// 只有生产者会操作head, 所以不存在并发竞争head *poolChainElt// tail is the poolDequeue to popTail from. This is accessed// by consumers, so reads and writes must be atomic.// 可能存在多个消费者使用tail, 因此必须保证原子性的处理tailtail *poolChainElt
}
poolDequeue
type poolDequeue struct {// headTail packs together a 32-bit head index and a 32-bit// tail index. Both are indexes into vals modulo len(vals)-1.//// tail = index of oldest data in queue// head = index of next slot to fill//// Slots in the range [tail, head) are owned by consumers.// A consumer continues to own a slot outside this range until// it nils the slot, at which point ownership passes to the// producer.//// The head index is stored in the most-significant bits so// that we can atomically add to it and the overflow is// harmless.// 将head 和 tail 封装成一个, 可以更好的保证原子性headTail uint64// vals is a ring buffer of interface{} values stored in this// dequeue. The size of this must be a power of 2.// vals[i].typ is nil if the slot is empty and non-nil// otherwise. A slot is still in use until *both* the tail// index has moved beyond it and typ has been set to nil. This// is set to nil atomically by the consumer and read// atomically by the producer.// 实际存储的示例 (指针)vals []eface
}
poolChainElt
type poolChainElt struct {poolDequeue// next and prev link to the adjacent poolChainElts in this// poolChain.//// next is written atomically by the producer and read// atomically by the consumer. It only transitions from nil to// non-nil.//// prev is written atomically by the consumer and read// atomically by the producer. It only transitions from// non-nil to nil.next, prev *poolChainElt
}
eface
type eface struct {typ, val unsafe.Pointer
}
注意
在 Go 语言中,interface{} 是一个空接口,它可以接受任何类型的值。
然而,当我们需要存储一个 interface{} 类型的值时,实际上我们需要存储两个信息:值的类型和值本身。
这是因为 Go 语言的接口是静态类型的,即使是空接口也需要知道值的具体类型。
eface 结构体被用来表示一个空接口的值。它有两个字段:typ 和 val,分别用来存储值的类型和值本身。
这样做的好处是可以显式地管理这两个信息,而不是依赖 Go 语言的运行时系统来管理。
这对于实现低级别的并发数据结构(如poolChain
的无锁队列)是非常有用的,因为这样可以更精细地控制内存的使用和同步。
此外,使用 unsafe.Pointer 可以避免 Go 语言的垃圾收集器误判这些值仍然在使用。
当一个值被从队列中移除时,它的 typ 字段会被设置为 nil,这样 Go 语言的垃圾收集器就知道这个值不再被使用,可以安全地回收它。
总的来说,使用 eface 结构体而不是直接使用 interface{} 可以提供更精细的控制,这对于实现高效的并发数据结构是非常重要的。
type poolChainElt struct {poolDequeue// next and prev link to the adjacent poolChainElts in this// poolChain.//// next is written atomically by the producer and read// atomically by the consumer. It only transitions from nil to// non-nil.//// prev is written atomically by the consumer and read// atomically by the producer. It only transitions from// non-nil to nil.next, prev *poolChainElt
}type poolDequeue struct {// headTail packs together a 32-bit head index and a 32-bit// tail index. Both are indexes into vals modulo len(vals)-1.//// tail = index of oldest data in queue// head = index of next slot to fill//// Slots in the range [tail, head) are owned by consumers.// A consumer continues to own a slot outside this range until// it nils the slot, at which point ownership passes to the// producer.//// The head index is stored in the most-significant bits so// that we can atomically add to it and the overflow is// harmless.headTail uint64// vals is a ring buffer of interface{} values stored in this// dequeue. The size of this must be a power of 2.//// vals[i].typ is nil if the slot is empty and non-nil// otherwise. A slot is still in use until *both* the tail// index has moved beyond it and typ has been set to nil. This// is set to nil atomically by the consumer and read// atomically by the producer.vals []eface
}
方法
poolChain.pushHead
func (c *poolChain) pushHead(val any) {d := c.head// 如果为初始化if d == nil {// Initialize the chain.const initSize = 8 // Must be a power of 2d = new(poolChainElt)d.vals = make([]eface, initSize)c.head = d// 初始化一个, 并且原子性的设置head以及tailstorePoolChainElt(&c.tail, d)}// 如果能够写入当前的dequeue中, 则直接返回if d.pushHead(val) {return}// The current dequeue is full. Allocate a new one of twice// the size.// 如果dequeue已经满了// 第二层的dequeue直接翻倍newSize := len(d.vals) * 2// 一层最大有1 << 30个if newSize >= dequeueLimit {// Can't make it any bigger.newSize = dequeueLimit}// 初始化下一层d2 := &poolChainElt{prev: d}d2.vals = make([]eface, newSize)c.head = d2storePoolChainElt(&d.next, d2)// 刚初始化, 必定可以直接放入d2.pushHead(val)
}
当head已经填满了, 就会生成的head, 同时修改当前的head为创建的, 并将新创建的head的prev设为之前的head
poolChain.popHead
func (c *poolChain) popHead() (any, bool) {// 获取当前的head poolChainEltd := c.headfor d != nil {// 尝试从当前的head的dequeue中pop一个if val, ok := d.popHead(); ok {return val, ok}// 如果没法成功, 则尝试// There may still be unconsumed elements in the// previous dequeue, so try backing up.d = loadPoolChainElt(&d.prev)}return nil, false
}
需要注意的是目前的实现会有一个问题, 那就是在批量push, 又批量pop时, 可能会频繁的调用loadPoolChainElt(&d.prev)
另外一个问题是, 可能会导致c.head.prev中出现大量的空poolDequeue
对于一些比较大量的使用pool的程序来说, 可能会引入一些问题.
一个可能的做法是
func (c *poolChain) popHead() (any, bool) {d := c.headfor d != nil {if val, ok := d.popHead(); ok {return val, ok}// There may still be unconsumed elements in the// previous dequeue, so try backing up.prev := loadPoolChainElt(&d.prev)// when try to load d.prev, that means the current dequeue is empty// try remove the current dequeue from the chain// and try to load the previous dequeue// atomic do this{d.prev.next = d.nextd.next.prev = d.prevd.next = nild.prev = nil}d = prev}return nil, false
}
但是很难在无锁的情况下实现, 而且可能引入更加复杂的处理.
popTail
func (c *poolChain) popTail() (any, bool) {// 获取taild := loadPoolChainElt(&c.tail)// 如果没有tail, 即没有初始化过, 直接返回if d == nil {return nil, false}for {// It's important that we load the next pointer// *before* popping the tail. In general, d may be// transiently empty, but if next is non-nil before// the pop and the pop fails, then d is permanently// empty, which is the only condition under which it's// safe to drop d from the chain.// 获取d 的 nextd2 := loadPoolChainElt(&d.next)// 尝试popTailif val, ok := d.popTail(); ok {return val, ok}// 如果tail poolDequeue已经空了, 则返回falseif d2 == nil {// This is the only dequeue. It's empty right// now, but could be pushed to in the future.return nil, false}// The tail of the chain has been drained, so move on// to the next dequeue. Try to drop it from the chain// so the next pop doesn't have to look at the empty// dequeue again.// if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {// We won the race. Clear the prev pointer so// the garbage collector can collect the empty// dequeue and so popHead doesn't back up// further than necessary.storePoolChainElt(&d2.prev, nil)}d = d2}
}
poolChain.popTail
是 poolChain
结构体的一个方法,它的主要作用是从队列的尾部移除并返回一个元素。如果队列为空,它将返回 false
。这个方法可以被任意数量的消费者调用。
以下是 poolChain.popTail
方法的详细步骤:
- 首先,它通过
loadPoolChainElt(&c.tail)
获取队列的尾部元素d
。如果d
为nil
,则表示队列为空,直接返回nil, false
。 - 然后,它进入一个无限循环,尝试从队列的尾部弹出一个元素。在每次循环中,它首先在弹出尾部元素之前加载下一个指针
d2
。这是因为在一般情况下,d
可能会暂时为空,但如果在弹出操作之前next
是非nil
的,并且弹出操作失败,那么d
就会永久性地为空。这是从链中删除d
的唯一条件。 - 接着,它尝试通过
d.popTail()
方法从d
的尾部弹出一个元素。如果成功,那么它将返回弹出的元素和true
。 - 如果
d2
为nil
,那么这就是唯一的队列。虽然现在它是空的,但是未来可能会有元素被推入,所以返回nil, false
。 - 如果队列的尾部已经被清空,那么就移动到下一个队列
d2
。尝试从链中删除它,这样下一次弹出操作就不必再看到空的队列。这是通过atomic.CompareAndSwapPointer
实现的。 - 如果我们赢得了比赛,那么就清除
d2.prev
指针,这样垃圾收集器就可以收集空的队列,而且popHead
不必再回退更多。 - 最后,将
d
设置为d2
,然后进入下一次循环。
注意
poolDequeue
是一个无锁的固定大小的单生产者,多消费者队列。它有三个主要的方法:pushHead
,popHead
和 popTail
。
-
pushHead(val any) bool
方法:
这个方法将一个元素添加到队列的头部。如果队列已满,它将返回 false
。这个方法只能由单个生产者调用。
它首先获取当前的头部和尾部索引,然后检查队列是否已满。如果队列已满,它将返回 false
。否则,它将获取头部索引对应的槽位,并检查该槽位是否已被 popTail
方法释放。如果该槽位还未被释放,那么队列实际上仍然是满的,因此返回 false
。
如果头部槽位是空的,那么生产者就拥有了这个槽位,并将值存入该槽位。然后,它将头部索引加一,这将把槽位的所有权传递给 popTail
方法,并作为写入槽位的存储屏障。
-
popHead() (any, bool)
方法:
这个方法从队列的头部移除并返回一个元素。如果队列为空,它将返回 false
。这个方法只能由单个生产者调用。
它首先获取当前的头部和尾部索引,然后检查队列是否为空。如果队列为空,它将返回 nil, false
。否则,它将头部索引减一,并尝试获取头部索引对应的槽位。如果成功,那么它就拥有了这个槽位,并从槽位中读取出值。
然后,它将槽位清零。由于这个方法不与 pushHead
方法竞争,所以这里不需要特别小心。
-
popTail() (any, bool)
方法:
这个方法从队列的尾部移除并返回一个元素。如果队列为空,它将返回 false
。这个方法可以被任意数量的消费者调用。
它首先获取当前的头部和尾部索引,然后检查队列是否为空。如果队列为空,它将返回 nil, false
。否则,它将尾部索引加一,并尝试获取尾部索引对应的槽位。如果成功,那么它就拥有了这个槽位,并从槽位中读取出值。
然后,它将槽位的值字段清零,并将类型字段设置为 nil
,这将把槽位的所有权传递给 pushHead
方法,并告诉 pushHead
方法这个槽位已经不再使用,可以安全地回收它。
需要注意的是, popTail -> 完全释放slot并不是一个原子性操作.
所以pushHead
需要做两个操作:
- 查看是否能够获取该槽
- 查看
popTail
是不是已经释放了该槽 -
pushHead
和popHead
在同一时间只会有一个占用, 所以可以不考虑并发问题