AWS自动化机器学习-八、使用 Apache Airflow 实现机器学习过程的自动化

作者: Apache CN

当建立一个 ML 模型时,有一个所有 ML 从业者都知道的基本原则;也就是说,最大似然模型只有在数据被训练时才是稳健的。在前四章中,我们主要关注于使用以源代码为中心的机制来自动化 ML 过程。换句话说,我们应用了持续集成和持续部署的 DevOps 方法,通过提供模型源代码、调整参数和 ML 工作流源代码来自动化 ML 过程。对这些工件的任何更改都会触发 CI/CD 管道的发布更改过程。

然而,我们也提供了从 UCI 机器学习仓库下载的静态鲍鱼数据作为源工件,但是我们从未对这些数据进行任何更改。因此,使用典型的 DevOps 方法,数据工件是静态的,因此不会触发 CI/CD 过程的变更发布。

因此,数据成为应用 DevOps 方法实现 ML 流程自动化与 MLOps 策略之间的关键区别。对于一个成功的 MLOps 策略,我们基本上需要在数据发生变化时提供自动化 ML 过程的能力。本质上,正如 ML 自动化过程在源代码被添加或更新时被触发一样,我们也需要在现有数据被更新或新数据被添加时触发自动化过程。

这就引出了一个问题:当我们有新数据时,我们如何自动化 ML 流程?

为了回答这个问题,在这一章中,我们将使用一种以数据为中心的方法,关注 AWS 上 ML 过程的自动化。本章的总体目标是复制我们在 [第 4 章] 、中建立的基础,使用 AWS 步骤函数自动化机器学习过程,用新的训练数据自动化年龄计算器示例。我们将通过以下主题来实现这一目标:

  • 介绍阿帕奇气流
  • 介绍亚马逊 MWAA
  • 利用气流处理鲍鱼数据集
  • 配置 MWAA 系统先决条件
  • 配置 MWAA 环境

技术要求

