concurrency

内容整理自《Java并发编程实战》《java-concurrency-patterns》 相关工具类《vjtools》

Build Status


1.序言及全览

学习并发的原因

并发编程解决的核心问题

如何学习


2.抽象问题总结

并发程序的背后

  1. CPU 增加了缓存,以均衡与内存的速度差异;
  2. 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异;
  3. 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。

缓存导致的可见性问题

线程切换带来的原子性问题

254b129b145d80e9bb74123d6e620efb

编译优化带来的有序性问题

public class Singleton {
  private static Singleton instance;
  private Singleton(){}
  public static Singleton getInstance(){
    //一重判断
    if (instance == null) {
      synchronized(Singleton.class) {
        //二重判断防止多线程同时竞争锁的情况多次创建
        if (instance == null)
          instance = new Singleton();
        }
    }
    return instance;
  }
}

3.JAVA内存模型

按需禁用缓存以及编译优化 代码来源

final

    final int x;
    // 错误的构造函数
    public FinalFieldExample() { 
          x = 3;
          y = 4;
          // 此处就是讲 this 逸出,
          global.obj = this;
    }

        

4.JAVA线程的生命周期

通用的线程生命周期

9bbc6fa7fb4d631484aa953626cf6ae5

Java 中线程的生命周期

线程转换条件

  1. RUNNABLE - BLOCKED
    • 线程等待synchronized隐式锁(线程调用阻塞式API依然是RUNNABLE状态)
  2. RUNNABLE - WAITING
    • 获得synchronized隐式锁的线程,调用Object.wait();
    • Thread.join();
    • LockSupport.park();
  3. RUNNABLE - TIMED_WAITING
    • Thread.sleep(long millis)
    • 获得 synchronized 隐式锁的线程调用 Object.wait(long timeout)
    • Thread.join(long millis)
    • LockSupport.parkNanos(Object blocker, long deadline)
    • LockSupport.parkUntil(long deadline)
  4. NEW - RUNNABLE
    • Thread.start()
  5. RUNNABLE - TERMINATED
    • run()执行完后自动转为 TERMINATED
    • stop()(@Deprecated 直接结束线程,如果线程持有ReentrantLock锁并不会释放)
    • interrupt()
      • 异常通知
      • 主动监测

5.多线程以及线程数确定

多线程目的

多线程应用场景

多线程效果

线程数


#6. 若干反例

class SafeCalc {
  long value = 0L;
  long get() {
    synchronized (new Object()) {
      return value;
    }
  }
  void addOne() {
    synchronized (new Object()) {
      value += 1;
    }
  }
}

class Account {
  // 账户余额  
  private Integer balance;
  // 账户密码
  private String password;
  // 取款
  void withdraw(Integer amt) {
    synchronized(balance) {
      if (this.balance > amt){
        this.balance -= amt;
      }
    }
  } 
  // 更改密码
  void updatePassword(String pw){
    synchronized(password) {
      this.password = pw;
    }
  } 
}

void addIfNotExist(Vector v, 
    Object o){
  if(!v.contains(o)) {
    v.add(o);
  }
}


7. Lock和Condition

重复造轮子的原因抑或Lock&Condition的优势

synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

体现在具体代码上就是Lock接口的三个方法

// 支持中断的 API
void lockInterruptibly() throws InterruptedException;
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞获取锁的 API
boolean tryLock();

可重入锁(ReentrantLock)

class X {
  private final Lock rtl =new ReentrantLock();
  int value;
  public int get() {
    // 获取锁
    rtl.lock();         ②
    try {
      return value;
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
  public void addOne() {
    // 获取锁
    rtl.lock();  
    try {
      value = 1 + get(); ①
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
}

公平锁和非公平锁

// 无参构造函数:默认非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){
    sync = fair ? new FairSync() : new NonfairSync();
}

用锁的最佳实践(from Doug Lea)


8.Semaphore

信号量模型

1

方法语义

class Semaphore{
  // 计数器
  int count;
  // 等待队列
  Queue queue;
  // 初始化操作
  Semaphore(int c){
    this.count=c;
  }
  // 
  void down(){
    this.count--;
    if(this.count<0){
      // 将当前线程插入等待队列
      // 阻塞当前线程
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
      // 移除等待队列中的某个线程 T
      // 唤醒线程 T
    }
  }
}

使用方法

实现互斥

实现限流器(Semaphore 可以允许多个线程访问一个临界区)


9.ReadWriteLock、StampedLock、CountDownLatch、CyclicBarrier

ReadWriteLock 读写锁

StampedLock 加上乐观读(无锁)

CountDownLatch

CyclicBarrier


10. 并发容器

同步容器(jdk1.5 之前)

包装安全类

Vector、Stack 和 Hashtable(基于synchronized实现)

对同步容器做遍历操作时需要加锁保证互斥

List list = Collections.synchronizedList(new ArrayList());
synchronized (list) {  
  Iterator i = list.iterator(); 
  while (i.hasNext())
    foo(i.next());
}    

并发容器(jdk1.5 之后)

rongqi

List

CopyOnWriteArrayList写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁)

Map

ConcurrentHashMap、ConcurrentSkipListMap(SkipList跳表) 区别在于Key是否有序

Set

CopyOnWriteArraySet、ConcurrentSkipListSet

Queue


11. 原子类

原子化的基本数据类型

getAndIncrement() // 原子化 i++
getAndDecrement() // 原子化的 i--
incrementAndGet() // 原子化的 ++i
decrementAndGet() // 原子化的 --i
// 当前值 +=delta,返回 += 前的值
getAndAdd(delta) 
// 当前值 +=delta,返回 += 后的值
addAndGet(delta)
//CAS 操作,返回是否成功
compareAndSet(expect, update)
// 以下四个方法
// 新值可以通过传入 func 函数来计算
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x,func)
accumulateAndGet(x,func)

原子化的对象引用类型

原子化数组(比基本类型多了数组的索引参数)

原子化对象属性更新器(基于反射原子化更新对象属性,对象属性必须是volitale保证可见性)

原子化累加器(空间换时间,只支持累加操作性能比原子化基本数据类型更好,不支持compareAndSet())


12. 线程池

为什么要用线程池

线程池是一种生产者-消费者模式(非一般意义池化资源)

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler) 

注意事项


13. Future

获得任务执行结果

// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消  
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);

FutureTask工具类(实现了RunnableFuture而它继承了Runnable和Future接口)

FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);