00、学习目标

  • 能够说出什么延迟任务
  • 能够整合SpringDataRedis
  • 能够使用延迟任务实现文章定时发布

需求:定时发布文章

1)定时任务+表扫描原表

​ 优点:实现简单

​ 缺点:高频地查询原表数据,效率并不高,也会导致精确度不高

2)Redis过期key监听

​ 优点:实现相对简单

​ 缺点:

​ 可能会占用过多Redis内存

​ 依赖Redis持久化策略,容易数据丢失

​ 当Redis内存多大时,Redis执行淘汰机制,也会数据丢失。

3)RabbitMQ死信队列+TTL过期时间

​ 优点:MQ可靠性强,不容易数据丢失,处理效率也比较高

​ 缺点:实现比较复杂

​ 这种方案更适合过期时间固定的定时发布场景(不太适合精确发布(不固定时间))

4)Redis+MySQL实现延迟队列

​ 优点:可靠性比较强,效率比较高,适合精确发布(不固定时间)

​ 缺点:实现比较复杂

01、文章定时发布:延迟任务技术对比

1)JDK自带的DelayQueue

JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素

image-20210513150058814

DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法

getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。

compareTo方法:用于排序,确定元素出队列的顺序。

DelayQueue的问题:

使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)

2)RabbitMQ实现延迟任务

  • TTL:Time To Live (消息存活时间) 原理:死信队列+TTL消息
  • 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)

TTL消息,过期时间如何计算? TTL的定时时间 = 用户选择的发布时间毫秒值-当前时间毫秒值

image-20210513150319742

RabbitMQ实现延迟任务的问题:

是一种可取方案,但实现过程相对复杂一些。

3)Redis实现延迟任务(本项目使用)

Redis实现延迟任务的具体实现思路

1636421939269

关键:采用Redis的SortedSet实现延迟队列

问题思路

1.为什么采用Redis的ZSet实现延迟任务?

zset数据类型的去重有序(score分数排序)特点进行延迟。例如:时间戳作为score进行排序

image-20210513150352211

1657588392841

2.为什么任务需要存储在MySQL数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,且需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑,且Redis如果Zset数据存储过多,效率下降,所有先将任务存储在MySQL先,定时再导入到Redis中去执行。

3.在添加zset数据的时候,为什么只存储未来5分钟内的任务?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

02、文章定时发布:延迟任务表结构和实体

任务表:

 1636422336760

任务日志表:

 1636422356944

version:用于MyBatisPlus的乐观锁,防止多线程并发修改同一条数据时出现数据安全问题。

Taskinfo 任务实体

package com.heima.model.schedule.pojos;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@Data
@TableName("taskinfo")
public class Taskinfo{

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private String parameters;

    /**
     * 任务主题
     */
    @TableField("task_topic")
    private Integer taskTopic;

}

TaskinfoLogs 任务日志实体:

package com.heima.model.schedule.pojos;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs{

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private String parameters;

    /**
     * 任务主题
     */
    @TableField("task_topic")
    private Integer taskTopic;

    /**
     * 版本号,用乐观锁
     */
    @Version
    private Integer version;

    /**
     * 状态 0=int 1=EXECUTED 2=CANCELLED
     */
    @TableField("status")
    private Integer status;

}

03、文章定时发布:搭建延迟任务微服务

1)创建项目并导入依赖

 1636423123766

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>heima-leadnews-service</artifactId>
        <groupId>com.heima</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>heima-leadnews-schedule</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.heima</groupId>
            <artifactId>heima-leadnews-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.heima</groupId>
            <artifactId>heima-leadnews-model</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.heima</groupId>
            <artifactId>heima-leadnews-utils</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
    </dependencies>
</project>

2)编写启动类

package com.heima.schedule;

import com.baomidou.mybatisplus.extension.plugins.OptimisticLockerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * 延迟任务微服务
 */
@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
public class ScheduleApplication {
    public static void main(String[] args) {
        SpringApplication.run(ScheduleApplication.class,args);
    }

    /**
     * mybatis-plus乐观锁支持
     * @return
     */
    @Bean
    public OptimisticLockerInterceptor optimisticLockerInterceptor(){
        return new OptimisticLockerInterceptor();
    }
}

3)配置bootstrap.yml

server:
  port: 9004
spring:
  application:
    name: leadnews-schedule
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.66.133:8848
      config:
        server-addr: 192.168.66.133:8848
        file-extension: yml

4)配置logback.xml

<?xml version="1.0" encoding="UTF-8"?>

<configuration>
    <!--定义日志文件的存储地址,使用绝对路径-->
    <property name="LOG_HOME" value="e:/logs"/>

    <!-- Console 输出设置 -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- 按照每天生成日志文件 -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--日志文件输出的文件名-->
            <fileNamePattern>${LOG_HOME}/leadnews.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- 异步输出 -->
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
        <discardingThreshold>0</discardingThreshold>
        <!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
        <queueSize>512</queueSize>
        <!-- 添加附加的appender,最多只能添加一个 -->
        <appender-ref ref="FILE"/>
    </appender>


    <logger name="org.apache.ibatis.cache.decorators.LoggingCache" level="DEBUG" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>
    <logger name="org.springframework.boot" level="debug"/>
    <root level="info">
        <!--<appender-ref ref="ASYNC"/>-->
        <appender-ref ref="FILE"/>
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

5)在Nacos添加配置

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.66.133:3306/leadnews_schedule?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useUnicode=true&useSSL=true
    username: root
    password: root


# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.heima.model.schedule.pojos

logging:
  level:
    com.heima.schedule: debug

6)Mapper接口

package com.heima.schedule.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.schedule.pojos.Taskinfo;

public interface TaskinfoMapper extends BaseMapper<Taskinfo> {
}

package com.heima.schedule.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.schedule.pojos.TaskinfoLogs;

public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {
}

04、文章定时发布:集成Redis及ZSet的使用

1)创建redis容器(已完成)

docker run -d --name redis --restart=always -p 6379:6379 redis

requirepass:定义redis的登录密码

2)导入redis依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

3)添加redis连接配置(Nacos)

spring:
  redis:
    host: 192.168.66.133
    password: leadnews
    port: 6379  

4)编写测试类

package com.heima.schedule;

import com.baomidou.mybatisplus.extension.api.R;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;
import java.util.Set;

/**
 * 演示SortedSet
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ScheduleApplication.class)
public class SortedSetTest {

    @Autowired
    private StringRedisTemplate redisTemplate;


    /**
     * 向SortedSet存入数据
     */
    @Test
    public void testAdd(){
        Random random = new Random();
        for(int i = 1;i<=20;i++) {
            redisTemplate.opsForZSet().add("hello","jack"+i,random.nextInt(100));
        }
    }

    /**
     * 从SortedSet查询数据
     */
    @Test
    public void testRange(){
        //根据元素下标位置查询元素(下标从0开始计算)
        //Set<String> set = redisTemplate.opsForZSet().range("hello", 3, 9);

        //根据元素的score分数查询元素(score从6至27)
        //Set<String> set = redisTemplate.opsForZSet().rangeByScore("hello",6,27);

        //查询score小于等于27分的元素
        Set<String> set = redisTemplate.opsForZSet().rangeByScore("hello",0,27);

        System.out.println(set);
    }

    /**
     * 移除SortedSet数据
     */
    @Test
    public void testRemove(){
        //精准移除
        //redisTemplate.opsForZSet().remove("hello","jack11");

        //score范围移除
        redisTemplate.opsForZSet().removeRangeByScore("hello",0,27);
    }
}



05、文章定时发布:添加任务

Dto:

Task对象用于存储每个任务的数据,可以在微服务之间传输任务数据时使用

package com.heima.model.schedule.dtos;

import lombok.Data;

import java.io.Serializable;

package com.heima.model.schedule.dtos;

import lombok.Data;

import java.io.Serializable;

@Data
public class Task{

    /**
     * 任务id
     */
    private Long taskId;
    
    /**
     * 类型
     */
    private Integer taskTopic;

    /**
     * 执行时间
     */
    private long executeTime;

    /**
     * task参数
     */
    private String parameters;
    
}

接口:

package com.heima.schedule.service;

import com.heima.model.schedule.dtos.Task;

import java.util.List;

/**
 * 任务处理业务
 */
public interface TaskService {

    /**
     * 添加任务
     */
    public long addTask(Task task);

}

实现:

package com.heima.schedule.service.impl;

import com.heima.common.constants.RedisConstants;
import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import com.heima.utils.common.BeanHelper;
import com.heima.utils.common.JsonUtils;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import springfox.documentation.spring.web.json.Json;

import java.util.Date;

@Service
public class TaskServiceImpl implements TaskService {
    @Autowired
    private TaskinfoMapper taskinfoMapper;
    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    @Transactional
    public Long addTask(Task task) {

        //把任务添加到DB
        addTaskToDb(task);

        //把任务添加Redis
        addTaskToCache(task);

        return task.getTaskId();
    }

    /**
     * 把任务添加Redis
     * @param task
     */
    private void addTaskToCache(Task task) {
        //判断任务的执行时间是否在未来5分钟以内
        //获取当前时间的未来5分钟的时间
        long futureTime = DateTime.now().plusMinutes(5).getMillis();
        if(task.getExecuteTime()<=futureTime){
            String key = RedisConstants.TASK_TOPIC_PREFIX+task.getTaskTopic();
            redisTemplate.opsForZSet().add(key, JsonUtils.toString(task),task.getExecuteTime());
        }
    }

    /**
     * 把任务添加到DB
     * @param task
     */
    private void addTaskToDb(Task task) {
        try {
            //添加任务表
            Taskinfo taskinfo = BeanHelper.copyProperties(task,Taskinfo.class);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);

            //把新产生的任务ID赋值给Task对象
            task.setTaskId(taskinfo.getTaskId());

            //添加任务日志表
            TaskinfoLogs taskinfoLogs = BeanHelper.copyProperties(taskinfo,TaskinfoLogs.class);
            taskinfoLogs.setVersion(1);//设置初始值,后续依靠MyBatisPlus乐观锁拦截器实现version更新
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsMapper.insert(taskinfoLogs);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}


定义任务状态常量类:

package com.heima.common.constants;

public class ScheduleConstants {

    //task状态
    public static final int SCHEDULED=0;   //初始化状态

    public static final int EXECUTED=1;       //已执行状态

    public static final int CANCELLED=2;   //已取消状态

    public static final int TASK_TOPIC_NEWS_PUBLISH=1; //文章发布任务
}

定义Redis常量类

package com.heima.common.constants;

public abstract class RedisConstants {
    public static String TASK_TOPIC_PREFIX = "task_topic_";
}

编写测试类

package com.heima.schedule;

import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import com.heima.utils.common.JsonUtils;
import org.joda.time.DateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * 测试延迟任务的方法
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ScheduleApplication.class)
public class TaskServiceTest {
    @Autowired
    private TaskService taskService;

    @Test
    public void testAddTask(){
        for(int i=1;i<=10;i++){
            //模拟文章定时发布
            Task task = new Task();
            task.setTaskTopic(ScheduleConstants.TASK_TOPIC_NEWS_PUBLISH);
            Map map = new HashMap<>();
            map.put("id",6294);
            task.setParameters(JsonUtils.toString(map));
            task.setExecuteTime(DateTime.now().plusMinutes(i).getMillis());
            taskService.addTask(task);
        }

    }
}

06、文章定时发布:消费任务

接口:

package com.heima.schedule.service;

import com.heima.model.schedule.dtos.Task;

import java.util.List;

public interface TaskService {

    /**
     * 添加任务到延迟队列
     */
    public Long addTask(Task task);

    /**
     * 从延迟队列消费任务
     *  重点:从延迟队列取出符合条件(根据score查询,score小于或等于当前时间毫秒值)
     */
    public List<Task> pollTask(Integer taskTopic);
}

实现:

package com.heima.schedule.service.impl;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.common.constants.RedisConstants;
import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import com.heima.utils.common.BeanHelper;
import com.heima.utils.common.JsonUtils;
import org.joda.time.DateTime;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;

@Service
public class TaskServiceImpl implements TaskService {
    @Autowired
    private TaskinfoMapper taskinfoMapper;
    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    @Transactional
    public Long addTask(Task task) {

        //把任务添加到DB
        addTaskToDB(task);

        //把任务添加到Redis
        addTaskToCache(task);

        return task.getTaskId();
    }


    /**
     * 任务添加到redis
     * @param task
     */
    private void addTaskToCache(Task task) {
        //判断当前任务的执行时间是否在5分钟内,5分钟内的任务才存入Redis
        long futureDate = DateTime.now().plusMinutes(5).getMillis();
        if(task.getExecuteTime()<=futureDate){
            //存入redis的SortedSet
            String key = RedisConstants.TASK_TOPIC_PREFIX+task.getTaskTopic();
            redisTemplate.opsForZSet().add(key, JsonUtils.toString(task),task.getExecuteTime());
        }

    }

    /**
     * 把任务添加到DB
     * @param task
     */

    public void addTaskToDB(Task task) {
        try {
            //任务添加到任务表
            Taskinfo taskinfo = BeanHelper.copyProperties(task,Taskinfo.class);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);

            task.setTaskId(taskinfo.getTaskId());

            //任务添加到任务日志表
            TaskinfoLogs taskinfoLogs = BeanHelper.copyProperties(taskinfo,TaskinfoLogs.class);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);//初始化状态
            taskinfoLogs.setVersion(1);
            taskinfoLogsMapper.insert(taskinfoLogs);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    @Override
    @Transactional
    public List<Task> pollTask(Integer taskTopic) {
        String key = RedisConstants.TASK_TOPIC_PREFIX+taskTopic;
        //查询redis中符合执行条件的任务
        Set<String> taskSet = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis());

        List<Task> taskList = new ArrayList<>();
        if(CollectionUtils.isNotEmpty(taskSet)){
            for(String taskJson:taskSet){
                Task task = JsonUtils.toBean(taskJson,Task.class);

                //更新DB数据
                updateTaskToDB(task);

                //删除redis数据
                redisTemplate.opsForZSet().remove(key,taskJson);

                taskList.add(task);
            }

        }
        return taskList;
    }

    /**
     * 更新DB的任务数据
     * @param task
     */
    private void updateTaskToDB(Task task) {
        try {
            //删除任务表记录
            taskinfoMapper.deleteById(task.getTaskId());

            //更新任务日志表记录
            TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(task.getTaskId());
            taskinfoLogs.setStatus(ScheduleConstants.EXECUTED);//已执行
            taskinfoLogsMapper.updateById(taskinfoLogs);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}


测试

package com.heima.schedule;

import com.baomidou.mybatisplus.extension.api.R;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.joda.time.DateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.Random;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ScheduleApplication.class)
public class TaskServiceTest {
    @Autowired
    private TaskService taskService;

    /**
     * 添加延迟任务
     */
    @Test
    public void testAddTask(){
        Random random = new Random();
        for(int i=1;i<=20;i++){
            Task task = new Task();
            task.setTaskTopic(1);
            task.setExecuteTime(DateTime.now().plusMinutes(random.nextInt(10)).getMillis());
            task.setParameters("task"+i);
            taskService.addTask(task);
        }

    }

    /**
     * 消费延迟任务
     */
    @Test
    public void testPollTask(){
        List<Task> taskList =  taskService.pollTask(1);

    }
}


07、文章定时发布:数据库同步到Redis

package com.heima.schedule.job;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.common.constants.RedisConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.utils.common.BeanHelper;
import com.heima.utils.common.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.List;

@Component
@Slf4j
public class SyncDbToCacheJob {
    @Autowired
    private TaskinfoMapper taskinfoMapper;
    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 定时同步MySQL数据到缓存
     */
    @Scheduled(fixedRate = 10000)
    public void syncData(){
        log.info("执行同步MySQL数据到缓存任务,{}",new Date());
        //查询MySQL符合条件(未来5分钟内执行)的任务
        Date futureTime = DateTime.now().plusMinutes(5).toDate();
        QueryWrapper<Taskinfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.lt("execute_time",futureTime);
        List<Taskinfo> taskinfoList = taskinfoMapper.selectList(queryWrapper);

        //导入缓存
        if(CollectionUtils.isNotEmpty(taskinfoList)){
            for(Taskinfo taskinfo:taskinfoList){
                String key = RedisConstants.TASK_TOPIC_PREFIX+taskinfo.getTaskTopic();//区分不同任务类型
                Task task = BeanHelper.copyProperties(taskinfo,Task.class);
                task.setExecuteTime(taskinfo.getExecuteTime().getTime());
                redisTemplate.opsForZSet().add(key, JsonUtils.toString(task),task.getExecuteTime());//把任务的执行时间作为score分数
            }
        }
    }
}

在启动类开启定时任务

1638368448812

08、文章定时发布:延迟任务Feign接口

1)Controller

在heima-leadnews-schedule提供Controller

package com.heima.schedule.controller;

import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/task")
public class TaskController {
    @Autowired
    private TaskService taskService;

    /**
     * 添加延迟队列任务
     */
    @PostMapping("/addTask")
    public Long addTask(@RequestBody Task task){
        return taskService.addTask(task);
    }

    /**
     * 消费任务
     */
    @PostMapping("/pollTask/{taskTopic}")
    public List<Task> pollTask(@PathVariable("taskTopic") Integer taskTopic){
        return taskService.pollTask(taskTopic);
    }
}


2)Feign接口

在heima-leadnews-api项目添加TaskFeign接口

 1636425979805

package com.heima.schedule.feign;

import com.heima.model.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;

@FeignClient(name = "leadnews-schedule",path = "/task")
public interface TaskFeign {

    /**
     * 添加延迟队列任务
     */
    @PostMapping("/addTask")
    public Long addTask(@RequestBody Task task);

    /**
     * 消费任务
     */
    @PostMapping("/pollTask/{taskTopic}")
    public List<Task> pollTask(@PathVariable("taskTopic") Integer taskTopic);
}


09、文章定时发布:审核文章添加延迟任务

1)定义添加任务方法

在heima-leadnews-wemedia服务中

接口:

package com.heima.wemedia.service;

import com.heima.model.wemedia.pojos.WmNews;

public interface WmNewsTaskService {

    /**
     * 添加自媒体定时发布延迟任务
     */
    public Long addWmNewsTask(WmNews wmNews);
}

实现:

package com.heima.wemedia.service.impl;

import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.schedule.feign.TaskFeign;
import com.heima.utils.common.JsonUtils;
import com.heima.wemedia.service.WmNewsTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class WmNewsTaskServiceImpl implements WmNewsTaskService {
    @Autowired
    private TaskFeign taskFeign;

    @Override
    public Long addWmNewsTask(WmNews wmNews) {
        Task task = new Task();
        task.setTaskTopic(ScheduleConstants.TASK_TOPIC_NEWS_PUBLISH); //1 代表自媒体文章定时发布
        task.setExecuteTime(wmNews.getPublishTime().getTime());//必须自媒体定时发布时间
        WmNews news = new WmNews();
        news.setId(wmNews.getId());
        task.setParameters(JsonUtils.toString(news));
        Long taskId = taskFeign.addTask(task);
        return taskId;
    }
}


