k8s云原生-Fluid-Dataset和Runtime的生命周期

作者: K8S实践指南

目前,Fluid支持的默认Runtime是AlluxioRuntime。 Fluid提供了Runtime接口,并且假设Runtime和Dataset是一对一的关系,支持开发者扩展不同的Runtime。

本文档介绍了Dataset和Runtime的生命周期,以及开发自己的Runtime的方法。介绍过程中,将假设用户需要基于JindoFS开发JindoRuntime。

Dataset

Dataset的生命周期流程如图所示:

dataset

Runtime

Runtime的生命周期流程如图所示:

runtime

其中,Engine的整体生命周期如下:

Engine.Setup

ShouldSetupMaster:判断是否需要创建Master。
SetupMaster:创建Master。
CheckMasterReady:检查Master是否Ready。
ShouldCheckUFS:判断是否需要PrepareUFS.
PrepareUFS:为分布式缓存集群设定远端存储点。
ShouldSetupWorkers:判断是否需要创建Worker。
SetupWorkers:创建Worker。
CheckWorkersReady:检查Master是否Ready。
CheckAndUpdateRuntimeStatus:检查并更新Runtime的状态。
UpdateDatesetStatus:更新Dataset的状态。

Engine.CreateVolume

CreatePV:根据存储位置创建PV。
CreatePVC:创建PVC。

Engine.Sync

SyncMetadata:进行metadata的同步工作。
CheckAndUpdateRuntimeStatus:检查并更新Runtime的状态。
UpdateCacheOfDataset:更新Dataset的cacheStates。
CheckRuntimeHealthy:检查分布式缓存集群的健康状态,根据检查结果修改Dataset的状态。
SyncReplicas:比较Runtime的期待副本数和分布式缓存集群中的当前副本数。
CheckAndUpdateRuntimeStatus:检查并更新Runtime的状态。

Reconciler

Fluid使用了kubebuilder生成脚手架代码,脚手架代码基于controller runtime框架实现对Controller的管理。

在controller runtime框架中,真正的事件处理通过Reconcile方法暴露给CRD开发者。每种CRD必须定义一个实现了reconcile.Reconcile接口的Reconcile结构体,开发者只需在此结构体的Reconcile方法中去处理业务逻辑就可以了。

在Fluid中,已经定义了Runtime的Reconciler:

// RuntimeReconciler is the default implementation
type RuntimeReconciler struct {
    client.Client
    Log      logr.Logger
    Recorder record.EventRecorder
    // Real implement
    implement RuntimeReconcilerInterface
}

其中,RuntimeReconcilerInterface接口定义了以下方法:

  • ReconcileInternal、ReconcileRuntimeDeletion、AddFinalizerAndRequeue、GetRuntimeObjectMeta、GetDataset:Runtime的Reconciler已经实现。
  • ReconcileRuntime:需要调用GetRuntime方法获取Runtime开发者自己定义Runtime,塞入ctx;调用ReconcileInternal实现具体逻辑。
  • GetOrCreateEngine、RemoveEngine:需要Runtime开发者自己实现。

若要开发JindoRuntime,需要定义JindoRuntime的Reconciler,它是RuntimeReconcilerInterface的具体实现:

// RuntimeReconciler reconciles a JindoRuntime object
type RuntimeReconciler struct {
    Scheme  *runtime.Scheme
    engines map[string]base.Engine
    mutex   *sync.Mutex
    *controllers.RuntimeReconciler
}

其中包含了Runtime的Reconciler的地址。因此,可以通过它调用Runtime的Reconciler已经实现的方法。 Runtime的开发者还需要实现GetRuntime方法,以获得自己定义的Runtime。

在创建JindoRuntime的Reconciler的时候,需要先创建Runtime的Reconciler,再把它的地址添加进来:

