kubelet源码分析

"kubelet source code notes"

Posted by zhangweiwei on September 7, 2016

kubelet: The primary “node agent” that runs on each node.

Kubelet源码分析

Kubelet Version: v1.3.4

1. Kubelet简介

Kubelet运行在Kubernetes的每个Node上,主要负责容器的生命周期管理和状态维护,以及监控数据采集。kubelet从配置文件或者HTTP请求或者apiserver获取容器配置清单,容器配置清单是一个描述pod的文件,然后它会根据最新获取到的清单同步本地node上的pod和容器的状态。

2. Kubelet类型介绍

Kubelet是组件中重要的结构体,它的位置在pkg/kubelet/kubelet.go中

type Kubelet struct {
       hostname      string
       nodeName      string
       // 最终执行docker命令的client,调用的是
       // github.com/docker/engine-api/client
       dockerClient  dockertools.DockerInterface
       // 从apiserver拉取pod配置清单时使用的client
       kubeClient    clientset.Interface
       // 维护kubelet文件的目录,默认值是/var/lib/kubelet
       rootDirectory string
       // 同步pod的协程
       podWorkers PodWorkers
       // 管理pod的manager
       podManager kubepod.Manager
       // 获取节点信息及容器信息的接口
       cadvisor cadvisor.Interface
       // 已停止的容器的垃圾收集策略
       containerGC kubecontainer.ContainerGC
       // 管理image的操作以及image的垃圾收集策略
       imageManager imageManager
       // 创建pod时,用来检查磁盘空间是否充足
       diskSpaceManager diskSpaceManager
       // 维护一个podstatus的状态缓存,供查询更新
       statusManager status.Manager
       // 管理pod上磁盘的挂载和卸载
       volumeManager volumemanager.VolumeManager
       //管理pod image,负责容器的垃圾收集,调用底层的dockerclient实现功能
       containerRuntime kubecontainer.Runtime
       ...
       ...
       ...
}

2.1 dockerclient功能介绍

接口:

type DockerInterface interface {
       ListContainers(options dockertypes.ContainerListOptions) 	   		
	([]dockertypes.Container, error)
       InspectContainer(id string) (*dockertypes.ContainerJSON, error)
       CreateContainer(dockertypes.ContainerCreateConfig) 	(*dockertypes.ContainerCreateResponse, error)
       StartContainer(id string) error
       StopContainer(id string, timeout int) error
       RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) 	
	error
       InspectImage(image string) (*dockertypes.ImageInspect, error)
       ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, 	
	error)
       PullImage(image string, auth dockertypes.AuthConfig, opts 	dockertypes.ImagePullOptions) error
       RemoveImage(image string, opts dockertypes.ImageRemoveOptions) 	([]dockertypes.ImageDelete, error)
       ImageHistory(id string) ([]dockertypes.ImageHistory, error)
       Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error
       Version() (*dockertypes.Version, error)
       Info() (*dockertypes.Info, error)
       CreateExec(string, dockertypes.ExecConfig) 	(*dockertypes.ContainerExecCreateResponse, error)
       StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
       InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
       AttachToContainer(string, dockertypes.ContainerAttachOptions, 		
	StreamOptions) error
       ResizeContainerTTY(id string, height, width int) error
       ResizeExecTTY(id string, height, width int) error
}

2.2 podWorkers功能介绍

接口:

type PodWorkers interface {
       UpdatePod(options *UpdatePodOptions)
       ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
       ForgetWorker(uid types.UID)
}

当有创建pod或者更新pod的请求推送过来时,就会调用podWorkers的UpdatePod方法,生成一个go协程执行podWorkers的managePodLoop方法,每个pod对应一个go协程,如果协程不存在,才创建,然后将需要同步的pod数据丢到跟协程相通的channel中。managePodLoop方法中会再调用Kubelet的syncPod方法,来最终同步apiserver中的pod状态到本地。

