Использование In-door локации
Навигация внутри помещений может быть нетривиальной задачей, учитывая низкую точность GPS из-за искажений сигнала во время его прохождения через стены. Можно по всему помещению развесить маячки, но это требует больших затрат на оборудование и обслуживание. При этом, Wi-Fi роутеры, которые уже есть в помещениях, как раз могут выступать такими маячками. Измеряя мощность сигнала на устройствах, можно определять местоположение с довольно большой точностью.
У нас в Сколтехе был проект, посвященный тому чтобы люди на больших мероприятиях могли находить друг друга на территории кампуса. Но тут сразу вылазит несколько проблем, главная из которых это конфиденциальность и приватность. Сейчас In-door навигация активно используется ритейлерами для определения актуальных точек в магазинах. Это позволяет оптимизировать использование коммерческой площади. Но эти данные анонимизированные. Развитие этой технологии может привести к тому что на работе начальство будет знать, сколько времени конкретный работник проводит за разговорами у кулера. Также мы столкнулись с закрытостью IOS, которая ограничивает сканирование Wi-Fi сети, напрямую ограничивая использование идеи навигации с помощью мощности Wi-Fi маршрутизаторов. Подозреваю, что эти ограничения также связаны с обеспечением конфиденциальности пользователей.
С большим желанием реализовать идею, нашлось применение. У нас по кампусу можно кататься на скейтах и самокатах. Они не общественные, у каждого свой, но иногда их все таки кто-то берет. Ставить сигнализацию не очень уместно, потому что хозяин может быть далеко. Да и лишний шум никому не нужен. Но можно поставить на скейт маячок и отправлять уведомления хозяину при изменении локации, а также уведомление о долгом отсутствии сигнала. Так можно легко найти свой транспорт, если его не оказалось на прежнем месте или узнать, что его вынесли за пределы кампуса. От этой идеи и отталкивается весь проект.
Алгоритм определения локации
В алгоритме определения локации используется идея фингерпринтинга. В одной локации сигнатуры сети (адреса маршрутизаторов и мощность их сигнала) будут похожи. Это можно использовать для определения аудитории, сравнивая измеренную сигнатуру с ранее записанными и в которых определена локация. Определением локации будем считать именно номер аудитории. Конечно, это не так красиво как точка на карте, но в самой простой реализации In-door локации этот вариант самый удобный, в том числе с точки пользовательского интерфейса. Если измеренная маячком сигнатура оказалась совсем незнакомой, пользователю предлагается привязать к ней номер аудитории, тем самым пополняя базу данных о различных аудиториях на кампусе.
Чтобы решить данную задачу классификации аудиторий, нужно ввести функцию ошибки или, другими словами, "расстояния" между точками. Классический способ это средний квадрат разницы мощностей между общими маршрутизаторами в измеренной сигнатуре и сохраненной. Но оказалось, что этот способ имеет существенный недостаток. Когда рассматриваются только общие маршрутизаторы в точках, оказывается, что учитываются роутеры, которые физически находятся между этими точками. А это означает, что будут точки, которые в физическом пространстве окажутся далеко, но функция ошибки будет принимать одинаковое значение. Пример такой ситуации изображен на Рис. 1. Здесь синие круги - это маршрутизаторы, а красные квадраты - маячки, сканирующие сеть. На линиях показана мощность сигнала от каждого общего маршрутизатора, которые учитываются при подсчете функции ошибки.

Функция ошибка должна быть пропорциональна физическому расстоянию между точками. На Рис. 1 видно, что при подсчете функции ошибки не учитываются маршрутизаторы, которые присутствуют в одной точке, но отсутствуют в другой. А эта информация довольно важна при определении локации. Пример подсчета функции ошибки с учетом отсутствующих роутеров в различных точках изображен на Рис. 2.

Отсутствующие роутеры в различных точках можно учитывать количественно. Я поставил коэффициент 5, выведенный эмпирически. Если этот коэффициент будет маленьким, то ошибка, вносимая общими роутерами, будет преобладать в расчете общей функции ошибки. И наоборот, если этот коэффициент будет большим, отсутствующие роутеры будут преобладать в общем расчете функции ошибки. Итоговая формула функции ошибки примет вид:
где - множество роутеров в измеряемой точке,
- множество роутеров в сохраненной точке
,
- мощность сигнала роутера
в измеренной точке в дБ,
- мощность сигнала роутера
в сохраненной точке
в дБ.
Функция ошибки рассчитывается для каждой записанной точки с привязанной аудиторией (спойлер: для почти каждой точки). Затем берется 5 точек с минимальной функцией ошибки и путем голосования определяется аудитория, относящаяся к скандированной точке. Получается не k-means кластеризация, а k-means классификация.
Следующий шаг заключается в определении трешхолда, выше которого алгоритм не будет автоматически определять аудиторию, а будет предлагать пользователю ввести ее вручную. Это своеобразная функция неуверенности. Если трешхолд будет маленьким, алгоритм постоянно будет просить уточнить аудиторию. Зато увеличится точность. В свою очередь, большой трешхолд может привести к большим выбросам или даже к дрейфу кластера, когда с пополнением базы данных общая совокупность точек, привязанных к определенной аудитории, сдвигается.
Трешхолд также был подобран эмпирически - 70. При нем уведомления о недостаточности данных не приходят когда скейт стоит на месте, а аудитории определяются точно даже на границе между ними.
Еще нужно ввести минимальное количество записей для аудитории, после которого она начинает определяться автоматически. Это нужно для начальной калибровки от пользователей во время ручного заполнения базы данных. Даже если ошибка определения ниже трешхолда, если количество записей данной аудитории меньше 3, пользователю все равно предлагается добавить аудиторию вручную.
Последняя деталь, важная для алгоритма определения аудиторий, это максимальное количество записей определенной аудитории, которое рассматривается для принятия решения. Это нужно для снижения нагрузки на сервер из-за того, что база данных постоянно пополняется новыми записями и сравнивать полученную сигнатуру со всеми точками не обязательно. Я поставил ограничение 20 записей для каждой аудитории.
Выбор плат и расчет автономности
Я выбрал плату WeMos ESP-Wroom-32 с батарейным отсеком под аккумулятор 18650. Удобство в том, что на самой плате уже есть отсек для аккумулятора. В условиях постоянных вибраций это может оказаться хорошим преимуществом.

