Статья Немного об IPTV или проверка m3u с помощью Python. Часть 01

Уже довольно давно существует такая технология, как потоковая передача данных. С ее помощью стала возможной передача медиа-контента для просмотра, к примеру, телеканалов или просто видео на компьютере или смартфоне с помощью специальных плееров, которые и созданы для этого. Для примера, это всем известный медиа-комбайн VLC. Но речь не совсем об этом. Для того, чтобы плеер имел возможность открыть сразу же множество каналов, были придуманы, еще в далеком 1997 году, плейлисты. И с возможностью добавления в них ссылок на потоки они стали очень популярны в среде любителей IPTV, для обмена ссылками на каналы. Ну и так как, сами по себе условно «бесплатные» ссылки особо долго не живут, требуется инструмент для проверки плейлиста. Безусловно, такой инструмент нашелся — это IPTV Checker. Однако, данная программа работает только под Windows. Давайте попробуем разобраться, возможно ли с помощью Python сделать что-то подобное.

000_01.png


Скажу сразу, я не фанат телевизора. Но в данном случае меня больше заинтересовала сама проверка работоспособности. Не будем особо торопиться с созданием основного инструмента и для начала пробежимся по структуре плейлиста, а также создадим несколько инструментов, которые выполняют простейшую их обработку.

Структура плейлиста достаточно проста. В базовом видео он состоит из трех основных элементов. Это:

1. #EXTM3U — заголовок, указывающий на то, что данный текст является плейлистом;
2. #EXTINF — информация о медиафайле плейлиста, то есть, его описание;
3. http:// (https://) - ссылка на медиафайл, которая может быть как локальной, так и глобальной.

И если о заголовке и ссылке особо сказать нечего, то на описании медиафайла остановимся чуть подробнее. # в начале означает, что проигрыватель с этого места должен начать считывание данных. Далее, за сокращением EXTINF, что означает Extended information, следует указание продолжительности данных. В случае с потоком в интернете используется -1, так как конечная длина файла в этом случае неизвестна. Затем, через запятую указывается название медиаресурса. Для примера: «Песня». Но, это еще не все. Здесь же, в расширенной информации могут присутствовать такие параметры как: «tvg-name» - где указывается название канала или программы; «tvg-logo» - ссылка на логотип канала или обложку альбома; «audio-track» - информация об аудиодорожке; «group-title» - название группы каналов, для их объединения в плеерах по группам.

В принципе, данной информации уже вполне достаточно, чтобы начать работать с плейлистами. Тем более, что сильно глубоко менять их структуры мы здесь не будем.


Разбиение плейлиста на более мелкие части

Зачастую списки каналов в плейлистах могут содержать не одну тысячу ссылок. Открывать такие плейлисты в плеере не особо удобно, да и не все плееры иногда могут отрыть такое количество ссылок. Можно конечно разбить плейлист вручную простым копипастом, но в нашем распоряжении есть Python, поэтому можно воспользоваться им.


Что потребуется?

По большему счету для работы данного скрипта не требуется установки сторонних модулей. Но, для того, чтобы хоть немного разукрасить вывод информации в терминал, давайте установим библиотеку colorama. Для этого пишем в терминале команду:

pip install colorama

После того, как colorama установиться, импортируем в скрипт нужные модули и библиотеки, а также, сразу же инициализируем colorama.

Python:
import sys
import time
from pathlib import Path

from colorama import Fore
from colorama import init

init()

Двигаемся дальше. Для того, чтобы куда-то складывать информацию прочитанную из плейлиста, создадим список, а также счетчик, с помощью которого будем считать итерации и делить плейлист на нужные части.

Python:
split_set = []
num = 0

Создадим функцию main. В ней мы будем запрашивать данные у пользователя о его действиях, а также запускать функцию разбиения файла. Для начала определим глобальные переменные. Выведем информацию о том, что делает данный скрипт, и запросим у пользователя, что он хочет сделать. Обработать файл или целую директорию с файлами, чтобы разделить из все сразу.

Python:
global split_set, num
    try:
        print(f"{Fore.GREEN}{'-'*47}\n{Fore.YELLOW}РАЗБИВКА ФАЙЛА M3U НА БОЛЕЕ МЕЛКИЕ ЧАСТИ\n{Fore.GREEN}"
              f"{'-'*47}\n")
        user_check = input("Выберите действие:\n[1] Обработка директории\n[2] Обработка файла\n>>> ")

Далее следует обработка пользовательского выбора. Если он выбрал разбиение файлов в директории, выполняем проверку, существует ли она и является ли указанный путь путем к директории. Затем запрашиваем у пользователя количество каналов в файле после разбивки. Итерируемся по директории, забираем из нее файлы «.m3u» и передаем в функцию для разбивки. После чего выводим информацию о затраченном времени.

Python:
        if user_check == "1":
            path = input("Введите путь к директории: ")
            if not Path(path).exists() or path == "" or not Path(path).is_dir():
                print(Fore.RED + "Введенного пути не существует")
                sys.exit(0)
            chunk_count = input("Введите количество каналов в файле: ")
            st = time.monotonic()
            for file in Path(path).iterdir():
                if Path(file).suffix == ".m3u":
                    split_list(str(file), int(chunk_count))
                    split_set.clear()
                    num = 0
            print(f"{Fore.GREEN}Затраченное время: {Fore.BLUE}{time.monotonic() - st:.2f} c.")

Примерно то же самое происходит, если пользователь выбрал разбивку одного файла. За исключением того, что нам нет необходимости итерироваться по директории.

Python:
def main():
    """
    Получение пользовательских данных. Запуск разбивки файла.
    """
    global split_set, num
    try:
        print(f"{Fore.GREEN}{'-'*47}\n{Fore.YELLOW}РАЗБИВКА ФАЙЛА M3U НА БОЛЕЕ МЕЛКИЕ ЧАСТИ\n{Fore.GREEN}"
              f"{'-'*47}\n")
        user_check = input("Выберите действие:\n[1] Обработка директории\n[2] Обработка файла\n>>> ")
        if user_check == "1":
            path = input("Введите путь к директории: ")
            if not Path(path).exists() or path == "" or not Path(path).is_dir():
                print(Fore.RED + "Введенного пути не существует")
                sys.exit(0)
            chunk_count = input("Введите количество каналов в файле: ")
            st = time.monotonic()
            for file in Path(path).iterdir():
                if Path(file).suffix == ".m3u":
                    split_list(str(file), int(chunk_count))
                    split_set.clear()
                    num = 0
            print(f"{Fore.GREEN}Затраченное время: {Fore.BLUE}{time.monotonic() - st:.2f} c.")
        elif user_check == "2":
            path = input(Fore.RESET + "Введите путь к файлу: ")
            if not Path(path).exists() or path == "" or not Path(path).is_file():
                print(Fore.RED + "Введенного пути не существует")
                sys.exit(0)
            chunk_count = input("Введите количество каналов в файле: ")
            st = time.monotonic()
            split_list(path, int(chunk_count))
            print(f"{Fore.GREEN}Затраченное время: {Fore.BLUE}{time.monotonic() - st:.2f} c.")
        else:
            print("Ваш выбор не понятен")
            raise KeyboardInterrupt
    except KeyboardInterrupt:
        print(Fore.YELLOW + "\n\nGood by, my friend! Good by!\n")


if __name__ == "__main__":
    main()

Двигаемся дальше. Создадим функцию, в которой и будет происходить разбиение файлов на части. Я назвал ее split_list(path: str, chunk_count: int). Как видно, на входе она принимает путь к файлу для разбивки, а также целое число, указывающее на количество каналов в одном файле.

Для начала выводим информацию о действиях. Затем определяем переменную, которая будет хранить описание канала. В нее я добавил значение по умолчанию, так как сталкивался с тем, что в некоторых плейлистах напрочь отсутствует (почему-то) это самое описание.

Python:
global num, split_set

num_line = len(f"РАЗБИВАЮ ФАЙЛ НА ЧАСТИ ПО {chunk_count} ЭЛ.")
print(Fore.YELLOW + f"\nРАЗБИВАЮ ФАЙЛ НА ЧАСТИ ПО {chunk_count} ЭЛ.\n{Fore.GREEN}{'-'*num_line}")
ext_inf = '#EXTINF: -1, Null'

Открываем файл на чтение и итерируемся в нем построчно. Делаем проверку. Если начало строки содержит #EXTINF, то добавляем его в переменную ext_inf, а если ссылку на поток, или плейлист, то добавляем полученные данные в созданный нами ранее список.

Python:
    with open(path, 'r', encoding='utf-8') as file:
        for line in file.readlines():
            if line.startswith("#EXTINF"):
                ext_inf = line.strip()
            elif line.startswith("http"):
                split_set.append(f"{ext_inf}\n{line.strip()}\n")

Теперь проверяем, не равна ли длина списка указанному пользователем количеству каналов. Если да, увеличиваем переменную num на единицу, выводим сообщение пользователю, вызываем функцию сохранения информации из списка и затем очищаем его от всей информации. После чего итерируемся далее.

Python:
            if len(split_set) == chunk_count:
                num += 1
                print(Fore.YELLOW + f"\rЧасть {Fore.BLUE}{num+1} {Fore.YELLOW}обработана", end="")
                save_split(path, str(chunk_count))
                split_set.clear()

В случае, если у нас осталось некоторое количество каналов, которое не попадает под условие заданное пользователем, то есть, меньше, сохраняем остаточные данные. И выводим сообщение для пользователя.

Python:
    if 0 < len(split_set) < chunk_count:
        num += 1
        print(Fore.YELLOW + f"\rЧасть {Fore.BLUE}{num} {Fore.YELLOW}обработана", end="")
        save_split(path, str(chunk_count))
    print(f"\r\b", end="")
    print(Fore.GREEN + f"Файл {Path(path).name} разбит. Количество частей: {Fore.BLUE}{num}")

И еще одна функция, это функция сохранения данных. Я назвал ее save_split(path: str, cnt: str). На вход она принимает путь к исходному файлу, чтобы можно было забрать из него директорию для сохранения, а также исходное имя файла. И номер файла по порядку.

Ну и далее — все просто. Получаем директорию, в которую будем сохранять части файла. Затем, чтобы нумерация была нормальной и файловый менеджер корректно сортировал файлы проверяем число, больше 10 или меньше. В зависимости от этого добавляем или не добавляем 0 к номеру файла. Создаем директорию. Открываем файл на дозапись, тем самым создавая его. И в цикле сохраняем в него данные из списка. Так как мы ранее сохраняли их в список с символом перевода каретки, то и в файл они запишутся корректно. Поэтому, сохраняем как есть. И да, дописываем в самое начало файла заголовок, означающий, что данный файл является плейлистом.

Python:
    global num
    dir_split = Path(path).parent / f"split_{cnt}_channel_{str(Path(path).name.split(Path(path).suffix)[0])}"
    s = f'0{str(num)}' if num < 10 else num
    dir_split.mkdir(exist_ok=True)
    with open(dir_split / f"{Path(path).name.split(Path(path).suffix)[0]}_{s}.m3u", "a",
              encoding="utf-8") as f:
        f.write("#EXTM3U\n")
        for sst in split_set:
            f.write(sst)

Python:
import sys
import time
from pathlib import Path

from colorama import Fore
from colorama import init

init()

split_set = []
num = 0


def save_split(path: str, cnt: str):
    """
    Сохранение информации с указанным количеством элементов.

    :param cnt: Кол-во файлов для разбивки.
    :param path: Путь к исходному файлу.
    """
    global num
    dir_split = Path(path).parent / f"split_{cnt}_channel_{str(Path(path).name.split(Path(path).suffix)[0])}"
    s = f'0{str(num)}' if num < 10 else num
    dir_split.mkdir(exist_ok=True)
    with open(dir_split / f"{Path(path).name.split(Path(path).suffix)[0]}_{s}.m3u", "a",
              encoding="utf-8") as f:
        f.write("#EXTM3U\n")
        for sst in split_set:
            f.write(sst)


def split_list(path: str, chunk_count: int):
    """
    Разбивка общего количества каналов на указанное в переменной.
    Сохранение разбитых частей в файлы.

    :param path: Путь к папке с файлом.
    :param chunk_count: Количество каналов в файле после разбивки.
    """
    global num, split_set

    num_line = len(f"РАЗБИВАЮ ФАЙЛ НА ЧАСТИ ПО {chunk_count} ЭЛ.")
    print(Fore.YELLOW + f"\nРАЗБИВАЮ ФАЙЛ НА ЧАСТИ ПО {chunk_count} ЭЛ.\n{Fore.GREEN}{'-'*num_line}")
    ext_inf = '#EXTINF: -1, Null'
    with open(path, 'r', encoding='utf-8') as file:
        for line in file.readlines():
            if line.startswith("#EXTINF"):
                ext_inf = line.strip()
            elif line.startswith("http"):
                split_set.append(f"{ext_inf}\n{line.strip()}\n")
            if len(split_set) == chunk_count:
                num += 1
                print(Fore.YELLOW + f"\rЧасть {Fore.BLUE}{num+1} {Fore.YELLOW}обработана", end="")
                save_split(path, str(chunk_count))
                split_set.clear()
    if 0 < len(split_set) < chunk_count:
        num += 1
        print(Fore.YELLOW + f"\rЧасть {Fore.BLUE}{num} {Fore.YELLOW}обработана", end="")
        save_split(path, str(chunk_count))
    print(f"\r\b", end="")
    print(Fore.GREEN + f"Файл {Path(path).name} разбит. Количество частей: {Fore.BLUE}{num}")


