Статья Сканируем случайные адреса Интернет. Получаем баннеры и протоколы с помощью Python

О каждой из этих тем по отдельности я писал в этой: баннеры, и этой: сканирование, статьях. Но давайте попробуем объединить два этих скрипта, чтобы использовать для сканирования адресов возможности scapy, а также получать баннеры с открытых портов с помощью socket.

000.png

Дисклеймер: Все данные, предоставленные в данной статье, взяты из открытых источников, не призывают к действию и являются только лишь данными для ознакомления, и изучения механизмов используемых технологий.

На самом деле это тема в общем-то не нова. И никто не мешает вам для сканирования сети взять nmap и пользоваться им напрямую или использовать его в скрипте python. Но гораздо интереснее попробовать сделать что-то, что будет выполнять похожие функции, самостоятельно. Тем более, такого тщательного сканирования в данном случае просто не требуется.

Попробую пояснить свою мысль. Уж не знаю, почему, но мне в последнее время стало интересно сканировать те адреса в интернете, которые, скорее всего, не находятся в индексе Google, Яндекс или еще какой поисковой системы. Просто в силу того, что на данные страницы не ведет ни одна ссылка в интернете. Но ведь за данными адресами могут скрываться как сайты, так и просто устройства (роутеры, маршрутизаторы, ip-камеры и т.д.). То есть, в простом понимании, позаниматься «нетсталкингом». Что это такое, можете сами поискать в .

Тут же сразу возник вопрос: "А какие адреса сканировать и где их брать?". Этот вопрос я решил. Не знаю, насколько полно это получилось, но мне удалось собрать ip-диапазоны сетей провайдеров (CIDR) по всему миру. Всех или нет, это уже другой вопрос. Однако, того что удалось накопать уже хватит на многие месяцы сканирования. Файл с адресами я добавлю во вложение.

Таким образом, для того, чтобы не сканировать уныло каждый адрес один за другим, мы будем случайным образом выбирать диапазон, также случайно выбирать адрес и пытаться получить по нему данные. Ну и если сканирование пройдет успешно и мы найдем хоть один живой порт, запишем эти данные в json, чтобы при необходимости можно было использовать его в каких-то других проектах.

Достаточно теории. Давайте перейдем к коду.


Блок импорта

Перед тем, как начать писать код необходимо установить сторонние библиотеки, которые потребуются в процессе его работы. Это scapy и requests. Для их установки набираем в терминале следующую команду:

pip install requests
pip install --pre scapy[basic]

Все остальные библиотеки, которые используются в скрипте, являются библиотеками «из коробки».
Итак, после того, как нужные библиотеки будут установлены, нужно импортировать все что необходимо в проект:

Python:
import json
import random
import socket
import sys
from datetime import datetime as dt
from ipaddress import ip_network
from os import getuid
from pathlib import Path
from platform import system

import requests
import scapy.all as sc

Создадим переменную headers и запишем в нее словарь с user_agent. Он нам понадобиться при использовании библиотеки requests.

Python:
headers = {
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 "
                  "Safari/537.36 "
}

Теперь, в глобальном пространстве скрипта прописываем две строчки:

Python:
requests.packages.urllib3.disable_warnings()
random.seed(dt.now().second + random.randint(0, 101))

result = dict()

Первая нужна для того, чтобы подавить warning от библиотеки requests при использовании параметра verify=False. Вторая служит для того, чтобы сделать наш рандомный выбор хоть чуточку более рандомным.
Ну и создаем глобальную переменную result, в которой и будет содержаться словарь с найденными портами, баннерами и заголовками, и который впоследствии будет записываться в json.


Вспомогательные функции

Для того, чтобы скрипт был более продвинутым и не сканировал одни и те же адреса по нескольку раз, когда они вдруг будут выбраны в рандоме, нужно создать функцию, которая будет проверять наличие данных адресов в файле, куда будет сохраняться каждый просканированный адрес. Также нужно создать функцию, которая будет сохранять просканированные адреса в файл. И функцию, которая будет сохранять полученные данные из словаря result, если они там, конечно же есть. Помимо этого, была обнаружена особенность сохранения в json (раньше просто не сталкивался). При получении заголовков с адреса и попытке записи в json данных из словаря, выкидывалось исключение, что в данном случае, при записи регистр имеет значение. А значит, требуется также функция, которая будет переводить все полученные ключи и значения заголовков в нижний регистр.

Ну вот, вроде бы ничего не забыл. Давайте приступим к их созданию.


Перевод словаря в нижний регистр

Для начала создадим функцию case_sence(resp: dict) -> dict. С ее помощью мы будем переводить данные словаря в нижний регистр. На входе она получает словарь с данными, а на выходе возвращает уже словарь с теми же данными, но переведенными в нижний регистр.

Создаем временный словарь, куда будем складывать переведенные в нижний регистр значения — resp_low. В цикле пробегаемся по каждому ключу и значению словаря, переводим в нижний регистр, записываем данные во временный словарь и возвращаем из функции.

Python:
def case_sence(resp: dict) -> dict:
    """
    Перевод регистра ключей и значений в нижний диапазон,
    так как запись в верхнем регистре в json невозможна.

    :param resp: словарь с данными для перевода.
    :return: словарь с переведенными данными.
    """
    resp_low = dict()
    for res in resp:
        resp_low.update({res.lower(): resp[res].lower()})
    return resp_low


Сохранение json с полученными данными из словаря result

Создадим функцию save_json(addr). На вход она получает текущий ip-адрес по которому проводилось сканирование. Так как json-ы мы будем складывать в отдельную папку, создаем ее с помощью функции mkdir библиотеки pathlib. Она позволяет без особых проверок в случае отсутствия директории ее создать. Если же директория существует, вызывается исключение, которое просто игнорируется с помощью указания явного параметра exist_ok=True. Получаем текущую дату. Проверяем длину словаря. Если она больше нуля, значит какие-то данные были получены. Открываем файл на перезапись, в качестве имени файла указываем текущую дату и ip-адрес, который передается в функцию. Записываем json и затем, чтобы пользователь видел, что что-то происходит, выводим из этого же словаря содержащиеся в нем данные.

Python:
def save_json(addr):
    """
    Сохранение данных из словаря с полученными значениями.

    Создаем директорию для сохранения json, если она не существует.
    Забираем дату в нужном формате. Проверяем, не является ли словарь result пустым.
    Записываем значения из словаря в файл json.
    Выводим информацию из словаря в терминал для пользователя.

    :param addr: ip-адрес, по которому выполнялось текущее сканирование.
    """
    (Path.cwd() / "checked_ip").mkdir(exist_ok=True)

    day = dt.now().strftime("%d-%m-%Y")
    if len(result) > 0:
        with open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json", 'w', encoding='utf-8') as file:
            json.dump(result, file, indent=4, ensure_ascii=False)
        for res in result:
            print(f"\nИнформация об адресе: {res}\n{'-'*65}")
            for st in result[res]:
                print(f"Порт: {st:20} | Статус: {result[res][st]}")
        print(f"{'-'*65}")


