Статья Изучаем Python на практике. Пишем чекер SSH серверов.

Приветствую всех, кто изучает Python и хочет перейти от сухой теории к практическому применению полученных знаний.
Просмотрев форум codeby.net нашел несколько материалов, где авторы делились своим опытом и пробовали свои силы в написании небольших скриптов. К сожалению большинство авторов использовали процедурный тип написания кода. В то время как создание, поддержание и развитие приложения даже средней сложности требует ООП подхода.
Попробуем в небольшом проекте применить ООП, в самом простом виде.
Автор будет очень благодарен более опытным товарищам, если они наставят на путь истинный и, возможно, дадут пару подзатыльников для дальнейшего развития в нужном направлении.
Не смотря на свой возраст - автор начинающий программист-любитель, самоучка, а без посторонней помощи в программировании можно барахтаться годами примерно на одном уровне. Именно поэтому и пишу.

Для тех, кто сразу хочет развернуть проект, скачиваем архив и в папке с архивом выполняем:
Python:
pip install -r requirements.txt

Полный текст программы можно посмотреть в конце поста под спойлером "Весь код".

Итак, приступим. Имеем на входе - текстовый файл, назовем его 'ssh_nocheck.txt' со строками вида:
Python:
username password ipaddress port
Соответственно: логин, пароль, адрес SSH сервера, порт для подключения.

Нужно проверить каждый хост (строку файла) на возможность подключения.
Учетные данные и адрес сервера, принявшего соединение, записать в файл, назовем его goods.txt.

Для тестирования работы скрипта понадобятся SSH аккаунты. Есть сервисы на которых их можно получить легально и бесплатно.
Здесь каталог ссылок на такие сервисы. , к сожалению много мертвых.
Автор воспользовался этим:

Раз уж мы решили применить ООП, посмотрим какие сущности у нас есть в поставленной задаче:
1. файл со списком серверов, нуждающихся в проверке;
2. файл для записи списка серверов, ответивших на запрос при проверке;
3. сам сервер, который нужно проверить, в виде строки во входящем файле.
После написания кода заметил, что дублируются операции по работе с файлами и решил вывести все, что касается файловых операций в отдельный класс.
Таким образом получилось 4 класса:

  1. class InputOutput - будет отвечать за чтение/запись в файл
  2. class InputList - будет получать данные из файла-списка и хранить некоторые параметры при работе программы
  3. class Host - будет хранить данные каждого хоста и методы по обработке
  4. class OutputList - будет сохранять в новый файл список серверов, прошедших проверку

Продвигаться в написании классов будем по порядку решения поставленной задачи.
Начнем с точки входа в программу. По умолчанию это метод main.

Python:
def main():
    parser = cmd_arg_parser()
    namespace = parser.parse_args(sys.argv[1:])
    input_f = namespace.input_file
    output_f = namespace.output_file
    output_list = OutputList()
    input_list = InputList()
    host = Host()
    io = InputOutput(input_f, output_f, input_list, output_list)
    input_list.handling_list(output_list, host, io)
    print('Filtered:', input_list.bad_host_count, 'bad hosts.', 'Passed the test:', output_list.count_of_good_hosts, 'hosts')

Пропустим пока строки 2-5. В строках 6-9 создаются объекты на основе классов.
Обратите внимание на строки 8 и 9. Здесь не просто создаются объекты, но им передаются другие объекты.
Так в строке 8 создается объект Host() и присваевается переменной host. В скобках передаются два объекта, которые были созданы в строками выше - это объекты входящего и исходящего списков. Они передаются в объект Host(), что бы он мог взаимодействовать со списками: менять значения их полей и использовать их методы.
В строке 10 вызывается метод объекта input_list. Обращение к методу объекта происходит через переменную, которой присвоен этот объект, после которой ставится точка.
После точки можно обратится к полю объекта или вызвать метод. Если вызывается метод, то он заканчивается круглыми скобками. Внутри скобок передаются параметры, необходимые для нормальной работы метода. В данном случае передаются 3 объекта.

