Netty源码-02-FastThreadLocalThread

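FastThreadLocal配合FastThreadLocalThread使用，是Netty对JDK的ThreadLocal/Thread做的优化：每个FastThreadLocal在创建时分配一个固定的数组下标，读写时直接按下标访问线程持有的InternalThreadLocalMap中的数组。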

一 Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Small demo of {@code FastThreadLocal} used from two {@code FastThreadLocalThread}s.
 *
 * <p>Each worker reads the variable (triggering {@code initialValue()}), overwrites it,
 * reads it back, removes it, and reads it once more so the re-initialisation after
 * {@code remove()} is visible in the output.
 */
public class FastThreadLocalTest00 {

    /** Per-thread value; prints {@code "init"} every time the initial value is produced. */
    private static final FastThreadLocal<Long> v = new FastThreadLocal<Long>() {
        @Override
        protected Long initialValue() throws Exception {
            System.out.println("init");
            return 0L;
        }
    };

    /**
     * Starts both workers and waits for them to finish.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        FastThreadLocalThread fast1 = startWorker("fast1", 1L);
        FastThreadLocalThread fast2 = startWorker("fast2", 2L);

        // Join the workers instead of sleeping for a fixed 3 seconds: the demo ends as soon
        // as both threads are done and never relies on an arbitrary timeout.
        fast1.join();
        fast2.join();
    }

    /**
     * Launches one worker that walks the thread-local through get / set / get / remove / get.
     *
     * @param tag   prefix used in the printed lines (e.g. {@code "fast1"})
     * @param value value the worker stores via {@code set}
     * @return the started thread, so the caller can join it
     */
    private static FastThreadLocalThread startWorker(String tag, long value) {
        FastThreadLocalThread worker = new FastThreadLocalThread(() -> {
            System.out.println(tag + " v1=" + v.get());
            v.set(value);
            System.out.println(tag + " v2=" + v.get());
            v.remove();
            System.out.println(tag + " v3=" + v.get());
        });
        worker.start();
        return worker;
    }
}

二 FastThreadLocal

1
2
3
4
5
6
7
// FastThreadLocal.java

/**
 * Slot index of this variable inside the per-thread InternalThreadLocalMap array.
 * Assigned once at construction and used by get/set/remove to address the slot.
 */
private final int index; // this variable's own array index (not a shared "next free" cursor)

/**
 * Creates the variable and reserves its array index via
 * {@code InternalThreadLocalMap.nextVariableIndex()}.
 */
public FastThreadLocal() {
this.index = InternalThreadLocalMap.nextVariableIndex(); // Per the original note: slot 0 is reserved, so usable indices start at 1 (the reservation itself is not visible here — TODO confirm); the map's backing array has not been instantiated at this point
}
1
2
3
4
5
6
7
8
9
// InternalThreadLocalMap.java
/**
 * Hands out the next unused variable index from the shared {@code nextIndex} counter.
 *
 * @return the index value observed before the counter was incremented
 * @throws IllegalStateException if the observed index is negative
 */
public static int nextVariableIndex() {
    final int allocated = nextIndex.getAndIncrement();
    if (allocated >= 0) {
        return allocated;
    }
    // A negative value presumably means the counter wrapped around; undo this
    // increment before reporting the failure.
    nextIndex.decrementAndGet();
    throw new IllegalStateException("too many thread-local indexed variables");
}

三 get方法

1
2
3
4
5
6
7
8
9
10
11
12
// FastThreadLocal.java
/**
 * Returns the current thread's value for this variable.
 *
 * <p>The value is looked up in Netty's own {@code InternalThreadLocalMap} rather than the
 * JDK {@code ThreadLocalMap}; obtaining the map creates it lazily. If the slot still holds
 * the {@code UNSET} marker, the value is produced through {@code initialize}.
 *
 * @return the stored value, or the freshly initialised one on first access
 */
public final V get() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    final Object current = map.indexedVariable(this.index);
    if (current == InternalThreadLocalMap.UNSET) {
        // Nothing stored for this thread yet: fall back to the initialValue() callback.
        return initialize(map);
    }
    return (V) current;
}

1 InternalThreadLocalMap懒加载

1
2
3
4
5
6
7
8
/**
 * Returns the {@code InternalThreadLocalMap} of the calling thread.
 *
 * <p>The lookup strategy depends on the thread type: a {@code FastThreadLocalThread} is
 * served by {@code fastGet}, any other thread by {@code slowGet}.
 *
 * @return the map associated with the current thread
 */
public static InternalThreadLocalMap get() {
    final Thread current = Thread.currentThread();
    return current instanceof FastThreadLocalThread
            ? fastGet((FastThreadLocalThread) current)   // Netty's own thread type
            : slowGet();                                 // plain JDK thread
}
1
2
3
4
5
6
7
/**
 * Returns the map held by the given {@code FastThreadLocalThread}, creating and
 * attaching a new one if the thread does not have a map yet.
 *
 * @param thread the thread whose map is requested
 * @return the thread's existing or newly created map
 */
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    final InternalThreadLocalMap existing = thread.threadLocalMap();
    if (existing != null) {
        return existing;
    }
    // First access on this thread: instantiate the map and store it on the thread.
    final InternalThreadLocalMap created = new InternalThreadLocalMap();
    thread.setThreadLocalMap(created);
    return created;
}
1
2
3
/** Creates a map whose backing array comes from {@code newIndexedVariableTable()}. */
private InternalThreadLocalMap() {
this.indexedVariables = newIndexedVariableTable(); // Initialize the backing array (default capacity 32 according to the original annotation)
}
1
2
3
4
5
/**
 * Builds the initial backing array for a map.
 *
 * @return a new array of {@code INDEXED_VARIABLE_TABLE_INITIAL_SIZE} slots, every slot
 *         holding the {@code UNSET} marker
 */
private static Object[] newIndexedVariableTable() {
    final Object[] table = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
    // Mark every slot as "no value yet".
    Arrays.fill(table, UNSET);
    return table;
}

2 数组元素取值

1
2
// FastThreadLocal.java
// Excerpt from get(): read the slot addressed by this variable's index from the thread's map.
Object v = threadLocalMap.indexedVariable(this.index);
1
2
3
4
5
// InternalThreadLocalMap.java
/**
 * Reads the slot at the given array index.
 *
 * @param index slot to read
 * @return the slot content, or {@code UNSET} when {@code index} lies at or beyond the
 *         current array length
 */
public Object indexedVariable(int index) {
    final Object[] table = indexedVariables;
    if (index >= table.length) {
        // The array is shorter than this index, so nothing can be stored there.
        return UNSET;
    }
    return table[index];
}

3 首次get没值触发回调初始值

首次调用get时如果还没有数据，则回调initialValue方法获取初始值。

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Produces the initial value through {@code initialValue()}, stores it in this variable's
 * slot of the given map and registers the variable via {@code addToVariablesToRemove}.
 *
 * @param threadLocalMap the map of the current thread
 * @return the value that was stored
 */
private V initialize(InternalThreadLocalMap threadLocalMap) {
    V initial = null;
    try {
        // Callback supplying the default value.
        initial = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }

    // Cache the result so later get() calls find it in the array.
    threadLocalMap.setIndexedVariable(index, initial);
    addToVariablesToRemove(threadLocalMap, this);
    return initial;
}

四 set方法

1
2
3
4
5
6
7
8
9
10
// FastThreadLocal.java
/**
 * Stores a value for the current thread.
 *
 * <p>Passing the {@code UNSET} marker is treated as a request to remove the value.
 *
 * @param value the value to store
 */
public final void set(V value) {
    if (value == InternalThreadLocalMap.UNSET) {
        remove();
        return;
    }
    // A real value: fetch the current thread's map and write it.
    setKnownNotUnset(InternalThreadLocalMap.get(), value);
}

1
2
3
4
5
/**
 * Writes {@code value} into this variable's slot and, when the map reports {@code true}
 * for the write, registers this variable through {@code addToVariablesToRemove}.
 *
 * @param threadLocalMap the map to write into
 * @param value          the value to store
 */
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    // The write may grow the backing array when the index is beyond its length.
    final boolean newlyOccupied = threadLocalMap.setIndexedVariable(index, value);
    if (newlyOccupied) {
        addToVariablesToRemove(threadLocalMap, this);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
// InternalThreadLocalMap.java
/**
 * Stores {@code value} at the given slot, growing the array when the slot is out of range.
 *
 * @param index slot to write
 * @param value value to store
 * @return {@code true} if the array had to be expanded or the slot previously held
 *         {@code UNSET}; {@code false} if an existing value was overwritten
 */
public boolean setIndexedVariable(int index, Object value) {
    final Object[] table = this.indexedVariables;
    if (index >= table.length) {
        // Slot does not exist yet: expand the array and store the value there.
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
    final Object previous = table[index];
    table[index] = value;
    return previous == UNSET;
}

五 remove方法

1
2
3
/**
 * Removes this variable's value using whatever {@code InternalThreadLocalMap.getIfSet()}
 * returns for the current thread.
 */
public final void remove() {
    final InternalThreadLocalMap mapIfPresent = InternalThreadLocalMap.getIfSet();
    remove(mapIfPresent);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Removes this variable's value from the given map.
 *
 * <p>Does nothing when the map is {@code null}. Otherwise the slot is cleared, the variable
 * is unregistered via {@code removeFromVariablesToRemove}, and {@code onRemoval} is invoked
 * with the removed value unless the slot only held the {@code UNSET} marker.
 *
 * @param threadLocalMap the map to remove from; may be {@code null}
 */
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }

    final Object removed = threadLocalMap.removeIndexedVariable(index);
    removeFromVariablesToRemove(threadLocalMap, this);

    if (removed == InternalThreadLocalMap.UNSET) {
        // There was no real value, so there is nothing to notify about.
        return;
    }
    try {
        onRemoval((V) removed);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
}

Netty源码-02-FastThreadLocalThread
https://bannirui.github.io/2023/03/06/Netty源码-02-FastThreadLocalThread/
作者
dingrui
发布于
2023年3月6日
许可协议