一、Azkaban简明笔记
作者:互联网
1、azkaban部署
主要是集群部署安装。
1.1 准备安装包
1.2 配置MySQL
-
启动mysql
mysql -uroot -proot
-
创建azkaban数据库
create database azkaban;
-
创建azkaban用户并赋予权限(可以不设置账号,继续使用root账号)
-- 显示相关变量 SHOW VARIABLES like 'validate_password%'; -- 设置密码有效长度1位及以上 set global validate_password.length=1; -- 设置密码策略最低级别 set global validate_password_policy=0; -- 创建Azkaban用户,任何主机都可以访问Azkaban,密码是azkaban CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban'; -- 赋予Azkaban用户增删改查权限 GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
-
创建azkaban的表
source /opt/software/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql;
-
更改mysql包大小,防止azkaban连接mysql阻塞
sudo vim /etc/my.cnf # 在[mysqld]下面加一行max_allowed_packet=1024M [mysqld] max_allowed_packet=1024M
-
重启mysql
sudo systemctl restart mysqld
1.3 配置Executor Server
Azkaban Executor Server处理工作流和作业的实际执行。
-
编辑azkaban.properties
vim /opt/software/azkaban/azkaban-exec-server-3.84.4/conf/azkaban.properties
修改如下属性:
# 修改如下内容 default.timezone.id=Asia/Shanghai azkaban.webserver.url=http://node001:8081 executor.port=12321 database.type=mysql mysql.port=3306 mysql.host=node001 mysql.database=azkaban mysql.user=azkaban mysql.password=azkaban mysql.numconnections=100 # 添加如下内容 executor.metric.reports=true executor.metric.milisecinterval.default=60000
-
进入
.../azkaban-exec-server-3.84.4/lib
更新mysql-connector-java-8.0.29.jar
包,使其与mysql版本匹配 -
同步
azkaban-exec-server-3.84.4
到所有节点 -
在各节点的**.../azkaban-exec-server-3.84.4/ **目录分别执行
bin/start-exec.sh
如果出现executor.port文件,说明启动成功。
-
在各个节点激活executor
curl -G "node001:$(<./executor.port)/executor?action=activate" && echo curl -G "node002:$(<./executor.port)/executor?action=activate" && echo curl -G "node003:$(<./executor.port)/executor?action=activate" && echo
如果节点出现提示:{"status":"success"},表示激活成功。
1.4 配置Web Server
Azkaban Web Server处理项目管理,身份验证,计划和执行触发。
-
编辑
azkaban.properties
修改如下: default.timezone.id=Asia/Shanghai database.type=mysql mysql.port=3306 mysql.host=node001 mysql.database=azkaban mysql.user=root mysql.password=root mysql.numconnections=100 azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
说明:
-
StaticRemainingFlowSize:正在排队的任务数
-
CpuStatus:CPU占用情况
-
MinimumFreeMemory:内存占用情况。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,不执行
-
-
修改
azkaban-users.xml
文件,添加新用户<azkaban-users> <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/> <user password="metrics" roles="metrics" username="metrics"/> <user password="azkaban" roles="metrics,admin" username="nuochengze"/> <role name="admin" permissions="ADMIN"/> <role name="metrics" permissions="METRICS"/> </azkaban-users>
-
进入
.../azkaban-web-server-3.84.4/lib
更新mysql-connector-java-8.0.29.jar
包,使其与mysql版本匹配 -
进入
...//azkaban-web-server-3.84.4/
目录,启动web Server./bin/start-web.sh
-
访问
http://node001:8081/
,并用配置的账号登录
2、使用
2.1 HelloWorld案例
-
新建azkaban.project文件,编辑内容如下:
azkaban-flow-version: 2.0
该文件的表明,采用新的Flow-API方式解析flow文件
-
新建hello_world.flow文件,编辑内容如下:
nodes: - name: jobA # job的名称 type: command # job的类型,command表示要执行作业的方式为命令 config: # job的配置信息 command: echo "hello world"
-
将azkaban.project、hello_world.flow文件压缩到一个zip文件
说明:文件名称必须是英文
-
在WebServer新建一个项目:
http://node001:8081/index
- 给项目名称命名和添加项目描述
- 将zip包文件上传
- 执行
Execute Flow
- 在
Job List
中查看运行结果
2.2 作业依赖案例
需求:JobA和JobB执行完了,才能执行JobC
步骤:
-
创建
basic.flow
文件,添加如下内容nodes: - name: JobC type: command dependsOn: - JobA - JobB config: command: echo "I'm JobC" - name: JobA type: command config: command: echo "I'm JobA" - name: JobB type: command config: command: echo "I'm JobB"
dependsOn后面表示当前Job依赖的其他Job
-
创建
azkaban.project
文件,编辑内容如下:azkaban-flow-version: 2.0
-
将
basic.flow
和azkaban.project
文件压缩成dependson_example.zip
文件 -
在WebServer新建一个项目:
http://node001:8081/index
- 给项目名称命名和添加项目描述
- 将zip包文件上传
- 执行
Execute Flow
- 在
Job List
中查看运行结果
2.3 自动失败重试案例
需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms
修改basic.flow文件:
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
retries: 3
retry.backoff: 10000
参数说明:
- retries:重试次数
- retry.backoff: 重试的时间间隔
执行的次数= 一次失败+三次重试
全局配置的方式:
# 在Flow全局配置中添加任务失败配置,此时重试配置会应用到所有Job
config:
retires: 3
retry.backoff: 10000
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
2.4 手动失败重试案例
Enable 和 Disable 下面都分别有如下参数:
-
Parents:该作业的上一个任务
-
Ancestors:该作业前的所有任务
-
Children:该作业后的一个任务
-
Descendents:该作业后的所有任务
-
Enable All:所有的任务
2.5 JavaProcess作业类型案例
JavaProcess 类型可以运行一个自定义主类方法,
type 类型为 javaprocess,
可用的配置为:
- Xms:最小堆
- Xmx:最大堆
- classpath:类路径
- java.class:要运行的 Java 对象,其中必须包含 Main 方法
- main.args:main 方法的参数
步骤:
-
新建一个azkaban的maven工程
-
创建包名:com.nuochengze
-
创建AzTest类:
package com.nuochengze public class AzTest { public static void main(String[] args){ System.out.println("This is a test."); } }
-
打包成jar包
-
修改
basic.flow
文件nodes: - name: test_java type: javaprocess config: Xms: 96M Xmx: 200M java.class: com.nuochengze.AzTest
-
将jar包和
basic.flow
文件及azkaban.project
文件打包成zip包
2.6 条件工作流案例
条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job 的父 Job 输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定 Job 执行逻辑时获得更大的灵活性,例如,只要父 Job 之一成功,就可以运行当前 Job。
运行时参数案例:
-
基本原理
-
父 Job 将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件
-
子 Job 使用 ${jobName:param}来获取父 Job 输出的参数并定义执行条件
-
-
支持的条件运算符
- == 等于
- != 不等于
- > 大于
- >= 大于等于
- < 小于
- <= 小于等于
- && 与
- || 或
- ! 非
案例:
-
需求:JobA执行一个shell脚本,JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行
-
步骤:
-
新建jobA.sh
#! /bin/bash echo "do JobA" wk=`date +%w` echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
-
新建JobB.sh
#!/bin/bash echo "do JobB"
-
新建basic.flow
nodes: - name: JobA type: command config: command: sh JobA.sh - name: JobB type: command dependsOn: - JobA config: command: sh JobB.sh condition: ${JobA:wk} == 1
按照设定条件,JobB会根据当日日期决定是否执行
-
2.7 预定义宏案例
Azkaban 中预置了几个特殊的判断条件,称为预定义宏。 预定义宏会根据所有父 Job 的完成情况进行判断,再决定是否执行。
可用的预定义宏如 下:
- all_success: 表示父 Job 全部成功才执行(默认)
- all_done:表示父 Job 全部完成才执行
- all_failed:表示父 Job 全部失败才执行
- one_success:表示父 Job 至少一个成功才执行
- one_failed:表示父 Job 至少一个失败才执行
案例:
-
需求
JobA 执行一个 shell 脚本 ,JobB 执行一个 shell 脚本,JobC 执行一个 shell 脚本,要求 JobA、JobB 中有一个成功即可执行
-
步骤
-
新建JobA.sh
#!/bin/bash echo "do JobA"
-
新建JobC.sh
#!/bin/bash echo "do JobC
-
新建basic.flow
nodes: - name: JobA type: command config: command: sh JobA.sh - name: JobB type: command config: command: sh JobB.sh - name: JobC type: command depondsOn: - JobA - JobB config: command: sh JobC.sh condition: one_success
注意:没有JobB.sh
-
2.8 定时执行
- Azkaban 可以定时执行工作流。在执行工作流时候,选择左下角 Schedule
- 右上角注意时区是上海,然后在左面填写具体执行事件,填写的方法和 crontab 配置定时 任务规则一致
- 点击 remove Schedule 即可删除当前任务的调度规则
2.9 邮箱报警案例
-
注册邮箱并开启smtp,获取第三方客户端授权码
-
Azkaban 默认支持通过邮件对失败的任务进行报警,配置方法如下:
-
在 azkaban-web 节 点 node001上,编辑
.../azkaban-web-server-3.84.4/conf/azkaban.properties
,并修改如下内容:#这里设置邮件发送服务器 mail.sender=xxxx@126.com mail.host=smtp.126.com mail.user=xxxx@126.com mail.password=用邮箱的授权码
-
保存并重启web-server
-
页面配置
-
2.10 Azkaban多Executor模式注意事项
Azkaban 多 Executor 模式是指,在集群中多个节点部署 Executor。在这种模式下, Azkaban web Server 会根据策略,选取其中一个 Executor 去执行任务。
为确保所选的 Executor 能够准确的执行任务,我们须在以下两种方案任选其一,推荐使 用方案二。
-
方案一:指定特定的 Executor(hadoop102)去执行任务。
-
在 MySQL 中 azkaban 数据库 executors 表中,查询 node001上的 Executor 的 id。
-
在执行工作流程时加入 useExecutor 属性,如下
-
- 方案二:在 Executor 所在所有节点部署任务所需脚本和应用。
标签:简明,笔记,azkaban,Job,sh,command,JobA,mysql,Azkaban 来源: https://www.cnblogs.com/nuochengze/p/16648348.html