Статья Обработка наборов данных в csv-файлах с помощью Python. Часть 2

В первой части статьи мы написали функции для нормализации текстовых данных, получаемых из ячеек файла «.csv». Продолжим дописывать код до логического завершения и в данной статье создадим обработчик строк файла, который будет использовать написанные нами функции. Реализуем функции для чтения данных из файлов и сохранения обработанных данных в файл.

logo.png


Дисклеймер: Все данные, предоставленные в данной статье, взяты из открытых источников, не призывают к действию и являются только лишь данными для ознакомления, и изучения механизмов используемых технологий.


Обработчик строк файла «.csv»

Для начала создадим файл check_csv_row.py. В нем мы будем писать одноименную функцию, с помощью которой будем обрабатывать строку. В созданный нами файл импортируем ранее написанные нами функции нормализации.

Python:
from mod.data_normalize import fio_normalize, phone_normalize, email_normalize, bdate_normalize, other_check

На самом деле, все, что используется в обработчике, не более чем комбинация ранее определенных нами функций. Тем не менее, так как столбцов, которые необходимо обработать довольно большое количество, код функции получился довольно большим.

Создадим функцию check_csv_row(item_dict: dict, heads_list: list, heads: list, row: list) -> list. На вход она получает словарь с ключами в виде заголовков столбцов и значений из текущей строки. Список стандартных, предопределенных заголовков, список заголовков из файла, и строку для обработки. Возвращаем же сформированный список значений, которые нужны в результирующем файле после обработки.

Для начала создадим словарь, куда будем помещать получаемые значения с заголовками. Итерируемся по словарю с текущими значениями. С помощью match … case определяем, какое значение имеет текущий ключ. В соответствии с этим выполняем те или иные действия. Для примера, если ключ равен «fio», обновляем созданный словарь ключом «fio» и нормализованным значением, если значение ключа не пусто. Иначе, обновляем данное значение ключом с пустым значением.

Подобным образом обрабатываем нужные нам ключи. Отдельно обрабатываем id социальных сетей. В итоге формируем список из полученных значений, даже если они пусты и возвращаем из функции.

Python:
from mod.data_normalize import fio_normalize, phone_normalize, email_normalize, bdate_normalize, other_check


