Spring Batch появился за много лет до появления MongoDB, и его архитектура изначально предполагала наличие SQL-базы данных для хранения состояния заданий Spring Batch.

Но это было десятилетия назад, а один из самых частых вопросов у тех, кто впервые сталкивался со Spring Batch, звучал так: «Почему этой штуке нужно обращаться к SQL-базе данных?» Ответ, конечно, заключался в том, что Spring Batch ведет подробнейший учет каждого задания, шага и выполнения в JobRepository, и многие годы этот репозиторий говорил на одном диалекте — SQL. Если вы счастливо жили в мире MongoDB, вам все равно приходилось тащить за собой экземпляр Postgres или MySQL лишь для того, чтобы Batch мог записать, что он сделал в прошлый вторник.

В последних версиях Spring Batch JobRepository был отвязан от JDBC, а Spring Boot 4.1 наконец доводит этот опыт до полноценного состояния с помощью корректной автоконфигурации spring-boot-starter-batch-data-mongodb. Вы получаете такой же zero-config-опыт Boot для метаданных batch-задач, каким пользователи JDBC наслаждались с самого начала.

Занятный факт: доктор Дэйв Сайер, сооснователь Spring Boot, был основателем и многолетним руководителем Spring Batch. Естественно, первой автоконфигурацией, которую он написал для Spring Boot, была автоконфигурация для Spring Batch! Так что, когда я говорю, что пользователи Spring Boot пользовались поддержкой на базе JDBC в Spring Boot с самого начала, я именно это имею в виду :)

В этом материале мы разберем небольшой, но полноценный пример. Он:

  • Хранит Spring Batch JobRepository в MongoDB через новый стартер 4.1.

  • Читает customers.csv из classpath.

  • Записывает строки в таблицу PostgreSQL customers.

  • Запускает все это на сервисах, поднятых через compose.yaml в корне проекта.

Поднимаем инфраструктуру

Прежде чем работать с одной конкретной строкой в Java, запустим два вспомогательных сервиса:

docker compose up

compose.yaml поднимает экземпляр MongoDB, настроенный как одноузловой replica set — поддержке MongoDB в Batch нужны транзакции, а для транзакций нужен replica set, — экземпляр PostgreSQL для целевой таблицы и контейнер Grafana LGTM для наблюдаемости, если он понадобится позже.

Задание, которое мы определим, — это простой ETL-процесс: extraction, transformation, load, то есть извлечение, преобразование и загрузка. Он читает данные из файла customers.csv и записывает их в таблицу customers в базе данных PostgreSQL.

Нам нужно инициализировать таблицу Postgres customers; это делает src/main/resources/schema.sql:

create table if not exists customers (
    id    serial primary key,
    name  varchar(255),
    email varchar(255)
);

SQL-инициализация Spring Boot (spring.sql.init.mode=always) выполняет этот скрипт при запуске. Сторона MongoDB устроена так же автономно — spring.batch.data.mongodb.schema.initialize=true говорит новому стартеру создать коллекции, необходимые JobRepository.

Ключевые фрагменты application.properties:

spring.mongodb.host=localhost
spring.mongodb.port=27017
spring.mongodb.database=mydatabase

spring.batch.data.mongodb.schema.initialize=true

spring.datasource.url=jdbc:postgresql://localhost/mydatabase
spring.datasource.username=myuser
spring.datasource.password=secret
Комментарий от Михаила Поливаха

Тут можно использовать Spring Boot Docker Compose starter, который избавит почти от всей ручной настройки какой-либо конфигурации (опять же, если мы говорим про простой "demo" кейс).

Мы также о нем писали:

https://habr.com/ru/companies/spring_aio/articles/1031216/

Задание

Наш batch job называется etl и выполняет два шага последовательно:

@Bean
Job job(@Qualifier(STEP_RESET) Step stepReset,
        @Qualifier(STEP_FILES_TO_DB) Step stepFilesToDb) {
    return new JobBuilder("etl", this.repository)
            .start(stepReset)
            .next(stepFilesToDb)
            .incrementer(new RunIdIncrementer())
            .build();
}

Сначала reset — tasklet, который очищает целевую таблицу, затем files-to-db — reader → processor → writer, который фактически переносит данные. RunIdIncrementer — небольшая, но важная деталь: он увеличивает параметр run.id при каждом запуске, чтобы Spring Batch рассматривал каждый вызов как новый экземпляр задания, а не отказывался повторно запускать уже завершенное задание.

Шаг первый: Reset

Самый простой тип шага в Spring Batch — это Tasklet: единый фрагмент работы без понятия чтения или записи элементов. Это подходящий инструмент, когда между шагами нужно просто что-то выполнить. Здесь — начать с чистого листа:

@Bean(STEP_RESET)
Step cleanTableStep(JdbcClient db, JobRepository repository) {
    return new StepBuilder("reset", repository)
            .tasklet((contribution, chunkContext) -> {
                db.sql("delete from customers").update();
                return RepeatStatus.FINISHED;
            })
            .build();
}

Tasklet выполняется один раз, возвращает RepeatStatus.FINISHED, и мы движемся дальше.

Шаг второй: reader, processor, writer

Самый интересный шаг — chunked-шаг. Базовый паттерн Spring Batch — прочитать элемент, обработать его, накопить chunk, записать chunk. Reader извлекает строки из customers.csv:

@Bean
FlatFileItemReader<Customer> customerFlatFileItemReader(
        @Value("classpath:/customers.csv") Resource csv) {
    return new FlatFileItemReaderBuilder<Customer>()
            .name("customer-reader")
            .resource(csv)
            .delimited(c -> c.delimiter(",").names("id", "name", "email"))
            .fieldSetMapper(fs -> new Customer(
                    fs.readInt("id"),
                    fs.readString("name"),
                    fs.readString("email")))
            .build();
}

Сам CSV:

id,name,email
1,josh,josh@joshlong.com
2,dashaun,dashaun@dashaun.com
3,james,james@jamesward.dev

Идея понятна.

Writer отправляет каждый chunk в Postgres, используя ON CONFLICT DO NOTHING, чтобы повторные запуски не падали из-за конфликтов по первичному ключу:

    @Bean
    FlatFileItemReader<Customer> customerFlatFileItemReader(@Value("classpath:/customers.csv") Resource csv) {
        return new FlatFileItemReaderBuilder<Customer>() //
                .name("customer-reader")
                .resource(csv)
                .delimited(c -> c.delimiter(",").names("id", "name", "email"))
                .targetType(Customer.class)
                .build();
    }

А сам шаг связывает все вместе с небольшим сквозным processor — отличным местом, куда позже можно добавить трансформацию, обогащение или фильтрацию, — и размером chunk, равным 10:

    @Bean
    JdbcBatchItemWriter<Customer> customerJdbcBatchItemWriter(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Customer>()//
                .assertUpdates(true)//
                .dataSource(dataSource)//
                .sql("INSERT INTO customers(id, name, email) VALUES (:id, :name, :email) on conflict do nothing")//
                .beanMapped()//
                .itemPreparedStatementSetter((item, ps) -> {
                    ps.setInt(1, item.id());
                    ps.setString(2, item.name());
                    ps.setString(3, item.email());
                })//
                .build();
    }

Обратите внимание на переключатель faultTolerant() и политику retry — Spring Batch будет незаметно повторять обработку элементов, выбрасывающих IllegalArgumentException, до десяти раз, прежде чем пометит chunk как завершившийся с ошибкой. Это одна строка, потому что всю “бухгалтерию” за вас ведет фреймворк, и именно эта бухгалтерия теперь живет в MongoDB.

Запуск

При запущенном Docker Compose:

./mvnw spring-boot:run

Задание стартует, reset очищает Postgres, files-to-db прогоняет CSV через chunk-пайплайн в Postgres, а каждый переход между шагами, число элементов, статус завершения и временная метка выполнения записываются в MongoDB. Откройте mongosh — и увидите знакомые Batch-коллекции: BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION. Только теперь это документы, а не таблицы.

Наблюдаемость

Spring Batch-задания публикуют массу интересных Spring ApplicationEvents! Я слушаю одно из них — JobExecutionEvent, которое публикуется всякий раз, когда задание завершилось, успешно или нет.

    @EventListener
    void after(JobExecutionEvent event) {
        IO.println("Job execution #" + event.getJobExecution() + " finished");
    }

Когда я создавал программу через Spring Initializr, я не забыл добавить и OpenTelemetry Spring Boot starter. Spring Boot и Micrometer давно поддерживают OpenTelemetry, но раньше всегда требовалась некоторая возня. Теперь, если в classpath есть OpenTelemetry starter, он будет публиковать метрики в любой OpenTelemetry endpoint — по умолчанию предполагается порт 3000. Если выбрать поддержку Docker Compose в Spring Initializr, он также выдаст конфигурацию Grafana, которая будет слушать OpenTelemetry-информацию на порту 3000.

Итак, запустите приложение, затем откройте localhost:3000, нажмите Drilldown, затем Metrics и найдите spring_batch в поле поиска.

Либо можно открыть http://localhost:8080/actuator/metrics и увидеть те же метрики. Но мне нравятся яркие и красочные графики, так что страница Grafana меня весьма радует.

Бонус: Native Images с GraalVM

Технология native image в GraalVM потенциально позволяет снизить общее потребление памяти. Spring Batch уже в основном работает с GraalVM native images, но появились новые классы, которые мне нужно было учесть. И несколько новых schema-файлов.

    static class Hints implements RuntimeHintsRegistrar {

        @Override
        public void registerHints(@NonNull RuntimeHints hints, @Nullable ClassLoader classLoader) {
            for (var c : new Class[]{
                    org.springframework.batch.core.repository.persistence.JobInstance.class,
                    org.springframework.batch.core.repository.persistence.ExecutionContext.class,
                    org.springframework.batch.core.repository.persistence.ExitStatus.class,
                    org.springframework.batch.core.repository.persistence.StepExecution.class,
                    org.springframework.batch.core.repository.persistence.JobExecution.class,
                    org.springframework.batch.core.repository.persistence.JobParameter.class,
            }) {
                hints.reflection().registerType(c, MemberCategory.values());
            }

            var prefix = "org/springframework/batch/core/";
            for (var r : new String[]{
                    "schema-mongodb", //
                    "schema-drop-mongodb"}) {
                for (var suffix : "jsonl,js".split(",")) {
                    var path = prefix + r + "." + suffix;
                    var resource = new ClassPathResource(path);
                    if (resource.exists()) {
                        hints.resources().registerResource(resource);
                    }
                }
            }
        }
    }

И нам также нужно сообщить GraalVM о customers.csv.


    static class ResourceHints implements RuntimeHintsRegistrar {

        @Override
        public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) {
            hints.reflection().registerType(Customer.class, MemberCategory.values());
            hints.resources().registerResource(new ClassPathResource("/customers.csv"));
        }
    }

Зарегистрируйте оба в обычном порядке, добавив это в класс BatchConfiguration или в любой класс с @Configuration:

@ImportRuntimeHints({BatchConfiguration.ResourceHints.class, BatchConfiguration.Hints.class})

После этого можно собрать GraalVM native image обычным способом. Я записал шаги в скрипте native.sh в корне репозитория:

#!/usr/bin/env bash
ls -la target && rm -rf target
./mvnw -DskipTests -Pnative native:compile
./target/batch

Запустите приложение: ./target/batch — и убедитесь, что оно стартует практически мгновенно и потребляет заметно меньше RAM, чем при запуске на JVM. На моей машине — Apple M5 с macOS — оно стартует примерно за одну десятую секунды и использует около 150 МБ RAM. Долгоживущие batch-задания обычно не относятся к тем вещам, которым критически важен быстрый старт, но экономия RAM приятна, а быстрое время запуска точно не мешает.

Ленивые подключения DataSource

Еще одна оптимизация, которая в общем масштабе не слишком меняет именно эту нагрузку, но сама по себе весьма приятна: в Spring Boot 4.1 теперь поддерживается ленивое получение соединения. Помните: по умолчанию Spring Boot инициализирует DataSource и создает соединение всякий раз, когда начинается транзакция, даже если нет гарантии, что вы это соединение вообще используете. Этой платы можно избежать с помощью нового конфигурационного свойства Spring Boot spring.datasource.connection-fetch=lazy.

Получить исходники

Как обычно, полный код этого примера доступен здесь.

Почему это важно

Историческая связка Spring Batch с реляционной базой данных всегда была прагматичным компромиссом, а не архитектурным идеалом. Фреймворку нужно надежное место, где можно помнить, что он сделал, и SQL был путем наименьшего сопротивления. Теперь, когда абстракция JobRepository по-настоящему отвязана, а Spring Boot 4.1 поставляет полноценную первоклассную автоконфигурацию для MongoDB, командам, работающим на document stores, больше не нужно держать JDBC-базу только ради того, чтобы угодить batch-слою.

Выбирайте базу данных, которая подходит вашим данным. Spring Batch подстроится под ваш выбор.

Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано.

Комментарии (0)