kubelet: The primary “node agent” that runs on each node.
Kubelet源码分析
Kubelet Version: v1.3.4
1. Kubelet简介
Kubelet运行在Kubernetes的每个Node上,主要负责容器的生命周期管理和状态维护,以及监控数据采集。kubelet从配置文件或者HTTP请求或者apiserver获取容器配置清单,容器配置清单是一个描述pod的文件,然后它会根据最新获取到的清单同步本地node上的pod和容器的状态。
2. Kubelet类型介绍
Kubelet是组件中重要的结构体,它的位置在pkg/kubelet/kubelet.go中
type Kubelet struct {
hostname string
nodeName string
// 最终执行docker命令的client,调用的是
// github.com/docker/engine-api/client
dockerClient dockertools.DockerInterface
// 从apiserver拉取pod配置清单时使用的client
kubeClient clientset.Interface
// 维护kubelet文件的目录,默认值是/var/lib/kubelet
rootDirectory string
// 同步pod的协程
podWorkers PodWorkers
// 管理pod的manager
podManager kubepod.Manager
// 获取节点信息及容器信息的接口
cadvisor cadvisor.Interface
// 已停止的容器的垃圾收集策略
containerGC kubecontainer.ContainerGC
// 管理image的操作以及image的垃圾收集策略
imageManager imageManager
// 创建pod时,用来检查磁盘空间是否充足
diskSpaceManager diskSpaceManager
// 维护一个podstatus的状态缓存,供查询更新
statusManager status.Manager
// 管理pod上磁盘的挂载和卸载
volumeManager volumemanager.VolumeManager
//管理pod image,负责容器的垃圾收集,调用底层的dockerclient实现功能
containerRuntime kubecontainer.Runtime
...
...
...
}
2.1 dockerclient功能介绍
接口:
type DockerInterface interface {
ListContainers(options dockertypes.ContainerListOptions)
([]dockertypes.Container, error)
InspectContainer(id string) (*dockertypes.ContainerJSON, error)
CreateContainer(dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error)
StartContainer(id string) error
StopContainer(id string, timeout int) error
RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions)
error
InspectImage(image string) (*dockertypes.ImageInspect, error)
ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image,
error)
PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error
RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error)
ImageHistory(id string) ([]dockertypes.ImageHistory, error)
Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error
Version() (*dockertypes.Version, error)
Info() (*dockertypes.Info, error)
CreateExec(string, dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error)
StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
AttachToContainer(string, dockertypes.ContainerAttachOptions,
StreamOptions) error
ResizeContainerTTY(id string, height, width int) error
ResizeExecTTY(id string, height, width int) error
}
2.2 podWorkers功能介绍
接口:
type PodWorkers interface {
UpdatePod(options *UpdatePodOptions)
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
ForgetWorker(uid types.UID)
}
当有创建pod或者更新pod的请求推送过来时,就会调用podWorkers的UpdatePod方法,生成一个go协程执行podWorkers的managePodLoop方法,每个pod对应一个go协程,如果协程不存在,才创建,然后将需要同步的pod数据丢到跟协程相通的channel中。managePodLoop方法中会再调用Kubelet的syncPod方法,来最终同步apiserver中的pod状态到本地。
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
pod := options.Pod
uid := pod.UID
var podUpdates chan UpdatePodOptions
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- *options
} else {
// if a request to kill a pod is pending, we do not let anything overwrite that request.
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
p.lastUndeliveredWorkUpdate[pod.UID] = *options
}
}
}
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for update := range podUpdates {
err := func() error {
podUID := update.Pod.UID
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
if err != nil {
return err
}
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
if err != nil {
return err
}
return nil
}()
// notify the call-back function if the operation succeeded or not
if update.OnCompleteFunc != nil {
update.OnCompleteFunc(err)
}
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", update.Pod.UID, err)
p.recorder.Eventf(update.Pod, api.EventTypeWarning, events.FailedSync, "Error syncing pod, skipping: %v", err)
}
p.wrapUp(update.Pod.UID, err)
}
}
2.3 podManager功能介绍
接口:
type Manager interface {
GetPods() []*api.Pod
GetPodByFullName(podFullName string) (*api.Pod, bool)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodByUID(types.UID) (*api.Pod, bool)
GetPodByMirrorPod(*api.Pod) (*api.Pod, bool)
GetMirrorPodByPod(*api.Pod) (*api.Pod, bool)
GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod)
SetPods(pods []*api.Pod)
AddPod(pod *api.Pod)
UpdatePod(pod *api.Pod)
DeletePod(pod *api.Pod)
DeleteOrphanedMirrorPods()
TranslatePodUID(uid types.UID) types.UID
GetUIDTranslations()(podToMirror, mirrorToPod map[types.UID]types.UID)
IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
MirrorClient
}
实现pod的增删改查等功能
2.4 cadvisor功能介绍
接口:
type Interface interface {
Start() error
DockerContainer(name string, req *cadvisorapi.ContainerInfoRequest)
(cadvisorapi.ContainerInfo, error)
ContainerInfo(name string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
ContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error)
SubcontainerInfo(name string, req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error)
MachineInfo() (*cadvisorapi.MachineInfo, error)
VersionInfo() (*cadvisorapi.VersionInfo, error)
ImagesFsInfo() (cadvisorapiv2.FsInfo, error)
RootFsInfo() (cadvisorapiv2.FsInfo, error)
WatchEvents(request *events.Request) (*events.EventChannel, error)
}
主要作用是获取本地node上的容器信息,机器信息,镜像信息,以及磁盘信息等
构建Kubelet对象时会初始化一个cadvisor对象,此时会启动一个httpserver服务默认监听4194端口, RegisterHandlers会给/containers和/docker路径分别注册一个handler, RegisterPrometheusHandler会给/metrics路径注册一个handler,当访问这些路径时就会调用相应的handler获取监控数据。
func (cc *cadvisorClient) exportHTTP(port uint) error {
// Register the handlers regardless as this registers the prometheus
// collector properly.
mux := http.NewServeMux()
err := cadvisorhttp.RegisterHandlers(mux, cc, "", "", "", "")
if err != nil {
return err
}
cadvisorhttp.RegisterPrometheusHandler(mux, cc, "/metrics", nil)
// Only start the http server if port > 0
if port > 0 {
serv := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
}
// TODO(vmarmol): Remove this when the cAdvisor port is once again free.
// If export failed, retry in the background until we are able to bind.
// This allows an existing cAdvisor to be killed before this one registers.
go func() {
defer runtime.HandleCrash()
err := serv.ListenAndServe()
for err != nil {
glog.Infof("Failed to register cAdvisor on port %d, retrying. Error: %v", port, err)
time.Sleep(time.Minute)
err = serv.ListenAndServe()
}
}()
}
return nil
}
kubelet启动时会调用cadvisor的Start方法,实际上调用的是cadvisor manager的Start方法,Start里面会为每个cgroup下检测到的容器启动一个housekeeping协程,来定时更新cadvisor中关于容器的监控数据,最后还会启动一个全局的housekeeping协程,保证增加容器删除容器时cadvisor中的数据也能及时更新。
2.5 containerGC功能介绍
接口:
type ContainerGC interface {
// Garbage collect containers.
GarbageCollect(allSourcesReady bool) error
}
实际上调用的是containerRuntime的GarbageCollect方法。
容器垃圾收集会根据一个策略来执行,用到的参数有三个:
gcPolicy := kubecontainer.ContainerGCPolicy{
// 容器的最小存活时间
MinAge: kc.MinimumGCAge,
// 每个pod的最大容器个数
MaxPerPodContainer: kc.MaxPerPodContainerCount,
// 整个node上的最大容器个数
MaxContainers: kc.MaxContainerCount,
}
主要步骤是: * 清除所有存活时间大于MinAge,不是k8s定义的容器 * 清除所属的pod已经删除的容器 * 如果某个pod的容器个数超过规定的最大个数,清除超出的时间最老的容器 * 如果node上的容器个数超过规定的最大个数,清除超出的时间最老的容器
2.6 imageManager功能介绍
接口:
type imageManager interface {
GarbageCollect() error
Start() error
GetImageList() ([]kubecontainer.Image, error)
DeleteUnusedImages() (int64, error)
}
/pkg/kubelet/kubelet.go中的Run(updates <-**chan **kubetypes.PodUpdate)方法中会启动imageManager的Start方法和GarbageCollect方法。
Start方法会启动一个无限循环的协程,主要做的事情是维护imageRecord这个记录,先从containerRuntime获取实际的images,然后跟imageRecord中的记录作比较,加入新增加的image的记录,更新image的firstDetected时间,lastUsed时间以及size,然后将已删除的image从imageRecord记录中删除。
GarbageCollect方法主要用到了三个参数:
type ImageGCPolicy struct {
// 镜像磁盘使用的百分比大于此值,则垃圾回收
HighThresholdPercent int
// 镜像磁盘使用的百分比小于此值,则不用垃圾回收
LowThresholdPercent int
// 镜像回收前使用的最小时间
MinAge time.Duration
}
执行步骤是: * 先通过cadvisor获取镜像的磁盘使用情况 * 如果镜像磁盘使用的百分比大于HighThresholdPercent,则开始垃圾回收 * 垃圾回收的原则是,将imageRecord中的images排序,lastUsed时间较老的排在前面,lastUsed相同的则firstDetected较老的排在前面,然后遍历imageRecord,如果lastUsed时间等于或大于当前时间,则不释放,如果image的使用时间小于规定的MinAge,也不释放,否则释放。
2.7 diskSpaceManager功能介绍
接口:
type diskSpaceManager interface {
// Checks the available disk space
IsRootDiskSpaceAvailable() (bool, error)
IsRuntimeDiskSpaceAvailable() (bool, error)
}
构建diskSpaceManager的policy的时候会用到两个参数:
type DiskSpacePolicy struct {
// docker images的空余空间限制
DockerFreeDiskMB int
// host volumes的空余空间限制
RootFreeDiskMB int
}
新创建pod时调用这两个方法来检查roodiskspace(host volumes的占用的位置)以及runtimediskspace(docker images占用的位置)是否充足。这两个方法中的磁盘数据是调用cadvisor的接口获取的。
2.8 statusManager功能介绍
接口:
type Manager interface {
PodStatusProvider
Start()
SetPodStatus(pod *api.Pod, status api.PodStatus)
SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
TerminatePod(pod *api.Pod)
RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
type PodStatusProvider interface {
GetPodStatus(uid types.UID) (api.PodStatus, bool)
}
manager结构:
type manager struct {
kubeClient clientset.Interface
podManager kubepod.Manager
// Map from pod UID to sync status of the corresponding pod.
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
podStatusChannel chan podStatusSyncRequest
// Map from (mirror) pod UID to latest status version successfully sent to the API server.
// apiStatusVersions must only be accessed from the sync thread.
apiStatusVersions map[types.UID]uint64
}
Kubelet启动时,Kubelet对象初始化完成后会调用statusManager的start方法,来启动一个协程定时批量同步manager中的podStatuses到apiserver。
批量同步时会找出那些apiStatusVersions 的pod version和podStatuses的pod version不同的pod来同步, podManager中的podstatus和podStatuses中的podstatus不同的pod也会调用syncPod方法来同步。
创建pod时也会调用statusManager的SetPodStatus方法,此方法中会把收到的podstatus放入podStatusChannel中,从而启动syncPod方法,然后同步当前状态到apiserver中,这个过程中pod的版本号会加1,如果向apiserver的同步没有成功,则apiStatusVersions 跟podStatuses的pod版本号就不相同,这样就需要批量同步的时候再更新。
2.9 volumeManager功能介绍
接口:
type VolumeManager interface {
Run(stopCh <-chan struct{})
WaitForAttachAndMount(pod *api.Pod) error
GetMountedVolumesForPod(podName types.UniquePodName)
container.VolumeMap
GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64
GetVolumesInUse() []api.UniqueVolumeName
VolumeIsAttached(volumeName api.UniqueVolumeName) bool
MarkVolumesAsReportedInUse(volumesReportedAsInUse
[]api.UniqueVolumeName)
}
volumeManager的作用主要是卸载挂载volume,以及查询pod的volume。
Start方法也是在kubelet启动时,Kubelet对象初始化完成后调用的,Start方法也会启动一个协程,定时检查来保证该卸载的volume已经被卸载,该挂载的volume被正确挂载。
2.10 containerRuntime功能介绍
接口:
type Runtime interface {
Type() string
Version() (Version, error)
Version() (Version, error)
APIVersion() (Version, error)
Status() error
GetPods(all bool) ([]*Pod, error)
GarbageCollect(gcPolicy ContainerGCPolicy, allSourcesReady bool) error
SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
KillPod(pod *api.Pod, runningPod Pod, gracePeriodOverride *int64) error
GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
PullImage(image ImageSpec, pullSecrets []api.Secret) error
IsImagePresent(image ImageSpec) (bool, error)
ListImages() ([]Image, error)
RemoveImage(image ImageSpec) error
ImageStats() (*ImageStats, error)
GetNetNS(containerID ContainerID) (string, error)
GetPodContainerID(*Pod) (ContainerID, error)
GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
DeleteContainer(containerID ContainerID) error
ContainerCommandRunner
ContainerAttacher}
type ContainerAttacher interface {
AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error)
}
type ContainerCommandRunner interface {
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
}
实现该接口的对象是pkg/kubelet/dockertools/docker_manager.go中的DockerManager。
DockerManager结构体中有个client的成员,实际上就是dockerClient,上面已经介绍过。 DockerManager实现了Runtime接口的所有方法,这些方法中会使用dockerClient来实现对底层docker服务的操作。
当有新的pod创建请求时,会调用SyncPod方法来同步用户请求pod到本地物理机上。
containerGC就是调用的此处的GarbageCollect方法,前面已经介绍过。
3. Kubelet启动流程
UnsecuredKubeletConfig方法主要是根据KubeletServer对象来初始化KubeletConfig对象。
CreateAPIServerClientConfig方法主要是用来初始化KubeClient对象,用来与APIServer交互。
makePodSourceConfig方法相当于容器配置清单的生产者,有三种方式获取配置清单,分别是文件方式,url方式,ApiServer方式,目前我们主要是用ApiServer方式。其中会创建一个ListWatch对象,一直监听ApiServer中pod的变化,当有变化时,变化的pod就会被放到一个PodUpdate类型的channel中,这个channel会一直往下传递到syncLoop的updates参数中,以便podWorkers从其中取出消息进行处理。
NewMainKubelet方法主要是初始化Kubelet对象的各个成员变量,包括我们之前介绍的那些,各个manager等。
StartGarbageCollection方法主要是用来启动垃圾回收,包括containerGC.GrabageCollect和imageManager.GrabageCollect。
最后调用Kubelet对象的Run方法,这里面启动了各个manager的start方法: * kl.imageManager.Start() * kl.containerManager.Start() * kl.volumeManager.Run(wait.NeverStop) * kl.cadvisor.Start() * kl.statusManager.Start() * kl.probeManager.Start()
最后启动了Kubelet的main sync loop,来创建podworkers并获取pod的创建删除更新等消息,在物理节点上调用dockerClient做出相应的处理。
4. Kubelet创建pod的流程
syncLoopIteration会从上面提到的生产者创建的channel中取出消息,然后根据消息的不同类型(kubetypes.ADD kubetypes.UPDATE kubetypes.REMOVE kubetypes.RECONCILE)做不同的处理。
kubetypes.ADD kubetypes.UPDATE类型的消息都会调用dispatchWork启动一个新的podWorkers来syncPod。
首先回调的是kubelet中的syncLoop,其中做的事情主要是: * 如果是创建pod,记录pod的创建时间; * 更新statusManager中的pod status; * 检查这个pod的source是否是apiserver,如果不是,并且还没有mirrorPod,则创建; * 为pod创建数据目录; * 等待磁盘的挂载和卸载; * 回调docker_manager的SyncPod方法。
docker_manager的SyncPod中会调用computePodContainerChanges来比较apiserver发送过来的pod和kubelet的缓存中podStatus有什么不同,并把需不需要重新创建基础容器,哪些container需要重启,哪些container需要继续保持运行等信息记录下来,然后根据不同的情况依次做出相应的处理。比如需要重新创建基础容器并且需要重启的containers个数是0,因为只要重新创建基础容器,那pod中所有的容器都需要重启,所以这种情况说明pod有问题,就可以将pod kill掉。创建容器有最终会调用kubelet.dockerClient.CreateContainer()以及kubelet.dockerClient.StartContainer()。
当syncPod运行结束后,会把当前pod的UID加入kubelet的WorkQueue中,syncLoopIteration还会启动两个定时任务,其中一个每秒钟将WorkQueue中的pod同步一次,另一个定时任务每隔两秒钟会清理一下没用的pod,podWorkers等资源。