• Курсы Академии Кодебай, стартующие в мае - июне, от команды The Codeby

    1. Цифровая криминалистика и реагирование на инциденты
    2. ОС Linux (DFIR) Старт: 16 мая
    3. Анализ фишинговых атак Старт: 16 мая Устройства для тестирования на проникновение Старт: 16 мая

    Скидки до 10%

    Полный список ближайших курсов ...

Статья Загрузка файла частями с помощью и без использования потоков в Python

Загрузка файлов из интернета — это всегда интересная задача. Конечно, в нынешнее время, когда скорости соединения переваливают за сотню мегабит в секунду, это вопрос не такой актуальный, как раньше. Мне стало интересно потестировать скорость загрузки файлов и изображений с разных сайтов с использованием Python.

000.jpeg


Может быть вы помните, был такой Download Manager. Впрочем, он и сейчас существует. Но когда-то его популярность просто зашкаливала. Еще бы. Позволяет качать файлы в несколько потоков. Докачивает файлы при обрыве соединения. Но, речь сейчас не о нем. Давайте попробуем реализовать многопоточную загрузку файлов с помощью Python и сравнить, сильно ли отличается скорость загрузки.


Что потребуется?

Для работы скриптов потребуется установить библиотеку requests. С ее помощью мы будем загружать тестируемые файлы. Для ее установки пишем в терминале:

pip install requests

Больше в сторонних библиотеках, в данном случае, потребности больше не возникнет. Все остальное есть в Python «из коробки».

Для начала создадим четыре файла, в которых реализуем похожий функционал, который будет различаться только деталями.

Первый файл — загрузка файла полностью. То есть, по сути, обычная загрузка с помощью requests.

Второй файл — загрузка с помощью того же requests, но частями. То есть, будем запрашивать у сервера нужное количество байт в определенном диапазоне и скачивать его на диск. После чего, соберем загруженные части.

Третий файл — делает то же, что и второй, но использует для загрузки запрашиваемых частей потоки.

Четвертый файл — аналогичный функционал, но уже добавлена асинхронность. Скрипт для четвертого файла я нашел на . Внес только небольшие изменения. Это к вопросу, что вы где-то это уже видели ))


Создаем первый файл

Для начала импортируем нужные библиотеки, а также определим заголовки для запроса.

Python:
import time
from pathlib import Path
from urllib.parse import urlparse

import requests

headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                      "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
                  "application/signed-exchange;v=b3;q=0.9"
    }


Функция загрузки файла

Создадим функцию download(url). На вход она принимает ссылку к загружаемому файлу. Выполним запрос, в который передадим ссылку на файл, заголовки, выставим значение потоковой загрузки в True, а также, хотя это и не обязательно, укажем явно, что запрос должен выполнять автоматическую переадресацию, с помощью параметра allow_redirects. Проверяем статус-код. Если он в двухсотом диапазоне, двигаемся дальше. Пытаемся получить имя файла из заголовков, которое иногда передается в ключе filename. Проверяем, если имя из заголовков не получено, устанавливаем имя из ссылки, которую берем уже из запроса, так как может быть редирект. Не на всех сайтах указывают прямые ссылки на файлы. Открываем файл на запись в байтовом режиме, итерируемся по получаемому контенту и записываем полученное содержимое на диск. Если же статус код не в двухсотом диапазоне, выходим из функции. Обрабатывать данные ошибки я не стал, так как скрипты больше для тестирования. Ну и в случае возникновения исключения выводим его на экран и также выходим из функции.

Python:
def download(url):
    try:
        rs = requests.get(url=url, headers=headers, stream=True, allow_redirects=True)
        if 200 <= rs.status_code <= 299:
            name = rs.headers.get('filename')
            if not name:
                name = Path(urlparse(rs.url).path).name
            with open(name, 'wb') as file:
                for part in rs.iter_content(1024):
                    file.write(part)
        else:
            return
    except Exception as ex:
        print(ex)
        return


Функция main

Здесь все стандартно. Запрашиваем у пользователя ссылку. Получаем время запуска скрипта. Передаем в функцию загрузки ссылку. Выводим в терминал время работы.

Python:
def main():
    url = input("Введите ссылку для загрузки: ")
    t = time.time()
    download(url)
    print(f'Время загрузки файла: {time.time() - t:.2f}')


if __name__ == "__main__":
    main()

