Статья Сканируем случайные адреса Интернет. Получаем баннеры и протоколы с помощью Python

О каждой из этих тем по отдельности я писал в этой: баннеры, и этой: сканирование, статьях. Но давайте попробуем объединить два этих скрипта, чтобы использовать для сканирования адресов возможности scapy, а также получать баннеры с открытых портов с помощью socket.

000.png

Дисклеймер: Все данные, предоставленные в данной статье, взяты из открытых источников, не призывают к действию и являются только лишь данными для ознакомления, и изучения механизмов используемых технологий.

На самом деле это тема в общем-то не нова. И никто не мешает вам для сканирования сети взять nmap и пользоваться им напрямую или использовать его в скрипте python. Но гораздо интереснее попробовать сделать что-то, что будет выполнять похожие функции, самостоятельно. Тем более, такого тщательного сканирования в данном случае просто не требуется.

Попробую пояснить свою мысль. Уж не знаю, почему, но мне в последнее время стало интересно сканировать те адреса в интернете, которые, скорее всего, не находятся в индексе Google, Яндекс или еще какой поисковой системы. Просто в силу того, что на данные страницы не ведет ни одна ссылка в интернете. Но ведь за данными адресами могут скрываться как сайты, так и просто устройства (роутеры, маршрутизаторы, ip-камеры и т.д.). То есть, в простом понимании, позаниматься «нетсталкингом». Что это такое, можете сами поискать в .

Тут же сразу возник вопрос: "А какие адреса сканировать и где их брать?". Этот вопрос я решил. Не знаю, насколько полно это получилось, но мне удалось собрать ip-диапазоны сетей провайдеров (CIDR) по всему миру. Всех или нет, это уже другой вопрос. Однако, того что удалось накопать уже хватит на многие месяцы сканирования. Файл с адресами я добавлю во вложение.

Таким образом, для того, чтобы не сканировать уныло каждый адрес один за другим, мы будем случайным образом выбирать диапазон, также случайно выбирать адрес и пытаться получить по нему данные. Ну и если сканирование пройдет успешно и мы найдем хоть один живой порт, запишем эти данные в json, чтобы при необходимости можно было использовать его в каких-то других проектах.

Достаточно теории. Давайте перейдем к коду.


Блок импорта

Перед тем, как начать писать код необходимо установить сторонние библиотеки, которые потребуются в процессе его работы. Это scapy и requests. Для их установки набираем в терминале следующую команду:

pip install requests
pip install --pre scapy[basic]

Все остальные библиотеки, которые используются в скрипте, являются библиотеками «из коробки».
Итак, после того, как нужные библиотеки будут установлены, нужно импортировать все что необходимо в проект:

Python:
import json
import random
import socket
import sys
from datetime import datetime as dt
from ipaddress import ip_network
from os import getuid
from pathlib import Path
from platform import system

import requests
import scapy.all as sc

Создадим переменную headers и запишем в нее словарь с user_agent. Он нам понадобиться при использовании библиотеки requests.

Python:
headers = {
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 "
                  "Safari/537.36 "
}

Теперь, в глобальном пространстве скрипта прописываем две строчки:

Python:
requests.packages.urllib3.disable_warnings()
random.seed(dt.now().second + random.randint(0, 101))

result = dict()

Первая нужна для того, чтобы подавить warning от библиотеки requests при использовании параметра verify=False. Вторая служит для того, чтобы сделать наш рандомный выбор хоть чуточку более рандомным.
Ну и создаем глобальную переменную result, в которой и будет содержаться словарь с найденными портами, баннерами и заголовками, и который впоследствии будет записываться в json.


Вспомогательные функции

Для того, чтобы скрипт был более продвинутым и не сканировал одни и те же адреса по нескольку раз, когда они вдруг будут выбраны в рандоме, нужно создать функцию, которая будет проверять наличие данных адресов в файле, куда будет сохраняться каждый просканированный адрес. Также нужно создать функцию, которая будет сохранять просканированные адреса в файл. И функцию, которая будет сохранять полученные данные из словаря result, если они там, конечно же есть. Помимо этого, была обнаружена особенность сохранения в json (раньше просто не сталкивался). При получении заголовков с адреса и попытке записи в json данных из словаря, выкидывалось исключение, что в данном случае, при записи регистр имеет значение. А значит, требуется также функция, которая будет переводить все полученные ключи и значения заголовков в нижний регистр.

Ну вот, вроде бы ничего не забыл. Давайте приступим к их созданию.


Перевод словаря в нижний регистр

Для начала создадим функцию case_sence(resp: dict) -> dict. С ее помощью мы будем переводить данные словаря в нижний регистр. На входе она получает словарь с данными, а на выходе возвращает уже словарь с теми же данными, но переведенными в нижний регистр.

Создаем временный словарь, куда будем складывать переведенные в нижний регистр значения — resp_low. В цикле пробегаемся по каждому ключу и значению словаря, переводим в нижний регистр, записываем данные во временный словарь и возвращаем из функции.

Python:
def case_sence(resp: dict) -> dict:
    """
    Перевод регистра ключей и значений в нижний диапазон,
    так как запись в верхнем регистре в json невозможна.

    :param resp: словарь с данными для перевода.
    :return: словарь с переведенными данными.
    """
    resp_low = dict()
    for res in resp:
        resp_low.update({res.lower(): resp[res].lower()})
    return resp_low


Сохранение json с полученными данными из словаря result

Создадим функцию save_json(addr). На вход она получает текущий ip-адрес по которому проводилось сканирование. Так как json-ы мы будем складывать в отдельную папку, создаем ее с помощью функции mkdir библиотеки pathlib. Она позволяет без особых проверок в случае отсутствия директории ее создать. Если же директория существует, вызывается исключение, которое просто игнорируется с помощью указания явного параметра exist_ok=True. Получаем текущую дату. Проверяем длину словаря. Если она больше нуля, значит какие-то данные были получены. Открываем файл на перезапись, в качестве имени файла указываем текущую дату и ip-адрес, который передается в функцию. Записываем json и затем, чтобы пользователь видел, что что-то происходит, выводим из этого же словаря содержащиеся в нем данные.

Python:
def save_json(addr):
    """
    Сохранение данных из словаря с полученными значениями.

    Создаем директорию для сохранения json, если она не существует.
    Забираем дату в нужном формате. Проверяем, не является ли словарь result пустым.
    Записываем значения из словаря в файл json.
    Выводим информацию из словаря в терминал для пользователя.

    :param addr: ip-адрес, по которому выполнялось текущее сканирование.
    """
    (Path.cwd() / "checked_ip").mkdir(exist_ok=True)

    day = dt.now().strftime("%d-%m-%Y")
    if len(result) > 0:
        with open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json", 'w', encoding='utf-8') as file:
            json.dump(result, file, indent=4, ensure_ascii=False)
        for res in result:
            print(f"\nИнформация об адресе: {res}\n{'-'*65}")
            for st in result[res]:
                print(f"Порт: {st:20} | Статус: {result[res][st]}")
        print(f"{'-'*65}")


Сохраняем просканированный ip-адрес

Вне зависимости от того, удалось ли получить по данному ip данные или нет, для того, чтобы избежать его повторного сканирования, дописываем его в текстовый файл. Создадим функцию save_ip(ip). На вход она получает адрес, который нужно сохранить. Ну, а дальше все просто.

Отрываем файл на дозапись, и записываем в него строку с адресом, после чего переводим каретку на следующую строку, для следующего адреса.

Python:
def save_ip(ip):
    """
    Сохраняет текущие ip-адреса в текстовый документ, для
    последующей проверки по ним, перед сканированием, чтобы
    избежать повторного сканирования.

    Открываем файл, дописываем адрес в конец файла.
    :param ip: ip-адрес для записи в файл.
    """
    with open(Path.cwd() / "used_ip.txt", 'a', encoding='utf-8') as file:
        file.write(f"{ip}\n")


Проверка текущего ip-адреса на наличие в уже просканированных адресах

Для того, чтобы не сканировать одни и те же адреса повторно, нужно проверить каждый текущий адрес на наличие его в файле, куда мы предварительно записали уже отсканированные адреса.

Создадим функцию check_ip(ip). На вход она получает адрес который нужно проверить.

Проверяем, существует ли файл с отсканированными адресами. Его ведь может и не быть, если это наше первое сканирование. Если файл есть, открываем его на чтение и создаем список из ip-адресов в нем содержащихся. После этого проверяем, есть ли наш ip в получившемся списке. Если есть, возвращаем True. Нет — False.

Python:
def check_ip(ip):
    """
    Проверка ip-адреса на нахождение в списке уже сканированных адресов.

    Проверяем, существует ли файл. Открываем, создаем список из адресов
    и проверяем вхождение текущего адреса в сформированный список.
    Если есть, возвращаем True, нет - False.
    :param ip: строка, ip-адрес для проверки.
    :return: булево значение. True или False в зависимости от результата.
    """
    if (Path.cwd() / "used_ip.txt").exists():
        with open(Path.cwd() / "used_ip.txt", 'r', encoding='utf-8') as file:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True
    return False


Получение случайного ip-адреса

Перед тем, как сканировать какой-либо адрес, его еще нужно получить. А потому, создадим функцию для его получения ip_from_range(path_dir). В нее мы будем передавать путь к файлу с диапазонами ip-адресов (CIDR) провайдеров.

Запускаем бесконечный цикл. В нем отрываем файл с CIDR, создаем из его строк список и с помощью random.choice выбираем случайный элемент. Затем, создаем список ip-адресов, которые входят в этот диапазон и также с помощью random.choice выбираем случайный адрес.

Теперь передаем его в функцию check_ip для проверки наличия в уже сканированных адресах. Если есть, итерируемся снова. Нет, двигаемся дальше. Выводим информацию о сканируемом адресе. Передаем его в функцию сканирования портов и получения данных syn_ack_scan, куда помимо адреса передается кортеж с диапазоном портов для сканирования. Здесь, в данном коде, они жестко определены. Но ничто не мешает вам сделать их запрос у пользователя или выставить свой диапазон.

Python:
def ip_from_range(path_dir):
    """
    Получение случайного ip-адреса из файла с CIDR.

    Запускаем бесконечный цикл. Открываем файл с диапазонами
    cidr провайдеров. Выбираем случайный cidr, выбираем случайный ip.
    Проверяем, не использовался ли этот адрес ранее.
    Если нет, запускаем сканирование адреса. После, если словарь с результатом
    не пуст, сохраняем полученные данные. Если пуст, выводим сообщение пользователю,
    что нет результата. Вне зависимости от наполненности словаря сохраняем текущий ip,
    после чего очищаем словарь.
    :param path_dir: строка, путь к файлу с диапазонами (cidr).
    """
    global result
    while True:
        with open(path_dir, 'r', encoding='utf-8') as file:
            cidr = random.choice([x.strip() for x in file.readlines()])
            ip = random.choice([str(x) for x in ip_network(cidr)])
            if check_ip(ip):
                continue
            print(f"\nСканирую: {ip}")
            syn_ack_scan(ip, (1, 1001))
            if len(result) > 0:
                save_json(ip)
            else:
                print(f"No result\n{'-' * 65}")
            save_ip(ip)
            result.clear()

Затем проверяем, не является ли словарь с данными пустым. Если он пуст, выводим в терминал сообщение об отсутствии результатов. Если не пуст, запускаем функцию сохранения данных save_json, в которую передаем текущий ip-адрес.
И затем, вне зависимости от того, были или нет получены данные по адресу, сохраняем текущий ip в файл уже сканированных и очищаем глобальный словарь result, где и содержались данные по адресу. Очищаем словарь в любом случае.


Получение баннеров на определенном порту

Что же, здесь я не буду особо оригинален и возьму предыдущий свой код из статьи указанной в начале. Разница в нем будет только в том, что в случае удачного получения баннера значение не будет записываться в словарь, а возвращаться просто в виде полученной строки.

Создадим функцию scan_port(ip: str, port: int) -> (str, dict, bool). На вход она получает ip-адрес для сканирования и порт. А возвращает данные в зависимости от полученных результатов и вызванных исключений.

Создаем потоковый сокет на основе протокола TCP, который будет передавать и принимать информацию. Устанавливаем время жизни соединения, которое равно 5 секунд. В принципе, данного времени должно хватить. Но, можно поэкспериментировать со значением. Устанавливаем соединение по переданным в функцию адресу и порту. Выполняем попытку получить баннер. Если баннер получен, пытаемся его декодировать, а также обрабатываем случай, когда декодированное значение является пустым. Если баннер не пустая строка, возвращаем полученное значение из функции. Обрабатываем различные ошибки при декодировании и установке соединения. А также ошибку при получении данных.

