• 15 апреля стартует «Курс «SQL-injection Master» ©» от команды The Codeby

    За 3 месяца вы пройдете путь от начальных навыков работы с SQL-запросами к базам данных до продвинутых техник. Научитесь находить уязвимости связанные с базами данных, и внедрять произвольный SQL-код в уязвимые приложения.

    На последнюю неделю приходится экзамен, где нужно будет показать свои навыки, взломав ряд уязвимых учебных сайтов, и добыть флаги. Успешно сдавшие экзамен получат сертификат.

    Запись на курс до 25 апреля. Получить промодоступ ...

Статья Реализация метода HTTP-запроса «GET» с помощью socket в Python

Наверное все пользовались библиотекой requests и знакомы с ее методом GET. Давайте попробуем с помощью библиотеки socket реализовать модуль для выполнения простых функций данного метода, таких как: статус-код, заголовки и тело запроса.

000.jpg

Конечно же, мы не будем писать свою библиотеку requests, потому как в этом нет необходимости. Но, думаю, что нужно знать, как выполнить запрос к сайту или по определенному адресу с помощью socket. Иногда появляются такие задачи, где requests, просто не работает. Для примера, есть онлайн трансляции, адрес которых начинается не с привычных http или https, а с другого протокола rtmp. Если попробовать выполнить запрос указав такой адрес в requests, вы получите сообщение о том, что невозможно выполнить соединение потому, что схема адреса не соответствует стандартам. А вот с помощью сокетов это реализовать вполне по силам. Итак, давайте перейдем от теории к делу и откроем IDE, в которой вы пишете код Python.



Что потребуется?

В данном коде необходимо будет установить только одну стороннюю библиотеку, которая не будет напрямую относиться к будущему кода, а потребуется только для валидации ссылок. Это библиотека validators. Для ее установки пишем в терминале:

pip install validators


Импорт библиотек в скрипт

После установки библиотеки, нужно импортировать в скрипт все, что потребуется для его реализации. Потребуется нам библиотека socket, для установки соединения и получения данных; библиотека ssl, которая обеспечивает доступ к средствам шифрования безопасности транспортного уровня ((Transport Layer Security, TLS); модуль urlparse для разбора ссылки на составные части.

Python:
import socket
import ssl
import sys
from urllib.parse import urlparse

import validators


Создание класса для получения данных

Для нашей мини-имитации метода «GET» создадим класс SockGet и установим необходимые для работы переменные, чтобы мы могли получить к ним доступ из внешней функции. Определим статус-код, заголовки, контент и текст. Если данные получены не будут, то эти переменные так и останутся со значением None.

Python:
class SockGet:
    def __init__(self):
        self.status_code = None
        self.headers = None
        self.content = None
        self.text = None


Получение порта из ссылки

Для того, чтобы установить соединение с хостом, необходимо понимать, на какой порт это соединение устанавливать. А значит, нужно проанализировать ссылку полученную на входе и установить порт исходя из этого небольшого анализа. Создадим статический метод класса __get_port(url: str) -> int, который на входе принимает ссылку, а возвращает номер порта. Для начала заглянем в ссылку и проверим, нет ли в ней «:». Так как, если двоеточие есть, это может означать, что после него указан порт, с которым нужно будет устанавливать соединение. Если оно есть, забираем из него порт. Если нет, двигаемся дальше и проверяем, какой протокол используется в ссылке. Если это «http», порт будет 80, если «https», то 443. После того, как получим порт, возвращаем его из метода.

Python:
    @staticmethod
    def __get_port(url: str) -> int:
        if ":" in urlparse(url).netloc:
            return int(str(urlparse(url).netloc).split(":")[-1])
        else:
            return 80 if urlparse(url).scheme == "http" else 443


Разбор ссылки на составляющие

Для того, чтобы установить соединение с сайтом, нам понадобиться получить из ссылки хост, путь к странице, а также параметры, которые следуют за «?». Создадим статический метод __get_path(url: str) -> str. На вход он принимает ссылку, а на выходе возвращает путь с параметрами, если таковые имеются.

Иногда таковых просто не будет, и тогда нужно будет вернуть «/», поэтому проверяем путь к странице. Если путь к странице не пуст, получаем параметры, то есть, то что идет за «?». Делим ссылку по вопросу, забираем последний элемент, а затем собираем ключ с добавлением вопроса. Дело в том, что функция urlparse(url).path, возвращает путь без параметров. А иногда они нужны для доступа к странице или ресурсу. Ну, а дальше забираем путь, добавляем полученные параметры и возвращаем из метода. Если параметров нет, просто возвращаем путь.

Python:
    @staticmethod
    def __get_path(url: str) -> str:
        if not urlparse(url).path:
            return "/"
        else:
            if "?" in url:
                key = f'?{url.split("?")[-1]}'
                return f'{urlparse(url).path}{key}'
            else:
                return urlparse(url).path


Функция получения данных

Создадим функцию get(self, url: str, timeout). На вход она принимает ссылку и значение таймаута, которое может принимать два значения, как кортеж из двух значений, так и одно значение. Если вы помните, то в requests, если указать одну цифру в таймауте, то значение ожидания до соединения и ожидание до чтения контента становятся равны переданному значению. В нашем случае будет точно также. Проверяем тип переменной timeout, если это не кортеж, тогда присваиваем ей значение кортежа, в который передаем полученное значение таймаута. Затем проверяем валидатором переданную в функцию ссылку. Если она проходит проверку, получаем значение порта, хоста и пути к странице.

Python:
    def get(self, url: str, timeout):
        if type(timeout) != tuple:
            timeout = (timeout, timeout)
        if validators.url(url):
            port = self.__get_port(url)
            host = urlparse(url).hostname
            path = self.__get_path(url)

Затем создаем объект сокета, указываем, что мы будем использовать интернет соединение, а также протокол TCP, то есть, будет создаваться двустороннее соединение. Устанавливаем значение первого таймаута из кортежа. Определяем переменную response, куда будем складывать данные полученные в ответе от сервера.

Python:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout[0])
            response = b""

