Статья Слияние csv-файлов с одинаковой структурой заголовков с помощью Python

В предыдущих статьях по обработке наборов данных: статья 1, статья 2, мы написали скрипт, с помощью которого можно обработать данные из файлов «.csv», немного их очистив и нормализовав. Однако, этот скрипт сохранял данные, при большом количестве строк, в разбивке на несколько файлов. Давайте попробуем написать скрипт, который бы объединял эти файлы в один, без существенного использования оперативной памяти, так как размер файлов может быть достаточно велик и держать их в оперативке просто нерационально. В конце концов, она может просто закончиться.

logo.png

По сути, данный скрипт будет работать с любыми файлами «.csv» с одинаковой структурой заголовков. Для объединения файлов я нашел достаточно простой выход, через использование временной базы данных sqlite3, которая после добавления в нее данных и сохранения их в объединённый файл просто удаляется за ненадобностью. Конечно, данный метод тоже не лишен недостатков. При большом наборе данных, если требуется также проверять добавляемые данные на уникальность, чтобы не было одинаковых строк, он довольно медленный. Так как при увеличении количества записей тратиться больше времени на проверку. Однако, файлы размером в 15 миллионов строк он может объединить без особых усилий.


Создание вспомогательных функций

Создадим директорию «mod», в которой создадим файл «__init.py__», чтобы сделать данную директорию пакетом. Создадим файл base_work.py. В нем будут размещаться функции по работе с базой данных, создаваемой в процессе объединения файлов.

Импортируем необходимые библиотеки:

Python:
from pathlib import Path
from sqlite3 import connect

Создадим функцию create_base(que: str) -> None, которая ничего не возвращает, а принимает на вход строку со сформированным заранее запросом, который необходимо будет выполнить. Создаем, если не существует директорию «base», задаем путь к будущей базе данных в переменной path_base. Подключаемся к базе. Затем выполняем переданный в функцию запрос, после чего закрываем курсор и соединение с базой.

Python:
def create_base(que: str) -> None:
    path = Path.cwd() / 'base'
    path.mkdir(exist_ok=True)
    path_base = path / "merge.db"
    conn = connect(path_base)
    cur = conn.cursor()
    cur.execute(que)
    cur.close()
    conn.close()

Следующим шагом создадим функцию save_item(itm: list, num) -> None, с помощью которой будем сохранять данные в базе. На вход функция принимает данные в виде списка из кортежей, а также переменную num, в которой передаем строку с количеством столбцов. Подключаемся к базе данных.

Далее будут выполнятся команды, которые при нормальном функционировании базы sqlite не особо требуются, однако, в нашем случае, так как данных очень много, необходимо увеличить размер кэша, отключить ведение журнала, синхронизацию, определить хранилище данных в памяти. И только после этого выполнить запрос на добавление данных в базу указав при этом, чтобы одинаковые значения просто игнорировались и не добавлялись. Для этого, в предыдущей функции мы сделали каждое поле данной базы уникальным. Но, это мы сделаем чуть позже.

Python:
def save_item(itm: list, num: str) -> None:
    path_base = Path.cwd() / 'base' / "merge.db"
    conn = connect(path_base)
    curr = conn.cursor()
    curr.execute('PRAGMA cache_size = 200000')
    curr.execute('PRAGMA locking_mode = EXCLUSIVE')
    curr.execute('PRAGMA temp_store = MEMORY')
    curr.execute('PRAGMA synchronous = OFF')
    curr.execute('PRAGMA journal_mode = OFF')
    curr.execute('BEGIN TRANSACTION')
    curr.executemany(f"""INSERT OR IGNORE INTO merge VALUES({num});""", itm)
    curr.execute('END TRANSACTION')
    conn.commit()
    curr.close()
    conn.close()

Python:
from pathlib import Path
from sqlite3 import connect


def create_base(que: str) -> None:
    path = Path.cwd() / 'base'
    path.mkdir(exist_ok=True)
    path_base = path / "merge.db"
    conn = connect(path_base)
    cur = conn.cursor()
    cur.execute(que)
    cur.close()
    conn.close()


def save_item(itm: list, num: str) -> None:
    path_base = Path.cwd() / 'base' / "merge.db"
    conn = connect(path_base)
    curr = conn.cursor()
    curr.execute('PRAGMA cache_size = 200000')
    curr.execute('PRAGMA locking_mode = EXCLUSIVE')
    curr.execute('PRAGMA temp_store = MEMORY')
    curr.execute('PRAGMA synchronous = OFF')
    curr.execute('PRAGMA journal_mode = OFF')
    curr.execute('BEGIN TRANSACTION')
    curr.executemany(f"""INSERT OR IGNORE INTO merge VALUES({num});""", itm)
    curr.execute('END TRANSACTION')
    conn.commit()
    curr.close()
    conn.close()


