go 对象池化组件 bytebufferpool使用详解
作者:FfFJ 时间:2024-02-10 14:26:11
1. 针对问题
在编程开发的过程中,我们经常会有创建同类对象的场景,这样的操作可能会对性能产生影响,一个比较常见的做法是使用对象池,需要创建对象的时候,我们先从对象池中查找,如果有空闲对象,则从对象池中移除这个对象并将其返回给调用者使用,只有在池中无空闲对象的时候,才会真正创建一个新对象
另一方面,对于使用完的对象,我们并不会对它进行销毁,而是将它放回到对象池以供后续使用,使用对象池在频繁创建和销毁对象的情况下,能大幅的提升性能,同时为了避免对象池中的对象占用过多的内存,对象池一般还配有特定的清理策略,Go的标准库sync.Pool
就是这样一个例子,sync.Pool
中的对象会被垃圾回收清理掉
这类对象中,有一种比较特殊的是字节切片,在做字符串拼接的时候,为了拼接高效,我们通常将中间结果存放在一个字节缓冲中,拼接完之后,再从字节缓冲区生成字符串
Go标准库bytes.Buffer
封装字节切片,提供一些使用接口,我们知道切片的容量是有限的,容量不足时需要进行扩容,而频繁的扩容容易造成性能抖动
bytebufferpool
实现了自己的Buffer
类型,并引入一个简单的算法降低扩容带来的性能损失
2. 使用方法
bytebufferpool
的接入很轻量
func main() {
bf := bytebufferpool.Get()
bf.WriteString("Hello")
bf.WriteString(" World!!")
fmt.Println(bf.String())
}
上面的这种用法使用的是defaultPool
,bytebufferpool
的Pool
对象是公开的,也可以自行新建
3. 源码剖析
bytebufferpool
是如何做到最大程度减小内存分配和浪费的呢,先宏观的看整个Pool
的定义,然后细化到相关的方法,就可以找到答案
bytebufferpool
中Pool
结构体的定义为
type Pool struct {
calls [steps]uint64
calibrating uint64
defaultSize uint64
maxSize uint64
pool sync.Pool
}
其中calls
存储了某一个区间内不同大小对象的个数,calibrating
是一个标志位,标志当前Pool
是否在重新规划中,defaultSize
是元素新建时的默认大小,它的选取逻辑是当前calls
中出现次数最多的对象对应的区间最大值,这样可以防止从对象池中捞取之后的频繁扩容,maxSize
限制了放入Pool
中的最大元素的大小,防止因为一些很大的对象占用过多的内存
bytebufferpool
中定义了一些和defaultSize
及maxSize
计算相关的常量
const (
minBitSize = 6 // 2**6=64 is a CPU cache line size
steps = 20
minSize = 1 << minBitSize
maxSize = 1 << (minBitSize + steps - 1)
calibrateCallsThreshold = 42000
maxPercentile = 0.95
)
其中minBitSize
表示的是第一个区间对象大小的最大值(2的xx次方-1),在bytebufferpool
中,将对象大小分为20个区间,也就是steps
,第一个区间为[0, 2^6-1]
,第二个为[2^6, 2^7-1]
...,依此类推
calibrateCallsThreshold
表示如果某个区间内对象的数量超过这个阈值,则对Pool
中的变量进行重新的计算,maxPercentile
用于计算Pool
中的maxSize
,表示前95%
的元素大小
bytebufferpool
中的方法也比较少,核心的是Get
和Put
方法
Get
func (p *Pool) Get() *ByteBuffer {
v := p.pool.Get()
if v != nil {
return v.(*ByteBuffer)
}
return &ByteBuffer{
B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
}
}
可以看到,如果对象池中没有对象的话,会申请defaultSize
大小的切片返回
Put
func (p *Pool) Put(b *ByteBuffer) {
idx := index(len(b.B))
if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
p.calibrate()
}
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize == 0 || cap(b.B) <= maxSize {
b.Reset()
p.pool.Put(b)
}
}
Put方法会比较麻烦,我们分步来看
计算放入元素在
calls
数组中的位置
func index(n int) int {
n--
n >>= minBitSize
idx := 0
for n > 0 {
n >>= 1
idx++
}
if idx >= steps {
idx = steps - 1
}
return idx
}
这里的逻辑就是先将长度右移minBitSize
,如果依然大于0,则每次右移一位,idx加1,最后如果idx超出了总的steps
(20),则位置就在最后一个区间
判断当前区间放入元素的个数是否超过了
calibrateCallsThreshold
指定的阈值,超过则重新计算Pool
中元素的值
func (p *Pool) calibrate() {
// 如果正在重新计算,则返回,控制多并发
if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) {
return
}
// 计算每一段区间中的元素个数 & 元素总个数
a := make(callSizes, 0, steps)
var callsSum uint64
for i := uint64(0); i < steps; i++ {
calls := atomic.SwapUint64(&p.calls[i], 0)
callsSum += calls
a = append(a, callSize{
calls: calls,
size: minSize << i,
})
}
// 按照对象元素的个数从大到小排序
sort.Sort(a)
// defaultSize 为内部切片的默认大小,减少扩容次数
// maxSize 限制放入pool中的最大元素大小
defaultSize := a[0].size
maxSize := defaultSize
// 将前95%元素中的最大size给maxSize
maxSum := uint64(float64(callsSum) * maxPercentile)
callsSum = 0
for i := 0; i < steps; i++ {
if callsSum > maxSum {
break
}
callsSum += a[i].calls
size := a[i].size
if size > maxSize {
maxSize = size
}
}
// 对defaultSize和maxSize进行赋值
atomic.StoreUint64(&p.defaultSize, defaultSize)
atomic.StoreUint64(&p.maxSize, maxSize)
atomic.StoreUint64(&p.calibrating, 0)
}
判断当前放入元素的大小是否超过了
maxSize
,超过则不放入对象池中
来源:https://juejin.cn/post/7150528125841965093