Python:
def scan_port(ip: str, port: int) -> (str, dict, bool):
    """
    Получение банера на отрытом потру.

    Выполняется подключение к ip-адресу на полученный
    порт. Если подключение удалось, значит порт открыт. Производиться
    попытка получить баннер. Если баннер не получен, обрабатывается
    исключение, в котором идет запрос службы на порту у функции за
    пределами класса. Если получить службу не удалось, возвращается
    unassigned. Полученные значения добавляются в словари для
    последующей обработки. В словарь с баннерами добавляются значения,
    если баннер получен. В остальных случаях, в словарь открытых портов.

    :param ip: ip-адрес или домен для сканирования.
    :param port: Номер порта для сканирования. Целое число
    :return: Возвращается False. Служит для выхода из функции
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(5)

    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False


Сканирование портов с помощью scapy запросами syn — ack

А теперь создадим основную функцию, с помощью которой будем сканировать порты на полученном в функцию адресе. Создадим функцию syn_ack_scan(ip: str, ports: tuple). На вход она получает ip-адрес, и кортеж из диапазона портов. На самом деле, scapy умеет сам итерироваться по диапазону CIDR. Но, так как количество адресов в диапазоне может быть довольно большим, занимать это будет много времени. Потому, практичнее все же передавать в функцию один ip-адрес. Также у scapy есть возможность сканировать не диапазон, а только один порт. Но, в нашем случае передается кортеж. И для того, чтобы просканировать только один адрес, нужно в кортеже передать два одинаковых значения, то есть (80, 80), для примера.

Создаем пакет для сканирования. Выполняем запрос с установленным таймаутом и отключенным выводом в терминал. Забираем ответ и итерируемся по нему в цикле.

Так как результат сканирования это кортеж, то его можно распарсить на две переменные. В нашем случае это будут полученные и ошибочные ответы. Так как ошибочные ответы нас не интересуют, забираем только полученные в переменную receive. Выполняем проверку, если флаг в ответе содержит SA, что означает отрытый порт, так как от него получен ответ, создаем ключ в словаре с текущим ip-адресом.

Делаем проверку, если порт равен 80 или 443, с помощью requests выполняем запрос на наличие на той стороне сайта или страницы. То есть, пытаемся получить заголовки. Если получается, добавляем порт, протокол и полученные заголовки в словарь. Если не получиться, просто порт, протокол и то, что порт открыт.

В случае с другими портами делаем попытку получить баннер. Если баннер получен, добавляем порт, протокол и значение баннера в словарь. Если баннер не получен, добавляем порт, протокол и значение, что этот порт открыт с словарь.

И да, если заголовки были получены, делаем попытку получить из них ключ «location», который может указывать на сайт при переадресации с ip-адреса.

Python:
def syn_ack_scan(ip: str, ports: tuple):
    # создание пакета для сканирования
    try:
        request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
    except socket.gaierror:
        raise ValueError(f'{ip} получить не удалось')
    answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]  # отправка пакета

    for _, receive in answer:
        try:
            if receive['TCP'].flags == "SA":
                try:
                    if ip not in result:
                        result[ip] = dict()
                    if receive['TCP'].sport == 80:
                        try:
                            head = requests.head(url=f"https://{ip}:80", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({'80/http': case_sence(head)['location']})
                            except Exception:
                                result[ip].update({'80/http': case_sence(head)})
                        except Exception:
                            result[ip].update({"80/http": "open"})
                    elif receive['TCP'].sport == 443:
                        try:
                            head = requests.head(url=f"https://{ip}:443", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({"443/http": case_sence(head)['location']})
                            except Exception:
                                result[ip].update({"443/http": case_sence(head)})
                        except Exception:
                            result[ip].update({"443/http": "open"})
                    else:
                        resp = scan_port(ip=ip, port=receive['TCP'].sport)
                        if not resp:
                            result[ip].update({f"{str(receive['TCP'].sport)}/"
                                               f"{sc.TCP_SERVICES[receive['TCP'].sport]}": "open"})
                        else:
                            result[ip].update({f"{str(receive['TCP'].sport)}/"
                                               f"{sc.TCP_SERVICES[receive['TCP'].sport]}": resp})
                except KeyError:
                    result[ip].update({f"{str(receive['TCP'].sport)}/undefined": "open"})
        except IndexError:
            continue

Вот, это если вкратце. Ну и обрабатываем различные исключения.


Запуск сканирования адресов

И еще один небольшой момент, это запуск сканирования адресов. Так как скрипт использует scapy, то и для работы он требует запуска из под «sudo». Потому проверяем в самом начале, как он запущен. Если не от имени суперпользователя, сообщаем об этом и выходим из скрипта.

Если же все хорошо, запускаем функцию рандомного получения адресов, куда передаем путь к файлу с CIDR.

Python:
def main():
    """
    Запуск сканирования адресов.
    """
    if system() == "Linux":
        if not getuid() == 0:
            print("\n [+] Run the script as root user!")
            sys.exit(0)

    ip_from_range("provider_ranges.txt")


if __name__ == "__main__":
    main()

Python:
"""
Скрипт для сканирования ip-адресов выбранных случайно из
списка CIDR провайдеров.

Для работы скрипта требуется установка библиотек:
pip install --pre scapy[basic] requests
"""

import json
import random
import socket
import sys
from datetime import datetime as dt
from ipaddress import ip_network
from os import getuid
from pathlib import Path
from platform import system

import requests
import scapy.all as sc

headers = {
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 "
                  "Safari/537.36 "
}

requests.packages.urllib3.disable_warnings()
random.seed(dt.now().second + random.randint(0, 101))

result = dict()


def scan_port(ip: str, port: int) -> (str, dict, bool):
    """
    Получение банера на отрытом потру.

    Выполняется подключение к ip-адресу на полученный
    порт. Если подключение удалось, значит порт открыт. Производиться
    попытка получить баннер. Если баннер не получен, обрабатывается
    исключение, в котором идет запрос службы на порту у функции за
    пределами класса. Если получить службу не удалось, возвращается
    unassigned. Полученные значения добавляются в словари для
    последующей обработки. В словарь с баннерами добавляются значения,
    если баннер получен. В остальных случаях, в словарь открытых портов.

    :param ip: ip-адрес или домен для сканирования.
    :param port: Номер порта для сканирования. Целое число
    :return: Возвращается False. Служит для выхода из функции
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(5)

    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False


def syn_ack_scan(ip: str, ports: tuple):
    """
    Сканирование портов в переданном диапазоне на переданном ip-адресе.

    Выполняется сканирование портов в указанном диапазоне. Если есть ответ
    по текущему порту, выполняется попытка получения банера, если это не 80 и 443 порты.
    Если это 80 или 443 порт, пытаемся получить заголовки сайта, который возможно
    хостится на данном сервере по-данному ip.
    Дабавляем все полученные данные в глобальный словарь.

    :param ip: строка, ip-адрес цели.
    :param ports: кортеж из портов для сканирования.
    """
    # создание пакета для сканирования
    try:
        request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
    except socket.gaierror:
        raise ValueError(f'{ip} получить не удалось')
    answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]  # отправка пакета

    for _, receive in answer:
        try:
            if receive['TCP'].flags == "SA":
                try:
                    print(receive['TCP'].sport)
                    if ip not in result:
                        result[ip] = dict()
                    if receive['TCP'].sport == 80:
                        try:
                            head = requests.head(url=f"https://{ip}:80", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({'80/http': case_sence(head)['location']})
                            except Exception:
                                result[ip].update({'80/http': case_sence(head)})
                        except Exception:
                            result[ip].update({"80/http": "open"})
                    elif receive['TCP'].sport == 443:
                        try:
                            head = requests.head(url=f"https://{ip}:443", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({"443/http": case_sence(head)['location']})
                            except Exception:
                                result[ip].update({"443/http": case_sence(head)})
                        except Exception:
                            result[ip].update({"443/http": "open"})
                    else:
                        resp = scan_port(ip=ip, port=receive['TCP'].sport)
                        if not resp:
                            result[ip].update({f"{str(receive['TCP'].sport)}/"
                                               f"{sc.TCP_SERVICES[receive['TCP'].sport]}": "open"})
                        else:
                            result[ip].update({f"{str(receive['TCP'].sport)}/"
                                               f"{sc.TCP_SERVICES[receive['TCP'].sport]}": resp})
                except KeyError:
                    result[ip].update({f"{str(receive['TCP'].sport)}/undefined": "open"})
        except IndexError:
            continue


def case_sence(resp: dict) -> dict:
    """
    Перевод регистра ключей и значений в нижний диапазон,
    так как запись в верхнем регистре в json невозможна.

    :param resp: словарь с данными для перевода.
    :return: словарь с переведенными данными.
    """
    resp_low = dict()
    for res in resp:
        resp_low.update({res.lower(): resp[res].lower()})
    return resp_low


def save_json(addr):
    """
    Сохранение данных из словаря с полученными значениями.

    Создаем директорию для сохранения json, если она не существует.
    Забираем дату в нужном формате. Проверяем, не является ли словарь result пустым.
    Записываем значения из словаря в файл json.
    Выводим информацию из словаря в терминал для пользователя.

    :param addr: ip-адрес, по которому выполнялось текущее сканирование.
    """
    (Path.cwd() / "checked_ip").mkdir(exist_ok=True)

    day = dt.now().strftime("%d-%m-%Y")
    if len(result) > 0:
        with open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json", 'w', encoding='utf-8') as file:
            json.dump(result, file, indent=4, ensure_ascii=False)
        for res in result:
            print(f"\nИнформация об адресе: {res}\n{'-'*65}")
            for st in result[res]:
                print(f"Порт: {st:20} | Статус: {result[res][st]}")
        print(f"{'-'*65}")


def save_ip(ip):
    """
    Сохраняет текущие ip-адреса в текстовый документ, для
    последующей проверки по ним, перед сканированием, чтобы
    избежать повторного сканирования.

    Открываем файл, дописываем адрес в конец файла.
    :param ip: ip-адрес для записи в файл.
    """
    with open(Path.cwd() / "used_ip.txt", 'a', encoding='utf-8') as file:
        file.write(f"{ip}\n")


def check_ip(ip):
    """
    Проверка ip-адреса на нахождение в списке уже сканированных адресов.

    Проверяем, существует ли файл. Открываем, создаем список из адресов
    и проверяем вхождение текущего адреса в сформированный список.
    Если есть, возвращаем True, нет - False.
    :param ip: строка, ip-адрес для проверки.
    :return: булево значение. True или False в зависимости от результата.
    """
    if (Path.cwd() / "used_ip.txt").exists():
        with open(Path.cwd() / "used_ip.txt", 'r', encoding='utf-8') as file:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True
    return False


def ip_from_range(path_dir):
    """
    Получение случайного ip-адреса из файла с CIDR.

    Запускаем бесконечный цикл. Открываем файл с диапазонами
    cidr провайдеров. Выбираем случайный cidr, выбираем случайный ip.
    Проверяем, не использовался ли этот адрес ранее.
    Если нет, запускаем сканирование адреса. После, если словарь с результатом
    не пуст, сохраняем полученные данные. Если пуст, выводим сообщение пользователю,
    что нет результата. Вне зависимости от наполненности словаря сохраняем текущий ip,
    после чего очищаем словарь.
    :param path_dir: строка, путь к файлу с диапазонами (cidr).
    """
    global result
    while True:
        with open(path_dir, 'r', encoding='utf-8') as file:
            cidr = random.choice([x.strip() for x in file.readlines()])
            ip = random.choice([str(x) for x in ip_network(cidr)])
            if check_ip(ip):
                continue
            print(f"\nСканирую: {ip}")
            syn_ack_scan(ip, (1, 1001))
            if len(result) > 0:
                save_json(ip)
            else:
                print(f"No result\n{'-' * 65}")
            save_ip(ip)
            result.clear()


def main():
    """
    Запуск сканирования адресов.
    """
    if system() == "Linux":
        if not getuid() == 0:
            print("\n [+] Run the script as root user!")
            sys.exit(0)

    ip_from_range("provider_ranges.txt")


if __name__ == "__main__":
    main()

Ну и то, что сохраняет скрипт в директории:

01.png


А так выглядит json внутри:

02.png


А на этом, пожалуй, все.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна

UPD:
Смотрите измененную версию скрипта в комментариях.
 

Вложения

Последнее редактирование модератором:
  • Нравится
Реакции: Dzen
Тут же сразу возник вопрос: "А какие адреса сканировать и где их брать?". Этот вопрос я решил. Не знаю, насколько полно это получилось, но мне удалось собрать ip-диапазоны сетей провайдеров (CIDR) по всему миру. Всех или нет, это уже другой вопрос. Однако, того что удалось накопать уже хватит на многие месяцы сканирования. Файл с адресами я добавлю во вложение.
из Мордора не открывается



Для их установки набираем в терминале следующую команду:

pip install --pre scapy[basic] requests
1669286095405.png



По коду могу сказать одно - вам есть к чему стремиться, а это скорее всего неплохо)

Создадим переменную headers и запишем в нее словарь с user_agent. Он нам понадобиться при использовании библиотеки requests.
Ну и для таких действий есть отличная генерации фэйковых/случайных юзерагентов


Достаточно просто
Python:
if result:
Пустой объект всегда возвращает False

Python:
open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json"
А если скрипт будет выполняться в Windows? Получим портянку ошибок связанных с обращением к пути.
Для подобных операций в python есть отличный метод os.path.join
Он сам выбирает нужный тип слеша, в зависимости от системы.


Вне зависимости от того, удалось ли получить по данному ip данные или нет
Так а почему это не важно?
Я бы наоборот писал в один файл те адреса, у которых мы точно что-то получили, а в другой - те, у которых была ошибка или, которые недоступны.

Python:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True
условие if p in lines можно так же убрать в генератор списка и избавиться от переменной lines

Получение случайного ip-адреса
Python:
cidr = random.choice([x.strip() for x in file.readlines()])
Зачем тут цикл, если получить случайный элемент можно напрямую из readlines()
Python:
cidr = random.choice(file.readlines()).strip()
Аналогично и здесь
Python:
  ip = random.choice([str(x) for x in ip_network(cidr)])

Но сам цикл и программа будет безумно медленной, вы только представьте, сколько будет накладных расходов на чтение и запись только одного IP адреса?
Вам нужно прочитать 3 файла, отфильтровать данные и записать их.

Куда разумнее было бы сначала создать список (а лучше кортеж) и определённого случайного диапазона. а потом уже этот список обрабатывать. А фильтрацию производить не в момент сканирования, а до или после.
Python:
    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False
а зачем столько try? Убрать обработку кода можно в один блок, у вас же все исключения описаны.

Python:
            banner = s.recv(1024).decode().strip()
            if banner == '':
для таких конструкций в python3.8 ввели моржовый оператор.

Python:
            banner = s.recv(1024).strip()
            return banner
разумнее сразу возвращать результат, не тратя времени на создание переменной
Python:
            return s.recv(1024).strip()


Функция сканирования портов
Какая же странная функция:
1.
Python:
try:
    request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
except socket.gaierror:
    raise ValueError(f'{ip} получить не удалось')
answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]  # отправка пакета
Не совсем понятно - если возникло исключение socket.gaierror, зачем продолжать выполнение кода?

2. Зачем столько блоков try?
Почитайте что такое код


Прочтите вот эту статью @explorer и попробуйте реализовать многопоточность, доработав оставшийся функционал, пока код, откровенно, сырой.
 
из Мордора не открывается




Посмотреть вложение 64644


По коду могу сказать одно - вам есть к чему стремиться, а это скорее всего неплохо)


Ну и для таких действий есть отличная генерации фэйковых/случайных юзерагентов



Достаточно просто
Python:
if result:
Пустой объект всегда возвращает False

Python:
open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json"
А если скрипт будет выполняться в Windows? Получим портянку ошибок связанных с обращением к пути.
Для подобных операций в python есть отличный метод os.path.join
Он сам выбирает нужный тип слеша, в зависимости от системы.



Так а почему это не важно?
Я бы наоборот писал в один файл те адреса, у которых мы точно что-то получили, а в другой - те, у которых была ошибка или, которые недоступны.

Python:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True
условие if p in lines можно так же убрать в генератор списка и избавиться от переменной lines


Python:
cidr = random.choice([x.strip() for x in file.readlines()])
Зачем тут цикл, если получить случайный элемент можно напрямую из readlines()
Python:
cidr = random.choice(file.readlines()).strip()
Аналогично и здесь
Python:
  ip = random.choice([str(x) for x in ip_network(cidr)])

Но сам цикл и программа будет безумно медленной, вы только представьте, сколько будет накладных расходов на чтение и запись только одного IP адреса?
Вам нужно прочитать 3 файла, отфильтровать данные и записать их.

Куда разумнее было бы сначала создать список (а лучше кортеж) и определённого случайного диапазона. а потом уже этот список обрабатывать. А фильтрацию производить не в момент сканирования, а до или после.
Python:
    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False
а зачем столько try? Убрать обработку кода можно в один блок, у вас же все исключения описаны.

Python:
            banner = s.recv(1024).decode().strip()
            if banner == '':
для таких конструкций в python3.8 ввели моржовый оператор.

Python:
            banner = s.recv(1024).strip()
            return banner
разумнее сразу возвращать результат, не тратя времени на создание переменной
Python:
            return s.recv(1024).strip()


Функция сканирования портов
Какая же странная функция:
1.
Python:
try:
    request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
except socket.gaierror:
    raise ValueError(f'{ip} получить не удалось')
answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]  # отправка пакета
Не совсем понятно - если возникло исключение socket.gaierror, зачем продолжать выполнение кода?

2. Зачем столько блоков try?
Почитайте что такое код


Прочтите вот эту статью @explorer и попробуйте реализовать многопоточность, доработав оставшийся функционал, пока код, откровенно, сырой.

Позвольте мне частично ответить на данные замечания. Частично, потому, что с некоторыми согласен. С некоторыми нет ))

По поводу установки: pip install –pre scapy[basic] requests ошибок не возникало ни на Windows, ни на Linux.
Это установка на Windows (для примера):

screenshot1.png


Из документации:

screenshot4.png


По поводу fake-useragent знаю. Но в данном случае считаю его использование излишним, так как мне не особо важно, чтобы заголовок менялся. Запросы делаются не на один сайт. А потому создавать видимость другого браузера нет необходимости. И еще у него иногда возникает исключение, в котором говориться, что ресурс недоступен. Так как код лежит на heroku. С ошибкой сталкивался лично.

По поводу путей. Модуль Path из библиотеки pathlib и был создан для того, чтобы заменить некоторые функции предоставляемые библиотекой os. В данном случае слеш служит не для разделения пути, а именно как оператор функции. В Windows путь будет выглядеть примерно так:

screenshot2.png


Примерно потому, что убрал дату и адрес.

По поводу записи адреса в файл, получили данные или нет. В данном случае записывается проверенный адрес для того, чтобы не проверять его в будущем, при последующем запуске скрипта. В данном случае не имеет значения, были ли ошибки в процессе получения данных или нет. Просто потому, что если данные не получены, я считаю этот адрес не рабочим. Отдельно записывать ошибочные адреса не вижу смысла, так как я итак сохраняю данные полученные с адресов в json. А если я буду сохранять файл с валидными адресами и с невалидными, то в этом случае мне придется при проверке адреса итерироваться уже по двум спискам? Но тут не знаю. Может быть я просто не понял вашу мысль.

По этому коду:

Python:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True

Здесь да, согласен. Можно действительно сделать так: return [x.strip() for x in file.readlines() if ip in lines]

По поводу получения случайного диапазона из readlines() – согласен.
По поводу этого: ip = random.choice([str(x) for x in ip_network(cidr)]) нет. Выбрасывает в исключение:

screenshot3.png


По этому коду:

Python:
    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False

Да, можно сократить:

Python:
    try:
        s.connect((ip, port))
        if banner := s.recv(1024).decode().strip():
            return banner
        return False
    except UnicodeDecodeError:
        return s.recv(1024).strip()
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False