Теперь перейдем к расчету автономности. Я использовал аккумулятор на 3400 мАч. По документации потребление ESP-Wroom-32 при активном использовании Wi-Fi - 160-240 мА. В режиме глубокого сна потребление 10-150 мкА. При отправке пакета раз в 10 минут по расчетам одного заряда аккумулятора должно хватить на 99 дней.
Настройки сети и сервера
Код маячка написан на C++ и он максимально прост. Вся его задача это сканировать сеть и отправить сигнатуру на сервер, который находится в локальной сети и уйти в глубокий сон на час чтобы сэкономить заряд батареи. После этого цикл повторяется. Сначала скрипт сканирует сеть и заполняет JSON файл данными о точках доступа (имя сети, MAC-адрес и мощность сигнала). Затем он подключается к сети Сколтеха и отправляет пакет HTTP на сервер по локальной сети. Сервером выступает обычный Raspberry Pi 3 Model B.
#include <WiFi.h>
#include <HTTPClient.h>
#include <ArduinoJson.h>
#include "esp_eap_client.h"
// Параметры сети
const char* ssid = "SKOLTECH";
const char* identity = "USERNAME";
const char* username = "USERNAME";
const char* password = "PASSWORD";
const char* serverUrl = "http://IP:PORT/";
void setup() {
Serial.begin(115200);
delay(1000);
WiFi.disconnect(true);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, WPA2_AUTH_PEAP, identity, username, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
}
delay(1000);
sendWifiScanResults();
}
void sendWifiScanResults() {
int n = WiFi.scanNetworks();
if (n == 0) {
Serial.println("Сеть не найдена");
return;
}
// Заполнение JSON-файла
DynamicJsonDocument doc(2048);
doc["username"] = username;
JsonArray networks = doc.createNestedArray("networks");
for (int i = 0; i < n; ++i) {
JsonObject net = networks.createNestedObject();
net["ssid"] = WiFi.SSID(i);
net["bssid"] = WiFi.BSSIDstr(i);
net["rssi"] = WiFi.RSSI(i);
}
String jsonStr;
serializeJson(doc, jsonStr);
// Отправка пакета
if (WiFi.status() == WL_CONNECTED) {
HTTPClient http;
http.begin(serverUrl);
http.addHeader("Content-Type", "application/json");
int httpResponseCode = http.POST(jsonStr);
if (httpResponseCode > 0) {
Serial.println(httpResponseCode);
String response = http.getString();
Serial.println(response);
} else {
Serial.println(httpResponseCode);
}
http.end();
} else {
Serial.println("Нет Wi-Fi соединения");
}
esp_deep_sleep(10 * 60 * 1000000LL); // 10 минут
}
void loop() {
}
JSON-файл, который заполняется результатом сканирования сети, выглядит так:
{
"username": "Alexander.Shmatok",
"networks": [
{
"ssid": "SKOLTECH",
"bssid": "28:6F:7F:21:3E:42",
"rssi": -73
},
{
"ssid": "SKOLTECH_GUEST",
"bssid": "28:6F:7F:21:3E:40",
"rssi": -74
},
{
"ssid": "SberMedAI_2G",
"bssid": "28:6F:7F:21:3E:48",
"rssi": -74
},
…
{
"ssid": "CLOUDBASE",
"bssid": "88:C3:97:C9:18:7A",
"rssi": -90
},
{
"ssid": "BioInf_Workshop",
"bssid": "CC:DB:93:D4:B8:25",
"rssi": -90
},
{
"ssid": "SKOLTECH",
"bssid": "CC:DB:93:D4:B8:22",
"rssi": -91
}
]
}
В каждом JSON выходит примерно по 20 точек доступа. В таком виде информация поступает на сервер, где уже сравнивается с записанными точками и принимается решение по определению аудитории. Ниже приведен код сервера.
Код сервера
import json
from flask import Flask, request
import mysql.connector
from mysql.connector import Error
import telebot
from telebot import types
from datetime import datetime
from threading import Thread
import traceback
from collections import defaultdict
app = Flask(__name__)
bot = telebot.TeleBot(TELEGRAM_TOKEN)
bot.enable_save_next_step_handlers(delay=2)
bot.load_next_step_handlers()
# Параметры подключения к MySQL
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'PASSWORD',
'database': 'skate_locator',
}
# Минимальное количество записей одной аудитории для уверенности алгоритма
MIN_RECORDS = 3
# Максимальное количество рассматриваемых записей для одной аудитории
MAX_RECORDS = 20
# Хитрый SQL-запрос, который случайно выбирает 20 записей, относящихся к каждой аудитории
# Результат запроса возвращается функцией в JSON-файле
def get_measurements_by_audience(conn):
cursor = conn.cursor(dictionary=True)
cursor.execute("""SELECT audience, data
FROM (
SELECT
audience,
data,
ROW_NUMBER() OVER (PARTITION BY audience ORDER BY RAND()) AS rn
FROM measurements
WHERE audience IS NOT NULL
) t
WHERE rn <= 20;
""")
rows = cursor.fetchall()
groups = {}
for r in rows:
aid = r['audience']
dat = json.loads(r['data'])
networks = dat.get('networks', [])
current_data = {d['bssid']: d['rssi'] for d in networks}
groups.setdefault(aid, []).append(current_data)
return groups
# Функция определения аудитории
def calculate_audience(current_data, groups):
best_aid = None
best_error = float('inf')
errors = []
for aid, records in groups.items():
for rec in records:
sum_sq = 0
count = 0
# Подсчет функции ошибки
for bssid, rssi in current_data.items():
if bssid in rec:
diff = rssi - rec[bssid]
sum_sq += diff * diff
count += 1
if count:
errors.append(((sum_sq / count) + 5 * (len(rec) + len(current_data.items()) - 2 * count), aid))
# Определение 5 ближайших точек с минимальной ошибкой
nearest = sorted(errors, key=lambda x: x[0])[:5]
votes = defaultdict(float)
# Голосование 5 ближайших соседей. Количество голосов обратно пропорционально ошибке
for dist, aud in nearest:
votes[aud] += 1 / (dist + 1e-6)
# Если результата голосования нет, то предлагается ввести новую аудиторию
if votes:
return max(votes.items(), key=lambda x: x[1])[0], nearest[0][0]
else:
return None, 1000
# Возвращает количество записей в базе данных, относящихся к определенной аудитории
def count_records_for_audience(conn, audience):
cursor = conn.cursor()
cursor.execute(f'SELECT COUNT(*) FROM measurements WHERE audience = "{audience}"')
return cursor.fetchone()[0]
# Определение двух последних аудиторий, где находился скейт
def get_last_two_audiences(conn, user_id):
cursor = conn.cursor()
cursor.execute(f'SELECT audience FROM measurements WHERE user_id = {user_id} ORDER BY timestamp DESC LIMIT 2')
rows = cursor.fetchall()
return [row[0] for row in rows]
def notify_change(chat_id, old_aud, new_aud):
try:
bot.send_message(chat_id=chat_id,
text=f"Аудитория изменилась: {old_aud} → {new_aud}")
except TelegramError as e:
print("Telegram error:", e)
# Добавление записи в базу данных
def save_measurement(username, data_json):
try:
connection = mysql.connector.connect(**DB_CONFIG)
cursor = connection.cursor()
cursor.execute(f'SELECT chat_id FROM users WHERE name = "{username}" LIMIT 1')
user_id = int(cursor.fetchall()[0][0])
networks = data_json.get('networks', [])
current_data = {d['bssid']: d['rssi'] for d in networks}
groups = get_measurements_by_audience(connection)
best_aud, error = calculate_audience(current_data, groups)
insert_query = """
INSERT INTO measurements (user_id, data, audience)
VALUES (%s, %s, %s)
"""
count = count_records_for_audience(connection, best_aud) if best_aud else 0
# Принятие решения о ручном вводе аудитории
if count < MIN_RECORDS or error > 70:
bot.send_message(user_id, f'Недостаточно данных. Используйте команду /set_room и введите номер аудитории (например: A5-3013). Он привяжется к последней записанной локации. Error value - {error:.2f}')
cursor.execute(insert_query, (
user_id,
json.dumps(data_json),
None
))
connection.commit()
else:
cursor.execute(insert_query, (
user_id,
json.dumps(data_json),
best_aud
))
connection.commit()
# Если аудитория изменилась, пользователю отправляется уведомление
aud_list = get_last_two_audiences(connection, user_id)
if aud_list[1] != best_aud:
notify_change(user_id, aud_list[1], best_aud)
except Exception as e:
print("Ошибка при работе с MySQL:", e)
finally:
if connection.is_connected():
cursor.close()
connection.close()
# Отправка сообщения о том, где скейт был замечен последний раз
@bot.message_handler(commands=['where'])
def send_where_message(message):
try:
connection = mysql.connector.connect(**DB_CONFIG)
cursor = connection.cursor()
cursor.execute(f'SELECT audience, timestamp FROM measurements WHERE user_id = {message.from_user.id} AND audience IS NOT NULL ORDER BY timestamp DESC LIMIT 1')
result = cursor.fetchall()
audience = str(result[0][0])
timestamp = str(result[0][1])
bot.send_message(message.from_user.id, f'Последний раз ваш скейт был замечен возле аудитории {audience} в {timestamp}')
except Exception as e:
bot.reply_to(message, traceback.format_exc())
finally:
if connection.is_connected():
cursor.close()
connection.close()
# Команда ручного ввода аудитории к последней записи в базе данных без номера аудитории
@bot.message_handler(commands=['set_room'])
def send_room_message(message):
try:
connection = mysql.connector.connect(**DB_CONFIG)
cursor = connection.cursor()
cursor.execute(f'SELECT timestamp FROM measurements WHERE user_id = {message.from_user.id} AND audience IS NULL ORDER BY timestamp DESC LIMIT 1')
timestamp = str(cursor.fetchall()[0][0])
markup = types.ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True)
markup.add(types.KeyboardButton('Отмена ❌'))
msg = bot.send_message(message.from_user.id, f'Введите номер аудитории (например: A5-3013). Он привяжется к записи, произведенной {timestamp}. Если передумали, нажмите кнопку отмены', reply_markup=markup)
bot.register_next_step_handler(msg, execute_db)
except Exception as e:
bot.reply_to(message, traceback.format_exc())
def execute_db(message):
try:
if message.text != 'Отмена ❌':
connection = mysql.connector.connect(**DB_CONFIG)
cursor = connection.cursor()
cursor.execute(
"""UPDATE measurements SET audience = %s WHERE user_id = %s AND audience IS NULL ORDER BY timestamp DESC LIMIT 1""",
(message.text, message.from_user.id)
)
connection.commit()
connection.close()
markup = telebot.types.ReplyKeyboardRemove()
bot.send_message(chat_id=message.chat.id, text=f"Аудитория {message.text} сохранена.", reply_markup=markup)
else:
pass
except Exception as e:
bot.reply_to(message, traceback.format_exc())
def bot_polling():
bot.infinity_polling()
# Приём пакетов от маячков
@app.route('/', methods=['POST'])
def receive_measurement():
data = request.get_json()
username = data.get('username', 'unknown')
save_measurement(username, data)
return {'status': 'ok'}
if __name__ == '__main__':
Thread(target=bot_polling).start()
app.run(host='0.0.0.0', port=8080)
Сервер одновременно выполняет несколько функций: прием пакетов от маячков, подключение к базе данных и отправка сообщений телеграм боту.
Есть еще один небольшой скрипт, который проверяет, как давно был отправлен последний пакет от маячка. Если с последней отправки прошло больше часа, а скейт был активен, то его пользователю отправляется сообщение о подозрении к пропаже. И наоборот, если скейт был неактивен и начал отправлять пакеты, пользователь получит соответствующее сообщение.
Код для проверки активности
import mysql.connector
import telebot
from datetime import datetime
TELEGRAM_TOKEN = 'TELEGRAM_TOKEN'
bot = telebot.TeleBot(TELEGRAM_TOKEN)
# Параметры подключения к MySQL
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'PASSWORD',
'database': 'skate_locator',
}
connection = mysql.connector.connect(**DB_CONFIG)
cursor = connection.cursor()
cursor.execute(f'SELECT * FROM users')
users = cursor.fetchall()
# Проверка времени по каждому пользователю
for user in users:
cursor.execute(f'SELECT timestamp FROM measurements WHERE user_id = {str(user[1])} ORDER BY timestamp DESC LIMIT 1')
timestamp = cursor.fetchall()[0][0]
now = datetime.now()
# Определение времени, прошедшего с отправки последнего пакета от скейта
delta = now - timestamp
# Оповестить пользователя о прекращении отправки пакетов
if user[4] == 1 and delta.total_seconds() > 3600:
# Scate is lost; is_active set to 0
bot.send_message(chat_id=int(user[1]), text=f"Скейт больше часа не отправлял свою локацию. Возможные причины: \
\n- Разрядился аккумулятор\n- Скейт покинул кампус\n- Плата была выключена\n")
cursor.execute(f"UPDATE users SET is_active = FALSE WHERE chat_id = {str(user[1])} LIMIT 1")
connection.commit()
# Оповестить пользователя о возобновлении отправки пакетов
elif user[4] == 0 and delta.total_seconds() < 3600:
# Return of scate; is_active set to 1
bot.send_message(chat_id=int(user[1]), text=f"Скейт снова отправляет локацию")
cursor.execute(f"UPDATE users SET is_active = TRUE WHERE chat_id = {str(user[1])} LIMIT 1")
connection.commit()
connection.close()

Общую топологию сети можно увидеть на Рис. 5. Маяки подключаются к локальной сети Сколтеха, однако сканирование они проводят по всем имеющимся вокруг точкам доступа и беспроводным маршрутизаторам. Сервер, который обрабатывает пакеты от маячков, также находится в локальной сети. Он же отправляет запросы с помощью библиотеки PyTelegramBotAPI уже на внешний сервер. Пользователь в итоге получает сообщения об изменении локации своего транспортного средства, как на Рис. 6.

Ниже на Рис. 7 приведен процесс ручной подвязки аудитории к записи в базе данных. Сначала пользователю приходит уведомление о большой ошибке к ближайшей точке, где предлагается ввести вручную номер аудитории.

Недостатки и будущие изменения
Самое главное - нужно реализовать безопасность. Злоумышленнику ничего не мешает просто с определенной периодичностью отправлять пакеты на сервер, потому что используется HTTP. Типичная MITM атака. Нужно прикрутить электронную подпись к пакетам, отправляемыми маяками.
Второе обновление, которое конечно же напрашивается, это In-door локация не с привязкой к аудиториям, а с отображением точек на карте. Это уже принципиально другой подход к алгоритмам. Нужно использовать триангуляцию. С другой стороны, ничего не мешает также предлагать пользователю поставить точку на карте когда алгоритм определения координат не уверен. Затем по собравшейся базе данных искать ближайшие точки и определять координаты внутри помещения по пересечению этих ближайших точек. Но тогда пользовательский интерфейс тоже нужно будет продумывать принципиально по-другому. Боюсь, что обычным телеграм ботом тут не обойтись.
Заключение
Получился неплохой минимальный работающий продукт (MVP), который решает простую проблему по нахождению личного транспорта внутри кампуса. Не уверен, что этот проект выйдет за рамки любительского, потому что мало кто ездит по своему офису на самокатах и нуждается в поиске своего транспорта из-за большого кампуса.
Главной же идеей была реализация In-door локации с использованием только Wi-Fi сети. Вместо маячка в виде платы с аккумулятором может выступать как ноутбук, так и телефон (но не iPhone), что в перспективе может использоваться для поиска и навигации людей внутри помещений.
nikulin_krd
О великий Ктулху, да у нас тут его величество оверинжениринг)) Что мешает использовать UWB, который для этого предназначен и потребляет существенно меньше тока? Ну на крайний случай, что мешает в каждую аудиторию установить в розетку любое зигби-реле и даже не одно, что позволить упростить задачи триангуляции? И та и другая технология существенно дешевле и для «сервера» и для «клиента» с точки зрения потребления тока…
sergopl
"Сколтех — международный университет, который готовит новое поколение лидеров в области технологий, науки и бизнеса". Ваши идеи не подходят, они узнали о существовании esp32...