我们将在本章中使用以下资源:

  • web 浏览器(为了获得最佳体验,建议您使用 Chrome 或 Firefox)。
  • 访问您在第七章 、中使用的 AWS 帐户,使用 AWS 步骤功能构建 ML 工作流
  • 访问我们在 [第七章] B17649_07_ePub.xhtml##_idTextAnchor106
  • 我们将再次在 AWS 免费层的使用限制内工作,以避免产生不必要的成本。
  • 本章的配套 GitHub 存储库中提供了源代码示例(https://GitHub . com/packt publishing/Automated-Machine-Learning-on-AWS/tree/main/chapter 07)。Cloud9 开发环境中应该已经有代码示例了;如果没有,参考 [第四章]中的开发应用工件部分。

介绍阿帕奇气流

用于 ML 模型训练的数据可以来自各种来源,例如数据库、数据仓库,甚至数据湖。这些数据存储库以各种不同的数据格式存储数据。例如,数据可以存储为非结构化对象,如图像、视频或声音文件。对象可能被存储为半结构化数据,比如不符合标准化表格模式的 JSON 数据。在关系数据库或数据仓库的情况下,数据以有组织和结构化的格式存储,但它可能有多种不同类型的模式。

更糟糕的是,一些数据集可能非常大,通常为 TB 级,甚至 Pb 级,其中连接、合并和转换数据,通常被称为提取、转换和加载(ETL) 过程,需要大型计算集群,如的 HadoopApache Spark 集群。AWS 提供基础设施资源和专用服务,以 AWS Glue 的形式(托管 ETL 服务)和 Amazon Elastic Map ReduceEMR (大数据平台)来扩展这些大数据工作负载。

然而,对这些不同类型的大数据及其不同的源执行 ETL 任务,通常需要将多个独立的 ETL 任务以菊花链形式连接在一起,作为协调工作流的一部分,其中一个 ETL 任务的数据输出成为另一个 ETL 任务的输入,依此类推。可以想象,创建这样的工作流可能是一项艰巨的任务。

因此,为了简化流程,许多数据工程师依赖于Apache air flow(https://air flow . Apache . org/),这是一个平台,允许他们以编程方式构建、执行和管理这些潜在的复杂数据工作流。气流平台包括三个关键部件,即:

  • 基于网络的管理界面
  • 调度程序,负责调度和协调资源以执行工作流中的各个步骤或任务
  • 多个工作人员来执行工作流中每个特定任务的代码

为了使用 Airflow 平台,数据工程师以有向无环图(DAG) 的形式创建工作流的编码表示。图 8.1 显示了一个气流 DAG 的示例: ![Figure 8.1 – Airflow DAG example ]

图 8.1–气流 DAG 示例

注意 图 8.1 在 Apache 2.0 许可下可用,可以在 Airflow GitHub 资源库中引用(https://GitHub . com/Apache/air flow/blob/main/docs/Apache-air flow/img/edge label example . png)。

正如您从图 8.1 中看到的,DAG 由各种顺序的或定向的任务组成,这些任务被编程定义为 Python 结构。一旦 DAG 被提交给调度器,调度器就通过将每个任务分配给一个工作者来协调它的执行。每个工作者依次处理分配给它的单个任务的代码。

因此,使用 Airflow 可以显著简化数据工程师的任务,或者协调这些复杂的数据转换任务。然而,现在又有一个平台需要管理,这增加了基础架构和运营团队的复杂性,因为现在这些团队必须管理大数据处理平台(如 Hadoop 和 Spark)以及气流平台。 如何简化平台管理任务?

为了回答这个问题,我们接下来将探索亚马逊管理的阿帕奇气流 ( MWAA )的工作流程。

介绍亚马逊 MWAA

管理大数据平台通常不属于 ML 从业者的任务组合。通常,ML 从业者和数据工程团队依靠基础设施和运营团队来管理这些平台。

将气流作为大数据基础架构的一部分意味着这些平台团队现在必须管理额外的计算资源,协调他们的部署,更新软件和操作系统补丁,并监控这些资源,以确保他们不断满足工作负载扩展要求和其他服务级别协议(SLA)。 AWS 提供多种大数据托管服务,如 EMR 和 Glue,以帮助平台团队卸载这些管理任务,并且在 2020 年 11 月,AWS 推出了亚马逊 MWAA,以帮助卸载气流平台的管理。借助 MWAA,平台团队可以运行高度可用和可扩展的 Airflow 集群,而无需单独配置、更新和监控 Web UI 服务器、调度程序甚至工作节点。这意味着 ML 从业者和数据工程团队可以专注于开发数据工作流,而不依赖于平台团队。

为了说明 MWAA 在实践中是如何工作的,我们将利用服务的年龄计算器用例。

利用气流处理鲍鱼数据集

为了设置场景,您将回忆起 [第 1 章] B17649_01_ePub.xhtml##_idTextAnchor015) ,AWS 上的自动机器学习入门,其中提到 ACME Fishing Logistics 公司使用一个过时的数据集(在 UCI 机器学习库中找到

为了解决这个问题,ACME 聘请了一家外部公司来调查鲍鱼捕获量,并提供调查数据集的每日更新。这意味着已经调优的 ML 模型可以在新数据上重新训练,从而进一步优化。这也意味着数据工程团队需要编排一个流程或数据管道,将原始数据集与新的调查数据合并,并向新的模型训练管道提供新的训练、验证和测试数据集,所有这些都使用 MWAA 服务。

让我们看看不同的 ACME 团队将如何完成这项任务。

配置 MWAA 系统的先决条件

在我们推出 MWAA 服务之前,有几个先决条件需要满足,即:

  • MWAA 需要访问存储 Dag 的 S3 存储桶。
  • MWAA 需要访问一个requirements.txt文件,该文件也存储在 S3 上,以加载工人执行分配给他们的任务所需的任何独特的 Python 库。
  • 虽然 MWAA 不要求,但我们还需要配置各种 IAM 角色来访问后端服务,如 Glue 和 SageMaker。
  • 我们还需要提供各种后端服务所需的工件。例如,我们需要提供 ETL 脚本来执行胶合服务。

在以下步骤中,我们将作为 CDK 应用提供这些先决条件:

  1. 登录到上一章中使用的同一个 AWS 帐户,打开 AWS Cloud9 控制台(https://console . AWS . Amazon . com/cloud 9)。
  2. In the Your environments section, click the Open IDE button for the MLOps-IDE development environment. 注意 如果您一直关注这一点,那么您应该已经配置了 MLOps-IDE 环境,以及 AWS CDK 的 2.3.0 版本。如果没有,请参考 [第四章]中的准备开发环境部分。
  3. 接下来,我们将创建一个 CodeCommit 存储库来存储整个解决方案的各种编码工件:
    $ cd ~/environment
    $ aws codecommit create-repository --repository-name abalone-data-pipeline --repository-description "Automated ML on AWS using Managed Workflows for Apache Airflow"
    
  4. 现在我们可以捕获存储库的 URI 进行克隆:
    $ CLONE_URL=$(aws codecommit get-repository --repository-name abalone-data-pipeline --query "repositoryMetadata.cloneUrlHttp" --output text)
    
  5. 运行以下命令克隆存储库:
    $ git clone $CLONE_URL
    
  6. 运行以下命令来初始化新存储库中的 CDK 应用:
    $ cd abalone-data-pipeline && cdk init --language python
    $ git add -A
    $ git commit -m "Started CDK Project"
    $ git branch main
    $ git checkout main
    $ source .venv/bin/activate
    
  7. 接下来,我们将通过运行以下命令来安装必要的开发库:
    $ python -m pip install -U pip pylint boto3 sagemaker==2.49.1 apache-airflow
    $ pip install –r requirements.txt
    
  8. Now that we have the relevant libraries installed, we can start defining the various data pipeline resources. Using the Cloud9 navigation panel, on the left-hand side, expand the abalone_data_pipeline folder and double-click on the abalone_data_pipeline_stack.py file for editing. 注意 CDK 代码的例子可以在本章的配套 GitHub 资源库中找到(https://GitHub . com/packt publishing/Automated-Machine-Learning-on-AWS/tree/main/chapter 08/CDK)。
  9. 删除模板代码并添加以下代码以导入必要的 CDK 库:
    import os
    import aws_cdk.aws_codecommit as codecommit
    import aws_cdk.aws_codebuild as codebuild
    import aws_cdk as cdk
    import aws_cdk.aws_s3 as s3
    import aws_cdk.aws_ssm as ssm
    import aws_cdk.aws_s3_deployment as s3_deployment
    import aws_cdk.aws_iam as iam
    import aws_cdk.aws_glue as glue
    import aws_cdk.aws_lambda as lambda_
    import aws_cdk.aws_events_targets as targets
    from constructs import Construct
    
  10. 接下来,我们通过添加以下代码来初始化DataPipelineStack类:
    class DataPipelineStack(cdk.Stack):
        def __init__(self, scope: Construct, id: str, *, airflow_environment_name: str=None, model_name: str=None, repo_name: str=None, **kwargs) -> None:
            super().__init__(scope, id, **kwargs)
    
  11. 我们将构建的第一个构造是对 CodeCommit 存储库的引用。添加以下代码将 CodeCommit 存储库注册为 CDK 构造:
            code_repo = codecommit.Repository.from_repository_name(
                self,
                "SourceRepository",
                repository_name=repo_name
            )
    
  12. 现在,我们创建 S3 桶来存储相关数据,并将桶名存储为 SSM 参数,以便在气流 DAG:
            Data_bucket = s3.Bucket(
                self,
                "AirflowDataBucket",
                bucket_name=f"{model_name}-data-{cdk.Aws.REGION}-{cdk.Aws.ACCOUNT_ID}",
                block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
                auto_delete_objects=True,
                removal_policy=cdk.RemovalPolicy.DESTROY,
                versioned=True
            )
            ssm.StringParameter(
                self,
                "DataBucketParameter",
                description="Airflow Data Bucket Name",
                parameter_name="AirflowDataBucket",
                string_value=data_bucket.bucket_name
            )
    
    中引用
  13. 接下来,我们创建一个 SageMaker 角色,它允许气流工作流启动 SageMaker API 调用。我们需要确保这个角色能够访问数据桶,并且在气流 DAG 中是可引用的。因此,我们也将角色 ARN 存储为 SSM 参数:
            sagemaker_role = iam.Role(
                self,
                "SageMakerBuildRole",
                assumed_by=iam.CompositePrincipal(
                    iam.ServicePrincipal("sagemaker.amazonaws.com")
                ),
                managed_policies=[
                    iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess")
                ]
            )
            data_bucket.grant_read_write(sagemaker_role)
            ssm.StringParameter(
                self,
                "SageMakerRoleParameter",
                description="SageMaker Role ARN",
                parameter_name="SageMakerRoleARN",
                string_value=sagemaker_role.role_arn
            )
    
  14. 在前面的章节中,我们创建了一个 AWS Lambda 函数来分析 ML 模型评估的结果。这是在 ML 工作流构建过程中完成的。因为我们正在为气流工作流构建各种资源,所以我们将在这里编写 Lambda 函数。该函数还被授权读取数据桶中的评估结果文件,并被存储为 SSM 参数:
            analyze_results_lambda = lambda_.Function(
                self,
                "AnalyzeResults",
                handler="index.lambda_handler",
                runtime=lambda_.Runtime.PYTHON_3_8,
                code=lambda_.Code.from_asset(os.path.join(os.path.dirname(__file__), "../artifacts/lambda/analyze_results")),
                memory_size=128,
                timeout=cdk.Duration.seconds(60)
            )
            data_bucket.grant_read(analyze_results_lambda)
            ssm.StringParameter(
                self,
                "AnalyzeResultsParameter",
                description="Analyze Results Lambda Function Name",
                parameter_name="AnalyzeResultsLambda",
                string_value=analyze_results_lambda.function_name
            )
    
  15. 接下来,我们将使用下面的代码创建必要的资源来处理训练、验证和测试数据集。如前所述,我们需要扩展数据处理,以处理潜在的大型数据集。因此,为了简化这个过程,我们将利用 AWS Glue ETL 服务。Glue 需要的第一个资源是一个 IAM 角色,它拥有对数据桶的必要权限:
            glue_role = iam.Role(
                self,
                "GlueRole",
                assumed_by=iam.CompositePrincipal(
                    iam.ServicePrincipal("glue.amazonaws.com")
                ),
                managed_policies=[
                    iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSGlueServiceRole")
                ]
            )
            data_bucket.grant_read_write(glue_role)
    
  16. 我们现在可以创建一个 Glue 目录来存储对新鲍鱼数据的引用:
            glue_catalog = glue.CfnDatabase(
                self,
                "GlueDatabase",
                catalog_id=cdk.Aws.ACCOUNT_ID,
                database_input=glue.CfnDatabase.DatabaseInputProperty(
                    name=f"_new"
                )
            )
    
  17. 要用新的鲍鱼数据填充 Glue 目录,使用下面的代码创建一个 Glue Crawler。爬虫将抓取我们的数据桶中的新数据,并将其附加到胶合目录:
            glue_crawler = glue.CfnCrawler(
                self,
                "GlueCrawler",
                name=f"-crawler",
                role=glue_role.role_arn,
                database_name=glue_catalog.ref,
                targets={
                    "s3Targets": [
                        {
                            "path": f"s3://{data_bucket.bucket_name}/{model_name}_data/new/"
                        }
                    ]
                }
            )
    
  18. 我们还需要将爬行器的名称存储为 SSM 参数,以便在气流工作流中引用:
            ssm.StringParameter(
                self,
                "GlueCrawlerParameter",
                description="Glue Crawler Name",
                parameter_name="GlueCrawler",
                string_value=glue_crawler.name
            )
    
  19. 一旦新数据被添加到 Glue Catalog 中,我们就可以创建 Glue 作业来读取该数据,将其与原始的鲍鱼数据集合并,并执行必要的数据预处理任务,以使整个数据集为模型训练做好准备:
            glue_job = glue.CfnJob(
                self,
                "GlueETLJob",
                name=f"-etl-job",
                description="AWS Glue ETL Job to merge new + raw data, and process training data",
                role=glue_role.role_arn,
                glue_version="2.0",
                execution_property=glue.CfnJob.ExecutionPropertyProperty(
                    max_concurrent_runs=1
                ),
                command=glue.CfnJob.JobCommandProperty(
                    name="glueetl",
                    python_version="3",
                    script_location=f"s3://{data_bucket.bucket_name}/airflow/scripts/preprocess.py"
                ),
                default_arguments={
                    "--job-language": "python",
                    "--GLUE_CATALOG": glue_catalog.ref,
                    "--S3_BUCKET": data_bucket.bucket_name,
                    "--S3_INPUT_KEY_PREFIX": f"{model_name}_data/raw/abalone.data",
                    "--S3_OUTPUT_KEY_PREFIX": f"_data",
                    "--TempDir": f"s3:///glue-temp"
                },
                allocated_capacity=5,
                timeout=10
            )
    
  20. 由于气流工作流将调用粘合作业,我们也将作业名存储为 SSM 参数:
            ssm.StringParameter(
                self,
                "GlueJobParameter",
                description="Glue Job Name",
                parameter_name="GlueJob",
                string_value=glue_job.name
            )
    
  21. 为了确保原始的鲍鱼数据集也可用于胶合 ETL 作业,我们使用下面的代码来创建一个 S3 桶部署,该部署将原始的数据集上传到 S3:
            s3_deployment.BucketDeployment(
                self,
                "DeployData",
                sources=[
                    s3_deployment.Source.asset(os.path.join(os.path.dirname(__file__), "../artifacts/data"))
                ],
                destination_bucket=data_bucket,
                destination_key_prefix=f"_data/raw",
                retain_on_delete=False
            )
    
  22. 最后,我们创建一个允许气流 DAG 不断更新的 CodeBuild 项目。尽管我们正在构建一个以数据为中心的工作流,但我们还需要确保对已编码工作流本身的任何更新或更改都可以自动应用。下面的代码片段将code_deployment变量实例化为codebuild.Project() :
    ...
            code_deployment = codebuild.Project(
                self,
                "CodeDeploymentProject",
                project_name="CodeDeploymentProject",
                description="CodeBuild Project to Copy Airflow Artifacts to S3",
                source=codebuild.Source.code_commit(
                    repository=code_repo
                ),
                environment=codebuild.BuildEnvironment(
                    build_image=codebuild.LinuxBuildImage.STANDARD_5_0
                ),
                environment_variables={
                    "DATA_BUCKET": codebuild.BuildEnvironmentVariable(
                        value=data_bucket.bucket_name
                    )
                },
    ...
    
  23. CodeBuild 项目有三个阶段组成了BuildSpec,或者构建指令,即installbuildpost_build阶段。下面的代码片段显示了install阶段,其中安装了最新版本的 AWS CLI:
    ...
                            "install": {
                                "runtime-versions": {
                                    "python": 3.8
                                },
                                "commands": [
                                    "printenv",
                                    "echo 'Updating Build Environment'",
                                    "python -m pip install --upgrade pip",
                                    "python -m pip install --upgrade boto3 awscli"
                                ]
                            },
    ...
    
  24. 以下代码片段显示了更新气流资产的 AWS CLI 命令。在代码构建项目的build阶段,我们将气流工作流资产sync到 S3,这样一旦这些资产被复制到数据桶,MWAA 调度程序将自动导入新的 DAG 代码:
    ...
                            "build": {
                                "commands": [
                                    "echo 'Deploying Airflow Artifacts to S3'",
                                    "cd artifacts",
                                    "aws s3 sync airflow s3://${DATA_BUCKET}/airflow"
                                ]
                            },
    ...
    
  25. 为了确保 code build 项目拥有将气流资产同步到 S3 的适当权限,下面的代码片段授予code_deploy角色对data_bucket :
    ...
            data_bucket.grant_read_write(code_deployment.role)
    ...
    
    的读写权限
  26. 为了确保工作流的变更被应用,我们创建了一个事件触发器,每当任何变更被提交到源存储库时,它就启动 CodeBuild 项目:
            code_repo.on_commit(
                "StartDeploymentProject",
                target=targets.CodeBuildProject(code_deployment)
            )
    
  27. 保存并关闭abalone_data_pipeline_stack.py文件。
  28. Now that we have defined the necessary resources as a CDK construct, we can define the CDK application to deploy these resources. Using the workspace navigation panel, open the app.py file in the abalone-data-pipeline folder and delete the existing template code. 注意 关于app.py代码的完整示例可以在本章的配套 GitHub 资源库中找到(https://GitHub . com/packt publishing/Automated-Machine-Learning-on-AWS/tree/main/chapter 08/CDK)。
  29. 删除现有模板代码,并添加以下代码来定义 CDK 应用:

!/usr/bin/env python3

import os
import aws_cdk as cdk
from abalone_data_pipeline.abalone_data_pipeline_stack import DataPipelineStack
MODEL = "abalone"
CODECOMMIT_REPOSITORY = "abalone-data-pipeline"
app = cdk.App()
DataPipelineStack(
    app,
    CODECOMMIT_REPOSITORY,
    env=cdk.Environment(account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")),
    model_name=MODEL,
    repo_name=CODECOMMIT_REPOSITORY,
    airflow_environment_name=f"-airflow-environment"
)
app.synth()
30.  保存并关闭`app.py`文件。
31.  在为我们的工作流资源部署 CDK 应用之前,我们需要从 UHCI 存储库中下载原始的鲍鱼数据集。在 Cloud9 终端运行以下命令:
$ cd ~/environment/abalone-data-pipeline/
$ mkdir -p artifacts/data
$ wget -c -P artifacts/data https://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data
32.  下一个工件是`analyze_results` Lambda 函数的源代码。运行以下命令创建必要的文件夹:
$ cd ~/environment/abalone-data-pipeline/
$ mkdir -p artifacts/lambda/analyze_results
33.  为了定义 Lambda 函数处理程序,我们可以重用我们在第 7 章[] B17649_07_ePub.xhtml##_idTextAnchor106
analyze_results Lambda function assesses the model evaluation results against the Root Mean Squared Error (RMSE) evaluation metric, you can review the code in the *Creating the ML workflow* section of *Chapter 7*, *Building the ML Workflow Using AWS Step Functions*.
**   我们需要配置的最后一个工件是`evaluate.py`文件。您将回忆起 [*第 7 章*] B17649_07_ePub.xhtml##_idTextAnchor106
evaluate.py code, you can refer to the *Creating the ML workflow* section of *Chapter 7*, *Building the ML Workflow Using AWS Step Functions*.
    *   Since we have all the necessary artifacts for the CDK application, we can go ahead and deploy it by running the following command in the terminal window:
cdk deploy
    注意
    部署 CDK 大约需要 5 分钟,你可以在云形成控制台([https://console.aws.amazon.com/cloudformation/home](https://console.aws.amazon.com/cloudformation/home))中跟踪进度。
    *   一旦部署了资源栈,我们就可以添加初始的气流工件。在我们部署 MWAA 环境之前,需要这些工件。为此,运行以下命令来设置工件源文件夹:
$ cd ~/environment/abalone-data-pipeline/
$ mkdir -p artifacts/airflow/dags
    *   我们将定义的第一个气流工件是`.airflowignore`文件。这是一个有用的文件,用于添加我们希望气流调度程序忽略的任何 DAG 文件。运行以下命令创建该文件:
$ touch artifacts/airflow/dags/.airflowignore
    *   Next, we define the `requirements.txt` file. This file specifies the various Python dependencies that the Airflow DAG will need to install on the workers. Using the Cloud9 navigation panel, right-click on the `airflow` folder and select `requirements.txt` and open it for editing.
    注意
    有关在 MWAA 管理 Python 依赖关系的最佳实践的更多信息,请参见 MWAA 文档([https://docs . AWS . Amazon . com/mwaa/latest/user guide/best-practices-dependencies . html](https://docs.aws.amazon.com/mwaa/latest/userguide/best-practices-dependencies.html))。
    *   In the `requirements.txt` file, add the following dependencies:
sagemaker==2.49.1
s3fs==0.5.1
boto3>=1.17.4
    注意
    我们使用 SageMaker SDK 版本`2.49.1`的唯一原因是为了确保与 ML 实践者在前一章中进行的 ML 实验保持一致。对于数据工程师或 ML 从业者提供的任何源代码,保持 Python 依赖版本不变是一个很好的实践。
    *   保存并关闭`requirements.txt`文件。*   由于我们已经有了部署 MWAA 环境所需的必要工件,我们可以运行以下命令将这些更改提交到 CodeCommit 存储库,并让 CodeBuild 项目在数据桶中自动更新它们:
$ git add -A
$ git commit -m "Initial commit of workflow artifacts"
$ git push --set-upstream origin main
 *更新大约需要一分钟完成。我们可以在控制台([https://console.aws.amazon.com/codesuite/codebuild/projects](https://console.aws.amazon.com/codesuite/codebuild/projects))中查看 CodeBuild 项目的输出,在**构建项目**仪表板中选择 **CodeDeploymentProject** 。如您所见,我们现在有了一种向 MWAA 部署新的和更新的 Dag 的机制。

因此,我们可以进入下一步;部署和配置 MWAA 环境。

## 配置 MWAA 环境

既然已经部署了必要的资源和先决条件,我们就可以开始配置 MWAA 环境了。以下步骤将引导您完成此过程:
1.  在新的浏览器选项卡中打开 MWAA 控制台([https://console.aws.amazon.com/mwaa/home](https://console.aws.amazon.com/mwaa/home))并点击**创建环境**按钮。
2.  在**指定细节**页面,向下滚动到亚马逊 S3 部分的 **DAG 代码,点击**浏览 S3** 按钮到我们的数据桶。**
3.  In the **Choose S3 bucket** window, check the radio button next to the bucket called **abalone-data-<REGION>-<ACCOUNT ID>** and then click **Choose**.
    注意
    确保桶名中的 **<地区>** 和 **<账户 ID >** 与您的环境相匹配。
4.  点击**选择**将返回到**指定详细信息**页面。在该页面中,点击 **DAGs 文件夹**部分下的按钮**浏览 S3** 按钮,打开**选择 S3 DAG 路径**窗口。*图 8.2* 显示了该窗口的一个示例:
![Figure 8.2 – Choose DAG path in S3 window
]

图 8.2–在 S3 窗口中选择 DAG 路径
1.  从*图 8.2* 中可以看到,点击`airflow`文件夹将其打开。
2.  打开`airflow`文件夹后,选择`dags`文件夹旁边的单选按钮,如图*图 8.3* 所示:
![Figure 8.3 – Selecting the dags folder
]

图 8.3–选择 dags 文件夹
1.  从*图 8.3* 中可以看到,一旦选择了`dags`文件夹,点击**选择**按钮返回上一个屏幕。
2.  现在,点击`requirements.txt`文件。
3.  重复步骤 *5* 和 *6* ,但这次点击`requirements.txt`文件旁边的单选按钮,如图*图 8.4* 所示:
![Figure 8.4 – Selecting the requirements.txt file
]

图 8.4–选择 requirements.txt 文件
1.  如您从*图 8.4* 中所见,一旦`requirements.txt`文件被选中,点击**选择**按钮返回上一屏幕。
2.  在**指定详细信息**屏幕上,点击**下一步**按钮,将进入**配置高级设置**屏幕。
3.  在**配置高级设置**界面的**联网**部分下,点击**创建 MWAA VPC** 按钮,为 MWAA 创建一个专用的**虚拟私有云** ( **VPC** )。这将在新的浏览器选项卡中启动**快速创建堆栈**云形成控制台。
4.  保留所有字段的默认值,并点击**创建堆栈**按钮。构建这个堆栈大约需要 2 分钟,一旦状态注册为 **CREATE_COMPLETED** ,就返回到托管 MWAA 控制台的浏览器选项卡。
5.  在**联网**部分,点击刷新按钮,然后使用**选择 VPC** 下拉菜单,选择 VPC 名为 **MWAA-VPC** ,如图*图 8.5* 所示:
![Figure 8.5 – MWAA VPC
]

图 8.5-MWAA VPC
1.  正如您从*图 8.5* 中看到的,选择 **MWAA-VPC** 会自动用正确的私有子网填充**子网 1** 和**子网 2** 字段。
2.  向下滚动到**网络服务器访问**部分,点击**公共网络(无额外设置)**选项旁边的单选按钮。
3.  Leave the rest of the fields at their defaults and then click the **Next** button.
    注意
    请注意在**权限**部分自动创建的角色名称。我们将为此角色分配 AWS 服务的附加权限。
4.  Review the MWAA environment configuration and click the **Create environment** button.
    注意
    部署 MWAA 环境将产生额外的 AWS 使用成本,超过免费层的使用成本。关于 MWAA 定价的更多信息,请参考产品定价文档([https://AWS . Amazon . com/managed-workflows-for-Apache-air flow/pricing/](https://aws.amazon.com/managed-workflows-for-apache-airflow/pricing/))。
5.  在配置环境时,在新的浏览器选项卡中打开 IAM 控制台([https://console.aws.amazon.com/iamv2/home?#/home](https://console.aws.amazon.com/iamv2/home?#/home))。
6.  在 IAM 控制台的左侧导航面板中,单击**角色**。
7.  在**角色**仪表板中,向下滚动直到看到在 MWAA 设置期间创建的 IAM 角色,然后单击它。
8.  在角色**摘要**仪表盘中,点击**附加策略**按钮。
9.  使用**附加权限**屏幕中的搜索栏,搜索并选择 **AmazonS3FullAccess** 、 **AWSLambda_FullAccess** 、**amazonsmsmfullaccess**、**awsglueconsolefullacess**和**amazonsagemakerfull access**策略。
10.  点击**附加策略**按钮。
11.  **汇总**屏幕应类似于*图 8.6* :
![Figure 8.6 – MWAA Role Permissions
]

图 8.6-MWAA 角色权限
1.  As you can see from *Figure 8.6*, we have added the necessary access to the various AWS services that the Airflow DAG will be leveraging.
    注意
    不建议在生产环境中提供对必要的 AWS 服务的完全访问。为了简单起见,我们在示例的上下文中利用这些策略。
2.  在大约 20 到 30 分钟后,MWAA 环境应该**可用**。*图 8.7* 显示了**气流环境**屏幕应该是什么样子:
![Figure 8.7 – Airflow environments
]

图 8.7–气流环境

正如您从*图 8.7* 中看到的,MWAA 环境已经部署完毕,可供我们使用。现在我们有了所有的先决条件以及一个气流平台。在下一章中,我们将使用这个 MWAA 环境来创建一个自动化的、以数据为中心的 ML 过程。

## 总结

在本章中,我们介绍了一种在 AWS 上自动化 ML 工作流的新方法,即以数据为中心的方法。为了编排这个以数据为中心的工作流,我们引入了一个通常由数据工程团队使用的平台,称为 Apache Airflow,并展示了如何使用亚马逊 MWAA 构建这样一个环境。

在下一章中,我们将看到如何继续使用我们刚刚创建的环境,并创建一个 DAG 来自动化 ML 过程,以创建*年龄计算器*模型。*

## 文章列表
- [AWS自动化机器学习-一、AWS 上的自动化机器学习入门](https://www.oomspot.com/post/shangdezidonghuajiqixuexirumen)
- [AWS自动化机器学习-七、使用 AWS 步骤函数构建 ML 工作流](https://www.oomspot.com/post/uzhouhanshugoujianmlgongzuoliu)
- [AWS自动化机器学习-三、使用 AutoGluon 技术自动化复杂的模型开发](https://www.oomspot.com/post/ishuzidonghuafuzademoxingkaifa)
- [AWS自动化机器学习-九、使用 Amazon Managed Workflows 为 Apache AirFlow 构建 ML 工作流](https://www.oomspot.com/post/acheairflowgoujianmlgongzuoliu)
- [AWS自动化机器学习-二、使用 SageMaker 自动驾驶器自动化机器学习模型开发](https://www.oomspot.com/post/izidonghuajiqixueximoxingkaifa)
- [AWS自动化机器学习-五、自动化 ML 模型的持续部署](https://www.oomspot.com/post/uzidonghuamlmoxingdechixubushu)
- [AWS自动化机器学习-八、使用 Apache Airflow 实现机器学习过程的自动化](https://www.oomspot.com/post/anjiqixuexiguochengdezidonghua)
- [AWS自动化机器学习-六、使用 AWS 步骤函数自动化机器学习过程](https://www.oomspot.com/post/nshuzidonghuajiqixuexiguocheng)
- [AWS自动化机器学习-十、机器学习软件开发生命周期(MLSDLC)简介](https://www.oomspot.com/post/fashengmingzhouqimlsdlcjianjie)
- [AWS自动化机器学习-十一、MLSDLC 的持续集成、部署和训练](https://www.oomspot.com/post/lcdechixujichengbushuhexunlian)
- [AWS自动化机器学习-四、机器学习的持续集成和持续交(CI/CD)](https://www.oomspot.com/post/idechixujichenghechixujiaocicd)
- [AWS自动化机器学习-第一部分:自动机器学习过程和 AWS 上的 AutoML 的基础知识](https://www.oomspot.com/post/eawsshangdeautomldejichuzhishi)
- [AWS自动化机器学习-第三部分:优化以源代码为中心的自动化机器学习方法](https://www.oomspot.com/post/gxindezidonghuajiqixuexifangfa)
- [AWS自动化机器学习-第二部分:通过持续集成和持续交付(CI/CD)实现机器学习过程的自动化](https://www.oomspot.com/post/anjiqixuexiguochengdezidonghua)
- [AWS自动化机器学习-第五部分:自动化 AWS 上的端到端生产应用](https://www.oomspot.com/post/deduandaoduanshengchanyingyong)
- [AWS自动化机器学习-第四部分:优化以数据为中心的自动化机器学习方法](https://www.oomspot.com/post/gxindezidonghuajiqixuexifangfa)
- [AWS自动化机器学习-零、前言](https://www.oomspot.com/post/szidonghuajiqixuexilingqianyan)

更多推荐

更多
  • AWS自动化机器学习-十一、MLSDLC 的持续集成、部署和训练 技术要求,编纂持续集成阶段,管理持续部署阶段,管理持续训练,延伸,构建集成工件,构建测试工件,构建生产工件,自动化持续集成流程,回顾构建阶段,回顾测试阶段,审查部署和维护阶段,回顾应用用户体验,创建新的鲍鱼调查数据,回顾持续训练流程,清
    Apache CN

  • AWS自动化机器学习-六、使用 AWS 步骤函数自动化机器学习过程 技术要求,介绍 AWS 步骤功能,使用 Step 函数 Data Science SDK for CI/CD,建立 CI/CD 渠道资源,创建状态机,解决状态机的复杂性,更新开发环境,创建管道工件库,构建管道应用构件,部署 CI/CD
    Apache CN

  • AWS自动化机器学习-第三部分:优化以源代码为中心的自动化机器学习方法 本节将向您介绍整体 CI/CD 流程的局限性,以及如何将 ML 从业者的角色进一步整合到管道构建流程中。本节还将介绍这种角色集成如何简化自动化过程,并通过向您介绍 AWS Step 函数向您展示一种优化的方法。本节包括以下章节:
    Apache CN

  • AWS自动化机器学习-一、AWS 上的自动化机器学习入门 技术要求,洗钱流程概述,洗钱过程的复杂性,端到端 ML 流程示例,AWS 如何使 ML 开发和部署过程更容易自动化,介绍 ACME 渔业物流,ML 的情况,从数据中获得洞察力,建立正确的模型,训练模型,评估训练好的模型,探索可能的后续步
    Apache CN

  • AWS自动化机器学习-二、使用 SageMaker 自动驾驶器自动化机器学习模型开发 技术要求,介绍 AWS AI 和 ML 前景,SageMaker 自动驾驶器概述,利用 SageMaker 自动驾驶器克服自动化挑战,使用 SageMaker SDK 自动化 ML 实验,SageMaker Studio 入门,准备实验
    Apache CN

  • AWS自动化机器学习-四、机器学习的持续集成和持续交(CI/CD) 四、机器学习的持续集成和持续交CI/CD技术要求,介绍 CI/CD 方法,通过 CI/CD 实现 ML 自动化,在 AWS 上创建 CI/CD 管道,介绍 CI/CD 的 CI 部分,介绍 CI/CD 的 CD 部分,结束循环,采取以部
    Apache CN

  • AWS自动化机器学习-九、使用 Amazon Managed Workflows 为 Apache AirFlow 构建 ML 工作流 技术要求,开发以数据为中心的工作流程,创建合成鲍鱼调查数据,执行以数据为中心的工作流程,构建和单元测试数据 ETL 工件,构建气流 DAG,清理, 在前面的年龄计算器示例中,我们了解了如何通过 ML 从业者和开发人员团队之间的跨职能
    Apache CN

  • AWS自动化机器学习-七、使用 AWS 步骤函数构建 ML 工作流 技术要求,构建状态机工作流,执行集成测试,监控管道进度,设置服务权限,创建 ML 工作流程, 在本章中,我们将从第六章中的 [处继续,使用 AWS 步骤函数自动化机器学习过程。您将从那一章中回忆起,我们正在努力实现的主要目标是简化
    Apache CN

  • AWS自动化机器学习-八、使用 Apache Airflow 实现机器学习过程的自动化 技术要求,介绍阿帕奇气流,介绍亚马逊 MWAA,利用气流处理鲍鱼数据集,配置 MWAA 系统的先决条件,配置 MWAA 环境, 当建立一个 ML 模型时,有一个所有 ML 从业者都知道的基本原则;也就是说,最大似然模型只有在数据被训练时
    Apache CN

  • AWS自动化机器学习-五、自动化 ML 模型的持续部署 技术要求,部署 CI/CD 管道,构建 ML 模型工件,执行自动化 ML 模型部署,整理管道结构,创建 CDK 应用,部署管道应用,查看建模文件,审查申请文件,查看模型服务文件,查看容器构建文件,提交 ML 工件,清理, 在 [第 4
    Apache CN

  • 近期文章

    更多
    文章目录

      推荐作者

      更多