Статья Немного об IPTV или проверка m3u с помощью Python. Часть 02

Это вторая часть статьи о проверке плейлистов на работоспособность. Небольшие пояснения для тех, кто не читал первую часть. В прошлой статье мы создали небольшие инструменты для обработки плейлистов. Такие скрипты как: разбиение плейлиста на части, объединение, сортировка по именам каналов. А также проверка плейлистов с помощью плеера VLC. В этой же части я попробую рассказать, как я пытался сделать чекер и, что из этого получилось.

000_01.png



Как я уже и говорил в первой части, я не фанат телевизора. Но вот сам принцип проверки потока меня заинтересовал. Сейчас будет немного лирики, поэтому, кого больше интересует практическая часть, этот абзац можно смело пропустить. Я не буду рассказывать сейчас сколько версий чекера я сделал. Просто скажу, что много. А все дело в том, что изначально я исходил из неправильных предпосылок. И здесь я скорее хотел сказать именно о том, как важно в любых проектах продумывать их концепцию и определять цели. Изначально я предположил, что для того, чтобы определить, есть ли медиаконтент или нет, нужно понять, какое содержимое прилетает к нам в ответе на запрос. И тут была первая ошибка. Небольшая, но все же. Для начала я пробовал использовать модуль magic, надстройку над библиотекой libmagic1. И в целом, она работала довольно хорошо. Вот только меня беспокоило то, что за собой она тащит очень большую и главную зависимость, то есть, нужно понять, что в системе есть libmagic1, если нет, сообщить об этом пользователю. Установить. А в Windows вообще, в обязательном порядке установка DLL библиотеки. Потому, через какое-то время от использования данного модуля я отказался. Решил покопать в сторону определения содержимого с помощью сигнатур файлов. И даже собрал внушительную коллекцию сигнатур медиаконтента. И это тоже работало. Но, как-то довольно кривовато. А суть, в принципе, была в том, что нам вообще не нужно понимать, что за поток передается в ответе. То есть, если это не html или текст, то явно содержимое имеющее отношение к медиа. А потому, если у нас 200-й статус код, и мы поняли, что содержимое не текст, то значит, канал рабочий. Таким образом, отвалилось около 150 строк кода, которые я «запилил» для определения контента, вычленения плейлистов и сегментов. В общем, что у меня получилось, смотрите ниже сами.


Что потребуется?

В данном скрипте нужно установить несколько сторонних библиотек. Это собственно requests, bs4 и lxml, чтобы парсить xml, так как бывают и такие плейлисты. colorama — куда же без нее, надо чтобы все было красиво. И pytube. Последний нужен для проверки ссылок на YouTube, так как в плейлистах попадается и такое. Поэтому, пишем в терминале:

pip install requests bs4 lxml colorama pytube

Теперь импортируем нужные в работе библиотеки, инициализируем colorama, а также пропишем код для подавления варнинга при использовании в requests параметра verify в значении False.

Python:
import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions

requests.packages.urllib3.disable_warnings()

init()

Определяем множества из сигнатур, которые понадобятся во время работы скрипта. Следует сказать, что сигнатур медиаконтента здесь нет. Только лишь сигнатуры текста и html. Вот собственно и они:

Python:
text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
              "74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
              "69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}

html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
        "3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
        "3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
        "3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
        "45 52"}

Это все, что мне удалось наковырять по разным сайтам. Думаю, что их больше, но тех, что есть, вполне хватает.
Конечно же, так как мы будем делать запросы, нам понадобятся заголовки:

Python:
headers = {
    "User-Agent": "libmpv",
    "Accept": "*/*",
    "Connection": "keep-alive",
    "Icy-MetaData": "1"
}

Этот заголовок я отловил из плеера Celluloid с помощью Wireshark. Ну и списки, в которых будут храниться рабочие и нерабочие каналы, а также счетчик для перепроверки. О ней я поясню, когда дело дойдет до кода.

Python:
status = []
error = []
cnt = 0

Ну вот, с импортами и глобальными переменными вроде бы определились. Теперь двигаемся дальше. А дальше у нас будет функция main.


Открываем файл и запускаем потоки

Я сделал скрипт таким образом, чтобы он имел возможность проверять сразу же несколько файлов. Поэтому, для начала запрашиваем у пользователя путь к директории с файлами. Проверяем, есть ли директория, является ли путь директорией. Если да, двигаемся дальше.

Python:
    global status, error, cnt
    path = input("\nВведите путь к директории с m3u: ")
    if not Path(path).exists() or not Path(path).is_dir():
        print("Нет такой директории или указанный путь не является директорией")
        sys.exit(0)

Определяем переменную для замера времени выполнения скрипта. Также, создаем список из файлов в директории. Это нужно для того, чтобы скрипт проверял файлы в более-менее отсортированном виде. Потому как в данном случае итерация идет с помощью генератора, и файлы могут выдаваться вразброс.

Python:
    time_start = time.monotonic()

    files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]

Запускаем цикл для итерации по списку файлов. Определяем счетчик, который будет содержать количество каналов. В данном случае это будет служить больше в информационных целях. Поэтому, открываем файл на чтение, итерируемся по нему и при каждом нахождении ссылки увеличиваем счетчик. После выводим сообщение в терминал о том, какой файл сейчас обрабатывается, сколько ссылок в файле, а также, если файлов несколько, какой по счету из скольки.

Python:
    for nm, file in enumerate(sorted(files)):
        n = 0
        with open(file, 'r', encoding='utf-8') as f:
            for nn in f.readlines():
                if nn.startswith("http"):
                    n += 1
        print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
              f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
        print(Fore.GREEN + "-" * 60 + "\n")

Определяем переменную, в которую будем сохранять описание канала. Создаем с помощью with объект ThreadPoolExecutor, указываем максимальное количество потоков. Создание пула с помощью контекстного менеджера with избавляет нас от необходимости дожидаться завершения потоков. Затем открываем файл, итерируемся по содержимому построчно в цикле. Присваиваем переменной значение описания канала, а если попадается ссылка на канал, запускаем поток с указанием функции для запуска проверки, в которую передаем ссылку, описание канала и количество ссылок в файле.

Python:
        ext = ""
        with ThreadPoolExecutor(max_workers=5) as executor:
            with open(file, 'r', encoding='utf-8') as fl:
                for line in fl.readlines():
                    if line.startswith("#EXTINF"):
                        ext = line.strip()
                        continue
                    elif line.startswith("http"):
                        if len(line.strip().split()) > 1:
                            line = line.strip().replace(" ", "")
                        executor.submit(verification, url=line.strip(), ext=ext, count=n)

Теперь часть кода, в котором выполняется перепроверка ошибок. Для начала, для чего это сделано. Так как контент, который мы проверяем, это все же медиаконтент, на инициализацию его требуется время. И иногда просто не хватает времени на чтение данных, то есть, возникает исключение ReadTimeout. Однако, мы уже обратились к серверу и вполне возможно, что еще некоторое время он будет пытаться поддержать соединение. Потому, мы перепроверяем нерабочие ссылки. Помогает, но частично. И здесь ничего не поделаешь. Впрочем, если кто-то подскажет мне, как можно это обойти, я буду очень благодарен. Но, я смотрел в «IPTV Checker», который под Windows. По-умолчанию там выставлен timeout в 5 секунд, но можно выставить его вручную. По моему до 10. Также можно указать количество потоков, по умолчанию 5, ну и количество проходов. То есть, там тоже есть проблема пропуска рабочих ссылок. Вот потому и нужна перепроверка. А счетчик, которые определяется глобально, нужен для того, чтобы выводить информацию пользователю о ходе перепроверки.

Python:
        if error:
            re_check = copy.deepcopy(error)
            error.clear()
            print("\r\033[K", end="")
            with ThreadPoolExecutor(max_workers=5) as executor:
                for i in re_check:
                    ext = i.split("\n")[0].strip()
                    url = i.split("\n")[1].strip()
                    executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
            re_check.clear()
            cnt = 0

Ну и код, в котором выводиться информация для пользователя, а также очищаются списки и переменные, если проверить нужно несколько файлов.

Python:
        save_status_error(str(file))
        print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")

        print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
              f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
              f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
              f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
        print(Fore.GREEN + "-" * 60 + "\n")
        Path(file).unlink()
        status.clear()
        error.clear()


if __name__ == "__main__":
    main()

Python:
def main():
    """
    Получение данных от пользователя о сканируемой директории.
    Формирование списка файлов в директории.
    Подсчет количества ссылок на потоки и вывод в терминал.
    Запуск потоков для проверки наличия медиа-контента.
    Запуск перепроверки нерабочих потоков.
    Вывод данных о результатах проверки в терминал.
    """
    global status, error, cnt
    path = input("\nВведите путь к директории с m3u: ")
    if not Path(path).exists() or not Path(path).is_dir():
        print("Нет такой директории или указанный путь не является директорией")
        sys.exit(0)

    time_start = time.monotonic()

    files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]

    for nm, file in enumerate(sorted(files)):
        n = 0
        with open(file, 'r', encoding='utf-8') as f:
            for nn in f.readlines():
                if nn.startswith("http"):
                    n += 1
        print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
              f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
        print(Fore.GREEN + "-" * 60 + "\n")

        ext = ""
        with ThreadPoolExecutor(max_workers=5) as executor:
            with open(file, 'r', encoding='utf-8') as fl:
                for line in fl.readlines():
                    if line.startswith("#EXTINF"):
                        ext = line.strip()
                        continue
                    elif line.startswith("http"):
                        if len(line.strip().split()) > 1:
                            line = line.strip().replace(" ", "")
                        executor.submit(verification, url=line.strip(), ext=ext, count=n)

        if error:
            re_check = copy.deepcopy(error)
            error.clear()
            print("\r\033[K", end="")
            with ThreadPoolExecutor(max_workers=5) as executor:
                for i in re_check:
                    ext = i.split("\n")[0].strip()
                    url = i.split("\n")[1].strip()
                    executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
            re_check.clear()
            cnt = 0

        save_status_error(str(file))
        print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")

        print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
              f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
              f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
              f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
        print(Fore.GREEN + "-" * 60 + "\n")
        Path(file).unlink()
        status.clear()
        error.clear()


if __name__ == "__main__":
    main()


Функция для запуска проверки и обработки результата

Создадим функцию, в которой будем запускать проверку ссылок, а также обрабатывать ответы полученные после получения данных и обработки результатов. Я назвал данную функцию verification(url: str, ext: str, count: int, re_ch=False). На вход она получает ссылку на канал, описание канала, количество файлов при перепроверке, статус перепроверки.

Здесь мы запускаем функцию проверки, получаем от нее ответ. И в зависимости от того, что вернется, True или False распределяем по спискам, с выводом информации для пользователя.

Python:
    global status, error, cnt

    if load_txt(url):
        status.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")

Python:
def verification(url: str, ext: str, count: int, re_ch=False):
    """
    Запуск проверки ссылок на поток и обработка результатов
    возвращенных из функции проверки.

    :param url: Ссылка на поток.
    :param ext: Описание потока.
    :param count: Количество ссылок для перепроверки.
    :param re_ch: Статус перепроверки.
    """
    global status, error, cnt

    if load_txt(url):
        status.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")

    else:
        error.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")


Функция проверки файлов xml

Не всегда плейлисты прилетают в текстовом виде. Иногда они содержат в ответе xml. Соответственно, его нужно распарсить, чтобы получить сегменты видео и составить на них ссылки. Так как данный тип выбивается из основного потока проверок, я сделал для него отдельную функцию. Ее название - find_mpd(txt: str, url: str) -> bool. На вход она принимает текст ответа на запрос, ссылку для сборки ссылок на сегменты видео или аудио.

Ищем определенные теги, забираем из них текст, составляем ссылки и возвращаем одну из функции. А если сегментов найдено не будет, возвращаем False.

Python:
def find_mpd(txt: str, url: str) -> bool:
    """
    Парсинг плейлиста в формате xml с расширением ".mpd".

    :param txt: Текст запроса.
    :param url: Стартовый url для формирования ссылки на сегмент потока.
    :return: True или False, в зависимости от полученных результатов.
    """
    soup = BeautifulSoup(txt, 'xml')
    seg = soup.find_all('SegmentTemplate')
    seg_list = []
    for i in seg:
        if i.get('initialization') is not None:
            seg_list.append(urljoin(url, i.get('initialization')))
    if seg_list:
        return True if load_txt(seg_list[0]) else False


Поиск ссылок на плейлисты и сегменты видео или аудио

Если бы вы знали, сколько я писал раньше функций для того, чтобы найти определенный тип данных в тексте с использованием регулярок, думаю, что вы бы ужаснулись )). Все же, я пришел к самому простому (не факт, что самому правильному) решению. Просто проитерироваться по тексту. Создадим функцию find_link(url: str, text: str) -> (list, bool). На вход она получает ссылку на канал, для составления ссылки на сегменты, текст из запроса в котором и выполняется поиск ссылок и сегментов. И возвращает список со ссылками или False, если ссылок найдено не было.

Итерируемся по тексту, пропускаем строки с решетками. Все остальные строки забираем и пытаемся составить ссылки. Если список ссылок составлен, то есть не пуст, возвращаем его из функции.

Python:
def find_link(url: str, text: str) -> (list, bool):
    """
    Поиск ссылок на сегменты потока в полученном тексте запроса.

    :param url: Ссылка на поток, для формирования ссылки на сегмент.
    :param text: Текст запроса для поиска ссылок на сегменты или сегментов.
    :return: Список со ссылками или False.
    """
    lnk = []
    for i in text.splitlines():
        if i.startswith("#"):
            continue
        elif not i.strip():
            continue
        lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
    return lnk if lnk else False


Выполнение запросов, получение контента и итерация по нему

И еще одна, последняя в данном скрипте функция для того, чтобы выполнять запросы и обрабатывать ответы на них. Я назвал ее load_txt(url: str) -> bool. На вход она получает ссылку, а возвращает True или False.

Итак, первый кусочек кода. Здесь мы проверяем, не является ли ссылка ссылкой на YouTube. Проверяем именно, чтобы она была в начале. Иногда попадаются ссылки, которые ведут на YouTube через сервер. Их мы не рассматриваем. Только прямые ссылки. Если ссылка на Ютуб, загоняем ее в pytube. Пытаемся получить ID-видео. Так-как это быстрее всего. Если ID получен, значит видео существует. В противном случае просто обрабатываем исключение и возвращаем False.

Python:
    if url.startswith("https://www.youtube.com") \
            or url.startswith("https://youtube.com") \
            or url.startswith("http://www.youtube.com") \
            or url.startswith("http://youtube.com"):
        try:
            yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
            if yt.video_id:
                return True
        except exceptions.RegexMatchError:
            return False

Следующая операция, это выполнение запроса по ссылке и получение ответа. В запросе передаем ссылку, заголовки, указываем timeout, указываем, что запрос читает потоковые данные, явно указываем, что поддерживаем редирект, и отключаем верификацию, то есть проверку сертификатов, так как многие сервера работают еще по протоколу http. Проверяем статус-код. Если он 200, проверяем, нет ли в полученной ссылке расширения. Если есть, запускаем отдельную функцию для парсинга xml. Если нет, двигаемся дальше.

Python:
        res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
        if res.status_code == 200:
            if ".mpd" in res.url:
                if find_mpd(res.text, res.url):
                    return True
                return False

Итерируемся по полученному контенту. Так как он в байтах, считываем первые 64. Этого более чем достаточно для попытки определения содержимого. Переводим байты в hex, для того, чтобы получить сигнатуру. Так как смещение у проверяемых типов файлов нулевое, нам не требуется цикл для итерации по смещениям. Поэтому, для начала итерируемя по списку сигнатур с html. Дело в том, что иногда в ответах прилетает 200 статус код. А вот содержимое совсем не соответствует простому тексту. То есть, некоторые сервера отдают ошибки доступа в виде html, операторы связи, если доступ запрещен, вместо статус-кода возвращают страницу с сообщение (к примеру, Мегафон). То есть, здесь нужно убрать html, так как это явная ошибка. Есть одно исключение, которое мы проверяем в начале. Это сигнатура «68 74 74 70». Дело в том, что иногда сервер отдает ссылку на плейлист в виде http. Эта сигнатура ему и соответствует. Если она получена, мы сразу же делаем рекурсию и получаем данные по полученной ссылке. Затем проверяем html. Если находим сигнатуру, возвращаем False.

