Статья Перехват DNS-трафика с помощью Python и библиотеки scapy

Не так давно утилита Берта Хуберта воспроизводящая звуковой сигнал каждый раз, когда браузер отправляет данные в Google, наделала много шуму в онлайн-прессе. Сама идея перехвата трафика не нова, и я подумал, а почему бы не сделать утилиту на Python с похожим функционалом. Конечно, с утилитой, написанной на «C» она не сравнится по скорости работы, но ведь ее можно использовать не только для того, чтобы детектить Google. Можно детектить еще и Яндекс ))). Но на самом деле, с ее помощью получится регистрировать открытие разных сайтов, детектить адреса за Cloudflare и при необходимости составлять список сайтов, которые используют этот CDN.

5ad2c7c7776cb578194045.jpeg

Конечно же, у меня получилось не совсем то же самое. Я не перехватывал адреса, которые проходят напрямую по ip-адресу. То есть, слой перехваченного пакета IP остался не затронут. Однако, у меня и не было такой цели. Нужно было сделать перехват запросов к сайтам, с возможностью получения адреса и по возможности ip-адреса. Ну, а определение адресов Яндекс, Google и Cloudflare, это добавилось уже позже. Так что, на данный момент скрипт может перехватывать адреса, на которые идет запрос. Пытается определить или получить (это уже зависит от того, содержится ли адрес в DNS) ip-адрес, детектит обращение к Яндекс, Google и вдобавок проверяет, не входит ли ip-адрес домена в диапазон адресов Cloudflare.

Дисклеймер: Все данные, предоставленные в данной статье, взяты из открытых источников, не призывают к действию и являются только лишь данными для ознакомления, и изучения механизмов используемых технологий.


Что понадобится?

Из того, что отсутствует в стандартных библиотеках python, нужно будет установить requests. С его помощью мы будем получать диапазоны адресов Cloudflare прямо со страницы с диапазонами. Установить chime, для подачи звукового сигнала при обнаружении адресов из проверяемых диапазонов. Также нужно установить библиотеку getmac, для того, чтобы можно было определить mac-адрес компьютера, и уже с помощью mac-адреса получить название активного сетевого интерфейса в операционной системе Windows. Ну и для того, чтобы вывод в терминал не был особо скучным, установим библиотеку colorama. С ее помощью мы будем раскрашивать вывод. Для того, чтобы установить все вышеназванные библиотеки, пишем в терминале команду:

pip install chime colorama requests getmac

И, конечно же, установить самую основную библиотеку, с помощью которой и будет происходить перехват трафика с 53 порта DNS – Scapy. Для ее установки пишем в терминале команду:

pip install --pre scapy[basic]

Теперь, что касается пользователей операционной системы Windows. Для того, чтобы все заработало, дополнительно потребуется установить Npcap Free Edition ( ). Это библиотека захвата (и отправки) пакетов проекта Nmap для Microsoft Windows. Без нее Scapy работать не будет.

После того, как все библиотеки будут установлены, необходимо импортировать их в скрипт, а также импортировать все нужные в работу библиотеки и вдобавок инициализировать библиотеку colorama.

Python:
import os.path
import platform
import subprocess
from datetime import datetime
from ipaddress import ip_network, ip_address
from socket import gethostbyname, gaierror, socket, AF_INET, SOCK_DGRAM

import chime
import getmac
from colorama import Fore
from colorama import init
from requests import get
from scapy.all import sniff

init()

После того, как будет выполнен импорт библиотек, создадим три пустых глобальных списка: list_addr, yandex, google. В данные списки мы будем складывать диапазоны адресов Cloudflare, Яндекс и Google.

Python:
list_addr = []
yandex = []
google = []

Итак, все импортировано, можно начинать писать код. И начнем мы его писать с самого начала, с функции main().


Определение ОС, проверка прав, запуск перехвата трафика

Создадим функцию main(). Для начала выполним проверку, под какой операционной системой работает наш скрипт, так как способы получения активного сетевого соединения у каждой операционной системы немного разные. Для этого будем использовать функцию system библиотеки platform. Она возвращает название операционной системы. Для Linux, это будет, соответственно “Linux”, для Windows – “Windows”. Ну и для Mac – “Darwin”. Вот только для mac данный скрипт не заточен. Просто потому, что его у меня нет, а значит, я не знаю, как он себя поведет на данной ОС.

Если у нас операционная система Linux, необходимо выполнить проверку прав пользователя, так как по этой ОС скрипт требует повышения привилегий, то есть, должен быть запущен из под суперпользователя. Если это не так, выводим предупреждение и завершаем работу скрипта.

Python:
    if platform.system() == "Linux":
        from os import getuid
        if not getuid() == 0:
            print('\n[+] Запустите скрипт с правами суперпользователя!\n')
            return

Если же все прошло нормально, то запускаем итерацию по спискам диапазонов адресов Яндекс и Google. То есть, добавим диапазоны в соответствующие списки. Для этого запускаем функцию load_range(). И только после этого начинаем перехват трафика с помощью функции библиотеки scapy – sniff.

sniff(iface=ip_iface(), filter='dst port 53', count=0, store=False, prn=packet_sniff)

Данная функция запускается с определенными параметрами. Давайте немного остановимся на них.

Параметр iface. С его помощью задается сетевой интерфейс для прослушивания трафика.
Параметр filter, соответственно, фильтр пакетов. С его помощью можно установить, какой тип трафика мы желаем прослушивать, а также порт. В данном случае нас интересуют исходящие пакеты на 53-м порту.
Параметр count задает количество пакетов для перехвата. В данном случае, так как указано значение 0, перехват будет идти бесконечно. А если установить определенное значение, например 10, то будет перехвачено 10 пакетов и функция завершит свою работу.
Параметр store указывает, сохранять или нет перехваченные пакеты. В данном случае нам сохранение пакетов не нужно. Поэтому ставим False.
Параметр prn указывает, в какую функцию передавать перехваченные пакеты для обработки.

Python:
def main():
    """
    Выполняем проверку на операционную систему. В зависимости
    от этого проверяем, если под linux, запущен ли скрипт от
    имени суперпользователя. Если да, запускаем перехват трафика.
    В ОС Windows скрипт не требует наличия административных прав.
    Поэтому перехват трафика запускается сразу.
    Функция перехвата трафика имеет следующие параметры:
    :param iface: сетевой интерфейс для прослушивания.
    :param filter: фильтр, в данном случае фильтруем исходящие запросы на порт 53,
    который является портом DNS.
    :param count: количество перехваченных пакетов. В данном случае, перехват будет
    идти бесконечно.
    :paran store: не сохранять ни один полученный пакет.
    :param prn: функция в которую передаются пакеты для обработки.
    :return: выход из функции.
    """
    if platform.system() == "Linux":
        from os import getuid
        if not getuid() == 0:
            print('\n[+] Запустите скрипт с правами суперпользователя!\n')
            return
        load_range()
        print(Fore.GREEN + '[*] Перехват пакетов...\n')
        sniff(iface=ip_iface(), filter='dst port 53', count=0, store=False, prn=packet_sniff)
    elif platform.system() == "Windows":
        load_range()
        print(Fore.GREEN + '[*] Перехват пакетов...\n')
        sniff(iface=iface_win(), filter='dst port 53', count=0, store=False, prn=packet_sniff)


Загрузка диапазона адресов в списки

Для того, чтобы детектить адреса Яндекс, Google или Cloudflare, необходимо загрузить списки диапазонов адресов, которые принадлежат данным компаниям. Именно по этим диапазонам мы будем итерироваться и проверять адрес на принадлежность к одному из низ.

Создадим функцию load_range(). Объявим глобальные переменные списков, которые мы создали ранее.

Python:
    global list_addr
    global yandex
    global google

В переменную dir_range присваиваем значение пути к папке со списками диапазонов адресов. Затем проверяем, существует ли такая директория. Если нет, сообщаем об этом пользователю и выходим из функции, так как без загрузки диапазонов особо проверять будет нечего.

Python:
    dir_range = os.path.join(os.getcwd(), 'ip_range')
    if not os.path.exists(dir_range):
        print(Fore.RED + '[-] Не существует директории с конфигурацией')
        exit(0)

Если все хорошо, и папка существует, загружаем с сайта Cloudflare диапазоны адресов и добавляем в глобальный список list_addr.

Python:
    print(Fore.GREEN + '\n[*] Получение диапазона адресов Cloudflare')
    list_addr = get(url='https://www.cloudflare.com/ips-v4').text.splitlines()
    list_addr.append("104.16.0.0/12")

Теперь нужно загрузить остальные диапазоны, которые находятся в директории 'ip_range', в соответствующих списках. Проверяем, существует ли файл с диапазонами адресов. После чего открываем файл, считываем его и разбиваем построчно, с помощью splitlines(). Все это присваиваем в глобальную переменную с соответствующим названием. Таким образом, без итерации по файлу, мы получаем список диапазонов.

Python:
    if os.path.exists(os.path.join(dir_range, 'yandex_ipv4.txt')):
        print(Fore.GREEN + '[*] Загрузка адресов Яндекс')
        with open(os.path.join(dir_range, 'yandex_ipv4.txt'), 'r', encoding='utf-8') as ya:
            yandex = ya.read().splitlines()

    if os.path.exists(os.path.join(dir_range, 'google_ipv4.txt')):
        print(Fore.GREEN + '[*] Загрузка адресов Google\n')
        with open(os.path.join(dir_range, 'google_ipv4.txt'), 'r', encoding='utf-8') as go:
            google = go.read().splitlines()

Python:
def load_range():
    """
    Проверяем, существует ли папка с диапазонами
    адресов. Если нет, выходим из функции. Если есть,
    получаем диапазоны адресов Cloudflare, присваиваем их
    в глобальную переменную list_addr.
    Загружаем диапазоны адресов Яндекс и Google, разбиваем
    построчно и присваиваем в глобальные переменные yandex
    и google.
    :return: выход из функции.
    """
    global list_addr
    global yandex
    global google

    dir_range = os.path.join(os.getcwd(), 'ip_range')
    if not os.path.exists(dir_range):
        print(Fore.RED + '[-] Не существует директории с конфигурацией')
        exit(0)

    print(Fore.GREEN + '\n[*] Получение диапазона адресов Cloudflare')
    list_addr = get(url='https://www.cloudflare.com/ips-v4').text.splitlines()
    list_addr.append("104.16.0.0/12")

    if os.path.exists(os.path.join(dir_range, 'yandex_ipv4.txt')):
        print(Fore.GREEN + '[*] Загрузка адресов Яндекс')
        with open(os.path.join(dir_range, 'yandex_ipv4.txt'), 'r', encoding='utf-8') as ya:
            yandex = ya.read().splitlines()

    if os.path.exists(os.path.join(dir_range, 'google_ipv4.txt')):
        print(Fore.GREEN + '[*] Загрузка адресов Google\n')
        with open(os.path.join(dir_range, 'google_ipv4.txt'), 'r', encoding='utf-8') as go:
            google = go.read().splitlines()


Обработка перехваченных пакетов

Еще в описании функции main() в одном из ее параметров осуществляется передача пакетов в функцию для обработки. В данном случае это функция packet_sniff(packet), которую нужно создать. На вход она принимает перехваченный пакет.
Объявляем переменную для доменного имени, которое мы будем получать из перехваченного адреса. И в переменную qname присваиваем значение параметра qname из перехваченного пакета, со слоя 'DNS Question Record'.

Для того чтобы понимать, откуда и что берется, я прикреплю ниже картинку со структурой пакета.

screenshot1.png

Пакет состоит из нескольких слоев, уровней. Как видите, первые уровень, это Ethernet. Здесь передается mac-адрес получателя, mac-адрес отправителя и тип протокола, по которому будет идти передача. В данном случае – это IPv6. Следующий уровень, это IPv6 или IP. В зависимости от версии протокола. Отсюда уже можно получить ip или ipv6 адреса получателя и отправителя. Следующий слой UDP, но в данном случае он нам не особо интересен. А вот далее идет слой DNS. У него есть подуровень DNS Question Record, у которого есть параметр qname. В нем содержится адрес, по которому переходит браузер и отправляется запрос. Вот его мы и забираем в данном случае. В теории, можно было бы забирать еще и IPv6 адрес получателя пакета. Но мне он в данном случае не нужен. Также в пакете, иногда, есть еще один подуровень - DNS Resource Record. Но он появляется не всегда. А только когда есть ip-адрес, по которому нужно передать пакет. Вот в этом подуровне, в параметре rdata она и содержится. Поэтому, нужно проверять, есть он или нет. На всякий случай. Чтобы, в случае, если там есть адрес не обращаться зря за адресом по доменному имени.

Python:
    domain = None
    qname = packet['DNS Question Record'].qname.decode()

Затем получаем имя домена простой разбивкой адреса на составляющие по точке, и обратной сборки в нужном порядке. Заключим получение доменного имени в блок try-except, так как иногда появляются странные адреса, без домена верхнего уровня, и тогда возникает исключение при разбивке.

Python:
    try:
        domain = f'{qname[0:-1].split(".")[-2]}.{qname[0:-1].split(".")[-1]}'
    except IndexError:
        print(Fore.GREEN + f'[+] Запрос: {Fore.RESET}{qname[0:-1]}')
        print(Fore.GREEN + f'[+] Время запроса: {Fore.YELLOW}{datetime.now().time()}')
        print(Fore.GREEN + '[+] Домен: Unknown')

Далее, получаем, собственно, а вернее, пытаемся получить данные из подуровня с ip-адресом - DNS Resource Record. И если получаем, проверяем, не является ли данный адрес локальным, так как иногда здесь попадаются адреса, которые передаются непосредственно на роутер или маршрутизатор. Для этого из функции local_ipv4 получаем локальный адрес, разбиваем адреса по точкам, забираем первый элемент списка и сравниваем значения. Если они равны, выходим из функции. Если же нет, выводим сообщения в терминал о запросе, домене, времени запроса и ip-адресе. После этого передаем полученный ip-адрес в функцию change_ip для проверки, не является ли адрес принадлежащим к диапазонам адресов Яндекс, Google или Cloudflare.

Python:
        rdata = packet['DNS Resource Record'].rdata.decode()
        if rdata.split(".")[0] == local_ipv4().split(".")[0]:
            return
        print(Fore.GREEN + f'[+] Запрос: {Fore.RESET}{qname[0:-1]}')
        print(Fore.GREEN + f'[+] Домен: {Fore.YELLOW}{domain}')
        print(Fore.GREEN + f'[+] Время запроса: {Fore.YELLOW}{datetime.now().time()}')
        print(Fore.GREEN + f'[+] IP-адрес домена: {Fore.RESET}{rdata}')
        change_ip(rdata)
        print('\n')

Получение адреса из подуровня заключим в блок try-except, для того, чтобы обработать ошибку, если данного подуровня не будет в пакете. А в обработке ошибки попытаемся получить ip-адрес по доменному имени. После чего выводим сообщения в терминал и проверяем, какой ответ получен на запрос ip-адреса. Если это не сообщение об ошибке, запускаем функцию change_ip, в противном случае выходим из функции.

Python:
        ip = get_host(domain)
        print(Fore.GREEN + f'[+] Запрос: {Fore.RESET}{qname[0:-1]}')
        print(Fore.GREEN + f'[+] Домен: {Fore.YELLOW}{domain}')
        print(Fore.GREEN + f'[+] Время запроса: {Fore.YELLOW}{datetime.now().time()}')
        print(Fore.GREEN + f'[+] IP-адрес домена: {Fore.RESET}{ip}')
        if ip != 'Unknown - No address associated with hostname':
            change_ip(ip)
        print('\n')

Python:
def packet_sniff(packet):
    """
    Получаем url-сайта из перехваченного пакета.
    Забираем из url доменное имя. Выводим сообщения в
    терминал. Пытаемся получить ip-адрес из DNS Resource Record.
    Если адреса нет, обрабатываем исключение, и пытаемся получить
    ip-адрес по доменному имени.
    В любом из случаев выводим сообщение в терминал об url и
    времени запроса. Запускаем функцию проверки ip-адреса на
    принадлежность к адресам Яндекс, Google или Cloudflare.
    :param packet: перехваченный пакет.
    :return: выход из функции.
    """
    domain = None
    qname = packet['DNS Question Record'].qname.decode()
    try:
        domain = f'{qname[0:-1].split(".")[-2]}.{qname[0:-1].split(".")[-1]}'
    except IndexError:
        print(Fore.GREEN + f'[+] Запрос: {Fore.RESET}{qname[0:-1]}')
        print(Fore.GREEN + f'[+] Время запроса: {Fore.YELLOW}{datetime.now().time()}')
        print(Fore.GREEN + '[+] Домен: Unknown')

    try:
        rdata = packet['DNS Resource Record'].rdata.decode()
        if rdata.split(".")[0] == local_ipv4().split(".")[0]:
            return
        print(Fore.GREEN + f'[+] Запрос: {Fore.RESET}{qname[0:-1]}')
        print(Fore.GREEN + f'[+] Домен: {Fore.YELLOW}{domain}')
        print(Fore.GREEN + f'[+] Время запроса: {Fore.YELLOW}{datetime.now().time()}')
        print(Fore.GREEN + f'[+] IP-адрес домена: {Fore.RESET}{rdata}')
        change_ip(rdata)
        print('\n')
    except IndexError:
        ip = get_host(domain)
        print(Fore.GREEN + f'[+] Запрос: {Fore.RESET}{qname[0:-1]}')
        print(Fore.GREEN + f'[+] Домен: {Fore.YELLOW}{domain}')
        print(Fore.GREEN + f'[+] Время запроса: {Fore.YELLOW}{datetime.now().time()}')
        print(Fore.GREEN + f'[+] IP-адрес домена: {Fore.RESET}{ip}')
        if ip != 'Unknown - No address associated with hostname':
            change_ip(ip)
        print('\n')


Получение ip-адреса по доменному имени

В предыдущей функции мы, в случае возникновения ошибки получали ip-адрес по доменному имени. Для этого создадим функцию get_host(domain). На вход она получает доменное имя. Затем, с помощью функции библиотеки socket – gethostbyname, получаем доменное имя. Заключаем код в блок try-except, для того, чтобы обработать ошибку, когда невозможно сопоставить доменное имя с ip-адресом, так как у него его просто нет.

Python:
def get_host(domain):
    """
    Попытка получить ip-адрес домена.
    :param domain: доменное имя для получения ip.
    :return: ip-адрес домена или сообщение об ошибке,
    в случае невозможности получить адрес.
    """
    try:
        return gethostbyname(domain)
    except gaierror:
        return 'Unknown - No address associated with hostname'


Проверка ip-адресов на вхождение в диапазоны

Следующая функция, которую нужно создать, это change_ip(ip). На вход она принимает ip-адрес. С помощью данной функции мы будем проверять вхождение ip-адреса в один из диапазонов. Объявим глобальные переменные, чтобы можно было по ним итерироваться.

Python:
    global yandex
    global google

Для начала проверим, не принадлежит ли адрес к диапазону адресов Cloudflare. Так как если это истина, то дальнейшая проверка уже не имеет смысла. Поэтому передаем ip-адрес в функцию cloud_detect. Если она возвращает True, выводим сообщение в терминал, подаем звуковой сигнал и выходим из функции.

Python:
    if cloud_detect(ip):
        print(Fore.RED + '[-] Обнаружен адрес Cloudflare!')
        chime.theme('mario')
        chime.success()
        return

Если же это не так, запускаем цикл по списку диапазонов. Проверяем с помощью функции ip_network, которая итерирует переданный диапазон по ip-адресам, не входит ли полученный ip-адрес в один из диапазонов. Если входит, выводим сообщение в терминал, подаем звуковой сигнал и выходим из функции.

Python:
    for addr_y in yandex:
        if ip_address(ip) in ip_network(addr_y.strip()):
            print(Fore.RED + f'[-] Внимание! Запрос к Yandex: {Fore.RESET}{ip}')
            chime.theme('chime')
            chime.success()
            return

Python:
def change_ip(ip):
    """
    Выполняется проверка, входит ли адрес в
    диапазон адресов Cloudflare. Если да, завершаем
    работу функции. Если нет, проверяем, входит ли
    ip-адрес в диапазоны адресов Яндекс или Google.
    Если да, выводим сообщение в терминал и подаем
    звуковое оповещение.
    :param ip: ip-адрес для проверки.
    :return: выход из функции.
    """
    global yandex
    global google

    if cloud_detect(ip):
        print(Fore.RED + '[-] Обнаружен адрес Cloudflare!')
        chime.theme('mario')
        chime.success()
        return

    for addr_y in yandex:
        if ip_address(ip) in ip_network(addr_y.strip()):
            print(Fore.RED + f'[-] Внимание! Запрос к Yandex: {Fore.RESET}{ip}')
            chime.theme('chime')
            chime.success()
            return
    for addr_g in google:
        if ip_address(ip) in ip_network(addr_g.strip()):
            print(Fore.RED + f'[-] Внимание! Запрос к Google: {Fore.RESET}{ip}')
            chime.theme('big-sur')
            chime.success()
            return


Проверка ip-адреса на принадлежность к диапазону адресов Cloudflare

Теперь нужно создать функцию, в которой мы будем проверять, не входит ли адрес в диапазон адресов данной CDN. Для этого создадим функцию cloud_detect(ip), именно к ней мы обращались из предыдущей функции для проверки. На вход она принимает ip-адрес. Затем объявляем глобальную переменную в которой содержится список диапазонов адресов.

global list_addr

Запускаем цикл по списку диапазонов и передаем ip-адрес и диапазон для итерации и проверки на вхождение с помощью функции ip_network. Если адрес найден в диапазоне, возвращаем True, если нет – False.

Python:
def cloud_detect(ip):
    """
    Проверка, входит ли проверяемый ip-адрес
    в диапазон адресов Cloudflare.
    :param ip: ip-адрес для проверки.
    :return: True - найден, False - не найден.
    """
    global list_addr
    for addr in list_addr:
        if ip_address(ip) in ip_network(addr):
            return True
    return False


Получение имени активного сетевого интерфейса ОС Windows

Для получения имени активного сетевого интерфейса создадим функцию iface_win(). Имя интерфейса нужно для того, чтобы указать, с какого именно интерфейса прослушивать трафик. И если вы знаете это имя, вы можете указать его прямо в функции sniff. Но, если вы используете скрипт на разных машинах, лучше получить это имя непосредственно из операционной системы, так как пользователь может его попросту не знать.

С помощью subprocess.check_output выполняем команду «getmac /FO csv /NH /V» и получаем ее вывод, который нужно декодировать в кодировке «cp866». Разбиваем вывод построчно и в цикле пробегаемся по каждой строке. В строках, которые получены в выводе функции содержится mac-адрес сетевого интерфейса. И именно по нему мы будем проверять, нужный нам интерфейс или нет. Для этого необходимо получить этот mac-адрес. Воспользуемся функцией getmac.getmac.get_mac_address(), в которой заменим «:» на «-», так как вывод происходит в Linux-формате. А в ОС Windows разделителем является «-». И приведем его к верхнему регистру. А затем проверим, есть ли данный mac-адрес в строке. Если есть, забираем строку, разбиваем ее по запятым, забираем первый элемент списка, так как именно в нем содержится имя. Убираем лишние кавычки и возвращаем из функции.

Python:
def iface_win():
    """
    Получаем имя активного сетевого интерфейса в Windows.
    :return: имя активного сетевого интерфейса.
    """
    interface_all = subprocess.check_output('getmac /FO csv /NH /V', shell=False).decode('cp866').splitlines()
    for line in interface_all:
        if getmac.getmac.get_mac_address().replace(":", "-").upper() in line:
            return line.split(",")[0].replace('"', '')


Получение имени активного сетевого интерфейса ОС Linux

Получим имя активного сетевого интерфейса для Linux. Для этого также выполним, с помощью subprocess.check_output, команду «ip -h -br a | grep UP». Декодируем, разобьем вывод по пробелам и заберем первый элемент, очистим от лишних пробелов и вернем из функции.

Python:
def ip_iface():
    """
    Получаем имя активного сетевого интерфейса в Linux.
    :return: имя активного сетевого интерфейса.
    """
    return subprocess.check_output('ip -h -br a | grep UP', shell=True).decode().split()[0].strip()


Получение локального ip-адреса

Как вы помните, в функции packet_sniff мы получали локальный ip-адрес для того, чтобы сравнить его с адресом полученным из пакета. Получали мы его с помощью функции local_ipv4(). Давайте ее и создадим. На самом деле эту функцию я нашел на просторах Stack Overflow. И так как она работает, работает хорошо и исправно, по крайней мере, пока, я ее использую периодически в скриптах. А так как она является мультиплатформенной, то можно ее использовать в разных ОС. Здесь мы делаем запрос по адресу, после чего забираем адрес сокета. Который и является локальным адресом компьютера. Его и возвращаем из функции. В случае же, когда локальный адрес получить не удастся, будет возвращен адрес локальной петли.

Python:
def local_ipv4():
    """
    Выполняется запрос на адрес.
    После этого забирается адрес сокета.
    :return: локальный ip-адрес.
    """
    st = socket(AF_INET, SOCK_DGRAM)
    try:
        st.connect(('10.255.255.255', 1))
        ip_local = st.getsockname()[0]
    except Exception:
        ip_local = '127.0.0.1'
    finally:
        st.close()
    return ip_local

Python:
"""
Скрипт для перехвата DNS-трафика, проходящего
через сетевую карту по умолчанию. А также детектирования
обращений к IP-адресам Яндекс и Google, чтобы проверить,
насколько часто происходит к ним обращение.
При детекте выводится звуковой сигнал. Также проверяется
принадлежность ip-адреса домена к Cloudflare.
Для работы скрипта необходимо установить следующие библиотеки:
pip install chime colorama requests getmac
pip install --pre scapy[basic]
"""
import os.path
import platform
import subprocess
from datetime import datetime
from ipaddress import ip_network, ip_address
from socket import gethostbyname, gaierror, socket, AF_INET, SOCK_DGRAM

import chime
import getmac
from colorama import Fore
from colorama import init
from requests import get
from scapy.all import sniff

init()

list_addr = []
yandex = []
google = []