func NewRuntimeReconciler(client client.Client,
    log logr.Logger,
    scheme *runtime.Scheme,
    recorder record.EventRecorder) *RuntimeReconciler {
    r := &RuntimeReconciler{
        Scheme:  scheme,
        mutex:   &sync.Mutex,
        engines: map[string]base.Engine,
    }
    r.RuntimeReconciler = controllers.NewRuntimeReconciler(r, client, log, recorder)
    return r
}

Engine

Engine是Fluid抽象出的分布式缓存的管理引擎,Engine需要实现该接口:

type Engine interface {
   // ID returns the id
   ID() string
   // Shutdown and clean up the engine
   Shutdown() error
   // Setup the engine
   Setup(ctx cruntime.ReconcileRequestContext) (ready bool, err error)
   // Setup the Volume
   CreateVolume() (err error)
   // Destroy the Volume
   DeleteVolume() (err error)
   // Sync syncs the alluxio runtime
   Sync(ctx cruntime.ReconcileRequestContext) error
}

pkg/ddc/base/template_engine.go中,为用户提供了一份engine的实现模板:

type TemplateEngine struct {
    Implement
    Id string
    client.Client
    Log     logr.Logger
    Context cruntime.ReconcileRequestContext
}

TemplateEngine实际只实现了ID、Setup、Sync方法。CreateVolume、 DeleteVolume、Shutdown方法只是调用了Runtime开发者具体实现的方法。

TemplateEngine中包含的Implement接口是Runtime开发者的engine需要实现的接口:

// The real engine should implement
type Implement interface {
    UnderFileSystemService
    // Is the master ready
    CheckMasterReady() (ready bool, err error)
    // are the workers ready
    CheckWorkersReady() (ready bool, err error)
    // ShouldSetupMaster checks if we need setup the master
    ShouldSetupMaster() (should bool, err error)
    // ShouldSetupWorkers checks if we need setup the workers
    ShouldSetupWorkers() (should bool, err error)
    ShouldCheckUFS() (should bool, err error)
    // setup the cache master
    SetupMaster() (err error)
    // setup the cache worker
    SetupWorkers() (err error)
    // check if it's Bound to the dataset
    // IsBoundToDataset() (bound bool, err error)
    // Bind to the dataset
    UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (err error)
    // Prepare the mounts and metadata if it's not ready
    PrepareUFS() (err error)
    // Shutdown and clean up the engine
    Shutdown() error
    // AssignNodesToCache picks up the nodes for replicas
    AssignNodesToCache(desiredNum int32) (currentNum int32, err error)
    // CheckRuntimeHealthy checks runtime healthy
    CheckRuntimeHealthy() (err error)
    // UpdateCacheOfDataset updates cache of the dataset
    UpdateCacheOfDataset() (err error)
    // CheckAndUpdateRuntimeStatus checks and updates the status
    CheckAndUpdateRuntimeStatus() (ready bool, err error)
    CreateVolume() error
    // SyncReplicas syncs the replicas
    SyncReplicas(ctx cruntime.ReconcileRequestContext) error
    // SyncMetadata syncs all metadata from UFS
    SyncMetadata() (err error)
    // Destroy the Volume
    DeleteVolume() (err error)
    // BindToDataset binds the engine to dataset
    BindToDataset() (err error)
}
type UnderFileSystemService interface {
    UsedStorageBytes() (int64, error)
    FreeStorageBytes() (int64, error)
    TotalStorageBytes() (int64, error)
    TotalFileNums() (int64, error)
}

综上,每一个Runtime的engine都需要实现以下方法:

  • CheckMasterReady
  • CheckWorkersReady
  • ShouldSetupMaster
  • ShouldSetupWorkers
  • ShouldCheckUFS
  • SetupMaster
  • SetupWorkers
  • UpdateDatasetStatus
  • PrepareUFS
  • AssignNodesToCache
  • CheckRuntimeHealthy
  • UpdateCacheOfDataset
  • BindToDataset
  • CheckAndUpdateRuntimeStatus
  • CreateVolume
  • SyncReplicas
  • SyncMetadata
  • DeleteVolume
  • UsedStorageBytes
  • FreeStorageBytes
  • TotalStorageBytes
  • TotalFileNums

初始化Engine的方式可以参考pkg/ddc/alluxio/engine.go中的Build方法, 并且需要注册到pkg/ddc/factory.go

开发示例

安装kubebuilder

到 kubebuilder 的 GitHub release 页面上下载与您操作系统对应的 kubebuilder 安装包。

将下载好的安装包解压后将其移动到 /usr/local/kubebuilder 目录下,并将 /usr/local/kubebuilder/bin 添加到您的 $PATH 路径下。

利用kubebuilder生成Runtime的控制器

创建JindoRuntimeController的相关代码:

kubebuilder create api --group data --version v1alpha1 --kind JindoRuntime --namespaced true
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1alpha1/jindoruntime_types.go
controllers/jindoruntime_controller.go
2020/10/25 16:21:06 error updating main.go: open main.go: no such file or directory

注意此处错误可以忽略

controllers/jindoruntime_controller.go拷贝到pkg/controllers/v1alpha1/jindo

cd fluid
mv controllers/jindoruntime_controller.go pkg/controllers/v1alpha1/jindo

修改jindoruntime_controller.go如下:

package jindo
import (
    "context"
    "sync"
    "github.com/pkg/errors"
    "github.com/go-logr/logr"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/tools/record"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
    "github.com/fluid-cloudnative/fluid/pkg/common"
    "github.com/fluid-cloudnative/fluid/pkg/controllers"
    "github.com/fluid-cloudnative/fluid/pkg/ddc/base"
    cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
    "github.com/fluid-cloudnative/fluid/pkg/utils"
)
// Use compiler to check if the struct implements all the interface
var _ controllers.RuntimeReconcilerInterface = (*RuntimeReconciler)(nil)
// RuntimeReconciler reconciles a JindoRuntime object
type RuntimeReconciler struct {
    Scheme  *runtime.Scheme
    engines map[string]base.Engine
    mutex   *sync.Mutex
    *controllers.RuntimeReconciler
}
// NewRuntimeReconciler create controller for watching runtime custom resources created
func NewRuntimeReconciler(client client.Client,
    log logr.Logger,
    scheme *runtime.Scheme,
    recorder record.EventRecorder) *RuntimeReconciler {
    r := &RuntimeReconciler{
        Scheme:  scheme,
        mutex:   &sync.Mutex,
        engines: map[string]base.Engine,
    }
    r.RuntimeReconciler = controllers.NewRuntimeReconciler(r, client, log, recorder)
    return r
}
//Reconcile reconciles jindo runtime
// +kubebuilder:rbac:groups=data.fluid.io,resources=jindoruntimes,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=data.fluid.io,resources=jindoruntimes/status,verbs=get;update;patch
func (r *RuntimeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    ctx := cruntime.ReconcileRequestContext{
        Context:        context.Background(),
        Log:            r.Log.WithValues("jindoruntime", req.NamespacedName),
        NamespacedName: req.NamespacedName,
        Recorder:       r.Recorder,
        Category:       common.AccelerateCategory,
        RuntimeType:    runtimeType,
        Client:         r.Client,
        FinalizerName:  runtimeResourceFinalizerName,
    }
    ctx.Log.V(1).Info("process the request", "request", req)
    //  1.Load the Runtime
    runtime, err := r.getRuntime(ctx)
    if err != nil {
        if utils.IgnoreNotFound(err) == nil {
            ctx.Log.V(1).Info("The runtime is not found", "runtime", ctx.NamespacedName)
            return ctrl.Result, nil
            ctx.Log.Error(err, "Failed to get the ddc runtime")
            return utils.RequeueIfError(errors.Wrap(err, "Unable to get ddc runtime"))
        }
    }
    ctx.Runtime = runtime
    ctx.Log.V(1).Info("process the runtime", "runtime", ctx.Runtime)
    // reconcile the implement
    return r.ReconcileInternal(ctx)
}
//SetupWithManager setups the manager with RuntimeReconciler
func (r *RuntimeReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&datav1alpha1.JindoRuntime).
        Complete(r)
}

JindoRuntim的数据结构

在jindorutime_type.go中,根据需要,修改JindoRuntimeSpec和JindoRuntimeStatus的数据结构。

可以通过添加注释,控制kubectl get命令可以查询到的信息:

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Ready Masters",type="integer",JSONPath=`.status.masterNumberReady`,priority=10
// +kubebuilder:printcolumn:name="Desired Masters",type="integer",JSONPath=`.status.desiredMasterNumberScheduled`,priority=10
// +kubebuilder:printcolumn:name="Master Phase",type="string",JSONPath=`.status.masterPhase`,priority=0
// +kubebuilder:printcolumn:name="Ready Workers",type="integer",JSONPath=`.status.workerNumberReady`,priority=10
// +kubebuilder:printcolumn:name="Desired Workers",type="integer",JSONPath=`.status.desiredWorkerNumberScheduled`,priority=10
// +kubebuilder:printcolumn:name="Worker Phase",type="string",JSONPath=`.status.workerPhase`,priority=0
// +kubebuilder:printcolumn:name="Ready Fuses",type="integer",JSONPath=`.status.fuseNumberReady`,priority=10
// +kubebuilder:printcolumn:name="Desired Fuses",type="integer",JSONPath=`.status.desiredFuseNumberScheduled`,priority=10
// +kubebuilder:printcolumn:name="Fuse Phase",type="string",JSONPath=`.status.fusePhase`,priority=0
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=`.metadata.creationTimestamp`,priority=0
// +genclient

开发Jindo Engine

创建jindo engine的文件夹:

mkdir pkg/ddc/jindo

在其中创建engine.go文件:

package jindo
import (
    "fmt"
    datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
    "github.com/fluid-cloudnative/fluid/pkg/ddc/base"
    cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
    "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
    "github.com/go-logr/logr"
    "sigs.k8s.io/controller-runtime/pkg/client"
)
type JindoEngine struct {
    runtime     *datav1alpha1.JindoRuntime
    name        string
    namespace   string
    runtimeType string
    Log         logr.Logger
    client.Client
    //When reaching this gracefulShutdownLimits, the system is forced to clean up.
    gracefulShutdownLimits int32
    retryShutdown          int32
    initImage              string
}
func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error) {
    engine := &JindoEngine{
        name:                   ctx.Name,
        namespace:              ctx.Namespace,
        Client:                 ctx.Client,
        Log:                    ctx.Log,
        runtimeType:            ctx.RuntimeType,
        gracefulShutdownLimits: 5,
        retryShutdown:          0,
    }
    // var implement base.Implement = engine
    // engine.TemplateEngine = template
    if ctx.Runtime != nil {
        runtime, ok := ctx.Runtime.(*datav1alpha1.JindoRuntime)
        if !ok {
            return nil, fmt.Errorf("engine %s is failed to parse", ctx.Name)
        }
        engine.runtime = runtime
        return nil, fmt.Errorf("engine %s is failed to parse", ctx.Name)
    }
    template := base.NewTemplateEngine(engine, id, ctx)
    err := kubeclient.EnsureNamespace(ctx.Client, ctx.Namespace)
    return template, err
}

pkg/ddc/factory.go中修改init函数:

func init() {
    buildFuncMap = map[string]buildFunc{
        "alluxio": alluxio.Build,
        "jindo":   jindo.Build,
    }
}

实现engine的生命周期

为JindoEngine实现刚刚介绍的那些方法,即可实现Jindo engine的生命周期。

创建PV时,如果是FusePV,可以使用Fluid提供的csi-nodeplugin,只要在PV的spec中添加如下字段:

 csi:
  driver: fuse.csi.fluid.io
  volumeAttributes:
   fluid_path: /runtime-mnt/jindofs
   mount_type: fuse.alluxio-fuse

将fluid_path和mount_type替换为实际的挂载目录和挂载种类。

修改Makefile

JINDORUNTIME_CONTROLLER_IMG ?= xxx/xxx
jindoruntime-controller-build: generate fmt vet
    CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=off  go build -gcflags="all=-N -l" -a -o bin/jindoruntime-controller -ldflags '${LDFLAGS}' cmd/jindo/main.go
docker-build-jindoruntime-controller: generate fmt vet
    docker build --no-cache . -f Dockerfile.jindoruntime -t ${JINDORUNTIME_CONTROLLER_IMG}:${GIT_VERSION}

执行make命令时,会在api/v1alpha1/zz_generated.deepcopy.go中自动生成CRD的deepcopy等方法。

文章列表

更多推荐

更多
  • Linux基础知识-五、更高级的命令行和概念 基本网络概念,安装新软件和更新系统,服务介绍,基本系统故障排除和防火墙,引入 ACLs,setuid、setgid 和粘性位,设置用户标识符,塞吉德,粘性比特,摘要, 在本章中,我们将了解以下内容:基本网络概念安装新
  • Linux基础知识-三、Linux 文件系统 理解文件系统,使用文件链接,搜索文件,与用户和组一起工作,使用文件权限,使用文本文件,使用 VIM 文本编辑器,摘要, 在前一章中,我们通过导航文件系统向您介绍了 Linux 文件和文件夹。在本章中,我们将学习如何使用、查找和更改读取和
  • Linux基础知识-四、使用命令行 基本的 Linux 命令,附加程序,网络工具,Nmap(消歧义),链接,iotop,iftop,快上来,lsof,理解过程,克隆,信号,杀,障碍,使用 Bash Shell 变量,Bash shell 脚本介绍,实现 Bash Shel
  • Linux基础知识-二、Linux 命令行 介绍命令行,文件环球化,引用命令,寻求帮助,使用 Linux Shell,理解标准流,理解正则表达式,与 sed 合作,使用 awk,浏览 Linux 文件系统,摘要, 在本章中,我们将向您介绍开始使用 Linux 命令行时最基本的概念
  • Linux基础知识-一、Linux 简介 Linux 系统概述,虚拟化,安装 VirtualBox 和 CentOS,使用 VirtualBox,通过 SSH 连接虚拟机,摘要, 一个操作系统 ( 操作系统)是一个运行在你的电脑上的特殊软件,它使得启动和运行微软
  • Linux基础知识-零、前言 这本书是给谁的,这本书涵盖了什么,充分利用这本书,下载彩色图像,使用的约定,取得联系,复习, 在这本书里,目标是建立一个坚实的基础,学习 Linux 命令行的所有要点,让你开始。它被设计成非常专注于只学习实用的核心技能和基本的 Linu
  • Git秘籍-十二、使用 Github 在这一章中,我将讨论使用 Github 来托管存储库。目前这是一个流行的开源项目托管平台。我们首先创建一个 Github 帐户并配置 SSH 密钥。完成后,您将学会如何:从公共 Github 库克隆克隆并推送至
    Apache CN

  • Git秘籍-十三、更多秘籍 在这最后一章,我将讨论一些尚未涉及的细节,这些细节迟早会成为你不可或缺的。您将了解到:如何使用命令`$ git diff`比较不同版本的文件?如何克服关于行尾的问题?配置忽略文件的三种不同方法使
    Apache CN

  • Git秘籍-十一、托管 Git 存储库 一旦你和你的同事们学会了如何提交、使用分支和远程操作,你就会想把 git 作为一个团队来使用。在这一章中,我将展示如何建立一个虚拟主机来与他人共享 git 库。 Git 可以使用 ssh、http、https 和 git 网络协议。要
    Apache CN

  • Git秘籍-十、远程存储库和同步 所有 VCS 系统背后的内在原因是使一组开发人员之间的协作尽可能无缝。最后,我们已经到了可以讨论如何使用 git 进行小组工作的时候了。我们将从最简单的设置开始,所有的存储库都可以通过本地存储获得。首先你必须学会如何使用遥控器。我
    Apache CN

  • 近期文章

    更多
    文章目录

      推荐作者

      更多