Чтение данных, добавление в базу, сохранение в файл

Создадим файл merge_csv.py. Импортируем библиотеки и пакеты необходимые для работы функции.

Python:
import csv
import shutil
import time
from pathlib import Path
from sqlite3 import connect, OperationalError

from mod.base_work import create_base, save_item

Увеличим максимальный размер поля разрешенный парсером:

Python:
csv.field_size_limit(2147483647)

Создадим функцию csv_merge(heads: list, path: str, name: str) -> None, которая на вход получает список с заголовками, путь к обрабатываемым файлам, имя файла для сохранения объединенного файла.
Объявим список csv_row, куда будем добавлять данные для сохранения. Откроем файл на запись, добавим в него заголовки из списка heads.

Выполним подключение к базе данных, проитерируемся по запросу, который получает все столбцы из базы. Добавим в файл csv_row полученную строку. Затем проверим, не является ли количество объектов в csv_row равным 100000. Если да, сохраняем данные в файл, открыв его на дозапись. Очищаем список csv_row. То же самое сделаем после завершения цикла, но будем проверять список на наличие в нем объектов. Если их меньше, чем 100000, но больше 0, запишем данные в файл открытый на дозапись. Закрываем соединение с базой.

Python:
def csv_merge(heads: list, path: str, name: str) -> None:
    csv_row = []
    with open(Path(path).parent / name, mode="w", encoding='utf-8', newline='') as w_file:
        file_writer = csv.writer(w_file, delimiter=";")
        file_writer.writerow(heads)
    try:
        path_base = Path.cwd() / 'base' / "merge.db"
        conn = connect(path_base)
        curr = conn.cursor()
        for row in curr.execute("SELECT * FROM merge"):
            csv_row.append(list(row))
            if len(csv_row) == 100000:
                with open(Path(path).parent / name, mode="a", encoding='utf-8', newline='') as w_file:
                    file_writer = csv.writer(w_file, delimiter=";")
                    file_writer.writerows(csv_row)
                csv_row.clear()
        if 0 < len(csv_row) < 100000:
            with open(Path(path).parent / name, mode="a", encoding='utf-8', newline='') as w_file:
                file_writer = csv.writer(w_file, delimiter=";")
                file_writer.writerows(csv_row)
            csv_row.clear()
        curr.close()
        conn.close()
    except OperationalError:
        time.sleep(10)
        csv_merge(heads, path, name)

Создадим функцию main. Запросим путь к папке с файлами. Получим список файлов в папке. Объявим список heads, в который поместим заголовки. Объявим переменные num и name. В первой переменной будет содержаться количество столбцов для вставки. Во второй – имя объединенного файла. Проитерируемся по полученным файлам. Проверим, есть ли «_» в имени файла. Если да, обработаем имя с удалением определенного суффикса. Данный код писался для объединения файлов после обработки. Поэтому, там в теории должен быть суффикс _0. Если же его нет, то просто заберем имя первого файла, добавим к нему _merge.

Объявим список data_list, в который будем складывать строки из файлов. Откроем файл для чтения. Проитерируемся по данным. Проверим, является ли строка 0. Для этого применим функцию enumerate. Если строка нулевая, следовательно, она является строкой с заголовками. Но, мы эти заголовки можем уже один раз получить. И получать их второй раз не нужно. Поэтому проверим, если список heads не пуст, продолжим итерацию. Если же пуст, добавим в список запросы и заголовки. Это будет необходимо для формирования запроса на создание базы. Определим количество столбцов, которые будем вставлять в базу и передадим в переменную num. Сформируем запрос que, в который добавим сформированный ранее зарос на создание столбцов, а также сделаем каждый столбец уникальным. После чего передадим его в функцию create_base. После чего продолжим итерацию. Если номер строки не 0, добавляем строки из кортежей в список data_list. После проверяем количество объектов в data_list. Если оно равно необходимому нам, выполняем сохранение данных в базу с помощью функции save_item. После очищаем data_list. То же самое делаем после итерации по всем строкам файла. И сохраняем данные из data_list, если они в нем есть, но их количество меньше определенного нами ранее.

После итерации по всем файлам выполняем функцию, написанную нами ранее csv_merge, в которую передаем заголовки, путь к папке, и имя объединенного файла. Удаляем файл базы данных, удаляем папку, в которой он был создан. После выводим сообщение в терминал.

