黑马头条--延迟任务精准发布文章
作者:互联网
1.添加任务
1.1.每次创建文章,就添加到任务中去
文章提交中调用添加任务方法
代码
@Override
@Async
public void addNewsToTask(Integer id, Date NEWS_SCAN_TIME) {
log.info("开始添加任务------->");
// 创建任务对象
Task task=new Task();
// 设置优先级 和类型
task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
task.setExecuteTime(NEWS_SCAN_TIME.getTime());
// 创建文章对象
WmNews wmNews=new WmNews();
wmNews.setId(id);
byte[] serialize = ProtostuffUtil.serialize(wmNews);
// 序列化
task.setParameters(serialize);
// 添加任务
scheduleClient.addTask(task);
log.info("任务添加完成------->");
}
这里把文章id存入到了任务中的Parameters
属性里,方便后期拉去任务时获取Id然后调用自动审核的方法,把此Id传进去。
2.缓存任务定时转换刷新
/**
* 未来数据定时刷新
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){
// 添加Redis分布式锁
cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");
// 获取所有zset里的键
Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "*");
for (String futureKey : scan) {
// 换成topic_key
String topicKey=ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
//获取该组key下当前需要消费的任务数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
if (!tasks.isEmpty()){
// 不为空 从future同步到topic
cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
}
}
}
把未来时间
的任务如果小于了当前时间
就转换状态供调用者拉去消费
3.清空缓存,Mysql每五分钟向Redis同步一次,保证数据同步
代码
/**
* 每5分钟刷新一次数据到缓存
*/
@PostConstruct
@Scheduled(cron = "0 */5 * * * ?")
public void reloadTask(){
log.info("-------开始刷新进缓存-------");
// 清理缓存中的数据
clearTask();
// 查询数据库中小于未来5分钟的数据
// 获取5分钟后的时间 毫秒值
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
//查看小于未来5分钟的所有任务
List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
// 添加进Redis缓存中
if (allTasks!=null){
for (Taskinfo allTask : allTasks) {
Task task=new Task();
BeanUtils.copyProperties(allTask,task);
task.setExecuteTime(allTask.getExecuteTime().getTime());
//调用添加缓存的方法
addTaskToRedis(task);
}
log.info("以从数据库刷新------->缓存<--------");
}
}
4.消费者每过一秒拉去任务进行审核
代码
拉取成功后会把数据库中的任务给删除,相当于消费掉了
/**
* 按照频率拉取任务,自动审核
*/
@Override
@Scheduled(fixedRate = 1000) //每一秒拉去一次
public void scanNewsByTask() throws Exception {
log.info("文章审核---消费任务执行---begin---");
// 拉取任务
ResponseResult poll = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
if (poll.getCode().equals(200)&&poll.getData()!=null){
// 获取Task对象
Task task = JSON.parseObject(JSON.toJSONString(poll.getData()), Task.class);
byte[] parameters = task.getParameters();
// 获取 WmNews
WmNews deserialize = ProtostuffUtil.deserialize(parameters, WmNews.class);
// 自动审核
wmNewsAutoScanService.autoScanWmNews(deserialize.getId());
}
log.info("文章审核---消费任务执行---end---");
}
标签:info,task,log,--,任务,缓存,Task,精准,头条 来源: https://www.cnblogs.com/sy2022/p/16552312.html