其他分享
首页 > 其他分享> > 黑马头条--延迟任务精准发布文章

黑马头条--延迟任务精准发布文章

作者:互联网

1.添加任务

1.1.每次创建文章,就添加到任务中去

文章提交中调用添加任务方法

代码

  @Override
    @Async
    public void addNewsToTask(Integer id, Date NEWS_SCAN_TIME) {
        log.info("开始添加任务------->");
//        创建任务对象
        Task task=new Task();
//        设置优先级 和类型
        task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
        task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
        task.setExecuteTime(NEWS_SCAN_TIME.getTime());
//        创建文章对象
        WmNews wmNews=new WmNews();
        wmNews.setId(id);
        byte[] serialize = ProtostuffUtil.serialize(wmNews);
//        序列化
        task.setParameters(serialize);
//        添加任务
        scheduleClient.addTask(task);
        log.info("任务添加完成------->");
    }

这里把文章id存入到了任务中的Parameters属性里,方便后期拉去任务时获取Id然后调用自动审核的方法,把此Id传进去。

2.缓存任务定时转换刷新

 /**
     * 未来数据定时刷新
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void refresh(){
//        添加Redis分布式锁
        cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
        log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");
//        获取所有zset里的键
        Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "*");
        for (String futureKey : scan) {
//            换成topic_key
            String topicKey=ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
            //获取该组key下当前需要消费的任务数据
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
            if (!tasks.isEmpty()){
//                不为空  从future同步到topic
                cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
                log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
            }
        }


    }

未来时间的任务如果小于了当前时间就转换状态供调用者拉去消费

3.清空缓存,Mysql每五分钟向Redis同步一次,保证数据同步

代码

 /**
     * 每5分钟刷新一次数据到缓存
     */
    @PostConstruct
    @Scheduled(cron = "0 */5 * * * ?")
    public void reloadTask(){
        log.info("-------开始刷新进缓存-------");
//        清理缓存中的数据
          clearTask();
//          查询数据库中小于未来5分钟的数据
//        获取5分钟后的时间 毫秒值
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, 5);
        //查看小于未来5分钟的所有任务
        List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
//        添加进Redis缓存中
        if (allTasks!=null){
            for (Taskinfo allTask : allTasks) {
                Task task=new Task();
                BeanUtils.copyProperties(allTask,task);
                task.setExecuteTime(allTask.getExecuteTime().getTime());
                //调用添加缓存的方法
                addTaskToRedis(task);
            }
            log.info("以从数据库刷新------->缓存<--------");
        }
    }

4.消费者每过一秒拉去任务进行审核

代码

拉取成功后会把数据库中的任务给删除,相当于消费掉了

 /**
     * 按照频率拉取任务,自动审核
     */
    @Override
    @Scheduled(fixedRate = 1000) //每一秒拉去一次
    public void scanNewsByTask() throws Exception {
        log.info("文章审核---消费任务执行---begin---");
//        拉取任务
        ResponseResult poll = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
        if (poll.getCode().equals(200)&&poll.getData()!=null){
//            获取Task对象
            Task task = JSON.parseObject(JSON.toJSONString(poll.getData()), Task.class);
            byte[] parameters = task.getParameters();
//            获取 WmNews
            WmNews deserialize = ProtostuffUtil.deserialize(parameters, WmNews.class);
//            自动审核
            wmNewsAutoScanService.autoScanWmNews(deserialize.getId());
        }
        log.info("文章审核---消费任务执行---end---");
    }

标签:info,task,log,--,任务,缓存,Task,精准,头条
来源: https://www.cnblogs.com/sy2022/p/16552312.html