def local_ipv4():
    """
    Выполняется запрос на адрес.
    После этого забирается адрес сокета.
    :return: локальный ip-адрес.
    """
    st = socket(AF_INET, SOCK_DGRAM)
    try:
        st.connect(('10.255.255.255', 1))
        ip_local = st.getsockname()[0]
    except Exception:
        ip_local = '127.0.0.1'
    finally:
        st.close()
    return ip_local


def ip_iface():
    """
    Получаем имя активного сетевого интерфейса в Linux.
    :return: имя активного сетевого интерфейса.
    """
    return subprocess.check_output('ip -h -br a | grep UP', shell=True).decode().split()[0].strip()


def iface_win():
    """
    Получаем имя активного сетевого интерфейса в Windows.
    :return: имя активного сетевого интерфейса.
    """
    interface_all = subprocess.check_output('getmac /FO csv /NH /V', shell=False).decode('cp866').splitlines()
    for line in interface_all:
        if getmac.getmac.get_mac_address().replace(":", "-").upper() in line:
            return line.split(",")[0].replace('"', '')


def cloud_detect(ip):
    """
    Проверка, входит ли проверяемый ip-адрес
    в диапазон адресов Cloudflare.
    :param ip: ip-адрес для проверки.
    :return: True - найден, False - не найден.
    """
    global list_addr
    for addr in list_addr:
        if ip_address(ip) in ip_network(addr):
            return True
    return False


def change_ip(ip):
    """
    Выполняется проверка, входит ли адрес в
    диапазон адресов Cloudflare. Если да, завершаем
    работу функции. Если нет, проверяем, входит ли
    ip-адрес в диапазоны адресов Яндекс или Google.
    Если да, выводим сообщение в терминал и подаем
    звуковое оповещение.
    :param ip: ip-адрес для проверки.
    :return: выход из функции.
    """
    global yandex
    global google

    if cloud_detect(ip):
        print(Fore.RED + '[-] Обнаружен адрес Cloudflare!')
        chime.theme('mario')
        chime.success()
        return

    for addr_y in yandex:
        if ip_address(ip) in ip_network(addr_y.strip()):
            print(Fore.RED + f'[-] Внимание! Запрос к Yandex: {Fore.RESET}{ip}')
            chime.theme('chime')
            chime.success()
            return
    for addr_g in google:
        if ip_address(ip) in ip_network(addr_g.strip()):
            print(Fore.RED + f'[-] Внимание! Запрос к Google: {Fore.RESET}{ip}')
            chime.theme('big-sur')
            chime.success()
            return


def get_host(domain):
    """
    Попытка получить ip-адрес домена.
    :param domain: доменное имя для получения ip.
    :return: ip-адрес домена или сообщение об ошибке,
    в случае невозможности получить адрес.
    """
    try:
        return gethostbyname(domain)
    except gaierror:
        return 'Unknown - No address associated with hostname'


def packet_sniff(packet):
    """
    Получаем url-сайта из перехваченного пакета.
    Забираем из url доменное имя. Выводим сообщения в
    терминал. Пытаемся получить ip-адрес из DNS Resource Record.
    Если адреса нет, обрабатываем исключение, и пытаемся получить
    ip-адрес по доменному имени.
    В любом из случаев выводим сообщение в терминал об url и
    времени запроса. Запускаем функцию проверки ip-адреса на
    принадлежность к адресам Яндекс, Google или Cloudflare.
    :param packet: перехваченный пакет.
    :return: выход из функции.
    """
    domain = None
    qname = packet['DNS Question Record'].qname.decode()
    try:
        domain = f'{qname[0:-1].split(".")[-2]}.{qname[0:-1].split(".")[-1]}'
    except IndexError:
        print(Fore.GREEN + f'[+] Запрос: {Fore.RESET}{qname[0:-1]}')
        print(Fore.GREEN + f'[+] Время запроса: {Fore.YELLOW}{datetime.now().time()}')
        print(Fore.GREEN + '[+] Домен: Unknown')

    try:
        rdata = packet['DNS Resource Record'].rdata.decode()
        if rdata.split(".")[0] == local_ipv4().split(".")[0]:
            return
        print(Fore.GREEN + f'[+] Запрос: {Fore.RESET}{qname[0:-1]}')
        print(Fore.GREEN + f'[+] Домен: {Fore.YELLOW}{domain}')
        print(Fore.GREEN + f'[+] Время запроса: {Fore.YELLOW}{datetime.now().time()}')
        print(Fore.GREEN + f'[+] IP-адрес домена: {Fore.RESET}{rdata}')
        change_ip(rdata)
        print('\n')
    except IndexError:
        ip = get_host(domain)
        print(Fore.GREEN + f'[+] Запрос: {Fore.RESET}{qname[0:-1]}')
        print(Fore.GREEN + f'[+] Домен: {Fore.YELLOW}{domain}')
        print(Fore.GREEN + f'[+] Время запроса: {Fore.YELLOW}{datetime.now().time()}')
        print(Fore.GREEN + f'[+] IP-адрес домена: {Fore.RESET}{ip}')
        if ip != 'Unknown - No address associated with hostname':
            change_ip(ip)
        print('\n')


def load_range():
    """
    Проверяем, существует ли папка с диапазонами
    адресов. Если нет, выходим из функции. Если есть,
    получаем диапазоны адресов Cloudflare, присваиваем их
    в глобальную переменную list_addr.
    Загружаем диапазоны адресов Яндекс и Google, разбиваем
    построчно и присваиваем в глобальные переменные yandex
    и google.
    :return: выход из функции.
    """
    global list_addr
    global yandex
    global google

    dir_range = os.path.join(os.getcwd(), 'ip_range')
    if not os.path.exists(dir_range):
        print(Fore.RED + '[-] Не существует директории с конфигурацией')
        exit(0)

    print(Fore.GREEN + '\n[*] Получение диапазона адресов Cloudflare')
    list_addr = get(url='https://www.cloudflare.com/ips-v4').text.splitlines()
    list_addr.append("104.16.0.0/12")

    if os.path.exists(os.path.join(dir_range, 'yandex_ipv4.txt')):
        print(Fore.GREEN + '[*] Загрузка адресов Яндекс')
        with open(os.path.join(dir_range, 'yandex_ipv4.txt'), 'r', encoding='utf-8') as ya:
            yandex = ya.read().splitlines()

    if os.path.exists(os.path.join(dir_range, 'google_ipv4.txt')):
        print(Fore.GREEN + '[*] Загрузка адресов Google\n')
        with open(os.path.join(dir_range, 'google_ipv4.txt'), 'r', encoding='utf-8') as go:
            google = go.read().splitlines()


def main():
    """
    Выполняем проверку на операционную систему. В зависимости
    от этого проверяем, если под linux, запущен ли скрипт от
    имени суперпользователя. Если да, запускаем перехват трафика.
    В ОС Windows скрипт не требует наличия административных прав.
    Поэтому перехват трафика запускается сразу.
    Функция перехвата трафика имеет следующие параметры:
    :param iface: сетевой интерфейс для прослушивания.
    :param filter: фильтр, в данном случае фильтруем исходящие запросы на порт 53,
    который является портом DNS.
    :param count: количество перехваченных пакетов. В данном случае, перехват будет
    идти бесконечно.
    :paran store: не сохранять ни один полученный пакет.
    :param prn: функция в которую передаются пакеты для обработки.
    :return: выход из функции.
    """
    if platform.system() == "Linux":
        from os import getuid
        if not getuid() == 0:
            print('\n[+] Запустите скрипт с правами суперпользователя!\n')
            return
        load_range()
        print(Fore.GREEN + '[*] Перехват пакетов...\n')
        sniff(iface=ip_iface(), filter='dst port 53', count=0, store=False, prn=packet_sniff)
    elif platform.system() == "Windows":
        load_range()
        print(Fore.GREEN + '[*] Перехват пакетов...\n')
        sniff(iface=iface_win(), filter='dst port 53', count=0, store=False, prn=packet_sniff)


if __name__ == "__main__":
    main()

Вот в принципе и все. Только что мы создали скрипт, с помощью которого можем перехватывать и детектить DNS-трафик. Это уже вторая версия данного скрипта. В первой версии я пошел по темной стороне силы. Может быть потому, что у нее есть печеньки, но я пытался проитерировать и загрузить все адреса в оперативную память, то есть в глобальную переменную. И да, скрипт работал, так как у современных компьютеров памяти хватает. Но, как только я попытался запустить данный скрипт на Кальке с ограниченным объемом оперативки, а у нее было всего 2 Гб, то я был немедленно послан, а процесс прибит за нехваткой памяти. И тут я понял, что делаю что-то не так )). Пришлось подумать и переписать скрипт.

Небольшое видео с демонстрацией работы скрипта:



А на этом все.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна
 

Вложения

  • dns_sniff.zip
    6 КБ · Просмотры: 329
Последнее редактирование:
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!