Статья Поиск, скачивание и извлечение метаданных из документов в свободном доступе с помощью Python. #02

В предыдущей статье мы начали писать скрипт поиска проиндексированных документов на определенном сайте, скачивание этих документов и извлечение из них доступных метаданных. И все это с помощью Python. Давайте продолжим создавать код и разбирать работу функций.

02.jpg


Дисклеймер: Все данные, предоставленные в данной статье, взяты из открытых источников, не призывают к действию и являются только лишь данными для ознакомления, и изучения механизмов используемых технологий.


Загрузка найденных документов

После того, как документы будут найдены, нужно их загрузить для анализа, да и просто, если есть желание, для чтения. Иногда можно узнать много нового и полезного. Создадим функцию downloads(url, addr). На вход она принимает ссылку на документ и название сайта на котором производился поиск. Название сайта нужно, чтобы сохранять документы в созданную ранее директорию.

Немного скажу, что здесь я промучился, при всей очевидности достаточно долго. Не знаю, правильно ли я все сделал, но, документы загружаются. Тут дело вот в чем. Для загрузки документов я использую потоки. Но, скрипт зависал и мог провисеть бесконечно, пока его не прерывали в принудительном порядке. Я начал проверять, почему так происходит и, пришел к выводу, что по некоторым ссылкам очень долгое время нет ответа от сервера. А так как его нет, либо сервер не хочет отдавать документ, либо ссылка попросту битая. Такие иногда попадаются при поиске DuckDuckGo. С Google реже, но индексы у него свежее.

Создадим заголовки для запросов. В них добавим случайный user-agent. Это обязательно, так как без него сервера порой отказываются отдавать документы. И accept. Что не обязательно, но не помешает.

Python:
    ua = UserAgent()
    headers = {
        'user-agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'
    }

Создадим пустую переменную для запроса, чтобы она была видна во всем коде функции. Теперь делаем запрос для получения заголовков по ссылке. Для этого используем функцию head. С ее помощью заголовки получаются без загрузки контента. Установим необходимые параметры для запроса: ссылку, по которой будет происходить загрузка; заголовки с user-agen и accept; отключаем проверку сертификата; устанавливаем общий таймаут в 30 секунд. Тут дело в том, что если в течении 30 секунд ответа от сервера получено не будет, вызовется исключение связанное с окончанием таймаута. Мы его обработаем и таким образом завершим работу потока.

Затем нужно определить название загружаемого документа. Честно говоря, не всегда его можно получить. Это конечно не часто, но иногда случается, когда ссылка не прямая, а с редиректом. Обычно в этих случаях название указывается в заголовках. Его мы и получаем из ключа Content-Disposition. Но иногда и ссылка с редиректом, и названия в заголовках нет. Тогда мы просто не загружаем данный документ. Если ключа нет в заголовках, проверяем ссылку, делим ее по «/» и забираем последний элемент. Зачастую названия нужно декодировать потому оборачиваем все в requests.utils.unquote. Бывает, что декодировать не получается, тогда обрабатываем исключение. Но это тоже редкость.

Python:
        resp = ''
        try:
            resp = requests.get(url=url, headers=headers, verify=False, timeout=30)
            doc_name = requests.utils.unquote(
                resp.headers['Content-Disposition'].replace("filename", "").replace("attachment", "").
                replace("UTF-8", "").replace("*", "").replace(";", "").replace("=", "").replace("'", "").
                replace('"', "").replace('/', "").replace('inline', ""))
        except KeyError:
            doc_name = requests.utils.unquote(url.split("/")[-1])
        except (requests.exceptions.ReadTimeout, requests.exceptions.ChunkedEncodingError):
            return

Теперь проверяем, существует ли файл в директории загрузки. Особенно это полезно, если вы перезапустили скрипт, чтобы не загружать одно и то же два раза. И если нет, открываем файл на запись в байтовом режиме, и скачиваем документ по частям, каждая из которых равна 1024 байта. Если часть существует, записываем. Если ее не будет, обрабатываем исключение.

Python:
        try:
            if Path(Path.cwd() / addr / doc_name).exists():
                return
            with open(Path.cwd() / addr / doc_name, 'wb') as file:
                for chunk in resp.iter_content(chunk_size=1024):
                    if chunk:
                        file.write(chunk)
        except (OSError, requests.exceptions.ChunkedEncodingError):
            return

И заключаем все в блок try — except, в котором обрабатываем ошибку соединения.

Python:
# функция загрузки файлов
def downloads(url, addr):
    ua = UserAgent()
    headers = {
        'user-agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'
    }

    try:
        resp = ''
        try:
            resp = requests.get(url=url, headers=headers, verify=False, timeout=30)
            doc_name = requests.utils.unquote(
                resp.headers['Content-Disposition'].replace("filename", "").replace("attachment", "").
                replace("UTF-8", "").replace("*", "").replace(";", "").replace("=", "").replace("'", "").
                replace('"', "").replace('/', "").replace('inline', ""))
        except KeyError:
            doc_name = requests.utils.unquote(url.split("/")[-1])
        except (requests.exceptions.ReadTimeout, requests.exceptions.ChunkedEncodingError):
            return

        try:
            if Path(Path.cwd() / addr / doc_name).exists():
                return
            with open(Path.cwd() / addr / doc_name, 'wb') as file:
                for chunk in resp.iter_content(chunk_size=1024):
                    if chunk:
                        file.write(chunk)
        except (OSError, requests.exceptions.ChunkedEncodingError):
            return
    except requests.exceptions.ConnectionError:
        return

