Java Concurrency Note1

并发与内存模型

mm

并发模拟

  • postman
  • ab
  • 代码实现

CountDownLatch

cd

保证所有线程执行完后进行处理

package com.hku.concurrency;
import com.hku.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;

@Slf4j
@NotThreadSafe
public class ConcurrencyTest {
    public static int clientTotal = 5000;
    public static int threadTotal = 200;
    public static int count = 0;

    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i<clientTotal; i++){
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                }catch (Exception e){
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);
    }

    private static void add(){
        count ++;
    }
}

线程安全性

security

s3

Atomic包

package com.hku.concurrency.example.count;

import com.hku.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@ThreadSafe
public class CountExample2 {
    public static int clientTotal = 5000;
    public static int threadTotal = 200;
    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i<clientTotal; i++){
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                }catch (Exception e){
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);
    }

    private static void add(){
        count.incrementAndGet();
    }
}
/**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        // getAndAddInt hands back the value it read before adding, so "+ 1" yields the updated value.
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
/**
    * Adds var4 to the int field of var1 found at offset var2, retrying the
    * compare-and-swap until it succeeds, and returns the value read before the add.
    *
    * var1 - the object whose field is updated
    * var2 - the offset of the field inside var1 (the caller passes valueOffset); it is a location, not the value
    * var4 - the amount to add (1 when called from incrementAndGet)
    * var5 - the field's value as just read through getIntVolatile
    *
    * CAS step:
    * compareAndSwapInt(var1, var2, var5, var5 + var4)
    * is expected to store var5 + var4 only when the field at (var1, var2) still holds var5;
    * when it reports failure the loop re-reads the field and tries again.
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
        return var5;
    }
  • AtomicLong vs LongAdder

AtomicLong 基于 CAS 自旋:竞争激烈时 compareAndSwap 会反复失败并重试,循环次数增多,影响性能

LongAdder 适合高并发:将累加分散到多个 Cell 上以减少竞争,求和时再汇总;若求和时仍有并发更新,结果可能有小误差

  • AtomicReference
package com.hku.concurrency.example.atomic;
import com.hku.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Walks an {@link AtomicReference} through a fixed series of compare-and-set
 * calls and logs the value that is left at the end.
 */
@Slf4j
@ThreadSafe
public class AtomicExample4 {
    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    /**
     * Applies the (expected, update) pairs in order. Starting from 0:
     * 0->2 succeeds, 0->1 fails, 1->3 fails, 2->4 succeeds, 3->5 fails,
     * so the logged value is 4.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int[][] attempts = {{0, 2}, {0, 1}, {1, 3}, {2, 4}, {3, 5}};
        for (int[] attempt : attempts) {
            int expected = attempt[0];
            int update = attempt[1];
            count.compareAndSet(expected, update);
        }
        Integer finalValue = count.get();
        log.info("count:{}", finalValue);
    }
}
  • AtomicIntegerFieldUpdater

用于原子地更新某个类的指定字段,该字段必须是 volatile 且非 static 的 int(如下例中的 count)

package com.hku.concurrency.example.atomic;
import com.hku.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
 * Shows {@link AtomicIntegerFieldUpdater} performing compare-and-set on the
 * {@code volatile int count} field of an ordinary object.
 */
@Slf4j
@ThreadSafe
public class AtomicExample5 {

    // Updater bound by name to the "count" field declared below.
    private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
        AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");

    @Getter
    public volatile int count = 100;

    private static AtomicExample5 atomicExample5 = new AtomicExample5();

    /**
     * Attempts the same 100 -> 120 swap twice: the first attempt finds 100 and
     * succeeds, the second finds 120 and takes the failure branch.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        boolean firstSwapped = updater.compareAndSet(atomicExample5, 100, 120);
        if (firstSwapped) {
            log.info("success1:{}", atomicExample5.getCount());
        }

        boolean secondSwapped = updater.compareAndSet(atomicExample5, 100, 120);
        if (secondSwapped) {
            log.info("success2:{}", atomicExample5.getCount());
        } else {
            log.info("failed:{}", atomicExample5.getCount());
        }
    }
}
  • AtomicStampedReference

ABA问题:变量的值由A被改为B,之后又被改回A,CAS比较时认为值没有变化而成功,无法察觉中间发生过修改

CAS方法增加了stamp版本判断

/**
     * Atomically sets the value of both the reference and stamp
     * to the given update values if the
     * current reference is {@code ==} to the expected reference
     * and the current stamp is equal to the expected stamp.
     *
     * @param expectedReference the expected value of the reference
     * @param newReference the new value for the reference
     * @param expectedStamp the expected value of the stamp
     * @param newStamp the new value for the stamp
     * @return {@code true} if successful
     */
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        // Read the pair once so the reference and the stamp are compared against the same snapshot.
        Pair<V> current = pair;
        // Both expectations must match the snapshot. If the requested values already equal the
        // snapshot, the call returns true without invoking casPair; otherwise casPair is asked to
        // swap the snapshot for a newly built Pair and its result is returned.
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }
  • AtomicBoolean
package com.hku.concurrency.example.atomic;
import com.hku.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

@Slf4j
@ThreadSafe
public class AtomicExample6 {
    private static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
    public static int clientTotal = 5000;
    public static int threadTotal = 200;
    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i<clientTotal; i++){
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                }catch (Exception e){
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("atomicBoolean:{}",atomicBoolean.get());
    }
    private static void test(){
        if (atomicBoolean.compareAndSet(false,true))log.info("execute");
    }
}

Synchronized

lock

  • 同步锁

syn

package com.hku.concurrency.example.sync;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Shows {@code synchronized} applied to a block and to an instance method.
 * Both forms lock on the receiving object, so two tasks calling into the same
 * instance print their ten lines one task after the other.
 */
@Slf4j
public class SynchronizedExample1 {
    /** Synchronized block: the loop runs while holding the monitor of {@code this}. */
    public void test1(){
        synchronized (this){
            for (int i = 0; i<10; i++){
                log.info("test1-{}",i);
            }
        }
    }

    /** Synchronized instance method: the whole body runs under the receiver's monitor. */
    public synchronized void test2(){
        for (int i = 0; i<10; i++){
            log.info("test2-{}",i);
        }
    }

    /**
     * Submits {@code test1()} twice on the same instance from a thread pool.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SynchronizedExample1 synchronizedExample1 = new SynchronizedExample1();
        // A pool is used so that both tasks can be started at about the same time.
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            executorService.execute(() -> synchronizedExample1.test1());
            executorService.execute(() -> synchronizedExample1.test1());
        } finally {
            // Already-submitted tasks still run; without this the idle pool threads delay JVM exit.
            executorService.shutdown();
        }
    }
}

上式可保证有序输出

 public static void main(String[] args) {
        // Two separate instances: each task locks on its own object.
        SynchronizedExample1 synchronizedExample1 = new SynchronizedExample1();
        SynchronizedExample1 synchronizedExample2 = new SynchronizedExample1();
        // A pool is used so that both tasks can be started at about the same time.
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            executorService.execute(() -> synchronizedExample1.test1(1));
            executorService.execute(() -> synchronizedExample2.test1(2));
        } finally {
            // Already-submitted tasks still run; without this the idle pool threads delay JVM exit.
            executorService.shutdown();
        }
    }
  • 修饰块和方法时,作用于调用的对象,不同的调用对象之间不影响

synlog

  • 修饰静态方法
 //修饰静态方法
    /**
     * Logs ten numbered lines tagged with {@code t}. The method is both
     * {@code static} and {@code synchronized}.
     *
     * @param t tag written into every log line to tell callers apart
     */
    public static synchronized void test2(int t){
        int step = 0;
        while (step < 10) {
            log.info("test2-{}-{}", t, step);
            step++;
        }
    }
    public static void main(String[] args) {
        SynchronizedExample2 synchronizedExample1 = new SynchronizedExample2();
        SynchronizedExample2 synchronizedExample2 = new SynchronizedExample2();
        // A pool is used so that both tasks can be started at about the same time.
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            // test2 is static; it is invoked through two different instances on purpose.
            executorService.execute(() -> synchronizedExample1.test2(1));
            executorService.execute(() -> synchronizedExample2.test2(2));
        } finally {
            // Already-submitted tasks still run; without this the idle pool threads delay JVM exit.
            executorService.shutdown();
        }
    }

synlog1

作用于所有对象

  • 修饰类
// Synchronizes on the class object: the loop runs while holding the
// monitor of SynchronizedExample2.class rather than that of the receiver.
public void test1(int k){
        final Object classLock = SynchronizedExample2.class;
        synchronized (classLock) {
            int step = 0;
            while (step < 10) {
                log.info("test1-example{}-{}", k, step);
                step++;
            }
        }
    }
    public static void main(String[] args) {
        // Two separate instances whose test1 bodies both lock on the class object.
        SynchronizedExample2 synchronizedExample1 = new SynchronizedExample2();
        SynchronizedExample2 synchronizedExample2 = new SynchronizedExample2();
        // A pool is used so that both tasks can be started at about the same time.
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            executorService.execute(() -> synchronizedExample1.test1(1));
            executorService.execute(() -> synchronizedExample2.test1(2));
        } finally {
            // Already-submitted tasks still run; without this the idle pool threads delay JVM exit.
            executorService.shutdown();
        }
    }

结果与上图相同

3lock

线程可见性

  • 共享变量在线程间的可见性

不可见的原因

  • 线程交叉进行
  • 重排序+线程交叉执行
  • 共享变量更新后的值没有在主内存和工作内存中及时更新

Synchronized

jmmsyn

Volatile

volatile

v1

v2

直接使用volatile修饰变量并不线程安全,不具有原子性

使用条件:

  • 对变量写操作不依赖当前值
  • 该变量赋值过程不包含其他变量
  • 适合作为状态标识

线程有序性

JMM允许对指令重排序,虽然不会影响单线程的执行结果,但是会影响多线程并发执行的正确性

  • JVM会默认保证一定的有序性,根据happens-before原则

  • 程序次序原则:单线程内按顺序执行(看起来),只会重排无依赖的指令

  • 锁定原则:对同一个锁的unlock操作先于后面对该锁的lock操作
  • volatile变量原则:对一个变量的写先于之后对此变量的读
  • 传递原则:A先于B,B先于C,那么A先于C
  • 线程启动原则:start先于其他动作
  • 线程中断原则:interrupt后才可检测到中断发生
  • 线程终结原则:所有行为先于线程终止检测
  • 对象终结原则:初始化先于finalize方法

安全发布对象

不安全的情况

直接发布可能带来的不安全性:被其他线程改变

package com.hku.concurrency.example.publish;

import com.hku.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
/**
 * Demonstrates unsafe publication: {@link #getStatus()} returns the internal
 * array itself, so any caller can modify the object's private state.
 */
@Slf4j
@NotThreadSafe
public class UnsafePublish {
    private String [] status = {"a","b","c"};

    /**
     * @return the internal array itself, not a copy
     */
    public String[] getStatus() {
        return status;
    }