Сохраняем просканированный ip-адрес

Вне зависимости от того, удалось ли получить по данному ip данные или нет, для того, чтобы избежать его повторного сканирования, дописываем его в текстовый файл. Создадим функцию save_ip(ip). На вход она получает адрес, который нужно сохранить. Ну, а дальше все просто.

Отрываем файл на дозапись, и записываем в него строку с адресом, после чего переводим каретку на следующую строку, для следующего адреса.

Python:
def save_ip(ip):
    """
    Сохраняет текущие ip-адреса в текстовый документ, для
    последующей проверки по ним, перед сканированием, чтобы
    избежать повторного сканирования.

    Открываем файл, дописываем адрес в конец файла.
    :param ip: ip-адрес для записи в файл.
    """
    with open(Path.cwd() / "used_ip.txt", 'a', encoding='utf-8') as file:
        file.write(f"{ip}\n")


Проверка текущего ip-адреса на наличие в уже просканированных адресах

Для того, чтобы не сканировать одни и те же адреса повторно, нужно проверить каждый текущий адрес на наличие его в файле, куда мы предварительно записали уже отсканированные адреса.

Создадим функцию check_ip(ip). На вход она получает адрес который нужно проверить.

Проверяем, существует ли файл с отсканированными адресами. Его ведь может и не быть, если это наше первое сканирование. Если файл есть, открываем его на чтение и создаем список из ip-адресов в нем содержащихся. После этого проверяем, есть ли наш ip в получившемся списке. Если есть, возвращаем True. Нет — False.

Python:
def check_ip(ip):
    """
    Проверка ip-адреса на нахождение в списке уже сканированных адресов.

    Проверяем, существует ли файл. Открываем, создаем список из адресов
    и проверяем вхождение текущего адреса в сформированный список.
    Если есть, возвращаем True, нет - False.
    :param ip: строка, ip-адрес для проверки.
    :return: булево значение. True или False в зависимости от результата.
    """
    if (Path.cwd() / "used_ip.txt").exists():
        with open(Path.cwd() / "used_ip.txt", 'r', encoding='utf-8') as file:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True
    return False


Получение случайного ip-адреса

Перед тем, как сканировать какой-либо адрес, его еще нужно получить. А потому, создадим функцию для его получения ip_from_range(path_dir). В нее мы будем передавать путь к файлу с диапазонами ip-адресов (CIDR) провайдеров.

Запускаем бесконечный цикл. В нем отрываем файл с CIDR, создаем из его строк список и с помощью random.choice выбираем случайный элемент. Затем, создаем список ip-адресов, которые входят в этот диапазон и также с помощью random.choice выбираем случайный адрес.

Теперь передаем его в функцию check_ip для проверки наличия в уже сканированных адресах. Если есть, итерируемся снова. Нет, двигаемся дальше. Выводим информацию о сканируемом адресе. Передаем его в функцию сканирования портов и получения данных syn_ack_scan, куда помимо адреса передается кортеж с диапазоном портов для сканирования. Здесь, в данном коде, они жестко определены. Но ничто не мешает вам сделать их запрос у пользователя или выставить свой диапазон.

Python:
def ip_from_range(path_dir):
    """
    Получение случайного ip-адреса из файла с CIDR.

    Запускаем бесконечный цикл. Открываем файл с диапазонами
    cidr провайдеров. Выбираем случайный cidr, выбираем случайный ip.
    Проверяем, не использовался ли этот адрес ранее.
    Если нет, запускаем сканирование адреса. После, если словарь с результатом
    не пуст, сохраняем полученные данные. Если пуст, выводим сообщение пользователю,
    что нет результата. Вне зависимости от наполненности словаря сохраняем текущий ip,
    после чего очищаем словарь.
    :param path_dir: строка, путь к файлу с диапазонами (cidr).
    """
    global result
    while True:
        with open(path_dir, 'r', encoding='utf-8') as file:
            cidr = random.choice([x.strip() for x in file.readlines()])
            ip = random.choice([str(x) for x in ip_network(cidr)])
            if check_ip(ip):
                continue
            print(f"\nСканирую: {ip}")
            syn_ack_scan(ip, (1, 1001))
            if len(result) > 0:
                save_json(ip)
            else:
                print(f"No result\n{'-' * 65}")
            save_ip(ip)
            result.clear()

Затем проверяем, не является ли словарь с данными пустым. Если он пуст, выводим в терминал сообщение об отсутствии результатов. Если не пуст, запускаем функцию сохранения данных save_json, в которую передаем текущий ip-адрес.
И затем, вне зависимости от того, были или нет получены данные по адресу, сохраняем текущий ip в файл уже сканированных и очищаем глобальный словарь result, где и содержались данные по адресу. Очищаем словарь в любом случае.


Получение баннеров на определенном порту

Что же, здесь я не буду особо оригинален и возьму предыдущий свой код из статьи указанной в начале. Разница в нем будет только в том, что в случае удачного получения баннера значение не будет записываться в словарь, а возвращаться просто в виде полученной строки.

Создадим функцию scan_port(ip: str, port: int) -> (str, dict, bool). На вход она получает ip-адрес для сканирования и порт. А возвращает данные в зависимости от полученных результатов и вызванных исключений.

Создаем потоковый сокет на основе протокола TCP, который будет передавать и принимать информацию. Устанавливаем время жизни соединения, которое равно 5 секунд. В принципе, данного времени должно хватить. Но, можно поэкспериментировать со значением. Устанавливаем соединение по переданным в функцию адресу и порту. Выполняем попытку получить баннер. Если баннер получен, пытаемся его декодировать, а также обрабатываем случай, когда декодированное значение является пустым. Если баннер не пустая строка, возвращаем полученное значение из функции. Обрабатываем различные ошибки при декодировании и установке соединения. А также ошибку при получении данных.