Теперь создадим еще одну функцию, для запуска потоков в которых будет отрабатывать загрузка файла. Назову ее doc_downloads(links, addr). На вход она принимает список или множество (тут смотря из какой функции вызывается) со ссылками. И адрес сайта, на котором производиться поиск. Создаем список для потоков. Запускаем цикл, в котором перебираем ссылки. Создаем объект потока, в котором указываем целевую функцию, и передаваемые значения. В данном случае это ссылка на документ и адрес сайта для сохранения файлов в нужную директорию. Определяем поток как демона и запускаем. После чего добавляем его в список потоков. Делаем небольшую паузу. После чего запускаем цикл, в котором считаем потоки и сообщаем об этом пользователю.

Python:
def doc_downloads(links, addr):
    print('[+] Начинаю загрузку')
    threads = []

    for link in links:
        t = threading.Thread(target=downloads, kwargs={'url': link, 'addr': addr})
        t.daemon = True
        t.start()
        threads.append(t)
        time.sleep(0.3)

        for num, thread in enumerate(threads):
            print(f'\r   - Обработано: {num + 1}', end='')
            thread.join()


Вспомогательная функция получения e-mail из текста документа

Создадим функцию, которая будет, по сути, вспомогательной и служить только одной цели, а именно поиску e-mail в полученном тексте. Назову ее email_find(text). На вход она получает текст из других функций, с помощью регулярного выражения ищет электронную почту и если находит, добавляет в множество, которое для этого было создано ранее.

Python:
# функция проверки наличия в тексте e-mail
def email_find(text):
    emails = re.findall("([a-zA-Z0-9_.+-][email protected][a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)", text)
    if emails and len(emails) > 0:
        for em in emails:
            email.add(em)


Вспомогательная функция получения метаданных из изображения

Создадим функцию, которая будет получать метаданные из найденных изображений в документах. На входе она получает путь для открытия изображения, а также адрес сайта, на котором ведется поиск, это для названия файла, в который будут сохраняться найденные метаданные. Назову ее image_metadata(file, addr).

Открываем файл с помощью функции Image библиотеки Pillow. Получаем exif данные, если они есть. Открываем файл на запись, запускаем цикл по полученным метаданным и добавляем каждый тег и его значение в текстовый документ. Если же метаданных нет, просто выходим из функции.

Python:
# функция обработки метаданных из картинок документов
def image_metadata(file, addr):
    img = Image.open(file)
    try:
        exif = {ExifTags.TAGS[k]: v for k, v in img._getexif().items() if k in ExifTags.TAGS}
        with open(Path.cwd() / f'{addr}_meta.txt', 'a', encoding='utf-8') as meta:
            for info in exif:
                if info == 'GPSInfo':
                    try:
                        meta.write('GPSInfo\n\n')
                        meta.write(
                            f'{info:27}: lat {exif[info][2]} {exif[info][1]} - long {exif[info][4]} {exif[info][3]}\n')
                    except KeyError:
                        pass
                else:
                    if isinstance(exif[info], bytes):
                        try:
                            info_d = exif[info].decode()
                            meta.write(f'{info:25}: {info_d}\n')
                        except UnicodeDecodeError:
                            meta.write(f'{info:25}: {exif[info]}\n')
                    else:
                        meta.write(f'{info:25}: {exif[info]}\n')
            meta.write(f"{'-' * 25}\n")
    except (AttributeError, zipfile.BadZipFile):
        return

Здесь также обрабатываем исключения, которые иногда возникают при записи данных в текстовый файл, потому, что метаданные бывают очень уж не декодируемые. А также исключение, когда не удается прочитать содержимое архива.
А теперь пришло время создать функции для чтения метаданных. И начнем мы, пожалуй, с pdf.


Чтение текста и метаданных из pdf-документа

Создадим функцию pdf_metadata(item). На входе она получает путь к файлу pdf. Затем открываем его и если не удалось открыть, обрабатываем ошибку открытия файла.

Python:
    try:
        doc = fitz.open(item)
    except fitz.fitz.FileDataError:
        return

Теперь считываем метаданные. Здесь нас интересуют сведения об авторе документа, а также ПО с помощью которого документ был создан.

Python:
    if doc.metadata['author'] != '':
        author.add(doc.metadata['author'])
    creator = ''
    producer = ''
    if doc.metadata['creator'] != '':
        creator = doc.metadata['creator']
    if doc.metadata['producer'] != '':
        producer = doc.metadata["producer"]
    if creator == '' and producer == '':
        pass
    else:
        soft.add(f'{creator} ({producer})'.strip())

Здесь выполняются разные проверки на то,чтобы не было пустых значений. Затем запускаем цикл, в котором считываем текст постранично и передаем его в функцию поиска электронной почты.

Python:
    for current_page in range(len(doc)):
        if doc.load_page(current_page).get_text("text") != "":
            text = doc.load_page(current_page).get_text("text")
            email_find(str(text))

Python:
# получение текста и метаданных pdf
def pdf_metadata(item):
    try:
        doc = fitz.open(item)
    except fitz.fitz.FileDataError:
        return
    if doc.metadata['author'] != '':
        author.add(doc.metadata['author'])
    creator = ''
    producer = ''
    if doc.metadata['creator'] != '':
        creator = doc.metadata['creator']
    if doc.metadata['producer'] != '':
        producer = doc.metadata["producer"]
    if creator == '' and producer == '':
        pass
    else:
        soft.add(f'{creator} ({producer})'.strip())
    for current_page in range(len(doc)):
        if doc.load_page(current_page).get_text("text") != "":
            text = doc.load_page(current_page).get_text("text")
            email_find(str(text))


Чтение метаданных, текста и поиск картинок в документах Microsoft Office 2007 и выше

Как вы знаете, формат документов офис, начиная с 2007 версии, это zip архив с папками и файлами xml. Следовательно, нам нужно этот документ распаковать и распарсить данные. Создадим функцию microdoc_metadata(item, addr). На вход здесь получаем путь к файлу и адрес сайта, на котором мы искали документ. Адрес нужен для передачи в функцию получения метаданных из изображений, если таковые найдутся.

Для начала проверяем, является ли файл архивом. Затем делаем попытку его открыть, если это не получается, обрабатываем исключение и выходим из функции.