func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
       pod := options.Pod
       uid := pod.UID
       var podUpdates chan UpdatePodOptions
       var exists bool

       p.podLock.Lock()
       defer p.podLock.Unlock()
       if podUpdates, exists = p.podUpdates[uid]; !exists {
              podUpdates = make(chan UpdatePodOptions, 1)
              p.podUpdates[uid] = podUpdates

              go func() {
                     defer runtime.HandleCrash()
                     p.managePodLoop(podUpdates)
              }()
       }
       if !p.isWorking[pod.UID] {
              p.isWorking[pod.UID] = true
              podUpdates <- *options
       } else {
              // if a request to kill a pod is pending, we do not let anything overwrite that request.
              update, found := p.lastUndeliveredWorkUpdate[pod.UID]
              if !found || update.UpdateType != kubetypes.SyncPodKill {
                     p.lastUndeliveredWorkUpdate[pod.UID] = *options
              }
       }
}


func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
       var lastSyncTime time.Time
       for update := range podUpdates {
              err := func() error {
                     podUID := update.Pod.UID
                     status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
                     if err != nil {
                            return err
                     }
                     err = p.syncPodFn(syncPodOptions{
                            mirrorPod:      update.MirrorPod,
                            pod:            update.Pod,
                            podStatus:      status,
                            killPodOptions: update.KillPodOptions,
                            updateType:     update.UpdateType,
                     })
                     lastSyncTime = time.Now()
                     if err != nil {
                            return err
                     }
                     return nil
              }()
              // notify the call-back function if the operation succeeded or not
              if update.OnCompleteFunc != nil {
                     update.OnCompleteFunc(err)
              }
              if err != nil {
                     glog.Errorf("Error syncing pod %s, skipping: %v", update.Pod.UID, err)
                     p.recorder.Eventf(update.Pod, api.EventTypeWarning, events.FailedSync, "Error syncing pod, skipping: %v", err)
              }
              p.wrapUp(update.Pod.UID, err)
       }
}

2.3 podManager功能介绍

接口:

type Manager interface {
       GetPods() []*api.Pod
       GetPodByFullName(podFullName string) (*api.Pod, bool)
       GetPodByName(namespace, name string) (*api.Pod, bool)
       GetPodByUID(types.UID) (*api.Pod, bool)
       GetPodByMirrorPod(*api.Pod) (*api.Pod, bool)
       GetMirrorPodByPod(*api.Pod) (*api.Pod, bool)
       GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod)
       SetPods(pods []*api.Pod)
       AddPod(pod *api.Pod)
       UpdatePod(pod *api.Pod)
       DeletePod(pod *api.Pod)
       DeleteOrphanedMirrorPods()
       TranslatePodUID(uid types.UID) types.UID
       GetUIDTranslations()(podToMirror, mirrorToPod map[types.UID]types.UID)
       IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
       MirrorClient
}

实现pod的增删改查等功能

2.4 cadvisor功能介绍

接口:

type Interface interface {
       Start() error
       DockerContainer(name string, req *cadvisorapi.ContainerInfoRequest) 	
(cadvisorapi.ContainerInfo, error)
       ContainerInfo(name string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
       ContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error)
       SubcontainerInfo(name string, req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error)
       MachineInfo() (*cadvisorapi.MachineInfo, error)
       VersionInfo() (*cadvisorapi.VersionInfo, error)
       ImagesFsInfo() (cadvisorapiv2.FsInfo, error)
       RootFsInfo() (cadvisorapiv2.FsInfo, error)
       WatchEvents(request *events.Request) (*events.EventChannel, error)
}

主要作用是获取本地node上的容器信息,机器信息,镜像信息,以及磁盘信息等

构建Kubelet对象时会初始化一个cadvisor对象,此时会启动一个httpserver服务默认监听4194端口, RegisterHandlers会给/containers和/docker路径分别注册一个handler, RegisterPrometheusHandler会给/metrics路径注册一个handler,当访问这些路径时就会调用相应的handler获取监控数据。

func (cc *cadvisorClient) exportHTTP(port uint) error {
       // Register the handlers regardless as this registers the prometheus
       // collector properly.
       mux := http.NewServeMux()
       err := cadvisorhttp.RegisterHandlers(mux, cc, "", "", "", "")
       if err != nil {
              return err
       }

       cadvisorhttp.RegisterPrometheusHandler(mux, cc, "/metrics", nil)

       // Only start the http server if port > 0
       if port > 0 {
              serv := &http.Server{
                     Addr:    fmt.Sprintf(":%d", port),
                     Handler: mux,
              }

              // TODO(vmarmol): Remove this when the cAdvisor port is once again free.
              // If export failed, retry in the background until we are able to bind.
              // This allows an existing cAdvisor to be killed before this one registers.
              go func() {
                     defer runtime.HandleCrash()

                     err := serv.ListenAndServe()
                     for err != nil {
                            glog.Infof("Failed to register cAdvisor on port %d, retrying. Error: %v", port, err)
                            time.Sleep(time.Minute)
                            err = serv.ListenAndServe()
                     }
              }()
       }

       return nil
}

kubelet启动时会调用cadvisor的Start方法,实际上调用的是cadvisor manager的Start方法,Start里面会为每个cgroup下检测到的容器启动一个housekeeping协程,来定时更新cadvisor中关于容器的监控数据,最后还会启动一个全局的housekeeping协程,保证增加容器删除容器时cadvisor中的数据也能及时更新。

2.5 containerGC功能介绍

接口:

type ContainerGC interface {
       // Garbage collect containers.
       GarbageCollect(allSourcesReady bool) error
}

实际上调用的是containerRuntime的GarbageCollect方法。

容器垃圾收集会根据一个策略来执行,用到的参数有三个:

gcPolicy := kubecontainer.ContainerGCPolicy{
       // 容器的最小存活时间
       MinAge:             kc.MinimumGCAge,
       // 每个pod的最大容器个数
       MaxPerPodContainer: kc.MaxPerPodContainerCount,
       // 整个node上的最大容器个数
       MaxContainers:      kc.MaxContainerCount,
}

主要步骤是: * 清除所有存活时间大于MinAge,不是k8s定义的容器 * 清除所属的pod已经删除的容器 * 如果某个pod的容器个数超过规定的最大个数,清除超出的时间最老的容器 * 如果node上的容器个数超过规定的最大个数,清除超出的时间最老的容器

2.6 imageManager功能介绍

接口:

type imageManager interface {
       GarbageCollect() error
       Start() error
       GetImageList() ([]kubecontainer.Image, error)
       DeleteUnusedImages() (int64, error)
}

/pkg/kubelet/kubelet.go中的Run(updates <-**chan **kubetypes.PodUpdate)方法中会启动imageManager的Start方法和GarbageCollect方法。

Start方法会启动一个无限循环的协程,主要做的事情是维护imageRecord这个记录,先从containerRuntime获取实际的images,然后跟imageRecord中的记录作比较,加入新增加的image的记录,更新image的firstDetected时间,lastUsed时间以及size,然后将已删除的image从imageRecord记录中删除。

GarbageCollect方法主要用到了三个参数:

type ImageGCPolicy struct {
       // 镜像磁盘使用的百分比大于此值,则垃圾回收
       HighThresholdPercent int
       // 镜像磁盘使用的百分比小于此值,则不用垃圾回收
       LowThresholdPercent int
       // 镜像回收前使用的最小时间
       MinAge time.Duration
}

执行步骤是: * 先通过cadvisor获取镜像的磁盘使用情况 * 如果镜像磁盘使用的百分比大于HighThresholdPercent,则开始垃圾回收 * 垃圾回收的原则是,将imageRecord中的images排序,lastUsed时间较老的排在前面,lastUsed相同的则firstDetected较老的排在前面,然后遍历imageRecord,如果lastUsed时间等于或大于当前时间,则不释放,如果image的使用时间小于规定的MinAge,也不释放,否则释放。

2.7 diskSpaceManager功能介绍

接口:

type diskSpaceManager interface {
       // Checks the available disk space
       IsRootDiskSpaceAvailable() (bool, error)
       IsRuntimeDiskSpaceAvailable() (bool, error)
}

构建diskSpaceManager的policy的时候会用到两个参数:

type DiskSpacePolicy struct {
       // docker images的空余空间限制
       DockerFreeDiskMB int
       // host volumes的空余空间限制
       RootFreeDiskMB int
}

新创建pod时调用这两个方法来检查roodiskspace(host volumes的占用的位置)以及runtimediskspace(docker images占用的位置)是否充足。这两个方法中的磁盘数据是调用cadvisor的接口获取的。

2.8 statusManager功能介绍

接口:

type Manager interface {
       PodStatusProvider
       Start()
       SetPodStatus(pod *api.Pod, status api.PodStatus)
       SetContainerReadiness(podUID types.UID, containerID 	kubecontainer.ContainerID, ready bool)
       TerminatePod(pod *api.Pod)
       RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
type PodStatusProvider interface {
       GetPodStatus(uid types.UID) (api.PodStatus, bool)
}

manager结构:

type manager struct {
       kubeClient clientset.Interface
       podManager kubepod.Manager
       // Map from pod UID to sync status of the corresponding pod.
       podStatuses      map[types.UID]versionedPodStatus
       podStatusesLock  sync.RWMutex
       podStatusChannel chan podStatusSyncRequest
       // Map from (mirror) pod UID to latest status version successfully sent to the API server.
       // apiStatusVersions must only be accessed from the sync thread.
       apiStatusVersions map[types.UID]uint64
}

Kubelet启动时,Kubelet对象初始化完成后会调用statusManager的start方法,来启动一个协程定时批量同步manager中的podStatuses到apiserver。

批量同步时会找出那些apiStatusVersions 的pod version和podStatuses的pod version不同的pod来同步, podManager中的podstatus和podStatuses中的podstatus不同的pod也会调用syncPod方法来同步。

创建pod时也会调用statusManager的SetPodStatus方法,此方法中会把收到的podstatus放入podStatusChannel中,从而启动syncPod方法,然后同步当前状态到apiserver中,这个过程中pod的版本号会加1,如果向apiserver的同步没有成功,则apiStatusVersions 跟podStatuses的pod版本号就不相同,这样就需要批量同步的时候再更新。

2.9 volumeManager功能介绍

接口:

type VolumeManager interface {
       Run(stopCh <-chan struct{})
       WaitForAttachAndMount(pod *api.Pod) error
       GetMountedVolumesForPod(podName types.UniquePodName)                 
               container.VolumeMap
       GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64
       GetVolumesInUse() []api.UniqueVolumeName
       VolumeIsAttached(volumeName api.UniqueVolumeName) bool
       MarkVolumesAsReportedInUse(volumesReportedAsInUse
               []api.UniqueVolumeName)
}

volumeManager的作用主要是卸载挂载volume,以及查询pod的volume。

Start方法也是在kubelet启动时,Kubelet对象初始化完成后调用的,Start方法也会启动一个协程,定时检查来保证该卸载的volume已经被卸载,该挂载的volume被正确挂载。

2.10 containerRuntime功能介绍

接口:

type Runtime interface {
       Type() string
       Version() (Version, error)
       Version() (Version, error)
       APIVersion() (Version, error)
       Status() error
       GetPods(all bool) ([]*Pod, error)
       GarbageCollect(gcPolicy ContainerGCPolicy, allSourcesReady bool) error
       SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
       KillPod(pod *api.Pod, runningPod Pod, gracePeriodOverride *int64) error
       GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
       PullImage(image ImageSpec, pullSecrets []api.Secret) error
       IsImagePresent(image ImageSpec) (bool, error)
       ListImages() ([]Image, error)
       RemoveImage(image ImageSpec) error
       ImageStats() (*ImageStats, error)
       GetNetNS(containerID ContainerID) (string, error)
       GetPodContainerID(*Pod) (ContainerID, error)
       GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
       DeleteContainer(containerID ContainerID) error
       ContainerCommandRunner
       ContainerAttacher}
type ContainerAttacher interface {
       AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error)
}
type ContainerCommandRunner interface {
       ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
       PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
}

实现该接口的对象是pkg/kubelet/dockertools/docker_manager.go中的DockerManager。

DockerManager结构体中有个client的成员,实际上就是dockerClient,上面已经介绍过。 DockerManager实现了Runtime接口的所有方法,这些方法中会使用dockerClient来实现对底层docker服务的操作。

当有新的pod创建请求时,会调用SyncPod方法来同步用户请求pod到本地物理机上。

containerGC就是调用的此处的GarbageCollect方法,前面已经介绍过。

3. Kubelet启动流程

UnsecuredKubeletConfig方法主要是根据KubeletServer对象来初始化KubeletConfig对象。

CreateAPIServerClientConfig方法主要是用来初始化KubeClient对象,用来与APIServer交互。

makePodSourceConfig方法相当于容器配置清单的生产者,有三种方式获取配置清单,分别是文件方式,url方式,ApiServer方式,目前我们主要是用ApiServer方式。其中会创建一个ListWatch对象,一直监听ApiServer中pod的变化,当有变化时,变化的pod就会被放到一个PodUpdate类型的channel中,这个channel会一直往下传递到syncLoop的updates参数中,以便podWorkers从其中取出消息进行处理。

NewMainKubelet方法主要是初始化Kubelet对象的各个成员变量,包括我们之前介绍的那些,各个manager等。

StartGarbageCollection方法主要是用来启动垃圾回收,包括containerGC.GrabageCollect和imageManager.GrabageCollect。

最后调用Kubelet对象的Run方法,这里面启动了各个manager的start方法: * kl.imageManager.Start()  * kl.containerManager.Start()  * kl.volumeManager.Run(wait.NeverStop)  * kl.cadvisor.Start()  * kl.statusManager.Start()  * kl.probeManager.Start()

最后启动了Kubelet的main sync loop,来创建podworkers并获取pod的创建删除更新等消息,在物理节点上调用dockerClient做出相应的处理。

4. Kubelet创建pod的流程

syncLoopIteration会从上面提到的生产者创建的channel中取出消息,然后根据消息的不同类型(kubetypes.ADD kubetypes.UPDATE kubetypes.REMOVE kubetypes.RECONCILE)做不同的处理。

kubetypes.ADD kubetypes.UPDATE类型的消息都会调用dispatchWork启动一个新的podWorkers来syncPod。

首先回调的是kubelet中的syncLoop,其中做的事情主要是: * 如果是创建pod,记录pod的创建时间; * 更新statusManager中的pod status; * 检查这个pod的source是否是apiserver,如果不是,并且还没有mirrorPod,则创建; * 为pod创建数据目录; * 等待磁盘的挂载和卸载; * 回调docker_manager的SyncPod方法。

docker_manager的SyncPod中会调用computePodContainerChanges来比较apiserver发送过来的pod和kubelet的缓存中podStatus有什么不同,并把需不需要重新创建基础容器,哪些container需要重启,哪些container需要继续保持运行等信息记录下来,然后根据不同的情况依次做出相应的处理。比如需要重新创建基础容器并且需要重启的containers个数是0,因为只要重新创建基础容器,那pod中所有的容器都需要重启,所以这种情况说明pod有问题,就可以将pod kill掉。创建容器有最终会调用kubelet.dockerClient.CreateContainer()以及kubelet.dockerClient.StartContainer()。

当syncPod运行结束后,会把当前pod的UID加入kubelet的WorkQueue中,syncLoopIteration还会启动两个定时任务,其中一个每秒钟将WorkQueue中的pod同步一次,另一个定时任务每隔两秒钟会清理一下没用的pod,podWorkers等资源。