Трейсы в Spring Boot 3 с использованием Zipkin и Kafka в качестве транспорта. Часть 2

Предисловие

В предыдущей статье мы создали автоконфигурацию для того, чтобы включать трейсы в приложениях на базе Spring Boot. Как ранее было указано, она не позволяет собрать воедино цепочку вызовов из трейсов нескольких сервисов. В текущей статье мы доработаем стартер, добавив эту возможность.

Изменения, не относящиеся к решению проблемы

С момента написания первой части статьи, в стартер были добавлены изменения: появились конфигурационные свойства custom.tracing.sasl-mechanism и custom.tracing.security-protocol, которые передаются в свойства продюсера. Это позволяет управлять механизмами аутентификации в kafka.

Кроме того, мы стали явно указывать тип трейсов, добавив management.tracing.propagation.type со значением w3c. Это решило проблему смешивания различных типов трейсов, что ранее приводило к разрывам в трассировке.

Конечная конфигурация в application.yaml стала выглядеть следующим образом:

custom:
  tracing:
    bootstrap-servers: broker-1:9092, broker-2:9092, broker-3:9092
    username: ${USERNAME}
    password: ${PASSWORD}
    sasl-mechanism: SCRAM-SHA-256
    security-protocol: SASL_SSL
    topic: trace-topic
management:
  tracing:
    enabled: true
    propagation:
      type: w3c
    sampling:
      probability: 1

Как мы интегрируемся

Для понимания контекста и потребности в создаваемых конфигурационных bean’ах, нужно обозначить технологии, с помощью которых мы интегрируем сервисы:

Формат трейсов

Мы будем использовать формат w3c, который следует стандарту W3C Trace Context.

Когда используется W3C Trace Context, интеграционное взаимодействие включает следующие заголовки:

  • traceparent: Основной ID трассировки и ID операции;

  • tracestate: Дополнительная информация для передачи через границы систем (опциональный).

Traceparent

Это основной заголовок, который несет в себе:

  • Версию протокола;

  • Trace ID (глобальный идентификатор трассировки);

  • Parent ID (идентификатор текущего span);

  • Flags (флаги управления трассировкой).

Формат заголовка traceparent:

{version}-{trace-id}-{parent-id}-{trace-flags}

Здесь:

  • 00 — Версия формата traceparent (сейчас используется: 00 (версия 0));

  • 69bd60bfbe6c8f2ebc4fd6787aa0a747 — Trace ID (уникальный идентификатор одной трассы (end-to-end запрос));

  • bc4fd6787aa0a747 — Parent ID (идентификатор конкретного span в трассе). В новой системе будет создан новый span с новым parent-id, но старым trace-id;

  • 01 — Флаги управления трассировкой. В спецификации sampled-flag описывается, что существуют различные сценарии их обработки. В рамках текущего решения используется Probability sampling, вероятность которого задается свойством management.tracing.sampling.probability в application.yaml:

    • 01: трассировка регистрируется в системе распределенной трассировки;

    • 00: распространение трассировки не прерывается, но регистрация останавливается. При этом флаг распространяется далее.

Подробное описание приведено в спецификации заголовка traceparent.

Tracestate

Этот заголовок используется для передачи вендор-специфичных данных и добавления информации в цепочку запросов без изменения стандарта traceparent. Подробное описание можно найти в спецификации заголовка tracestate.

Конфигурации

В предыдущей статье все конфигурации были вынесены в отдельный стартер, поэтому изменять конечные приложения нам не потребуется (кроме изменения версии самого стартера).

Здесь и далее все дополнения производятся в исходном коде стартера, но если вы конфигурируете непосредственно приложение, то можно просто добавить конфигурации в него.

OpenFeign

Для feign-клиентов необходимо добавить поддержку micrometer. Существует несколько способов это сделать в т.ч. автоматически (при выполнении определенных условий). Мы создали bean типа MicrometerObservationCapability. Описание подхода описано в спецификации Spring Cloud OpenFeign.

Конфигурация:

import feign.micrometer.MicrometerObservationCapability;
import io.micrometer.observation.ObservationRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration(proxyBeanMethods = false)
public class FeignObservedConfiguration {

