Java并发编程

线程

概念

image-20200331171731777

01_02

image-20200522121553190

什么是叫一个进程? 什么叫一个线程?

image-20200522121725120
  • Program app ->QQ.exe

    进程:做一个简单的解释,你的硬盘上有一个简单的程序,这个程序叫QQ.exe,这是一个程序,这个程序是一个静态的概念,它被扔在硬盘上也没人理他,但是当你双击它,弹出一个界面输入账号密码登录进去了,OK,这个时候叫做一个进程。进程相对于程序来说它是一个动态的概念

    线程:作为一个进程里面最小的执行单元它就叫一个线程,用简单的话讲一个程序里不同的执行路径就叫做一个线程

启动线程的五种方式

1: 继承Thread类 2: 实现Runnable 3: 线程池Executors.newCachedThrad

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.oi.juc.c_000;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class T02_HowToCreateThread {
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Hello MyThread!");
}
}

static class MyRun implements Runnable {
@Override
public void run() {
System.out.println("Hello MyRun!");
}
}

static class MyCall implements Callable<String> {
@Override
public String call() {
System.out.println("Hello MyCall");
return "success";
}
}

//启动线程的5种方式
public static void main(String[] args) {
// 继承Thread
new MyThread().start();
// 实现Runable
new Thread(new MyRun()).start();
// Lambda
new Thread(()->{
System.out.println("Hello Lambda!");
}).start();
// 实现Callable
Thread t = new Thread(new FutureTask<String>(new MyCall()));
t.start();
// 缓存线程池
ExecutorService service = Executors.newCachedThreadPool();
service.execute(()->{
System.out.println("Hello ThreadPool");
});
service.shutdown();
}
}

image-20200526060941085image-20200526061909808

生命周期

wait(), join(), LockSupport() 进入waiting状态; notify(), notifyAll(), LockSupport
yield() Running –> Ready
等待过得同步代码块的锁, 进入Blocked状态, 获得后, 进入Runnale
image-20200522122259623

image-20200331201536720

常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.oi.juc.c_000;

public class T03_Sleep_Yield_Join {
public static void main(String[] args) {
//testSleep();
//testYield();
testJoin();
}
/*Sleep,意思就是睡眠,当前线程暂停一段时间让给别的线程去运行。Sleep是怎么复活的?由你的睡眠时间而定,等睡眠到规定的时间自动复活*/
static void testSleep() {
new Thread(()->{
for(int i=0; i<100; i++) {
System.out.println("A" + i);
try {
Thread.sleep(500);
//TimeUnit.Milliseconds.sleep(500)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
/*Yield,就是当前线程正在执行的时候停止下来进入等待队列,回到等待队列里在系统的调度算法里头呢还是依然有可能把你刚回去的这个线程拿回来继续执行,当然,更大的可能性是把原来等待的那些拿出一个来执行,所以yield的意思是我让出一下CPU,后面你们能不能抢到那我不管*/
static void testYield() {
new Thread(()->{
for(int i=0; i<100; i++) {
System.out.println("A" + i);
if(i%10 == 0) Thread.yield();


}
}).start();

new Thread(()->{
for(int i=0; i<100; i++) {
System.out.println("------------B" + i);
if(i%10 == 0) Thread.yield();
}
}).start();
}
/*join, 意思就是在自己当前线程加入你调用Join的线程(),本线程等待。等调用的线程运行完了,自己再去执行。t1和t2两个线程,在t1的某个点上调用了t2.join,它会跑到t2去运行,t1等待t2运行完毕继续t1运行(自己join自己没有意义) */
static void testJoin() {
Thread t1 = new Thread(()->{
for(int i=0; i<100; i++) {
System.out.println("A" + i);
try {
Thread.sleep(500);
//TimeUnit.Milliseconds.sleep(500)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread t2 = new Thread(()->{

try {
t1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

for(int i=0; i<100; i++) {
System.out.println("A" + i);
try {
Thread.sleep(500);
//TimeUnit.Milliseconds.sleep(500)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

t1.start();
t2.start();
}
}

image-20200331201905775

image-20200331201926560 image-20200331202023954 image-20200331202137156 image-20200331202109191

Synchronized

image-20200526063943625 image-20200526063846222 image-20200526064256437 image-20200526064338898 image-20200526064456019

这道题加Synchronized就没必要加volatile, synchronized既保证同步, 有保证线程可见

image-20200526065140791 image-20200331202636319
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 /**
* synchronized关键字
* 对某个对象加锁

* 面试题:模拟银行账户
* 对业务写方法加锁
* 对业务读方法不加锁
* 这样行不行?
* 容易产生脏读问题(dirtyRead)

* 同步和非同步方法可以同时调用

* 一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个 对象的锁,再次申请的时候仍然会得到该对象的锁.也就是说 synchronized获得的锁是可重入的
举例:子类调用父类的同步方法

* 程序在执行过程中,如果出现异常,默认情况锁会被释放
* 所以,在并发处理的过程中,有异常要多加小心,不然可能会发生不一致的情况。
* 比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,
* 在第一个线程中抛出异常,其他线程就会进入同步代码区,有可能会访问到异常产生时的数据。
* 因此要非常小心的处理同步业务逻辑中的异常

·业务逻辑中只有下面这句需要sync,这时不应该给整个方法上锁
·采用细粒度的锁,可以使线程争用时间变短,从而提高效率
·不要以字符串常量作为锁定对象,比如你用到了一个类库,在该类库中代码锁定了字符串“Hello”,可能发生非常诡异的死锁阻塞
·锁定某对象o,如果o的属性发生改变,不影响锁的使用,但是如果o变成另外一个对象,则锁定的对象发生改变,应该避免将锁定对象的引用变成另外的对象

synchronized的底层实现
JDK早期的 重量级 - OS
后来的改进
锁升级的概念:
我就是厕所所长 (一 二)

sync (Object)
markword 记录这个线程ID (偏向锁)
如果线程争用:升级为 自旋锁
10次以后,
升级为重量级锁 - OS

执行时间短(加锁代码),线程数少,用自旋
执行时间长,线程数多,用系统锁**/

多个线程去访问同一个资源的时候对这个资源上锁。

访问某一段代码或者某临界资源的时候是需要有一把锁的概念在这儿的。

01_04

比如:我们对一个数字做递增,两个程序对它一块儿来做递增,递增就是把一个程序往上加1啊,如果两个线程共同访问的时候,第一个线程一读它是0,然后把它加1,在自己线程内部内存里面算还没有写回去的时候而第二个线程读到了它还是0,加1在写回去,本来加了两次,但还是1,那么我们在对这个数字递增的过程当中就上把锁,就是说第一个线程对这个数字访问的时候是独占的,不允许别的线程来访问,不允许别的线程来对它计算,我必须加完1收释放锁,其他线程才能对它继续加。

实质上,这把锁并不是对数字进行锁定的, 你可以任意指定,想锁谁就锁谁。

image-20200331202023954
  1. 上了把锁之后才能对count进行减减访问,你可以new一个Object,所以这里锁定就是o,当我拿到这把锁的时候才能执行这段代码。是锁定的某一个对象,synchronized有一个锁升级的概念
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
*synchronized关键字
*对某个对象加锁
*@author oi
*/
package com.oi.juc.c_001;

public class T {

private int count = 10;
private Object o = new Object();

public void m() {
synchronized(o) { //任何线程要想执行下面的代码,必须先拿到o的锁
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
}
}

}
  1. synchronized它的一些特性。如果说你每次都定义个一个锁的对象Object o 把它new出来那加锁的时候太麻烦每次都要new一个新的对象出来,所以呢,有一个简单的方式就是synchronized(this)锁定当前对象就行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* synchronized关键字
* 对某个对象加锁
* @author oi
*/
package com.oi.juc.c_002;

public class T {

private int count = 10;

public void m() {
synchronized(this) { ߳
//任何线程想要执行那个下面的代码,必须先要拿到this的锁
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
}
}

}
  1. 如果你要是锁定当前对象呢,你也可以写成如下方法。synchronized方法和synchronized(this)执行这段代码它是等值的
1
2
3
4
5
6
7
8
9
10
11
12
package com.oi.juc.c_003;

public class T {

private int count = 10;

public synchronized void m() {
//等同于在方法的代码执行时要synchronized(this)
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
}
}
  1. 静态方法static是没有this对象的,你不需要new出一个对象来就能执行这个方法,但如果这个这个上面加一个synchronized的话就代表synchronized(T.class)。这里这个synchronized(T.class)锁的就是T类的对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.oi.juc.c_004;

public class T {

private static int count = 10;

public synchronized static void m() {
//这里等同于synchronized(T.class)
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
}

public static void mm() {
synchronized(T.class) {
//考虑一下这里写synchronized(this)是否可以?
count --;
}
}
}

问题:T.class是单例的吗?

一个类 load到内存它是不是单例的,想想看。一般情况下是,如果是在同一个ClassLoader空间那它一定是。不是同一个类加载器就不是了,不同的类加载器互相之间也不能访问。所以说你能访问它,那他一定就是单例

  1. 下面程序:很有可能读不到别的线程修改过的内容,除了这点之外count减减完了之后下面的count输出和你减完的结果不对,很容易分析:如果有一个线程把它从10减到9了,然后又有一个线程在前面一个线程还没有输出呢进来了把9又减到了8,继续输出的8,而不是9。如果你想修正它,前面第一个是在上面加volatile,改了马上就能得到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 分析一下这个程序的输出
* @author oi
*/
package com.oi.juc.c_005;

public class T implements Runnable {

private /*volatile*/ int count = 100;

public /*synchronized*/ void run() {
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
}

public static void main(String[] args) {
T t = new T();
for(int i=0; i<100; i++) {
new Thread(t, "THREAD" + i).start();
}
}

}
  1. 另外这个之外还可以加synchronized,加了synchronized就没有必要在加volatile了,因为

synchronized既保证了原子性,又保证了可见性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//对比上一个小程序
package com.oi.juc.c_006;
public class T implements Runnable {

private int count = 10;

public synchronized void run() {
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
}

public static void main(String[] args) {

for(int i=0; i<5; i++) {
T t = new T();
new Thread(t, "THREAD" + i).start();
}
}
}
  1. 如下代码:同步方法和非同步方法是否可以同时调用?就是我有一个synchronized的m1方法,我调用m1的时候能不能调用m2,拿大腿想一想这个是肯定可以的,线程里面访问m1的时候需要加锁,可是访问m2的时候我又不需要加锁,所以允许执行m2。

这些小实验的设计是比较考验功力的,学习线程的时候自己要多动手进行试验,任何一个理论,都可以进行验证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
*同步和非同步方法是否可以同时调用?
* @author oi
*/
package com.oi.juc.c_007;
public class T {

public synchronized void m1() {
System.out.println(Thread.currentThread().getName() + " m1 start...");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " m1 end");
}

public void m2() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " m2 ");
}

public static void main(String[] args) {
T t = new T();

/*new Thread(()->t.m1(), "t1").start();
new Thread(()->t.m2(), "t2").start();*/

new Thread(t::m1, "t1").start();
new Thread(t::m2, "t2").start();

/*
//1.8之前的写法
new Thread(new Runnable() {

@Override
public void run() {
t.m1();
}
});
*/
}
}
  1. 我们在来看一个synchronized应用的例子

我们定义了一个class账户,有名称、余额。写方法给哪个用户设置它多少余额,读方法通过这个名字得到余额值。如果我们给写方法加锁,给读方法不加锁,你的业务允许产生这种问题吗?业务说我中间读到了一些不太好的数据也没关系,如果不允许客户读到中间不好的数据那这个就有问题。正因为我们加了锁的方法和不加锁的方法可以同时运行。

问题比如说:张三,给他设置100块钱启动了,睡了1毫秒之后呢去读它的值,然后再睡2秒再去读它的值这个时候你会看到读到的值有问题,原因是在设定的过程中this.name你中间睡了一下,这个过程当中我模拟了一个线程来读,这个时候调用的是getBalance方法,而调用这个方法的时候是不用加锁的,所以说我不需要等你整个过程执行完就可以读到你中间结果产生的内存,这个现象就叫做脏读。这问题的产生就是synchronized方法和非synchronized方法是同时运行的。解决就是把getBalance加上synchronized就可以了,如果你的业务允许脏读,就可以不用加锁,加锁之后的效率低下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 面试题:模拟银行账户
* 对业务写方法加锁
* 对业务读方法不加锁
* 这样行不行?
*
* 容易产生脏读问题(dirtyRead)
*/
package com.oi.juc.c_008;

import java.util.concurrent.TimeUnit;

public class Account {
String name;
double balance;

public synchronized void set(String name, double balance) {
this.name = name;

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

this.balance = balance;
}

public /*synchronized*/ double getBalance(String name) {
return this.balance;
}

public static void main(String[] args) {
Account a = new Account();
new Thread(()->a.set("zhangsan", 100.0)).start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(a.getBalance("zhangsan"));

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(a.getBalance("zhangsan"));
}
}

Synchronized 是可重入锁

锁定的是this

再来看synchronized的另外一个属性:可重入,是synchronized必须了解的一个概念。

如果是一个同步方法调用另外一个同步方法,有一个方法加了锁,另外一个方法也需要加锁,加的是同一把锁也是同一个线程,那这个时候申请仍然会得到该对象的锁。比如说是synchronized可重入的,有一个方法m1 是synchronized有一个方法m2也是synchrionzed,m1里能不能调m2。我们m1开始的时候这个线程得到了这把锁,然后在m1里面调用m2,如果说这个时候不允许任何线程再来拿这把锁的时候就死锁了。这个时候调m2它发现是同一个线程,因为你m2也需要申请这把锁,它发现是同一个线程申请的这把锁,允许,可以没问题,这就叫可重入锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个对象的锁,再次申请的时候仍然会得到该对象的锁。
* 也就是说synchronized获得锁是可重入的
* synchronized
* @author oi
*/
package com.oi.juc.c_009;

import java.util.concurrent.TimeUnit;

public class T {
synchronized void m1() {
System.out.println("m1 start");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
m2();
System.out.println("m1 end");
}

synchronized void m2() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("m2");
}

public static void main(String[] args) {
new T().m1();
}
}

模拟一个父类子类的概念,父类synchronized,子类调用super.m的时候必须得可重入,否则就会出问题(调用父类是同一把锁)。所谓的重入锁就是你拿到这把锁之后不停加锁加锁,加好几道,但锁定的还是同一个对象,去一道就减个1,就是这么个概念。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.oi.juc.c_010;

import java.util.concurrent.TimeUnit;

public class T {
synchronized void m() {
System.out.println("m start");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("m end");
}

public static void main(String[] args) {
new TT().m();
}
}

class TT extends T {
@Override
synchronized void m() {
System.out.println("child m start");
super.m();
System.out.println("child m end");
}
}
1
2
3
4
5
6
7
/**
* 程序在执行过程中,如果出现异常,默认情况锁会被释放
* 所以,在并发处理的过程中,有异常要多加小心,不然可能会发生不一致的情况。
* 比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,
* 在第一个线程中抛出异常,其他线程就会进入同步代码区,有可能会访问到异常产生时的数据。
* 因此要非常小心的处理同步业务逻辑中的异常
*/

synchronized的底层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
synchronized的底层实现
JDK早期的 重量级 - OS
后来的改进
锁升级的概念:
我就是厕所所长 (一 二)

sync (Object)
0.无锁: 没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。
1.偏向锁: markword 记录这个线程ID 没加锁, 默认不会有线程抢这把锁
2.自旋锁(CAS): 如果线程争用:升级为 自旋锁 默认10次以后,
3.重量级锁: 向操作系统内核申请, 等待时不占用CPU

执行时间短(加锁代码),线程数少,用自旋
执行时间长,线程数多,用系统锁
image-20200526234924652 image-20200526064113021 image-20200527055724201

Volatile

image-20200527061309216

保证线程可见性

image-20200331210227033
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
volatile 关键字,使一个变量在多个线程间可见
A B线程都用到一个变量,java默认是A线程中保留一份copy,这样如果B线程修改了该变量,则A线程未必知道
使用volatile关键字,会让所有线程都会读到变量的修改值

在下面的代码中,running是存在于堆内存的t对象中
当线程t1开始运行的时候,会把running值从内存中读到t1线程的工作区,在运行过程中直接使用这个copy,并不会每次都去
读取堆内存,这样,当主线程修改running的值之后,t1线程感知不到,所以不会停止运行

使用volatile,将会强制所有线程都去堆内存中读取running的值

可以阅读这篇文章进行更深入的理解
http://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html

volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized

*synchronized可以保证可见性和原子性,volatile只能保证可见性
**/

禁止指令重排序

synchronized+双重检查的单例模式要加volatile,防止指令重排序

why?

new对象过程(四条指令):

​ 1)给instance实例分配内存;

  2)初始化instance的构造器;

  3)将instance对象指向分配的内存空间(注意到这步时instance就非null了)

  如果指令按照顺序执行倒也无妨,但JVM为了优化指令,提高程序运行效率,允许指令重排序。如此,在程序真正运行时以上指令执行顺序可能是这样的:

  a)给instance实例分配内存;

  b)将instance对象指向分配的内存空间;

  c)初始化instance的构造器;

  这时候,当线程一执行b)完毕,在执行c)之前,被切换到线程二上,这时候instance判断为非空,此时线程二直接来到return instance语句,拿走instance然后使用,接着就顺理成章地报错(对象尚未初始化)。

