О каждой из этих тем по отдельности я писал в этой: баннеры, и этой: сканирование, статьях. Но давайте попробуем объединить два этих скрипта, чтобы использовать для сканирования адресов возможности scapy, а также получать баннеры с открытых портов с помощью socket.
На самом деле это тема в общем-то не нова. И никто не мешает вам для сканирования сети взять nmap и пользоваться им напрямую или использовать его в скрипте python. Но гораздо интереснее попробовать сделать что-то, что будет выполнять похожие функции, самостоятельно. Тем более, такого тщательного сканирования в данном случае просто не требуется.
Попробую пояснить свою мысль. Уж не знаю, почему, но мне в последнее время стало интересно сканировать те адреса в интернете, которые, скорее всего, не находятся в индексе Google, Яндекс или еще какой поисковой системы. Просто в силу того, что на данные страницы не ведет ни одна ссылка в интернете. Но ведь за данными адресами могут скрываться как сайты, так и просто устройства (роутеры, маршрутизаторы, ip-камеры и т.д.). То есть, в простом понимании, позаниматься «нетсталкингом». Что это такое, можете сами поискать в
Тут же сразу возник вопрос: "А какие адреса сканировать и где их брать?". Этот вопрос я решил. Не знаю, насколько полно это получилось, но мне удалось собрать ip-диапазоны сетей провайдеров (CIDR) по всему миру. Всех или нет, это уже другой вопрос. Однако, того что удалось накопать уже хватит на многие месяцы сканирования. Файл с адресами я добавлю во вложение.
Таким образом, для того, чтобы не сканировать уныло каждый адрес один за другим, мы будем случайным образом выбирать диапазон, также случайно выбирать адрес и пытаться получить по нему данные. Ну и если сканирование пройдет успешно и мы найдем хоть один живой порт, запишем эти данные в json, чтобы при необходимости можно было использовать его в каких-то других проектах.
Достаточно теории. Давайте перейдем к коду.
Блок импорта
Перед тем, как начать писать код необходимо установить сторонние библиотеки, которые потребуются в процессе его работы. Это scapy и requests. Для их установки набираем в терминале следующую команду:
Все остальные библиотеки, которые используются в скрипте, являются библиотеками «из коробки».
Итак, после того, как нужные библиотеки будут установлены, нужно импортировать все что необходимо в проект:
Создадим переменную headers и запишем в нее словарь с user_agent. Он нам понадобиться при использовании библиотеки requests.
Теперь, в глобальном пространстве скрипта прописываем две строчки:
Первая нужна для того, чтобы подавить warning от библиотеки requests при использовании параметра verify=False. Вторая служит для того, чтобы сделать наш рандомный выбор хоть чуточку более рандомным.
Ну и создаем глобальную переменную result, в которой и будет содержаться словарь с найденными портами, баннерами и заголовками, и который впоследствии будет записываться в json.
Вспомогательные функции
Для того, чтобы скрипт был более продвинутым и не сканировал одни и те же адреса по нескольку раз, когда они вдруг будут выбраны в рандоме, нужно создать функцию, которая будет проверять наличие данных адресов в файле, куда будет сохраняться каждый просканированный адрес. Также нужно создать функцию, которая будет сохранять просканированные адреса в файл. И функцию, которая будет сохранять полученные данные из словаря result, если они там, конечно же есть. Помимо этого, была обнаружена особенность сохранения в json (раньше просто не сталкивался). При получении заголовков с адреса и попытке записи в json данных из словаря, выкидывалось исключение, что в данном случае, при записи регистр имеет значение. А значит, требуется также функция, которая будет переводить все полученные ключи и значения заголовков в нижний регистр.
Ну вот, вроде бы ничего не забыл. Давайте приступим к их созданию.
Перевод словаря в нижний регистр
Для начала создадим функцию case_sence(resp: dict) -> dict. С ее помощью мы будем переводить данные словаря в нижний регистр. На входе она получает словарь с данными, а на выходе возвращает уже словарь с теми же данными, но переведенными в нижний регистр.
Создаем временный словарь, куда будем складывать переведенные в нижний регистр значения — resp_low. В цикле пробегаемся по каждому ключу и значению словаря, переводим в нижний регистр, записываем данные во временный словарь и возвращаем из функции.
Сохранение json с полученными данными из словаря result
Создадим функцию save_json(addr). На вход она получает текущий ip-адрес по которому проводилось сканирование. Так как json-ы мы будем складывать в отдельную папку, создаем ее с помощью функции mkdir библиотеки pathlib. Она позволяет без особых проверок в случае отсутствия директории ее создать. Если же директория существует, вызывается исключение, которое просто игнорируется с помощью указания явного параметра exist_ok=True. Получаем текущую дату. Проверяем длину словаря. Если она больше нуля, значит какие-то данные были получены. Открываем файл на перезапись, в качестве имени файла указываем текущую дату и ip-адрес, который передается в функцию. Записываем json и затем, чтобы пользователь видел, что что-то происходит, выводим из этого же словаря содержащиеся в нем данные.
Сохраняем просканированный ip-адрес
Вне зависимости от того, удалось ли получить по данному ip данные или нет, для того, чтобы избежать его повторного сканирования, дописываем его в текстовый файл. Создадим функцию save_ip(ip). На вход она получает адрес, который нужно сохранить. Ну, а дальше все просто.
Отрываем файл на дозапись, и записываем в него строку с адресом, после чего переводим каретку на следующую строку, для следующего адреса.
Проверка текущего ip-адреса на наличие в уже просканированных адресах
Для того, чтобы не сканировать одни и те же адреса повторно, нужно проверить каждый текущий адрес на наличие его в файле, куда мы предварительно записали уже отсканированные адреса.
Создадим функцию check_ip(ip). На вход она получает адрес который нужно проверить.
Проверяем, существует ли файл с отсканированными адресами. Его ведь может и не быть, если это наше первое сканирование. Если файл есть, открываем его на чтение и создаем список из ip-адресов в нем содержащихся. После этого проверяем, есть ли наш ip в получившемся списке. Если есть, возвращаем True. Нет — False.
Получение случайного ip-адреса
Перед тем, как сканировать какой-либо адрес, его еще нужно получить. А потому, создадим функцию для его получения ip_from_range(path_dir). В нее мы будем передавать путь к файлу с диапазонами ip-адресов (CIDR) провайдеров.
Запускаем бесконечный цикл. В нем отрываем файл с CIDR, создаем из его строк список и с помощью random.choice выбираем случайный элемент. Затем, создаем список ip-адресов, которые входят в этот диапазон и также с помощью random.choice выбираем случайный адрес.
Теперь передаем его в функцию check_ip для проверки наличия в уже сканированных адресах. Если есть, итерируемся снова. Нет, двигаемся дальше. Выводим информацию о сканируемом адресе. Передаем его в функцию сканирования портов и получения данных syn_ack_scan, куда помимо адреса передается кортеж с диапазоном портов для сканирования. Здесь, в данном коде, они жестко определены. Но ничто не мешает вам сделать их запрос у пользователя или выставить свой диапазон.
Затем проверяем, не является ли словарь с данными пустым. Если он пуст, выводим в терминал сообщение об отсутствии результатов. Если не пуст, запускаем функцию сохранения данных save_json, в которую передаем текущий ip-адрес.
И затем, вне зависимости от того, были или нет получены данные по адресу, сохраняем текущий ip в файл уже сканированных и очищаем глобальный словарь result, где и содержались данные по адресу. Очищаем словарь в любом случае.
Получение баннеров на определенном порту
Что же, здесь я не буду особо оригинален и возьму предыдущий свой код из статьи указанной в начале. Разница в нем будет только в том, что в случае удачного получения баннера значение не будет записываться в словарь, а возвращаться просто в виде полученной строки.
Создадим функцию scan_port(ip: str, port: int) -> (str, dict, bool). На вход она получает ip-адрес для сканирования и порт. А возвращает данные в зависимости от полученных результатов и вызванных исключений.
Создаем потоковый сокет на основе протокола TCP, который будет передавать и принимать информацию. Устанавливаем время жизни соединения, которое равно 5 секунд. В принципе, данного времени должно хватить. Но, можно поэкспериментировать со значением. Устанавливаем соединение по переданным в функцию адресу и порту. Выполняем попытку получить баннер. Если баннер получен, пытаемся его декодировать, а также обрабатываем случай, когда декодированное значение является пустым. Если баннер не пустая строка, возвращаем полученное значение из функции. Обрабатываем различные ошибки при декодировании и установке соединения. А также ошибку при получении данных.
Сканирование портов с помощью scapy запросами syn — ack
А теперь создадим основную функцию, с помощью которой будем сканировать порты на полученном в функцию адресе. Создадим функцию syn_ack_scan(ip: str, ports: tuple). На вход она получает ip-адрес, и кортеж из диапазона портов. На самом деле, scapy умеет сам итерироваться по диапазону CIDR. Но, так как количество адресов в диапазоне может быть довольно большим, занимать это будет много времени. Потому, практичнее все же передавать в функцию один ip-адрес. Также у scapy есть возможность сканировать не диапазон, а только один порт. Но, в нашем случае передается кортеж. И для того, чтобы просканировать только один адрес, нужно в кортеже передать два одинаковых значения, то есть (80, 80), для примера.
Создаем пакет для сканирования. Выполняем запрос с установленным таймаутом и отключенным выводом в терминал. Забираем ответ и итерируемся по нему в цикле.
Так как результат сканирования это кортеж, то его можно распарсить на две переменные. В нашем случае это будут полученные и ошибочные ответы. Так как ошибочные ответы нас не интересуют, забираем только полученные в переменную receive. Выполняем проверку, если флаг в ответе содержит SA, что означает отрытый порт, так как от него получен ответ, создаем ключ в словаре с текущим ip-адресом.
Делаем проверку, если порт равен 80 или 443, с помощью requests выполняем запрос на наличие на той стороне сайта или страницы. То есть, пытаемся получить заголовки. Если получается, добавляем порт, протокол и полученные заголовки в словарь. Если не получиться, просто порт, протокол и то, что порт открыт.
В случае с другими портами делаем попытку получить баннер. Если баннер получен, добавляем порт, протокол и значение баннера в словарь. Если баннер не получен, добавляем порт, протокол и значение, что этот порт открыт с словарь.
И да, если заголовки были получены, делаем попытку получить из них ключ «location», который может указывать на сайт при переадресации с ip-адреса.
Вот, это если вкратце. Ну и обрабатываем различные исключения.
Запуск сканирования адресов
И еще один небольшой момент, это запуск сканирования адресов. Так как скрипт использует scapy, то и для работы он требует запуска из под «sudo». Потому проверяем в самом начале, как он запущен. Если не от имени суперпользователя, сообщаем об этом и выходим из скрипта.
Если же все хорошо, запускаем функцию рандомного получения адресов, куда передаем путь к файлу с CIDR.
Ну и то, что сохраняет скрипт в директории:
А так выглядит json внутри:
А на этом, пожалуй, все.
Спасибо за внимание. Надеюсь, данная информация будет вам полезна
UPD: Смотрите измененную версию скрипта в комментариях.
Дисклеймер: Все данные, предоставленные в данной статье, взяты из открытых источников, не призывают к действию и являются только лишь данными для ознакомления, и изучения механизмов используемых технологий.
На самом деле это тема в общем-то не нова. И никто не мешает вам для сканирования сети взять nmap и пользоваться им напрямую или использовать его в скрипте python. Но гораздо интереснее попробовать сделать что-то, что будет выполнять похожие функции, самостоятельно. Тем более, такого тщательного сканирования в данном случае просто не требуется.
Попробую пояснить свою мысль. Уж не знаю, почему, но мне в последнее время стало интересно сканировать те адреса в интернете, которые, скорее всего, не находятся в индексе Google, Яндекс или еще какой поисковой системы. Просто в силу того, что на данные страницы не ведет ни одна ссылка в интернете. Но ведь за данными адресами могут скрываться как сайты, так и просто устройства (роутеры, маршрутизаторы, ip-камеры и т.д.). То есть, в простом понимании, позаниматься «нетсталкингом». Что это такое, можете сами поискать в
Ссылка скрыта от гостей
.Тут же сразу возник вопрос: "А какие адреса сканировать и где их брать?". Этот вопрос я решил. Не знаю, насколько полно это получилось, но мне удалось собрать ip-диапазоны сетей провайдеров (CIDR) по всему миру. Всех или нет, это уже другой вопрос. Однако, того что удалось накопать уже хватит на многие месяцы сканирования. Файл с адресами я добавлю во вложение.
Таким образом, для того, чтобы не сканировать уныло каждый адрес один за другим, мы будем случайным образом выбирать диапазон, также случайно выбирать адрес и пытаться получить по нему данные. Ну и если сканирование пройдет успешно и мы найдем хоть один живой порт, запишем эти данные в json, чтобы при необходимости можно было использовать его в каких-то других проектах.
Достаточно теории. Давайте перейдем к коду.
Блок импорта
Перед тем, как начать писать код необходимо установить сторонние библиотеки, которые потребуются в процессе его работы. Это scapy и requests. Для их установки набираем в терминале следующую команду:
pip install requests
pip install --pre scapy[basic]
Все остальные библиотеки, которые используются в скрипте, являются библиотеками «из коробки».
Итак, после того, как нужные библиотеки будут установлены, нужно импортировать все что необходимо в проект:
Python:
import json
import random
import socket
import sys
from datetime import datetime as dt
from ipaddress import ip_network
from os import getuid
from pathlib import Path
from platform import system
import requests
import scapy.all as sc
Создадим переменную headers и запишем в нее словарь с user_agent. Он нам понадобиться при использовании библиотеки requests.
Python:
headers = {
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 "
"Safari/537.36 "
}
Теперь, в глобальном пространстве скрипта прописываем две строчки:
Python:
requests.packages.urllib3.disable_warnings()
random.seed(dt.now().second + random.randint(0, 101))
result = dict()
Первая нужна для того, чтобы подавить warning от библиотеки requests при использовании параметра verify=False. Вторая служит для того, чтобы сделать наш рандомный выбор хоть чуточку более рандомным.
Ну и создаем глобальную переменную result, в которой и будет содержаться словарь с найденными портами, баннерами и заголовками, и который впоследствии будет записываться в json.
Вспомогательные функции
Для того, чтобы скрипт был более продвинутым и не сканировал одни и те же адреса по нескольку раз, когда они вдруг будут выбраны в рандоме, нужно создать функцию, которая будет проверять наличие данных адресов в файле, куда будет сохраняться каждый просканированный адрес. Также нужно создать функцию, которая будет сохранять просканированные адреса в файл. И функцию, которая будет сохранять полученные данные из словаря result, если они там, конечно же есть. Помимо этого, была обнаружена особенность сохранения в json (раньше просто не сталкивался). При получении заголовков с адреса и попытке записи в json данных из словаря, выкидывалось исключение, что в данном случае, при записи регистр имеет значение. А значит, требуется также функция, которая будет переводить все полученные ключи и значения заголовков в нижний регистр.
Ну вот, вроде бы ничего не забыл. Давайте приступим к их созданию.
Перевод словаря в нижний регистр
Для начала создадим функцию case_sence(resp: dict) -> dict. С ее помощью мы будем переводить данные словаря в нижний регистр. На входе она получает словарь с данными, а на выходе возвращает уже словарь с теми же данными, но переведенными в нижний регистр.
Создаем временный словарь, куда будем складывать переведенные в нижний регистр значения — resp_low. В цикле пробегаемся по каждому ключу и значению словаря, переводим в нижний регистр, записываем данные во временный словарь и возвращаем из функции.
Python:
def case_sence(resp: dict) -> dict:
"""
Перевод регистра ключей и значений в нижний диапазон,
так как запись в верхнем регистре в json невозможна.
:param resp: словарь с данными для перевода.
:return: словарь с переведенными данными.
"""
resp_low = dict()
for res in resp:
resp_low.update({res.lower(): resp[res].lower()})
return resp_low
Сохранение json с полученными данными из словаря result
Создадим функцию save_json(addr). На вход она получает текущий ip-адрес по которому проводилось сканирование. Так как json-ы мы будем складывать в отдельную папку, создаем ее с помощью функции mkdir библиотеки pathlib. Она позволяет без особых проверок в случае отсутствия директории ее создать. Если же директория существует, вызывается исключение, которое просто игнорируется с помощью указания явного параметра exist_ok=True. Получаем текущую дату. Проверяем длину словаря. Если она больше нуля, значит какие-то данные были получены. Открываем файл на перезапись, в качестве имени файла указываем текущую дату и ip-адрес, который передается в функцию. Записываем json и затем, чтобы пользователь видел, что что-то происходит, выводим из этого же словаря содержащиеся в нем данные.
Python:
def save_json(addr):
"""
Сохранение данных из словаря с полученными значениями.
Создаем директорию для сохранения json, если она не существует.
Забираем дату в нужном формате. Проверяем, не является ли словарь result пустым.
Записываем значения из словаря в файл json.
Выводим информацию из словаря в терминал для пользователя.
:param addr: ip-адрес, по которому выполнялось текущее сканирование.
"""
(Path.cwd() / "checked_ip").mkdir(exist_ok=True)
day = dt.now().strftime("%d-%m-%Y")
if len(result) > 0:
with open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json", 'w', encoding='utf-8') as file:
json.dump(result, file, indent=4, ensure_ascii=False)
for res in result:
print(f"\nИнформация об адресе: {res}\n{'-'*65}")
for st in result[res]:
print(f"Порт: {st:20} | Статус: {result[res][st]}")
print(f"{'-'*65}")
Сохраняем просканированный ip-адрес
Вне зависимости от того, удалось ли получить по данному ip данные или нет, для того, чтобы избежать его повторного сканирования, дописываем его в текстовый файл. Создадим функцию save_ip(ip). На вход она получает адрес, который нужно сохранить. Ну, а дальше все просто.
Отрываем файл на дозапись, и записываем в него строку с адресом, после чего переводим каретку на следующую строку, для следующего адреса.
Python:
def save_ip(ip):
"""
Сохраняет текущие ip-адреса в текстовый документ, для
последующей проверки по ним, перед сканированием, чтобы
избежать повторного сканирования.
Открываем файл, дописываем адрес в конец файла.
:param ip: ip-адрес для записи в файл.
"""
with open(Path.cwd() / "used_ip.txt", 'a', encoding='utf-8') as file:
file.write(f"{ip}\n")
Проверка текущего ip-адреса на наличие в уже просканированных адресах
Для того, чтобы не сканировать одни и те же адреса повторно, нужно проверить каждый текущий адрес на наличие его в файле, куда мы предварительно записали уже отсканированные адреса.
Создадим функцию check_ip(ip). На вход она получает адрес который нужно проверить.
Проверяем, существует ли файл с отсканированными адресами. Его ведь может и не быть, если это наше первое сканирование. Если файл есть, открываем его на чтение и создаем список из ip-адресов в нем содержащихся. После этого проверяем, есть ли наш ip в получившемся списке. Если есть, возвращаем True. Нет — False.
Python:
def check_ip(ip):
"""
Проверка ip-адреса на нахождение в списке уже сканированных адресов.
Проверяем, существует ли файл. Открываем, создаем список из адресов
и проверяем вхождение текущего адреса в сформированный список.
Если есть, возвращаем True, нет - False.
:param ip: строка, ip-адрес для проверки.
:return: булево значение. True или False в зависимости от результата.
"""
if (Path.cwd() / "used_ip.txt").exists():
with open(Path.cwd() / "used_ip.txt", 'r', encoding='utf-8') as file:
lines = [x.strip() for x in file.readlines()]
if ip in lines:
return True
return False
Получение случайного ip-адреса
Перед тем, как сканировать какой-либо адрес, его еще нужно получить. А потому, создадим функцию для его получения ip_from_range(path_dir). В нее мы будем передавать путь к файлу с диапазонами ip-адресов (CIDR) провайдеров.
Запускаем бесконечный цикл. В нем отрываем файл с CIDR, создаем из его строк список и с помощью random.choice выбираем случайный элемент. Затем, создаем список ip-адресов, которые входят в этот диапазон и также с помощью random.choice выбираем случайный адрес.
Теперь передаем его в функцию check_ip для проверки наличия в уже сканированных адресах. Если есть, итерируемся снова. Нет, двигаемся дальше. Выводим информацию о сканируемом адресе. Передаем его в функцию сканирования портов и получения данных syn_ack_scan, куда помимо адреса передается кортеж с диапазоном портов для сканирования. Здесь, в данном коде, они жестко определены. Но ничто не мешает вам сделать их запрос у пользователя или выставить свой диапазон.
Python:
def ip_from_range(path_dir):
"""
Получение случайного ip-адреса из файла с CIDR.
Запускаем бесконечный цикл. Открываем файл с диапазонами
cidr провайдеров. Выбираем случайный cidr, выбираем случайный ip.
Проверяем, не использовался ли этот адрес ранее.
Если нет, запускаем сканирование адреса. После, если словарь с результатом
не пуст, сохраняем полученные данные. Если пуст, выводим сообщение пользователю,
что нет результата. Вне зависимости от наполненности словаря сохраняем текущий ip,
после чего очищаем словарь.
:param path_dir: строка, путь к файлу с диапазонами (cidr).
"""
global result
while True:
with open(path_dir, 'r', encoding='utf-8') as file:
cidr = random.choice([x.strip() for x in file.readlines()])
ip = random.choice([str(x) for x in ip_network(cidr)])
if check_ip(ip):
continue
print(f"\nСканирую: {ip}")
syn_ack_scan(ip, (1, 1001))
if len(result) > 0:
save_json(ip)
else:
print(f"No result\n{'-' * 65}")
save_ip(ip)
result.clear()
Затем проверяем, не является ли словарь с данными пустым. Если он пуст, выводим в терминал сообщение об отсутствии результатов. Если не пуст, запускаем функцию сохранения данных save_json, в которую передаем текущий ip-адрес.
И затем, вне зависимости от того, были или нет получены данные по адресу, сохраняем текущий ip в файл уже сканированных и очищаем глобальный словарь result, где и содержались данные по адресу. Очищаем словарь в любом случае.
Получение баннеров на определенном порту
Что же, здесь я не буду особо оригинален и возьму предыдущий свой код из статьи указанной в начале. Разница в нем будет только в том, что в случае удачного получения баннера значение не будет записываться в словарь, а возвращаться просто в виде полученной строки.
Создадим функцию scan_port(ip: str, port: int) -> (str, dict, bool). На вход она получает ip-адрес для сканирования и порт. А возвращает данные в зависимости от полученных результатов и вызванных исключений.
Создаем потоковый сокет на основе протокола TCP, который будет передавать и принимать информацию. Устанавливаем время жизни соединения, которое равно 5 секунд. В принципе, данного времени должно хватить. Но, можно поэкспериментировать со значением. Устанавливаем соединение по переданным в функцию адресу и порту. Выполняем попытку получить баннер. Если баннер получен, пытаемся его декодировать, а также обрабатываем случай, когда декодированное значение является пустым. Если баннер не пустая строка, возвращаем полученное значение из функции. Обрабатываем различные ошибки при декодировании и установке соединения. А также ошибку при получении данных.
Python:
def scan_port(ip: str, port: int) -> (str, dict, bool):
"""
Получение банера на отрытом потру.
Выполняется подключение к ip-адресу на полученный
порт. Если подключение удалось, значит порт открыт. Производиться
попытка получить баннер. Если баннер не получен, обрабатывается
исключение, в котором идет запрос службы на порту у функции за
пределами класса. Если получить службу не удалось, возвращается
unassigned. Полученные значения добавляются в словари для
последующей обработки. В словарь с баннерами добавляются значения,
если баннер получен. В остальных случаях, в словарь открытых портов.
:param ip: ip-адрес или домен для сканирования.
:param port: Номер порта для сканирования. Целое число
:return: Возвращается False. Служит для выхода из функции
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
try:
s.connect((ip, port))
try:
banner = s.recv(1024).decode().strip()
if banner == '':
return False
else:
return banner
except OSError:
return False
except UnicodeDecodeError:
banner = s.recv(1024).strip()
return banner
except (socket.timeout, ConnectionRefusedError, OSError):
return False
Сканирование портов с помощью scapy запросами syn — ack
А теперь создадим основную функцию, с помощью которой будем сканировать порты на полученном в функцию адресе. Создадим функцию syn_ack_scan(ip: str, ports: tuple). На вход она получает ip-адрес, и кортеж из диапазона портов. На самом деле, scapy умеет сам итерироваться по диапазону CIDR. Но, так как количество адресов в диапазоне может быть довольно большим, занимать это будет много времени. Потому, практичнее все же передавать в функцию один ip-адрес. Также у scapy есть возможность сканировать не диапазон, а только один порт. Но, в нашем случае передается кортеж. И для того, чтобы просканировать только один адрес, нужно в кортеже передать два одинаковых значения, то есть (80, 80), для примера.
Создаем пакет для сканирования. Выполняем запрос с установленным таймаутом и отключенным выводом в терминал. Забираем ответ и итерируемся по нему в цикле.
Так как результат сканирования это кортеж, то его можно распарсить на две переменные. В нашем случае это будут полученные и ошибочные ответы. Так как ошибочные ответы нас не интересуют, забираем только полученные в переменную receive. Выполняем проверку, если флаг в ответе содержит SA, что означает отрытый порт, так как от него получен ответ, создаем ключ в словаре с текущим ip-адресом.
Делаем проверку, если порт равен 80 или 443, с помощью requests выполняем запрос на наличие на той стороне сайта или страницы. То есть, пытаемся получить заголовки. Если получается, добавляем порт, протокол и полученные заголовки в словарь. Если не получиться, просто порт, протокол и то, что порт открыт.
В случае с другими портами делаем попытку получить баннер. Если баннер получен, добавляем порт, протокол и значение баннера в словарь. Если баннер не получен, добавляем порт, протокол и значение, что этот порт открыт с словарь.
И да, если заголовки были получены, делаем попытку получить из них ключ «location», который может указывать на сайт при переадресации с ip-адреса.
Python:
def syn_ack_scan(ip: str, ports: tuple):
# создание пакета для сканирования
try:
request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
except socket.gaierror:
raise ValueError(f'{ip} получить не удалось')
answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0] # отправка пакета
for _, receive in answer:
try:
if receive['TCP'].flags == "SA":
try:
if ip not in result:
result[ip] = dict()
if receive['TCP'].sport == 80:
try:
head = requests.head(url=f"https://{ip}:80", headers=headers, verify=False,
timeout=3).headers
try:
result[ip].update({'80/http': case_sence(head)['location']})
except Exception:
result[ip].update({'80/http': case_sence(head)})
except Exception:
result[ip].update({"80/http": "open"})
elif receive['TCP'].sport == 443:
try:
head = requests.head(url=f"https://{ip}:443", headers=headers, verify=False,
timeout=3).headers
try:
result[ip].update({"443/http": case_sence(head)['location']})
except Exception:
result[ip].update({"443/http": case_sence(head)})
except Exception:
result[ip].update({"443/http": "open"})
else:
resp = scan_port(ip=ip, port=receive['TCP'].sport)
if not resp:
result[ip].update({f"{str(receive['TCP'].sport)}/"
f"{sc.TCP_SERVICES[receive['TCP'].sport]}": "open"})
else:
result[ip].update({f"{str(receive['TCP'].sport)}/"
f"{sc.TCP_SERVICES[receive['TCP'].sport]}": resp})
except KeyError:
result[ip].update({f"{str(receive['TCP'].sport)}/undefined": "open"})
except IndexError:
continue
Вот, это если вкратце. Ну и обрабатываем различные исключения.
Запуск сканирования адресов
И еще один небольшой момент, это запуск сканирования адресов. Так как скрипт использует scapy, то и для работы он требует запуска из под «sudo». Потому проверяем в самом начале, как он запущен. Если не от имени суперпользователя, сообщаем об этом и выходим из скрипта.
Если же все хорошо, запускаем функцию рандомного получения адресов, куда передаем путь к файлу с CIDR.
Python:
def main():
"""
Запуск сканирования адресов.
"""
if system() == "Linux":
if not getuid() == 0:
print("\n [+] Run the script as root user!")
sys.exit(0)
ip_from_range("provider_ranges.txt")
if __name__ == "__main__":
main()
Python:
"""
Скрипт для сканирования ip-адресов выбранных случайно из
списка CIDR провайдеров.
Для работы скрипта требуется установка библиотек:
pip install --pre scapy[basic] requests
"""
import json
import random
import socket
import sys
from datetime import datetime as dt
from ipaddress import ip_network
from os import getuid
from pathlib import Path
from platform import system
import requests
import scapy.all as sc
headers = {
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 "
"Safari/537.36 "
}
requests.packages.urllib3.disable_warnings()
random.seed(dt.now().second + random.randint(0, 101))
result = dict()
def scan_port(ip: str, port: int) -> (str, dict, bool):
"""
Получение банера на отрытом потру.
Выполняется подключение к ip-адресу на полученный
порт. Если подключение удалось, значит порт открыт. Производиться
попытка получить баннер. Если баннер не получен, обрабатывается
исключение, в котором идет запрос службы на порту у функции за
пределами класса. Если получить службу не удалось, возвращается
unassigned. Полученные значения добавляются в словари для
последующей обработки. В словарь с баннерами добавляются значения,
если баннер получен. В остальных случаях, в словарь открытых портов.
:param ip: ip-адрес или домен для сканирования.
:param port: Номер порта для сканирования. Целое число
:return: Возвращается False. Служит для выхода из функции
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
try:
s.connect((ip, port))
try:
banner = s.recv(1024).decode().strip()
if banner == '':
return False
else:
return banner
except OSError:
return False
except UnicodeDecodeError:
banner = s.recv(1024).strip()
return banner
except (socket.timeout, ConnectionRefusedError, OSError):
return False
def syn_ack_scan(ip: str, ports: tuple):
"""
Сканирование портов в переданном диапазоне на переданном ip-адресе.
Выполняется сканирование портов в указанном диапазоне. Если есть ответ
по текущему порту, выполняется попытка получения банера, если это не 80 и 443 порты.
Если это 80 или 443 порт, пытаемся получить заголовки сайта, который возможно
хостится на данном сервере по-данному ip.
Дабавляем все полученные данные в глобальный словарь.
:param ip: строка, ip-адрес цели.
:param ports: кортеж из портов для сканирования.
"""
# создание пакета для сканирования
try:
request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
except socket.gaierror:
raise ValueError(f'{ip} получить не удалось')
answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0] # отправка пакета
for _, receive in answer:
try:
if receive['TCP'].flags == "SA":
try:
print(receive['TCP'].sport)
if ip not in result:
result[ip] = dict()
if receive['TCP'].sport == 80:
try:
head = requests.head(url=f"https://{ip}:80", headers=headers, verify=False,
timeout=3).headers
try:
result[ip].update({'80/http': case_sence(head)['location']})
except Exception:
result[ip].update({'80/http': case_sence(head)})
except Exception:
result[ip].update({"80/http": "open"})
elif receive['TCP'].sport == 443:
try:
head = requests.head(url=f"https://{ip}:443", headers=headers, verify=False,
timeout=3).headers
try:
result[ip].update({"443/http": case_sence(head)['location']})
except Exception:
result[ip].update({"443/http": case_sence(head)})
except Exception:
result[ip].update({"443/http": "open"})
else:
resp = scan_port(ip=ip, port=receive['TCP'].sport)
if not resp:
result[ip].update({f"{str(receive['TCP'].sport)}/"
f"{sc.TCP_SERVICES[receive['TCP'].sport]}": "open"})
else:
result[ip].update({f"{str(receive['TCP'].sport)}/"
f"{sc.TCP_SERVICES[receive['TCP'].sport]}": resp})
except KeyError:
result[ip].update({f"{str(receive['TCP'].sport)}/undefined": "open"})
except IndexError:
continue
def case_sence(resp: dict) -> dict:
"""
Перевод регистра ключей и значений в нижний диапазон,
так как запись в верхнем регистре в json невозможна.
:param resp: словарь с данными для перевода.
:return: словарь с переведенными данными.
"""
resp_low = dict()
for res in resp:
resp_low.update({res.lower(): resp[res].lower()})
return resp_low
def save_json(addr):
"""
Сохранение данных из словаря с полученными значениями.
Создаем директорию для сохранения json, если она не существует.
Забираем дату в нужном формате. Проверяем, не является ли словарь result пустым.
Записываем значения из словаря в файл json.
Выводим информацию из словаря в терминал для пользователя.
:param addr: ip-адрес, по которому выполнялось текущее сканирование.
"""
(Path.cwd() / "checked_ip").mkdir(exist_ok=True)
day = dt.now().strftime("%d-%m-%Y")
if len(result) > 0:
with open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json", 'w', encoding='utf-8') as file:
json.dump(result, file, indent=4, ensure_ascii=False)
for res in result:
print(f"\nИнформация об адресе: {res}\n{'-'*65}")
for st in result[res]:
print(f"Порт: {st:20} | Статус: {result[res][st]}")
print(f"{'-'*65}")
def save_ip(ip):
"""
Сохраняет текущие ip-адреса в текстовый документ, для
последующей проверки по ним, перед сканированием, чтобы
избежать повторного сканирования.
Открываем файл, дописываем адрес в конец файла.
:param ip: ip-адрес для записи в файл.
"""
with open(Path.cwd() / "used_ip.txt", 'a', encoding='utf-8') as file:
file.write(f"{ip}\n")
def check_ip(ip):
"""
Проверка ip-адреса на нахождение в списке уже сканированных адресов.
Проверяем, существует ли файл. Открываем, создаем список из адресов
и проверяем вхождение текущего адреса в сформированный список.
Если есть, возвращаем True, нет - False.
:param ip: строка, ip-адрес для проверки.
:return: булево значение. True или False в зависимости от результата.
"""
if (Path.cwd() / "used_ip.txt").exists():
with open(Path.cwd() / "used_ip.txt", 'r', encoding='utf-8') as file:
lines = [x.strip() for x in file.readlines()]
if ip in lines:
return True
return False
def ip_from_range(path_dir):
"""
Получение случайного ip-адреса из файла с CIDR.
Запускаем бесконечный цикл. Открываем файл с диапазонами
cidr провайдеров. Выбираем случайный cidr, выбираем случайный ip.
Проверяем, не использовался ли этот адрес ранее.
Если нет, запускаем сканирование адреса. После, если словарь с результатом
не пуст, сохраняем полученные данные. Если пуст, выводим сообщение пользователю,
что нет результата. Вне зависимости от наполненности словаря сохраняем текущий ip,
после чего очищаем словарь.
:param path_dir: строка, путь к файлу с диапазонами (cidr).
"""
global result
while True:
with open(path_dir, 'r', encoding='utf-8') as file:
cidr = random.choice([x.strip() for x in file.readlines()])
ip = random.choice([str(x) for x in ip_network(cidr)])
if check_ip(ip):
continue
print(f"\nСканирую: {ip}")
syn_ack_scan(ip, (1, 1001))
if len(result) > 0:
save_json(ip)
else:
print(f"No result\n{'-' * 65}")
save_ip(ip)
result.clear()
def main():
"""
Запуск сканирования адресов.
"""
if system() == "Linux":
if not getuid() == 0:
print("\n [+] Run the script as root user!")
sys.exit(0)
ip_from_range("provider_ranges.txt")
if __name__ == "__main__":
main()
Ну и то, что сохраняет скрипт в директории:
А так выглядит json внутри:
А на этом, пожалуй, все.
Спасибо за внимание. Надеюсь, данная информация будет вам полезна
UPD: Смотрите измененную версию скрипта в комментариях.
Вложения
Последнее редактирование модератором: