client-go: Kubernetes Leader Election
Quick Start
The code below shows the rough logic of leader election:
package main

import (
	"context"
	"flag"
	"fmt"
	_ "net/http/pprof"
	"os"
	"path/filepath"
	"time"

	"golang.org/x/exp/rand"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/homedir"
)

func main() {
	ctx := context.Background()

	// Build a clientset from ~/.kube/config.
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "")
	}
	flag.Parse()
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Event recorder so election events show up as Kubernetes Events.
	broadcaster := record.NewBroadcaster()
	broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{
		Interface: clientset.CoreV1().Events("default"),
	})
	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "hello-word"})

	// Unique identity for this instance: hostname + UUID.
	createIdentity := func() string {
		hostname, err := os.Hostname()
		if err != nil {
			hostname = fmt.Sprintf("rand%d", rand.Intn(10000))
		}
		return fmt.Sprintf("%s_%s", hostname, string(uuid.NewUUID()))
	}

	// The Lease object "default/hello-world" is used as the lock.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      "hello-world",
		},
		Client: clientset.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity:      createIdentity(),
			EventRecorder: eventRecorder,
		},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 5 * time.Second,
		RenewDeadline: 4 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				fmt.Println("start leading")
			},
			OnStoppedLeading: func() {
				fmt.Println("stop leading")
			},
			OnNewLeader: func(identity string) {
				fmt.Printf("new leader: %s\n", identity)
			},
		},
		Coordinated: false,
	})
}
We run this program in several terminals at the same time, then kill the leader process to simulate a node failure and watch whether a new master gets elected.
As the figure shows, once the first process is elected leader, the second and third processes automatically become slaves; after we kill the first process, the second one grabs the lock and becomes the new master.
Lease
Kubernetes ships with many built-in resource types, and Lease is one of them. As the name suggests, it records which holder currently occupies (holds a lease on) a particular resource.
➜ ~ k get lease -A
NAMESPACE         NAME                      HOLDER                                                       AGE
default           hello-world               VM-221-245-tencentos_bada2219-3a27-4b19-8b80-23fc05604391    6d7h
kube-node-lease   vm-221-245-tencentos      vm-221-245-tencentos                                         36d
kube-system       kube-controller-manager   VM-221-245-tencentos_8f5a4f85-ca0c-4b5f-ac81-8b0ff5ff2e49    36d
kube-system       kube-scheduler            VM-221-245-tencentos_4fab96c4-156b-4a77-b862-87224be44cb2    36d
For example, let's look at the Lease of a Kubernetes node:
➜ ~ k get lease vm-221-245-tencentos -n kube-node-lease -oyaml
apiVersion: coordination.k8s.io/v1
kind: Lease                                    # kind of the resource
metadata:
  creationTimestamp: "2024-09-27T12:24:07Z"    # when this resource was created
  name: vm-221-245-tencentos                   # name
  namespace: kube-node-lease                   # namespace
  ownerReferences:
  - apiVersion: v1
    kind: Node
    name: vm-221-245-tencentos                 # the owner of this resource
    uid: 5df61ad6-cc1a-4669-8b0e-d48a5b0ffb91
  resourceVersion: "3811080"
  uid: 411c6d4e-5afb-4eba-a1a0-8a56d00b75db
spec:
  holderIdentity: vm-221-245-tencentos
  leaseDurationSeconds: 40                     # lease duration: 40s
  renewTime: "2024-11-03T01:35:05.171458Z"     # last time the lease was renewed
In fact, the resource lock used in leader election is just such a Lease: it records that the master node is the sole holder of a particular resource.
Looking at the source at https://github.com/kubernetes/client-go/tree/master/tools/leaderelection, the leaderelection directory is mainly split into the resourcelock package and the main leaderelection files; there is not much code overall.
➜ leaderelection git:(master) tree
.
├── healthzadaptor.go
├── healthzadaptor_test.go
├── leaderelection.go
├── leaderelection_test.go
├── leasecandidate.go
├── leasecandidate_test.go
├── metrics.go
├── OWNERS
└── resourcelock
    ├── interface.go
    ├── leaselock.go
    └── multilock.go
Data Structures
LeaderElectionRecord stores the Lease-related information:
// LeaderElectionRecord is the record that is stored in the leader election annotation.
// This information should be used for observational purposes only and could be replaced
// with a random string (e.g. UUID) with only slight modification of this code.
// TODO(mikedanese): this should potentially be versioned
type LeaderElectionRecord struct {
	// identity of the current leader
	HolderIdentity string `json:"holderIdentity"`
	// how long the lease is valid for
	LeaseDurationSeconds int `json:"leaseDurationSeconds"`
	// when the current leader acquired leadership
	AcquireTime metav1.Time `json:"acquireTime"`
	// when the lease was last renewed
	RenewTime metav1.Time `json:"renewTime"`
	// how many times leadership has changed hands
	LeaderTransitions int `json:"leaderTransitions"`
	// election strategy
	Strategy        v1.CoordinatedLeaseStrategy `json:"strategy"`
	PreferredHolder string                      `json:"preferredHolder"`
}
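For the LeaseLock used in the quick-start, this record is stored in the Lease object's spec rather than in an annotation. The sketch below is mine, not the library source verbatim; it mirrors what the LeaderElectionRecordToLeaseSpec helper in resourcelock/leaselock.go does, with the newer Strategy/PreferredHolder fields omitted:

// Sketch of how a LeaderElectionRecord is written into a Lease spec
// (simplified; see LeaderElectionRecordToLeaseSpec in resourcelock/leaselock.go).
package sketch

import (
	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func recordToLeaseSpec(ler *rl.LeaderElectionRecord) coordinationv1.LeaseSpec {
	leaseDuration := int32(ler.LeaseDurationSeconds)
	transitions := int32(ler.LeaderTransitions)
	return coordinationv1.LeaseSpec{
		HolderIdentity:       &ler.HolderIdentity,                           // spec.holderIdentity
		LeaseDurationSeconds: &leaseDuration,                                // spec.leaseDurationSeconds
		AcquireTime:          &metav1.MicroTime{Time: ler.AcquireTime.Time}, // spec.acquireTime
		RenewTime:            &metav1.MicroTime{Time: ler.RenewTime.Time},   // spec.renewTime
		LeaseTransitions:     &transitions,                                  // spec.leaseTransitions
	}
}

This is also why the hello-world and node Leases shown earlier expose holderIdentity, leaseDurationSeconds and renewTime in their spec.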
LeaderElectionConfig is the elector's configuration:
type LeaderElectionConfig struct {
	// Lock is the resource lock used to serialize competing candidates.
	Lock rl.Interface

	// LeaseDuration is how long non-leader candidates wait before trying to
	// acquire leadership.
	// Core clients default this value to 15 seconds.
	LeaseDuration time.Duration

	// RenewDeadline is how long the acting leader keeps retrying a refresh
	// before giving up leadership.
	// Core clients default this value to 10 seconds.
	RenewDeadline time.Duration

	// RetryPeriod is the interval at which candidates retry lock actions.
	// Core clients default this value to 2 seconds.
	RetryPeriod time.Duration

	// Callbacks are invoked on election events, e.g. when leading starts,
	// when leadership is lost, and when a new leader is observed.
	Callbacks LeaderCallbacks

	// WatchDog is the associated health checker
	// WatchDog may be null if it's not needed/configured.
	WatchDog *HealthzAdaptor

	// ReleaseOnCancel should be set true if the lock should be released
	// when the run context is cancelled. If you set this to true, you must
	// ensure all code guarded by this lease has successfully completed
	// prior to cancelling the context, or you may have two processes
	// simultaneously acting on the critical path.
	ReleaseOnCancel bool

	// Name is the name of the resource lock for debugging
	Name string

	// Coordinated will use the Coordinated Leader Election feature
	// WARNING: Coordinated leader election is ALPHA.
	Coordinated bool
}
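As a side note: the quick-start uses RunOrDie, which panics on an invalid config. If you build the elector yourself with NewLeaderElector, configuration errors are returned instead; among other things it checks that LeaseDuration is greater than RenewDeadline and that RenewDeadline is greater than JitterFactor*RetryPeriod. A minimal sketch, reusing the ctx and lock built in the quick-start example above:

// A minimal sketch (not part of the quick-start program): construct the
// elector explicitly so configuration errors are returned instead of panicking.
// `ctx` and `lock` are the ones from the quick-start example.
elector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
	Lock:          lock,
	LeaseDuration: 5 * time.Second, // must be > RenewDeadline
	RenewDeadline: 4 * time.Second, // must be > JitterFactor * RetryPeriod
	RetryPeriod:   2 * time.Second,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) { fmt.Println("start leading") },
		OnStoppedLeading: func() { fmt.Println("stop leading") },
	},
})
if err != nil {
	panic(err) // e.g. "leaseDuration must be greater than renewDeadline"
}
elector.Run(ctx) // blocks: acquire loop, then renew loop while this instance leads

The 5s/4s/2s values used in the quick-start satisfy both constraints (5 > 4, and with JitterFactor = 1.2, 4 > 2.4).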
Main Logic
The election logic roughly works like this:
- When the instances start up, each of them is a LeaderElector; whichever acquires the lock first becomes the leader. Once elected, the leader maintains a LeaseLock that every LeaderElector can query.
- The remaining LeaderElectors become candidates: they block while watching the leader's state, and re-enter the election if the leader fails.
- After acquiring leadership, the leader keeps refreshing its own leader state.
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	// OnStoppedLeading runs on every node when Run returns.
	defer le.config.Callbacks.OnStoppedLeading()

	// acquire is where all the nodes compete for leadership;
	// nodes that never obtain it simply return here.
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Only the node that obtained leadership runs OnStartedLeading.
	go le.config.Callbacks.OnStartedLeading(ctx)
	// Once leadership is acquired, keep refreshing the leader state.
	le.renew(ctx)
}

func (le *LeaderElector) acquire(ctx context.Context) bool {
	...
	klog.Infof("attempting to acquire leader lease %v...", desc)
	wait.JitterUntil(func() {
		if !le.config.Coordinated {
			succeeded = le.tryAcquireOrRenew(ctx) // compete for the lock
		} else {
			succeeded = le.tryCoordinatedRenew(ctx)
		}
		...
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
		...
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}
The core logic: tryAcquireOrRenew
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.NewTime(le.clock.Now())
	leaderElectionRecord := rl.LeaderElectionRecord{
		// Identity here identifies the current candidate.
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. If we are the leader and the lease is still valid, just update the lock.
	if le.IsLeader() && le.isLeaseValid(now.Time) {
		oldObservedRecord := le.getObservedRecord()
		leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions

		err := le.config.Lock.Update(ctx, leaderElectionRecord)
		...
	}

	// 2. Otherwise fetch the current lock.
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		...
	}

	// 3. Compare the fetched election record with the one observed last time;
	//    refresh the local cache if it changed.
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.setObservedRecord(oldLeaderElectionRecord)
		le.observedRawRecord = oldLeaderElectionRawRecord
	}
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 4. Fill in the election record depending on whether we are already the leader.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
		le.metrics.slowpathExercised(le.config.Name)
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// 5. Update the lock itself.
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	// A successful update means this node now holds the lock:
	// it either acquired it or renewed it.
	le.setObservedRecord(&leaderElectionRecord)
	return true
}
The Lock above is an implementation of the resource lock interface, the LeaseLock:
// Interface offers a common interface for locking on arbitrary
// resources used in leader election. The Interface is used
// to hide the details on specific implementations in order to allow
// them to change over time. This interface is strictly for use
// by the leaderelection code.
type Interface interface {
	// Get returns the LeaderElectionRecord
	Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

	// Create attempts to create a LeaderElectionRecord
	Create(ctx context.Context, ler LeaderElectionRecord) error

	// Update will update and existing LeaderElectionRecord
	Update(ctx context.Context, ler LeaderElectionRecord) error
	...
}

// Get returns the election record from a Lease spec
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
	lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
	if err != nil {
		return nil, nil, err
	}
	ll.lease = lease
	record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
	recordByte, err := json.Marshal(*record)
	if err != nil {
		return nil, nil, err
	}
	return record, recordByte, nil
}

// Create attempts to create a Lease
func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
	var err error
	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
		ObjectMeta: metav1.ObjectMeta{
			Name:      ll.LeaseMeta.Name,
			Namespace: ll.LeaseMeta.Namespace,
		},
		Spec: LeaderElectionRecordToLeaseSpec(&ler),
	}, metav1.CreateOptions{})
	return err
}
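The Update method is elided above, but it is what tryAcquireOrRenew calls for both acquiring and renewing. Roughly (a simplified sketch based on leaselock.go, not an exact copy, imports elided like the excerpts above), it converts the record back into the Lease spec and writes it with a regular Update call:

// Rough sketch of LeaseLock.Update: write the election record back into the
// cached Lease's spec and update it through the coordination API.
func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
	if ll.lease == nil {
		return errors.New("lease not initialized, call get or create first")
	}
	ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)

	lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
	if err != nil {
		return err
	}
	ll.lease = lease
	return nil
}

Because ll.lease caches the object (and its resourceVersion) from the last Get/Create, a concurrent writer makes the API server reject the update with a conflict, which is exactly the error shown at the end of this post.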
How do we tell whether the current leader still holds the lease?
func (le *LeaderElector) isLeaseValid(now time.Time) bool {
	return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
}
It simply checks whether the gap between the last observed time and now is still within LeaseDurationSeconds; if it is, the lease is considered valid. For example, with LeaseDurationSeconds set to 5, a record last observed 3 seconds ago is still valid, while one observed 6 seconds ago has expired.
So how does an instance decide whether it itself is the leader? Let's look at the check below:
// IsLeader returns true if the last observed leader was this client else returns false.
func (le *LeaderElector) IsLeader() bool {
	return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity()
}
It just compares the HolderIdentity in the currently observed ElectionRecord with the Identity this instance was configured with at startup; if they match, this instance considers itself the leader.
And since the observedRecord is fetched from the Kubernetes API server, the API server is the single source of truth, which is what guarantees uniqueness.
Identity is the unique identifier each instance starts with. This field must never be duplicated across instances, otherwise the election is guaranteed to fail, with errors like the one below:
E1103 15:44:21.019391 3024650 leaderelection.go:429] Failed to update lock optimitically: Operation cannot be fulfilled on leases.coordination.k8s.io \"hello-world\": the object has been modified; please apply your changes to the latest version and try again, falling back to slow path