Дальше идет проверка наличия в ответе текста. Тут уже прилетает то, что нужно. То есть, текст плейлиста. Определяем сигнатуру текста, итерируемся по содержимому до конца, чтобы получить то, что прилетело полностью. Отправляем в функцию для получения ссылок на сегменты. Если возвращается список ссылок — итерируемся по ним и делаем рекурсию в текущую функцию. И так до того, пока не будет находиться ни html, ни текст. Если не прилетит ошибка, конечно, доступа. Если мы, после цикла рекурсий и 200-го статус-кода получаем содержимое, но оно не является текстом или страницей, значит делаем предположение, что это медиаконтект и возвращаем True.

Python:
            for chunk in res.iter_content(64):
                hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
                if hex_bytes[0:11] == "68 74 74 70":
                    if load_txt(chunk.decode().strip()):
                        return True
                for it in html:
                    if hex_bytes[0:len(it)].upper() == it:
                        return False
                for tx_pl in text_plain:
                    if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
                        for item in res.iter_content(1024):
                            chunk = chunk + item
                        if lnk := find_link(res.url, chunk.decode()):
                            for ln in lnk:
                                if load_txt(ln):
                                    return True
                                continue
                        elif not lnk:
                            return False
                    continue
                return True
            return False
        return False
    except Exception as ex:
        return True if "ICY 200 OK" in str(ex) else False

В исключение здесь добавлена проверка радио. Находил плейлисты, уже наверное второй раз не найду, где вполне рабочее радио и воспроизводиться медиаплеером. Вот только при попытке выполнения запроса вышибает в исключения. Ну и тогда я посмотрел, что прилетает в тексте. То есть, делаю проверку, если есть данное словосочетание, возвращаем True, радио рабочее, на и для остальных исключений — False.

Обработку других статус-кодов в данном контексте, думаю, делать не нужно. Если у нас 300-е коды, то переадресация происходит автоматом. Ну, а если 500-е, то ошибки сервера могут уйти при перепроверке, равно, как и 400-е коды.

Python:
# pip install colorama requests bs4 lxml pytube

import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions

requests.packages.urllib3.disable_warnings()

init()

text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
              "74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
              "69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}

html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
        "3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
        "3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
        "3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
        "45 52"}

headers = {
    "User-Agent": "libmpv",
    "Accept": "*/*",
    "Connection": "keep-alive",
    "Icy-MetaData": "1"
}

status = []
error = []
cnt = 0


def save_status_error(path: str):
    """
    Сохранение содержимого списков status и error с рабочими и нерабочими ссылками.

    :param path: Путь к открытому для проверки файлу.
    """
    global status, error
    print("\n\n" + "-" * 60)
    print(f"\n{Fore.CYAN}SAVE DATA IN FILE\n{'-'*17}\n")
    (Path.cwd() / "checked").mkdir(exist_ok=True)

    if len(status) > 0:
        (Path.cwd() / "checked" / "good").mkdir(exist_ok=True)
        try:
            name = Path.cwd() / "checked" / "good" / f'{Path(path).name.split(Path(path).suffix)[0]}_good_' \
                                                     f'{int(len(status))}.m3u'
        except ValueError:
            name = Path.cwd() / "checked" / "good" / f'{Path(path).name}_good_{int(len(status))}.m3u'
        with open(name, "a", encoding='utf-8') as f:
            f.write("#EXTM3U\n")
            for item in sorted(status):
                f.write(f"{item}")
        print(f'{Fore.GREEN}GOOD SAVED: {Fore.YELLOW}"checked / good" -> "{name.name}"')

    if len(error) > 0:
        (Path.cwd() / "checked" / "error").mkdir(exist_ok=True)
        try:
            name = Path.cwd() / "checked" / "error" / f'{Path(path).name.split(Path(path).suffix)[0]}_error_' \
                                                      f'{int(len(error))}.m3u'
        except ValueError:
            name = Path.cwd() / "checked" / "error" / f'{Path(path).name}_error_{int(len(error))}.m3u'
        with open(name, "a", encoding='utf-8') as f:
            f.write("#EXTM3U\n")
            for item in sorted(error):
                f.write(f"{item}")
        print(f'{Fore.GREEN}ERROR SAVED: {Fore.YELLOW}"checked / error" -> "{name.name}"')

    print(Fore.GREEN + "\n" + "-" * 60)


def find_mpd(txt: str, url: str) -> bool:
    """
    Парсинг плейлиста в формате xml с расширением ".mpd".

    :param txt: Текст запроса.
    :param url: Стартовый url для формирования ссылки на сегмент потока.
    :return: True или False, в зависимости от полученных результатов.
    """
    soup = BeautifulSoup(txt, 'xml')
    seg = soup.find_all('SegmentTemplate')
    seg_list = []
    for i in seg:
        if i.get('initialization') is not None:
            seg_list.append(urljoin(url, i.get('initialization')))
    if seg_list:
        return True if load_txt(seg_list[0]) else False


def load_txt(url: str) -> bool:
    """
    Получение данных по ссылке на поток.

    :param url: Ссылка на поток.
    :return: True или False в зависимости от полученного результата.
    """
    # Проверка наличия ссылки на YouTube и запуск
    # проверки наличия идентификатора видео.
    # Если идентификатор есть, ссылка рабочая.
    if url.startswith("https://www.youtube.com") \
            or url.startswith("https://youtube.com") \
            or url.startswith("http://www.youtube.com") \
            or url.startswith("http://youtube.com"):
        try:
            yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
            if yt.video_id:
                return True
        except exceptions.RegexMatchError:
            return False
    try:
        # Запрос данных по ссылке на поток с включенной функцией получения данных в потоке. Если статус-код 200,
        # выполняем последующие проверки. Первая - наличие в ссылке расширения с файлом xml, в котором
        # парсятся ссылки на сегменты видео.
        res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
        if res.status_code == 200:
            if ".mpd" in res.url:
                if find_mpd(res.text, res.url):
                    return True
                return False

            # Итерация по поученному контенту и проверка первых 64 байт на наличие
            # в получаемом потоке определенного типа данных по их сигнатуре.
            # Переводим байты в hex, проверяем: 1. Наличие ссылки (сигнатура: 68 74...);
            # 2. Проверка наличия в полученных данных html. Если есть, возвращаем False;
            # 3. Проверка наличия в полученных данных текста. Если находим, передаем в
            # функцию для обработки.
            # Если поток получен, но в нем не найдена ни одна сигнатура, будем считать,
            # что в данном потоке передаются медиа-данные, а значит возвращаем True.
            for chunk in res.iter_content(64):
                hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
                if hex_bytes[0:11] == "68 74 74 70":
                    if load_txt(chunk.decode().strip()):
                        return True
                for it in html:
                    if hex_bytes[0:len(it)].upper() == it:
                        return False
                for tx_pl in text_plain:
                    if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
                        for item in res.iter_content(1024):
                            chunk = chunk + item
                        if lnk := find_link(res.url, chunk.decode()):
                            for ln in lnk:
                                if load_txt(ln):
                                    return True
                                continue
                        elif not lnk:
                            return False
                    continue
                return True
            return False
        return False
    except Exception as ex:
        return True if "ICY 200 OK" in str(ex) else False


def find_link(url: str, text: str) -> (list, bool):
    """
    Поиск ссылок на сегменты потока в полученном тексте запроса.

    :param url: Ссылка на поток, для формирования ссылки на сегмент.
    :param text: Текст запроса для поиска ссылок на сегменты или сегментов.
    :return: Список со ссылками или False.
    """
    lnk = []
    for i in text.splitlines():
        if i.startswith("#"):
            continue
        elif not i.strip():
            continue
        lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
    return lnk if lnk else False


def verification(url: str, ext: str, count: int, re_ch=False):
    """
    Запуск проверки ссылок на поток и обработка результатов
    возвращенных из функции проверки.

    :param url: Ссылка на поток.
    :param ext: Описание потока.
    :param count: Количество ссылок для перепроверки.
    :param re_ch: Статус перепроверки.
    """
    global status, error, cnt

    if load_txt(url):
        status.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")

    else:
        error.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")


def main():
    """
    Получение данных от пользователя о сканируемой директории.
    Формирование списка файлов в директории.
    Подсчет количества ссылок на потоки и вывод в терминал.
    Запуск потоков для проверки наличия медиа-контента.
    Запуск перепроверки нерабочих потоков.
    Вывод данных о результатах проверки в терминал.
    """
    global status, error, cnt
    path = input("\nВведите путь к директории с m3u: ")
    if not Path(path).exists() or not Path(path).is_dir():
        print("Нет такой директории или указанный путь не является директорией")
        sys.exit(0)

    time_start = time.monotonic()

    files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]

    for nm, file in enumerate(sorted(files)):
        n = 0
        with open(file, 'r', encoding='utf-8') as f:
            for nn in f.readlines():
                if nn.startswith("http"):
                    n += 1
        print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
              f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
        print(Fore.GREEN + "-" * 60 + "\n")

        ext = ""
        with ThreadPoolExecutor(max_workers=5) as executor:
            with open(file, 'r', encoding='utf-8') as fl:
                for line in fl.readlines():
                    if line.startswith("#EXTINF"):
                        ext = line.strip()
                        continue
                    elif line.startswith("http"):
                        if len(line.strip().split()) > 1:
                            line = line.strip().replace(" ", "")
                        executor.submit(verification, url=line.strip(), ext=ext, count=n)

        if error:
            re_check = copy.deepcopy(error)
            error.clear()
            print("\r\033[K", end="")
            with ThreadPoolExecutor(max_workers=5) as executor:
                for i in re_check:
                    ext = i.split("\n")[0].strip()
                    url = i.split("\n")[1].strip()
                    executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
            re_check.clear()
            cnt = 0

        save_status_error(str(file))
        print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")

        print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
              f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
              f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
              f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
        print(Fore.GREEN + "-" * 60 + "\n")
        Path(file).unlink()
        status.clear()
        error.clear()


if __name__ == "__main__":
    main()

Вот такой вот получился чекер. Ниже скриншот, который показывает результаты работы чекера.

001.png

И на этом можно было бы и закончить. Так как с технической стороны чекер свою работы делает. К сожалению, да, бывает, что вылетают некоторые каналы в ошибки. Но, с этим трудно что-то поделать. Я пробовал перепроверять рекурсивно до того момента, пока количество ошибок не будет равно одному и тому же значению хотя бы три раза. Но ничего хорошего из этого не получилось. Да, с каждой проверкой количество рабочих каналов увеличивалось. Однако, после перепроверки в плеере они просто не работали. Так что, тут еще важно соблюсти некоторый баланс.

Так вот, технически все вроде бы неплохо. Но, если доступ к контенту запрещен, то у вас на экране будет вот такая:

002.png


или такая:

003.png


картинки. Есть еще больше всяческих их разновидностей. То есть, контент там есть. А вот толку от него нет. Я решил попробовать избавиться от этих мусорных каналов. Поискал повторяющиеся паттерны в ссылках которые возвращаются от сервера и в ссылках на сегменты. И наковырял небольшой список, который отсеивает большую часть мусора. Иногда он прорывается, но это уже не зависит от фильтра. Просто при проверке ссылка была несколько другой. Я даже специально смотрел в дебаггере.

По итогу получилась еще одна версия скрипта, в которую я добавил одну функцию и две проверки.

Вот добавленная функция:

Python:
def find_stop(url: str) -> bool:
    """
    Поиск паттернов с помощью которых можно отсеять некоторые
    каналы с заблокированным содержимым.

    :param url: Ссылка для поиска паттерна.
    :return: True или False в зависимости от результата.
    """
    for st in stop_list:
        if findall(f"{st}", url):
            return True
    return False

Стоп-лист, по которому итерируется цикл:

Python:
stop_list = ["/errors/", "test_end.ts", "buy.ts", "money.ts", "buy_packet.ts", "empty.ts", "key.ts", "/error/",
             "/NOT_CLIENT/", "http://logo.apk-red.com/tv/hata.jpg", "http://v.viplime.fun/video/user.ts", "/forbidden/",
             "zabava-block-htvod.cdn.ngenix.net", "err-ru.sulfat.li", "/000/", "BanT0ken", "/activate/", "/404/",
             "/405/", "www.cloudflare-terms-of-service-abuse.com", "http://cdn01.lifeyosso.fun:8080/connect/mono.m3u8",
             "http://nl4.iptv.monster/9999/video.m3u8", "http://v.viplime.fun/video/block.ts", "auth.m3u8", "/block/",
             "auth", "logout", "VDO-X-404", "offline", "logout.mp4", "error.tv4.live", "block-ip-video.ts"]

Первый участок кода, куда я добавил проверку:

Python:
        res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
        if res.status_code == 200:
            if find_stop(res.url):
                return False

Второй участок кода:

Python:
                for tx_pl in text_plain:
                    if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
                        for item in res.iter_content(1024):
                            chunk = chunk + item
                        if lnk := find_link(res.url, chunk.decode()):
                            for ln in lnk:
                                if find_stop(ln):
                                    return False
                                if load_txt(ln):
                                    return True
                                continue

Python:
# pip install colorama requests bs4 lxml

import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from re import findall
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions

requests.packages.urllib3.disable_warnings()

init()

text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
              "74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
              "69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}

html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
        "3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
        "3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
        "3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
        "45 52"}

stop_list = ["/errors/", "test_end.ts", "buy.ts", "money.ts", "buy_packet.ts", "empty.ts", "key.ts", "/error/",
             "/NOT_CLIENT/", "http://logo.apk-red.com/tv/hata.jpg", "http://v.viplime.fun/video/user.ts", "/forbidden/",
             "zabava-block-htvod.cdn.ngenix.net", "err-ru.sulfat.li", "/000/", "BanT0ken", "/activate/", "/404/",
             "/405/", "www.cloudflare-terms-of-service-abuse.com", "http://cdn01.lifeyosso.fun:8080/connect/mono.m3u8",
             "http://nl4.iptv.monster/9999/video.m3u8", "http://v.viplime.fun/video/block.ts", "auth.m3u8", "/block/",
             "auth", "logout", "VDO-X-404", "offline", "logout.mp4", "error.tv4.live", "block-ip-video.ts"]

headers = {
    "User-Agent": "libmpv",
    "Accept": "*/*",
    "Connection": "keep-alive",
    "Icy-MetaData": "1"
}

status = []
error = []
cnt = 0


def save_status_error(path: str):
    """
    Сохранение содержимого списков status и error с рабочими и нерабочими ссылками.

    :param path: Путь к открытому для проверки файлу.
    """
    global status, error
    print("\n\n" + "-" * 60)
    print(f"\n{Fore.CYAN}SAVE DATA IN FILE\n{'-'*17}\n")
    (Path.cwd() / "checked").mkdir(exist_ok=True)

    if len(status) > 0:
        (Path.cwd() / "checked" / "good").mkdir(exist_ok=True)
        try:
            name = Path.cwd() / "checked" / "good" / f'{Path(path).name.split(Path(path).suffix)[0]}_good_' \
                                                     f'{int(len(status))}.m3u'
        except ValueError:
            name = Path.cwd() / "checked" / "good" / f'{Path(path).name}_good_{int(len(status))}.m3u'
        with open(name, "a", encoding='utf-8') as f:
            f.write("#EXTM3U\n")
            for item in sorted(status):
                f.write(f"{item}")
        print(f'{Fore.GREEN}GOOD SAVED: {Fore.YELLOW}"checked / good" -> "{name.name}"')

    if len(error) > 0:
        (Path.cwd() / "checked" / "error").mkdir(exist_ok=True)
        try:
            name = Path.cwd() / "checked" / "error" / f'{Path(path).name.split(Path(path).suffix)[0]}_error_' \
                                                      f'{int(len(error))}.m3u'
        except ValueError:
            name = Path.cwd() / "checked" / "error" / f'{Path(path).name}_error_{int(len(error))}.m3u'
        with open(name, "a", encoding='utf-8') as f:
            f.write("#EXTM3U\n")
            for item in sorted(error):
                f.write(f"{item}")
        print(f'{Fore.GREEN}ERROR SAVED: {Fore.YELLOW}"checked / error" -> "{name.name}"')

    print(Fore.GREEN + "\n" + "-" * 60)


def find_mpd(txt: str, url: str) -> bool:
    """
    Парсинг плейлиста в формате xml с расширением ".mpd".

    :param txt: Текст запроса.
    :param url: Стартовый url для формирования ссылки на сегмент потока.
    :return: True или False, в зависимости от полученных результатов.
    """
    soup = BeautifulSoup(txt, 'xml')
    seg = soup.find_all('SegmentTemplate')
    seg_list = []
    for i in seg:
        if i.get('initialization') is not None:
            seg_list.append(urljoin(url, i.get('initialization')))
    if seg_list:
        return True if load_txt(seg_list[0]) else False


def find_stop(url: str) -> bool:
    """
    Поиск паттернов с помощью которых можно отсеять некоторые
    каналы с заблокированным содержимым.

    :param url: Ссылка для поиска паттерна.
    :return: True или False в зависимости от результата.
    """
    for st in stop_list:
        if findall(f"{st}", url):
            return True
    return False


def load_txt(url: str) -> bool:
    """
    Получение данных по ссылке на поток.

    :param url: Ссылка на поток.
    :return: True или False в зависимости от полученного результата.
    """
    # Проверка наличия ссылки на YouTube и запуск
    # проверки наличия идентификатора видео.
    # Если идентификатор есть, ссылка рабочая.
    if url.startswith("https://www.youtube.com") \
            or url.startswith("https://youtube.com") \
            or url.startswith("http://www.youtube.com") \
            or url.startswith("http://youtube.com"):
        try:
            yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
            if yt.video_id:
                return True
        except exceptions.RegexMatchError:
            return False
    try:
        # Запрос данных по ссылке на поток с включенной функцией получения данных в потоке. Если статус-код 200,
        # выполняем последующие проверки. Первая - наличие в ссылке расширения с файлом xml, в котором
        # парсятся ссылки на сегменты видео.
        res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
        if res.status_code == 200:
            if find_stop(res.url):
                return False

            if ".mpd" in res.url:
                if find_mpd(res.text, res.url):
                    return True
                return False

            # Итерация по поученному контенту и проверка первых 64 байт на наличие
            # в получаемом потоке определенного типа данных по их сигнатуре.
            # Переводим байты в hex, проверяем: 1. Наличие ссылки (сигнатура: 68 74...);
            # 2. Проверка наличия в полученных данных html. Если есть, возвращаем False;
            # 3. Проверка наличия в полученных данных текста. Если находим, передаем в
            # функцию для обработки.
            # Если поток получен, но в нем не найдена ни одна сигнатура, будем считать,
            # что в данном потоке передаются медиа-данные, а значит возвращаем True.
            for chunk in res.iter_content(64):
                hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
                if hex_bytes[0:11] == "68 74 74 70":
                    if load_txt(chunk.decode().strip()):
                        return True
                for it in html:
                    if hex_bytes[0:len(it)].upper() == it:
                        return False
                for tx_pl in text_plain:
                    if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
                        for item in res.iter_content(1024):
                            chunk = chunk + item
                        if lnk := find_link(res.url, chunk.decode()):
                            for ln in lnk:
                                if find_stop(ln):
                                    return False
                                if load_txt(ln):
                                    return True
                                continue
                        elif not lnk:
                            return False
                    continue
                return True
            return False
        return False
    except Exception as ex:
        return True if "ICY 200 OK" in str(ex) else False


def find_link(url: str, text: str) -> (list, bool):
    """
    Поиск ссылок на сегменты потока в полученном тексте запроса.

    :param url: Ссылка на поток, для формирования ссылки на сегмент.
    :param text: Текст запроса для поиска ссылок на сегменты или сегментов.
    :return: Список со ссылками или False.
    """
    lnk = []
    for i in text.splitlines():
        if i.startswith("#"):
            continue
        elif not i.strip():
            continue
        lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
    return lnk if lnk else False


def verification(url: str, ext: str, count: int, re_ch=False):
    """
    Запуск проверки ссылок на поток и обработка результатов
    возвращенных из функции проверки.

    :param url: Ссылка на поток.
    :param ext: Описание потока.
    :param count: Количество ссылок для перепроверки.
    :param re_ch: Статус перепроверки.
    """
    global status, error, cnt

    if load_txt(url):
        status.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")

    else:
        error.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")


def main():
    """
    Получение данных от пользователя о сканируемой директории.
    Формирование списка файлов в директории.
    Подсчет количества ссылок на потоки и вывод в терминал.
    Запуск потоков для проверки наличия медиа-контента.
    Запуск перепроверки нерабочих потоков.
    Вывод данных о результатах проверки в терминал.
    """
    global status, error, cnt
    path = input("\nВведите путь к директории с m3u: ")
    if not Path(path).exists() or not Path(path).is_dir():
        print("Нет такой директории или указанный путь не является директорией")
        sys.exit(0)

    time_start = time.monotonic()

    files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]

    for nm, file in enumerate(sorted(files)):
        n = 0
        with open(file, 'r', encoding='utf-8') as f:
            for nn in f.readlines():
                if nn.startswith("http"):
                    n += 1
        print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
              f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
        print(Fore.GREEN + "-" * 60 + "\n")

        num = 0
        ext = ""
        with ThreadPoolExecutor(max_workers=5) as executor:
            with open(file, 'r', encoding='utf-8') as fl:
                for line in fl.readlines():
                    if line.startswith("#EXTINF"):
                        ext = line.strip()
                        continue
                    elif line.startswith("http"):
                        num += 1
                        if len(line.strip().split()) > 1:
                            line = line.strip().replace(" ", "")
                        executor.submit(verification, url=line.strip(), ext=ext, count=n)

        if error:
            re_check = copy.deepcopy(error)
            error.clear()
            print("\r\033[K", end="")
            with ThreadPoolExecutor(max_workers=5) as executor:
                for i in re_check:
                    ext = i.split("\n")[0].strip()
                    url = i.split("\n")[1].strip()
                    executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
            re_check.clear()
            cnt = 0

        save_status_error(str(file))
        print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")

        print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
              f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
              f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
              f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
        print(Fore.GREEN + "-" * 60 + "\n")

        status.clear()
        error.clear()

        # Path(file).unlink()


if __name__ == "__main__":
    main()

Но, на этом еще не все. Есть третья версия чекера. На ней я пока что и остановился. Дело в том, что попадаются в плейлистах ссылки, у которых вместо протоколов http или https указан протокол rtmp. Этот протокол для потоковой передачи данных разработанный фирмой Adobe. В некоторых случаях по данному протоколу вещают веб-камеры. Так как данный протокол был проприетарным, то и разработка модулей под него велась не особо активно, так как никто толком не знал полной его спецификации. Я нашел модуль для получения данных, но пока что в нем еще не разобрался. Однако, данный протокол понимает OpenCV. И с его помощью можно осуществить проверку. Вот только есть одна неприятная особенность. Когда в коде возникает исключение, библиотека выбрасывает в терминал длинную строку с исключением. И, к сожалению, я не нашел способ его обработки. Программа работает, но вот эти сообщения не очень приятны. Тем не менее, дело свое он делает.

Третья версия включает проверку данного протокола. Для того, чтобы она работала, нужно установить библиотеку opencv-python. Для ее установки пишем в терминале:

pip install opencv-python

Функция, которая добавляется в этом случае:

Python:
def check_rtmp(url: str) -> bool:
    """
    Проверка медиа-потока по протоколу передачи данных rtmp.
    Для проверки используется OpenCV.

    :param url: Ссылка на поток для проверки.
    :return: True или False в зависимости от результата.
    """
    video = cv2.VideoCapture(url)
    while True:
        grabbed, frame = video.read()
        if grabbed:
            video.release()
            return True
        return False

Код для запуска функции, который добавляется в функцию verification.

Python:
  if re_ch == "rtmp":
        if check_rtmp(url):
            status.append(f'{ext}\n{url}\n')
            cnt += 1
            print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
            return
        else:
            error.append(f'{ext}\n{url}\n')
            cnt += 1
            print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
            return

Python:
# pip install colorama requests bs4 lxml

import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import urljoin

import cv2
import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions

requests.packages.urllib3.disable_warnings()

init()

text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
              "74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
              "69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}

html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
        "3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
        "3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
        "3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
        "45 52"}

headers = {
    "User-Agent": "libmpv",
    "Accept": "*/*",
    "Connection": "keep-alive",
    "Icy-MetaData": "1"
}

status = []
error = []
cnt = 0


def save_status_error(path: str):
    """
    Сохранение содержимого списков status и error с рабочими и нерабочими ссылками.

    :param path: Путь к открытому для проверки файлу.
    """
    global status, error
    print("\n\n" + "-" * 60)
    print(f"\n{Fore.CYAN}SAVE DATA IN FILE\n{'-'*17}\n")
    (Path.cwd() / "checked").mkdir(exist_ok=True)

    if len(status) > 0:
        (Path.cwd() / "checked" / "good").mkdir(exist_ok=True)
        try:
            name = Path.cwd() / "checked" / "good" / f'{Path(path).name.split(Path(path).suffix)[0]}_good_' \
                                                     f'{int(len(status))}.m3u'
        except ValueError:
            name = Path.cwd() / "checked" / "good" / f'{Path(path).name}_good_{int(len(status))}.m3u'
        with open(name, "a", encoding='utf-8') as f:
            f.write("#EXTM3U\n")
            for item in sorted(status):
                f.write(f"{item}")
        print(f'{Fore.GREEN}GOOD SAVED: {Fore.YELLOW}"checked / good" -> "{name.name}"')

    if len(error) > 0:
        (Path.cwd() / "checked" / "error").mkdir(exist_ok=True)
        try:
            name = Path.cwd() / "checked" / "error" / f'{Path(path).name.split(Path(path).suffix)[0]}_error_' \
                                                      f'{int(len(error))}.m3u'
        except ValueError:
            name = Path.cwd() / "checked" / "error" / f'{Path(path).name}_error_{int(len(error))}.m3u'
        with open(name, "a", encoding='utf-8') as f:
            f.write("#EXTM3U\n")
            for item in sorted(error):
                f.write(f"{item}")
        print(f'{Fore.GREEN}ERROR SAVED: {Fore.YELLOW}"checked / error" -> "{name.name}"')

    print(Fore.GREEN + "\n" + "-" * 60)


def find_mpd(txt: str, url: str) -> bool:
    """
    Парсинг плейлиста в формате xml с расширением ".mpd".

    :param txt: Текст запроса.
    :param url: Стартовый url для формирования ссылки на сегмент потока.
    :return: True или False, в зависимости от полученных результатов.
    """
    soup = BeautifulSoup(txt, 'xml')
    seg = soup.find_all('SegmentTemplate')
    seg_list = []
    for i in seg:
        if i.get('initialization') is not None:
            seg_list.append(urljoin(url, i.get('initialization')))
    if seg_list:
        return True if load_txt(seg_list[0]) else False


def load_txt(url: str) -> bool:
    """
    Получение данных по ссылке на поток.

    :param url: Ссылка на поток.
    :return: True или False в зависимости от полученного результата.
    """
    # Проверка наличия ссылки на YouTube и запуск
    # проверки наличия идентификатора видео.
    # Если идентификатор есть, ссылка рабочая.
    if url.startswith("https://www.youtube.com") \
            or url.startswith("https://youtube.com") \
            or url.startswith("http://www.youtube.com") \
            or url.startswith("http://youtube.com"):
        try:
            yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
            if yt.video_id:
                return True
        except exceptions.RegexMatchError:
            return False
    try:
        # Запрос данных по ссылке на поток с включенной функцией получения данных в потоке. Если статус-код 200,
        # выполняем последующие проверки. Первая - наличие в ссылке расширения с файлом xml, в котором
        # парсятся ссылки на сегменты видео.
        res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
        if res.status_code == 200:
            if ".mpd" in res.url:
                if find_mpd(res.text, res.url):
                    return True
                return False

            # Итерация по поученному контенту и проверка первых 64 байт на наличие
            # в получаемом потоке определенного типа данных по их сигнатуре.
            # Переводим байты в hex, проверяем: 1. Наличие ссылки (сигнатура: 68 74...);
            # 2. Проверка наличия в полученных данных html. Если есть, возвращаем False;
            # 3. Проверка наличия в полученных данных текста. Если находим, передаем в
            # функцию для обработки.
            # Если поток получен, но в нем не найдена ни одна сигнатура, будем считать,
            # что в данном потоке передаются медиа-данные, а значит возвращаем True.
            for chunk in res.iter_content(64):
                hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
                if hex_bytes[0:11] == "68 74 74 70":
                    if load_txt(chunk.decode().strip()):
                        return True
                for it in html:
                    if hex_bytes[0:len(it)].upper() == it:
                        return False
                for tx_pl in text_plain:
                    if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
                        for item in res.iter_content(1024):
                            chunk = chunk + item
                        if lnk := find_link(res.url, chunk.decode()):
                            for ln in lnk:
                                if load_txt(ln):
                                    return True
                                continue
                        elif not lnk:
                            return False
                    continue
                return True
            return False
        return False
    except Exception as ex:
        return True if "ICY 200 OK" in str(ex) else False


def find_link(url: str, text: str) -> (list, bool):
    """
    Поиск ссылок на сегменты потока в полученном тексте запроса.

    :param url: Ссылка на поток, для формирования ссылки на сегмент.
    :param text: Текст запроса для поиска ссылок на сегменты или сегментов.
    :return: Список со ссылками или False.
    """
    lnk = []
    for i in text.splitlines():
        if i.startswith("#"):
            continue
        elif not i.strip():
            continue
        lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
    return lnk if lnk else False


def check_rtmp(url: str) -> bool:
    """
    Проверка медиа-потока по протоколу передачи данных rtmp.
    Для проверки используется OpenCV.

    :param url: Ссылка на поток для проверки.
    :return: True или False в зависимости от результата.
    """
    video = cv2.VideoCapture(url)
    while True:
        grabbed, frame = video.read()
        if grabbed:
            video.release()
            return True
        return False


def verification(url: str, ext: str, count: int, re_ch=False):
    """
    Запуск проверки ссылок на поток и обработка результатов
    возвращенных из функции проверки.

    :param url: Ссылка на поток.
    :param ext: Описание потока.
    :param count: Количество ссылок для перепроверки.
    :param re_ch: Статус перепроверки.
    """
    global status, error, cnt

    if re_ch == "rtmp":
        if check_rtmp(url):
            status.append(f'{ext}\n{url}\n')
            cnt += 1
            print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
            return
        else:
            error.append(f'{ext}\n{url}\n')
            cnt += 1
            print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
            return

    if load_txt(url):
        status.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")

    else:
        error.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")


def main():
    """
    Получение данных от пользователя о сканируемой директории.
    Формирование списка файлов в директории.
    Подсчет количества ссылок на потоки и вывод в терминал.
    Запуск потоков для проверки наличия медиа-контента.
    Запуск перепроверки нерабочих потоков.
    Вывод данных о результатах проверки в терминал.
    """
    global status, error, cnt
    path = input("\nВведите путь к директории с m3u: ")
    if not Path(path).exists() or not Path(path).is_dir():
        print("Нет такой директории или указанный путь не является директорией")
        sys.exit(0)

    time_start = time.monotonic()

    files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]
    rtmp = []

    for nm, file in enumerate(sorted(files)):
        n = 0
        with open(file, 'r', encoding='utf-8') as f:
            for nn in f.readlines():
                if nn.startswith("http"):
                    n += 1
        print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
              f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
        print(Fore.GREEN + "-" * 60 + "\n")

        num = 0
        ext = ""
        with ThreadPoolExecutor(max_workers=5) as executor:
            with open(file, 'r', encoding='utf-8') as fl:
                for line in fl.readlines():
                    if line.startswith("#EXTINF"):
                        ext = line.strip()
                        continue
                    elif line.startswith("rtmp"):
                        rtmp.append(f"{ext}\n{line.strip()}")
                    elif line.startswith("http"):
                        num += 1
                        if len(line.strip().split()) > 1:
                            line = line.strip().replace(" ", "")
                        executor.submit(verification, url=line.strip(), ext=ext, count=n)

        if error:
            re_check = copy.deepcopy(error)
            error.clear()
            print("\r\033[K", end="")
            with ThreadPoolExecutor(max_workers=5) as executor:
                for i in re_check:
                    ext = i.split("\n")[0].strip()
                    url = i.split("\n")[1].strip()
                    executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
            re_check.clear()
            cnt = 0

        if rtmp:
            print("\r\033[K", end="")
            with ThreadPoolExecutor(max_workers=5) as executor:
                for i in rtmp:
                    ext = i.split("\n")[0].strip()
                    url = i.split("\n")[1].strip()
                    executor.submit(verification, url=url, ext=ext, count=len(rtmp), re_ch="rtmp")
            rtmp.clear()
            cnt = 0

        save_status_error(str(file))
        print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
        print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")

        print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
              f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
              f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
              f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
        print(Fore.GREEN + "-" * 60 + "\n")
        Path(file).unlink()
        status.clear()
        error.clear()


if __name__ == "__main__":
    main()

Еще немного поэкспериментировав решил объединить все версии скрипта в одну, плюс к тому, добавил проверку на длину возвращаемого списка со ссылками. Дело в том, что в некоторых плейлистах содержится более 100-та ссылок. Соответственно, если, к примеру, по первым ссылкам на сегменты будет возвращен код ошибки, то итерация продолжиться до упора. А так как ссылок очень много, то может показаться, что скрипт просто завис. Чтобы этого избежать и нужна эта проверка. То есть, если длина возвращаемого списка более 10, то будем итерироваться только по последним 10 элементам. Ну и добавил проверку rtmp. Так что, эта версия на данный момент самая полная. Возможно, что в процессе обнаружиться еще что-то, что потребуется обработать или изменить. Это уже будет отображаться (скорее всего) в репозитории на GitHub, как только я его создам )).

Python:
# pip install colorama requests bs4 lxml

import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from os import system
from pathlib import Path
from re import findall
from urllib.parse import urljoin

import cv2
import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions

requests.packages.urllib3.disable_warnings()

init()

text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
              "74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
              "69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}

html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
        "3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
        "3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
        "3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
        "45 52"}

stop_list = ["/errors/", "test_end.ts", "buy.ts", "money.ts", "buy_packet.ts", "empty.ts", "key.ts", "/error/",
             "/NOT_CLIENT/", "http://logo.apk-red.com/tv/hata.jpg", "http://v.viplime.fun/video/user.ts", "/forbidden/",
             "zabava-block-htvod.cdn.ngenix.net", "err-ru.sulfat.li", "/000/", "BanT0ken", "/activate/", "/404/",
             "/405/", "www.cloudflare-terms-of-service-abuse.com", "http://cdn01.lifeyosso.fun:8080/connect/mono.m3u8",
             "http://nl4.iptv.monster/9999/video.m3u8", "http://v.viplime.fun/video/block.ts", "auth.m3u8", "/block/",
             "auth", "logout", "VDO-X-404", "offline", "logout.mp4", "error.tv4.live", "block-ip-video.ts", "/420/",
             "delete.ts", "http://langamepp.com/user.mp4", "http://m.megafonpro.ru/http_errors?error=404"]

headers = {
    "User-Agent": "libmpv",
    "Accept": "*/*",
    "Connection": "keep-alive",
    "Icy-MetaData": "1"
}

status = []
error = []
cnt = 0


def save_status_error(path: str):
    """
    Сохранение содержимого списков status и error с рабочими и нерабочими ссылками.

    :param path: Путь к открытому для проверки файлу.
    """
    global status, error
    print("\n\n" + "-" * 60)
    print(f"\n{Fore.CYAN}SAVE DATA IN FILE\n{'-'*17}\n")
    (Path.cwd() / "checked").mkdir(exist_ok=True)

    if len(status) > 0:
        (Path.cwd() / "checked" / "good").mkdir(exist_ok=True)
        try:
            name = Path.cwd() / "checked" / "good" / f'{Path(path).name.split(Path(path).suffix)[0]}_good_' \
                                                     f'{int(len(status))}.m3u'
        except ValueError:
            name = Path.cwd() / "checked" / "good" / f'{Path(path).name}_good_{int(len(status))}.m3u'
        with open(name, "a", encoding='utf-8') as f:
            f.write("#EXTM3U\n")
            for item in sorted(status):
                f.write(f"{item}")
        print(f'{Fore.GREEN}GOOD SAVED: {Fore.YELLOW}"checked / good" -> "{name.name}"')

    if len(error) > 0:
        (Path.cwd() / "checked" / "error").mkdir(exist_ok=True)
        try:
            name = Path.cwd() / "checked" / "error" / f'{Path(path).name.split(Path(path).suffix)[0]}_error_' \
                                                      f'{int(len(error))}.m3u'
        except ValueError:
            name = Path.cwd() / "checked" / "error" / f'{Path(path).name}_error_{int(len(error))}.m3u'
        with open(name, "a", encoding='utf-8') as f:
            f.write("#EXTM3U\n")
            for item in sorted(error):
                f.write(f"{item}")
        print(f'{Fore.GREEN}ERROR SAVED: {Fore.YELLOW}"checked / error" -> "{name.name}"')

    print(Fore.GREEN + "\n" + "-" * 60)


def find_mpd(txt: str, url: str) -> bool:
    """
    Парсинг плейлиста в формате xml с расширением ".mpd".

    :param txt: Текст запроса.
    :param url: Стартовый url для формирования ссылки на сегмент потока.
    :return: True или False, в зависимости от полученных результатов.
    """
    soup = BeautifulSoup(txt, 'xml')
    seg = soup.find_all('SegmentTemplate')
    seg_list = []
    for i in seg:
        if i.get('initialization') is not None:
            seg_list.append(urljoin(url, i.get('initialization')))
    if seg_list:
        return True if load_txt(seg_list[0]) else False


def find_stop(url: str) -> bool:
    """
    Поиск паттернов с помощью которых можно отсеять некоторые
    каналы с заблокированным содержимым.

    :param url: Ссылка для поиска паттерна.
    :return: True или False в зависимости от результата.
    """
    for st in stop_list:
        if findall(f"{st}", url):
            return True
    return False


def find_link(url: str, text: str) -> (list, bool):
    """
    Поиск ссылок на сегменты потока в полученном тексте запроса.

    :param url: Ссылка на поток, для формирования ссылки на сегмент.
    :param text: Текст запроса для поиска ссылок на сегменты или сегментов.
    :return: Список со ссылками или False.
    """
    lnk = []
    for i in text.splitlines():
        if i.startswith("#"):
            continue
        elif not i.strip():
            continue
        lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
    return lnk if lnk else False


def load_txt(ses, url: str) -> bool:
    """
    Получение данных по ссылке на поток.

    :param ses: Сессия.
    :param url: Ссылка на поток.
    :return: True или False в зависимости от полученного результата.
    """
    # Проверка наличия ссылки на YouTube и запуск
    # проверки наличия идентификатора видео.
    # Если идентификатор есть, ссылка рабочая.
    if url.startswith("https://www.youtube.com") \
            or url.startswith("https://youtube.com") \
            or url.startswith("http://www.youtube.com") \
            or url.startswith("http://youtube.com"):
        try:
            yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
            if yt.video_id:
                return True
        except exceptions.RegexMatchError:
            return False
    try:
        # Запрос данных по ссылке на поток с включенной функцией получения данных в потоке. Если статус-код 200,
        # выполняем последующие проверки. Первая - наличие в ссылке расширения с файлом xml, в котором
        # парсятся ссылки на сегменты видео.
        res = ses.get(url, headers=headers, timeout=10, stream=True, allow_redirects=True, verify=False)
        if res.status_code == 200:
            if find_stop(res.url):
                return False

            if ".mpd" in res.url:
                if find_mpd(res.text, res.url):
                    return True
                return False

            # Итерация по поученному контенту и проверка первых 64 байт на наличие
            # в получаемом потоке определенного типа данных по их сигнатуре.
            # Переводим байты в hex, проверяем: 1. Наличие ссылки (сигнатура: 68 74...);
            # 2. Проверка наличия в полученных данных html. Если есть, возвращаем False;
            # 3. Проверка наличия в полученных данных текста. Если находим, передаем в
            # функцию для обработки.
            # Если поток получен, но в нем не найдена ни одна сигнатура, будем считать,
            # что в данном потоке передаются медиа-данные, а значит возвращаем True.
            for chunk in res.iter_content(64):
                hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
                if hex_bytes[0:11] == "68 74 74 70":
                    if load_txt(ses, chunk.decode().strip()):
                        return True
                for it in html:
                    if hex_bytes[0:len(it)].upper() == it:
                        return False
                for tx_pl in text_plain:
                    if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
                        for item in res.iter_content(1024):
                            chunk = chunk + item
                        if lnk := find_link(res.url, chunk.decode()):
                            if len(lnk) > 10:
                                lnk = lnk[-10:]
                            for ln in lnk:
                                if find_stop(ln):
                                    return False
                                if load_txt(ses, ln):
                                    return True
                                continue
                        elif not lnk:
                            return False
                    continue
                return True
            return False
        return False
    except Exception as ex:
        return True if "ICY 200 OK" in str(ex) else False


def check_rtmp(url: str) -> bool:
    """
    Проверка медиа-потока по протоколу передачи данных rtmp.
    Для проверки используется OpenCV.

    :param url: Ссылка на поток для проверки.
    :return: True или False в зависимости от результата.
    """
    video = cv2.VideoCapture(url)
    while True:
        grabbed, frame = video.read()
        if grabbed:
            video.release()
            return True
        return False


def verification(url: str, ext: str, count: int, re_ch=False):
    """
    Запуск проверки ссылок на поток и обработка результатов
    возвращенных из функции проверки.

    :param url: Ссылка на поток.
    :param ext: Описание потока.
    :param count: Количество ссылок для перепроверки.
    :param re_ch: Статус перепроверки.
    """
    global status, error, cnt

    if re_ch == "rtmp":
        if check_rtmp(url):
            status.append(f'{ext}\n{url}\n')
            cnt += 1
            print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
            return
        else:
            error.append(f'{ext}\n{url}\n')
            cnt += 1
            print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
            return

    ses = requests.Session()

    if load_txt(ses, url):
        status.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
    else:
        error.append(f'{ext}\n{url}\n')
        if re_ch:
            cnt += 1
            print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
                  f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
        else:
            print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
                  f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
                  end="")
    ses.close()


def main():
    """
    Получение данных от пользователя о сканируемой директории.
    Формирование списка файлов в директории.
    Подсчет количества ссылок на потоки и вывод в терминал.
    Запуск потоков для проверки наличия медиа-контента.
    Запуск перепроверки нерабочих потоков.
    Вывод данных о результатах проверки в терминал.
    """
    global status, error, cnt
    # path = input("\nВведите путь к директории с m3u: ")
    # if not Path(path).exists() or not Path(path).is_dir():
    #     print("Нет такой директории или указанный путь не является директорией")
    #     sys.exit(0)
    path = "/home/vev/py_proj/checker_v4/000"

    time_start = time.monotonic()

    files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]
    rtmp = []

    file = ""
    try:
        for nm, file in enumerate(sorted(files)):
            n = 0
            with open(file, 'r', encoding='utf-8') as f:
                for nn in f.readlines():
                    if nn.startswith("http"):
                        n += 1
            print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
                  f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
            print(Fore.GREEN + "-" * 60 + "\n")

            num = 0
            ext = ""
            with ThreadPoolExecutor(max_workers=5) as executor:
                with open(file, 'r', encoding='utf-8') as fl:
                    for line in fl.readlines():
                        if line.startswith("#EXTINF"):
                            ext = line.strip()
                            continue
                        elif line.startswith("rtmp"):
                            rtmp.append(f"{ext}\n{line.strip()}")
                        elif line.startswith("http"):
                            num += 1
                            if len(line.strip().split()) > 1:
                                line = line.strip().replace(" ", "")
                            executor.submit(verification, url=line.strip(), ext=ext, count=n)

            if error:
                re_check = copy.deepcopy(error)
                error.clear()
                print("\r\033[K", end="")
                with ThreadPoolExecutor(max_workers=5) as executor:
                    for i in re_check:
                        ext = i.split("\n")[0].strip()
                        url = i.split("\n")[1].strip()
                        executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
                re_check.clear()
                cnt = 0

            if rtmp:
                print("\r\033[K", end="")
                with ThreadPoolExecutor(max_workers=5) as executor:
                    for i in rtmp:
                        ext = i.split("\n")[0].strip()
                        url = i.split("\n")[1].strip()
                        executor.submit(verification, url=url, ext=ext, count=len(rtmp), re_ch="rtmp")
                rtmp.clear()
                cnt = 0

            save_status_error(str(file))
            print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
            print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
            print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")

            print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
                  f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
                  f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
                  f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
            print(Fore.GREEN + "-" * 60 + "\n")

            status.clear()
            error.clear()

            Path(file).unlink()
        system(f'''notify-send "All operation complete"''')
    except UnicodeDecodeError:
        print(f"\nНе могу декодировать данные: {Path(file).name}\n")
        system(f'''notify-send "Не могу декодировать данные: {Path(file).name}"''')
        sys.exit(0)


if __name__ == "__main__":
    main()

И еще, добавил обработку ошибки декодирования данных, о которой писал в первой части. Если у вас операционная система Windows, желательно удалить строку:

system(f'''notify-send "Не могу декодировать данные: {Path(file).name}"''')

Ну или добавить win10toast, для вывода сообщений в ОС Windows. Его можно установить командой:

pip install win10toast

Вот такая вот получилась история.
А на этом, пожалуй, все.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна
 

Вложения

  • checker_v1.zip
    4,4 КБ · Просмотры: 182
  • checker_v2.zip
    4,8 КБ · Просмотры: 181
  • checker_v3.zip
    4,6 КБ · Просмотры: 188
  • checker_m3u_v5.zip
    5,1 КБ · Просмотры: 266
Последнее редактирование:

InternetMC

Green Team
13.01.2019
11
4
BIT
336
У меня вопрос возник. А возможно ли узнать из самого потока название канала средствами python? И если да, то пните в какую сторону копать.
 

Johan Van

Green Team
13.06.2020
363
691
BIT
395
У меня вопрос возник. А возможно ли узнать из самого потока название канала средствами python? И если да, то пните в какую сторону копать.

Думаю, что да. Некоторые плееры пытаются это сделать. Для примера, IDv3 теги передаются в медиапотоке без видео. Вот только нужно их из потока получить. А значит, поток нужно декодировать. По крайней мере попробовать это сделать. Тем более, это ведь поток. И то, что для вас является началом вещания, для потока это может быть середина. Но, думаю, что название и прочие параметры периодически передаются. Не копал в этом направлении глубоко. Так что, вам надо попробовать понять, что ищут плееры в потоке. Какие, может быть, сигнатуры.
 

InternetMC

Green Team
13.01.2019
11
4
BIT
336
Сдается мне, что на большинстве потоков это напрасный труд. Зачем им записывать метаданные если они занимаются простым рестримом.
Python:
import asyncio
import json

URL = 'https://telecola-live.gcdn.co/cdn_origin2_ssd/matchtvhd/chunks.m3u8'

async def ffmpeg(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

asyncio.run(ffmpeg(f'ffmpeg -hide_banner -user_agent "libmpv" -i {URL}'))
С помощью гугля набросал код, попробовал на некоторых потоках, везде пусто. :(
 

Johan Van

Green Team
13.06.2020
363
691
BIT
395
Сдается мне, что на большинстве потоков это напрасный труд. Зачем им записывать метаданные если они занимаются простым рестримом.
Python:
import asyncio
import json

URL = 'https://telecola-live.gcdn.co/cdn_origin2_ssd/matchtvhd/chunks.m3u8'

async def ffmpeg(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

asyncio.run(ffmpeg(f'ffmpeg -hide_banner -user_agent "libmpv" -i {URL}'))
С помощью гугля набросал код, попробовал на некоторых потоках, везде пусто. :(

Ну, возможно так и есть. Я пробовал получить теги из аудиопотока, то есть из вещания интернет-радиостанции. Там можно что-то забрать. Теги передаются. То есть, при должном упорстве и копании документации, возможно что-то получить можно. А вот с видео не пробовал. Да и необходимости не было. Ну и, как вы абсолютно верно заметили, у большинства это рестрим. Думаю, что на ресиверы никто в потоке не передает названия каналов. Передается программа передач, которая уже принимается ресивером и раскидывается по каналам.
 

NoBro

New member
13.09.2023
2
0
BIT
3
Здравствуйте, подскажите,как запустить скрипт в работу ? не могу что то разобраться,ну и у меня есть плей лист и там порой проскакивает статичная картинка с просьбой купить подписку,я так понимаю с помощью вашего скрипта можно отсеять такие каналы. спасибо за ответы. у меня под рукой есть и runtu & windows 10
 
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!