def check_csv_row(item_dict: dict, heads_list: list, heads: list, row: list) -> list:
    """Обработка строк полученных из csv."""
    items = dict()
    for key in item_dict:
        match key:
            case "fio":
                items.update({"fio": fio_normalize(item_dict.get(key)).strip()}) if item_dict.get(key) \
                    else items.update({"fio": ""})
            case "uname":
                if item_dict.get(key):
                    if "http" in item_dict.get(key) or len(item_dict.get(key)) > 50:
                        items.update({"uname": ""})
                    items.update({"uname": item_dict.get(key).strip().encode().decode()})
                else:
                    items.update({"uname": ""})
            case "phone":
                items.update({"phone": phone_normalize(item_dict.get("phone")).strip()}) if item_dict.get(key) \
                    else items.update({"phone": ""})
            case "bdate":
                items.update({"bdate": bdate_normalize(item_dict.get(key))}) if item_dict.get(key) \
                    else items.update({"bdate": ""})
            case "email":
                items.update({"email": email_normalize(item_dict.get(key)).strip()}) if item_dict.get(key) \
                    else items.update({"email": ""})
            case "snils":
                items.update({"snils": "".join(x for x in item_dict.get(key) if x.isdecimal())}) if item_dict.get(key) \
                    else items.update({"snils": ""})
            case "inn":
                items.update({"inn": "".join(x for x in item_dict.get("inn") if x.isdecimal())}) if item_dict.get(key) \
                    else items.update({"inn": ""})
            case "address":
                if item_dict.get(key):
                    items.update({"address": item_dict.get(key).strip().replace("'", "").replace('"', '')})
                elif item_dict.get(key) == "":
                    temp_addr = []
                    for ky in item_dict:
                        if item_dict.get(ky) and ky == "house":
                            temp_addr.append(f"д. {item_dict.get(ky)}")
                        elif item_dict.get(ky) and ky == "build":
                            temp_addr.append(f"стр. {item_dict.get(ky)}")
                        elif item_dict.get(ky) and ky == "corp":
                            temp_addr.append(f"корп. {item_dict.get(ky)}")
                        elif item_dict.get(ky) and ky == "office":
                            temp_addr.append(f"офф. {item_dict.get(ky)}")
                        elif item_dict.get(ky) and ky == "kv":
                            temp_addr.append(f"кв. {item_dict.get(ky)}")
                        elif item_dict.get(ky) and ky in ["zip", "country", "region", "m_obr", "rayon",
                                                          "city", "street", "area"]:
                            temp_addr.append(item_dict.get(ky))
                    items.update({"address": ", ".join(temp_addr)}) if temp_addr else items.update({"address": ""})
            case "passport":
                if item_dict.get(key):
                    items.update({"document": item_dict.get(key)})
                elif item_dict.get(key) == "":
                    doc = []
                    for k in item_dict:
                        if item_dict.get(k) and k == "pass_sn":
                            doc.append(f'Док. с/н: {item_dict.get(k).strip()}')
                        if item_dict.get(k) and k == "pass_dout":
                            doc.append(f'Дата выдачи: {item_dict.get(k).strip()}')
                        if item_dict.get(k) and k == "pass_iss":
                            doc.append(f'Выдан: {item_dict.get(k).strip()}')
                        if item_dict.get(k) and k == "pass_kp":
                            doc.append(f'Код подр.: {item_dict.get(k).strip()}')
                    items.update({"document": ", ".join(doc)}) if doc else items.update({"document": ""})

        """social account"""
        if key in ["vk_id", "ok_id", "fb_id", "lj_id", "tg_id", "insta_id", "mailru_id", "yandex_id",
                   "google_id", "x_id", "skype"]:
            soc = item_dict.get(key).replace("'", "") if item_dict.get(key) else ""
            items.update({key: soc})
            continue

    """other items"""
    other = other_check(heads_list, heads, row).strip()

    return [items.get("fio"), items.get("uname"), items.get("bdate"), items.get("phone"), items.get("email"),
            items.get("inn"), items.get("snils"), items.get("document"), items.get("address"),
            items.get("vk_id", ""), items.get("ok_id", ""), items.get("fb_id", ""), items.get("lj_id", ""),
            items.get("tg_id", ""), items.get("insta_id", ""), items.get("mailru_id", ""), items.get("yandex_id", ""),
            items.get("google_id", ""), items.get("x_id", ""), items.get("skype", ""), other]


Чтение «.csv» файла. Сохранение обработанных данных

Создадим итоговый файл csv_checker.py, в котором будем читать данные из файла, обрабатывать и сохранять обработанные значения.
Импортируем необходимые для работы библиотеки и созданные нами ранее функции.

Python:
import csv
import time
from pathlib import Path

from mod.check_csv_row import check_csv_row
from mod.data_normalize import count_val

Как видно из импорта, созданные функции я положил в директорию «mod», в которой создал файл «__init__.py» для того, чтобы данная директория считалась пакетом.
На следующем шаге увеличим максимальный размер поля разрешенный парсером, так как поля могут быть довольно большого размера.

Python:
csv.field_size_limit(2147483647)

Создадим список с заголовками, которые будут в новом обработанном файле row_list. Создадим список data_list, в который для начала положим список с заголовками, а в последующем будем добавлять обработанные списки со значениями, для последующего сохранения в файл. Также создадим два счетчика, первый из которых cnt считает количество сохраненных файлов и присваивает каждому названию файла суффикс с номером. Второй счетчик count служит скорее для декоративных целей, чтобы выводить информацию об обработанном количестве значений в терминал.

Python:
row_list = ["fio", "uname", "bdate", "phone", "email", "inn", "snils", "document", "address", "vk_id", "ok_id",
            "fb_id", "lj_id", "tg_id", "insta_id", "mailru_id", "yandex_id", "google_id", "x_id", "skype", "other"]
data_list = [row_list]
cnt = 0
count = 0

Создадим функцию save_data(file: Path) -> None, которая на входе получает путь к текущему обрабатываемому файлу, и ничего не возвращает. В данной функции мы будем сохранять обработанные значения. Для начала в директории с обрабатываемым файлом создадим папку «conv», куда будем складывать сохраняемые файлы. Получим имя обрабатываемого файла name_file. Открываем файл на запись в только что созданной директории, добавляем к имени обрабатываемого файла суффикс из счетчика. После этого сохраняем все значения из списка data_list. Увеличиваем счетчик count на количество элементов в списке data_list. Выводим сообщение о сохранении файла в терминал. Очищаем список data_list и помещаем в него заголовки из списка row_list для последующей обработки. Увеличиваем счетчик сохраненных файлов cnt.

Python:
def save_data(file: Path) -> None:
    global cnt, count
    (Path(file).parent / "conv").mkdir(exist_ok=True)
    name_file = Path(file).name.removesuffix(Path(file).suffix)
    with open(Path(file).parent / "conv" / f"{name_file}_{cnt}.csv", mode="w", encoding='utf-8',
              newline='') as w_file:
        file_writer = csv.writer(w_file, delimiter=";")
        file_writer.writerows(data_list)
    count += len(data_list)
    print(f"\n{'-'*25}\nsave: {count}\n{'-'*25}")
    data_list.clear()
    data_list.append(row_list)
    cnt += 1

Создадим функцию read_files(file: Path, cnt_line: int) -> None, которая ничего не возвращает, а на вход получает путь к файлу «.csv», а также количество строк в обрабатываемом файле.

Создаем список heads_list, в котором будут содержаться все значения заголовков, которые мы будем обрабатывать. Создадим список heads, куда будем помещать полученные из файла заголовки.

Открываем файл и считываем построчно значения. Добавим функцию enumerate, для подсчета количества строк. Укажем в качестве разделителя значений в файле «|». Создаем словарь, куда поместим ключи в виде заголовков и значения текущей строки, соответствующие этим заголовкам. Проверим номер строки. Если он 0, значит содержит заголовки. А следовательно, считываем их и помещаем в список heads.

Получаем в переменную items_list список обработанных значений из. Проверяем с помощью функции count_val не является ли строка для сохранения пустой, либо содержит более одного значения. Выводим, скорее в декоративных целях, информацию об обрабатываемой строке в терминал. После чего в data_list добавляем полученные значения из items_list. Проверяем, не является ли количество объектов в списке data_list равным 100001. Если да, сохраняем полученные значения в файл. Такую же проверку выполняем при выходе из цикла, то есть после того, как последняя строка будет обработана для того, чтобы сохранить значения, если размер списка data_list меньше, чем 100001, но больше 0.

Python:
def read_files(file: Path, cnt_line: int) -> None:
    global data_list, cnt, count
    heads_list = ["fio", "uname", "bdate", "phone", "email", "address", "zip", "country", "region", "area", "rayon",
                  "m_obr", "city", "street", "house", "corp", "build", "office", "floor", "kv", "pass_sn", "pass_iss",
                  "passport", "pass_dout", "pass_kp", "snils", "inn", "vk_id", "ok_id", "fb_id", "lj_id", "tg_id",
                  "insta_id", "mailru_id", "yandex_id", "google_id", "x_id", "skype"]
    heads = []
    with open(file, "r", encoding="utf-8") as cs:
        for nm, row in enumerate(csv.reader(cs, delimiter="|")):
            """creating a dictionary of column values"""
            item_dict = {}
            try:
                if nm == 0:
                    heads.extend([x.strip() for x in row])
                    continue
                for item in heads_list:
                    if item in heads:
                        item_dict.update({item: row[heads.index(item)].strip()})
                    else:
                        item_dict.update({item: ""})
            except IndexError as e:
                print(f"\nПроверьте csv на ошибки: {e}")
                exit()

            """text processing"""
            items_list = check_csv_row(item_dict, heads_list, heads, row)

            """check the number of values in the list"""
            if not count_val(items_list):
                continue
            print("\r\033[K", end="")
            print(f"\r{nm}/{cnt_line} | {items_list[0]} | {items_list[1]} | {items_list[3]}", end="")

            """adding a list to the save list"""
            data_list.append(items_list)

            """check the number of objects in the list, save, clear"""
            if len(data_list) == 100001:
                save_data(file)
        if 1 < len(data_list) < 100001:
            save_data(file)
        cnt = 0