def main():
    """
    Получение пользовательских данных. Запуск разбивки файла.
    """
    global split_set, num
    try:
        print(f"{Fore.GREEN}{'-'*47}\n{Fore.YELLOW}РАЗБИВКА ФАЙЛА M3U НА БОЛЕЕ МЕЛКИЕ ЧАСТИ\n{Fore.GREEN}"
              f"{'-'*47}\n")
        user_check = input("Выберите действие:\n[1] Обработка директории\n[2] Обработка файла\n>>> ")
        if user_check == "1":
            path = input("Введите путь к директории: ")
            if not Path(path).exists() or path == "" or not Path(path).is_dir():
                print(Fore.RED + "Введенного пути не существует")
                sys.exit(0)
            chunk_count = input("Введите количество каналов в файле: ")
            st = time.monotonic()
            for file in Path(path).iterdir():
                if Path(file).suffix == ".m3u":
                    split_list(str(file), int(chunk_count))
                    split_set.clear()
                    num = 0
            print(f"{Fore.GREEN}Затраченное время: {Fore.BLUE}{time.monotonic() - st:.2f} c.")
        elif user_check == "2":
            path = input(Fore.RESET + "Введите путь к файлу: ")
            if not Path(path).exists() or path == "" or not Path(path).is_file():
                print(Fore.RED + "Введенного пути не существует")
                sys.exit(0)
            chunk_count = input("Введите количество каналов в файле: ")
            st = time.monotonic()
            split_list(path, int(chunk_count))
            print(f"{Fore.GREEN}Затраченное время: {Fore.BLUE}{time.monotonic() - st:.2f} c.")
        else:
            print("Ваш выбор не понятен")
            raise KeyboardInterrupt
    except KeyboardInterrupt:
        print(Fore.YELLOW + "\n\nGood by, my friend! Good by!\n")


if __name__ == "__main__":
    main()

Итак, первый инструмент мы создали. Пора переходить ко второму.


Объединение плейлистов и удаление одинаковых ссылок

В общем-то, для чего служит данный скрипт, понятно из заголовка раздела. Следует пояснить, что данный скрипт может использоваться, к примеру, в том случае, когда у вас есть несколько небольших плейлистов и вы хотели бы сделать один побольше. Однако, в разных плейлистах иногда содержаться одинаковые ссылки на каналы. Просто у них разные описания. А потому, их желательно удалить, чтобы не дублировать. Ну и в одном плейлисте, также могут содержаться одинаковые ссылки. Плейлисты делают люди. А значит, возможны повторы. Что же, давайте приступим.


Что потребуется?

Также, как и в первом скрипте использование сторонних библиотек не обязательно. Я добавил colorama только для раскрашивания вывода в терминал. Для ее установки пишем команду:

pip install colorama

Импортируем необходимые модули и библиотеки, а также инициализируем colorama.

Python:
import shutil
import sys
import time
from pathlib import Path

from colorama import Fore
from colorama import init

init()

Теперь создадим глобальные переменные, которые будут использоваться при работе в любой из функций скрипта. count — счетчик уникальных ссылок. links — множество, в которое складываются все ссылки во время работы. Именно с помощью этого множества будет проверятся уникальность ссылок. А для того, чтобы данные не занимали слишком много места в памяти, так как плейлисты могут быть довольно внушительных размеров, используем множество, так как оно не позволяет хранить в себе два одинаковых элемента. Да и по скорости доступа к данным множество быстрее, чем тот же список. merge - список с каналов после проверки. error — список, в котором будут храниться ошибки возникшие в процессе объединения. Дело в том, что когда пользователи создают плейлисты, они не учитывают того, что кодировка на той машине, где он будет использоваться может отличатся. Я говорю о пользователях Windows. Попадаются плейлисты, которые сохранены в этой кодировке. И данную проблему можно было бы обойти, если бы не пристрастие добавления в описании канала всяких смайликов, символов сердечек и прочего, что даже при изменении кодировки на виндовую просто не читается. Помогает только пересохранение данного файла вручную с кодировкой utf-8. Ну и, собственно, получение абракадабры взамен смайликов. Я пытался определить кодировку, но chardet в данном случае не помогает. Поэтому я решил просто складывать ошибочные файлы в отдельную папку и сообщать об этом пользователю.

Python:
count = 0
links = set()
merge = []
error = []

Создадим функцию check_url_inline(url: str, ext_info: str). На вход она принимает ссылку на канал и его описание. Определяем глобальные переменные. Сравниваем полученную ссылку с содержимым множества. Если ссылки нет в множестве, собираем канал и добавляем его в список. А также увеличиваем счетчик.

Python:
def check_url_inline(url: str, ext_info: str):
    """
    Проверка дубликатов и добавление в основное множество.

    :param url: Ссылка для проверки.
    :param ext_info: Информация о канале.
    """
    global count, links, merge
    if url.strip() not in links:
        merge.append(f'{ext_info}\n{url.strip()}\n')
        count += 1
    links.add(url.strip())

Теперь нам потребуется функция main. Особо ее описывать нет необходимости. Пробежимся лишь вкратце. Выводим информацию для пользователя. Запрашиваем путь к директории с объединяемыми файлами. Запрашиваем имя для объединенного файла. Проверяем, является ли путь директорией и вообще, есть ли он. Вызываем функцию объединения файлов. В нее передаем путь к директории и флаг, который указывает, следует ли очищать описание, оставляя только название канала или нет. Соответственно, 0 — не очищать, 1 — очищать. Здесь, данную информацию я не запрашиваю явно. Если у кого-то есть желание, можете написать запрос данной информации у пользователя. Вызываем функцию сохранения объединенной информации. Выводим данные для пользователя в терминал.

Python:
def main():
    """
    Получение пользовательских данных. Запуск функций проверки
    на дубликаты и сохранения проверенного.

    """
    global merge, count
    print(f"{Fore.GREEN}{'-' * 43}\n{Fore.YELLOW}ПРОВЕРКА НА ДУБЛИКАТЫ И ОБЪЕДИНЕНИЕ КАНАЛОВ\n{Fore.GREEN}{'-' * 43}\n")
    path = input(Fore.RESET + "Введите путь к директории: ")
    name = input("Введите имя для объединенного файла: ")

    if not Path(path).exists() or not Path(path).is_dir():
        print(Fore.RED + "Директории не существует")
        sys.exit(0)
    st = time.monotonic()
    count_file = merged(path, 0)
    save_merge(name)
    print("\r\033[K", end="")
    print(f"\n{Fore.GREEN}Обработано файлов: {Fore.BLUE}{count_file}")
    print(f"{Fore.GREEN}Затраченное время: {Fore.BLUE}{time.monotonic() - st:.2f} c.")


if __name__ == "__main__":
    main()

Теперь создадим функцию для сохранения объединенной информации. Я назвал ее save_merge(name: str). На вход она принимает имя для сохраняемого файла, которое мы запрашивали у пользователя.