image-20200527062720639

volatile保证a初始化之后再赋值给变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Mgr06 {
private static volatile Mgr06 INSTANCE; //JIT
private Mgr06() {
}
public static Mgr06 getInstance() {
if (INSTANCE == null) {
//双重检查
synchronized (Mgr06.class) {
if(INSTANCE == null) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
INSTANCE = new Mgr06();
}
}
}
return INSTANCE;
}
image-20200527062234223
1
2
3
4
·volatile 引用类型(包括数组)只能保证引用本身的可见性,不能保证内部字段的可见性
·volatile并不能保证多个线程共同修改变量时,所带来的不一致问题,也就是说volatile不能替代synchronized
·synchronized可以保证可见性和原子性,volatile只能保证可见性
* 同步代码块中的语句越少越好

锁优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* synchronized优化
* 同步代码块中的语句越少越好
* 比较m1和m2
* @author oi
*/
public class FineCoarseLock {

int count = 0;

synchronized void m1() {
//do sth need not sync
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//业务逻辑中只有下面这句需要sync,这时不应该给整个方法上锁
count ++;

//do sth need not sync
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

void m2() {
//do sth need not sync
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//业务逻辑中只有下面这句需要sync,这时不应该给整个方法上锁
//采用细粒度的锁,可以使线程争用时间变短,从而提高效率
synchronized (this) {
count++;
}
//do sth need not sync
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Synchronized注意事项

1
2
3
4
5
6
7
/**
* 不要以字符串常量作为锁定对象
* 在下面的例子中,m1和m2其实锁定的是同一个对象
* 这种情况还会发生比较诡异的现象,比如你用到了一个类库,在该类库中代码锁定了字符串“Hello”,
* 但是你读不到源码,所以你在自己的代码中也锁定了"Hello",这时候就有可能发生非常诡异的死锁阻塞,
* 因为你的程序和你用到的类库不经意间使用了同一把锁
*/
1
2
3
4
5
/**
* 锁定某对象o,如果o的属性发生改变,不影响锁的使用
* 但是如果o变成另外一个对象,则锁定的对象发生改变
* 应该避免将锁定对象的引用变成另外的对象
*/

CAS / Atomic类

1
2
3
4
/**
* 解决同样的问题的更高效的方法,使用AtomXXX类
* AtomXXX类本身方法都是原子性的,但不能保证多个方法连续调用是原子性的
**/

CAS是CPU原语支持,判断之后不会被打断

ABA问题:基本类型不影响,引用类型会产生ABA

AtomicStampReference 类, 加时间戳解决ABA问题

CAS调用的是Unsafe
image-20200331230925283

JDK11 CompareAndSet 1.8 CompareAndSwap

image-20200401054224292
CAS是乐观锁
image-20200331231024356
ABA问题怎么解决
  1. AtomicStampedReference:带版本戳的原子引用类型,版本戳为int类型。
  2. AtomicMarkableReference:带版本戳的原子引用类型,版本戳为boolean类型。(只能降低概率, 不能避免)
乐观锁( Optimistic Locking )

乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则返回给用户错误的信息,让用户决定如何去做。

相对于悲观锁,在对数据库进行处理的时候,乐观锁并不会使用数据库提供的锁机制。一般的实现乐观锁的方式就是记录数据版本。

悲观锁(Pessimistic Lock)

当我们要对一个数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制,在修改数据之前先锁定,再修改的方式被称之为悲观并发控制(又名“悲观锁”,Pessimistic Concurrency Control,缩写“PCC”)

悲观锁主要是共享锁或排他锁

  • 共享锁又称为读锁,简称S锁。顾名思义,共享锁就是多个事务对于同一数据可以共享一把锁,都能访问到数据,但是只能读不能修改。
  • 排他锁又称为写锁,简称X锁。顾名思义,排他锁就是不能与其他锁并存,如果一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁,包括共享锁和排他锁,但是获取排他锁的事务是可以对数据行读取和修改。

悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。

三种原子操作对比

1000线程结果:
image-20200527124006570

LongAdder: 分段锁: 也是CAS操作

1000个线程, 分四段锁(250), 最后求和
image-20200527124346716

线程数量少,LongAdder不一定有优势

线程数量少, synchronized反而有优势
image-20200527124231381

ReentrantLock : 可重入锁

Synchronized加在同一类的不同方法, 相当于synchrnized this, 是同一把锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
reentrantlock(可重入锁)用于替代synchronized
synchronized功能多

synchronized:自动解锁
Reentrantlock:手动解锁, 一定要解锁, 不能影响后边的线程 try/catch/finally
/**
* reentrantlock用于替代synchronized
* 由于m1锁定this,只有m1执行完毕的时候,m2才能执行
* 这里是复习synchronized最原始的语义
*
* 使用reentrantlock可以完成同样的功能
* 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
* 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
**/

/* 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待*/
public class T03_ReentrantLock3 {
Lock lock = new ReentrantLock();
void m1() {
try {
lock.lock();
for (int i = 0; i < 3; i++) {
TimeUnit.SECONDS.sleep(1);

System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
* 可以根据tryLock的返回值来判定是否锁定
* 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
*/
void m2() {
/*
boolean locked = lock.tryLock();
System.out.println("m2 ..." + locked);
if(locked) lock.unlock();
*/
boolean locked = false;
try {
locked = lock.tryLock(5, TimeUnit.SECONDS);
System.out.println("m2 ..." + locked);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(locked) lock.unlock();
}
}
public static void main(String[] args) {
T03_ReentrantLock3 rl = new T03_ReentrantLock3();
new Thread(rl::m1).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(rl::m2).start();
}
}
lockIntenrrupt打断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/* 使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,
* 在一个线程等待锁的过程中,可以被打断*/
public class T04_ReentrantLock4 {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Thread t1 = new Thread(()->{
try {
lock.lock();
System.out.println("t1 start");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
System.out.println("t1 end");
} catch (InterruptedException e) {
System.out.println("interrupted!");
} finally {
lock.unlock();
}});
t1.start();
Thread t2 = new Thread(()->{
try {
//lock.lock();
lock.lockInterruptibly(); //可以对interrupt()方法做出响应
System.out.println("t2 start");
TimeUnit.SECONDS.sleep(5);
System.out.println("t2 end");
} catch (InterruptedException e) {
System.out.println("interrupted!");
} finally {
lock.unlock();
}});
t2.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.interrupt(); //打断线程2的等待
}
}
ReentrantLock默认是非公平, 可设置为公平锁
1
2
 /*ReentrantLock还可以指定为公平锁*/
private static ReentrantLock lock=new ReentrantLock(true); //参数为true表示为公平锁,请对比输出结果
ReentrantLock Vs Synchronized
image-20200527130123437

CountDownLatch : 倒数发车

  • 类似 Join : 门闩作用
  • CountDownLatch.countDown(); 是原子操作
  • latch.await(); 阻塞住 countDown到0的时候, 继续执行
  • latch.countDown(); latch数减一
  • 对比Join: CountDownLatch更灵活
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class T06_TestCountDownLatch {
public static void main(String[] args) {
usingJoin();
usingCountDownLatch();
}
private static void usingCountDownLatch() {
// 启100个线程, latch = 线程数
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j<10000; j++) result += j;
latch.countDown();
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end latch");
}
private static void usingJoin() {
Thread[] threads = new Thread[100];
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j<10000; j++) result += j;
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("end join");
}
}

CyclicBarrier : 人满发车

  • 自己指定发车的线程数量image-20200401153822854
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
//CyclicBarrier barrier = new CyclicBarrier(20);
CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));
/*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
@Override
public void run() {
System.out.println("满人,发车");
}});*/
for(int i=0; i<100; i++) {
new Thread(()->{
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}

谷歌开源的组件

配合Zuul网关
image-20200527132511607

Phaser: 相位器

婚礼环节举例, 遗传算法, 一个一个的栅栏
image-20200401153718039

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("所有人到齐了!" + registeredParties);
System.out.println();
return false;
case 1:
System.out.println("所有人吃完了!" + registeredParties);
System.out.println();
return false;
case 2:
System.out.println("所有人离开了!" + registeredParties);
System.out.println();
return false;
case 3:
System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
return true;
default:
return true;
}
}
}

ReadWriteLock 读写锁 [ 共享锁 (读锁) + 排它锁 (写锁) ]

  • 读读共享
  • 读写共享
  • 写写排他
image-20200527134243634

https://www.jianshu.com/p/9cd5212c8841

  1. Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性
  2. ReentrantReadWriteLock读写锁的效率明显高于synchronized关键字
  3. ReentrantReadWriteLock读写锁的实现中,读锁使用共享模式;写锁使用独占模式,换句话说,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的
  4. ReetrantReadWriteLock读写锁的实现中,需要注意的,当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class T10_TestReadWriteLock {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over!");
//模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
value = v;
System.out.println("write over!");
//模拟写操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
//Runnable readR = ()-> read(lock);
Runnable readR = ()-> read(readLock);
//Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable writeR = ()->write(writeLock, new Random().nextInt());
for(int i=0; i<18; i++) new Thread(readR).start();
for(int i=0; i<2; i++) new Thread(writeR).start();
}
}

Semaphore: 限流, 默认非公平, 可改fair参数

AbstractQuenedSynchronizer抽象的队列式同步器

image-20200401163248555
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) {
//Semaphore s = new Semaphore(2);
//默认非公平
Semaphore s = new Semaphore(2, true);
//允许一个线程同时执行
//Semaphore s = new Semaphore(1);
new Thread(()->{
try {
s.acquire(); // 获得许可, 谁得到, 谁执行
System.out.println("T1 running...");
Thread.sleep(200);
System.out.println("T1 running...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s.release();
}
}).start();
new Thread(()->{
try {
s.acquire();
System.out.println("T2 running...");
Thread.sleep(200);
System.out.println("T2 running...");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

Exchanger: 执行到交换方法后阻塞, 然后线程交换数据

两个线程之间, 一个线程命令另一个线程阻塞, 交换数据, 继续执行
image-20200401164647991

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.Exchanger;

public class T12_TestExchanger {

static Exchanger<String> exchanger = new Exchanger<>();

public static void main(String[] args) {
new Thread(()->{
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);

}, "t1").start();

new Thread(()->{
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);

}, "t2").start();
}
}

三道线程通信面试题

到五停止

实现一个容器,提供两个方法,add,size
写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束

wait / notify

notify()不释放锁

wait()释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/*
* 这里使用wait和notify做到,wait会释放锁,而notify不会释放锁
* 需要注意的是,运用这种方法,必须要保证t2先执行,也就是首先让t2监听才可以
*
* notify之后,t1必须释放锁,t2退出后,也必须notify,通知t1继续执行
* 整个通信过程比较繁琐
*/
package com.oi.juc.c_020_01_Interview;
public class T04_NotifyFreeLock {
//添加volatile,使t2能够得到通知
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}

public static void main(String[] args) {
T04_NotifyFreeLock c = new T04_NotifyFreeLock();

final Object lock = new Object();

new Thread(() -> {
synchronized(lock) {
System.out.println("t2启动");
if(c.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 结束");
//通知t1继续执行
lock.notify();
}
}, "t2").start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}

new Thread(() -> {
System.out.println("t1启动");
synchronized(lock) {
for(int i=0; i<10; i++) {
c.add(new Object());
System.out.println("add " + i);

if(c.size() == 5) {
lock.notify();
//释放锁,让t2得以执行
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}
CountDownLatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/* 使用Latch(门闩)替代wait notify来进行通知
* 好处是通信方式简单,同时也可以指定等待时间
* 使用await和countdown方法替代wait和notify
* CountDownLatch不涉及锁定,当count的值为零时当前线程继续运行
* 当不涉及同步,只是涉及线程通信的时候,用synchronized + wait/notify就显得太重了
* 这时应该考虑countdownlatch/cyclicbarrier/semaphore
*/
public class T05_CountDownLatch {
// 添加volatile,使t2能够得到通知
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
public static void main(String[] args) {
T05_CountDownLatch c = new T05_CountDownLatch();
CountDownLatch latch = new CountDownLatch(1);

new Thread(() -> {
System.out.println("t2启动");
if (c.size() != 5) {
try {
latch.await();

//也可以指定等待时间
//latch.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 结束");

}, "t2").start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}

new Thread(() -> {
System.out.println("t1启动");
for (int i = 0; i < 10; i++) {
c.add(new Object());
System.out.println("add " + i);
if (c.size() == 5) {
// 打开门闩,让t2得以执行
latch.countDown();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}, "t1").start();
}
}
LockSupport
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class T07_LockSupport_WithoutSleep {
// 添加volatile,使t2能够得到通知
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}

static Thread t1 = null, t2 = null;

public static void main(String[] args) {
T07_LockSupport_WithoutSleep c = new T07_LockSupport_WithoutSleep();

t1 = new Thread(() -> {
System.out.println("t1启动");
for (int i = 0; i < 10; i++) {
c.add(new Object());
System.out.println("add " + i);

if (c.size() == 5) {
LockSupport.unpark(t2);
LockSupport.park();
}
}
}, "t1");

t2 = new Thread(() -> {
LockSupport.park();
System.out.println("t2 结束");
LockSupport.unpark(t1);
}, "t2");
t2.start();
t1.start();
}
}
Semaphore + join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class T08_Semaphore {
// 添加volatile,使t2能够得到通知
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}

static Thread t1 = null, t2 = null;

public static void main(String[] args) {
T08_Semaphore c = new T08_Semaphore();
Semaphore s = new Semaphore(1);

t1 = new Thread(() -> {
try {
s.acquire();
for (int i = 0; i < 5; i++) {
c.add(new Object());
System.out.println("add " + i);
}
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}

try {
t2.start();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
s.acquire();
for (int i = 5; i < 10; i++) {
System.out.println(i);
}
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}

}, "t1");

t2 = new Thread(() -> {
try {
s.acquire();
System.out.println("t2 结束");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
//t2.start();
t1.start();
}
}
生产消费

面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
能够支持2个生产者线程以及10个消费者线程的阻塞调用

wait / notify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class MyContainer1<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;

public synchronized void put(T t) {
// 醒了还要判断是不是满了
while(lists.size() == MAX) { //想想为什么用while而不是用if?
try {
this.wait(); //effective java
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lists.add(t);
++count;
this.notifyAll(); // 叫醒所有等待线程, 不能指定叫醒消费线程
}

public synchronized T get() {
T t = null;
while(lists.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
t = lists.removeFirst();
count --;
this.notifyAll(); //通知生产者进行生产
return t;
}

public static void main(String[] args) {
MyContainer1<String> c = new MyContainer1<>();
//启动消费者线程
for(int i=0; i<10; i++) {
new Thread(()->{
for(int j=0; j<5; j++) System.out.println(c.get());
}, "c" + i).start();
}

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

//启动生产者线程
for(int i=0; i<2; i++) {
new Thread(()->{
for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}
ReentrantLock + Condition (背过)

精确叫醒生产 / 消费线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class MyContainer2<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();

public void put(T t) {
try {
lock.lock();
while (lists.size() == MAX) { //想想为什么用while而不是用if?
producer.await();
}
lists.add(t);
++count;
consumer.signalAll(); //通知消费者线程进行消费
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public T get() {
T t = null;
try {
lock.lock();
while (lists.size() == 0) {
consumer.await();
}
t = lists.removeFirst();
count--;
producer.signalAll(); //通知生产者进行生产
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}

public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
//启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) System.out.println(c.get());
}, "c" + i).start();
}

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

//启动生产者线程
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}
交替打印
常用方法1: LockSupport的park/unpark
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//Locksupport park 当前线程阻塞(停止)
//unpark(Thread t)
public class T02_00_LockSupport {
static Thread t1 = null, t2 = null;

public static void main(String[] args) throws Exception {
char[] aI = "1234567".toCharArray();
char[] aC = "ABCDEFG".toCharArray();

t1 = new Thread(() -> {

for(char c : aI) {
System.out.print(c);
LockSupport.unpark(t2); //叫醒T2
LockSupport.park(); //T1阻塞
}
}, "t1");

t2 = new Thread(() -> {

for(char c : aC) {
LockSupport.park(); //t2阻塞 放在打印前,可以保证先打印1
System.out.print(c);
LockSupport.unpark(t1); //叫醒t1
}
}, "t2");

t1.start();
t2.start();
}
}
常用方法2: synchronized的wait/notify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class T07_00_sync_wait_notify {
// 标记, 保证t1先打印
private static volatile boolean t2Started = false;
// CountDownLatch也可以实现
//private static CountDownLatch latch = new C(1);

public static void main(String[] args) {
final Object o = new Object();

char[] aI = "1234567".toCharArray();
char[] aC = "ABCDEFG".toCharArray();

new Thread(()->{
//latch.await();

synchronized (o) {
// 保证第一个线程先运行
while(!t2Started) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

for(char c : aI) {
System.out.print(c);
try {
o.notify();
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

o.notify(); // 第一个线程最终必须要唤醒t2, 不然程序结束不了
}
}, "t1").start();
// t2
new Thread(()->{

synchronized (o) {
for(char c : aC) {
System.out.print(c);
//latch.countDown()
t2Started = true;
try {
o.notify();
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify();
}
}, "t2").start();
}
}
方法3: ReentrantLock 的 condition 的 signal/await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class T09_00_lock_condition {
public static void main(String[] args) {

char[] aI = "1234567".toCharArray();
char[] aC = "ABCDEFG".toCharArray();

Lock lock = new ReentrantLock();
// 两个等待队列
Condition conditionT1 = lock.newCondition();
Condition conditionT2 = lock.newCondition();

new Thread(()->{
try {
lock.lock();

for(char c : aI) {
System.out.print(c);
conditionT2.signal();
conditionT1.await();
}

conditionT2.signal();

} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}, "t1").start();

new Thread(()->{
try {
lock.lock();

for(char c : aC) {

System.out.print(c);
conditionT1.signal();
conditionT2.await();
}

conditionT1.signal();

} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t2").start();
}
}

AQS (所有锁的心)

ReentrantLock源码 lock( )

通过模板方法, (回调函数, 钩子函数)

调用父类方法, 子类去实现

ReentrantLock.lock( ) -> Sync.acquire( ) -> AQS.tryAcquire( ) -> ReentrantLock.nonfairTryAcquire( ) -> AQS.getState( )
image-20200527204622726

image-20200402065727531
  1. node里装的是thread线程

  2. state 是 volatile 修饰的, 设置state的方法

image-20200402065940538

AQS添加节点: 不用给整个链表加锁, 只观测tail节点, 通过CAS, 提高效率

image-20200527224948564

image-20200527230919030 image-20200527231134651

头节点先获得, 第二个一直尝试

image-20200527231438271

image-20200527232418283

AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

1
2
3
4
5
isHeldExclusively()   //该线程是否正在独占资源。只有用到condition才需要去实现它。 
tryAcquire(int) //独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int) //独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int) //共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int) //共享方式。尝试释放资源,成功则返回true,失败则返回false

AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

ReentrantLock 为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS(Compare and Swap)减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock

AQS的数据结构是双向链表 原理CAS + volatile
  1. 核心是int类型的state
  2. state 被 volatile 修饰
  3. AQS 拥有 boolean compareAndSetState(int expect, int update) 方法, 往队尾加节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
** 源码 **
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;

/**
* The synchronization state.
*/
private volatile int state;

/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}

/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}

/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS面试题

image-20200527213938929

image-20200402065631929 image-20200402063557639 image-20200528235029593

ThreadLocal

作用

ThreadLocal是解决线程安全问题一个很好的思路,它通过为每个线程提供一个独立的变量副本解决了变量并发访问的冲突问题。在很多情况下,ThreadLocal比直接使用synchronized同步机制解决线程安全问题更简单,更方便,且结果程序拥有更高的并发性。

设到当前线程的map
image-20200527234707313

image-20200527234813608
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* ThreadLocal线程局部变量
*
* ThreadLocal是使用空间换时间,synchronized是使用时间换空间
* 比如在hibernate中session就存在与ThreadLocal中,避免synchronized的使用
*
* 运行下面的程序,理解ThreadLocal
*/
package com.oi.juc.c_022_RefTypeAndThreadLocal;

import java.util.concurrent.TimeUnit;

public class ThreadLocal2 {
//volatile static Person p = new Person();
static ThreadLocal<Person> tl = new ThreadLocal<>();

public static void main(String[] args) {

new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(tl.get());
}).start();

new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
tl.set(new Person());
}).start();
}

static class Person {
String name = "zhangsan";
}
}
强软弱虚四种引用

image-20200528125043817

image-20200402113830722
强引用: new 出来的, 不回收

内存空间不足时,Java虚拟机宁愿抛出OutOfMemoryError错误,使程序异常终止,也不会靠随意回收具有强引用对象来解决内存不足的问题。

1
2
3
4
M m = new M();
m = null;
System.gc(); //DisableExplicitGC , 有引用 , M就不会被回收
System.in.read(); // 阻塞住线程 , 观察效果
软引用: 满则回收
  • 软引用是用来描述一些还有用但并非必须的对象。
  • 对于软引用关联着的对象,在系统将要发生内存溢出异常之前,将会把这些对象列进回收范围进行第二次回收。
  • 如果这次回收还没有足够的内存,才会抛出内存溢出异常。
  • 如果一个对象只具有软引用,则内存空间充足时,垃圾回收器不会回收它;如果内存空间不足了,就会回收这些对象的内存。只要垃圾回收器没有回收它,该对象就可以被程序使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 软引用
* 软引用是用来描述一些还有用但并非必须的对象。
* 对于软引用关联着的对象,在系统将要发生内存溢出异常之前,将会把这些对象列进回收范围进行第二次回收。
* 如果这次回收还没有足够的内存,才会抛出内存溢出异常。
* -Xmx20M
*/
package com.oi.juc.c_022_RefTypeAndThreadLocal;

import java.lang.ref.SoftReference;

public class T02_SoftReference {
public static void main(String[] args) {
SoftReference<byte[]> m = new SoftReference<>(new byte[1024*1024*10]);
//m = null;
System.out.println(m.get());
System.gc();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(m.get());

//再分配一个数组,heap将装不下,这时候系统会垃圾回收,先回收一次,如果不够,会把软引用干掉
byte[] b = new byte[1024*1024*15];
System.out.println(m.get());
}
}

//软引用非常适合缓存使用
弱引用:一般用在容器里ThreadLocal<>(), WeakHashMap

作用: 有强引用指向它的时候, 一旦强引用消失, 就不用再管他了

image-20200528124442712

JVM首先将软引用中的对象引用置为null,然后通知垃圾回收器进行回收:

WeakReference对象的生命周期基本由垃圾回收器决定,一旦垃圾回收线程发现了弱引用对象,在下一次GC过程中就会对其进行回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 弱引用遭到gc就会回收
*/
public class T03_WeakReference {
public static void main(String[] args) {
WeakReference<M> m = new WeakReference<>(new M());

System.out.println(m.get());
System.gc();
System.out.println(m.get());

ThreadLocal<M> tl = new ThreadLocal<>();
tl.set(new M());
tl.remove();
}
}

tl 指向ThreadLocal 是强引用

ThreadLocalMap 的 key 是弱引用

使用ThreadLocal , 不用的对象, 务必remove() 掉, 因为key如果为空了, value永远访问不到, 导致内存泄漏

image-20200402120531506

ThreadLocal的set() 方法 调用了getMap() , key: threadlocal对象, value:

image-20200528125305588

key一个弱引用指向threadlocal

image-20200528125402186
虚引用

垃圾回收时候直接干掉

image-20200528131123856 image-20200402124801164 image-20200402172620123 image-20200402172209626

并发容器

image-20200528134414349

image-20200528220820441

image-20200528220924869

image-20200528221154743

image-20200528221314448

同步容器类

总结:
1:对于map/set的选择使用
HashMap
TreeMap
LinkedHashMap

Hashtable
Collections.sychronizedXXX

ConcurrentHashMap
ConcurrentSkipListMap

2:队列
ArrayList
LinkedList
Collections.synchronizedXXX
CopyOnWriteList
Queue
CocurrentLinkedQueue //concurrentArrayQueue
BlockingQueue
LinkedBQ
ArrayBQ
TransferQueue
SynchronusQueue
DelayQueue执行定时任务

1:Vector Hashtable :早期使用synchronized实现
2:ArrayList HashSet :未考虑多线程安全(未实现同步)
3:HashSet vs Hashtable StringBuilder vs StringBuffer
4:Collections.synchronized***工厂方法使用的也是synchronized

使用早期的同步容器以及Collections.synchronized***方法的不足之处,请阅读:
http://blog.csdn.net/itm_hadf/article/details/7506529

使用新的并发容器
http://xuganggogo.iteye.com/blog/321630

Map的发展历程

image-20200528222832446

hashTable –> hashMap –> synchronizedHashMap –> concurrentHashMap

  1. hashTable 整体加锁, 所有方法加synchronized

  2. hashmap 没有锁, 并发场景数据不一致

  3. synchronizedHashMap 在 hashMap 的基础上使用了细粒度锁

  4. concurrentHashMap 使用了CAS, 插入效率一般, 读取效率特别高

从Vector -> List -> Queue ( 解决超卖问题 )

Vector: 古老的同步容器, 方法都是synchronized, 但是两个同步方法之间的业务逻辑不是原子性的, 还是会发生超卖问题, 因此需要再外层再加synchronized

ConcurrentLinkedQueue: CAS实现, 适合代码执行时间短

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 有N张火车票,每张票都有一个编号
* 同时有10个窗口对外售票
* 请写一个模拟程序
*
* 分析下面的程序可能会产生哪些问题?
* 重复销售?超量销售?
*
* 使用Vector或者Collections.synchronizedXXX
* 分析一下,这样能解决问题吗?
*
* 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步
* 就像这个程序,判断size和进行remove必须是一整个的原子操作
*/
public class TicketSeller3 {
static List<String> tickets = new LinkedList<>();
static {
for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
}
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
while(true) {
synchronized(tickets) {
if(tickets.size() <= 0) break;
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了--" + tickets.remove(0));
}
}
}).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 /* 使用ConcurrentLinkedQueue提高并发性 */
public class TicketSeller4 {
static Queue<String> tickets = new ConcurrentLinkedQueue<>();
static {
for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
}
public static void main(String[] args) {

for(int i=0; i<10; i++) {
new Thread(()->{
while(true) {
String s = tickets.poll();
if(s == null) break;
else System.out.println("销售了--" + s);
}
}).start();
}
}
}

Synchronized Vs CAS

要看并发量高低, 并发量不高, 代码执行时间长, 用Synchronized

跳表

ConcurrentSkipListMap

跳表比链表查找快, 比treeMap的cas操作容易

image-20200528230935451

写时复制

CopyOnWriteArrayList
CopyOnWriteArraySet

往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。

CopyOnWriteArraySet内部维护着一个CopyOnWriteArrayList

写时排他锁, 读时共享锁, 适合读特别多, 写特别少

LinkedBlockingQueue: 天生的生产者/消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
static Random r = new Random();
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
strs.put("a" + i); //如果满了,就会等待
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "p1").start();

for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (;;) {
try {
System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "c" + i).start();

}
}

ArrayBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);

static Random r = new Random();

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
strs.put("a" + i);
}

//strs.put("aaa"); // 满了就会等待,程序阻塞
//strs.add("aaa"); // 满了报错
//strs.offer("aaa"); // 满了结束, 用返回值判断
strs.offer("aaa", 1, TimeUnit.SECONDS); // 满了等一秒, 加不进去, 返回

System.out.println(strs);
}
Queue和List的区别

Queue多了很多线程友好的方法, 或者阻塞, 或者时间等待
offer()
peek()
poll()
put()
take()

DelayQueue:

按时间进行任务调度, 等待多长时间后执行

本质是PriorityQueue, 需要指定一种排序方式, 例如等待时间短的先执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class T07_DelayQueue {

static BlockingQueue<MyTask> tasks = new DelayQueue<>();

static Random r = new Random();

static class MyTask implements Delayed {
String name;
long runningTime;

MyTask(String name, long rt) {
this.name = name;
this.runningTime = rt;
}

@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}

@Override
public long getDelay(TimeUnit unit) {

return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}


@Override
public String toString() {
return name + " " + runningTime;
}
}

public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask("t1", now + 1000);
MyTask t2 = new MyTask("t2", now + 2000);
MyTask t3 = new MyTask("t3", now + 1500);
MyTask t4 = new MyTask("t4", now + 2500);
MyTask t5 = new MyTask("t5", now + 500);

tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);

System.out.println(tasks);

for(int i=0; i<5; i++) {
System.out.println(tasks.take());
}
}
}
PriorityQueue

内部是一个红黑树
可自定义compareTo
image-20200529064306360

SynchronousQueue 用处最大, 任务调度

容量为0, 只能用put()阻塞 , 用来手递手给另一个线程数据
类似Exchanger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strs = new SynchronousQueue<>();

new Thread(()->{
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

strs.put("aaa"); //阻塞等待消费者消费
//strs.put("bbb");
//strs.add("aaa");
System.out.println(strs.size());
}

TransferQueue & LinkedTransferQueue

transfer(), 装完等着, 等其他线程取走
用于订单提交后, 确认有人处理订单, 再返回
收钱, 确认收到, 返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();

new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

strs.transfer("aaa");

//strs.put("aaa");
/*new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();*/

}

线程任务

Excutor

image-20200522221235180image-20200403195118211
image-20200531072858642

ExcutorService 方法

主要是submint和shutdown和shutdownnow

image-20200529125814455 image-20200403201506150

Callable

类似Runnable, 相当于Runnable + return 一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class T03_Callable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> c = new Callable() {
@Override
public String call() throws Exception {
return "Hello Callable";
}
};
ExecutorService service = Executors.newCachedThreadPool();
Future<String> future = service.submit(c); //异步

System.out.println(future.get());//阻塞

service.shutdown();
}
}

Future

存储执行的任务将来才会产生的结果
ExcutorService.submit(task, result) 返回值是Future

image-20200529125137980

FutureTask

更灵活, 既是一个Runnable又可以存结果
Future + Runnable
image-20200529130051594
image-20200529130142044

FutureTask对比Callable

Callable执行完, 需要另外的Future进行存储
FutureTask执行完, 结果保存在自己这
image-20200529130307731

1
2
3
4
5
6
7
8
9
10
11
12
13
public class T06_00_Future {
public static void main(String[] args) throws InterruptedException, ExecutionException {

FutureTask<Integer> task = new FutureTask<>(()->{
TimeUnit.MILLISECONDS.sleep(500);
return 1000;
}); //new Callable () { Integer call();}

new Thread(task).start();

System.out.println(task.get()); //阻塞
}
}

CompletableFutrue

方便各种任务的管理, 同时管理多个Future

1
2
3
4
5
6
7
8
9
10
CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(()->priceOfTM());
CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(()->priceOfTB());
CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(()->priceOfJD());

CompletableFuture.allOf(futureTM, futureTB, futureJD).join();

CompletableFuture.supplyAsync(()->priceOfTM())
.thenApply(String::valueOf)
.thenApply(str-> "price " + str)
.thenAccept(System.out::println);

线程池

概念

线程池 : 线程集合hashset + 任务集合

image-20200403203015066

image-20200403195118211image-20200403201544155

image-20200522120920520

工作原理

image-20200522120855575 image-20200522121506135

分类

image-20200522120956504

生命周期

image-20200530071412193

Running状态的线程, 调用ShutDown() 进入ShutDown状态; 调用ShutDownNow(), 进入Stop状态

image-20200522121038785

七个参数

image-20200522121055638 image-20200522121106590

image-20200529132654110

image-20200403190747668
拒绝策略: 1.任务队列满 2.线程池满

JDK默认提供四种

image-20200529133742585

阿里规约
image-20200529133303945

image-20200529133426122

submit与execute区别

(1)可以接受的任务类型

​ submit:

img

​ execute:

img

​ 可以看出:

​ execute只能接受Runnable类型的任务

​ submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null

(2)返回值

​ 由Callable和Runnable的区别可知:

​ execute没有返回值

​ submit有返回值,所以需要返回值的时候必须使用submit

(3)异常

1.execute中抛出异常

execute中的是Runnable接口的实现,所以只能使用try、catch来捕获CheckedException,通过实现UncaughtExceptionHande接口处理UncheckedException

​ 即和普通线程的处理方式完全一致

2.submit中抛出异常

不管提交的是Runnable还是Callable类型的任务,如果不对返回值Future调用get()方法,都会吃掉异常

Executors

线程池的工厂, 工具类

JDK自带线程池

SingleThreadPool
  • 意义在于: 任务队列不用自己维护, 线程的生命周期不自己管理
  • 核心和最大都是1
  • 任务队列是阻塞队列
  • 阻塞队列最大Integer.MAX
  • 使用单个工作线程来执行一个无边界的队列。(注意,如果单个线程在执行过程中因为某些错误中止,新的线程会替代它执行后续线程)。它可以保证认为是按顺序执行的,任何时候都不会有多于一个的任务处于活动状态。和 newFixedThreadPool(1) 的区别在于,如果线程遇到错误中止,它是无法使用替代线程的。
    image-20200530065410475
1
2
3
4
5
6
7
8
9
10
11
12
// 顺序执行
public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
for(int i=0; i<5; i++) {
final int j = i;
// 没有线程, 阻塞住
service.execute(()->{

System.out.println(j + " " + Thread.currentThread().getName());
});
}
}
CachedThreadPool
  • 核心线程数0, 最大线程数Integer.MAX_VALUE (线程数几乎达不到,区别于阻塞队列的MAX)
  • 空闲线程存活时间60S
  • 任务队列是同步队列
  • 特点: 来了就进入任务队列, 任务队列SynchronousQueue是手递手的, 容量为0
    来一个任务, 必须马上执行, 没有线程执行, 马上new一个
  • 优势: 大量短生命周期的异步任务时, 不会堆积, 不会启动特别多的线程时使用
  • 调用 execute 时,重用空闲线程,如果不存在空闲线程,那么会重新创建一个新的线程。如果线程超过 60 秒还未被使用,就会被中止并从缓存中移除。因此,线程池在长时间空闲后不会消耗任何资源
    image-20200530065629729
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
System.out.println(service);
for (int i = 0; i < 2; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);

TimeUnit.SECONDS.sleep(80);

System.out.println(service);
}
FixedThreadPool
  • 固定线程数量
  • 所有线程都是核心线程, 因此没有回收, 空闲回收时间是0
  • 任务队列是阻塞队列, 最大Integer.MAX_VALUE
  • 复用 固定数量的线程 处理一个 共享的无边界队列 。任何时间点,最多有 nThreads 个线程会处于活动状态执行任务。如果当所有线程都是活动时,有多的任务被提交过来,那么它会一致在队列中等待直到有线程可用。如果任何线程在执行过程中因为错误而中止,新的线程会替代它的位置来执行后续的任务。所有线程都会一致存于线程池中,直到执行 ExecutorService.shutdown() 关闭。
    image-20200530073303210
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 4核CPU, 4个线程, 并行执行任务, 计算0-20000的质数
// 单线程2.8秒, 四线程不到1秒
final int cpuCoreNum = 4;

ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);

MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
MyTask t2 = new MyTask(80001, 130000);
MyTask t3 = new MyTask(130001, 170000);
MyTask t4 = new MyTask(170001, 200000);

Future<List<Integer>> f1 = service.submit(t1);
Future<List<Integer>> f2 = service.submit(t2);
Future<List<Integer>> f3 = service.submit(t3);
Future<List<Integer>> f4 = service.submit(t4);

start = System.currentTimeMillis();
f1.get();
f2.get();
f3.get();
f4.get();
end = System.currentTimeMillis();
System.out.println(end - start);
并发 & 并行
image-20200530075537742
CachedThreadPool和FixedThreadPool选择

任务量忽大忽小, 用Cached
任务量比较平稳, 用Fixed

ScheduledThreadPool
  • 定时任务线程池

    image-20200530075137625
image-20200530074810921
1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.scheduleAtFixedRate(()->{
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, 0, 500, TimeUnit.MILLISECONDS);
}
WorkStealingPool
  • 每个线程维护一个自己的任务队列

  • 空闲的线程从其他线程的队列中偷一个任务执行

  • new了一个ForkJoinPool

    image-20200530093103697 image-20200530092942647
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) throws IOException {
ExecutorService service = Executors.newWorkStealingPool();
System.out.println(Runtime.getRuntime().availableProcessors());

service.execute(new R(1000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000)); //daemon
service.execute(new R(2000));

//由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
System.in.read();
}

static class R implements Runnable {
int time;
R(int t) {
this.time = t;
}

@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(time + " " + Thread.currentThread().getName());
}
}
ForkJoinPool
  • 父任务分成小任务, 最后结果汇总image-20200530093324219 image-20200530093513702 image-20200530093851923
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class T12_ForkJoinPool {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();

static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100);
}