2)审核文章添加延迟任务

 @Override
    @Async  //该方法会放在一个独立的线程中被执行
    @GlobalTransactional  //加入全局事务
    public void autoScanWmNews(Integer id) {
        //根据id查询自媒体文章
        WmNews wmNews = wmNewsMapper.selectById(id);

        //判断是否为“提交审核”状态
        if(wmNews.getStatus()!=1){
            return;
        }

        //从文章提取图片(从MinIO下载图片)
        List<byte[]> imageList = getImagesFromNews(wmNews);

        //从文章中提取文本
        List<String> textList = getTextFromNews(wmNews,imageList);


        //检测自定义敏感词
        if(CollectionUtils.isNotEmpty(textList)){
            boolean flag = handleSensitiveScan(textList,wmNews);
            if(!flag){
                return;//如果审核失败则退出
            }
        }


        //把文本提交阿里云检测,根据结果文章修改
        if(CollectionUtils.isNotEmpty(textList)){
            try {
                Map result = greenTextScan.greeTextScan(textList);
                boolean flag = handleScanResult(result,wmNews);
                if(!flag){
                    return;//如果审核失败则退出审核
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.error("调用阿里云API失败,{}",e.getMessage());
            }
        }

        //把图片提交阿里云检测,根据结果文章修改
        if(CollectionUtils.isNotEmpty(imageList)){
            try {
                Map result = greenImageScan.imageScan(imageList);
                boolean flag = handleScanResult(result,wmNews);
                if(!flag){
                    return;//如果审核失败则退出审核
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.error("调用阿里云API失败,{}",e.getMessage());
            }
        }

        //判断发布时间大于当前时间,修改文章状态为8(待发布)
        if(wmNews.getPublishTime()!=null && wmNews.getPublishTime().after(new Date())){
            wmNews.setStatus(WmNews.Status.SUCCESS.getCode());
            wmNews.setReason("文章审核成功,待发布");
            wmNewsMapper.updateById(wmNews);

            //把当前自媒体文章发布任务添加到延迟队列中
            Long taskId = wmNewsTaskService.addWmNewsTask(wmNews);
            //更新wm_news表的task_id字段

            return; // 必须退出
        }

        //发布文章(文章保存App端 Feign接口)
        publishApArticle(wmNews);
    }

10、文章定时发布:消费任务发表文章

定义定时任务,每隔1秒扫描1次redis延迟任务队列,查询符合执行时间的任务,进行文章发表

package com.heima.wemedia.schedule;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.schedule.feign.TaskFeign;
import com.heima.utils.common.JsonUtils;
import com.heima.wemedia.mapper.WmNewsMapper;
import com.heima.wemedia.service.WmNewsAutoScanService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class WmNewsTaskJob {
    @Autowired
    private TaskFeign taskFeign;
    @Autowired
    private WmNewsAutoScanService wmNewsAutoScanService;
    @Autowired
    private WmNewsMapper wmNewsMapper;

    @Scheduled(fixedRate = 1000)
    public void pollWmNewsTask(){
        //到延迟队列中拉取任务
        List<Task> taskList = taskFeign.pollTask(ScheduleConstants.TASK_TOPIC_NEWS_PUBLISH);
        if(CollectionUtils.isNotEmpty(taskList)){
            for(Task task:taskList){
                //从Task取出文章ID
                WmNews wmNews = JsonUtils.toBean(task.getParameters(),WmNews.class);

                //查询文章
                wmNews = wmNewsMapper.selectById(wmNews.getId());

                //发布文章
                wmNewsAutoScanService.publishApArticle(wmNews);

                System.out.println("文章已经定时发布成功");
            }
        }
    }
}



在heima-leadnews-wemedia微服务开启定时任务

1638370272941

在heima-leadnews-wemedia微服务把publishArticle方法加入到WmNewsAutoScanService类上

1650077888222

11、课程总结

  1. 文章定时

    方案:Redis延迟任务(队列)

1642834794360

2)搭建延迟任务服务

​ 2.1 添加任务

​ 2.2 消费任务

​ 取消任务

3)文章微服务 调用 任务方法实现文章定时发布

原文地址:http://www.cnblogs.com/IsMhhla/p/16868526.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性