    @Bean
    public MicrometerObservationCapability micrometerObservationCapability(
            ObservationRegistry registry
    ) {
        return new MicrometerObservationCapability(registry);
    }
}

Зависимости

Для получения класса feign.micrometer.MicrometerObservationCapability подключаем зависимость:

<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-micrometer</artifactId>
    <version>13.5</version>
</dependency>

Результат

Любой вызов сторонних (по отношении к нашему) сервисов сопровождается добавлением HTTP-заголовка traceparent.

Kafka

Для работы с заголовком traceparent в сообщениях kafka нам потребуется включить observation для компонентов:

  • для listener, в случае, если приложение занимается получением сообщений;

  • для template, в случае, если приложение занимается отправкой сообщений.

Включение может быть произведено через configuration properties, вот так:

spring:
  kafka:
    listener:
      observation-enabled: true
    template:
      observation-enabled: true

Чтобы не писать эти свойства в конфигурации каждого конечного приложения, мы используем BeanPostProcessor, исходный код которого приведен ниже:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.lang.NonNull;

@Slf4j
public class KafkaObservationEnablerPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(
            @NonNull Object bean,
            @NonNull String beanName
    ) throws BeansException {
        if (bean instanceof ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
            factory.getContainerProperties().setObservationEnabled(true);
            log.debug("Observation enabled in consumer [beanName: {}, class: {}]", beanName, bean.getClass());

        } else if (bean instanceof KafkaTemplate<?, ?> kafkaTemplate) {
            kafkaTemplate.setObservationEnabled(true);
            log.debug("Observation enabled in producer [beanName: {}, class: {}]", beanName, bean.getClass());

        }
        return bean;
    }
}

Реализация применяется ко всем listener’ам и template’ам приложения, включая необходимые свойства.

Более подробно подход описан в статье Baeldung: Micrometer Observation and Spring Kafka.

Результат

Сообщения, отправленные в kafka, содержат заголовок traceparent. При получении сообщения, контекст трейса восстанавливается, все последующие действия привязываются к нему.

SOAP-интеграции

Прокси для SOAP-интеграций мы создаем с помощью JaxWsProxyFactoryBean. Если не углубляться в детали, то обычно это выглядит следующим образом:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import ru.alfastrah.api.tracing.observability.soap.ws.KnownSoapClient;

@Configuration
class TestSoapClientsConfiguration {

    public static final String TEST_URL = "test-url";

    @Bean
    public KnownSoapClient knownSoapClient() {
        JaxWsProxyFactoryBean jaxWsProxyFactoryBean = new JaxWsProxyFactoryBean();
        jaxWsProxyFactoryBean.setAddress(TEST_URL);
        return jaxWsProxyFactoryBean.create(KnownSoapClient.class);
    }
}

Для добавления HTTP-заголовка traceparent мы создаем реализацию AbstractOutDatabindingInterceptor, которую должны передать в factory JaxWsProxyFactoryBean.

Исходный код interceptor может выглядеть следующим образом:

import io.micrometer.tracing.TraceContext;
import io.micrometer.tracing.Tracer;
import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.Phase;
import ru.alfastrah.api.tracing.util.Constants;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;

public class SoapTraceParentOutDatabindingInterceptor extends AbstractOutDatabindingInterceptor {

    private final Tracer tracer;

    public SoapTraceParentOutDatabindingInterceptor(
            Tracer tracer
    ) {
        super(Phase.PREPARE_SEND);
        this.tracer = tracer;
    }

    private static Map<String, List<String>> fetchHeaders(Message message) {
        Map<String, List<String>> headers = (Map<String, List<String>>) message.get(Message.PROTOCOL_HEADERS);
        if (isNull(headers)) {
            headers = new TreeMap<>();
        }
        return headers;
    }

    @Override
    public void handleMessage(Message message) throws Fault {
        final TraceContext traceContext = tracer.currentTraceContext().context();
        if (nonNull(traceContext)) {
            final Map<String, List<String>> headers = fetchHeaders(message);
            headers.put(
                    Constants.HttpHeaders.TRACE_PARENT,
                    Constants.Helpers.TO_TRACE_PARENT.apply(traceContext)
            );
            message.put(Message.PROTOCOL_HEADERS, headers);
        }
    }
}

