Привет, Хабр! Это Антон Дятлов, инженер по защите информации в Selectel. В одной из предыдущих статей я рассказывал, как настроить скрипт, который через API «Сканер-ВС 6» запустит сканирование, создаст отчеты и отправит уведомление в Telegram. Мы научились запускать сканер по расписанию через cron, импортировать IP-адреса и подсети, получать отчеты об уязвимостях. Такой подход хорошо работал на небольших объемах, но в реальных задачах — особенно в инфраструктурах с десятками и сотнями хостов — быстро всплыли ограничения: скрипт требовал ручного контроля на многих этапах, переход между стадиями (сетевой скан → скан уязвимостей → отчет) приходилось отслеживать вручную, проявилась ошибка в сканере, которая не позволяла корректно удалять ассеты.

В этой статье разберемся во второй версии скрипта — с переосмысленным пайплайном, расширенной поддержкой входных данных (IP и подсети), минимизацией ручных действий и автоматическим контролем всех этапов, а также удалением ассетов через SQL.

Требования к коду

Для начала рассмотрим, что должно уметь решение.

  • Регулярно сканировать множество хостов и подсетей. 

  • Объединять адреса по группам с помощью имен и ID, чтобы оптимизировать сканирование и экономить ресурсы. 

  • Работать в фоне: от запуска по cron до получения отчетов, без вмешательства со стороны человека.

Как это выглядело в первой версии

Исходный формат входных данных:

[
    {"id": "1", "name": "zone1", "ip": "10.1.1.1"},
    {"id": "2", "name": "zone1", "ip": "10.1.1.2"}
]

Базовый цикл работы:

for client in clients:
    asset_id = create_asset(client['ip'])
    scan_id = add_netscan_task(client['ip'])
    run_task(scan_id)

    while not is_completed(scan_id):
        time.sleep(3600)
    vulnscan_id = add_vulnscan_task(asset_id)
    run_task(vulnscan_id)

    while not is_completed(vulnscan_id):
        time.sleep(3600)
    report_id = add_report(asset_id)

Однако тут видим несколько нюансов. 

  • Нельзя обрабатывать IP (один адрес) и net (подсети) одновременно. Это затрудняет гибкость кода: нельзя сканировать подсети, приходится находить активные адреса и записывать их.

  • Переход между этапами сканирования завязан на cron и требует расчета времени, за которое просканируются адреса. Если не вычислить, за какое время просканируются конкретные адреса, есть риск перезапуска кода после завершения процесса. Это чревато лишними ручными действиями — придется удалять задачи и активы вручную.

  • Cron запускает пайплайн по расписанию — без проверки, завершился ли предыдущий этап, и с риском запуститься «в холостую» — когда не все задачи были завершены, а количество запусков cron ограничено.

Что еще не устраивало

  • Переходы между стадиями: если какой-то этап задержался, следующий откладывается и просто ждет следующего запуска cron.

  • Нет контроля состояния: пайплайн может «застрять», не досканировать или повторно запустить то, что уже просканировано.

  • Нет самовосстановления: если скрипт/сервер перезапустили, нужно начинать все заново или вручную перебирать промежуточные файлы и разбираться, где он остановился.

Игровой сервер с криперами и порталом в Незер. Добывайте ресурсы, стройте объекты, исследуйте мир Selectel в Minecraft и получайте призы.

Исследовать →

Архитектура v2.0

Ключевые идеи новой версии — универсальность, надежность и автоматизация.

  • Автоматическое определение типа входных данных (IP/net), динамическое формирование всех адресов.

  • Группировка по name, чтобы на каждого клиента, отдел или внутреннюю структуру запускалось по одному netscan, vulnscan и отчету.

  • Строгая стадийность: netscan → vulnscan → report. Все управление — с помощью внутреннего флага и JSON-файла, этапы идут друг за другом через at, а не cron.

  • Один JSON-файл хранит все: ID, прогресс, статусы. После завершения все возвращается к исходному виду.

  • Флаг done и защита от повторных запусков: cron может запускать скрипт хоть каждый час, но лишней работы не будет.

  • Возможность жесткой очистки ресурсов SQL-командой.

Пример кода v2.0 — главный цикл

Одно из ключевых преимуществ обновленной версии — гибкий запуск через cron. Скрипт можно запускать сколько угодно раз, но цикл выполнится только единожды в месяц — пока не будут созданы отчеты. Это избавляет от дублирования задач и активов, а также позволяет не следить вручную за количеством запусков:

  • не нужно «высчитывать», сколько раз запускать скрипт;

  • нет дублирующихся задач и сканов;

  • весь контроль происходит автоматически на уровне кода.

def main():
    original_data = load_json(client_list_file)
    yyyymm = get_current_yyyymm()
    # 1. Если есть флаг done — ничего не делаем
    if isinstance(original_data, dict) and original_data.get("done") and original_data.get("month") == yyyymm:
        print("Pipeline уже завершен в этом месяце. Повторный запуск не требуется.")
        return
    # 2. Стадия (netscan/vulnscan/report) и восстановление состояния
    if isinstance(original_data, dict) and "stage" in original_data:
        stage = original_data["stage"]
        orig_clients = original_data.get("original_clients", [])
        clients = original_data.get("clients", [])
    else:
        stage = "netscan"
        orig_clients = list(original_data)
        clients = list(original_data)
    # 3. Группировка по name, авторизация, запуск
    groups = group_clients_by_name(clients)
    auth_cookies = authorization()
    base_update(auth_cookies)

Обработка сетей и IP

В версии 1.0 скрипт умел обрабатывать только конкретные IP-адреса, а подсети игнорировались:

for client in clients:
    if "ip" in client:
    # Работать с адресом
    # Если net — не обрабатывали

В версии 2.0 добавлена поддержка подсетей (net). Скрипт пингует все адреса в заданной сети и выбирает только «живые»:

