Kubeflow Trainer梳理

仓库链接:https://github.com/kubeflow/trainer

主要的结构体介绍

TrainJob

// TrainJob represents configuration of a training job.
// It combines the desired specification (Spec) with the current status (Status).
type TrainJob struct {
// Type metadata, embedded with the `inline` JSON tag option.
metav1.TypeMeta `json:",inline"`

// Standard object's metadata.
metav1.ObjectMeta `json:"metadata,omitempty"`

// Specification of the desired TrainJob.
Spec TrainJobSpec `json:"spec,omitempty"`

// Current status of TrainJob.
Status TrainJobStatus `json:"status,omitempty"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// TrainJobSpec represents specification of the desired TrainJob.
type TrainJobSpec struct {
// Reference to the training runtime.
// The field is immutable: the validation rule below rejects any update
// where the new value differs from the old one.
// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="runtimeRef is immutable"
RuntimeRef RuntimeRef `json:"runtimeRef"`

// Configuration of the desired trainer.
Trainer *Trainer `json:"trainer,omitempty"`

// Configuration of the training dataset.
DatasetConfig *DatasetConfig `json:"datasetConfig,omitempty"`

// Configuration of the pre-trained and trained model.
ModelConfig *ModelConfig `json:"modelConfig,omitempty"`

// Labels to apply for the derivative JobSet and Jobs.
// They will be merged with the TrainingRuntime values.
Labels map[string]string `json:"labels,omitempty"`

// Annotations to apply for the derivative JobSet and Jobs.
// They will be merged with the TrainingRuntime values.
Annotations map[string]string `json:"annotations,omitempty"`

// Custom overrides for the training runtime.
// +listType=atomic
PodSpecOverrides []PodSpecOverride `json:"podSpecOverrides,omitempty"`

// Whether the controller should suspend the running TrainJob.
// Defaults to false.
// +kubebuilder:default=false
Suspend *bool `json:"suspend,omitempty"`

// ManagedBy is used to indicate the controller or entity that manages a TrainJob.
// The value must be either empty, `trainer.kubeflow.org/trainjob-controller`, or
// `kueue.x-k8s.io/multikueue`. The built-in TrainJob controller reconciles TrainJobs which
// don't have this field at all or whose field value is the reserved string
// `trainer.kubeflow.org/trainjob-controller`, but delegates reconciling TrainJobs
// with `kueue.x-k8s.io/multikueue` to Kueue. The field is immutable.
// Defaults to `trainer.kubeflow.org/trainjob-controller`.
// +kubebuilder:default="trainer.kubeflow.org/trainjob-controller"
// +kubebuilder:validation:XValidation:rule="self in ['trainer.kubeflow.org/trainjob-controller', 'kueue.x-k8s.io/multikueue']", message="ManagedBy must be trainer.kubeflow.org/trainjob-controller or kueue.x-k8s.io/multikueue if set"
// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="ManagedBy value is immutable"
ManagedBy *string `json:"managedBy,omitempty"`
}

TrainingRuntimeList

// TrainingRuntimeList is a collection of training runtimes.
type TrainingRuntimeList struct {
// Type metadata, embedded with the `inline` JSON tag option.
metav1.TypeMeta `json:",inline"`

// Standard list metadata.
metav1.ListMeta `json:"metadata,omitempty"`

// List of TrainingRuntimes.
// The tag has no omitempty, so the `items` key is always serialized.
Items []TrainingRuntime `json:"items"`
}

// TrainingRuntime represents a training runtime which can be referenced as part of
// the `runtimeRef` API in TrainJob. This resource is namespace-scoped and can be referenced
// by TrainJobs that are created in the *same* namespace as the TrainingRuntime.
type TrainingRuntime struct {
// Type metadata, embedded with the `inline` JSON tag option.
metav1.TypeMeta `json:",inline"`

// Standard object's metadata.
metav1.ObjectMeta `json:"metadata,omitempty"`

// Specification of the desired TrainingRuntime.
Spec TrainingRuntimeSpec `json:"spec,omitempty"`
}

// TrainingRuntimeSpec represents a specification of the desired training runtime.
// MLPolicy and PodGroupPolicy are optional pointers that are omitted from JSON when nil;
// Template is tagged without omitempty.
type TrainingRuntimeSpec struct {
// Configuration for the model training with ML-specific parameters.
MLPolicy *MLPolicy `json:"mlPolicy,omitempty"`

// Configuration for the PodGroup to enable gang-scheduling via supported plugins.
PodGroupPolicy *PodGroupPolicy `json:"podGroupPolicy,omitempty"`

// JobSet template which will be used by TrainJob.
Template JobSetTemplateSpec `json:"template"`
}

// MLPolicy represents configuration for the model training with ML-specific parameters.
// The validation rules below reject objects that set numNodes together with
// torch.elasticPolicy, and objects that set both torch and mpi.
// +kubebuilder:validation:XValidation:rule="!(has(self.numNodes) && (has(self.torch) && has(self.torch.elasticPolicy)))", message="numNodes should not be set if torch.elasticPolicy is configured"
// +kubebuilder:validation:XValidation:rule="!(has(self.torch) && has(self.mpi))", message="Only one of the policy can be configured"
type MLPolicy struct {
// Number of training nodes.
// Defaults to 1.
NumNodes *int32 `json:"numNodes,omitempty"`

// Configuration for the runtime-specific parameters, such as Torch or MPI.
// Only one of its members may be specified.
// The struct is embedded with the `inline` JSON tag option.
MLPolicySource `json:",inline"`
}

// MLPolicySource represents the runtime-specific configuration for various technologies.
// One of the following specs can be set. Both members are optional pointers
// that are omitted from JSON when nil.
type MLPolicySource struct {
// Configuration for the PyTorch runtime.
Torch *TorchMLPolicySource `json:"torch,omitempty"`

// Configuration for the MPI runtime.
MPI *MPIMLPolicySource `json:"mpi,omitempty"`
}

// PodGroupPolicy represents a PodGroup configuration for gang-scheduling.
// Its only member is the embedded PodGroupPolicySource.
type PodGroupPolicy struct {
// Configuration for gang-scheduling using various plugins.
// The struct is embedded with the `inline` JSON tag option.
PodGroupPolicySource `json:",inline"`
}

// PodGroupPolicySource represents supported plugins for gang-scheduling.
// Only one of its members may be specified.
// Coscheduling is the only member defined in this version of the struct.
type PodGroupPolicySource struct {
// Coscheduling plugin from the Kubernetes scheduler-plugins for gang-scheduling.
Coscheduling *CoschedulingPodGroupPolicySource `json:"coscheduling,omitempty"`

// TODO (andreyvelich): Add support for Volcano gang-scheduler.
}

// JobSetTemplateSpec represents a template of the desired JobSet.
type JobSetTemplateSpec struct {
// Metadata for custom JobSet's labels and annotations.
// The JobSet name and namespace are equal to the TrainJob's name and namespace.
metav1.ObjectMeta `json:"metadata,omitempty"`

// Specification of the desired JobSet which will be created from TrainJob.
Spec jobsetv1alpha2.JobSetSpec `json:"spec,omitempty"`
}

设计巧思

TrainJob.TrainJobStatus

结构:

// TrainJobStatus represents the current status of TrainJob.
type TrainJobStatus struct {
// Conditions for the TrainJob.
// The markers below declare the list as a map keyed by the condition `type`,
// with a merge patch strategy on the same key.
//
// +optional
// +listType=map
// +listMapKey=type
// +patchStrategy=merge
// +patchMergeKey=type
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`

// JobsStatus tracks the child Jobs in TrainJob.
// The markers below declare the list as a map keyed by the entry `name`.
// +listType=map
// +listMapKey=name
JobsStatus []JobStatus `json:"jobsStatus,omitempty"`
}
// Condition contains details for one aspect of the current state of this API Resource.
// ---
// This struct is intended for direct use as an array at the field path .status.conditions.
// In the example below the list is declared as a map keyed by the condition type
// (listMapKey / patchMergeKey). For example,
//
//	type FooStatus struct{
//	    // Represents the observations of a foo's current state.
//	    // Known .status.conditions.type are: "Available", "Progressing", and "Degraded"
//	    // +patchMergeKey=type
//	    // +patchStrategy=merge
//	    // +listType=map
//	    // +listMapKey=type
//	    Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
//
//	    // other fields
//	}
type Condition struct {
// type of condition in CamelCase or in foo.example.com/CamelCase.
// ---
// Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be
// useful (see .node.status.conditions), the ability to deconflict is important.
// The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
// +required
// +kubebuilder:validation:Required
// +kubebuilder:validation:Pattern=`^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$`
// +kubebuilder:validation:MaxLength=316
Type string `json:"type" protobuf:"bytes,1,opt,name=type"`
// status of the condition, one of True, False, Unknown.
// +required
// +kubebuilder:validation:Required
// +kubebuilder:validation:Enum=True;False;Unknown
Status ConditionStatus `json:"status" protobuf:"bytes,2,opt,name=status"`
// observedGeneration represents the .metadata.generation that the condition was set based upon.
// For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date
// with respect to the current state of the instance.
// +optional
// +kubebuilder:validation:Minimum=0
ObservedGeneration int64 `json:"observedGeneration,omitempty" protobuf:"varint,3,opt,name=observedGeneration"`
// lastTransitionTime is the last time the condition transitioned from one status to another.
// This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable.
// +required
// +kubebuilder:validation:Required
// +kubebuilder:validation:Type=string
// +kubebuilder:validation:Format=date-time
LastTransitionTime Time `json:"lastTransitionTime" protobuf:"bytes,4,opt,name=lastTransitionTime"`
// reason contains a programmatic identifier indicating the reason for the condition's last transition.
// Producers of specific condition types may define expected values and meanings for this field,
// and whether the values are considered a guaranteed API.
// The value should be a CamelCase string.
// This field may not be empty.
// +required
// +kubebuilder:validation:Required
// +kubebuilder:validation:MaxLength=1024
// +kubebuilder:validation:MinLength=1
// +kubebuilder:validation:Pattern=`^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$`
Reason string `json:"reason" protobuf:"bytes,5,opt,name=reason"`
// message is a human readable message indicating details about the transition.
// This may be an empty string.
// +required
// +kubebuilder:validation:Required
// +kubebuilder:validation:MaxLength=32768
Message string `json:"message" protobuf:"bytes,6,opt,name=message"`
}


// JobStatus reports, for the child Jobs identified by Name, how many are ready,
// succeeded, failed, active, and suspended.
// None of the fields use omitempty, so zero counters are still serialized.
type JobStatus struct {
// Name of the child Job.
Name string `json:"name"`

// Ready is the number of child Jobs where the number of ready pods and completed pods
// is greater than or equal to the total expected pod count for the child Job.
Ready int32 `json:"ready"`

// Succeeded is the number of successfully completed child Jobs.
Succeeded int32 `json:"succeeded"`

// Failed is the number of failed child Jobs.
Failed int32 `json:"failed"`

// Active is the number of child Jobs with at least 1 pod in a running or pending state
// which are not marked for deletion.
Active int32 `json:"active"`

// Suspended is the number of child Jobs which are in a suspended state.
Suspended int32 `json:"suspended"`
}

一个结束的trainJob的example:

如何看一个trainJob是否结束了:

  • 直接遍历status.Conditions,比较是否有type为TrainJobComplete或TrainJobFailed,并且比较其status值是否为True

优点:使用condition可以清楚地记录所有发生过的事件的过程。

如何SetStatusCondition:

  • 以newCondition.Type为标准去查看之前是否已经有相关的Condition了

  • 如果还没有这个事件,就加入

  • 如果已经有了,就更新为现在这个Condition