Оборачиваем дальнейший код в блок try — except, чтобы была возможность поймать все исключения. Устанавливаем соединение с хостом. IP-адрес хоста сокет получит самостоятельно, не обязательно для этого использовать socket.gethostbyname(). Проверяем, по какому протоколу устанавливается соединение. Если это «https», создаем новый контекст ssl, с безопасными настройками по умолчанию, и обернем в эти настройки наше соединение. После этого сформируем строку запроса, куда вставим полученные значения пути и хоста и отправим с помощью фукнции sendall. Затем, так как соединение уже установлено, передаем в сокет новый таймаут, то есть второе значение из кортежа.

Python:
            try:
                sock.connect((host, port))

                if port == 443:
                    context = ssl.create_default_context()
                    sock = context.wrap_socket(sock, server_hostname=host)

                sock.sendall(f"GET {path} HTTP/1.1\r\nHost:{host}\r\n\r\n".encode())
                sock.settimeout(timeout[1])

Теперь необходимо получить данные. Тут у меня был небольшой стопор. Перелопатил «гору» всяких сайтов и ответов, но везде натыкался на один и тот же нерабочий код. Все дело в том, что данные, если это сайт, нужно получить все. И для этого используется бесконечный цикл. Но, вот выход из цикла указанный в примерах просто не работал. Ну не становилась проверяемая переменная None, False или хотя бы просто пустой. И все время я вылетал по таймауту. Понятное дело, что данные получались, но так как срабатывало исключение, ничего и никуда не передавалось.
Я уже было подумал, что нужно в обработку исключение запихать получение статус-кода и прочего, но вовремя остановился )), так как это, все же неправильно. Тогда я посмотрел на контент, который получаем от сервера. И вот, в самом конце я увидел то, что нужно, а именно строку, которая и означает завершение передачи контента. Тут уже точно не уверен, но эта строка передается на всех сайтах, которые я тестировал. Я стал проверять нужное значение и о чудо, все получилось. Собственно, вот код. То есть, пытаемся получить весь контент, после чего, как будет достигнут конец передачи, выходим из бесконечного цикла. Конечно, это не сработает для потоковой передачи, так как конца передачи просто не будет. Но, здесь код просто выйдет по таймауту. И в обработку исключения можно добавить, чтобы возвращались полученные данные. В данном же коде этого нет.

Python:
                while True:
                    data = sock.recv(1024)
                    response += data
                    if b'\r\n0\r\n\r\n' in data:
                        break
                    elif b'\r\n\r\n' in data:
                        break
                    elif not data:
                        break

Дальше обрабатываем полученные данные. Получаем статус код, заголовки и контент с текстом. Закрываем соединение и выходим из функции. Ну и обработка исключения выхода по таймауту, а также вывод сообщения в случае, если пользователь ввел невалидную ссылку.

Python:
                if response:
                    self.status_code = self.__get_status_code(response)
                    self.headers = self.__get_headers(response)
                    self.content = self.__get_content(port, response)
                    self.text = self.__get_text(port, response)
                sock.close()
            except Exception as ex:
                sock.close()
                return ex
        else:
            print("Введен некорректный url-адрес")
            return

