Привет, Хабр!
Сегодня мы рассмотрим, как заставить PostgreSQL самостоятельно крутить K-Means для сегментации клиентов, не вытаскивая данные наружу. Пройдемся по циклу: нормализуем фичи в materialized view, напишем функцию PL/PythonU, которая дергает scikit-learn, сохраняем cluster_id обратно в таблицу и закрываем гештальт отчётом «доход по кластеру» чистым SQL.
Схема данных
Допустим, есть транзакции и базовая информация о клиентах:
CREATE TABLE public.customers (
customer_id bigint PRIMARY KEY,
registered_at timestamptz,
email text UNIQUE
);
CREATE TABLE public.orders (
order_id bigint PRIMARY KEY,
customer_id bigint REFERENCES public.customers,
order_dt timestamptz NOT NULL,
order_amount numeric(12,2) NOT NULL
);
Генерация фичей и нормализация в Materialized View
Нужны числовые признаки на одного клиента: orders_cnt
, days_since_last
, mean_amount
, total_amount
. Сразу делаем z-score, чтобы K-Means не страдал от разных масштабов. Всё в одном запросе, а результат кешируем материализованным представлением:
CREATE MATERIALIZED VIEW ds.mv_customer_features AS
WITH stats AS (
SELECT
avg(orders_cnt)::numeric AS avg_orders_cnt,
stddev_samp(orders_cnt) AS sd_orders_cnt,
avg(days_since_last) AS avg_days_last,
stddev_samp(days_since_last) AS sd_days_last,
avg(mean_amount) AS avg_mean_amount,
stddev_samp(mean_amount) AS sd_mean_amount,
avg(total_amount) AS avg_total_amount,
stddev_samp(total_amount) AS sd_total_amount
FROM (
SELECT
c.customer_id,
COUNT(o.*) AS orders_cnt,
EXTRACT(EPOCH FROM (now() - MAX(o.order_dt))) / 86400 AS days_since_last,
AVG(o.order_amount) AS mean_amount,
SUM(o.order_amount) AS total_amount
FROM public.customers c
LEFT JOIN public.orders o USING (customer_id)
GROUP BY c.customer_id
) sub
),
base AS (
SELECT
c.customer_id,
COUNT(o.*) AS orders_cnt,
EXTRACT(EPOCH FROM (now() - MAX(o.order_dt))) / 86400 AS days_since_last,
AVG(o.order_amount) AS mean_amount,
SUM(o.order_amount) AS total_amount
FROM public.customers c
LEFT JOIN public.orders o USING (customer_id)
GROUP BY c.customer_id
), z AS (
SELECT
b.customer_id,
(b.orders_cnt - s.avg_orders_cnt) / NULLIF(s.sd_orders_cnt,0) AS z_orders_cnt,
(b.days_since_last - s.avg_days_last) / NULLIF(s.sd_days_last,0) AS z_days_last,
(b.mean_amount - s.avg_mean_amount) / NULLIF(s.sd_mean_amount,0) AS z_mean_amount,
(b.total_amount - s.avg_total_amount) / NULLIF(s.sd_total_amount,0) AS z_total_amount
FROM base b CROSS JOIN stats s
)
SELECT * FROM z;
Материализованный вид хорош тем, что его можно освежать по расписанию (REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;
) и он даёт индексы, если понадобится. Под капотом всё стандартный SQL, никаких костылей.
Подключаем PL/PythonU
CREATE EXTENSION IF NOT EXISTS plpython3u;
Права выдаём только чтение на ds.mv_customer_features
и запись на public.customers.cluster_id
.
Хранимая функция K-Means
Функция получает желаемое число кластеров (k
), обучает K-Means на нормированных фичах, сохраняет модель в JSON (для истории) и пишет номер кластера клиенту в таблицу.
CREATE OR REPLACE FUNCTION ds.build_customer_clusters(k int DEFAULT 5)
RETURNS void
LANGUAGE plpython3u
SECURITY DEFINER
AS $$
import json
from sklearn.cluster import KMeans
from sklearn.exceptions import ConvergenceWarning
import warnings
plpy.execute("SET search_path TO ds, public")
# 1. Забираем данные
rows = plpy.execute("""
SELECT customer_id,
ARRAY[z_orders_cnt, z_days_last, z_mean_amount, z_total_amount] AS f
FROM ds.mv_customer_features
WHERE z_orders_cnt IS NOT NULL
""")
if len(rows) < k:
plpy.error(f"Not enough data points ({len(rows)}) for k={k}")
cust_ids = [r['customer_id'] for r in rows]
X = [r['f'] for r in rows]
# 2. Обучаем модель
warnings.filterwarnings("ignore", category=ConvergenceWarning)
model = KMeans(n_clusters=k, n_init='auto', random_state=42)
model.fit(X)
labels = model.labels_
# 3. Записываем кластера
tuples = [{'customer_id': cid, 'cluster_id': int(lbl)} for cid, lbl in zip(cust_ids, labels)]
plpy.execute("CREATE TEMP TABLE _tmp_cluster (customer_id bigint, cluster_id int) ON COMMIT DROP")
plpy.execute("INSERT INTO _tmp_cluster VALUES " +
", ".join(f"({t['customer_id']}, {t['cluster_id']})" for t in tuples))
plpy.execute("""
UPDATE public.customers c
SET cluster_id = t.cluster_id
FROM _tmp_cluster t
WHERE t.customer_id = c.customer_id
""")
# 4. Сериализуем модель (опционально)
plpy.execute("""
INSERT INTO ds.model_registry(model_name, trained_at, params, inertia)
VALUES ('customer_kmeans', now(), $1, $2)
""", [json.dumps(model.get_params()), float(model.inertia_)])
$$;
SECURITY DEFINER
исполняется с правами владельца, пользователи не получат лишние привилегии. Темп-таблица для массового апдейта быстрее, чем UPDATE … FROM (VALUES …)
на тысячи строк. random_state
фиксируем, чтобы результаты воспроизводились.
Запуск и расписание
Разово:
SELECT ds.build_customer_clusters(6);
REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;
Еще удобно делать REFRESH
и перестройку модели ночью cron-джобой или pg_cron:
SELECT cron.schedule('0 3 * * *', $$REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;
SELECT ds.build_customer_clusters(6);$$);
Отчёт «доход по кластеру»
Никакого Python — только SQL:
WITH rev AS (
SELECT
c.cluster_id,
SUM(o.order_amount) AS revenue
FROM public.customers c
JOIN public.orders o USING (customer_id)
GROUP BY c.cluster_id
)
SELECT
cluster_id,
revenue,
ROUND(revenue * 100.0 / SUM(revenue) OVER (), 2) AS revenue_pct
FROM rev
ORDER BY revenue DESC;
Результат сразу готов к дашборду: видно долю каждого сегмента в общей выручке.
Итог
Если у вас уже есть опыт сегментации на стороне БД — делитесь в комментариях: как масштабировали, какие проблемы ловили, где K-Means не зашёл. Чем больше примеров, тем полезнее статья для всех.
Погрузитесь в процесс разработки ПО с нуля: научитесь учитывать цели бизнеса и формулировать технические требования к продукту на базовом курсе «Системный аналитик».
Чтобы оставаться в курсе актуальных технологий и трендов, подписывайтесь на Telegram-канал OTUS.