Привет, Хабр!

Все мы привыкли к операторам Kubernetes с их паттерном leader election. Один под становится лидером, остальные молча ждут своей участи в тени, обрабатывая события только через его призму. Надежная схема? Безусловно. Но что, если я скажу, что можно создать оператор вообще без единой точки отказа, где каждый под равен друг другу, а координация действий происходит через механизм слухов, gossip‑протокол? Звучит как ересь? Возможно. Но это работает.

Наши кластеры Kubernetes разрослись до таких размеров, что централизованные контроллеры начали хрипеть. Задержки на выборах лидера, проблемы с сетевым разделением, когда «зомби‑лидер» продолжает существовать в своей части сети… Это знакомо многим. Мы стали искать способ децентрализовать логику наших операторов, и взгляд упал на gossip‑протоколы, десятилетиями испытанные в системах вроде Amazon Dynamo или Consul.

Суть замысла: самоорганизующийся оператор

Вместо того чтобы иметь одного лидера, который владеет всей картиной состояния кастомных ресурсов, каждый под нашего оператора содержит полную копию состояния всех интересующих его CR. Согласование состояния не инициируется извне через API‑сервер Kubernetes, а возникает внутри роя подов оператора. Когда один под обнаруживает необходимость реконсиляции, он не бежит сразу что‑то делать. Вместо этого он распространяет информацию о необходимости этой реконсиляции среди других подов через gossip‑протокол.

Таким образом отсутствует единая точка отказа. Сетевое разделение перестает быть фатальным: части кластера могут продолжать работу независимо, координируясь внутри своей партиции. Пропадает задержка, связанная с выборами лидера. Мастером становится каждый.

Выбираем инструмент

Для gossip‑логики не станем изобретать велосипед. Возьмем за основу проверенный SWIM‑протокол. Его идея проста: каждый узел периодически шлет ping случайно выбранному соседу. Если тот не отвечает, узел просит нескольких других узлов проверить его.

Но чистый SWIM имеет недостатки при высокой нагрузке или в нестабильных сетях. Поэтому мы использовали его улучшенную версию — Lifeguard. Она добавляет механизмы для снижения ложных срабатываний, вводя понятие «подозрительного» узла, который проверяется более тщательно, прежде чем будет объявлен мертвым. Для Go есть отличная библиотека — memberlist от HashiCorp, которая реализует именно этот улучшенный протокол.

Лепим CRD

Первым делом определим кастомный ресурс. Допустим, это будет DistributedApp — некое приложение, экземпляры которого должны координироваться без центрального мозга.

package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// DistributedApp – кастомный ресурс для нашего децентрализованного оператора.
type DistributedApp struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   DistributedAppSpec   `json:"spec,omitempty"`
	Status DistributedAppStatus `json:"status,omitempty"`
}

// DistributedAppSpec определяет желаемое состояние приложения.
type DistributedAppSpec struct {
	Replicas          int32  `json:"replicas"`
	ApplicationImage  string `json:"applicationImage"`
	ConfigMapName     string `json:"configMapName,omitempty"`
}

// DistributedAppStatus отражает текущее состояние.
type DistributedAppStatus struct {
	ReconciledByPod string   `json:"reconciledByPod,omitempty"` // Имя пода, выполнившего последнюю реконсиляцию
	LastReconcileTime metav1.Time `json:"lastReconcileTime,omitempty"`
	ActivePods       []string `json:"activePods,omitempty"` // Список подов оператора, знающих об этом CR
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// DistributedAppList – список DistributedApp.
type DistributedAppList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []DistributedApp `json:"items"`
}

Генерим клиентские либы стандартными средствами controller-tools. Это основа, с которой будет работать оператор.

Сердце оператора

Теперь самое интересное — создание gossip‑слоя поверх memberlist. Нам нужно, чтобы поды оператора не только знали друг о друге, но и обменивались сообщениями о необходимости реконсиляции CR.

Создадим структуру, которая будет инкапсулировать всю gossip‑логику.

package gossip

import (
	"encoding/json"
	"fmt"
	"log"
	"net"
	"os"
	"time"

	"github.com/hashicorp/memberlist"
)