Python:
    def get(self, url: str, timeout):
        if type(timeout) != tuple:
            timeout = (timeout, timeout)
        if validators.url(url):
            port = self.__get_port(url)
            host = urlparse(url).hostname
            path = self.__get_path(url)

            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout[0])
            response = b""
            try:
                sock.connect((host, port))

                if port == 443:
                    context = ssl.create_default_context()
                    sock = context.wrap_socket(sock, server_hostname=host)

                sock.sendall(f"GET {path} HTTP/1.1\r\nHost:{host}\r\n\r\n".encode())
                sock.settimeout(timeout[1])

                while True:
                    data = sock.recv(1024)
                    response += data
                    if b'\r\n0\r\n\r\n' in data:
                        break
                    elif b'\r\n\r\n' in data:
                        break
                    elif not data:
                        break

                if response:
                    self.status_code = self.__get_status_code(response)
                    self.headers = self.__get_headers(response)
                    self.content = self.__get_content(port, response)
                    self.text = self.__get_text(port, response)
                sock.close()
            except Exception as ex:
                sock.close()
                return ex
        else:
            print("Введен некорректный url-адрес")
            return


Получение статус-кода

Создадим статический метод класса, с помощью которого будем получать статус-код из полученного контента. Я назвал метод __get_status_code(response: bytes) -> int, на вход он получает загруженный контент, а возвращает целое число, которое и будет статус-кодом. Для начала декодируем полученные данные. Затем разбиваем текст по символам переноса строки, забираем первый элемент, разбиваем на строки, забираем первую, которую снова разбиваем уже по пробелам и объединяем срез с помощью join, после чего оборачиваем в int и возвращаем из функции.

Python:
    @staticmethod
    def __get_status_code(response: bytes) -> int:
        return int(" ".join(response.decode().split('\r\n\r\n')[0].splitlines()[0].split()[1:-1]))


Получение заголовков

Заголовки возвращаются в виде строк, которые довольно легко можно поместить в словарь, который и вернуть из функции. Создаем пустой словарь. В цикле итерируемся по контенту, который предварительно был разбит на строки и оттуда была получена нужная часть текста. После чего, каждую строку разбиваем по «:» и складываем в словарь.

Python:
    @staticmethod
    def __get_headers(response: bytes) -> dict:
        headers = dict()
        for head in response.decode().split('\r\n\r\n')[0].splitlines()[1:]:
            headers.update({head.split(": ")[0]: head.split(": ")[1]})
        return headers


Получение контента

Данный код будет работать, думаю, что не всегда, так как не весь контент можно декодировать в стандартной кодировке. Тем не менее, это работает для многих сайтов. Создадим метод __get_content(port: int, response: bytes) -> bytes, который на входе получает порт и загруженный контент. Далее проверяем порт, если это 80, то в нем нужно будет дополнительно убрать один лишний элемент из текста. Если порт 443, элемента там нет. Поэтому, декодируем полученный контент, разбиваем в список, забираем элемент с контентом, затем, снова кодируем, после чего возвращаем из функции.

Python:
    @staticmethod
    def __get_content(port: int, response: bytes) -> bytes:
        if port == 80:
            return response.decode().split('\r\n\r\n')[1].split("\r\n")[1].encode()
        return response.decode().split('\r\n\r\n')[1].encode()


Получение текста

Текст получается по тому же принципу, что и контент. Только, если в случае с контентом, перед тем, как его вернуть из функции мы его кодируем, то здесь возвращаем уже декодированное содержимое.

Python:
    @staticmethod
    def __get_text(port: int, response: bytes) -> str:
        if port == 80:
            return response.decode().split('\r\n\r\n')[1].split("\r\n")[1]
        return response.decode().split('\r\n\r\n')[1]


Функция main

Ну и последняя функция, которая здесь не обязательна. Ее я сделал только лишь для того, чтобы продемонстрировать получение статус кода и заголовков. Создаем объект класса. Затем передаем, в метод get, введенную пользователем ссылку, а также timeout, который так же можно запрашивать у пользователя. Здесь он равен 5. Также можно передать кортеж из двух значений. Ну и после проверяем возвращаемое значение. То есть, из функции может вернуться ошибка. Если она равна None, ничего не печатаем. Если же есть ошибка, выводим ее в терминал и завершаем работу. Проверяем статус-код. Если он 200, печатаем его, а также печатаем заголовки из полученного ответа сервера. Ну или, если не 200 статус-код, то просто печатаем его для пользователя в терминале.

Python:
def main():
    url = input("Введите ссылку на страницу: ")

    req = SockGet()
    ex = req.get(url=url, timeout=5)
    if ex:
        print(f"Error: {ex}")
        sys.exit(0)
    if req.status_code == 200:
        print(req.status_code)
        print(req.headers)
    else:
        print(f"Status Code: {req.status_code}")


Python:
import socket
import ssl
import sys
from urllib.parse import urlparse

import validators


