前言
ThreadLocal被ThreadLocalMap中的entry的key弱引用,如果出現GC的情況時,
沒有被其他對象引用,會被回收,但是ThreadLocal對應的value卻不會回收,容易造成內存泄漏,這也間接導致了內存溢出以及數據假丟失;
那么問題來了,有沒有更高效的ThreadLocal有;
今天我們就來分析一波FastThreadLocalThread
一、FastThreadLocalThread源碼分析
Netty為了在某些場景下提高性能,改進了jdk ThreadLocal,Netty實現的FastThreadLocal 優化了Java 原生 ThreadLocal 的訪問速度,存儲速度。避免了檢測弱引用帶來的 value 回收難問題,和數組位置沖突帶來的線性查找問題,解決這些問題并不是沒有代價;
Netty實現的 FastThreadLocal 底層也是通過數組存儲 value 對象,與Java原生ThreadLocal使用自身作為Entry的key不同,FastThreadLocal通過保存數組的全局唯一下標,實現了對value的快速訪問。同時FastThreadLocal 也實現了清理對象的方法;
1、FastThreadLocalThread
在Netty中,要使用 FastThreadLocal 實現線程本地變量需要將線程包裝成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,會使用 slowThreadLocalMap的 ThreadLocal 來存儲變量副本;
- io.netty.util.concurrent.DefaultThreadFactory
- @Override
- public Thread newThread(Runnable r) {
- Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
- // 一般daemon為false,意思是不設置為守護線程
- if (t.isDaemon() != daemon) {
- t.setDaemon(daemon);
- }
- // 優先級 默認為5
- if (t.getPriority() != priority) {
- t.setPriority(priority);
- }
- return t;
- }
- protected Thread newThread(Runnable r, String name) {
- return new FastThreadLocalThread(threadGroup, r, name);
- }
FastThreadLocalThread 繼承自Thread類,有如下成員變量:
- io.netty.util.concurrent.FastThreadLocalThread
- // 任務執行完,是否清除FastThreadLocal的標記
- private final boolean cleanupFastThreadLocals;
- // 類似于Thread類中ThreadLocalMap,為了實現FastThreadLocal
- private InternalThreadLocalMap threadLocalMap;
2、 InternalThreadLocalMap
FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 對象實例。在第一次獲取FTL數據時,會初始化FastThreadLocalThread.threadLocalMap,調用的構造函數如下:
- private InternalThreadLocalMap() {
- //為了簡便,InternalThreadLocalMap父類
- //UnpaddedInternalThreadLocalMap不展開介紹
- super(newIndexedVariableTable());
- }
- //默認的數組大小為32,且使用UNSET對象填充數組
- //如果下標處數據為UNSET,則表示沒有數據
- private static Object[] newIndexedVariableTable() {
- Object[] array = new Object[32];
- Arrays.fill(array, UNSET);
- return array;
- }
為了避免寫時候影響同一cpu緩沖行的其他數據并發訪問,其使用了緩存行填充技術 (cpu 緩沖行填充),在類定義中聲明了如下long字段進行填充;
- //InternalThreadLocalMap
- // Cache line padding (must be public)
- // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
- public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
FTL使用的數組下標是InternalThreadLocalMap中的靜態變量nextIndex統一遞增生成的:
- static final AtomicInteger nextIndex = new AtomicInteger();
- public static int nextVariableIndex() {
- //Netty中所有FTL數組下標都是通過遞增這個靜態變量實現的
- //采用靜態變量生成所有FTL元素在數組中的下標會造成一個問題,
- //會造成InternalThreadLocalMap中數組不必要的自動擴容
- int index = nextIndex.getAndIncrement();
- if (index < 0) {
- nextIndex.decrementAndGet();
- throw new IllegalStateException("too many thread-local indexed variables");
- }
- return index;
- }
InternalThreadLocalMap.nextVariableIndex()方法獲取FTL在該FastThreadLocalThread.threadLocalMap數組下標,因為InternalThreadLocalMap.nextVariableIndex() 使用靜態域 nextIndex 遞增維護所有FTL的下標,會造成后面實例化的 FTL 下標過大,如果FTL下標大于其對應 FastThreadLocalThread.threadLocalMap 數組的長度,會進行數組的自動擴容,如下:
- private void expandIndexedVariableTableAndSet(int index, Object value) {
- Object[] oldArray = indexedVariables;
- final int oldCapacity = oldArray.length;
- //下面復雜的實現是為了將newCapacity規范為最接近的一個2的指數,
- //這段代碼在早期的 jdk HashMap 中見過
- int newCapacity = index;
- newCapacity |= newCapacity >>> 1;
- newCapacity |= newCapacity >>> 2;
- newCapacity |= newCapacity >>> 4;
- newCapacity |= newCapacity >>> 8;
- newCapacity |= newCapacity >>> 16;
- newCapacity ++;
- Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
- Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
- newArray[index] = value;
- indexedVariables = newArray;
- }
3、FastThreadLocal
構造函數:
有兩個重要的下標域,FTL不僅在FastThreadLocalThread.threadLocalMap中保存了用戶實際使用的value(在數組中的下標為index),還在數組中保存為了實現清理記錄的相關數據,也即下標variablesToRemoveIndex,一般情況 variablesToRemoveIndex = 0;因為variablesToRemoveIndex 是靜態變量,所以全局唯一;
- //如果在該FTL中放入了數據,也就實際調用了其set或get函數,會在
- //該FastThreadLocalThread.threadLocalMap數組的
- // variablesToRemoveIndex下標處放置一個IdentityHashMap,
- //并將該FTL放入IdentityHashMap中,在后續清理時會取出
- //variablesToRemoveIndex下標處的IdentityHashMap進行清理
- private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
- //在threadLocalMap數組中存放實際數據的下標
- private final int index;
- public FastThreadLocal() {
- index = InternalThreadLocalMap.nextVariableIndex();
- }
用戶可擴展的函數:
- //初始化 value 函數
- protected V initialValue() throws Exception {
- return null;
- }
- //讓使用者在該FTL被移除時可以有機會做些操作。
- protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
FastThreadLocalThread
cleanupFastThreadLocals 字段在 4.1 的最新版本中已經沒有在用到了
- /**
- * true,表示FTL會在線程結束時被主動清理 見 FastThreadLocalRunnable 類
- * false,需要將FTL放入后臺清理線程的隊列中
- */
- // This will be set to true if we have a chance to wrap the Runnable.
- //這個字段則用于標識該線程在結束時是否會主動清理FTL
- private final boolean cleanupFastThreadLocals;
- //次對象將在 第一次 FastThreadLocal.get 和 FastThreadLocal.set 時候創建
- private InternalThreadLocalMap threadLocalMap;
- public FastThreadLocalThread(Runnable target) {
- super(FastThreadLocalRunnable.wrap(target));
- cleanupFastThreadLocals = true;
- }
4、 set 方法
- public final void set(V value) {
- //判斷設置的 value 值是否是缺省值
- if (value != InternalThreadLocalMap.UNSET) {
- //獲取當前線程的 InternalThreadLocalMap , 如果當前線程為FastThreadLocalThread,那么直接通過threadLocalMap引用獲取
- //否則通過 jdk 原生的 threadLocal 獲取
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
- //FastThreadLocal 對應的 index 下標的 value 替換成新的 value
- setKnownNotUnset(threadLocalMap, value);
- } else {
- //如果放置的對象為UNSET,則表示清理,會對該FTL進行清理,類似毒丸對象
- remove();
- }
- }
這里擴容方會調用 InternalThreadLocalMap.expandIndexedVariableTableAndSet
- private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
- //在數組下標index處放置實際對象,如果index大于數組length,會進行數組擴容.
- if (threadLocalMap.setIndexedVariable(index, value)) {
- //放置成功之后,將該FTL加入到 variablesToRemoveIndex 下標的
- //IdentityHashMap,等待后續清理
- addToVariablesToRemove(threadLocalMap, this);
- }
- }
- /**
- * 該FTL加入到variablesToRemoveIndex下標的IdentityHashMap
- * IdentityHashMap的特性可以保證同一個實例不會被多次加入到該位置
- */
- @SuppressWarnings("unchecked")
- private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
- //獲取 variablesToRemoveIndex下標處的 IdentityHashMap
- Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
- Set<FastThreadLocal<?>> variablesToRemove;
- //如果是第一次獲取,則 variablesToRemoveIndex下標處的值為 UNSET
- if (v == InternalThreadLocalMap.UNSET || v == null) {
- //新建一個新的 IdentityHashMap 并
- variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
- //放入到下標variablesToRemoveIndex處
- threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
- } else {
- variablesToRemove = (Set<FastThreadLocal<?>>) v;
- }
- //將該FTL放入該IdentityHashMap中
- variablesToRemove.add(variable);
- }
下面看InternalThreadLocalMap.get()實現:
- public static InternalThreadLocalMap get() {
- Thread thread = Thread.currentThread();
- //首先看當前 thread 是否為FastThreadLocalThread實例
- //如果是的話,可以快速通過引用,獲取到其 threadLocalMap
- if (thread instanceof FastThreadLocalThread) {
- return fastGet((FastThreadLocalThread) thread);
- } else {
- //如果不是,則 jdk 原生慢速獲取到其 threadLocalMap
- return slowGet();
- }
- }
5、 get 方法
get方法極為簡單,實現如下:
- ===========================FastThreadLocal==========================
- public final V get() {
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
- Object v = threadLocalMap.indexedVariable(index);
- if (v != InternalThreadLocalMap.UNSET) {
- return (V) v;
- }
- return initialize(threadLocalMap);
- }
首先獲取當前線程的map,然后根據 FastThreadLocal的index 獲取value,然后返回,如果是空對象,則通過 initialize 返回,initialize 方法會將返回值設置到 map 的槽位中,并放進 Set 中;
- initialize
- ============================FastThreadLocal==========================
- private V initialize(InternalThreadLocalMap threadLocalMap) {
- V v = null;
- try {
- //1、獲取初始值
- v = initialValue();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- // 2、設置value到InternalThreadLocalMap中
- threadLocalMap.setIndexedVariables(index, v);
- // 3、添加當前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中
- addToVariablesToRemove(threadLocalMap, this);
- return v;
- }
- //初始化參數:由子類復寫
- protected V initialValue() throws Exception {
- return null;
- }
- 獲取 ThreadLocalMap
- 直接通過索引取出對象
- 如果為空那么調用初始化方法初始化
6、ftl的資源回收機制
netty中ftl的兩種回收機制回收機制:
自動:使用ftlt執行一個被FastThreadLocalRunnable wrap的Runnable任務,在任務執行完畢后會自動進行ftl的清理;
手動:ftl和InternalThreadLocalMap都提供了remove方法,在合適的時候用戶可以(有的時候也是必須,例如普通線程的線程池使用ftl)手動進行調用,進行顯示刪除;
- FastThreadLocalRunnable
- final class FastThreadLocalRunnable implements Runnable {
- private final Runnable runnable;
- @Override
- public void run() {
- try {
- runnable.run();
- } finally {
- FastThreadLocal.removeAll();
- }
- }
- static Runnable wrap(Runnable runnable) {
- return runnable instanceof FastThreadLocalRunnable
- ? runnable : new FastThreadLocalRunnable(runnable);
- }
- }
如果將線程執行的任務包裝成 FastThreadLocalRunnable,那么在任務執行完后自動刪除ftl的資源。
- ===============================FastThreadLocal===========================
- public static void removeAll() {
- // 獲取到map
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
- if (threadLocalMap == null) {
- return;
- }
- try {
- // 獲取到Set<FastThreadLocal>集合
- Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
- if (v != null && v != InternalThreadLocalMap.UNSET) {
- @SuppressWarnings("unchecked")
- Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
- // 將Set轉換為數組
- FastThreadLocal<?>[] variablesToRemoveArray =
- variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
- // 遍歷數組,刪除每一個FastThreadLocal對應的value
- for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
- tlv.remove(threadLocalMap);
- }
- }
- } finally {
- // 刪除當前線程的InternalThreadLocalMap
- InternalThreadLocalMap.remove();
- }
- }
- public static void remove() {
- Thread thread = Thread.currentThread();
- if (thread instanceof FastThreadLocalThread) {
- // 將FastThreadLocalThread 內部的map置位null
- ((FastThreadLocalThread) thread).setThreadLocalMap(null);
- } else {
- // 將 ThreadLocal內部ThreadLocalMap 中的value置位null
- slowThreadLocalMap.remove();
- }
- }
remove方法:
- ===============================FastThreadLocal==========================
- private void remove() {
- remove(InternalThreadLocalMap.getIfSet());
- }
- private void remove(InternalThreadLocalMap threadLocalMap) {
- if (threadLocalMap == null) {
- return;
- }
- // 從 InternalThreadLocalMap 中刪除當前的FastThreadLocal對應的value并設UNSET
- Object v = threadLocalMap.removeIndexedVariable(index);
- // 從 InternalThreadLocalMap 中的Set<FastThreadLocal<?>>中刪除當前的FastThreadLocal對象
- removeFromVariablesToRemove(threadLocalMap, this);
- // 如果刪除的是有效值,則進行onRemove方法的回調
- if (v != InternalThreadLocalMap.UNSET) {
- try {
- // 回調子類復寫的onRemoved方法,默認為空實現
- onRemoved((V) v);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
總結
只有不斷的學習,不斷的找到自己的缺點才可以進步;
一起加油;
原文地址:https://mp.weixin.qq.com/s/WNfFAgnXHyBSbCGwo1pyJQ