System.out.println("---" + Arrays.stream(nums).sum()); //stream api
}

// 不带返回值的
static class AddTask extends RecursiveAction {

int start, end;

AddTask(int s, int e) {
start = s;
end = e;
}

@Override
protected void compute() {

if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {

int middle = start + (end-start)/2;

AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}

// 带返回值的
static class AddTaskRet extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
int start, end;
AddTaskRet(int s, int e) {
start = s;
end = e;
}

@Override
protected Long compute() {
// 达到可执行数量
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
// 没达到数量, 继续拆分
int middle = start + (end-start)/2;

AddTaskRet subTask1 = new AddTaskRet(start, middle);
AddTaskRet subTask2 = new AddTaskRet(middle, end);
subTask1.fork();
subTask2.fork();

return subTask1.join() + subTask2.join();
}

}

public static void main(String[] args) throws IOException {
/*ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
fjp.execute(task);*/

T12_ForkJoinPool temp = new T12_ForkJoinPool();

ForkJoinPool fjp = new ForkJoinPool();
AddTaskRet task = new AddTaskRet(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println(result);

//System.in.read();
}
}
ParallelStreamAPI 并行流式API

并行流式API, 效率高于foreach, 底层是ForkJoinPool, 快四倍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {
List<Integer> nums = new ArrayList<>();
Random r = new Random();
for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000));

//System.out.println(nums);

long start = System.currentTimeMillis();
nums.forEach(v->isPrime(v));
long end = System.currentTimeMillis();
System.out.println(end - start); // 1916

// 并行流
start = System.currentTimeMillis();
nums.parallelStream().forEach(T13_ParallelStreamAPI::isPrime);
end = System.currentTimeMillis();

System.out.println(end - start); // 514
}

static boolean isPrime(int num) {
for(int i=2; i<=num/2; i++) {
if(num % i == 0) return false;
}
return true;
}

自定义拒绝策略

MyHandler implements RejectedExecutionHandler

image-20200530090527305

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(4, 4,
0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
Executors.defaultThreadFactory(),
new MyHandler());
}

static class MyHandler implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//log("r rejected")
//save r kafka mysql redis
//try 3 times
if(executor.getQueue().size() < 10000) {
//try put again();
}
}
}

线程池源码

image-20200530092516213

image-20200730134028335