И, наконец, финальная функция, с которой все и начинается - main() -> None. Она ничего не получает и не возвращает. Для начала указываем путь к директории для обработки файлов. Проверяем, является ли переданный путь путем к директории и вообще, существует ли он. Если нет, выходим из скрипта. Если все в порядке считываем все пути к файлам в директории с расширением «.csv». Проверяем количество файлов в директории. Если оно меньше 1, выходим из скрипта.

Итерируемся по списку файлов. В переменную cnt_line получаем количество строк в файле. Выводим информацию о текущем обрабатываемом файле в терминал. Передаем в функцию read_files путь к обрабатываемому файлу и количество строк в файле. Следующая строка закомментирована. Если ее раскомментировать, исходный файл после обработки будет удалятся. Однако, по опыту скажу, что не стоит этого делать, проще удалить руками. Мало ли для чего вам может понадобиться исходный файл.

Ну и далее, выводим в терминал время обработки файлов.

Python:
def main() -> None:
    try:
        path = input("path folder: >>> ")
        if not Path(path).exists() or not path or not Path(path).is_dir():
            exit(0)
        tm = time.monotonic()
        files = [x for x in Path(path).iterdir() if Path(x).is_file() and Path(x).suffix in [".csv", ".CSV"]]
        if len(files) < 1:
            exit(0)
        for nn, file in enumerate(files):
            cnt_line = sum(1 for _ in open(file, "rb"))
            print(f"\n{nn + 1}/{len(files)} | {Path(file).name} | Lines: {cnt_line}\n{'*' * 35}")
            read_files(file, cnt_line)
            # Path(file).unlink()
        ch_time = (f'All check {len(files)} complete | {(int(time.monotonic() - tm) // 3600)} h. '
                   f'{(int(time.monotonic() - tm) % 3600) // 60} m. {float(time.monotonic() - tm) % 60:.2f} s.')
        lnt = len(ch_time)
        print(f'\n{"-" * lnt}\n{ch_time}\n{"-" * lnt}')
    except KeyboardInterrupt:
        exit()

Python:
import csv
import time
from pathlib import Path

from mod.check_csv_row import check_csv_row
from mod.data_normalize import count_val

csv.field_size_limit(2147483647)

row_list = ["fio", "uname", "bdate", "phone", "email", "inn", "snils", "document", "address", "vk_id", "ok_id",
            "fb_id", "lj_id", "tg_id", "insta_id", "mailru_id", "yandex_id", "google_id", "x_id", "skype", "other"]
data_list = [row_list]
cnt = 0
count = 0


def save_data(file: Path) -> None:
    global cnt, count
    (Path(file).parent / "conv").mkdir(exist_ok=True)
    name_file = Path(file).name.removesuffix(Path(file).suffix)
    with open(Path(file).parent / "conv" / f"{name_file}_{cnt}.csv", mode="w", encoding='utf-8',
              newline='') as w_file:
        file_writer = csv.writer(w_file, delimiter=";")
        file_writer.writerows(data_list)
    count += len(data_list)
    print(f"\n{'-'*25}\nsave: {count}\n{'-'*25}")
    data_list.clear()
    data_list.append(row_list)
    cnt += 1


def read_files(file: Path, cnt_line: int) -> None:
    global data_list, cnt, count
    heads_list = ["fio", "uname", "bdate", "phone", "email", "address", "zip", "country", "region", "area", "rayon",
                  "m_obr", "city", "street", "house", "corp", "build", "office", "floor", "kv", "pass_sn", "pass_iss",
                  "passport", "pass_dout", "pass_kp", "snils", "inn", "vk_id", "ok_id", "fb_id", "lj_id", "tg_id",
                  "insta_id", "mailru_id", "yandex_id", "google_id", "x_id", "skype"]
    heads = []
    with open(file, "r", encoding="utf-8") as cs:
        for nm, row in enumerate(csv.reader(cs, delimiter="|")):
            """creating a dictionary of column values"""
            item_dict = {}
            try:
                if nm == 0:
                    heads.extend([x.strip() for x in row])
                    continue
                for item in heads_list:
                    if item in heads:
                        item_dict.update({item: row[heads.index(item)].strip()})
                    else:
                        item_dict.update({item: ""})
            except IndexError as e:
                print(f"\nПроверьте csv на ошибки: {e}")
                exit()

            """text processing"""
            items_list = check_csv_row(item_dict, heads_list, heads, row)

            """check the number of values in the list"""
            if not count_val(items_list):
                continue
            print("\r\033[K", end="")
            print(f"\r{nm}/{cnt_line} | {items_list[0]} | {items_list[1]} | {items_list[3]}", end="")

            """adding a list to the save list"""
            data_list.append(items_list)

            """check the number of objects in the list, save, clear"""
            if len(data_list) == 100001:
                save_data(file)
        if 1 < len(data_list) < 100001:
            save_data(file)
        cnt = 0


def main() -> None:
    try:
        path = input("path folder: >>> ")
        if not Path(path).exists() or not path or not Path(path).is_dir():
            exit(0)
        tm = time.monotonic()
        files = [x for x in Path(path).iterdir() if Path(x).is_file() and Path(x).suffix in [".csv", ".CSV"]]
        if len(files) < 1:
            exit(0)
        for nn, file in enumerate(files):
            cnt_line = sum(1 for _ in open(file, "rb"))
            print(f"\n{nn + 1}/{len(files)} | {Path(file).name} | Lines: {cnt_line}\n{'*' * 35}")
            read_files(file, cnt_line)
            # Path(file).unlink()
        ch_time = (f'All check {len(files)} complete | {(int(time.monotonic() - tm) // 3600)} h. '
                   f'{(int(time.monotonic() - tm) % 3600) // 60} m. {float(time.monotonic() - tm) % 60:.2f} s.')
        lnt = len(ch_time)
        print(f'\n{"-" * lnt}\n{ch_time}\n{"-" * lnt}')
    except KeyboardInterrupt:
        exit()


if __name__ == "__main__":
    main()

Таким образом, после создания всех необходимых файлов с кодом, структура нашего проекта должна быть следующей:

screenshot8.png

Для примера обработаем сгенерированный файл со случайными данными. Вот, что получилось в итоге:

screenshot7.png

А это результат обработки файла содержащего серию и номер документа, для примера:

screenshot9.png

Если вы обратили внимание, сохранение данных в файл происходит по достижении размера списка data_list в 100001 объект. Таким образом, в файл сохраняется только указанное количество строк. Данное количество может варироваться, по желанию, однако, не стоит делать его слишком большим, так как размеры исходного файла могут достигать не одного гигабайта, и вам просто может не хватить оперативной памяти для того, чтобы открыть его и сохранить новые значения.

screenshot10.png

Лучше написать скрипт, который будет объединять обработанные файлы в один с минимальным использованием оперативной памяти. Но, об этом в следующей статье.

Небольшое видео, демонстрирующее работу скрипта и объединение обработанных файлов:




А на сегодня все. Все файлы из статьи вы найдете во вложении, вместе с файлом примера.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна
 

Вложения

  • code 2.zip
    148 байт · Просмотры: 49
  • primer.zip
    2,5 МБ · Просмотры: 49
Последнее редактирование:
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!