Это вторая часть статьи о проверке плейлистов на работоспособность. Небольшие пояснения для тех, кто не читал первую часть. В прошлой статье мы создали небольшие инструменты для обработки плейлистов. Такие скрипты как: разбиение плейлиста на части, объединение, сортировка по именам каналов. А также проверка плейлистов с помощью плеера VLC. В этой же части я попробую рассказать, как я пытался сделать чекер и, что из этого получилось.
Как я уже и говорил в первой части, я не фанат телевизора. Но вот сам принцип проверки потока меня заинтересовал. Сейчас будет немного лирики, поэтому, кого больше интересует практическая часть, этот абзац можно смело пропустить. Я не буду рассказывать сейчас сколько версий чекера я сделал. Просто скажу, что много. А все дело в том, что изначально я исходил из неправильных предпосылок. И здесь я скорее хотел сказать именно о том, как важно в любых проектах продумывать их концепцию и определять цели. Изначально я предположил, что для того, чтобы определить, есть ли медиаконтент или нет, нужно понять, какое содержимое прилетает к нам в ответе на запрос. И тут была первая ошибка. Небольшая, но все же. Для начала я пробовал использовать модуль magic, надстройку над библиотекой libmagic1. И в целом, она работала довольно хорошо. Вот только меня беспокоило то, что за собой она тащит очень большую и главную зависимость, то есть, нужно понять, что в системе есть libmagic1, если нет, сообщить об этом пользователю. Установить. А в Windows вообще, в обязательном порядке установка DLL библиотеки. Потому, через какое-то время от использования данного модуля я отказался. Решил покопать в сторону определения содержимого с помощью сигнатур файлов. И даже собрал внушительную коллекцию сигнатур медиаконтента. И это тоже работало. Но, как-то довольно кривовато. А суть, в принципе, была в том, что нам вообще не нужно понимать, что за поток передается в ответе. То есть, если это не html или текст, то явно содержимое имеющее отношение к медиа. А потому, если у нас 200-й статус код, и мы поняли, что содержимое не текст, то значит, канал рабочий. Таким образом, отвалилось около 150 строк кода, которые я «запилил» для определения контента, вычленения плейлистов и сегментов. В общем, что у меня получилось, смотрите ниже сами.
Что потребуется?
В данном скрипте нужно установить несколько сторонних библиотек. Это собственно requests, bs4 и lxml, чтобы парсить xml, так как бывают и такие плейлисты. colorama — куда же без нее, надо чтобы все было красиво. И pytube. Последний нужен для проверки ссылок на YouTube, так как в плейлистах попадается и такое. Поэтому, пишем в терминале:
Теперь импортируем нужные в работе библиотеки, инициализируем colorama, а также пропишем код для подавления варнинга при использовании в requests параметра verify в значении False.
Определяем множества из сигнатур, которые понадобятся во время работы скрипта. Следует сказать, что сигнатур медиаконтента здесь нет. Только лишь сигнатуры текста и html. Вот собственно и они:
Это все, что мне удалось наковырять по разным сайтам. Думаю, что их больше, но тех, что есть, вполне хватает.
Конечно же, так как мы будем делать запросы, нам понадобятся заголовки:
Этот заголовок я отловил из плеера Celluloid с помощью Wireshark. Ну и списки, в которых будут храниться рабочие и нерабочие каналы, а также счетчик для перепроверки. О ней я поясню, когда дело дойдет до кода.
Ну вот, с импортами и глобальными переменными вроде бы определились. Теперь двигаемся дальше. А дальше у нас будет функция main.
Открываем файл и запускаем потоки
Я сделал скрипт таким образом, чтобы он имел возможность проверять сразу же несколько файлов. Поэтому, для начала запрашиваем у пользователя путь к директории с файлами. Проверяем, есть ли директория, является ли путь директорией. Если да, двигаемся дальше.
Определяем переменную для замера времени выполнения скрипта. Также, создаем список из файлов в директории. Это нужно для того, чтобы скрипт проверял файлы в более-менее отсортированном виде. Потому как в данном случае итерация идет с помощью генератора, и файлы могут выдаваться вразброс.
Запускаем цикл для итерации по списку файлов. Определяем счетчик, который будет содержать количество каналов. В данном случае это будет служить больше в информационных целях. Поэтому, открываем файл на чтение, итерируемся по нему и при каждом нахождении ссылки увеличиваем счетчик. После выводим сообщение в терминал о том, какой файл сейчас обрабатывается, сколько ссылок в файле, а также, если файлов несколько, какой по счету из скольки.
Определяем переменную, в которую будем сохранять описание канала. Создаем с помощью with объект ThreadPoolExecutor, указываем максимальное количество потоков. Создание пула с помощью контекстного менеджера with избавляет нас от необходимости дожидаться завершения потоков. Затем открываем файл, итерируемся по содержимому построчно в цикле. Присваиваем переменной значение описания канала, а если попадается ссылка на канал, запускаем поток с указанием функции для запуска проверки, в которую передаем ссылку, описание канала и количество ссылок в файле.
Теперь часть кода, в котором выполняется перепроверка ошибок. Для начала, для чего это сделано. Так как контент, который мы проверяем, это все же медиаконтент, на инициализацию его требуется время. И иногда просто не хватает времени на чтение данных, то есть, возникает исключение ReadTimeout. Однако, мы уже обратились к серверу и вполне возможно, что еще некоторое время он будет пытаться поддержать соединение. Потому, мы перепроверяем нерабочие ссылки. Помогает, но частично. И здесь ничего не поделаешь. Впрочем, если кто-то подскажет мне, как можно это обойти, я буду очень благодарен. Но, я смотрел в «IPTV Checker», который под Windows. По-умолчанию там выставлен timeout в 5 секунд, но можно выставить его вручную. По моему до 10. Также можно указать количество потоков, по умолчанию 5, ну и количество проходов. То есть, там тоже есть проблема пропуска рабочих ссылок. Вот потому и нужна перепроверка. А счетчик, которые определяется глобально, нужен для того, чтобы выводить информацию пользователю о ходе перепроверки.
Ну и код, в котором выводиться информация для пользователя, а также очищаются списки и переменные, если проверить нужно несколько файлов.
Функция для запуска проверки и обработки результата
Создадим функцию, в которой будем запускать проверку ссылок, а также обрабатывать ответы полученные после получения данных и обработки результатов. Я назвал данную функцию verification(url: str, ext: str, count: int, re_ch=False). На вход она получает ссылку на канал, описание канала, количество файлов при перепроверке, статус перепроверки.
Здесь мы запускаем функцию проверки, получаем от нее ответ. И в зависимости от того, что вернется, True или False распределяем по спискам, с выводом информации для пользователя.
Функция проверки файлов xml
Не всегда плейлисты прилетают в текстовом виде. Иногда они содержат в ответе xml. Соответственно, его нужно распарсить, чтобы получить сегменты видео и составить на них ссылки. Так как данный тип выбивается из основного потока проверок, я сделал для него отдельную функцию. Ее название - find_mpd(txt: str, url: str) -> bool. На вход она принимает текст ответа на запрос, ссылку для сборки ссылок на сегменты видео или аудио.
Ищем определенные теги, забираем из них текст, составляем ссылки и возвращаем одну из функции. А если сегментов найдено не будет, возвращаем False.
Поиск ссылок на плейлисты и сегменты видео или аудио
Если бы вы знали, сколько я писал раньше функций для того, чтобы найти определенный тип данных в тексте с использованием регулярок, думаю, что вы бы ужаснулись )). Все же, я пришел к самому простому (не факт, что самому правильному) решению. Просто проитерироваться по тексту. Создадим функцию find_link(url: str, text: str) -> (list, bool). На вход она получает ссылку на канал, для составления ссылки на сегменты, текст из запроса в котором и выполняется поиск ссылок и сегментов. И возвращает список со ссылками или False, если ссылок найдено не было.
Итерируемся по тексту, пропускаем строки с решетками. Все остальные строки забираем и пытаемся составить ссылки. Если список ссылок составлен, то есть не пуст, возвращаем его из функции.
Выполнение запросов, получение контента и итерация по нему
И еще одна, последняя в данном скрипте функция для того, чтобы выполнять запросы и обрабатывать ответы на них. Я назвал ее load_txt(url: str) -> bool. На вход она получает ссылку, а возвращает True или False.
Итак, первый кусочек кода. Здесь мы проверяем, не является ли ссылка ссылкой на YouTube. Проверяем именно, чтобы она была в начале. Иногда попадаются ссылки, которые ведут на YouTube через сервер. Их мы не рассматриваем. Только прямые ссылки. Если ссылка на Ютуб, загоняем ее в pytube. Пытаемся получить ID-видео. Так-как это быстрее всего. Если ID получен, значит видео существует. В противном случае просто обрабатываем исключение и возвращаем False.
Следующая операция, это выполнение запроса по ссылке и получение ответа. В запросе передаем ссылку, заголовки, указываем timeout, указываем, что запрос читает потоковые данные, явно указываем, что поддерживаем редирект, и отключаем верификацию, то есть проверку сертификатов, так как многие сервера работают еще по протоколу http. Проверяем статус-код. Если он 200, проверяем, нет ли в полученной ссылке расширения. Если есть, запускаем отдельную функцию для парсинга xml. Если нет, двигаемся дальше.
Итерируемся по полученному контенту. Так как он в байтах, считываем первые 64. Этого более чем достаточно для попытки определения содержимого. Переводим байты в hex, для того, чтобы получить сигнатуру. Так как смещение у проверяемых типов файлов нулевое, нам не требуется цикл для итерации по смещениям. Поэтому, для начала итерируемя по списку сигнатур с html. Дело в том, что иногда в ответах прилетает 200 статус код. А вот содержимое совсем не соответствует простому тексту. То есть, некоторые сервера отдают ошибки доступа в виде html, операторы связи, если доступ запрещен, вместо статус-кода возвращают страницу с сообщение (к примеру, Мегафон). То есть, здесь нужно убрать html, так как это явная ошибка. Есть одно исключение, которое мы проверяем в начале. Это сигнатура «68 74 74 70». Дело в том, что иногда сервер отдает ссылку на плейлист в виде http. Эта сигнатура ему и соответствует. Если она получена, мы сразу же делаем рекурсию и получаем данные по полученной ссылке. Затем проверяем html. Если находим сигнатуру, возвращаем False.
Дальше идет проверка наличия в ответе текста. Тут уже прилетает то, что нужно. То есть, текст плейлиста. Определяем сигнатуру текста, итерируемся по содержимому до конца, чтобы получить то, что прилетело полностью. Отправляем в функцию для получения ссылок на сегменты. Если возвращается список ссылок — итерируемся по ним и делаем рекурсию в текущую функцию. И так до того, пока не будет находиться ни html, ни текст. Если не прилетит ошибка, конечно, доступа. Если мы, после цикла рекурсий и 200-го статус-кода получаем содержимое, но оно не является текстом или страницей, значит делаем предположение, что это медиаконтект и возвращаем True.
В исключение здесь добавлена проверка радио. Находил плейлисты, уже наверное второй раз не найду, где вполне рабочее радио и воспроизводиться медиаплеером. Вот только при попытке выполнения запроса вышибает в исключения. Ну и тогда я посмотрел, что прилетает в тексте. То есть, делаю проверку, если есть данное словосочетание, возвращаем True, радио рабочее, на и для остальных исключений — False.
Обработку других статус-кодов в данном контексте, думаю, делать не нужно. Если у нас 300-е коды, то переадресация происходит автоматом. Ну, а если 500-е, то ошибки сервера могут уйти при перепроверке, равно, как и 400-е коды.
Вот такой вот получился чекер. Ниже скриншот, который показывает результаты работы чекера.
И на этом можно было бы и закончить. Так как с технической стороны чекер свою работы делает. К сожалению, да, бывает, что вылетают некоторые каналы в ошибки. Но, с этим трудно что-то поделать. Я пробовал перепроверять рекурсивно до того момента, пока количество ошибок не будет равно одному и тому же значению хотя бы три раза. Но ничего хорошего из этого не получилось. Да, с каждой проверкой количество рабочих каналов увеличивалось. Однако, после перепроверки в плеере они просто не работали. Так что, тут еще важно соблюсти некоторый баланс.
Так вот, технически все вроде бы неплохо. Но, если доступ к контенту запрещен, то у вас на экране будет вот такая:
или такая:
картинки. Есть еще больше всяческих их разновидностей. То есть, контент там есть. А вот толку от него нет. Я решил попробовать избавиться от этих мусорных каналов. Поискал повторяющиеся паттерны в ссылках которые возвращаются от сервера и в ссылках на сегменты. И наковырял небольшой список, который отсеивает большую часть мусора. Иногда он прорывается, но это уже не зависит от фильтра. Просто при проверке ссылка была несколько другой. Я даже специально смотрел в дебаггере.
По итогу получилась еще одна версия скрипта, в которую я добавил одну функцию и две проверки.
Вот добавленная функция:
Стоп-лист, по которому итерируется цикл:
Первый участок кода, куда я добавил проверку:
Второй участок кода:
Но, на этом еще не все. Есть третья версия чекера. На ней я пока что и остановился. Дело в том, что попадаются в плейлистах ссылки, у которых вместо протоколов http или https указан протокол rtmp. Этот протокол для потоковой передачи данных разработанный фирмой Adobe. В некоторых случаях по данному протоколу вещают веб-камеры. Так как данный протокол был проприетарным, то и разработка модулей под него велась не особо активно, так как никто толком не знал полной его спецификации. Я нашел модуль для получения данных, но пока что в нем еще не разобрался. Однако, данный протокол понимает OpenCV. И с его помощью можно осуществить проверку. Вот только есть одна неприятная особенность. Когда в коде возникает исключение, библиотека выбрасывает в терминал длинную строку с исключением. И, к сожалению, я не нашел способ его обработки. Программа работает, но вот эти сообщения не очень приятны. Тем не менее, дело свое он делает.
Третья версия включает проверку данного протокола. Для того, чтобы она работала, нужно установить библиотеку opencv-python. Для ее установки пишем в терминале:
Функция, которая добавляется в этом случае:
Код для запуска функции, который добавляется в функцию verification.
Еще немного поэкспериментировав решил объединить все версии скрипта в одну, плюс к тому, добавил проверку на длину возвращаемого списка со ссылками. Дело в том, что в некоторых плейлистах содержится более 100-та ссылок. Соответственно, если, к примеру, по первым ссылкам на сегменты будет возвращен код ошибки, то итерация продолжиться до упора. А так как ссылок очень много, то может показаться, что скрипт просто завис. Чтобы этого избежать и нужна эта проверка. То есть, если длина возвращаемого списка более 10, то будем итерироваться только по последним 10 элементам. Ну и добавил проверку rtmp. Так что, эта версия на данный момент самая полная. Возможно, что в процессе обнаружиться еще что-то, что потребуется обработать или изменить. Это уже будет отображаться (скорее всего) в репозитории на GitHub, как только я его создам )).
И еще, добавил обработку ошибки декодирования данных, о которой писал в первой части. Если у вас операционная система Windows, желательно удалить строку:
Ну или добавить win10toast, для вывода сообщений в ОС Windows. Его можно установить командой:
Вот такая вот получилась история.
А на этом, пожалуй, все.
Спасибо за внимание. Надеюсь, данная информация будет вам полезна
Как я уже и говорил в первой части, я не фанат телевизора. Но вот сам принцип проверки потока меня заинтересовал. Сейчас будет немного лирики, поэтому, кого больше интересует практическая часть, этот абзац можно смело пропустить. Я не буду рассказывать сейчас сколько версий чекера я сделал. Просто скажу, что много. А все дело в том, что изначально я исходил из неправильных предпосылок. И здесь я скорее хотел сказать именно о том, как важно в любых проектах продумывать их концепцию и определять цели. Изначально я предположил, что для того, чтобы определить, есть ли медиаконтент или нет, нужно понять, какое содержимое прилетает к нам в ответе на запрос. И тут была первая ошибка. Небольшая, но все же. Для начала я пробовал использовать модуль magic, надстройку над библиотекой libmagic1. И в целом, она работала довольно хорошо. Вот только меня беспокоило то, что за собой она тащит очень большую и главную зависимость, то есть, нужно понять, что в системе есть libmagic1, если нет, сообщить об этом пользователю. Установить. А в Windows вообще, в обязательном порядке установка DLL библиотеки. Потому, через какое-то время от использования данного модуля я отказался. Решил покопать в сторону определения содержимого с помощью сигнатур файлов. И даже собрал внушительную коллекцию сигнатур медиаконтента. И это тоже работало. Но, как-то довольно кривовато. А суть, в принципе, была в том, что нам вообще не нужно понимать, что за поток передается в ответе. То есть, если это не html или текст, то явно содержимое имеющее отношение к медиа. А потому, если у нас 200-й статус код, и мы поняли, что содержимое не текст, то значит, канал рабочий. Таким образом, отвалилось около 150 строк кода, которые я «запилил» для определения контента, вычленения плейлистов и сегментов. В общем, что у меня получилось, смотрите ниже сами.
Что потребуется?
В данном скрипте нужно установить несколько сторонних библиотек. Это собственно requests, bs4 и lxml, чтобы парсить xml, так как бывают и такие плейлисты. colorama — куда же без нее, надо чтобы все было красиво. И pytube. Последний нужен для проверки ссылок на YouTube, так как в плейлистах попадается и такое. Поэтому, пишем в терминале:
pip install requests bs4 lxml colorama pytube
Теперь импортируем нужные в работе библиотеки, инициализируем colorama, а также пропишем код для подавления варнинга при использовании в requests параметра verify в значении False.
Python:
import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions
requests.packages.urllib3.disable_warnings()
init()
Определяем множества из сигнатур, которые понадобятся во время работы скрипта. Следует сказать, что сигнатур медиаконтента здесь нет. Только лишь сигнатуры текста и html. Вот собственно и они:
Python:
text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
"74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
"69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}
html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
"3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
"3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
"3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
"45 52"}
Это все, что мне удалось наковырять по разным сайтам. Думаю, что их больше, но тех, что есть, вполне хватает.
Конечно же, так как мы будем делать запросы, нам понадобятся заголовки:
Python:
headers = {
"User-Agent": "libmpv",
"Accept": "*/*",
"Connection": "keep-alive",
"Icy-MetaData": "1"
}
Этот заголовок я отловил из плеера Celluloid с помощью Wireshark. Ну и списки, в которых будут храниться рабочие и нерабочие каналы, а также счетчик для перепроверки. О ней я поясню, когда дело дойдет до кода.
Python:
status = []
error = []
cnt = 0
Ну вот, с импортами и глобальными переменными вроде бы определились. Теперь двигаемся дальше. А дальше у нас будет функция main.
Открываем файл и запускаем потоки
Я сделал скрипт таким образом, чтобы он имел возможность проверять сразу же несколько файлов. Поэтому, для начала запрашиваем у пользователя путь к директории с файлами. Проверяем, есть ли директория, является ли путь директорией. Если да, двигаемся дальше.
Python:
global status, error, cnt
path = input("\nВведите путь к директории с m3u: ")
if not Path(path).exists() or not Path(path).is_dir():
print("Нет такой директории или указанный путь не является директорией")
sys.exit(0)
Определяем переменную для замера времени выполнения скрипта. Также, создаем список из файлов в директории. Это нужно для того, чтобы скрипт проверял файлы в более-менее отсортированном виде. Потому как в данном случае итерация идет с помощью генератора, и файлы могут выдаваться вразброс.
Python:
time_start = time.monotonic()
files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]
Запускаем цикл для итерации по списку файлов. Определяем счетчик, который будет содержать количество каналов. В данном случае это будет служить больше в информационных целях. Поэтому, открываем файл на чтение, итерируемся по нему и при каждом нахождении ссылки увеличиваем счетчик. После выводим сообщение в терминал о том, какой файл сейчас обрабатывается, сколько ссылок в файле, а также, если файлов несколько, какой по счету из скольки.
Python:
for nm, file in enumerate(sorted(files)):
n = 0
with open(file, 'r', encoding='utf-8') as f:
for nn in f.readlines():
if nn.startswith("http"):
n += 1
print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
print(Fore.GREEN + "-" * 60 + "\n")
Определяем переменную, в которую будем сохранять описание канала. Создаем с помощью with объект ThreadPoolExecutor, указываем максимальное количество потоков. Создание пула с помощью контекстного менеджера with избавляет нас от необходимости дожидаться завершения потоков. Затем открываем файл, итерируемся по содержимому построчно в цикле. Присваиваем переменной значение описания канала, а если попадается ссылка на канал, запускаем поток с указанием функции для запуска проверки, в которую передаем ссылку, описание канала и количество ссылок в файле.
Python:
ext = ""
with ThreadPoolExecutor(max_workers=5) as executor:
with open(file, 'r', encoding='utf-8') as fl:
for line in fl.readlines():
if line.startswith("#EXTINF"):
ext = line.strip()
continue
elif line.startswith("http"):
if len(line.strip().split()) > 1:
line = line.strip().replace(" ", "")
executor.submit(verification, url=line.strip(), ext=ext, count=n)
Теперь часть кода, в котором выполняется перепроверка ошибок. Для начала, для чего это сделано. Так как контент, который мы проверяем, это все же медиаконтент, на инициализацию его требуется время. И иногда просто не хватает времени на чтение данных, то есть, возникает исключение ReadTimeout. Однако, мы уже обратились к серверу и вполне возможно, что еще некоторое время он будет пытаться поддержать соединение. Потому, мы перепроверяем нерабочие ссылки. Помогает, но частично. И здесь ничего не поделаешь. Впрочем, если кто-то подскажет мне, как можно это обойти, я буду очень благодарен. Но, я смотрел в «IPTV Checker», который под Windows. По-умолчанию там выставлен timeout в 5 секунд, но можно выставить его вручную. По моему до 10. Также можно указать количество потоков, по умолчанию 5, ну и количество проходов. То есть, там тоже есть проблема пропуска рабочих ссылок. Вот потому и нужна перепроверка. А счетчик, которые определяется глобально, нужен для того, чтобы выводить информацию пользователю о ходе перепроверки.
Python:
if error:
re_check = copy.deepcopy(error)
error.clear()
print("\r\033[K", end="")
with ThreadPoolExecutor(max_workers=5) as executor:
for i in re_check:
ext = i.split("\n")[0].strip()
url = i.split("\n")[1].strip()
executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
re_check.clear()
cnt = 0
Ну и код, в котором выводиться информация для пользователя, а также очищаются списки и переменные, если проверить нужно несколько файлов.
Python:
save_status_error(str(file))
print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")
print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
print(Fore.GREEN + "-" * 60 + "\n")
Path(file).unlink()
status.clear()
error.clear()
if __name__ == "__main__":
main()
Python:
def main():
"""
Получение данных от пользователя о сканируемой директории.
Формирование списка файлов в директории.
Подсчет количества ссылок на потоки и вывод в терминал.
Запуск потоков для проверки наличия медиа-контента.
Запуск перепроверки нерабочих потоков.
Вывод данных о результатах проверки в терминал.
"""
global status, error, cnt
path = input("\nВведите путь к директории с m3u: ")
if not Path(path).exists() or not Path(path).is_dir():
print("Нет такой директории или указанный путь не является директорией")
sys.exit(0)
time_start = time.monotonic()
files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]
for nm, file in enumerate(sorted(files)):
n = 0
with open(file, 'r', encoding='utf-8') as f:
for nn in f.readlines():
if nn.startswith("http"):
n += 1
print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
print(Fore.GREEN + "-" * 60 + "\n")
ext = ""
with ThreadPoolExecutor(max_workers=5) as executor:
with open(file, 'r', encoding='utf-8') as fl:
for line in fl.readlines():
if line.startswith("#EXTINF"):
ext = line.strip()
continue
elif line.startswith("http"):
if len(line.strip().split()) > 1:
line = line.strip().replace(" ", "")
executor.submit(verification, url=line.strip(), ext=ext, count=n)
if error:
re_check = copy.deepcopy(error)
error.clear()
print("\r\033[K", end="")
with ThreadPoolExecutor(max_workers=5) as executor:
for i in re_check:
ext = i.split("\n")[0].strip()
url = i.split("\n")[1].strip()
executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
re_check.clear()
cnt = 0
save_status_error(str(file))
print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")
print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
print(Fore.GREEN + "-" * 60 + "\n")
Path(file).unlink()
status.clear()
error.clear()
if __name__ == "__main__":
main()
Функция для запуска проверки и обработки результата
Создадим функцию, в которой будем запускать проверку ссылок, а также обрабатывать ответы полученные после получения данных и обработки результатов. Я назвал данную функцию verification(url: str, ext: str, count: int, re_ch=False). На вход она получает ссылку на канал, описание канала, количество файлов при перепроверке, статус перепроверки.
Здесь мы запускаем функцию проверки, получаем от нее ответ. И в зависимости от того, что вернется, True или False распределяем по спискам, с выводом информации для пользователя.
Python:
global status, error, cnt
if load_txt(url):
status.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
Python:
def verification(url: str, ext: str, count: int, re_ch=False):
"""
Запуск проверки ссылок на поток и обработка результатов
возвращенных из функции проверки.
:param url: Ссылка на поток.
:param ext: Описание потока.
:param count: Количество ссылок для перепроверки.
:param re_ch: Статус перепроверки.
"""
global status, error, cnt
if load_txt(url):
status.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
error.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
Функция проверки файлов xml
Не всегда плейлисты прилетают в текстовом виде. Иногда они содержат в ответе xml. Соответственно, его нужно распарсить, чтобы получить сегменты видео и составить на них ссылки. Так как данный тип выбивается из основного потока проверок, я сделал для него отдельную функцию. Ее название - find_mpd(txt: str, url: str) -> bool. На вход она принимает текст ответа на запрос, ссылку для сборки ссылок на сегменты видео или аудио.
Ищем определенные теги, забираем из них текст, составляем ссылки и возвращаем одну из функции. А если сегментов найдено не будет, возвращаем False.
Python:
def find_mpd(txt: str, url: str) -> bool:
"""
Парсинг плейлиста в формате xml с расширением ".mpd".
:param txt: Текст запроса.
:param url: Стартовый url для формирования ссылки на сегмент потока.
:return: True или False, в зависимости от полученных результатов.
"""
soup = BeautifulSoup(txt, 'xml')
seg = soup.find_all('SegmentTemplate')
seg_list = []
for i in seg:
if i.get('initialization') is not None:
seg_list.append(urljoin(url, i.get('initialization')))
if seg_list:
return True if load_txt(seg_list[0]) else False
Поиск ссылок на плейлисты и сегменты видео или аудио
Если бы вы знали, сколько я писал раньше функций для того, чтобы найти определенный тип данных в тексте с использованием регулярок, думаю, что вы бы ужаснулись )). Все же, я пришел к самому простому (не факт, что самому правильному) решению. Просто проитерироваться по тексту. Создадим функцию find_link(url: str, text: str) -> (list, bool). На вход она получает ссылку на канал, для составления ссылки на сегменты, текст из запроса в котором и выполняется поиск ссылок и сегментов. И возвращает список со ссылками или False, если ссылок найдено не было.
Итерируемся по тексту, пропускаем строки с решетками. Все остальные строки забираем и пытаемся составить ссылки. Если список ссылок составлен, то есть не пуст, возвращаем его из функции.
Python:
def find_link(url: str, text: str) -> (list, bool):
"""
Поиск ссылок на сегменты потока в полученном тексте запроса.
:param url: Ссылка на поток, для формирования ссылки на сегмент.
:param text: Текст запроса для поиска ссылок на сегменты или сегментов.
:return: Список со ссылками или False.
"""
lnk = []
for i in text.splitlines():
if i.startswith("#"):
continue
elif not i.strip():
continue
lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
return lnk if lnk else False
Выполнение запросов, получение контента и итерация по нему
И еще одна, последняя в данном скрипте функция для того, чтобы выполнять запросы и обрабатывать ответы на них. Я назвал ее load_txt(url: str) -> bool. На вход она получает ссылку, а возвращает True или False.
Итак, первый кусочек кода. Здесь мы проверяем, не является ли ссылка ссылкой на YouTube. Проверяем именно, чтобы она была в начале. Иногда попадаются ссылки, которые ведут на YouTube через сервер. Их мы не рассматриваем. Только прямые ссылки. Если ссылка на Ютуб, загоняем ее в pytube. Пытаемся получить ID-видео. Так-как это быстрее всего. Если ID получен, значит видео существует. В противном случае просто обрабатываем исключение и возвращаем False.
Python:
if url.startswith("https://www.youtube.com") \
or url.startswith("https://youtube.com") \
or url.startswith("http://www.youtube.com") \
or url.startswith("http://youtube.com"):
try:
yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
if yt.video_id:
return True
except exceptions.RegexMatchError:
return False
Следующая операция, это выполнение запроса по ссылке и получение ответа. В запросе передаем ссылку, заголовки, указываем timeout, указываем, что запрос читает потоковые данные, явно указываем, что поддерживаем редирект, и отключаем верификацию, то есть проверку сертификатов, так как многие сервера работают еще по протоколу http. Проверяем статус-код. Если он 200, проверяем, нет ли в полученной ссылке расширения. Если есть, запускаем отдельную функцию для парсинга xml. Если нет, двигаемся дальше.
Python:
res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
if res.status_code == 200:
if ".mpd" in res.url:
if find_mpd(res.text, res.url):
return True
return False
Итерируемся по полученному контенту. Так как он в байтах, считываем первые 64. Этого более чем достаточно для попытки определения содержимого. Переводим байты в hex, для того, чтобы получить сигнатуру. Так как смещение у проверяемых типов файлов нулевое, нам не требуется цикл для итерации по смещениям. Поэтому, для начала итерируемя по списку сигнатур с html. Дело в том, что иногда в ответах прилетает 200 статус код. А вот содержимое совсем не соответствует простому тексту. То есть, некоторые сервера отдают ошибки доступа в виде html, операторы связи, если доступ запрещен, вместо статус-кода возвращают страницу с сообщение (к примеру, Мегафон). То есть, здесь нужно убрать html, так как это явная ошибка. Есть одно исключение, которое мы проверяем в начале. Это сигнатура «68 74 74 70». Дело в том, что иногда сервер отдает ссылку на плейлист в виде http. Эта сигнатура ему и соответствует. Если она получена, мы сразу же делаем рекурсию и получаем данные по полученной ссылке. Затем проверяем html. Если находим сигнатуру, возвращаем False.
Дальше идет проверка наличия в ответе текста. Тут уже прилетает то, что нужно. То есть, текст плейлиста. Определяем сигнатуру текста, итерируемся по содержимому до конца, чтобы получить то, что прилетело полностью. Отправляем в функцию для получения ссылок на сегменты. Если возвращается список ссылок — итерируемся по ним и делаем рекурсию в текущую функцию. И так до того, пока не будет находиться ни html, ни текст. Если не прилетит ошибка, конечно, доступа. Если мы, после цикла рекурсий и 200-го статус-кода получаем содержимое, но оно не является текстом или страницей, значит делаем предположение, что это медиаконтект и возвращаем True.
Python:
for chunk in res.iter_content(64):
hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
if hex_bytes[0:11] == "68 74 74 70":
if load_txt(chunk.decode().strip()):
return True
for it in html:
if hex_bytes[0:len(it)].upper() == it:
return False
for tx_pl in text_plain:
if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
for item in res.iter_content(1024):
chunk = chunk + item
if lnk := find_link(res.url, chunk.decode()):
for ln in lnk:
if load_txt(ln):
return True
continue
elif not lnk:
return False
continue
return True
return False
return False
except Exception as ex:
return True if "ICY 200 OK" in str(ex) else False
В исключение здесь добавлена проверка радио. Находил плейлисты, уже наверное второй раз не найду, где вполне рабочее радио и воспроизводиться медиаплеером. Вот только при попытке выполнения запроса вышибает в исключения. Ну и тогда я посмотрел, что прилетает в тексте. То есть, делаю проверку, если есть данное словосочетание, возвращаем True, радио рабочее, на и для остальных исключений — False.
Обработку других статус-кодов в данном контексте, думаю, делать не нужно. Если у нас 300-е коды, то переадресация происходит автоматом. Ну, а если 500-е, то ошибки сервера могут уйти при перепроверке, равно, как и 400-е коды.
Python:
# pip install colorama requests bs4 lxml pytube
import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions
requests.packages.urllib3.disable_warnings()
init()
text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
"74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
"69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}
html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
"3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
"3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
"3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
"45 52"}
headers = {
"User-Agent": "libmpv",
"Accept": "*/*",
"Connection": "keep-alive",
"Icy-MetaData": "1"
}
status = []
error = []
cnt = 0
def save_status_error(path: str):
"""
Сохранение содержимого списков status и error с рабочими и нерабочими ссылками.
:param path: Путь к открытому для проверки файлу.
"""
global status, error
print("\n\n" + "-" * 60)
print(f"\n{Fore.CYAN}SAVE DATA IN FILE\n{'-'*17}\n")
(Path.cwd() / "checked").mkdir(exist_ok=True)
if len(status) > 0:
(Path.cwd() / "checked" / "good").mkdir(exist_ok=True)
try:
name = Path.cwd() / "checked" / "good" / f'{Path(path).name.split(Path(path).suffix)[0]}_good_' \
f'{int(len(status))}.m3u'
except ValueError:
name = Path.cwd() / "checked" / "good" / f'{Path(path).name}_good_{int(len(status))}.m3u'
with open(name, "a", encoding='utf-8') as f:
f.write("#EXTM3U\n")
for item in sorted(status):
f.write(f"{item}")
print(f'{Fore.GREEN}GOOD SAVED: {Fore.YELLOW}"checked / good" -> "{name.name}"')
if len(error) > 0:
(Path.cwd() / "checked" / "error").mkdir(exist_ok=True)
try:
name = Path.cwd() / "checked" / "error" / f'{Path(path).name.split(Path(path).suffix)[0]}_error_' \
f'{int(len(error))}.m3u'
except ValueError:
name = Path.cwd() / "checked" / "error" / f'{Path(path).name}_error_{int(len(error))}.m3u'
with open(name, "a", encoding='utf-8') as f:
f.write("#EXTM3U\n")
for item in sorted(error):
f.write(f"{item}")
print(f'{Fore.GREEN}ERROR SAVED: {Fore.YELLOW}"checked / error" -> "{name.name}"')
print(Fore.GREEN + "\n" + "-" * 60)
def find_mpd(txt: str, url: str) -> bool:
"""
Парсинг плейлиста в формате xml с расширением ".mpd".
:param txt: Текст запроса.
:param url: Стартовый url для формирования ссылки на сегмент потока.
:return: True или False, в зависимости от полученных результатов.
"""
soup = BeautifulSoup(txt, 'xml')
seg = soup.find_all('SegmentTemplate')
seg_list = []
for i in seg:
if i.get('initialization') is not None:
seg_list.append(urljoin(url, i.get('initialization')))
if seg_list:
return True if load_txt(seg_list[0]) else False
def load_txt(url: str) -> bool:
"""
Получение данных по ссылке на поток.
:param url: Ссылка на поток.
:return: True или False в зависимости от полученного результата.
"""
# Проверка наличия ссылки на YouTube и запуск
# проверки наличия идентификатора видео.
# Если идентификатор есть, ссылка рабочая.
if url.startswith("https://www.youtube.com") \
or url.startswith("https://youtube.com") \
or url.startswith("http://www.youtube.com") \
or url.startswith("http://youtube.com"):
try:
yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
if yt.video_id:
return True
except exceptions.RegexMatchError:
return False
try:
# Запрос данных по ссылке на поток с включенной функцией получения данных в потоке. Если статус-код 200,
# выполняем последующие проверки. Первая - наличие в ссылке расширения с файлом xml, в котором
# парсятся ссылки на сегменты видео.
res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
if res.status_code == 200:
if ".mpd" in res.url:
if find_mpd(res.text, res.url):
return True
return False
# Итерация по поученному контенту и проверка первых 64 байт на наличие
# в получаемом потоке определенного типа данных по их сигнатуре.
# Переводим байты в hex, проверяем: 1. Наличие ссылки (сигнатура: 68 74...);
# 2. Проверка наличия в полученных данных html. Если есть, возвращаем False;
# 3. Проверка наличия в полученных данных текста. Если находим, передаем в
# функцию для обработки.
# Если поток получен, но в нем не найдена ни одна сигнатура, будем считать,
# что в данном потоке передаются медиа-данные, а значит возвращаем True.
for chunk in res.iter_content(64):
hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
if hex_bytes[0:11] == "68 74 74 70":
if load_txt(chunk.decode().strip()):
return True
for it in html:
if hex_bytes[0:len(it)].upper() == it:
return False
for tx_pl in text_plain:
if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
for item in res.iter_content(1024):
chunk = chunk + item
if lnk := find_link(res.url, chunk.decode()):
for ln in lnk:
if load_txt(ln):
return True
continue
elif not lnk:
return False
continue
return True
return False
return False
except Exception as ex:
return True if "ICY 200 OK" in str(ex) else False
def find_link(url: str, text: str) -> (list, bool):
"""
Поиск ссылок на сегменты потока в полученном тексте запроса.
:param url: Ссылка на поток, для формирования ссылки на сегмент.
:param text: Текст запроса для поиска ссылок на сегменты или сегментов.
:return: Список со ссылками или False.
"""
lnk = []
for i in text.splitlines():
if i.startswith("#"):
continue
elif not i.strip():
continue
lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
return lnk if lnk else False
def verification(url: str, ext: str, count: int, re_ch=False):
"""
Запуск проверки ссылок на поток и обработка результатов
возвращенных из функции проверки.
:param url: Ссылка на поток.
:param ext: Описание потока.
:param count: Количество ссылок для перепроверки.
:param re_ch: Статус перепроверки.
"""
global status, error, cnt
if load_txt(url):
status.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
error.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
def main():
"""
Получение данных от пользователя о сканируемой директории.
Формирование списка файлов в директории.
Подсчет количества ссылок на потоки и вывод в терминал.
Запуск потоков для проверки наличия медиа-контента.
Запуск перепроверки нерабочих потоков.
Вывод данных о результатах проверки в терминал.
"""
global status, error, cnt
path = input("\nВведите путь к директории с m3u: ")
if not Path(path).exists() or not Path(path).is_dir():
print("Нет такой директории или указанный путь не является директорией")
sys.exit(0)
time_start = time.monotonic()
files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]
for nm, file in enumerate(sorted(files)):
n = 0
with open(file, 'r', encoding='utf-8') as f:
for nn in f.readlines():
if nn.startswith("http"):
n += 1
print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
print(Fore.GREEN + "-" * 60 + "\n")
ext = ""
with ThreadPoolExecutor(max_workers=5) as executor:
with open(file, 'r', encoding='utf-8') as fl:
for line in fl.readlines():
if line.startswith("#EXTINF"):
ext = line.strip()
continue
elif line.startswith("http"):
if len(line.strip().split()) > 1:
line = line.strip().replace(" ", "")
executor.submit(verification, url=line.strip(), ext=ext, count=n)
if error:
re_check = copy.deepcopy(error)
error.clear()
print("\r\033[K", end="")
with ThreadPoolExecutor(max_workers=5) as executor:
for i in re_check:
ext = i.split("\n")[0].strip()
url = i.split("\n")[1].strip()
executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
re_check.clear()
cnt = 0
save_status_error(str(file))
print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")
print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
print(Fore.GREEN + "-" * 60 + "\n")
Path(file).unlink()
status.clear()
error.clear()
if __name__ == "__main__":
main()
Вот такой вот получился чекер. Ниже скриншот, который показывает результаты работы чекера.
И на этом можно было бы и закончить. Так как с технической стороны чекер свою работы делает. К сожалению, да, бывает, что вылетают некоторые каналы в ошибки. Но, с этим трудно что-то поделать. Я пробовал перепроверять рекурсивно до того момента, пока количество ошибок не будет равно одному и тому же значению хотя бы три раза. Но ничего хорошего из этого не получилось. Да, с каждой проверкой количество рабочих каналов увеличивалось. Однако, после перепроверки в плеере они просто не работали. Так что, тут еще важно соблюсти некоторый баланс.
Так вот, технически все вроде бы неплохо. Но, если доступ к контенту запрещен, то у вас на экране будет вот такая:
или такая:
картинки. Есть еще больше всяческих их разновидностей. То есть, контент там есть. А вот толку от него нет. Я решил попробовать избавиться от этих мусорных каналов. Поискал повторяющиеся паттерны в ссылках которые возвращаются от сервера и в ссылках на сегменты. И наковырял небольшой список, который отсеивает большую часть мусора. Иногда он прорывается, но это уже не зависит от фильтра. Просто при проверке ссылка была несколько другой. Я даже специально смотрел в дебаггере.
По итогу получилась еще одна версия скрипта, в которую я добавил одну функцию и две проверки.
Вот добавленная функция:
Python:
def find_stop(url: str) -> bool:
"""
Поиск паттернов с помощью которых можно отсеять некоторые
каналы с заблокированным содержимым.
:param url: Ссылка для поиска паттерна.
:return: True или False в зависимости от результата.
"""
for st in stop_list:
if findall(f"{st}", url):
return True
return False
Стоп-лист, по которому итерируется цикл:
Python:
stop_list = ["/errors/", "test_end.ts", "buy.ts", "money.ts", "buy_packet.ts", "empty.ts", "key.ts", "/error/",
"/NOT_CLIENT/", "http://logo.apk-red.com/tv/hata.jpg", "http://v.viplime.fun/video/user.ts", "/forbidden/",
"zabava-block-htvod.cdn.ngenix.net", "err-ru.sulfat.li", "/000/", "BanT0ken", "/activate/", "/404/",
"/405/", "www.cloudflare-terms-of-service-abuse.com", "http://cdn01.lifeyosso.fun:8080/connect/mono.m3u8",
"http://nl4.iptv.monster/9999/video.m3u8", "http://v.viplime.fun/video/block.ts", "auth.m3u8", "/block/",
"auth", "logout", "VDO-X-404", "offline", "logout.mp4", "error.tv4.live", "block-ip-video.ts"]
Первый участок кода, куда я добавил проверку:
Python:
res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
if res.status_code == 200:
if find_stop(res.url):
return False
Второй участок кода:
Python:
for tx_pl in text_plain:
if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
for item in res.iter_content(1024):
chunk = chunk + item
if lnk := find_link(res.url, chunk.decode()):
for ln in lnk:
if find_stop(ln):
return False
if load_txt(ln):
return True
continue
Python:
# pip install colorama requests bs4 lxml
import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from re import findall
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions
requests.packages.urllib3.disable_warnings()
init()
text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
"74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
"69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}
html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
"3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
"3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
"3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
"45 52"}
stop_list = ["/errors/", "test_end.ts", "buy.ts", "money.ts", "buy_packet.ts", "empty.ts", "key.ts", "/error/",
"/NOT_CLIENT/", "http://logo.apk-red.com/tv/hata.jpg", "http://v.viplime.fun/video/user.ts", "/forbidden/",
"zabava-block-htvod.cdn.ngenix.net", "err-ru.sulfat.li", "/000/", "BanT0ken", "/activate/", "/404/",
"/405/", "www.cloudflare-terms-of-service-abuse.com", "http://cdn01.lifeyosso.fun:8080/connect/mono.m3u8",
"http://nl4.iptv.monster/9999/video.m3u8", "http://v.viplime.fun/video/block.ts", "auth.m3u8", "/block/",
"auth", "logout", "VDO-X-404", "offline", "logout.mp4", "error.tv4.live", "block-ip-video.ts"]
headers = {
"User-Agent": "libmpv",
"Accept": "*/*",
"Connection": "keep-alive",
"Icy-MetaData": "1"
}
status = []
error = []
cnt = 0
def save_status_error(path: str):
"""
Сохранение содержимого списков status и error с рабочими и нерабочими ссылками.
:param path: Путь к открытому для проверки файлу.
"""
global status, error
print("\n\n" + "-" * 60)
print(f"\n{Fore.CYAN}SAVE DATA IN FILE\n{'-'*17}\n")
(Path.cwd() / "checked").mkdir(exist_ok=True)
if len(status) > 0:
(Path.cwd() / "checked" / "good").mkdir(exist_ok=True)
try:
name = Path.cwd() / "checked" / "good" / f'{Path(path).name.split(Path(path).suffix)[0]}_good_' \
f'{int(len(status))}.m3u'
except ValueError:
name = Path.cwd() / "checked" / "good" / f'{Path(path).name}_good_{int(len(status))}.m3u'
with open(name, "a", encoding='utf-8') as f:
f.write("#EXTM3U\n")
for item in sorted(status):
f.write(f"{item}")
print(f'{Fore.GREEN}GOOD SAVED: {Fore.YELLOW}"checked / good" -> "{name.name}"')
if len(error) > 0:
(Path.cwd() / "checked" / "error").mkdir(exist_ok=True)
try:
name = Path.cwd() / "checked" / "error" / f'{Path(path).name.split(Path(path).suffix)[0]}_error_' \
f'{int(len(error))}.m3u'
except ValueError:
name = Path.cwd() / "checked" / "error" / f'{Path(path).name}_error_{int(len(error))}.m3u'
with open(name, "a", encoding='utf-8') as f:
f.write("#EXTM3U\n")
for item in sorted(error):
f.write(f"{item}")
print(f'{Fore.GREEN}ERROR SAVED: {Fore.YELLOW}"checked / error" -> "{name.name}"')
print(Fore.GREEN + "\n" + "-" * 60)
def find_mpd(txt: str, url: str) -> bool:
"""
Парсинг плейлиста в формате xml с расширением ".mpd".
:param txt: Текст запроса.
:param url: Стартовый url для формирования ссылки на сегмент потока.
:return: True или False, в зависимости от полученных результатов.
"""
soup = BeautifulSoup(txt, 'xml')
seg = soup.find_all('SegmentTemplate')
seg_list = []
for i in seg:
if i.get('initialization') is not None:
seg_list.append(urljoin(url, i.get('initialization')))
if seg_list:
return True if load_txt(seg_list[0]) else False
def find_stop(url: str) -> bool:
"""
Поиск паттернов с помощью которых можно отсеять некоторые
каналы с заблокированным содержимым.
:param url: Ссылка для поиска паттерна.
:return: True или False в зависимости от результата.
"""
for st in stop_list:
if findall(f"{st}", url):
return True
return False
def load_txt(url: str) -> bool:
"""
Получение данных по ссылке на поток.
:param url: Ссылка на поток.
:return: True или False в зависимости от полученного результата.
"""
# Проверка наличия ссылки на YouTube и запуск
# проверки наличия идентификатора видео.
# Если идентификатор есть, ссылка рабочая.
if url.startswith("https://www.youtube.com") \
or url.startswith("https://youtube.com") \
or url.startswith("http://www.youtube.com") \
or url.startswith("http://youtube.com"):
try:
yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
if yt.video_id:
return True
except exceptions.RegexMatchError:
return False
try:
# Запрос данных по ссылке на поток с включенной функцией получения данных в потоке. Если статус-код 200,
# выполняем последующие проверки. Первая - наличие в ссылке расширения с файлом xml, в котором
# парсятся ссылки на сегменты видео.
res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
if res.status_code == 200:
if find_stop(res.url):
return False
if ".mpd" in res.url:
if find_mpd(res.text, res.url):
return True
return False
# Итерация по поученному контенту и проверка первых 64 байт на наличие
# в получаемом потоке определенного типа данных по их сигнатуре.
# Переводим байты в hex, проверяем: 1. Наличие ссылки (сигнатура: 68 74...);
# 2. Проверка наличия в полученных данных html. Если есть, возвращаем False;
# 3. Проверка наличия в полученных данных текста. Если находим, передаем в
# функцию для обработки.
# Если поток получен, но в нем не найдена ни одна сигнатура, будем считать,
# что в данном потоке передаются медиа-данные, а значит возвращаем True.
for chunk in res.iter_content(64):
hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
if hex_bytes[0:11] == "68 74 74 70":
if load_txt(chunk.decode().strip()):
return True
for it in html:
if hex_bytes[0:len(it)].upper() == it:
return False
for tx_pl in text_plain:
if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
for item in res.iter_content(1024):
chunk = chunk + item
if lnk := find_link(res.url, chunk.decode()):
for ln in lnk:
if find_stop(ln):
return False
if load_txt(ln):
return True
continue
elif not lnk:
return False
continue
return True
return False
return False
except Exception as ex:
return True if "ICY 200 OK" in str(ex) else False
def find_link(url: str, text: str) -> (list, bool):
"""
Поиск ссылок на сегменты потока в полученном тексте запроса.
:param url: Ссылка на поток, для формирования ссылки на сегмент.
:param text: Текст запроса для поиска ссылок на сегменты или сегментов.
:return: Список со ссылками или False.
"""
lnk = []
for i in text.splitlines():
if i.startswith("#"):
continue
elif not i.strip():
continue
lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
return lnk if lnk else False
def verification(url: str, ext: str, count: int, re_ch=False):
"""
Запуск проверки ссылок на поток и обработка результатов
возвращенных из функции проверки.
:param url: Ссылка на поток.
:param ext: Описание потока.
:param count: Количество ссылок для перепроверки.
:param re_ch: Статус перепроверки.
"""
global status, error, cnt
if load_txt(url):
status.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
error.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
def main():
"""
Получение данных от пользователя о сканируемой директории.
Формирование списка файлов в директории.
Подсчет количества ссылок на потоки и вывод в терминал.
Запуск потоков для проверки наличия медиа-контента.
Запуск перепроверки нерабочих потоков.
Вывод данных о результатах проверки в терминал.
"""
global status, error, cnt
path = input("\nВведите путь к директории с m3u: ")
if not Path(path).exists() or not Path(path).is_dir():
print("Нет такой директории или указанный путь не является директорией")
sys.exit(0)
time_start = time.monotonic()
files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]
for nm, file in enumerate(sorted(files)):
n = 0
with open(file, 'r', encoding='utf-8') as f:
for nn in f.readlines():
if nn.startswith("http"):
n += 1
print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
print(Fore.GREEN + "-" * 60 + "\n")
num = 0
ext = ""
with ThreadPoolExecutor(max_workers=5) as executor:
with open(file, 'r', encoding='utf-8') as fl:
for line in fl.readlines():
if line.startswith("#EXTINF"):
ext = line.strip()
continue
elif line.startswith("http"):
num += 1
if len(line.strip().split()) > 1:
line = line.strip().replace(" ", "")
executor.submit(verification, url=line.strip(), ext=ext, count=n)
if error:
re_check = copy.deepcopy(error)
error.clear()
print("\r\033[K", end="")
with ThreadPoolExecutor(max_workers=5) as executor:
for i in re_check:
ext = i.split("\n")[0].strip()
url = i.split("\n")[1].strip()
executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
re_check.clear()
cnt = 0
save_status_error(str(file))
print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")
print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
print(Fore.GREEN + "-" * 60 + "\n")
status.clear()
error.clear()
# Path(file).unlink()
if __name__ == "__main__":
main()
Но, на этом еще не все. Есть третья версия чекера. На ней я пока что и остановился. Дело в том, что попадаются в плейлистах ссылки, у которых вместо протоколов http или https указан протокол rtmp. Этот протокол для потоковой передачи данных разработанный фирмой Adobe. В некоторых случаях по данному протоколу вещают веб-камеры. Так как данный протокол был проприетарным, то и разработка модулей под него велась не особо активно, так как никто толком не знал полной его спецификации. Я нашел модуль для получения данных, но пока что в нем еще не разобрался. Однако, данный протокол понимает OpenCV. И с его помощью можно осуществить проверку. Вот только есть одна неприятная особенность. Когда в коде возникает исключение, библиотека выбрасывает в терминал длинную строку с исключением. И, к сожалению, я не нашел способ его обработки. Программа работает, но вот эти сообщения не очень приятны. Тем не менее, дело свое он делает.
Третья версия включает проверку данного протокола. Для того, чтобы она работала, нужно установить библиотеку opencv-python. Для ее установки пишем в терминале:
pip install opencv-python
Функция, которая добавляется в этом случае:
Python:
def check_rtmp(url: str) -> bool:
"""
Проверка медиа-потока по протоколу передачи данных rtmp.
Для проверки используется OpenCV.
:param url: Ссылка на поток для проверки.
:return: True или False в зависимости от результата.
"""
video = cv2.VideoCapture(url)
while True:
grabbed, frame = video.read()
if grabbed:
video.release()
return True
return False
Код для запуска функции, который добавляется в функцию verification.
Python:
if re_ch == "rtmp":
if check_rtmp(url):
status.append(f'{ext}\n{url}\n')
cnt += 1
print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
return
else:
error.append(f'{ext}\n{url}\n')
cnt += 1
print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
return
Python:
# pip install colorama requests bs4 lxml
import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import urljoin
import cv2
import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions
requests.packages.urllib3.disable_warnings()
init()
text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
"74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
"69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}
html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
"3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
"3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
"3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
"45 52"}
headers = {
"User-Agent": "libmpv",
"Accept": "*/*",
"Connection": "keep-alive",
"Icy-MetaData": "1"
}
status = []
error = []
cnt = 0
def save_status_error(path: str):
"""
Сохранение содержимого списков status и error с рабочими и нерабочими ссылками.
:param path: Путь к открытому для проверки файлу.
"""
global status, error
print("\n\n" + "-" * 60)
print(f"\n{Fore.CYAN}SAVE DATA IN FILE\n{'-'*17}\n")
(Path.cwd() / "checked").mkdir(exist_ok=True)
if len(status) > 0:
(Path.cwd() / "checked" / "good").mkdir(exist_ok=True)
try:
name = Path.cwd() / "checked" / "good" / f'{Path(path).name.split(Path(path).suffix)[0]}_good_' \
f'{int(len(status))}.m3u'
except ValueError:
name = Path.cwd() / "checked" / "good" / f'{Path(path).name}_good_{int(len(status))}.m3u'
with open(name, "a", encoding='utf-8') as f:
f.write("#EXTM3U\n")
for item in sorted(status):
f.write(f"{item}")
print(f'{Fore.GREEN}GOOD SAVED: {Fore.YELLOW}"checked / good" -> "{name.name}"')
if len(error) > 0:
(Path.cwd() / "checked" / "error").mkdir(exist_ok=True)
try:
name = Path.cwd() / "checked" / "error" / f'{Path(path).name.split(Path(path).suffix)[0]}_error_' \
f'{int(len(error))}.m3u'
except ValueError:
name = Path.cwd() / "checked" / "error" / f'{Path(path).name}_error_{int(len(error))}.m3u'
with open(name, "a", encoding='utf-8') as f:
f.write("#EXTM3U\n")
for item in sorted(error):
f.write(f"{item}")
print(f'{Fore.GREEN}ERROR SAVED: {Fore.YELLOW}"checked / error" -> "{name.name}"')
print(Fore.GREEN + "\n" + "-" * 60)
def find_mpd(txt: str, url: str) -> bool:
"""
Парсинг плейлиста в формате xml с расширением ".mpd".
:param txt: Текст запроса.
:param url: Стартовый url для формирования ссылки на сегмент потока.
:return: True или False, в зависимости от полученных результатов.
"""
soup = BeautifulSoup(txt, 'xml')
seg = soup.find_all('SegmentTemplate')
seg_list = []
for i in seg:
if i.get('initialization') is not None:
seg_list.append(urljoin(url, i.get('initialization')))
if seg_list:
return True if load_txt(seg_list[0]) else False
def load_txt(url: str) -> bool:
"""
Получение данных по ссылке на поток.
:param url: Ссылка на поток.
:return: True или False в зависимости от полученного результата.
"""
# Проверка наличия ссылки на YouTube и запуск
# проверки наличия идентификатора видео.
# Если идентификатор есть, ссылка рабочая.
if url.startswith("https://www.youtube.com") \
or url.startswith("https://youtube.com") \
or url.startswith("http://www.youtube.com") \
or url.startswith("http://youtube.com"):
try:
yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
if yt.video_id:
return True
except exceptions.RegexMatchError:
return False
try:
# Запрос данных по ссылке на поток с включенной функцией получения данных в потоке. Если статус-код 200,
# выполняем последующие проверки. Первая - наличие в ссылке расширения с файлом xml, в котором
# парсятся ссылки на сегменты видео.
res = requests.get(url, headers=headers, timeout=15, stream=True, allow_redirects=True, verify=False)
if res.status_code == 200:
if ".mpd" in res.url:
if find_mpd(res.text, res.url):
return True
return False
# Итерация по поученному контенту и проверка первых 64 байт на наличие
# в получаемом потоке определенного типа данных по их сигнатуре.
# Переводим байты в hex, проверяем: 1. Наличие ссылки (сигнатура: 68 74...);
# 2. Проверка наличия в полученных данных html. Если есть, возвращаем False;
# 3. Проверка наличия в полученных данных текста. Если находим, передаем в
# функцию для обработки.
# Если поток получен, но в нем не найдена ни одна сигнатура, будем считать,
# что в данном потоке передаются медиа-данные, а значит возвращаем True.
for chunk in res.iter_content(64):
hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
if hex_bytes[0:11] == "68 74 74 70":
if load_txt(chunk.decode().strip()):
return True
for it in html:
if hex_bytes[0:len(it)].upper() == it:
return False
for tx_pl in text_plain:
if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
for item in res.iter_content(1024):
chunk = chunk + item
if lnk := find_link(res.url, chunk.decode()):
for ln in lnk:
if load_txt(ln):
return True
continue
elif not lnk:
return False
continue
return True
return False
return False
except Exception as ex:
return True if "ICY 200 OK" in str(ex) else False
def find_link(url: str, text: str) -> (list, bool):
"""
Поиск ссылок на сегменты потока в полученном тексте запроса.
:param url: Ссылка на поток, для формирования ссылки на сегмент.
:param text: Текст запроса для поиска ссылок на сегменты или сегментов.
:return: Список со ссылками или False.
"""
lnk = []
for i in text.splitlines():
if i.startswith("#"):
continue
elif not i.strip():
continue
lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
return lnk if lnk else False
def check_rtmp(url: str) -> bool:
"""
Проверка медиа-потока по протоколу передачи данных rtmp.
Для проверки используется OpenCV.
:param url: Ссылка на поток для проверки.
:return: True или False в зависимости от результата.
"""
video = cv2.VideoCapture(url)
while True:
grabbed, frame = video.read()
if grabbed:
video.release()
return True
return False
def verification(url: str, ext: str, count: int, re_ch=False):
"""
Запуск проверки ссылок на поток и обработка результатов
возвращенных из функции проверки.
:param url: Ссылка на поток.
:param ext: Описание потока.
:param count: Количество ссылок для перепроверки.
:param re_ch: Статус перепроверки.
"""
global status, error, cnt
if re_ch == "rtmp":
if check_rtmp(url):
status.append(f'{ext}\n{url}\n')
cnt += 1
print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
return
else:
error.append(f'{ext}\n{url}\n')
cnt += 1
print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
return
if load_txt(url):
status.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
error.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
def main():
"""
Получение данных от пользователя о сканируемой директории.
Формирование списка файлов в директории.
Подсчет количества ссылок на потоки и вывод в терминал.
Запуск потоков для проверки наличия медиа-контента.
Запуск перепроверки нерабочих потоков.
Вывод данных о результатах проверки в терминал.
"""
global status, error, cnt
path = input("\nВведите путь к директории с m3u: ")
if not Path(path).exists() or not Path(path).is_dir():
print("Нет такой директории или указанный путь не является директорией")
sys.exit(0)
time_start = time.monotonic()
files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]
rtmp = []
for nm, file in enumerate(sorted(files)):
n = 0
with open(file, 'r', encoding='utf-8') as f:
for nn in f.readlines():
if nn.startswith("http"):
n += 1
print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
print(Fore.GREEN + "-" * 60 + "\n")
num = 0
ext = ""
with ThreadPoolExecutor(max_workers=5) as executor:
with open(file, 'r', encoding='utf-8') as fl:
for line in fl.readlines():
if line.startswith("#EXTINF"):
ext = line.strip()
continue
elif line.startswith("rtmp"):
rtmp.append(f"{ext}\n{line.strip()}")
elif line.startswith("http"):
num += 1
if len(line.strip().split()) > 1:
line = line.strip().replace(" ", "")
executor.submit(verification, url=line.strip(), ext=ext, count=n)
if error:
re_check = copy.deepcopy(error)
error.clear()
print("\r\033[K", end="")
with ThreadPoolExecutor(max_workers=5) as executor:
for i in re_check:
ext = i.split("\n")[0].strip()
url = i.split("\n")[1].strip()
executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
re_check.clear()
cnt = 0
if rtmp:
print("\r\033[K", end="")
with ThreadPoolExecutor(max_workers=5) as executor:
for i in rtmp:
ext = i.split("\n")[0].strip()
url = i.split("\n")[1].strip()
executor.submit(verification, url=url, ext=ext, count=len(rtmp), re_ch="rtmp")
rtmp.clear()
cnt = 0
save_status_error(str(file))
print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")
print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
print(Fore.GREEN + "-" * 60 + "\n")
Path(file).unlink()
status.clear()
error.clear()
if __name__ == "__main__":
main()
Еще немного поэкспериментировав решил объединить все версии скрипта в одну, плюс к тому, добавил проверку на длину возвращаемого списка со ссылками. Дело в том, что в некоторых плейлистах содержится более 100-та ссылок. Соответственно, если, к примеру, по первым ссылкам на сегменты будет возвращен код ошибки, то итерация продолжиться до упора. А так как ссылок очень много, то может показаться, что скрипт просто завис. Чтобы этого избежать и нужна эта проверка. То есть, если длина возвращаемого списка более 10, то будем итерироваться только по последним 10 элементам. Ну и добавил проверку rtmp. Так что, эта версия на данный момент самая полная. Возможно, что в процессе обнаружиться еще что-то, что потребуется обработать или изменить. Это уже будет отображаться (скорее всего) в репозитории на GitHub, как только я его создам )).
Python:
# pip install colorama requests bs4 lxml
import copy
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from os import system
from pathlib import Path
from re import findall
from urllib.parse import urljoin
import cv2
import requests
from bs4 import BeautifulSoup
from colorama import Fore
from colorama import init
from pytube import YouTube, exceptions
requests.packages.urllib3.disable_warnings()
init()
text_plain = {"23 45 58 54", "5B 70 6C 61 79 6C 69 73", "74 65 78 74 2F 70 6C 61 69 6E", "EF BB BF 00", "FF FE 00 00",
"74 65 78 74 2F 70 6C 61 69 6E", "3B 20 63 68 61 72 73 65 74 3D", "49 53 4F 2D 38 38 35 39 2D 31",
"69 73 6F 2D 38 38 35 39 2D 31", "55 54 46 2D 38", "FE FF 00 00"}
html = {"7B 22", "3C 21 44 4F 43 54 59 50 45 20", "48 54 4D 4C TT", "3C 48 54 4D 4C TT", "3C 48 45 41 44 TT",
"3C 53 43 52 49 50 54 TT", "3C 49 46 52 41 4D 45 TT", "3C 48 31 TT", "3C 44 49 56 TT", "3C 46 4F 4E 54 TT",
"3C 54 41 42 4C 45 TT", "3C 41 TT", "3C 53 54 59 4C 45 TT", "3C 54 49 54 4C 45 TT", "3C 42 TT", "3C 42 52 TT",
"3C 42 4F 44 59 TT", "3C 50 TT", "3C 21 2D 2D TT", "3C 3F 78 6D 6C", "25 50 44 46 2D", "4E 6F", "3C 41", "3C",
"45 52"}
stop_list = ["/errors/", "test_end.ts", "buy.ts", "money.ts", "buy_packet.ts", "empty.ts", "key.ts", "/error/",
"/NOT_CLIENT/", "http://logo.apk-red.com/tv/hata.jpg", "http://v.viplime.fun/video/user.ts", "/forbidden/",
"zabava-block-htvod.cdn.ngenix.net", "err-ru.sulfat.li", "/000/", "BanT0ken", "/activate/", "/404/",
"/405/", "www.cloudflare-terms-of-service-abuse.com", "http://cdn01.lifeyosso.fun:8080/connect/mono.m3u8",
"http://nl4.iptv.monster/9999/video.m3u8", "http://v.viplime.fun/video/block.ts", "auth.m3u8", "/block/",
"auth", "logout", "VDO-X-404", "offline", "logout.mp4", "error.tv4.live", "block-ip-video.ts", "/420/",
"delete.ts", "http://langamepp.com/user.mp4", "http://m.megafonpro.ru/http_errors?error=404"]
headers = {
"User-Agent": "libmpv",
"Accept": "*/*",
"Connection": "keep-alive",
"Icy-MetaData": "1"
}
status = []
error = []
cnt = 0
def save_status_error(path: str):
"""
Сохранение содержимого списков status и error с рабочими и нерабочими ссылками.
:param path: Путь к открытому для проверки файлу.
"""
global status, error
print("\n\n" + "-" * 60)
print(f"\n{Fore.CYAN}SAVE DATA IN FILE\n{'-'*17}\n")
(Path.cwd() / "checked").mkdir(exist_ok=True)
if len(status) > 0:
(Path.cwd() / "checked" / "good").mkdir(exist_ok=True)
try:
name = Path.cwd() / "checked" / "good" / f'{Path(path).name.split(Path(path).suffix)[0]}_good_' \
f'{int(len(status))}.m3u'
except ValueError:
name = Path.cwd() / "checked" / "good" / f'{Path(path).name}_good_{int(len(status))}.m3u'
with open(name, "a", encoding='utf-8') as f:
f.write("#EXTM3U\n")
for item in sorted(status):
f.write(f"{item}")
print(f'{Fore.GREEN}GOOD SAVED: {Fore.YELLOW}"checked / good" -> "{name.name}"')
if len(error) > 0:
(Path.cwd() / "checked" / "error").mkdir(exist_ok=True)
try:
name = Path.cwd() / "checked" / "error" / f'{Path(path).name.split(Path(path).suffix)[0]}_error_' \
f'{int(len(error))}.m3u'
except ValueError:
name = Path.cwd() / "checked" / "error" / f'{Path(path).name}_error_{int(len(error))}.m3u'
with open(name, "a", encoding='utf-8') as f:
f.write("#EXTM3U\n")
for item in sorted(error):
f.write(f"{item}")
print(f'{Fore.GREEN}ERROR SAVED: {Fore.YELLOW}"checked / error" -> "{name.name}"')
print(Fore.GREEN + "\n" + "-" * 60)
def find_mpd(txt: str, url: str) -> bool:
"""
Парсинг плейлиста в формате xml с расширением ".mpd".
:param txt: Текст запроса.
:param url: Стартовый url для формирования ссылки на сегмент потока.
:return: True или False, в зависимости от полученных результатов.
"""
soup = BeautifulSoup(txt, 'xml')
seg = soup.find_all('SegmentTemplate')
seg_list = []
for i in seg:
if i.get('initialization') is not None:
seg_list.append(urljoin(url, i.get('initialization')))
if seg_list:
return True if load_txt(seg_list[0]) else False
def find_stop(url: str) -> bool:
"""
Поиск паттернов с помощью которых можно отсеять некоторые
каналы с заблокированным содержимым.
:param url: Ссылка для поиска паттерна.
:return: True или False в зависимости от результата.
"""
for st in stop_list:
if findall(f"{st}", url):
return True
return False
def find_link(url: str, text: str) -> (list, bool):
"""
Поиск ссылок на сегменты потока в полученном тексте запроса.
:param url: Ссылка на поток, для формирования ссылки на сегмент.
:param text: Текст запроса для поиска ссылок на сегменты или сегментов.
:return: Список со ссылками или False.
"""
lnk = []
for i in text.splitlines():
if i.startswith("#"):
continue
elif not i.strip():
continue
lnk.append(i.strip()) if i.startswith("http") else lnk.append(urljoin(url, i.strip()))
return lnk if lnk else False
def load_txt(ses, url: str) -> bool:
"""
Получение данных по ссылке на поток.
:param ses: Сессия.
:param url: Ссылка на поток.
:return: True или False в зависимости от полученного результата.
"""
# Проверка наличия ссылки на YouTube и запуск
# проверки наличия идентификатора видео.
# Если идентификатор есть, ссылка рабочая.
if url.startswith("https://www.youtube.com") \
or url.startswith("https://youtube.com") \
or url.startswith("http://www.youtube.com") \
or url.startswith("http://youtube.com"):
try:
yt = YouTube("http://www.youtube.com/watch?v=6UFOJjDqL_g")
if yt.video_id:
return True
except exceptions.RegexMatchError:
return False
try:
# Запрос данных по ссылке на поток с включенной функцией получения данных в потоке. Если статус-код 200,
# выполняем последующие проверки. Первая - наличие в ссылке расширения с файлом xml, в котором
# парсятся ссылки на сегменты видео.
res = ses.get(url, headers=headers, timeout=10, stream=True, allow_redirects=True, verify=False)
if res.status_code == 200:
if find_stop(res.url):
return False
if ".mpd" in res.url:
if find_mpd(res.text, res.url):
return True
return False
# Итерация по поученному контенту и проверка первых 64 байт на наличие
# в получаемом потоке определенного типа данных по их сигнатуре.
# Переводим байты в hex, проверяем: 1. Наличие ссылки (сигнатура: 68 74...);
# 2. Проверка наличия в полученных данных html. Если есть, возвращаем False;
# 3. Проверка наличия в полученных данных текста. Если находим, передаем в
# функцию для обработки.
# Если поток получен, но в нем не найдена ни одна сигнатура, будем считать,
# что в данном потоке передаются медиа-данные, а значит возвращаем True.
for chunk in res.iter_content(64):
hex_bytes = " ".join(['{:02X}'.format(byte) for byte in chunk])
if hex_bytes[0:11] == "68 74 74 70":
if load_txt(ses, chunk.decode().strip()):
return True
for it in html:
if hex_bytes[0:len(it)].upper() == it:
return False
for tx_pl in text_plain:
if hex_bytes[0:len(tx_pl)].upper() == tx_pl:
for item in res.iter_content(1024):
chunk = chunk + item
if lnk := find_link(res.url, chunk.decode()):
if len(lnk) > 10:
lnk = lnk[-10:]
for ln in lnk:
if find_stop(ln):
return False
if load_txt(ses, ln):
return True
continue
elif not lnk:
return False
continue
return True
return False
return False
except Exception as ex:
return True if "ICY 200 OK" in str(ex) else False
def check_rtmp(url: str) -> bool:
"""
Проверка медиа-потока по протоколу передачи данных rtmp.
Для проверки используется OpenCV.
:param url: Ссылка на поток для проверки.
:return: True или False в зависимости от результата.
"""
video = cv2.VideoCapture(url)
while True:
grabbed, frame = video.read()
if grabbed:
video.release()
return True
return False
def verification(url: str, ext: str, count: int, re_ch=False):
"""
Запуск проверки ссылок на поток и обработка результатов
возвращенных из функции проверки.
:param url: Ссылка на поток.
:param ext: Описание потока.
:param count: Количество ссылок для перепроверки.
:param re_ch: Статус перепроверки.
"""
global status, error, cnt
if re_ch == "rtmp":
if check_rtmp(url):
status.append(f'{ext}\n{url}\n')
cnt += 1
print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
return
else:
error.append(f'{ext}\n{url}\n')
cnt += 1
print(f"\r{Fore.YELLOW}Check rtmp: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
return
ses = requests.Session()
if load_txt(ses, url):
status.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
error.append(f'{ext}\n{url}\n')
if re_ch:
cnt += 1
print(f"\r{Fore.YELLOW}Re-Check: {Fore.RESET}{cnt}/{count} | {Fore.YELLOW}Online: {Fore.RESET}"
f"{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
else:
print(f"\r{Fore.YELLOW}Check: {Fore.RESET}{len(status) + len(error)}/{count} | {Fore.YELLOW}Online: "
f"{Fore.RESET}{len(status)} | {Fore.YELLOW}Offline: {Fore.RESET}{len(error)}",
end="")
ses.close()
def main():
"""
Получение данных от пользователя о сканируемой директории.
Формирование списка файлов в директории.
Подсчет количества ссылок на потоки и вывод в терминал.
Запуск потоков для проверки наличия медиа-контента.
Запуск перепроверки нерабочих потоков.
Вывод данных о результатах проверки в терминал.
"""
global status, error, cnt
# path = input("\nВведите путь к директории с m3u: ")
# if not Path(path).exists() or not Path(path).is_dir():
# print("Нет такой директории или указанный путь не является директорией")
# sys.exit(0)
path = "/home/vev/py_proj/checker_v4/000"
time_start = time.monotonic()
files = [fil for fil in Path(path).iterdir() if Path(fil).suffix == ".m3u"]
rtmp = []
file = ""
try:
for nm, file in enumerate(sorted(files)):
n = 0
with open(file, 'r', encoding='utf-8') as f:
for nn in f.readlines():
if nn.startswith("http"):
n += 1
print(f'\n{Fore.YELLOW}Checked file: {Fore.RESET}{Path(file).name} | {Fore.YELLOW}Count: {Fore.RESET}{n} | '
f'{Fore.YELLOW}File: {Fore.RESET}{nm + 1}/{len(files)}\n')
print(Fore.GREEN + "-" * 60 + "\n")
num = 0
ext = ""
with ThreadPoolExecutor(max_workers=5) as executor:
with open(file, 'r', encoding='utf-8') as fl:
for line in fl.readlines():
if line.startswith("#EXTINF"):
ext = line.strip()
continue
elif line.startswith("rtmp"):
rtmp.append(f"{ext}\n{line.strip()}")
elif line.startswith("http"):
num += 1
if len(line.strip().split()) > 1:
line = line.strip().replace(" ", "")
executor.submit(verification, url=line.strip(), ext=ext, count=n)
if error:
re_check = copy.deepcopy(error)
error.clear()
print("\r\033[K", end="")
with ThreadPoolExecutor(max_workers=5) as executor:
for i in re_check:
ext = i.split("\n")[0].strip()
url = i.split("\n")[1].strip()
executor.submit(verification, url=url, ext=ext, count=len(re_check), re_ch=True)
re_check.clear()
cnt = 0
if rtmp:
print("\r\033[K", end="")
with ThreadPoolExecutor(max_workers=5) as executor:
for i in rtmp:
ext = i.split("\n")[0].strip()
url = i.split("\n")[1].strip()
executor.submit(verification, url=url, ext=ext, count=len(rtmp), re_ch="rtmp")
rtmp.clear()
cnt = 0
save_status_error(str(file))
print(f"\n{Fore.GREEN}{'All'.ljust(8)}{Fore.RESET}|{str(int((len(status)) + (len(error)))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Online'.ljust(8)}{Fore.RESET}|{str(int(len(status))).center(6)}| {Fore.GREEN}channels")
print(f"{Fore.GREEN}{'Offline'.ljust(8)}{Fore.RESET}|{str(int(len(error))).center(6)}| {Fore.GREEN}channels\n")
print(f'{Fore.GREEN}Scan time {Fore.RESET}| '
f'{(int(time.monotonic() - time_start) // 3600) % 24:d} ч. '
f'{(int(time.monotonic() - time_start) // 60) % 60:02d} м. '
f'{int(time.monotonic() - time_start) % 60:02d} с.\n')
print(Fore.GREEN + "-" * 60 + "\n")
status.clear()
error.clear()
Path(file).unlink()
system(f'''notify-send "All operation complete"''')
except UnicodeDecodeError:
print(f"\nНе могу декодировать данные: {Path(file).name}\n")
system(f'''notify-send "Не могу декодировать данные: {Path(file).name}"''')
sys.exit(0)
if __name__ == "__main__":
main()
И еще, добавил обработку ошибки декодирования данных, о которой писал в первой части. Если у вас операционная система Windows, желательно удалить строку:
system(f'''notify-send "Не могу декодировать данные: {Path(file).name}"''')
Ну или добавить win10toast, для вывода сообщений в ОС Windows. Его можно установить командой:
pip install win10toast
Вот такая вот получилась история.
А на этом, пожалуй, все.
Спасибо за внимание. Надеюсь, данная информация будет вам полезна
Вложения
Последнее редактирование: