Интеграция XML-каталогов в хранилище данных: от хаоса к порядку

Интеграция XML-каталогов в хранилище данных: от хаоса к порядку


Проблема: разрозненные каталоги от поставщиков

Типичная ситуация для дистрибьюторов и интернет-магазинов: у вас 10-50 поставщиков, каждый присылает свой каталог в XML. Форматы разные, структура отличается, обновляются в разное время.

Что обычно идет не так

  • Разные XML-схемы: один поставщик использует <price>, другой <priceRub>, третий <cost currency="RUB">
  • Несогласованная номенклатура: одна и та же позиция называется по-разному в каждом каталоге
  • Кодировки: Windows-1251, UTF-8, UTF-8 with BOM
  • Битые XML: незакрытые теги, невалидные символы, огромные файлы (500+ MB)
  • Отсутствие истории: новый файл полностью перезаписывает старый, динамику цен не отследить

Результат: менеджеры вручную сводят прайсы в Excel, тратят часы на поиск товаров, ошибаются в ценах.

Решение: Data Warehouse с автоматической интеграцией

Мы создаем единое хранилище данных (DWH), которое:

  1. Автоматически скачивает XML от поставщиков (FTP, HTTP, email)
  2. Парсит и валидирует данные
  3. Приводит к единой схеме
  4. Хранит историю изменений цен и остатков
  5. Обогащает дополнительными данными
  6. Предоставляет API и дашборды

Архитектура

┌─────────────────────────────────┐
│  Источники (поставщики)         │
│  - FTP серверы                  │
│  - HTTP endpoints               │
│  - Email вложения               │
│  - Google Sheets API            │
└──────────┬──────────────────────┘

┌──────────▼──────────────────────┐
│  ETL Pipeline (Airflow)         │
│  - Скачивание файлов            │
│  - Парсинг XML (lxml, xmltodict)│
│  - Валидация и очистка          │
│  - Приведение к единой схеме    │
└──────────┬──────────────────────┘

┌──────────▼──────────────────────┐
│  Staging (PostgreSQL)           │
│  - Сырые данные                 │
│  - Логи обработки               │
└──────────┬──────────────────────┘

┌──────────▼──────────────────────┐
│  Transformation (dbt)           │
│  - Дедупликация                 │
│  - Обогащение                   │
│  - Расчет производных метрик    │
└──────────┬──────────────────────┘

┌──────────▼──────────────────────┐
│  Data Warehouse (ClickHouse)    │
│  - Единая модель данных         │
│  - История изменений            │
│  - Витрины для аналитики        │
└──────────┬──────────────────────┘

┌──────────▼──────────────────────┐
│  Потребители данных             │
│  - BI дашборды (Metabase)       │
│  - API для сайта/1С             │
│  - Экспорт в файлы              │
└─────────────────────────────────┘

Кейс: B2B дистрибьютор стройматериалов

Исходная ситуация:

  • 24 поставщика
  • XML каталоги от 5 MB до 800 MB
  • Обновления: ежедневно, раз в неделю, по запросу
  • Менеджеры тратили 15 часов в неделю на сведение прайсов

Что сделали

1. Автоматизация загрузки

# Пример Airflow DAG для загрузки XML
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def download_xml_from_ftp(supplier_config):
    """Скачивание XML с FTP сервера поставщика"""
    import ftplib

    ftp = ftplib.FTP(supplier_config['host'])
    ftp.login(supplier_config['user'], supplier_config['password'])

    # Скачиваем файл
    filename = supplier_config['filename']
    with open(f'/tmp/{filename}', 'wb') as f:
        ftp.retrbinary(f'RETR {filename}', f.write)

    ftp.quit()
    return f'/tmp/{filename}'

dag = DAG(
    'xml_catalog_etl',
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
    schedule_interval='0 6 * * *',  # каждый день в 6:00
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

# Создаем task для каждого поставщика
for supplier in suppliers_config:
    PythonOperator(
        task_id=f'download_{supplier["name"]}',
        python_callable=download_xml_from_ftp,
        op_kwargs={'supplier_config': supplier},
        dag=dag,
    )

2. Универсальный парсер XML

import xmltodict
import pandas as pd
from typing import Dict, List

class XMLCatalogParser:
    """Универсальный парсер для XML каталогов"""

    def __init__(self, mapping_config: Dict):
        """
        mapping_config определяет соответствие полей:
        {
            'article': 'offer.@id',
            'name': 'offer.name',
            'price': 'offer.price',
            'currency': 'offer.currencyId',
            'stock': 'offer.@available',
        }
        """
        self.mapping = mapping_config

    def parse(self, xml_path: str) -> pd.DataFrame:
        # Парсим XML
        with open(xml_path, 'rb') as f:
            data = xmltodict.parse(f)

        # Извлекаем offers (путь может отличаться)
        offers = self._extract_offers(data)

        # Маппинг полей
        records = []
        for offer in offers:
            record = {}
            for target_field, source_path in self.mapping.items():
                record[target_field] = self._get_nested_value(offer, source_path)
            records.append(record)

        return pd.DataFrame(records)

    def _get_nested_value(self, obj, path: str):
        """Получение вложенного значения по пути 'a.b.c'"""
        keys = path.split('.')
        value = obj
        for key in keys:
            if key.startswith('@'):
                # Атрибут
                value = value.get(key, None)
            else:
                value = value.get(key, {})
        return value

3. Нормализация и валидация

def normalize_catalog(df: pd.DataFrame, supplier_id: int) -> pd.DataFrame:
    """Приведение к единой схеме"""

    # Добавляем метаданные
    df['supplier_id'] = supplier_id
    df['loaded_at'] = pd.Timestamp.now()

    # Чистим цены
    df['price'] = df['price'].str.replace(' ', '').str.replace(',', '.')
    df['price'] = pd.to_numeric(df['price'], errors='coerce')

    # Убираем невалидные записи
    df = df.dropna(subset=['article', 'price'])
    df = df[df['price'] > 0]

    # Нормализуем названия
    df['name'] = df['name'].str.strip().str.title()

    # Приводим остатки к bool
    df['in_stock'] = df['stock'].apply(lambda x: str(x).lower() in ['true', 'yes', '1', 'да'])

    return df

4. Отслеживание изменений (CDC)

-- ClickHouse: таблица с версионированием
CREATE TABLE catalog_history (
    article String,
    supplier_id UInt32,
    name String,
    price Decimal(10, 2),
    in_stock UInt8,
    valid_from DateTime,
    valid_to DateTime DEFAULT toDateTime('2099-12-31'),
    is_current UInt8 DEFAULT 1
) ENGINE = MergeTree()
ORDER BY (supplier_id, article, valid_from);

-- При загрузке новых данных закрываем старые версии
UPDATE catalog_history
SET
    valid_to = now(),
    is_current = 0
WHERE article = ?
  AND supplier_id = ?
  AND is_current = 1
  AND (price != ? OR in_stock != ?);

-- Вставляем новую версию
INSERT INTO catalog_history
    (article, supplier_id, name, price, in_stock, valid_from)
VALUES (?, ?, ?, ?, ?, now());

Результаты

МетрикаДоПосле
Время обновления каталога6-8 часов вручную30 минут автоматически
Ошибки в ценах~5% заказов0.2%
Задержка данных1-3 дняreal-time
Покрытие аналитикой0%100% (история, динамика)

Экономия: 15 ч/нед × 4 нед × 3000 ₽/ч = 180 000 ₽/мес на ФОТ менеджеров

Продвинутая аналитика

После интеграции открываются новые возможности:

1. Мониторинг цен конкурентов

-- ClickHouse: сравнение цен по одному товару у разных поставщиков
SELECT
    article,
    name,
    supplier_id,
    price,
    price - minPrice AS price_diff,
    round((price - minPrice) / minPrice * 100, 1) AS markup_percent
FROM catalog_history
JOIN (
    SELECT article, min(price) AS minPrice
    FROM catalog_history
    WHERE is_current = 1
    GROUP BY article
) USING article
WHERE is_current = 1
  AND in_stock = 1
ORDER BY markup_percent DESC
LIMIT 100;

2. Анализ динамики цен

-- Товары с максимальным изменением цены за неделю
SELECT
    article,
    name,
    supplier_id,
    price AS current_price,
    prev_price,
    round((price - prev_price) / prev_price * 100, 1) AS price_change_pct
FROM (
    SELECT
        article,
        name,
        supplier_id,
        price,
        lagInFrame(price) OVER (PARTITION BY article, supplier_id ORDER BY valid_from) AS prev_price
    FROM catalog_history
    WHERE valid_from >= now() - INTERVAL 7 DAY
)
WHERE prev_price > 0
ORDER BY abs(price_change_pct) DESC
LIMIT 50;

3. Алерты на критические события

# Отправка уведомления в Telegram при резком изменении цены
def check_price_anomalies():
    query = """
    SELECT article, name, supplier_id,
           current_price, prev_price,
           price_change_pct
    FROM price_changes_view
    WHERE abs(price_change_pct) > 20  -- изменение более 20%
      AND valid_from >= now() - INTERVAL 1 DAY
    """

    anomalies = clickhouse_client.query(query)

    if len(anomalies) > 0:
        message = "⚠️ Резкое изменение цен:\n\n"
        for row in anomalies:
            message += f"• {row['name']} ({row['article']})\n"
            message += f"  Было: {row['prev_price']}\n"
            message += f"  Стало: {row['current_price']}\n"
            message += f"  Изменение: {row['price_change_pct']:+.1f}%\n\n"

        telegram_bot.send_message(chat_id=ADMIN_CHAT_ID, text=message)

Технический стек

  • Оркестрация: Apache Airflow
  • Парсинг: lxml, xmltodict, pandas
  • Staging: PostgreSQL
  • Трансформации: dbt (data build tool)
  • DWH: ClickHouse
  • BI: Metabase
  • Мониторинг: Prometheus, Grafana
  • Алерты: Telegram Bot API

Частые проблемы и решения

Проблема 1: Огромные XML файлы (500+ MB)

Решение: Streaming парсинг с lxml.etree.iterparse

from lxml import etree

def parse_large_xml(file_path: str):
    """Потоковый парсинг больших XML"""
    context = etree.iterparse(file_path, events=('end',), tag='offer')

    for event, elem in context:
        # Обрабатываем один <offer>
        yield {
            'article': elem.get('id'),
            'name': elem.findtext('name'),
            'price': elem.findtext('price'),
        }

        # Очищаем память
        elem.clear()
        while elem.getprevious() is not None:
            del elem.getparent()[0]

    del context

Проблема 2: Несогласованная номенклатура

Решение: Fuzzy matching + ML-модель для дедупликации

from rapidfuzz import fuzz

def find_duplicates(articles_df: pd.DataFrame, threshold: float = 0.85):
    """Поиск дубликатов по нечеткому совпадению названий"""

    duplicates = []

    for i, row1 in articles_df.iterrows():
        for j, row2 in articles_df.iloc[i+1:].iterrows():
            similarity = fuzz.ratio(row1['name'], row2['name']) / 100

            if similarity >= threshold:
                duplicates.append({
                    'article1': row1['article'],
                    'article2': row2['article'],
                    'name1': row1['name'],
                    'name2': row2['name'],
                    'similarity': similarity
                })

    return pd.DataFrame(duplicates)

Проблема 3: Поставщики не соблюдают SLA

Решение: Мониторинг и автоматические напоминания

# Airflow sensor с таймаутом
from airflow.sensors.filesystem import FileSensor

wait_for_xml = FileSensor(
    task_id='wait_for_supplier_xml',
    filepath='/ftp/supplier_123/catalog.xml',
    poke_interval=300,  # проверяем каждые 5 минут
    timeout=7200,  # максимум 2 часа ждем
    mode='poke',
    dag=dag,
)

# При таймауте отправляем напоминание поставщику
def send_reminder_email():
    send_email(
        to='supplier@example.com',
        subject='Напоминание: обновите каталог',
        html_content='...'
    )

Заключение

Интеграция разрозненных XML-каталогов в единое хранилище:

  • ✅ Экономит 10-20 часов в неделю на ручной работе
  • ✅ Снижает ошибки в ценах на 95%
  • ✅ Дает актуальные данные в real-time
  • ✅ Открывает возможности для аналитики (история, динамика, аномалии)

Окупаемость проекта: 2-4 месяца для компаний от 10 поставщиков.


Нужна помощь с интеграцией каталогов?

Мы строим ETL-пайплайны для интеграции любых источников данных: XML, CSV, Excel, API, базы данных. Создаем единое хранилище и настраиваем автоматическое обновление.

📞 +7 (924) 547-36-78 📧 info@bi-ai.ru 💬 Telegram: @bi_ai_team