Привет! Меня зовут Игнат. В этой статье я расскажу про разработку адаптера для работы с EGTS-протоколом (Era Glonass Telematics Standard), который можно использовать для передачии телеметрии курьеров из курьерских служб в РНИС (РНИС — государственная информационная система «Единая региональная навигационно‑информационная система города Москвы»). Код написан на Kotlin и подходит для использования в Java и Kotlin приложениях. Простота подключения модуля привела к идее вынести проект в опенсорс, чтобы его могли использовать любые курьерские службы с бэкендом на jvm. Поехали!
Требования, ограничения, приказы и другие вводные для разработки
В начале 2024 года Департамент транспорта Москвы выпустил приказ, который стандартизирует работу курьеров сервисов доставки. Одно из требований — передавать информацию о перемещении курьеров-партнёров в реальном времени (с полным списком можно ознакомиться здесь). Данные нужно передавать регулятору бинарными пакетами Era Glonass Telematics Standard по протоколу TCP/IP.
Согласно приведенному выше документу отправлять нужно следующие данные:
id курьера;
id устройства;
координаты;
скорость и направление движения;
тип передвижения (пешком, авто и т.п.);
номер транспортного средства;
номер сумки;
статус курьера;
номер заказа.
Нефункциональные требования к передаче:
передача раз в 20 секунд по протоколу EGTS (передача потока байтов по открытому tcp соединению);
соединение остается открытым всё время работы сервиса (стримим байты в канал отправки и слушаем такой же поток байтов на прием чтобы получать ответы).
Поскольку привычный JSON over HTTP использовать для этой задачи нельзя в силу нефункциональных требований, а готовых Java-библиотек для отправки телеметрии по EGTS нет, я разработал адаптер EGTS протокола, который сейчас выведен в open-source и доступен для использования всеми желающими.
Особенности EGTS протокола
Телематические данные согласно нормативной документации нужно передавать гостированным бинарным протоколом EGTS. Любому инженеру хочется работать без ограничений, но они всегда существуют и делают работу даже интереснее — вот какие плюсы есть в EGTS на мой взгляд:
Компактность данных
В сравнении с JSON/XML бинарный формат сильно экономит объем передаваемой информации, потому что при кодировании используется минимально необходимое число бит. Например, направление с точностью до градуса передается всего 9 битами, а скорость с точностью до 0,1 км/сек — 14 битами);Проверка целостности
Каждый пакет содержит CRC контрольную сумму как заголовка, так и блока данных;Подтверждение доставки
Встроенная система ACK/NACK уведомляет отправителя об успешной доставке или ошибках;Сессионная работа
Соединение остается открытым всё время работы, что снижает накладные расходы на установку/разрыв TCP-сессий;Стандартизация
Соответствует ГОСТ Р 54619-2011, что обеспечивает совместимость с гос.системами.
Адаптируем EGTS под передачу трекинга курьеров
В этом разделе я приведу пример дизайна информационной системы для сбора и передачи трекинга, покажу место адаптера протокола в этой информационной системе, расскажу про структуру EGTS пакетов и внутреннюю архитектуру адаптера протокола.
Пример дизайна информационной системы для передачи трекинга и место адаптера EGTS протокола в ней

внешняя система КИС АРТ (art.taxi.mos.ru — комплексная информационная система «Аналитика работы такси»)
внешняя система РНИС — государственная информационная система «Единая региональная навигационно-информационная система города Москвы»
Информационная система построена из нескольких микросервисов. Точки трекинга генерируются мобильным приложением курьера либо IOT устройством на транспортном средстве. За прием событий трекинга с мобильных устройств отвечает сервис процессинга сообщений с мобильных устройств. Полученные события он передает потребителям через топик Kafka.
Треки курьеров должны содержать в себе ID и статус курьера в системе КИС АРТ. За взаимодействие с этой системой отвечает сервис Цифровой Паспорт Курьера. Данные об ID и статусе курьеров он так же передает потребителям через Kafka.
Основную роль в передаче трекинга играют:
Сервис трекинга курьеров, который принимает и обогащает точки трека дополнительными данными - например, ID курьера в системе КИС АРТ. Данные для обогащения треков сервис хранит в БД. поскольку треки поступают довольно часто, а данные для обогащения (атрибуты курьеров) обновляются редко, обращения к этим данным происходят через распределенный кэш. Поды сервиса сохраняют точки в реляционной БД и пересылают их пачками по REST с гарантией доставки в поды гейтвея;
Внешний API Gateway, передающий их непосредственно в Департамент транспорта. Реплики gateway с помощью адаптера EGTS протокола пакуют каждую пачку в бинарный пакет и отправляют их по TCP соединению в РНИС.
Структура пакета EGTS
В протоколе EGTS данные передаются в виде вложенных бинарных структур, основные уровни — транспортный и уровень услуг.
На транспортном уровне закодирован заголовок пакета, включающий информацию о пакете (id пакета, размер и тип содержимого) и служебную информацию (длина заголовка, тип и ключ шифрования, контрольная сумма и проч.)
Уровень услуг содержит одну или несколько записей со служебной информацией (длина, номер, различные флаги) и полезным содержанием. В нашем случае полезное содержание – это id курьера и несколько подзаписей разного типа, каждая из которых отвечает за свой тип полезного содержания:
телематика (координаты, скорость);
ID-курьера в системе КИС АРТ;
номера заказов, ТС и сумки;
тип передвижения;
статус курьера.
В описываемой реализации, каждый пакет может содержать несколько сотен записей по разным курьерам. Запись содержит одну точку трекинга одного конкретного курьера, а в ее подзаписях уже размещена информация по разным параметрам этой точки трекинга.

Внутренняя архитектура EGTS адаптера
Здесь и далее речь пойдет о непосредственно коде адаптера. Некоторые части я буду приводить в статье и комментировать, полный же код доступен в репозитории.
Иерархическая структура EGTS пакета представлена data-классами в пакете model модуля library. Кодировка-декодировка реализована классами энкодеров, которые для использования в приложении либо инициализируются в Spring бины, либо могут быть инициализированы вручную. Классы бинов расположены в пакете encoder модуля library. Структура обоих пакетов (model и encoder) повторяет иерархическую структуру package EGTS. Инициализация Sping бинов вынесена в автоконфигурацию spring-boot Starter модуля starter.
├── encoder
│ ├── AbstractEgtsEncoder.kt
│ ├── EgtsPacketEncoder.kt
│ └── sfrd
│ ├── AppServicesFrameDataEncoder.kt
│ ├── ResponseServicesFrameDataEncoder.kt
│ ├── ServiceDataRecordsEncoder.kt
│ └── record
│ ├── RecordDataEncoder.kt
│ └── subrecord
│ ├── AbstractSubRecordEncoder.kt
│ ├── AnalogSensorDataEncoder.kt
│ ├── AuthResultEncoder.kt
│ ├── DispatcherIdentityEncoder.kt
│ ├── ExternalDataEncoder.kt
│ ├── PosSubRecordDataEncoder.kt
│ └── SubRecordResponseEncoder.kt
├── model
│ ├── EgtsPacket.kt
│ └── sfrd
│ ├── AppServicesFrameData.kt
│ ├── ResponseServicesFrameData.kt
│ ├── ServiceDataRecords.kt
│ ├── ServicesFrameData.kt
│ └── record
│ ├── RecordData.kt
│ ├── ServiceDataRecord.kt
│ └── subrecord
│ ├── SubRecord.kt
│ ├── SubRecordData.kt
│ └── types
│ ├── AnalogSensorData.kt
│ ├── AuthResult.kt
│ ├── DispatcherIdentity.kt
│ ├── ExternalSensorData.kt
│ ├── PosSubRecordData.kt
│ └── SubRecordResponse.kt
Рис.3 Структура пакетов энкодера и модели
При кодировании пакета EgtsPacketEncoder (энкодер верхнего уровня) получает на вход метода performEncode data-класс EgtsPacket. В полях которого содержится информация верхнего уровня пакета и вложенные data-классы более нижних уровней. Подготовка дата классов производится, например, фабрикой в составе приложения. Полный пример можно посмотреть в отдельном репозитории с демо-приложением. Часть кода с формированием пакета телеметрии из доменного класса CourierTrackRecord приведена ниже:
class PacketFactory
@Service
class PacketFactory(
private val egtsPacketEncoder: EgtsPacketEncoder,
) {
private val logger = LoggerFactory.getLogger(javaClass)
@Value("\${egts.dispatcher-id}") // ИД курьерской службы, присвоенный РНИС
private var dispatcherId: Int = 0
companion object {
/*
после установления соединения это второй полученный пакет
с помощью этой константы пакет с результатом авторизации извлекается
из receivedPacketsStorage
*/
const val AUTH_RESULT_PACKET_NUMBER: UShort = 1u
}
val MAX_USHORT_ID: UShort = UShort.MAX_VALUE
val FIRST_PACKET_ID: UShort = 0u // стартовое значение счетчика пакетов
val FIRST_RECORD_ID: UShort = 0u // стартовое значение счетчика записей
private var packetCounter: UShort = FIRST_PACKET_ID
private var recordCounter: UShort = FIRST_RECORD_ID
fun assembleAuthPacket(): EgtsPacket {
logger.debug("assembleAuthPacket()")
// пакет авторизации открывает соединение, поэтому при его сборке счетчики сбрасываются
packetCounter = FIRST_PACKET_ID
recordCounter = FIRST_RECORD_ID
// подзапись авторизации
val authSubRecord = SubRecord(
subRecordTypeId = SubRecordType.EGTS_SR_DISPATCHER_IDENTITY.id,
subRecordData = DispatcherIdentity(dispatcherId = dispatcherId),
)
// запись содержит единственную подзапись - подзапись авторизации
val serviceDataRecord = ServiceDataRecord(
recordNumber = getRecordNumber(),
sourceServiceType = ServiceType.AUTH_SERVICE,
recipientServiceType = ServiceType.AUTH_SERVICE,
recordData = RecordData(
subRecordList = listOf(authSubRecord),
),
)
// запись в пакете тоже единственная
val servicesFrameData = AppServicesFrameData(
serviceDataRecords = ServiceDataRecords(
sdrList = listOf(serviceDataRecord),
),
)
val egtsPacket = EgtsPacket(
packetIdentifier = getPacketNumber(),
packetType = PacketType.APP_DATA,
servicesFrameData = servicesFrameData,
)
return egtsPacket.also {
logger.info("assembled auth packet as {}", it)
}
}
/*
подтверждение получения пакета с единственной записью
поскольку при передаче трекинга единственный пакет (кроме подтверждений передачи) который
мы получаем от сервера - это результат авторизации, а в нем как раз одна запись,
реализация сборки пакетов подтверждения входящих пакетов с несколькими записями в фабрике не нужна
*/
fun assembleSingleRecordResponsePacket(incomingPacket: EgtsPacket): EgtsPacket {
logger.debug("assembleSingleRecordResponsePacket()")
// извлекаем единственную запись
val incomingPacketRecord = (incomingPacket.servicesFrameData as AppServicesFrameData)
.serviceDataRecords.sdrList.first()
// собираем содержимое подзаписи используя номер записи, получение которой подтверждаем
val responseSubRecordData = SubRecordResponse(
confirmedRecordNumber = incomingPacketRecord.recordNumber.toShort(),
recordStatus = EgtsConstants.RST_OK,
)
// кладем данные подзаписи в обертку с указанием типа подзаписи
val responseSubRecord = SubRecord(
subRecordTypeId = SubRecordType.EGTS_SR_RECORD_RESPONSE.id,
subRecordData = responseSubRecordData,
)
// и помещаем обертку в запись в виде листа с одним элементом
val responseRecordData = RecordData(
subRecordList = listOf(responseSubRecord),
)
// еще одна обертка для записи со служебной информацией
val responseRecord = ServiceDataRecord(
recordNumber = getRecordNumber(),
recordData = responseRecordData,
sourceServiceType = incomingPacketRecord.sourceServiceType,
recipientServiceType = incomingPacketRecord.recipientServiceType,
)
// запись в пакете подтверждения тоже одна
val responseServiceDataRecords = ServiceDataRecords(
sdrList = listOf(responseRecord),
)
val servicesFrameData = ResponseServicesFrameData(
responsePacketId = incomingPacket.packetIdentifier,
processingResult = TransportLayerProcessingResult.EGTS_PC_OK,
serviceDataRecords = responseServiceDataRecords,
)
return EgtsPacket(
servicesFrameData = servicesFrameData,
packetIdentifier = getPacketNumber(),
packetType = PacketType.RESPONSE,
).also { logger.info("assembled single record response packet {}", egtsPacketEncoder.encode(it).toHexString()) }
}
fun assembleTelematicsPacket(courierTrackRecords: List<CourierTrackRecord>): EgtsPacket {
logger.debug("assembleTelematicsPacket(list of {} CourierTrackRecords)", courierTrackRecords.size)
// сформированные записи трека
val couriersServiceDataRecords = courierTrackRecords.map { courierTrackRecord ->
val attId = courierTrackRecord.courierData.attId
if (attId < 0) throw IllegalArgumentException("attId appears to be negative which should not be the case")
val recordData = courierTrackRecord.toRecordData()
ServiceDataRecord(
recordNumber = getRecordNumber(),
objectIdentifier = attId.toUInt(), // каждая запись трека содержит ид устройства, с которого он был отправлен
sourceServiceType = ServiceType.TELE_DATA_SERVICE,
recipientServiceType = ServiceType.TELE_DATA_SERVICE,
recordData = recordData,
)
}
val servicesFrameData = AppServicesFrameData(
serviceDataRecords = ServiceDataRecords(
sdrList = couriersServiceDataRecords,
),
)
return EgtsPacket(
packetIdentifier = getPacketNumber(),
packetType = PacketType.APP_DATA,
servicesFrameData = servicesFrameData,
).also { logger.info("assembled telematics packet") }
}
// циклические счетчики пакетов и записей
private fun getPacketNumber() = packetCounter.also { if (packetCounter++ == MAX_USHORT_ID) packetCounter = FIRST_PACKET_ID }
private fun getRecordNumber() = recordCounter.also { if (recordCounter++ == MAX_USHORT_ID) recordCounter = FIRST_PACKET_ID }
}
// сборка записи трека с несколькими типа подзаписей внутри
fun CourierTrackRecord.toRecordData(): RecordData {
// собираем подзапись телеметрии
val posSubRecord = SubRecord(
subRecordTypeId = SubRecordType.EGTS_SR_POS_DATA.id,
subRecordData = toPosSubRecordData(),
)
// подзапись типа ТС
val vehicleTypeAnalogSensorSubRecord = SubRecord(
subRecordTypeId = SubRecordType.EGTS_SR_ABS_AN_SENS_DATA.id,
subRecordData = AnalogSensorData(
analogSensorNumber = RnisAnalogSensor.VEHICLE_TYPE.sensorNumber,
analogSensorValue = courierData.vehicleType.code,
),
)
// статус курьера
val courierStatusAnalogSensorSubRecord = SubRecord(
subRecordTypeId = SubRecordType.EGTS_SR_ABS_AN_SENS_DATA.id,
subRecordData = AnalogSensorData(
analogSensorNumber = RnisAnalogSensor.COURIER_STATUS.sensorNumber,
analogSensorValue = courierData.courierStatus.code,
),
)
// в эту подзапись конкатенированной строкой кладутся ИД в системе КИС АРТ, номер ТС, номер сумки номера заказов
val rnisExternalDataString = with(courierData) {
val kisartIdString = with(courierData) {
if (kisartId != null && kisartId > KISARTID_THRESHOLD_VALUE) "kisartid=$kisartId;" else ""
}
val vplateString = vehicleNumber?.let { "vplate=$it;" } ?: ""
val bagplatesString = "bagplates=$backpackNumber"
val ordersString = if (orderId?.isNotEmpty() == true) ";orders=${orderId.joinToString(separator = ",")}" else ""
"$kisartIdString$vplateString$bagplatesString$ordersString"
}
val rnisExternalDataSubRecord = SubRecord(
subRecordTypeId = SubRecordType.EGTS_SR_EXT_DATA.id,
subRecordData = ExternalSensorData(
vendorsData = VendorData(
data = rnisExternalDataString,
),
),
)
// собираем все эти подзаписи вместе чтобы положит в одну запись трека
val subRecordList = mutableListOf(
posSubRecord,
vehicleTypeAnalogSensorSubRecord,
courierStatusAnalogSensorSubRecord,
rnisExternalDataSubRecord,
)
/*
особенность протокола - если КИС АРТ ИД меньше трех байт - он помещается в специальную подзапись
очевидно, диапазон со временем закончился, и значения не попадающие в него передаются не в ней а в виде строки
вместе с номерами сумки прочей информацией в подзаписи выше
*/
with(courierData) {
if (kisartId != null && kisartId <= KISARTID_THRESHOLD_VALUE) {
subRecordList.add(
SubRecord(
subRecordTypeId = SubRecordType.EGTS_SR_ABS_AN_SENS_DATA.id,
subRecordData = AnalogSensorData(
analogSensorNumber = RnisAnalogSensor.KISART_ID.sensorNumber,
analogSensorValue = kisartId,
),
),
)
}
}
return RecordData(
subRecordList = subRecordList,
)
}
// сборка подзаписи непосредственно навигационных данных
fun CourierTrackRecord.toPosSubRecordData(): PosSubRecordData {
if (navigationData.direction !in 0.0..360.0) throw IllegalArgumentException("Direction must fit in range 0..360.0")
return PosSubRecordData(
navigationTime = time,
latitude = navigationData.latitude,
longitude = navigationData.longitude,
isValid = navigationData.isValid,
speed = navigationData.speed,
direction = navigationData.direction.toInt(),
)
}
Рис.4 Фабрика EGTS пакетов
Метод performEncode возвращает ByteArray, который может быть передан на EGTS-сервер по TCP. Для декодирования метод performDecode того же энкодера получает на вход ByteArray и возвращает верхнеуровневый data-класс пакета со всей вложенной структурой data-классов внутри.
При декодировании пакета энкодер каждого уровня, начиная с самого верхнего, читает информацию со своего уровня пакета и вызывает энкодер следующего уровня вложенности для чтения байтов. Если на следующем уровне чтение может осуществляться разными типами энкодеров, то для выбора конкретного энкодера используется байт типа подзаписи. Кодирование производится по тому же принципу делегирования энкодерам более нижнего уровня.
Ниже для иллюстрации подхода приведен код энкодера записи, который после чтения информации своего уровня, делегирует чтение содержащихся подзаписей декодерам подзаписи.class RecordDataEncoder
class RecordDataEncoder
class RecordDataEncoder(
subRecordEncoders: List<AbstractSubRecordEncoder<out SubRecordData>>,
) : AbstractEgtsEncoder<RecordData>("RECORD_DATA") {
private val encodersBySubRecordTypeId = subRecordEncoders.associateBy { it.subRecordTypeId }
override fun performEncode(egtsEntity: RecordData): ByteArray =
ByteArrayOutputStream().apply {
for (subRecord in egtsEntity.subRecordList) {
write(subRecord.subRecordTypeId)
subRecord.subRecordData?.let { subRecordData ->
val subRecordEncoder = pickAnEncoder(subRecord.subRecordTypeId)
val subRecordDataByteArray: ByteArray = subRecordEncoder.encode(
egtsEntity = subRecordData,
)
val subRecordLength = subRecordDataByteArray.size
write(subRecordLength.toShort().toLittleEndianByteArray())
write(subRecordDataByteArray)
} ?: {
val subRecordLength = 0
write(subRecordLength.toShort().toLittleEndianByteArray())
}
}
}.toByteArray()
override fun performDecode(byteArray: ByteArray): RecordData {
ByteArrayInputStream(byteArray).apply {
val subRecordList = mutableListOf<SubRecord>()
while (available() > 0) {
val subRecordTypeId = read()
val subRecordEncoder = pickAnEncoder(subRecordTypeId)
val subRecordLength = readShort()
if (subRecordLength > 0) {
val subRecordBytes = readNBytes(subRecordLength.toInt())
val subRecordData = subRecordEncoder.decode(subRecordBytes)
val subRecord = SubRecord(
subRecordTypeId = subRecordTypeId,
subRecordData = subRecordData,
)
subRecordList.add(subRecord)
}
}
return RecordData(
subRecordList = subRecordList,
)
}
}
private fun pickAnEncoder(subRecordTypeId: Int): AbstractSubRecordEncoder<SubRecordData> {
val subRecordEncoder = encodersBySubRecordTypeId[subRecordTypeId] ?:
throw EgtsAdapterException(
code = EgtsExceptionErrorCode.EGTS_DECODE_EXCEPTION,
errorMessage = "no subrecord type with id $subRecordTypeId implemented yes. To implement it: \n" +
"1) start from SubRecordType enum (that's not mandatory step. it just helps to keep things organized) \n" +
"2) add data class to 'model' package \n" +
"3) implement appropriate encoder at 'encoder' package. Use subRecordTypeId and fieldName from 1) \n" +
"4) add new encoder bean initialization to autoconfiguration \n" +
"5) implement test at 'test' package. Use SubRecordType enum entry from 1) to reference subRecordTypeId \n" +
"6) use new encoder at your app code. Use SubRecordType enum entry from 1) to reference subRecordTypeId \n" +
"you're awesome!",
)
@Suppress("UNCHECKED_CAST")
return subRecordEncoder as AbstractSubRecordEncoder<SubRecordData>
}
}
Рис. 5 Код энкодера единичной записи пакета
Все энкодеры являются наследниками абстрактного класса AbstractEgtsEncoder, в который вынесено общее поведение по логированию и обработке исключений. Для реализации энкодера нового типа подзаписи (или записи) нужно унаследоваться от этого класса и реализовать методы performEncode и performDecode.
Вынесение в spring-boot Starter
Изначально адаптер протокола я расположил не в гейтвее, а в сервисе. Позже адаптер протокола был перенесен в гейтвей (Адаптер протокола не относится к домену сервиса и не содержит бизнес-логики, поэтому было принято решение перенести его из сервиса в гейтвей). Для упрощения переноса я упаковал его в spring-boot Starter. Его достаточно подключить в build.gradle.kts проекта, чтобы получить инициализированный бин энкодера, в который можно внедрить зависимости в сервисный слой и использовать без дополнительной ручной конфигурации.
Простота подключения модуля привела к идее вынести проект в опенсорс, чтобы его могли использовать и другие курьерские службы с бэкендом на jvm.
Разделение на модули
Для того чтобы адаптер можно было использовать в не spring-boot проектах он был разделен на два модуля:
egts-adapter-library
Содержит модель и классы энкодеров. Можно инициализировать вручную, как это сделано в тестовом классе AbstractIntegrationTest модуля library. Также этот модуль следует использовать, если приложению не нужен функционал кодирования, а требуется только API. Помимо модели и энкодеров библиотека содержит юнит тесты для всех классов.
egts-adapter-starter
Spring-boot Starter транслирующий egts-adapter-library в качестве API, использующий его для инициализации бинов энкодеров в классе автоконфигурации. Так как кодировка-декодировка тестируется в модуле библиотеки, тест стартера проверяет только инициализацию spring-контекста и наличие в нём нужного бина енкодера.
Добавление sample-app
Назначение и функционал демо-приложения
Для демонстрации подключения адаптера в проект и расширения его кода в отдельном репозитории доступно демо-приложение.
Приложение реализует функционал клиента в составе сервиса курьерской службы. При работе оно устанавливает TCP соединение с тестовым сервером куда отправляет EGTS пакеты с рандомно сгенерированными треками с телеметрией курьеров. Тестовый сервер принимает пакеты, подтверждает получение пакета, после чего верифицирует состав пакета и отправляет подтверждение его корректности.
Демо приложение предназначено только для проверки корректности работы EGTS-адаптера, поэтому код в приложении сильно упрощен (например, не поддерживает конкурентные вызовы, не содержит circuit-breaker и т.п.), поэтому не подходит для использования в проде. Учитывайте это, если решите переиспользовать его в своем решении.
Устройство sample-app
Демо-приложение построено на фреймворке spring-integration. Основная конфигурация находится в классе SpringIntegrationConfiguration, который я приведу здесь целиком, поскольку это центральная часть демо приложения, соединяющая все компоненты в единый пайплайн отправки и приема пакетов.
lass SpringIntegrationConfiguration
@Configuration
@EnableIntegration
open class SpringIntegrationConfiguration(
private val packetEncoder: EgtsPacketEncoder,
private val receivedPacketsStorage: ReceivedPacketsStorage,
) {
private val logger = LoggerFactory.getLogger(javaClass)
@Value("\${tcp.server-address}")
private var serverAddress = "localhost"
@Value("\${tcp.server-port}")
private var serverPort = 8080
@Bean
open fun serializer(): EgtsPacketSerializer =
EgtsPacketSerializer(packetEncoder)
@Bean
open fun deserializer(): EgtsPacketDeserializer =
EgtsPacketDeserializer(packetEncoder)
@Bean
open fun sendingChannel(): MessageChannel = DirectChannel()
@Bean
open fun receivingChannel(): MessageChannel = DirectChannel()
/*
фабрика соединений отвечает за открытие и поддержание TCP соединения и сериализация и десериализацию
сообщений
*/
@Bean
open fun connectionFactory(
serializer: EgtsPacketSerializer,
deserializer: EgtsPacketDeserializer,
): TcpNetClientConnectionFactory {
val factory = TcpNetClientConnectionFactory(
serverAddress,
serverPort,
)
factory.serializer = serializer
factory.deserializer = deserializer
factory.isSingleUse = false // Keep the connection open
return factory.also {
logger.info("created connection factory for {}:{}", serverAddress, serverPort)
}
}
// хэндлер будет передавать полученные из исходящего потока сообщения с помощью фабрики соединений
@Bean
open fun sendingMessageHandler(connectionFactory: TcpNetClientConnectionFactory): TcpSendingMessageHandler {
val handler = TcpSendingMessageHandler()
handler.setConnectionFactory(connectionFactory)
return handler
}
/*
входящий адаптер использует соединение предоставленное фабрикой, чтобы получить десериализованное
сообщение и передать его во входящий канал
*/
@Bean
open fun tcpInboundAdapter(
connectionFactory: TcpNetClientConnectionFactory,
@Qualifier("receivingChannel")
receivingChannel: MessageChannel,
): TcpReceivingChannelAdapter {
val adapter = TcpReceivingChannelAdapter()
adapter.setConnectionFactory(connectionFactory)
adapter.outputChannel = receivingChannel
return adapter
}
/*
исходящий поток перенаправляет сообщения из канала отправки в хэндлер исходящих сообщений
который уже передает их по TCP в соответствии с настройками фабрики соединений
(предоставленное соединение и сериализатор)
*/
@Bean
open fun outgoingFlow(
connectionFactory: TcpNetClientConnectionFactory,
@Qualifier("sendingChannel")
sendingChannel: MessageChannel,
sendingMessageHandler: TcpSendingMessageHandler,
): IntegrationFlow {
return IntegrationFlow.from(sendingChannel)
.handle(sendingMessageHandler)
.get()
}
/*
входящий поток складывает содержание сообщения из входящего канала (десериализованные пакеты)
в хранилище полученных пакетов
*/
@Bean
open fun receivingFlow(
@Qualifier("receivingChannel")
receivingChannel: MessageChannel,
): IntegrationFlow {
return IntegrationFlow.from(receivingChannel)
.handle { message ->
@Suppress("UNCHECKED_CAST")
val packetsMap = (message.payload as Set<EgtsPacket>)
.associateBy {
if (it.packetType == PacketType.APP_DATA) {
it.packetIdentifier
} else {
(it.servicesFrameData as ResponseServicesFrameData).responsePacketId
}
}
receivedPacketsStorage.receivedPackets.putAll(packetsMap)
}
.get()
}
}
Рис. 6 Конфигурационный класс демо-приложения
Первые два бина (EgtsPacketSerializer и EgtsPacketDeserializer) предназначены для преобразования дата классов модели адаптера в поток битов. Именно сериализатор и десериализатор используют бин энкодера, предоставляемый разработанным spring-boot starter.
Как я упоминал, соединение всегда открыто и пакеты идут без разделителей. Десериализатор имеет небольшой дополнительный функционал, который позволяет получать из входящего потока целый пакет и передавать энкодеру только его, не оставляя часть в потоке и не захватывая часть следующего пакета. Поскольку длина пакета всегда находится в одном и том же месте заголовка (то есть на одном расстоянии от начала пакета), то десериализатор считывает ее, а затем просто читает из канала нужное количество байтов, которое потом передает энкодеру.
Остальная конфигурация не содержит ничего специфичного для адаптера протокола. Входящий адаптер сохраняет полученные пакеты в мапу с ключом - ID пакета, чтобы сервисный класс мог по ключу получить ответ на отправленный пакет.
В пакете service находятся три класса:
EgtsServerConnectionManager
PacketFactory
TrackRecordsSender
EgtsServerConnectionManager отвечает за установление соединения (включая формирование с помощью PacketFactory пакета авторизации), отправку пакетов, получение ответов на пакеты.
PacketFactory отвечает за формирование пакета авторизации и ответов на пакеты (подтверждение получения), а также за преобразование внешнего ДТО с треком курьера во вложенную структуру дата классов EGTS пакета.
TrackRecordsSender проверяет доступность соединения перед отправкой. С помощью фабрики пакует записи треков в дата класс EGTS пакета, осуществляет его отправку и получает ответ.
Общая логика приложения выглядит следующим образом:
TrackRecordsController получает REST запрос с количеством треков курьера, которые приложение должно отправить в составе пакета.
↓
Генерирует требуемое количество рандомных записей треков и передает TrackRecordsSender.
↓
TrackRecordsSender проверяет, что соединение открыто. Если это не так, инициирует его открытие менеджером соединения EgtsServerConnectionManager.
↓
С помощью фабрики формирует из полученных треков data-класс EGTS пакета.
↓
Передает data-класс EgtsServerConnectionManager для отправки.
↓
TrackRecordsSender производит отправку пакета, который проходит через EgtsPacketSerializer и отправляется через TCP соединение, с помощью настроенной конфигурации spring-integration.
↓
При получении ответа от сервера ответ проходит через EgtsPacketDeserializer и сохраняется в хранилище ReceivedPacketsStorage.
↓
TrackRecordsSender запрашивает у EgtsServerConnectionManager ответ. EgtsServerConnectionManager извлекает ответ из ReceivedPacketsStorage и отдает TrackRecordsSender.
↓
TrackRecordsSender логирует ответ транспортного уровня из полученного ответа.

Использование sample-app
Для того чтобы убедиться, что всё действительно работает, необходим EGTS-сервер, который будет принимать пакеты, верифицировать их и отвечать клиенту. В качестве мока такого сервера можно использовать скрипт на python, с помощью которого рекомендуют проводить отладку на начальном этапе разработки клиентов для подключения курьерских служб к РНИС.
Из папки в которую мы скачали содержимое репозитория с проверочным скриптом, запускаем скрипт командой:
python start_client_debugger.py -n 4 -d 333
4 — это количество пакетов, которые скрипт примет перед тем, как завершит свою работу.
333 — ид для авторизации (задается в поперти демо-приложения).
Теперь можно соответствующее количество раз дёрнуть ручку контроллера демо-приложения и убедиться, что скрипт принимает и логирует получаемые пакеты, а приложение получает и логирует ответы.
start_client_debugger.py -n 4 -d 333
First packet is correct: Packet ID: 0; Packet Type: 1; records: [{RecNum: 0, sst: 1, subrecords: [{Type: 5, dt: 0, did: 333}]}]
Received egts packet: Packet ID: 2; Packet Type: 1; records: [{RecNum: 2, sst: 2, ID: 4255801, subrecords: [{Type: 16, vld: False, ntm: 1746274839000, lat: -51.91971604989835, long: 152.52254735969998, speed: 518, dir: 18, busy: 0, src: 0},{Type: 24, asn: 194, asv: 0},{Type: 24, asn: 197, asv: 1},{Type: 44, dt: kisartid=1135184247;vplate=kdrdnkg;bagplates=ezeifhfxc]}]
Received egts packet: Packet ID: 3; Packet Type: 1; records: [{RecNum: 3, sst: 2, ID: 6996301, subrecords: [{Type: 16, vld: False, ntm: 1746274861000, lat: 6.550008060073016, long: 93.10344790879252, speed: 998, dir: 103, busy: 0, src: 0},{Type: 24, asn: 194, asv: 1},{Type: 24, asn: 197, asv: 0},{Type: 44, dt: vplate=qbpr;bagplates=qpzf]}{RecNum: 4, sst: 2, ID: 6980839, subrecords: [{Type: 16, vld: False, ntm: 1746274861000, lat: -14.522056347812073, long: 156.29171361594734, speed: 897, dir: 266, busy: 0, src: 0},{Type: 24, asn: 194, asv: 3},{Type: 24, asn: 197, asv: 2},{Type: 44, dt: kisartid=1508917967;vplate=j;bagplates=xftewb]}]
SUCCESS. EGTS connection test succeeded. Received 4 packets.
Please check in logs if data in packets is correct.
curl для ручки приложения
curl --location 'localhost:8080/track/send_track_record' \
--header 'Content-Type: application/json' \
--data '2'
Расширение кода в проекте без доработки самой библиотеки
Библиотека уже содержит data-классы и классы энкодеров для наиболее распространенных типов EGTS пакетов. Например, записи запроса и ответа, подзаписи авторизации и передачи телематики. Но что делать, если требуется передавать тип записи, нереализованный в библиотеке? В этом случае приветствуется merge request на дополнение кода самой библиотеки. Но если привносить вклад в общее дело пока не хочется — можно расширить код уже в своём приложении.
В пакете extension сэмпл приложения показан пример такого расширения функционала адаптера добавлением типа подзаписи «счетный вход». Подзапись содержит два поля – байт с номером счетного входа и три байта под его значение. Для добавления подзаписи в проект добавляем её data-класс.
data class CounterSubRecordData(
val counterNumber: Byte,
val counterValue: Int,
) : SubRecordData()
и класс энкодера
class CounterDataEncoder : AbstractSubRecordEncoder<CounterSubRecordData>(
subRecordTypeId = 25, // EGTS protocol documentation Таблица Б.1 - Список подзаписей сервиса EGTS_TELEDATA_SERVICE
fieldName = "EGTS_SR_ABS_CNTR_DATA",
) {
override fun performEncode(egtsEntity: CounterSubRecordData): ByteArray =
ByteArrayOutputStream().apply {
with(egtsEntity) {
write(byteArrayOf(counterNumber))
write(counterValue.toThreeByteLittleEndianByteArray())
}
}.toByteArray()
override fun performDecode(byteArray: ByteArray): CounterSubRecordData {
ByteArrayInputStream(byteArray).apply {
val counterNumber = readByte().also {
logger.trace("AnalogSensor sensorNumber byteValue is {}", it)
}
val counterValue = readThreeBytesToPositiveInt().also {
logger.trace("AnalogSensor value is {}", it)
}
return CounterSubRecordData(
counterNumber = counterNumber,
counterValue = counterValue,
)
}
}
}
энкодер унаследован от
AbstractSubRecordEncoder<CounterSubRecordData>
Поскольку это энкодер подзаписи, в конструктор передаётся его код subRecordTypeId согласно ГОСТ и имя поля в пакете для целей логирования – EGTS_SR_ABS_CNTR_DATA. Чтобы этот энкодер занял своё место в иерархической структуре энкодеров и автоматичеси вызывался энкодером верхнего уровня, его нужно инициализировать, как Spring бин. Делается это в отдельном конфигурационном классе.
@Configuration
open class ImplementedEncodersConfiguration {
@Bean
@ConditionalOnProperty("egts.initialize-encoders")
open fun counterSubRecordDataEncoder() = CounterDataEncoder()
}
В приложение добавлен соответствующий тест
tech.ecom.egts.demo.extension.CounterDataEncoderTest
Он собирает data-класс пакета с новым типом записи и проверяет, как полученный пакет кодируется-декодируется энкодером верхнего уровня.
Публикация в maven-central
Проект опубликован в репозитории мавен и может быть подключен в любой проект как это сделано в демо-приложении
Адаптер EGTS протокола готов
Код прошёл проверку практикой, работает в production окружении. Трекинг нескольких тысяч курьеров Москвы отправляется в РНИС столицы, начиная с 2024 года. Адаптер опубликован в maven-central репозитории и подключается в приложение, как обычная внешняя зависимость. Возможно использование как в приложениях на основе Spring (в варианте Spring-boot-starter), так и в приложениях не на базе этого фреймворка в варианте library.
Надюсь статья будет полезна для всех, кто столкнулся с необходимостью отправки телематических данных курьеров в гос структуры. Пишите вопросы в комментарии, если они остались.