    /**
     * Logs the array, overwrites its first element through the getter, and
     * logs it again to show that the private field changed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        UnsafePublish publisher = new UnsafePublish();
        log.info("status:{}", Arrays.toString(publisher.status));

        String[] leaked = publisher.getStatus();
        leaked[0] = "e";

        log.info("status:{}", Arrays.toString(publisher.status));
    }
}

构造完成之前,便发布使其可见

package com.hku.concurrency.example.publish;
import com.hku.concurrency.annoations.NotRecommend;
import com.hku.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
 * Demonstrates a {@code this} reference escaping during construction: the
 * constructor creates an {@link InnerClass}, whose own constructor reads
 * {@code Escape.this.escapeIt} before the {@code Escape} constructor has returned.
 */
@Slf4j
@NotThreadSafe
@NotRecommend
public class Escape {
    private int escapeIt = 0;
    public Escape(){
        // The inner instance is created while this object is still being constructed.
        new InnerClass();
    }
    private class InnerClass{
        public InnerClass(){
            // Reads a field of the enclosing instance through the implicit outer reference.
            log.info("{}",Escape.this.escapeIt);
        }
    }
    // Object escape: the object is published, and so made visible to other code, before its construction has completed.
    public static void main(String[] args) {
        new Escape();
    }
}

安全的发布对象

publish

单例模式的发布

package com.hku.concurrency.example.singleton;

import com.hku.concurrency.annoations.NotRecommend;
import com.hku.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
 * Lazy singleton: the instance is created on the first call to
 * {@link #getInstance()}.
 *
 * <p>Not safe with multiple threads: several threads can all observe
 * {@code null} and each create its own instance. Marking
 * {@code getInstance} as {@code synchronized} would make it safe at the cost
 * of performance.
 */
@Slf4j
@NotRecommend
@NotThreadSafe
public class SingletonExample1 {
    // The single instance; stays null until first requested.
    private static SingletonExample1 instance = null;

    // Private constructor so that instances are only created inside this class.
    private SingletonExample1() {
    }

    /**
     * Static factory method.
     *
     * @return the existing instance, creating it first when none exists yet
     */
    public static SingletonExample1 getInstance() {
        if (instance != null) {
            return instance;
        }
        instance = new SingletonExample1();
        return instance;
    }
}
package com.hku.concurrency.example.singleton;

import com.hku.concurrency.annoations.NotRecommend;
import com.hku.concurrency.annoations.NotThreadSafe;
import com.hku.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
 * Eager singleton: the instance is created when the class is initialized.
 * This may waste resources if the instance is never used.
 */
@Slf4j
@ThreadSafe
public class SingletonExample2 {
    // The single instance, assigned in the static initializer below.
    private static SingletonExample2 instance;

    static {
        instance = new SingletonExample2();
    }

    // Private constructor so that instances are only created inside this class.
    private SingletonExample2() {
    }

    /**
     * Static factory method.
     *
     * @return the instance created during class initialization
     */
    public static SingletonExample2 getInstance() {
        return instance;
    }
}
package com.hku.concurrency.example.singleton;

import com.hku.concurrency.annoations.NotRecommend;
import com.hku.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
 * Lazy singleton using double-checked locking.
 * Still has a problem under multithreading (the instance field is not volatile).
 */
@Slf4j
@NotThreadSafe
public class SingletonExample3 {
    //private constructor
    private SingletonExample3(){
    }
    // Creating the instance conceptually takes three steps:
    //1. memory = allocate()      allocate memory for the object
    //2. ctorInstance()           initialize the object
    //3. instance = memory        point instance at the allocated memory

    // After JVM optimization the instructions may be reordered:

    //1. memory = allocate()      allocate memory for the object
    //3. instance = memory        point instance at the allocated memory
    //2. ctorInstance()           initialize the object

    // With that order, a thread at B can see a non-null instance that thread A
    // has assigned (A-3) but not yet initialized, and return it.
    //singleton instance
    private static SingletonExample3 instance = null;
    //static factory method
    public static SingletonExample3 getInstance(){
        if(instance == null){// first check, made without the lock                 //B
            synchronized (SingletonExample3.class) {// lock on the class object
                if(instance == null) {
                    instance = new SingletonExample3();//A-3
                }
            }
        }
        return instance;
    }
}

限制指令重排序,通过volatile:

package com.hku.concurrency.example.singleton;

import com.hku.concurrency.annoations.NotRecommend;
import com.hku.concurrency.annoations.NotThreadSafe;
import com.hku.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
 * Lazy singleton using double-checked locking on a {@code volatile} field.
 *
 * <p>Creating the instance conceptually takes three steps:
 * (1) {@code memory = allocate()}, (2) {@code ctorInstance()} to initialize
 * the object, (3) {@code instance = memory}. Without {@code volatile} these
 * could be reordered to 1, 3, 2, letting another thread return an instance
 * that has been assigned but not yet initialized. Declaring the field
 * {@code volatile} forbids that reordering.
 */
@Slf4j
@ThreadSafe
public class SingletonExample3 {
    // The single instance; volatile restricts the reordering described above.
    private volatile static SingletonExample3 instance = null;

    // Private constructor so that instances are only created inside this class.
    private SingletonExample3() {
    }

    /**
     * Static factory method.
     *
     * @return the singleton, created on first use
     */
    public static SingletonExample3 getInstance() {
        SingletonExample3 current = instance;          // first check, made without the lock
        if (current != null) {
            return current;
        }
        synchronized (SingletonExample3.class) {       // lock on the class object
            current = instance;                        // second check, made under the lock
            if (current == null) {
                current = new SingletonExample3();
                instance = current;
            }
        }
        return current;
    }
}
package com.hku.concurrency.example.singleton;

import com.hku.concurrency.annoations.NotThreadSafe;
import com.hku.concurrency.annoations.Recommend;
import com.hku.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
 * Enum-based singleton: safe, and without wasted resources.
 * The instance is held by the single constant of a private nested enum.
 */
@Slf4j
@ThreadSafe
@Recommend
public class SingletonExample4 {
    // Private constructor so that instances are only created inside this class.
    private SingletonExample4() {
    }

    /**
     * @return the instance held by {@code Singleton.INSTANCE}
     */
    public static SingletonExample4 getInstance() {
        Singleton holder = Singleton.INSTANCE;
        return holder.getInstance();
    }

    private enum Singleton {
        INSTANCE;

        private SingletonExample4 singleton;

        // The JVM guarantees that this constructor is invoked only once.
        Singleton() {
            this.singleton = new SingletonExample4();
        }

        public SingletonExample4 getInstance() {
            return this.singleton;
        }
    }
}
  • Copyrights © 2019-2020 Rex