并发笔记之CAS分析

CAS,Compare And Swap,即比较并交换。整个AQS同步组件、Atomic原子类操作等等都是以CAS实现的,甚至ConcurrentHashMap在1.8版本也调整为了CAS+synchronized。
Lock/synchronized是一种悲观锁,但大多数情况并没有大量的竞争,相对而言CAS是一种乐观锁策略,认为竞争很少发生,一旦发生则抛给处理方处理重试还是采取其他策略,由于
没有加锁带来的较高开销和加锁中的临界区限制,这种无锁机制比加锁具有更高的扩展性。

底层原理

CAS的思想很简单,有三个参数:内存值V、旧值A、要更新的值B。当且仅当内存值V等于旧的预期值A时才会将内存值V更新为B,否则什么都不干。
JUC下的atomic类是通过CAS来实现的,以AtomicInteger为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//使用volatile修饰保证修改 其他线程的可见性
private volatile int value;

public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

sun.misc.Unsafe是CAS的核心类,Java无法直接访问操作系统底层,通过Unsafe可以在内存级别获取操作对象内存中的数据。
unsafe.getAndAddInt实现了原子增的操作
1
2
3
4
5
6
7
8
9
10
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);

可以看到最终通过JNI完成本地调用来实现,看下openJDK实现unsafe.cpp
++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

//atomic.cpp
jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
assert(sizeof(jbyte) == 1, "assumption.");
uintptr_t dest_addr = (uintptr_t)dest;
uintptr_t offset = dest_addr % sizeof(jint);
volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
jint cur = *dest_int;
jbyte* cur_as_bytes = (jbyte*)(&cur);
jint new_val = cur;
jbyte* new_val_as_bytes = (jbyte*)(&new_val);
new_val_as_bytes[offset] = exchange_value;
while (cur_as_bytes[offset] == compare_value) {
jint res = cmpxchg(new_val, dest_int, cur);
if (res == cur) break;
cur = res;
new_val = cur;
new_val_as_bytes[offset] = exchange_value;
}
return cur_as_bytes[offset];
}

以linux x86为例,MP表示multiprocessor即多处理器。最终根据具体的处理器架构转换成汇编指令来实现CAS.atomic_linux_x86.inline.hpp:
++
1
2
3
4
5
6
7
8
inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}

这里的cmpxchg是x86和Intel架构中的compare and exchange指令。__asm__说明是ASM汇编,__volatile__禁止编译器优化
当多处理时需要前面加上lock指令。
1
2
// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

os::is_MP()判断当前系统是否为多核系统,如果是就给总线加锁,故同一芯片上的其他处理器就暂时不能通过总线访问内存,保证了该指令在多处理器环境下的原子性。

CAS的问题

ABA问题

CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值从A变为B,又从B变成A,那么CAS在进行检查的时候会发现它的值没有发生变化,
但实际发生了改变,这就是ABA问题。
解决思路:为变量加上版本号,每次修改版本号加一,目前JDK的atomic包里提供了AtomicStampedReference来解决ABA问题。这个类会检查当前引用是否等于预期的引用,
并且当前的标识是否等于预期标识,如果全部相等,则以原子的方式将该引用和该标识进行更新。

自旋时间过长

如果CAS一直不成功,则一直自旋 对CPU将带来极大的开销。

坚持原创技术分享,您的支持将鼓励我继续创作!