同步VS异步 and 阻塞VS非阻塞 Link
- 同步/异步与阻塞/非阻塞是两组不同的概念,它们可以共存组合,同步和异步与消息的通知机制有关,阻塞和非阻塞与程序等待消息(无所谓同步或者异步)时的状态有关 同步:就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回(与阻塞的区别很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已)。异步:当一个异步过程调用发出后,调用者不能立刻得到结果;这个调用的部件在处理完成后,通过状态、通知和回调来通知调用者。阻塞:是指调用结果返回之前,当前线程会被挂起,函数只有在得到结果之后才会返回。非阻塞指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。
- 举例: 我去买一本书,立即买到了,这就是非阻塞;如果恰好书店没有,我就等一直等到书店有了这本书买到了才走,这就是阻塞;这两种情况,非阻塞和阻塞都可以称为同步。如果书店恰好没有,我就告诉书店老板,书来了告诉我一声让我来取或者直接送到我家,然后我就走了,这就是异步。
Java对象锁和类锁
- 在Java程序运行时环境中,JVM需要对两类线程共享的数据进行协调:①保存在堆中的实例变量 ②保存在方法区中的类变量。
- 一个线程可以多次对同一个对象上锁。对于每一个对象,java虚拟机维护一个加锁计数器,线程每获得一次该对象,计数器就加1,每释放一次,计数器就减 1,当计数器值为0时,对象就被完全释放了。
- 对于同一个类A,线程1争夺A对象实例的对象锁,线程2争夺类A的类锁,这两者不存在竞争关系。也就说对象锁和类锁互补干预内政
互斥锁与读写锁
- 互斥锁:任意时刻,只能有一个线程持有锁。即假设A线程已经获取了锁,在A线程释放这个锁之前,B线程是无法获取到这个锁的,B要获取这个锁就会进入阻塞状态。例如synchronized和ReentrantLock都属于互斥锁
- 读写锁ReadWriteLock:Java并发包中实现ReadWriteLock接口的ReentrantReadWriteLock,它并没有实现Lock接口,是其内部类ReadLock和WriteLock实现了Lock的接口
- 在真实的业务场景中,通常会出现对一份数据的读取操作次数远高于写入操作这种读多写少的场景,而线程与线程间的读读操作是不涉及到线程安全的问题,没有必要加入互斥锁
- 所以读写锁只要满足“读-读”不互斥 ,”读-写”互斥 ,”写-写”互斥;也就是说只要在任何时候必须保证:①只有一个线程在写入;②线程正在读取的时候,写入操作等待;③线程正在写入的时候,其他线程的写入操作和读取操作都要等待;这样,使用读写锁就可以大大提高效率
- 读写锁实例讲解
Synchronized代码块和方法
- synchronized关键字强制实施一个互斥锁,使得被保护的代码块在同一时间只能有一个线程进入并执行,用于防止多线程访问临界共享资源出现数据不一致性;但是同步是一种高开销的操作(因为在锁竞争激烈的情况下,各个线程不断在阻塞和运行状态之间切换会导致操作系统的上下文切换开销非常大),因此应该尽量减少同步的内容;通常没有必要同步整个方法,使用synchronized代码块同步关键代码;
Synchronized锁住的是对象或者类,对象被锁住时,该对象所有的其他同步方法和代码块不能被执行;类被锁住时,该类所有的其他同步静态方法和代码块不能被执行
class Bank { private int account = 100; public int getAccount() { return account; } /** * 用同步方法实现 * * @param money */ public synchronized void save(int money) { account += money; } /** * 用同步代码块实现 * * @param money */ public void save1(int money) { synchronized (this) { account += money; } } }
ReentrantLock可重入锁
- 可重入锁,也叫做递归锁,指的是同一线程外层函数获得锁之后,内层递归函数仍然有获取该锁的代码,但不受影响;即被同一个线程多次获取,而不会产生死锁。在JAVA环境下 ReentrantLock 和synchronized 都是可重入锁
- ReentrantLock是可重入、互斥、实现了Lock接口的锁,需要自己手动加锁与释放;要注意及时释放锁,否则会出现死锁,通常在finally代码释放锁;它能完成Synchronized所实现的所有功能,而且性能上有一定提升
- ReentrantLock的常用方法:
- lock(), 如果获取了锁立即返回,如果别的线程持有锁,当前线程则一直处于休眠状态,直到获取锁
- lockInterruptibly -> 调用后一直阻塞到获得锁(和lock()一样),但是接受中断信号
- tryLock(), 如果获取了锁立即返回true,如果别的线程正持有锁,立即返回false
- tryLock(long timeout,TimeUnit unit),如果获取了锁定立即返回true,如果别的线程正持有锁,会等待参数给定的时间,在等待的过程中,如果获取了锁定,就返回true,如果等待超时,返回false
- synchronized和ReentrantLock的主要区别
- ReentrantLock新特性
- 等待可中断—-在持有锁的线程长时间不释放锁的时候,等待的线程可以选择放弃等待. 使用tryLock(long timeout, TimeUnit unit)方法
- 公平锁(按照申请锁的顺序来依次获得锁称为公平锁)—-synchronized的是非公平锁,ReentrantLock可以通过构造函数实现公平锁. new RenentrantLock(boolean fair)
- 可绑定多个Condition—-通过多次newCondition可以获得多个Condition对象,可以简单的通过await(),signal()方法实现比较复杂的线程同步的功能
- 使用场景并发量比较小的情况下,Synchronized的性能要优于ReetrantLock,但是在并发量比较高,资源竞争很激烈的情况下,其性能下降很严重,此时ReentrantLock是个不错的方案;如果synchronized关键字能满足用户的需求,就用synchronized,因为它能简化代码,可读性高,而且编译器会尽可能优化synchronize
- ReentrantLock新特性
class Bank {
private int account = 100;
// 创建一个ReentrantLock锁
private Lock lock = new ReentrantLock();
public int getAccount() {
return account;
}
public void save(int money) {
lock.lock(); // 获得锁
try {
account += money;
} finally {
lock.unlock(); // 释放锁
}
}
}
自旋锁
- 自旋锁是对线程阻塞的一种优化,他的原理是当线程争用锁失败的时候不立即进入阻塞状态,而是再等一会,因为对于执行时间短的代码这一会可能就会释放锁,而线程就不需要进行一次阻塞与唤醒。这个等待操作就是让线程多执行几个空指令,至于等待多久这跟具体的处理器实现有关,也有可能处理器根本不支持自旋锁,具体实现的时候我们可以设置一个临界值,当超过了这个临界值之后我们就不自旋了,就乖乖进入阻塞状态吧。这种优化对于执行时间短的代码是很有效的。synchronized使用自旋锁的时机是线程进入等待队列即阻塞的前一步。
- 无锁例如Disruptor并发框架,Disruptor底层依赖一个RingBuffer来进行线程之间的数据交换,在并发条件下,多线程对RingBuffer的读和写不会涉及到锁,然而因为RingBuffer满或者RingBuffer中没有可消费内容引发的线程等待,那就要另当别论了。Disruptor无锁原理RingBuffer维护者可读和可写的指针,也叫游标,它指向生产者或消费者需要写或读的位置,而对于指针的更新是由CAS来完成的,这个过程中我们不需要加锁/解锁的过程。
ThreadLocal线程局部变量
- ThreadLocal与同步机制区别:a.ThreadLocal与同步机制都是为了解决多线程中相同变量的访问冲突问题。b.前者采用以”空间换时间”的方法,后者采用以”时间换空间”的方式
- 性质每一个使用ThreadLocal变量的线程都获得该变量的副本,副本之间相互独立,这样每一个线程都可以随意修改自己的变量副本,而不会对其他线程产生影响
- 四个函数:initialValue函数用于获取此线程局部变量的初始值;get用于获取此线程局部变量在当前线程中的副本值,set用于设置,remove用于移除此线程局部变量在当前线程中的副本值;顺序在进行get之前,必须先set,否则会报空指针异常;如果想在get之前不需要调用set就能正常访问的话,必须重写initialValue()方法
实现原理:内部持有一个叫做ThreadLocalMap的内部类,从源码可以看出这个ThreadLocalMap的Entry继承了WeakReference,并且使用ThreadLocal作为键值,以键值对形式存储着[ThreadLocal对象, 存放的值],每个线程中可有多个threadLocal变量
public class Bank { // 使用ThreadLocal类管理共享变量account private ThreadLocal<Integer> account = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return 100; } }; public void save(int money) { account.set(account.get() + money); } public int getAccount() { return account.get(); } }
volatile变量可见性
volatile: 在需要同步的变量上加上修饰符volatile,例如:private volatile int account = 100;volatile可以保证变量的可见性但是不保证原子性,不能用于修饰final常量,线程每一次对volatile变量的修改都会即时刷新到主存和通知到其他线程,也就是说每一次每个线程读取volatile变量时都会从主存中去取而不会从缓存寄存器中获取,以确保变量的值都是最新获取的;该关键字在JDK1.6开始可以保证指令不被重排序;但是他不能保证原子性,比如n++这种复合型操作;java虚拟机规范(jvm spec)中,规定了声明为volatile的long和double变量的get和set操作是原子的,所以将long和double类型的变量用volatile修饰,就可以保证对他们的赋值操作的原子性。适用场景:作为状态标识量适用,也在双重检查的单例中使用;适用于一个线程写,多个线程读的情况;两个条件:①对线程的写操作不依赖于当前值 ② 该变量没有包含在具有其他变量的不等式中
CAS比较并交换操作:CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B). CAS有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则更新值为B;否则,不要更改该位置,只告诉我这个位置现在的值即可。” 处理ABA情况,引入[引用, 版本号]机制来确保ABA也说明该数据已经被其他线程修改过
例如,有一个变量i=0,Thread-1和Thread-2都对这个变量执行自增操作。 可能会出现Thread-1与Thread-2同时读取i=0到各自的工作内存中,然后各自执行+1,最后将结果赋予i。这样,虽然两个线程都对i执行了自增操作,但是最后i的值为1,而不是2。
解决这个问题使用互斥锁自然可以。但是也可以使用CAS来实现,思路如下:
自增操作可以分为三步:(1)从内存中读取这个变量的当前值(2)执行(变量=上一步取到的当前值+1)的赋值操作 (3)将自增后的值写入变量
多线程情况下,自增操作出现问题的原因就是执行(2)的时候,变量在主内存中的值已经不等于上一步取到的当前值了,所以赋值时,用CompareAndSet操作代替Set操作:首先比较一下内存中这个变量的值是否等于上一步取到的当前值,如果等于,则说明可以执行+1运算,并赋值;如果不等于,则说明有其他线程在此期间更改了主内存中此变量的值,上一步取出的当前值已经失效,此时,不再执行+1运算及后续的赋值操作,而是返回主内存中此变量的最新值。“比较并交换(compare_and_swap)”操作是原子操作,它使用平台提供的CPU级别上的用于并发操作的硬件原语。
原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为”不可被中断的一个或一系列操作”
基于CAS的并发算法称为“无锁定算法”,因为线程不必再等待锁定。“无锁定算法”要求某个线程总是执行操作
Wait && Notify同步(生产者消费者模式实现)
- Wait调用任意对象的wait()方法导致该线程阻塞,并释放该对象上的锁,进入等待对象同步锁的状态;Notify随机唤醒一个等待对象同步锁的线程
- wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
- 生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它
- 如何停止消费者: 第一种方法: 设置一个Volatile的boolean类型变量作为flag, 生产者结束后标示该变量为true, 消费者轮询这个变量来决定自己是否退出; 第二种方法: 经典的“毒丸”策略,生产者结束后,把一个特别的对象:“毒丸”对象放入队列。消费者从队列中拿到对象后,判断是否是毒丸对象。如果是普通非毒丸对象,则正常消费。如果是毒丸对象,则放回队列(并杀死其他消费者),然后结束自己。
- 代码实现:
public class ProducerCustomer {
/**
* @param args
*/
public static void main(String[] args) {
Product product = new Product(); // 实例化产品对象
Producer p = new Producer(product); // 实例化一个生产者
Consumer c = new Consumer(product); // 实例化一个消费者
new Thread(p).start(); // 开启生产者线程
new Thread(c).start(); // 开启消费者线程
}
/**
* 生产者线程
*/
static class Producer implements Runnable {
private Product product;
public Producer(Product product) {
this.product = product;
}
public void run() {
while (true) {
// 同步代码锁
synchronized (product) {
try {
while (product.getMax() == product.size()) {
// 产品栈已经满,不需要再生产,执行线程等待操作
System.out.println("产品栈已满,生产者休息中......");
product.wait();
}
// 产品栈不足,开始生产
Object newObj = new Object();
product.add(newObj);
System.out.println("Producer 生产了一个产品,目前产品栈大小为:"
+ product.size());
// 生产一个产品之后,线程开始睡眠
Thread.sleep((long) Math.random() * 3000);
// 通知消费者,有新产品
product.notify();
} catch (Exception e) {
}
}
}
}
}
/**
* 消费者线程
*/
static class Consumer implements Runnable {
private Product product;
public Consumer(Product product) {
this.product = product;
}
public void run() {
while (true) {
// 同步代码锁
synchronized (product) {
try {
while (product.size() == 0) {
// 产品栈已空,不可以再消费,执行线程等待操作
System.out.println("产品栈为空,消费者等待中......");
product.wait();
}
// 产品栈有商品,开始消费
product.remove();
System.out.println("Consumer 消费了一个产品,目前产品栈大小为:"
+ product.size());
// 消费一个产品之后,线程开始睡眠
Thread.sleep((long) Math.random() * 3000);
// 通知生产者,消费了一个产品
product.notify();
} catch (Exception e) {
}
}
}
}
}
static class Product {
// 使用list集合模拟一个产品栈
private List<Object> goods = new LinkedList<Object>();
private int max = 10; // 最大产品数量
public void add(Object obj) {
goods.add(obj);
}
public void remove() {
if (goods.size() > 0) {
goods.remove(goods.size() - 1);
}
}
public int size() {
return goods.size();
}
public int getMax() {
return this.max;
}
public void setMax(int max) {
this.max = max;
}
}
}
java.util.concurrent并发包
几乎java.util.concurrent 中的所有类都是在 ReentrantLock 之上构建的,ReentrantLock 则是在原子变量类的基础上构建的
Semaphore(信号量)
- Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
- 应用场景
- Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控
- 使用方法Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。
原子变量
java.util.concurrent.atomic包中提供了原子变量的9种风格(AtomicInteger、AtomicLong、 AtomicReference、AtomicBoolean等,其原子地更新一对值)
CountDownLatch
- 利用它可以实现类似计数器的功能; 比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,就可以利用CountDownLatch来实现
例:假如有Thread1、Thread2、Thread3、Thread4四条线程分别统计C、D、E、F四个盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,实现代码如下:
public class CountDownLatchTest { private static CountDownLatch count = new CountDownLatch(4); private static ExecutorService service = Executors.newFixedThreadPool(6); public static void main(String args[]) throws InterruptedException { for (int i = 0; i < 4; i++) { service.execute(() -> { // 模拟任务耗时 try { int timer = new Random().nextInt(5); TimeUnit.SECONDS.sleep(timer); System.out.printf("%s时完成磁盘的统计任务,耗费%d秒.\n", new Date().toString(), timer); // 任务完成之后,计数器减一 count.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); } // 主线程一直被阻塞,知道count的计数器被设置为0 count.await(); System.out.printf("%s时全部任务都完成,执行合并计算.\n", new Date().toString()); service.shutdown(); } }
CyclicBarrier回环栅栏
- 通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了
- 例子:假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情
CountDownLatch VS CyclicBarrier
- CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行
- CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
Fork-Join 框架:
并行分解方法,执行一个任务将首先分解(fork)为多个子任务,完成之后再合并(join)