内容整理自《Java并发编程实战》《java-concurrency-patterns》 相关工具类《vjtools》
public class Singleton {
private static Singleton instance;
private Singleton(){}
public static Singleton getInstance(){
//一重判断
if (instance == null) {
synchronized(Singleton.class) {
//二重判断防止多线程同时竞争锁的情况多次创建
if (instance == null)
instance = new Singleton();
}
}
return instance;
}
}
JVM 中的同步是基于进入和退出管程(Monitor)对象实现的。每个对象实例都会有一个 Monitor,Monitor 可以和对象一起创建、销毁。Monitor 是由 ObjectMonitor 实现,而 ObjectMonitor 实现,而 ObjectMonitor 是由C++ 的 ObjectMonitor.hpp 文件实现
当多个线程同时访问时,多个线程会被先放在EntryList集合,处于block状态的线程,都会被加入到该列表。接下来当线程获取到对象的Monitor时,Monitor依靠底层操作系统的Mutex Lock来实现互斥,线程申请Mutex成功则持有,其它线程无法获取到Mutex
ObjectMonitor() {
_header = NULL;
_count = 0; // 记录个数
_waiters = 0,
_recursions = 0;
_object = NULL;
_owner = NULL;
_WaitSet = NULL; // 处于 wait 状态的线程,会被加入到 _WaitSet
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ; // 处于等待锁 block 状态的线程,会被加入到该列表
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
}
wait():如果线程调用 wait() 方法,就会释放当前持有的 Mutex,并且该线程会进入 WaitSet 集合中,等待下一次被唤醒。如果当前线程顺利执行完方法,也将释放 Mutex。
jdk1.6引入Java对象头(MarkWord、指向类的指针以及数组长度三部分组成)
对象实例分为对象头、实例数据和对齐填充
class X {
// 修饰非静态方法 锁对象为当前类的实例对象 this
synchronized void get() {
}
// 修饰静态方法 锁对象为当前类的Class对象 Class X
synchronized static void set() {
}
// 修饰代码块
Object obj = new Object();
void put() {
synchronized(obj) {
}
}
}
final int x;
// 错误的构造函数
public FinalFieldExample() {
x = 3;
y = 4;
// 此处就是讲 this 逸出,
global.obj = this;
}
程序前面对某个变量的修改一定是对后续操作可见的。
对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。
传递性规则
A Happens-Before B,且 B Happens-Before C,那么A Happens-Before C。
对一个锁的解锁 Happens-Before 于后续对这个锁加锁
synchronized (this) { // 此处自动加锁
// x 是共享变量, 初始值 =10
if (this.x < 12) {
this.x = 12;
}
} // 此处自动解锁
主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。
Thread B = new Thread(()->{
// 主线程调用 B.start() 之前
// 所有对共享变量的修改,此处皆可见
// 此例中,var==77
});
// 此处主线程A对共享变量 var 修改
var = 77;
// 主线程启动子线程
B.start();
线程 join() 规则
主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。
Thread B = new Thread(()->{
// 此处对共享变量 var 修改
var = 66;
});
// 例如此处对共享变量修改,
// 则这个修改结果对线程 B 可见
// 主线程启动子线程
B.start();
B.join()
// 子线程所有对共享变量的修改
// 在主线程调用 B.join() 之后皆可见
// 此例中,var==66
初始状态
指的是线程已经被创建,但是还不允许分配 CPU 执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
指的是线程可以分配 CPU 执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行。
当有空闲的 CPU 时,操作系统会将其分配给一个处于可运行状态的线程,被分配到 CPU 的线程的状态就转换成了运行状态。
运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权,休眠状态的线程永远没有机会获得 CPU 使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。
#6. 若干反例
class SafeCalc {
long value = 0L;
long get() {
synchronized (new Object()) {
return value;
}
}
void addOne() {
synchronized (new Object()) {
value += 1;
}
}
}
class Account {
// 账户余额
private Integer balance;
// 账户密码
private String password;
// 取款
void withdraw(Integer amt) {
synchronized(balance) {
if (this.balance > amt){
this.balance -= amt;
}
}
}
// 更改密码
void updatePassword(String pw){
synchronized(password) {
this.password = pw;
}
}
}
void addIfNotExist(Vector v,
Object o){
if(!v.contains(o)) {
v.add(o);
}
}
synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
// 支持中断的 API
void lockInterruptibly() throws InterruptedException;
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞获取锁的 API
boolean tryLock();
class X {
private final Lock rtl =new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
// 无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync() : new NonfairSync();
}
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
// 将当前线程插入等待队列
// 阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
// 移除等待队列中的某个线程 T
// 唤醒线程 T
}
}
}
如果一个写线程正在执行写操作,此时禁止读线程读共享变量
List list = Collections.synchronizedList(new ArrayList());
synchronized (list) {
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());
}
CopyOnWriteArrayList写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁)
内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。
若遍历 array 的同时,新增元素,CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。读写是可以并行的,遍历操作一直都是基于原 array 执行,而写操作则是基于新 array 进行。
总结:
ConcurrentHashMap、ConcurrentSkipListMap(SkipList跳表) 区别在于Key是否有序
CopyOnWriteArraySet、ConcurrentSkipListSet
getAndIncrement() // 原子化 i++
getAndDecrement() // 原子化的 i--
incrementAndGet() // 原子化的 ++i
decrementAndGet() // 原子化的 --i
// 当前值 +=delta,返回 += 前的值
getAndAdd(delta)
// 当前值 +=delta,返回 += 后的值
addAndGet(delta)
//CAS 操作,返回是否成功
compareAndSet(expect, update)
// 以下四个方法
// 新值可以通过传入 func 函数来计算
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x,func)
accumulateAndGet(x,func)
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
ThreadPoolExecutor提供了三个submit方法
这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);