00、学习目标
- 能够说出什么延迟任务
- 能够整合SpringDataRedis
- 能够使用延迟任务实现文章定时发布
需求:定时发布文章
1)定时任务+表扫描原表
优点:实现简单
缺点:高频地查询原表数据,效率并不高,也会导致精确度不高
2)Redis过期key监听
优点:实现相对简单
缺点:
可能会占用过多Redis内存
依赖Redis持久化策略,容易数据丢失
当Redis内存多大时,Redis执行淘汰机制,也会数据丢失。
3)RabbitMQ死信队列+TTL过期时间
优点:MQ可靠性强,不容易数据丢失,处理效率也比较高
缺点:实现比较复杂
这种方案更适合过期时间固定的定时发布场景(不太适合精确发布(不固定时间))
4)Redis+MySQL实现延迟队列
优点:可靠性比较强,效率比较高,适合精确发布(不固定时间)
缺点:实现比较复杂
01、文章定时发布:延迟任务技术对比
1)JDK自带的DelayQueue
JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素
DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法
getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。
compareTo方法:用于排序,确定元素出队列的顺序。
DelayQueue的问题:
使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)
2)RabbitMQ实现延迟任务
- TTL:Time To Live (消息存活时间) 原理:死信队列+TTL消息
- 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)
TTL消息,过期时间如何计算? TTL的定时时间 = 用户选择的发布时间毫秒值-当前时间毫秒值
RabbitMQ实现延迟任务的问题:
是一种可取方案,但实现过程相对复杂一些。
3)Redis实现延迟任务(本项目使用)
Redis实现延迟任务的具体实现思路
关键:采用Redis的SortedSet实现延迟队列
问题思路
1.为什么采用Redis的ZSet实现延迟任务?
zset数据类型的去重有序(score分数排序)特点进行延迟。例如:时间戳作为score进行排序
2.为什么任务需要存储在MySQL数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,且需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑,且Redis如果Zset数据存储过多,效率下降,所有先将任务存储在MySQL先,定时再导入到Redis中去执行。
3.在添加zset数据的时候,为什么只存储未来5分钟内的任务?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
02、文章定时发布:延迟任务表结构和实体
任务表:
任务日志表:
version:用于MyBatisPlus的乐观锁,防止多线程并发修改同一条数据时出现数据安全问题。
Taskinfo 任务实体
package com.heima.model.schedule.pojos;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
@TableName("taskinfo")
public class Taskinfo{
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private String parameters;
/**
* 任务主题
*/
@TableField("task_topic")
private Integer taskTopic;
}
TaskinfoLogs 任务日志实体:
package com.heima.model.schedule.pojos;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs{
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private String parameters;
/**
* 任务主题
*/
@TableField("task_topic")
private Integer taskTopic;
/**
* 版本号,用乐观锁
*/
@Version
private Integer version;
/**
* 状态 0=int 1=EXECUTED 2=CANCELLED
*/
@TableField("status")
private Integer status;
}
03、文章定时发布:搭建延迟任务微服务
1)创建项目并导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>heima-leadnews-service</artifactId>
<groupId>com.heima</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>heima-leadnews-schedule</artifactId>
<dependencies>
<dependency>
<groupId>com.heima</groupId>
<artifactId>heima-leadnews-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.heima</groupId>
<artifactId>heima-leadnews-model</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.heima</groupId>
<artifactId>heima-leadnews-utils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
</dependencies>
</project>
2)编写启动类
package com.heima.schedule;
import com.baomidou.mybatisplus.extension.plugins.OptimisticLockerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
/**
* 延迟任务微服务
*/
@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
public class ScheduleApplication {
public static void main(String[] args) {
SpringApplication.run(ScheduleApplication.class,args);
}
/**
* mybatis-plus乐观锁支持
* @return
*/
@Bean
public OptimisticLockerInterceptor optimisticLockerInterceptor(){
return new OptimisticLockerInterceptor();
}
}
3)配置bootstrap.yml
server:
port: 9004
spring:
application:
name: leadnews-schedule
cloud:
nacos:
discovery:
server-addr: 192.168.66.133:8848
config:
server-addr: 192.168.66.133:8848
file-extension: yml
4)配置logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!--定义日志文件的存储地址,使用绝对路径-->
<property name="LOG_HOME" value="e:/logs"/>
<!-- Console 输出设置 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<!-- 按照每天生成日志文件 -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!--日志文件输出的文件名-->
<fileNamePattern>${LOG_HOME}/leadnews.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 异步输出 -->
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
<discardingThreshold>0</discardingThreshold>
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
<queueSize>512</queueSize>
<!-- 添加附加的appender,最多只能添加一个 -->
<appender-ref ref="FILE"/>
</appender>
<logger name="org.apache.ibatis.cache.decorators.LoggingCache" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="org.springframework.boot" level="debug"/>
<root level="info">
<!--<appender-ref ref="ASYNC"/>-->
<appender-ref ref="FILE"/>
<appender-ref ref="CONSOLE"/>
</root>
</configuration>
5)在Nacos添加配置
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.66.133:3306/leadnews_schedule?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useUnicode=true&useSSL=true
username: root
password: root
# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 设置别名包扫描路径,通过该属性可以给包中的类注册别名
type-aliases-package: com.heima.model.schedule.pojos
logging:
level:
com.heima.schedule: debug
6)Mapper接口
package com.heima.schedule.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.schedule.pojos.Taskinfo;
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {
}
package com.heima.schedule.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.schedule.pojos.TaskinfoLogs;
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {
}
04、文章定时发布:集成Redis及ZSet的使用
1)创建redis容器(已完成)
docker run -d --name redis --restart=always -p 6379:6379 redis
requirepass:定义redis的登录密码
2)导入redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
3)添加redis连接配置(Nacos)
spring:
redis:
host: 192.168.66.133
password: leadnews
port: 6379
4)编写测试类
package com.heima.schedule;
import com.baomidou.mybatisplus.extension.api.R;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Random;
import java.util.Set;
/**
* 演示SortedSet
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ScheduleApplication.class)
public class SortedSetTest {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 向SortedSet存入数据
*/
@Test
public void testAdd(){
Random random = new Random();
for(int i = 1;i<=20;i++) {
redisTemplate.opsForZSet().add("hello","jack"+i,random.nextInt(100));
}
}
/**
* 从SortedSet查询数据
*/
@Test
public void testRange(){
//根据元素下标位置查询元素(下标从0开始计算)
//Set<String> set = redisTemplate.opsForZSet().range("hello", 3, 9);
//根据元素的score分数查询元素(score从6至27)
//Set<String> set = redisTemplate.opsForZSet().rangeByScore("hello",6,27);
//查询score小于等于27分的元素
Set<String> set = redisTemplate.opsForZSet().rangeByScore("hello",0,27);
System.out.println(set);
}
/**
* 移除SortedSet数据
*/
@Test
public void testRemove(){
//精准移除
//redisTemplate.opsForZSet().remove("hello","jack11");
//score范围移除
redisTemplate.opsForZSet().removeRangeByScore("hello",0,27);
}
}
05、文章定时发布:添加任务
Dto:
Task对象用于存储每个任务的数据,可以在微服务之间传输任务数据时使用
package com.heima.model.schedule.dtos;
import lombok.Data;
import java.io.Serializable;
package com.heima.model.schedule.dtos;
import lombok.Data;
import java.io.Serializable;
@Data
public class Task{
/**
* 任务id
*/
private Long taskId;
/**
* 类型
*/
private Integer taskTopic;
/**
* 执行时间
*/
private long executeTime;
/**
* task参数
*/
private String parameters;
}
接口:
package com.heima.schedule.service;
import com.heima.model.schedule.dtos.Task;
import java.util.List;
/**
* 任务处理业务
*/
public interface TaskService {
/**
* 添加任务
*/
public long addTask(Task task);
}
实现:
package com.heima.schedule.service.impl;
import com.heima.common.constants.RedisConstants;
import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import com.heima.utils.common.BeanHelper;
import com.heima.utils.common.JsonUtils;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import springfox.documentation.spring.web.json.Json;
import java.util.Date;
@Service
public class TaskServiceImpl implements TaskService {
@Autowired
private TaskinfoMapper taskinfoMapper;
@Autowired
private TaskinfoLogsMapper taskinfoLogsMapper;
@Autowired
private StringRedisTemplate redisTemplate;
@Override
@Transactional
public Long addTask(Task task) {
//把任务添加到DB
addTaskToDb(task);
//把任务添加Redis
addTaskToCache(task);
return task.getTaskId();
}
/**
* 把任务添加Redis
* @param task
*/
private void addTaskToCache(Task task) {
//判断任务的执行时间是否在未来5分钟以内
//获取当前时间的未来5分钟的时间
long futureTime = DateTime.now().plusMinutes(5).getMillis();
if(task.getExecuteTime()<=futureTime){
String key = RedisConstants.TASK_TOPIC_PREFIX+task.getTaskTopic();
redisTemplate.opsForZSet().add(key, JsonUtils.toString(task),task.getExecuteTime());
}
}
/**
* 把任务添加到DB
* @param task
*/
private void addTaskToDb(Task task) {
try {
//添加任务表
Taskinfo taskinfo = BeanHelper.copyProperties(task,Taskinfo.class);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoMapper.insert(taskinfo);
//把新产生的任务ID赋值给Task对象
task.setTaskId(taskinfo.getTaskId());
//添加任务日志表
TaskinfoLogs taskinfoLogs = BeanHelper.copyProperties(taskinfo,TaskinfoLogs.class);
taskinfoLogs.setVersion(1);//设置初始值,后续依靠MyBatisPlus乐观锁拦截器实现version更新
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsMapper.insert(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
定义任务状态常量类:
package com.heima.common.constants;
public class ScheduleConstants {
//task状态
public static final int SCHEDULED=0; //初始化状态
public static final int EXECUTED=1; //已执行状态
public static final int CANCELLED=2; //已取消状态
public static final int TASK_TOPIC_NEWS_PUBLISH=1; //文章发布任务
}
定义Redis常量类
package com.heima.common.constants;
public abstract class RedisConstants {
public static String TASK_TOPIC_PREFIX = "task_topic_";
}
编写测试类
package com.heima.schedule;
import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import com.heima.utils.common.JsonUtils;
import org.joda.time.DateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* 测试延迟任务的方法
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ScheduleApplication.class)
public class TaskServiceTest {
@Autowired
private TaskService taskService;
@Test
public void testAddTask(){
for(int i=1;i<=10;i++){
//模拟文章定时发布
Task task = new Task();
task.setTaskTopic(ScheduleConstants.TASK_TOPIC_NEWS_PUBLISH);
Map map = new HashMap<>();
map.put("id",6294);
task.setParameters(JsonUtils.toString(map));
task.setExecuteTime(DateTime.now().plusMinutes(i).getMillis());
taskService.addTask(task);
}
}
}
06、文章定时发布:消费任务
接口:
package com.heima.schedule.service;
import com.heima.model.schedule.dtos.Task;
import java.util.List;
public interface TaskService {
/**
* 添加任务到延迟队列
*/
public Long addTask(Task task);
/**
* 从延迟队列消费任务
* 重点:从延迟队列取出符合条件(根据score查询,score小于或等于当前时间毫秒值)
*/
public List<Task> pollTask(Integer taskTopic);
}
实现:
package com.heima.schedule.service.impl;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.common.constants.RedisConstants;
import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import com.heima.utils.common.BeanHelper;
import com.heima.utils.common.JsonUtils;
import org.joda.time.DateTime;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
@Service
public class TaskServiceImpl implements TaskService {
@Autowired
private TaskinfoMapper taskinfoMapper;
@Autowired
private TaskinfoLogsMapper taskinfoLogsMapper;
@Autowired
private StringRedisTemplate redisTemplate;
@Override
@Transactional
public Long addTask(Task task) {
//把任务添加到DB
addTaskToDB(task);
//把任务添加到Redis
addTaskToCache(task);
return task.getTaskId();
}
/**
* 任务添加到redis
* @param task
*/
private void addTaskToCache(Task task) {
//判断当前任务的执行时间是否在5分钟内,5分钟内的任务才存入Redis
long futureDate = DateTime.now().plusMinutes(5).getMillis();
if(task.getExecuteTime()<=futureDate){
//存入redis的SortedSet
String key = RedisConstants.TASK_TOPIC_PREFIX+task.getTaskTopic();
redisTemplate.opsForZSet().add(key, JsonUtils.toString(task),task.getExecuteTime());
}
}
/**
* 把任务添加到DB
* @param task
*/
public void addTaskToDB(Task task) {
try {
//任务添加到任务表
Taskinfo taskinfo = BeanHelper.copyProperties(task,Taskinfo.class);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoMapper.insert(taskinfo);
task.setTaskId(taskinfo.getTaskId());
//任务添加到任务日志表
TaskinfoLogs taskinfoLogs = BeanHelper.copyProperties(taskinfo,TaskinfoLogs.class);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);//初始化状态
taskinfoLogs.setVersion(1);
taskinfoLogsMapper.insert(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
@Override
@Transactional
public List<Task> pollTask(Integer taskTopic) {
String key = RedisConstants.TASK_TOPIC_PREFIX+taskTopic;
//查询redis中符合执行条件的任务
Set<String> taskSet = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis());
List<Task> taskList = new ArrayList<>();
if(CollectionUtils.isNotEmpty(taskSet)){
for(String taskJson:taskSet){
Task task = JsonUtils.toBean(taskJson,Task.class);
//更新DB数据
updateTaskToDB(task);
//删除redis数据
redisTemplate.opsForZSet().remove(key,taskJson);
taskList.add(task);
}
}
return taskList;
}
/**
* 更新DB的任务数据
* @param task
*/
private void updateTaskToDB(Task task) {
try {
//删除任务表记录
taskinfoMapper.deleteById(task.getTaskId());
//更新任务日志表记录
TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(task.getTaskId());
taskinfoLogs.setStatus(ScheduleConstants.EXECUTED);//已执行
taskinfoLogsMapper.updateById(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
测试
package com.heima.schedule;
import com.baomidou.mybatisplus.extension.api.R;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.joda.time.DateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.Random;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ScheduleApplication.class)
public class TaskServiceTest {
@Autowired
private TaskService taskService;
/**
* 添加延迟任务
*/
@Test
public void testAddTask(){
Random random = new Random();
for(int i=1;i<=20;i++){
Task task = new Task();
task.setTaskTopic(1);
task.setExecuteTime(DateTime.now().plusMinutes(random.nextInt(10)).getMillis());
task.setParameters("task"+i);
taskService.addTask(task);
}
}
/**
* 消费延迟任务
*/
@Test
public void testPollTask(){
List<Task> taskList = taskService.pollTask(1);
}
}
07、文章定时发布:数据库同步到Redis
package com.heima.schedule.job;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.common.constants.RedisConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.utils.common.BeanHelper;
import com.heima.utils.common.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
@Component
@Slf4j
public class SyncDbToCacheJob {
@Autowired
private TaskinfoMapper taskinfoMapper;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 定时同步MySQL数据到缓存
*/
@Scheduled(fixedRate = 10000)
public void syncData(){
log.info("执行同步MySQL数据到缓存任务,{}",new Date());
//查询MySQL符合条件(未来5分钟内执行)的任务
Date futureTime = DateTime.now().plusMinutes(5).toDate();
QueryWrapper<Taskinfo> queryWrapper = new QueryWrapper<>();
queryWrapper.lt("execute_time",futureTime);
List<Taskinfo> taskinfoList = taskinfoMapper.selectList(queryWrapper);
//导入缓存
if(CollectionUtils.isNotEmpty(taskinfoList)){
for(Taskinfo taskinfo:taskinfoList){
String key = RedisConstants.TASK_TOPIC_PREFIX+taskinfo.getTaskTopic();//区分不同任务类型
Task task = BeanHelper.copyProperties(taskinfo,Task.class);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
redisTemplate.opsForZSet().add(key, JsonUtils.toString(task),task.getExecuteTime());//把任务的执行时间作为score分数
}
}
}
}
在启动类开启定时任务
08、文章定时发布:延迟任务Feign接口
1)Controller
在heima-leadnews-schedule提供Controller
package com.heima.schedule.controller;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("/task")
public class TaskController {
@Autowired
private TaskService taskService;
/**
* 添加延迟队列任务
*/
@PostMapping("/addTask")
public Long addTask(@RequestBody Task task){
return taskService.addTask(task);
}
/**
* 消费任务
*/
@PostMapping("/pollTask/{taskTopic}")
public List<Task> pollTask(@PathVariable("taskTopic") Integer taskTopic){
return taskService.pollTask(taskTopic);
}
}
2)Feign接口
在heima-leadnews-api项目添加TaskFeign接口
package com.heima.schedule.feign;
import com.heima.model.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
@FeignClient(name = "leadnews-schedule",path = "/task")
public interface TaskFeign {
/**
* 添加延迟队列任务
*/
@PostMapping("/addTask")
public Long addTask(@RequestBody Task task);
/**
* 消费任务
*/
@PostMapping("/pollTask/{taskTopic}")
public List<Task> pollTask(@PathVariable("taskTopic") Integer taskTopic);
}
09、文章定时发布:审核文章添加延迟任务
1)定义添加任务方法
在heima-leadnews-wemedia服务中
接口:
package com.heima.wemedia.service;
import com.heima.model.wemedia.pojos.WmNews;
public interface WmNewsTaskService {
/**
* 添加自媒体定时发布延迟任务
*/
public Long addWmNewsTask(WmNews wmNews);
}
实现:
package com.heima.wemedia.service.impl;
import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.schedule.feign.TaskFeign;
import com.heima.utils.common.JsonUtils;
import com.heima.wemedia.service.WmNewsTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class WmNewsTaskServiceImpl implements WmNewsTaskService {
@Autowired
private TaskFeign taskFeign;
@Override
public Long addWmNewsTask(WmNews wmNews) {
Task task = new Task();
task.setTaskTopic(ScheduleConstants.TASK_TOPIC_NEWS_PUBLISH); //1 代表自媒体文章定时发布
task.setExecuteTime(wmNews.getPublishTime().getTime());//必须自媒体定时发布时间
WmNews news = new WmNews();
news.setId(wmNews.getId());
task.setParameters(JsonUtils.toString(news));
Long taskId = taskFeign.addTask(task);
return taskId;
}
}
2)审核文章添加延迟任务
@Override
@Async //该方法会放在一个独立的线程中被执行
@GlobalTransactional //加入全局事务
public void autoScanWmNews(Integer id) {
//根据id查询自媒体文章
WmNews wmNews = wmNewsMapper.selectById(id);
//判断是否为“提交审核”状态
if(wmNews.getStatus()!=1){
return;
}
//从文章提取图片(从MinIO下载图片)
List<byte[]> imageList = getImagesFromNews(wmNews);
//从文章中提取文本
List<String> textList = getTextFromNews(wmNews,imageList);
//检测自定义敏感词
if(CollectionUtils.isNotEmpty(textList)){
boolean flag = handleSensitiveScan(textList,wmNews);
if(!flag){
return;//如果审核失败则退出
}
}
//把文本提交阿里云检测,根据结果文章修改
if(CollectionUtils.isNotEmpty(textList)){
try {
Map result = greenTextScan.greeTextScan(textList);
boolean flag = handleScanResult(result,wmNews);
if(!flag){
return;//如果审核失败则退出审核
}
} catch (Exception e) {
e.printStackTrace();
log.error("调用阿里云API失败,{}",e.getMessage());
}
}
//把图片提交阿里云检测,根据结果文章修改
if(CollectionUtils.isNotEmpty(imageList)){
try {
Map result = greenImageScan.imageScan(imageList);
boolean flag = handleScanResult(result,wmNews);
if(!flag){
return;//如果审核失败则退出审核
}
} catch (Exception e) {
e.printStackTrace();
log.error("调用阿里云API失败,{}",e.getMessage());
}
}
//判断发布时间大于当前时间,修改文章状态为8(待发布)
if(wmNews.getPublishTime()!=null && wmNews.getPublishTime().after(new Date())){
wmNews.setStatus(WmNews.Status.SUCCESS.getCode());
wmNews.setReason("文章审核成功,待发布");
wmNewsMapper.updateById(wmNews);
//把当前自媒体文章发布任务添加到延迟队列中
Long taskId = wmNewsTaskService.addWmNewsTask(wmNews);
//更新wm_news表的task_id字段
return; // 必须退出
}
//发布文章(文章保存App端 Feign接口)
publishApArticle(wmNews);
}
10、文章定时发布:消费任务发表文章
定义定时任务,每隔1秒扫描1次redis延迟任务队列,查询符合执行时间的任务,进行文章发表
package com.heima.wemedia.schedule;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.common.constants.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.schedule.feign.TaskFeign;
import com.heima.utils.common.JsonUtils;
import com.heima.wemedia.mapper.WmNewsMapper;
import com.heima.wemedia.service.WmNewsAutoScanService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class WmNewsTaskJob {
@Autowired
private TaskFeign taskFeign;
@Autowired
private WmNewsAutoScanService wmNewsAutoScanService;
@Autowired
private WmNewsMapper wmNewsMapper;
@Scheduled(fixedRate = 1000)
public void pollWmNewsTask(){
//到延迟队列中拉取任务
List<Task> taskList = taskFeign.pollTask(ScheduleConstants.TASK_TOPIC_NEWS_PUBLISH);
if(CollectionUtils.isNotEmpty(taskList)){
for(Task task:taskList){
//从Task取出文章ID
WmNews wmNews = JsonUtils.toBean(task.getParameters(),WmNews.class);
//查询文章
wmNews = wmNewsMapper.selectById(wmNews.getId());
//发布文章
wmNewsAutoScanService.publishApArticle(wmNews);
System.out.println("文章已经定时发布成功");
}
}
}
}
在heima-leadnews-wemedia微服务开启定时任务
在heima-leadnews-wemedia微服务把publishArticle方法加入到WmNewsAutoScanService类上
11、课程总结
-
文章定时
方案:Redis延迟任务(队列)
2)搭建延迟任务服务
2.1 添加任务
2.2 消费任务
取消任务
3)文章微服务 调用 任务方法实现文章定时发布
原文地址:http://www.cnblogs.com/IsMhhla/p/16868526.html