По поводу этого:

Python:
try:
    request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
except socket.gaierror:
    raise ValueError(f'{ip} получить не удалось')
answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]

Да, пародон. Данный кусок кода использовался у меня не тут. И пару раз даже попадался мне на глаза. Но, почему-то выпустил из виду. Блок try – except здесь лишний.

По поводу «писать все дважды». Изначально было желание вынести получение заголовков в отдельную функцию. Передумал, чтобы не загромождать код. А так да. Можно было бы просто убрать отсюда и вынести отдельно вот этот кусок:

Python:
                        try:
                            head = requests.head(url=f"https://{ip}:80", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({'80/http': case_sence(head)['location']})
                            except Exception:
                                result[ip].update({'80/http': case_sence(head)})
                        except Exception:
                            result[ip].update({"80/http": "open"})

и этот:

Python:
                        try:
                            head = requests.head(url=f"https://{ip}:443", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({"443/http": case_sence(head)['location']})
                            except Exception:
                                result[ip].update({"443/http": case_sence(head)})
                        except Exception:
                            result[ip].update({"443/http": "open"})

По многопоточности. Делал. Тщательность сканирования портов снижается. Плюс к тому, скорее всего у scapy под капотом своя обработка потоков (тут не уверен, но судя по скорости работы так и есть), так как 1000 портов сканируются достаточно быстро, также как если бы были задействованы потоки. Ведь по сути, когда мы сканируем локальную сеть с помощью scapy, он может сканировать не один или пару адресов. Он может просканировать сразу же диапазон: 192.168.0.1/24. Так что, потоки здесь излишни.
Хотя тут вопрос, как scapy отправляет запросы на каждый порт. В документации об этом сказано достаточно расплывчато. Думаю, надо еще поискать информацию по данному вопросу.

UPD: Забыл упомянуть. По созданию списка для проверки адресов, согласен. Быстрее проверить вхождение. А вот по поводу сохранения, тут ситуация двоякая. С одной стороны, можно было бы сделать список, по мере работы добавлять в него адреса, а уже в конце, по KeyboardInterrupt сохранять все в файл. Но сделал так, как сделал, то есть сохраняю в файл после каждой итерации, во избежание потери данных о проверенных адресах.

За статью @explorer спасибо. Думаю вы и имели в виду мультипроцессинг. До него руки пока не дошли. Но очень хочется )).

А так да, конечно же, всегда есть к чему стремиться.
 
Последнее редактирование:
  • Нравится
Реакции: f22
Немного переделал скрипт. Сделал раздельное сканирование портов, получение баннеров, а также получение заголовков.
Теперь данные сохраняются в БД SQLite, так как в скрипте явно требовалась более быстрая реализация проверки ip-адреса, был он уже сканирован или нет.
Если продолжить сохранять адреса в текстовый документ, то в конце-концов, так как адресов все же довольно много, его размер может достигнуть критического значения.
Конечно, при больших объемах памяти в нынешних ПК достигнуть их объема достаточно трудно, но хранить такой объект, даже временно в памяти не совсем практично,
как и итерироваться по нему построчно. Все это занимает слишком много времени.

Расписывать подробности не буду. По отношению к статье изменился порядок сканирования, ну и добавилась функция сохранения данных в БД.
В принципе, скрипт стал отрабатывать быстрее, так как делает меньше лишних движений. Если раньше для получения данных требовалось около 9 с копейками
секунд. то сейчас это время сократилось до 5 с небольшим.

Из добавленного еще список популярных портов, который напрямую передается в scapy. Так как он может принимать кортежи, списки и отдельные порты для сканирования.

Python:
import random
import socket
import sqlite3
import threading
import time
from datetime import datetime as dt
from ipaddress import ip_network
from pathlib import Path

import requests
import scapy.all as sc

requests.packages.urllib3.disable_warnings()
random.seed(dt.now().second + random.randint(0, 101))

