Привет, Хабр!

Сегодня мы рассмотрим, как заставить PostgreSQL самостоятельно крутить K-Means для сегментации клиентов, не вытаскивая данные наружу. Пройдемся по циклу: нормализуем фичи в materialized view, напишем функцию PL/PythonU, которая дергает scikit-learn, сохраняем cluster_id обратно в таблицу и закрываем гештальт отчётом «доход по кластеру» чистым SQL.

Схема данных

Допустим, есть транзакции и базовая информация о клиентах:

CREATE TABLE public.customers (
    customer_id      bigint PRIMARY KEY,
    registered_at    timestamptz,
    email            text UNIQUE
);

CREATE TABLE public.orders (
    order_id         bigint PRIMARY KEY,
    customer_id      bigint REFERENCES public.customers,
    order_dt         timestamptz NOT NULL,
    order_amount     numeric(12,2) NOT NULL
);

Генерация фичей и нормализация в Materialized View

Нужны числовые признаки на одного клиента: orders_cnt, days_since_last, mean_amount, total_amount. Сразу делаем z-score, чтобы K-Means не страдал от разных масштабов. Всё в одном запросе, а результат кешируем материализованным представлением:

CREATE MATERIALIZED VIEW ds.mv_customer_features AS
WITH stats AS (
    SELECT
        avg(orders_cnt)::numeric  AS avg_orders_cnt,
        stddev_samp(orders_cnt)   AS sd_orders_cnt,
        avg(days_since_last)      AS avg_days_last,
        stddev_samp(days_since_last) AS sd_days_last,
        avg(mean_amount)          AS avg_mean_amount,
        stddev_samp(mean_amount)  AS sd_mean_amount,
        avg(total_amount)         AS avg_total_amount,
        stddev_samp(total_amount) AS sd_total_amount
    FROM (
        SELECT
            c.customer_id,
            COUNT(o.*)                      AS orders_cnt,
            EXTRACT(EPOCH FROM (now() - MAX(o.order_dt))) / 86400 AS days_since_last,
            AVG(o.order_amount)             AS mean_amount,
            SUM(o.order_amount)             AS total_amount
        FROM public.customers c
        LEFT JOIN public.orders o USING (customer_id)
        GROUP BY c.customer_id
    ) sub
),
base AS (
    SELECT
        c.customer_id,
        COUNT(o.*)                      AS orders_cnt,
        EXTRACT(EPOCH FROM (now() - MAX(o.order_dt))) / 86400 AS days_since_last,
        AVG(o.order_amount)             AS mean_amount,
        SUM(o.order_amount)             AS total_amount
    FROM public.customers c
    LEFT JOIN public.orders o USING (customer_id)
    GROUP BY c.customer_id
), z AS (
    SELECT
        b.customer_id,
        (b.orders_cnt    - s.avg_orders_cnt)   / NULLIF(s.sd_orders_cnt,0)   AS z_orders_cnt,
        (b.days_since_last - s.avg_days_last)  / NULLIF(s.sd_days_last,0)    AS z_days_last,
        (b.mean_amount   - s.avg_mean_amount)  / NULLIF(s.sd_mean_amount,0)  AS z_mean_amount,
        (b.total_amount  - s.avg_total_amount) / NULLIF(s.sd_total_amount,0) AS z_total_amount
    FROM base b CROSS JOIN stats s
)
SELECT * FROM z;

Материализованный вид хорош тем, что его можно освежать по расписанию (REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;) и он даёт индексы, если понадобится. Под капотом всё стандартный SQL, никаких костылей.

Подключаем PL/PythonU

CREATE EXTENSION IF NOT EXISTS plpython3u;

Права выдаём только чтение на ds.mv_customer_features и запись на public.customers.cluster_id.

Хранимая функция K-Means

Функция получает желаемое число кластеров (k), обучает K-Means на нормированных фичах, сохраняет модель в JSON (для истории) и пишет номер кластера клиенту в таблицу.

CREATE OR REPLACE FUNCTION ds.build_customer_clusters(k int DEFAULT 5)
RETURNS void
LANGUAGE plpython3u
SECURITY DEFINER
AS $$
import json
from sklearn.cluster import KMeans
from sklearn.exceptions import ConvergenceWarning
import warnings

plpy.execute("SET search_path TO ds, public")

# 1. Забираем данные
rows = plpy.execute("""
    SELECT customer_id,
           ARRAY[z_orders_cnt, z_days_last, z_mean_amount, z_total_amount] AS f
    FROM ds.mv_customer_features
    WHERE z_orders_cnt IS NOT NULL
""")

if len(rows) < k:
    plpy.error(f"Not enough data points ({len(rows)}) for k={k}")

cust_ids = [r['customer_id'] for r in rows]
X = [r['f'] for r in rows]

# 2. Обучаем модель
warnings.filterwarnings("ignore", category=ConvergenceWarning)
model = KMeans(n_clusters=k, n_init='auto', random_state=42)
model.fit(X)
labels = model.labels_

# 3. Записываем кластера
tuples = [{'customer_id': cid, 'cluster_id': int(lbl)} for cid, lbl in zip(cust_ids, labels)]
plpy.execute("CREATE TEMP TABLE _tmp_cluster (customer_id bigint, cluster_id int) ON COMMIT DROP")
plpy.execute("INSERT INTO _tmp_cluster VALUES " + 
             ", ".join(f"({t['customer_id']}, {t['cluster_id']})" for t in tuples))

plpy.execute("""
    UPDATE public.customers c
    SET cluster_id = t.cluster_id
    FROM _tmp_cluster t
    WHERE t.customer_id = c.customer_id
""")

# 4. Сериализуем модель (опционально)
plpy.execute("""
    INSERT INTO ds.model_registry(model_name, trained_at, params, inertia)
    VALUES ('customer_kmeans', now(), $1, $2)
""", [json.dumps(model.get_params()), float(model.inertia_)])
$$;

SECURITY DEFINER исполняется с правами владельца, пользователи не получат лишние привилегии. Темп-таблица для массового апдейта быстрее, чем UPDATE … FROM (VALUES …) на тысячи строк. random_state фиксируем, чтобы результаты воспроизводились.

Запуск и расписание

Разово:

SELECT ds.build_customer_clusters(6);
REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;

Еще удобно делать REFRESH и перестройку модели ночью cron-джобой или pg_cron:

SELECT cron.schedule('0 3 * * *', $$REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features; 
                      SELECT ds.build_customer_clusters(6);$$);

Отчёт «доход по кластеру»

Никакого Python — только SQL:

WITH rev AS (
    SELECT
        c.cluster_id,
        SUM(o.order_amount) AS revenue
    FROM public.customers c
    JOIN public.orders o USING (customer_id)
    GROUP BY c.cluster_id
)
SELECT
    cluster_id,
    revenue,
    ROUND(revenue * 100.0 / SUM(revenue) OVER (), 2) AS revenue_pct
FROM rev
ORDER BY revenue DESC;

Результат сразу готов к дашборду: видно долю каждого сегмента в общей выручке.

Итог

Если у вас уже есть опыт сегментации на стороне БД — делитесь в комментариях: как масштабировали, какие проблемы ловили, где K-Means не зашёл. Чем больше примеров, тем полезнее статья для всех.

Погрузитесь в процесс разработки ПО с нуля: научитесь учитывать цели бизнеса и формулировать технические требования к продукту на базовом курсе «Системный аналитик».

Чтобы оставаться в курсе актуальных технологий и трендов, подписывайтесь на Telegram-канал OTUS.

Комментарии (0)