Python:
def scan_port(ip: str, port: int) -> (str, dict, bool):
    """
    Получение банера на отрытом потру.

    Выполняется подключение к ip-адресу на полученный
    порт. Если подключение удалось, значит порт открыт. Производиться
    попытка получить баннер. Если баннер не получен, обрабатывается
    исключение, в котором идет запрос службы на порту у функции за
    пределами класса. Если получить службу не удалось, возвращается
    unassigned. Полученные значения добавляются в словари для
    последующей обработки. В словарь с баннерами добавляются значения,
    если баннер получен. В остальных случаях, в словарь открытых портов.

    :param ip: ip-адрес или домен для сканирования.
    :param port: Номер порта для сканирования. Целое число
    :return: Возвращается False. Служит для выхода из функции
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(5)

    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False


Сканирование портов с помощью scapy запросами syn — ack

А теперь создадим основную функцию, с помощью которой будем сканировать порты на полученном в функцию адресе. Создадим функцию syn_ack_scan(ip: str, ports: tuple). На вход она получает ip-адрес, и кортеж из диапазона портов. На самом деле, scapy умеет сам итерироваться по диапазону CIDR. Но, так как количество адресов в диапазоне может быть довольно большим, занимать это будет много времени. Потому, практичнее все же передавать в функцию один ip-адрес. Также у scapy есть возможность сканировать не диапазон, а только один порт. Но, в нашем случае передается кортеж. И для того, чтобы просканировать только один адрес, нужно в кортеже передать два одинаковых значения, то есть (80, 80), для примера.

Создаем пакет для сканирования. Выполняем запрос с установленным таймаутом и отключенным выводом в терминал. Забираем ответ и итерируемся по нему в цикле.

Так как результат сканирования это кортеж, то его можно распарсить на две переменные. В нашем случае это будут полученные и ошибочные ответы. Так как ошибочные ответы нас не интересуют, забираем только полученные в переменную receive. Выполняем проверку, если флаг в ответе содержит SA, что означает отрытый порт, так как от него получен ответ, создаем ключ в словаре с текущим ip-адресом.

Делаем проверку, если порт равен 80 или 443, с помощью requests выполняем запрос на наличие на той стороне сайта или страницы. То есть, пытаемся получить заголовки. Если получается, добавляем порт, протокол и полученные заголовки в словарь. Если не получиться, просто порт, протокол и то, что порт открыт.

В случае с другими портами делаем попытку получить баннер. Если баннер получен, добавляем порт, протокол и значение баннера в словарь. Если баннер не получен, добавляем порт, протокол и значение, что этот порт открыт с словарь.

И да, если заголовки были получены, делаем попытку получить из них ключ «location», который может указывать на сайт при переадресации с ip-адреса.

Python:
def syn_ack_scan(ip: str, ports: tuple):
    # создание пакета для сканирования
    try:
        request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
    except socket.gaierror:
        raise ValueError(f'{ip} получить не удалось')
    answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]  # отправка пакета

    for _, receive in answer:
        try:
            if receive['TCP'].flags == "SA":
                try:
                    if ip not in result:
                        result[ip] = dict()
                    if receive['TCP'].sport == 80:
                        try:
                            head = requests.head(url=f"https://{ip}:80", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({'80/http': case_sence(head)['location']})
                            except Exception:
                                result[ip].update({'80/http': case_sence(head)})
                        except Exception:
                            result[ip].update({"80/http": "open"})
                    elif receive['TCP'].sport == 443:
                        try:
                            head = requests.head(url=f"https://{ip}:443", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({"443/http": case_sence(head)['location']})
                            except Exception:
                                result[ip].update({"443/http": case_sence(head)})
                        except Exception:
                            result[ip].update({"443/http": "open"})
                    else:
                        resp = scan_port(ip=ip, port=receive['TCP'].sport)
                        if not resp:
                            result[ip].update({f"{str(receive['TCP'].sport)}/"
                                               f"{sc.TCP_SERVICES[receive['TCP'].sport]}": "open"})
                        else:
                            result[ip].update({f"{str(receive['TCP'].sport)}/"
                                               f"{sc.TCP_SERVICES[receive['TCP'].sport]}": resp})
                except KeyError:
                    result[ip].update({f"{str(receive['TCP'].sport)}/undefined": "open"})
        except IndexError:
            continue

Вот, это если вкратце. Ну и обрабатываем различные исключения.


Запуск сканирования адресов

И еще один небольшой момент, это запуск сканирования адресов. Так как скрипт использует scapy, то и для работы он требует запуска из под «sudo». Потому проверяем в самом начале, как он запущен. Если не от имени суперпользователя, сообщаем об этом и выходим из скрипта.

Если же все хорошо, запускаем функцию рандомного получения адресов, куда передаем путь к файлу с CIDR.

Python:
def main():
    """
    Запуск сканирования адресов.
    """
    if system() == "Linux":
        if not getuid() == 0:
            print("\n [+] Run the script as root user!")
            sys.exit(0)

    ip_from_range("provider_ranges.txt")


if __name__ == "__main__":
    main()

Python:
"""
Скрипт для сканирования ip-адресов выбранных случайно из
списка CIDR провайдеров.

Для работы скрипта требуется установка библиотек:
pip install --pre scapy[basic] requests
"""

import json
import random
import socket
import sys
from datetime import datetime as dt
from ipaddress import ip_network
from os import getuid
from pathlib import Path
from platform import system

import requests
import scapy.all as sc

headers = {
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 "
                  "Safari/537.36 "
}

requests.packages.urllib3.disable_warnings()
random.seed(dt.now().second + random.randint(0, 101))

result = dict()


def scan_port(ip: str, port: int) -> (str, dict, bool):
    """
    Получение банера на отрытом потру.

    Выполняется подключение к ip-адресу на полученный
    порт. Если подключение удалось, значит порт открыт. Производиться
    попытка получить баннер. Если баннер не получен, обрабатывается
    исключение, в котором идет запрос службы на порту у функции за
    пределами класса. Если получить службу не удалось, возвращается
    unassigned. Полученные значения добавляются в словари для
    последующей обработки. В словарь с баннерами добавляются значения,
    если баннер получен. В остальных случаях, в словарь открытых портов.

    :param ip: ip-адрес или домен для сканирования.
    :param port: Номер порта для сканирования. Целое число
    :return: Возвращается False. Служит для выхода из функции
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(5)

    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False


def syn_ack_scan(ip: str, ports: tuple):
    """
    Сканирование портов в переданном диапазоне на переданном ip-адресе.

    Выполняется сканирование портов в указанном диапазоне. Если есть ответ
    по текущему порту, выполняется попытка получения банера, если это не 80 и 443 порты.
    Если это 80 или 443 порт, пытаемся получить заголовки сайта, который возможно
    хостится на данном сервере по-данному ip.
    Дабавляем все полученные данные в глобальный словарь.

    :param ip: строка, ip-адрес цели.
    :param ports: кортеж из портов для сканирования.
    """
    # создание пакета для сканирования
    try:
        request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
    except socket.gaierror:
        raise ValueError(f'{ip} получить не удалось')
    answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]  # отправка пакета

    for _, receive in answer:
        try:
            if receive['TCP'].flags == "SA":
                try:
                    print(receive['TCP'].sport)
                    if ip not in result:
                        result[ip] = dict()
                    if receive['TCP'].sport == 80:
                        try:
                            head = requests.head(url=f"https://{ip}:80", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({'80/http': case_sence(head)['location']})
                            except Exception:
                                result[ip].update({'80/http': case_sence(head)})
                        except Exception:
                            result[ip].update({"80/http": "open"})
                    elif receive['TCP'].sport == 443:
                        try:
                            head = requests.head(url=f"https://{ip}:443", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({"443/http": case_sence(head)['location']})
                            except Exception:
                                result[ip].update({"443/http": case_sence(head)})
                        except Exception:
                            result[ip].update({"443/http": "open"})
                    else:
                        resp = scan_port(ip=ip, port=receive['TCP'].sport)
                        if not resp:
                            result[ip].update({f"{str(receive['TCP'].sport)}/"
                                               f"{sc.TCP_SERVICES[receive['TCP'].sport]}": "open"})
                        else:
                            result[ip].update({f"{str(receive['TCP'].sport)}/"
                                               f"{sc.TCP_SERVICES[receive['TCP'].sport]}": resp})
                except KeyError:
                    result[ip].update({f"{str(receive['TCP'].sport)}/undefined": "open"})
        except IndexError:
            continue


def case_sence(resp: dict) -> dict:
    """
    Перевод регистра ключей и значений в нижний диапазон,
    так как запись в верхнем регистре в json невозможна.

    :param resp: словарь с данными для перевода.
    :return: словарь с переведенными данными.
    """
    resp_low = dict()
    for res in resp:
        resp_low.update({res.lower(): resp[res].lower()})
    return resp_low


def save_json(addr):
    """
    Сохранение данных из словаря с полученными значениями.

    Создаем директорию для сохранения json, если она не существует.
    Забираем дату в нужном формате. Проверяем, не является ли словарь result пустым.
    Записываем значения из словаря в файл json.
    Выводим информацию из словаря в терминал для пользователя.

    :param addr: ip-адрес, по которому выполнялось текущее сканирование.
    """
    (Path.cwd() / "checked_ip").mkdir(exist_ok=True)

    day = dt.now().strftime("%d-%m-%Y")
    if len(result) > 0:
        with open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json", 'w', encoding='utf-8') as file:
            json.dump(result, file, indent=4, ensure_ascii=False)
        for res in result:
            print(f"\nИнформация об адресе: {res}\n{'-'*65}")
            for st in result[res]:
                print(f"Порт: {st:20} | Статус: {result[res][st]}")
        print(f"{'-'*65}")


def save_ip(ip):
    """
    Сохраняет текущие ip-адреса в текстовый документ, для
    последующей проверки по ним, перед сканированием, чтобы
    избежать повторного сканирования.

    Открываем файл, дописываем адрес в конец файла.
    :param ip: ip-адрес для записи в файл.
    """
    with open(Path.cwd() / "used_ip.txt", 'a', encoding='utf-8') as file:
        file.write(f"{ip}\n")


def check_ip(ip):
    """
    Проверка ip-адреса на нахождение в списке уже сканированных адресов.

    Проверяем, существует ли файл. Открываем, создаем список из адресов
    и проверяем вхождение текущего адреса в сформированный список.
    Если есть, возвращаем True, нет - False.
    :param ip: строка, ip-адрес для проверки.
    :return: булево значение. True или False в зависимости от результата.
    """
    if (Path.cwd() / "used_ip.txt").exists():
        with open(Path.cwd() / "used_ip.txt", 'r', encoding='utf-8') as file:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True
    return False


def ip_from_range(path_dir):
    """
    Получение случайного ip-адреса из файла с CIDR.

    Запускаем бесконечный цикл. Открываем файл с диапазонами
    cidr провайдеров. Выбираем случайный cidr, выбираем случайный ip.
    Проверяем, не использовался ли этот адрес ранее.
    Если нет, запускаем сканирование адреса. После, если словарь с результатом
    не пуст, сохраняем полученные данные. Если пуст, выводим сообщение пользователю,
    что нет результата. Вне зависимости от наполненности словаря сохраняем текущий ip,
    после чего очищаем словарь.
    :param path_dir: строка, путь к файлу с диапазонами (cidr).
    """
    global result
    while True:
        with open(path_dir, 'r', encoding='utf-8') as file:
            cidr = random.choice([x.strip() for x in file.readlines()])
            ip = random.choice([str(x) for x in ip_network(cidr)])
            if check_ip(ip):
                continue
            print(f"\nСканирую: {ip}")
            syn_ack_scan(ip, (1, 1001))
            if len(result) > 0:
                save_json(ip)
            else:
                print(f"No result\n{'-' * 65}")
            save_ip(ip)
            result.clear()


def main():
    """
    Запуск сканирования адресов.
    """
    if system() == "Linux":
        if not getuid() == 0:
            print("\n [+] Run the script as root user!")
            sys.exit(0)

    ip_from_range("provider_ranges.txt")


if __name__ == "__main__":
    main()

Ну и то, что сохраняет скрипт в директории:

01.png


А так выглядит json внутри:

02.png


А на этом, пожалуй, все.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна

UPD:
Смотрите измененную версию скрипта в комментариях.
 

Вложения

  • syn_ack_rand.zip
    897,5 КБ · Просмотры: 122
Последнее редактирование модератором:
  • Нравится
Реакции: Dzen

f22

Codeby Academy
Gold Team
05.05.2019
1 928
226
BIT
1 696
Тут же сразу возник вопрос: "А какие адреса сканировать и где их брать?". Этот вопрос я решил. Не знаю, насколько полно это получилось, но мне удалось собрать ip-диапазоны сетей провайдеров (CIDR) по всему миру. Всех или нет, это уже другой вопрос. Однако, того что удалось накопать уже хватит на многие месяцы сканирования. Файл с адресами я добавлю во вложение.
из Мордора не открывается



Для их установки набираем в терминале следующую команду:

pip install --pre scapy[basic] requests
1669286095405.png



По коду могу сказать одно - вам есть к чему стремиться, а это скорее всего неплохо)

Создадим переменную headers и запишем в нее словарь с user_agent. Он нам понадобиться при использовании библиотеки requests.
Ну и для таких действий есть отличная генерации фэйковых/случайных юзерагентов


Достаточно просто
Python:
if result:
Пустой объект всегда возвращает False

Python:
open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json"
А если скрипт будет выполняться в Windows? Получим портянку ошибок связанных с обращением к пути.
Для подобных операций в python есть отличный метод os.path.join
Он сам выбирает нужный тип слеша, в зависимости от системы.


Вне зависимости от того, удалось ли получить по данному ip данные или нет
Так а почему это не важно?
Я бы наоборот писал в один файл те адреса, у которых мы точно что-то получили, а в другой - те, у которых была ошибка или, которые недоступны.

Python:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True
условие if p in lines можно так же убрать в генератор списка и избавиться от переменной lines

Получение случайного ip-адреса
Python:
cidr = random.choice([x.strip() for x in file.readlines()])
Зачем тут цикл, если получить случайный элемент можно напрямую из readlines()
Python:
cidr = random.choice(file.readlines()).strip()
Аналогично и здесь
Python:
  ip = random.choice([str(x) for x in ip_network(cidr)])

Но сам цикл и программа будет безумно медленной, вы только представьте, сколько будет накладных расходов на чтение и запись только одного IP адреса?
Вам нужно прочитать 3 файла, отфильтровать данные и записать их.

Куда разумнее было бы сначала создать список (а лучше кортеж) и определённого случайного диапазона. а потом уже этот список обрабатывать. А фильтрацию производить не в момент сканирования, а до или после.
Python:
    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False
а зачем столько try? Убрать обработку кода можно в один блок, у вас же все исключения описаны.

Python:
            banner = s.recv(1024).decode().strip()
            if banner == '':
для таких конструкций в python3.8 ввели моржовый оператор.

Python:
            banner = s.recv(1024).strip()
            return banner
разумнее сразу возвращать результат, не тратя времени на создание переменной
Python:
            return s.recv(1024).strip()


Функция сканирования портов
Какая же странная функция:
1.
Python:
try:
    request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
except socket.gaierror:
    raise ValueError(f'{ip} получить не удалось')
answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]  # отправка пакета
Не совсем понятно - если возникло исключение socket.gaierror, зачем продолжать выполнение кода?

2. Зачем столько блоков try?
Почитайте что такое код


Прочтите вот эту статью @explorer и попробуйте реализовать многопоточность, доработав оставшийся функционал, пока код, откровенно, сырой.
 

Johan Van

Green Team
13.06.2020
363
691
BIT
394
из Мордора не открывается




Посмотреть вложение 64644


По коду могу сказать одно - вам есть к чему стремиться, а это скорее всего неплохо)


Ну и для таких действий есть отличная генерации фэйковых/случайных юзерагентов



Достаточно просто
Python:
if result:
Пустой объект всегда возвращает False

Python:
open(Path.cwd() / "checked_ip" / f"{day}_{addr}.json"
А если скрипт будет выполняться в Windows? Получим портянку ошибок связанных с обращением к пути.
Для подобных операций в python есть отличный метод os.path.join
Он сам выбирает нужный тип слеша, в зависимости от системы.



Так а почему это не важно?
Я бы наоборот писал в один файл те адреса, у которых мы точно что-то получили, а в другой - те, у которых была ошибка или, которые недоступны.

Python:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True
условие if p in lines можно так же убрать в генератор списка и избавиться от переменной lines


Python:
cidr = random.choice([x.strip() for x in file.readlines()])
Зачем тут цикл, если получить случайный элемент можно напрямую из readlines()
Python:
cidr = random.choice(file.readlines()).strip()
Аналогично и здесь
Python:
  ip = random.choice([str(x) for x in ip_network(cidr)])

Но сам цикл и программа будет безумно медленной, вы только представьте, сколько будет накладных расходов на чтение и запись только одного IP адреса?
Вам нужно прочитать 3 файла, отфильтровать данные и записать их.

Куда разумнее было бы сначала создать список (а лучше кортеж) и определённого случайного диапазона. а потом уже этот список обрабатывать. А фильтрацию производить не в момент сканирования, а до или после.
Python:
    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False
а зачем столько try? Убрать обработку кода можно в один блок, у вас же все исключения описаны.

Python:
            banner = s.recv(1024).decode().strip()
            if banner == '':
для таких конструкций в python3.8 ввели моржовый оператор.

Python:
            banner = s.recv(1024).strip()
            return banner
разумнее сразу возвращать результат, не тратя времени на создание переменной
Python:
            return s.recv(1024).strip()


Функция сканирования портов
Какая же странная функция:
1.
Python:
try:
    request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
except socket.gaierror:
    raise ValueError(f'{ip} получить не удалось')
answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]  # отправка пакета
Не совсем понятно - если возникло исключение socket.gaierror, зачем продолжать выполнение кода?

2. Зачем столько блоков try?
Почитайте что такое код


Прочтите вот эту статью @explorer и попробуйте реализовать многопоточность, доработав оставшийся функционал, пока код, откровенно, сырой.

Позвольте мне частично ответить на данные замечания. Частично, потому, что с некоторыми согласен. С некоторыми нет ))

По поводу установки: pip install –pre scapy[basic] requests ошибок не возникало ни на Windows, ни на Linux.
Это установка на Windows (для примера):

screenshot1.png


Из документации:

screenshot4.png


По поводу fake-useragent знаю. Но в данном случае считаю его использование излишним, так как мне не особо важно, чтобы заголовок менялся. Запросы делаются не на один сайт. А потому создавать видимость другого браузера нет необходимости. И еще у него иногда возникает исключение, в котором говориться, что ресурс недоступен. Так как код лежит на heroku. С ошибкой сталкивался лично.

По поводу путей. Модуль Path из библиотеки pathlib и был создан для того, чтобы заменить некоторые функции предоставляемые библиотекой os. В данном случае слеш служит не для разделения пути, а именно как оператор функции. В Windows путь будет выглядеть примерно так:

screenshot2.png


Примерно потому, что убрал дату и адрес.

По поводу записи адреса в файл, получили данные или нет. В данном случае записывается проверенный адрес для того, чтобы не проверять его в будущем, при последующем запуске скрипта. В данном случае не имеет значения, были ли ошибки в процессе получения данных или нет. Просто потому, что если данные не получены, я считаю этот адрес не рабочим. Отдельно записывать ошибочные адреса не вижу смысла, так как я итак сохраняю данные полученные с адресов в json. А если я буду сохранять файл с валидными адресами и с невалидными, то в этом случае мне придется при проверке адреса итерироваться уже по двум спискам? Но тут не знаю. Может быть я просто не понял вашу мысль.

По этому коду:

Python:
            lines = [x.strip() for x in file.readlines()]
            if ip in lines:
                return True

Здесь да, согласен. Можно действительно сделать так: return [x.strip() for x in file.readlines() if ip in lines]

По поводу получения случайного диапазона из readlines() – согласен.
По поводу этого: ip = random.choice([str(x) for x in ip_network(cidr)]) нет. Выбрасывает в исключение:

screenshot3.png


По этому коду:

Python:
    try:
        s.connect((ip, port))
        try:
            banner = s.recv(1024).decode().strip()
            if banner == '':
                return False
            else:
                return banner
        except OSError:
            return False
        except UnicodeDecodeError:
            banner = s.recv(1024).strip()
            return banner
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False

Да, можно сократить:

Python:
    try:
        s.connect((ip, port))
        if banner := s.recv(1024).decode().strip():
            return banner
        return False
    except UnicodeDecodeError:
        return s.recv(1024).strip()
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False

По поводу этого:

