当前位置: 首页 > >

java 无锁并发_Java高并发程序设计(四)?? 无锁

发布时间:

一、无锁类的原理详解


无锁的概念就是无*诵校拚*侵杆械南叱潭寄芙肓俳缜匏谖拚*幕∩霞由狭艘惶蹙褪敲看尉赫厝挥幸惶跄芄皇こ觥@砺凵衔拚*锌赡芟叱潭际О埽晕匏惺悼尚校导衔拚*臀匏畈欢唷


(一)CAS(Compare And Swap)


CAS算法的过程是这样的,它包含三个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有了其它线程做了更新,则当前线程什么都不做。最后CAS返回V的真实值,CAS操作是抱着乐观态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即使没有锁,也可以发现其它线程对当前线程的干扰,并进行恰当的处理。


CAS思路:现在要对数据进行赋值,这个数据是在临界区中,需要被保护的一个数据,因为我要对这个数据进行赋值,现在假如有多个线程同时进来,应该是只有一个线程是能够胜出的,那么怎么判断哪个线程是可以成功的,现在要求线程对数据操作时给出一个期望值 ,如果数据的实际值和期望值是相符的,那么就可以把数据设置下去 ,否则就只能失败,因为如果期望的数值和实际的数值不相符,那么就有可能数据在操作的间隙当中已经被其它线程修改过了,因为数据已经被其它线程修改过了,所以这一次就不能修改了,想修改这个数据要等到下一次。所以每一次CAS操作先去读一下当前值是多少,读出来之后再去做一次CAS操作,你把期望值跟你刚才读出来的数据做一个比较,如果比较成功,然后就把新的值设上去。CAS总是抱着乐观的态度进行操作,它认为自己很有可能是能够完成这个事情的,所以它不会觉得完不成怎么办,完不成要重试,CAS它认为这个重试的概率是很小的,所以就可以直接去做。


(二)CPU指令



所以不存在因为步骤多,而导致步骤之间被其它线程干扰的问题。


CAS整的一个操作过程是一个原子操作,它是由一条CPU指令完成,并不是把取数据,比较,设值是有好多条CPU指令完成,它只是有一条CPU指令完成。


例如cmpxchg指令,它的逻辑就是比较目标值跟我寄存器的值是不是相等,如果相等就设置一个跳转标志,并且把原始数据设置到目标里面,否则跳转标志就不设置了,所以CAS从指令层面保证它这个操作是可靠的,有效的。


二、无锁类的使用


Java当中提供了一些无锁类的使用,所谓无锁类就是它在底部使用比较交换指令来实现的。与阻塞的方式相比,一般认为这种有锁的方式因为有了锁,所以线程会阻塞,会挂起,甚至它在进入临界区之前,由系统对它进行的阻塞和挂起,相对来讲无锁的方式性能会更好一些,因为一般来说,除非你去人为的挂起这个线程,否则你通过无锁的方式这个线程是不能被挂起的,它只会不断的做重试。一个线程如果在操作系统层面被挂起,那么它做一次上下文交换大概需要8万个时钟周期,但是如果只是做一个重试操作,只需要很少的时钟周期,因此在这种情况下相当于在拿一个很小的成本去节省8万个时钟周期的成本,除非运气很差,一直重试不成功,但基本上都是节省时间提高效率的,无锁的方式要比阻塞的方式性能好很多,


(一)AtomicInteger


1. 概念


AtomicInteger类位于java.util.concurrent.atomic包下,继承于Number类,Integer类也继承于Number类,Number类表示一个数字。


2. 主要接口


(1)public final int get()?      //取得当前值


(2)public final void set(int newValue)      //设置当前值


(3)public final int getAndSet(int newValue)    //设置新值并返回旧值


(4)public final boolean compareAndSet(int expect, int update)  //如果当前值为expect,则设置为update


(5)public final int getAndIncrement()      //当前值加1,返回旧值


(6)public final int getAndDecrement()      //当前值减1,返回旧值


(7)public final int getAndAdd(int delta)      //当前值加delta,返回旧值


(8)public final int incrementAndGet()      //当前值加1,返回新值


(9)public final int decrementAndGet()      //当前值减1,返回新值


(10)public final int addAndGet(int delta)    //当前值增加delta,返回新值


3. 主要接口的实现



AtomicInteger类内部有一个非常重要的字段value,value字段其实是AtomicInteger封装的一个字段,也就是内部所有的操作都是对value做的,value才是内部的一个真正的值,atomic是对它的包装而已。



compareAndSet函数:参数expect代表期望值,参数update代表更新的新值,如果成功返回true,失败返回false。如果返回false代表实际的值和期望的值不一样。这里用到了unsafe操作,unsafe是一个不安全的操作。Java相对于C或C++更安全,是因为Java封装了指针的操作,而unsafe恰恰相反,在java相对于比较底层,提供一些类似指针的操作。unsafe.compareAndSwapInt(this,valueOffset,expect,update)函数是在这个类的valueOffset偏移量上看它的值是多少


在JDK1.7中getAndIncrement函数实现如下:



getAndIncrement函数首先取得当前值,然后再加1。


首先取得当前值(current),然后加1(next),因此加1的值一定是当前值基础上加1,不可能是已经改过的中间变量加1。然后比较compareAndSet,期望值是当前值,目标时next,如果我在做完加1操作后,有其它线程先我一步,修改了这个变量的值,那就会导致实际的current和期望的current不相符,所以这个设置必然失败。如果这个设置失败,compareAndSet函数返回false,走不到return,继续for循环,直到成功,返回当前值。


4.代码示例


1 importjava.util.concurrent.atomic.AtomicInteger;2


3 public classAtomicIntegerDemo {4 static AtomicInteger i = newAtomicInteger();5 public static class AddThread implementsRunnable{6


7 @Override8 public voidrun() {9 for(int k = 0; k < 10000; k++) {10 i.getAndIncrement();11 }12 }13


14 }15


16 public static void main(String[] args) throwsInterruptedException {17 Thread[] ts = new Thread[10];18 for(int m = 0; m < 10; m++) {19 ts[m] = new Thread(newAddThread());20 }21 for(int k = 0; k < 10; k++) {22 ts[k].start();23 }24 for(int k = 0; k < 10; k++) {25 ts[k].join();;26 }27 System.out.println(i);28 }29 }


(二)Unsafe


1.概述


非安全的操作,比如:根据偏移量设置值,park(),底层的CAS操作。


非公开的API,在不同版本的JDK中,可能有较大的差异。


根据偏移量设置值:



unsafe有一个函数objectFieldOffset,获得AtomicInteger这个类的对象的value字段在这个对象上的偏移量。根据AtomicInteger对象的基地址,加上偏移量,就可以拿到value字段的地址,进而可以对value字段进行操作。


park():把线程停下来


底层的CAS操作:compareAndSwap函数就是在unsafe中实现的。


2. 主要接口


(1)public native int getInt(Object o, long offset);  //获得给定对象偏移量上的int值


(2)public native void putInt(Object o, long offset, int x);  //设置给定对象偏移量上的int值


(3)public native long objectFieldOffset(Field f);  //获得字段在对象中的偏移量


(4)public native void putIntVolatile(Object o, long offset);  //设置给定对象的int值,使用volatile语义(其它线程马上能看到我的改动)


(5)public native int getIntVolatile(Object o, long offset);  //获得给定对象的int值,使用volatile语义(其它线程马上能看到我的改动)


(6)public native void putOrderedInt(Object o, long offset, int x);  //和putIntVolatile()一样,但是它要求被操作字段就是volatile类型的


(三)AtomicReference


1. 概述


对引用进行修改。


是一个模板类,抽象化了数据类型。


AtomicReference和AtomicInteger相比,AtomicInteger封装的是一个整数,AtomicReference封装的是一个对象的引用,只要对对象引用进行修改,就可以使用AtomicReference保证线程安全。


AtomicReference是一个模板,它带有模板变量V,表示它可以封装任意类型的数据。


2. 主要接口


(1)public final V get();  //获得当前值


(2)public final void set(V newValue);  //设置当前值为新值


(3)public final boolean compareAndSet(V expect, V update);  //如果当前值为expect,则设置为update


(4)public final V getAndSet(V newValue);  //设置新值newValue,返回旧值。


3.代码示例


1 importjava.util.concurrent.atomic.AtomicReference;2