Python:
def main() -> None:
    path = input("path folder: ")
    files = [x for x in Path(path).iterdir() if Path(x).suffix in [".csv", ".CSV"]]
    heads = []
    num = ""
    name = ""
    for n, file in enumerate(files):
        if n == 0:
            if "_" in Path(file).name.removesuffix(Path(file).suffix):
                name = f'{"_".join(Path(file).name.removesuffix(Path(file).suffix).split("_")[:-1])}_merge.csv'
            else:
                name = f'{Path(file).name.removesuffix(Path(file).suffix)}_merge.csv'
        data_list = []
        with open(file, "r", encoding="utf-8") as cs:
            for nm, row in enumerate(csv.reader(cs, delimiter=";")):
                print("\r\033[K", end="")
                print(f"\r{n+1}/{len(files)} | Row insert: {nm+1}", end="")
                if nm == 0:
                    if heads:
                        continue
                    heads.append([f'{item} TEXT' for item in row])
                    heads.append(row)
                    num = ", ".join("?" for _ in range(len(row)))
                    que = f"""CREATE TABLE IF NOT EXISTS merge({", ".join(heads[0])}, UNIQUE({", ".join(heads[1])}));"""
                    create_base(que)
                    continue
                data_list.append(tuple(row))
                if len(data_list) == 100000:
                    save_item(data_list, num)
                    data_list.clear()
            if 0 < len(data_list) < 100000:
                save_item(data_list, num)
                data_list.clear()
    print(f"\n\nSave to file")
    csv_merge(heads[1], path, name)
    path_base = Path.cwd() / 'base' / "merge.db"
    Path(path_base).unlink()
    shutil.rmtree(Path.cwd() / "base")
    print("Saved")

Python:
import csv
import shutil
import time
from pathlib import Path
from sqlite3 import connect, OperationalError

from mod.base_work import create_base, save_item

csv.field_size_limit(2147483647)


def csv_merge(heads: list, path: str, name: str) -> None:
    csv_row = []
    with open(Path(path).parent / name, mode="w", encoding='utf-8', newline='') as w_file:
        file_writer = csv.writer(w_file, delimiter=";")
        file_writer.writerow(heads)
    try:
        path_base = Path.cwd() / 'base' / "merge.db"
        conn = connect(path_base)
        curr = conn.cursor()
        for row in curr.execute("SELECT * FROM merge"):
            csv_row.append(list(row))
            if len(csv_row) == 100000:
                with open(Path(path).parent / name, mode="a", encoding='utf-8', newline='') as w_file:
                    file_writer = csv.writer(w_file, delimiter=";")
                    file_writer.writerows(csv_row)
                csv_row.clear()
        if 0 < len(csv_row) < 100000:
            with open(Path(path).parent / name, mode="a", encoding='utf-8', newline='') as w_file:
                file_writer = csv.writer(w_file, delimiter=";")
                file_writer.writerows(csv_row)
            csv_row.clear()
        curr.close()
        conn.close()
    except OperationalError:
        time.sleep(10)
        csv_merge(heads, path, name)


def main() -> None:
    path = input("path folder: ")
    files = [x for x in Path(path).iterdir() if Path(x).suffix in [".csv", ".CSV"]]
    heads = []
    num = ""
    name = ""
    for n, file in enumerate(files):
        if n == 0:
            if "_" in Path(file).name.removesuffix(Path(file).suffix):
                name = f'{"_".join(Path(file).name.removesuffix(Path(file).suffix).split("_")[:-1])}_merge.csv'
            else:
                name = f'{Path(file).name.removesuffix(Path(file).suffix)}_merge.csv'
        data_list = []
        with open(file, "r", encoding="utf-8") as cs:
            for nm, row in enumerate(csv.reader(cs, delimiter=";")):
                print("\r\033[K", end="")
                print(f"\r{n+1}/{len(files)} | Row insert: {nm+1}", end="")
                if nm == 0:
                    if heads:
                        continue
                    heads.append([f'{item} TEXT' for item in row])
                    heads.append(row)
                    num = ", ".join("?" for _ in range(len(row)))
                    que = f"""CREATE TABLE IF NOT EXISTS merge({", ".join(heads[0])}, UNIQUE({", ".join(heads[1])}));"""
                    create_base(que)
                    continue
                data_list.append(tuple(row))
                if len(data_list) == 100000:
                    save_item(data_list, num)
                    data_list.clear()
            if 0 < len(data_list) < 100000:
                save_item(data_list, num)
                data_list.clear()
    print(f"\n\nSave to file")
    csv_merge(heads[1], path, name)
    path_base = Path.cwd() / 'base' / "merge.db"
    Path(path_base).unlink()
    shutil.rmtree(Path.cwd() / "base")
    print("Saved")


if __name__ == "__main__":
    main()

После создания всех необходимых файлов и функций в них, вот какой должна быть структура вашего проекта:

screenshot1.png

И небольшое видео, демонстрирующее работу скрипта.





А на этом, пожалуй, все.

Спасибо за внимание. Надеюсь, данная информация будет вам полезна
 

Вложения

Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!