ports = [1, 3, 4, 6, 7, 9, 13, 17, 19, 20, 21, 22, 23, 24, 25, 26, 30, 32, 33, 37, 42, 43, 49, 53, 70, 79,
            80, 81, 82, 83, 84, 85, 88, 89, 90, 99, 100, 106, 109, 110, 111, 113, 119, 125, 135, 139, 143, 144,
            146, 161, 163, 179, 199, 211, 212, 222, 254, 255, 256, 259, 264, 280, 301, 306, 311, 340, 366, 389,
            406, 407, 416, 417, 425, 427, 443, 444, 445, 458, 464, 465, 481, 497, 500, 512, 513, 514, 515, 524,
            541, 543, 544, 545, 548, 554, 555, 563, 587, 593, 616, 617, 625, 631, 636, 646, 648, 666, 667, 668,
            683, 687, 691, 700, 705, 711, 714, 720, 722, 726, 749, 765, 777, 783, 787, 800, 801, 808, 843, 873,
            880, 888, 898, 900, 901, 902, 903, 911, 912, 981, 987, 990, 992, 993, 995, 999, 1000, 1001, 1002,
            1007, 1009, 1010, 1011, 1021, 1022, 1023, 1024, 1025, 1026, 1027, 1028, 1029, 1030, 1031, 1032,
            1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1041, 1042, 1043, 1044, 1045, 1046, 1047, 1048,
            1049, 1050, 1051, 1052, 1053, 1054, 1055, 1056, 1057, 1058, 1059, 1060, 1061, 1062, 1063, 1064,
            1065, 1066, 1067, 1068, 1069, 1070, 1071, 1072, 1073, 1074, 1075, 1076, 1077, 1078, 1079, 1080,
            1081, 1082, 1083, 1084, 1085, 1086, 1087, 1088, 1089, 1090, 1091, 1092, 1093, 1094, 1095, 1096,
            1097, 1098, 1099, 1100, 1102, 1104, 1105, 1106, 1107, 1108, 1110, 1111, 1112, 1113, 1114, 1117,
            1119, 1121, 1122, 1123, 1124, 1126, 1130, 1131, 1132, 1137, 1138, 1141, 1145, 1147, 1148, 1149,
            1151, 1152, 1154, 1163, 1164, 1165, 1166, 1169, 1174, 1175, 1183, 1185, 1186, 1187, 1192, 1198,
            1199, 1201, 1213, 1216, 1217, 1218, 1233, 1234, 1236, 1244, 1247, 1248, 1259, 1271, 1272, 1277,
            1287, 1296, 1300, 1301, 1309, 1310, 1311, 1322, 1328, 1334, 1352, 1417, 1433, 1434, 1443, 1455,
            1461, 1494, 1500, 1501, 1503, 1521, 1524, 1533, 1556, 1580, 1583, 1594, 1600, 1641, 1658, 1666,
            1687, 1688, 1700, 1717, 1718, 1719, 1720, 1721, 1723, 1755, 1761, 1782, 1783, 1801, 1805, 1812,
            1839, 1840, 1862, 1863, 1864, 1875, 1900, 1914, 1935, 1947, 1971, 1972, 1974, 1984, 1998, 1999,
            2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2013, 2020, 2021, 2022, 2030,
            2033, 2034, 2035, 2038, 2040, 2041, 2042, 2043, 2045, 2046, 2047, 2048, 2049, 2065, 2068, 2099,
            2100, 2103, 2105, 2106, 2107, 2111, 2119, 2121, 2126, 2135, 2144, 2160, 2161, 2170, 2179, 2190,
            2191, 2196, 2200, 2222, 2251, 2260, 2288, 2301, 2323, 2366, 2381, 2382, 2383, 2393, 2394, 2399,
            2401, 2492, 2500, 2522, 2525, 2557, 2601, 2602, 2604, 2605, 2607, 2608, 2638, 2701, 2702, 2710,
            2717, 2718, 2725, 2800, 2809, 2811, 2869, 2875, 2909, 2910, 2920, 2967, 2968, 2998, 3000, 3001,
            3003, 3005, 3006, 3007, 3011, 3013, 3017, 3030, 3031, 3052, 3071, 3077, 3128, 3168, 3211, 3221,
            3260, 3261, 3268, 3269, 3283, 3300, 3301, 3306, 3322, 3323, 3324, 3325, 3333, 3351, 3367, 3369,
            3370, 3371, 3372, 3389, 3390, 3404, 3476, 3493, 3517, 3527, 3546, 3551, 3580, 3659, 3689, 3690,
            3703, 3737, 3766, 3784, 3800, 3801, 3809, 3814, 3826, 3827, 3828, 3851, 3869, 3871, 3878, 3880,
            3889, 3905, 3914, 3918, 3920, 3945, 3971, 3986, 3995, 3998, 4000, 4001, 4002, 4003, 4004, 4005,
            4006, 4045, 4111, 4125, 4126, 4129, 4224, 4242, 4279, 4321, 4343, 4443, 4444, 4445, 4446, 4449,
            4550, 4567, 4662, 4848, 4899, 4900, 4998, 5000, 5001, 5002, 5003, 5004, 5009, 5030, 5033, 5050,
            5051, 5054, 5060, 5061, 5080, 5087, 5100, 5101, 5102, 5120, 5190, 5200, 5214, 5221, 5222, 5225,
            5226, 5269, 5280, 5298, 5357, 5405, 5414, 5431, 5432, 5440, 5500, 5510, 5544, 5550, 5555, 5560,
            5566, 5631, 5633, 5666, 5678, 5679, 5718, 5730, 5800, 5801, 5802, 5810, 5811, 5815, 5822, 5825,
            5850, 5859, 5862, 5877, 5900, 5901, 5902, 5903, 5904, 5906, 5907, 5910, 5911, 5915, 5922, 5925,
            5950, 5952, 5959, 5960, 5961, 5962, 5963, 5987, 5988, 5989, 5998, 5999, 6000, 6001, 6002, 6003,
            6004, 6005, 6006, 6007, 6009, 6025, 6059, 6100, 6101, 6106, 6112, 6123, 6129, 6156, 6346, 6389,
            6502, 6510, 6543, 6547, 6565, 6566, 6567, 6580, 6646, 6666, 6667, 6668, 6669, 6689, 6692, 6699,
            6779, 6788, 6789, 6792, 6839, 6881, 6901, 6969, 7000, 7001, 7002, 7004, 7007, 7019, 7025, 7070,
            7100, 7103, 7106, 7200, 7201, 7402, 7435, 7443, 7496, 7512, 7625, 7627, 7676, 7741, 7777, 7778,
            7800, 7911, 7920, 7921, 7937, 7938, 7999, 8000, 8001, 8002, 8007, 8008, 8009, 8010, 8011, 8021,
            8022, 8031, 8042, 8045, 8080, 8081, 8082, 8083, 8084, 8085, 8086, 8087, 8088, 8089, 8090, 8093,
            8099, 8100, 8180, 8181, 8192, 8193, 8194, 8200, 8222, 8254, 8290, 8291, 8292, 8300, 8333, 8383,
            8400, 8402, 8443, 8500, 8600, 8649, 8651, 8652, 8654, 8701, 8800, 8873, 8888, 8899, 8994, 9000,
            9001, 9002, 9003, 9009, 9010, 9011, 9040, 9050, 9071, 9080, 9081, 9090, 9091, 9099, 9100, 9101,
            9102, 9103, 9110, 9111, 9200, 9207, 9220, 9290, 9415, 9418, 9485, 9500, 9502, 9503, 9535, 9575,
            9593, 9594, 9595, 9618, 9666, 9876, 9877, 9878, 9898, 9900, 9917, 9929, 9943, 9944, 9968, 9998,
            9999, 10000, 10001, 10002, 10003, 10004, 10009, 10010, 10012, 10024, 10025, 10082, 10180, 10215,
            10243, 10566, 10616, 10617, 10621, 10626, 10628, 10629, 10778, 11110, 11111, 11967, 12000, 12174,
            12265, 12345, 13456, 13722, 13782, 13783, 14000, 14238, 14441, 14442, 15000, 15002, 15003, 15004,
            15660, 15742, 16000, 16001, 16012, 16016, 16018, 16080, 16113, 16992, 16993, 17877, 17988, 18040,
            18101, 18988, 19101, 19283, 19315, 19350, 19780, 19801, 19842, 20000, 20005, 20031, 20221, 20222,
            20828, 21571, 22939, 23502, 24444, 24800, 25734, 25735, 26214, 27000, 27352, 27353, 27355, 27356,
            27715, 28201, 30000, 30718, 30951, 31038, 31337, 32768, 32769, 32770, 32771, 32772, 32773, 32774,
            32775, 32776, 32777, 32778, 32779, 32780, 32781, 32782, 32783, 32784, 32785, 33354, 33899, 34571,
            34572, 34573, 35500, 38292, 40193, 40911, 41511, 42510, 44176, 44442, 44443, 44501, 45100, 48080,
            49152, 49153, 49154, 49155, 49156, 49157, 49158, 49159, 49160, 49161, 49163, 49165, 49167, 49175,
            49176, 49400, 49999, 50000, 50001, 50002, 50003, 50006, 50300, 50389, 50500, 50636, 50800, 51103,
            51493, 52673, 52822, 52848, 52869, 54045, 54328, 55055, 55056, 55555, 55600, 56737, 56738, 57294,
            57797, 58080, 60020, 60443, 61532, 61900, 62078, 63331, 64623, 64680, 65000, 65129, 65389]