def get_alive_ips(network):
    net = ipaddress.ip_network(network, strict=False)
    alive = []
    for ip in net.hosts():
        ip_str = str(ip)
        try:
            result = subprocess.run(['ping', '-c', '1', '-W', '1', ip_str],
                                    stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            if result.returncode == 0:
                alive.append(ip_str)
        except Exception:
            continue
    return alive

Затем все клиенты группируются по имени (name). В результате формируются группы с адресами и идентификаторами:

def group_clients_by_name(clients):
    groups = {}
    for client in clients:
        name = client["name"]
        if name not in groups:
            groups[name] = {"name": name, "ips": [], "ids": [], "assets": []}
        if "ip" in client:
            groups[name]["ips"].append(client["ip"])
            groups[name]["ids"].append(client["id"])
        elif "net" in client:
            alive_ips = get_alive_ips(client["net"])
            groups[name]["ips"].extend(alive_ips)
            groups[name]["ids"].append(client["id"])
    return groups

Если в клиентском списке указана подсеть (net), скрипт сам разбивает ее на IP, определяет, какие адреса доступны, и связывает их с конкретным ID клиента — по сути, каждому активному IP из подсети назначается тот же идентификатор, что и у источника сети. Это позволяет точно отслеживать, какие адреса относятся к какому клиенту.

Группировка по name

v1.0: каждая задача сканирования запускалась для одного IP. Один адрес — одна задача. Это быстро приводило к спаму задачами: сканер мог перегружаться, а интерфейс начинал тормозить. Кроме того, это было просто неудобно — особенно для больших клиентов с несколькими адресами: вместо одной задачи рождались десятки.

v2.0: все адреса, относящиеся к одному клиенту или внутреннему отделу, объединяются в одну задачу. Это упрощает управление, снижает нагрузку на сканер, а также ускоряет выполнение сканов:

for name, group in groups.items():
    if not group.get("id_netscan"):
        task_id = add_netscan_task(auth_cookies, name, group["ips"])
        if task_id:
            group["id_netscan"] = task_id

Итог: меньше задач, проще контролировать и быстрее обрабатывать.

Переход между этапами

v1.0: тесная связка с cron, а ожидание — по расписанию. Если netscan закончился быстро — допустим, за 10 минут, то следующий этап придется ожидать еще шесть часов.

В v2.0 каждый этап по завершению сам «назначает» запуск следующего:

def schedule_next_run(delay_seconds=120):
    cmd = f'echo "python3 {os.path.abspath(__file__)} --clientlist {client_list_file}" | at now + {delay_seconds//60} minutes'
    subprocess.run(cmd, shell=True)

Итог: скорость работы максимальная, время простоя между стадиями — минимальное.

Флаг done и защита от лишних запусков

v1.0: можно нечаянно перезапустить пайплайн и получить дублирование.

v2.0: в файл после завершения записывается флаг на этот месяц. Все последующие cron-запуски ничего не делают до следующего месяца:

{"done": true, "month": "202507"}

Итог: защита от повторов, не нужно проверять вручную.

Автоматическая чистка ресурсов

v1.0: API-удаление ресурсов иногда не срабатывает.

v2.0: в конце пайплайна вызывается SQL-скрипт, который удаляет все «зависшие» ресурсы:

def force_delete_all_assets_psql():
    cmd = [
        "psql",
        "postgres://scannervs:scannervs@localhost:5435/scanner-asset",
        "-c",
        "DELETE FROM scanner.assets"
    ]
    subprocess.run(cmd, ...)

Итог: база не захламляется, ресурсы не висят «грузом».

Реальный сценарий запуска: cron + at

Пример cron для «первой среды месяца», четырех стартов в сутки (каждые шесть часов) в течение трех дней подряд:

0 1,9,15,21 2,3,4 7 * python3 /way/to/script.py --clientlist /way/to/clientlist.json >> /way/to/scan1.log 2>&1

Если пайплайн занимает около 20 часов, cron может запустить его хоть 12 раз — но результат будет лишь один. Каждый этап запускает следующий через at.

Итоги и преимущества v2.0

Минимум ручных действий: один JSON и cron — все, дальше пайплайн работает сам. Если у вас больше одного JSON, можно настроить их обработку поочередно.

  • Есть поддержка любого сценария: IP и сети. Это позволяет производить сканирование как точечно, так и массово.

  • Восстановление после сбоев. Все стадии хранятся в JSON, запуск продолжается с нужного места. Не страшно, если ВМ со сканером упадет — прогресс сохранится.

  • Оптимизация. Никакого дублирования задач и отчетов, ручная работа сведена к минимуму.

  • Максимальная скорость. Следующий этап стартует сразу после завершения предыдущего.

  • Гибкость. Интеграция с внешними системами, уведомления приходят в Telegram.

Ниже — небольшая шпаргалка для интеграции своего пайплайна.

1. Добавьте свой список JSON в следующем формате:

[
    {"id": "1", "name": "имя сети 1", "net": "X.X.X.X/X"},
    {"id": "2", "name": "имя сети 2", "ip": "Y.Y.Y.Y"}
]

2. Пропишите cron с нужной частотой — обычно сканирование занимает от 6 до 48 часов в зависимости от размера подсети или количества открытых портов для IP-адресов.

3. Добавьте уведомления о завершении. Это поможет всегда быть в курсе готовности сканирований. Уведомление можно отправлять Telegram, чтобы оперативно выгружать отчеты и приступать к принятию мер по устранению уязвимостей.

Почему стоит мигрировать на новую версию

  • Ресурсы используются эффективнее.

  • Нет рисков пропустить скан или получить дубли.

  • Код стал проще, модульнее и надежнее.

  • Автоматизация процесса сканирования — вам нужно только все настроить и выгружать отчеты из сканера.

В будущем изучу возможность выгрузки отчетов в Telegram или почту, для еще более простого использования сканера. Если у вас есть опыт в этом направлении — делитесь в комментариях, будет интересно обсудить!

Комментарии (0)