class SockGet:
    def __init__(self):
        self.status_code = None
        self.headers = None
        self.content = None
        self.text = None

    @staticmethod
    def __get_port(url: str) -> int:
        if ":" in urlparse(url).netloc:
            return int(str(urlparse(url).netloc).split(":")[-1])
        else:
            return 80 if urlparse(url).scheme == "http" else 443

    @staticmethod
    def __get_path(url: str) -> str:
        if not urlparse(url).path:
            return "/"
        else:
            if "?" in url:
                key = f'?{url.split("?")[-1]}'
                return f'{urlparse(url).path}{key}'
            else:
                return urlparse(url).path

    def get(self, url: str, timeout):
        if type(timeout) != tuple:
            timeout = (timeout, timeout)
        if validators.url(url):
            port = self.__get_port(url)
            host = urlparse(url).hostname
            path = self.__get_path(url)

            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout[0])
            response = b""
            try:
                sock.connect((host, port))

                if port == 443:
                    context = ssl.create_default_context()
                    sock = context.wrap_socket(sock, server_hostname=host)

                sock.sendall(f"GET {path} HTTP/1.1\r\nHost:{host}\r\n\r\n".encode())
                sock.settimeout(timeout[1])

                while True:
                    data = sock.recv(1024)
                    response += data
                    if b'\r\n0\r\n\r\n' in data:
                        break
                    elif b'\r\n\r\n' in data:
                        break
                    elif not data:
                        break

                if response:
                    self.status_code = self.__get_status_code(response)
                    self.headers = self.__get_headers(response)
                    self.content = self.__get_content(port, response)
                    self.text = self.__get_text(port, response)
                sock.close()
            except Exception as ex:
                sock.close()
                return ex
        else:
            print("Введен некорректный url-адрес")
            return

    @staticmethod
    def __get_status_code(response: bytes) -> int:
        return int(" ".join(response.decode().split('\r\n\r\n')[0].splitlines()[0].split()[1:-1]))

    @staticmethod
    def __get_headers(response: bytes) -> dict:
        headers = dict()
        for head in response.decode().split('\r\n\r\n')[0].splitlines()[1:]:
            headers.update({head.split(": ")[0]: head.split(": ")[1]})
        return headers

    @staticmethod
    def __get_content(port: int, response: bytes) -> bytes:
        if port == 80:
            return response.decode().split('\r\n\r\n')[1].split("\r\n")[1].encode()
        return response.decode().split('\r\n\r\n')[1].encode()

    @staticmethod
    def __get_text(port: int, response: bytes) -> str:
        if port == 80:
            return response.decode().split('\r\n\r\n')[1].split("\r\n")[1]
        return response.decode().split('\r\n\r\n')[1]


def main():
    url = input("Введите ссылку на страницу: ")

    req = SockGet()
    ex = req.get(url=url, timeout=5)
    if ex:
        print(f"Error: {ex}")
        sys.exit(0)
    if req.status_code == 200:
        print(req.status_code)
        print(req.headers)
    else:
        print(f"Status Code: {req.status_code}")


if __name__ == "__main__":
    main()

А вот ответ, который мы получаем при выполнении запроса к сайту:

01.png


Однако, такое получается не со всеми сайтами. К примеру, не получается с Codeby, на некоторых страницах. Могу предположить, что так как форум за CDN, то у сокета возникают проблемы при получении ip-адреса. Но, это не точно.

Вот еще скрин, где ответ от страницы с форума получен:

02.png


И для примера, вот как я использую похожий код для проверки "живости" ссылки. В данном коде мне не особо важно, какой возвратился статус-код, мне просто нужно понять, есть ли ответ от сервера:

Python:
import socket
import ssl
from urllib.parse import urlparse


def port_get(url: str) -> int:
    if ":" in urlparse(url).netloc:
        return int(str(urlparse(url).netloc).split(":")[-1])
    else:
        return 80 if urlparse(url).scheme == "http" else 443


def chunk_url(url: str) -> tuple:
    key = f'?{url.split("/")[-1].split("?")[-1]}' if "?" in url else ""
    host = urlparse(url).hostname
    path = f'{urlparse(url).path}{key}'
    return host, path


def sock_ch(url: str) -> bool:
    port = port_get(url)
    host, path = chunk_url(url)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    try:
        sock.connect((host, port))

        if port == 443:
            context = ssl.create_default_context()
            sock = context.wrap_socket(sock, server_hostname=host)

        sock.sendall(f"GET {path} HTTP/1.1\r\nHost:{host}\r\n\r\n".encode())

        data = sock.recv(1024)
        sock.close()

        if data:
            return True
        return False
    except Exception:
        sock.close()
        return False

А на этом, пожалуй, все.

Спасибо за внимание. Надеюсь данная информация будет вам полезна
 

Вложения

  • resp_class.py.zip
    1,3 КБ · Просмотры: 91
Последнее редактирование:
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!