Статья для участия в Конкурсе программистов
Граббер анонимных FTP.
Используя анонимные логин и пароль (anonymous/anonymous) граббер пробует подключиться к хосту по протоколу ftp. Если на удалённом хосте разрешена анонимная аутентификация, то граббер собирает информацию о файла/папках, хранящихся в открытом доступе.
В этой конкурсной статье хотелось бы вспомнить старый, добрый FTP, ведь порой людская лень даёт о себе знать, а её результаты могут быть очень интересны. Хотелось бы показать, как можно собрать информацию о файлах/папках, лежащих на ftp-хостах в открытом доступе и сделать всё это в автоматическом режиме.
Для понимания общей логики программы приведу блок-схему:
В составе программы работает:
- «parse_args» - метод, который определяет аргументы командной строки при запуске программы;
- «FTPGrabber» - главный класс, содержащий 4 метода (конструктор «__init__», «main», «anonymous_login», «get_list_dir»), объединённые общей логикой;
- «generate_ip_range» - вспомогательный метод, импортируемый из написанного модуля.
Стек вызовов программы:
Программа должна получить один обязательный аргумент и два опциональных, необязательных.
Обязательный - значение должно содержать ip-адрес, либо сеть в CIDR-нотации.
Опциональные – это количество потоков (по умолчанию 100) и поиск файлов по расширениям.
На поиске фалов по расширениям остановлюсь чуть подробнее. Если опция не задана, то программа граббер собирает все файлы и папки. Если задана опция –e, то необходимо указать расширения файлов, которые необходимо сграбить. Таким образом, если опцию задать следующим видом –e .txt,.xls,.doc, то при сборе данных граббер будет учитывать только файлы с расширениями txt, xls и doc.
В двух методах «anonymous_login» и «get_list_dir» есть блоки обработки исключения, в которых по факту исключения не обрабатываются. Сделано это намерено, так как в этом нет нужды. Например, если попытка установить анонимное соединение (в методе «anonymous_login») окажется неудачной, то вызывается соответствующее исключение, мы могли бы проинформировать об этом пользователя, но если будет проверяться, допустим, более 100000 хостов, то страшно представить, сколько будет ненужной информации. Мы нацелены только на успешные попытки анонимной аутентификации. Поэтому мы сразу приступаем к попытке пройти аутентификацию на следующем хосте.
Получаем содержимое директории на хосте командой NLST – это вариант команды LIST, только более ёмкий. Если директория будет пуста, то NLST нам всё равно возвращает найденный «файл» в виде знака «.» (точка), который показывает переход на уровень вверх, в родительскую директорию. Это считается как ложное срабатывание, ведь от этого «файла» пользы нет, поэтому мы это проверяем и к записи не допускаем.
Для наглядности времени выполнения работы граббера используется сторонняя библиотека «tqdm», которая в командной строке рисует статус-бар.
При написании использовались 3 сторонние библиотеки: argparse, ipaddress и tqdm. Если нет pip3, то предварительно установить через "apt-get install python3-pip", далее установить пакеты командой "pip3 install <имя_пакета>" соответственно.
Либо используя файл requirements.txt выполнить команду "pip3 install -r requirements.txt"
Работа программы проверялась на версиях интерпретатора 3.6 и 3.7, как на debian 9 stretch, так и на windows7.
Возможны опечатки, ошибки как по тексту, так и по коду - пишите, буду исправлять.
Граббер анонимных FTP.
Используя анонимные логин и пароль (anonymous/anonymous) граббер пробует подключиться к хосту по протоколу ftp. Если на удалённом хосте разрешена анонимная аутентификация, то граббер собирает информацию о файла/папках, хранящихся в открытом доступе.
В этой конкурсной статье хотелось бы вспомнить старый, добрый FTP, ведь порой людская лень даёт о себе знать, а её результаты могут быть очень интересны. Хотелось бы показать, как можно собрать информацию о файлах/папках, лежащих на ftp-хостах в открытом доступе и сделать всё это в автоматическом режиме.
Для понимания общей логики программы приведу блок-схему:
В составе программы работает:
- «parse_args» - метод, который определяет аргументы командной строки при запуске программы;
- «FTPGrabber» - главный класс, содержащий 4 метода (конструктор «__init__», «main», «anonymous_login», «get_list_dir»), объединённые общей логикой;
- «generate_ip_range» - вспомогательный метод, импортируемый из написанного модуля.
Стек вызовов программы:
- Метод «generate_ip_range». Так как к общей логике программы он не относится, то для удобства этот метод был вынесен в отдельный модуль и мы его импортируем. Методу передаётся обязательный аргумент командной строки в виде ip-адреса или подсети в CIDR-нотации. Метод возвращает список (self.ip_range), в котором содержатся ip-адреса для указанной подсети, либо список с одним элементом, содержащий один ip-адрес и переменную (self.prefix), которая содержит префикс сети. Для работы с ip-адресацией задействована сторонняя библиотека «ipaddress».
Python:
def generate_ip_range(ip_subnet):
'''Возвращает список адресов и префикс сети, либо один адрес'''
ip_range = []
try:
for ip_address in ipaddress.IPv4Network(ip_subnet):
ip_range.append(str(ip_address))
except ipaddress.AddressValueError as ave:
print(str(ave))
print('Wrong CIDR notation. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
sys.exit()
except ValueError as ve:
print(str(ve))
print('Wrong CIDR block. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
sys.exit()
return ip_range, ipaddress.IPv4Network(ip_subnet).prefixlen
- Метод «parse_args» - как я уже сказал выше, определяет аргументы командной строки, также создаёт экземпляр класса FTPGrabber и начинает работу граббера.
Python:
def parse_args():
'''Определение аргументов, создание экземпляра класса FTPGrabber, начало работы граббера'''
parser = argparse.ArgumentParser(description='Описание аргументов/опций',
epilog='Примеры:\npython parse.py 192.168.1.1'
'\npython parse.py 95.26.0.0/16 -e .docx,.msg'
'\npython parse.py 199.125.45.2 -e .txt',
usage=
'''%(prog)s [опции]\nДля вызова помощи: %(prog)s -h''',
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("ip_range", action='store', type=str,
help='Значение должно содержать ip-адрес, либо сеть в CIDR-нотации.'
' Пример: "192.168.0.1", "192.168.0.0/24", "220.155.33.12/30"'
)
parser.add_argument("-t", dest='threads', action='store', type=int, default=100,
help='Количество потоков.'
' Пример: "-t 10", "-t 500".'
' Если опция не указана, то количество потоков по умолчанию 100.')
parser.add_argument('-e', dest='extensions', action='store',
help='Представляет фильтр поиска. Значение должно содержать расширение файлов,'
' которые необходимо найти, в формате ".*".'
' Несколько расширений должны быть указаны через запятую.'
' Пример: "-e doc", "-e .doc,.docx,.xls,.msg,.txt".'
' Если опция не указана, то по умолчанию ведётся поиск всех фалов/директорий.'
)
args = parser.parse_args()
ftp = FTPGrabber(args)
ftp.main()
- Класс «FTPGrabber».
- Метод-конструктор «__init__». При создании экземпляра класса принимает аргументы, парсит их, инициализирует переменные класса.
-
Python:
def __init__(self, args): '''Проверка аргументов и инициализация переменных''' self.ip_range, self.prefix = generate_ip_range(args.ip_range) self.extensions = args.extensions if self.extensions is not None: self.extensions = self.extensions.split(',') if str(self.prefix) != '32': n = args.ip_range.find('/') self.logfile_name = args.ip_range[:n] + '(' + args.ip_range[n+1:] + ')' else: self.logfile_name = args.ip_range self.logfile_name = self.logfile_name + ( (''.join(self.extensions).replace('.', '-') + '.txt') if (self.extensions is not None) else '-all.txt') logfile_name_without_spec = re.sub(r'[/\\:*?"<>|]', r'', self.logfile_name) if self.logfile_name != logfile_name_without_spec: print('Недопустимые символы в указанном расширении.') sys.exit() self.threads = args.threads self.username = 'anonymous' self.password = 'anonymous'
- Метод «main». Запускает потоки, ожидает их завершения, записывает информацию в файл. Вызывает в свою очередь метод «anonymous_login».
-
Python:
def main(self): '''Инициализация и запуск потоков, протоколирование в файл.''' print('[*] Проверка ' + str(len(self.ip_range)) + ' хоста(ов)...') time.sleep(1) threads = [] ip_range_split = [] # разбиваем список кратно количеству указанных потоков size = self.threads while len(self.ip_range) > size: piece = self.ip_range[:size] ip_range_split.append(piece) self.ip_range = self.ip_range[size:] ip_range_split.append(self.ip_range) for ip_range in tqdm(ip_range_split): for host in ip_range: # запуск потоков t = Thread(target=self.anonymous_login, args=(host, self.username, self.password, self.extensions)) try: t.start() threads.append(t) except RuntimeError as ex: print(ex) print("Слишком большое количество потоков, попробуйте меньшее значение.") sys.exit() # ожидание завершения потоков for t in threads: t.join() # запись результатов в файл with codecs.open(self.logfile_name, 'w', 'utf-8') as file: for line in self.passedlist_global: file.write(line) time.sleep(1) print('[*] Проверка закночена, результат сохранён в "'+self.logfile_name+'"')
- Метод «anonymous_login». Пытается установить анонимное соединение с хостом, если соединение установлено, то вызывает метод «get_list_dir» для сбора информации. Записывает собранные данные в переменную класса self.passedlist_global. В этом методе операция добавления данных в переменную класса, представляет из-себя атомарную операцию, реализованную с помощью семафора (Semophore ) из питоновской библиотеки «threading». Следовательно, операция будет доступна только для одного потока.
-
Python:
def anonymous_login(self, hostname, username, password, extensions): '''Ftp-авторизация, сбор найденной информации''' try: # устанавливаем соединение ftp = ftplib.FTP(hostname, timeout=3) ftp.login(username, password) # собираем найденную информацию passedlist = self.get_list_dir(ftp, extensions) ftp.quit() self.semaphore.acquire() # если директория пустая, то переходит к следующему хосту if (len(passedlist) == 1) and (passedlist[0] == '\tнайден файл/директория: .\n') or (not passedlist): return self.passedlist_global.append('[+] Информация, найденная на хосте '+hostname+'\n') for item in passedlist: self.passedlist_global.append(item) self.semaphore.release() except Exception: ''' Так как нет нужды в уведомлении пользователя, то перехватываются все исключения. Переходим к следующему хосту. ''' pass finally: self.semaphore.release() pass
- Метод «get_list_dir». Возвращает список найденных файлов/директорий.
-
Python:
def get_list_dir(self, ftp, extensions): '''Возвращает список найденных файлов/директорий''' passedlist = [] try: dirlist = ftp.nlst() # если определённые расширения при запуске не указаны, то записывает все файлы/папки if extensions is None: for fileName in dirlist: passedlist.append('\tнайден файл/директория: ' + fileName+'\n') else: for fileName in dirlist: fn = fileName.lower() if any(ext in fn for ext in extensions): passedlist.append('\tнайден файл: ' + fileName+'\n') return passedlist except Exception: ''' Так как нет нужды в уведомлении пользователя, то перехватываются все исключения. Переходим к следующему хосту. ''' pass
Программа должна получить один обязательный аргумент и два опциональных, необязательных.
Обязательный - значение должно содержать ip-адрес, либо сеть в CIDR-нотации.
Опциональные – это количество потоков (по умолчанию 100) и поиск файлов по расширениям.
На поиске фалов по расширениям остановлюсь чуть подробнее. Если опция не задана, то программа граббер собирает все файлы и папки. Если задана опция –e, то необходимо указать расширения файлов, которые необходимо сграбить. Таким образом, если опцию задать следующим видом –e .txt,.xls,.doc, то при сборе данных граббер будет учитывать только файлы с расширениями txt, xls и doc.
В двух методах «anonymous_login» и «get_list_dir» есть блоки обработки исключения, в которых по факту исключения не обрабатываются. Сделано это намерено, так как в этом нет нужды. Например, если попытка установить анонимное соединение (в методе «anonymous_login») окажется неудачной, то вызывается соответствующее исключение, мы могли бы проинформировать об этом пользователя, но если будет проверяться, допустим, более 100000 хостов, то страшно представить, сколько будет ненужной информации. Мы нацелены только на успешные попытки анонимной аутентификации. Поэтому мы сразу приступаем к попытке пройти аутентификацию на следующем хосте.
Получаем содержимое директории на хосте командой NLST – это вариант команды LIST, только более ёмкий. Если директория будет пуста, то NLST нам всё равно возвращает найденный «файл» в виде знака «.» (точка), который показывает переход на уровень вверх, в родительскую директорию. Это считается как ложное срабатывание, ведь от этого «файла» пользы нет, поэтому мы это проверяем и к записи не допускаем.
Для наглядности времени выполнения работы граббера используется сторонняя библиотека «tqdm», которая в командной строке рисует статус-бар.
При написании использовались 3 сторонние библиотеки: argparse, ipaddress и tqdm. Если нет pip3, то предварительно установить через "apt-get install python3-pip", далее установить пакеты командой "pip3 install <имя_пакета>" соответственно.
Либо используя файл requirements.txt выполнить команду "pip3 install -r requirements.txt"
Работа программы проверялась на версиях интерпретатора 3.6 и 3.7, как на debian 9 stretch, так и на windows7.
Возможны опечатки, ошибки как по тексту, так и по коду - пишите, буду исправлять.
Python:
# -*- coding: utf-8 -*-
import ftplib
import re
import sys
import time
from threading import *
import codecs
import argparse
from tqdm import tqdm
from gen_ip_range import generate_ip_range
class FTPGrabber():
max_conn = 1
semaphore = Semaphore(max_conn)
passedlist_global = []
logfile_name = ''
def __init__(self, args):
'''Проверка агументов и инициализация переменных'''
self.ip_range, self.prefix = generate_ip_range(args.ip_range)
self.extensions = args.extensions
if self.extensions is not None:
self.extensions = self.extensions.split(',')
if str(self.prefix) != '32':
n = args.ip_range.find('/')
self.logfile_name = args.ip_range[:n] + '(' + args.ip_range[n+1:] + ')'
else:
self.logfile_name = args.ip_range
self.logfile_name = self.logfile_name + (
(''.join(self.extensions).replace('.', '-') + '.txt') if (self.extensions is not None) else '-all.txt')
logfile_name_without_spec = re.sub(r'[/\\:*?"<>|]', r'', self.logfile_name)
if self.logfile_name != logfile_name_without_spec:
print('Недопустимые символы в указанном расширении.')
sys.exit()
self.threads = args.threads
self.username = 'anonymous'
self.password = 'anonymous'
def main(self):
'''Инициализация и запуск потоков, протоколирование в файл.'''
print('[*] Проверка ' + str(len(self.ip_range)) + ' хоста(ов)...')
time.sleep(1)
threads = []
ip_range_split = []
# разбиваем список кратно количеству указанных потоков
size = self.threads
while len(self.ip_range) > size:
piece = self.ip_range[:size]
ip_range_split.append(piece)
self.ip_range = self.ip_range[size:]
ip_range_split.append(self.ip_range)
for ip_range in tqdm(ip_range_split):
for host in ip_range:
# запуск потоков
t = Thread(target=self.anonymous_login, args=(host, self.username, self.password, self.extensions))
try:
t.start()
threads.append(t)
except RuntimeError as ex:
print(ex)
print("Слишком большое количество потоков, попробуйте меньшее значение.")
sys.exit()
# ожидание завершения потоков
for t in threads:
t.join()
# запись результатов в файл
with codecs.open(self.logfile_name, 'w', 'utf-8') as file:
for line in self.passedlist_global:
file.write(line)
time.sleep(1)
print('[*] Проверка закночена, результат сохранён в "'+self.logfile_name+'"')
def anonymous_login(self, hostname, username, password, extensions):
'''Ftp-авторизация, сбор найденной информации'''
try:
# устанавливаем соединение
ftp = ftplib.FTP(hostname, timeout=3)
ftp.login(username, password)
# собираем найденную информацию
passedlist = self.get_list_dir(ftp, extensions)
ftp.quit()
self.semaphore.acquire()
# если директория пустая, то переходит к следующему хосту
if (len(passedlist) == 1) and (passedlist[0] == '\tнайден файл/директория: .\n') or (not passedlist):
return
self.passedlist_global.append('[+] Информация, найденная на хосте '+hostname+'\n')
for item in passedlist:
self.passedlist_global.append(item)
self.semaphore.release()
except Exception:
'''
Так как нет нужды в уведомлении пользователя, то перехватываются все исключения.
Переходим к следующему хосту.
'''
pass
finally:
self.semaphore.release()
pass
def get_list_dir(self, ftp, extensions):
'''Возвращает список найденных файлов/директорий'''
passedlist = []
try:
dirlist = ftp.nlst()
# если определённые расширения при запуске не указаны, то записывает все файлы/папки
if extensions is None:
for fileName in dirlist:
passedlist.append('\tнайден файл/директория: ' + fileName+'\n')
else:
for fileName in dirlist:
fn = fileName.lower()
if any(ext in fn for ext in extensions):
passedlist.append('\tнайден файл: ' + fileName+'\n')
return passedlist
except Exception:
'''
Так как нет нужды в уведомлении пользователя, то перехватываются все исключения.
Переходим к следующему хосту.
'''
pass
def parse_args():
'''Определение аргументов, создание экземпляра класса FTPGrabber, начало работы граббера'''
parser = argparse.ArgumentParser(description='Описание аргументов/опций',
epilog='Примеры:\npython parse.py 192.168.1.1'
'\npython parse.py 95.26.0.0/16 -e .docx,.msg'
'\npython parse.py 199.125.45.2 -e .txt',
usage=
'''%(prog)s [опции]\nДля вызова помощи: %(prog)s -h''',
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("ip_range", action='store', type=str,
help='Значение должно содержать ip-адрес, либо сеть в CIDR-нотации.'
' Пример: "192.168.0.1", "192.168.0.0/24", "220.155.33.12/30"'
)
parser.add_argument("-t", dest='threads', action='store', type=int, default=100,
help='Количество потоков.'
' Пример: "-t 10", "-t 500".'
' Если опция не указана, то количество потоков по умолчанию 100.')
parser.add_argument('-e', dest='extensions', action='store',
help='Представляет фильтр поиска. Значение должно содержать расширение файлов,'
' которые необходимо найти, в формате ".*".'
' Несколько расширений должны быть указаны через запятую.'
' Пример: "-e doc", "-e .doc,.docx,.xls,.msg,.txt".'
' Если опция не указана, то по умолчанию ведётся поиск всех фалов/директорий.'
)
args = parser.parse_args()
ftp = FTPGrabber(args)
ftp.main()
if __name__ == '__main__':
parse_args()
Python:
import ipaddress
import sys
def generate_ip_range(ip_subnet):
'''Возвращает список адресов и префикс сети, либо один адрес'''
ip_range = []
try:
for ip_address in ipaddress.IPv4Network(ip_subnet):
ip_range.append(str(ip_address))
except ipaddress.AddressValueError as ave:
print(str(ave))
print('Wrong CIDR notation. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
sys.exit()
except ValueError as ve:
print(str(ve))
print('Wrong CIDR block. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
sys.exit()
return ip_range, ipaddress.IPv4Network(ip_subnet).prefixlen
Код:
argparse==1.4.0
tqdm==4.28.1
ipaddress==1.0.22
Последнее редактирование модератором: