并发编程的基础、并发模拟工具及代码。
并发编程的基础
CPU多级缓存
时间局部性 空间局部性
缓存一致性: M E S I 用于保证多个CPU Cache之间共享数据的一致
M 修改状态
E 独享状态
S 共享状态
I 无效状态
乱序执行优化
为了提高运算速度而做出违背代码原有顺序的优化
JAVA内存模型 (Java Memory Model - JMM)
Heap 堆
可动态分配内存大小,生存期不需事先告诉编译器,存取速度慢,常用于存放对象
Thread Stack 栈
存取速度快,仅次于寄存器,数据大小和生存期必须确定,常用于存取基本类型变量,本地变量
栈上的引用变量数据可以访问引用的堆对象,如果两个线程同时调用了同一个对象,都会访问成员变量,但是这两个线程会有自己变量的私有拷贝。
每个线程都有自己的“本地内存”,Java中线程的“本地内存”指的是CPU寄存器和高速缓存的抽象描述。两个线程之间的变量共享是要通过主内存的。
Java内存模型-同步八种操作
lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态
unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中以便随后的load动作使用。
load(载入): 作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中
use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎
assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量
store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作
write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中
工作规则:
如果要把一个变量从主内存中复制到工作内存,就需要按顺序地执行read和load操作,如果把变量从工作内存中同步回主内存中,就要按顺序地执行store和write操作。但Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行。
不允许read和load、store和write操作之一单独出现
不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到主内存中
不允许—个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中
一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或assign )的变量。即就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作
一个变量在同_时刻只允许一条线程对其进行lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。lock和unlock必须成对出现
如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值
不允许一个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中
一个新的变量只能在主内存中诞生/不允许在工作内存中直接使用一个未被初始化(load或assign )的变量。即就是对一个变量实施use和store操作之前, 必须先执行过了assign和load操作
并发模拟工具及代码:
工具:
PostMan
Apache Bench
Apache JMeter
代码:
CountDownLatch - 闭锁:1. await() 2. countDown() 阻塞线程,满足某种特定的条件继续执行,常与与线程池一同使用。
Semaphore - 信号量:可以阻塞线程,控制同一时间请求的并发量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package com.leezy.concurrency;
import com.leezy.concurrency.annoations.NotThreadSafe; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;
@Slf4j @NotThreadSafe
public class ConcurrencyTest {
public static int clientTotal = 5000;
public static int threadTotal = 200;
public static int count = 0;
private static void add() { count++; }
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (InterruptedException e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:" + count);
}
}
|
线程安全性
原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操作。 - Atomic包
ThreadLocal的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import java.util.ArrayList; import java.util.List; import java.util.logging.Logger;
public class ThreadLocalTest {
private Logger logger = Logger.getLogger("logger");
private ThreadLocal<String> mThreadLocal = new ThreadLocal<>();
private void testThreadLocal() throws InterruptedException { mThreadLocal.set("main-thread"); logger.info("result1: " + mThreadLocal.get()); Thread thread1 = new Thread() { @Override public void run() { super.run(); mThreadLocal.set("thread_one"); logger.info("result2: " + mThreadLocal.get()); } }; thread1.start(); thread1.join(); Thread thread2 = new Thread() { public void run() { super.run(); logger.info("result3: " + mThreadLocal.get()); } }; thread2.start(); thread2.join(); logger.info("result4: " + mThreadLocal.get()); }
public static void main(String[] args) { ThreadLocalTest threadLocalTest = new ThreadLocalTest(); try { threadLocalTest.testThreadLocal(); } catch (Exception e) { System.out.println(e.getMessage()); } } }
|
输出:
1 2 3 4 5 6 7 8
| # 十二月 15, 2019 4:36:21 下午 ThreadLocalTest testThreadLocal 信息: result1: main-thread # 十二月 15, 2019 4:36:21 下午 ThreadLocalTest$1 run 信息: result2: thread_one # 十二月 15, 2019 4:36:21 下午 ThreadLocalTest$2 run 信息: result3: null # 十二月 15, 2019 4:36:21 下午 ThreadLocalTest testThreadLocal 信息: result4: main-thread
|
ThreadLocal详解
四个接口:
public void set(T value) { }
设置当前线程的ThreadLocal值为指定的Value
public T get() { }
获取当前线程所对应的ThreadLocal值,如果当前线程下没有值,就调用initialValue函数对其进行初始化
public void remove() { }
删除当前线程ThreadLocal对应的值,当前线程结束时,系统会自动回收线程局部变量值。P.S. remove方法调用后再调用get方法会使
initialValue()重新被调用,从而使ThreadLocal的值被重新被初始化。
protected T initilValue() { }
当前线程通过get()方法第一次对ThreadLocal进行访问时,会调用该方法。
如果我们希望ThreadLocal拥有一个不为null的初始值,则应该为ThreadLocal定义一个子类,并重写initialValue()方法。