Конкурс FTP-граббер

  • Автор темы Cenzor
  • Дата начала
Статья для участия в Конкурсе программистов
Граббер анонимных FTP.
Используя анонимные логин и пароль (anonymous/anonymous) граббер пробует подключиться к хосту по протоколу ftp. Если на удалённом хосте разрешена анонимная аутентификация, то граббер собирает информацию о файла/папках, хранящихся в открытом доступе.

В этой конкурсной статье хотелось бы вспомнить старый, добрый FTP, ведь порой людская лень даёт о себе знать, а её результаты могут быть очень интересны. Хотелось бы показать, как можно собрать информацию о файлах/папках, лежащих на ftp-хостах в открытом доступе и сделать всё это в автоматическом режиме.
Для понимания общей логики программы приведу блок-схему:
блок-схема.PNG

В составе программы работает:
- «parse_args» - метод, который определяет аргументы командной строки при запуске программы;
- «FTPGrabber» - главный класс, содержащий 4 метода (конструктор «__init__», «main», «anonymous_login», «get_list_dir»), объединённые общей логикой;
- «generate_ip_range» - вспомогательный метод, импортируемый из написанного модуля.
Стек вызовов программы:
стек.PNG

  • Метод «generate_ip_range». Так как к общей логике программы он не относится, то для удобства этот метод был вынесен в отдельный модуль и мы его импортируем. Методу передаётся обязательный аргумент командной строки в виде ip-адреса или подсети в CIDR-нотации. Метод возвращает список (self.ip_range), в котором содержатся ip-адреса для указанной подсети, либо список с одним элементом, содержащий один ip-адрес и переменную (self.prefix), которая содержит префикс сети. Для работы с ip-адресацией задействована сторонняя библиотека «ipaddress».
Python:
def generate_ip_range(ip_subnet):
    '''Возвращает список адресов и префикс сети, либо один адрес'''
    ip_range = []
    try:
        for ip_address in ipaddress.IPv4Network(ip_subnet):
            ip_range.append(str(ip_address))
    except ipaddress.AddressValueError as ave:
        print(str(ave))
        print('Wrong CIDR notation. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
        sys.exit()
    except ValueError as ve:
        print(str(ve))
        print('Wrong CIDR block. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
        sys.exit()
    return ip_range, ipaddress.IPv4Network(ip_subnet).prefixlen
  • Метод «parse_args» - как я уже сказал выше, определяет аргументы командной строки, также создаёт экземпляр класса FTPGrabber и начинает работу граббера.
Python:
def parse_args():
    '''Определение аргументов, создание экземпляра класса FTPGrabber, начало работы граббера'''
    parser = argparse.ArgumentParser(description='Описание аргументов/опций',
                                     epilog='Примеры:\npython parse.py 192.168.1.1'
                                            '\npython parse.py 95.26.0.0/16 -e .docx,.msg'
                                            '\npython parse.py 199.125.45.2 -e .txt',
                                     usage=
                                     '''%(prog)s [опции]\nДля вызова помощи: %(prog)s -h''',
                                     formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("ip_range", action='store', type=str,
                        help='Значение должно содержать ip-адрес, либо сеть в CIDR-нотации.'
                             ' Пример: "192.168.0.1", "192.168.0.0/24", "220.155.33.12/30"'
                        )
    parser.add_argument("-t", dest='threads', action='store', type=int, default=100,
                        help='Количество потоков.'
                             ' Пример: "-t 10", "-t 500".'
                             ' Если опция не указана, то количество потоков по умолчанию 100.')
    parser.add_argument('-e', dest='extensions', action='store',
                        help='Представляет фильтр поиска. Значение должно содержать расширение файлов,'
                             ' которые необходимо найти, в формате ".*".'
                             ' Несколько расширений должны быть указаны через запятую.'
                             ' Пример: "-e doc", "-e .doc,.docx,.xls,.msg,.txt".'
                             ' Если опция не указана, то по умолчанию ведётся поиск всех фалов/директорий.'
                        )
    args = parser.parse_args()
    ftp = FTPGrabber(args)
    ftp.main()
  • Класс «FTPGrabber».
    • Метод-конструктор «__init__». При создании экземпляра класса принимает аргументы, парсит их, инициализирует переменные класса.
    • Python:
          def __init__(self, args):
              '''Проверка аргументов и инициализация переменных'''
              self.ip_range, self.prefix = generate_ip_range(args.ip_range)
              self.extensions = args.extensions
              if self.extensions is not None:
                  self.extensions = self.extensions.split(',')
              if str(self.prefix) != '32':
                  n = args.ip_range.find('/')
                  self.logfile_name = args.ip_range[:n] + '(' + args.ip_range[n+1:] + ')'
              else:
                  self.logfile_name = args.ip_range
              self.logfile_name = self.logfile_name + (
                  (''.join(self.extensions).replace('.', '-') + '.txt') if (self.extensions is not None) else '-all.txt')
              logfile_name_without_spec = re.sub(r'[/\\:*?"<>|]', r'', self.logfile_name)
              if self.logfile_name != logfile_name_without_spec:
                  print('Недопустимые символы в указанном расширении.')
                  sys.exit()
              self.threads = args.threads
              self.username = 'anonymous'
              self.password = 'anonymous'
    • Метод «main». Запускает потоки, ожидает их завершения, записывает информацию в файл. Вызывает в свою очередь метод «anonymous_login».
    • Python:
          def main(self):
              '''Инициализация и запуск потоков, протоколирование в файл.'''
              print('[*] Проверка ' + str(len(self.ip_range)) + ' хоста(ов)...')
              time.sleep(1)
              threads = []
              ip_range_split = []
              # разбиваем список кратно количеству указанных потоков
              size = self.threads
              while len(self.ip_range) > size:
                  piece = self.ip_range[:size]
                  ip_range_split.append(piece)
                  self.ip_range = self.ip_range[size:]
              ip_range_split.append(self.ip_range)
              for ip_range in tqdm(ip_range_split):
                  for host in ip_range:
                      # запуск потоков
                      t = Thread(target=self.anonymous_login, args=(host, self.username, self.password, self.extensions))
                      try:
                          t.start()
                          threads.append(t)
                      except RuntimeError as ex:
                          print(ex)
                          print("Слишком большое количество потоков, попробуйте меньшее значение.")
                          sys.exit()
                  # ожидание завершения потоков
                  for t in threads:
                      t.join()
              # запись результатов в файл
              with codecs.open(self.logfile_name, 'w', 'utf-8') as file:
                  for line in self.passedlist_global:
                      file.write(line)
              time.sleep(1)
              print('[*] Проверка закночена, результат сохранён в "'+self.logfile_name+'"')
    • Метод «anonymous_login». Пытается установить анонимное соединение с хостом, если соединение установлено, то вызывает метод «get_list_dir» для сбора информации. Записывает собранные данные в переменную класса self.passedlist_global. В этом методе операция добавления данных в переменную класса, представляет из-себя атомарную операцию, реализованную с помощью семафора (Semophore ) из питоновской библиотеки «threading». Следовательно, операция будет доступна только для одного потока.
    • Python:
          def anonymous_login(self, hostname, username, password, extensions):
              '''Ftp-авторизация, сбор найденной информации'''
              try:
                  # устанавливаем соединение
                  ftp = ftplib.FTP(hostname, timeout=3)
                  ftp.login(username, password)
                  # собираем найденную информацию
                  passedlist = self.get_list_dir(ftp, extensions)
                  ftp.quit()
                  self.semaphore.acquire()
                  # если директория пустая, то переходит к следующему хосту
                  if (len(passedlist) == 1) and (passedlist[0] == '\tнайден файл/директория: .\n') or (not passedlist):
                      return
                  self.passedlist_global.append('[+] Информация, найденная на хосте '+hostname+'\n')
                  for item in passedlist:
                      self.passedlist_global.append(item)
                  self.semaphore.release()
              except Exception:
                  '''
                  Так как нет нужды в уведомлении пользователя, то перехватываются все исключения.
                  Переходим к следующему хосту.
                  '''
                  pass
              finally:
                  self.semaphore.release()
                  pass
    • Метод «get_list_dir». Возвращает список найденных файлов/директорий.
    • Python:
          def get_list_dir(self, ftp, extensions):
              '''Возвращает список найденных файлов/директорий'''
              passedlist = []
              try:
                  dirlist = ftp.nlst()
                  # если определённые расширения при запуске не указаны, то записывает все файлы/папки
                  if extensions is None:
                      for fileName in dirlist:
                          passedlist.append('\tнайден файл/директория: ' + fileName+'\n')
                  else:
                      for fileName in dirlist:
                          fn = fileName.lower()
                          if any(ext in fn for ext in extensions):
                              passedlist.append('\tнайден файл: ' + fileName+'\n')
                  return passedlist
              except Exception:
                  '''
                  Так как нет нужды в уведомлении пользователя, то перехватываются все исключения.
                  Переходим к следующему хосту.
                  '''
                  pass

Программа должна получить один обязательный аргумент и два опциональных, необязательных.
Обязательный - значение должно содержать ip-адрес, либо сеть в CIDR-нотации.
Опциональные – это количество потоков (по умолчанию 100) и поиск файлов по расширениям.
На поиске фалов по расширениям остановлюсь чуть подробнее. Если опция не задана, то программа граббер собирает все файлы и папки. Если задана опция –e, то необходимо указать расширения файлов, которые необходимо сграбить. Таким образом, если опцию задать следующим видом –e .txt,.xls,.doc, то при сборе данных граббер будет учитывать только файлы с расширениями txt, xls и doc.

В двух методах «anonymous_login» и «get_list_dir» есть блоки обработки исключения, в которых по факту исключения не обрабатываются. Сделано это намерено, так как в этом нет нужды. Например, если попытка установить анонимное соединение (в методе «anonymous_login») окажется неудачной, то вызывается соответствующее исключение, мы могли бы проинформировать об этом пользователя, но если будет проверяться, допустим, более 100000 хостов, то страшно представить, сколько будет ненужной информации. Мы нацелены только на успешные попытки анонимной аутентификации. Поэтому мы сразу приступаем к попытке пройти аутентификацию на следующем хосте.

Получаем содержимое директории на хосте командой NLST – это вариант команды LIST, только более ёмкий. Если директория будет пуста, то NLST нам всё равно возвращает найденный «файл» в виде знака «.» (точка), который показывает переход на уровень вверх, в родительскую директорию. Это считается как ложное срабатывание, ведь от этого «файла» пользы нет, поэтому мы это проверяем и к записи не допускаем.

Для наглядности времени выполнения работы граббера используется сторонняя библиотека «tqdm», которая в командной строке рисует статус-бар.
При написании использовались 3 сторонние библиотеки: argparse, ipaddress и tqdm. Если нет pip3, то предварительно установить через "apt-get install python3-pip", далее установить пакеты командой "pip3 install <имя_пакета>" соответственно.
Либо используя файл requirements.txt выполнить команду "pip3 install -r requirements.txt"
Работа программы проверялась на версиях интерпретатора 3.6 и 3.7, как на debian 9 stretch, так и на windows7.
Возможны опечатки, ошибки как по тексту, так и по коду - пишите, буду исправлять.
Python:
# -*- coding: utf-8 -*-

import ftplib
import re
import sys
import time
from threading import *
import codecs
import argparse
from tqdm import tqdm
from gen_ip_range import generate_ip_range


class FTPGrabber():

    max_conn = 1
    semaphore = Semaphore(max_conn)
    passedlist_global = []
    logfile_name = ''

    def __init__(self, args):
        '''Проверка агументов и инициализация переменных'''
        self.ip_range, self.prefix = generate_ip_range(args.ip_range)
        self.extensions = args.extensions
        if self.extensions is not None:
            self.extensions = self.extensions.split(',')
        if str(self.prefix) != '32':
            n = args.ip_range.find('/')
            self.logfile_name = args.ip_range[:n] + '(' + args.ip_range[n+1:] + ')'
        else:
            self.logfile_name = args.ip_range
        self.logfile_name = self.logfile_name + (
            (''.join(self.extensions).replace('.', '-') + '.txt') if (self.extensions is not None) else '-all.txt')
        logfile_name_without_spec = re.sub(r'[/\\:*?"<>|]', r'', self.logfile_name)
        if self.logfile_name != logfile_name_without_spec:
            print('Недопустимые символы в указанном расширении.')
            sys.exit()
        self.threads = args.threads
        self.username = 'anonymous'
        self.password = 'anonymous'

    def main(self):
        '''Инициализация и запуск потоков, протоколирование в файл.'''
        print('[*] Проверка ' + str(len(self.ip_range)) + ' хоста(ов)...')
        time.sleep(1)
        threads = []
        ip_range_split = []
        # разбиваем список кратно количеству указанных потоков
        size = self.threads
        while len(self.ip_range) > size:
            piece = self.ip_range[:size]
            ip_range_split.append(piece)
            self.ip_range = self.ip_range[size:]
        ip_range_split.append(self.ip_range)
        for ip_range in tqdm(ip_range_split):
            for host in ip_range:
                # запуск потоков
                t = Thread(target=self.anonymous_login, args=(host, self.username, self.password, self.extensions))
                try:
                    t.start()
                    threads.append(t)
                except RuntimeError as ex:
                    print(ex)
                    print("Слишком большое количество потоков, попробуйте меньшее значение.")
                    sys.exit()
            # ожидание завершения потоков
            for t in threads:
                t.join()
        # запись результатов в файл
        with codecs.open(self.logfile_name, 'w', 'utf-8') as file:
            for line in self.passedlist_global:
                file.write(line)
        time.sleep(1)
        print('[*] Проверка закночена, результат сохранён в "'+self.logfile_name+'"')

    def anonymous_login(self, hostname, username, password, extensions):
        '''Ftp-авторизация, сбор найденной информации'''
        try:
            # устанавливаем соединение
            ftp = ftplib.FTP(hostname, timeout=3)
            ftp.login(username, password)
            # собираем найденную информацию
            passedlist = self.get_list_dir(ftp, extensions)
            ftp.quit()
            self.semaphore.acquire()
            # если директория пустая, то переходит к следующему хосту
            if (len(passedlist) == 1) and (passedlist[0] == '\tнайден файл/директория: .\n') or (not passedlist):
                return
            self.passedlist_global.append('[+] Информация, найденная на хосте '+hostname+'\n')
            for item in passedlist:
                self.passedlist_global.append(item)
            self.semaphore.release()
        except Exception:
            '''
            Так как нет нужды в уведомлении пользователя, то перехватываются все исключения.
            Переходим к следующему хосту.
            '''
            pass
        finally:
            self.semaphore.release()
            pass

    def get_list_dir(self, ftp, extensions):
        '''Возвращает список найденных файлов/директорий'''
        passedlist = []
        try:
            dirlist = ftp.nlst()
            # если определённые расширения при запуске не указаны, то записывает все файлы/папки
            if extensions is None:
                for fileName in dirlist:
                    passedlist.append('\tнайден файл/директория: ' + fileName+'\n')
            else:
                for fileName in dirlist:
                    fn = fileName.lower()
                    if any(ext in fn for ext in extensions):
                        passedlist.append('\tнайден файл: ' + fileName+'\n')
            return passedlist
        except Exception:
            '''
            Так как нет нужды в уведомлении пользователя, то перехватываются все исключения.
            Переходим к следующему хосту.
            '''
            pass


def parse_args():
    '''Определение аргументов, создание экземпляра класса FTPGrabber, начало работы граббера'''
    parser = argparse.ArgumentParser(description='Описание аргументов/опций',
                                     epilog='Примеры:\npython parse.py 192.168.1.1'
                                            '\npython parse.py 95.26.0.0/16 -e .docx,.msg'
                                            '\npython parse.py 199.125.45.2 -e .txt',
                                     usage=
                                     '''%(prog)s [опции]\nДля вызова помощи: %(prog)s -h''',
                                     formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("ip_range", action='store', type=str,
                        help='Значение должно содержать ip-адрес, либо сеть в CIDR-нотации.'
                             ' Пример: "192.168.0.1", "192.168.0.0/24", "220.155.33.12/30"'
                        )
    parser.add_argument("-t", dest='threads', action='store', type=int, default=100,
                        help='Количество потоков.'
                             ' Пример: "-t 10", "-t 500".'
                             ' Если опция не указана, то количество потоков по умолчанию 100.')
    parser.add_argument('-e', dest='extensions', action='store',
                        help='Представляет фильтр поиска. Значение должно содержать расширение файлов,'
                             ' которые необходимо найти, в формате ".*".'
                             ' Несколько расширений должны быть указаны через запятую.'
                             ' Пример: "-e doc", "-e .doc,.docx,.xls,.msg,.txt".'
                             ' Если опция не указана, то по умолчанию ведётся поиск всех фалов/директорий.'
                        )
    args = parser.parse_args()
    ftp = FTPGrabber(args)
    ftp.main()


if __name__ == '__main__':
    parse_args()
Python:
import ipaddress
import sys


def generate_ip_range(ip_subnet):
    '''Возвращает список адресов и префикс сети, либо один адрес'''
    ip_range = []
    try:
        for ip_address in ipaddress.IPv4Network(ip_subnet):
            ip_range.append(str(ip_address))
    except ipaddress.AddressValueError as ave:
        print(str(ave))
        print('Wrong CIDR notation. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
        sys.exit()
    except ValueError as ve:
        print(str(ve))
        print('Wrong CIDR block. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
        sys.exit()
    return ip_range, ipaddress.IPv4Network(ip_subnet).prefixlen
Код:
argparse==1.4.0
tqdm==4.28.1
ipaddress==1.0.22
 
Последнее редактирование модератором:
C

Cenzor

Небольшое примечание. Таймаут ожидания ответа от хоста по протоклоу ftp указан в методе "anonymous_login" в строке
Python:
ftp = ftplib.FTP(hostname, timeout=3)
В текущем варианте это довольно большой таймаут, был выбран для общего случая. Для работы граббера в корпоративных сетях, в зоновых сетях оператора, предоставляющего услуги связи, достаточно будет 1 - 0,5 сек.
 

onero

Green Team
07.12.2018
17
51
BIT
40
Тут нет возможности использовать прокси? Очень нужная для таких целей функция.
 
C

Cenzor

Тут нет возможности использовать прокси? Очень нужная для таких целей функция.
Не простой вопрос вы задали. Во-первых, стандартная библиотека ftplib такое не умеет. Во-вторых-раз, есть отдельное решение (не моё), но это даже и не сторонний модуль , а частный случай (хотя, почему и не модуль, но pip про него не знает ), через библиотеку socket. Во-вторых-два, писался он под интерпретатор 2.7 и работал на нём. В-третьих, работал он только в trasparent режиме, тогда вопрос, а есть ли смысл? Ну и если рассуждать дальше, учитывая всё вышесказанное, то пилить такой костыль в эту утилитку, у которой самой строк кода будет меньше, чем у костыля - "ах, оставьте")))
Хотя, может я просто не осведомлён о существовании такой библиотеки.
А если от слов к делу, то лучшее решение именно этого вопроса, как я считаю, то это proxychains/dante. Как говорится, ни питоном единым жив человек. И самое главное, сбережёте нервы и время, время как на колдовство, так и на время работы программы.
 
C

Cenzor

GUI вариант для FTPGrabber на PyQt5
Логика программы та же.
Для работы с PyQt5 установить "pip3 install pyqt5", "pip3 install pyqt5-tools". Designer запускаем из питоновкой директории + "Lib\site-packages\pyqt5_tools\designer.exe"
Конструируем дизайн в PyQt5Designer, сохраняем, из .ui конвертируем в .py батником со следующим содержимым:
Код:
pyuic5 mainWindow.ui -o mainWindow.py
Подключаем этот модуль в основной программе:
Python:
from mainWindow import *
Главное окно программы.
Снимок.PNG

Выбрасываемые исключения открывают информационные окна с описанием проблемы:
Снимок2.PNG

По окончанию работы граббера, автоматически открывается созданный файл с результатами.

Python:
# -*- coding: utf-8 -*-

import sys
import webbrowser
import re
import codecs
import ftplib
from threading import *
from PyQt5 import QtWidgets
from PyQt5.QtWidgets import QMessageBox
from PyQt5 import QtCore
from mainWindow import *
from gen_ip_range import generate_ip_range
import ipaddress


class FTPGrabber(QtWidgets.QMainWindow):

    max_conn = 1
    semaphore = Semaphore(max_conn)
    passedlist_global = []

    def __init__(self, parent=None):
        QtWidgets.QWidget.__init__(self, parent)
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)
        self.username = 'anonymous'
        self.password = 'anonymous'
        self.set_signal()

    def set_signal(self):
        self.ui.chBExtensions.stateChanged.connect(self.changed_chBExtensions)
        self.ui.pBGrabit.clicked.connect(self.main)

    def changed_chBExtensions(self):
        if self.ui.chBExtensions.isChecked():
            self.ui.lEExtensions.setEnabled(True)
            self.ui.lEExtensions.setText('')
        else:
            self.ui.lEExtensions.setEnabled(False)
            self.ui.lEExtensions.setText('all extensions')

    def main(self):
        '''Инициализация переменных и запуск потоков, протоколирование в файл.'''
        try:
            self.check_var()
        except ipaddress.AddressValueError as ave:
            QMessageBox.about(self, 'Ошибка',
                              'Wrong CIDR notation. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
            return
        except ValueError as ve:
            if 'bits set' in str(ve):
                QMessageBox.about(self, 'Ошибка',
                              'Wrong CIDR block. See https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing')
            else:
                QMessageBox.about(self, 'Ошибка',
                                  'Количество потоков должно быть целым числом')
            return
        except BaseException as be:
            QMessageBox.about(self, 'Ошибка',
                              'Недопустимые символы в указанном расширении. Поле не должно содержать символы: /\:*?"<>|')
            return
        threads = []
        ip_range_split = []
        # разбиваем список кратно количеству указанных потоков
        size = self.thread
        while len(self.ip_range) > size:
            piece = self.ip_range[:size]
            ip_range_split.append(piece)
            self.ip_range = self.ip_range[size:]
        ip_range_split.append(self.ip_range)
        for ip_range in ip_range_split:
            for host in ip_range:
                # запуск потоков
                t = Thread(target=self.anonymous_login, args=(host, self.username, self.password, self.extensions))
                try:
                    t.start()
                    threads.append(t)
                except RuntimeError as ex:
                    QMessageBox.about(self, 'Ошибка',
                                      'Слишком большое количество потоков, попробуйте меньшее значение.')
                    return
            # ожидание завершения потоков
            for t in threads:
                t.join()
        # запись результатов в файл
        with codecs.open(self.logfile_name, 'w', 'utf-8') as file:
            for line in self.passedlist_global:
                file.write(line)
        self.passedlist_global = []
        QMessageBox.about(self, 'Результат',
                          'Проверка закночена, результат сохранён в "' + self.logfile_name + '"')
        webbrowser.open(self.logfile_name)

    def check_var(self):
        '''Инициализация переменных.'''
        self.ip_range, self.prefix = generate_ip_range(self.ui.lEIPRange.text())
        self.extensions = self.ui.lEExtensions.text()
        if self.ui.chBExtensions.isChecked():
            self.extensions = self.ui.lEExtensions.text().split(',')
        else:
            self.extensions = None
        if str(self.prefix) != '32':
            n = self.ui.lEIPRange.text().find('/')
            self.logfile_name = self.ui.lEIPRange.text()[:n] + '(' + self.ui.lEIPRange.text()[n + 1:] + ')'
        else:
            self.logfile_name = self.ui.lEIPRange.text()
        self.logfile_name = self.logfile_name + (
            (''.join(self.extensions).replace('.', '-') + '.txt') if (self.extensions is not None) else '-all.txt')
        logfile_name_without_spec = re.sub(r'[/\\:*?"<>|]', r'', self.logfile_name)
        if self.logfile_name != logfile_name_without_spec:
            raise BaseException
        self.thread = int(self.ui.lEThreads.text())

    def anonymous_login(self, hostname, username, password, extensions):
        '''Ftp-авторизация, сбор найденной информации'''
        try:
            # устанавливаем соединение
            ftp = ftplib.FTP(hostname, timeout=3)
            ftp.login(username, password)
            # собираем найденную информацию
            passedlist = self.get_list_dir(ftp, extensions)
            ftp.quit()
            self.semaphore.acquire()
            # если директория пустая, то переходит к следующему хосту
            if (len(passedlist) == 1) and (passedlist[0] == '\tнайден файл/директория: .\n') or (not passedlist):
                return
            self.passedlist_global.append('[+] Информация, найденная на хосте '+hostname+'\n')
            for item in passedlist:
                self.passedlist_global.append(item)
            self.semaphore.release()
        except Exception:
            '''
            Так как нет нужды в уведомлении пользователя, то перехватываются все исключения.
            Переходим к следующему хосту.
            '''
            pass
        finally:
            self.semaphore.release()
            pass

    def get_list_dir(self, ftp, extensions):
        '''Возвращает список найденных файлов/директорий'''
        passedlist = []
        try:
            dirlist = ftp.nlst()
            # если определённые расширения при запуске не указаны, то записывает все файлы/папки
            if extensions is None:
                for fileName in dirlist:
                    passedlist.append('\tнайден файл/директория: ' + fileName+'\n')
            else:
                for fileName in dirlist:
                    fn = fileName.lower()
                    if any(ext in fn for ext in extensions):
                        passedlist.append('\tнайден файл: ' + fileName+'\n')
            return passedlist
        except Exception:
            '''
            Так как нет нужды в уведомлении пользователя, то перехватываются все исключения.
            Переходим к следующему хосту.
            '''
            pass


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    ftp_grabber = FTPGrabber()
    ftp_grabber.show()
    sys.exit(app.exec_())
Python:
from PyQt5 import QtCore, QtGui, QtWidgets

class Ui_MainWindow(object):
    def setupUi(self, MainWindow):
        MainWindow.setObjectName("MainWindow")
        MainWindow.setEnabled(True)
        MainWindow.resize(370, 157)
        sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed)
        sizePolicy.setHorizontalStretch(0)
        sizePolicy.setVerticalStretch(0)
        sizePolicy.setHeightForWidth(MainWindow.sizePolicy().hasHeightForWidth())
        MainWindow.setSizePolicy(sizePolicy)
        MainWindow.setMinimumSize(QtCore.QSize(370, 157))
        MainWindow.setMaximumSize(QtCore.QSize(370, 157))
        MainWindow.setIconSize(QtCore.QSize(24, 24))
        MainWindow.setToolButtonStyle(QtCore.Qt.ToolButtonIconOnly)
        MainWindow.setTabShape(QtWidgets.QTabWidget.Rounded)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")
        self.label = QtWidgets.QLabel(self.centralwidget)
        self.label.setGeometry(QtCore.QRect(20, 20, 180, 13))
        self.label.setObjectName("label")
        self.label_2 = QtWidgets.QLabel(self.centralwidget)
        self.label_2.setGeometry(QtCore.QRect(155, 50, 43, 13))
        self.label_2.setObjectName("label_2")
        self.chBExtensions = QtWidgets.QCheckBox(self.centralwidget)
        self.chBExtensions.setGeometry(QtCore.QRect(125, 80, 79, 17))
        self.chBExtensions.setObjectName("chBExtensions")
        self.lEIPRange = QtWidgets.QLineEdit(self.centralwidget)
        self.lEIPRange.setGeometry(QtCore.QRect(200, 18, 150, 20))
        self.lEIPRange.setObjectName("lEIPRange")
        self.lEThreads = QtWidgets.QLineEdit(self.centralwidget)
        self.lEThreads.setGeometry(QtCore.QRect(200, 48, 150, 20))
        self.lEThreads.setObjectName("lEThreads")
        self.lEExtensions = QtWidgets.QLineEdit(self.centralwidget)
        self.lEExtensions.setEnabled(False)
        self.lEExtensions.setGeometry(QtCore.QRect(200, 78, 150, 20))
        self.lEExtensions.setObjectName("lEExtensions")
        self.pBGrabit = QtWidgets.QPushButton(self.centralwidget)
        self.pBGrabit.setGeometry(QtCore.QRect(20, 110, 331, 23))
        self.pBGrabit.setObjectName("pBGrabit")
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 370, 21))
        self.menubar.setObjectName("menubar")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "FTPGrabber"))
        self.label.setText(_translate("MainWindow", "IP address/IP range (CIDR-notation):"))
        self.label_2.setText(_translate("MainWindow", "Threads:"))
        self.chBExtensions.setText(_translate("MainWindow", "Extensions:"))
        self.lEExtensions.setText(_translate("MainWindow", "all extensions"))
        self.pBGrabit.setText(_translate("MainWindow", "Grab it"))
Python:
import ipaddress
import sys


def generate_ip_range(ip_subnet):
    '''Возвращает список адресов и префикс сети, либо один адрес'''
    ip_range = []
    for ip_address in ipaddress.IPv4Network(ip_subnet):
        ip_range.append(str(ip_address))
    return ip_range, ipaddress.IPv4Network(ip_subnet).prefixlen
Код:
pyuic5 mainWindow.ui -o mainWindow.py
 
Последнее редактирование модератором:
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!