Python:
    zipfile.is_zipfile(item)
    try:
        zfile = zipfile.ZipFile(item)
    except zipfile.BadZipFile:
        return

Создаем переменную, в которой будет находиться дерево тегов считанных из распакованного файла и содержащееся в файле «core.xml». Если это не получается, обрабатываем ошибку и выходим из функции. Здесь содержатся сведения об авторе. Также, считываем данные из «app.xml». Здесь находятся сведения о версии ПО, в котором создан документ.

Python:
    try:
        core_xml = etree.fromstring(zfile.read('docProps/core.xml'))
        app_xml = etree.fromstring(zfile.read('docProps/app.xml'))
    except KeyError:
        return

Теперь проверяем, какой именно тип офисного документа содержится в ссылке, так как у разных типов хоть и похожая, но слегка отличающаяся структура архива. Для начала в переменную получаем теги из тела документа. Здесь содержится текст, он нам будет нужен чуть позже, когда мы будем передавать его в функцию поиска e-mail.

Python:
    if item.suffix == ".docx":
        try:
            doc_xml = etree.fromstring(zfile.read('word/document.xml'))
        except KeyError:
            return

Запускаем цикл в котором пробегаемся по всем ключевым тегам. Для начала ищем тег body, если он найден итерируем его и ищем тег «t». В нем содержится текст. Забираем его оттуда и передаем в функцию поиска e-mail. И так для каждого из тегов в теле документа.

Python:
        for element in doc_xml:
            if element.tag == '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}body':
                for elem in element.iter():
                    if elem.tag == '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}t':
                        email_find(str(elem.text))

Точно такие же операции проделываем для получения текста из файлов pptx и xlsx. Здесь принцип похож, только различаются имена тегов и названия файлов в которых содержится текст. Создаем еще один цикл, в котором пробегаемся по тегам метаданных документа. Здесь нас интересует тег автора и кто последний раз вносил изменения, так как тут тоже содержится имя пользователя.

Python:
    for element in core_xml.iter():
        if element.tag == '{http://purl.org/dc/elements/1.1/}creator':
            if element.text is not None or element.text != '':
                author.add(element.text)
        elif element.tag == \
                '{http://schemas.openxmlformats.org/package/2006/metadata/core-properties}lastModifiedBy':
            if element.text is not None or element.text != '':
                author.add(element.text)

И точно такой же цикл по тегам «app.xml», откуда забираем данные о приложении.

Python:
    for element in app_xml.iter():
        if element.tag == '{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}Properties':
            app_n = ''
            app_v = ''
            for elem in element.iter():
                if elem.tag == '{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}Application':
                    app_n = elem.text
                elif elem.tag == '{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}AppVersion':
                    app_v = elem.text
            soft.add(f'{app_n}_{app_v}')

Теперь получаем список файлов в архиве, пробегаемся по ним циклом и если в наименовании файла есть слово «media» проверяем расширение файла. Если оно равно расширению изображения распаковываем данный файл и передаем в функцию получения метаданных из изображения.

Python:
    z = zfile.namelist()
    for i in z:
        if "media" in i and i.endswith(".jpg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "media" in i and i.endswith(".jpeg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "media" in i and i.endswith(".png"):
            file = zfile.open(i)
            image_metadata(file, addr)

Python:
def microdoc_metadata(item, addr):
    zipfile.is_zipfile(item)
    try:
        zfile = zipfile.ZipFile(item)
    except zipfile.BadZipFile:
        return
    try:
        core_xml = etree.fromstring(zfile.read('docProps/core.xml'))
        app_xml = etree.fromstring(zfile.read('docProps/app.xml'))
    except KeyError:
        return
    if item.suffix == ".docx":
        try:
            doc_xml = etree.fromstring(zfile.read('word/document.xml'))
        except KeyError:
            return
        for element in doc_xml:
            if element.tag == '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}body':
                for elem in element.iter():
                    if elem.tag == '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}t':
                        email_find(str(elem.text))
    elif item.suffix == ".xlsx":
        try:
            xlsx_xml = etree.fromstring(zfile.read('xl/sharedStrings.xml'))
        except KeyError:
            return
        for element in xlsx_xml:
            for elem in element.iter():
                if elem.tag == '{http://schemas.openxmlformats.org/spreadsheetml/2006/main}t':
                    email_find(str(elem.text))
    elif item.suffix == ".pptx":
        for files in zfile.namelist():
            if 'slides' in files and files.endswith(".xml"):
                pptx = etree.fromstring(zfile.read(files))
                for element in pptx:
                    if element.tag == '{http://schemas.openxmlformats.org/presentationml/2006/main}cSld':
                        for elem in element.iter():
                            if elem.tag == '{http://schemas.openxmlformats.org/drawingml/2006/main}t':
                                email_find(str(elem.text))
    for element in core_xml.iter():
        if element.tag == '{http://purl.org/dc/elements/1.1/}creator':
            if element.text is not None or element.text != '':
                author.add(element.text)
        elif element.tag == \
                '{http://schemas.openxmlformats.org/package/2006/metadata/core-properties}lastModifiedBy':
            if element.text is not None or element.text != '':
                author.add(element.text)
    for element in app_xml.iter():
        if element.tag == '{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}Properties':
            app_n = ''
            app_v = ''
            for elem in element.iter():
                if elem.tag == '{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}Application':
                    app_n = elem.text
                elif elem.tag == '{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}AppVersion':
                    app_v = elem.text
            soft.add(f'{app_n}_{app_v}')
    z = zfile.namelist()
    for i in z:
        if "media" in i and i.endswith(".jpg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "media" in i and i.endswith(".jpeg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "media" in i and i.endswith(".png"):
            file = zfile.open(i)
            image_metadata(file, addr)


Получение метаданных из документов Libre (Open) Office

Создадим функцию libre_office_metadata(item, addr). Она получает на входе путь к файлу документа и адрес сайта, на котором документ был найден. Далее, так как документы данного типа также как и Microsoft Office 2007 являются zip с xml, получение метаданных и текста примерно похожее. Отличается только название тегов и расположение их в архиве. Поэтому подробно разбирать данный код не имеет большого смысла. Скажу только, что здесь также, если есть изображения, производиться попытка получить из них метаданные. Забирается название приложения, в котором документ создан и текст документа передается в функцию поиска электронной почты. Тега, который бы указывал на автора документа я здесь не нашел. Потому, здесь он не извлекается.

Python:
def libre_office_metadata(item, addr):
    zipfile.is_zipfile(item)
    try:
        zfile = zipfile.ZipFile(item)
    except zipfile.BadZipFile:
        return
    core_xml = etree.fromstring(zfile.read('meta.xml'))
    if item.suffix in [".odp", ".odg", ".ods"]:
        try:
            doc_xml = etree.fromstring(zfile.read('content.xml'))
        except KeyError:
            return
        for element in doc_xml:
            if element.tag == '{urn:oasis:names:tc:opendocument:xmlns:office:1.0}body':
                for elem in element.iter():
                    if elem.tag == '{urn:oasis:names:tc:opendocument:xmlns:text:1.0}p':
                        if elem.text is not None:
                            email_find(str(elem.text))
    elif item.suffix == ".odf":
        try:
            odf_xml = etree.fromstring(zfile.read('content.xml'))
        except KeyError:
            return
        for element in odf_xml:
            for elem in element.iter():
                if elem.tag == '{http://www.w3.org/1998/Math/MathML}annotation':
                    email_find(str(elem.text))
    elif item.suffix == ".odt":
        try:
            odt_xml = zfile.read('content.xml')
        except KeyError:
            return
        soup = BeautifulSoup(odt_xml, 'xml')
        elements = soup.find_all('text:p')
        for element in elements:
            if element.text != "":
                email_find(str(element.text))

    for element in core_xml.iter():
        if element.tag == '{urn:oasis:names:tc:opendocument:xmlns:meta:1.0}generator':
            soft.add(f'{element.text}{item.suffix}')
    z = zfile.namelist()
    for i in z:
        if "Pictures" in i and i.endswith(".jpg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "Pictures" in i and i.endswith(".jpeg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "Pictures" in i and i.endswith(".png"):
            file = zfile.open(i)
            image_metadata(file, addr)


Получение данных из документов Microsoft Office 2003

Создадим функцию old_doc_metadata(item). На входе она все также получает путь к файлу. Здесь нужно немного пояснений. На самом деле, этот формат является проприетарным, потому просто так прочитать его не получиться. Если бы мы использовали ОС Windows, расшифровать данные можно было бы с помощью библиотеки win32. Но, так как скрипт кроссплатформенный, я решил отказаться от получения текста из данного формата документов, а только лишь считывать метаданные. Тем более, что этот формат является устаревшим и встречается все реже. Думаю, что в будущем, он останется только лишь в самых ранних документах, особенно, когда госструктуры перейдут на Linux. Но, это мое субъективное мнение.

Для считывания метаданных из данного формата документов я нашел библиотеку olefile. Для начала проверяем корректность OLE файла с помощью функции assert. Если ловим исключение, выходим из функции. Затем создаем OLE объект с файлом. И считываем в переменную meta метаданные.

Python:
    try:
        assert olefile.isOleFile(item)
    except AssertionError:
        return
    ole = olefile.OleFileIO(item)
    meta = ole.get_metadata()

Затем выполняем проверку, не является ли значение тега байтами. Если да, пытаемся декодировать. Если нет, оставляем как есть и добавляем в множество с авторами. Если не получилось декодировать в utf-8, пытаемся декодировать в windows-1251, родной формат Windows.

Python:
     try:
        if isinstance(meta.author, bytes):
            if meta.author.decode() != "" and meta.author.decode() is not None:
                author.add(meta.author.decode())
        else:
            if meta.author != "" and meta.author is not None:
                author.add(meta.author)
    except UnicodeDecodeError:
        author.add(meta.author.decode('windows-1251'))

Точно также поступаем для всех тегов документа. Данный код работает на всех форматах офисных файлов, то есть doc, ppt, xls.

Python:
def old_doc_metadata(item):
    try:
        assert olefile.isOleFile(item)
    except AssertionError:
        return
    ole = olefile.OleFileIO(item)
    meta = ole.get_metadata()

    try:
        if isinstance(meta.author, bytes):
            if meta.author.decode() != "" and meta.author.decode() is not None:
                author.add(meta.author.decode())
        else:
            if meta.author != "" and meta.author is not None:
                author.add(meta.author)
    except UnicodeDecodeError:
        author.add(meta.author.decode('windows-1251'))

    try:
        if isinstance(meta.last_saved_by, bytes):
            if meta.last_saved_by.decode() != "" and meta.last_saved_by.decode() is not None:
                author.add(meta.last_saved_by.decode())
        else:
            if meta.last_saved_by != "" and meta.last_saved_by is not None:
                author.add(meta.last_saved_by)
    except UnicodeDecodeError:
        author.add(meta.last_saved_by.decode('windows-1251'))

    try:
        if isinstance(meta.creating_application, bytes):
            if meta.creating_application.decode() != "" and meta.creating_application.decode() is not None:
                soft.add(meta.creating_application.decode())
        else:
            if meta.creating_application != "" and meta.creating_application is not None:
                soft.add(meta.creating_application)
    except UnicodeDecodeError:
        soft.add(meta.creating_application.decode('windows-1251'))


Функция поиска файлов и запуска функций извлечения метаданных

Создадим функцию file_search(addr), которая на входе будет получать адрес сайта на котором производился поиск. А так как у нас сохранение документов происходит в текущей директории скрипта, в которой создается папка для документов с именем сайта, искать документы будем именно там.

Добавляем в переменную путь к директории с документами. Проверяем, существует ли данная директория. Если да, итерируем все объекты в ней. После чего запускаем цикл, в котором проверяем расширение файла. И запускаем функцию извлечения метаданных в соответствии с расширением. Я также добавил сюда и запуск извлечения метаданных из изображений.

Python:
def file_search(addr):
    dirs = Path(Path.cwd() / addr)
    if Path.exists(dirs):
        files = Path.iterdir(dirs)
        for file in files:
            if file.suffix == ".pdf":
                pdf_metadata(file)
            elif file.suffix in ['.docx', '.pptx', '.xlsx', '.ppsx']:
                microdoc_metadata(file, addr)
            elif file.suffix in ['.odf', '.odg', '.odp', '.ods', '.odt']:
                libre_office_metadata(file, addr)
            elif file.suffix in ['.doc', '.xls', '.ppt']:
                old_doc_metadata(file)
            elif file.suffix in ['.jpg', '.jpeg']:
                image_metadata(file, addr)


Сохранение полученных результатов в файл

Что же, вот мы и добрались до последней функции данного скрипта. В ней происходит сохранение всех полученных данных, которые содержаться в множествах. Создадим функцию result_save(addr). На входе она получает адрес сайта, чтобы создать путь для проверки существования директории. В случае, если не будет найдено файлов, директория также не будет создана, а значит и сохранять будет нечего.

Создаем путь к директории с файлами, а также объект docx.Document(). Устанавливаем размер шрифта в документе, стиль документа и сам шрифт.

Python:
    dir_f = Path(Path.cwd() / addr)
    doc = docx.Document()
    style = doc.styles['Normal']
    style.font.name = 'Arial'
    style.font.size = Pt(10)

Делаем проверку на существование директории. Если существует, двигаемся дальше. Открываем текстовый документ на запись. Сохранять мы его будем в директории скрипта, а не в директории файлов, чтобы не смешивать с остальными документами. Проверяем длину множеств и если она не равна 0, пробегаемся циклом по содержимому и записываем полученные значения в файл. Данную операцию повторяем для всех объявленных множеств.

Python:
    if dir_f.exists():
        with open(Path.cwd() / f'{addr}_result.txt', 'a', encoding='utf-8') as result:
            if len(author) > 0:
                result.write(f'Author:\n{"-" * 25}\n\n')
                for pers in author:
                    result.write(f'{pers}\n')

Дальше делаем проверку на существование текстового файла с данными. Открываем его на чтение и считываем содержимое в переменную. Проверяем, не является ли содержимое пустым, если применить функцию strip. Бывает такое, что записывается только лишь пробел. И если длина файла после обрезки пустоты не равна 0, добавляем содержимое в параграф документа Word, после чего сохраняем офисный документ. На всякий случай обрабатываем исключение, когда данные не могут быть добавлены в xml. Тогда мы просто завершаем работу функции. Ну, а если текстовый документ с записанными ранее значениями будет пустым, просто удаляем его и сообщаем пользователю, что данные не получены.

Python:
        if Path(Path.cwd() / f'{addr}_result.txt').exists():
            with open(Path.cwd() / f'{addr}_result.txt', 'r', encoding='utf-8') as res:
                read = res.read()
            if len(read.strip()) > 0:
                try:
                    doc.add_paragraph(read)
                except ValueError:
                    return
                doc.save(Path.cwd() / f'{addr}_result.docx')
                print(f'\n[+] Данные сохранены: {Path.cwd() / f"{addr}_result.docx"}')
            else:
                Path(Path.cwd() / f'{addr}_result.txt').unlink()
                print('\n[-] Метаданные не получены')

Python:
def result_save(addr):
    dir_f = Path(Path.cwd() / addr)
    doc = docx.Document()
    style = doc.styles['Normal']
    style.font.name = 'Arial'
    style.font.size = Pt(10)

    if dir_f.exists():
        with open(Path.cwd() / f'{addr}_result.txt', 'a', encoding='utf-8') as result:
            if len(author) > 0:
                result.write(f'Author:\n{"-" * 25}\n\n')
                for pers in author:
                    result.write(f'{pers}\n')
            if len(soft) > 0:
                result.write(f'\n\nSoftware used:\n{"-" * 25}\n\n')
                for s in soft:
                    result.write(f'{s}\n')
            if len(email) > 0:
                result.write(f'\n\nE-mail:\n{"-" * 25}\n\n')
                for mail in email:
                    result.write(f'{mail}\n')

        if Path(Path.cwd() / f'{addr}_result.txt').exists():
            with open(Path.cwd() / f'{addr}_result.txt', 'r', encoding='utf-8') as res:
                read = res.read()
            if len(read.strip()) > 0:
                try:
                    doc.add_paragraph(read)
                except ValueError:
                    return
                doc.save(Path.cwd() / f'{addr}_result.docx')
                print(f'\n[+] Данные сохранены: {Path.cwd() / f"{addr}_result.docx"}')
            else:
                Path(Path.cwd() / f'{addr}_result.txt').unlink()
                print('\n[-] Метаданные не получены')

Что же, вот в принципе и все. Кажется, что ничего не забыл. И описал все функции. Потому привожу полный код скрипта ниже.

Python:
# pip install python-docx
# pip install pymupdf
# pip install requests
# pip install fake-useragent
# pip install olefile
# pip install selenium
# pip install Pillow
# pip install bs4
# pip install lxml
# pip install urllib3
# pip install google

import os
import platform
import re
import threading
import time
import urllib
import zipfile
from pathlib import Path
from xml.etree import ElementTree as etree

import docx
import fitz
import olefile
import requests
import selenium
from PIL import Image, ExifTags
from bs4 import BeautifulSoup
from docx.shared import Pt
from fake_useragent import UserAgent
from googlesearch import search
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service

requests.packages.urllib3.disable_warnings()

author = set()
soft = set()
email = set()


# функция поиска ссылок в Google
def search_g(site: str, col: int, doc: str):
    dork = f"site:{site} filetype:{doc}"
    res_list = []
    try:
        for num, results in enumerate(search(dork, tld="ru", lang="ru", num=int(col), start=0, stop=None, pause=2.0,
                                             extra_params={"filter": "0"})):
            if num == int(col):
                break
            print(f'\r[~] Поиск: {num + 1}/{col}', end='')
            res_list.append(results)
        return res_list
    except urllib.error.HTTPError:
        return 'error'


# функция прокрутки страницы в Selenium
def scroll_to_bottom(driver):
    old_position = 0
    new_position = None

    while new_position != old_position:
        # Get old scroll position
        old_position = driver.execute_script(
            ("return (window.pageYOffset !== undefined) ?"
             " window.pageYOffset : (document.documentElement ||"
             " document.body.parentNode || document.body);"))
        # Sleep and Scroll
        time.sleep(1)
        driver.execute_script((
            "var scrollingElement = (document.scrollingElement ||"
            " document.body);scrollingElement.scrollTop ="
            " scrollingElement.scrollHeight;"))
        # Get new position
        new_position = driver.execute_script(
            ("return (window.pageYOffset !== undefined) ?"
             " window.pageYOffset : (document.documentElement ||"
             " document.body.parentNode || document.body);"))


# функция получения ссылок из DuckDuckGo
def get_bro(addr, docs):
    options = Options()
    options.headless = True

    path_ex = ''

    if platform.system() == "Linux":
        if not Path(Path.cwd() / 'geckodriver' / 'geckodriver').exists():
            print('[-] Нет драйвера для запуска браузера. Ссылка для загрузки: '
                  'https://github.com/mozilla/geckodriver/releases')
            return
        else:
            path_ex = Path.cwd() / 'geckodriver' / 'geckodriver'
    elif platform.system() == "Windows":
        if not Path(Path.cwd() / 'geckodriver' / 'geckodriver.exe').exists():
            print('[-] Нет драйвера для запуска браузера. Ссылка для загрузки: '
                  'https://github.com/mozilla/geckodriver/releases')
            return
        else:
            path_ex = Path.cwd() / 'geckodriver' / 'geckodriver.exe'

    browser = webdriver.Firefox(options=options, service=Service(log_path=os.devnull, executable_path=str(path_ex)))

    link_set = set()

    print('\n[+] Поиск в DuckDuckGo')
    for item in docs:
        print(f'   - Поиск документов: "{item}"')
        url = f'https://duckduckgo.com/?q=site%3A{addr}+filetype%3A{item}&t=h_&ia=web'
        browser.get(url)
        scroll_to_bottom(browser)

        num = 1
        while True:
            scroll_to_bottom(browser)
            try:
                more = browser.find_element(By.XPATH, f'//*[@id="rld-{num}"]/a')
                more.click()
                num += 1
            except selenium.common.exceptions.NoSuchElementException:
                break
        html = browser.page_source
        soup = BeautifulSoup(html, 'lxml')
        links = soup.find_all('div', class_='nrn-react-div')

        for link in links:
            art = link.find('article').find_all('a', class_='eVNpHGjtxRBq_gLOfGDr')

            for a_link in art:
                if str(a_link['href']).startswith("https://duckduckgo.com"):
                    continue

                link_set.add(a_link['href'])
    print('\n[+] Поиск документов завершен')

    browser.close()
    browser.quit()

    if len(link_set) > 0:
        print(f'\n[+] Найдено ссылок: {len(link_set)}')

        dir_p = Path.cwd() / addr
        dir_p.mkdir(exist_ok=True)

        doc_downloads(link_set, addr)
    else:
        print(f'\n[-] Документов не найдено')


# функция загрузки файлов
def downloads(url, addr):
    ua = UserAgent()
    headers = {
        'user-agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'
    }

    try:
        resp = ''
        try:
            resp = requests.get(url=url, headers=headers, verify=False, timeout=30)
            doc_name = requests.utils.unquote(
                resp.headers['Content-Disposition'].replace("filename", "").replace("attachment", "").
                replace("UTF-8", "").replace("*", "").replace(";", "").replace("=", "").replace("'", "").
                replace('"', "").replace('/', "").replace('inline', ""))
        except KeyError:
            doc_name = requests.utils.unquote(url.split("/")[-1])
        except (requests.exceptions.ReadTimeout, requests.exceptions.ChunkedEncodingError):
            return

        try:
            if Path(Path.cwd() / addr / doc_name).exists():
                return
            with open(Path.cwd() / addr / doc_name, 'wb') as file:
                for chunk in resp.iter_content(chunk_size=1024):
                    if chunk:
                        file.write(chunk)
        except (OSError, requests.exceptions.ChunkedEncodingError):
            return
    except requests.exceptions.ConnectionError:
        return


# запуск потоков для загрузки файлов
def doc_downloads(links, addr):
    print('[+] Начинаю загрузку')
    threads = []

    for link in links:
        t = threading.Thread(target=downloads, kwargs={'url': link, 'addr': addr})
        t.daemon = True
        t.start()
        threads.append(t)
        time.sleep(0.3)

        for num, thread in enumerate(threads):
            print(f'\r   - Обработано: {num + 1}', end='')
            thread.join()


# функция проверки наличия в тексте e-mail
def email_find(text):
    emails = re.findall("([a-zA-Z0-9_.+-][email protected][a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)", text)
    if emails and len(emails) > 0:
        for em in emails:
            email.add(em)


# функция обработки метаданных из картинок документов
def image_metadata(file, addr):
    img = Image.open(file)
    try:
        exif = {ExifTags.TAGS[k]: v for k, v in img._getexif().items() if k in ExifTags.TAGS}
        with open(Path.cwd() / f'{addr}_meta.txt', 'a', encoding='utf-8') as meta:
            for info in exif:
                if info == 'GPSInfo':
                    try:
                        meta.write('GPSInfo\n\n')
                        meta.write(
                            f'{info:27}: lat {exif[info][2]} {exif[info][1]} - long {exif[info][4]} {exif[info][3]}\n')
                    except KeyError:
                        pass
                else:
                    if isinstance(exif[info], bytes):
                        try:
                            info_d = exif[info].decode()
                            meta.write(f'{info:25}: {info_d}\n')
                        except UnicodeDecodeError:
                            meta.write(f'{info:25}: {exif[info]}\n')
                    else:
                        meta.write(f'{info:25}: {exif[info]}\n')
            meta.write(f"{'-' * 25}\n")
    except (AttributeError, zipfile.BadZipFile):
        return


# получение текста и метаданных pdf
def pdf_metadata(item):
    try:
        doc = fitz.open(item)
    except fitz.fitz.FileDataError:
        return
    if doc.metadata['author'] != '':
        author.add(doc.metadata['author'])
    creator = ''
    producer = ''
    if doc.metadata['creator'] != '':
        creator = doc.metadata['creator']
    if doc.metadata['producer'] != '':
        producer = doc.metadata["producer"]
    if creator == '' and producer == '':
        pass
    else:
        soft.add(f'{creator} ({producer})'.strip())
    for current_page in range(len(doc)):
        if doc.load_page(current_page).get_text("text") != "":
            text = doc.load_page(current_page).get_text("text")
            email_find(str(text))


# получение текста и метаданных из
# документов Office 2007 и выше
# форматы: docx, pptx, xlsx
def microdoc_metadata(item, addr):
    zipfile.is_zipfile(item)
    try:
        zfile = zipfile.ZipFile(item)
    except zipfile.BadZipFile:
        return
    try:
        core_xml = etree.fromstring(zfile.read('docProps/core.xml'))
        app_xml = etree.fromstring(zfile.read('docProps/app.xml'))
    except KeyError:
        return
    if item.suffix == ".docx":
        try:
            doc_xml = etree.fromstring(zfile.read('word/document.xml'))
        except KeyError:
            return
        for element in doc_xml:
            if element.tag == '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}body':
                for elem in element.iter():
                    if elem.tag == '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}t':
                        email_find(str(elem.text))
    elif item.suffix == ".xlsx":
        try:
            xlsx_xml = etree.fromstring(zfile.read('xl/sharedStrings.xml'))
        except KeyError:
            return
        for element in xlsx_xml:
            for elem in element.iter():
                if elem.tag == '{http://schemas.openxmlformats.org/spreadsheetml/2006/main}t':
                    email_find(str(elem.text))
    elif item.suffix == ".pptx":
        for files in zfile.namelist():
            if 'slides' in files and files.endswith(".xml"):
                pptx = etree.fromstring(zfile.read(files))
                for element in pptx:
                    if element.tag == '{http://schemas.openxmlformats.org/presentationml/2006/main}cSld':
                        for elem in element.iter():
                            if elem.tag == '{http://schemas.openxmlformats.org/drawingml/2006/main}t':
                                email_find(str(elem.text))
    for element in core_xml.iter():
        if element.tag == '{http://purl.org/dc/elements/1.1/}creator':
            if element.text is not None or element.text != '':
                author.add(element.text)
        elif element.tag == \
                '{http://schemas.openxmlformats.org/package/2006/metadata/core-properties}lastModifiedBy':
            if element.text is not None or element.text != '':
                author.add(element.text)
    for element in app_xml.iter():
        if element.tag == '{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}Properties':
            app_n = ''
            app_v = ''
            for elem in element.iter():
                if elem.tag == '{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}Application':
                    app_n = elem.text
                elif elem.tag == '{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}AppVersion':
                    app_v = elem.text
            soft.add(f'{app_n}_{app_v}')
    z = zfile.namelist()
    for i in z:
        if "media" in i and i.endswith(".jpg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "media" in i and i.endswith(".jpeg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "media" in i and i.endswith(".png"):
            file = zfile.open(i)
            image_metadata(file, addr)


# функция получения текста и метаданных из
# документов LibreOffice
# форматы: odf, odg, odp, ods, odt'
def libre_office_metadata(item, addr):
    zipfile.is_zipfile(item)
    try:
        zfile = zipfile.ZipFile(item)
    except zipfile.BadZipFile:
        return
    core_xml = etree.fromstring(zfile.read('meta.xml'))
    if item.suffix in [".odp", ".odg", ".ods"]:
        try:
            doc_xml = etree.fromstring(zfile.read('content.xml'))
        except KeyError:
            return
        for element in doc_xml:
            if element.tag == '{urn:oasis:names:tc:opendocument:xmlns:office:1.0}body':
                for elem in element.iter():
                    if elem.tag == '{urn:oasis:names:tc:opendocument:xmlns:text:1.0}p':
                        if elem.text is not None:
                            email_find(str(elem.text))
    elif item.suffix == ".odf":
        try:
            odf_xml = etree.fromstring(zfile.read('content.xml'))
        except KeyError:
            return
        for element in odf_xml:
            for elem in element.iter():
                if elem.tag == '{http://www.w3.org/1998/Math/MathML}annotation':
                    email_find(str(elem.text))
    elif item.suffix == ".odt":
        try:
            odt_xml = zfile.read('content.xml')
        except KeyError:
            return
        soup = BeautifulSoup(odt_xml, 'xml')
        elements = soup.find_all('text:p')
        for element in elements:
            if element.text != "":
                email_find(str(element.text))

    for element in core_xml.iter():
        if element.tag == '{urn:oasis:names:tc:opendocument:xmlns:meta:1.0}generator':
            soft.add(f'{element.text}{item.suffix}')
    z = zfile.namelist()
    for i in z:
        if "Pictures" in i and i.endswith(".jpg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "Pictures" in i and i.endswith(".jpeg"):
            file = zfile.open(i)
            image_metadata(file, addr)
        elif "Pictures" in i and i.endswith(".png"):
            file = zfile.open(i)
            image_metadata(file, addr)


# функция получения метаданных из документов
# Office 2003. форматы: doc, ppt, xls
def old_doc_metadata(item):
    try:
        assert olefile.isOleFile(item)
    except AssertionError:
        return
    ole = olefile.OleFileIO(item)
    meta = ole.get_metadata()

    try:
        if isinstance(meta.author, bytes):
            if meta.author.decode() != "" and meta.author.decode() is not None:
                author.add(meta.author.decode())
        else:
            if meta.author != "" and meta.author is not None:
                author.add(meta.author)
    except UnicodeDecodeError:
        author.add(meta.author.decode('windows-1251'))

    try:
        if isinstance(meta.last_saved_by, bytes):
            if meta.last_saved_by.decode() != "" and meta.last_saved_by.decode() is not None:
                author.add(meta.last_saved_by.decode())
        else:
            if meta.last_saved_by != "" and meta.last_saved_by is not None:
                author.add(meta.last_saved_by)
    except UnicodeDecodeError:
        author.add(meta.last_saved_by.decode('windows-1251'))

    try:
        if isinstance(meta.creating_application, bytes):
            if meta.creating_application.decode() != "" and meta.creating_application.decode() is not None:
                soft.add(meta.creating_application.decode())
        else:
            if meta.creating_application != "" and meta.creating_application is not None:
                soft.add(meta.creating_application)
    except UnicodeDecodeError:
        soft.add(meta.creating_application.decode('windows-1251'))


# функция запуска функция для получения метаданных
# в соответствии с типом файла
def file_search(addr):
    dirs = Path(Path.cwd() / addr)
    if Path.exists(dirs):
        files = Path.iterdir(dirs)
        for file in files:
            if file.suffix == ".pdf":
                pdf_metadata(file)
            elif file.suffix in ['.docx', '.pptx', '.xlsx', '.ppsx']:
                microdoc_metadata(file, addr)
            elif file.suffix in ['.odf', '.odg', '.odp', '.ods', '.odt']:
                libre_office_metadata(file, addr)
            elif file.suffix in ['.doc', '.xls', '.ppt']:
                old_doc_metadata(file)
            elif file.suffix in ['.jpg', '.jpeg']:
                image_metadata(file, addr)


# функция сохранения полученных метаданных в Word
def result_save(addr):
    dir_f = Path(Path.cwd() / addr)
    doc = docx.Document()
    style = doc.styles['Normal']
    style.font.name = 'Arial'
    style.font.size = Pt(10)

    if dir_f.exists():
        with open(Path.cwd() / f'{addr}_result.txt', 'a', encoding='utf-8') as result:
            if len(author) > 0:
                result.write(f'Author:\n{"-" * 25}\n\n')
                for pers in author:
                    result.write(f'{pers}\n')
            if len(soft) > 0:
                result.write(f'\n\nSoftware used:\n{"-" * 25}\n\n')
                for s in soft:
                    result.write(f'{s}\n')
            if len(email) > 0:
                result.write(f'\n\nE-mail:\n{"-" * 25}\n\n')
                for mail in email:
                    result.write(f'{mail}\n')

        if Path(Path.cwd() / f'{addr}_result.txt').exists():
            with open(Path.cwd() / f'{addr}_result.txt', 'r', encoding='utf-8') as res:
                read = res.read()
            if len(read.strip()) > 0:
                try:
                    doc.add_paragraph(read)
                except ValueError:
                    return
                doc.save(Path.cwd() / f'{addr}_result.docx')
                print(f'\n[+] Данные сохранены: {Path.cwd() / f"{addr}_result.docx"}')
            else:
                Path(Path.cwd() / f'{addr}_result.txt').unlink()
                print('\n[-] Метаданные не получены')


# функция запуска поиска в Google
def goo_start(addr, docs, res_count):
    print('[+] Поиск в Google')
    for item in docs:
        res = search_g(addr, res_count, item)
        if res != 'error':
            if len(res) > 0:
                print(f'\n   [+] Найдено документов {item}: {len(res)}')

                dir_p = Path.cwd() / addr
                dir_p.mkdir(exist_ok=True)

                doc_downloads(res, addr)
                time.sleep(10)
            else:
                print(f'\n[-] {item} не найдено')
        else:
            print('[-] Превышено кол-во запросов')
            return 'error'


# функция ввода пользовательских данных
# передача данных в функции запуска поиска
# запуск функции поиска метаданных
# запуск функции сохранения метаданных
def main():
    addr = input('Введите сайт для поиска: ')
    doc_type = input('Введите тип документов для поиска: ')
    docs = doc_type.split()
    if len(docs[0]) > 4:
        print('- Неверный тип документов или не поставлен пробел')
        return
    res_count = int(input('Введите количество результатов поиска: ') or '20')
    print(' ')
    start = time.monotonic()
    if len(docs) >= 1:
        goo = goo_start(addr, docs, res_count)
        if goo == "error":
            get_bro(addr, docs)
        file_search(addr)
        result_save(addr)
    elif len(docs) == 0:
        print('- Вы не ввели тип документов для поиска')
        return
    print(f'\n[+] Время работы: {time.monotonic() - start}')


if __name__ == "__main__":
    main()

Ну и картинка работы скрипта. Для проверки поискал документы pdf на codeby.net. Конечно же, много не нашлось, но вот что удалось получить из документов:

001.png

Спасибо за внимание. Надеюсь, что данная информация будет вам полезной
 

Вложения

  • dork_doc_search.py.zip
    5,3 КБ · Просмотры: 14
  • Нравится
Реакции: Notsaint
Мы в соцсетях: