Kubeflow Trainer梳理
仓库链接:https://github.com/kubeflow/trainer
主要的结构体介绍
TrainJob
1 |
|
1 |
|
TrainingRuntimeList
1 |
|
设计巧思
TrainJob.TrainJobStatus
结构:
1 |
|
一个结束的trainJob的example:
如何看一个trainJob是否结束了:
- 直接遍历status.Conditions,比较是否有type为TrainJobComplete或TrainJobFailed,并且比较其status值是否为True
优点:使用condition可以清楚地记录所有发生过的事件的过程。
如何SetStatuCondition:
以newCondition.Type为标准去查看之前是否已经有相关的Condition了
如果还没有这个事件,就加入
如果已经有了,就更新为现在这个Condition
优点:控制了Condition的数量,
主要流程介绍
TrainJobReconciler.Reconcile
这是主要的回调函数
根据req的名字得到trainJob
如果trainJob已经结束了就直接返回
根据这个trainJob所属的runtime的APIGroup和Kind来得到这个类别的所有的runtime
然后调用reconcileObjects(ctx context.Context, runtime jobruntimes.Runtime, trainJob *trainer.TrainJob)
更新trainJob的Status
查看trainJob.Spec.Suspend和trainJob.Status.Conditions,任意标记了suspended,就将一个新的suspended添加到Status.Conditions中
根据reconcileObjects的返回的opState来添加对应的creationSucceeded、buildFailed、creationFailed、updateFailed对应的Condition
设置TerminalCondition
如果在上述的Condition设置中发生了condition的变化,就进行Status的更新
TrainJobReconciler.reconcileObjects
让传进来的runtime根据trainJob.Spec.RuntimeRef.Name得到ClusterTrainingRuntime
调用ClusterTrainingRuntime来创建新的objects
对于label和annotation会在template的基础上添加上trainJob的
最后会根据TemplateSpec生成出带label、annotation、mlPolicy、podGroupPolicy、PodSpecReplicas的infoOption,并根据infoOption来生成Info
根据f.enforceMLPlugins来依据MPI、Torch、PlainML中的设置更新info.RuntimePolicy.MLPolicy、info.RuntimePolicy.MLPolicy.Torch来更新info.Trainer中的一些信息,如NumNodes、Env、ContainerPort等
根据f.enforcePodGroupPolicyPlugins来依据CoScheduling的设置更新info.Scheduler中的信息,如info.Scheduler.PodLabels[schedulerpluginsv1alpha1.PodGroupLabel] = trainJob.Name
根据f.componentBuilderPlugins来依据coScheduling、jobSet、MPI等使用info更新一下额外的components,如CoScheduling就需要更新schedulerpluginsv1alpha1ac.PodGroup
遍历创建的objects进行处理
对object进行类型转换,转为obj
调用patch更新obj
JobSet介绍
JobSet是trainRuntime中的template,是创建pod的基本单元,一个jobset中有多个ReplicatedJob,每个ReplicatedJob代表的是多个一样的pod。
Coscheduling所添加的内容
原本Coscheduling所使用的podGroup的定义:
1 |
|
pkg/runtime/framework/plugins/coscheduling
对于CoScheduling
需要实现三大Plug:
EnforcePodGroupPolicyPlugin:
EnforcePodGroupPolicy函数:
- 主要是加入了
info.Scheduler.PodLabels[schedulerpluginsv1alpha1.
PodGroupLabel
] = trainJob.Name
- 主要是加入了
WatchExtensionPlugin
ReconcilerBuilders函数:
- 确保 PodGroup CRD 已安装,并配置控制器监听 PodGroup、LimitRange 和 RuntimeClass 资源的变化。当这些资源发生变化时,会触发相应的处理逻辑,以保证系统状态的一致性。
ComponentBuilderPlugin
build函数:
如果 info.RuntimePolicy.PodGroupPolicy.Coscheduling不为空,就创建schedulerpluginsv1alpha1ac.PodGroup(需要的内容,包括:
根据info.TotalRequests计算totalMembers、各个类型的资源的总的资源请求
构建出podGroup,主要是填充下面这部分内容:
1
2
3
4
5
6
7// PodGroupSpecApplyConfiguration represents an declarative configuration of the PodGroupSpec type for use
// with apply.
type PodGroupSpecApplyConfiguration struct {
MinMember *int32 `json:"minMember,omitempty"`
MinResources *v1.ResourceList `json:"minResources,omitempty"`
ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
}- 将podGroup的ownerReferences指向trainJob
对于PodGroupRuntimeClassHandler
需要添加这几个plugin:
TypedEventHandler,这个包含了对event增删查改以及Generic的操作
增删查改实际上都统一调用了queueSuspendedTrainJobs,具体代码执行如下:
依据runtimeClass.Name获取到所有的trainingRuntimes和clusterTrainingRuntimes
依据trainingRuntime.Name和clusterTrainingRuntime.Name获取到对应的trainJob
遍历所有的trainJob,如果它已经suspend那么就将其作为一个request加入到处理队列中
Volcano中相关内容
PodGroup整理
podGroup定义:
1 |
|
PodGroup Controller如何管理Pod
查看pkg/controllers/podgroup/pg_controller.go相关内容:
哪些Pod会被其管理:
pod.Spec.SchedulerName符合Volcano对应的调度器
pod.Annotation中有scheduling.k8s.io/group-name,且名字等于podGroup,如果没有就会创建一个对应的podGroup
可能还需要”volcano.sh/task-spec”=