Открываем файл на запись, дописываем в него заголовок. Запускаем цикл по списку с каналами. Записываем в файл. Проверяем, есть ли информация в файле с ошибками. Если есть, запускаем цикл по списку. Создаем директорию, в которую будем складывать файлы с ошибками. Перемещаем файлы в соответствии со списком в созданную директорию. Выводим информацию о том, что имеются файлы с нарушенной кодировкой. И также записываем текстовый файл, в который помещаем имена файлов. После всего выводим информацию для пользователя о количестве оригинальных ссылок.

Python:
def save_merge(name: str):
    """
    Сохранение проверенных данных без дубликатов.

    :param name: Имя для сохраняемого файла.
    """
    global count, merge
    with open(Path.cwd() / f"{name}.m3u", "w", encoding="utf-8") as ch:
        ch.write("#EXTM3U\n")
        for channel in merge:
            ch.write(channel)

    if error:
        for file in error:
            (Path(file).parent / 'unicode_error').mkdir(exist_ok=True)
            shutil.move(file, Path(file).parent / 'unicode_error' / Path(file).name)
        print("\nИмеются файлы требующие исправления кодировки.\nСмотрите файл: 'error.txt'\n")
        with open(Path.cwd() / "error.txt", "w", encoding="utf-8") as er:
            er.write("Файлы требующие исправления кодировки: \n\n")
            for err in error:
                er.write(f'{Path(err).name}\n')

    print(f"{Fore.GREEN}\nОригинальных ссылок: {Fore.BLUE}{count}")

Создадим функцию для чтения файлов. Я назвал ее merged(path_dir, flag). В нее мы передаем путь к файлу, а также флаг, который указывает, удалять описание и оставлять только название канала или нет. В данном скрипте данный файл указан по умолчанию и не запрашивается у пользователя. Хотя, его обработка в функции имеет место быть. Дальше все просто. Открываем файл на чтение, забираем заголовок в переменную, получаем ссылку и передаем заголовок и ссылку в функцию проверки. Если возникает исключение с кодировкой, добавляем в список с ошибками путь к файлу.

Python:
def merged(path_dir, flag):
    """
    Выборка информации о файле и ссылок из плейлиста.

    :param path_dir: Ссылка на папку с файлами.
    :param flag: Флаг очистки информации. 0 - не очищает; 1 - очищает, оставляет только название канала.
    """
    count_file = 0
    print("")
    for nn, file in enumerate(Path(path_dir).iterdir()):
        count_file += 1
        print("\r\033[K", end="")
        print(f'\r{Fore.YELLOW}{count_file} | Обработка: {nn+1}', end="")
        if file.suffix == ".m3u":
            ext_info = ''
            try:
                with open(file, 'r', encoding="utf-8") as f:
                    for item in f.readlines():
                        if item.startswith("#EXTINF"):
                            ext_info = f"#EXTINF:-1 ,{item.split(',')[-1].replace('#EXTGRP:', '').strip()}" \
                                if flag == 1 else item.strip()
                        elif item.startswith("http"):
                            if item.strip().split("/")[-1].split(".")[-1] == "mpd" \
                                    or item.strip().split("/")[-1].split(".")[-1] == "flv":
                                continue
                            check_url_inline(item.strip(), ext_info)
            except UnicodeDecodeError:
                error.append(file)
                continue
    return count_file

Вот в принципе и все. Ниже приведен полный код функции. И да, данная функция не обрабатывает файлы «.m3u8», только «.m3u». Поэтому, если у вас плейлист в первом виде, просто переименуйте расширение.

Python:
import shutil
import sys
import time
from pathlib import Path

from colorama import Fore
from colorama import init

init()

count = 0
links = set()
merge = []
error = []


def check_url_inline(url: str, ext_info: str):
    """
    Проверка дубликатов и добавление в основное множество.

    :param url: Ссылка для проверки.
    :param ext_info: Информация о канале.
    """
    global count, links, merge
    if url.strip() not in links:
        merge.append(f'{ext_info}\n{url.strip()}\n')
        count += 1
    links.add(url.strip())


def save_merge(name: str):
    """
    Сохранение проверенных данных без дубликатов.

    :param name: Имя для сохраняемого файла.
    """
    global count, merge
    with open(Path.cwd() / f"{name}.m3u", "w", encoding="utf-8") as ch:
        ch.write("#EXTM3U\n")
        for channel in merge:
            ch.write(channel)

    if error:
        for file in error:
            (Path(file).parent / 'unicode_error').mkdir(exist_ok=True)
            shutil.move(file, Path(file).parent / 'unicode_error' / Path(file).name)
        print("\nИмеются файлы требующие исправления кодировки.\nСмотрите файл: 'error.txt'\n")
        with open(Path.cwd() / "error.txt", "w", encoding="utf-8") as er:
            er.write("Файлы требующие исправления кодировки: \n\n")
            for err in error:
                er.write(f'{Path(err).name}\n')

    print(f"{Fore.GREEN}\nОригинальных ссылок: {Fore.BLUE}{count}")


def merged(path_dir, flag):
    """
    Выборка информации о файле и ссылок из плейлиста.

    :param path_dir: Ссылка на папку с файлами.
    :param flag: Флаг очистки информации. 0 - не очищает; 1 - очищает, оставляет только название канала.
    """
    count_file = 0
    print("")
    for nn, file in enumerate(Path(path_dir).iterdir()):
        count_file += 1
        print("\r\033[K", end="")
        print(f'\r{Fore.YELLOW}{count_file} | Обработка: {nn+1}', end="")
        if file.suffix == ".m3u":
            ext_info = ''
            try:
                with open(file, 'r', encoding="utf-8") as f:
                    for item in f.readlines():
                        if item.startswith("#EXTINF"):
                            ext_info = f"#EXTINF:-1 ,{item.split(',')[-1].replace('#EXTGRP:', '').strip()}" \
                                if flag == 1 else item.strip()
                        elif item.startswith("http"):
                            if item.strip().split("/")[-1].split(".")[-1] == "mpd" \
                                    or item.strip().split("/")[-1].split(".")[-1] == "flv":
                                continue
                            check_url_inline(item.strip(), ext_info)
            except UnicodeDecodeError:
                error.append(file)
                continue
    return count_file


def main():
    """
    Получение пользовательских данных. Запуск функций проверки
    на дубликаты и сохранения проверенного.

    """
    global merge, count
    print(f"{Fore.GREEN}{'-' * 43}\n{Fore.YELLOW}ПРОВЕРКА НА ДУБЛИКАТЫ И ОБЪЕДИНЕНИЕ КАНАЛОВ\n{Fore.GREEN}{'-' * 43}\n")
    path = input(Fore.RESET + "Введите путь к директории: ")
    name = input("Введите имя для объединенного файла: ")

    if not Path(path).exists() or not Path(path).is_dir():
        print(Fore.RED + "Директории не существует")
        sys.exit(0)
    st = time.monotonic()
    count_file = merged(path, 0)
    save_merge(name)
    print("\r\033[K", end="")
    print(f"\n{Fore.GREEN}Обработано файлов: {Fore.BLUE}{count_file}")
    print(f"{Fore.GREEN}Затраченное время: {Fore.BLUE}{time.monotonic() - st:.2f} c.")


