Ручной анализ в такой ситуации - путь к профессиональному выгоранию и фатальным ошибкам. Открыть 10-гигабайтный лог в блокноте? Бесполезно. Выгрузить Event Log в CSV через wevtutil и склеивать в Excel? Excel упадёт на миллионе строк, а времени на обработку уйдёт половина смены. К тому же человеческий глаз неизбежно пропустит аномалию, которая прячется между тысячами рутинных записей.
Здесь на помощь приходит автоматизация. И инструмент номер один для неё - Python. Почему именно он, а не PowerShell, не Perl и не набор bash-скриптов? Потому что Python - это экосистема, закрывающая 90% задач DFIR-аналитика:
- Pandas - таблицы на стероидах. Фильтрация, группировка, объединение миллионов строк за секунды.
- re - регулярные выражения для выкусывания индикаторов из любого текстового мусора.
- datetime - работа с временем: нормализация часовых поясов, вычисление интервалов, построение хронологии.
- json / xml.etree - парсинг современных форматов логов.
- Специализированные библиотеки: python-evtx для чтения Windows Event Log, python-registry для работы с реестром, pytsk3 для MFT, jinja2 для генерации отчётов, matplotlib для визуализации.
Но автоматизация - это не про "запустил скрипт и ушёл пить кофе". Это про перекладывание рутины на машину, чтобы освободить голову для настоящего анализа. Скрипты берут на себя грязную работу: нормализацию форматов, поиск паттернов, подсчёт статистики. Аналитик же сосредотачивается на интерпретации: почему атакующий выбрал именно этот вектор, как развивалась атака во времени, какие ещё хосты могли быть скомпрометированы. Автоматизация не заменяет экспертизу - она даёт ей пространство для манёвра.
Эта статья - практический гайд, построенный на реальных задачах, с которыми сталкивается DFIR-специалист. Мы не будем переписывать документацию и разбирать основы синтаксиса. Мы сразу перейдём к делу: возьмём типовые сценарии анализа и покажем, как закрыть их с помощью Python. В каждом разделе - готовый код, пояснения, на каких граблях можно споткнуться и как их обойти.
Кому это будет полезно:
- SOC-аналитикам L2/L3, которые тонут в море алертов и хотят автоматизировать первичный разбор.
- Incident Responder'ам, которым после ночного вызова нужно за пару часов собрать картину происшествия.
- Forensic Examiner'ам, работающим с образами и артефактами, - покажем, как вытаскивать данные из MFT, реестра, Prefetch.
- Всем, кто уже знает, что такое event ID 4625 и чем отличается Syslog от Windows Event Log, но ещё не построил супертаймлайн из десяти источников одной командой.
Устраивайся поудобнее
1. Python в DFIR workflow
1.1. Зачем автоматизировать: объём данных, разнородные форматы, time pressure
Давай сразу к цифрам. Средний инцидент сегодня - это не один скомпрометированный хост, а цепочка: фишинг, малварь, C2, lateral movement, эксфильтрация. За пару дней атакующий успевает наследить в DNS-логах, firewall, контроллере домена, на рабочих станциях. Собираешь логи со всех источников - и получаешь 50–100 ГБ текста. Вручную открыть такой объём в Notepad++? Ну-ну.А ещё форматы... Syslog пишет в одном стиле, Windows Event Log - в своём XML-зоопарке, Apache - вообще строка с пробелами. Всё это надо склеить в единую хронологию. А клиент (или начальник) звонит каждые полчаса: «Ну что там? Нашли, откуда зашли?». Time pressure - наше всё. Без автоматизации тут либо сдаваться, либо работать сутками, теряя внимательность.
Поэтому Python - не роскошь, а инструмент выживания. Он берёт на себя грязную работу: нормализацию, поиск, подсчёт, а ты сосредотачиваешься на анализе. И да, это дешевле и гибче, чем любой корпоративный SIEM. Мы тут не против SIEM, но когда нужно быстро наколенном скрипте разобрать свежий дамп - Python вне конкуренции.
Если хочется посмотреть на Python не как на “ещё один язык”, а как на реальный ускоритель рутинных задач в ИБ - от парсинга логов до быстрых утилит под конкретный кейс, пригодится это руководство: Python в ИБ: Автоматизация и скрипты 2025.
1.2. Инструменты: Pandas, re, datetime, json, xml.etree - базовый набор
Погнали по списку того, что будем таскать из ящика:- Pandas - наша рабочая лошадка. Это Excel на стероидах. DataFrame позволяет фильтровать, группировать, объединять тысячи событий за доли секунды. Если ты ещё не знаком с Pandas - считай, ты не видел жизни.
- re - регулярные выражения. Ими мы будем выкусывать IP, URL, хеши из текстового мусора. Без них никак.
- datetime - работа со временем. Перевод строк в timestamp, нормализация часовых поясов, вычисление разниц. Всё для timeline.
- json / xml.etree - многие современные логи (и даже Windows Event Log внутри) хранятся в JSON или XML. Надо уметь это парсить.
- os, glob - чтобы ходить по папкам, собирать файлы.
- argparse - для скриптов, которые запускаются из командной строки. Сделаем интерфейс по-человечески.
- python-evtx - читаем .evtx файлы напрямую.
- python-registry - для работы с кустами реестра (но об этом в другой раз).
- python-ntfs / pytsk3 - для MFT.
- jinja2 - генерация красивых HTML-отчётов.
- matplotlib / seaborn - графики.
- requests - если надо отправить результаты в Elasticsearch или другой REST API.
pip install pandas python-evtx python-registry pytsk3 jinja2 matplotlib requests
1.3. Jupyter notebooks vs CLI scripts - когда что использовать
Тут принцип простой:- Jupyter - для разведки. Когда ты впервые видишь новый формат логов, экспериментируешь с регулярками, строишь графики, чтобы понять, где копать. Ячейки позволяют перезапускать части кода, смотреть промежуточные результаты - идеально для исследовательской фазы.
- CLI-скрипты - для боевого применения. Когда алгоритм отлажен, данные надо обрабатывать регулярно или в пайплайне. Запустил скрипт с параметрами, получил CSV/отчёт, пошёл дальше.
Совет: используй библиотеку dask для работы с данными, которые не влезают в память. Она имитирует Pandas, но обрабатывает данные по частям. Но об этом позже.
2. Log Normalization
2.1. Парсинг: Syslog (regex), Windows Event Log (python-evtx), Apache (CLF)
Первый этап - вытащить структуру из сырых строк. У каждого формата свои приколы.Syslog
Классический syslog бывает двух основных форматов: старый (RFC3164) и новый (RFC5424). Старый выглядит так:
Код:
<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8
Новый:
Код:
<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - - - 'su root' failed
Python:
import re
syslog_pattern = re.compile(
r'^<(?P<pri>\d+)>(?P<timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(?P<host>\S+)\s+(?P<app>[^:]+):\s+(?P<message>.*)$'
)
Но это для старого. Для нового - другая регулярка. Либо можно попробовать универсальный подход: сначала попытаться распарсить как новый, если не получилось - как старый.
Подводный камень: в старом формате год не указывается, его приходится брать из даты файла или считать, что события за последние 12 месяцев. Ещё бывает, что timestamp включает год в начале - тогда своя регулярка.
Windows Event Log (.evtx)
Здесь нам поможет библиотека python-evtx. Она умеет читать evtx файлы и выдавать XML каждой записи. Дальше парсим XML.Пример базового извлечения событий:
Python:
import Evtx.Evtx as evtx
import xml.etree.ElementTree as ET
def parse_evtx(file_path):
events = []
with evtx.Evtx(file_path) as log:
for record in log.records():
xml_str = record.xml()
root = ET.fromstring(xml_str)
# Пространство имён обычно такое:
ns = {'ns': 'http://schemas.microsoft.com/win/2004/08/events/event'}
system = root.find('ns:System', ns)
event_id = system.find('ns:EventID', ns).text
time_created = system.find('ns:TimeCreated', ns).get('SystemTime')
# Данные события могут быть в EventData/Data
event_data = root.find('ns:EventData', ns)
data_dict = {}
if event_data is not None:
for data in event_data.findall('ns:Data', ns):
name = data.get('Name', 'Unknown')
data_dict[name] = data.text
events.append({
'event_id': event_id,
'timestamp': time_created,
'data': data_dict
})
return events
Но этот код вытянет все события подряд. На практике нам нужны только определённые EventID (например, 4624, 4625 для логонов). Фильтровать можно либо на этапе парсинга, либо потом через Pandas.
Альтернатива: можно использовать wevtutil для экспорта evtx в XML/CSV, но это внешний инструмент, и он медленный. python-evtx работает быстрее и даёт контроль.
Apache access log (Common Log Format)
Тут всё проще: поля разделены пробелами, но есть хитрости - в запросе могут быть пробелы, а в кавычках - что угодно. Стандартный combined формат:
Код:
127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)"
Python:
apache_pattern = re.compile(
r'^(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) \S+" (?P<status>\d+) \S+ "(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"$'
)
Поля: ip, timestamp, method, path, status, referer, user_agent. Timestamp потом надо преобразовать в datetime.
2.2. Нормализация: единый формат timestamp, source, event_type, details
Мы собрали события из разных источников, но они живут в своих структурах. Чтобы объединить их, нужна общая схема. Я предлагаю такую:| Поле | Тип | Описание |
| timestamp | datetime | Время события в UTC |
| source | str | Идентификатор источника ('syslog', 'evtx', 'apache') |
| event_type | str | Тип события ( 'logon_failure', 'http_request') |
| host | str | Хост, на котором произошло событие |
| raw | str | Исходная строка (для отладки) |
| details | dict/json | Дополнительные поля (ip, user, status и т.д.) |
Такой подход позволяет хранить всё в одном DataFrame и при необходимости обращаться к details.
Пример приведения записи из syslog к общему формату:
Python:
def normalize_syslog(parsed):
return {
'timestamp': pd.to_datetime(parsed['timestamp']), # надо ещё год добавить
'source': 'syslog',
'event_type': 'syslog_message',
'host': parsed['host'],
'raw': parsed['raw'],
'details': {'app': parsed['app'], 'message': parsed['message']}
}
Для evtx:
Python:
def normalize_evtx(parsed):
# пример: событие 4625 (неудачный логон)
if parsed['event_id'] == '4625':
event_type = 'logon_failure'
else:
event_type = f'win_event_{parsed["event_id"]}'
return {
'timestamp': pd.to_datetime(parsed['timestamp']),
'source': 'evtx',
'event_type': event_type,
'host': parsed.get('computer', 'unknown'),
'raw': '', # можно не хранить, если лень
'details': parsed['data']
}
2.3. Pandas DataFrame: загрузка, типизация, индексация по timestamp
После того как мы нормализовали все события в список словарей, превращаем его в DataFrame:
Python:
import pandas as pd
df = pd.DataFrame(normalized_events)
df['timestamp'] = pd.to_datetime(df['timestamp']) # убедимся, что тип правильный
df.set_index('timestamp', inplace=True) # делаем индексом для временных рядов
Теперь у нас мощный инструмент фильтрации:
Python:
# все события за последний час
last_hour = df[df.index > datetime.now() - timedelta(hours=1)]
# все неудачные логоны
logon_failures = df[df['event_type'] == 'logon_failure']
# группировка по источнику и типу
df.groupby(['source', 'event_type']).size()
Pandas автоматически сортирует по индексу, если нужно - сделаем df.sort_index().
Типизация: детали хранятся в словаре, но иногда полезно вытащить конкретные поля в отдельные колонки:
Python:
df['ip'] = df['details'].apply(lambda x: x.get('ip', None))
Когда уже понятна механика парсинга .evtx, следующий шаг - научиться видеть за Event ID не просто строки, а логику атаки: где брутфорс, где подозрительный логон, где корреляция с другими источниками. Все это мы разобрали в нашем руководстве: Форензика для начинающих: пошаговый анализ цифровых следов и логов SOC-аналитикам.
3. Pattern Matching с RegEx
Регулярки - наше всё, когда нужно выковырять индикаторы из текста. Рассмотрим типовые задачи.3.1. IP extraction: IPv4, IPv6, private vs public classification
IPv4
Простая регулярка для IPv4 (без проверки корректности диапазонов):
Python:
ipv4_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
Python:
ipv4_valid = re.compile(r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b')
Python:
df['ips'] = df['raw'].str.findall(ipv4_valid)
IPv6
IPv6 сложнее, но есть стандартная регулярка из RFC. Я использую упрощённый вариант:
Python:
ipv6_pattern = re.compile(r'\b(?:[a-fA-F0-9]{1,4}:){7}[a-fA-F0-9]{1,4}\b|\b(?:[a-fA-F0-9]{1,4}:){1,7}:|\b:(?::[a-fA-F0-9]{1,4}){1,7}\b')
Классификация private/public
После извлечения IP полезно понять, какие из них внутренние, а какие внешние. Модуль ipaddress нам в помощь:
Python:
import ipaddress
def classify_ip(ip_str):
try:
ip = ipaddress.ip_address(ip_str)
if ip.is_private:
return 'private'
elif ip.is_global:
return 'public'
else:
return 'other'
except:
return 'invalid'
df['ip_class'] = df['ip'].apply(classify_ip)
3.2. URL и domain: извлечение из free‑text полей, defang/refang
URL
Регулярка для URL - дело неблагодарное, потому что в логах URL часто обрезаны или экранированы. Простейший вариант:
Python:
url_pattern = re.compile(r'https?://[^\s<>"\'{}|\\^`\[\]]+')
Domain
Домены вытаскиваем по похожему принципу, но без протокола:
Python:
domain_pattern = re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b')
Defang/Refang
В отчётах часто встречаются обезвреженные индикаторы: hxxp://example[.]com. Чтобы их использовать, нужно привести к нормальному виду:
Python:
def refang(ioc):
return ioc.replace('[.]', '.').replace('hxxp', 'http').replace('hxxps', 'https')
И наоборот, для публикации отчётов безопасно показывать defang:
Python:
def defang(ioc):
return ioc.replace('.', '[.]').replace('http', 'hxxp', 1)
3.3. Hash detection: MD5, SHA1, SHA256
Хеши - это последовательности hex-символов определённой длины. Регулярки:
Python:
md5_pattern = re.compile(r'\b[a-fA-F0-9]{32}\b')
sha1_pattern = re.compile(r'\b[a-fA-F0-9]{40}\b')
sha256_pattern = re.compile(r'\b[a-fA-F0-9]{64}\b')
Всё вместе можно объединить одной, но тогда не различить тип. Лучше искать отдельно.
Применяем к тексту и собираем все хеши:
Python:
def extract_hashes(text):
hashes = {}
hashes['md5'] = md5_pattern.findall(text)
hashes['sha1'] = sha1_pattern.findall(text)
hashes['sha256'] = sha256_pattern.findall(text)
return hashes
Важно: хеши могут пересекаться (например, SHA1 длиннее MD5, но MD5 может быть частью SHA1). Поэтому порядок поиска лучше от большего к меньшему, или использовать границы слов.
4. Timeline Analysis
4.1. SuperTimeline: объединение событий из 10+ источников
SuperTimeline (или суперхронология) - это когда мы собираем все события из всех доступных источников в одну временную линию. Сюда входят не только логи, но и артефакты файловой системы (времена создания/изменения файлов), реестр (последние записанные значения), Prefetch, события из памяти и т.д.В нашем случае мы ограничимся логами, но подход тот же: нормализуем каждый источник в DataFrame с общим форматом, затем объединяем их.
4.2. Pandas merge/concat
Если у нас несколько DataFrame с одинаковыми колонками, используем pd.concat:
Python:
df_timeline = pd.concat([df_syslog, df_evtx, df_apache], ignore_index=False)
df_timeline.sort_index(inplace=True)
Если структуры различаются, можно привести их к общему знаменателю, добавив недостающие колонки со значениями None.
Если нужно объединить по ключу (например, по IP), используем merge. Но для timeline чаще просто конкатенация.
Пример: построим количество событий по часам:
Python:
events_per_hour = df_timeline.resample('H').size()
events_per_hour.plot() # matplotlib
4.3. Визуализация: matplotlib timeline plot, HTML report с Jinja2
Визуализация помогает быстро заметить аномалии: всплески активности, временные окна атаки.Matplotlib timeline plot
Python:
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(12,6))
for source in df_timeline['source'].unique():
subset = df_timeline[df_timeline['source'] == source]
# scatter plot событий во времени
ax.scatter(subset.index, [source]*len(subset), label=source, s=1)
ax.legend()
plt.show()
Но scatter с миллионом точек может тормозить. Лучше агрегировать по минутам и строить тепловую карту или ступенчатый график.
HTML-отчёт с Jinja2
Jinja2 позволяет генерировать красивые отчёты из шаблонов. Создаём шаблон report_template.html:
HTML:
<html>
<head><title>Timeline Report</title></head>
<body>
<h1>Timeline from {{ start_date }} to {{ end_date }}</h1>
<table border="1">
<tr><th>Timestamp</th><th>Source</th><th>Event Type</th><th>Details</th></tr>
{% for event in events %}
<tr>
<td>{{ event.timestamp }}</td>
<td>{{ event.source }}</td>
<td>{{ event.event_type }}</td>
<td>{{ event.details }}</td>
</tr>
{% endfor %}
</table>
</body>
</html>
В скрипте:
Python:
from jinja2 import Template
with open('report_template.html') as f:
template = Template(f.read())
html_output = template.render(
start_date=df_timeline.index.min(),
end_date=df_timeline.index.max(),
events=df_timeline.reset_index().to_dict(orient='records')
)
with open('timeline_report.html', 'w') as f:
f.write(html_output)
Можно добавить графики в виде base64-encoded PNG или использовать Plotly для интерактива.
5. Практические скрипты
Теперь, когда теория позади, перейдём к реальным скриптам, которые ты сможешь завтра же применить на практике. Каждый скрипт я постараюсь объяснить построчно, показать пример вывода и указать, где могут вылезти грабли.5.1. Скрипт 1: анализ Windows Security Event Log - logon events (4624/4625)
Задача: проанализировать файл Security.evtx, выявить все события входа (успешного и неудачного), посчитать количество неудачных попыток по IP и пользователям, построить timeline и выделить возможный брутфорс.Код:
Python:
#!/usr/bin/env python3
# evtx_logon_analysis.py
import argparse
import pandas as pd
import Evtx.Evtx as evtx
import xml.etree.ElementTree as ET
from datetime import datetime
def parse_evtx_logons(file_path):
events = []
with evtx.Evtx(file_path) as log:
for record in log.records():
xml_str = record.xml()
root = ET.fromstring(xml_str)
ns = {'ns': 'http://schemas.microsoft.com/win/2004/08/events/event'}
system = root.find('ns:System', ns)
event_id = system.find('ns:EventID', ns).text
if event_id not in ['4624', '4625']:
continue
time_created = system.find('ns:TimeCreated', ns).get('SystemTime')
computer = system.find('ns:Computer', ns).text
event_data = root.find('ns:EventData', ns)
data = {}
if event_data is not None:
for d in event_data.findall('ns:Data', ns):
name = d.get('Name', 'unknown')
data[name] = d.text
events.append({
'timestamp': time_created,
'event_id': event_id,
'computer': computer,
'data': data
})
return events
def main():
parser = argparse.ArgumentParser(description='Analyze Windows Logon Events from .evtx')
parser.add_argument('evtx', help='Path to Security.evtx file')
parser.add_argument('--output', '-o', default='logon_report.csv', help='Output CSV file')
args = parser.parse_args()
print(f"[*] Parsing {args.evtx}...")
raw_events = parse_evtx_logons(args.evtx)
print(f"[+] Found {len(raw_events)} logon events")
# нормализация
rows = []
for ev in raw_events:
row = {
'timestamp': pd.to_datetime(ev['timestamp']),
'event_id': ev['event_id'],
'computer': ev['computer'],
'ip': ev['data'].get('IpAddress', '').replace('::ffff:', ''), # убираем IPv6 mapping
'user': ev['data'].get('TargetUserName', ''),
'logon_type': ev['data'].get('LogonType', ''),
'status': 'success' if ev['event_id'] == '4624' else 'failure'
}
rows.append(row)
df = pd.DataFrame(rows)
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
# статистика
print("\n=== Logon Summary ===")
print(df.groupby('status').size())
print("\n=== Top 10 Failed Logons by IP ===")
failed = df[df['status'] == 'failure']
print(failed['ip'].value_counts().head(10))
print("\n=== Top 10 Failed Logons by User ===")
print(failed['user'].value_counts().head(10))
# брутфорс-детектор: больше 10 неудач с одного IP за 5 минут
print("\n=== Potential Brute-Force IPs (threshold 10 failures in 5 min) ===")
# используем rolling window
failed_by_ip = failed.groupby('ip').resample('5T').size().reset_index()
brute_ips = failed_by_ip[failed_by_ip[0] >= 10]['ip'].unique()
print(brute_ips)
# сохраняем CSV
df.to_csv(args.output)
print(f"[+] Report saved to {args.output}")
if __name__ == '__main__':
main()
Пояснения:
- Используем python-evtx для чтения, фильтруем только 4624 и 4625.
- Извлекаем IP, имя пользователя, тип входа.
- Статус определяем по event_id.
- Для поиска брутфорса группируем неудачные попытки по IP и пересэмплируем по 5 минутам. Если в каком-то пятиминутном окне >=10 попыток - считаем IP подозрительным.
- Выводим статистику и сохраняем в CSV.
Код:
[*] Parsing Security.evtx...
[+] Found 12500 logon events
=== Logon Summary ===
status
failure 4200
success 8300
=== Top 10 Failed Logons by IP ===
192.168.1.100 1500
10.0.0.55 800
...
=== Top 10 Failed Logons by User ===
administrator 2000
user1 500
...
=== Potential Brute-Force IPs (threshold 10 failures in 5 min) ===
['192.168.1.100', '10.0.0.55']
Грабли:
- В поле IpAddress может приходить IPv6-отображение IPv4
:ffff:192.168.1.1), мы обрезаем. - Для старых Windows в данных могут отсутствовать некоторые поля.
- Время в evtx хранится в UTC, но Windows часто пишет локальное время. Проверь часовой пояс.
5.2. Скрипт 2: Apache access log - top attackers, suspicious paths
Задача: разобрать access.log веб-сервера, выделить IP с большим количеством запросов, найти подозрительные пути (админки, скрипты, сканеры), построить график активности.Код:
Python:
#!/usr/bin/env python3
# apache_analyze.py
import argparse
import re
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
apache_pattern = re.compile(
r'^(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) \S+" (?P<status>\d+) \S+ "(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"$'
)
def parse_apache_line(line):
m = apache_pattern.match(line)
if not m:
return None
d = m.groupdict()
# преобразуем timestamp
ts_str = d['timestamp']
# формат: 10/Oct/2000:13:55:36 -0700
ts = pd.to_datetime(ts_str, format='%d/%b/%Y:%H:%M:%S %z')
return {
'timestamp': ts,
'ip': d['ip'],
'method': d['method'],
'path': d['path'],
'status': int(d['status']),
'referer': d['referer'],
'user_agent': d['user_agent']
}
def main():
parser = argparse.ArgumentParser(description='Analyze Apache access log')
parser.add_argument('logfile', help='Path to access.log')
parser.add_argument('--output', '-o', default='apache_report.csv')
args = parser.parse_args()
print(f"[*] Parsing {args.logfile}...")
events = []
with open(args.logfile, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
ev = parse_apache_line(line)
if ev:
events.append(ev)
print(f"[+] Parsed {len(events)} lines")
df = pd.DataFrame(events)
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)
print("\n=== Top 10 IPs by request count ===")
print(df['ip'].value_counts().head(10))
print("\n=== Top 10 most requested paths ===")
print(df['path'].value_counts().head(10))
# подозрительные пути (словари, скрипты)
suspicious = ['admin', 'phpmyadmin', 'wp-admin', 'mysql', 'sql', 'backup', 'config', '.env']
pattern = '|'.join(suspicious)
suspicious_hits = df[df['path'].str.contains(pattern, case=False, na=False)]
print(f"\n=== Suspicious path hits ({len(suspicious_hits)}) ===")
print(suspicious_hits['path'].value_counts().head(10))
# статусы ошибок
errors = df[df['status'] >= 400]
print(f"\n=== HTTP errors (4xx/5xx): {len(errors)} ===")
print(errors['status'].value_counts())
# временной график
hits_per_hour = df.resample('H').size()
hits_per_hour.plot(figsize=(12,4), title='Requests per hour')
plt.tight_layout()
plt.savefig('requests_per_hour.png')
print("[+] Graph saved to requests_per_hour.png")
df.to_csv(args.output)
print(f"[+] Report saved to {args.output}")
if __name__ == '__main__':
main()
Пояснения:
- Регулярка под combined log format.
- Извлекаем timestamp с часовым поясом, Pandas его понимает.
- Считаем топ IP и пути.
- Ищем подозрительные слова в пути (часто используются сканерами).
- Строим график числа запросов по часам.
- В логах могут быть невалидные строки (обрывки), нужно обрабатывать ошибки.
- Путь может быть закодирован, но мы ищем подстроки, это работает.
- Если лог огромный, чтение построчно с регуляркой может быть медленным. Можно использовать pandas.read_csv с разделителем пробел, но тогда сложности с кавычками. Для ускорения можно применить re.compile один раз и параллельную обработку (но это уже сложнее).
Если после базового парсинга хочется глубже разобрать сам access log - какие поля в нём самые ценные, как быстро вытаскивать IP, коды ответа, запросы и строить полезную статистику, пригодится наше руководство: Анализ лог-файлов Apache.
5.3. Скрипт 3: artifact parser - $MFT timeline с python-ntfs
Задача: прочитать файл $MFT, извлечь временные метки создания, изменения, записи и доступа для каждого файла, построить timeline активности файловой системы. Это полезно для выявления времени появления вредоносных файлов, их модификаций.Важно: работа с MFT требует либо прав администратора (чмонтировать диск), либо доступа к сырому файлу $MFT (можно скопировать с помощью инструментов типа rawcopy). Для примера будем использовать библиотеку python-ntfs (GitHub - williballenthin/python-ntfs: Open source Python library for NTFS analysis), которая умеет парсить MFT напрямую.
Установка:
pip install python-ntfsКод:
Python:
#!/usr/bin/env python3
# mft_timeline.py
import argparse
import pandas as pd
from ntfs.mft import MFT
from ntfs.attributes import ATTR_TYPE
import os
def parse_mft(mft_path):
records = []
mft = MFT(mft_path)
for record in mft.records():
if record.header.base_record_reference != 0:
continue # пропускаем расширенные записи, обычно они дублируют базовые
# Получаем имена файлов из атрибута $FILE_NAME
fname_attr = None
for attr in record.attributes():
if attr.type == ATTR_TYPE.FILE_NAME:
fname_attr = attr
break
if not fname_attr:
continue
fn = fname_attr.file_name()
name = fn.name
parent_ref = fn.parent_directory
# Временные метки (в 100-нс интервалах от 1601-01-01)
create_time = fn.created_time
modify_time = fn.modified_time
mft_change_time = fn.mft_changed_time
access_time = fn.accessed_time
# Конвертируем в datetime
def ntfs_time_to_datetime(ntfs_time):
# ntfs_time - 64-bit, количество 100-нс интервалов с 1601-01-01
if ntfs_time == 0:
return None
# Конвертируем в секунды с 1970
unix_time = (ntfs_time - 116444736000000000) / 10000000
return pd.to_datetime(unix_time, unit='s')
records.append({
'name': name,
'parent_ref': parent_ref,
'create': ntfs_time_to_datetime(create_time),
'modify': ntfs_time_to_datetime(modify_time),
'mft_change': ntfs_time_to_datetime(mft_change_time),
'access': ntfs_time_to_datetime(access_time)
})
return records
def main():
parser = argparse.ArgumentParser(description='Parse $MFT and generate timeline')
parser.add_argument('mft', help='Path to $MFT file')
parser.add_argument('--output', '-o', default='mft_timeline.csv')
args = parser.parse_args()
print(f"[*] Parsing MFT from {args.mft}...")
records = parse_mft(args.mft)
print(f"[+] Loaded {len(records)} file records")
df = pd.DataFrame(records)
# Создаём "long format" для timeline: каждая запись с одним временем и типом
timeline = []
for _, row in df.iterrows():
for ttype in ['create', 'modify', 'mft_change', 'access']:
ts = row[ttype]
if pd.notnull(ts):
timeline.append({
'timestamp': ts,
'file': row['name'],
'event_type': f'$MFT_{ttype}'
})
df_timeline = pd.DataFrame(timeline)
df_timeline.set_index('timestamp', inplace=True)
df_timeline.sort_index(inplace=True)
print("\n=== Timeline summary ===")
print(df_timeline.groupby('event_type').size())
# Сохраняем
df_timeline.to_csv(args.output)
print(f"[+] Timeline saved to {args.output}")
if __name__ == '__main__':
main()
Пояснения:
- Используем библиотеку python-ntfs, она даёт доступ к атрибутам каждой записи.
- Извлекаем атрибут $FILE_NAME, где хранятся временные метки (в NTFS времени).
- Конвертируем NTFS-время (число 100-нс интервалов с 1601-01-01) в datetime.
- Создаём "длинный" формат: для каждого файла и каждого типа времени одна запись.
- Получаем timeline изменений файловой системы.
- Библиотека python-ntfs может быть нестабильна на больших MFT (сотни ГБ). Альтернативы: использовать pytsk3 (но там другой API) или читать MFT через analyzeMFT.py (сторонний инструмент) и потом парсить его CSV.
- MFT может содержать удалённые файлы (они помечены флагом). По умолчанию мы их тоже видим, что полезно.
- Время доступа (access_time) может быть отключено на системе, тогда оно не обновляется.
5.4. Скрипт 4: IOC extractor - извлечение всех IOC из текстового отчёта
Задача: дан текстовый файл (email, отчёт антивируса, лог), нужно автоматически извлечь из него IP, домены, URL, хеши и вывести в структурированном виде (CSV/JSON).Код:
Python:
#!/usr/bin/env python3
# ioc_extractor.py
import argparse
import re
import json
import pandas as pd
from collections import defaultdict
# Регулярки
ipv4_pattern = re.compile(r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b')
domain_pattern = re.compile(r'\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b')
url_pattern = re.compile(r'https?://[^\s<>"\'{}|\\^`\[\]]+')
md5_pattern = re.compile(r'\b[a-fA-F0-9]{32}\b')
sha1_pattern = re.compile(r'\b[a-fA-F0-9]{40}\b')
sha256_pattern = re.compile(r'\b[a-fA-F0-9]{64}\b')
def extract_iocs(text):
iocs = defaultdict(list)
iocs['ipv4'] = list(set(ipv4_pattern.findall(text)))
iocs['domain'] = list(set(domain_pattern.findall(text)))
iocs['url'] = list(set(url_pattern.findall(text)))
iocs['md5'] = list(set(md5_pattern.findall(text)))
iocs['sha1'] = list(set(sha1_pattern.findall(text)))
iocs['sha256'] = list(set(sha256_pattern.findall(text)))
# Убираем пересечения: если домен попал в url, не страшно, но можно очистить
return iocs
def main():
parser = argparse.ArgumentParser(description='Extract IOCs from text file')
parser.add_argument('input', help='Input text file')
parser.add_argument('--output', '-o', default='iocs.json', help='Output JSON file')
parser.add_argument('--csv', action='store_true', help='Output CSV instead of JSON')
args = parser.parse_args()
with open(args.input, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read()
iocs = extract_iocs(text)
if args.csv:
# превращаем в плоский список для CSV
rows = []
for ioc_type, values in iocs.items():
for val in values:
rows.append({'type': ioc_type, 'value': val})
df = pd.DataFrame(rows)
df.to_csv(args.output.replace('.json', '.csv'), index=False)
print(f"[+] CSV saved to {args.output.replace('.json', '.csv')}")
else:
with open(args.output, 'w') as f:
json.dump(iocs, f, indent=2)
print(f"[+] JSON saved to {args.output}")
# Статистика
print("\n=== Extracted IOCs ===")
for k, v in iocs.items():
print(f"{k}: {len(v)}")
if __name__ == '__main__':
main()
Пояснения:
- Используем несколько регулярных выражений для разных типов IOC.
- Для каждого типа удаляем дубликаты через set.
- Результат можно сохранить в JSON (структурированно по типам) или в CSV (плоская таблица).
- Легко добавить другие типы (например, email, registry paths).
- Домены могут пересекаться с URL. В CSV они будут отдельными записями, это нормально.
- Могут быть ложные срабатывания (например, IPv4 в десятичной записи не ловится). Можно использовать библиотеку ipaddress для валидации после извлечения.
- Для defanged IOC нужно предварительно refang.
5.5. Скрипт 5: correlation - связывание событий firewall + auth + endpoint
Задача: объединить логи из трёх источников (файрвол, события аутентификации, логи EDR) и найти корреляции. Например: все IP, которые были заблокированы файрволом и одновременно делали неудачные попытки входа. Или все процессы, которые были запущены на хосте сразу после успешного входа с внешнего IP.Подход: нормализуем каждый источник в DataFrame с общим индексом timestamp, добавляем колонку с IP (если есть), затем объединяем по временному окну.
Упрощённый пример (без полного парсинга, демонстрируем идею):
Python:
#!/usr/bin/env python3
# correlate.py
import pandas as pd
import argparse
def load_firewall_logs(file):
# заглушка: читаем CSV с колонками timestamp, ip, action
df = pd.read_csv(file, parse_dates=['timestamp'])
df.set_index('timestamp', inplace=True)
df['source'] = 'firewall'
return df
def load_auth_logs(file):
df = pd.read_csv(file, parse_dates=['timestamp'])
df.set_index('timestamp', inplace=True)
df['source'] = 'auth'
return df
def load_endpoint_logs(file):
df = pd.read_csv(file, parse_dates=['timestamp'])
df.set_index('timestamp', inplace=True)
df['source'] = 'endpoint'
return df
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--firewall', required=True)
parser.add_argument('--auth', required=True)
parser.add_argument('--endpoint', required=True)
parser.add_argument('--window', default='5min', help='Correlation window')
args = parser.parse_args()
df_fw = load_firewall_logs(args.firewall)
df_auth = load_auth_logs(args.auth)
df_ep = load_endpoint_logs(args.endpoint)
# объединяем все
df_all = pd.concat([df_fw, df_auth, df_ep])
df_all.sort_index(inplace=True)
# теперь ищем IP, которые есть в firewall (action=block) и в auth (failure)
# допустим, в firewall есть колонка ip, в auth тоже ip
# берём временное окно: события из разных источников, произошедшие в течение window
# Простой способ: сгруппировать по IP и проверить пересечение типов событий
# Но это не учитывает временную близость. Лучше использовать rolling window с merge_asof.
# Пример: найти все пары событий firewall (block) и auth (failure) с одного IP, произошедшие не позднее 5 минут друг от друга
fw_blocks = df_fw[df_fw['action'] == 'block'][['ip']].reset_index()
auth_fails = df_auth[df_auth['status'] == 'failure'][['ip']].reset_index()
# используем merge_asof для поиска ближайших по времени
# сначала сортируем
fw_blocks.sort_values('timestamp', inplace=True)
auth_fails.sort_values('timestamp', inplace=True)
# убираем дубликаты индексов
result = pd.merge_asof(
fw_blocks, auth_fails,
on='timestamp', by='ip',
direction='nearest',
tolerance=pd.Timedelta(args.window)
)
# result содержит строки из fw_blocks с добавленным timestamp из auth_fails, если нашли в пределах окна
correlated = result.dropna(subset=['timestamp_y'])
print(f"Found {len(correlated)} correlations between firewall blocks and auth failures within {args.window}")
print(correlated[['ip', 'timestamp_x', 'timestamp_y']])
if __name__ == '__main__':
main()
Пояснения:
- Загружаем три нормализованных DataFrame.
- Используем merge_asof для поиска событий из разных источников, близких по времени (с одного IP).
- Выводим корреляции.
- merge_asof требует сортировки по времени.
- Толерантность задаётся как pd.Timedelta.
- Если событий много, может быть много совпадений. Нужно продумать логику: ищем все пары, или только если блокировка произошла после неудач? Направление можно задать параметром direction.
- В реальных данных IP может быть NAT, тогда корреляция будет неточной.
6. Интеграция и отчёты
6.1. Отправка результатов в ELK
После того как мы получили обогащённые данные, часто нужно отправить их в Elasticsearch для визуализации или дальнейшего анализа. Есть несколько способов:- Использовать официальный клиент elasticsearch-py.
- Записать в CSV и загрузить через Logstash.
- Использовать bulk API.
Python:
from elasticsearch import Elasticsearch, helpers
import pandas as pd
def df_to_es(df, index_name, es_host='localhost:9200'):
es = Elasticsearch([es_host])
actions = []
for i, row in df.iterrows():
doc = row.to_dict()
doc['_index'] = index_name
doc['_id'] = f"{i}" # уникальный ID
actions.append(doc)
helpers.bulk(es, actions)
Осторожно: если DataFrame большой, лучше отправлять чанками. Также нужно настроить mapping под типы данных (особенно timestamp).
6.2. Генерация HTML-отчёта с Jinja2
Мы уже рассматривали базовый шаблон. Усложним: добавим графики в формате base64 и интерактивные таблицы (например, с помощью DataTables).Генерация графика в base64:
Python:
import matplotlib.pyplot as plt
import io
import base64
def fig_to_base64(fig):
buf = io.BytesIO()
fig.savefig(buf, format='png')
buf.seek(0)
return base64.b64encode(buf.read()).decode('utf-8')
В шаблоне:
HTML:
<img src="data:image/png;base64,{{ plot_base64 }}" />
Шаблон report_template.html:
HTML:
<html>
<head>
<title>DFIR Report</title>
<link rel="stylesheet" type="text/css" href="https://cdn.datatables.net/1.10.25/css/jquery.dataTables.css">
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script src="https://cdn.datatables.net/1.10.25/js/jquery.dataTables.js"></script>
</head>
<body>
<h1>Incident Timeline Report</h1>
<h2>Generated on {{ now }}</h2>
<img src="data:image/png;base64,{{ plot_base64 }}" />
<h2>Events ({{ events|length }})</h2>
<table id="events" class="display">
<thead>
<tr><th>Timestamp</th><th>Source</th><th>Type</th><th>Details</th></tr>
</thead>
<tbody>
{% for ev in events %}
<tr>
<td>{{ ev.timestamp }}</td>
<td>{{ ev.source }}</td>
<td>{{ ev.event_type }}</td>
<td>{{ ev.details }}</td>
</tr>
{% endfor %}
</tbody>
</table>
<script>
$(document).ready( function () {
$('#events').DataTable();
} );
</script>
</body>
</html>
В Python:
Python:
from jinja2 import Template
from datetime import datetime
with open('report_template.html') as f:
template = Template(f.read())
plot_base64 = fig_to_base64(plt.gcf()) # после построения графика
html = template.render(
now=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
plot_base64=plot_base64,
events=df_timeline.reset_index().to_dict(orient='records')
)
with open('final_report.html', 'w') as f:
f.write(html)
Получается полноценный интерактивный отчёт.
7. Заключение: остаёмся людьми
Автоматизация - это не магия. Это просто молоток, которым можно забивать гвозди, а можно - себе по пальцам. Мы рассмотрели пять скриптов, которые покрывают 80% рутинных задач. Но если запустить их на сырых данных без понимания, что внутри, - результат будет таким же сырым. Поэтому прежде чем жать Enter, всегда задавай себе три вопроса:- Что я ищу? (Если не знаешь - скрипт найдёт кучу мусора.)
- Откуда данные? (Может, часы в логах переведены неправильно, и весь timeline поедет.)
- Что я буду делать с результатом? (Просто сохранить в CSV и забыть? Или передать клиенту?)
Золотое правило: всегда оставляй в отчёте раздел «Ограничения метода». Это не стыдно, это профессионально. Судьям и заказчикам важна прозрачность.
Автоматизация должна облегчать жизнь, а не превращать её в ад поддержки кода. Я видел ребят, которые пишут мега-скрипты на 5000 строк, а через полгода сами в них не могут разобраться. Не будь как они.
- Документируй. Хотя бы пару строк в начале каждого модуля: что делает, какие входные данные, пример вызова.
- Версионируй. Git - наше всё. Даже если ты один, история изменений спасает, когда случайно удалил рабочую версию.
- Тестируй на маленьких данных. Не лезь сразу с гигабайтным логом - сначала отрежь 100 строк и проверь, что регулярка не падает.
Куда движемся?
DFIR не стоит на месте. Вчера были логи Apache, сегодня - контейнеры, Kubernetes, облачные audit-логи (AWS CloudTrail, Azure AD). Форматы меняются, объёмы растут. Но подход остаётся: нормализация, обогащение, визуализация. Python с его экосистемой (Pandas, Dask, PySpark) позволяет масштабироваться.Мой тебе совет: не замыкайся на одном инструменте. Скрипты - это классно, но иногда быстрее написать запрос в Elastic или использовать готовый плагин для Volatility. Держи под рукой арсенал и выбирай под задачу.
Последнее редактирование модератором: