异步任务和定时任务

异步任务和定时任务@Async 注解和自定义线程池的使用 ;@Scheduled和自定义线程池的搭配使用;对使用过程中可能出现的问题进行分析和解决。

1.@Async

通过 @Async 可以创建一个异步任务。注解可以标注在一个类或一个方法上面。 @Async 是基于 Spring 的 AOP 实现的。

1.1 Demo

启动类:

01_启动类.png

异步任务类:

01_异步任务.png

服务类:

01_服务类.png

controller 类:

01_controller类.png

执行结果:

从执行结果可以发现,一共有 8 个线程在执行这 50 个异步任务。

01_执行结果.png

明显开启了一个线程池,并使用该线程池中的线程执行异步任务。那该线程池的配置信息是怎么样的呢?

我们可以通过下面的代码,打印该线程池的相关信息。

public void printThreadPools() {
    String[] beanNames = applicationContext.getBeanNamesForType(ThreadPoolTaskExecutor.class);
​
    for (String beanName : beanNames) {
        ThreadPoolTaskExecutor threadPool = applicationContext.getBean(beanName, ThreadPoolTaskExecutor.class);
        System.out.println("Thread pool bean name: " + beanName);
        System.out.println("Core pool size: " + threadPool.getCorePoolSize());
        System.out.println("Max pool size: " + threadPool.getMaxPoolSize());
        System.out.println("Queue capacity: " + threadPool.getQueueCapacity());
        System.out.println("Thread name prefix: " + threadPool.getThreadNamePrefix());
    }
}

运行结果只打印了一个线程池的信息,说明该线程池是默认线程池。其中核心线程数量是 8 个,最大线程数量是 Integer.MAX_VALUE ,队列容量是 Integer.MAX_VALUE。这说明默认的线程池容易出现 OOM。

01_线程池信息.png

关于 @Async 注解有以下几点需要注意:

  • 需要在启动类或配置类加上 @EnableAsync 注解
  • 返回值类型只支持 voidFuture
  • value 属性声明了要使用的线程池的 beanName

详细解析可看以下博客:异步任务和定时任务

源码:JavaDemoRep/SpringBootDemo/ThreadPoolDemo/ThreadPoolDemo1 · Qing-Yu-SH/JavaDemoRep

2.自定义线程池

2.1 线程池

通过 Executors 创建

  • FixedThreadPoolSingleThreadExecutor

    • 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM
  • CachedThreadPoolScheduledThreadPool

    • 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM

通过 ThreadPoolExecutor 创建

其构造函数如下所示,其中参数有 7 个:

  • corePoolSize:核心线程数,定义了最小可以同时运行的线程数量

  • maximumPoolSize:最大线程数,定义了当队列中存放的任务达到最大容量时,允许线程池能够同时运行的最大线程数量

  • keepAliveTime:线程存活时间;当线程池中的线程数量大于 corePoolSize,并且没有新的任务提交,核心线程外的线程不会立即销毁,而是等待 keepAliveTime 后,如果仍然空闲,则销毁

  • unitkeepAliveTime 的时间单位

  • workQueue:任务队列;当有新任务时,先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被放入任务队列中

  • threadFactory:创建线程的工厂,executor 创建新线程的时候会用到

  • handler:饱和策略;如果当前运行的线程数量达到最大线程数量,并且队列中容量已是最大,则新任务根据饱和策略处理

    • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException 来拒绝新任务的处理(默认)
    • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃最早未处理的任务
    • ThreadPoolExecutor.CallerRunsPolicy:由提交该任务的线程执行
public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

2.2 配置线程池

配置类:

@Configuration
public class ThreadPoolConfig {
    @Bean
    public ThreadPoolTaskExecutor threadPoolExecutor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setThreadNamePrefix("yq-thread-");
        taskExecutor.setMaxPoolSize(16);
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setQueueCapacity(10);
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        return taskExecutor;
    }
}

执行结果:

01_线程池配置执行结果.png

源码:JavaDemoRep/SpringBootDemo/ThreadPoolDemo/ThreadPoolDemo2 · Qing-Yu-SH/JavaDemoRep

3.@Scheduled

通过 @Scheduled 可以创建一个定时任务。@Scheduled主要有三种配置执行时间的方式:

  • cron 表达式
  • fixedRate
  • fixedRateString

3.1 Demo

启动类:

01_启动类Schedule.png

定时任务:

01_定时任务.png

运行结果:

01_定时任务执行结果.png

3.2 属性1 – cron 表达式

cron 表达式是一个字符串,字符串以 5 或 6 个空格隔开,分为 6 或 7 个域,每一个域代表一个含义,有如下两种语法格式:

seconds minutes hours dayOfMonth month dayOfWeek year
seconds minutes hours dayOfMonth month dayOfWeek
单位 允许值 允许通配符
秒(seconds) 0 – 59 , - * /
分(minutes) 0 – 59 , - * /
时(hours) 0 – 23 , - * /
日(dayOfMonth) 1 – 31 , - * /
月(month) 1 – 12 或者 JAN – DEC , - * /
周(dayOfWeek) 0 – 7 或者 SUN – SAT , - * /
符号 含义
, 表示列出枚举值;比如分域 6,20,意味着在 6 和 20 触发一次
- 表示区间;比如秒的 0-2,表示 0秒、1秒、2秒都会触发
* 表示匹配该域的任意值;在秒字段表示每秒,在月字段表示每月
/ 表示递增触发,比如秒的 0/5,表示从 0 秒开始,每隔 5 秒触发
? 不指定值,不需要关系当前指定的字段的值;比如每天都执行,不需要关心 dayOfWeek,设置为 ?
L 表示最后,只能出现在 dayOfMonthdayOfWeekdayOfWeek 为 2L,表示本月最后第二个星期
W 表示有效工作日,只能出现在 dayOfMonth,系统将在离指定日期的最近的有效工作日触发事件
LW 表示在某个月最后一个工作日
# 用于确定每个月第几个星期几,只能出现在 dayOfMonth 域;例如 4#2,表示某月的第二个星期三

定时任务:

@Scheduled(cron = "0/2 * * * * ?")
public void scheduleTask(){
    log.info("当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
}

执行结果:

2023-10-25 12:19:22.011  INFO 8604 --- [   scheduling-1] com.yq.ScheduleService : 当前执行线程:scheduling-1,ID:43
2023-10-25 12:19:24.016  INFO 8604 --- [   scheduling-1] com.yq.ScheduleService : 当前执行线程:scheduling-1,ID:43
2023-10-25 12:19:26.004  INFO 8604 --- [   scheduling-1] com.yq.ScheduleService : 当前执行线程:scheduling-1,ID:43
2023-10-25 12:19:28.009  INFO 8604 --- [   scheduling-1] com.yq.ScheduleService : 当前执行线程:scheduling-1,ID:43
2023-10-25 12:19:30.015  INFO 8604 --- [   scheduling-1] com.yq.ScheduleService : 当前执行线程:scheduling-1,ID:43

在线Cron表达式生成器 (qqe2.com)

3.3 属性2 – fixedDelay

fixedDelay以固定周期执行下次任务调度。当项目启动后,会执行一次;要等到任务执行完成后,再经过设定的时间后,才再次执行。fixedDelay 属性是 Long 类型,并且默认以毫秒(ms)为单位。

定时任务:

// @Scheduled(fixedDelayString = "2000")
@Scheduled(fixedDelay = 2000)
public void fixedDelay() throws InterruptedException {
    log.info("start FixedDelay -- 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
    Thread.sleep(5000);
    log.info("end FixedDelay -- 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
}

执行结果:

2023-10-25 15:59:38.348  INFO 15520 --- [   scheduling-1] com.yq.ScheduleService : start FixedDelay -- 当前执行线程:scheduling-1,ID:43
2023-10-25 15:59:43.348  INFO 15520 --- [   scheduling-1] com.yq.ScheduleService : end  FixedDelay -- 当前执行线程:scheduling-1,ID:43
2023-10-25 15:59:45.348  INFO 15520 --- [   scheduling-1] com.yq.ScheduleService : start FixedDelay -- 当前执行线程:scheduling-1,ID:43
2023-10-25 15:59:50.355  INFO 15520 --- [   scheduling-1] com.yq.ScheduleService : end  FixedDelay -- 当前执行线程:scheduling-1,ID:43
2023-10-25 15:59:52.355  INFO 15520 --- [   scheduling-1] com.yq.ScheduleService : start FixedDelay -- 当前执行线程:scheduling-1,ID:43
2023-10-25 15:59:57.362  INFO 15520 --- [   scheduling-1] com.yq.ScheduleService : end  FixedDelay -- 当前执行线程:scheduling-1,ID:43

3.4 属性3 – fixedRate

fixedRate 指定了每次任务执行的时间间隔,而不是上一个执行的结束时间到下一个执行的开始时间的间隔。

定时任务:

// @Scheduled(fixedRateString = "2000")
@Scheduled(fixedRate = 2000)
public void fixedRate() throws InterruptedException {
    log.info("start -- 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
    Thread.sleep(5000);
    log.info("执行 FixedRate -- 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
}

执行结果:

2023-10-25 16:08:52.314  INFO 10912 --- [   scheduling-1] com.yq.ScheduleService : start -- 当前执行线程:scheduling-1,ID:43
2023-10-25 16:08:57.321  INFO 10912 --- [   scheduling-1] com.yq.ScheduleService : end FixedRate -- 当前执行线程:scheduling-1,ID:43
2023-10-25 16:08:57.321  INFO 10912 --- [   scheduling-1] com.yq.ScheduleService : start -- 当前执行线程:scheduling-1,ID:43
2023-10-25 16:09:02.328  INFO 10912 --- [   scheduling-1] com.yq.ScheduleService : end FixedRate -- 当前执行线程:scheduling-1,ID:43
2023-10-25 16:09:02.328  INFO 10912 --- [   scheduling-1] com.yq.ScheduleService : start -- 当前执行线程:scheduling-1,ID:43
2023-10-25 16:09:07.335  INFO 10912 --- [   scheduling-1] com.yq.ScheduleService : end FixedRate -- 当前执行线程:scheduling-1,ID:43

3.5 属性4 – initialDelay

initialDelay 用于指定在首次执行定时任务之前的延迟时间。这允许你在应用启动后等待一段时间,然后再开始执行定时任务。这对于需要等待一段时间以确保应用程序完全启动的情况很有用。需要与fixedDelayfixedRate 属性搭配使用。

定时任务:

@Scheduled(initialDelay = 6000, fixedDelay = 2000)
public void initialDelay() throws InterruptedException {
    log.info("start -- 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
    Thread.sleep(5000);
    log.info("end InitialDelay -- 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
}

执行结果:

2023-10-25 16:16:07.503  INFO 10804 --- [           main] com.yq.ScheduleDemo1Application : Started ScheduleDemo1Application in 1.068 seconds (JVM running for 2.025)
2023-10-25 16:16:13.505  INFO 10804 --- [   scheduling-1] com.yq.ScheduleService : start -- 当前执行线程:scheduling-1,ID:43
2023-10-25 16:16:18.512  INFO 10804 --- [   scheduling-1] com.yq.ScheduleService : end InitialDelay -- 当前执行线程:scheduling-1,ID:43
2023-10-25 16:16:20.519  INFO 10804 --- [   scheduling-1] com.yq.ScheduleService : start -- 当前执行线程:scheduling-1,ID:43
2023-10-25 16:16:25.526  INFO 10804 --- [   scheduling-1] com.yq.ScheduleService : end InitialDelay -- 当前执行线程:scheduling-1,ID:43

3.6 单线程执行

多个定时任务默认是单线程执行的,因此任务调度器就会出现时间漂移,任务执行时间将不确定。多个定时任务的执行顺序是不确定的。

多个定时任务:

@Scheduled(cron = "0/2 * * * * ?")
public void scheduleTask01() throws InterruptedException {
    Thread.sleep(5000);
    log.info("当前任务01 - 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
}
​
@Scheduled(cron = "0/2 * * * * ?")
public void scheduleTask02() throws InterruptedException {
    Thread.sleep(2000);
    log.info("当前任务02 - 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
}

执行结果:

2023-10-25 16:25:08.013  INFO 1984 --- [   scheduling-1] com.yq.ScheduleService : 当前任务02 - 当前执行线程:scheduling-1,ID:43
2023-10-25 16:25:13.020  INFO 1984 --- [   scheduling-1] com.yq.ScheduleService : 当前任务01 - 当前执行线程:scheduling-1,ID:43
2023-10-25 16:25:15.027  INFO 1984 --- [   scheduling-1] com.yq.ScheduleService : 当前任务02 - 当前执行线程:scheduling-1,ID:43
2023-10-25 16:25:20.034  INFO 1984 --- [   scheduling-1] com.yq.ScheduleService : 当前任务01 - 当前执行线程:scheduling-1,ID:43
2023-10-25 16:25:22.034  INFO 1984 --- [   scheduling-1] com.yq.ScheduleService : 当前任务02 - 当前执行线程:scheduling-1,ID:43
2023-10-25 16:25:27.041  INFO 1984 --- [   scheduling-1] com.yq.ScheduleService : 当前任务01 - 当前执行线程:scheduling-1,ID:43

源码:JavaDemoRep/SpringBootDemo/ScheduleDemo/ScheduleDemo1 · Qing-Yu-SH/JavaDemoRep (github.com)

4.定时任务并行调度

4.1 Demo

启动类:

@EnableAsync
@EnableScheduling
@SpringBootApplication
public class ScheduleDemo2Application {
    public static void main(String[] args) {
        SpringApplication.run(ScheduleDemo2Application.class);
    }
}

线程池配置:

@Configuration
public class ThreadPoolConfig {
​
    @Bean
    public TaskExecutor asyncTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("yq-schedule-");
        executor.setMaxPoolSize(16);
        executor.setCorePoolSize(10);
        executor.setQueueCapacity(0);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        return executor;
    }
​
}

定时任务:

@Slf4j
@Component
public class ScheduleService {
​
    @Async
    @Scheduled(cron = "0/2 * * * * ?")
    public void scheduleTask01() throws InterruptedException {
        Thread.sleep(5000);
        log.info("当前任务01 - 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
    }
​
    @Async
    @Scheduled(cron = "0/2 * * * * ?")
    public void scheduleTask02() throws InterruptedException {
        Thread.sleep(2000);
        log.info("当前任务02 - 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
    }
​
}

执行结果:

2023-10-25 16:35:20.020  INFO 21244 --- [cTaskExecutor-1] com.yq.ScheduleService : 当前任务02 - 当前执行线程:SimpleAsyncTaskExecutor-1,ID:50
2023-10-25 16:35:22.006  INFO 21244 --- [cTaskExecutor-4] com.yq.ScheduleService : 当前任务02 - 当前执行线程:SimpleAsyncTaskExecutor-4,ID:53
2023-10-25 16:35:23.014  INFO 21244 --- [cTaskExecutor-2] com.yq.ScheduleService : 当前任务01 - 当前执行线程:SimpleAsyncTaskExecutor-2,ID:51
2023-10-25 16:35:24.013  INFO 21244 --- [cTaskExecutor-6] com.yq.ScheduleService : 当前任务02 - 当前执行线程:SimpleAsyncTaskExecutor-6,ID:55
2023-10-25 16:35:25.003  INFO 21244 --- [cTaskExecutor-3] com.yq.ScheduleService : 当前任务01 - 当前执行线程:SimpleAsyncTaskExecutor-3,ID:52
2023-10-25 16:35:26.006  INFO 21244 --- [cTaskExecutor-7] com.yq.ScheduleService : 当前任务02 - 当前执行线程:SimpleAsyncTaskExecutor-7,ID:56

上面的执行结果与预期不符,是存在问题的,你发现问题了吗?

4.2 执行结果分析

从日志中发现,我们配置的线程池没有发挥作用。我们的程序通过 SimpleAsyncTaskExecutor 线程池来执行定时任务,该线程池的线程数量是 Integer.MAX_VALUE

观察日志发现,打印了以下的提示信息。说明在 Spring 容器中,发现了多个线程池,并且没有一个称为 taskExecutor 的线程池。

More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: [asyncTaskExecutor, taskScheduler]

针对该问题,我们有多个解决办法。

1.定义线程池时,设定其名称为 taskExecutor

// 通过注解 Bean 的 name 属性指定
@Bean(name = "taskExecutor")
public TaskExecutor taskExecutor(){
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("yq-schedule-");
    executor.setMaxPoolSize(16);
    executor.setCorePoolSize(10);
    executor.setQueueCapacity(0);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    return executor;
}
// 设定方法名为 taskExecutor,会将方法名作为 beanName
@Bean
public TaskExecutor taskExecutor(){
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("yq-schedule-");
    executor.setMaxPoolSize(16);
    executor.setCorePoolSize(10);
    executor.setQueueCapacity(0);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    return executor;
}

2.通过 @Primary 注解设置自定义的线程池为首选

@Primary
@Bean
public TaskExecutor asyncTaskExecutor(){
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("yq-schedule-");
    executor.setMaxPoolSize(16);
    executor.setCorePoolSize(10);
    executor.setQueueCapacity(0);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    return executor;
}

3.通过 @Async 的 value 属性指定要使用的线程池

@Async(value = "asyncTaskExecutor")
@Scheduled(cron = "0/2 * * * * ?")
public void scheduleTask02() throws InterruptedException {
    Thread.sleep(2000);
    log.info("当前任务02 - 当前执行线程:{},ID:{}",Thread.currentThread().getName(),Thread.currentThread().getId());
}

源码:JavaDemoRep/SpringBootDemo/ScheduleDemo/ScheduleDemo2 · Qing-Yu-SH/JavaDemoRep (github.com)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/36697.html

(0)
上一篇 2023-11-15
下一篇 2023-11-15

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注