编程语言
首页 > 编程语言> > Agenda文档与源码学习

Agenda文档与源码学习

作者:互联网

Agenda定时任务学习

Agenda配置

一些问题

任务执行的顺序是什么?
任务默认遵循带优先级的先进先出执行顺序。不过也修改sort参数来改变执行规则。

lockLimit和maxConcurrency的区别是什么?
agenda会将任务一个接一个的上锁,并在mongodb数据库中设置lockedAt参数,并创建Job类实例缓存到_lockedJobs数组中,这个过程默认没有限制,但可以使用lockLimit进行限制,如果所有任务都需要被执行,则agenda会尝试上锁所有任务,并将任务从_lockedJobs数组中发到_runningJobs数组中,这些任务运行后会执行用户的代码,这可以通过maxConcurrency参数进行限制。
但如果许多任务在同一时间执行,你或许会发现他们甚至没有被加载进来,因为他们会尽可能多地给任务上锁,尽管没有足够的并发来处理他们。里有调整lockLimit和maxConcurrency参数来解决这个问题。

源码

保存任务

agenda/save-job.js
如果任务已经存在则更新数据库中的信息,如果不存在则插入一条记录。(其他无关代码省略)

module.exports = async function(job) {
  try {
    // 保存任务信息,并设置最后修改者为当前agenda队列进程
    const props = job.toJSON();
    props.lastModifiedBy = this._name;
    // 获取当前时间并设置默认查询选项
    const now = new Date();
    const protect = {};
    let update = {$set: props};
    // 如果当前任务已经有id,则修改相关属性,例如谁最后一次修改的它,并返回
    if (id) {
      const result = await this._collection.findOneAndUpdate(
        {_id: id},
        update,
        {returnOriginal: false}
      );
      return processDbResult(job, result);
    }
    // 否则,直接插入一条
    const result = await this._collection.insertOne(props);
    return processDbResult(job, result);
  } catch (error) {
    throw error;
  }
};
//一个判断任务是否应该立即执行的方法
const processDbResult = (job, result) => {
  let res = result.ops ? result.ops : result.value;
  if (res) {
    // 如果是个数组,则只取第一个
    if (Array.isArray(res)) {
      res = res[0];
    }
    job.attrs.nextRunAt = res.nextRunAt;
    // 如果当前时间大于任务下一次应该执行的时间,则立即执行任务
    if (job.attrs.nextRunAt && job.attrs.nextRunAt < this._nextScanAt) {
      processJobs.call(this, job);
    }
  }

  // Return the Job instance
  return job;
};

查找任务

agenda/find-and-lock-next-job.js

const JOB_PROCESS_WHERE_QUERY = {
  $and: [{
    name: jobName,
    disabled: {$ne: true}
  }, {
    $or: [{
      lockedAt: {$eq: null},
      nextRunAt: {$lte: this._nextScanAt}
    }, {
      lockedAt: {$lte: lockDeadline}
    }]
  }]
};
const JOB_PROCESS_SET_QUERY = {$set: {lockedAt: now}};
const JOB_RETURN_QUERY = {returnOriginal: false, sort: this._sort};
const result = await this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, JOB_RETURN_QUERY);

标签:const,job,JOB,任务,源码,文档,result,Agenda,QUERY
来源: https://www.cnblogs.com/aojun/p/15315022.html