上一篇文章 探讨了一种手工管理对象的方式。这种方式的难点在于确定生命期,在正确的时机执行回收。 本文继续探讨生命期管理的可行手段。
最简单的场景
最简单的场景是,生命期只有数个顺序语句,这种场景不需要特别的处理:
func Copy(dst io.Writer, src io.Reader) (int, error) {
buf := getBytes(32 * 1024)
defer buf.Put()
return io.CopyBuffer(dst, src, buf)
}
如果生命期在大段代码的中间,可以用函数字面量的调用来构造一个函数体,并在其中调用defer:
func Copy(dst io.Writer, src io.Reader) (l int, err error) {
// ...
func() {
buf := getBytes(32 * 1024)
defer buf.Put()
l, err = io.CopyBuffer(dst, src, buf)
}()
// ...
return
}
管理多个分配的对象
如果一段逻辑内有多个对象的分配,并且这段逻辑的边界可以明确划分,那多个对象的生命期可以统一成一个,逻辑结束后统一回收。
首先可以引入一个生命期类型:
type Life struct {
ended int32
sync.Mutex
endOnce sync.Once
endFuncs []func()
}
它可以注册多个函数,在生命期结束时一一调用:
func (l *Life) OnEnd(fn func()) {
l.Lock()
l.endFuncs = append(l.endFuncs, fn)
l.Unlock()
}
func (l *Life) End() {
l.endOnce.Do(func() {
for _, fn := range l.endFuncs {
fn()
}
atomic.AddInt32(&l.ended, 1)
})
}
使用例子:
type Request struct {
Life
}
func Handle(req *Request) {
defer req.End()
res := make(chan []byte, 2)
go func() {
bs := getBytes(42)
req.OnEnd(bs.Put)
// ...
res <- bs.Bytes
}()
go func() {
bs2 := getBytes(42)
req.OnEnd(bs.Put)
// ...
res <- bs2.Bytes
}
sub := (<-res)[:20]
sub2 := (<-res)[20:]
// ...
}
如上例所示,bs 和 bs2 的生命期,就不是在一个函数体之内了,它们还会被 sub 和 sub2 引用。所以不能用简单的 defer 来实现回收。
这种场景,就没有必要一一考虑 sub、sub2 的生命期,只需要统一使用一个足够长的生命期,就可以了。
上例的做法是每个请求一个生命期,处理完请求就统一回收。
防止泄漏
如果用 getBytes 获得的对象,忘记调用 Put,那这个对象就会泄漏到自动回收里。 当然,这种泄漏对逻辑正确性是没有影响的,只不过没有起到减轻 GC 负担的作用而已。
要防范泄漏,首先是要检测出泄漏。 将 runtime.MemProfileRate 设为 1,可以使 mem profiler 记录每一次分配,这样就能用 runtime.MemProfile 得到精确的分配信息。 当然这样会稍微影响性能,所以只适用于开发期。
如果从分配信息里发现 getBytes 里的 make 处分配的内存(runtime.MemProfileRecord.AllocBytes)不断增加,那就说明有对象忘记调用 Put。
要找出未调用回收的对象,可以根据具体的业务特性,给对象设一个最大存活期,超过这个最大存活期而没有回收,就报错。
首先给 Life 类型增加 SetMax 方法:
func (l *Life) SetMax(d time.Duration) {
stack := debug.Stack()
time.AfterFunc(d, func() {
if atomic.LoadInt32(&l.ended) == 0 {
pt("%s\n", stack)
panic("live too long")
}
})
}
然后给 Bytes 类型加上 Life:
type Bytes struct {
*Life
Bytes []byte
class int
}
func getBytes(size int) Bytes {
class := 0
for size > bytesClasses[class] {
class++
if class == len(bytesClasses) {
break
}
}
if class == len(bytesClasses) {
return Bytes{
Life: new(Life),
Bytes: make([]byte, size),
class: -1,
}
}
return Bytes{
Life: new(Life),
Bytes: bytesPools[class].Get().([]byte)[:size],
class: class,
}
}
func (b Bytes) Put() {
if b.class == -1 {
return
}
bytesPools[b.class].Put(b.Bytes)
b.End()
}
这样就可以在怀疑泄漏的 getBytes 后面再调用 SetMax,提示出未回收的地方。 或者直接在 getBytes 里调用 SetMax,限制所有分配的对象的存活期。
当然,这个方法只适用于开发期,因为开销是比较大的。
上述是一种动态检测的方法,可能用静态的分析也能实现。但受限于本人能力,这方面就无法展开了。