Переходим к созданию классов.
Поскольку сначала нужно получить данные для проверки создадим класс class InputList.
Проанализируем какие атрибуты (поля класса) будут у объекта.
Python:
class InputList:
    def __init__(self):
        self.host_count = 0
        self.bad_host_count = 0
        self.current_line_count = 0
def __init__(self): это метод конструктор, он выполняется при создании объекта InputList.
self.host_count = 0 количество проверяемых ssh серверов
self.bad_host_count = 0 количество серверов, не ответивших на запрос
self.current_line_count = 0 номер текущей строки файла, из которого читается список
Все поля пустые, потому, что вновь созданный объект не загрузил пока никакие данные.

Создаем класс OutputList.
Python:
class OutputList:
    def __init__(self):
        self.count_of_good_hosts = 0
Конструктор очень простой, все, что мы будем хранить в объекте это количество проверенных хостов, они будут записаны в файл.

Создаем класс Host.
Python:
class Host:
    def __init__(self):
        self.user = None
        self.password = None
        self.ip = None
        self.port = None
        self.location = None
        self.start_time = timer()
        self.host_access_time = 0
В полях этого класса будут хранится учетные данные и адрес сервера, а так же его геолокация, и время доступа при авторизации на сервере.

Последний класс, который мы создадим это класс, отвечающий за файловые операции - InputOutput.
Python:
class InputOutput:
    def __init__(self, input_f, output_f, input_list, output_list):
        self.output_file = output_f
        self.input_file = input_f
        self.output_list = output_list
        self.input_list = input_list
В конструктор передаются имена входящего файла и файла с результатом, а так же объекты списков.

Теперь приступим к созданию методов классов. А вот тут начинаются сложности с объяснением. Для более ясного понимания лучше смотреть весь файл с программой, куски с комментариями будут просто подсказками.

Продолжаем наполнять класс InputList. Для начала узнаем сколько хостов нужно будет проверить, подсчитав количество строк в файле.
Python:
def hosts_counter(self, io):
    print('Counting host in file:', io.input_file)
    for line in io.read_data_from_file(flag='r'):
        self.host_count += 1
    return self.host_count
Для этого нужно прочитать файл. При чтении файла обычно создается список его содержимого (список списков строк), который помещается в оперативную память. Здесь есть определенная проблема - если файл будет достаточно большой, а ресурсы сервера ограничены, то чтение большого файла может израсходовать всю оперативную память виртуальной машины и привести системы в уход в swap. Чтобы такого не случилось лучше читать, обрабатывать и записывать по одной отдельной строке из файла.
Первый аргумент метода hosts_counter - self. self ссылается на сам класс, указывая, что это не какая то внешняя функция а именно метод этого класса. Поскольку все файловые операции будет выполнять класс InputOutput, вторым аргументом передается ссылка на объект io (см. метод main).
Во второй строке мы выводим на печать текст с переменной io.input_file - это обращение к полю объекта io с названием input_file.
Дальше создаем цикл, который последовательно прочтет все строки файла. На этот раз вызывается метод объекта io - read_data_from_file(flag='r') с именованным аргументом, который будет использоваться далее при открытии файла в режиме "только чтение". Если строка файла прочитана и не вызвано исключение в методе io.read_data_from_file(), то счетчик хостов увеличивается на единицу self.host_count += 1. В последней строке функция возвращает с помощью оператора return результат своей работы - поле объекта input_list подсчитанную сумму строк: self.host_count.
В методе hosts_counter() была ссылка на метод объекта io io.read_data_from_file() - создадим его в классе InputOutput.
Python:
def read_data_from_file(self, flag):
    try:
        with open(self.input_file, flag) as file:
            for line in file:
                yield line.strip().split(' ')
    except IOError:
        print("can't read from file, IO error")
        exit(1)
