定时任务
定时任务
通过框架开发的 HTTP Server
是请求响应模型的,但是仍然还会有许多场景需要执行一些定时任务,例如:
- 定时上报应用状态。
- 定时从远程接口更新本地缓存。
- 定时进行文件切割、临时文件删除。
框架提供了一套机制来让定时任务的编写和维护更加优雅。
编写定时任务
所有的定时任务都统一存放在 app/schedule
目录下,每一个文件都是一个独立的定时任务, 可以配置定时任务的属性和要执行的方法。
例如:定义一个更新远程数据到内存缓存的定时任务,就可以在 app/schedule
目录下创建一个 update_cache.js
文件
const Subscription = require('egg').Subscription
class UpdateCache extends Subscription {
// 通过 schedule 属性来设置定时任务的执行间隔等配置
static get schedule() {
return {
interval: '1m', // 1 分钟间隔
type: 'all', // 指定所有的 worker 都需要执行
}
}
// subscribe 是真正定时任务执行时被运行的函数
async subscribe() {
const res = await this.ctx.curl('http://www.api.com/cache', {
dataType: 'json',
})
this.ctx.app.cache = res.data
}
}
module.exports = UpdateCache
可以简写:
module.exports = {
schedule: {
interval: '1m', // 1 分钟间隔
type: 'all', // 指定所有的 worker 都需要执行
},
async task(ctx) {
const res = await ctx.curl('http://www.api.com/cache', {
dataType: 'json',
})
ctx.app.cache = res.data
},
}
定时任务会在每一个 Worker
进程上每 1 分钟执行一次,将远程数据请求回来挂载到 app.cache
上。
任务
task
或subscribe
同时支持generator function
和async function
。task
的入参为ctx
,匿名的 Context 实例,可以通过它调用service
等。
定时方式
定时任务可以指定 interval 或者 cron 两种不同的定时方式。
interval
通过 schedule.interval
参数来配置定时任务的执行时机,定时任务将会每间隔指定的时间执行一次。interval 可以配置成
- 数字类型,单位为毫秒数,例如 5000。
- 字符类型,会通过 ms 转换成毫秒数,例如 5s。
module.exports = {
schedule: {
// 每 10 秒执行一次,转化为10000ms
interval: '10s',
},
}
cron
通过 schedule.cron
参数来配置定时任务的执行时机,定时任务将会按照 cron
表达式在特定的时间点执行。cron
表达式通过 cron-parser
进行解析。cron表达式在线解析
注意:cron-parser 支持可选的秒(linux crontab 不支持)。
* * * * * *
┬ ┬ ┬ ┬ ┬ ┬
│ │ │ │ │ |
│ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
│ │ │ │ └───── month (1 - 12)
│ │ │ └────────── day of month (1 - 31)
│ │ └─────────────── hour (0 - 23)
│ └──────────────────── minute (0 - 59)
└───────────────────────── second (0 - 59, optional)
cron表达式解析:
module.exports = {
schedule: {
// 每三小时准点执行一次
cron: '0 0 */3 * * *',
},
}
类型
框架提供的定时任务默认支持两种类型,worker
和 all
。
worker
和 all
都支持上面的两种定时方式,只是当到执行时机时,会执行定时任务的 worker 不同
worker
类型:每台机器上只有一个worker
会执行这个定时任务,每次执行定时任务的 worker 的选择是随机的。all
类型:每台机器上的每个worker
都会执行这个定时任务。
其他参数
除了刚才介绍到的几个参数之外,定时任务还支持这些参数:
cronOptions
: 配置cron
的时区等,参见cron-parser
文档immediate
:配置了该参数为true
时,这个定时任务会在应用启动并ready
后立刻执行一次这个定时任务。disable
:配置该参数为true
时,这个定时任务不会被启动。env
:数组,仅在指定的环境下才启动该定时任务。
执行日志
执行日志会输出到 ${appInfo.root}/logs/{app_name}/egg-schedule.log
,默认不会输出到控制台,可以通过 config.customLogger.scheduleLogger
来自定义。
// config/config.default.js
config.customLogger = {
scheduleLogger: {
// consoleLevel: 'NONE',
// file: path.join(appInfo.root, 'logs', appInfo.name, 'egg-schedule.log'),
},
}
动态配置定时任务
有时候需要配置定时任务的参数。定时任务还有支持另一种写法:
// 直接返回对象
module.exports = (app) => {
return {
schedule: {
interval: app.config.cacheTick,
type: 'all',
},
async task(ctx) {
// 获取外部请求
const res = await ctx.curl('http://www.api.com/cache', {
contentType: 'json',
})
ctx.app.cache = res.data
},
}
}
手动执行定时任务
可以通过 app.runSchedule(schedulePath)
来运行一个定时任务。 app.runSchedule
接受一个定时任务文件路径(app/schedule
目录下的相对路径或者完整的绝对路径), 执行对应的定时任务,返回一个 Promise
对象。
- 通过手动执行定时任务可以更优雅地编写对定时任务的单元测试。
const assert = require('node:assert')
const mm = require('egg-mock')
it('should schedule work fine', async () => {
const app = mm.app()
await app.ready()
await app.runSchedule('update_cache')
assert(app.cache)
})
- 应用启动时,手动执行定时任务进行系统初始化,等初始化完毕后再启动应用。 参见应用启动自定义章节,我们可以在 app.js 中编写初始化逻辑。
module.exports = (app) => {
app.beforeStart(async () => {
// 保证应用启动监听端口前数据已经准备好了
// 后续数据的更新由定时任务自动触发
await app.runSchedule('update_cache')
})
}
扩展定时任务类型
默认框架提供的定时任务只支持每台机器的单个进程执行和全部进程执行, 有些情况下,我们的服务并不是单机部署的,这时候可能有一个集群的某一个进程执行一个定时任务的需求。
框架并没有直接提供此功能,但开发者可以在上层框架自行扩展新的定时任务类型。
在 agent.js
中继承 agent.ScheduleStrategy
,然后通过 agent.schedule.use()
注册即可
module.exports = (agent) => {
class ClusterStrategy extends agent.ScheduleStrategy {
start() {
// 订阅其他的分布式调度服务发送的消息,收到消息后让一个进程执行定时任务
// 用户在定时任务的 schedule 配置中来配置分布式调度的场景(scene)
agent.mq.subscribe(schedule.scene, () => this.sendOne())
}
}
agent.schedule.use('cluster', ClusterStrategy)
}
ScheduleStrategy 基类提供了:
schedule
: 定时任务的属性,disable
是默认统一支持的,其他配置可以自行解析。this.sendOne(...args)
: 随机通知一个worker
执行task
,args
会传递给subscribe(...args)
或task(ctx, ...args)
。this.sendAll(...args)
: 通知所有的worker
执行task
。