Привет, Хабр!

Есть классическая боль очередей: скейлинг по факту отставания. Пока backlog вырос, пока HPA дотянулся, пока новые pod«ы прогрелись — SLO уже упал. Решение напрашивается: считать не сколько наваливается прямо сейчас, а сколько нужно серверов, чтобы вероятность ждать больше T была ниже целевого порога. Ровно это умеет Erlang‑C пришедший из жизни колл‑центров. Берём λ — интенсивность входа, μ — среднюю производительность одного воркера, целевой сервис‑левел по ожиданию в очереди, и получаем требуемое число агентов c. Дальше превращаем это в desired replicas и отдаём в KEDA через External Scaler поверх gRPC. Получается предиктивный автоскейлинг, привязанный к SLO, а не к догоняющим метрикам.

Erlang-C как инструмент для SLO по ожиданию

Работаем в стандартной модели M/M/c: пуассоновский вход, экспоненциальные сервис‑таймы, c независимых серверов, бесконечная очередь. Тогда вероятность того, что пришедший попадёт ждать, выражается формулой Erlang‑C. Пусть a=λ/μ — предложенная нагрузка в ерлангах. Тогда

P_W = \frac{\frac{a^c}{c!}\cdot \frac{c}{c-a}}{\sum_{k=0}^{c-1}\frac{a^k}{k!} + \frac{a^c}{c!}\cdot \frac{c}{c-a}}

Это вероятность того, что клиент будет ждать начала обслуживания. Это — классика модели M/M/c.

Дальше важная связь с SLO: распределение времени ожидания условно на событие «пришлось ждать» — экспоненциальное с параметром cμ−λ. Значит неусловная вероятность уложиться в порог T такова:

Pr\{W_q \le T\} = 1 - P_W \cdot e^{-(c\mu - \lambda)T}

Сервис‑левел по ожиданию до T — это и есть формула выше

Цель: найти минимальное cc, при котором Pr⁡{Wq≤T}≥p при заданных TT и целевом уровне p. Условия устойчивости обязательны: λ<cμ. Оговорка адекватности: Erlang‑C переоценивает ожидание, если клиенты отваливаются из очереди — в таком случае модель Erlang‑A точнее, а C будет консервативной.

Как превратить это в контроллер мощностей

Шаги простые и повторяемые:

  1. Оценить или спрогнозировать λ на горизонте, который соответствует времени прогрева/холод‑старту и желаемому SLO‑окну. Это можно сделать PromQL функциями predict_linear или сглаживанием double_exponential_smoothing. В актуальной документации Prometheus holt_winters переименован в double_exponential_smoothing, а predict_linear — линейная регрессия на окне.

  2. Оценить μ из метрик сервиса. Надёжно брать средний сервис‑тайм за окно и инвертировать: μ=1/sˉ.

  3. По T,p посчитать минимальное cc бинарным поиском по формуле выше.

  4. Отдать cc как метрику, из которой HPA без лишних трюков получит desired replicas. В KEDA у ScaledObject для external‑метрик тип по умолчанию AverageValue, и HPA тогда целится в «глобальная метрика / targetAverageValue». Если положить targetAverageValue = 1, то desired replicas будет равен значению метрики.

Реализация KEDA External Scaler на Go

KEDA ходит к внешнему gRPC‑серверу и вызывает четыре метода: IsActive, StreamIsActive, GetMetricSpec, GetMetrics. Сигнатуры описаны в externalscaler.proto, а в документации KEDA показано, как именно KEDA их использует. Мы реализуем «pull»‑вариант без стрима.

Ниже заготовка. Она:

  • читает из ScaledObject.metadata PromQL‑запросы для λ и среднего сервис‑тайма;

  • прогнозирует λ на заданный горизонт;

  • считает cc по Erlang‑C с бинарным поиском;

  • фиксирует анти‑дребезг: scale down медленнее scale up, гистерезис по допуску ε;

  • публикует прометеевые метрики самого скейлера;

  • поддерживает TLS для gRPC по флагам.

// cmd/scaler/main.go
package main

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log"
	"math"
	"net"
	"net/http"
	"os"
	"strconv"
	"time"

	pb "github.com/kedacore/keda/pkg/scalers/externalscaler"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

type server struct {
	pb.UnimplementedExternalScalerServer
	httpc   *http.Client
	logger  *log.Logger
	now     func() time.Time
	// simple state for hysteresis
	lastC           int
	lastChange      time.Time
	scaleDownHold   time.Duration
	scaleUpMinStep  int
	epsilonSL       float64
	minC, maxC      int
}

func main() {
	addr := flag.String("listen", ":9090", "gRPC listen addr")
	scaleDownHold := flag.Duration("scale-down-hold", 2*time.Minute, "hold time before scaling down")
	scaleUpMinStep := flag.Int("scale-up-min-step", 1, "minimal scale up step")
	useTLS := flag.Bool("tls", false, "enable TLS")
	certFile := flag.String("tls-cert", "", "TLS cert")
	keyFile := flag.String("tls-key", "", "TLS key")
	flag.Parse()

	s := &server{
		httpc: &http.Client{
			Timeout: 5 * time.Second,
		},
		logger:         log.New(os.Stdout, "erlangc-scaler ", log.LstdFlags|log.Lmsgprefix),
		now:            time.Now,
		scaleDownHold:  *scaleDownHold,
		scaleUpMinStep: *scaleUpMinStep,
		epsilonSL:      0.01,
		minC:           1,
		maxC:           10000,
	}

	var srv *grpc.Server
	if *useTLS {
		creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
		if err != nil {
			log.Fatalf("tls: %v", err)
		}
		srv = grpc.NewServer(grpc.Creds(creds))
	} else {
		srv = grpc.NewServer()
	}
	pb.RegisterExternalScalerServer(srv, s)

	lis, err := net.Listen("tcp", *addr)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("listening on %s", *addr)
	if err := srv.Serve(lis); err != nil {
		log.Fatal(err)
	}
}

func (s *server) IsActive(ctx context.Context, in *pb.ScaledObjectRef) (*pb.IsActiveResponse, error) {
	// активируемся, если предсказанный λ > 0 и требуемое c >= minC
	res, _, err := s.compute(ctx, in)
	active := false
	if err == nil && res >= int32(s.minC) {
		active = true
	}
	return &pb.IsActiveResponse{Result: active}, nil
}

func (s *server) GetMetricSpec(ctx context.Context, in *pb.ScaledObjectRef) (*pb.GetMetricSpecResponse, error) {
	// HPA должен интерпретировать метрику как AverageValue = 1,
	// значит desiredReplicas = metricValue
	return &pb.GetMetricSpecResponse{
		MetricSpecs: []*pb.MetricSpec{
			{
				MetricName: "erlangc_required_replicas",
				TargetSize: 1, // AverageValue=1
			},
		},
	}, nil
}

func (s *server) GetMetrics(ctx context.Context, in *pb.GetMetricsRequest) (*pb.GetMetricsResponse, error) {
	cReq, sl, err := s.compute(ctx, in.ScaledObjectRef)
	if err != nil {
		return nil, err
	}
	// гистерезис: не даём дёргаться вниз чаще, чем раз в scaleDownHold
	now := s.now()
	cOut := int(cReq)
	if s.lastC > 0 {
		if cOut < s.lastC {
			if now.Sub(s.lastChange) < s.scaleDownHold {
				cOut = s.lastC
			}
		} else if cOut > s.lastC && cOut-s.lastC < s.scaleUpMinStep {
			cOut = s.lastC + s.scaleUpMinStep
		}
	}
	if cOut != s.lastC {
		s.lastC = cOut
		s.lastChange = now
	}

	s.logger.Printf("c*=%d, predictedSL=%.3f", cOut, sl)
	return &pb.GetMetricsResponse{
		MetricValues: []*pb.MetricValue{{
			MetricName:  "erlangc_required_replicas",
			MetricValue: int64(cOut),
		}},
	}, nil
}

func (s *server) compute(ctx context.Context, in *pb.ScaledObjectRef) (int32, float64, error) {
	md := in.ScalerMetadata
	promURL := md["prometheusURL"]
	if promURL == "" {
		return 0, 0, errors.New("prometheusURL is required")
	}
	arrQ := md["arrivalRateQuery"] // should return λ [1/s]
	serQ := md["serviceTimeQuery"] // should return average service time [s]
	if arrQ == "" || serQ == "" {
		return 0, 0, errors.New("arrivalRateQuery and serviceTimeQuery are required")
	}
	targetSL, _ := strconv.ParseFloat(md["targetSL"], 64) // e.g. 0.95
	if targetSL <= 0 || targetSL >= 1 {
		targetSL = 0.95
	}
	T, _ := strconv.ParseFloat(md["waitThresholdSeconds"], 64) // e.g. 1.0
	if T <= 0 {
		T = 1.0
	}

	lmbd, err := s.instantQuery(ctx, promURL, arrQ)
	if err != nil {
		return 0, 0, fmt.Errorf("arrival rate query: %w", err)
	}
	svc, err := s.instantQuery(ctx, promURL, serQ)
	if err != nil {
		return 0, 0, fmt.Errorf("service time query: %w", err)
	}
	if svc <= 0 || lmbd < 0 {
		return 0, 0, errors.New("invalid metrics")
	}
	mu := 1.0 / svc

	// нижняя граница: хотя бы ceil(λ/μ)
	lb := int(math.Ceil(lmbd / mu))
	if lb < s.minC {
		lb = s.minC
	}
	ub := lb
	// расширяем верхнюю границу, пока SL не выполнится или пока не упрёмся
	for ; ub <= s.maxC; ub *= 2 {
		sl := serviceLevel(lmbd, mu, float64(ub), T)
		if sl >= targetSL {
			break
		}
		if ub == 0 {
			ub = 1
		}
	}
	if ub > s.maxC {
		return int32(s.maxC), serviceLevel(lmbd, mu, float64(s.maxC), T), nil
	}
	// бинарный поиск
	best := ub
	for lb <= ub {
		m := (lb + ub) / 2
		sl := serviceLevel(lmbd, mu, float64(m), T)
		if sl >= targetSL {
			best = m
			ub = m - 1
		} else {
			lb = m + 1
		}
	}
	return int32(best), serviceLevel(lmbd, mu, float64(best), T), nil
}

func (s *server) instantQuery(ctx context.Context, base, q string) (float64, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/api/v1/query?query=%s", base, urlQueryEscape(q)), nil)
	if err != nil {
		return 0, err
	}
	resp, err := s.httpc.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	var out struct {
		Status string `json:"status"`
		Data   struct {
			ResultType string `json:"resultType"`
			Result     []struct {
				Value [2]any `json:"value"`
			} `json:"result"`
		} `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return 0, err
	}
	if out.Status != "success" || len(out.Data.Result) == 0 {
		return 0, errors.New("no data")
	}
	valStr, _ := out.Data.Result[0].Value[1].(string)
	return strconv.ParseFloat(valStr, 64)
}

// вероятность ждать
func erlangC(a float64, c int) float64 {
	if c <= 0 || a <= 0 {
		return 0
	}
	// устойчивое вычисление через рекуррентные коэффициенты
	term := 1.0
	sum := 1.0
	for k := 1; k <= c-1; k++ {
		term *= a / float64(k)
		sum += term
	}
	termC := term * a / float64(c) // a^c / c!
	if a >= float64(c) {
		// перегрузка — формально неустойчиво, возвращаем 1
		return 1.0
	}
	num := termC * float64(c) / (float64(c) - a)
	return num / (sum + num)
}

func serviceLevel(lambda, mu, c float64, T float64) float64 {
	if c <= 0 {
		return 0
	}
	Pw := erlangC(lambda/mu, int(c))
	gap := c*mu - lambda
	if gap <= 0 {
		return 0
	}
	return 1 - Pw*math.Exp(-gap*T)
}

// простая экранизация без внешних зависимостей
func urlQueryEscape(q string) string {
	r := ""
	for i := 0; i < len(q); i++ {
		ch := q[i]
		switch ch {
		case ' ':
			r += "%20"
		case '"':
			r += "%22"
		case '+':
			r += "%2B"
		case '%':
			r += "%25"
		case '&':
			r += "%26"
		default:
			r += string(ch)
		}
	}
	return r
}

Манифесты: деплой скейлера и ScaledObject

Разворачиваем Deployment с нашим gRPC‑сервисом. Если нужен TLS, добавляем секрет с ключом и сертификатом и включаем флаги. ScaledObject настраиваем так, чтобы HPA воспринимал возвращаемую метрику как AverageValue = 1. В metadata кладём PromQL‑запросы для λ и среднего сервис‑тайма.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: erlangc-scaler
spec:
  replicas: 1
  selector:
    matchLabels: { app: erlangc-scaler }
  template:
    metadata:
      labels: { app: erlangc-scaler }
    spec:
      containers:
        - name: scaler
          image: ghcr.io/org/erlangc-scaler:1.0.0
          args:
            - --listen=:9090
            - --scale-down-hold=120s
            - --scale-up-min-step=2
          ports:
            - name: grpc
              containerPort: 9090
          readinessProbe:
            tcpSocket: { port: 9090 }
            initialDelaySeconds: 2
            periodSeconds: 5
          livenessProbe:
            tcpSocket: { port: 9090 }
            initialDelaySeconds: 10
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: erlangc-scaler
spec:
  selector:
    app: erlangc-scaler
  ports:
    - name: grpc
      port: 9090
      targetPort: 9090
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: queue-worker-erlangc
spec:
  scaleTargetRef:
    name: queue-worker
  pollingInterval: 30
  cooldownPeriod: 300
  minReplicaCount: 0
  maxReplicaCount: 200
  advanced:
    horizontalPodAutoscalerConfig:
      behavior:
        # быстро вверх, аккуратно вниз
        scaleUp:
          stabilizationWindowSeconds: 0
          policies:
            - type: Percent
              value: 100
              periodSeconds: 15
        scaleDown:
          stabilizationWindowSeconds: 300
          policies:
            - type: Percent
              value: 50
              periodSeconds: 60
  triggers:
    - type: external
      metricType: AverageValue
      metadata:
        scalerAddress: erlangc-scaler.default.svc.cluster.local:9090
        prometheusURL: http://prometheus-server.monitoring.svc.cluster.local
        # прогноз λ на 60 с вперёд по регрессии на последнем часу
        arrivalRateQuery: sum(predict_linear(sum(rate(queue_in_total[5m]))[1h:5m], 60))
        # средний сервис-тайм по сумме/счётчику
        serviceTimeQuery: (sum(rate(job_duration_seconds_sum[5m])) / sum(rate(job_duration_seconds_count[5m])))
        waitThresholdSeconds: "1"
        targetSL: "0.95"

Семантика external‑метрик и HPA‑формулы подтверждена в официальных доках Kubernetes и KEDA. Обратите внимание на behavior.scaleDown.stabilizationWindowSeconds — дефолтное значение 300 с подавляет флаппинг при схлопывании, это нормальная практика.

Генератор нагрузки и эталонный воркер

Чтобы воспроизвести поведение M/M/c, возьмём Redis Lists как очередь, генератор с пуассоновским входом и воркер, который моделирует экспоненциальный сервис‑тайм с заданным средним. Нагрузчик просто пушит JSON‑сообщения в лист, воркер блокирующе читает и обрабатывает.

// cmd/loadgen/main.go
package main

import (
	"context"
	"flag"
	"log"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	addr := flag.String("redis", "redis:6379", "redis addr")
	queue := flag.String("queue", "jobs", "queue name")
	lambda := flag.Float64("lambda", 10, "arrival rate per second")
	flag.Parse()

	rdb := redis.NewClient(&redis.Options{Addr: *addr})
	ctx := context.Background()

	src := rand.New(rand.NewSource(time.Now().UnixNano()))
	for {
		// экспоненциальные межприходы
		u := src.Float64()
		wait := -mathLog(u) / *lambda
		time.Sleep(time.Duration(wait * float64(time.Second)))
		err := rdb.LPush(ctx, *queue, time.Now().UnixNano()).Err()
		if err != nil {
			log.Printf("LPUSH: %v", err)
		}
	}
}

func mathLog(u float64) float64 { return -1 * (math.Log(1-u)) }
// cmd/worker/main.go
package main

import (
	"context"
	"flag"
	"log"
	"math"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"net/http"
)

var (
	jobDur = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "job_duration_seconds",
		Help:    "service time per job",
		Buckets: prometheus.DefBuckets,
	})
	processed = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "queue_processed_total",
		Help: "jobs processed",
	})
)

func main() {
	addr := flag.String("redis", "redis:6379", "redis addr")
	queue := flag.String("queue", "jobs", "queue name")
	mean := flag.Float64("mean", 0.2, "mean service time in seconds")
	flag.Parse()

	prometheus.MustRegister(jobDur, processed)
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		_ = http.ListenAndServe(":8080", nil)
	}()

	rdb := redis.NewClient(&redis.Options{Addr: *addr})
	ctx := context.Background()
	src := rand.New(rand.NewSource(time.Now().UnixNano()))

	for {
		res, err := rdb.BRPop(ctx, 0, *queue).Result()
		if err != nil {
			log.Printf("BRPOP: %v", err)
			time.Sleep(time.Second)
			continue
		}
		start := time.Now()
		// экспоненциальный сервис-тайм
		u := src.Float64()
		t := -math.Log(1-u) * (*mean)
		time.Sleep(time.Duration(t * float64(time.Second)))
		jobDur.Observe(time.Since(start).Seconds())
		processed.Inc()
	}
}

С такими метриками Prometheus‑запросы в ScaledObject становятся простыми: arrivalRateQuery = sum(rate(queue_in_total[5m])) или используйте queue_enqueued_total в своём стеке. serviceTimeQuery = sum(rate(job_duration_seconds_sum[5m])) / sum(rate(job_duration_seconds_count[5m])). Подчеркну: predict_linear подходит для gauges, для скоростей используйте подход через субквест [...].

Дэшборды: что мониторить

Хочется видеть одновременно прогноз λ, оценку μ, расчётное c, фактические реплики и сервис‑левел. Один из минимальных дашбордов можно собрать так:

  • Панель 1: sum(rate(queue_in_total[5m])) и predict_linear(sum(rate(queue_in_total[5m]))[1h:5m], 60) — текущая и прогнозная интенсивность.

  • Панель 2: 1 / (sum(rate(job_duration_seconds_sum[5m])) / sum(rate(job_duration_seconds_count[5m]))) — μ.

  • Панель 3: внешняя метрика erlangc_required_replicas.

  • Панель 4: расчётный сервис‑левел панелью Stat, вычисляем в скейлере и экспортируем или прикидываем в PromQL аналитику, если метрики доступны.

  • Панель 5: kube_deployment_status_replicas по целевому Deployment.

Минимальный JSON‑фрагмент панели Stat для c:

{
  "type": "stat",
  "title": "Required replicas (Erlang-C)",
  "targets": [
    {
      "expr": "erlangc_required_replicas",
      "legendFormat": "c*"
    }
  ],
  "options": {
    "reduceOptions": { "calcs": ["lastNotNull"], "values": false }
  }
}

Устойчивость

Даже идеальная модель может заставить HPA дёргаться, если входные оценки шумные. Контур стабилизации в двух местах:

  1. На стороне HPA через behavior: стабилизационное окно на scale down по умолчанию 300 секунд и ограничение скорости изменений. Это штатная возможность autoscaling/v2.

  2. В самом скейлере: держим минимальный шаг апскейла, откладываем даунскейл на scaleDownHold, накладываем допуск ε на целевой SLO. Это снижает чувствительность к лёгким промахам в λ/μ.

При желании можно добавить EMA к λ и μ или перейти на PromQL double_exponential_smoothing, но помните, что это сглаживание, а не точное предсказание.


Что в итоге получается

Мы переводим SLO по ожиданию в очереди в конкретное число реплик через Erlang‑C, используя текущие и прогнозные метрики. Это прозрачная математика плюс аккуратный контур стабилизации. Профит — меньше промахов по SLO, меньше догоняющих апскейлов при всплесках, контролируемая стоимость. А главное — логика автоскейлинга перестаёт зависеть от конкретного брокера, потому что мы подаём в скейлер не размер очереди, а первичные параметры потока и сервиса.

В итоге предиктивный автоскейлинг через Erlang‑C и KEDA позволяет проектировать систему не «по факту перегрузки», а исходя из формализованных требований к уровню сервиса. Это уже уровень архитектурных решений, где математика, распределённые системы и практики эксплуатации должны работать вместе.

Если вы хотите глубже разобраться в том, как строить подобные механизмы в продакшн‑среде, рекомендуем обратить внимание на курс Highload Architect. Там вы сможете изучить подходы к проектированию систем, где подобные методы масштабирования — не исключение, а правило. Чтобы узнать, подойдет ли вам программа курса, пройдите вступительный тест.

Рост в IT быстрее с Подпиской — дает доступ к 3-м курсам в месяц по цене одного. Подробнее

Комментарии (1)


  1. gis
    29.09.2025 17:03

    А что значит "SLO" в контексте вашей статьи?