记录一个使用rabbitMq的工具类。
在构造该工具类时,构造方法会创建一个ThreadPoolTaskExecutor线程池。这个线程池是由Spring提供的,但其底层依旧是由jdk的ThreadPoolExecutor实现,只是对其进行了封装。
关于线程池的大小,需要根据实际情况进行填写。
package link.xiaomo.common.mq;
import cn.hutool.core.lang.UUID;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import static link.xiaomo.common.constants.Constant.REQUEST_ID_HEADER;
@Slf4j
@Component
public class RabbitMqHelper {
private final RabbitTemplate rabbitTemplate;
private final MessagePostProcessor processor = new BasicIdMessageProcessor();
private final ThreadPoolTaskExecutor executor;
public RabbitMqHelper(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
executor = new ThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(10);
// 配置最大线程数
executor.setMaxPoolSize(15);
// 配置队列大小
executor.setQueueCapacity(99999);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("mq-async-send-handler");
// 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
}
/**
* 根据exchange和routingKey发送消息
*/
public <T> void send(String exchange, String routingKey, T t) {
log.debug("准备发送消息,exchange:{}, RoutingKey:{}, message:{}", exchange, routingKey, t);
// 设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理
String id = UUID.randomUUID().toString(true);
CorrelationData correlationData = new CorrelationData(id);
// 设置发送超时时间为500毫秒
rabbitTemplate.setReplyTimeout(500);
// 发送消息,同时设置消息id
rabbitTemplate.convertAndSend(exchange, routingKey, t, processor, correlationData);
}
/**
* 根据exchange和routingKey发送消息,并且可以设置延迟时间
*/
public <T> void sendDelayMessage(String exchange, String routingKey, T t, Duration delay) {
// 设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理
String id = UUID.randomUUID().toString(true);
CorrelationData correlationData = new CorrelationData(id);
// 设置发送超时时间为500毫秒
rabbitTemplate.setReplyTimeout(500);
// 发送消息,同时设置消息id
rabbitTemplate.convertAndSend(exchange, routingKey, t, new DelayedMessageProcessor(delay), correlationData);
}
/**
* 根据exchange和routingKey 异步发送消息,并指定一个延迟时间
*
* @param exchange 交换机
* @param routingKey 路由KEY
* @param t 数据
* @param <T> 数据类型
*/
public <T> void sendAsync(String exchange, String routingKey, T t, Long time) {
String requestId = MDC.get(REQUEST_ID_HEADER);
CompletableFuture.runAsync(() -> {
try {
MDC.put(REQUEST_ID_HEADER, requestId);
// 发送延迟消息
if (time != null && time > 0) {
sendDelayMessage(exchange, routingKey, t, Duration.ofMillis(time));
} else {
send(exchange, routingKey, t);
}
} catch (Exception e) {
log.error("推送消息异常,t:{},", t, e);
}
}, executor);
}
/**
* 根据exchange和routingKey 异步发送消息
*
* @param exchange 交换机
* @param routingKey 路由KEY
* @param t 数据
* @param <T> 数据类型
*/
public <T> void sendAsync(String exchange, String routingKey, T t) {
sendAsync(exchange, routingKey, t, null);
}
}
关于代码中使用的静态变量Constant.REQUEST_ID_HEADER
。把这条放在项目中的静态文件就可以。
public interface Constant {
String REQUEST_ID_HEADER = "requestId";
}
本工具需要的依赖
<!--hutool工具包-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.17</version>
</dependency>
<!--mq-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
<!-- lombok 管理 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
Comments NOTHING