Школа
системного анализа
и проектирования
Автор: Анна Вичугова

Пакетный и потоковый ETL для PostgreSQL с AirFlow и с коннекторами Kafka

Введение

В этой статье на практических примерах рассматривается как и с помощью каких инструментов можно реализовать потоковый и пакетный ETL-процессы. В статье даются рекомендации по проектированию ETL-конвейеров.

Подробно рассмотрим:
■ что такое ETL и для чего он нужен?
■ на что обратить внимание при проектировании ETL?
■ пример проектирования пакетного ETL с Apache Airflow.
■ пример проектирования потокового ETL с Kafka.

Статья может быть интересна системным и бизнес-аналитикам, которые хотят получить общее представление о проектировании ETL.

Что такое ETL?

ETL — процесс обработки данных, состоящий из трёх шагов.
  1. E - Extract (Извлечение) — извлечение данных из источников;
  2. T - Transform (Преобразование) — обработка данных, например, устранение дубликатов, изменение типа, изменение регистров в строках и т.д.;
  3. L - Load (Загрузка) — сохранение данных в систему-приёмник (хранилище или озеро данных) для дальнейшего использования.
Набор действий, производимых с данными в ETL-процессе ещё называют ETL-конвейером.

Рис.1 — ETL-процесс верхнеуровнево

Обычно ETL применяется для обработки, генерируемыми различными источниками. Типовая задача ETL-конвейера — собрать данные из нескольких OLTP-систем, преобразовать и сохранить в OLAP-системе.

Например, если необходимо создать аналитический дашборд с большим количеством данных, которые хранятся в разных системах, предварительно нужно получить данные из нескольких источников, обработать и сохранить в единое хранилище для последующей аналитики.

На самом деле элементы ETL-процессов можно найти в абсолютно любой системе. Ввод данных пользователем в интерфейсе, их обработку и сохранение в базе можно рассматривать как ETL-процессы.

Ключевые вопросы проектирования ETL-процессов

Что необходимо продумать и учесть при проектировании ETL-процессов?
  1. Источники и приёмники данных. Откуда необходимо получать данные и что будет выступать конечным приёмником?
  2. Правила запуска. Процесс должен запускаться периодически по расписанию или при наступлении события/срабатывании триггера?
  3. Структуры данных. В каком формате, в какой структуре и с какими типами хранятся данные в источниках? В каком виде данные нужны приёмнику?
  4. Толерантность к потерям и дублям. Нужны ли повторные операции, или потери данных некритичны? Нужно ли обрабатывать дубли?
  5. Допустимая задержка. Как быстро данные должны поступать, проходить ETL-конвейер? Можно ли реализовать их обработку с какой-то периодичностью или для бизнеса важна минимальная задержка?
  6. Безопасность. Какие сервисы могут запускать ETL-процессы? Как обеспечить безопасность доступа к источникам данных?
  7. Бизнес-логика. Какую бизнес-логику необходимо наложить на данные на этапе преобразования?
  8. Ресурсы. Какие ресурсы нужны для обработки конвейером имеющихся объёмов данных?
Рис. 2 — Ключевые компоненты в архитектуре ETL-процесса

Примеры проектирования ETL

Существует две основных разновидности обработки данных: пакетная и потоковая. Рассмотрим на практических примерах как подходить к проектированию ETL при пакетной и потоковой обработке.

Пример проектирования пакетного ETL

Пакетная обработка — подход к обработке данных, при котором обрабатывается сразу большой объём данных, накопленный за определённый промежуток времени: пакет (batch).

Особенности пакетной обработки:
■ хорошо подходит для больших объёмов данных;
■ можно спрогнозировать нагрузку, потому что обработка происходит с определённой периодичностью;
■ простая отладка, повторяемость запуска конвейера;
■ ограниченность размера пакета данных;
■ запуск по расписанию;
■ возможна задержка как по времени, так и по актуальности — пока данные дожидаются обработки пакета, они могут стать неактуальными.

Пакетная обработка применяется чаще, потому что позволяет закрыть большую часть задач по управлению данными. Пример задачи для пакетной обработки: собрать и визуализировать статистику продаж за период (неделю, месяц, год) для маркетингового отдела.

На рынке много готовых решений для реализации пакетной обработки.
  1. Apache Airflow. Одно из самых зрелых решений, можно назвать стандартом в этой области.
  2. Luigi. Одна из альтернатив Airflow.
  3. Dagster. Одно из новых решений для реализации пакетной обработки.

ETL-конвейер реализуется в виде DAG — Directed Acyclic Graph (направленный ациклический граф). По сути DAG представляет собой цепочку задач.

Основная особенность DAG в том, что он направлен строго в одну сторону, то есть процесс не может вернуться на предыдущие шаги.

Рассмотрим пример простого пакетного ETL-конвейера, реализованного с помощью Apache Airflow.

Представим, что нам необходимо реализовать миграцию данных о продажах интернет-магазина в ElasticSearch (документо-ориентированная база данных с мощным поисковым движком). Исходные данные о продажах хранятся в PostgreSQL. Так процесс сводится к задачам:
■ извлечь данные из PostgreSQL;
■ преобразовать данные в нужный формат — в данном случае JSON;
записать данные в ElasticSearch.
Рис. 3 — Задачи пакетного ETL-процесса
DAG для этого процесса может выглядеть следующим образом:
Рис. 4 — Пример DAG
(Источник: Школа Больших Данных)
ETL-конвейеры для Apache Airflow представляют собой python-скрипты, в которых задаются задачи конвейера и порядок их выполнения.

Задача в скрипте может выглядеть так (Источник: Школа Больших Данных):
send_notification_task = TelegramOperator(
   task_id='send_notification_task',
   token=telegram_token,
   chat_id=telegram_chat_id,
   text='ETL-process has been done {} with result {}'.format(
       now.strftime("%m/%d/%Y, %H:%M:%S"),
       '{{ ti.xcom_pull(key="return_value", task_ids="load_task") }}'
   ),
   dag=dag
)
end_task = DummyOperator(task_id='end_task', dag=dag)
А так задается порядок выполнения задач:
start_task >> extract_task >> transform_task >> load_task >> send_notification_task >> end_task
В интерфейсе Apache Airflow можно:
■ формировать и просматривать список DAG вашего проекта;
■ запускать вручную ETL-конвейеры;
■ настраивать запускать конвейеры по расписанию;
■ отслеживать ход выполнения конвейеров;
■ просматривать логи.
Рис. 5 — ETL-конвейер в виде DAG в Apache Airflow
(Источник: Школа Больших Данных)
Передача данных между задачами в Apache Airflow осуществляется через внутренний механизм XCom. Этот механизм предназначен для передачи небольшого объёма данных. Если объём данных большой, необходимо учесть в архитектуре внешнее хранилище для передачи данных между задачами.

Пример проектирования потокового ETL

Потоковая обработка — подход к обработке данных, при котором обрабатываемые данные поступают неограниченным потоком (Stream) по мере вырабатывания их источником.

Особенности потоковой обработки:
■ размер пакета данных не ограничен;
■ запуск конвейера не привязан к расписанию, запускается по событию, в любой момент времени;
■ почти real-time обработка, минимальная задержка;
■ плохо подходит для обработки данных больших объёмов;
■ непредсказуемая нагрузка;
■ сложная отладка, низкая повторяемость процесса.

С помощью потоковой обработки обычно решаются задачи, в которых нужна минимальная задержка, например, мониторинг IoT-систем или анализ действий пользователей в социальных сетях.

Реализация потокового ETL возможна с помощью технологий потоковой обработки данных на примере Apache Kafka и Flink.

Пример простого потокового ETL-конвейера:
  1. Сперва нужно настроить логическую репликацию в PostgreSQL: включить режим логической репликации, создать слот репликации и определить таблицы, изменения в которых надо отслеживать. Например, когда в таблице заказов появилась новая запись.
  2. Запустить Kafka Connect и настроить коннектор Debezium, указав в конфигурации параметры подключения к PostgreSQL, имя слота репликации, перечень отслеживаемых таблиц или схем данных и параметры топика Kafka, куда надо публиковать события изменений.
  3. При возникновении изменений в отслеживаемых таблицах PostgreSQL коннектор Debezium преобразует их в текстовые записи, например, в формате JSON, и публикует в заданные топики Kafka. Обычно для каждой таблицы создаётся отдельный топик.
  4. Потребление из Kafka тоже реализовано с помощью коннектора, который считывает опубликованные события изменений из заданного топика и сохраняет эти данные в Elasticsearch.
  5. Данные в ElasticSearch можно визуализировать на диаграммах дашбордов в Kibana.
Рис. 6 — Потоковый ETL-конвейер
(Источник: Школа Больших Данных)

Рекомендации по проектированию

Что нужно учесть при проектировании ETL-конвейеров.
  1. Сущность данных. Каков объём и формат данных, какая структура? Может ли меняться структура данных? Есть ли схема?
  2. Характер работы источника данных. Насколько часто и в какой момент источник поставляет данные?
  3. Периодичность запуска и сложность ETL-конвейера. Как часто нужно запускать конвейер? Насколько сложную логику преобразования необходимо реализовать?
  4. Толерантность приёмника к задержке. Необходима поставка данных в реальном времени или допустима задержка обработки данных?
  5. Надёжность источников и приёмников данных. Что делать, если нарушился конвейер? Необходимо ли попробовать восстановить или повторить конвейер? Как логировать ошибки?
  6. Опыт команды и соответствие техрадару.

Резюме

  1. ETL — это процесс обработки данных, состоящий из трёх шагов: Извлечение, Преобразование, Загрузка.
  2. Чаще всего ETL применяется для сбора данных из нескольких источников, приведения их к необходимому формату и сохранения в хранилище или базе данных для последующей аналитики.
  3. При проектировании ETL-процессов важно обращать внимание на структуру и формат исходных данных, надёжность источников, допустимость задержки и сложность бизнес-логики.
Об авторе

■ Другие статьи по теме Базы данных

Показать еще