Статья Поиск владельца ASN, IP, CIDR с помощью Python

Когда вы узнаете IP-адрес какого-либо сайта, то желательно получить по данному адресу как можно больше информации. В принципе, можно воспользоваться сервисом WHOIS. Но далеко не всегда данный сервис предоставляет все диапазоны IP-адресов, которые принадлежат организации по данному IP. И хотя, сервисов, да и скриптов, которые дают дополнительную информацию довольно много, почему бы нам для практики не сделать свой собственный скрипт на Python, который будет получать нужные нам данные.

000.png


Что такое ASN?

Если вкратце, то ASN, это номер автономной системы. Автономная система, это группа шлюзов, представляющих собой маршрутизаторы или роутеры, которые принадлежат одной организации. Организациями могут быть самые разнообразные поставщики услуг. Это интернет-провайдеры, хостеры или поисковые системы. Таким образом, номер автономной системы, это некий числовой идентификатор сетей, которые участвуют в BGP (Border Gateway Protocol), в котором определяются маршруты передачи пакетов по всему миру.


Зачем определять ASN по IP или IP по ASN?

Когда собирается информация, с помощью ASN можно получить некоторые дополнительные параметры. Для примера, можно определить, что разные IP-адреса принадлежат одной и той же организации, узнать все диапазоны IP-адресов организации, а также краткое описание.


Что потребуется?

Для того, чтобы написать скрипт, который получает данные об IP или ищет номер ASN, необходима база данных адресов. Воспользуемся базой Фрэнка Дениса «IPv4 to ASN map». В рамках данного скрипта будет использоваться файл, в котором собраны IPv4-адреса в их 32-битном представлении. Скачать данную базу в формате «.tsv» можно на сайте: или воспользоваться прямой ссылкой для . После того, как вы скачаете данный файл, его необходимо распаковать с директорию «ip2asn». Впрочем, выбор директории остается за вами. Просто в этом случае, вам нужно будет поменять путь к данному файлу в скрипте, который будет парсить данный файл.

Формат файла «.tsv», это почти то же самое, что и «.csv», только разделителем в данном случае служит табуляция. Для использования его в скрипте, нам необходимо его распарсить, то есть получить из него все данные и сохранить в базу sqlite, с которой мы будем непосредственно работать. Самым простым способом, которым можно прочитать данные из этого файла, это воспользоваться библиотекой pandas, в которой функционал работы с файлами данного формата доступен из коробки. Поэтому, ее необходимо установить. Для этого пишем в терминале:

pip install pandas

Также, для работы скрипта потребуется спарсить данные с сайта который дает доступ к xml-версии таблицы, расшифровывающей двухбуквенные сокращения стран. То есть, для примера, RU - Россия. И хотя, это и не является слишком значимым параметром для данного скрипта, тем не менее, читать уже расшифрованные данные гораздо удобнее, чем те же самые сокращения. Поэтому, для парсинга данных из формата xml воспользуемся библиотекой BeautifulSoup, в качестве парсера используем библиотеку lxml, а загружать xml будем с помощью библиотеки requests. Для их установки пишем в терминале команду:

pip install requests bs4 lxml

Для вывода полученных из базы данных в виде таблицы воспользуемся библиотекой rich, которая позволяет вывести данные в ячейках с переносом строки, для слишком длинных значений, что немаловажно. Так как, к примеру, при использовании той же библиотеки prettytable, когда попадаются слишком длинные названия организаций, текст рвется и на экране получается неразбериха. Для установки библиотеки rich пишем в терминале команду:

pip install rich

После того, как будут установлены все необходимые библиотеки, давайте приступим к написанию первого скрипта, который распарсит данные из файла «.tsv» и запишет из в базу данных sqlite.


Парсим таблицу «.tsv»

Создадим файл скрипта с названием parse_tsv.py. Импортируем в него нужные для работы библиотеки.

Python:
from json import dump
from pathlib import Path
from ipaddress import IPv4Address
from sqlite3 import connect

from pandas import read_csv


Сжатие базы данных

Для начала напишем функцию, которая будет сжимать созданную базу данных, если в ней окажется пустое место, для уменьшения ее объема.
Назовем данную функцию vacuum_base(). В нее не передаются никакие параметры.
Открываем базу данных, путь к которой указан в переменной path_base. Устанавливаем с ней соединение и выполняем команду «VACUUM», после чего закрываем соединение с базой.

Python:
def vacuum_base():
    """
    Очистка пустых значений из базы.
    """
    path_base = Path.cwd() / 'ip2asn' / 'ip2asn_base.db'
    conn = connect(path_base)
    conn.execute("VACUUM")
    conn.close()


Создание базы данных

Для работы скрипта потребуется функция, которая создает нужную нам базу данных. Назовем ее create_base(). Как видите, она также не принимает никаких параметров.

