Airflow Flashcards

(13 cards)

1
Q

Работала ли с кастомными операторами?

A

Конечно.
Упаковала Spark-джобу в Airflow оператор
Spark джоба подключается к Oracle через JDBC, выбирает нужные данные (инкрементально), и загружает их в Hive (через Spark).
На основе данного кастомного оператора был создан DAG, который читает данные из Oracle и пишет в Hive. Он предназначен для переливки исторических данных
Выполняется периодически (еженедельно) для подтягивания пропущенных или обновлённых данных.
Кастомный оператор для Kafka и Spark я сделала максимально простым и удобным — он работает как шаблон, где уже преднастроены все соединения (Kafka, Spark, хранилище), и пользователю остаётся только указать: откуда и куда переливать данные. Такой подход особенно удобен для аналитиков и менее технических коллег — им не нужно разбираться в деталях запуска Spark Streaming или настройки Kafka-коннектора. Достаточно задать: название Kafka-топика, схему или таблицу назначения, путь до Spark-скрипта (если нужно).

От любого оператора наследуешься от которого тебе надо, либо свой метод, доп логику бахаешь и всё

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
2
Q

Из каких компонентов состоит Airflow?

A

Scheduler — планирует DAG-и по расписанию.
Executor — запускает задачи (например, KubernetesExecutor или LocalExecutor).
Web UI — интерфейс для мониторинга и управления.
Metadata DB — хранит всю информацию: DAG-и, статусы, зависимости, XCom и др.
Worker — выполняет таски (если используется Celery/KubernetesExecutor).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
3
Q

Где все данные лежат в Airflow?

A

В metadata-базе данных (PostgreSQL или MySQL).
Хранятся: DAG Run, Task Instance, XCom, Connection, Variables, Logs (частично, если локально).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
4
Q

Помнишь какие executor есть?

A

SequentialExecutor — для разработки (1 таска за раз);
LocalExecutor — параллельные таски на одной машине;
CeleryExecutor — распределённый, с брокером (RabbitMQ/Redis);
KubernetesExecutor — таска = pod, масштабируется динамически;

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
5
Q

Писала ли макросы или использовала существующие?

A

Использовала существующие макросы:
{{ ds }}, {{ execution_date }}, {{ prev_ds }} — для дат в SQL/шаблонах.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

Знакома с сенсорами?

A

Да. Использовала:
ExternalTaskSensor — ждал выполнения другого DAG-а;
HivePartitionSensor — проверка появления партиции;
FileSensor — ожидание файла на HDFS

Sensor — это специальный тип Operator в Airflow, который ожидает наступления определённого условия, прежде чем передать управление дальше в DAG.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
7
Q

Что такое execution_date у DAG run?

A

Это логическая дата, за которую происходит обработка.
Она не всегда равна дате запуска DAG-а.
Например, если DAG запускается 1 июня в 00:00, то execution_date = 31 мая (данные за прошлый день).
SLA (Service Level Agreement) — это время, за которое задача должна выполниться.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
8
Q

Как проектировали ETL-пайплайны в предыдущих проектах?

A

Использовали Airflow как оркестратор (700+ DAG-ов).
Данные шли из Kafka, Oracle, PostgreSQL, HDFS.
Этапы: извлечение → нормализация (3НФ) → денормализация → витрины.
Раскладка по слоям: staging → core (3NF) → DML (витрины).
Использовался Spark (batch и streaming) для обработки, Hive и ClickHouse — как хранилища.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
9
Q

Как решались проблемы с динамическими схемами данных?

A

Использовали схему с пробросом всех полей в staging-слой.
Делали автоматическое определение новых колонок (schema evolution).
В Spark писали обёртки для автоматического добавления колонок с null по шаблону.
Также в Hive хранили схему и сравнивали с текущей.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
10
Q

Какие инструменты для проверки качества данных использовал?

A

Для контроля качества данных я реализовала гибридный подход, объединяющий автоматизированные проверки непосредственно в ETL-процессах и их оркестрацию в Airflow.
Основные метрики, на которых я сосредоточилась, — это актуальность, полнота и дубликаты.

Актуальность: проверяю, что в загружаемых данных есть записи с актуальной датой, чтобы убедиться, что данные обновляются своевременно. Если данные устарели, задача падает и генерируется алерт.
Полнота: Полнота — это метрика качества данных, которая показывает, какую долю значений в определённом поле (столбце) составляют заполненные, непустые (не NULL) записи по отношению ко всему объёму данных.
Полнота = Количество непустых значений/Общее количество записей
Дубликаты: ищу повторяющиеся значения по уникальному ключу. Если дубликаты есть — задача падает.

Эти проверки реализованы как отдельные Python-функции, каждая из которых выполняет конкретную проверку (актуальность, полнота, дубликаты).
В Airflow они обёрнуты в PythonOperator, который запускает соответствующую функцию в рамках DAG-а.
При запуске функции происходит подключение к данным (через SparkSession), выполнение необходимых вычислений и проверок.Эти проверки реализованы как отдельные Python-функции, каждая из которых выполняет конкретную проверку (актуальность, полнота, дубликаты).
Если проверка не проходит (например, полнота ниже порога или найдены дубликаты), функция выбрасывает исключение — задача падает, и Airflow фиксирует ошибку, в Telegram получаем алерт.

Актуальность
Проверка, что данные за нужный период загружены или обновлены.
Пример: в конце ETL проверяем, что есть записи с датой >= ожидаемой даты.
max_date = df.agg({“date_column”: “max”}).collect()[0][0]
if max_date < expected_date:
raise ValueError(f”Данные не актуальны: max_date={max_date}, ожидается >= {expected_date}”)

agg() — для вычисления агрегата по колонке, здесь max() для получения максимальной даты.
collect() — для получения результата агрегата на драйвере (т.к. agg() возвращает DataFrame).
Логика сравнения с ожидаемой датой — на уровне Python.

Полнота
Проверяем долю непустых значений в ключевых колонках.
non_null = df.filter(col(“important_column”).isNotNull()).count()
completeness_ratio = non_null / total if total > 0 else 0
threshold = 0.95 # допустимый минимум полноты
if completeness_ratio < threshold:
raise ValueError(f”Полнота ниже порога: {completeness_ratio:.2f} < {threshold}”)

count() — для подсчёта общего числа строк в DataFrame.
filter() + col().isNotNull() — для фильтрации строк, где указанный столбец не равен NULL.
Снова count() — подсчёт количества непустых значений.
Дальше простая арифметика для вычисления доли непустых значений.

Дубликаты
Проверяем наличие повторяющихся ключей.
duplicates = df.groupBy(“key_column”).count().filter(“count > 1”).count()
if duplicates > 0:
raise ValueError(f”Найдены дубликаты: {duplicates} повторяющихся ключей”)

groupBy() — группировка по уникальному ключу.
count() — подсчёт количества записей в каждой группе.
filter(“count > 1”) — фильтрация групп с дубликатами (где количество > 1).
Ещё один вызов count() для подсчёта количества таких дубликатных ключей.

Эти проверки реализованы внутри Spark, максимально близко к данным для эффективности и точности. Airflow служит оркестратором: запускает ETL-джобы и проверки последовательно в рамках DAG-а, отслеживает статусы задач и при ошибках — фиксирует падения и инициирует уведомления.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
11
Q

Какие метрики использовались для проверки качества данных?

A

Для мониторинга и визуализации метрик качества были созданы кастомные метрики Prometheus
data_quality_completeness_ratio — процент непустых значений
data_quality_duplicate_count — число дубликатов
data_quality_freshness — метрика актуальности данных

Эти метрики формируются в рамках Python-функций, где после подсчёта значений они отправляются в Prometheus Pushgateway — специальный компонент, который принимает метрики от batch-процессов.
В Airflow после выполнения проверки вызывается код, который пушит текущее значение метрики в Pushgateway с указанием уникального job-имени и лейблов.

На основе метрик, в Grafana строила дашборды с ключевыми метриками качества данных.
Настроила правила алертов в Prometheus Alertmanager, чтобы при нарушениях метрик качества данных — например, низкой полноте, обнаружении дубликатов или отсутствии свежих данных приходили уведомления в отдельный Telegram-канал нашей команды.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
12
Q

Как проводилась валидация данных между слоями (сырой, DDS, витрины)?

A

Сравнение агрегатов (кол-во строк, сумм, дат).
Сравнение ключей и бизнес-показателей.
Контроль соответствия справочников.
В Airflow DAG добавляли валидационные таски: .agg(), .exceptAll().

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
13
Q

Как тестировали пайплайны и витрины данных?

A

Основной фокус был на end-to-end тестировании.

Использовали:
проверку количества строк/данных между слоями;
логгирование и исключения в Airflow;
fallback-логику и ретраи.
Для unit-тестов — точечно pytest (например, утилиты, DQ-функции, сериализация в Kafka).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly