Netty分布式ByteBuf使用的回收逻辑剖析

作者:向南是个万人迷 时间:2023-07-18 23:26:26 

前文传送门:ByteBuf使用subPage级别内存分配

ByteBuf回收

之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以PooledUnsafeDirectByteBuf为例讲解有关内存分配的相关逻辑

PooledUnsafeDirectByteBuf中内存释放的入口方法是其父类AbstractReferenceCountedByteBuf中的release方法:

@Override
public boolean release() {
    return release0(1);
}

这里调用了release0, 跟进去

private boolean release0(int decrement) {
   for (;;) {
       int refCnt = this.refCnt;
       if (refCnt < decrement) {
           throw new IllegalReferenceCountException(refCnt, -decrement);
       }
       if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
           if (refCnt == decrement) {
               deallocate();
               return true;
           }
           return false;
       }
   }
}

 if (refCnt == decrement) 中判断当前byteBuf是否没有被引用了, 如果没有被引用, 则通过deallocate()方法进行释放

因为我们是以PooledUnsafeDirectByteBuf为例, 所以这里会调用其父类PooledByteBuf的deallocate方法:

protected final void deallocate() {
   if (handle >= 0) {
       final long handle = this.handle;
       this.handle = -1;
       memory = null;
       chunk.arena.free(chunk, handle, maxLength, cache);
       recycle();
   }
}

this.handle = -1表示当前的ByteBuf不再指向任何一块内存

memory = null这里将memory也设置为null

chunk.arena.free(chunk, handle, maxLength, cache)这一步是将ByteBuf的内存进行释放

recycle()是将对象放入的对象回收站, 循环利用

我们首先分析free方法

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
   //是否为unpooled
   if (chunk.unpooled) {
       int size = chunk.chunkSize();
       destroyChunk(chunk);
       activeBytesHuge.add(-size);
       deallocationsHuge.increment();
   } else {
       //那种级别的Size
       SizeClass sizeClass = sizeClass(normCapacity);
       //加到缓存里
       if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
           return;
       }
       //将缓存对象标记为未使用
       freeChunk(chunk, handle, sizeClass);
   }
}

首先判断是不是unpooled, 我们这里是Pooled, 所以会走到else块中:

sizeClass(normCapacity)计算是哪种级别的size, 我们按照tiny级别进行分析

cache.add(this, chunk, handle, normCapacity, sizeClass)是将当前当前ByteBuf进行缓存

我们之前讲过, 再分配ByteBuf时首先在缓存上分配, 而这步, 就是将其缓存的过程, 跟进去:

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
   //拿到MemoryRegionCache节点
   MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
   if (cache == null) {
       return false;
   }
   //将chunk, 和handle封装成实体加到queue里面
   return cache.add(chunk, handle);
}

首先根据根据类型拿到相关类型缓存节点, 这里会根据不同的内存规格去找不同的对象, 我们简单回顾一下, 每个缓存对象都包含一个queue, queue中每个节点是entry, 每一个entry中包含一个chunk和handle, 可以指向唯一的连续的内存

我们跟到cache中

private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
   switch (sizeClass) {
   case Normal:
       return cacheForNormal(area, normCapacity);
   case Small:
       return cacheForSmall(area, normCapacity);
   case Tiny:
       return cacheForTiny(area, normCapacity);
   default:
       throw new Error();
   }
}

假设我们是tiny类型, 这里就会走到cacheForTiny(area, normCapacity)方法中, 跟进去:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
   int idx = PoolArena.tinyIdx(normCapacity);
   if (area.isDirect()) {
       return cache(tinySubPageDirectCaches, idx);
   }
   return cache(tinySubPageHeapCaches, idx);
}

这个方法我们之前剖析过, 就是根据大小找到第几个缓存中的第几个缓存, 拿到下标之后, 通过cache去超相对应的缓存对象:  

private static <T>  MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
   if (cache == null || idx > cache.length - 1) {
       return null;
   }
   return cache[idx];
}

我们这里看到, 是直接通过下标拿的缓存对象

回到add方法中

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
   //拿到MemoryRegionCache节点
   MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
   if (cache == null) {
       return false;
   }
   //将chunk, 和handle封装成实体加到queue里面
   return cache.add(chunk, handle);
}

这里的cache对象调用了一个add方法, 这个方法就是将chunk和handle封装成一个entry加到queue里面

我们跟到add方法中:

public final boolean add(PoolChunk<T> chunk, long handle) {
   Entry<T> entry = newEntry(chunk, handle);
   boolean queued = queue.offer(entry);
   if (!queued) {
       entry.recycle();
   }
   return queued;
}

我们之前介绍过, 从在缓存中分配的时候从queue弹出一个entry, 会放到一个对象池里面, 而这里Entry<T> entry = newEntry(chunk, handle)就是从对象池里去取一个entry对象, 然后将chunk和handle进行赋值

然后通过queue.offer(entry)加到queue中

我们回到free方法中

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
   //是否为unpooled
   if (chunk.unpooled) {
       int size = chunk.chunkSize();
       destroyChunk(chunk);
       activeBytesHuge.add(-size);
       deallocationsHuge.increment();
   } else {
       //那种级别的Size
       SizeClass sizeClass = sizeClass(normCapacity);
       //加到缓存里
       if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
           return;
       }
       freeChunk(chunk, handle, sizeClass);
   }
}

这里加到缓存之后, 如果成功, 就会return, 如果不成功, 就会调用freeChunk(chunk, handle, sizeClass)方法, 这个方法的意义是, 将原先给ByteBuf分配的内存区段标记为未使用

跟进freeChunk简单分析下:

void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
   final boolean destroyChunk;
   synchronized (this) {
       switch (sizeClass) {
       case Normal:
           ++deallocationsNormal;
           break;
       case Small:
           ++deallocationsSmall;
           break;
       case Tiny:
           ++deallocationsTiny;
           break;
       default:
           throw new Error();
       }
       destroyChunk = !chunk.parent.free(chunk, handle);
   }
   if (destroyChunk) {
       destroyChunk(chunk);
   }
}

我们再跟到free方法中:

boolean free(PoolChunk<T> chunk, long handle) {
   chunk.free(handle);
   if (chunk.usage() < minUsage) {
       remove(chunk);
       return move0(chunk);
   }
   return true;
}

chunk.free(handle)的意思是通过chunk释放一段连续的内存

再跟到free方法中:

void free(long handle) {
   int memoryMapIdx = memoryMapIdx(handle);
   int bitmapIdx = bitmapIdx(handle);

if (bitmapIdx != 0) {
       PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
       assert subpage != null && subpage.doNotDestroy;
       PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
       synchronized (head) {
           if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
               return;
           }
       }
   }
   freeBytes += runLength(memoryMapIdx);
   setValue(memoryMapIdx, depth(memoryMapIdx));
   updateParentsFree(memoryMapIdx);
}

 if (bitmapIdx != 0)这 里判断是当前缓冲区分配的级别是Page还是Subpage, 如果是Subpage, 则会找到相关的Subpage将其位图标记为0

如果不是subpage, 这里通过分配内存的反向标记, 将该内存标记为未使用

这段逻辑可以读者自行分析, 如果之前分配相关的知识掌握扎实的话, 这里的逻辑也不是很难

回到PooledByteBuf的deallocate方法中:

protected final void deallocate() {
   if (handle >= 0) {
       final long handle = this.handle;
       this.handle = -1;
       memory = null;
       chunk.arena.free(chunk, handle, maxLength, cache);
       recycle();
   }
}

最后, 通过recycle()将释放的ByteBuf放入对象回收站, 有关对象回收站的知识, 会在以后的章节进行剖析

来源:https://www.cnblogs.com/xiangnan6122/p/10205843.html

标签:Netty,分布式,ByteBuf,回收
0
投稿

猜你喜欢

  • Android webView加载数据时内存溢出问题及解决

    2021-06-24 13:19:28
  • C#图像伪彩色处理方法

    2022-09-23 10:51:52
  • Spring Boot Hello World的实现代码

    2023-10-13 17:45:01
  • 关于Mybatis的@param注解及多个传参

    2021-09-10 21:17:21
  • spring MVC中接口参数解析的过程详解

    2023-11-28 09:17:50
  • 三种Java自定义DNS解析器方法与实践

    2022-01-13 10:12:11
  • Java设计模式之工厂模式实现方法详解

    2023-11-26 07:55:51
  • SpringCloud eureka(server)微服务集群搭建过程

    2023-05-22 15:08:55
  • tcp、udp、ip协议分析_动力节点Java学院整理

    2023-05-17 18:00:17
  • 巧用Dictionary实现日志数据批量插入

    2022-03-10 12:31:05
  • XRecyclerView实现下拉刷新、滚动到底部加载更多等功能

    2023-11-07 06:40:55
  • Java ThreadPoolExecutor 线程池的使用介绍

    2021-06-28 12:40:35
  • 简单介绍三层架构工作原理

    2022-10-01 20:28:25
  • C#动态webservice调用接口

    2023-10-18 07:22:00
  • Android实现小米相机底部滑动指示器

    2023-03-12 04:57:00
  • java模拟TCP通信实现客户端上传文件到服务器端

    2023-11-26 10:14:49
  • Android仿微信选择图片和拍照功能

    2023-08-18 05:22:50
  • Android列表点击事件定义的一些思考

    2021-12-24 08:41:40
  • Java运算符从见过到掌握下

    2023-01-29 15:47:19
  • 浅谈Action+Service +Dao 功能

    2023-01-04 13:44:20
  • asp之家 软件编程 m.aspxhome.com