// ReconMessage – сообщение о необходимости реконсиляции конкретного CR.
type ReconMessage struct {
	CRName      string `json:"crName"`      // Имя CR
	CRNamespace string `json:"crNamespace"` // Пространство имен CR
	OriginPod   string `json:"originPod"`   // Под, отправивший сообщение
	Timestamp   int64  `json:"timestamp"`   // Временная метка
}

// GossipManager управляет членством в кластере и распространением сообщений.
type GossipManager struct {
	memberlist    *memberlist.Memberlist
	eventHandler  func(message ReconMessage) // Обработчик входящих сообщений
	localNodeName string
}

// NewGossipManager создает и настраивает новый менеджер.
func NewGossipManager(bindPort int, knownPeers []string, handler func(ReconMessage)) (*GossipManager, error) {
	hostname, _ := os.Hostname()
	// Для уникальности в Kubernetes добавим имя пода из переменной окружения
	podName := os.Getenv("POD_NAME")
	if podName == "" {
		podName = hostname
	}
	localNodeName := fmt.Sprintf("%s-%d", podName, bindPort)

	config := memberlist.DefaultLocalConfig()
	config.Name = localNodeName
	config.BindAddr = "0.0.0.0"
	config.BindPort = bindPort
	config.AdvertisePort = bindPort

	// Увеличиваем таймауты для стабильности в Kubernetes
	config.TCPTimeout = 30 * time.Second
	config.ProbeTimeout = 5 * time.Second
	config.ProbeInterval = 10 * time.Second

	// Создаем кастомный Delegate для обработки сообщений.
	delegate := &customDelegate{
		messageHandler: handler,
	}
	config.Delegate = delegate
	config.Events = delegate

	mlist, err := memberlist.Create(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create memberlist: %v", err)
	}

	if len(knownPeers) > 0 {
		_, err = mlist.Join(knownPeers)
		if err != nil {
			log.Printf("Warning: initial join failed: %v. Will retry.", err)
		}
	}

	gm := &GossipManager{
		memberlist:    mlist,
		eventHandler:  handler,
		localNodeName: localNodeName,
	}

	delegate.gossipManager = gm // Устанавливаем обратную ссылку

	return gm, nil
}

// SpreadReconciliation распространяет сообщение о необходимости реконсиляции.
func (gm *GossipManager) SpreadReconciliation(crName, crNamespace string) error {
	msg := ReconMessage{
		CRName:      crName,
		CRNamespace: crNamespace,
		OriginPod:   gm.localNodeName,
		Timestamp:   time.Now().UnixNano(),
	}

	msgBytes, err := json.Marshal(msg)
	if err != nil {
		return err
	}

	// Рассылаем сообщение всем узлам кластера.
	for _, member := range gm.memberlist.Members() {
		if member.Name == gm.localNodeName {
			continue // Себе не отправляем
		}
		err = gm.memberlist.SendReliable(member, msgBytes)
		if err != nil {
			log.Printf("Failed to send message to %s: %v", member.Name, err)
			// Не прерываем выполнение, пытаемся отправить остальным
		}
	}
	return nil
}

// customDelegate реализует интерфейсы memberlist.Delegate и memberlist.EventDelegate.
type customDelegate struct {
	messageHandler func(ReconMessage)
	gossipManager  *GossipManager
}

// NodeMeta используется для обмена метаданными. Мы можем передать, например, версию оператора.
func (d *customDelegate) NodeMeta(limit int) []byte {
	// Возвращаем пустые метаданные для простоты.
	return []byte{}
}

// NotifyMsg вызывается, когда узел получает сообщение.
func (d *customDelegate) NotifyMsg(msgBytes []byte) {
	if d.messageHandler == nil {
		return
	}
	var msg ReconMessage
	err := json.Unmarshal(msgBytes, &msg)
	if err != nil {
		log.Printf("Failed to unmarshal gossip message: %v", err)
		return
	}
	// Игнорируем сообщения от себя самого (на всякий случай).
	if msg.OriginPod == d.gossipManager.localNodeName {
		return
	}
	log.Printf("Received reconciliation gossip for %s/%s from %s", msg.CRNamespace, msg.CRName, msg.OriginPod)
	d.messageHandler(msg)
}

// GetBroadcasts не используется в нашем простом сценарии.
func (d *customDelegate) GetBroadcasts(overhead, limit int) [][]byte {
	return nil
}

// LocalState и MergeRemoteState используются для обмена полным состоянием (state exchange).
// Это мощный механизм, но для начала обойдемся без него.
func (d *customDelegate) LocalState(join bool) []byte {
	return []byte{}
}
func (d *customDelegate) MergeRemoteState(buf []byte, join bool) {
}

// NotifyJoin обрабатывает событие присоединения нового узла.
func (d *customDelegate) NotifyJoin(node *memberlist.Node) {
	log.Printf("Node joined: %s", node.Name)
}

// NotifyLeave обрабатывает событие отключения узла.
func (d *customDelegate) NotifyLeave(node *memberlist.Node) {
	log.Printf("Node left: %s", node.Name)
	// Здесь можно добавить логику, например, перераспределение CR, за которые отвечал ушедший узел.
}

// NotifyUpdate обрабатывает обновление метаданных узла.
func (d *customDelegate) NotifyUpdate(node *memberlist.Node) {
	// Пока не используется.
}

Это костяк нашего gossip‑слоя. Каждый под оператора создает экземпляр GossipManager, который присоединяется к кластеру через memberlist. Ключевой метод — SpreadReconciliation, который рассылает сообщение о том, что конкретный CR нужно пересмотреть.

Планировщик на слухах

Самое сложное в децентрализованной системе — избежать ситуации, когда все узлы одновременно пытаются сделать одну и ту же работу. Нужен механизм, который гарантирует, что реконсиляцию CR выполнит только один узел в определенный момент времени. Но без центрального планировщика.

Решение — детерминированный выбор исполнителя на основе имени CR. Каждый узел, получив gossip‑сообщение о необходимости реконсиляции, независимо вычисляет, является ли он ответственным за эту реконсиляцию в данный момент. Алгоритм может быть таким:

  1. Все поды оператора имеют одинаковый список активных членов кластера (благодаря gossip‑протоколу).

  2. Сортируем имена подов лексикографически.

  3. Для каждого CR вычисляем хэш его полного имени (namespace/name).

  4. На основе этого хэша выбираем под из отсортированного списка. Например, index = hash(cr_name) % len(members).

Этот алгоритм гарантирует, что все узлы придут к одному и тому же выводу о том, кто должен обрабатывать CR. Если выбранный под отваливается, хэш пересчитывается заново по новому списку членов, и ответственность переходит к следующему поду.

Реализуем эту логику в нашем основном контроллере.

package controller

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"log"
	"sort"
	"sync"
	"time"

	"your-project/pkg/gossip"
	clientset "your-project/pkg/generated/clientset/versioned"
	informers "your-project/pkg/generated/informers/externalversions"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

// Controller – главный контроллер нашего оператора.
type Controller struct {
	kubeClient    kubernetes.Interface
	customClient  clientset.Interface
	gossipManager *gossip.GossipManager

	// Кэш CRD, которые знает этот контроллер.
	crdCache map[string]*yourv1alpha1.DistributedApp // Замени yourv1alpha1 на свой тип
	cacheLock sync.RWMutex

	// Канал для остановки контроллера.
	stopCh chan struct{}
}

// NewController создает новый экземпляр контроллера.
func NewController(config *rest.Config, gossipManager *gossip.GossipManager) (*Controller, error) {
	kubeClient, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	customClient, err := clientset.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	controller := &Controller{
		kubeClient:    kubeClient,
		customClient:  customClient,
		gossipManager: gossipManager,
		crdCache:      make(map[string]*yourv1alpha1.DistributedApp),
		stopCh:        make(chan struct{}),
	}

	// Регистрируем обработчик gossip-сообщений.
	gossipManager.SetEventHandler(controller.handleGossipMessage)

	return controller, nil
}

// Run запускает контроллер.
func (c *Controller) Run(workers int) error {
	defer close(c.stopCh)

	// Создаем informer для наших CRD.
	factory := informers.NewSharedInformerFactory(c.customClient, time.Minute*10)
	informer := factory.Your().V1alpha1().DistributedApps()
	informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.onCRAdd,
		UpdateFunc: c.onCRUpdate,
		DeleteFunc: c.onCRDelete,
	})

	log.Println("Starting CRD informer")
	go informer.Informer().Run(c.stopCh)

	log.Println("Starting gossip-based controller")
	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, time.Second, c.stopCh)
	}

	<-c.stopCh
	return nil
}

// worker – фоновая горутина для обработки событий (может использоваться для других задач).
func (c *Controller) worker() {
	// Пока не используется интенсивно, так как реконсиляция инициируется по gossip.
	time.Sleep(time.Minute)
}

// onCRAdd обрабатывает добавление нового CR.
func (c *Controller) onCRAdd(obj interface{}) {
	crd := obj.(*yourv1alpha1.DistributedApp)
	key := fmt.Sprintf("%s/%s", crd.Namespace, crd.Name)

	c.cacheLock.Lock()
	c.crdCache[key] = crd
	c.cacheLock.Unlock()

	log.Printf("CR added to cache: %s", key)

	// При добавлении CR сразу инициируем gossip о необходимости его реконсиляции.
	// Это гарантирует, что хотя бы один узел начнет работу.
	go func() {
		time.Sleep(100 * time.Millisecond) // Небольшая задержка для стабилизации
		err := c.gossipManager.SpreadReconciliation(crd.Name, crd.Namespace)
		if err != nil {
			log.Printf("Failed to spread gossip for new CR %s: %v", key, err)
		}
	}()
}

// onCRUpdate обрабатывает обновление CR.
func (c *Controller) onCRUpdate(oldObj, newObj interface{}) {
	newCRD := newObj.(*yourv1alpha1.DistributedApp)
	key := fmt.Sprintf("%s/%s", newCRD.Namespace, newCRD.Name)

	c.cacheLock.Lock()
	c.crdCache[key] = newCRD
	c.cacheLock.Unlock()

	log.Printf("CR updated in cache: %s", key)

	// При обновлении CR также распространяем слух.
	err := c.gossipManager.SpreadReconciliation(newCRD.Name, newCRD.Namespace)
	if err != nil {
		log.Printf("Failed to spread gossip for updated CR %s: %v", key, err)
	}
}

// onCRDelete обрабатывает удаление CR.
func (c *Controller) onCRDelete(obj interface{}) {
	crd, ok := obj.(*yourv1alpha1.DistributedApp)
	if !ok {
		// Обработка случая, когда объект уже удален.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Printf("Unexpected object type on delete: %T", obj)
			return
		}
		crd, ok = tombstone.Obj.(*yourv1alpha1.DistributedApp)
		if !ok {
			log.Printf("Unexpected object type in tombstone: %T", tombstone.Obj)
			return
		}
	}
	key := fmt.Sprintf("%s/%s", crd.Namespace, crd.Name)

	c.cacheLock.Lock()
	delete(c.crdCache, key)
	c.cacheLock.Unlock()

	log.Printf("CR removed from cache: %s", key)
	// Удаление – тоже событие. Возможно, нужно почистить связанные ресурсы.
	// Распространяем слух об удалении для запуска garbage collection.
	err := c.gossipManager.SpreadReconciliation(crd.Name, crd.Namespace)
	if err != nil {
		log.Printf("Failed to spread gossip for deleted CR %s: %v", key, err)
	}
}

// handleGossipMessage обрабатывает входящее gossip-сообщение.
// Это центральная логика планировщика.
func (c *Controller) handleGossipMessage(msg gossip.ReconMessage) {
	key := fmt.Sprintf("%s/%s", msg.CRNamespace, msg.CRName)

	// Проверяем, есть ли такой CR в нашем кэше.
	c.cacheLock.RLock()
	crd, exists := c.crdCache[key]
	c.cacheLock.RUnlock()

	if !exists {
		// Если CR нет в кэше, возможно, этот под еще не получил его через informer.
		// Можно попытаться получить его через API-сервер, но для простоты проигнорируем.
		log.Printf("CR %s not found in local cache, ignoring gossip", key)
		return
	}

	// Детерминированный выбор: должен ли этот под выполнить реконсиляцию?
	if c.shouldIReconcile(msg.CRNamespace, msg.CRName) {
		log.Printf("I am the chosen one for CR %s! Starting reconciliation.", key)
		c.reconcileCR(crd)
	} else {
		log.Printf("Not my turn to reconcile CR %s.", key)
	}
}

// shouldIReconcile определяет, должен ли текущий под выполнить реконсиляцию для данного CR.
func (c *Controller) shouldIReconcile(namespace, name string) bool {
	members := c.gossipManager.GetMemberList()
	if len(members) == 0 {
		return true // Если мы одни, то мы и отвечаем.
	}

	// Сортируем имена членов кластера для детерминизма.
	sort.Strings(members)

	// Вычисляем хэш от полного имени CR.
	crKey := fmt.Sprintf("%s/%s", namespace, name)
	hash := sha256.Sum256([]byte(crKey))
	hashInt := new(big.Int).SetBytes(hash[:])
	// Выбираем индекс на основе хэша.
	index := int(new(big.Int).Mod(hashInt, big.NewInt(int64(len(members))).Int64())

	// Получаем имя текущего пода.
	currentPod := os.Getenv("POD_NAME") // Предполагается, что передано через Downward API
	if currentPod == "" {
		// Если не задано, используем имя из memberlist (менее надежно).
		currentPod = c.gossipManager.GetLocalNodeName()
	}

	// Сравниваем, является ли текущий под выбранным.
	return members[index] == currentPod
}

// reconcileCR выполняет непосредственно реконсиляцию CR.
// Это та самая логика, которая в обычном операторе выполняется контроллером.
func (c *Controller) reconcileCR(crd *yourv1alpha1.DistributedApp) error {
	key := fmt.Sprintf("%s/%s", crd.Namespace, crd.Name)
	log.Printf("Starting reconciliation for %s", key)

	// Здесь стандартная логика оператора:
	// 1. Проверить текущее состояние в Kubernetes (поды, сервисы и т.д.).
	// 2. Сравнить с желаемым состоянием из crd.Spec.
	// 3. Выполнить необходимые действия (создать, обновить, удалить ресурсы).

	// Пример: проверяем, что количество подов приложения соответствует crd.Spec.Replicas.
	appPods, err := c.kubeClient.CoreV1().Pods(crd.Namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: fmt.Sprintf("app=%s", crd.Name),
	})
	if err != nil {
		return err
	}

	currentReplicas := int32(len(appPods.Items))
	desiredReplicas := crd.Spec.Replicas

	if currentReplicas < desiredReplicas {
		// Нужно создать недостающие поды.
		log.Printf("Need to create %d pods for %s", desiredReplicas-currentReplicas, key)
		for i := currentReplicas; i < desiredReplicas; i++ {
			newPod := c.constructPodForCR(crd)
			_, err = c.kubeClient.CoreV1().Pods(crd.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
			if err != nil {
				log.Printf("Failed to create pod for %s: %v", key, err)
			}
		}
	} else if currentReplicas > desiredReplicas {
		// Нужно удалить лишние поды.
		log.Printf("Need to delete %d pods for %s", currentReplicas-desiredReplicas, key)
		podsToDelete := appPods.Items[desiredReplicas:]
		for _, pod := range podsToDelete {
			err = c.kubeClient.CoreV1().Pods(crd.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
			if err != nil {
				log.Printf("Failed to delete pod %s for %s: %v", pod.Name, key, err)
			}
		}
	}

	// Обновляем статус CR, указывая, что реконсиляцию выполнил этот под.
	updatedStatus := crd.Status.DeepCopy()
	updatedStatus.ReconciledByPod = os.Getenv("POD_NAME")
	updatedStatus.LastReconcileTime = metav1.Now()

	// Получаем список активных подов оператора из gossip-кластера.
	members := c.gossipManager.GetMemberList()
	updatedStatus.ActivePods = members

	_, err = c.customClient.YourV1alpha1().DistributedApps(crd.Namespace).UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
	if err != nil {
		log.Printf("Failed to update status for %s: %v", key, err)
		return err
	}

	log.Printf("Successfully reconciled %s", key)
	return nil
}

// constructPodForCR создает объект Pod для нашего CR.
func (c *Controller) constructPodForCR(crd *yourv1alpha1.DistributedApp) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: fmt.Sprintf("%s-", crd.Name),
			Namespace:    crd.Namespace,
			Labels: map[string]string{
				"app": crd.Name,
			},
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: "your.api/v1alpha1",
					Kind:       "DistributedApp",
					Name:       crd.Name,
					UID:        crd.UID,
				},
			},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "application",
					Image: crd.Spec.ApplicationImage,
				},
			},
		},
	}
}

// GetMemberList – вспомогательный метод для получения списка членов кластера.
func (gm *GossipManager) GetMemberList() []string {
	members := gm.memberlist.Members()
	names := make([]string, len(members))
	for i, m := range members {
		names[i] = m.Name
	}
	return names
}

// GetLocalNodeName – вспомогательный метод.
func (gm *GossipManager) GetLocalNodeName() string {
	return gm.localNodeName
}

Garbage Collector на слухах

Что происходит с ресурсами (подами, сервисами), когда удаляется CR? В классическом операторе финализатор гарантирует, что контроллер успеет почистить ресурсы перед удалением CR. В нашем случае нужно быть осторожным.

Мы добавляем финализатор в CR. Когда приходит событие об удалении CR, мы также распространяем это через gossip. Узел, который окажется ответственным за реконсиляцию в этот момент (на основе хэша от имени CR), выполнит очистку и удалит финализатор, позволив API‑серверу окончательно удалить CR.

Нужно убедиться, что в момент удаления список членов кластера стабилен. Если узел, ответственный за очистку, отвалится во время процесса, другой узел должен взять эту задачу на себя. Алгоритм с хэшированием автоматически перераспределит ответственность при изменении состава кластера.

Эпидемическая конфиг-дистрибуция

Дополнительный бонус такой архитектуры — легкая рассылка конфигурации. Допустим, оператору нужно применить обновление конфигурации ко всем обрабатываемым CR. Для этого не нужно перезапускать поды или ждать, пока каждый CR будет обработан в ходе обычного цикла реконсиляции.

Достаточно отправить специальное широковещательное gossip‑сообщение, содержащее новую конфигурацию. Каждый узел, получив его, обновит свой внутренний кэш конфигурации и инициирует реконсиляцию для всех CR в своем кэше. Реконсиляция будет распределена между узлами по обычному алгоритму. Это приводит к очень быстрому распространению изменений по всей системе.

Нюансы

Идея не лишена сложностей. Сетевой трафик gossip‑протокола нужно мониторить, особенно в больших кластерах. Хотя memberlist оптимизирован, он все равно создает нагрузку. Рекомендуется настраивать интервалы probes и таймауты в соответствии с характеристиками сети.

Согласованность данных в кэше CR между узлами не является строгой. Это eventual consistency. Возможна ситуация, когда один узел еще не получил обновление CR через API‑сервер, но уже получил gossip‑сообщение о нем. Поэтому в методе handleGossipMessage мы проверяем наличие CR в кэше. Если его нет, можно отложить обработку или запросить CR непосредственно из API‑сервера.

Отладка такой системы сложнее. Логи реконсиляции размазаны по всем подам оператора. Необходимо агрегировать логи и иметь четкую идентификацию пода, который выполнил ту или иную реконсиляцию (что мы и делаем, записывая ReconciledByPod в статус CR).

Заключение

Переход от централизованной модели оператора к децентрализованной на основе gossip — это серьезное архитектурное изменение. Оно не нужно всем и каждому. Но для сценариев, требующих высокой отказоустойчивости, минимизации задержек и работы в условиях нестабильной сети, этот подход порой оказывается подходящим.

Попробуйте, и вы увидите Kubernetes с другой стороны.


Если вам близки идеи отказоустойчивых систем, распределённых алгоритмов и архитектур без единой точки отказа, обратите внимание на курс «Разработка децентрализованных приложений». На нём вы разберётесь, как реализуются механизмы координации без центрального контроллера, как устроены gossip‑протоколы, и как принципы peer‑to‑peer взаимодействия применяются в современных инфраструктурах. Чтобы узнать, подойдет ли вам программа курса, пройдите вступительный тест.

Рост в IT быстрее с Подпиской — дает доступ к 3-м курсам в месяц по цене одного. Подробнее

Комментарии (0)