1. 队列(Queue)

1.1 队列

队列-类图

  • 队列是一种集合

  • 除了基本的集合操作以外,队列还提供了额外的插入、提取和检查操作。

  • 队列通常以先进先出(FIFO)的方式对元素进行排序。

1.2. 阻塞队列 BlockingQueue

1.2.1. 说明

阻塞队列(BlockingQueue )常用于多线程领域。

相对于普通队列来说,它有以下特点:

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞,直至有数据存入队列
  • 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞,直至队列中出现空位

这意味着,如果一个线程因为操作阻塞队列而被阻塞,那么除非另一个线程也操作这个队列并达成条件,否则这个线程会一直阻塞在这里。

1.2.2. 核心方法

对于向阻塞队列内添加数据,有四种处理方式,此处存放数据为例子

  1. 阻塞:如果存数据时队列已满,就一直阻塞到队列有空位并存入
  2. 异常:如果存数据时队列已满,就报异常
  3. 特殊值:如果队列能够容纳返回true,否则返回false
  4. 超时:如果存数据时队列已满,就等待指定时间,否则返回失败
方法类型 阻塞 异常 特殊值 超时
存放 put(e) add(e) offer(e) offer(e,time,unit)
取出 take() remove() poll() polltime,unit)

1.2.3. 常见阻塞队列

ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue

这三种阻塞队列我们会在下面提到

1.3. ArrayBlockingQueue

ArrayBlockingQueue是有界队列,队列长度是有上限的,可以手动设置队列长度。

import java.util.concurrent.ArrayBlockingQueue;

public class TestQueue {
    public static void main(String[] args) {
        ArrayBlockingQueue q = new ArrayBlockingQueue(3);

        MyRun1 m1 = new MyRun1(q);
        MyRun2 m2 = new MyRun2(q);

        new Thread(m1).start();
        new Thread(m2).start();
    }
}
class MyRun1 implements Runnable {
    public ArrayBlockingQueue q;

    public MyRun1(ArrayBlockingQueue q) {
        this.q = q;
    }

    @Override
    public void run() {
        //向队列里添加数据
        for (int i = 0; i < 100; i++) {
            try {
                q.put(i);//存入队列 队列满了 代码就停在这里
                System.out.println("存入了" + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
class MyRun2 implements Runnable {
    public ArrayBlockingQueue q;

    public MyRun2(ArrayBlockingQueue q) {
        this.q = q;
    }

    @Override
    public void run() {
        while (true) {
            int peek = 0;
            try {
                System.out.println("从队列里取数据");
                peek = (int) q.take();//从队列里取数据 如果没有数据 就停在这里
                System.out.println(peek);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 无界队列 LinkedBlockingQueue 创建时不需要给容量
  • ArrayBlockingQueue是有界队列 创建 的时候必须给容量

1.4. 队列版 消费者生产者模型

  • 由于有多个生产者和消费者 线程, 打印顺序会有问题

代码

package link.xiaomo.test5;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class Desk {
    //用队列
    private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

    // 放1个包子的方法
    // 厨师1 厨师2 厨师3
    public void put() {
        String tName = Thread.currentThread().getName();

        try {
            queue.put(tName + "做的一个包子 ---");
            System.out.println(tName + "做好了包子--" + queue.size());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // 吃货1 吃货2
    public void get() {
        String tName = Thread.currentThread().getName();
        try {
            String baozi = queue.take();
            System.out.println(tName + "获取到了" + baozi);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

测试类

package link.xiaomo.test5;

public class ThreadTest {
    public static void main(String[] args) {
        //   需求:3个生产者线程,负责生产包子,每个线程每次只能生产1个包子放在桌子上
        //      2个消费者线程负责吃包子,每人每次只能从桌子上拿1个包子吃。
        Desk desk = new Desk();

        // 创建3个生产者线程(3个厨师)
        new Thread(() -> {
            while (true) {
                desk.put();
            }
        }, "厨师1").start();

        new Thread(() -> {
            while (true) {
                desk.put();
            }
        }, "厨师2").start();
//
        new Thread(() -> {
            while (true) {
                desk.put();
            }
        }, "厨师3").start();

        // 创建2个消费者线程(2个吃货)
        new Thread(() -> {
            while (true) {
                desk.get();
            }
        }, "吃货1").start();

        new Thread(() -> {
            while (true) {
                desk.get();
            }
        }, "吃货2").start();
    }
}

1.5. 线程案例:多线程文件拷贝

  • 多线程完成文件的拷贝
  • 把 电影 文件夹 里的所有的电影文件 拷贝到 D:/电影里, 所有的拷贝过程放到子线程执行 ( 只考虑有一层文件夹)
  • 为了模拟效果 可以在循环中添加等待sleep

代码

package link.xiaomo.test7;

import java.io.*;

class CopyRunnable implements Runnable {
    private File f1; // 被拷贝的文件
    private File f2; // 目标文件

    public CopyRunnable(File f1, File f2) {
        this.f1 = f1;
        this.f2 = f2;
    }

    @Override
    public void run() {
        System.out.println("开始拷贝" + f1.getName());
        try (FileInputStream fis = new FileInputStream(f1);
             FileOutputStream fos = new FileOutputStream(f2);) {

            byte[] bytes = new byte[1024];
            int len;
            while ((len = fis.read(bytes)) != -1) {
                fos.write(bytes, 0, len);
            }
            Thread.sleep(1000);
            System.out.println("拷贝完成" + f1.getName());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

public class Demo02 {
    public static void main(String[] args) {
        File movieFile = new File("mymodule/电影");
        File dFile = new File("D:/电影");
        if (!dFile.exists())
            dFile.mkdirs();

        // 获取所有电影文件
        File[] files = movieFile.listFiles();
        // 遍历
        for (File f : files) {
            // 开启子线程 完成拷贝
            new Thread(new CopyRunnable(f, new File(dFile, f.getName()))).start();
        }
    }
}

2. 线程池

什么是线程池?作用什么?

  • 线程池 用来维护线程的一个容器,我们只需要把要做的任务传给线程池,线程池会 自动去创建线程执行任务.
  • 可以创建和维护线程 , 对线程进行复用, 避免了大量的创建和销毁线程

2.1. 线程池-ThreadPoolExecutor

创建线程池对象 :

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(核心线程数量,最大线程数量,空闲线程最大存活时间,时间单位,任务队列,创建线程工厂,任务的拒绝策略);

代码实现 :

package com.itxiaomo.mythreadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolDemo3 {
//    参数一:核心线程数量
//    参数二:最大线程数
//    参数三:空闲线程最大存活时间
//    参数四:时间单位
//    参数五:任务队列
//    参数六:创建线程工厂
//    参数七:任务的拒绝策略
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,5,2,TimeUnit.SECONDS,new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        pool.exute(new MyRunnable());//执行任务
        pool.submit(new MyRunnable());//执行任务
        pool.shutdown();
    }
}

2.2. 线程池-参数详解

1591165506516

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize:   核心线程的最大值,不能小于0
maximumPoolSize:最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize
keepAliveTime:  空闲线程最大存活时间,不能小于0
unit:           时间单位
workQueue:      任务队列,不能为null
threadFactory:  创建线程工厂,不能为null      
handler:        任务的拒绝策略,不能为null  

2.3. 线程池的运行过程

2.3.1. 执行流程

  • 1首先先开启核心线程去运行任务
  • 2当核心线程满的时候, 多的任务会直接放到队列里
  • 3 当队列也满的时候, 再多出的任务 会创建临时线程去执行
  • 4 当 线程总数量+队列的总数量 都满了,那么多余的任务就会被拒绝

2.3.2. 代码实现

  • 注意在添加不同数量的任务对象时, 观察线程池的运行效果,理解前5个参数的作用
package link.xiaomo.test6;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo02 {
    public static void main(String[] args) {
        //1 创建线程池对象
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),//创建线程工厂
                new ThreadPoolExecutor.AbortPolicy() //任务的拒绝策略
        );
        //执行流程
        // 1首先先开启核心线程去运行任务
        // 2当核心线程满的时候, 多的任务会直接放到队列里
        // 3 当队列也满的时候, 再多出的任务 会创建临时线程去执行
        // 4 当  线程总数量+队列的总数量  都满了,那么多余的任务就会被拒绝

        //2 执行任务 参数接收一个Runnable对象
        for (int i = 1; i <= 8; i++) {
            pool.execute(new MyRunnable(i));
        }

        pool.execute(new MyRunnable(9));//这个任务的添加 超出了线程池的最大数量,会被拒绝

        //3 shutdown 可以让线程池在执行完任务后 结束
        pool.shutdown();

        System.out.println("111111111111111");

    }
}
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "开始");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread().getName() + "结束");
    }
}

2.4. 线程池-拒绝策略

  • RejectedExecutionHandler是jdk提供的一个任务拒绝策略接口,它下面存在4个子类。

    ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。是默认的策略。
    ThreadPoolExecutor.DiscardPolicy: 丢弃任务,但是不抛出异常 这是不推荐的做法。
    ThreadPoolExecutor.DiscardOldestPolicy 抛弃队列中等待最久的任务 然后把当前任务加入队列中。
    ThreadPoolExecutor.CallerRunsPolicy 调用任务的run()方法绕过线程池直接在main线程中执行。
  • 注:明确线程池对多可执行的任务数 = 队列容量 + 最大线程数


代码演示

    public static void main(String[] args) {
        //1 创建线程池对象
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,
                4, 3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Executors.defaultThreadFactory(),//创建线程工厂
                new ThreadPoolExecutor.AbortPolicy() //任务的拒绝策略
        );
        for (int i = 1; i <= 7; i++) {
            pool.execute(new MyRunnable(i));
        }
        //3 shutdown 可以让线程池在执行完任务后 结束
        pool.shutdown();

    }

2.5. 线程池执行Callable任务 (有返回值的任务)

  • submit方法 可以传入Callable对象
package link.xiaomo.test7;

import java.util.concurrent.*;

public class Demo05 {
    public static void main(String[] args) {
        //1 创建线程池对象
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,
                4, 3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Executors.defaultThreadFactory(),//创建线程工厂
                new ThreadPoolExecutor.AbortPolicy() //任务的拒绝策略
        );

        //返回一个future的子类对象 里面存放着任务返回的值
        Future<String> future1 = pool.submit(new MyCallable(1));
        Future<String> future2 = pool.submit(new MyCallable(2));
        Future<String> future3 = pool.submit(new MyCallable(3));

        try {
            //get方法获取返回值
            System.out.println(future1.get());
            System.out.println(future2.get());
            System.out.println(future3.get());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }

    }
}

2.6. newCachedThreadPool

2.6.1. 快速创建线程池

Executors提供了一些快速创建线程池的方式,比如通过Executors.newCachedThreadPool()可以快速的创建一个线程池。任务队列使用了没有容量的阻塞队列SynchronousQueue

这是一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

        //1,创建一个默认的线程池对象.池子中默认是空的.默认最多可以容纳int类型的最大值数量的线程.
        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(new MyRunnable(1));
        executorService.execute(new MyRunnable(2));
        executorService.execute(new MyRunnable(3));
        executorService.execute(new MyRunnable(4));
        executorService.execute(new MyRunnable(5));

        executorService.shutdown();

2.6.2. SynchronousQueue

上面提到的Executors.newCachedThreadPool()方式获取的线程池,其内部的任务队列为SynchronousQueue,这是一个很特殊的队列。

SynchronousQueue,实际上它不是一个真正的队列,因为SynchronousQueue没有容量。与其他阻塞队列不同,SynchronousQueue是一个不存储元素的阻塞队列。只是它维护一组线程,这些线程在等待着把元素加入或移出队列。

特点:

  • 内部没有存储(容量为0)
  • 是阻塞队列(实现BlockingQueue接口)
  • 生产或者消费线程会造成阻塞,每个插入操作必须等待另一个线程的移除操作,反之亦然

2.7. newFixedThreadPool

2.7.1. 只有核心线程的线程池

  • 通过Executors.newFixedThreadPool()创建线程池,这个线程池只有核心线程 ,创建时需要指定最大线程数量,但是任务队列使用的是无界阻塞队列LinkedBlockingQueue
        //1,创建一个默认的线程池对象.需要制定一个线程池的数量
        //通过看newFixedThreadPool方法 这个线程池只有核心线程 而且队列是无界的
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        executorService.execute(new MyRunnable(1));
        executorService.execute(new MyRunnable(2));
        executorService.execute(new MyRunnable(3));
        executorService.execute(new MyRunnable(4));
        executorService.execute(new MyRunnable(5));

        executorService.shutdown();
如人饮水,冷暖自知。
最后更新于 2023-08-05