Вот здесь и появился именованный аргумент флаг. Действие функции обернуты в try-except для предотвращения падения программы без выясненной причины. В нашем случае если файл по какой то причине не может быть прочитан, то продолжать выполнение программы бессмысленно, поэтому она завершится после сообщения об ошибке ввода-вывода. И вот мы пришли к тому, что было сказано ранее о необходимости чтения файла и дальнейшей его обработки построчно. Это достигается использованием оператора yield. Если кратко, yield останавливает выполнения цикла до следующего вызова итератора цикла. То есть если просто вызвать где то в коде метод read_data_from_file() он прочтет только 1-ую строку файла и приостановит свою работу. Для того, что бы он отработал по всем строкам файла, метод должен быть вызван внутри другого цикла, в нашем случае он является итерируемым объектом (for line in io.read_data_from_file) в методе hosts_counter. Команда line.strip().split(' ') убирает символ перехода на новую строку и разбивает строку на подстроки по разделителю пробел. Таким образом данные о хосте выглядят как строка строк [[login], [pass], [ip], [port]].

Мы вынужденно отвлеклись от создания методов класса InputList, снова возвращаемся к нему.
Переходим к созданию метода, фактически управляющего всей последующей программой. Метод находится в InputList по той причине, что далее идет обработка данных именно этого списка. Но такое решение вопрос спорный и возможно такой метод было бы лучше разместить в функции main?
Напишите ваше мнение о размещении этого метода в комментариях.
Python:
def handling_list(self, output_list, host, io):
    print('Found', self.hosts_counter(io), 'hosts in list of file', io.input_file)
    for line in io.read_data_from_file(flag='r'):
        self.current_line_count += 1
        print('handling line#', self.current_line_count)
        check_result = host.check_host_data(line)
        if check_result:
            host.extract_host_data_from_line(line)
            connection = host.connect_to_host()
            if connection:
                prepare_data = output_list.prepare_data_to_write(line, host)
                write_line = io.write_data_to_file(prepare_data, output_list, flag='a')
                if write_line:
                    print('recorded line#', output_list.count_of_good_hosts, 'of', self.host_count)
            else:
                self.bad_host_count += 1
        else:
            self.bad_host_count += 1
Переходим к классу Host. Метод проверки данных для подключения check_host_data().
Python:
def check_host_data(self, line):
    print('Checking data of host', line[2])
    if len(line) == 4:
        return True
    else:
        print('no valid data in line')
        return False
Цикл, во второй строке дублирует цикл из предыдущего метода, проходя по итерируемому объекту читает строки из файла, увеличивая счетчик текущей строки на единицу, а затем переходит к строке проверки данных строки check_result = host.check_host_data(line). Дело в том, что данные в строке могут быть не корректны, например, может не хватать порта или пароля. Если список len(line) == 4 содержит 4 объекта, то проверка пройдена. Можно переходить к попытке подключения check_host(). Мы все еще в классе Host.
Python:
def check_host(self, line):
    if self.extract_host_data_from_line(line):
        print("Trying to connect to %s" % self.ip)
        self.connect_to_host()
        return True
    else:
        return False
Если удается извлечь все данные для подключения if self.extract_host_data_from_line(line), то можно подключаться.
Обратите внимание, переменные с self.user присваивают значением полям объекта Host. Специфика данного случая в том, что все созданные объекты находятся в единичном экземпляре. Даже вроде бы такой объект как host, нужен лишь при проверке своих данных, а дальше его поля перезаписываются данными из следующей строки.
Python:
def extract_host_data_from_line(self, data):
    self.user = data[0]
    self.password = data[1]
    self.ip = data[2]
    self.port = data[3]
    return True
После заполнения всех необходимых переменных для подключения делаем попытку соединения сервером self.connect_to_host() (мы все еще в классе Host)
Python:
def connect_to_host(self):
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(hostname=self.ip, username=self.user, password=self.password, port=self.port, timeout=3)
        print('Connected to host', self.ip, 'Access time to host:', self.access_time(), 'seconds')
        return True
    except paramiko.AuthenticationException:
        print("Authentication failed when connecting to", self.ip)
        print('Host marked as bad.')
        return False
    except ConnectionError:
        print("Could not connect to %s" % self.ip)
        return False