Для начала создаем директорию, в которую будем помещать базу данных. После, присвоим переменной path_base путь к базе. Создадим соединение с базой и выполним запрос на создание таблицы, если она не существует с полями cidr — здесь будут храниться данные о диапазонах адресов, asn — в данном столбце поместим данные об ASN, country — данные о стране, в которой находиться владелец диапазона адресов, owner — данные о владельце ASN и CIDR. Создадим таблицу и закроем соединение с базой.

Python:
def create_base():
    """
    Создаем базу данных sqlite.
    """
    path = Path.cwd() / 'ip2asn'
    path.mkdir(exist_ok=True)
    path_base = path / 'ip2asn_base.db'
    conn = connect(path_base)
    cur = conn.cursor()
    cur.execute("""CREATE TABLE IF NOT EXISTS ip2asn(
                   cidr TEXT,
                   asn TEXT,
                   country TEXT,
                   owner TEXT);
                """)
    conn.commit()
    cur.close()
    conn.close()


Запись данных в базу

Для записи данных в базу создадим функцию save_to_base(data_list: list). На вход она принимает список с кортежами, в которых будет находиться информация по диапазонам адресов и их ASN.

Здесь, все так же, как и в предыдущих функциях. Определяем путь к базе данных. Устанавливаем соединение. И выполняем запрос, с помощью которого записываем данные в базу. В данном случае запрос позволяет записать сразу же несколько параметров по каждому из столбцов базы. Сохраняем изменения и закрываем соединение с базой.

Python:
def save_to_base(data_list: list):
    """
    Сохраняем данные в базе.
    :param data_list: Список с кортежами из данных.
    """
    path_base = Path.cwd() / 'ip2asn' / 'ip2asn_base.db'
    conn = connect(path_base)
    cur = conn.cursor()
    cur.execute('BEGIN TRANSACTION')
    cur.executemany("INSERT INTO ip2asn VALUES(?, ?, ?, ?);", data_list)
    conn.commit()
    cur.close()
    conn.close()


Получение количества записей в базе

Создадим функцию record_count() -> int. Она возвращает целое число, которое соответствует количеству записей в базе. Определим путь к базе. Выполним соединение. Выполним запрос на получение всех записей из базы, которые будут возвращены в виде списка кортежей, определим количество записей и вернем его из функции. После чего закроем соединение с базой.


Python:
def record_count() -> int:
    path_base = Path.cwd() / 'ip2asn' / 'ip2asn_base.db'
    conn = connect(path_base)
    cur = conn.cursor()
    select_query = f"""SELECT * FROM ip2asn"""
    cur.execute(select_query)
    data = len(cur.fetchall())
    cur.close()
    conn.close()
    return data


Парсим данные из файла «.tsv» и сохраняем в базу sqlite


Для того, чтобы получить данные из файла «.tsv» создадим функцию ip_parse(). Для начала определим две переменные, одна из которых это список, куда мы будем помещать кортежи с данными для записи в базу: asn_data; вторая словарь, куда мы также будем помещать полученные данные, но уже для сохранения их в виде json. И хотя, в дальнейшем, данный json использоваться не будет, все же, сохраним базу еще и в этом формате, для возможного использования в будущем.

Определим заголовки таблицы headers , которую будем формировать с помощью pandas. Читаем файл с помощью функции pandas.read_csv, указываем разделитель, которым является табуляция, а также указываем необходимые нам заголовки.

Определяем переменные, в которые будем сохранять получаемые из строк значения. Запустим цикл по значениям датафрейма. В следующем цикле проитерируемся по значениям столбцов. Проверяем заголовок столбца и в соответствии с этим присваиваем переменным значения.

Python:
def ip_parse():
    asn_data = []
    ip2asn = dict()
    headers = ['start_ip', 'end_ip', 'asn', 'country', 'owner']
    df = read_csv(Path.cwd() / 'ip2asn' / 'ip2asn-v4-u32.tsv', sep='\t', header=None,
                  names=headers)
    start_ip = end_ip = asn = country = owner = ""
    for val in df.values:
        for num, item in enumerate(val):
            if df.columns[num] == "start_ip":
                start_ip = item
            elif df.columns[num] == "end_ip":
                end_ip = item
            elif df.columns[num] == "asn":
                asn = item
            elif df.columns[num] == "country":
                country = item
            elif df.columns[num] == "owner":
                owner = item

Так как в таблице нет диапазонов адресов, а только начальный и конечный адрес диапазона, нам необходимо вычислить его самостоятельно. Так как IP-адреса диапазона представлены в уже упакованном виде, преобразуем строковые значения в число, вычтем из конечного адреса начальный. Преобразуем полученное число в двоичный вид, вычтем 1. Затем от максимального значение диапазона, то есть 32 отнимем то, что у нас получилось. Это и будет искомый CIDR.
После вычислений добавим значения в список и словарь. После того, как мы получим все значения, сохраним данные в json и запишем в базу данных sqlite.

Python:
        prefixlen = 32 - ((int(end_ip) - int(start_ip) + 1).bit_length() - 1)
        cidr_notation = f'{IPv4Address(int(start_ip))}/{prefixlen}'
        print(f'CIDR: {cidr_notation} | ASN: {asn} | Country: {country} | Owner: {owner}')
        ip2asn.update({cidr_notation: {"asn": asn, "country": country, "owner": owner}})
        asn_data.append((cidr_notation, asn, country, owner))
    with open(Path.cwd() / 'ip2asn' / 'ip2asn-v4-u32.json', 'w', encoding='utf-8') as file:
        dump(ip2asn, file, indent=4, ensure_ascii=False)
    save_to_base(asn_data)


Python:
def ip_parse():
    asn_data = []
    ip2asn = dict()
    headers = ['start_ip', 'end_ip', 'asn', 'country', 'owner']
    df = read_csv(Path.cwd() / 'ip2asn' / 'ip2asn-v4-u32.tsv', sep='\t', header=None,
                  names=headers)
    start_ip = end_ip = asn = country = owner = ""
    for val in df.values:
        for num, item in enumerate(val):
            if df.columns[num] == "start_ip":
                start_ip = item
            elif df.columns[num] == "end_ip":
                end_ip = item
            elif df.columns[num] == "asn":
                asn = item
            elif df.columns[num] == "country":
                country = item
            elif df.columns[num] == "owner":
                owner = item
        prefixlen = 32 - ((int(end_ip) - int(start_ip) + 1).bit_length() - 1)
        cidr_notation = f'{IPv4Address(int(start_ip))}/{prefixlen}'
        print(f'CIDR: {cidr_notation} | ASN: {asn} | Country: {country} | Owner: {owner}')
        ip2asn.update({cidr_notation: {"asn": asn, "country": country, "owner": owner}})
        asn_data.append((cidr_notation, asn, country, owner))
    with open(Path.cwd() / 'ip2asn' / 'ip2asn-v4-u32.json', 'w', encoding='utf-8') as file:
        dump(ip2asn, file, indent=4, ensure_ascii=False)
    save_to_base(asn_data)

Теперь осталось только запустить написанный нами скрипт. Для этого напишем еще одну функцию main, в которой будем делать проверку на существование базы, если ее нет — создавать. После чего запускать функцию парсинга .tsv. Затем сожмем созданную базу данных и выведем из нее количество записанных значений.

Python:
def main():
    if not (Path.cwd() / 'ip2asn' / 'ip2asn_base.db').exists():
        create_base()
    ip_parse()
    vacuum_base()
    print(f"\nКоличество записей в базе ASN: {record_count()}")


if __name__ == "__main__":
    main()

Python:
# pip install pandas

from json import dump
from pathlib import Path
from ipaddress import IPv4Address
from sqlite3 import connect

from pandas import read_csv


def vacuum_base():
    """
    Очистка пустых значений из базы.
    """
    path_base = Path.cwd() / 'ip2asn' / 'ip2asn_base.db'
    conn = connect(path_base)
    conn.execute("VACUUM")
    conn.close()


def create_base():
    """
    Создаем базу данных sqlite.
    """
    path = Path.cwd() / 'ip2asn'
    path.mkdir(exist_ok=True)
    path_base = path / 'ip2asn_base.db'
    conn = connect(path_base)
    cur = conn.cursor()
    cur.execute("""CREATE TABLE IF NOT EXISTS ip2asn(
                   cidr TEXT,
                   asn TEXT,
                   country TEXT,
                   owner TEXT);
                """)
    conn.commit()
    cur.close()
    conn.close()


def save_to_base(data_list: list):
    """
    Сохраняем данные в базе.
    :param data_list: Список с кортежами из данных.
    """
    path_base = Path.cwd() / 'ip2asn' / 'ip2asn_base.db'
    conn = connect(path_base)
    cur = conn.cursor()
    cur.execute('BEGIN TRANSACTION')
    cur.executemany("INSERT INTO ip2asn VALUES(?, ?, ?, ?);", data_list)
    conn.commit()
    cur.close()
    conn.close()


def record_count() -> int:
    path_base = Path.cwd() / 'ip2asn' / 'ip2asn_base.db'
    conn = connect(path_base)
    cur = conn.cursor()
    select_query = f"""SELECT * FROM ip2asn"""
    cur.execute(select_query)
    data = len(cur.fetchall())
    cur.close()
    conn.close()
    return data


def ip_parse():
    asn_data = []
    ip2asn = dict()
    headers = ['start_ip', 'end_ip', 'asn', 'country', 'owner']
    df = read_csv(Path.cwd() / 'ip2asn' / 'ip2asn-v4-u32.tsv', sep='\t', header=None,
                  names=headers)
    start_ip = end_ip = asn = country = owner = ""
    for val in df.values:
        for num, item in enumerate(val):
            if df.columns[num] == "start_ip":
                start_ip = item
            elif df.columns[num] == "end_ip":
                end_ip = item
            elif df.columns[num] == "asn":
                asn = item
            elif df.columns[num] == "country":
                country = item
            elif df.columns[num] == "owner":
                owner = item
        prefixlen = 32 - ((int(end_ip) - int(start_ip) + 1).bit_length() - 1)
        cidr_notation = f'{IPv4Address(int(start_ip))}/{prefixlen}'
        print(f'CIDR: {cidr_notation} | ASN: {asn} | Country: {country} | Owner: {owner}')
        ip2asn.update({cidr_notation: {"asn": asn, "country": country, "owner": owner}})
        asn_data.append((cidr_notation, asn, country, owner))
    with open(Path.cwd() / 'ip2asn' / 'ip2asn-v4-u32.json', 'w', encoding='utf-8') as file:
        dump(ip2asn, file, indent=4, ensure_ascii=False)
    save_to_base(asn_data)


def main():
    if not (Path.cwd() / 'ip2asn' / 'ip2asn_base.db').exists():
        create_base()
    ip_parse()
    vacuum_base()
    print(f"\nКоличество записей в базе ASN: {record_count()}")


if __name__ == "__main__":
    main()


Парсим xml для расшифровки сокращенных значений стран

Если вы помните, следующим шагом, который нам потребуется сделать, это получить xml-файл с таблицей, содержащей расшифровку сокращенных значений стран и распарсить его для последующего использования в основном скрипте. Поэтому, создадим файл parse_country_code.py. Импортируем в скрипт нужные для работы скрипта библиотеки. Все они у вас должны быть установлены заранее.

Python:
import json
from pathlib import Path

from bs4 import BeautifulSoup
from requests import get

Так как у нас есть прямая ссылка на файл xml ( ), все, что нам потребуется, это получить текст содержащий код.


Получение файла xml

Создадим функцию get_table(url: str) -> (str, bool). На вход она получает ссылку на файл, а возвращает полученный текст или False в случае неудачи.
Переходим по ссылке. Проверяем статус код. Если он 200, возвращаем из функции текст xml. Если нет, False.

Python:
def get_table(url: str) -> (str, bool):
    try:
        res = get(url=url)
        if res.status_code == 200:
            return res.text
    except Exception:
        return False


Парсинг полученных данных

Создадим функцию parse_table(text: str) -> (dict, bool). На вход она получает текст xml, а возвращает словарь с распарсенными значениями или False, в случае неудачи.
Создаем словарь country_alpha2. Затем создаем объект BeautifulSoup, в который передаем полученный xml и указываем парсер, с помощью которого будем парсить значения. Находим все теги country.
Итерируемся по найденным блокам, забираем из них краткое наименование страны на русском, а также сокращенное двухбуквенное международное наименование. Обновляем словарь и добавляем в него полученные значения. После чего, возвращаем словарь из функции.

Python:
def parse_table(text: str) -> (dict, bool):
    country_alpha2 = dict()
    soup = BeautifulSoup(text, 'xml')
    countries = soup.find_all('country')
    for country in countries:
        name = country.find('name').text.strip()
        alpha2 = country.find('alpha2').text
        country_alpha2.update({alpha2: name})
    if country_alpha2:
        return country_alpha2
    return False


Сохраняем полученные значения

Создадим функцию main. Получим текст xml, проверим, не пустое ли значение переменной с текстом. Если нет, передаем в функцию парсинга значений из которой возвращается словарь с полученными значениями. Проверяем, также, чтобы словарь не был пустым, после чего сохраняем полученные данные в файл json.

Python:
def main():
    if text := get_table('https://www.artlebedev.ru/country-list/xml/'):
        if country_alpha2 := parse_table(text):
            with open(Path.cwd() / 'ip2asn' / 'country_alpha2.json', 'w', encoding='utf-8') as cn:
                json.dump(country_alpha2, cn, indent=4, ensure_ascii=False)
        else:
            print("No data")
    else:
        print('No text')


if __name__ == "__main__":
    main()

Python:
# pip install requests bs4 lxml

import json
from pathlib import Path

from bs4 import BeautifulSoup
from requests import get


def get_table(url: str) -> (str, bool):
    try:
        res = get(url=url)
        if res.status_code == 200:
            return res.text
    except Exception:
        return False


def parse_table(text: str) -> (dict, bool):
    country_alpha2 = dict()
    soup = BeautifulSoup(text, 'xml')
    countries = soup.find_all('country')
    for country in countries:
        name = country.find('name').text.strip()
        alpha2 = country.find('alpha2').text
        country_alpha2.update({alpha2: name})
    if country_alpha2:
        return country_alpha2
    return False


def main():
    if text := get_table('https://www.artlebedev.ru/country-list/xml/'):
        if country_alpha2 := parse_table(text):
            with open(Path.cwd() / 'ip2asn' / 'country_alpha2.json', 'w', encoding='utf-8') as cn:
                json.dump(country_alpha2, cn, indent=4, ensure_ascii=False)
        else:
            print("No data")
    else:
        print('No text')


if __name__ == "__main__":
    main()


Создаем скрипт для получения ASN по IP и наоборот

На самом деле, из данной базы можно получить несколько значений. Для начала, это поискать IP-адрес, и получить по нему CIDR, ASN, страну и владельца диапазона. Также, здесь можно выполнить поиск по ASN и получить все найденные значения, которые будут включать все вышеприведенные параметры.

Создадим файл ip2asn.py. Импортируем в него библиотеки, нужные для работы скрипта.

Python:
import json
import sqlite3
import sys
from ipaddress import IPv4Network, IPv4Address, AddressValueError
from pathlib import Path

from rich.console import Console
from rich.table import Table

Для начала, так как мы будем пользоваться консолью и передачей параметров в командной строке, а не в диалоговом режиме, создадим небольшой файл справки с доступными опциями для нашего скрипта.

Python:
helps = """
    Команды:

    -h - вызов справки (пример: python3 ip2asn.py -h);
    -ip - поиск IP-адреса в базе (пример: python3 ip2asn.py -ip 192.168.1.1);
    -ipv - дополнительный параметр, для исключения поиска похожих данных
                                (пример: python3 ip2asn.py -ipv 192.168.1.1);
    -d - поиск CIDR в базе (пример: python3 ip2asn.py -d 192.168.0.0/16);
    -a - поиск ASN в базе (пример: python3 ip2asn.py -a 3361);
    -c - выборка из базы по стране (пример: python3 ip2asn.py -c CN);
    -o - выборка из базы по оператору (пример: python3 ip2asn.py -o "BJ-GUANGHUAN-AP Beijing");
         обратите внимание, если название оператора пишется с пробелами, стоку необходимо заключить в кавычки.
    -oc - выборка по оператору и фильтр по стране (пример: python3 ip2asn.py -oc "BJ-GUANGHUAN-AP Beijing" CN);
    """


Получение данных из базы

Создадим функцию select_data(column='', value='') -> (list, bool). На вход она получает название столбца в базе данных, по которому нужно осуществлять выборку и значение, которое необходимо найти.
Укажем значение по умолчанию, так как не во всех случаях оно требуется. Возвращает данная функция список с полученными значениями в случае успеха и False в случае неудачи.
Для того, чтобы не создавать несколько функций для различных запросов, попытаемся сделать ее универсальной, для получения необходимых данных.
Подключаемся к базе данных.

Python:
def select_data(column='', value='') -> (list, bool):
    path_base = Path.cwd() / 'ip2asn' / 'ip2asn_base.db'
    conn = sqlite3.connect(path_base)
    cur = conn.cursor()

Проверяем переданные в функцию значения. Если значение столбца пусто, делаем полную выборку данных из базы.
Если в качестве значения будет указан поиск владельца диапазона или ASN, выполняем запрос, выбирающий не только точно подходящие значения, но и те, что содержат в себе искомое значение.
Для этого используем оператор LIKE.
Так как выборка по ASN, стране и CIDR происходит одинаково, будем проверять наличие одного из этих параметров.
После чего выбираем данные по значению из определенного столбца.
В случае с IP-адресом возможна выборка всех значений владельца, который был получен в основном запросе.
Поэтому проверим еще одно условие и сделаем выборку по каждому из них. После чего вернем значения из функции.

Python:
    if not column:
        select_query = f"""SELECT * FROM ip2asn"""
        cur.execute(select_query)
    elif column == "owner":
        select_query = f"""SELECT * FROM ip2asn WHERE owner LIKE ?"""
        cur.execute(select_query, (f"%{value}%",))
    elif column in ['asn', 'country', 'cidr']:
        select_query = f"""SELECT * FROM ip2asn WHERE {column} = ?"""
        cur.execute(select_query, (value,))
    elif column == "asn,owner":
        data_list = []
        val = value.split("|")
        for col in ["asn", "owner"]:
            dat = []
            if col == "asn":
                select_query = f"""SELECT * FROM ip2asn WHERE {col} = ?"""
                dat = cur.execute(select_query, (val[0],))
            elif col == "owner":
                select_query = f"""SELECT * FROM ip2asn WHERE {col} LIKE ?"""
                dat = cur.execute(select_query, (f"%{val[1]}%",))
            if dat:
                data_list.extend(cur.fetchall())
        if data_list:
            return data_list

Далее, выполняем запрос из других значений. И также, если данные были получены, возвращаем из фукнции.

Python:
    data = cur.fetchall()
    cur.close()
    conn.close()
    return data if data else False

