AWS自动化机器学习-九、使用 Amazon Managed Workflows 为 Apache AirFlow 构建 ML 工作流

作者: Apache CN

在前面的年龄计算器示例中,我们了解了如何通过 ML 从业者和开发人员团队之间的跨职能协作,为 ML 工作流自动化应用以源代码为中心的方法。在 [第 8 章] B17649_08_ePub.xhtml##_idTextAnchor115

因此,为了构建一个成功的以数据为中心的 ML 工作流,我们需要应用相同的方法在 ML 从业者和数据工程团队之间创建一个敏捷的、跨功能的协作。因此,在本章中,我们将继续我们在 [第 8 章] B17649_08_ePub.xhtml##_idTextAnchor115

因此,本章的主要动机是强调数据工程和 ML 实践者团队如何通过构建和执行负责这个以数据为中心的工作流的 Airflow DAG,在 Apache Airflow 上构建、执行和管理自动化的 ML 过程。在本章结束时,您将了解添加新的训练数据将如何触发自动化的端到端 ML 流程,并能够生成生产级年龄计算器模型。

为此,我们将讨论以下主题:

  • 开发以数据为中心的工作流
  • 创建合成鲍鱼调查数据
  • 执行以数据为中心的工作流

技术要求

对于本章,您将需要以下内容:

  • 网络浏览器(为了获得最佳体验,建议您使用 Chrome 或 Firefox。)
  • 访问您在 [第 8 章] B17649_08_ePub.xhtml##_idTextAnchor115
  • 访问我们在 [第八章] B17649_08_ePub.xhtml##_idTextAnchor115
  • 参考 AWS 免费层的使用限制,以避免超出不必要的成本。
  • 本章的源代码示例在本书的 GitHub 资源库中提供:https://GitHub . com/packt publishing/Automated-Machine-Learning-on-AWS/tree/main/chapter 09)。Cloud9 开发环境中应该已经有代码示例了。如果没有,请参考 [第四章] B17649_04_ePub.xhtml##_idTextAnchor061

开发以数据为中心的工作流程

在 [第八章] B17649_08_ePub.xhtml##_idTextAnchor115 ![Figure 9.1 – Workflow development process ]

图 9.1-工作流开发流程

正如您所看到的,数据工程团队必须开发组成整个过程的两个主要工件,如下所示:

  • 该单元测试了数据 ETL 工件
  • 该装置测试了气流 DAG

一旦数据工程团队创建并测试了负责合并和准备训练数据的 ETL 工件,他们就可以将它们与 ML 模型工件相结合,以创建代表以数据为中心的工作流的气流 DAG。在对这个气流 DAG 进行单元测试时,为了确保数据转换代码和 ML 模型代码成功集成,可以将结果工作流发布到产品中。

让我们从数据工程团队的角度开始构建 ETL 工件。

构建和单元测试数据 ETL 工件

在整个以数据为中心的工作流的上下文中,ETL 任务背后的主要目标是将任何新数据与现有的数据合并,以便生成的数据集可以进一步划分为单独的训练、验证和测试数据集。然而,当数据工程团队构建这项任务背后的代码时,他们需要记住,并不总是能够预先确定需要合并的新数据的准确数量。因此,在本节中,我们将为 ETL 任务创建代码工件,并通过使用 AWS Glue 作业将任务作为 Spark 脚本执行来确保任务的可伸缩性。将用于执行 ETL 任务的 AWS 胶合作业在第 8 章 、中被创建为 CDK 构造,使用 Apache Airflow 自动化机器学习过程。

小费

为了帮助数据工程团队创建和单元测试这个 Spark 脚本,AWS 提供了一个 Docker 容器,它捆绑了必要的库来构建和测试 Glue ETL 作业。AWS 已经在一篇名为使用容器在本地开发 AWS Glue ETL 作业的博客文章中发布了这些信息(https://AWS . Amazon . com/blogs/big-data/Developing-AWS-Glue-ETL-jobs-locally-using-a-container/)。如果你使用博文中提到的解决方案,建议你安装熊猫图书馆的版本 0.24.1 。这个版本的熊猫被要求直接复制一个 CSV 文件到 S3。

因此,为了开始构建 ETL 作业,我们将创建一个名为preprocess.py的 Python 脚本。这个脚本将从包含更新的鲍鱼数据的胶合目录中读取,并将其与原始鲍鱼数据集合并,提供 ML 模型所需的整体特征转换。

注意

由于本书的核心重点不是如何构造 Spark 脚本,preprocess.py文件的基础来自 AWS SageMaker GitHub 存储库(https://GitHub . com/AWS/Amazon-sage maker-examples/tree/master/advanced functionality/inference pipeline Spark ml xgboost _ 鲍鱼)。这个例子是在的 Apache 2.0 许可下授权的。我们将在这个例子的基础上进行构建,并针对我们的用例进行定制。

为了创建 ETL 脚本,我们将继续使用 Cloud9 环境。请遵循以下步骤:

  1. Using the navigation panel of the Cloud9 environment, navigate to the abalone-data-pipeline folder. 注意

_idTextAnchor115

  1. abalone-data-pipeline文件夹中,展开artifacts文件夹,然后展开airflow文件夹。右键点击scripts文件夹,选择preprocess.py,打开进行编辑。
  2. preprocess.py文件中,添加以下代码来导入必要的 PySpark 和 AWS 胶水库:
    import sys
    import os
    import boto3
    import pyspark
    import pandas as pd
    from functools import reduce
    from pyspark.sql import SparkSession, DataFrame
    from pyspark.ml import Pipeline
    from pyspark.sql.types import StructField, StructType, StringType, DoubleType
    from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import *
    from awsglue.job import Job
    from awsglue.transforms import *
    from awsglue.context import GlueContext
    from pyspark.context import SparkContext
    from awsglue.utils import getResolvedOptions
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.utils import getResolvedOptions
    
  3. 接下来,我们将构建一些实用函数来帮助处理数据。第一个实用函数叫做csv_line(),我们通过它提供来自 Spark弹性分布式数据集 ( RDD )的一行文本数据,并创建一个逗号分隔的字符串。这个字符串最终将被写入 S3 上的一个 CSV 文件:
    def csv_line(data):
        r = ','.join(str(d) for d in data[1])
        return str(data[0]) + "," + r
    
  4. The next function that we will create is called toS3(). This function extracts the relevant features from the dataset, including the target feature, calls the csv_line() function to create a comma-delimited string for each line, converts the dataset into a pandas DataFrame, and writes the DataFrame to S3:
    def toS3(df, path):
        rdd = df.rdd.map(lambda x: (x.rings, x.features))
        rdd_lines = rdd.map(csv_line)
        spark_df = rdd_lines.map(lambda x: str(x)).map(lambda s: s.split(",")).toDF()
        pd_df = spark_df.toPandas()
        pd_df = pd_df.drop(columns=["_3"])
        pd_df.to_csv(f"s3://", header=False, index=False)
    
    注意 使用 Spark 数据帧使我们能够通过将数据集分布在多个 Spark 节点上来克服 pandas 数据帧的内存限制。然而,当 Spark 数据帧被写入磁盘时,它会创建多个部分文件,这取决于 RDD 分区的数量。要创建单个 CSV 文件,我们必须将 Spark 数据帧转换为单个 pandas 数据帧,从而将数据集写入单个文件。如果单个 pandas 数据帧超过一定的内存限制,使用这种技术可能无法扩展。然而,由于示例数据集并不大,我们可以使用 pandas 来创建单个文件。
  5. Finally, we must create the main() function. Using the following code snippet, we can initialize the spark and glueContext() classes to wrap the SparkContext object:
    ...
    def main():
        glueContext = GlueContext(SparkContext.getOrCreate())
        spark = SparkSession.builder.appName("PySparkAbalone").getOrCreate()
        spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")
    ...
    
    注意 由于 AWS Glue 本质上是一个无服务器 Spark 处理作业,SparkContext表示到无服务器 Spark 集群的连接,它由 AWS 在后台创建和管理。关于SparkContext类的更多信息,请参考 Spark 文档(https://Spark . Apache . org/docs/latest/API/Java/org/Apache/Spark/Spark context . html)。
  6. 由于我们将把preprocess.py文件作为脚本,连同命令参数一起传递给 AWS Glue,下面的代码片段展示了我们如何使用getResolvedOptions()函数声明args变量。这是 AWS Glue 提供的一个实用函数,用于访问通过preprocess.py脚本传递的命令参数:
    ...
        args = getResolvedOptions(sys.argv, ["GLUE_CATALOG", "S3_BUCKET", "S3_INPUT_KEY_PREFIX", "S3_OUTPUT_KEY_PREFIX"])
    ...
    
  7. 要将鲍鱼数据作为 Spark 数据帧读入,我们必须为每种列数据类型提供适当的模式。在下面的代码片段中,我们声明了schema变量,它包含在数据集的每一列中找到的数据的类型或结构:
    ...
        schema = StructType(
            [
                StructField("sex", StringType(), True),
                StructField("length", DoubleType(), True),
                StructField("diameter", DoubleType(), True),
                StructField("height", DoubleType(), True),
                StructField("whole_weight", DoubleType(), True),
                StructField("shucked_weight", DoubleType(), True),
                StructField("viscera_weight", DoubleType(), True),
                StructField("shell_weight", DoubleType(), True),
                StructField("rings", DoubleType(), True)
            ]
        )
    ...
    
  8. 接下来,我们必须编写下面的代码来合并来自 Glue 目录的新数据,以及原始的鲍鱼数据集,以创建一个名为distinct_df的数据帧。这个数据帧是严格的,因为在合并过程之后,任何重复的行都被删除:
    ...
        columns = ["sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", "rings"]
        new = glueContext.create_dynamic_frame_from_catalog(database=args["GLUE_CATALOG"], table_name="new", transformation_ctx="new")
        new_df = new.toDF()
        new_df = new_df.toDF(*columns)
        raw_df = spark.read.csv(("s3://{}".format(os.path.join(args["S3_BUCKET"], args["S3_INPUT_KEY_PREFIX"]))), header=False, schema=schema)
        merged_df = reduce(DataFrame.unionAll, [raw_df, new_df])
        distinct_df = merged_df.distinct()
    ...
    
  9. 现在我们有了一个唯一的数据框架,我们可以设置 ETL 管道并开始转换数据集,为 ML 训练做准备。如下面的代码片段所示,ETL 过程的第一部分是使用StringIndexer()类将sex列作为训练特征进行索引。一旦sex特征被索引,我们就可以对该特征进行分类编码,从而通过使用OneHotEncoder()类:
    ...
        sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
        sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")
    ...
    
    为每个性别创建向量
  10. OneHotEncoder的输出是一组新的列,称为sex_vec,代表每个性别。下一步是使用VectorAssembler()类将sex_vec列与数据集的原始列合并。如下面的代码片段所示,在这里,我们必须实例化VectorAssembler并定义assembler变量:
    ...
        assembler = VectorAssembler(
            inputCols=[
                "sex_vec",
                "length",
                "diameter",
                "height",
                "whole_weight",
                "shucked_weight",
                "viscera_weight",
                "shell_weight"
            ],
            outputCol="features"
        )
    ...
    
  11. 如下面的代码片段所示,通过将sex_indexersex_encoderassembler组合成一个Pipeline,然后将其装配到distinct_df数据帧上,我们就有了一个预处理过的transformed_df数据帧,为模型训练做好了准备:
    ...
        pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
        model = pipeline.fit(distinct_df)
        transformed_df = model.transform(merged_df)
    ...
    
  12. 最后一步是将数据分成相对训练、验证和测试数据集。如下面的代码片段所示,我们必须调用toS3()函数将这些数据集作为 CSV 文件写入数据桶:
        (train_df, validation_df, test_df) = transformed_df.randomSplit([0.8, 0.15, 0.05])
        toS3(train_df, os.path.join(args["S3_BUCKET"], args["S3_OUTPUT_KEY_PREFIX"], "training/training.csv"))
        toS3(validation_df, os.path.join(args["S3_BUCKET"], args["S3_OUTPUT_KEY_PREFIX"], "training/validation.csv"))
        toS3(test_df, os.path.join(args["S3_BUCKET"], args["S3_OUTPUT_KEY_PREFIX"], "testing/testing.csv"))
    ...
    
  13. 这样就创建了主程序来执行数据预处理任务:
    ...
    if __name__ == "__main__":
        main()
    ...
    

这就完成了粘合作业的 ETL 脚本。在对脚本进行单元测试之后,数据工程师可以开始创建数据处理工作流本身,以气流 DAG 的形式。让我们开始构建这个 DAG。

构建气流 DAG

现在,数据工程师必须以 DAG 的形式为要执行的气流创建整体工作流。您会记得,气流 DAG 是由气流工人执行的一组连续任务或操作。为了简化创建这些连续操作的过程,Airflow 提供了几个预先打包的操作符。这些操作符实质上包含了工作流中每个任务的逻辑。由于我们将这些操作的实际执行卸载到 AWS 服务,如 Glue 和 SageMaker,AWS 为这些服务和许多其他服务提供了几个预构建的操作符(https://air flow . Apache . org/docs/Apache-air flow-providers-Amazon/stable/operators/index . html)。

然而,使用这些 AWS 提供者操作符需要数据工程师或 ML 从业者完全理解相关的任务操作符,以及 AWS 服务如何执行任务。为了简化 DAG 创建过程,我们将主要使用标准的PythonOperator()类(https://air flow . Apache . org/docs/Apache-air flow/stable/_ API/air flow/operators/python/index . html?highlight = python operator # air flow . operators . python . python operator调用 SageMaker 服务。这意味着数据工程师可以将 ML 实验笔记本中的 SageMaker SDK 代码复制并粘贴到工作流 DAG 中。这样做使得 ML 从业者和数据工程师更容易将 ML 过程集成到数据工作流中。正如您将看到的,在 DAG 中使用PythonOperator()类允许进行 AWS provider 操作者没有提供的进一步定制。

注意 AWS 团队提供了许多例子,展示了如何为 sage maker(https://sage maker . readthedocs . io/en/stable/workflows/air flow/index . html)利用 AWS 提供者操作符。然而,由于我们将使用PythonOperator()来构建 SageMaker 服务调用,我们将基于另一个 AWS 示例(https://github . com/AWS/Amazon-sage maker-examples/blob/master/sage maker-experiments/track-an-air flow-workflow/track-an-air flow-workflow . ipynb)来构建我们的解决方案。这个例子是在 Apache 2.0 许可下提供的。我们将以这个例子为基础,使我们的 DAG 类似于我们在第 7 章 、使用 AWS 步骤函数构建 ML 工作流中配置的 ML 工作流。您可以参考创建 ML 工作流程部分的图 7.1 查看 ML 工作流程。

要开始构建气流 DAG,请遵循以下步骤:

  1. 在您的 Cloud9 工作区中,右键单击dags文件夹并选择model
  2. 为了定义 Lambda 函数处理程序,我们可以重用我们在第 7 章[] B17649_07_ePub.xhtml##_idTextAnchor106
    model_training.py file, you can review the code in the *Creating the ML workflow* section of *Chapter 7*, *Building the ML Workflow Using AWS Step Functions*.
    
    接下来,右键单击dags文件夹并选择abalone_data_pipeline.py。* 双击文件上的进行编辑,并添加以下代码来导入基本 Python 库:
    import boto3
    import json
    from datetime import timedelta
    
    • Next, we must add the following SageMaker SDK libraries:
      import sagemaker
      from sagemaker.tensorflow import TensorFlow
      from sagemaker.tensorflow.serving import Model
      from sagemaker.processing import ProcessingInput, ProcessingOutput, Processor
      from sagemaker.model_monitor import DataCaptureConfig
      
      注意 这些是 ML 从业者用于 ML 实验笔记本的相同的 SageMaker SDK 库。
    • 现在,使用下面的代码,我们可以导入 AWS 提供者操作符,以及气流提供者操作符:
      import airflow
      from airflow import DAG
      from airflow.operators.python_operator import PythonOperator
      from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
      from airflow.providers.amazon.aws.operators.glue_crawler import AwsGlueCrawlerOperator
      from airflow.providers.amazon.aws.hooks.lambda_function import AwsLambdaHook
      from airflow.operators.python_operator import BranchPythonOperator
      from airflow.operators.dummy import DummyOperator
      
    • Next, we must use the following code to define our global variables, as well as get the stored parameters that we defined in the CDK application:
      sagemaker_seesion = sagemaker.Session()
      region_name = sagemaker_seesion.boto_region_name
      model_name = "abalone"
      data_prefix = "abalone_data"
      data_bucket = f"""{boto3.client("ssm", region_name=region_name).get_parameter(Name="AirflowDataBucket")["Parameter"]["Value"]}"""
      glue_job_name = f"""{boto3.client("ssm", region_name=region_name).get_parameter(Name="GlueJob")["Parameter"]["Value"]}"""
      crawler_name = f"""{boto3.client("ssm", region_name=region_name).get_parameter(Name="GlueCrawler")["Parameter"]["Value"]}"""
      sagemaker_role = f"""{boto3.client("ssm", region_name=region_name).get_parameter(Name="SageMakerRoleARN")["Parameter"]["Value"]}"""
      lambda_function = f"""{boto3.client("ssm", region_name=region_name).get_parameter(Name="AnalyzeResultsLambda")["Parameter"]["Value"]}"""
      container_image = f"763104351884.dkr.ecr.{region_name}.amazonaws.com/tensorflow-training:2.5.0-cpu-py37-ubuntu18.04-v1.0"
      training_input = f"s3:///{data_prefix}/training"
      testing_input = f"s3:///{data_prefix}/testing"
      data_capture = f"s3:///endpoint-data-capture"
      
      注意

_idTextAnchor106

*   我们必须定义的最终变量是气流 DAG 的`default_args`。在下面的代码中,我们指定了气流调度程序执行 DAG 所需的一些缺省值:
    default_args = {
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": airflow.utils.dates.days_ago(1),
        "retries": 0,
        "retry_delay": timedelta(minutes=2)
    }
*   因为我们使用`PythonOperator()`类与 SageMaker 服务接口,所以我们必须定义多个封装服务调用逻辑的函数。正如我们前面提到的,这些函数可以从 ML 实验笔记本中剪切和粘贴。例如,下面的代码创建了`training()`函数,它利用 SageMaker SDK 创建了一个`TensorFlow()`估计器,并调用`fit()`方法将模型训练为 SageMaker 训练作业:
    def training(data, **kwargs):
        estimator = TensorFlow(
            base_job_name=model_name,
            entry_point="/usr/local/airflow/dags/model/model_training.py",
            role=sagemaker_role,
            framework_version="2.4",
            py_version="py37",
            hyperparameters=,
            script_mode=True,
            instance_count=1,
            instance_type="ml.m5.xlarge",
        )
        estimator.fit(data)
        kwargs["ti"].xcom_push(
            key="TrainingJobName",
            value=str(estimator.latest_training_job.name)
        )

_idTextAnchor115) 、中创建的evaluate.py文件,使用 Apache Airflow 自动化机器学习过程,并评估训练好的模型在测试数据集上的推断。以下代码片段显示了如何实例化`evaluation(

    ...
    def evaluation(ds, **kwargs):
        training_job_name = kwargs["ti"].xcom_pull(key="TrainingJobName")
        estimator = TensorFlow.attach(training_job_name)
        model_data = estimator.model_data,
    ...
*   作为`evaluation()`函数的一部分,我们还必须定义处理器变量来初始化 SageMaker `Processor`类的一个实例。下面的代码片段显示了我们必须如何提供必要的参数来执行处理作业,即处理作业名称、处理容器映像的位置、要执行的处理脚本、SageMaker IAM 角色以及用于处理作业的计算资源的类型:
    ...
        processor = Processor(
            base_job_name=f"-evaluation",
            image_uri=container_image,
            entrypoint=[
                "python3",
                "/opt/ml/processing/input/code/evaluate.py"
            ],
            instance_count=1,
            instance_type="ml.m5.xlarge",
            role=sagemaker_role,
            max_runtime_in_seconds=1200
        )
    ...
*   下面的代码片段显示了如何调用`processor.run()`方法来执行我们在*步骤 11* 中定义的处理作业。为了运行处理作业,我们必须提供测试数据集(`testing_input`)的的 S3 位置,训练好的 ML 模型的 S3 位置(`model_data`,以及脚本的`evaluate.py`的 S3 位置:
    ...
        processor.run(
            inputs=[
                ProcessingInput(
                    source=testing_input,
                    destination="/opt/ml/processing/testing",
                    input_name="input"
                ),
                ProcessingInput(
                    source=model_data[0],
                    destination="/opt/ml/processing/model",
                    input_name="model"
                ),
                ProcessingInput(
                    source="s3://{}/airflow/scripts/evaluate.py".format(data_bucket),
                    destination="/opt/ml/processing/input/code",
                    input_name="code"
                )
            ],
    ...
*   随着在*步骤 12* 中定义`inputs`,下面的代码片段显示了如何将处理作业结果的 S3 位置定义为一个`output`参数:
    ...
            outputs=[
                ProcessingOutput(
                    source="/opt/ml/processing/evaluation",
                    destination="s3://{}/{}/evaluation".format(data_bucket, data_prefix),
                    output_name="evaluation"
                )
            ]
        )
*   现在我们有了训练和评估 ML 模型的函数,我们必须定义一个函数,通过在已训练的 TensorFlow 估计器上使用`deploy()`方法将已训练的模型部署为 SageMaker 托管的端点:
    def deploy_model(ds, **kwargs):
        training_job_name = kwargs["ti"].xcom_pull(key="TrainingJobName")
        estimator = TensorFlow.attach(training_job_name)
        model = Model(
            model_data=estimator.model_data,
            role=sagemaker_role,
            framework_version="2.4",
            sagemaker_session=sagemaker.Session()
        )
        model.deploy(
            initial_instance_count=2,
            instance_type="ml.m5.large",
            data_capture_config=DataCaptureConfig(
                enable_capture=True,
                sampling_percentage=100,
                destination_s3_uri=data_capture
            )
        )
*   之前,作为 CDK 应用的一部分,我们定义了一个 Lambda 函数来计算训练模型的**评估指标。在下面的代码中,我们利用 AWS provider 操作符来调用这个 Lambda 函数:
    def get_results(ds, **kwargs):
        hook = AwsLambdaHook(
            function_name=lambda_function,
            aws_conn_id="aws_default",
            invocation_type="RequestResponse",
            log_type="None",
            qualifier="$LATEST",
            config=None
        )
        request = hook.invoke_lambda(
            payload=json.dumps(
                {
                    "Bucket": data_bucket,
                    "Key": f"/evaluation/evaluation.json"
                }
            )
        )
        response = json.loads(request["Payload"].read().decode())
        kwargs["ti"].xcom_push(
            key="Results",
            value=response["Result"]
        )
    ``` ***   The last function we must create will take the RMSE score and compare it to the evaluation threshold to determine whether the trained model is considered production-grade. If the evaluation is `approved`, the model will be deployed as a SageMaker Hosted Endpoint. Alternatively, if the model is above the predefined threshold, the workflow will be categorized as `rejected`:
def branch(ds, **kwargs):
    result = kwargs["ti"].xcom_pull(key="Results")
    if result > 3.1:
        return "rejected"
    else:
        return "approved"
    注意

##_idTextAnchor106
    *   现在我们已经为工作流的每个步骤创建了处理逻辑,我们可以使用下面的代码来定义 DAG 本身。这里,我们使用`DAG()`类来初始化 DAG,提供工作流的名称和默认参数,并安排 DAG 在每天午夜自动执行:
with DAG(
    dag_id=f"-data-workflow",
    default_args=default_args,
    schedule_interval="@daily",
    concurrency=1,
    max_active_runs=1,
) as dag:
    *   DAG 执行的第一步是`crawler_task`。在这里,Airflow Scheduler 调用 AWS Glue Crawler 来读取新数据,推断数据模式,并将数据附加到 Glue 目录。在下面的代码中,我们使用 AWS 提供的`AwsGlueCrawlerOperator()` :
    crawler_task = AwsGlueCrawlerOperator(
        task_id="crawl_data",
        config=
    )
    来定义任务*   工作流程的第二步称为`etl_task`。在这个任务中,我们调用 AWS 提供的`AwsGlueJobOperator()`来运行我们在 CDK 应用中定义的粘合作业。您会记得,这项工作将初始的鲍鱼数据集与来自 Glue 目录的新数据合并,然后对其进行预处理以创建训练、验证和测试数据集:
    etl_task = AwsGlueJobOperator(
        task_id="preprocess_data",
        job_name=glue_job_name
    )
    *   既然数据集已经准备好并存储在数据桶中,我们可以使用`PythonOperator()`类来调用我们的`training()`函数。此任务提供预处理的训练数据的位置,并调用 SageMaker 使用张量流估计器运行训练作业:
    training_task = PythonOperator(
        task_id="training",
        python_callable=training,
        op_args=[training_input],
        provide_context=True,
        dag=dag
    )
    *   The next task in the workflow is `evaluation_task`. Here, we're using `PythonOperator()` to call the `evaluation()` function, whereby we instruct SageMaker to execute a Processing Job and test the trained model against the testing dataset:
    evaluation_task = PythonOperator(
        task_id="evaluate_model",
        python_callable=evaluation,
        provide_context=True,
        dag=dag
    )
    注意
    注意,在的`evaluation()`函数中,我们使用 Airflow `PythonOperator()`类来代替 AWS 为 SageMaker 提供的操作符。
    *   下一个任务是 Lambda 函数,它从测试数据集中确定模型评估结果。`analyze_results_task`使用`PythonOperator()`调用`get_results()` Python 函数。您将回忆起这个 Python 函数使`AnalyzeResults` Lambda 函数返回 RMSE 分数:
    analyze_results_task = PythonOperator(
        task_id="analyze_results",
        python_callable=get_results,
        provide_context=True,
        dag=dag
    )
    *   基于返回的 RMSE 结果,工作流程中的下一个任务是确定模型是否准备好生产。这里,我们使用`BranchPythonOperator()`类调用`branch()` Python 函数,并根据预先确定的阈值:
    check_threshold_task = BranchPythonOperator(
        task_id="check_threshold",
        python_callable=branch,
        provide_context=True,
        dag=dag
    )
    评估返回的结果*   如果模型评估结果低于阈值,工作流程将进入`deployment_task`。该任务调用`deploy_model()` Python 函数来创建 SageMaker 托管端点:
    deployment_task = PythonOperator(
        task_id="deploy_model",
        python_callable=deploy_model,
        provide_context=True,
        dag=dag
    )
    *   最后,我们必须通过使用`DummyOperator()`为工作流中的`start`、`end`、`rejected`和`approved`状态创建占位符来创建占位符任务:
    start_task = DummyOperator(
        task_id="start",
        dag=dag
    )
    end_task = DummyOperator(
        task_id="end",
        dag=dag
    )
    rejected_task = DummyOperator(
        task_id="rejected",
        dag=dag
    )
    approved_task = DummyOperator(
        task_id="approved",
        dag=dag
    )
    *   现在已经定义了工作流的各种任务,我们必须创建 DAG 的整体流程。在下面的代码中,我们定义了每个特定任务之间的依赖关系:
    start_task >> crawler_task >> etl_task >> training_task >> evaluation_task >> analyze_results_task >> check_threshold_task >> [rejected_task, approved_task]
    approved_task >> deployment_task >> end_task
    rejected_task >> end_task
    *   The workflow DAG is now complete. Now, we must save the file and run the following commands in the Cloud9 Terminal window to check the code into the repository:
$ git add -A
$ git commit -m "Initial commit of workflow DAG"
$ git push
    注意
    在将 DAG 推送到 CodeCommit 存储库之前,数据工程师可能想要执行本地单元测试,以确保 DAG 在被 MWAA 导入之前完全起作用。AWS 提供了一个名为**AWS-mwaa-local-runner**([https://github.com/aws/aws-mwaa-local-runner](https://github.com/aws/aws-mwaa-local-runner))的命令行接口实用程序,它使用 Docker 容器在本地再现了一个 mwaa 环境。通过使用这个实用程序,数据工程师不仅可以对 DAG 进行单元测试,还可以验证 Python 依赖项是否可以在 MWAA 上工作([https://docs . AWS . Amazon . com/mwaa/latest/user guide/working-DAGs-dependencies . html](https://docs.aws.amazon.com/mwaa/latest/userguide/working-dags-dependencies.html))。*** 
 ***既然我们已经创建了工作流 DAG 及其相关的工件,我们必须将更改提交到 CodeCommit 存储库中。这将导致构建将这些文件部署到数据桶中。大约 5 分钟后,MWAA 计划程序将导入 DAG。现在,您可以在 MWAA 网络用户界面中查看 DAG。以下步骤将带您了解如何访问 MWAA 网络用户界面:
1.  打开 MWAA 控制台([https://console.aws.amazon.com/mwaa/home](https://console.aws.amazon.com/mwaa/home),选择你的 MWAA 环境,名为 **MyAirflowEnvironment** 。
2.  点击**打开气流界面**链接,打开 MWAA 网页界面。
3.  一旦 UI 在新的浏览器选项卡中打开,您最终应该会看到**鲍鱼数据管道** DAG。以下截图显示了 web UI 中新导入的鲍鱼数据工作流 DAG 的示例:
![Figure 9.2 – abalone-data-workflow DAG
]

图 9.2–鲍鱼-数据-工作流 DAG
1.  Click on the DAG to open it.
    注意
    暂时不要启用 DAG,因为我们还没有为成功执行工作流提供任何新数据。
2.  单击**图表视图**链接,以图表形式查看 DAG。下图以图表形式显示了整个工作流程:
![Figure 9.3 – Workflow graph
]

图 9.3-工作流程图

注意,这个以数据为中心的工作流类似于我们在第七章 、*中创建的 ML 工作流,除了 Glue Crawler 和可伸缩的 Glue ETL 作业。但是,在看到工作流之前,我们需要模拟添加新鲍鱼调查数据的过程。让我们开始吧。*

## 创建合成鲍鱼调查数据

在前面的部分中,我们从数据工程团队的角度创建了成功实现以数据为中心的工作流所需的两个主要工件,第一个是 ETL 工件,它将原始鲍鱼数据与新数据合并,以创建训练、验证和测试数据集。我们还将这些 ETL 工件集成到一个以数据为中心的工作流中,以 Airflow DAG 工件的形式,来自动化 ML 过程,由此我们可以训练、评估和部署生产级*年龄计算器*模型。

您可能还记得第八章[] B17649_08_ePub.xhtml##_idTextAnchor115
 *因此,在执行以数据为中心的工作流之前,我们必须解决下一步问题。下图说明了我们将在本节中处理的下一步,即模拟新鲍鱼调查数据,以进一步优化 ML 模型:
![Figure 9.4 – Simulating new Abalone survey data
]

图 9.4–模拟新的鲍鱼调查数据

因为在我们的例子中,鲍鱼调查公司是一个虚构的实体,所以我们必须以某种方式获得新的鲍鱼数据;由于 T2 没有新的数据来源,我们将不得不综合一些。幸运的是,在麻省理工学院**的**数据给 AI 小组**([https://dai.lids.mit.edu/](https://dai.lids.mit.edu/)),已经开源了一个名为**CTGAN**(https://github.com/sdv-dev/CTGAN)的项目来帮助我们合成新的鲍鱼数据。**

注意
CTGAN 项目在 MIT 许可下可用(https://github . com/sdv-dev/CTGAN/blob/master/License)。
CTGAN 使用基于深度学习的**合成数据生成器**,本质上是一个条件生成对抗网络模型,从数据中学习并*预测*新的数据集。以下步骤将向您展示如何使用 SageMaker Studio UI 利用 CTGAN 合成新的鲍鱼数据:
1.  打开 SageMaker 管理控制台([https://console.aws.amazon.com/sagemaker/home](https://console.aws.amazon.com/sagemaker/home),然后点击左侧导航面板中的 **SageMaker 域**选项。
2.  Once the **SageMaker Domain** dashboard opens, click on the **Launch app** drop-down box and select the **Studio** option to open the Studio IDE.
    注意

##_idTextAnchor032
3.  从的`root`文件夹中的`Untitled.ipynb`。因为我们使用这个笔记本来合成新的鲍鱼调查数据,所以可以在 SageMaker Studio 环境中的任何文件夹中创建它。
4.  When prompted, select the **Python 3 (data Science)** kernel and click the **Select** button.
    注意
    建议您使用一个**ml . M5 . 4x large(16 vcpu+64mb)**实例类型。但是,这将产生额外的 AWS 使用成本。此外,在本书的 GitHub 存储库中还提供了一个 Jupyter 笔记本示例([https://GitHub . com/packt publishing/Automated-Machine-Learning-on-AWS/blob/main/chapter 09/Notebook/Simulating % 20 new % 20 鲍鱼%20Survey%20Data.ipynb](https://github.com/PacktPublishing/Automated-Machine-Learning-on-AWS/blob/main/Chapter09/Notebook/Simulating%20New%20Abalone%20Survey%20Data.ipynb) )。
5.  内核启动后,在代码单元中使用以下代码安装 CTGAN 库:
%%capture
!pip install ctgan
6.  接下来,导入必要的 Python 库和全局变量:
import io
import boto3
import warnings
import pandas as pd
from time import gmtime, strftime
warnings.filterwarnings("ignore")
s3 = boto3.client("s3")
model_name = "abalone"
column_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
    "rings"
]
7.  在下一个代码单元中,添加以下代码以打开原始(或 *raw* )鲍鱼数据集,该数据集是在我们部署 CDK 应用时上传到 S3 的,并定义新鲍鱼数据文件的名称。新数据文件将包含附加到文件名的当前日期和时间,使其成为唯一的调查:
data_bucket = f"""{boto3.client("ssm").get_parameter(Name="AirflowDataBucket")["Parameter"]["Value"]}"""
raw_data_key = f"_data/raw/abalone.data"
new_data_key = f"{model_name}_data/new/abalone.{strftime('%Y%m%d%H%M%S', gmtime())}"
s3_object = s3.get_object(Bucket=data_bucket, Key=raw_data_key)
raw_df = pd.read_csv(io.BytesIO(s3_object["Body"].read()), encoding="utf8", names=column_names)
8.  现在,添加下面的代码来拟合 CTGAN 模型到原始数据,指定`sex`特性为分类值:
from ctgan import CTGANSynthesizer
ctgan = CTGANSynthesizer()
ctgan.fit(raw_df, ["sex"])
9.  要生成 100 个合成调查数据样本,请将以下代码添加到新的代码单元格:
samples = ctgan.sample(100)
10.  现在我们已经将鲍鱼数据合成为`samples`变量,我们可以使用下面的代码将它复制到 S3 数据桶:
samples.to_csv(f"s3://{data_bucket}/{new_data_key}", header=False, index=False)
随着新鲍鱼调查数据的合成和上传到 S3,我们可以执行以数据为中心的工作流程。我们将在下一节学习如何做到这一点。

## 执行以数据为中心的工作流程

在前面的部分,我们成功地生成了新的鲍鱼调查数据。因此,现在该数据集存储在 S3 上,本节将带您了解如何执行和发布以数据为中心的工作流,以创建在新数据集和原始数据集上都已优化的生产级 ML 模型。

与第七章*中的 [*示例一样,使用 AWS 步骤函数*构建 ML 工作流,我们可以将此执行和工作流的任何预定执行视为发布变更。下图显示了我们在气流 DAG 中定义的工作流执行的概述:] B17649_07_ePub.xhtml##_idTextAnchor106
![Figure 9.5 – Overview of the workflow's execution
]

图 9.5-工作流执行概述

正如您所看到的,一旦我们有了新数据并且计划启动,Airflow DAG 将执行 CI 阶段,更新鲍鱼数据集,训练新的 ML 模型,并评估已训练模型的性能。

一旦模型被自动批准为生产级模型,它将在 CD 阶段部署到生产中。然后,运营团队可以获得托管模型的所有权,以管理并持续监控其生产性能。

此 CI/CD 流程将根据 DAG 计划在每晚午夜执行,以确保生产模型在接受新调查数据的训练时不断得到优化。

要了解这一点,请执行以下步骤来执行工作流发布:
1.  使用 Airflow web UI,单击**鲍鱼数据工作流** DAG 旁边的切换按钮来启用它。
2.  启用 DAG 后,工作流将自动启动。单击 DAG 以查看其执行情况。
3.  使用**树形视图**或**图形视图**链接,您可以查看正在执行的 DAG 的每个任务。下图显示了完成的工作流执行图的外观:
![Figure 9.6 – Completed workflow execution graph
]

图 9.6-完成的工作流执行图
1.  单击任何任务都将允许您查看其任务配置,更重要的是,查看来自工作节点的日志输出。点击一个任务打开**任务实例**窗口,然后点击**日志**按钮打开工人日志:
![Figure 9.7 – Task Instance window
]

图 9.7–任务实例窗口

注意

由于 SageMaker 任务使用`PythonOperator()`,日志的输出显示了 SageMaker CloudWatch 日志的重定向。这是使用 SageMaker SDK 和`PythonOperator()`进行 SageMaker 执行调用的另一个原因,而不是 AWS 提供的 SageMaker 操作符,因为这些需要您在 CloudWatch 中查看日志输出,而不是在 Airflow web UI 中。
1.  要查看评估 RMSE 得分,请点击`approved`或`rejected`进行生产:
![Figure 9.8 – Example RMSE
]

图 9.8–RMSE 示例

使用前面的步骤,我们已经创建并执行了一个以数据为中心的自动化 ML 工作流,该工作流也将每天执行。如果上传新的调查数据,模型将在原始数据集以及新的调查数据上进行训练,希望使其更加稳健。

但是,重要的是要认识到,尽管工作流每天执行一次,但我们已经部署了仅在计划执行期间使用的基础架构资源。这意味着 MWAA 工作节点在不使用时处于空闲状态,从而消耗了 AWS 可计费资源。为了抵消未使用资源的超支,我们可能需要检查 MWAA 环境的最小和最大工作人员数量,并进行相应的调整。

在下一节中,您将学习如何通过删除这些不同的资源来限制这个示例的 AWS 成本。

## 清理

按照这些步骤删除我们已经部署的各种资源:
1.  打开 SageMaker 控制台(https://console.aws.amazon.com/sagemaker/home),使用左侧导航菜单,选择**推理**,然后选择**端点**选项。
2.  通过选择每个**名称**旁边的单选按钮并点击**动作**下拉菜单,然后点击**删除**选项,删除任何**端点**。
3.  对任何**端点配置**和任何**模型**重复此程序。
4.  Open the MWAA console (https://console.aws.amazon.com/mwaa/home) and select your environment. Click the **Actions** dropdown and select the **Delete** option to delete the MWAA environment.
    注意
    等待 MWAA 环境被删除,然后继续下一步。
5.  打开云编队控制台([https://console.aws.amazon.com/cloudformation/home](https://console.aws.amazon.com/cloudformation/home))并通过勾选堆栈旁边的单选按钮选择 **MWAA-VPC 堆栈**。一旦选中,点击**删除**按钮。
6.  对**鲍鱼数据管道**堆栈重复相同的程序。

至此,我们已经成功删除了在本章和第八章*中部署的各种 AWS 资源,使用 Apache Airflow* 自动化机器学习过程。

## 总结

在这一章中,我们扩展了上一章介绍的以数据为中心的方法,使用 Apache Airflow 来自动化 ML 工作流。为此,我们学习了如何构建负责将现有数据集与新数据合并的工件,以优化年龄计算器模型。我们还学习了如何使用 CTGAN 数据生成器来合成这个新的调查数据。一旦新的调查数据上传到 S3,我们就学会了如何构建并执行负责以数据为中心的工作流的气流 DAG。

通过这个实际操作的例子,我们了解了平台、数据工程团队和 ML 实践者如何合作来创建一个以数据为中心的 ML 自动化方法。我们还了解了 AWS 如何通过实现亚马逊 MWAA 环境来简化 Apache Airflow 环境的部署、管理和维护,并随后使用该环境创建了生产级*年龄计算器*模型。

在下一章中,我们将应用我们在本章和上一章中所学的知识,了解以数据为中心的方法如何进一步增强 CI/CD 方法,以包括**持续训练** ( **CT** ),这是 ML 自动化的一个额外阶段。****

## 文章列表
- [AWS自动化机器学习-一、AWS 上的自动化机器学习入门](https://www.oomspot.com/post/shangdezidonghuajiqixuexirumen)
- [AWS自动化机器学习-七、使用 AWS 步骤函数构建 ML 工作流](https://www.oomspot.com/post/uzhouhanshugoujianmlgongzuoliu)
- [AWS自动化机器学习-三、使用 AutoGluon 技术自动化复杂的模型开发](https://www.oomspot.com/post/ishuzidonghuafuzademoxingkaifa)
- [AWS自动化机器学习-九、使用 Amazon Managed Workflows 为 Apache AirFlow 构建 ML 工作流](https://www.oomspot.com/post/acheairflowgoujianmlgongzuoliu)
- [AWS自动化机器学习-二、使用 SageMaker 自动驾驶器自动化机器学习模型开发](https://www.oomspot.com/post/izidonghuajiqixueximoxingkaifa)
- [AWS自动化机器学习-五、自动化 ML 模型的持续部署](https://www.oomspot.com/post/uzidonghuamlmoxingdechixubushu)
- [AWS自动化机器学习-八、使用 Apache Airflow 实现机器学习过程的自动化](https://www.oomspot.com/post/anjiqixuexiguochengdezidonghua)
- [AWS自动化机器学习-六、使用 AWS 步骤函数自动化机器学习过程](https://www.oomspot.com/post/nshuzidonghuajiqixuexiguocheng)
- [AWS自动化机器学习-十、机器学习软件开发生命周期(MLSDLC)简介](https://www.oomspot.com/post/fashengmingzhouqimlsdlcjianjie)
- [AWS自动化机器学习-十一、MLSDLC 的持续集成、部署和训练](https://www.oomspot.com/post/lcdechixujichengbushuhexunlian)
- [AWS自动化机器学习-四、机器学习的持续集成和持续交(CI/CD)](https://www.oomspot.com/post/idechixujichenghechixujiaocicd)
- [AWS自动化机器学习-第一部分:自动机器学习过程和 AWS 上的 AutoML 的基础知识](https://www.oomspot.com/post/eawsshangdeautomldejichuzhishi)
- [AWS自动化机器学习-第三部分:优化以源代码为中心的自动化机器学习方法](https://www.oomspot.com/post/gxindezidonghuajiqixuexifangfa)
- [AWS自动化机器学习-第二部分:通过持续集成和持续交付(CI/CD)实现机器学习过程的自动化](https://www.oomspot.com/post/anjiqixuexiguochengdezidonghua)
- [AWS自动化机器学习-第五部分:自动化 AWS 上的端到端生产应用](https://www.oomspot.com/post/deduandaoduanshengchanyingyong)
- [AWS自动化机器学习-第四部分:优化以数据为中心的自动化机器学习方法](https://www.oomspot.com/post/gxindezidonghuajiqixuexifangfa)
- [AWS自动化机器学习-零、前言](https://www.oomspot.com/post/szidonghuajiqixuexilingqianyan)

更多推荐

更多
  • AWS自动化机器学习-十一、MLSDLC 的持续集成、部署和训练 技术要求,编纂持续集成阶段,管理持续部署阶段,管理持续训练,延伸,构建集成工件,构建测试工件,构建生产工件,自动化持续集成流程,回顾构建阶段,回顾测试阶段,审查部署和维护阶段,回顾应用用户体验,创建新的鲍鱼调查数据,回顾持续训练流程,清
    Apache CN

  • AWS自动化机器学习-六、使用 AWS 步骤函数自动化机器学习过程 技术要求,介绍 AWS 步骤功能,使用 Step 函数 Data Science SDK for CI/CD,建立 CI/CD 渠道资源,创建状态机,解决状态机的复杂性,更新开发环境,创建管道工件库,构建管道应用构件,部署 CI/CD
    Apache CN

  • AWS自动化机器学习-第三部分:优化以源代码为中心的自动化机器学习方法 本节将向您介绍整体 CI/CD 流程的局限性,以及如何将 ML 从业者的角色进一步整合到管道构建流程中。本节还将介绍这种角色集成如何简化自动化过程,并通过向您介绍 AWS Step 函数向您展示一种优化的方法。本节包括以下章节:
    Apache CN

  • AWS自动化机器学习-一、AWS 上的自动化机器学习入门 技术要求,洗钱流程概述,洗钱过程的复杂性,端到端 ML 流程示例,AWS 如何使 ML 开发和部署过程更容易自动化,介绍 ACME 渔业物流,ML 的情况,从数据中获得洞察力,建立正确的模型,训练模型,评估训练好的模型,探索可能的后续步
    Apache CN

  • AWS自动化机器学习-二、使用 SageMaker 自动驾驶器自动化机器学习模型开发 技术要求,介绍 AWS AI 和 ML 前景,SageMaker 自动驾驶器概述,利用 SageMaker 自动驾驶器克服自动化挑战,使用 SageMaker SDK 自动化 ML 实验,SageMaker Studio 入门,准备实验
    Apache CN

  • AWS自动化机器学习-四、机器学习的持续集成和持续交(CI/CD) 四、机器学习的持续集成和持续交CI/CD技术要求,介绍 CI/CD 方法,通过 CI/CD 实现 ML 自动化,在 AWS 上创建 CI/CD 管道,介绍 CI/CD 的 CI 部分,介绍 CI/CD 的 CD 部分,结束循环,采取以部
    Apache CN

  • AWS自动化机器学习-九、使用 Amazon Managed Workflows 为 Apache AirFlow 构建 ML 工作流 技术要求,开发以数据为中心的工作流程,创建合成鲍鱼调查数据,执行以数据为中心的工作流程,构建和单元测试数据 ETL 工件,构建气流 DAG,清理, 在前面的年龄计算器示例中,我们了解了如何通过 ML 从业者和开发人员团队之间的跨职能
    Apache CN

  • AWS自动化机器学习-七、使用 AWS 步骤函数构建 ML 工作流 技术要求,构建状态机工作流,执行集成测试,监控管道进度,设置服务权限,创建 ML 工作流程, 在本章中,我们将从第六章中的 [处继续,使用 AWS 步骤函数自动化机器学习过程。您将从那一章中回忆起,我们正在努力实现的主要目标是简化
    Apache CN

  • AWS自动化机器学习-八、使用 Apache Airflow 实现机器学习过程的自动化 技术要求,介绍阿帕奇气流,介绍亚马逊 MWAA,利用气流处理鲍鱼数据集,配置 MWAA 系统的先决条件,配置 MWAA 环境, 当建立一个 ML 模型时,有一个所有 ML 从业者都知道的基本原则;也就是说,最大似然模型只有在数据被训练时
    Apache CN

  • AWS自动化机器学习-五、自动化 ML 模型的持续部署 技术要求,部署 CI/CD 管道,构建 ML 模型工件,执行自动化 ML 模型部署,整理管道结构,创建 CDK 应用,部署管道应用,查看建模文件,审查申请文件,查看模型服务文件,查看容器构建文件,提交 ML 工件,清理, 在 [第 4
    Apache CN

  • 近期文章

    更多
    文章目录

      推荐作者

      更多