1. 队列(Queue)
1.1 队列
-
队列是一种
集合
。 -
除了基本的集合操作以外,队列还提供了额外的插入、提取和检查操作。
-
队列通常以
先进先出
(FIFO)的方式对元素进行排序。
1.2. 阻塞队列 BlockingQueue
1.2.1. 说明
阻塞队列(BlockingQueue )常用于多线程领域。
相对于普通队列来说,它有以下特点:
- 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞,直至有数据存入队列
- 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞,直至队列中出现空位
这意味着,如果一个线程因为操作阻塞队列而被阻塞,那么除非另一个线程也操作这个队列并达成条件,否则这个线程会一直阻塞在这里。
1.2.2. 核心方法
对于向阻塞队列内添加数据,有四种处理方式,此处存放数据为例子
- 阻塞:如果存数据时队列已满,就一直阻塞到队列有空位并存入
- 异常:如果存数据时队列已满,就报异常
- 特殊值:如果队列能够容纳返回true,否则返回false
- 超时:如果存数据时队列已满,就等待指定时间,否则返回失败
方法类型 | 阻塞 | 异常 | 特殊值 | 超时 |
---|---|---|---|---|
存放 | put(e) | add(e) | offer(e) | offer(e,time,unit) |
取出 | take() | remove() | poll() | polltime,unit) |
1.2.3. 常见阻塞队列
ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
这三种阻塞队列我们会在下面提到
1.3. ArrayBlockingQueue
ArrayBlockingQueue是有界队列
,队列长度是有上限的,可以手动设置队列长度。
import java.util.concurrent.ArrayBlockingQueue;
public class TestQueue {
public static void main(String[] args) {
ArrayBlockingQueue q = new ArrayBlockingQueue(3);
MyRun1 m1 = new MyRun1(q);
MyRun2 m2 = new MyRun2(q);
new Thread(m1).start();
new Thread(m2).start();
}
}
class MyRun1 implements Runnable {
public ArrayBlockingQueue q;
public MyRun1(ArrayBlockingQueue q) {
this.q = q;
}
@Override
public void run() {
//向队列里添加数据
for (int i = 0; i < 100; i++) {
try {
q.put(i);//存入队列 队列满了 代码就停在这里
System.out.println("存入了" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class MyRun2 implements Runnable {
public ArrayBlockingQueue q;
public MyRun2(ArrayBlockingQueue q) {
this.q = q;
}
@Override
public void run() {
while (true) {
int peek = 0;
try {
System.out.println("从队列里取数据");
peek = (int) q.take();//从队列里取数据 如果没有数据 就停在这里
System.out.println(peek);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
无界队列
LinkedBlockingQueue 创建时不需要给容量- ArrayBlockingQueue是
有界队列
创建 的时候必须给容量
1.4. 队列版 消费者生产者模型
- 由于有多个生产者和消费者 线程, 打印顺序会有问题
代码
package link.xiaomo.test5;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
public class Desk {
//用队列
private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
// 放1个包子的方法
// 厨师1 厨师2 厨师3
public void put() {
String tName = Thread.currentThread().getName();
try {
queue.put(tName + "做的一个包子 ---");
System.out.println(tName + "做好了包子--" + queue.size());
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 吃货1 吃货2
public void get() {
String tName = Thread.currentThread().getName();
try {
String baozi = queue.take();
System.out.println(tName + "获取到了" + baozi);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
测试类
package link.xiaomo.test5;
public class ThreadTest {
public static void main(String[] args) {
// 需求:3个生产者线程,负责生产包子,每个线程每次只能生产1个包子放在桌子上
// 2个消费者线程负责吃包子,每人每次只能从桌子上拿1个包子吃。
Desk desk = new Desk();
// 创建3个生产者线程(3个厨师)
new Thread(() -> {
while (true) {
desk.put();
}
}, "厨师1").start();
new Thread(() -> {
while (true) {
desk.put();
}
}, "厨师2").start();
//
new Thread(() -> {
while (true) {
desk.put();
}
}, "厨师3").start();
// 创建2个消费者线程(2个吃货)
new Thread(() -> {
while (true) {
desk.get();
}
}, "吃货1").start();
new Thread(() -> {
while (true) {
desk.get();
}
}, "吃货2").start();
}
}
1.5. 线程案例:多线程文件拷贝
- 多线程完成文件的拷贝
- 把 电影 文件夹 里的所有的电影文件 拷贝到 D:/电影里, 所有的拷贝过程放到子线程执行 ( 只考虑有一层文件夹)
- 为了模拟效果 可以在循环中添加等待sleep
代码
package link.xiaomo.test7;
import java.io.*;
class CopyRunnable implements Runnable {
private File f1; // 被拷贝的文件
private File f2; // 目标文件
public CopyRunnable(File f1, File f2) {
this.f1 = f1;
this.f2 = f2;
}
@Override
public void run() {
System.out.println("开始拷贝" + f1.getName());
try (FileInputStream fis = new FileInputStream(f1);
FileOutputStream fos = new FileOutputStream(f2);) {
byte[] bytes = new byte[1024];
int len;
while ((len = fis.read(bytes)) != -1) {
fos.write(bytes, 0, len);
}
Thread.sleep(1000);
System.out.println("拷贝完成" + f1.getName());
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public class Demo02 {
public static void main(String[] args) {
File movieFile = new File("mymodule/电影");
File dFile = new File("D:/电影");
if (!dFile.exists())
dFile.mkdirs();
// 获取所有电影文件
File[] files = movieFile.listFiles();
// 遍历
for (File f : files) {
// 开启子线程 完成拷贝
new Thread(new CopyRunnable(f, new File(dFile, f.getName()))).start();
}
}
}
2. 线程池
什么是线程池?作用什么?
- 线程池 用来维护线程的一个容器,我们只需要把要做的任务传给线程池,线程池会 自动去创建线程执行任务.
- 可以创建和维护线程 , 对线程进行复用, 避免了大量的创建和销毁线程
2.1. 线程池-ThreadPoolExecutor
创建线程池对象 :
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(核心线程数量,最大线程数量,空闲线程最大存活时间,时间单位,任务队列,创建线程工厂,任务的拒绝策略);
代码实现 :
package com.itxiaomo.mythreadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThreadPoolDemo3 {
// 参数一:核心线程数量
// 参数二:最大线程数
// 参数三:空闲线程最大存活时间
// 参数四:时间单位
// 参数五:任务队列
// 参数六:创建线程工厂
// 参数七:任务的拒绝策略
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(2,5,2,TimeUnit.SECONDS,new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
pool.exute(new MyRunnable());//执行任务
pool.submit(new MyRunnable());//执行任务
pool.shutdown();
}
}
2.2. 线程池-参数详解
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize: 核心线程的最大值,不能小于0
maximumPoolSize:最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize
keepAliveTime: 空闲线程最大存活时间,不能小于0
unit: 时间单位
workQueue: 任务队列,不能为null
threadFactory: 创建线程工厂,不能为null
handler: 任务的拒绝策略,不能为null
2.3. 线程池的运行过程
2.3.1. 执行流程
- 1首先先开启核心线程去运行任务
- 2当核心线程满的时候, 多的任务会直接放到队列里
- 3 当队列也满的时候, 再多出的任务 会创建临时线程去执行
- 4 当 线程总数量+队列的总数量 都满了,那么多余的任务就会被拒绝
2.3.2. 代码实现
- 注意在添加不同数量的任务对象时, 观察线程池的运行效果,理解前5个参数的作用
package link.xiaomo.test6;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo02 {
public static void main(String[] args) {
//1 创建线程池对象
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),//创建线程工厂
new ThreadPoolExecutor.AbortPolicy() //任务的拒绝策略
);
//执行流程
// 1首先先开启核心线程去运行任务
// 2当核心线程满的时候, 多的任务会直接放到队列里
// 3 当队列也满的时候, 再多出的任务 会创建临时线程去执行
// 4 当 线程总数量+队列的总数量 都满了,那么多余的任务就会被拒绝
//2 执行任务 参数接收一个Runnable对象
for (int i = 1; i <= 8; i++) {
pool.execute(new MyRunnable(i));
}
pool.execute(new MyRunnable(9));//这个任务的添加 超出了线程池的最大数量,会被拒绝
//3 shutdown 可以让线程池在执行完任务后 结束
pool.shutdown();
System.out.println("111111111111111");
}
}
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "开始");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "结束");
}
}
2.4. 线程池-拒绝策略
-
RejectedExecutionHandler是jdk提供的一个任务拒绝策略接口,它下面存在4个子类。
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。是 默认
的策略。ThreadPoolExecutor.DiscardPolicy: 丢弃任务,但是不抛出异常 这是 不推荐
的做法。ThreadPoolExecutor.DiscardOldestPolicy 抛弃队列中等待最久的任务 然后把当前任务加入队列中。 ThreadPoolExecutor.CallerRunsPolicy 调用任务的run()方法绕过线程池直接在main线程中执行。 -
注:明确线程池对多可执行的任务数 = 队列容量 + 最大线程数
代码演示:
public static void main(String[] args) {
//1 创建线程池对象
ThreadPoolExecutor pool = new ThreadPoolExecutor(2,
4, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),//创建线程工厂
new ThreadPoolExecutor.AbortPolicy() //任务的拒绝策略
);
for (int i = 1; i <= 7; i++) {
pool.execute(new MyRunnable(i));
}
//3 shutdown 可以让线程池在执行完任务后 结束
pool.shutdown();
}
2.5. 线程池执行Callable任务 (有返回值的任务)
- submit方法 可以传入Callable对象
package link.xiaomo.test7;
import java.util.concurrent.*;
public class Demo05 {
public static void main(String[] args) {
//1 创建线程池对象
ThreadPoolExecutor pool = new ThreadPoolExecutor(2,
4, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),//创建线程工厂
new ThreadPoolExecutor.AbortPolicy() //任务的拒绝策略
);
//返回一个future的子类对象 里面存放着任务返回的值
Future<String> future1 = pool.submit(new MyCallable(1));
Future<String> future2 = pool.submit(new MyCallable(2));
Future<String> future3 = pool.submit(new MyCallable(3));
try {
//get方法获取返回值
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
2.6. newCachedThreadPool
2.6.1. 快速创建线程池
Executors提供了一些快速创建线程池的方式,比如通过Executors.newCachedThreadPool()可以快速的创建一个线程池。任务队列使用了没有容量的阻塞队列SynchronousQueue
。
这是一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
//1,创建一个默认的线程池对象.池子中默认是空的.默认最多可以容纳int类型的最大值数量的线程.
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new MyRunnable(1));
executorService.execute(new MyRunnable(2));
executorService.execute(new MyRunnable(3));
executorService.execute(new MyRunnable(4));
executorService.execute(new MyRunnable(5));
executorService.shutdown();
2.6.2. SynchronousQueue
上面提到的Executors.newCachedThreadPool()方式获取的线程池,其内部的任务队列为SynchronousQueue,这是一个很特殊的队列。
SynchronousQueue,实际上它不是一个真正的队列,因为SynchronousQueue没有容量
。与其他阻塞队列不同,SynchronousQueue是一个不存储元素的阻塞队列。只是它维护一组线程,这些线程在等待着把元素加入或移出队列。
特点:
- 内部没有存储(容量为0)
- 是阻塞队列(实现BlockingQueue接口)
- 生产或者消费线程会造成阻塞,每个插入操作必须等待另一个线程的移除操作,反之亦然
2.7. newFixedThreadPool
2.7.1. 只有核心线程的线程池
- 通过Executors.newFixedThreadPool()创建线程池,这个线程池
只有核心线程
,创建时需要指定最大线程数量,但是任务队列使用的是无界阻塞队列LinkedBlockingQueue
//1,创建一个默认的线程池对象.需要制定一个线程池的数量
//通过看newFixedThreadPool方法 这个线程池只有核心线程 而且队列是无界的
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(new MyRunnable(1));
executorService.execute(new MyRunnable(2));
executorService.execute(new MyRunnable(3));
executorService.execute(new MyRunnable(4));
executorService.execute(new MyRunnable(5));
executorService.shutdown();
Comments NOTHING