Иногда мы с товарищами собираемся, чтобы сыграть несколько забегов в игре PUBG: Battlegrounds. Взаимодействие между участниками команды (сквада) тут возведено в абсолют, и голосовая коммуникация жизненно необходима для успешного выживания. В PUBG нельзя расслабляться ни на секунду. Частенько бывают ситуации, когда нужно на слух определять местоположение соперника. Любой посторонний звук в такой момент способен испортить матч всей команде.

Чтобы минимизировать ситуации, когда мои домочадцы пытаются со мной заговорить во время игры, я решил поставить световое табло ON AIR, похожее на то, которое используют на радиостанциях (и да, после такой наглости я даже выжил). Задумка была в том, чтобы табло автоматически зажигалось, когда я нахожусь на сервере TeamSpeak, и отключалось после дисконнекта. Что в итоге у меня получилось — читайте дальше.

Вот оно — табло
Вот оно — табло

Прежде всего я раздобыл готовое табло. Разумеется, можно было бы заморочиться с неоном, но я решил обойтись обычным LED. Цена на такую штуку не слишком велика, но вот с автоматизацией придется повозиться. Первое, что я сделал, — проверил ток потребления:

Тестер Дядюшки Ляо
Тестер Дядюшки Ляо

1,44 А — это действительно много, чтобы брать напрямую с RPI. Запитывать придется от отдельного блока. Управлять буду с помощью реле. Нормальные люди обычно покупают готовые модули, но ждать три недели с маркетплейса не хотелось. Под рукой же лежало реле TRJ-5VDC-SA-CD:

TRJ-5VDC-SA-CD
TRJ-5VDC-SA-CD

Достаточно подать на управляющие контакты 5 В 40 мА — и оно успешно сработает. Проблемы две: GPIO Raspberry Pi оперирует напряжением 3,3 V, а максимум c такого пина можно отдать ~16 мА. Тем не менее на самой гребенке два пина 5 V, которые по факту выдают до 1–2 А в зависимости от блока питания и загруженности платы. Само реле для удержания в открытом состоянии будет потреблять 40–70 мА. Получается, нужно собрать простейший драйвер.

Плюс не стоит забывать, что реле управляется катушкой, которая благодаря коварной физике накапливает в себе магнитную энергию. Достаточно резко оборвать ток — и напряжение на выводах способно подскочить и выжечь нафиг наш нежный GPIO (привет, обратная ЭДС). Так что надо будет подумать о защите в виде диода.

Делаю драйвер

Порывшись в коробочках с радиодеталями (у каждого мужчины такая со временем появляется), я нашел NPN-транзистор КТ3102 в корпусе TO-92 и сигнальный кремниевый диод 1N4148. Вначале я рассчитывал, что мне для открытия транзистора будет достаточно ~2,6 мА, поэтому ошибочно взял резистор на 1кΩ.

Дальше собрал схему:

  • +5 V (pin 2) — анод 1N4148;

  • катод 1N4148 — катушка управления;

  • катушка управления — эмиттер транзистора;

  • GPIO 17 (pin 11) — резистор;

  • резистор — база транзистора;

  • GND (pin 6) — коллектор транзистора.

Быстро спаял навесным монтажом и подключил к «малинке». Чтобы было проще, установил Node-Red при помощи волшебного скрипта:

$ bash <(curl -sL https://github.com/node-red/linux-installers/releases/latest/download/update-nodejs-and-nodered-deb)

Для автостарта Node-Red выполнил:
$ sudo systemctl enable nodered.service

Теперь можно проверить переключение реле, подавая 1 и 0 на GPIO 17 с помощью штатной ноды Inject в связке с rpi-gpio out:

Первый блин комом — вместо отчетливого щелчка я услышал еле-еле заметную попытку. Померял напряжение мультиметром, а там всего лишь 2 V. Значит, транзистор не ушел в насыщение и не открылся. Поэтому заменил номинал резистора (поставил 470Ω вместо 1кΩ). Ток с GPIO-пина вырос до ~5,6 мА — вполне безопасное значение. Проверил — реле уверенно переключается.

MQTT

Теперь пришла пора поставить брокер сообщений Mosquitto и настроить базовую авторизацию по логину и паролю:

$ sudo apt install -y mosquitto mosquitto-clients

Как и Node-Red, имеет смысл добавить Mosquitto в автостарт:

$ sudo systemctl enable mosquitto

В комплекте есть специальная утилита, позволяющая безопасно хранить заданный пароль для указанного пользователя:

$ sudo mosquitto_passwd -c /etc/mosquitto/passwd nodered

По умолчанию Mosquitto читает конфиги в директории /etc/mosquitto/conf.d/:

$ sudo nano /etc/mosquitto/conf.d/local.conf

Прописываем туда порт, запрещаем подключаться без авторизации и говорим, где лежит файл с хешем пароля:

listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd

Перечитываем конфиг и перезапускаем брокер:

$ sudo systemctl restart mosquitto

Теперь можно возвращаться в веб-интерфейс Node-Red и настроить работу через MQTT. В первую очередь надо добавить сервер. Так как все запущено на Raspberry, прописываем адрес 127.0.0.1 (или localhost):

Переходим на вкладку Security и указываем там ранее созданного пользователя и пароль nodered / password:

Слушать мы будем определенный топик. Назовем его для примера state:

Переделываем тестовую схему — вместо прямого соединения добавляем ноды MQTT в качестве посредника. При нажатии кнопки Deploy снизу каждой из них появится актуальный статус. Если все настроено правильно, то будет надпись connected:

Проверив еще раз, что отправка сообщения переключает реле, соединяем его выводы COM и NO (Normally Open) в разрыв жилы +5 V USB-кабеля. Проще всего взять отдельный USB-удлинитель и именно у него перерезать жилу. Это даст возможность управлять так любым USB-прибором, а не только конкретным табло.

Итак, первая часть проекта завершена. Поднят рабочий сервер Node-Red с брокером Mosquitto, который ожидает сообщения в топике state с 1 или 0 и дает команду на реле для включения табло. Теперь пора написать приложение, которое будет считывать данные с сервера TeamSpeak и отсылать сообщение MQTT.

Soft

У TeamSpeak есть два режима ServerQuery:

  1. RAW (читай Telnet) на 10011.

  2. SSH (да-да, он самый) на 10022.

Первый вариант хорош тем, что туда можно подключаться обычным сокетом, читать и писать строки, а также вытворять прочие безумства. Альтернатива — SSH. Все то же самое, только безопасно. Приложение будет на Python 3.11, к которому я дополнительно установлю пару пакетов с клиентами SSH (paramiko) и MQTT (paho-mqtt).

$ sudo apt install python3-paho-mqtt python3-paramiko

Код, разумеется, писался в нормальной IDE, но для переноса воспользовался редактором nano:

$ sudo nano ts3_presence_to_mqtt.py

Полный код:
import socket, time, sys
from typing import Optional, Tuple
import paho.mqtt.client as mqtt
import paramiko

# ================== НАСТРОЙКИ ==================
TS3_HOST = "192.168.88.105"
TS3_PROTOCOL = "ssh"
TS3_QUERY_PORT = 10022
TS3_LOGIN = "serveradmin"
TS3_PASSWORD = "CHANGE_ME"
TS3_SID = 1
TS3_TARGET_NICK = "Test"

MQTT_HOST = "192.168.88.27"
MQTT_PORT = 1883
MQTT_USERNAME = "nodered"
MQTT_PASSWORD = "password"
MQTT_TOPIC = "state"
MQTT_QOS = 1

CHECK_PERIOD_SEC = 1.0
CONNECT_TIMEOUT = 5.0
RECONNECT_DELAY_SEC = 3.0
# ===============================================

def ts3_unescape(s: str) -> str:
    rep = {r"\\s":" ", r"\\p":"|", r"\\/":"/", r"\\n":"\n", r"\\r":"\r", r"\\t":"\t", r"\\v":"\v", r"\\\\":"\\"}
    for k,v in rep.items(): s = s.replace(k,v)
    return s

def parse_presence(payload: str, target_nick: str) -> bool:
    if not payload: return False
    for rec in payload.split("|"):
        kv = {}
        for part in rec.split():
            if "=" in part:
                k,v = part.split("=",1)
                kv[k] = ts3_unescape(v)
        if kv.get("client_nickname") == target_nick:
            return True
    return False

class TS3Raw:
    def __init__(self, host, port, user, password, sid):
        self.host, self.port, self.user, self.password, self.sid = host, port, user, password, sid
        self.sock: Optional[socket.socket] = None

    def _send(self, cmd: str):
        self.sock.sendall((cmd.strip()+"\n").encode())

    def _read_until_ok(self) -> Tuple[str, dict]:
        self.sock.settimeout(CONNECT_TIMEOUT)
        buf = b""
        while True:
            chunk = self.sock.recv(4096)
            if not chunk: raise ConnectionError("TS3 socket closed")
            buf += chunk
            text = buf.decode("utf-8","replace")
            if "\nerror " in text or text.endswith("error "):
                lines = text.strip().splitlines()
                if not lines or not lines[-1].startswith("error "): continue
                err_line = lines[-1][6:]
                err = {}
                for p in err_line.split():
                    if "=" in p:
                        k,v = p.split("=",1)
                        err[k] = ts3_unescape(v)
                payload = "\n".join(lines[:-1])
                return payload, err

    def connect(self):
        self.sock = socket.create_connection((self.host, self.port), timeout=CONNECT_TIMEOUT)
        self.sock.settimeout(1.0)
        try: _ = self.sock.recv(4096)
        except Exception: pass
        self._send(f"login client_login_name={self.user} client_login_password={self.password}")
        _, err = self._read_until_ok()
        if err.get("id") != "0": raise PermissionError(f"login failed: {err}")
        self._send(f"use sid={self.sid}")
        _, err = self._read_until_ok()
        if err.get("id") != "0": raise RuntimeError(f"use failed: {err}")

    def clientlist(self) -> str:
        self._send("clientlist")
        payload, err = self._read_until_ok()
        if err.get("id") != "0": raise RuntimeError(f"clientlist failed: {err}")
        return payload

    def close(self):
        try:
            if self.sock:
                try: self._send("quit")
                except Exception: pass
                self.sock.close()
        finally:
            self.sock = None

class TS3SSH:
    def __init__(self, host, port, user, password, sid):
        self.host, self.port, self.user, self.password, self.sid = host, port, user, password, sid
        self.ssh: Optional[paramiko.SSHClient] = None
        self.chan: Optional[paramiko.Channel] = None
        self.buf = ""

    def connect(self):
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(self.host, port=self.port, username=self.user, password=self.password, timeout=CONNECT_TIMEOUT, allow_agent=False, look_for_keys=False)
        self.chan = self.ssh.invoke_shell(width=160, height=24)
        self.chan.settimeout(CONNECT_TIMEOUT)
        time.sleep(0.2)
        self._drain()
        self._send(f"use sid={self.sid}")
        self._read_until_ok()

    def _send(self, cmd: str):
        self.chan.send((cmd.strip()+"\n").encode())

    def _drain(self):
        try:
            while self.chan.recv_ready():
                self.buf += self.chan.recv(4096).decode("utf-8","replace")
        except Exception:
            pass

    def _read_until_ok(self) -> Tuple[str, dict]:
        deadline = time.time() + CONNECT_TIMEOUT
        while time.time() < deadline:
            self._drain()
            if "\nerror " in self.buf:
                text = self.buf
                self.buf = ""
                lines = text.strip().splitlines()
                if not lines: continue
                idx = None
                for i in range(len(lines)-1, -1, -1):
                    if lines[i].startswith("error "):
                        idx = i; break
                if idx is None: continue
                err_line = lines[idx][6:]
                err = {}
                for p in err_line.split():
                    if "=" in p:
                        k,v = p.split("=",1)
                        err[k] = ts3_unescape(v)
                payload = "\n".join(lines[:idx])
                return payload, err
            time.sleep(0.05)
        raise TimeoutError("TS3 SSH read timeout")

    def clientlist(self) -> str:
        self._send("clientlist")
        payload, err = self._read_until_ok()
        if err.get("id") != "0": raise RuntimeError(f"clientlist failed: {err}")
        return payload

    def close(self):
        try:
            if self.chan: 
                try: self._send("quit")
                except Exception: pass
                self.chan.close()
        finally:
            if self.ssh:
                self.ssh.close()
            self.chan = None
            self.ssh = None

class MqttPublisher:
    def __init__(self, host, port, username, password, topic, qos=1):
        self.client = mqtt.Client()
        if username: self.client.username_pw_set(username, password)
        self.topic, self.qos = topic, qos
        self.client.connect_async(host, port, keepalive=30)
        self.client.loop_start()
    def publish_state(self, v:int): self.client.publish(self.topic, str(v), qos=self.qos, retain=True)

def make_ts3():
    if TS3_PROTOCOL == "ssh":
        return TS3SSH(TS3_HOST, TS3_QUERY_PORT, TS3_LOGIN, TS3_PASSWORD, TS3_SID)
    else:
        return TS3Raw(TS3_HOST, TS3_QUERY_PORT, TS3_LOGIN, TS3_PASSWORD, TS3_SID)

def main():
    ts3 = make_ts3()
    mqtt_pub = MqttPublisher(MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD, MQTT_TOPIC, MQTT_QOS)
    last = None
    while True:
        try:
            if isinstance(ts3, TS3Raw) and ts3.sock is None: ts3.connect()
            if isinstance(ts3, TS3SSH) and ts3.chan is None: ts3.connect()
            payload = ts3.clientlist()
            value = 1 if parse_presence(payload, TS3_TARGET_NICK) else 0
        except Exception as e:
            sys.stderr.write(f"[WARN] TS3 check failed: {e}\n")
            value = 0
            try: ts3.close()
            except Exception: pass
            time.sleep(RECONNECT_DELAY_SEC)
        if value != last:
            mqtt_pub.publish_state(value)
            last = value
        time.sleep(CHECK_PERIOD_SEC)

if __name__ == "__main__":
    try: main()
    except KeyboardInterrupt: pass

Теперь достаточно запустить приложение в фоновом режиме следующей командой:

$ python ts3_presence_to_mqtt.py &

Как только вы подключитесь к серверу TeamSpeak с именем Test, реле щелкнет и табло незамедлительно загорится. Ну а при дисконнекте — потухнет. По умолчанию процесс подключения к серверу происходит разово, а далее оно запрашивает статус каждую секунду. При необходимости интервал можно увеличить, заменив значение переменной CHECK_PERIOD_SEC.

Что получилось

Этот небольшой DIY-проект оказался довольно интересным в реализации. Разумеется, было бы проще взять модуль с одним или несколькими реле, но и самодельный драйвер работает без каких-либо проблем. Программный код в целом тоже получился несложным — нужно было учесть, что TS3 Server Query экранирует спецсимволы, — и накидать собственный вариант «декодера». Для однозначного определения завершения любого ответа реализован парсинг до строки вида error id=[код] msg=[сообщение].

Чтобы комфортно работать с постоянным подключением к серверу, был написан отдельный класс, который занимается всем — от установки соединения с чтения баннера приветствия (при его наличии) до корректного завершения с закрытием сокета. Еще один класс — минимальная обертка над paho-mqtt с асинхронным подключением к брокеру и фоновым loop_start(), при котором клиент сам держит соединение и отправляет keepalive.

От сервера мы получаем полный ответ clientlist, который парсим с помощью функции parse_presence. Записи там идут с разделителем «|», а каждая из них по факту пара «ключ=значение» через пробел. В итоге достаточно собрать словарь полей, снять экранирование спецсимволов и проверить соответствие никнейма из «Настроек» полученному ответу. На основании этого отправить 1 или 0 по MQTT.

Ну что, как вам самоделка?

Комментарии (1)


  1. vesowoma
    10.09.2025 09:50

    Обычно диод подключают параллельно обмотке реле, так чтобы штатно через диод ток не шел.

    В приведенной в статье схеме диод защищать будет, но на нем впустую будет падение напряжения.