优点:控制了Condition的数量,每种Type的Condition只保留最新的一条。

主要流程介绍

TrainJobReconciler.Reconcile

这是主要的回调函数

  1. 根据req的名字得到trainJob

  2. 如果trainJob已经结束了就直接返回

  3. 根据这个trainJob所属的runtime的APIGroup和Kind来得到这个类别的所有的runtime

  4. 然后调用reconcileObjects(ctx context.Context, runtime jobruntimes.Runtime, trainJob *trainer.TrainJob)

  5. 更新trainJob的Status

    1. 查看trainJob.Spec.Suspend和trainJob.Status.Conditions,任意标记了suspended,就将一个新的suspended添加到Status.Conditions中

    2. 根据reconcileObjects的返回的opState来添加对应的creationSucceeded、buildFailed、creationFailed、updateFailed对应的Condition

    3. 设置TerminalCondition

    4. 如果在上述的Condition设置中发生了condition的变化,就进行Status的更新

TrainJobReconciler.reconcileObjects

  1. 让传进来的runtime根据trainJob.Spec.RuntimeRef.Name得到ClusterTrainingRuntime

  2. 调用ClusterTrainingRuntime来创建新的objects

    1. 对于label和annotation会在template的基础上添加上trainJob的

    2. 最后会根据TemplateSpec生成出带label、annotation、mlPolicy、podGroupPolicy、PodSpecReplicas的infoOption,并根据infoOption来生成Info

    3. 根据f.enforceMLPlugins来依据MPI、Torch、PlainML中的设置更新info.RuntimePolicy.MLPolicy、info.RuntimePolicy.MLPolicy.Torch来更新info.Trainer中的一些信息,如NumNodes、Env、ContainerPort等

    4. 根据f.enforcePodGroupPolicyPlugins来依据CoScheduling的设置更新info.Scheduler中的信息,如info.Scheduler.PodLabels[schedulerpluginsv1alpha1.PodGroupLabel] = trainJob.Name

    5. 根据f.componentBuilderPlugins来依据coScheduling、jobSet、MPI等使用info更新一下额外的components,如CoScheduling就需要更新schedulerpluginsv1alpha1ac.PodGroup

  3. 遍历创建的objects进行处理

    1. 对object进行类型转换,转为obj

    2. 调用patch更新obj

JobSet介绍

JobSet是trainRuntime中的template,是创建pod的基本单元,一个jobset中有多个ReplicatedJob,每个ReplicatedJob代表的是多个一样的pod。

Coscheduling所添加的内容

原本Coscheduling所使用的podGroup的定义:

// PodGroupSpec represents the template of a pod group.
type PodGroupSpec struct {
// MinMember defines the minimal number of members/tasks to run the pod group;
// if there's not enough resources to start all tasks, the scheduler
// will not start any.
MinMember uint32 `json:"minMember"`

// MinResources defines the minimal resource of members/tasks to run the pod group;
// if there's not enough resources to start all tasks, the scheduler
// will not start any.
MinResources *v1.ResourceList `json:"minResources,omitempty"`

// ScheduleTimeoutSeconds defines the maximal time, in seconds, that members/tasks
// wait before the pod group is run.
ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
}

// PodGroupStatus represents the current state of a pod group.
// NOTE: Running, Succeeded and Failed are marked +optional, but their JSON tags
// have no omitempty, so zero counters are still serialized.
type PodGroupStatus struct {
// Current phase of PodGroup.
Phase PodGroupPhase `json:"phase"`

// OccupiedBy marks the workload (e.g., deployment, statefulset) UID that occupies the podgroup.
// It is empty if not initialized.
OccupiedBy string `json:"occupiedBy,omitempty"`

// The number of actively running pods.
// +optional
Running uint32 `json:"running"`

// The number of pods which reached phase Succeeded.
// +optional
Succeeded uint32 `json:"succeeded"`

// The number of pods which reached phase Failed.
// +optional
Failed uint32 `json:"failed"`

// ScheduleStartTime of the group.
ScheduleStartTime metav1.Time `json:"scheduleStartTime"`
}

pkg/runtime/framework/plugins/coscheduling

对于CoScheduling

需要实现三大Plugin:

  • EnforcePodGroupPolicyPlugin:

    • EnforcePodGroupPolicy函数:

      • 主要是加入了info.Scheduler.PodLabels[schedulerpluginsv1alpha1.PodGroupLabel] = trainJob.Name
  • WatchExtensionPlugin

    • ReconcilerBuilders函数:

      • 确保 PodGroup CRD 已安装,并配置控制器监听 PodGroup、LimitRange 和 RuntimeClass 资源的变化。当这些资源发生变化时,会触发相应的处理逻辑,以保证系统状态的一致性。
  • ComponentBuilderPlugin

    • build函数:

      • 如果 info.RuntimePolicy.PodGroupPolicy.Coscheduling不为空,就创建schedulerpluginsv1alpha1ac.PodGroup,需要的内容包括:

      • 根据info.TotalRequests计算totalMembers、各个类型的资源的总的资源请求

      • 构建出podGroup,主要是填充下面这部分内容:

      // PodGroupSpecApplyConfiguration represents a declarative configuration of the PodGroupSpec type for use
      // with apply. Every field is a pointer tagged omitempty, so fields left nil are
      // omitted from the serialized configuration.
      type PodGroupSpecApplyConfiguration struct {
      // MinMember is the apply-configuration counterpart of PodGroupSpec.MinMember.
      MinMember *int32 `json:"minMember,omitempty"`
      // MinResources is the apply-configuration counterpart of PodGroupSpec.MinResources.
      MinResources *v1.ResourceList `json:"minResources,omitempty"`
      // ScheduleTimeoutSeconds is the apply-configuration counterpart of PodGroupSpec.ScheduleTimeoutSeconds.
      ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
      }
      • 将podGroup的ownerReferences指向trainJob

对于PodGroupRuntimeClassHandler

需要添加这几个plugin:

  • TypedEventHandler,这个包含了对event增删查改以及Generic的操作

  • 增删查改实际上都统一调用了queueSuspendedTrainJobs,具体代码执行如下:

    • 依据runtimeClass.Name获取到所有的trainingRuntimes和clusterTrainingRuntimes

    • 依据trainingRuntime.Name和clusterTrainingRuntime.Name获取到对应的trainJob

    • 遍历所有的trainJob,如果它已经suspend那么就将其作为一个request加入到处理队列中

Volcano中相关内容

PodGroup整理

podGroup定义:

// PodGroupSpec represents the template of a pod group.
// NOTE: compared with the scheduler-plugins PodGroupSpec quoted earlier in this
// document, this definition adds Queue and PriorityClassName and has no
// ScheduleTimeoutSeconds field.
type PodGroupSpec struct {
// MinMember defines the minimal number of members/tasks to run the pod group;
// if there's not enough resources to start all tasks, the scheduler
// will not start anyone.
MinMember int32 `json:"minMember,omitempty" protobuf:"bytes,1,opt,name=minMember"`

// Queue defines the queue to allocate resource for PodGroup; if queue does not exist,
// the PodGroup will not be scheduled. Defaults to `default` Queue with the lowest weight.
// +optional
Queue string `json:"queue,omitempty" protobuf:"bytes,2,opt,name=queue"`

// If specified, indicates the PodGroup's priority. "system-node-critical" and
// "system-cluster-critical" are two special keywords which indicate the
// highest priorities with the former being the highest priority. Any other
// name must be defined by creating a PriorityClass object with that name.
// If not specified, the PodGroup priority will be default or zero if there is no
// default.
// +optional
PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,3,opt,name=priorityClassName"`

// MinResources defines the minimal resource of members/tasks to run the pod group;
// if there's not enough resources to start all tasks, the scheduler
// will not start anyone.
MinResources *v1.ResourceList `json:"minResources,omitempty" protobuf:"bytes,4,opt,name=minResources"`
}

// PodGroupStatus represents the current state of a pod group.
// All fields are tagged omitempty, so zero values are omitted from JSON.
type PodGroupStatus struct {
// Current phase of PodGroup.
Phase PodGroupPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase"`

// The conditions of PodGroup.
// +optional
Conditions []PodGroupCondition `json:"conditions,omitempty" protobuf:"bytes,2,opt,name=conditions"`

// The number of actively running pods.
// +optional
Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"`

// The number of pods which reached phase Succeeded.
// +optional
Succeeded int32 `json:"succeeded,omitempty" protobuf:"bytes,4,opt,name=succeeded"`

// The number of pods which reached phase Failed.
// +optional
Failed int32 `json:"failed,omitempty" protobuf:"bytes,5,opt,name=failed"`
}

PodGroup Controller如何管理Pod

查看pkg/controllers/podgroup/pg_controller.go相关内容:
哪些Pod会被其管理:

  • pod.Spec.SchedulerName符合Volcano对应的调度器

  • pod.Annotation中有scheduling.k8s.io/group-name,且名字等于podGroup,如果没有就会创建一个对应的podGroup

  • 可能还需要设置"volcano.sh/task-spec"(对应的取值在原文中缺失,待补充)


Kubeflow Trainer梳理
http://example.com/2025/04/01/Kubeflow Trainer梳理/
作者
滑滑蛋
发布于
2025年4月1日
许可协议