Python:
try:
    request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
except socket.gaierror:
    raise ValueError(f'{ip} получить не удалось')
answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]

Да, пародон. Данный кусок кода использовался у меня не тут. И пару раз даже попадался мне на глаза. Но, почему-то выпустил из виду. Блок try – except здесь лишний.

По поводу «писать все дважды». Изначально было желание вынести получение заголовков в отдельную функцию. Передумал, чтобы не загромождать код. А так да. Можно было бы просто убрать отсюда и вынести отдельно вот этот кусок:

Python:
                        try:
                            head = requests.head(url=f"https://{ip}:80", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({'80/http': case_sence(head)['location']})
                            except Exception:
                                result[ip].update({'80/http': case_sence(head)})
                        except Exception:
                            result[ip].update({"80/http": "open"})

и этот:

Python:
                        try:
                            head = requests.head(url=f"https://{ip}:443", headers=headers, verify=False,
                                                 timeout=3).headers
                            try:
                                result[ip].update({"443/http": case_sence(head)['location']})
                            except Exception:
                                result[ip].update({"443/http": case_sence(head)})
                        except Exception:
                            result[ip].update({"443/http": "open"})

По многопоточности. Делал. Тщательность сканирования портов снижается. Плюс к тому, скорее всего у scapy под капотом своя обработка потоков (тут не уверен, но судя по скорости работы так и есть), так как 1000 портов сканируются достаточно быстро, также как если бы были задействованы потоки. Ведь по сути, когда мы сканируем локальную сеть с помощью scapy, он может сканировать не один или пару адресов. Он может просканировать сразу же диапазон: 192.168.0.1/24. Так что, потоки здесь излишни.
Хотя тут вопрос, как scapy отправляет запросы на каждый порт. В документации об этом сказано достаточно расплывчато. Думаю, надо еще поискать информацию по данному вопросу.

UPD: Забыл упомянуть. По созданию списка для проверки адресов, согласен. Быстрее проверить вхождение. А вот по поводу сохранения, тут ситуация двоякая. С одной стороны, можно было бы сделать список, по мере работы добавлять в него адреса, а уже в конце, по KeyboardInterrupt сохранять все в файл. Но сделал так, как сделал, то есть сохраняю в файл после каждой итерации, во избежание потери данных о проверенных адресах.

За статью @explorer спасибо. Думаю вы и имели в виду мультипроцессинг. До него руки пока не дошли. Но очень хочется )).

А так да, конечно же, всегда есть к чему стремиться.
 
Последнее редактирование:
  • Нравится
Реакции: f22

Johan Van

Green Team
13.06.2020
363
691
BIT
394
Немного переделал скрипт. Сделал раздельное сканирование портов, получение баннеров, а также получение заголовков.
Теперь данные сохраняются в БД SQLite, так как в скрипте явно требовалась более быстрая реализация проверки ip-адреса, был он уже сканирован или нет.
Если продолжить сохранять адреса в текстовый документ, то в конце-концов, так как адресов все же довольно много, его размер может достигнуть критического значения.
Конечно, при больших объемах памяти в нынешних ПК достигнуть их объема достаточно трудно, но хранить такой объект, даже временно в памяти не совсем практично,
как и итерироваться по нему построчно. Все это занимает слишком много времени.

Расписывать подробности не буду. По отношению к статье изменился порядок сканирования, ну и добавилась функция сохранения данных в БД.
В принципе, скрипт стал отрабатывать быстрее, так как делает меньше лишних движений. Если раньше для получения данных требовалось около 9 с копейками
секунд. то сейчас это время сократилось до 5 с небольшим.

Из добавленного еще список популярных портов, который напрямую передается в scapy. Так как он может принимать кортежи, списки и отдельные порты для сканирования.

Python:
import random
import socket
import sqlite3
import threading
import time
from datetime import datetime as dt
from ipaddress import ip_network
from pathlib import Path

import requests
import scapy.all as sc

requests.packages.urllib3.disable_warnings()
random.seed(dt.now().second + random.randint(0, 101))

ports = [1, 3, 4, 6, 7, 9, 13, 17, 19, 20, 21, 22, 23, 24, 25, 26, 30, 32, 33, 37, 42, 43, 49, 53, 70, 79,
            80, 81, 82, 83, 84, 85, 88, 89, 90, 99, 100, 106, 109, 110, 111, 113, 119, 125, 135, 139, 143, 144,
            146, 161, 163, 179, 199, 211, 212, 222, 254, 255, 256, 259, 264, 280, 301, 306, 311, 340, 366, 389,
            406, 407, 416, 417, 425, 427, 443, 444, 445, 458, 464, 465, 481, 497, 500, 512, 513, 514, 515, 524,
            541, 543, 544, 545, 548, 554, 555, 563, 587, 593, 616, 617, 625, 631, 636, 646, 648, 666, 667, 668,
            683, 687, 691, 700, 705, 711, 714, 720, 722, 726, 749, 765, 777, 783, 787, 800, 801, 808, 843, 873,
            880, 888, 898, 900, 901, 902, 903, 911, 912, 981, 987, 990, 992, 993, 995, 999, 1000, 1001, 1002,
            1007, 1009, 1010, 1011, 1021, 1022, 1023, 1024, 1025, 1026, 1027, 1028, 1029, 1030, 1031, 1032,
            1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1041, 1042, 1043, 1044, 1045, 1046, 1047, 1048,
            1049, 1050, 1051, 1052, 1053, 1054, 1055, 1056, 1057, 1058, 1059, 1060, 1061, 1062, 1063, 1064,
            1065, 1066, 1067, 1068, 1069, 1070, 1071, 1072, 1073, 1074, 1075, 1076, 1077, 1078, 1079, 1080,
            1081, 1082, 1083, 1084, 1085, 1086, 1087, 1088, 1089, 1090, 1091, 1092, 1093, 1094, 1095, 1096,
            1097, 1098, 1099, 1100, 1102, 1104, 1105, 1106, 1107, 1108, 1110, 1111, 1112, 1113, 1114, 1117,
            1119, 1121, 1122, 1123, 1124, 1126, 1130, 1131, 1132, 1137, 1138, 1141, 1145, 1147, 1148, 1149,
            1151, 1152, 1154, 1163, 1164, 1165, 1166, 1169, 1174, 1175, 1183, 1185, 1186, 1187, 1192, 1198,
            1199, 1201, 1213, 1216, 1217, 1218, 1233, 1234, 1236, 1244, 1247, 1248, 1259, 1271, 1272, 1277,
            1287, 1296, 1300, 1301, 1309, 1310, 1311, 1322, 1328, 1334, 1352, 1417, 1433, 1434, 1443, 1455,
            1461, 1494, 1500, 1501, 1503, 1521, 1524, 1533, 1556, 1580, 1583, 1594, 1600, 1641, 1658, 1666,
            1687, 1688, 1700, 1717, 1718, 1719, 1720, 1721, 1723, 1755, 1761, 1782, 1783, 1801, 1805, 1812,
            1839, 1840, 1862, 1863, 1864, 1875, 1900, 1914, 1935, 1947, 1971, 1972, 1974, 1984, 1998, 1999,
            2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2013, 2020, 2021, 2022, 2030,
            2033, 2034, 2035, 2038, 2040, 2041, 2042, 2043, 2045, 2046, 2047, 2048, 2049, 2065, 2068, 2099,
            2100, 2103, 2105, 2106, 2107, 2111, 2119, 2121, 2126, 2135, 2144, 2160, 2161, 2170, 2179, 2190,
            2191, 2196, 2200, 2222, 2251, 2260, 2288, 2301, 2323, 2366, 2381, 2382, 2383, 2393, 2394, 2399,
            2401, 2492, 2500, 2522, 2525, 2557, 2601, 2602, 2604, 2605, 2607, 2608, 2638, 2701, 2702, 2710,
            2717, 2718, 2725, 2800, 2809, 2811, 2869, 2875, 2909, 2910, 2920, 2967, 2968, 2998, 3000, 3001,
            3003, 3005, 3006, 3007, 3011, 3013, 3017, 3030, 3031, 3052, 3071, 3077, 3128, 3168, 3211, 3221,
            3260, 3261, 3268, 3269, 3283, 3300, 3301, 3306, 3322, 3323, 3324, 3325, 3333, 3351, 3367, 3369,
            3370, 3371, 3372, 3389, 3390, 3404, 3476, 3493, 3517, 3527, 3546, 3551, 3580, 3659, 3689, 3690,
            3703, 3737, 3766, 3784, 3800, 3801, 3809, 3814, 3826, 3827, 3828, 3851, 3869, 3871, 3878, 3880,
            3889, 3905, 3914, 3918, 3920, 3945, 3971, 3986, 3995, 3998, 4000, 4001, 4002, 4003, 4004, 4005,
            4006, 4045, 4111, 4125, 4126, 4129, 4224, 4242, 4279, 4321, 4343, 4443, 4444, 4445, 4446, 4449,
            4550, 4567, 4662, 4848, 4899, 4900, 4998, 5000, 5001, 5002, 5003, 5004, 5009, 5030, 5033, 5050,
            5051, 5054, 5060, 5061, 5080, 5087, 5100, 5101, 5102, 5120, 5190, 5200, 5214, 5221, 5222, 5225,
            5226, 5269, 5280, 5298, 5357, 5405, 5414, 5431, 5432, 5440, 5500, 5510, 5544, 5550, 5555, 5560,
            5566, 5631, 5633, 5666, 5678, 5679, 5718, 5730, 5800, 5801, 5802, 5810, 5811, 5815, 5822, 5825,
            5850, 5859, 5862, 5877, 5900, 5901, 5902, 5903, 5904, 5906, 5907, 5910, 5911, 5915, 5922, 5925,
            5950, 5952, 5959, 5960, 5961, 5962, 5963, 5987, 5988, 5989, 5998, 5999, 6000, 6001, 6002, 6003,
            6004, 6005, 6006, 6007, 6009, 6025, 6059, 6100, 6101, 6106, 6112, 6123, 6129, 6156, 6346, 6389,
            6502, 6510, 6543, 6547, 6565, 6566, 6567, 6580, 6646, 6666, 6667, 6668, 6669, 6689, 6692, 6699,
            6779, 6788, 6789, 6792, 6839, 6881, 6901, 6969, 7000, 7001, 7002, 7004, 7007, 7019, 7025, 7070,
            7100, 7103, 7106, 7200, 7201, 7402, 7435, 7443, 7496, 7512, 7625, 7627, 7676, 7741, 7777, 7778,
            7800, 7911, 7920, 7921, 7937, 7938, 7999, 8000, 8001, 8002, 8007, 8008, 8009, 8010, 8011, 8021,
            8022, 8031, 8042, 8045, 8080, 8081, 8082, 8083, 8084, 8085, 8086, 8087, 8088, 8089, 8090, 8093,
            8099, 8100, 8180, 8181, 8192, 8193, 8194, 8200, 8222, 8254, 8290, 8291, 8292, 8300, 8333, 8383,
            8400, 8402, 8443, 8500, 8600, 8649, 8651, 8652, 8654, 8701, 8800, 8873, 8888, 8899, 8994, 9000,
            9001, 9002, 9003, 9009, 9010, 9011, 9040, 9050, 9071, 9080, 9081, 9090, 9091, 9099, 9100, 9101,
            9102, 9103, 9110, 9111, 9200, 9207, 9220, 9290, 9415, 9418, 9485, 9500, 9502, 9503, 9535, 9575,
            9593, 9594, 9595, 9618, 9666, 9876, 9877, 9878, 9898, 9900, 9917, 9929, 9943, 9944, 9968, 9998,
            9999, 10000, 10001, 10002, 10003, 10004, 10009, 10010, 10012, 10024, 10025, 10082, 10180, 10215,
            10243, 10566, 10616, 10617, 10621, 10626, 10628, 10629, 10778, 11110, 11111, 11967, 12000, 12174,
            12265, 12345, 13456, 13722, 13782, 13783, 14000, 14238, 14441, 14442, 15000, 15002, 15003, 15004,
            15660, 15742, 16000, 16001, 16012, 16016, 16018, 16080, 16113, 16992, 16993, 17877, 17988, 18040,
            18101, 18988, 19101, 19283, 19315, 19350, 19780, 19801, 19842, 20000, 20005, 20031, 20221, 20222,
            20828, 21571, 22939, 23502, 24444, 24800, 25734, 25735, 26214, 27000, 27352, 27353, 27355, 27356,
            27715, 28201, 30000, 30718, 30951, 31038, 31337, 32768, 32769, 32770, 32771, 32772, 32773, 32774,
            32775, 32776, 32777, 32778, 32779, 32780, 32781, 32782, 32783, 32784, 32785, 33354, 33899, 34571,
            34572, 34573, 35500, 38292, 40193, 40911, 41511, 42510, 44176, 44442, 44443, 44501, 45100, 48080,
            49152, 49153, 49154, 49155, 49156, 49157, 49158, 49159, 49160, 49161, 49163, 49165, 49167, 49175,
            49176, 49400, 49999, 50000, 50001, 50002, 50003, 50006, 50300, 50389, 50500, 50636, 50800, 51103,
            51493, 52673, 52822, 52848, 52869, 54045, 54328, 55055, 55056, 55555, 55600, 56737, 56738, 57294,
            57797, 58080, 60020, 60443, 61532, 61900, 62078, 63331, 64623, 64680, 65000, 65129, 65389]