3 public classAtomicReferenceTest {4 public final static AtomicReference atomicStr = new AtomicReference("abc");5


6 public static voidmain(String[] args) {7 for(int i = 0; i < 10; i++) {8 new Thread(""+i){9 public voidrun() {10 try{11 Thread.sleep((int)(Math.random()*100));12 } catch(InterruptedException e) {13 //TODO Auto-generated catch block


14 e.printStackTrace();15 }16 if(atomicStr.compareAndSet("abc", "def")) {17 System.out.println(Thread.currentThread().getName() + " successed!");18 } else{19 System.out.println(Thread.currentThread().getName() + " failed!");20 }21 }22 }.start();23 }24 }25 }


(四)AtomicStampedReference


1.概述


AtomicStampedReference也是一个对象的引用,但是加了stamped(邮戳,时间戳),也就是一个有唯一性标识的字段,时间戳可以认为是一个stamped,一个递增上去的不重复的数据可以认为是一个stamped,这是为了解决ABA问题。


ABA问题:有一个reference是A,然后我们把它改成了B,再然后把它改成了A。在这种情况下,有一个线程1首先拿到了reference是A,接着它会做一些自己额外的操作,比如说它会做一些计算,接着线程1开始准备赋值,这时候有另外一个线程2把A改成了B,又有一个线程把B改回到了A。线程1计算完毕后开始做CAS操作,它读了一下,发现还是A,它觉得这个数据没有被人改过,所以它就成功的把这个数据设置回去了,比如设置成C。这样一个过程,如果仅仅只是一个简单的加法,那可能问题不大,因为跟过程状态无关的,只跟最终结果相关,所以可以把数据设置上去,答案也不会错。但是有些情况之下,我们设置数据可能更它的过程状态相关,比如要给每个账户余额小于10元的用户充值10元,每个用户充值1次,不能因为花了这10元,再次充值,这是不行的,当你对数据的变化过程敏感的时候,是没有办法区分前面的A和后面的A,如果要区分,有一个办法就是给每一个对象都加上一个stamped,假设A为S,B为S+1,改后的A为S+2。如果要对数据进行赋值的时候,不仅仅要看A是不是A,还要看S是不是S,此时设置失败。确保在过程状态敏感的数据不会出现问题。


2.主要接口


(1)public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp);  //比较设置参数依次为:期望值 写入新值 期望时间戳 新时间戳


(2)public V getReference();  //获得当前对象引用


(3)public int getStamp();  //获得当前时间戳


(4)public void set(V newReference, int newStamp);  //设置当前对象引用和时间戳


3.内部实现



AtomicStampedReference类内部封装了一个Pair,Pair内部首先是我们要用的数据reference,对于AtomicReference来讲,内部是一个value,现在把value做了一层封装,包装在Pair里面,另外还有一个stamp,stamp就是在应用层设置的,最好不要让它重复的一个数据,stamp决定了数据能不能设置成功。of方法是一个静态的工厂方法,它根据reference和stamp来生成一个Pair的实例。



当你去拿reference的时候,返回的是pair的reference。



当你去拿stamp的时候,返回的是pair的stamp。




当你去设置这个pair的时候,你需要提供期望的引用数据,新对象引用,期望的stamp值,新的stamp值。只有当期望的引用值等于当前的引用值,并且期望的stamp等于当前的stamp情况下,并且设置结果成功,返回true。


4.代码示例


1 importjava.util.concurrent.atomic.AtomicStampedReference;2


3 public classAtomicStampedReferenceDemo {4 static AtomicStampedReference money = new AtomicStampedReference(19, 0);5


6 public static voidmain(String[] args) {7 //模拟多个线程同时更新后台数据,为用户充值。


8 for(int i = 0; i < 3; i++) {9 final int timestamp =money.getStamp();10 newThread() {11 public voidrun() {12 while(true) {13 while(true) {14 Integer m =money.getReference();15 if(m < 20) {16 if(money.compareAndSet(m, m+20, timestamp, timestamp+1)) {17 System.out.println("余额小于20元,充值成功,余额:"


18 + money.getReference() + "元!");19 }20 }else{21 break;22 }23 }24 }25 }26 }.start();27 }28


29 //用户消费线程,模拟消费行为


30 newThread() {31 public voidrun() {32 for(int i = 0; i < 100; i++) {33 while(true) {34 int timestamp =money.getStamp();35 Integer m =money.getReference();36 if(m > 10) {37 System.out.println("大于10元");38 if(money.compareAndSet(m, m-10, timestamp, timestamp+1));39 System.out.println("成功消费10元,余额:" + money.getReference()+"元");40 } else{41 System.out.println("没有足够的金额");42 break;43 }44 try{45 Thread.sleep(100);46 }catch(Exception e) {47 //TODO: handle exception


48 }49 }50 }51 }52 }.start();53


54 }55 }


(五)AtomicIntegerArray


1.概述


支持无锁的数组


2.主要接口


public final int get(int i);  //获得数组第i个下标的元素


public final int length();  //获得数组的长度


public final int getAndSet(int i, int newValue);  //将数组第i个下标的值设置为newValue,并返回旧值


public final boolean compareAndSet(int i, int expect, int update);  //进行CAS操作,如果第i个下标的元素等于expect,则设置为update,设置成功返回true。


public final int getAndIncrement(int i);  //将第i个下标的元素加1


public final int getAndDecrement(int i);  //将第i个下标的元素减1


public final int getAndAdd(int i, int delta);  //将第i个下标的元素增加delta(delta可以是负数)


3.接口实现



内部封装了一个普通的数组array,也不需要声明为volatile。






当试图去get的时候,内部有一个方法是getRaw方法,取得array基地址上偏移量为offset的int值。checkedByteOffset取得第i个位置的元素在array上的偏移量。相对来讲比较高性能的实现会调用地址偏移量的方式。首先这个base是数组的基地址,也就是它第一个元素所在的基地址。如果是第i个,偏移了shift个地址。


LeadingZeros是前导零,一个数字把它换成二进制后,前面零的个数。我们整数是32位,例如整数4,0...0100,前面把零写满,正好是29个,arrayIndexScale是指数组中每一个元素有多宽,对int来讲,是4个byte,对int来讲,scale是4。所以前导零就是29,shift就是2。


每个元素的偏移量是:base+(i*4),用位运算表示就是:base+(i<<2)。


JDK大量使用位运算等高性能运算方式。


4.代码示例


1 importjava.util.concurrent.atomic.AtomicIntegerArray;2


3 public classAtomicIntegerArrayDemo {4 private static AtomicIntegerArray array = new AtomicIntegerArray(10);5 public static class AddThread implementsRunnable{6


7 @Override8 public voidrun() {9 for(int k = 0; k < 100000; k++) {10 array.getAndIncrement(k%array.length());11 }12 }13


14 }15


16 public static void main(String[] args) throwsInterruptedException {17 Thread[] ts = new Thread[10];18 for(int k = 0; k < 10; k++) {19 ts[k] = new Thread(newAddThread());20 }21 for(int k = 0; k < 10; k++) {22 ts[k].start();23 }24 for(int k = 0; k < 10; k++) {25 ts[k].join();26 }27 System.out.println(array);28 }29 }


(六)AtomicIntegerFieldUpdater


1.概述


让普通变量也享受到原子操作。


比如:在系统代码中可能会使用一些成员变量,可能最开始简单定义成int,可能并没有定义成AtomicInteger,后面开发中,希望使用CAS操作,但又不想去改数据类型,因为会牵连很多东西。这时候可以选择AtomicIntegerFieldUpdater。


2.主要接口


(1)AtomicIntegerFieldUpdater.newUpdater()


(2)incrementAndGet()


3.小说明


(1)Updater只能修改它可见范围内的变量。因为Updater使用反射得到这个变量。如果变量不可见,就会出错。比如如果score声明为private,就是不可行的。


(2)为了确保变量被正确的读取,它必须是volatile类型的,如果我们原有代码中未声明这个类型,那么简单的声明一下就行,这不会引起什么问题。


(3)由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(Unsafe.objectFieldOffset()不支持静态变量)。


4.代码示例


1 importjava.util.concurrent.atomic.AtomicInteger;2 importjava.util.concurrent.atomic.AtomicIntegerFieldUpdater;3


4 public classAtomicIntegerFieldUpdaterDemo {5 public static classCandidate{6 intid;7 volatile intscore;8 }9


10 public final static AtomicIntegerFieldUpdater scoreUpdater =


11 AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");12


13 //检查updater是否正确工作


14 public static AtomicInteger allScore = new AtomicInteger(0);15


16 public static void main(String[] args) throwsInterruptedException {17 final Candidate stu = newCandidate();18 Thread[] ts = new Thread[10000];19 for(int i = 0; i < 10000; i++) {20 ts[i] = newThread() {21 public voidrun() {22 if(Math.random() > 0.4) {23 scoreUpdater.getAndIncrement(stu);24 allScore.getAndIncrement();25 }26 }27 };28 ts[i].start();29 }30 for(int i = 0; i < 10000; i++) {31 ts[i].join();32 }33 System.out.println(stu.score);34 System.out.println(allScore);35 }36 }


三、无锁算法详解


1 importjava.util.AbstractList;2 importjava.util.concurrent.atomic.AtomicReference;3 importjava.util.concurrent.atomic.AtomicReferenceArray;4


5 public class LockFreeVector extends AbstractList{6


7 private static final boolean debug = false;8


9 /**


10 * size of the first bucket.11 * sizeof(bucket[i+1]) = 2*sizeof(bucket[i])12 */


13 private static final int FIRST_BUCKET_SIZE = 8;14


15 /**


16 * number of buckets17 * 30 will allow 8*(2^30-1) elements18 */


19 private static final int N_BUCKET = 30;20


21 /**


22 * we will have at most N_Bucket number Of buckets,23 * and we have sizeof(bucket.get(i)) = FIRST_BUCKET_SIZE**(i+1)24 */


25 private final AtomicReferenceArray>buckets;26


27 static class WriteDescriptor{28 publicE oldV;29 publicE newV;30 public AtomicReferenceArrayaddr;31 public intaddr_ind;32


33 /**


34 * Creating a new descriptor35 *36 *@paramaddr Operation address37 *@paramaddr_ind index of address38 *@paramoldV old operand39 *@paramnewV new operand40 */


41 public WriteDescriptor(AtomicReferenceArray addr, intaddr_ind, E oldV, E newV) {42 this.addr =addr;43 this.addr_ind =addr_ind;44 this.oldV =oldV;45 this.newV =newV;46 }47


48 /**


49 * set newV50 */


51 public voiddoIt() {52 addr.compareAndSet(addr_ind, oldV, newV);53 }54 }55


56 static class Descriptor{57 public intsize;58 volatile WriteDescriptorwriteop;59


60 /**


61 * create a new descriptor62 *@paramsize the size of vector63 *@paramwriteop executor write operation64 */


65 public Descriptor(int size, WriteDescriptorwriteop) {66 this.size =size;67 this.writeop =writeop;68 }69


70


71 public voidcompleteWrite() {72 WriteDescriptor tmpOp =writeop;73 if(tmpOp != null) {74 tmpOp.doIt();75 writeop = null; //this is safe since all write to writeop use null as r_value


76 }77


78 }79 }80


81 private AtomicReference>descriptor;82 private static final int zeroNumFirst =Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE);83


84 /**


85 * constructor86 */


87 publicLockFreeVector() {88 buckets = new AtomicReferenceArray>(N_BUCKET);89 buckets.set(0, new AtomicReferenceArray(FIRST_BUCKET_SIZE));90 descriptor = new AtomicReference>(new Descriptor(0,null));91 }92


93 /**


94 * add e at the end of vector95 *96 *@parame element added97 */


98 public voidpush_back(E e) {99 Descriptordesc;100 Descriptornewd;101 do{102 desc =descriptor.get();103 desc.completeWrite();104


105 int pos = desc.size +FIRST_BUCKET_SIZE;106 int zeroNumPos =Integer.numberOfLeadingZeros(pos);107 int bucketInd = zeroNumFirst -zeroNumPos;108 if(buckets.get(bucketInd) == null) {109 int newLen = 2 * buckets.get(bucketInd - 1).length();110 if(debug)111 System.out.println("New Length is : " +newLen);112 buckets.compareAndSet(bucketInd, null, new AtomicReferenceArray(newLen));113 }114


115 int idx = (0x80000000>>>zeroNumPos) ^pos;116 newd = new Descriptor(desc.size + 1, new WriteDescriptor(117 buckets.get(bucketInd),idx,null,e));118 } while(!descriptor.compareAndSet(desc, newd));119 descriptor.get().completeWrite();120 }121


122 @Override123 public E get(intindex) {124 //TODO Auto-generated method stub


125 return null;126 }127


128 @Override129 public intsize() {130 //TODO Auto-generated method stub


131 return 0;132 }133


134 }



友情链接: