airflow二集成EMR使用

作者: ZacksTang

1. 准备工作 1.1. 安装并初始化airflow,参考以下文档: https://www.cnblogs.com/zackstang/p/11082322.html

其中还要额外安装的是: sudo pip-3.6 install -i https://pypi.tuna.tsinghua.edu.cn/simple 'apache-airflow[celery]' sudo pip-3.6 install -i https://pypi.tuna.tsinghua.edu.cn/simple boto3 1.2. 配置好本地AWS Credentials,此credential需有启动EMR 的权限。 1.3. 置数据库为外部数据库:

编辑 airflow.cfg 文件,修改数据库连接配置(需提前在数据库中创建好airflowdb 的数据库): sql_alchemy_conn = mysql://user:password@database_location/airflowdb

使用下面的命令检查并初始化: airflow initdb 1.4. 配置executor 为 CeleryExecutor

编辑airflow.cfg 文件,修改executor配置: executor = CeleryExecutor

修改后可以保证相互无依赖的任务可以并行执行。默认为SequentialExecutor,也就是按顺序执行。 1.5 配置broker_url 与 result_backend airflow.cfg 文件中修改以下两个条目: broker_url = sqla+mysql:// user:password@database_location:3306/airflowdb result_backend = db+mysql:// user:password@database_location:3306/airflowdb

配置完后启动airflow 的web ui,worker,flower以及scheduler: airflow webserver -p 8080 \& airflow worker \& airflow flower \& airflow scheduler \& 2. 定义工作流

创建dag_trasform.py 文件,在文件中定义工作流

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime.now().replace(microsecond=0),
    'email': [‘xxxxxx@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
     'queue': 'bash_queue',
     'pool': 'backfill',
     'priority_weight': 10,
     'end_date': datetime(2016, 1, 1),
}
dag = DAG('dag_transform', default_args=default_args,
          schedule_interval=timedelta(days=1))
 create emr cluster
t0 = BashOperator(
    task_id='create_emr_cluster',
    bash_command='python3 /home/hadoop/scripts/launch_emr.py',
    dag=dag)
 do wordcount
t1 = BashOperator(
    task_id='spark_job',
    bash_command='python3 /home/hadoop/scripts/submit_spark_job.py',
    dag=dag)
 check result in s3
t2 = BashOperator(
    task_id='check_s3',
    bash_command='python3 /home/hadoop/scripts/check_s3_result.py',
    dag=dag)
 hive query
t3 = BashOperator(
    task_id='query',
    bash_command='python3 /home/hadoop/scripts/query_result.py',
    dag=dag)
 terminate cluster
t4 = BashOperator(
    task_id='terminate_cluster',
    bash_command='python3 /home/hadoop/scripts/terminate_cluster.py',
    dag=dag)
 define airflow DAG
t0 >> t1
t1 >> t2
t2 >> t3
t3 >> t4

其中各个BashOperator中的脚本需自行实现,根据需求实现即可。 3. 重制 Airflow 数据库

将 dag_transform.py 文件放入 airflow/dags/ 下,然后重置 airflow 数据库:airflow resetdb 4. 运行

在airflow里手动执行这个DAG,可以看到这个DAG已经开始运行:

查看 dag_transform 可以看到已经在运行启动emr的脚本了

[[2020-03-12 12:42:54,197] {bash_operator.py:105} INFO - Temporary script location: /tmp/airflowtmptwdg7a_6/create_emr_clusterlbzuu36e
[2020-03-12 12:42:54,197] {bash_operator.py:115} INFO - Running command: python3 /home/hadoop/scripts/launch_emr.py

可以看到 EMR 集群正在启动:

t1 spark wordcount 开始执行:

t2 完成后,t3 hive query 开始执行:

最后,整个DAG执行完毕:

们也可以看到EMR集群开始自动终止:

参考文档: https://aws.amazon.com/cn/blogs/china/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy/



原文创作:ZacksTang

原文链接:https://www.cnblogs.com/zackstang/p/12482903.html

文章列表

更多推荐

更多
  • 生成对抗网络项目-八、条件 GAN - 使用条件对抗网络的图像到图像翻译 Pix2pix 是一种生成对抗网络(GAN),用于图像到图像的翻译。 图像到图像转换是一种将图像的一种表示形式转换为另一种表示形式的方法。 Pix2pix 学习从输入图像到输出图像的映射。 ...
  • 生成对抗网络项目-九、预测 GAN 的未来 我们从对 GAN 的简要介绍开始,学习了各种重要概念。然后,我们探索了 3D-GAN,这是一种可以生成 3D 图像的 GAN。 我们训练了 3D-GAN,以生成现实世界对象的 3D ...
  • 生成对抗网络项目-生成对抗网络项目 文章列表,生成对抗网络项目-一、生成对抗网络简介,生成对抗网络项目-七、CycleGAN - 将绘画变成照片,生成对抗网络项目-三、使用条件 GAN 进行人脸老化,生成对抗网络项目-九、预测 GAN 的未来,生成对抗网络项目-二、3D-...
  • 生成对抗网络项目-六、StackGAN - 逼真的文本到图像合成 StackGAN 的架构,StackGAN 是一个两阶段的网络。 每个阶段都有两个生成器和两个判别器。 StackGAN 由许多网络组成,这些网络如下:阶段 1 GAN ...
  • 生成对抗网络项目-七、CycleGAN - 将绘画变成照片 CycleGAN 是一种生成对抗网络(GAN),用于跨域迁移任务,例如更改图像的样式,将绘画转变为照片, 反之亦然,例如照片增强功能,更改照片的季节等等。 CycleGAN 由朱俊彦,Taesung Park,Phillip Isola 和...
  • 生成对抗网络项目-三、使用条件 GAN 进行人脸老化 cGAN 是 GAN 的一种,它取决于一些额外的信息。 我们将额外的y信息作为额外的输入层提供给生成器。 在朴素 GAN 中,无法控制所生成图像的类别。 ...
  • 生成对抗网络项目-五、使用 SRGAN 生成逼真的图像 在本章中,将涵盖以下主题:SRGAN 简介,建立项目,下载 CelebA 数据集,SRGAN 的 Keras 实现,训练和优化 SRGAN 网络SRGAN 的实际应用,SRGAN 简介,与其他 GAN 一样,SRGAN ...
  • 生成对抗网络项目-四、使用 DCGAN 生成动漫角色 在本章中,我们将介绍以下主题:DCGAN 简介,GAN 网络的架构细节,建立项目,为训练准备数据集,DCGAN 的 Keras 实现以生成动画角色,在动漫角色数据集上训练 DCGAN,评估训练好的模型,通过优化超参数优化网络,DCGAN ...
  • 生成对抗网络项目-二、3D-GAN -- 使用 GAN 生成形状 3D 生成对抗网络(3D-GAN)是 GAN 的变体,就像 StackGAN,CycleGAN 和超分辨率生成对抗网络(SRGAN)一样 。 与朴素 GAN 相似,它具有生成器和判别器模型。 这两个网络都使用 3D 卷积层,而不是使用 2D...
  • 生成对抗网络项目-一、生成对抗网络简介 在本章中,我们将研究生成对抗网络(GAN)。 它们是一种深度神经网络架构,它使用无监督的机器学习来生成数据。 他们在 2014 年由 Ian Goodfellow,Yoshua Bengio 和 Aaron Courville ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多