if __name__ == "__main__":
    main()

Данная функция является полностью рабочей. Я, в процессе тестирования основного скрипта, набрала очень большое количество каналов. Решил объединить и немного был… хм… поражен ))) Она обработала более 12 млн. записей. Конечно же, это заняло некоторое время. Уж не помню сколько, но, что-то не более двух с копейками минут. Или даже меньше.


Переименование m3u8 в m3u

В этой функции нет ничего особенного, потому, детального описания не будет. Ее я сделал просто, как вспомогательную для того, чтобы не делать это вручную. Так как часто плейлисты скачиваются именно в формате m3u8, а мои скрипты с ним не работают. Просто поясню, что данный формат иногда не содержит в себе ссылок на каналы, а только лишь частичные ссылки на фрагменты потока. Поэтому, обрабатывать данный вид плейлистов не имеет большого смысла. Ну и собственно, сама функция. Итерируемся по директории. Выбираем файлы с нужным расширением и переименовываем с добавлением небольшого суффикса. Так как в директории могут быть файлы с похожим названием, но другим расширением. Переименовываем их.

Python:
from pathlib import Path

num = 0
print("\nПереименование файлов m3u8".upper())
print("")

for file in Path(input("Введите путь к директории для переименования>>> ")).iterdir():
    if Path(file).suffix == ".m3u8":
        if Path(file).exists():
            num += 1
            name = f'{Path(file).name.split(Path(file).suffix)[0]}_{num}_copy.m3u'
        else:
            name = f'{Path(file).name.split(Path(file).suffix)[0]}.m3u'
        print(name)
        Path(file).rename(f'{Path(file).parent / name}')


Сортировка каналов по названиям

Данный скрипт будет полезен в том случае, если у вас есть множество каналов, которые объединены в один файл, но находятся по нему вразброс. То есть, условная «Россия» в одном месте с уникальной ссылкой на поток и в другом месте та же «Россия», но уже с другой ссылкой. По сути, это один и тот же канал, только трансляция ведется с разных серверов. Вот, для того, чтобы собрать одинаковые по названиям каналы в кучку, я и сделал этот скрипт. По большому счету он очень простой и не использует никаких запредельных технологий )).

В данном скрипте не потребуется устанавливать сторонних модулей или библиотек. Так что, будем любоваться стандартным терминалом. Импортируем необходимые для работы скрипта библиотеки и модули. Создадим словарь, в который и будем складывать данные каналов для сортировки.

Python:
import json
import sys
from pathlib import Path

sort_ch = dict()

Создадим функцию main. В ней мы будем запрашивать путь к файлу для сортировки по именам каналов. Затем проверять существует ли файл и является ли он файлом. Создаем две переменные. В одной будем хранить полное описание канала, во второй — его отображаемое имя. Открываем файл на чтение. Итерируемся по нему построчно в цикле. Забираем описание канала, получаем его имя. Затем, если встречается ссылка, передаем все полученные данные в функцию для добавления в словарь, с помощью которого и будет происходить сортировка. После сохраняем отсортированные данные.

Python:
def main():
    """
    Открытие файла. Перебор ссылок и описаний. Передача в функцию
    для добавления в словарь.
    """
    print("\nСортировка каналов по имени".upper())
    print("")
    path = input("Введите путь к файлу m3u >>> ")

    if not Path(path).exists() or not Path(path).is_file():
        print("Пути не существует")
        sys.exit(0)

    ext, name = "", ""
    with open(path, 'r', encoding='utf-8') as f:
        for ln in f.readlines():
            if ln.startswith("#EXTINF"):
                ext = ln.strip()
                name = "No name" if ext.split(',')[-1].replace('#EXTGRP:', '').strip() is None \
                    else ext.split(',')[-1].replace('#EXTGRP:', '').strip()
                continue
            elif ln.startswith("http"):
                sort_channel(ext, name, ln.strip())
                pass
    s_name = Path(path).parent / f'{Path(path).name.replace(Path(path).suffix, "")}_sort.json'
    save_dict(str(s_name))


if __name__ == "__main__":
    main()

Теперь, собственно, создадим саму функцию сортировки. Я назвал ее sort_channel(ext: str, name: str, url: str). В нее мы передаем описание канала, его отображаемое имя и ссылку на поток. Здесь все просто. Создаем ключ с именем канала. Проверяем, есть ли в данном ключе значение url. Если нет, добавляем. Обратите внимание на порядок. Сначала идет url, а затем описание канала. Дело в том, что описание может быть одинаковым. А значит, данный канал не попадет в словарь. А вот у url шансов на уникальность больше. Особенно после того, как вы пройдетесь по плейлисту с помощью предыдущего скрипта для объединения каналов.

Python:
def sort_channel(ext: str, name: str, url: str):
    """
    Сортировка каналов по наименованиям в словаре.

    :param ext: Описание канала.
    :param name: Имя канала.
    :param url: Ссылка на канал.
    """
    if name not in sort_ch:
        sort_ch[name] = dict()
    sort_ch[name].update({url: ext})

Таким образом, мы также выполняем частичную сортировку на уникальность. Так как в словаре не может быть двух одинаковых ключей. Тем самым мы отсеем повторяющиеся ссылки.

И еще одна функция, уже для сохранения отсортированного содержимого словаря. Назовем ее save_dict(name: str). На вход она принимает имя, которое является путем к директории для сохранения, с расширением файла, которое в данном случае является «.json». Для начала сохраняем json. В принципе, этого делать не обязательно, но, мало ли что, вдруг вам понадобиться поработать с помощью каких-либо скриптов с данными каналами. Затем определяем имя для «.m3u» файла. Открываем его на запись, дописываем в него заголовок, после чего итерируемся по словарю и сохраняем полученные данные в файл.

Python:
def save_dict(name: str):
    """
    Сохранение результатов в файлы.

    :param name: Путь к файлу для сохранения json.
    """
    with open(name, 'w', encoding='utf-8') as f:
        json.dump(sort_ch, f, indent=4, ensure_ascii=False)
    m3u_name = Path(name).parent / f'{Path(name).name.replace(Path(name).suffix, "")}_sort.m3u'
    with open(m3u_name, 'w', encoding='utf-8') as file:
        file.write("#EXTM3U\n")
        for key in sort_ch:
            for item in sort_ch.get(key):
                file.write(f'{sort_ch.get(key).get(item)}\n{item}\n')
                print("\r\033[K", end="")
                print(f'\rСохранение: {sort_ch.get(key).get(item)} | {item}', end="")

Python:
import json
import sys
from pathlib import Path

sort_ch = dict()


def save_dict(name: str):
    """
    Сохранение результатов в файлы.

    :param name: Путь к файлу для сохранения json.
    """
    with open(name, 'w', encoding='utf-8') as f:
        json.dump(sort_ch, f, indent=4, ensure_ascii=False)
    m3u_name = Path(name).parent / f'{Path(name).name.replace(Path(name).suffix, "")}_sort.m3u'
    with open(m3u_name, 'w', encoding='utf-8') as file:
        file.write("#EXTM3U\n")
        for key in sort_ch:
            for item in sort_ch.get(key):
                file.write(f'{sort_ch.get(key).get(item)}\n{item}\n')
                print("\r\033[K", end="")
                print(f'\rСохранение: {sort_ch.get(key).get(item)} | {item}', end="")


def sort_channel(ext: str, name: str, url: str):
    """
    Сортировка каналов по наименованиям в словаре.

    :param ext: Описание канала.
    :param name: Имя канала.
    :param url: Ссылка на канал.
    """
    if name not in sort_ch:
        sort_ch[name] = dict()
    sort_ch[name].update({url: ext})


def main():
    """
    Открытие файла. Перебор ссылок и описаний. Передача в функцию
    для добавления в словарь.
    """
    print("\nСортировка каналов по имени".upper())
    print("")
    path = input("Введите путь к файлу m3u >>> ")

    if not Path(path).exists() or not Path(path).is_file():
        print("Пути не существует")
        sys.exit(0)

    ext, name = "", ""
    with open(path, 'r', encoding='utf-8') as f:
        for ln in f.readlines():
            if ln.startswith("#EXTINF"):
                ext = ln.strip()
                name = "No name" if ext.split(',')[-1].replace('#EXTGRP:', '').strip() is None \
                    else ext.split(',')[-1].replace('#EXTGRP:', '').strip()
                continue
            elif ln.startswith("http"):
                sort_channel(ext, name, ln.strip())
                pass
    s_name = Path(path).parent / f'{Path(path).name.replace(Path(path).suffix, "")}_sort.json'
    save_dict(str(s_name))


if __name__ == "__main__":
    main()


Проверка работоспособности каналов с помощью плеера VLC

Конечно же, мы не будем открывать каждый канал в плеере. А будем использовать скрипт Python.


Что потребуется?

Для начала, вне зависимости от операционной системы, вам потребуется установить сам медиаплеер. Затем, для того, чтобы мы могли работать с ним из python, установим библиотеку python-vlc. Пишем в терминале:

pip install python-vlc

И да, здесь мы также будем использовать colorama, хоть это и не обязательно. Для ее установки пишем:

pip install colorama

После того, как нужные библиотеки будут установлены, импортируем все необходимое в наш скрипт и инициализируем colorama.

Python:
import concurrent.futures
import subprocess
import sys
import time
from pathlib import Path

import vlc
from colorama import Fore
from colorama import init

init()

Создадим списки и переменные, в которых будем хранить рабочие и нерабочие каналы, а также подсчитывать количество обработанных ссылок.

Python:
status = []
error = []
st_count, err_count, all_ch = 0, 0, 0

В этот раз мы начнем с сохранения данных. Создадим функцию save_status_error(path). На вход она получает путь к обрабатываемому файлу, чтобы получить путь к его директории. Здесь нам не требуется сохранять каждый файл отдельно. Нам нужно просто сохранить рабочие и нерабочие каналы в разные файлы. У меня это реализовано именно так потому, что я использовал данный скрипт как вспомогательный, то есть, средство перепроверки нерабочих файлов, которые мне выдал мой скрипт. Потому так. Ну и далее все просто. Открываем файл на дозапись, итерируемся по спискам, если в них что-то есть и сохраняем данные в файл. Здесь я поставил небольшой фильтр, чтобы файлы «Триколора» не попадали в какой-либо из списков. Не знаю, не вдавался в подробности, может быть данные ссылки и можно смотреть на компьютере с помощью какого-то хитрого плеера. Тем более, что на работоспособность они в принципе проверяются неплохо. Но, дело в том, что к примеру, в плеере «Celluloid», который идет из коробки в Мяте, эти ссылки на потоки распадаются в буквальном списке на отдельные сегменты. Из-за этого плеер может даже подвиснуть. Ну, а в VLC они просто не воспроизводятся. Но это так, мое субъективное, личное мнение. Вполне возможно, что кому-то эти ссылки нужны. И тогда фильтр можно просто удалить.

Python:
def save_status_error(path):
    (Path(path).parent / 'checked_vlc').mkdir(exist_ok=True)
    if len(status) > 0:
        name = Path(path).parent / 'checked_vlc' / f'good_vlc.m3u'
        with open(name, "a", encoding='utf-8') as f:
            for item in status:
                if "tricolor" in item:
                    continue
                f.write(f"{item}")
    if len(error) > 0:
        name = Path(path).parent / 'checked_vlc' / f'error_vlc.m3u'
        with open(name, "a", encoding='utf-8') as f:
            for item in error:
                f.write(f"{item}")

Создадим функцию для проверки ссылок. Назовем ее, к примеру, vlc_player(url, ext, nm). На входе она получает ссылку на поток, описание канала, и порядковый номер, в котором она была извлечена из файла. Он служит больше в психологическом плане, когда показывает номер проверяемой ссылки для пользователя. Так как сам по себе плеер вываливает в терминал столько информации, что в ней легко потеряться. Что происходит в данном скрипте? Для начала создаем экземпляр плеера. Передаем в него ссылку и далее, передаем в плеер для воспроизведения. Запускаем таймер на 0,5 секунды. Так как проверка будет работать в потоках, таймер необходим. Если ставить значение меньше, контент не успевает воспроизвестись, так же, как и в случае, если таймер убрать вообще. Если больше, скрипт соответственно работает довольно долго. При этом, высока вероятность того, что если у вас много рабочих каналов, может просто подвиснуть графическая оболочка. Как в Windows с этим — не проверял. А вот Cinnamone подвисал намертво. Спасала только перезагрузка с кнопки.

Ну и далее читаем коды состояния. Если это коды «Error», «Ended» или «Opening», канал можно считать нерабочим. Более того, два кода vlc.State.Error", "State.Error" явно устарели. Их оставил так, на всякий случай. А два остальных были выявлены опытным путем. Читать документацию было слишком долго )).

Ну и соответственно добавляем в рабочие и нерабочие списки. Также обрабатываем исключение, хотя, за все время работы оно не возникало ни разу. Другие ошибки — да. При проверке слишком больших файлов скорее всего не хватает ресурсов. И скрипт вылетает. Большие — это более 4000.


Python:
def vlc_player(url, ext, nm):
    player, state = "", ""
    try:
        instance = vlc.Instance('--input-repeat=-1', '--no-fullscreen')
        player = instance.media_player_new()
        media = instance.media_new(url)
        player.set_media(media)
        player.play()
        time.sleep(0.5)
        state = str(player.get_state())

        if state in ["vlc.State.Error", "State.Error", "State.Ended", "State.Opening"]:
            player.stop()
            error.append(f'{ext}\n{url}\n')
            print(f'\n{nm} | Stream is working. Current state = {state.upper()}\n')
            return
        else:
            print(f'\n{nm} | Stream is working. Current state = {state.upper()}\n')
            player.stop()
            status.append(f'{ext}\n{url}\n')
            return
    except Exception:
        print(f'\nStream is dead. Current state = {state}\n')
        player.stop()
        error.append(f'{ext}\n{url}\n')
        return

Ну и функция main. Думаю особых пояснений она не требует. Здесь мы запрашиваем путь к директории с файлами или файлом. Так как можно обрабатывать сразу же несколько. Открываем файл, итерируемся по нему построчно и запускаем функцию проверки в потоках. Здесь их максимально количество 80. Качество проверки не страдает. VLC — очень мощный товарищ.

Python:
def main():
    global status, error, st_count, err_count, all_ch
    path = input("Введите путь к директории: ")
    if not Path(path).exists() or not Path(path).is_dir():
        print("Директории не существует или введенный путь не ведет к директории")
        sys.exit(0)

    time_start = time.monotonic()
    for num, path in enumerate(Path(path).iterdir()):
        if Path(path).suffix == ".m3u":
            nm = 0
            print(f"Проверка файла: {num + 1} | {path}\n")
            if not Path(path).exists():
                sys.exit(0)
            ext = ""
            with concurrent.futures.ThreadPoolExecutor(max_workers=80) as executor:
                with open(path, 'r', encoding='utf-8') as file:
                    for line in file.readlines():
                        if line.startswith("#EXTINF"):
                            ext = line.strip()
                            continue
                        if line.startswith("http"):
                            nm += 1
                            all_ch += 1
                            executor.submit(vlc_player, url=line.strip(), ext=ext, nm=nm)
                            time.sleep(0.3)

            save_status_error(path)
            subprocess.Popen("clear", shell=True)
            time.sleep(0.3)
            print(f"\nGood: {len(status)}")
            print(f"Error: {len(error)}\n")
            st_count = st_count + len(status)
            err_count = err_count + len(error)
            status.clear()
            error.clear()
            Path(path).unlink()

    print(f"\nAll Channel: {all_ch}")
    print(f"\nAll Good: {st_count}")
    print(f"All Error: {err_count}\n")
    print(f'{Fore.LIGHTGREEN_EX}All Scan time | {Fore.RESET}'
          f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
          f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
          f'{int(time.monotonic() - time_start) % 60:02d} с.\n')


if __name__ == "__main__":
    main()

Python:
# pip install python-vlc

import concurrent.futures
import subprocess
import sys
import time
from pathlib import Path

import vlc
from colorama import Fore
from colorama import init

init()

status = []
error = []
st_count, err_count, all_ch = 0, 0, 0


def save_status_error(path):
    (Path(path).parent / 'checked_vlc').mkdir(exist_ok=True)
    if len(status) > 0:
        name = Path(path).parent / 'checked_vlc' / f'good_vlc.m3u'
        with open(name, "a", encoding='utf-8') as f:
            for item in status:
                if "tricolor" in item:
                    continue
                f.write(f"{item}")
    if len(error) > 0:
        name = Path(path).parent / 'checked_vlc' / f'error_vlc.m3u'
        with open(name, "a", encoding='utf-8') as f:
            for item in error:
                f.write(f"{item}")


def vlc_player(url, ext, nm):
    player, state = "", ""
    try:
        instance = vlc.Instance('--input-repeat=-1', '--no-fullscreen')
        player = instance.media_player_new()
        media = instance.media_new(url)
        player.set_media(media)
        player.play()
        time.sleep(0.5)
        state = str(player.get_state())

        if state in ["vlc.State.Error", "State.Error", "State.Ended", "State.Opening"]:
            player.stop()
            error.append(f'{ext}\n{url}\n')
            print(f'\n{nm} | Stream is working. Current state = {state.upper()}\n')
            return
        else:
            print(f'\n{nm} | Stream is working. Current state = {state.upper()}\n')
            player.stop()
            status.append(f'{ext}\n{url}\n')
            return
    except Exception:
        print(f'\nStream is dead. Current state = {state}\n')
        player.stop()
        error.append(f'{ext}\n{url}\n')
        return


def main():
    global status, error, st_count, err_count, all_ch
    path = input("Введите путь к директории: ")
    if not Path(path).exists() or not Path(path).is_dir():
        print("Директории не существует или введенный путь не ведет к директории")
        sys.exit(0)

    time_start = time.monotonic()
    for num, path in enumerate(Path(path).iterdir()):
        if Path(path).suffix == ".m3u":
            nm = 0
            print(f"Проверка файла: {num + 1} | {path}\n")
            if not Path(path).exists():
                sys.exit(0)
            ext = ""
            with concurrent.futures.ThreadPoolExecutor(max_workers=80) as executor:
                with open(path, 'r', encoding='utf-8') as file:
                    for line in file.readlines():
                        if line.startswith("#EXTINF"):
                            ext = line.strip()
                            continue
                        if line.startswith("http"):
                            nm += 1
                            all_ch += 1
                            executor.submit(vlc_player, url=line.strip(), ext=ext, nm=nm)
                            time.sleep(0.3)

            save_status_error(path)
            subprocess.Popen("clear", shell=True)
            time.sleep(0.3)
            print(f"\nGood: {len(status)}")
            print(f"Error: {len(error)}\n")
            st_count = st_count + len(status)
            err_count = err_count + len(error)
            status.clear()
            error.clear()
            Path(path).unlink()

    print(f"\nAll Channel: {all_ch}")
    print(f"\nAll Good: {st_count}")
    print(f"All Error: {err_count}\n")
    print(f'{Fore.LIGHTGREEN_EX}All Scan time | {Fore.RESET}'
          f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
          f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
          f'{int(time.monotonic() - time_start) % 60:02d} с.\n')


if __name__ == "__main__":
    main()

Что же, думаю, что на этом можно данную часть статьи завершить, так как она угрожает слишком затянуться.
Создание основного инструмента, с помощью которого мы будем проверять плейлисты на работоспособность перенесем во вторую часть.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна
 
Последнее редактирование модератором:
  • Нравится
Реакции: MLNK и Victor_Mirniy

start120

Member
28.01.2023
6
0
BIT
43
Никак не могу найти подсказку или скрипт извлекающий с сайта ссылку на потоке видео в формате m3u, для дальнейшего формирования плэйлиста. Может кто подскажет?
 

Johan Van

Green Team
13.06.2020
364
696
BIT
405
Никак не могу найти подсказку или скрипт извлекающий с сайта ссылку на потоке видео в формате m3u, для дальнейшего формирования плэйлиста. Может кто подскажет?

Не совсем понятно, что вы имеете в виду. Если можно, подробнее о том, что вы хотите получить и откуда. Можете даже сайт написать, чтобы было понятно, где посмотреть
 

start120

Member
28.01.2023
6
0
BIT
43
Не совсем понятно, что вы имеете в виду. Если можно, подробнее о том, что вы хотите получить и откуда. Можете даже сайт написать, чтобы было понятно, где посмотреть
Хочу создать собственный плэйлист, который бы автоматически обновлялся с источников трансляций тв каналов. Известно, что почти все ресурсы транслирующие потоковое видео, часто меняют ссылку. Поэтому есть желание написать скрипт, который будет с периодичностью извлекать ссылку (и) с форматом m3u8. Найти с помощью сторонних приложений и дополнений, например, Video DawnloadHelper, не составляет труда, но вот извлечь из исходного кода страницы не получается. Для примера возьмет сайт, транслирующий Eurosport 1 : - вот конкретно с него нужно получить ссылку с форматом m3u8 с помощью скрипта.
 

Johan Van

Green Team
13.06.2020
364
696
BIT
405
Хочу создать собственный плэйлист, который бы автоматически обновлялся с источников трансляций тв каналов. Известно, что почти все ресурсы транслирующие потоковое видео, часто меняют ссылку. Поэтому есть желание написать скрипт, который будет с периодичностью извлекать ссылку (и) с форматом m3u8. Найти с помощью сторонних приложений и дополнений, например, Video DawnloadHelper, не составляет труда, но вот извлечь из исходного кода страницы не получается. Для примера возьмет сайт, транслирующий Eurosport 1 : - вот конкретно с него нужно получить ссылку с форматом m3u8 с помощью скрипта.

Попробуйте так сделать, если я правильно понял ваш вопрос:

Python:
from urllib.parse import urljoin

import requests

requests.packages.urllib3.disable_warnings()

headers = {
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                  "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
              "application/signed-exchange;v=b3;q=0.9"
}


def iter_text(txt, url):
    links = []
    for item in txt.splitlines():
        if not item.startswith("#"):
            links.append(item.strip()) if item.startswith("http") else links.append(urljoin(url, item.strip()))
    return links if links else False


def load_m3u8(url):
    try:
        rs = requests.get(url=url, headers=headers, timeout=7, stream=True, allow_redirects=True, verify=False)
        if 200 <= rs.status_code <= 299:
            if links := iter_text(rs.text, url):
                return links
            return False
        return False
    except Exception:
        return False


def main():
    url = "https://est2.varcdn.top/Eurosport1/index.m3u8"
    if links := load_m3u8(url):
        print(links)
        for ln in links:
            print(load_m3u8(ln.strip()))


if __name__ == "__main__":
    main()

Ссылку берем тут:

Снимок экрана от 2023-01-28 21-59-41.png
 
  • Нравится
Реакции: start120

Johan Van

Green Team
13.06.2020
364
696
BIT
405
Хочу создать собственный плэйлист, который бы автоматически обновлялся с источников трансляций тв каналов. Известно, что почти все ресурсы транслирующие потоковое видео, часто меняют ссылку. Поэтому есть желание написать скрипт, который будет с периодичностью извлекать ссылку (и) с форматом m3u8. Найти с помощью сторонних приложений и дополнений, например, Video DawnloadHelper, не составляет труда, но вот извлечь из исходного кода страницы не получается. Для примера возьмет сайт, транслирующий Eurosport 1 : - вот конкретно с него нужно получить ссылку с форматом m3u8 с помощью скрипта.

Хотя, конечно, это все же не совсем автоматизация. Вам, как я понимаю, надо эту ссылку автоматом получать. А не заходить постоянно в инструменты разработчика.
 

start120

Member
28.01.2023
6
0
BIT
43
Попробуйте так сделать, если я правильно понял ваш вопрос:

Python:
from urllib.parse import urljoin

import requests

requests.packages.urllib3.disable_warnings()

headers = {
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 "
                  "YaBrowser/22.11.3.838 Yowser/2.5 Safari/537.36",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,"
              "application/signed-exchange;v=b3;q=0.9"
}


def iter_text(txt, url):
    links = []
    for item in txt.splitlines():
        if not item.startswith("#"):
            links.append(item.strip()) if item.startswith("http") else links.append(urljoin(url, item.strip()))
    return links if links else False


def load_m3u8(url):
    try:
        rs = requests.get(url=url, headers=headers, timeout=7, stream=True, allow_redirects=True, verify=False)
        if 200 <= rs.status_code <= 299:
            if links := iter_text(rs.text, url):
                return links
            return False
        return False
    except Exception:
        return False


def main():
    url = "https://est2.varcdn.top/Eurosport1/index.m3u8"
    if links := load_m3u8(url):
        print(links)
        for ln in links:
            print(load_m3u8(ln.strip()))


if __name__ == "__main__":
    main()

Ссылку берем тут:

Посмотреть вложение 66317
Спасибо большое, ссылку берет!!!
 

Johan Van

Green Team
13.06.2020
364
696
BIT
405
Спасибо большое, ссылку берет!!!

Сайт с трансляцией за cloudflare:

2023-01-28_22-11.png


Поэтому, просто запросами оттуда ничего не добиться. Возможно, можно попробовать selenium. Но как перехватить ссылку в нем я пока не знаю. Надо почитать документацию, если это возможно.
 
  • Нравится
Реакции: satfan

start120

Member
28.01.2023
6
0
BIT
43
Хотя, конечно, это все же не совсем автоматизация. Вам, как я понимаю, надо эту ссылку автоматом получать. А не заходить постоянно в инструменты разработчика.
Все верно, теперь думаю задача по легче осталась, полученную ссылку заменять в файле плэйлиста при входе в виндовс. Я чайник в в написании скриптов на питоне, применяю шаблоны). Буду пробовать. Еще раз спасибо!
 

start120

Member
28.01.2023
6
0
BIT
43
Сайт с трансляцией за cloudflare:

Посмотреть вложение 66320

Поэтому, просто запросами оттуда ничего не добиться. Возможно, можно попробовать selenium. Но как перехватить ссылку в нем я пока не знаю. Надо почитать документацию, если это возможно.
Понял, Вы ссылку через вкладку Нетворк взяли, я про перехват ссылки на потоковое видео думал. Ну инет перелопатил не нашел решение.
 

start120

Member
28.01.2023
6
0
BIT
43
Я через расширение в хроме Video Download Helper беру ссылку
 

Вложения

  • 1.jpg
    1.jpg
    85,6 КБ · Просмотры: 188

Johan Van

Green Team
13.06.2020
364
696
BIT
405
Я через расширение в хроме Video Download Helper беру ссылку

Я через расширение в хроме Video Download Helper беру ссылку

Я тут немного подумал над вашим вопросом и написал вот такую небольшую статью, которая, возможно, сможет решить вашу проблему: Перехват запросов с веб-страницы с помощью selenium и browsermobproxy в Python. Статья еще не опубликована. Не знаю, будет ли у вас к ней доступ или нет. Скорее всего да. Во всяком случае попробуйте или дождитесь публикации.
 
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!