Здесь компонент получает текущий контекст из переданного в конструктор io.micrometer.tracing.Tracer, после чего формирует заголовок и добавляет его к сообщению.

Заголовок формируется функцией Constants.Helpers.TO_TRACE_PARENT типа Function<TraceContext, List<String>>:

public static final Function<TraceContext, List<String>> TO_TRACE_PARENT = traceContext -> List.of(
        "%s-%s-%s-%s".formatted(
                "00", // https://www.w3.org/TR/trace-context/#version
                traceContext.traceId(), // https://www.w3.org/TR/trace-context/#trace-id
                traceContext.parentId(), // https://www.w3.org/TR/trace-context/#parent-id
                Boolean.TRUE.equals(traceContext.sampled()) ? "01" : "00" // https://www.w3.org/TR/trace-context/#sampled-flag
        ));

Немного выше я привел пример того, как создается прокси. Следует помнить, что его создание происходит в приложении, тогда как конфигурация происходит в компоненте стартера. Отсюда возникает важная деталь: нужно передать interceptor в JaxWsProxyFactoryBean, при этом оставить контроль над трейс-конфигурацией стартеру, не передавая его приложению и не заставляя разработчика об этом помнить.

Решением данной задачи для нас служит @Aspect, содержащий @PointCut’ы для методов, которые создают прокси. Сразу отмечу, что количество SOAP-сервисов невелико, а все пакеты, которые генерируются jax’ом для них, также известны.

import lombok.RequiredArgsConstructor;
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientProxy;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Aspect
@RequiredArgsConstructor
public class SoapClientCreationAspect {

    private final SoapTraceParentOutDatabindingInterceptor soapTraceParentOutDatabindingInterceptor;

    @Pointcut("execution(ru.alfastrah.api..ws.* *(..))")
    public void matchesSoapClientType() {
    }

    @Pointcut("@within(org.springframework.context.annotation.Configuration) && @annotation(org.springframework.context.annotation.Bean)")
    public void beanInConfiguration() {
    }

    @Pointcut("beanInConfiguration() && matchesSoapClientType()")
    public void soapClientBeanCreation() {
    }

    @AfterReturning(pointcut = "soapClientBeanCreation()", returning = "proxy")
    public Object afterUnicusSoapClientBeanCreated(Object proxy) {
        final Client client = ClientProxy.getClient(proxy);
        client.getOutInterceptors().add(soapTraceParentOutDatabindingInterceptor);
        return proxy;
    }
}

Конфигурация вышеуказанных компонентов для включения аспекта:

import io.micrometer.tracing.Tracer;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(JaxWsProxyFactoryBean.class)
public class SoapObservationConfiguration {

    @Bean
    @ConditionalOnBean(Tracer.class)
    SoapTraceParentOutDatabindingInterceptor soapTraceParentOutDatabindingInterceptor(
            Tracer tracer
    ) {
        return new SoapTraceParentOutDatabindingInterceptor(tracer);
    }

    @Bean
    @ConditionalOnBean(SoapTraceParentOutDatabindingInterceptor.class)
    SoapClientCreationAspect soapTracingAspect(
            SoapTraceParentOutDatabindingInterceptor interceptor
    ) {
        return new SoapClientCreationAspect(interceptor);
    }
}

Если вы конфигурируете трейсы в приложении, то реализация @Aspect’а для перехвата создаваемых прокси может оказаться излишеством. Достаточно просто передать interceptor в JaxWsProxyFactoryBean.

Зависимости

Классы org.apache.cxf.frontend.ClientProxy и org.apache.cxf.jaxws.JaxWsProxyFactoryBean можно найти, подключив зависимость:

<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-frontend-jaxws</artifactId>
    <version>4.1.5</version>
    <scope>provided</scope>
</dependency>

Результат

Все прокси типов ru.alfastrah.api..ws.*, создаваемые приложением, перехватываются и обогащаются дополнительным interceptor’ом, который добавляет HTTP-заголовок traceparent к вызовам SOAP-сервисов.

Spring Cloud Gateway

Интеграции (REST и SOAP) в нашем случае осуществляются с применением Spring Cloud Gateway, который маршрутизирует запросы на микросервисы. Помимо маршрутизации, у него есть иные задачи (добавление заголовков, аутентификация и т.п.), поэтому его участие также нужно видеть в цепочке трейсов для наиболее полной картины потока данных.

Spring Cloud Gateway написан на реактивном стеке, для его конфигурации потребуется поместить ObservationRegistry в инстанс ObservationThreadLocalAccessor и включить автоматическое распространение контекста. Эта конфигурация относится не только к Gateway, но и к сервисам, написанным на реактивном стеке.

Исходный код InitializingBean с конфигурацией, включающей его:

import io.micrometer.context.ContextRegistry;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Hooks;

@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
public class ReactorObservationConfiguration {

    @Bean
    public InitializingBean reactorObservationInitializer(
            ObservationRegistry observationRegistry
    ) {
        return () -> {
            // Link ThreadLocal Micrometer with Reactor Context
            final ObservationThreadLocalAccessor threadLocalAccessor = ObservationThreadLocalAccessor.getInstance();
            threadLocalAccessor.setObservationRegistry(observationRegistry);
            ContextRegistry.getInstance().registerThreadLocalAccessor(threadLocalAccessor);

            // Enable automatic context propagation in Reactor 3.5.3+
            Hooks.enableAutomaticContextPropagation();
        };
    }
}

Зависимости

Добавляем io.projectreactor:reactor-core-micrometer, чтобы воспользоваться классом reactor.core.publisher.Hooks.

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core-micrometer</artifactId>
    <scope>provided</scope>
</dependency>

Результат

Реактивные приложения (в т.ч. Spring Cloud Gateway), участвующие в интеграционном потоке, поддерживают трассировку.

Сборка всех конфигураций

Вышеописанные конфигурации добавляем в @Import над главным классом CustomTracingAutoConfiguration. Автоконфигурация включается, когда свойство management.tracing.enabled имеет значение true.

import org.springframework.boot.actuate.autoconfigure.tracing.zipkin.ZipkinAutoConfiguration;

@ConditionalOnProperty(name = "management.tracing.enabled", havingValue = "true")
@AutoConfiguration(before = ZipkinAutoConfiguration.class)
@Import({
        KafkaSenderConfiguration.class,
        ObservedAspectConfiguration.class,
        FeignObservedConfiguration.class,
        KafkaObservationEnablerConfig.class,
        SoapObservationConfiguration.class,
        ReactorObservationConfiguration.class
})
public class CustomTracingAutoConfiguration {
}

Использование

Подключение

Стартер подключается к приложению как зависимость в pom.xml.

<dependency>
    <groupId>ru.alfastrah.api</groupId>
    <artifactId>spring-boot-tracing-starter</artifactId>
    <version>1.1.0</version>
</dependency>

Затем в application.yaml добавляются свойства для его включения. Я упоминал их ранее, но размещу также здесь, чтобы были под рукой:

Конфигурация стартера и management в application.yaml

```yaml custom: tracing: bootstrap-servers: broker-1:9092, broker-2:9092, broker-3:9092 username: ${USERNAME} password: ${PASSWORD} sasl-mechanism: SCRAM-SHA-256 security-protocol: SASL_SSL topic: trace-topic management: tracing: enabled: true propagation: type: w3c sampling: probability: 1 ```

Демонстрация результатов

Для воспроизведения полной картины появившихся возможностей соберем систему сервисов. Цель их создания - демонстрация трейса, оставленного при их взаимодействии друг с другом.

Каждый сервис оснащен стартером и демонстрирует его функционирование на примере перечисленных интеграций. Ни один из сервисов не имплементирует дополнительной логики работы с трейсами. Эту задачу полностью реализует стартер. Реализации/конфигурации тестовых приложений скрыты под спойлерами.

API-gateway

Принимает запросы на порту 8080 и маршрутизирует их на сервис request-handler, начинает трейс. Содержит конфигурацию единственного route.

Конфигурация route'а (application.yaml) для API-gateway

```yaml spring: cloud: gateway: routes: - id: request-handler predicates: Path=/trace-demo/start uri: http://localhost:8081 ```

Request-handler

Принимает запросы на порту 8081 от api-gateway, далее делает REST-запрос в сервис producer-service с помощью Feign-клиента.

Реализация Request-handler

```java import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication @Slf4j @EnableFeignClients(clients = RequestHandlerApp.DemoClient.class) public class RequestHandlerApp {

public static void main(String[] args) {
    SpringApplication.run(RequestHandlerApp.class, args);
}

@FeignClient(
        name = "demo-client",
        url = "http://localhost:8082"
)
public interface DemoClient {
    @PostMapping(path = "/trace-demo/producer/send", produces = MediaType.APPLICATION_JSON_VALUE)
    void sendRequest(@RequestBody RequestDto requestDto);
}

@RequiredArgsConstructor
@RestController
public static class DemoRestController {

    private final DemoClient demoClient;

    @PostMapping(path = "/trace-demo/start", consumes = MediaType.APPLICATION_JSON_VALUE)
    @ResponseStatus(HttpStatus.NO_CONTENT)
    void start(@RequestBody RequestDto request) {
        log.info("Received request: {}", request);
        demoClient.sendRequest(request);
    }
}

public record RequestDto(String message) {
}

}

</spoiler>

#### Producer-service

Принимает запросы на порту 8082 от **request-handler**, а затем отправляет сообщение в **kafka**.
Демонстрирует поддержку трассировки при взаимодействии через kafka.

<spoiler title="Реализация Producer-service">
```java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@SpringBootApplication
@Slf4j
public class ProducerApp {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApp.class, args);
    }

    @RequiredArgsConstructor
    @RestController
    public static class DemoRestController {

        private final KafkaTemplate<UUID, RequestDto> kafkaTemplate;

        @PostMapping(path = "/trace-demo/producer/send",
                consumes = MediaType.APPLICATION_JSON_VALUE,
                produces = MediaType.APPLICATION_JSON_VALUE
        )
        CompletableFuture<SendResult> produce(@RequestBody RequestDto request) {
            log.info("Received request: {}", request);
            final UUID key = UUID.randomUUID();

            log.info("send message to kafka: {}", request);
            return kafkaTemplate.sendDefault(key, request)
                    .thenApply(sendResult ->
                            new SendResult(sendResult.getProducerRecord().key()));
        }

    }

    record RequestDto(String message) {
    }

    record SendResult(UUID messageKey) {
    }
}

Consumer-service

Слушает сообщения в топике demo-traces-topic, реализует паттерн DLQ с помощью встроенных инструментов Spring:

  • первая итерация обработки завершается ошибкой, помещает сообщение в топик demo-traces-topic-retry;

  • вторая итерация успешна, отправляет запрос далее, в soap-service.

Демонстрирует обработку заголовков сообщения Kafka, продолжая ранее начатый трейc, а также добавление заголовка traceparent к SOAP-запросу.

Реализация Consumer-service

```java import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.cxf.ext.logging.LoggingInInterceptor; import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.RetryableTopic; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.retry.annotation.Backoff; import org.springframework.stereotype.Component; import ru.alfastrah.schemas.interplat4.AnyPort; import ru.alfastrah.schemas.interplat4.GetAnyRequest; import ru.alfastrah.schemas.interplat4.GetAnyResponse;

import java.util.UUID;

@SpringBootApplication @Slf4j public class ConsumerApp {

public static void main(String[] args) {
    SpringApplication.run(ConsumerApp.class, args);
}

@Component
@RequiredArgsConstructor
public static class Listener {

    private final AnyPort anyPort;

    @RetryableTopic(
            attempts = "2",
            backoff = @Backoff(delayExpression = "1000")
    )
    @KafkaListener(topics = "demo-traces-topic")
    public void listen(
            ConsumerRecord<UUID, KafkaMessage> consumerRecord,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic

    ) {
        final KafkaMessage value = consumerRecord.value();
        log.info("Handled message: {}", value);

        if (!receivedTopic.endsWith("retry")) {
            log.error("Processing error. Retry");
            throw new RuntimeException("Processing error");
        }

        final GetAnyRequest soapRequest = new GetAnyRequest();
        soapRequest.setMessage(value.message());

        final GetAnyResponse soapResponse = anyPort.getAny(soapRequest);
        log.info("Response: {}", soapResponse.isResult());
    }

}

@Configuration
public static class SoapConfig {

    @Bean
    AnyPort anyPort() {
        JaxWsProxyFactoryBean jaxWsProxyFactoryBean = new JaxWsProxyFactoryBean();
        jaxWsProxyFactoryBean.setAddress("http://localhost:8084/ws");
        jaxWsProxyFactoryBean.getOutInterceptors().add(new LoggingOutInterceptor());
        jaxWsProxyFactoryBean.getInInterceptors().add(new LoggingInInterceptor());
        return jaxWsProxyFactoryBean.create(AnyPort.class);
    }
}

public record KafkaMessage(String message) {
}

}

</spoiler>

#### Soap-service

Принимает SOAP-запросы на порту 8084 от **consumer-service**, формирует успешный ответ. Демонстрирует обработку заголовка `traceparent` в [SOAP-сервисах,
созданных с помощью Spring Boot](https://www.baeldung.com/spring-boot-soap-web-service).

<spoiler title="Реализация Soap-service">
```java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.ws.config.annotation.EnableWs;
import org.springframework.ws.config.annotation.WsConfigurerAdapter;
import org.springframework.ws.server.endpoint.annotation.Endpoint;
import org.springframework.ws.server.endpoint.annotation.PayloadRoot;
import org.springframework.ws.server.endpoint.annotation.RequestPayload;
import org.springframework.ws.server.endpoint.annotation.ResponsePayload;
import org.springframework.ws.transport.http.MessageDispatcherServlet;
import org.springframework.ws.wsdl.wsdl11.DefaultWsdl11Definition;
import org.springframework.xml.xsd.SimpleXsdSchema;
import org.springframework.xml.xsd.XsdSchema;
import ru.alfastrah.api.ws.GetAnyRequest;
import ru.alfastrah.api.ws.GetAnyResponse;

@SpringBootApplication
@Slf4j
public class SoapServiceApplication {

    private static final String NAMESPACE_URI = "http://alfastrah.ru/api/ws";

    public static void main(String[] args) {
        SpringApplication.run(SoapServiceApplication.class, args);
    }

    @Endpoint
    @RequiredArgsConstructor
    public static class AnyEndpoint {

        @PayloadRoot(namespace = NAMESPACE_URI, localPart = "getAnyRequest")
        @ResponsePayload
        public GetAnyResponse get(@RequestPayload GetAnyRequest request) {
            log.info("Handling message: {}", request.getMessage());

            final GetAnyResponse getAnyResponse = new GetAnyResponse();
            getAnyResponse.setResult(true);

            return getAnyResponse;
        }
    }

    @EnableWs
    @Configuration
    public static class WebServiceConfig extends WsConfigurerAdapter {

        @Bean
        public ServletRegistrationBean<MessageDispatcherServlet> messageDispatcherServlet(
                ApplicationContext applicationContext
        ) {
            MessageDispatcherServlet servlet = new MessageDispatcherServlet();
            servlet.setApplicationContext(applicationContext);
            servlet.setTransformWsdlLocations(true);
            return new ServletRegistrationBean<>(servlet, "/ws/*");
        }

        @Bean(name = "any")
        public DefaultWsdl11Definition defaultWsdl11Definition(XsdSchema countriesSchema) {
            DefaultWsdl11Definition wsdl11Definition = new DefaultWsdl11Definition();
            wsdl11Definition.setPortTypeName("AnyPort");
            wsdl11Definition.setLocationUri("/ws");
            wsdl11Definition.setTargetNamespace(NAMESPACE_URI);
            wsdl11Definition.setSchema(countriesSchema);
            return wsdl11Definition;
        }

        @Bean
        public XsdSchema countriesSchema() {
            return new SimpleXsdSchema(new ClassPathResource("/schema.xsd"));
        }
    }
}

Запуск и проверка

Для запуска демонстрации делаем POST запрос на api-gateway:

POST /trace-demo/start HTTP/1.1
Host: localhost:8080
Authorization: Bearer bearer.token.content
Content-Type: application/json
Content-Length: 34

{
    "message": "Hello, World!"
}

Посмотрим значимые логи сервисов. В них видим, что каждый сервис поучаствовал в обработке. В логах каждого сервиса присутствует traceId 69bd61a67d70e718ad939eda62b5ba02.

2026-03-20T18:03:02.925+03:00  INFO 21738 --- [api-gateway] [ctor-http-nio-4] [69bd61a67d70e718ad939eda62b5ba02-05d2ca72fd9e68f7] r.a.a.i.a.f.global.RequestLoggingFilter  : ---> [request-handler] [31da0780-4] [POST http://localhost:8080/trace-demo/start], headers=[{Authorization=[***], Accept=[*/*], Cache-Control=[no-cache], User-Agent=[PostmanRuntime/7.29.3], Connection=[keep-alive], Postman-Token=[b0de70f4-7600-4b02-9d0c-07087d99dcf7], Host=[localhost:8080], Accept-Encoding=[gzip, deflate, br], Content-Length=[34], Content-Type=[application/json]}]
2026-03-20T18:03:03.763+03:00  INFO 21738 --- [api-gateway] [ctor-http-nio-4] [69bd61a67d70e718ad939eda62b5ba02-05d2ca72fd9e68f7] r.a.a.i.a.f.global.RequestLoggingFilter  : <--- [request-handler] [31da0780-4] [204 NO_CONTENT http://localhost:8081/trace-demo/start]

2026-03-20T18:03:03.087+03:00  INFO 13874 --- [request-handler] [nio-8081-exec-1] [69bd61a67d70e718ad939eda62b5ba02-3dd0d6ee2d003442] ru.alfastrah.api.RequestHandlerApp       : Received request: RequestDto[message=Hello, World!]
2026-03-20T18:03:03.118+03:00 DEBUG 13874 --- [request-handler] [nio-8081-exec-1] [69bd61a67d70e718ad939eda62b5ba02-3dd0d6ee2d003442] r.a.api.RequestHandlerApp$DemoClient     : [DemoClient#sendRequest] ---> POST http://localhost:8082/trace-demo/producer/send HTTP/1.1
2026-03-20T18:03:03.730+03:00 DEBUG 13874 --- [request-handler] [nio-8081-exec-1] [69bd61a67d70e718ad939eda62b5ba02-3dd0d6ee2d003442] r.a.api.RequestHandlerApp$DemoClient     : [DemoClient#sendRequest] <--- HTTP/1.1 200 (605ms)

2026-03-20T18:03:03.235+03:00  INFO 20771 --- [producer-service] [nio-8082-exec-9] [69bd61a67d70e718ad939eda62b5ba02-0996a1356bf5ebd7] ru.alfastrah.api.ProducerApp             : Received request: RequestDto[message=Hello, World!]
2026-03-20T18:03:03.241+03:00  INFO 20771 --- [producer-service] [nio-8082-exec-9] [69bd61a67d70e718ad939eda62b5ba02-0996a1356bf5ebd7] ru.alfastrah.api.ProducerApp             : Send message to kafka: RequestDto[message=Hello, World!]

2026-03-20T18:03:03.647+03:00  INFO 16539 --- [consumer-service] [ntainer#0-0-C-1] [69bd61a67d70e718ad939eda62b5ba02-05d5ea464727cedd] r.a.api.app.ConsumerApp$Listener         : Handled message from topic demo-traces-topic: KafkaMessage[message=Hello, World!]
2026-03-20T18:03:03.665+03:00 ERROR 16539 --- [consumer-service] [ntainer#0-0-C-1] [69bd61a67d70e718ad939eda62b5ba02-05d5ea464727cedd] r.a.api.app.ConsumerApp$Listener         : Processing error. Retry
2026-03-20T18:03:04.688+03:00  INFO 16539 --- [consumer-service] [r#0-retry-0-C-1] [69bd61a67d70e718ad939eda62b5ba02-37ea00b17e36fb56] r.a.api.app.ConsumerApp$Listener         : Handled message from topic demo-traces-topic-retry: KafkaMessage[message=Hello, World!]
2026-03-20T18:03:04.704+03:00  INFO 16539 --- [consumer-service] [r#0-retry-0-C-1] [69bd61a67d70e718ad939eda62b5ba02-37ea00b17e36fb56] o.apache.cxf.services.AnyPort.REQ_OUT    : REQ_OUT
    Address: http://localhost:8084/ws
    HttpMethod: POST
    Content-Type: text/xml
    ExchangeId: 0d943c99-2f23-457f-b102-8380c0ac6fab
    ServiceName: AnyPortService
    PortName: AnyPortPort
    PortTypeName: AnyPort
    Headers: {SOAPAction="", Accept=*/*, traceparent=00-69bd61a67d70e718ad939eda62b5ba02-fd2a270c73531479-01}
    Payload: <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"><soap:Body><getAnyRequest xmlns="http://alfastrah.ru/api/ws"><message>Hello, World!</message></getAnyRequest></soap:Body></soap:Envelope>

2026-03-20T18:03:04.832+03:00  INFO 16539 --- [consumer-service] [r#0-retry-0-C-1] [69bd61a67d70e718ad939eda62b5ba02-37ea00b17e36fb56] o.apache.cxf.services.AnyPort.RESP_IN    : RESP_IN
    Address: http://localhost:8084/ws
    Content-Type: text/xml;charset=utf-8
    ResponseCode: 200
    ExchangeId: 0d943c99-2f23-457f-b102-8380c0ac6fab
    ServiceName: AnyPortService
    PortName: AnyPortPort
    PortTypeName: AnyPort
    Headers: {date=Fri, 20 Mar 2026 15:03:04 GMT, SOAPAction="", Accept=text/xml, text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2, content-type=text/xml;charset=utf-8, Content-Length=256}
    Payload: <SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"><SOAP-ENV:Header/><SOAP-ENV:Body><ns2:getAnyResponse xmlns:ns2="http://alfastrah.ru/api/ws"><ns2:result>true</ns2:result></ns2:getAnyResponse></SOAP-ENV:Body></SOAP-ENV:Envelope>

2026-03-20T18:03:04.805+03:00  INFO 16568 --- [soap-service] [nio-8084-exec-6] [69bd61a67d70e718ad939eda62b5ba02-950931dbcb208d37] r.alfastrah.api.SoapServiceApplication   : Handling message: Hello, World!

Далее посмотрим, как трейс 69bd61a67d70e718ad939eda62b5ba02 выглядит в Zipkin. Если он развернут на localhost, то прямая ссылка на него выглядит так http://localhost:9411/zipkin/traces/69bd61a67d70e718ad939eda62b5ba02.

Таблица span’ов

Таблицу можно увидеть, нажав на кнопку “Span table” в интерфейсе трейса. В таблице можно найти имена сервисов, участвующих во взаимодействии и дополнительную информацию. Каждый сервис присутствует, значит все сконфигурировано корректно.

span table
span table

Таймлайн

На таймлайне видны разрывы. Они соответствуют времени, которое ушло на то, чтобы сервис увидел, что в топике появилось сообщение и на ожидание перед повторной обработкой (retry).

timeline
timeline

Дерево span’ов

Дерево показывает подробности обработки.

span tree
span tree

Заключение

Путем добавления нескольких компонентов в конфигурацию, полученную как результат первой части статьи, мы все-таки достигли желаемой цели: теперь трейс содержит всю цепочку взаимодействий сервисов, которые участвовали в итерации обработки запроса.

Надеюсь, Вам было интересно.

Исходный код стартера можно найти по ссылке.

Благодарности

Большое спасибо моему доброму другу и коллеге Андрею за вклад в развитие описанной функциональности. Вместе мы довели ее до состояния, когда она помогает не только нашей команде, но и коллегам из смежных команд, которые ее используют.

Комментарии (1)


  1. LexaStealth
    20.04.2026 15:33

    Наконец-то нормальная статья, написанная нормальным языком на фоне потока нейрослопа