Интеграция XML-каталогов в хранилище данных: от хаоса к порядку
Проблема: разрозненные каталоги от поставщиков
Типичная ситуация для дистрибьюторов и интернет-магазинов: у вас 10-50 поставщиков, каждый присылает свой каталог в XML. Форматы разные, структура отличается, обновляются в разное время.
Что обычно идет не так
- Разные XML-схемы: один поставщик использует
<price>, другой<priceRub>, третий<cost currency="RUB"> - Несогласованная номенклатура: одна и та же позиция называется по-разному в каждом каталоге
- Кодировки: Windows-1251, UTF-8, UTF-8 with BOM
- Битые XML: незакрытые теги, невалидные символы, огромные файлы (500+ MB)
- Отсутствие истории: новый файл полностью перезаписывает старый, динамику цен не отследить
Результат: менеджеры вручную сводят прайсы в Excel, тратят часы на поиск товаров, ошибаются в ценах.
Решение: Data Warehouse с автоматической интеграцией
Мы создаем единое хранилище данных (DWH), которое:
- Автоматически скачивает XML от поставщиков (FTP, HTTP, email)
- Парсит и валидирует данные
- Приводит к единой схеме
- Хранит историю изменений цен и остатков
- Обогащает дополнительными данными
- Предоставляет API и дашборды
Архитектура
┌─────────────────────────────────┐
│ Источники (поставщики) │
│ - FTP серверы │
│ - HTTP endpoints │
│ - Email вложения │
│ - Google Sheets API │
└──────────┬──────────────────────┘
│
┌──────────▼──────────────────────┐
│ ETL Pipeline (Airflow) │
│ - Скачивание файлов │
│ - Парсинг XML (lxml, xmltodict)│
│ - Валидация и очистка │
│ - Приведение к единой схеме │
└──────────┬──────────────────────┘
│
┌──────────▼──────────────────────┐
│ Staging (PostgreSQL) │
│ - Сырые данные │
│ - Логи обработки │
└──────────┬──────────────────────┘
│
┌──────────▼──────────────────────┐
│ Transformation (dbt) │
│ - Дедупликация │
│ - Обогащение │
│ - Расчет производных метрик │
└──────────┬──────────────────────┘
│
┌──────────▼──────────────────────┐
│ Data Warehouse (ClickHouse) │
│ - Единая модель данных │
│ - История изменений │
│ - Витрины для аналитики │
└──────────┬──────────────────────┘
│
┌──────────▼──────────────────────┐
│ Потребители данных │
│ - BI дашборды (Metabase) │
│ - API для сайта/1С │
│ - Экспорт в файлы │
└─────────────────────────────────┘
Кейс: B2B дистрибьютор стройматериалов
Исходная ситуация:
- 24 поставщика
- XML каталоги от 5 MB до 800 MB
- Обновления: ежедневно, раз в неделю, по запросу
- Менеджеры тратили 15 часов в неделю на сведение прайсов
Что сделали
1. Автоматизация загрузки
# Пример Airflow DAG для загрузки XML
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def download_xml_from_ftp(supplier_config):
"""Скачивание XML с FTP сервера поставщика"""
import ftplib
ftp = ftplib.FTP(supplier_config['host'])
ftp.login(supplier_config['user'], supplier_config['password'])
# Скачиваем файл
filename = supplier_config['filename']
with open(f'/tmp/{filename}', 'wb') as f:
ftp.retrbinary(f'RETR {filename}', f.write)
ftp.quit()
return f'/tmp/{filename}'
dag = DAG(
'xml_catalog_etl',
default_args={
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
},
schedule_interval='0 6 * * *', # каждый день в 6:00
start_date=datetime(2025, 1, 1),
catchup=False,
)
# Создаем task для каждого поставщика
for supplier in suppliers_config:
PythonOperator(
task_id=f'download_{supplier["name"]}',
python_callable=download_xml_from_ftp,
op_kwargs={'supplier_config': supplier},
dag=dag,
)
2. Универсальный парсер XML
import xmltodict
import pandas as pd
from typing import Dict, List
class XMLCatalogParser:
"""Универсальный парсер для XML каталогов"""
def __init__(self, mapping_config: Dict):
"""
mapping_config определяет соответствие полей:
{
'article': 'offer.@id',
'name': 'offer.name',
'price': 'offer.price',
'currency': 'offer.currencyId',
'stock': 'offer.@available',
}
"""
self.mapping = mapping_config
def parse(self, xml_path: str) -> pd.DataFrame:
# Парсим XML
with open(xml_path, 'rb') as f:
data = xmltodict.parse(f)
# Извлекаем offers (путь может отличаться)
offers = self._extract_offers(data)
# Маппинг полей
records = []
for offer in offers:
record = {}
for target_field, source_path in self.mapping.items():
record[target_field] = self._get_nested_value(offer, source_path)
records.append(record)
return pd.DataFrame(records)
def _get_nested_value(self, obj, path: str):
"""Получение вложенного значения по пути 'a.b.c'"""
keys = path.split('.')
value = obj
for key in keys:
if key.startswith('@'):
# Атрибут
value = value.get(key, None)
else:
value = value.get(key, {})
return value
3. Нормализация и валидация
def normalize_catalog(df: pd.DataFrame, supplier_id: int) -> pd.DataFrame:
"""Приведение к единой схеме"""
# Добавляем метаданные
df['supplier_id'] = supplier_id
df['loaded_at'] = pd.Timestamp.now()
# Чистим цены
df['price'] = df['price'].str.replace(' ', '').str.replace(',', '.')
df['price'] = pd.to_numeric(df['price'], errors='coerce')
# Убираем невалидные записи
df = df.dropna(subset=['article', 'price'])
df = df[df['price'] > 0]
# Нормализуем названия
df['name'] = df['name'].str.strip().str.title()
# Приводим остатки к bool
df['in_stock'] = df['stock'].apply(lambda x: str(x).lower() in ['true', 'yes', '1', 'да'])
return df
4. Отслеживание изменений (CDC)
-- ClickHouse: таблица с версионированием
CREATE TABLE catalog_history (
article String,
supplier_id UInt32,
name String,
price Decimal(10, 2),
in_stock UInt8,
valid_from DateTime,
valid_to DateTime DEFAULT toDateTime('2099-12-31'),
is_current UInt8 DEFAULT 1
) ENGINE = MergeTree()
ORDER BY (supplier_id, article, valid_from);
-- При загрузке новых данных закрываем старые версии
UPDATE catalog_history
SET
valid_to = now(),
is_current = 0
WHERE article = ?
AND supplier_id = ?
AND is_current = 1
AND (price != ? OR in_stock != ?);
-- Вставляем новую версию
INSERT INTO catalog_history
(article, supplier_id, name, price, in_stock, valid_from)
VALUES (?, ?, ?, ?, ?, now());
Результаты
| Метрика | До | После |
|---|---|---|
| Время обновления каталога | 6-8 часов вручную | 30 минут автоматически |
| Ошибки в ценах | ~5% заказов | 0.2% |
| Задержка данных | 1-3 дня | real-time |
| Покрытие аналитикой | 0% | 100% (история, динамика) |
Экономия: 15 ч/нед × 4 нед × 3000 ₽/ч = 180 000 ₽/мес на ФОТ менеджеров
Продвинутая аналитика
После интеграции открываются новые возможности:
1. Мониторинг цен конкурентов
-- ClickHouse: сравнение цен по одному товару у разных поставщиков
SELECT
article,
name,
supplier_id,
price,
price - minPrice AS price_diff,
round((price - minPrice) / minPrice * 100, 1) AS markup_percent
FROM catalog_history
JOIN (
SELECT article, min(price) AS minPrice
FROM catalog_history
WHERE is_current = 1
GROUP BY article
) USING article
WHERE is_current = 1
AND in_stock = 1
ORDER BY markup_percent DESC
LIMIT 100;
2. Анализ динамики цен
-- Товары с максимальным изменением цены за неделю
SELECT
article,
name,
supplier_id,
price AS current_price,
prev_price,
round((price - prev_price) / prev_price * 100, 1) AS price_change_pct
FROM (
SELECT
article,
name,
supplier_id,
price,
lagInFrame(price) OVER (PARTITION BY article, supplier_id ORDER BY valid_from) AS prev_price
FROM catalog_history
WHERE valid_from >= now() - INTERVAL 7 DAY
)
WHERE prev_price > 0
ORDER BY abs(price_change_pct) DESC
LIMIT 50;
3. Алерты на критические события
# Отправка уведомления в Telegram при резком изменении цены
def check_price_anomalies():
query = """
SELECT article, name, supplier_id,
current_price, prev_price,
price_change_pct
FROM price_changes_view
WHERE abs(price_change_pct) > 20 -- изменение более 20%
AND valid_from >= now() - INTERVAL 1 DAY
"""
anomalies = clickhouse_client.query(query)
if len(anomalies) > 0:
message = "⚠️ Резкое изменение цен:\n\n"
for row in anomalies:
message += f"• {row['name']} ({row['article']})\n"
message += f" Было: {row['prev_price']} ₽\n"
message += f" Стало: {row['current_price']} ₽\n"
message += f" Изменение: {row['price_change_pct']:+.1f}%\n\n"
telegram_bot.send_message(chat_id=ADMIN_CHAT_ID, text=message)
Технический стек
- Оркестрация: Apache Airflow
- Парсинг: lxml, xmltodict, pandas
- Staging: PostgreSQL
- Трансформации: dbt (data build tool)
- DWH: ClickHouse
- BI: Metabase
- Мониторинг: Prometheus, Grafana
- Алерты: Telegram Bot API
Частые проблемы и решения
Проблема 1: Огромные XML файлы (500+ MB)
Решение: Streaming парсинг с lxml.etree.iterparse
from lxml import etree
def parse_large_xml(file_path: str):
"""Потоковый парсинг больших XML"""
context = etree.iterparse(file_path, events=('end',), tag='offer')
for event, elem in context:
# Обрабатываем один <offer>
yield {
'article': elem.get('id'),
'name': elem.findtext('name'),
'price': elem.findtext('price'),
}
# Очищаем память
elem.clear()
while elem.getprevious() is not None:
del elem.getparent()[0]
del context
Проблема 2: Несогласованная номенклатура
Решение: Fuzzy matching + ML-модель для дедупликации
from rapidfuzz import fuzz
def find_duplicates(articles_df: pd.DataFrame, threshold: float = 0.85):
"""Поиск дубликатов по нечеткому совпадению названий"""
duplicates = []
for i, row1 in articles_df.iterrows():
for j, row2 in articles_df.iloc[i+1:].iterrows():
similarity = fuzz.ratio(row1['name'], row2['name']) / 100
if similarity >= threshold:
duplicates.append({
'article1': row1['article'],
'article2': row2['article'],
'name1': row1['name'],
'name2': row2['name'],
'similarity': similarity
})
return pd.DataFrame(duplicates)
Проблема 3: Поставщики не соблюдают SLA
Решение: Мониторинг и автоматические напоминания
# Airflow sensor с таймаутом
from airflow.sensors.filesystem import FileSensor
wait_for_xml = FileSensor(
task_id='wait_for_supplier_xml',
filepath='/ftp/supplier_123/catalog.xml',
poke_interval=300, # проверяем каждые 5 минут
timeout=7200, # максимум 2 часа ждем
mode='poke',
dag=dag,
)
# При таймауте отправляем напоминание поставщику
def send_reminder_email():
send_email(
to='supplier@example.com',
subject='Напоминание: обновите каталог',
html_content='...'
)
Заключение
Интеграция разрозненных XML-каталогов в единое хранилище:
- ✅ Экономит 10-20 часов в неделю на ручной работе
- ✅ Снижает ошибки в ценах на 95%
- ✅ Дает актуальные данные в real-time
- ✅ Открывает возможности для аналитики (история, динамика, аномалии)
Окупаемость проекта: 2-4 месяца для компаний от 10 поставщиков.
Нужна помощь с интеграцией каталогов?
Мы строим ETL-пайплайны для интеграции любых источников данных: XML, CSV, Excel, API, базы данных. Создаем единое хранилище и настраиваем автоматическое обновление.
📞 +7 (924) 547-36-78 📧 info@bi-ai.ru 💬 Telegram: @bi_ai_team