Для работы используется библиотека paramiko, ее предварительно нужно установить
Python:
pip install paramiko
и импортировать в файле
Python:
import paramiko
Дальше дело самой библиотеки - создать подключение с учетными данными которые мы ей предоставим. Если подключение установлено, считаем проверку законченной. Если соединится не получилось, то тут два варианта либо сервер недоступен except ConnectionError:, либо неправильные учетные данные except paramiko.AuthenticationException:.
self.access_time() - считает время потраченное на подключение, что то вроде пинга, для которого я не нашел простого решения для Python, - все библиотеки, рассмотренные мной, вызывали нативную для ОС системную команду ping, вывод которой нужно было парсить.
Python:
def access_time(self):
    self.host_access_time = timer() - self.start_time
    return round(self.host_access_time, 2)
Возвращаемся к методу handling_list() класса InputList.
Python:
if connection:
    prepare_data = output_list.prepare_data_to_write(line, host)
Если соединение с сервером завершилось удачно, подготавливаем данные для записи в файл goods.txt. Работа с данными для результирующего файла возложена на метод prepare_data_to_write() класса OutputList. Превращаем все данные в строки иначе их не удастся объединить в одну строку, не забывая добавить в конце символ перехода на новую строку '\n', и возвращаем новую строку с данными для записи.
Python:
def prepare_data_to_write(self, line, host):
    joined = ' '.join(line) + ' '
    location = str(host.get_location(host.ip)) + ' '
    accsesstime = str(host.host_access_time)
    new_line = joined + location + accsesstime + '\n'
    return new_line
В файл goods.txt будут выводится не только учетные данные из исходного файла, но и время доступа к серверу во время попытки подключения и геолокация сервера.
Для определения местоположения сервера используется библиотека geoip2, ее предварительно нужно установить
Python:
pip install geoip2
и импортировать в файле
Python:
import geoip2.database
Для работы библиотеки скачиваем файл базы отсюда . И в класс Host добавляем метод
get_location(), который получает параметр в виде адреса, библиотека ищет совпадение по БД и возвращает результат в виде название страны на английском.
Python:
def get_location(self, ip):
    reader = geoip2.database.Reader('GeoLite2-Country.mmdb')
    response = reader.country(ip)
    return response.country.names['en']
Снова возвращаемся к методу handling_list() класса InputList.
Python:
write_line = io.write_data_to_file(prepare_data, output_list, flag='a')
    if write_line:
        print('recorded line#', output_list.count_of_good_hosts, 'of', self.host_count)
Пишем подготовленную строку в файл с помощью метода write_data_to_file() класса InputOutput. Управляющая конструкция с циклом for line in io.read_data_from_file(flag='r') находится в методе handling_list() класса InputList.
Python:
def write_data_to_file(self, line, output_list, flag):
    try:
        with open(self.output_file, flag) as file:
            file.write(line)
            output_list.count_of_good_hosts += 1
        return True
    except IOError:
        print("Can't write to output file, IO error")
        exit(1)
Аналогично, как и при чтении файла, заворачиваем всю конструкцию в try-except, с одной особенностью - параметр передаваемый при открытии файла будет не 'r', как при чтении, а 'a' add, строки будут добавлятся. В случае удачной записи поле с переменной проверенных хостов увеличивается на единицу. Если по каким то причинам запись невозможно работа программы будет остановлена.
Осталось обсудить первые несколько строк в функции main().
Для того, что бы можно было вводить собственные имена файлов для ввода ввода-вывода нужно обрабатывать аргументы переданные программе во время запуска. Для этого будет использоваться библиотеку argparse.
Python:
pip install argparse
в файле
Python:
import argparse
Работу парсера вынесем в отдельную функцию cmd_arg_parser().

Создаем объект парсера:
Python:
parser = argparse.ArgumentParser()
и передаем ему два аргумента для входящего и исходящего файла
Python:
parser.add_argument('-i', '--input_file', default='ssh_nocheck.txt')
parser.add_argument('-o', '--output_file', default='goods.txt')
именованные аргументы default используются для имен файлов по умолчанию, если при старте программы пользователь не ввел собственные имена файлов.
Python:
def cmd_arg_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input_file', default='ssh_nocheck.txt')
    parser.add_argument('-o', '--output_file', default='goods.txt')
    return parser


def main():
    parser = cmd_arg_parser()
    namespace = parser.parse_args(sys.argv[1:])
    input_f = namespace.input_file
    output_f = namespace.output_file
Вот и все, 164 строки кода.

Python:
#!/usr/bin/python

import paramiko
from timeit import default_timer as timer
import sys
import argparse
import geoip2.database


class InputOutput:
    def __init__(self, input_f, output_f, input_list, output_list):
        self.output_file = output_f
        self.input_file = input_f
        self.output_list = output_list
        self.input_list = input_list

    def read_data_from_file(self, flag):
        try:
            with open(self.input_file, flag) as file:
                for line in file:
                    yield line.strip().split(' ')
        except IOError:
            print("can't read from file, IO error")
            exit(1)

    def write_data_to_file(self, line, output_list, flag):
        try:
            with open(self.output_file, flag) as file:
                file.write(line)
                output_list.count_of_good_hosts += 1
            return True
        except IOError:
            print("Can't write to output file, IO error")
            exit(1)


class Host:
    def __init__(self):
        self.user = None
        self.password = None
        self.ip = None
        self.port = None
        self.location = None
        self.start_time = timer()
        self.host_access_time = 0

    def access_time(self):
        self.host_access_time = timer() - self.start_time
        return round(self.host_access_time, 2)

    def get_location(self, ip):
        reader = geoip2.database.Reader('GeoLite2-Country.mmdb')
        response = reader.country(ip)
        return response.country.names['en']

    def check_host_data(self, line):
        print('Checking data of host', line[2])
        if len(line) == 4:
            return True
        else:
            print('no valid data in line')
            return False

    def extract_host_data_from_line(self, data):
        self.user = data[0]
        self.password = data[1]
        self.ip = data[2]
        self.port = data[3]
        return True

    def connect_to_host(self):
        try:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(hostname=self.ip, username=self.user, password=self.password, port=self.port, timeout=3)
            print('Connected to host', self.ip, 'Access time to host:', self.access_time(), 'seconds')
            return True
        except paramiko.AuthenticationException:
            print("Authentication failed when connecting to", self.ip)
            print('Host marked as bad.')
            return False
        except ConnectionError:
            print("Could not connect to %s" % self.ip)
            return False

    def check_host(self, line):
        if self.extract_host_data_from_line(line):
            print("Trying to connect to %s" % self.ip)
            self.connect_to_host()
            return True
        else:
            return False


class OutputList:
    def __init__(self):
        self.count_of_good_hosts = 0

    def write_data(self, io, data):
        io.write_data_to_file(data, flag='a')

    def prepare_data_to_write(self, line, host):
        joined = ' '.join(line) + ' '
        location = str(host.get_location(host.ip)) + ' '
        accsesstime = str(host.host_access_time)
        new_line = joined + location + accsesstime + '\n'
        return new_line


class InputList:
    def __init__(self):
        self.host_count = 0
        self.bad_host_count = 0
        self.current_line_count = 0

    def hosts_counter(self, io):
        print('Counting host in file:', io.input_file)
        for line in io.read_data_from_file(flag='r'):
            self.host_count += 1
        return self.host_count

    def handling_list(self, output_list, host, io):
        print('Found', self.hosts_counter(io), 'hosts in list of file', io.input_file)
        for line in io.read_data_from_file(flag='r'):
            self.current_line_count += 1
            print('handling line#', self.current_line_count)
            check_result = host.check_host_data(line)
            if check_result:
                host.extract_host_data_from_line(line)
                connection = host.connect_to_host()
                if connection:
                    prepare_data = output_list.prepare_data_to_write(line, host)
                    write_line = io.write_data_to_file(prepare_data, output_list, flag='a')
                    if write_line:
                        print('recorded line#', output_list.count_of_good_hosts, 'of', self.host_count)
                else:
                    self.bad_host_count += 1
            else:
                self.bad_host_count += 1


def cmd_arg_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input_file', default='ssh_nocheck.txt')
    parser.add_argument('-o', '--output_file', default='goods.txt')
    return parser


def main():
    parser = cmd_arg_parser()
    namespace = parser.parse_args(sys.argv[1:])
    input_f = namespace.input_file
    output_f = namespace.output_file
    output_list = OutputList()
    input_list = InputList()
    host = Host()
    io = InputOutput(input_f, output_f, input_list, output_list)
    input_list.handling_list(output_list, host, io)
    print('Filtered:', input_list.bad_host_count, 'bad hosts.', 'Passed the test:', output_list.count_of_good_hosts, 'hosts')



if __name__ == "__main__":
    main()
 

Вложения

  • ssh_parser.zip
    ssh_parser.zip
    3,4 МБ · Просмотры: 1 241
  • kisspng-functional-programming-in-python-computer-programm596729048.jpg
    kisspng-functional-programming-in-python-computer-programm596729048.jpg
    40 КБ · Просмотры: 889
Подскажите, есть из всего списка spisok_in.txt небольшой список ip адресов, заканчивающийся на .1 (граничный маршрутизатор)
из всего этого списка, допустим 90 адресов подключаются нормально ssh username@15.12.12.1,
а есть адреса, около 35, которые подключаются с определенными ключами ssh -m hmac-md5 -c aes-sha256 username@15.12.16.1.
Подскажите, как переделать функцию, чтобы она проверяла, если не работает первый вариант, пробовала подключение по второму, который с ключами.


Сегодня как раз написал для себя чекер. Но только использую библиотеку pexpect. В общей сумме ~20 строчек кода и полностью рабочий чекер.

Можете показать, если не секрет?
на второму или третьем питоне?
 
Можете показать, если не секрет?
на второму или третьем питоне?
Python3
Показать к сожалению не могу, так как исходники на другом компьютере. Но могу рассказать и привести код, который я использовал.
Загружаете IP адреса с портами.
Передаете это в функцию, которая разбивает строку на host, ip, username, password. К примеру, при помощи метода split.
Дальше подключение:
Python:
s = pxssh.pxssh()
isGood = False
if not s.login(host, login, port, password):
    isGood = False
else:
    s.logout()
    isGood = True

    return isGood

Конечно же вы должны отловить исключения, которые будут.
И в главном коде вызывать эту функцию. Если она вернула True, то записать строку в валид.

Так же важная пометка, нужно изменить немного код библиотеки. (Файл: /usr/lib/python3/dist-packages/pexpect/pxssh.py)
Найти функцию логин и поменять прототип на:

Python:
def login(self, server, username, port, password='', ...)

Где ... там остальные параметры. Ну и конечно порт с конца убрать, раз мы его переместили вперед. Такой прототип нам позволит подключаться к любому порту.

Вот вам основа дана, прикрутите еще какие-то фичи, если надо. Я сделал еще счетчик валида и запись этого валида в файл. Удачи, надеюсь помог!
 
  • Нравится
Реакции: id2746 и centr
Вот вам основа дана, прикрутите еще какие-то фичи, если надо. Я сделал еще счетчик валида и запись этого валида в файл. Удачи, надеюсь помог!

спасибо.
но проблема в том что это 3 питон...
а у меня токмо 2.7 можно.
и лишние библиотеки ставить - это целая эпопея и гора бумаг...

мне бы уже существующую допилить напильником))
 
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!