result = dict()


def create_base():
    """
    Создаем базу данных sqlite
    """
    conn = sqlite3.connect('checked_ip.db')
    cur = conn.cursor()
    cur.execute("""CREATE TABLE IF NOT EXISTS check_ip(
                   ip TEXT,
                   port INT,
                   service TEXT,
                   status TEXT);
                """)
    conn.commit()
    cur.close()
    conn.close()


def case_sense(resp) -> dict:
    """
    Переводим данные json из заголовка в нижний регистр.

    :param resp: json с данными заголовка.
    :return: словарь с данными переведенными в нижний регистр.
    """
    resp_low = dict()
    for res in resp:
        resp_low.update({res.lower(): resp[res].lower()})
    return resp_low


def get_headers(ip: str, pt: str, proto: str):
    """
    Попытка получения заголовков, если обнаружены открытые 80 или 443 порты.

    :param ip: строка, ip-адрес.
    :param pt: строка, порт.
    :param proto: строка, протокол.
    """
    global result

    headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 "
                      "Safari/537.36 "
    }

    try:
        head = case_sense(requests.head(url=f"{proto}://{ip}:{pt}", headers=headers, verify=False, timeout=3).headers)
        result[ip].update({f"{pt}/{proto}": head.get('location')}) if head.get('location') \
            else result[ip].update({f"{pt}/{proto}": head})
    except Exception:
        return


def scan_port(ip: str, pt: int, proto: str):
    """
    Попытка получения баннера с открытого порта.

    :param ip: строка, ip-адрес.
    :param pt: целое число, порт.
    :param proto: строка, сервис.
    """
    global result
    ans, s = "", ""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(2)
        if s.connect_ex((ip, pt)) == 0:
            ans = s.recv(1024)
            result[ip].update({f"{pt}/{proto}": str(ans.decode()).strip()}) if str(ans.decode().strip()) != "" \
                else result[ip].update({f"{pt}/{proto}": "open"})
        s.close()
    except UnicodeDecodeError:
        s.close()
        result[ip].update({f"{pt}/{proto}": str(ans).replace("b'", "").replace("'", "").replace('b"', "").
                          replace('"', "")})
    except (socket.timeout, ConnectionRefusedError, OSError):
        s.close()
        return


def thread_func(ip: str):
    """
    Запуск потоков для сканирования открытых портов.

    :param ip: строка, ip-адрес.
    """
    global result
    threads = []

    for rs in result:
        for pt in result[rs]:
            t = threading.Thread(target=scan_port, kwargs={'ip': ip, 'pt': int(pt.split("/")[0]),
                                                           "proto": pt.split("/")[1]}, daemon=True)
            t.start()
            threads.append(t)

    for thread in threads:
        thread.join()


def syn_ack_scan(ip: str):
    """
    Попытка сканирования портов на определенном ip-адресе.

    :param ip: строка, ip-адрес.
    """
    global ports, result

    request_syn = sc.IP(dst=ip) / sc.TCP(dport=ports, flags="S")
    answer = sc.sr(request_syn, timeout=2, retry=1, verbose=False)[0]

    for _, receive in answer:
        try:
            if receive['TCP'].flags == "SA":
                if ip not in result:
                    result[ip] = dict()
                result[ip].update({f"{str(receive['TCP'].sport)}/"
                                   f"{str(sc.TCP_SERVICES[receive['TCP'].sport])}": "open"})
        except KeyError:
            result[ip].update({f"{str(receive['TCP'].sport)}/undefined": "open"})
        except IndexError:
            continue


def insert_base(ip: str):
    """
    Сохранение информации в БД sqlite.

    :param ip: строка, ip-адрес.
    """
    global result

    con_ins = sqlite3.connect('checked_ip.db')
    cur_ins = con_ins.cursor()

    if result:
        table_list = []
        for res in result:
            print(f"\nИнформация об адресе: {res}\n{'-'*65}")
            table_list = [(res, int(port.split("/")[0]), str(port.split("/")[1]), str(result[res][port]))
                          for port in result[res]]
            for st in result[res]:
                print(f"Порт: {st:20} | Статус: {result[res][st]}")
        cur_ins.executemany("INSERT INTO check_ip VALUES(?, ?, ?, ?);", table_list)
        con_ins.commit()
    else:
        cur_ins.execute("INSERT INTO check_ip VALUES(?, ?, ?, ?);", (ip, 0, "no data", "no data"))
        con_ins.commit()
    cur_ins.close()
    con_ins.close()
    result.clear()


def ip_rand(path_dir: str):
    """
    Получение случайного ip-адреса. Запуск функций сканирования портов,
    получения баннеров и заголовков.

    :param path_dir: строка, путь к файлу с CIDR провайдеров.
    """
    while KeyboardInterrupt:
        with open(path_dir, 'r', encoding='utf-8') as file:
            cidr = random.choice(file.readlines()).strip()
            ip = random.choice([str(x) for x in ip_network(cidr)])

            con_check = sqlite3.connect('checked_ip.db')
            cur_check = con_check.cursor()
            sel = """SELECT ip FROM check_ip WHERE ip = ?"""
            if cur_check.execute(sel, (ip,)).fetchone() is not None:
                continue
            else:
                cur_check.close()
                con_check.close()
                start = time.monotonic()
                print(f"\nСканирую: {ip}")
                syn_ack_scan(ip)

                if result:
                    thread_func(ip)
                    for res in result:
                        for port in result[res]:
                            if port.split("/")[0] == "80" or port.split("/")[0] == "443":
                                get_headers(ip, port.split("/")[0], port.split("/")[1])

                    insert_base(ip)
                    print(f"\nВремя сканирования: {time.monotonic() - start:.2f} c.\n{'-' * 65}")
                else:
                    print(f"No result\nВремя сканирования: {time.monotonic() - start:.2f} c.\n{'-' * 65}")
                    insert_base(ip)


def main():
    """
    Проверка существования файла с CIDR провайдеров. Запуск функции
    получения случайного ip-адреса: ip_rand.
    """
    if not (Path.cwd() / "checked_ip.db").exists():
        create_base()
    ip_rand("provider_ranges.txt")


if __name__ == "__main__":
    main()

Так выглядит сканирование в терминале:

01.png


А так данные в БД:

02.png
 

Вложения

  • sa_database.zip
    898,4 КБ · Просмотры: 118

MLNK

Mod. Ethical Hacking
Red Team
23.01.2018
560
706
BIT
7
@Johan Van заведи ты уже себе Github.
1. это проще чем кидать код в архиве
2. при изменении или правках это легче отслеживать и не надо перезаливать архивы.
3. все будет в одном месте и удобно каталогизировано
4. обретёшь базовые знания по гиту, что упростит тебе жизнь и разработку
 
  • Нравится
Реакции: dream to perfection
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!