Python:
import time
from pathlib import Path
from urllib.parse import urlparse

import requests

headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                      "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
                  "application/signed-exchange;v=b3;q=0.9"
    }


def download(url):
    try:
        rs = requests.get(url=url, headers=headers, stream=True, allow_redirects=True)
        if 200 <= rs.status_code <= 299:
            name = rs.headers.get('filename')
            if not name:
                name = Path(urlparse(rs.url).path).name
            with open(name, 'wb') as file:
                for part in rs.iter_content(1024):
                    file.write(part)
        else:
            return
    except Exception as ex:
        print(ex)
        return


def main():
    url = input("Введите ссылку для загрузки: ")
    t = time.time()
    download(url)
    print(f'Время загрузки файла: {time.time() - t:.2f}')


if __name__ == "__main__":
    main()


Создаем второй файл

Импортируем необходимые для работы библиотеки.

Python:
import time
from pathlib import Path
from urllib.parse import urlparse

import requests

headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                      "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
                  "application/signed-exchange;v=b3;q=0.9"
    }


Функция получения размера загружаемого файла

Так как в данном скрипте мы будем загружать файл по частям, нужно получить его размер. Как правило, он передается сервером в заголовках. Случаи, когда его не окажется в данном случае не обрабатываются.

Создадим функцию get_size(url). На вход она принимает ссылку на загружаемый файл. Определим переменную для названия файла, чтобы ее было видно во всей функции. Выполняем запрос заголовков с помощью метода head. Обязательно выставляем параметр allow_redirects, так как автоматически в данном случае переадресация не делается, а нам нужно получить размер, который может скрываться за ссылкой из переадресации. Проверяем статус-код. Пытаемся получить название файла из заголовков. Если его нет, забираем название из ссылки. Затем забираем из заголовков размер файла по ссылке и возвращаем из функции вместе с именем. Ну и частично обрабатываем возникшие исключения. Однако, только частично. Так как их обработка не производиться. Задача скрипта протестировать скорость загрузки.

Python:
def get_size(url):
    name = ""
    try:
        rs = requests.head(url=url, headers=headers, allow_redirects=True)
        if 200 <= rs.status_code <= 299:
            name = rs.headers.get('filename')
            if not name:
                name = Path(urlparse(rs.url).path).name
            if rs.headers.get("Content-Length"):
                return int(rs.headers.get("Content-Length")), name
            return 0, name
    except Exception:
        return 0, name


Функция загрузки файла

Здесь, почти то же самое, что и в предыдущем скрипте. Однако, так как мы загружаем файл частями, необходимы некоторые изменения. Создаем функцию download(url, start, end, output). Как видим, на вход она принимает уже больше параметров. Ссылку на файл, стартовый диапазон загрузки фрагмента. Конечный диапазон, а также имя загружаемого файла. Здесь у нас будут свои заголовки, в которые добавим один параметр: Range, в виде словаря, в котором и будет указываться стартовый байт и конечный для загрузки файла. Выполняем запрос, куда передаем ссылку для загрузки, заголовки с диапазоном, включаем параметр потоковой загрузки, а также переадресацию. Проверяем статус-код. Данный диапазон выбран потому, что при данном способе загрузки, статус-код будет не 200, а 206, что означает «частичное содержимое». Ну, а далее все стандартно. Отрываем файла на запись в байтовом режиме, итерируемся по полученному контенту и записываем загруженную часть в файл. Так как это только часть файла, в параметре output будет передаваться не полное имя файла, а имя с индексом, который будет необходим впоследствии для того, чтобы собрать целый файл из загруженных частей.

Python:
def download(url, start, end, output):
    header = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                      "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
                  "application/signed-exchange;v=b3;q=0.9",
        'Range': f'bytes={start}-{end}'
    }

    try:
        rs = requests.get(url=url, headers=header, stream=True, allow_redirects=True)
        if 200 <= rs.status_code <= 299:
            with open(output, 'wb') as file:
                for part in rs.iter_content(1024):
                    file.write(part)
        else:
            return
    except Exception as ex:
        print(ex)
        return


Функция main

Здесь мы запрашиваем ссылку на загрузку у пользователя, получаем время старта скрипта, получаем размер и имя файла. Вычисляем размер части. В данном случае файл поделим на три. Определяем диапазон равный размеру файла, с шагом равным размеру части. Итерируемся по диапазону. С помощью enumerate получаем индексы частей. Передаем полученные значения в функцию загружки. Конечный диапазон делаем равным стартовому диапазону + размер части — 1. Также передаем название файла с индексом.

После того, как будет выполнена загрузка, открываем результирующий файла на запись, итерируемся по диапазону равному количеству частей. Считываем из файлов с индексами содержимое и записываем в результирующий файл. После чего удаляем загруженную часть. Ну и выводим в терминал время загрузки файла.

Python:
def main():
    url = input("Введите ссылку для загрузки: ")
    t = time.time()
    file_size, name = get_size(url)
    chunk_size = int(file_size / 3)
    chunks = range(0, file_size, chunk_size)
    for i, chunk in enumerate(chunks):
        download(url, chunk, chunk + chunk_size - 1, f'{Path(url).name}.{i}')

    with open(name, "wb") as file:
        for i in range(len(chunks)):
            with open(f'{Path(url).name}.{i}', 'rb') as ch:
                file.write(ch.read())
            Path(f'{Path(url).name}.{i}').unlink()

    print(f'Время загрузки файла частями без потоков: {time.time() - t:.2f}')


if __name__ == "__main__":
    main()

Python:
import time
from pathlib import Path
from urllib.parse import urlparse

import requests

headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                      "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
                  "application/signed-exchange;v=b3;q=0.9"
    }


def get_size(url):
    name = ""
    try:
        rs = requests.head(url=url, headers=headers, allow_redirects=True)
        if 200 <= rs.status_code <= 299:
            name = rs.headers.get('filename')
            if not name:
                name = Path(urlparse(rs.url).path).name
            if rs.headers.get("Content-Length"):
                return int(rs.headers.get("Content-Length")), name
            return 0, name
    except Exception:
        return 0, name


def download(url, start, end, output):
    header = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                      "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
                  "application/signed-exchange;v=b3;q=0.9",
        'Range': f'bytes={start}-{end}'
    }

    try:
        rs = requests.get(url=url, headers=header, stream=True, allow_redirects=True)
        if 200 <= rs.status_code <= 299:
            with open(output, 'wb') as file:
                for part in rs.iter_content(1024):
                    file.write(part)
        else:
            return
    except Exception as ex:
        print(ex)
        return


def main():
    url = input("Введите ссылку для загрузки: ")
    t = time.time()
    file_size, name = get_size(url)
    chunk_size = int(file_size / 3)
    chunks = range(0, file_size, chunk_size)
    for i, chunk in enumerate(chunks):
        download(url, chunk, chunk + chunk_size - 1, f'{Path(url).name}.{i}')

    with open(name, "wb") as file:
        for i in range(len(chunks)):
            with open(f'{Path(url).name}.{i}', 'rb') as ch:
                file.write(ch.read())
            Path(f'{Path(url).name}.{i}').unlink()

    print(f'Время загрузки файла частями без потоков: {time.time() - t:.2f}')


if __name__ == "__main__":
    main()


Создаем третий файл

Содержимое третьего файла будет отличаться от второго совсем незначительно. Добавим сюда потоки, с помощью которых будем загружать части файла. Описывать скрипт полностью нет необходимости, потому опишу только изменения. Добавим функцию thread_run(url, count). В нее мы будем передавать ссылку для загрузки и количество потоков, которое будет равно количеству частей файла. В нашем случае их 3. Перенесем из функции main получение размера и имени файла в данную функцию. Вычисляем количество частей и создаем диапазон, по которому будем итерироваться. Запускаем с помощью контекстного менеджера with ThreadPoolExecutor, в который передаем количество потоков, и определяем его имя для краткости и удобства. Итерируемся по диапазонам и запускаем потоки, в которых передаем имя функции и переменные нужные ей в работе. После чего возвращаем из функции количество частей и имя файла.

Python:
def thread_run(url, count):
    file_size, name = get_size(url)
    chunk_size = int(file_size / count)
    chunks = range(0, file_size, chunk_size)

    with ThreadPoolExecutor(max_workers=count) as executor:
        for i, chunk in enumerate(chunks):
            executor.submit(download, url=url, start=chunk, end=chunk + chunk_size - 1, output=f'{Path(url).name}.{i}')

    return len(chunks), name

