Airflow Flashcards
(13 cards)
Работала ли с кастомными операторами?
Конечно.
Упаковала Spark-джобу в Airflow оператор
Spark джоба подключается к Oracle через JDBC, выбирает нужные данные (инкрементально), и загружает их в Hive (через Spark).
На основе данного кастомного оператора был создан DAG, который читает данные из Oracle и пишет в Hive. Он предназначен для переливки исторических данных
Выполняется периодически (еженедельно) для подтягивания пропущенных или обновлённых данных.
Кастомный оператор для Kafka и Spark я сделала максимально простым и удобным — он работает как шаблон, где уже преднастроены все соединения (Kafka, Spark, хранилище), и пользователю остаётся только указать: откуда и куда переливать данные. Такой подход особенно удобен для аналитиков и менее технических коллег — им не нужно разбираться в деталях запуска Spark Streaming или настройки Kafka-коннектора. Достаточно задать: название Kafka-топика, схему или таблицу назначения, путь до Spark-скрипта (если нужно).
От любого оператора наследуешься от которого тебе надо, либо свой метод, доп логику бахаешь и всё
Из каких компонентов состоит Airflow?
Scheduler — планирует DAG-и по расписанию.
Executor — запускает задачи (например, KubernetesExecutor или LocalExecutor).
Web UI — интерфейс для мониторинга и управления.
Metadata DB — хранит всю информацию: DAG-и, статусы, зависимости, XCom и др.
Worker — выполняет таски (если используется Celery/KubernetesExecutor).
Где все данные лежат в Airflow?
В metadata-базе данных (PostgreSQL или MySQL).
Хранятся: DAG Run, Task Instance, XCom, Connection, Variables, Logs (частично, если локально).
Помнишь какие executor есть?
SequentialExecutor — для разработки (1 таска за раз);
LocalExecutor — параллельные таски на одной машине;
CeleryExecutor — распределённый, с брокером (RabbitMQ/Redis);
KubernetesExecutor — таска = pod, масштабируется динамически;
Писала ли макросы или использовала существующие?
Использовала существующие макросы:
{{ ds }}, {{ execution_date }}, {{ prev_ds }} — для дат в SQL/шаблонах.
Знакома с сенсорами?
Да. Использовала:
ExternalTaskSensor — ждал выполнения другого DAG-а;
HivePartitionSensor — проверка появления партиции;
FileSensor — ожидание файла на HDFS
Sensor — это специальный тип Operator в Airflow, который ожидает наступления определённого условия, прежде чем передать управление дальше в DAG.
Что такое execution_date у DAG run?
Это логическая дата, за которую происходит обработка.
Она не всегда равна дате запуска DAG-а.
Например, если DAG запускается 1 июня в 00:00, то execution_date = 31 мая (данные за прошлый день).
SLA (Service Level Agreement) — это время, за которое задача должна выполниться.
Как проектировали ETL-пайплайны в предыдущих проектах?
Использовали Airflow как оркестратор (700+ DAG-ов).
Данные шли из Kafka, Oracle, PostgreSQL, HDFS.
Этапы: извлечение → нормализация (3НФ) → денормализация → витрины.
Раскладка по слоям: staging → core (3NF) → DML (витрины).
Использовался Spark (batch и streaming) для обработки, Hive и ClickHouse — как хранилища.
Как решались проблемы с динамическими схемами данных?
Использовали схему с пробросом всех полей в staging-слой.
Делали автоматическое определение новых колонок (schema evolution).
В Spark писали обёртки для автоматического добавления колонок с null по шаблону.
Также в Hive хранили схему и сравнивали с текущей.
Какие инструменты для проверки качества данных использовал?
Для контроля качества данных я реализовала гибридный подход, объединяющий автоматизированные проверки непосредственно в ETL-процессах и их оркестрацию в Airflow.
Основные метрики, на которых я сосредоточилась, — это актуальность, полнота и дубликаты.
Актуальность: проверяю, что в загружаемых данных есть записи с актуальной датой, чтобы убедиться, что данные обновляются своевременно. Если данные устарели, задача падает и генерируется алерт.
Полнота: Полнота — это метрика качества данных, которая показывает, какую долю значений в определённом поле (столбце) составляют заполненные, непустые (не NULL) записи по отношению ко всему объёму данных.
Полнота = Количество непустых значений/Общее количество записей
Дубликаты: ищу повторяющиеся значения по уникальному ключу. Если дубликаты есть — задача падает.
Эти проверки реализованы как отдельные Python-функции, каждая из которых выполняет конкретную проверку (актуальность, полнота, дубликаты).
В Airflow они обёрнуты в PythonOperator, который запускает соответствующую функцию в рамках DAG-а.
При запуске функции происходит подключение к данным (через SparkSession), выполнение необходимых вычислений и проверок.Эти проверки реализованы как отдельные Python-функции, каждая из которых выполняет конкретную проверку (актуальность, полнота, дубликаты).
Если проверка не проходит (например, полнота ниже порога или найдены дубликаты), функция выбрасывает исключение — задача падает, и Airflow фиксирует ошибку, в Telegram получаем алерт.
Актуальность
Проверка, что данные за нужный период загружены или обновлены.
Пример: в конце ETL проверяем, что есть записи с датой >= ожидаемой даты.
max_date = df.agg({“date_column”: “max”}).collect()[0][0]
if max_date < expected_date:
raise ValueError(f”Данные не актуальны: max_date={max_date}, ожидается >= {expected_date}”)
agg() — для вычисления агрегата по колонке, здесь max() для получения максимальной даты.
collect() — для получения результата агрегата на драйвере (т.к. agg() возвращает DataFrame).
Логика сравнения с ожидаемой датой — на уровне Python.
Полнота
Проверяем долю непустых значений в ключевых колонках.
non_null = df.filter(col(“important_column”).isNotNull()).count()
completeness_ratio = non_null / total if total > 0 else 0
threshold = 0.95 # допустимый минимум полноты
if completeness_ratio < threshold:
raise ValueError(f”Полнота ниже порога: {completeness_ratio:.2f} < {threshold}”)
count() — для подсчёта общего числа строк в DataFrame.
filter() + col().isNotNull() — для фильтрации строк, где указанный столбец не равен NULL.
Снова count() — подсчёт количества непустых значений.
Дальше простая арифметика для вычисления доли непустых значений.
Дубликаты
Проверяем наличие повторяющихся ключей.
duplicates = df.groupBy(“key_column”).count().filter(“count > 1”).count()
if duplicates > 0:
raise ValueError(f”Найдены дубликаты: {duplicates} повторяющихся ключей”)
groupBy() — группировка по уникальному ключу.
count() — подсчёт количества записей в каждой группе.
filter(“count > 1”) — фильтрация групп с дубликатами (где количество > 1).
Ещё один вызов count() для подсчёта количества таких дубликатных ключей.
Эти проверки реализованы внутри Spark, максимально близко к данным для эффективности и точности. Airflow служит оркестратором: запускает ETL-джобы и проверки последовательно в рамках DAG-а, отслеживает статусы задач и при ошибках — фиксирует падения и инициирует уведомления.
Какие метрики использовались для проверки качества данных?
Для мониторинга и визуализации метрик качества были созданы кастомные метрики Prometheus
data_quality_completeness_ratio — процент непустых значений
data_quality_duplicate_count — число дубликатов
data_quality_freshness — метрика актуальности данных
Эти метрики формируются в рамках Python-функций, где после подсчёта значений они отправляются в Prometheus Pushgateway — специальный компонент, который принимает метрики от batch-процессов.
В Airflow после выполнения проверки вызывается код, который пушит текущее значение метрики в Pushgateway с указанием уникального job-имени и лейблов.
На основе метрик, в Grafana строила дашборды с ключевыми метриками качества данных.
Настроила правила алертов в Prometheus Alertmanager, чтобы при нарушениях метрик качества данных — например, низкой полноте, обнаружении дубликатов или отсутствии свежих данных приходили уведомления в отдельный Telegram-канал нашей команды.
Как проводилась валидация данных между слоями (сырой, DDS, витрины)?
Сравнение агрегатов (кол-во строк, сумм, дат).
Сравнение ключей и бизнес-показателей.
Контроль соответствия справочников.
В Airflow DAG добавляли валидационные таски: .agg(), .exceptAll().
Как тестировали пайплайны и витрины данных?
Основной фокус был на end-to-end тестировании.
Использовали:
проверку количества строк/данных между слоями;
логгирование и исключения в Airflow;
fallback-логику и ретраи.
Для unit-тестов — точечно pytest (например, утилиты, DQ-функции, сериализация в Kafka).