1. 准备工作 1.1. 安装并初始化airflow,参考以下文档: https://www.cnblogs.com/zackstang/p/11082322.html
其中还要额外安装的是: sudo pip-3.6 install -i https://pypi.tuna.tsinghua.edu.cn/simple 'apache-airflow[celery]' sudo pip-3.6 install -i https://pypi.tuna.tsinghua.edu.cn/simple boto3 1.2. 配置好本地AWS Credentials,此credential需有启动EMR 的权限。 1.3. 置数据库为外部数据库:
编辑 airflow.cfg 文件,修改数据库连接配置(需提前在数据库中创建好airflowdb 的数据库): sql_alchemy_conn = mysql://user:password@database_location/airflowdb
使用下面的命令检查并初始化: airflow initdb 1.4. 配置executor 为 CeleryExecutor
编辑airflow.cfg 文件,修改executor配置: executor = CeleryExecutor
修改后可以保证相互无依赖的任务可以并行执行。默认为SequentialExecutor,也就是按顺序执行。 1.5 配置broker_url 与 result_backend airflow.cfg 文件中修改以下两个条目: broker_url = sqla+mysql:// user:password@database_location:3306/airflowdb result_backend = db+mysql:// user:password@database_location:3306/airflowdb
配置完后启动airflow 的web ui,worker,flower以及scheduler: airflow webserver -p 8080 \& airflow worker \& airflow flower \& airflow scheduler \& 2. 定义工作流
创建dag_trasform.py 文件,在文件中定义工作流:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime.now().replace(microsecond=0),
'email': [‘xxxxxx@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'queue': 'bash_queue',
'pool': 'backfill',
'priority_weight': 10,
'end_date': datetime(2016, 1, 1),
}
dag = DAG('dag_transform', default_args=default_args,
schedule_interval=timedelta(days=1))
create emr cluster
t0 = BashOperator(
task_id='create_emr_cluster',
bash_command='python3 /home/hadoop/scripts/launch_emr.py',
dag=dag)
do wordcount
t1 = BashOperator(
task_id='spark_job',
bash_command='python3 /home/hadoop/scripts/submit_spark_job.py',
dag=dag)
check result in s3
t2 = BashOperator(
task_id='check_s3',
bash_command='python3 /home/hadoop/scripts/check_s3_result.py',
dag=dag)
hive query
t3 = BashOperator(
task_id='query',
bash_command='python3 /home/hadoop/scripts/query_result.py',
dag=dag)
terminate cluster
t4 = BashOperator(
task_id='terminate_cluster',
bash_command='python3 /home/hadoop/scripts/terminate_cluster.py',
dag=dag)
define airflow DAG
t0 >> t1
t1 >> t2
t2 >> t3
t3 >> t4
其中各个BashOperator中的脚本需自行实现,根据需求实现即可。 3. 重制 Airflow 数据库
将 dag_transform.py 文件放入 airflow/dags/ 下,然后重置 airflow 数据库:airflow resetdb 4. 运行
在airflow里手动执行这个DAG,可以看到这个DAG已经开始运行:
查看 dag_transform 可以看到已经在运行启动emr的脚本了:
[[2020-03-12 12:42:54,197] {bash_operator.py:105} INFO - Temporary script location: /tmp/airflowtmptwdg7a_6/create_emr_clusterlbzuu36e
[2020-03-12 12:42:54,197] {bash_operator.py:115} INFO - Running command: python3 /home/hadoop/scripts/launch_emr.py
可以看到 EMR 集群正在启动:
t1 spark wordcount 开始执行:
t2 完成后,t3 hive query 开始执行:
最后,整个DAG执行完毕:
们也可以看到EMR集群开始自动终止:
原文创作:ZacksTang
原文链接:https://www.cnblogs.com/zackstang/p/12482903.html
文章列表
- 集成学习与随机森林四Boosting与Stacking
- 集成学习与随机森林二Bagging与Pasting
- 集成学习与随机森林三随机森林与随机子空间
- 集成学习与随机森林一投票分类器
- 降维二PCA
- 降维三LLE与其他降维技术
- 降维一维度灾难与降维主要方法
- 机器学习项目流程四选择并训练模型
- 机器学习项目流程五模型调优
- 机器学习项目流程二探索并可视化数据
- 机器学习项目流程三为机器学习准备数据
- 机器学习项目流程一初探数据集
- 天池题目:工业蒸汽预测一 数据探索
- 分类问题四ROC曲线
- 分类问题六误差分析
- 分类问题五多元分类
- 分类问题二分类器的性能衡量
- 分类问题三混淆矩阵,Precision与Recall
- 分类问题七多标签分类与多输出分类
- 分类问题一MINST数据集与二元分类器
- 决策树二决策树回归
- 决策树一决策树分类
- 使用AWS SageMaker进行机器学习项目
- 使用AWS Glue进行 ETL 工作
- airflow二集成EMR使用
- XGBoost介绍
- Spark Structured Streaming二实战
- Spark Structured Streaming一基础
- SVM支持向量机二非线性SVM分类
- SVM支持向量机三SVM回归与原理
- SVM支持向量机一线性SVM分类
- Netty二线程模型
- Netty三Netty模型
- Netty一IO模型
- NLP与深度学习四Transformer模型
- NLP与深度学习六BERT模型的使用
- NLP与深度学习五BERT预训练模型
- NLP与深度学习二循环神经网络
- NLP与深度学习三Seq2Seq模型与Attention机制
- NLP与深度学习一NLP任务流程
- Kubernetes四Pod详解
- Kubernetes八安全认证
- Kubernetes五 Pod控制器详解
- Kubernetes二资源管理
- Kubernetes三实战入门
- Kubernetes七数据存储
- Kubernetes一Overview
- Kaggle泰坦尼克数据科学解决方案
- Kaggle 题目 nucs6220assignment1
- Elasticsearch 入门
- Docker二Image 与网络
- Docker一概念与基础
- DebeziumFlinkHudi:实时流式CDC
- ClickHouse介绍四ClickHouse使用操作
- ClickHouse介绍二MergeTree引擎
- ClickHouse介绍三MergeTree系列表引擎
- ClickHouse介绍一初次使用
- Bike Sharing Analysis二 假设检验方法
- Bike Sharing Analysis一 探索数据
- Apache Kylin二在EMR上搭建Kylin
- Apache Kylin三Kylin上手
- Apache Kylin一Kylin介绍