result = dict()


def create_base():
    """
    Создаем базу данных sqlite
    """
    conn = sqlite3.connect('checked_ip.db')
    cur = conn.cursor()
    cur.execute("""CREATE TABLE IF NOT EXISTS check_ip(
                   ip TEXT,
                   port INT,
                   service TEXT,
                   status TEXT);
                """)
    conn.commit()
    cur.close()
    conn.close()


def case_sense(resp) -> dict:
    """
    Переводим данные json из заголовка в нижний регистр.

    :param resp: json с данными заголовка.
    :return: словарь с данными переведенными в нижний регистр.
    """
    resp_low = dict()
    for res in resp:
        resp_low.update({res.lower(): resp[res].lower()})
    return resp_low


def get_headers(ip: str, pt: str, proto: str):
    """
    Попытка получения заголовков, если обнаружены открытые 80 или 443 порты.

    :param ip: строка, ip-адрес.
    :param pt: строка, порт.
    :param proto: строка, протокол.
    """
    global result

    headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 "
                      "Safari/537.36 "
    }

    try:
        head = case_sense(requests.head(url=f"{proto}://{ip}:{pt}", headers=headers, verify=False, timeout=3).headers)
        result[ip].update({f"{pt}/{proto}": head.get('location')}) if head.get('location') \
            else result[ip].update({f"{pt}/{proto}": head})
    except Exception:
        return


def scan_port(ip: str, pt: int, proto: str):
    """
    Попытка получения баннера с открытого порта.

    :param ip: строка, ip-адрес.
    :param pt: целое число, порт.
    :param proto: строка, сервис.
    """
    global result
    ans, s = "", ""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(2)
        if s.connect_ex((ip, pt)) == 0:
            ans = s.recv(1024)
            result[ip].update({f"{pt}/{proto}": str(ans.decode()).strip()}) if str(ans.decode().strip()) != "" \
                else result[ip].update({f"{pt}/{proto}": "open"})
        s.close()
    except UnicodeDecodeError:
        s.close()
        result[ip].update({f"{pt}/{proto}": str(ans).replace("b'", "").replace("'", "").replace('b"', "").
                          replace('"', "")})
    except (socket.timeout, ConnectionRefusedError, OSError):
        s.close()
        return


def thread_func(ip: str):
    """
    Запуск потоков для сканирования открытых портов.

    :param ip: строка, ip-адрес.
    """
    global result
    threads = []

    for rs in result:
        for pt in result[rs]:
            t = threading.Thread(target=scan_port, kwargs={'ip': ip, 'pt': int(pt.split("/")[0]),
                                                           "proto": pt.split("/")[1]}, daemon=True)
            t.start()
            threads.append(t)

    for thread in threads:
        thread.join()


def syn_ack_scan(ip: str):
    """
    Попытка сканирования портов на определенном ip-адресе.

    :param ip: строка, ip-адрес.
    """
    global ports, result

    request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
    answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]

    for _, receive in answer:
        try:
            if receive['TCP'].flags == "SA":
                if ip not in result:
                    result[ip] = dict()
                result[ip].update({f"{str(receive['TCP'].sport)}/"
                                   f"{str(sc.TCP_SERVICES[receive['TCP'].sport])}": "open"})
        except KeyError:
            result[ip].update({f"{str(receive['TCP'].sport)}/undefined": "open"})
        except IndexError:
            continue


def insert_base(ip: str):
    """
    Сохранение информации в БД sqlite.

    :param ip: строка, ip-адрес.
    """
    global result

    con_ins = sqlite3.connect('checked_ip.db')
    cur_ins = con_ins.cursor()

    if result:
        table_list = []
        for res in result:
            print(f"\nИнформация об адресе: {res}\n{'-'*65}")
            table_list = [(res, int(port.split("/")[0]), str(port.split("/")[1]), str(result[res][port]))
                          for port in result[res]]
            for st in result[res]:
                print(f"Порт: {st:20} | Статус: {result[res][st]}")
        cur_ins.executemany("INSERT INTO check_ip VALUES(?, ?, ?, ?);", table_list)
        con_ins.commit()
    else:
        cur_ins.execute("INSERT INTO check_ip VALUES(?, ?, ?, ?);", (ip, 0, "no data", "no data"))
        con_ins.commit()
    cur_ins.close()
    con_ins.close()
    result.clear()


def ip_rand(path_dir: str):
    """
    Получение случайного ip-адреса. Запуск функций сканирования портов,
    получения баннеров и заголовков.

    :param path_dir: строка, путь к файлу с CIDR провайдеров.
    """
    while KeyboardInterrupt:
        with open(path_dir, 'r', encoding='utf-8') as file:
            cidr = random.choice(file.readlines()).strip()
            ip = random.choice([str(x) for x in ip_network(cidr)])

            con_check = sqlite3.connect('checked_ip.db')
            cur_check = con_check.cursor()
            sel = """SELECT ip FROM check_ip WHERE ip = ?"""
            if cur_check.execute(sel, (ip,)).fetchone() is not None:
                continue
            else:
                cur_check.close()
                con_check.close()
                start = time.monotonic()
                print(f"\nСканирую: {ip}")
                syn_ack_scan(ip)

                if result:
                    thread_func(ip)
                    for res in result:
                        for port in result[res]:
                            if port.split("/")[0] == "80" or port.split("/")[0] == "443":
                                get_headers(ip, port.split("/")[0], port.split("/")[1])

                    insert_base(ip)
                    print(f"\nВремя сканирования: {time.monotonic() - start:.2f} c.\n{'-' * 65}")
                else:
                    print(f"No result\nВремя сканирования: {time.monotonic() - start:.2f} c.\n{'-' * 65}")
                    insert_base(ip)


def main():
    """
    Проверка существования файла с CIDR провайдеров. Запуск функции
    получения случайного ip-адреса: ip_rand.
    """
    if not (Path.cwd() / "checked_ip.db").exists():
        create_base()
    ip_rand("provider_ranges.txt")


if __name__ == "__main__":
    main()

Так выглядит сканирование в терминале:

01.png


А так данные в БД:

02.png
 

Вложения

@Johan Van заведи ты уже себе Github.
1. это проще чем кидать код в архиве
2. при изменении или правках это легче отслеживать и не надо перезаливать архивы.
3. все будет в одном месте и удобно каталогизировано
4. обретёшь базовые знания по гиту, что упростит тебе жизнь и разработку
 
  • Нравится
Реакции: dream to perfection
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!