
Вступление
Меня зовут Денис Агапитов, я руководитель группы Platform Core компании Bercut.
Сегодня я хочу рассказать вам об одном из вариантов доставки событий для распределённого приложения на Java.
Это доставка событий через БД, в которой хранится состояние распределённого приложения.
Как всё начиналось
При разработке распределённых приложений, которые хранят своё состояние в БД, появилась необходимость осуществлять обмен событиями между экземплярами приложений, работающих с этой БД.
На текущий момент у нас есть такие распределённые приложения, как Notification Broker, Scheduler, распределённый кэш настроек сервисов и другие.
Хотелось избежать больших нагромождений из сторонних библиотек и отдельно стоящих сервисов, таких как JMS, Kafka, etc. Это продиктовано тем, что использование сторонних сервисов только усложнит код, поддержку и настройку. Так как все экземпляры приложения работают с одной БД, то самым логичным решением было реализовать обмен событиями через саму БД. При этом необходимо выбирать решение, которое не будет зависеть от разработчика РСУБД.
В итоге мы решили разработать собственную реализацию на основе poll-запроса и секционированной таблицы для хранения и обмена событиями между всеми экземплярами распределённого приложения.
Доработки со стороны БД
Мы будем разбивать таблицу событий по дням. Каждая секция будет соответствовать одному дню. Это позволит снизить нагрузку на базу данных при удалении обработанных данных. События будут только добавляться в таблицу и читаться из неё. Удаление старых событий будет происходить путём удаления устаревших партиций. Для обслуживания нам понадобится еще одна таблица, которая будет выполнять функцию распределенной блокировки. Запуск обслуживания будет осуществляться java-приложением.
Здесь далее будем рассматривать решение на примере БД PostgreSQL.
Создадим таблицы partition_update
и event
, заполним первую текущим значением:
create table partition_update (
last_time timestamp not null
);
create table event(
uuid varchar(36) not null,
created timestamp not null,
type numeric(2) not null,
value varchar(36) not null
) partition by range (created);
insert into partition_update (last_time) values (now()::timestamp);
Дадим права нашему пользователю и сменим владельца таблиц:
GRANT ALL PRIVILEGES ON partition_update, event TO cache_user;
alter table partition_update owner to cache_user;
alter table event owner to cache_user;
Создадим ключи и индексы для таблиц:
alter table partition_update add constraint pu$pk primary key (last_time);
create index indx$e$p on event (created);
Создадим процедуры для создания и удаления партиций:
CREATE OR REPLACE PROCEDURE add_partition(from_day character varying, to_day character varying)
LANGUAGE 'plpgsql'
AS $BODY$
DECLARE
v_sql text;
v_count numeric;
BEGIN
select count(*) into v_count
from information_schema.tables
where substring(table_name, 1, length('event')) = 'event'
and substring(table_name, length('event') + 2) = from_day;
if v_count = 0 then
v_sql := 'create table event_' || from_day || ' partition of event for values ';
v_sql := v_sql || 'from (''' || from_day || ' 00:00:00'') ';
v_sql := v_sql || 'to (''' || to_day || ' 00:00:00'')';
execute v_sql;
end if;
END;
$BODY$;
CREATE OR REPLACE PROCEDURE remove_partition(from_day character varying)
LANGUAGE 'plpgsql'
AS $BODY$
DECLARE
v_sql text;
v_count numeric;
BEGIN
select count(*) into v_count
from information_schema.tables
where substring(table_name, 1, length('event')) = 'event'
and substring(table_name, length('event') + 2) = from_day;
if v_count > 0 then
v_sql := 'alter table event detach partition event_' || from_day;
execute v_sql;
v_sql = 'drop table event_' || from_day;
execute v_sql;
end if;
END;
$BODY$;
На этом доработки базы данных окончены.
Доработки по управлению партициями со стороны приложения
Теперь давайте рассмотрим какие доработки необходимо сделать со стороны приложения.
Для начала создадим класс, который будет блокировать одновременную работу по управлению партициями с разных экземпляров распределённого приложения:
package org.event.partition;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
public class PartitionLock implements AutoCloseable {
// Запрос на чтение с блокировкой
private static final String SQL_PARTITIONS_LOCK = "select last_time from partition_update for update ";
private final PreparedStatement stmt;
private final ResultSet rs;
public PartitionLock(Connection conn) throws SQLException {
stmt = conn.prepareStatement(SQL_PARTITIONS_LOCK, ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
try {
rs = stmt.executeQuery();
} catch (SQLException ex) {
stmt.close();
throw ex;
}
}
// Метод с блокировкой и обновлением времени обслуживания партиций
public void lock() throws SQLException {
rs.next();
rs.updateTimestamp(1, new Timestamp(System.currentTimeMillis()));
rs.updateRow();
}
@Override
public void close() throws SQLException {
try {
rs.close();
} finally {
stmt.close();
}
}
}
Для удобства использования класс сделан как реализация интерфейса AutoCloseable
, чтобы можно было его использовать с конструкциями try-with-resources. При вызове метода lock
происходит блокировка через базу данных для управления партициями, а при разрушении корректно закрываются объекты работы с БД. В последствии, при получении commit
/rollback
, заблокированная запись разблокируется и, в случае успеха, обновляется дата последнего цикла обслуживания партиций.
Также нам понадобится класс, описывающий партицию:
package org.event.partition;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.TimeZone;
public class Partition {
private static final SimpleDateFormat FORMATTER = new SimpleDateFormat("yyyy_MM_dd");
// на сколько дней вперёд создавать партиции
private static final long FUTURE_DAYS = 2;
// через сколько дней удалять партиции
private static final long PAST_DAYS = 2;
private static final long LENGTH_OF_DAY = 24 * 60 * 60 * 1000;
private static long normalize(long millis) {
return millis - ((millis + TimeZone.getDefault().getOffset(millis)) % LENGTH_OF_DAY);
}
public static Set<Partition> getActualSetOfPartitions() {
Set<Partition> result = new HashSet<>();
long min = normalize(System.currentTimeMillis());
long max = normalize(System.currentTimeMillis() + FUTURE_DAYS * LENGTH_OF_DAY);
long current = min;
while (current <= max) {
result.add(new Partition(current));
current += LENGTH_OF_DAY;
}
return result;
}
//
private final long time;
public Partition(long time) {
this.time = normalize(time);
}
public Partition(String day) throws ParseException {
this.time = FORMATTER.parse(day).getTime();
}
public long getTime() {
return time;
}
public String getDay() {
return FORMATTER.format(new Date(time));
}
public String getNextDay() {
return FORMATTER.format(new Date(time + LENGTH_OF_DAY));
}
public boolean isNeedToDelete() {
return time < normalize(System.currentTimeMillis()) - PAST_DAYS * LENGTH_OF_DAY;
}
@Override
public int hashCode() {
int hash = 5;
hash = 89 * hash + (int) (this.time ^ (this.time >>> 32));
return hash;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final Partition other = (Partition) obj;
return this.time == other.time;
}
}
Класс содержит выровненное и нормализованное время (time
), методы получения разного представления партиции, а также метод принятия решения isNeedToDelete
и статический метод, который возвращает список актуальных на текущее время партиций.
Теперь создадим абстрактный класс сервиса, для дальнейшего переиспользования в двух сервисах, которые мы будем реализовывать:
package org.event;
import java.sql.SQLException;
public abstract class BaseService implements Runnable {
private final Controller controller;
private final String threadName;
private Thread container;
protected volatile boolean interrupted;
public BaseService(Controller controller, String threadName) {
this.controller = controller;
this.threadName = threadName;
this.container = null;
this.interrupted = false;
}
public synchronized void start() {
if (container == null) {
interrupted = false;
container = new Thread(this);
container.setName(threadName);
container.start();
}
}
public synchronized void stop() {
if (container != null) {
interrupted = true;
if (container != Thread.currentThread()) {
while (container.isAlive()) {
try {
notify();
container.join();
} catch (InterruptedException ignore) {
}
}
}
container = null;
}
}
@Override
public void run() {
while (!interrupted) {
try {
next();
} catch (SQLException ex) {
controller.onDatabaseDown(ex);
} catch (RuntimeException ex) {
controller.onUnknownError(ex);
}
}
}
protected abstract void next() throws SQLException;
}
Интерфейс Controller имеет следующий вид:
package org.event.partition;
import java.sql.SQLException;
public interface Controller {
public void onDatabaseDown(SQLException ex);
public void onUnknownError(Exception ex);
}
Не буду приводить здесь его реализацию, объясню здесь только суть методов:
onDatabaseDown
- обрабатывает события о проблемах на БД.onUnknownError
- обрабатывает возможные Runtime исключения.
В качестве обработки может выступать, например, вывод в лог-файл, приостановка обработки входящих сообщений или что-то ещё.
Для управления созданием и удалением партиции создадим сервис PartitionServeWorker
, основанный на BaseService
:
ackage org.event.partition;
import org.event.BaseService;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.Statement;
import java.sql.ResultSet;
import java.sql.CallableStatement;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class PartitionServeWorker extends BaseService {
// Константы, содержащие запросы на чтение и управление партициями
private static final String SQL_PARTITIONS_READ = "select substring(table_name, 7) as partition_day from information_schema.tables where substring(table_name, 1, 6) = 'event_'";
private static final String SQL_PARTITION_CREATE = "call add_partition(?, ?)";
private static final String SQL_PARTITION_DELETE = "call remove_partition(?)";
private static final Comparator<Partition> PARTITION_COMPARATOR = (Partition one, Partition two) -> Long.compare(one.getTime(), two.getTime());
private final DataSource dataSource;
private long next;
public PartitionServeWorker(Controller controller, DataSource dataSource) {
super(controller, "partition-serve-worker");
this.dataSource = dataSource;
}
@Override
protected void next() throws SQLException {
waitNext();
if (!interrupted) {
setNext(doServe());
}
}
public void waitNext() {
long now;
for (;;) {
synchronized (this) {
now = System.currentTimeMillis();
if (interrupted || now >= next) {
return;
}
try {
wait(next-now);
} catch (InterruptedException ex) {
return;
}
}
}
}
private void setNext(long next) {
this.next = next;
}
private long doServe() throws SQLException {
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
try {
try (PartitionLock lock = new PartitionLock(conn)) {
// Блокируем запись в таблице partition_update,
// чтобы не конкурировать с другими экземплярами
lock.lock();
// Читам существующие партиции
List<Partition> partitions = readPartitions(conn);
Iterator<Partition> it = partitions.iterator();
while (it.hasNext()) {
Partition partition = it.next();
if (partition.isNeedToDelete()) {
// Удаляем устаревшие партиции
deletePartition(conn, partition);
it.remove();
}
}
// Получаем список партиций, необходимых к созданию
partitions = getNewPartitions(partitions);
// Сортируем их по увеличению даты
Collections.sort(partitions, PARTITION_COMPARATOR);
// Создаём партиции
for (Partition partition : partitions) {
createPartition(conn, partition);
}
}
// Вызываем фиксацию и разблокируем
// запись в таблице partition_update
conn.commit();
// Возвращаем время следующего обслуживания
return System.currentTimeMillis() + 1000 * 60 * 60;
} catch (SQLException ex) {
conn.rollback();
throw ex;
}
}
}
// Добавляем актуальные партиции, если их нет
private List<Partition> getNewPartitions(List<Partition> already) {
Set<Partition> result = Partition.getActualSetOfPartitions();
for (Partition partition : already) {
result.remove(partition);
}
return new ArrayList<>(result);
}
// Читаем существующие партиции
private List<Partition> readPartitions(Connection conn) throws SQLException {
List<Partition> result = new ArrayList<>();
try (Statement stmt = conn.createStatement()) {
try (ResultSet rs = stmt.executeQuery(SQL_PARTITIONS_READ)) {
while (rs.next()) {
result.add(new Partition(rs.getString(1)));
}
}
} catch (ParseException ex) {
throw new SQLException("Invalid partition suffix", ex);
}
return result;
}
// Создаём партицию
private void createPartition(Connection conn, Partition partition) throws SQLException {
try (CallableStatement stmt = conn.prepareCall(SQL_PARTITION_CREATE)) {
stmt.setString(1, partition.getDay());
stmt.setString(2, partition.getNextDay());
stmt.execute();
}
}
// Удаляем партицию
private void deletePartition(Connection conn, Partition partition) throws SQLException {
try (CallableStatement stmt = conn.prepareCall(SQL_PARTITION_DELETE)) {
stmt.setString(1, partition.getDay());
stmt.execute();
}
}
}
Сервис выполняет актуализацию партиций (создание необходимых и удаление устаревших) при старте и далее раз в час (23 кратный запас в случае проблем с БД).
Отправка событий из приложения
Для начала надо создать интерфейс события, которое мы будем отправлять в БД и получать из неё:
package org.event;
public interface Event {
public static final int TYPE_TASK = 1;
public static final int TYPE_EVENT = 2;
public String getUuid();
public long getTime();
public int getType();
public String getValue();
}
В описанном интерфейсе для примера приведено два типа события - задача и просто событие, с идентификаторами 1 и 2, соответственно.
Создадим реализацию базового события с двумя конструкторами - один для инициации события, второй - для его чтения:
package org.event;
import org.event.util.UUIDGenerator;
public class BaseEvent implements Event {
private final String uuid;
private final long time;
private final int type;
private final String value;
public BaseEvent(int type, String value) {
this(
UUIDGenerator.generateUUIDString(),
System.currentTimeMillis(),
type,
value
);
}
public BaseEvent(String uuid, long time, int type, String value) {
this.uuid = uuid;
this.time = time;
this.type = type;
this.value = value;
}
@Override
public String getUuid() {
return uuid;
}
@Override
public long getTime() {
return time;
}
@Override
public int getType() {
return type;
}
@Override
public String getValue() {
return value;
}
}
Здесь не буду приводить реализацию UUIDGenerator
, скажу только, что генерируемый идентификатор должен быть реально уникальным. У нас leastSigBits
генерируемого UUID опирается на mac-адреса хоста, где развёрнуто приложение и на случайное число, а mostSigBits
на время.
Отсылка события в базу данных происходит через примерно такой класс:
package org.event;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import javax.sql.DataSource;
public class EventSender {
private static final String SQL_EVENT_INSERT
= "insert into event (uuid, created, type, value) values (?, ?, ?, ?)";
private final DataSource dataSource;
public EventSender(DataSource dataSource) {
this.dataSource = dataSource;
}
// Отсылка нотификации в автономной транзакции
public void sendEvent(Event event) throws SQLException {
try (Connection conn = dataSource.getConnection()) {
try (PreparedStatement stmt = conn.prepareStatement(SQL_EVENT_INSERT)) {
stmt.setString(1, event.getUuid());
stmt.setTimestamp(2, new Timestamp(event.getTime()));
stmt.setInt(3, event.getType());
stmt.setString(4, event.getValue());
stmt.execute();
}
conn.commit();
} catch (SQLException ex) {
conn.rollback();
throw ex;
}
}
// Отсылка нотификации в основной транзакции
public void sendEvent(Connection conn, Event event) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement(SQL_EVENT_INSERT)) {
stmt.setString(1, event.getUuid());
stmt.setTimestamp(2, new Timestamp(event.getTime()));
stmt.setInt(3, event.getType());
stmt.setString(4, event.getValue());
stmt.execute();
}
}
}
В случае обновления данных на экземпляре, через который происходит обновление, вызывается метод sendEvent
.
Надо отметить, что отсылка события может происходить в двух вариантах:
В основной транзакции вместе с изменением данных.
В изолированной транзакции после изменения данных.
Если транзакции по изменению данных быстрые и укладываются во время до одной секунды или чуть более, то логичным вариантом будет создавать события прямо в этой транзакции, чтобы событие об изменении появилось одновременно с данными.
Если же транзакции по изменению данных длительные, то событие необходимо посылать в рамках автономной транзакции. Это необходимо потому, что если делать это в одной транзакции, то время реального окончания commit
может сильно уехать вперёд относительно проставленного в событии времени и это событие уже никто не увидит.
Приём событий из приложения
А теперь создадим второй сервис, который будет вычитывать нотификации из базы данных и инициировать действия по их типам:
package org.event;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import javax.sql.DataSource;
public class EventDispatcher extends BaseService {
// Константа с запросом, читающим события с определённого времени
private static final String SQL_EVENT_READ
= " select uuid, created, type, value from event where created > ? order by created";
// Константа отката времени для чтения
private static final long EVENT_LAG = 5000;
// Константа времени хранения обработанных событий в памяти
private static final long EVENT_HISTORY = 600000;
private final EventProcessor processor;
private final DataSource dataSource;
private final Set<String> uuids;
private final NavigableMap<Long, String> uuidsByTime;
public EventDispatcher(EventProcessor processor, Controller controller, DataSource dataSource) {
super(controller, "event-dispatcher");
this.processor = processor;
this.dataSource = dataSource;
this.uuids = new HashSet<>();
this.uuidsByTime = new TreeMap<>();
}
@Override
public synchronized void start() {
// Регистрируем пустое событие,
// чтобы начинать читать следующие именно с этого времени
register(new BaseEvent(0, null));
super.start();
}
@Override
protected void next() throws SQLException {
processEvents();
processUuids();
}
private void processUuids() {
// Удаляем из памяти устаревшие события
long key, border = System.currentTimeMillis() - EVENT_HISTORY;
while (!uuidsByTime.isEmpty() && (key = uuidsByTime.firstKey()) < border) {
uuids.remove(uuidsByTime.remove(key));
}
}
private void processEvents() throws SQLException {
// Определяем from time - либо время последней нотификации,
// либо текущее время
long fromTime = uuidsByTime.isEmpty() ? System.currentTimeMillis() : uuidsByTime.lastKey();
try (Connection conn = dataSource.getConnection()) {
// Читаем события вычитая EVENT_LAG, чтобы ничего не пропустить
// из-за небольшого несовпадения времени на разных серверах
for (Event event : readEvents(conn, fromTime - EVENT_LAG)) {
processEvent(event);
}
}
}
// Обрабатываем событие
private void processEvent(Event event) throws SQLException {
try {
switch (event.getType()) {
case Event.TYPE_TASK:
processor.onTask(event);
break;
case Event.TYPE_EVENT:
processor.onEvent(event);
break;
}
} finally {
register(event);
}
}
// Регистрируем событие, как обработанное
private void register(Event event) {
uuids.add(event.getUuid());
uuidsByTime.put(event.getTime(), event.getUuid());
}
// Читаем события из БД
private List<Event> readEvents(Connection conn, long fromTime) throws SQLException {
List<Event> events = new ArrayList<>();
try (PreparedStatement stmt = conn.prepareStatement(SQL_EVENT_READ)) {
stmt.setTimestamp(1, new Timestamp(fromTime));
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
String uuid = rs.getString(1);
if (!uuids.contains(uuid)) {
// Если событие не обрабатывалось, добавляем
events.add(
new BaseEvent(
uuid,
rs.getTimestamp(2).getTime(),
rs.getInt(3),
rs.getString(4)
)
);
}
}
}
}
return events;
}
}
Интерфейс EventProcessor
имеет следующий вид:
package org.event;
public interface EventProcessor {
public void onTask(Event event);
public void onEvent(Event event);
}
Реализация EventProcessor
как раз и обрабатывает пришедшие события. В нашем примере - Task
и Event
.
Плюсы и минусы
Основные плюсы данного подхода:
Позволяет не использовать сложные и неповоротливые механизмы с очередями, ограничиваясь только одной таблицей с событиями.
Позволяет разгрузить БД, используя механизм удаления целых партиций.
Будет работать на любой БД, где есть возможность создавать и удалять партиции.
События приезжают в другие экземпляры практически мгновенно, и их задержка зависит только от константы
READ_DELAY
.
У данного подхода есть и минусы. Рассмотрим два основных:
Все сервера, на которых установлен кластер приложений должны быть синхронизированы по времени.
Возможная потеря события, если его отсылка происходит в автономной транзакции.
С первым минусом легко справится, запустив синхронизацию времени штатными средствами ОС.
Частично минимизировать второй недостаток подхода можно обработав ошибку отправки события и поставив отправку повторно через некоторый интервал с новым временем.
Однако, если после commit
на транзакцию и реальной отправкой события произойдёт падение JVM, то такое событие будет потеряно безвозвратно.
Этот риск (падение JVM) можно считать невероятным событием, так как скорее будет падение по OOM при работе с обновлением БД, чем после фиксации транзакции.
Заключение
В данной статье мы рассмотрели реализацию хранения событий кластера приложений, имеющих общую базу данных.
Данную реализацию можно перенести на любую БД, поддерживающую секционирование, написав процедуры создания, удаления и запрос чтения партиций для выбранной БД.
Комментарии (0)
ris58h
18.09.2025 12:08Требования к системе уж очень расплывчатые: осуществлять обмен событиями между экземплярами приложений, работающих с этой БД. Да и результат непонятен - анализировать простыни кода нет желания, простите.
Хотелось бы видеть в статье ответы на такие вопросы:
Сколько возможных producer/consumer в системе?
Какая гарантия доставки (at least once, at most once, exactly-once)?
Видел в коде упоминания event и task. В чём разница? Есть какие-то гарантии что task будет обработан лишь одним consumer?
kmatveev
18.09.2025 12:08Я не автор, но по прочитанному отвечу
Теоретически неограниченно, практически продюсеры и консьюмеры параллельно взаимодействуют с таблицей. Все consumer-ы получают все сообщения.
Event сохраняется в базу, а то, с какого места эти event-ы потребляются consumer-ом, зависит от consumer-а, решение не содержит персиста для полученных событий. Теоретически каждое событие имеет uuid, поэтому consumer защищается от повторной доставки, храня uuid-ы для какого-то количество недавно обработанных сообщений. Практически же на уровне таблицы этот uuid не является ключом, так что можно вставить один и тот же event несколько раз, но на уровне чтения повторы отбросятся.
Task/Event - это просто вспомогательный флажок на event-е, чтобы дёрнуть соотвествующий метод. По факту оно всё event, нет доставки единственному получателю из всех слушающих.
DenAgapitov Автор
18.09.2025 12:08Спасибо за вопрос. Раскрою более широко. Данный подход применяется для кластерных приложений (да, мы их пишем сами), полное состояние которых хранится в БД.
Возьмём для примера приложение Notification Broker, которое у нас как раз и реализует pub/sub модель взаимодействия по спецификации WS-Brokered-Notification, о которой вы говорите. При старте экземпляра, он идёт в БД и вычитывает состояние: опубликованные темы, подписки к нему, подписки от него и другую необходимую информацию. И так как это кластерное приложение, то состояние должно распространяться между всеми активными в текущий момент экземплярами. Таким образом подписчик в рамках WS-Atomic-Notification получит событие с любого активного экземпляра, даже если подписался не у него.
Примеры событий, распространяемых на кластере: опубликована новая тема, приложение подписалось к producer, появился новый consumer.
Было бы странно писать брокер сообщений, используя другой брокер или очередь.
Раскрыв более широко, теперь могу дать ответы на ваши 3 вопроса:
Не ограничено.
Если эти стратегии как-то натянуть на предмет статьи, то я бы сказал, что это ближе всего к at least once.
Это пример разных событий. Их можно назвать как угодно. В нашем Notification Broker, например, типов событий несколько десятков.
Ещё для меня очень странно выглядит желание видеть в статье ответы на вопросы, даже не поняв суть статьи. Простыни кода анализировать не обязательно, но хотя бы прочитать текст было бы уважительно по отношению к автору.
ris58h
18.09.2025 12:08Ещё для меня очень странно выглядит желание видеть в статье ответы на вопросы, даже не поняв суть статьи.
Как можно понять суть статьи из такого скудного описания? У вас вся постановка задачи: "осуществлять обмен событиями между экземплярами приложений, работающих с этой БД". Потом сразу пошла реализация. Как понять что реализация отвечает требованиям? Ну да, какие-то события пишутся, какие-то читаются. Наверное работает, а может и нет.
Ни на один из моих вопросов явного ответа в статье нет. Как читатель должен это понять? Из кода? Вы утверждаете что код читать необязательно. Тогда как?
Статью я прочитал полностью и пока вижу лишь неуважение к читателю.
DenAgapitov Автор
18.09.2025 12:08Ни на один из моих вопросов явного ответа в статье нет. Как читатель должен это понять? Из кода? Вы утверждаете что код читать необязательно. Тогда как?
Я не утверждал, что код читать не надо, всё же это "Туториал", а не позновательный текст. Код сильно упрощён и, как я надеялся, сильно понятен большинству заинтересованных темой. Если для осознания нужен глубокий анализ, прошу прощения. В следующий раз постараюсь раскрывать тему более глубоко ещё и текстом.
На вопросы вам ответили дважды, спасибо @kmatveev.
kmatveev
18.09.2025 12:08Было бы странно писать брокер сообщений, используя другой брокер или очередь.
Почему? Они же как раз выполняют то, для чего они вам нужны.
kmatveev
18.09.2025 12:08Эх, так и норовят программисты использовать базу данных как shared memory, а в любой shared memory сделать очередь.
Мне кажется, в методе readPartitions() пропустили цикл с rs.next().
amarkevich
18.09.2025 12:08если уже используется PostgreSQL - логично использовать расширение pg_partman
aleksandy
18.09.2025 12:08private static final SimpleDateFormat FORMATTER
Т.е. работа в многопоточном окружении не предусматривается?
new Date(time + LENGTH_OF_DAY)
Вот бы было бы удобно, если бы в стандартной библиотеке был какой-нибудь тип, позволяющий работать с датами без подобного геморроя, правда?
И да, использование
timestamp
в postgresql - моветон.
Beholder
Может быть всё-таки проще с Kafka? После вот этих всех велосипедов.