AWS自动化机器学习-七、使用 AWS 步骤函数构建 ML 工作流

作者: Apache CN

在本章中,我们将从第六章中的 [处继续,使用 AWS 步骤函数*自动化机器学习过程。您将从那一章中回忆起,我们正在努力实现的主要目标是简化最初在第四章]过程。

因此,在本章中,我们将创建一个处理过程,该过程创建训练和测试数据集,训练一个 ML 模型,然后评估该模型的预测质量,评估它是否可以部署到生产中。正如您将看到的,自动化流程将被编码为 CI/CD 管道工件,使用Amazon Web Services(AWS)Step Functions Data ScienceSoftware Development Kit(SDK)for Python,并从 ML 从业者的角度进行开发,不需要提升开发团队成员的专业领域外的能力。

一旦我们将 E2E ML 流程编码为 AWS Step Functions 状态机,我们将通过将 ML 从业者的建模和工作流资产集成到之前构建的 CI/CD 管道中,继续自动化年龄计算器用例。为此,我们将在本章中讨论以下主题:

  • 构建状态机工作流
  • 执行集成测试
  • 监控管道的进度

技术要求

要理解本章中的代码示例,您需要以下内容:

  • Web 浏览器(为了获得最佳体验,建议您使用 Chrome 或 Firefox 浏览器)。
  • 访问我们在 [第 6 章] B17649_06_ePub.xhtml##_idTextAnchor094
  • 访问我们在 [第六章] B17649_06_ePub.xhtml##_idTextAnchor094
  • 我们将再次在 AWS 免费层的使用限制内工作,以避免产生不必要的成本。
  • 访问我们在 [第二章] B17649_02_ePub.xhtml##_idTextAnchor032
  • 本章的配套 GitHub 资源库 B17649_04_ePub.xhtml##_idTextAnchor061

构建状态机工作流

从 [第六章] B17649_06_ePub.xhtml##_idTextAnchor094

因此,在整个流程的这个阶段,ML 从业者必须使用 AWS Step Functions Data Science SDK for Python 创建这些源工件来构建 ML 工作流。因此,我们将视角切换到 ML 从业者的视角,并使用 SageMaker Studio 用户界面 ( UI )构建这些源工件。

设置服务权限

在我们开始定义 Jupyter 笔记本中的状态机工作流之前,我们需要为 SageMaker 执行角色分配额外的访问权限,以适应 Data Science SDK。根据 SDK 文档(https://AWS-Step-Functions-data-science-SDK . readthe docs . io/en/stable/readme link . html ## AWS-permissions),在 SageMaker Studio 中使用 SDK 不需要 Step Functions 服务所需权限之外的任何额外的身份和访问管理 ( IAM )权限。例如,如果我们要在工作流中使用 AWS Lambda 函数,我们需要向 SageMaker 执行角色添加 AWS Lambda 权限。

然而,您可能还记得在 [第二章] B17649_02_ePub.xhtml##_idTextAnchor032

创建 ML 工作流程

由于 Data Science SDK 主要是为了在 Jupyter 笔记本中执行而创建的,因此我们将使用以下步骤将 ML 流程编写成笔记本:

  1. 使用亚马逊 SageMaker 管理控制台(【https://console.aws.amazon.com/sagemaker/home】T2,点击左侧导航面板中的 SageMaker 域选项,打开 SageMaker 域仪表盘。
  2. 在左侧 Studio 导航面板中的abalone-cicd-pipeline文件夹内,这意味着我们在 [第五章] B17649_05_ePub.xhtml#idTextAnchor078)中克隆的存储库连续部署的一个生产 ML 模型还没有被删除。由于我们在本章中使用了相同的存储库名称,只需右键单击abalone-cicd-pipeline文件夹,然后单击删除,这样我们就可以添加我们在 [第 6 章](B17649_06_ePub.xhtml#idTextAnchor094
  3. 启动 Studio UI 后,点击左侧边栏中 Git 图标上的,然后点击 Clone a Repository 按钮,如下图所示: ![Figure 7.1 – Clone a Repository ]

图 7.1–克隆存储库

  1. https://git-codecommit.<AWS Region>.amazonaws.com/v1/repos/abalone-cicd-pipeline中,其中<AWS Region>是您正在使用的当前区域。例如,如果您使用us-west-2作为当前地区,那么 URL 将是https://git-codecommit.us-west-2.amazonaws.com/v1/repos/abalone-cicd-pipeline
  2. 单击克隆按钮克隆管道存储库。
  3. 一旦库被克隆,在 Studio 导航面板中双击文件夹打开abalone-cicd-pipeline文件夹。
  4. 现在,再次点击 Git 图标,打开作为当前存储库的文件夹。
  5. 点击当前分支旁边的下拉箭头,点击新建分支按钮,如下图所示: ![Figure 7.2 – New Branch ]

图 7.2–新分支

  1. 在中把model作为新的分支机构名称,点击创建分支机构,如下截图所示: ![Figure 7.3 – Create a Branch dialog ]

图 7.3–创建分支对话框

注意

关于在 Studio UI 中克隆 Git 存储库的更多信息,请参见 SageMaker 文档(https://docs . AWS . Amazon . com/SageMaker/latest/DG/Studio-tasks-Git . html)。

  1. 现在,回到abalone-cicd-pipeline文件夹,删除现有内容。
  2. 在文件夹内点击右键,选择notebook,双击打开。
  3. 点击文件菜单,然后点击新建菜单选项,再点击笔记本,创建一个新的 Jupyter 笔记本。
  4. 出现提示时,确保选择 Python 3(数据科学)内核,点击选择按钮。
  5. Once the new notebook has been launched and the kernel has been started, we can go ahead and create our workflow, using the subsequent code. 注意 在配套的 GitHub 资源库(https://GitHub . com/packt publishing/Automated-Machine-Learning-on-AWS/blob/main/chapter 07/Notebook/鲍鱼% 20 step % 20 functions % 20 workflow % 20 example . ipynb)中提供了笔记本的示例。由于以下代码示例显示了完整的 Jupyter 笔记本单元格,以及部分代码片段,因此建议您将此示例笔记本用作参考。
  6. In the first code cell, enter the following code to install the necessary Data Science SDK and SageMaker SDK:
    %%capture
    !pip install stepfunctions==2.2.0 sagemaker==2.49.1
    
    注意 我们对stepfunctionssagemaker库的版本进行了硬编码,因为它们已经过测试,可以在示例的上下文中工作。
  7. 您可能还记得第六章 、中的构建管道应用工件部分,使用 AWS 步骤函数自动化机器学习过程,我们将 ML 工作流编排构建为一个名为workflow_build的代码构建项目。现在,我们将通过使用以下代码实例化 CodeBuild 环境变量来创建将在 CodeBuild 环境中执行的运行时流程。当我们稍后在笔记本中开始使用这些环境变量时,它们将变得有意义;然而,从下面的代码片段中可以看到,我们正在调用系统管理器参数存储 ( SSM )来获取简单存储服务 ( S3 )桶变量,该变量是在 CDK 应用
    import os
    import boto3
    os.environ["MODEL_NAME"] = "abalone"
    os.environ["PIPELINE_NAME"] = "abalone-cicd-pipeline"
    os.environ["BUCKET_NAME"] = f"""{boto3.client("ssm").get_parameter(Name="PipelineBucketName")["Parameter"]["Value"]}"""
    os.environ["DATA_PREFIX"] = "abalone_data"
    os.environ["EPOCHS"] = "200"
    os.environ["BATCH_SIZE"] = "8"
    os.environ["THRESHOLD"] = "2.1"
    
    中定义的
  8. In the next cell, we are going to build a custom cell magic. In previous examples, we've used a %%writefile magic to capture the cell contents to a file. However, the %%writefile magic does not execute the cell contents. Since we are building and testing a workflow creation script, the following code will create a custom magic called %%custom_writefile, whereby we capture the cell contents to a file, as well as run the contents:
    from IPython.core.magic import register_cell_magic
    @register_cell_magic
    def custom_writefile(line, cell):
        print("Writing ".format(line.split()[0]))
        with open(line.split()[0], "a") as f:
            f.write(cell)
        print("Running Cell")
        get_ipython().run_cell(cell)
    
    注意

    defining-custom-magics](https://ipython.readthedocs.io/en/stable/config/custommagics.html#defining-custom-magics))。使用这种方法的唯一缺点是,如果我们在一个特定的单元格中犯了编码错误,我们必须删除%%custom_writefile创建的任何文件,并从笔记本的开头重新开始。

  9. 接下来,我们创建一个名为workflow的文件夹,其中定义了一个构建脚本来创建工作流。在新的单元格中输入并执行以下代码:
    !mkdir ../workflow
    
  10. 现在,我们可以开始构建我们的主要脚本,称为main.py。在一个新的代码单元中,添加以下代码,开始导入和捕获各种 Python 库来构建工作流:
    %%custom_writefile ../workflow/main.py
    import io
    import os
    import random
    import time
    import uuid
    import boto3
    import botocore
    import zipfile
    import json
    from time import gmtime, strftime, sleep
    from botocore.exceptions import ClientError
    
  11. 接下来,我们加载数据科学 SDK 所需的库,如下:
    %%custom_writefile ../workflow/main.py
    import stepfunctions
    from stepfunctions import steps
    from stepfunctions.inputs import ExecutionInput
    from stepfunctions.steps import (
        Chain,
        ChoiceRule,
        ModelStep,
        ProcessingStep,
        TrainingStep,
        TuningStep,
        TransformStep,
        Task,
        EndpointConfigStep,
        EndpointStep,
        LambdaStep
    )
    from stepfunctions.template import TrainingPipeline
    from stepfunctions.template.utils import replace_parameters_with_jsonpath
    from stepfunctions.workflow import Workflow
    
  12. 最后,我们从 SageMaker SDK 加载库,如下:
    %%custom_writefile ../workflow/main.py
    import sagemaker
    from sagemaker.tensorflow import TensorFlow
    from sagemaker.tuner import IntegerParameter, ContinuousParameter, HyperparameterTuner
    from sagemaker import get_execution_role
    from sagemaker.amazon.amazon_estimator import get_image_uri
    from sagemaker.processing import ProcessingInput, ProcessingOutput, Processor
    from sagemaker.s3 import S3Uploader
    from sagemaker.sklearn.processing import SKLearnProcessor
    
  13. 在导入相关的 Python 库之后,我们使用下面的代码来定义到工作流中使用的各种 AWS 服务的连接:
    %%custom_writefile ../workflow/main.py
    sagemaker_session = sagemaker.Session()
    region = sagemaker_session.boto_region_name
    role = get_execution_role()
    sfn_client = boto3.client("stepfunctions")
    lambda_client = boto3.client("lambda")
    codepipeline_client = boto3.client("codepipeline")
    ssm_client = boto3.client("ssm")
    
  14. 既然我们已经加载了所需的库,我们将定义一些助手函数。我们需要的第一个助手函数是get_execution_role()函数。以下代码定义了此函数,以获取作为 CDK 应用的一部分创建的工作流执行角色的 SSM 参数:
    %%custom_writefile ../workflow/main.py
    def get_workflow_role():
        try:
            response = ssm_client.get_parameter(
                Name="WorkflowRoleParameter",
            )
            return response["Parameter"]["Value"]
        except ClientError as e:
            error_message = e.response["Error"]["Message"]
            print(error_message)
            raise Exception(error_message)
    
  15. 下面的代码定义了第二个函数,称为update_lambda()。该函数将更新任何 AWS Lambda 函数的代码,如果该函数已经存在:
    %%custom_writefile ../workflow/main.py
    def update_lambda(name, zip_name):
        lambda_client.update_function_code(
            FunctionName=name,
            ZipFile=open(zip_name, mode="rb").read(),
            Publish=True
        )
    
  16. 下一个助手函数叫做get_lambda()。这个函数获取任何已定义的 AWS Lambda 代码,压缩它,并创建一个新的 Lambda 函数。如果λ函数已经存在,get_lambda()将调用update_lambda()用更新后的代码更新现有的λ函数。代码如下面的代码片段所示:
    %%custom_writefile ../workflow/main.py
    def get_lambda(name, bucket, description):
        print("Creating Lambda Package ")
        zip_name = f"../artifacts/.zip"
        lambda_src = f"../artifacts/.py"
        z = zipfile.ZipFile(zip_name, mode="w")
        z.write(lambda_src, arcname=lambda_src.split("/")[-1])
        z.close()
        print("Uploading Lambda Package to S3 ")
        S3Uploader.upload(
            local_path=zip_name,
            desired_s3_uri=f"s3:///lambda",
        )
        try:
            print(f"Creating Lambda Function '' …")
            lambda_client.create_function(
                FunctionName=name,
                Runtime="python3.8",
                Role=get_workflow_role(),
                Handler=f".lambda_handler",
                Code={
                    "S3Bucket": bucket,
                    "S3Key": f"lambda/.zip"
                },
                Description=description,
                Timeout=120,
                MemorySize=128
            )
        except ClientError as e:
            print(f"Lambda Function '' already exists, re-creating ...")
            update_lambda(name, zip_name)
        return name
    
  17. The final helper function we will define is called get_execution_id(). This function calls CodePipeline to get the identifier (ID) of the current execution. You will recall that for this, we will be versioning the workflow execution, and thus pipeline assets, based on the current execution ID. If there is no execution ID, then we will use the current time as a versioning ID. The code is illustrated in the following snippet:
    %%custom_writefile ../workflow/main.py
    def get_execution_id(name=None):
        try:
            response = codepipeline_client.get_pipeline_state(name=name)
            for stage in response["stageStates"]:
                if stage["stageName"] == "Build":
                    for action in stage["actionStates"]:
                        if action["actionName"] == "BuildModel":
                            return stage["latestExecution"]["pipelineExecutionId"]
        except KeyError:
            return strftime('%Y%m%d%H%M%S', gmtime())
    
    注意 SageMaker 希望每个作业、训练模型和端点都有唯一的名称。如果这些名称不唯一,执行将会失败。因此,如果我们对工作流代码进行单元测试,而不是将其作为 CI/CD 管道的一部分运行,那么我们需要为 SageMaker 作业提供一个唯一的版本。
  18. 现在我们已经创建了助手函数,我们可以继续声明额外的参数,这些参数对于工作流本身是特定的,对于工作流执行也是唯一的。下面的代码定义了唯一的工作流参数:
    %%custom_writefile ../workflow/main.py
    execution_id = get_execution_id(name=os.environ["PIPELINE_NAME"])
    model = os.environ["MODEL_NAME"]
    data_prefix = os.environ["DATA_PREFIX"]
    model_prefix = execution_id
    bucket_name = os.environ["BUCKET_NAME"]
    model_name = f"-{execution_id}"
    training_job_name = f"-TrainingJob-{execution_id}"
    preprocessing_job_name = f"-ProcessingJob-{execution_id}"
    evaluation_job_name = f"-EvaluationJob-{execution_id}"
    deeplearning_container_image = f"763104351884.dkr.ecr.{region}.amazonaws.com/tensorflow-training:2.5.0-cpu-py37-ubuntu18.04-v1.0"
    
  19. 接下来,我们将执行参数定义为一个ExecutionInput()模式。模式定义了参数的类型,这些参数将被提供来启动工作流的执行。代码如下面的代码片段所示:
    %%custom_writefile ../workflow/main.py
    execution_input = ExecutionInput(
        schema={
            "ModelName": str,
            "PreprocessingJobName": str,
            "TrainingJobName": str,
            "EvaluationProcessingJobName": str
        }
    )
    
  20. 我们定义的最后一组参数指定了数据配置。这里,我们定义了一个 S3 位置来获取原始鲍鱼数据,以及一个 S3 位置来获取处理后的数据:
    %%custom_writefile ../workflow/main.py
    s3_bucket_base_uri = f"s3://"
    input_data = os.path.join(s3_bucket_base_uri,  data_prefix, "raw/abalone.data")
    output_data = os.path.join(s3_bucket_base_uri, data_prefix)
    preprocessed_training_data = os.path.join(output_data, "input", "training")
    preprocessed_testing_data = f"/testing"
    model_data_s3_uri = f"{s3_bucket_base_uri}/{model_prefix}/{training_job_name}/output/model.tar.gz"
    output_model_evaluation_s3_uri = f"{s3_bucket_base_uri}/{model_prefix}/{training_job_name}/evaluation"
    

既然我们已经定义了所需的全局变量、辅助函数和总体工作流参数,下一步就是编写工作流本身。以下步骤将指导您如何创建组成工作流的步骤:

  1. 工作流程的第一步是将原始鲍鱼数据作为 SageMaker 处理任务进行处理。然而,在定义处理步骤之前,我们需要为 SageMaker 提供一个处理脚本。下面的代码单元创建了一个artifacts文件夹来存储各种脚本工件:
    !mkdir ../artifacts
    
  2. Now, we use the following code to capture the preprocessing.py processing artifact script:
    %%writefile ../artifacts/preprocessing.py
    import os
    import pandas as pd
    import numpy as np
    prefix = "/opt/ml"
    processing_path = os.path.join(prefix, "processing")
    preprocessing_input_path = os.path.join(processing_path, "input")
    preprocessing_output_path = os.path.join(processing_path, "output")
    if __name__ == "__main__":
        print("Preprocessing Data")
        column_names = ["sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", "rings"]
        data = pd.read_csv(os.path.join(preprocessing_input_path, "abalone.data"), names=column_names)
        y = data.rings.values.reshape(len(data), 1)
        del data["rings"]
        print("Creating Catagorical Features")
        data = pd.get_dummies(data).to_numpy()
        X = np.concatenate((y, data), axis=1)
        print("Splitting Data into Training, Validation and, Test Datasets")
        training, validation, testing = np.split(X, [int(.8*len(X)), int(.95*len(X))])
        pd.DataFrame(training).to_csv(os.path.join(preprocessing_output_path, "training/training.csv"), header=False, index=False)
        pd.DataFrame(validation).to_csv(os.path.join(preprocessing_output_path, "training/validation.csv"), header=False, index=False)
        pd.DataFrame(testing).to_csv(os.path.join(preprocessing_output_path, "testing/testing.csv"), header=False, index=False)
        print("Done!")
    
    注意 如您所见,该脚本包含了我们在本书中一直用于鲍鱼数据集的相同数据处理方法。
  3. 现在我们有了处理作业的脚本,我们可以定义一个工作流步骤定义来调用 SageMaker 和在工作流中作为任务执行处理作业。在下面的代码片段中,我们开始将processing_step定义为一个ProcessingStep()状态机:
    ...
    %%custom_writefile ../workflow/main.py
    processing_step = ProcessingStep(
        "Pre-process Data",
    ...
    
  4. 接下来,我们将处理作业的类型指定为SKLearnProcessor()以及用于处理作业的计算资源的类型,如下:
    ...
        processor=SKLearnProcessor(
            framework_version="0.23-1",
            role=role,
            instance_type="ml.m5.xlarge",
            instance_count=1,
            max_runtime_in_seconds=1200,
        ),
        job_name=execution_input["PreprocessingJobName"],
    ...
    
  5. 如下面的代码片段所示,我们现在为处理作业指定输入数据的位置,以及我们在步骤 2 :
    ...
        inputs=[
            ProcessingInput(
                source=input_data,
                destination="/opt/ml/processing/input",
                input_name="input"
            ),
            ProcessingInput(
                source=sagemaker_session.upload_data(
                    path="../artifacts/preprocessing.py",
                    bucket=bucket_name,
                    key_prefix=os.path.join(data_prefix, "code")
                ),
                destination="/opt/ml/processing/input/code",
                input_name="code"
            )
        ],
    ...
    
    中创建的processing.py脚本的位置
  6. 一旦定义了输入,我们就可以定义输出。在下面的代码片段中,我们为trainingtesting数据定义了输出位置。这些数据集将被存储在我们在前面的步骤 7 中定义的 S3 桶中:
    ...
        outputs=[
            ProcessingOutput(
               source="/opt/ml/processing/output/training",
               destination=os.path.join(output_data, "input", "training"),
               output_name="training"
            ),
            ProcessingOutput(
                source="/opt/ml/processing/output/testing",
                destination=os.path.join(output_data, "testing"),
                output_name="testing"
            )
        ],
    ...
    
  7. 如下面的代码片段所示,processing_step状态机的最后一部分是将preprocessing.py脚本指定为processing_step :
    ...
        container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocessing.py"],
    )
    ...
    
    的执行入口点
  8. 一旦数据得到处理,我们就可以进入工作流程的下一步,在这里我们训练模型。在这一步中,我们将调用 SageMaker 运行一个训练作业。在定义工作流步骤之前,我们需要向 SageMaker 提供模型训练代码作为工件。下面的代码片段展示了如何创建一个model_training.py工件。如您所见,我们使用与前面示例相同的方法定义了一个训练张量流模型的过程:
    ...
    if __name__ == "__main__":
        print(f"Tensorflow Version: ")
        column_names = ["rings", "length", "diameter", "height", "whole weight", "shucked weight", "viscera weight", "shell weight", "sex_F", "sex_I", "sex_M"]
        parser = argparse.ArgumentParser()
        parser.add_argument('--epochs', type=int, default=2)
        parser.add_argument('--batch-size', type=int, default=8)
        parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
        parser.add_argument('--training', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
        args, _ = parser.parse_known_args()
        epochs = args.epochs
        batch_size = args.batch_size
        training_path = args.training
        model_path = args.model_dir
        train_data = pd.read_csv(os.path.join(training_path, 'training.csv'), sep=',', names=column_names)
        val_data = pd.read_csv(os.path.join(training_path, 'validation.csv'), sep=',', names=column_names)
        train_y = train_data['rings'].to_numpy()
        train_X = train_data.drop(['rings'], axis=1).to_numpy()
        val_y = val_data['rings'].to_numpy()
        val_X = val_data.drop(['rings'], axis=1).to_numpy()
        train_X = preprocessing.normalize(train_X)
        val_X = preprocessing.normalize(val_X)
        network_layers = [Dense(64, activation="relu", kernel_initializer="normal", input_dim=10), Dense(64, activation="relu"), Dense(1, activation="linear")]
        model = Sequential(network_layers)
        model.compile(optimizer='adam', loss='mse', metrics=['mae', 'accuracy'])
        model.summary()
        model.fit(train_X, train_y, validation_data=(val_X, val_y), batch_size=batch_size, epochs=epochs, shuffle=True, verbose=1)
        model.save(os.path.join(model_path, 'model.h5'))
        model_version = 1
        export_path = os.path.join(model_path, str(model_version))
        tf.keras.models.save_model(model, export_path, overwrite=True, include_optimizer=True, save_format=None, signatures=None, options=None)
    ...
    
  9. 既然已经创建了训练工件,我们定义一个training_step工作流步骤来定义一个工作流任务TrainingStep()的实例。作为任务的一部分,我们指定训练脚本的位置、超参数、作业名称、要使用的计算资源的类型以及已处理训练数据的 S3 位置。代码如下面的代码片段所示:
    %%custom_writefile ../workflow/main.py
    training_step = TrainingStep(
        "Model Training",
        estimator=TensorFlow(
            entry_point='../artifacts/model_training.py',
            role=role,
            hyperparameters={
                'epochs': int(os.environ['EPOCHS']),
                'batch-size': int(os.environ['BATCH_SIZE']),
            }, 
            train_instance_count=1,
            train_instance_type='ml.m5.xlarge',
            framework_version='2.4',
            py_version="py37",
            script_mode=True,
            output_path=os.path.join(s3_bucket_base_uri, model_prefix)
        ),
        data={"training": sagemaker.inputs.TrainingInput(preprocessed_training_data, content_type="csv")},
        job_name=execution_input["TrainingJobName"],
        wait_for_completion=True,
    )
    
  10. 在模型被训练之后,我们需要评估它是否有资格成为生产级模型。我们将定义一个 SageMaker 处理作业来执行评估,正如我们对数据处理任务所做的那样,我们定义了一个名为evaluate.py的脚本工件。这个工件将加载训练好的模型,加上测试数据集,并将模型推理输出捕获到 S3 的一个evaluation.json文件中。下面的代码创建了一个工件,并为评估加载了必要的 Python 库:
    %%writefile ../artifacts/evaluate.py
    import json
    import os
    import tarfile
    import pandas as pd
    import numpy as np
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.optimizers import Adam
    from sklearn import preprocessing
    
  11. 接下来,我们定义一个 Python 函数来加载和编译训练好的 TensorFlow 模型,如下:
    %%writefile -a ../artifacts/evaluate.py
    def load_model(model_path):
        model = tf.keras.models.load_model(os.path.join(model_path, 'model.h5'))
        model.compile(optimizer='adam', loss='mse')
        return model
    
  12. 现在,我们将另一个函数添加到工件中,以从加载的模型中获取推论,并将结果存储在 S3 中,如下:
    %%writefile -a ../artifacts/evaluate.py
    def evaluate_model(prefix, model):
        column_names = ["rings", "length", "diameter", "height", "whole weight", "shucked weight",
                        "viscera weight", "shell weight", "sex_F", "sex_I", "sex_M"]
        input_path = os.path.join(prefix, "processing/testing")
        output_path = os.path.join(prefix, "processing/evaluation")
        predictions = []
        truths = []
        test_df = pd.read_csv(os.path.join(input_path, "testing.csv"), names=column_names)
        y = test_df['rings'].to_numpy()
        X = test_df.drop(['rings'], axis=1).to_numpy()
        X = preprocessing.normalize(X)
        for row in range(len(X)):
            payload = [X[row].tolist()]
            result = model.predict(payload)
            print(result[0][0])
            predictions.append(float(result[0][0]))
            truths.append(float(y[row]))
        report = {
            "GroundTruth": truths,
            "Predictions": predictions
        }
        with open(os.path.join(output_path, "evaluation.json"), "w") as f:
            f.write(json.dumps(report))
    
  13. 最后,我们将主程序附加到上执行求值,如下:
    %%writefile -a ../artifacts/evaluate.py
    if __name__ == "__main__":
        print("Extracting model archive ...")
        prefix = "/opt/ml"
        model_path = os.path.join(prefix, "model")
        tarfile_path = os.path.join(prefix, "processing/model/model.tar.gz")
        with tarfile.open(tarfile_path) as tar:
            tar.extractall(path=model_path)
        print("Loading Trained Model ...")
        model = load_model(model_path)
        print("Evaluating Trained Model ...")
        evaluate_model(prefix, model)
        print("Done!")
    
  14. As with the data processing step, we use the following code to define another ProcessingStep() workflow to execute the evaluation.py artifact, as follows:
    %%custom_writefile ../workflow/main.py
    evaluation_step = ProcessingStep(
        "Model Evaluation",
        processor=Processor(
            image_uri=deeplearning_container_image,
            instance_count=1,
            instance_type="ml.m5.xlarge",
            role=role,
            max_runtime_in_seconds=1200
        ),
        job_name=execution_input["EvaluationProcessingJobName"],
        inputs=[
            ProcessingInput(
                source=preprocessed_testing_data,
                destination="/opt/ml/processing/testing",
                input_name="input"
            ),
            ProcessingInput(
                source=model_data_s3_uri,
                destination="/opt/ml/processing/model",
                input_name="model"
            ),
            ProcessingInput(
                source=sagemaker_session.upload_data(
                    path="../artifacts/evaluate.py",
                    bucket=bucket_name,
                    key_prefix=os.path.join(data_prefix, "code")
                ),
                destination="/opt/ml/processing/input/code",
                input_name="code"
            )
        ],
        outputs=[
            ProcessingOutput(
                source="/opt/ml/processing/evaluation",
                destination=output_model_evaluation_s3_uri,
                output_name="evaluation"
            )
        ],
        container_entrypoint=["python3", "/opt/ml/processing/input/code/evaluate.py"]
    )
    
    注意 SageMaker 处理作业本身支持使用scikit-learn Python 库处理数据。由于我们正在评估一个 TensorFlow 模型,该模型本身不受支持,因此我们利用 TensorFlow 训练参数image_uriProcessingStep()状态机中执行模型评估。
  15. 因为我们已经在evaluation.json文件中捕获了推理结果,所以我们需要根据评估标准来评估结果。为此,我们将使用 AWS Lambda 函数。下面的代码片段显示了lambda_handler()的定义,作为一个名为analyze_results.py :
    ...
    def lambda_handler(event, context):
    

Environment Variables ##“)

    logger.debug(os.environ)

Event ##“)

    logger.debug(event)
    s3 = boto3.client("s3")
    if ("Bucket" in event):
        bucket = event["Bucket"]
    else:
        raise KeyError("S3 'Bucket' not found in Lambda event!")
    if ("Key" in event):
        key = event["Key"]
    else:
        raise KeyError("S3 'Key' not found in Lambda event!")
    logger.info("Downloading evlauation results file ...")
    json_file = json.loads(s3.get_object(Bucket = bucket, Key = key)['Body'].read())
    logger.info("Analyzing Model Evaluation Results ...")
    y = json_file["GroundTruth"]
    y_hat = json_file["Predictions"]
    summation = 0
    for i in range (0, len(y)):
        squared_diff = (y[i] - y_hat[i])**2
        summation += squared_diff
    rmse = math.sqrt(summation/len(y))
    logger.info("Root Mean Square Error: ".format(rmse))
    logger.info("Done!")
    return {
        "statusCode": 200,
        "Result": rmse,
    }
...
    的工件
16.  为了将 Lambda 函数作为工作流中的一个步骤来运行,我们定义了一个`LambdaStep()`函数,并使用助手函数来创建一个 Lambda,如下所示:
%%custom_writefile ../workflow/main.py
analyze_results_step = LambdaStep(
    "Analyze Evaluation Results",
    parameters={
        "FunctionName": get_lambda(
            "analyze_results",
            bucket_name,
            "Analyze the results from the Model Evaluation"
        ),
        "Payload": {
            "Bucket": bucket_name,
            "Key": f"""{model_prefix}/{training_job_name}/evaluation/evaluation.json"""
        }
    }
)
17.  工作流中的最后一个任务是将训练好的模型注册为 SageMaker 模型。这是将在 CI/CD 管道的 CD 阶段作为托管端点部署的模型。下面的代码创建了一个`ModelStep()`函数,并从`TrainingStep()`工作流任务中指向训练好的模型:
%%custom_writefile ../workflow/main.py
register_model_step = ModelStep(
    "Register Trained Model",
    model=training_step.get_expected_model(),
    model_name=execution_input["ModelName"],
    instance_type="ml.m5.large"
)
18.  现在我们已经有了工作流的所有步骤,我们需要将它们放在一起并定义状态机的流。为了做到这一点,我们将从工作流执行的各种最终结果开始逆向工作来构建工作流。在下面的代码片段中,我们定义了失败的工作流的结果状态,并将每个步骤连接到失败状态,如果它们失败的话:
%%custom_writefile ../workflow/main.py
workflow_failed_state = stepfunctions.steps.states.Fail(
    "ML Workflow Failed", cause="SageMakerProcessingJobFailed"
)
catch_state = stepfunctions.steps.states.Catch(error_equals=["States.TaskFailed"], next_step=workflow_failed_state)
processing_step.add_catch(catch_state)
training_step.add_catch(catch_state)
evaluation_step.add_catch(catch_state)
analyze_results_step.add_catch(catch_state)
register_model_step.add_catch(catch_state)
If the trained model fails the evaluation—or, in other words, is above the threshold we establish for a production-grade model—the workflow should also fail. In the next code snippet, we define a failure state for the model exceeding the evaluation criteria:
%%custom_writefile ../workflow/main.py
threshold_fail_state = stepfunctions.steps.states.Fail(
    "Model Evaluation Exceeds Threshold"
)
Along with declaring the final failure states of the workflow, we also need to create a final state whereby the model's evaluation determines it to be below the evaluation threshold, and therefore a production-grade model. The following code snippet defines this Pass() state:
%%custom_writefile ../workflow/main.py
threshold_pass_state = stepfunctions.steps.states.Pass(
    "Model Evaluation Below Threshold"
)
19.  为了确定模型评估是高于还是低于评估标准,我们定义了一个`Choice()`状态并配置了一个`ChoiceRule()`函数来确定`analyze_results_step`任务的输出是否小于`THRESHOLD`变量,如下:
%%custom_writefile ../workflow/main.py
check_threshold_step = steps.states.Choice(
    "Threshold Evaluation Check"
)
threshold_rule = steps.choice_rule.ChoiceRule.NumericLessThan(
    variable=analyze_results_step.output()['Payload']['Result'],
    value=float(os.environ["THRESHOLD"])
)
check_threshold_step.add_choice(rule=threshold_rule, next_step=threshold_pass_state)
check_threshold_step.default_choice(next_step=threshold_fail_state)
20.  我们刚刚创建了 ML 工作流的所有步骤和状态,以及各个步骤将使用的支持工件。创建我们的工作流程的最后一个部分是把它们放在一起。下面的代码将各个步骤链接在一起,并创建了一个工作流图:
%%custom_writefile ../workflow/main.py
_graph = Chain(
    [
        processing_step,
        training_step,
        register_model_step,
        evaluation_step,
        analyze_results_step,
        check_threshold_step
    ]
)
21.  We now have our workflow defined, using the Data Science SDK. If we refer to *Figure 6.2* of [*Chapter 6*] B17649_06_ePub.xhtml##_idTextAnchor094
ml_workflow = Workflow(ml_workflow
    name="abalone-workflow-unit-test",
    definition=ml_workflow_graph,
    role=get_workflow_role(),
)
ml_workflow.create()
execution = ml_workflow.execute(
    inputs={
        "ModelName": model_name,
        "PreprocessingJobName": preprocessing_job_name,
        "TrainingJobName": training_job_name,
        "EvaluationProcessingJobName": evaluation_job_name,
    }
)
execution_output = execution.get_output(wait=True)
    注意
    在工作流上执行单元测试将在自由层之外产生额外的 AWS 资源成本。你可以放弃前一步,以避免额外的费用。
22.  基于单元测试过程的成功,对`main.py`脚本的最后添加是捕获流程以执行生产状态机。下面的代码将创建和`abalone-workflow`生产工作流,并为生产执行提供特定于执行的参数:
%%writefile -a ../workflow/main.py
print("Creating ML Workflow")
ml_workflow = Workflow(
    name="abalone-workflow",
    definition=ml_workflow_graph,
    role=get_workflow_role(),
)
try:
    print("Creating Step Functions State Machine")
    ml_workflow.create()
except sfn_client.exceptions.StateMachineAlreadyExists:
    print("Found Existing State Machine, Updating the State Machine definition")
else:
    ml_workflow.update(ml_workflow_graph)
    time.sleep(120)
print("Executing ML Workflow State Machine")
ml_workflow.execute(
    inputs={
        "ModelName": model_name,
        "PreprocessingJobName": preprocessing_job_name,
        "TrainingJobName": training_job_name,
        "EvaluationProcessingJobName": evaluation_job_name
    }
)
23.  这就完成了 ML 从业者对重构解决方案的贡献。剩下要做的就是将这些变更提交给存储库。为此,点击 **Git** 图标。
24.  点击**已变更**和**未跟踪**部分的加号( **+** )图标,将变更移动到**已实现**部分。
25.  在`Initial commit of Workflow Artifacts`中,如下图所示:
![Figure 7.4 – Staged changes
]

图 7.4–阶段性变化
1.  点击`model`分支。
2.  Once the changes have been committed, click **Git** from the menu bar, and select **Push to Remote**.
    注意
    如果出现提示,请提供您的电子邮件地址和姓名。

通过检入这些代码,我们,作为 ML 实践者,已经完成了我们对重构解决方案的贡献。我们已经使用数据科学 SDK 编写了一个 ML 工作流并创建了一个状态机,而不必使用 Amazon States 语言。以下截图显示了我们 ML 工作流程的图形表示:
![Figure 7.5 – ML workflow state machine
]

图 7.5–ML 工作流状态机

然而,即使状态机代码已经被检入,整个 CI/CD 管道仍然不会执行,因为我们还没有定义管道和状态机之间的集成。在下一部分中,我们将再次从开发工程师的角度执行最终的集成。

## 执行集成测试

为了完成 CI/CD 管道的发布,我们需要将 ML 从业者提交的代码集成到构建过程中。我们通过创建一个`buildspec.yml`文件向代码构建阶段提供构建指令来实现这一点。

注意

您可以在配套的 GitHub 资源库([https://GitHub . com/packt publishing/Automated-Machine-Learning-on-AWS/blob/main/chapter 07/Files/build spec . yml](https://github.com/PacktPublishing/Automated-Machine-Learning-on-AWS/blob/main/Chapter07/Files/buildspec.yml))中找到`buildspec.yml`文件的完整副本,以供参考。

以下步骤将从开发工程师的角度引导您完成集成过程:
1.  使用 Cloud9 环境,在终端窗口中运行下面的命令来获取 ML 从业者所做的最新更改:
$ cd ~/environment/abalone-cicd-pipeline/ && git pull
2.  通过运行以下命令切换到`model`分支:
$ git checkout model
3.  右键点击导航面板中的`abalone-cicd-pipeline`文件夹,选择**新建文件**。
4.  将文件命名为`buildspec.yml`,双击进行编辑。
5.  添加以下代码来声明加载必要的 Python 库和执行`main.py`脚本的指令:
version: 0.2
env:
  variables:
    DATA_PREFIX: abalone_data
    EPOCHS: 200
    BATCH_SIZE: 8
    THRESHOLD: 2.1
phases:
  install:
    runtime-versions:
      python: 3.8
    commands:
      - printenv
      - echo "Updating Build Environment"
      - apt-get update
      - python -m pip install --upgrade pip
      - python -m pip install --upgrade boto3 awscli sagemaker==2.49.1 stepfunctions==2.2.0
  build:
    commands:
      - echo Build started on 'date'
      - echo "Creating ML Workflow "
      - |
            sh -c """
            cd workflow/
            python main.py
            """
      post_build:
        commands:
          - echo "Build Completed"
  1. 保存文件。
  2. 在终端窗口中,运行以下命令将buildspec.yml文件添加到文件存储库文件索引:
    $ git add -A
    
  3. 通过运行以下命令将集成提交到 CodeCommit 存储库:
    $ git commit -m "Add Integration Artifacts"
    
  4. 现在,使用下面的命令将更改推送到存储库:
    $ git push
    

我们现在已经将工作流集成到 CI/CD 管道中,通过提交这些更改,我们还创建了一个管道版本。在下一节中,我们将监控管道。

监控管道进度

通过 CodePipeline 控制台监控管道的执行。在 web 浏览器中,打开 AWS 代码管道管理控制台(【https://console.aws.amazon.com/codesuite/codepipeline/home】的),然后点击管道名称— abalone-cicd-pipeline。下面的屏幕截图描述了管道执行: ![Figure 7.6 – CodePipeline console ]

图 7.6–代码管道控制台

如果您将图 7.6图 5.4[第 5 章] B17649_05_ePub.xhtml##_idTextAnchor078

要在新的 web 浏览器选项卡中查看状态机的进度,请打开 AWS 步骤功能管理控制台(https://console.aws.amazon.com/states)并选择abalone-workflow状态机。您将看到一个执行列表。单击最近的执行以查看其进度。下面的截图显示了成功执行: ![Figure 7.7 – Succeeded state machine execution ]

图 7.7–成功的状态机执行

正如您从图 7.7 中看到的,工作流已经成功完成,产生了一个低于预先建立的评估标准的经过训练的 ML 模型。要验证评估标准,点击图形检查器分析评估结果步骤上的,然后点击步骤输出选项卡。以下屏幕截图显示了模型评估的示例结果: ![Figure 7.8 – Analyze Evaluation Results: Step output ]

图 7.8-分析评估结果:步骤输出

这完成了管道的 CI 阶段,我们可以再次批准部署模型。

注意

请参见第 5 章 、生产 ML 模型的持续部署、中的执行自动化 ML 模型部署清理部分,了解如何批准模型并继续管道的 CD 阶段。

一旦 CodePipeline 的deployendppoint动作完成,我们就有了一个生产模型,可以集成到年龄计算器应用中,并提供鲍鱼年龄预测。

总结

在本章中,我们继续重构我们在第六章[] B17649_06_ePub.xhtml##_idTextAnchor094

_idTextAnchor094

然而,在这一章和前面的章节中,我们都将注意力集中在基于源代码变更发布生产级模型上。相反,CI/CD 流程如何适应训练数据的变化(或更新)?

在下一章中,我们将回顾当源数据发生变化时,如何使 ML 过程自动化。*

文章列表

更多推荐

更多
  • AWS自动化机器学习-十一、MLSDLC 的持续集成、部署和训练 技术要求,编纂持续集成阶段,管理持续部署阶段,管理持续训练,延伸,构建集成工件,构建测试工件,构建生产工件,自动化持续集成流程,回顾构建阶段,回顾测试阶段,审查部署和维护阶段,回顾应用用户体验,创建新的鲍鱼调查数据,回顾持续训练流程,清
    Apache CN

  • AWS自动化机器学习-六、使用 AWS 步骤函数自动化机器学习过程 技术要求,介绍 AWS 步骤功能,使用 Step 函数 Data Science SDK for CI/CD,建立 CI/CD 渠道资源,创建状态机,解决状态机的复杂性,更新开发环境,创建管道工件库,构建管道应用构件,部署 CI/CD
    Apache CN

  • AWS自动化机器学习-第三部分:优化以源代码为中心的自动化机器学习方法 本节将向您介绍整体 CI/CD 流程的局限性,以及如何将 ML 从业者的角色进一步整合到管道构建流程中。本节还将介绍这种角色集成如何简化自动化过程,并通过向您介绍 AWS Step 函数向您展示一种优化的方法。本节包括以下章节:
    Apache CN

  • AWS自动化机器学习-一、AWS 上的自动化机器学习入门 技术要求,洗钱流程概述,洗钱过程的复杂性,端到端 ML 流程示例,AWS 如何使 ML 开发和部署过程更容易自动化,介绍 ACME 渔业物流,ML 的情况,从数据中获得洞察力,建立正确的模型,训练模型,评估训练好的模型,探索可能的后续步
    Apache CN

  • AWS自动化机器学习-二、使用 SageMaker 自动驾驶器自动化机器学习模型开发 技术要求,介绍 AWS AI 和 ML 前景,SageMaker 自动驾驶器概述,利用 SageMaker 自动驾驶器克服自动化挑战,使用 SageMaker SDK 自动化 ML 实验,SageMaker Studio 入门,准备实验
    Apache CN

  • AWS自动化机器学习-四、机器学习的持续集成和持续交(CI/CD) 四、机器学习的持续集成和持续交CI/CD技术要求,介绍 CI/CD 方法,通过 CI/CD 实现 ML 自动化,在 AWS 上创建 CI/CD 管道,介绍 CI/CD 的 CI 部分,介绍 CI/CD 的 CD 部分,结束循环,采取以部
    Apache CN

  • AWS自动化机器学习-九、使用 Amazon Managed Workflows 为 Apache AirFlow 构建 ML 工作流 技术要求,开发以数据为中心的工作流程,创建合成鲍鱼调查数据,执行以数据为中心的工作流程,构建和单元测试数据 ETL 工件,构建气流 DAG,清理, 在前面的年龄计算器示例中,我们了解了如何通过 ML 从业者和开发人员团队之间的跨职能
    Apache CN

  • AWS自动化机器学习-七、使用 AWS 步骤函数构建 ML 工作流 技术要求,构建状态机工作流,执行集成测试,监控管道进度,设置服务权限,创建 ML 工作流程, 在本章中,我们将从第六章中的 [处继续,使用 AWS 步骤函数自动化机器学习过程。您将从那一章中回忆起,我们正在努力实现的主要目标是简化
    Apache CN

  • AWS自动化机器学习-八、使用 Apache Airflow 实现机器学习过程的自动化 技术要求,介绍阿帕奇气流,介绍亚马逊 MWAA,利用气流处理鲍鱼数据集,配置 MWAA 系统的先决条件,配置 MWAA 环境, 当建立一个 ML 模型时,有一个所有 ML 从业者都知道的基本原则;也就是说,最大似然模型只有在数据被训练时
    Apache CN

  • AWS自动化机器学习-五、自动化 ML 模型的持续部署 技术要求,部署 CI/CD 管道,构建 ML 模型工件,执行自动化 ML 模型部署,整理管道结构,创建 CDK 应用,部署管道应用,查看建模文件,审查申请文件,查看模型服务文件,查看容器构建文件,提交 ML 工件,清理, 在 [第 4
    Apache CN

  • 近期文章

    更多
    文章目录

      推荐作者

      更多