Как я уже написал выше, из функции main была убрана часть, отвечающая за получение размера и имени файла. Добавим сюда запуск функции с потоками, которые возвращают значения диапазона частей и имя файла. Ну, а дальше как и во-второй функции. Отрываем файл на запись. Итерируемся по диапазону из количества частей. Читаем из содержимое и записываем в результирующий файл.

Python:
def main():
    url = input("Введите ссылку для загрузки: ")
    t = time.time()
    chunks, name = thread_run(url, 3)

    with open(name, "wb") as file:
        for i in range(len(chunks)):
            with open(f'{Path(url).name}.{i}', 'rb') as ch:
                file.write(ch.read())
            Path(f'{Path(url).name}.{i}').unlink()

    print(f'Время загрузки файла частями с потоками: {time.time() - t:.2f}')


if __name__ == "__main__":
    main()

Python:
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import urlparse

import requests

headers = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                      "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
                  "application/signed-exchange;v=b3;q=0.9"
    }

s = requests.Session()


def get_size(url):
    name = ""
    try:
        rs = requests.head(url=url, headers=headers, allow_redirects=True)
        if 200 <= rs.status_code <= 299:
            name = rs.headers.get('filename')
            if not name:
                name = Path(urlparse(rs.url).path).name
            if rs.headers.get("Content-Length"):
                return int(rs.headers.get("Content-Length")), name
            return 0, name
    except Exception:
        return 0, name


def download(url, start, end, output):
    header = {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                      "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
                  "application/signed-exchange;v=b3;q=0.9",
        'Range': f'bytes={start}-{end}'
    }

    try:
        rs = s.get(url=url, headers=header, stream=True, allow_redirects=True)
        if 200 <= rs.status_code <= 299:
            with open(output, 'wb') as file:
                for part in rs.iter_content(1024):
                    file.write(part)
        else:
            return
    except Exception as ex:
        print(ex)
        return


def thread_run(url, count):
    file_size, name = get_size(url)
    chunk_size = int(file_size / count)
    chunks = range(0, file_size, chunk_size)

    with ThreadPoolExecutor(max_workers=count) as executor:
        for i, chunk in enumerate(chunks):
            executor.submit(download, url=url, start=chunk, end=chunk + chunk_size - 1, output=f'{Path(url).name}.{i}')

    return len(chunks), name


def main():
    url = input("Введите ссылку для загрузки: ")
    t = time.time()
    chunks, name = thread_run(url, 3)

    with open(name, "wb") as file:
        for i in range(chunks):
            with open(f'{Path(url).name}.{i}', 'rb') as ch:
                file.write(ch.read())
            Path(f'{Path(url).name}.{i}').unlink()

    print(f'Время загрузки файла частями с потоками: {time.time() - t:.2f}')


if __name__ == "__main__":
    main()


Создаем четвертый файл

Теперь пришло время для асинхронной функции. Импортируем в скрипт нужные библиотеки и запросим у пользователя ссылку на загрузку.

Python:
import asyncio
import concurrent.futures
import functools
import time
from pathlib import Path
from urllib.parse import urlparse

import requests
import os

URL = input("Введите ссылку для загрузки: ")


Получение размера загружаемого файла

Создадим асинхронную функцию get_size(url), которая на вход принимает ссылку на файл. Выполняем запроси получаем из заголовков размера файла и его имя. Возвращаем полученные значения из функции.

Python:
async def get_size(url):
    response = requests.head(url, allow_redirects=True)
    if 200 <= response.status_code <= 299:
        size = int(response.headers['Content-Length'])
        name = response.headers.get('filename')
        if not name:
            name = Path(urlparse(response.url).path).name
        return size, name


Функция загрузки файла

Создадим функцию download_range(url, start, end, output). В нее мы передаем ссылку на загрузку, начало диапазона части, конец диапазона, имя загружаемого файла, в котором добавлен индекс части. Также создаем заголовки для запроса в которых указываем диапазон загружаемой части. Выполняем запрос, загружаем часть файла, создаем его и в байтовом режиме записываем загруженную часть на диск.

Python:
def download_range(url, start, end, output):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers, allow_redirects=True)

    with open(output, 'wb') as f:
        for part in response.iter_content(1024):
            f.write(part)


Асинхронная функция запуска потоков

Создадим функцию download(run, loop, url). Передаем объект run, для создания задания на загрузку. Петлю, в которой все это будет выполнятся и ссылку на файл. Получаем размера файла и его имя. Вычисляем размер части. Создаем диапазон начальных и конечных значений частей.

Создадим задание, в котором укажем целевую функцию, ссылку на файл, начало диапазона части, его конец, Имя загружаемой части. Формируем из этого список с заданиями в цикле. Запускаем задания на выполнение. После того, как задания выполнятся, сводим загруженные части в один файл и записываем на диск.

Python:
async def download(run, loop, url):
    file_size, name = await get_size(url)
    chunk_size = int(file_size / 3)
    chunks = range(0, file_size, chunk_size)

    tasks = [
        run(
            download_range,
            url,
            start,
            start + chunk_size - 1,
            f'{name}.part{i}',
        )
        for i, start in enumerate(chunks)
    ]

    await asyncio.wait(tasks)

    with open(name, 'wb') as o:
        for i in range(len(chunks)):
            chunk_path = f'{name}.part{i}'

            with open(chunk_path, 'rb') as s:
                o.write(s.read())

            os.remove(chunk_path)


Запуск загрузки файла

Получим время старта функции. Создадим объект executor. Создадим асинхронный цикл для выполнения заданий. Запустим задания на выполнение, после чего закрываем цикл и выводим в терминал время загрузки файла.

Python:
if __name__ == '__main__':
    t = time.time()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    loop = asyncio.new_event_loop()
    run = functools.partial(loop.run_in_executor, executor)

    asyncio.set_event_loop(loop)

    try:
        loop.run_until_complete(
            download(run, loop, URL)
        )
    finally:
        loop.close()
        print(f'Время загрузки файла частями асинхронной функции: {time.time() - t:.2f}')

Python:
import asyncio
import concurrent.futures
import functools
import time
from pathlib import Path
from urllib.parse import urlparse

import requests
import os

URL = input("Введите ссылку для загрузки: ")


async def get_size(url):
    response = requests.head(url, allow_redirects=True)
    if 200 <= response.status_code <= 299:
        size = int(response.headers['Content-Length'])
        name = response.headers.get('filename')
        if not name:
            name = Path(urlparse(response.url).path).name
        return size, name


def download_range(url, start, end, output):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers, allow_redirects=True)

    with open(output, 'wb') as f:
        for part in response.iter_content(1024):
            f.write(part)


async def download(run, loop, url):
    file_size, name = await get_size(url)
    chunk_size = int(file_size / 3)
    chunks = range(0, file_size, chunk_size)

    tasks = [
        run(
            download_range,
            url,
            start,
            start + chunk_size - 1,
            f'{name}.part{i}',
        )
        for i, start in enumerate(chunks)
    ]

    await asyncio.wait(tasks)

    with open(name, 'wb') as o:
        for i in range(len(chunks)):
            chunk_path = f'{name}.part{i}'

            with open(chunk_path, 'rb') as s:
                o.write(s.read())

            os.remove(chunk_path)


if __name__ == '__main__':
    t = time.time()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    loop = asyncio.new_event_loop()
    run = functools.partial(loop.run_in_executor, executor)

    asyncio.set_event_loop(loop)

    try:
        loop.run_until_complete(
            download(run, loop, URL)
        )
    finally:
        loop.close()
        print(f'Время загрузки файла частями асинхронной функции: {time.time() - t:.2f}')


Тестирование функций

Что ж, функции созданы, теперь их нужно потестировать. Для тестов были выбраны два первых попавшихся под руку сайта, которые позволяют загрузить mp3 файлы. А также нашел пару сайтов, с которых будем грузить картинку. Скажу сразу, выводы получились далеко не однозначные. Не знаю уж почему, но в данных тестах лучше всего себя показала функция простой загрузки файла без частей и потоков. Но, лучше показать. Вот скриншоты.

img_sait_01.png

img_sait_02.png

mp3_sait_01.png

mp3_sait_02.png


Уж не знаю, что и думать в этом случае.
Конечно же, в случае с использованием потоков в обычной и асинхронной функции позволит не загружать весь файл сразу, особенно, если он большого размера и сохранение его в оперативной памяти нецелесообразно.

Файлы с кодом во вложении к статье.

А на этом, пожалуй, все.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна
 

Вложения

  • test.zip
    4,1 КБ · Просмотры: 113
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!