Java基础知识-并发编程

并发编程的基础、并发模拟工具及代码。

并发编程的基础

CPU多级缓存

时间局部性 空间局部性
缓存一致性: M E S I 用于保证多个CPU Cache之间共享数据的一致
M 修改状态
E 独享状态
S 共享状态
I 无效状态

乱序执行优化

为了提高运算速度而做出违背代码原有顺序的优化

JAVA内存模型 (Java Memory Model - JMM)

Heap 堆
可动态分配内存大小,生存期不需事先告诉编译器,存取速度慢,常用于存放对象
Thread Stack 栈
存取速度快,仅次于寄存器,数据大小和生存期必须确定,常用于存取基本类型变量,本地变量

栈上的引用变量数据可以访问引用的堆对象,如果两个线程同时调用了同一个对象,都会访问成员变量,但是这两个线程会有自己变量的私有拷贝。

每个线程都有自己的“本地内存”,Java中线程的“本地内存”指的是CPU寄存器和高速缓存的抽象描述。两个线程之间的变量共享是要通过主内存的。

Java内存模型-同步八种操作

  • lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态

  • unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。

  • read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中以便随后的load动作使用。

  • load(载入): 作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中

  • use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎

  • assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量

  • store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作

  • write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中

工作规则:

  • 如果要把一个变量从主内存中复制到工作内存,就需要按顺序地执行read和load操作,如果把变量从工作内存中同步回主内存中,就要按顺序地执行store和write操作。但Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行。

  • 不允许read和load、store和write操作之一单独出现

  • 不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到主内存中

  • 不允许—个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中

  • 一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或assign )的变量。即就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作

  • 一个变量在同_时刻只允许一条线程对其进行lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。lock和unlock必须成对出现

  • 如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值

  • 不允许一个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中

  • 一个新的变量只能在主内存中诞生/不允许在工作内存中直接使用一个未被初始化(load或assign )的变量。即就是对一个变量实施use和store操作之前, 必须先执行过了assign和load操作

并发模拟工具及代码:

工具:

PostMan

Apache Bench

Apache JMeter

代码:

CountDownLatch - 闭锁:1. await() 2. countDown() 阻塞线程,满足某种特定的条件继续执行,常与与线程池一同使用。

Semaphore - 信号量:可以阻塞线程,控制同一时间请求的并发量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.leezy.concurrency;

import com.leezy.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
* @program: spring-cloud-leezy
* @description: 测试并发
* @author: LEEZY
* @create: 2019-11-30 16:03
**/

@Slf4j
@NotThreadSafe
// count:4962 结果不准确,线程不安全
public class ConcurrencyTest {

// 请求总数
public static int clientTotal = 5000;

// 同时并发执行的线程数
public static int threadTotal = 200;

public static int count = 0;

private static void add() {
count++;
}

public static void main(String[] args) throws InterruptedException {
// 线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 信号量以及并发线程数
final Semaphore semaphore = new Semaphore(threadTotal);
// 计数器-闭锁 放入请求总数
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
executorService.execute(() -> {
try {
// 根据并发的限制数量, 判断当前线程是否允许被执行
semaphore.acquire();
add();
// 释放线程
semaphore.release();
} catch (InterruptedException e) {
log.error("exception", e);
}
// 执行一次就进行releaseShared
countDownLatch.countDown();
});
}
// 这个方法可以保证 countDown 减为0
countDownLatch.await();
// 关闭线程池
executorService.shutdown();
log.info("count:" + count);

}

}

线程安全性

原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操作。 - Atomic包

ThreadLocal的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

public class ThreadLocalTest {

private Logger logger = Logger.getLogger("logger");

private ThreadLocal<String> mThreadLocal = new ThreadLocal<>();

private void testThreadLocal() throws InterruptedException {
mThreadLocal.set("main-thread");
logger.info("result1: " + mThreadLocal.get());
Thread thread1 = new Thread() {
@Override
public void run() {
super.run();
// thread1内部设置的值只有thead1内部可以得到,所以result3为空
mThreadLocal.set("thread_one");
logger.info("result2: " + mThreadLocal.get());
}
};
thread1.start();
thread1.join();
Thread thread2 = new Thread() {
public void run() {
super.run();
logger.info("result3: " + mThreadLocal.get());
}
};
thread2.start();
thread2.join();
logger.info("result4: " + mThreadLocal.get());
}

public static void main(String[] args) {
ThreadLocalTest threadLocalTest = new ThreadLocalTest();
try {
threadLocalTest.testThreadLocal();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}

输出:

1
2
3
4
5
6
7
8
# 十二月 15, 2019 4:36:21 下午 ThreadLocalTest testThreadLocal
信息: result1: main-thread
# 十二月 15, 2019 4:36:21 下午 ThreadLocalTest$1 run
信息: result2: thread_one
# 十二月 15, 2019 4:36:21 下午 ThreadLocalTest$2 run
信息: result3: null
# 十二月 15, 2019 4:36:21 下午 ThreadLocalTest testThreadLocal
信息: result4: main-thread

ThreadLocal详解

四个接口:

  • public void set(T value) { }
    设置当前线程的ThreadLocal值为指定的Value

  • public T get() { }
    获取当前线程所对应的ThreadLocal值,如果当前线程下没有值,就调用initialValue函数对其进行初始化

  • public void remove() { }
    删除当前线程ThreadLocal对应的值,当前线程结束时,系统会自动回收线程局部变量值。P.S. remove方法调用后再调用get方法会使
    initialValue()重新被调用,从而使ThreadLocal的值被重新被初始化。

  • protected T initilValue() { }
    当前线程通过get()方法第一次对ThreadLocal进行访问时,会调用该方法。

如果我们希望ThreadLocal拥有一个不为null的初始值,则应该为ThreadLocal定义一个子类,并重写initialValue()方法。

0%