Python:
def select_data(column='', value='') -> (list, bool):
    path_base = Path.cwd() / 'ip2asn' / 'ip2asn_base.db'
    conn = sqlite3.connect(path_base)
    cur = conn.cursor()
    if not column:
        select_query = f"""SELECT * FROM ip2asn"""
        cur.execute(select_query)
    elif column == "owner":
        select_query = f"""SELECT * FROM ip2asn WHERE owner LIKE ?"""
        cur.execute(select_query, (f"%{value}%",))
    elif column in ['asn', 'country', 'cidr']:
        select_query = f"""SELECT * FROM ip2asn WHERE {column} = ?"""
        cur.execute(select_query, (value,))
    elif column == "asn,owner":
        data_list = []
        val = value.split("|")
        for col in ["asn", "owner"]:
            dat = []
            if col == "asn":
                select_query = f"""SELECT * FROM ip2asn WHERE {col} = ?"""
                dat = cur.execute(select_query, (val[0],))
            elif col == "owner":
                select_query = f"""SELECT * FROM ip2asn WHERE {col} LIKE ?"""
                dat = cur.execute(select_query, (f"%{val[1]}%",))
            if dat:
                data_list.extend(cur.fetchall())
        if data_list:
            return data_list
    data = cur.fetchall()
    cur.close()
    conn.close()
    return data if data else False


Функция для поиска IP-адреса

Создадим функцию ip_find(ip: str) -> (tuple, bool). На вход она получает переданный в функцию IP-адрес, а возвращает кортеж из полученного значения.
Выполняем запрос к базе и итерируемся по полученным данным. Проверяем, входит ли искомый IP-адрес в диапазон представленный в базе. Если да, возвращаем кортеж со значениями из функции.

Python:
def ip_find(ip: str) -> (tuple, bool):
    for dat in select_data():
        if IPv4Address(ip) in IPv4Network(dat[0], False):
            return dat
    return False


Функция для поиска ASN, страны и CIDR

Создадим функцию asn_country_cidr_find(column: str, value: str) -> (list, bool). На входе она получает значение столбца, в котором необходимо произвести поиск, а также само значение, которое нужно выбрать из базы. Возвращает функция список с полученными кортежами.

Выборка здесь делается похожим образом, а потому, для получения данных по этим значениям достаточно одной функции. Выполняем запрос к базе данных, в которую передаем столбец, по которому будет вестись выборка и значение, которое необходимо найти. Проверяем, получены ли данные. Если да, возвращаем их из функции. Если нет, возвращаем False.

Python:
def asn_country_cidr_find(column: str, value: str) -> (list, bool):
    if asn_list := select_data(column, value):
        return asn_list
    return False


Функция для поиска владельца с возможность фильтрации по стране

Создадим функцию owner_find(value: str, country='') -> (list, bool). На вход она получает значение, по которому необходимо выполнить выборку. А также, возможна передача страны, по которой отберутся данные из полученных результатов. Создаем список, в который будем складывать кортежи с значениями страны, которая была передана в функцию. Выполним запрос и если были возвращены значения, проверяем, была ли указана страна для фильтрации. Если нет, просто возвращаем полученные значения. Если страна указана, итерируемся по полученным данным. Проверяем, соответствует ли переданное значение переданной в функцию стране. Добавляем в список, если да.

После проверяем список с отфильтрованными значениями. Если он не пуст, возвращаем из функции. Если пуст, возвращаем False.

Python:
def owner_find(value: str, country='') -> (list, bool):
    country_list = []
    if own := select_data('owner', value):
        if not country:
            return own
        else:
            for con in own:
                if con[2] == country:
                    country_list.append(con)
    if country_list:
        return country_list
    return False


Проверка IP-адреса

Создадим вспомогательную функцию, которая будет проверять переданный в параметрах IP-адрес на валидность — check_ip(ip).
Здесь все просто. Передаем полученное значение адреса в объект IPv4Address. Если значение не является IP-аресом, будет получено исключение, а значит, адрес не валиден.

Python:
def check_ip(ip):
    try:
        IPv4Address(ip)
        return True
    except AddressValueError:
        return False


Печать полученных результатов

Создадим функцию print_res(res: list, title: str), которая на входе получает список с данными для печати, а также заголовок для таблицы.
Открываем json-файл с расшифровкой значений стран. Передаем словарь в переменную cn_code. Создаем объект таблицы, в который передаем заголовок и его положение относительно таблицы.
Добавляем столбцы, задаем названия, устанавливаем перенос значений в False, указываем выравнивание в ячейке и цвет символов в ней.
После итерируемся по переданным в функцию данным, забираем значения из кортежей и помещаем в объект таблицы в виде строк.
Затем распечатываем созданную таблицы. Для этого создаем объект Console и выводим в него таблицу.

Python:
def print_res(res: list, title: str):
    with open(Path.cwd() / 'ip2asn' / 'country_alpha2.json', 'r', encoding='utf-8') as cn:
        cn_code = json.load(cn)
    table = Table(title=title, title_justify='left')
    table.add_column("CIDR", no_wrap=False, justify="left", style="green")
    table.add_column("ASN", no_wrap=False, justify="left", style="green")
    table.add_column("Country", no_wrap=False, justify="left", style="green")
    table.add_column("Owner", no_wrap=False, justify="left", style="green")
    for r in res:
        table.add_row(r[0], r[1], f'"{cn_code[r[2]]}, {r[2]}"', f'"{r[3]}"')
    console = Console()
    print(' ')
    console.print(table)


Функция парсинга значений из командной строки

Так как все действия, которые здесь выполняются, по большей части однотипны, описывать все не имеет большого значения. Ограничимся описанием нескольких из них.
Проверяем строку на наличие команд. Если команда является -ip, то забираем указанное после нее значение адреса, проверяем его на валидность.
Затем выполняем поиск адреса в диапазонах. Если адрес найден, также выполняем поиск по дополнительным параметрам, то есть, по ASN и владельцу. Если значения найдены, выводим на печать.
Следующий параметр, то же самое, но с дополнительным параметром v. Это указывает, что поиск по дополнительным значениям осуществляться не будет.
Для вывода справки нужно набрать в терминале команду:

python3 ip2asn.py -h

Python:
if __name__ == "__main__":
    try:
        if sys.argv[1] == "-ip":
            if check_ip(sys.argv[2]):
                if fnd := ip_find(sys.argv[2]):
                    print_res([fnd], 'Найденные значения IP:')
                    if own := select_data('asn,owner', f'{fnd[2]}|{fnd[3]}'):
                        print_res(own, 'Дополнительные значения (ASN, владелец):')
                else:
                    print("Данные не найдены")
            else:
                print(helps)
        elif sys.argv[1] == "-ipv":
            if check_ip(sys.argv[2]):
                if fnd := ip_find(sys.argv[2]):
                    print_res([fnd], 'Найденные значения IP:')
                else:
                    print("Данные не найдены")
            else:
                print(helps)

Python:
if __name__ == "__main__":
    try:
        if sys.argv[1] == "-ip":
            if check_ip(sys.argv[2]):
                if fnd := ip_find(sys.argv[2]):
                    print_res([fnd], 'Найденные значения IP:')
                    if own := select_data('asn,owner', f'{fnd[2]}|{fnd[3]}'):
                        print_res(own, 'Дополнительные значения (владелец):')
                else:
                    print("Данные не найдены")
            else:
                print(helps)
        elif sys.argv[1] == "-ipv":
            if check_ip(sys.argv[2]):
                if fnd := ip_find(sys.argv[2]):
                    print_res([fnd], 'Найденные значения IP:')
                else:
                    print("Данные не найдены")
            else:
                print(helps)
        elif sys.argv[1] == "-d":
            try:
                IPv4Network(sys.argv[2])
            except AddressValueError:
                print(helps)
                sys.exit(0)
            if asn_country := asn_country_cidr_find('cidr', sys.argv[2]):
                print_res(asn_country, 'Найденные значения CIDR:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-a":
            if asn_country := asn_country_cidr_find('asn', sys.argv[2]):
                print_res(asn_country, 'Найденные значения ASN:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-c":
            if asn_country := asn_country_cidr_find('country', sys.argv[2]):
                print_res(asn_country, 'Найденные значения Country:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-o":
            if own := owner_find(sys.argv[2].replace("'", "").replace('"', '').replace("asn", "").replace("ASN", "")):
                print_res(own, 'Найденные значения Owner:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-oc":
            owner = sys.argv[2].replace("'", "").replace('"', '').replace("asn", "").replace("ASN", "")
            cont = sys.argv[3].upper()
            if own := owner_find(owner, cont):
                print_res(own, 'Найденные значения Owner - Country:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-h":
            print(helps)
    except IndexError:
        print(helps)



Python:
# pip install rich

import json
import sqlite3
import sys
from ipaddress import IPv4Network, IPv4Address, AddressValueError
from pathlib import Path

from rich.console import Console
from rich.table import Table

helps = """
    Команды:

    -h - вызов справки (пример: python3 ip2asn.py -h);
    -ip - поиск IP-адреса в базе (пример: python3 ip2asn.py -ip 192.168.1.1);
    -ipv - дополнительный параметр, для исключения поиска похожих данных
                                (пример: python3 ip2asn.py -ipv 192.168.1.1);
    -d - поиск CIDR в базе (пример: python3 ip2asn.py -d 192.168.0.0/16);
    -a - поиск ASN в базе (пример: python3 ip2asn.py -a 3361);
    -c - выборка из базы по стране (пример: python3 ip2asn.py -c CN);
    -o - выборка из базы по оператору (пример: python3 ip2asn.py -o "BJ-GUANGHUAN-AP Beijing");
         обратите внимание, если название оператора пишется с пробелами, стоку необходимо заключить в кавычки.
    -oc - выборка по оператору и фильтр по стране (пример: python3 ip2asn.py -oc "BJ-GUANGHUAN-AP Beijing" CN);
    """


def select_data(column='', value='') -> (list, bool):
    path_base = Path.cwd() / 'ip2asn' / 'ip2asn_base.db'
    conn = sqlite3.connect(path_base)
    cur = conn.cursor()
    if not column:
        select_query = f"""SELECT * FROM ip2asn"""
        cur.execute(select_query)
    elif column == "owner":
        select_query = f"""SELECT * FROM ip2asn WHERE owner LIKE ?"""
        cur.execute(select_query, (f"%{value}%",))
    elif column in ['asn', 'country', 'cidr']:
        select_query = f"""SELECT * FROM ip2asn WHERE {column} = ?"""
        cur.execute(select_query, (value,))
    elif column == "asn,owner":
        data_list = []
        val = value.split("|")
        for col in ["asn", "owner"]:
            dat = []
            if col == "asn":
                select_query = f"""SELECT * FROM ip2asn WHERE {col} = ?"""
                dat = cur.execute(select_query, (val[0],))
            elif col == "owner":
                select_query = f"""SELECT * FROM ip2asn WHERE {col} LIKE ?"""
                dat = cur.execute(select_query, (f"%{val[1]}%",))
            if dat:
                data_list.extend(cur.fetchall())
        if data_list:
            return data_list
    data = cur.fetchall()
    cur.close()
    conn.close()
    return data if data else False


def ip_find(ip: str) -> (tuple, bool):
    for dat in select_data():
        if IPv4Address(ip) in IPv4Network(dat[0], False):
            return dat
    return False


def asn_country_cidr_find(column: str, value: str) -> (list, bool):
    if asn_list := select_data(column, value):
        return asn_list
    return False


def owner_find(value: str, country='') -> (list, bool):
    country_list = []
    if own := select_data('owner', value):
        if not country:
            return own
        else:
            for con in own:
                if con[2] == country:
                    country_list.append(con)
    if country_list:
        return country_list
    return False


def check_ip(ip):
    try:
        IPv4Address(ip)
        return True
    except AddressValueError:
        return False


def print_res(res: list, title: str):
    with open(Path.cwd() / 'ip2asn' / 'country_alpha2.json', 'r', encoding='utf-8') as cn:
        cn_code = json.load(cn)
    table = Table(title=title, title_justify='left')
    table.add_column("CIDR", no_wrap=False, justify="left", style="green")
    table.add_column("ASN", no_wrap=False, justify="left", style="green")
    table.add_column("Country", no_wrap=False, justify="left", style="green")
    table.add_column("Owner", no_wrap=False, justify="left", style="green")
    for r in res:
        table.add_row(r[0], r[1], f'"{cn_code[r[2]]}, {r[2]}"', f'"{r[3]}"')
    console = Console()
    print(' ')
    console.print(table)


if __name__ == "__main__":
    try:
        if sys.argv[1] == "-ip":
            if check_ip(sys.argv[2]):
                if fnd := ip_find(sys.argv[2]):
                    print_res([fnd], 'Найденные значения IP:')
                    if own := select_data('asn,owner', f'{fnd[2]}|{fnd[3]}'):
                        print_res(own, 'Дополнительные значения (ASN, владелец):')
                else:
                    print("Данные не найдены")
            else:
                print(helps)
        elif sys.argv[1] == "-ipv":
            if check_ip(sys.argv[2]):
                if fnd := ip_find(sys.argv[2]):
                    print_res([fnd], 'Найденные значения IP:')
                else:
                    print("Данные не найдены")
            else:
                print(helps)
        elif sys.argv[1] == "-d":
            try:
                IPv4Network(sys.argv[2])
            except AddressValueError:
                print(helps)
                sys.exit(0)
            if asn_country := asn_country_cidr_find('cidr', sys.argv[2]):
                print_res(asn_country, 'Найденные значения CIDR:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-a":
            if asn_country := asn_country_cidr_find('asn', sys.argv[2]):
                print_res(asn_country, 'Найденные значения ASN:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-c":
            if asn_country := asn_country_cidr_find('country', sys.argv[2]):
                print_res(asn_country, 'Найденные значения Country:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-o":
            if own := owner_find(sys.argv[2].replace("'", "").replace('"', '').replace("asn", "").replace("ASN", "")):
                print_res(own, 'Найденные значения Owner:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-oc":
            owner = sys.argv[2].replace("'", "").replace('"', '').replace("asn", "").replace("ASN", "")
            cont = sys.argv[3].upper()
            if own := owner_find(owner, cont):
                print_res(own, 'Найденные значения Owner - Country:')
            else:
                print("Данные не найдены")
        elif sys.argv[1] == "-h":
            print(helps)
    except IndexError:
        print(helps)

Ниже показан результат поиска IP-адреса. Была использована опция, которая отключает вывод дополнительных значений:

01.png


Сохранение полученных значений в данном скрипте реализовывать не стал, так как вывод можно легко переадресовать в файл:

python3 ip2asn.py -d 192.168.0.0/16 > 123.txt

А на этом, пожалуй, все.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна
 

Вложения

  • ip2asn.zip
    4 КБ · Просмотры: 178
Последнее редактирование модератором:
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!