Spark Flashcards

(68 cards)

1
Q

Что такое Spark? Зачем нужен? Где использовали?

Теги: #wildberries

A

Spark – это распределённая вычислительная платформа для быстрой обработки больших данных (batch/stream). Используется для ETL, аналитики, machine learning и т.д. Применяется во множестве компаний (Netflix, Uber, e-commerce), где важна высокая скорость анализа данных.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
2
Q

Spark Streaming: near real-time или real-time? Сравнение Spark vs Flink

Теги: #wildberries

A

Коротко: по умолчанию Spark Structured Streaming — это near-real-time (микробатчи с типичными задержками от сотен миллисекунд до секунд). В Spark есть два пути к меньшим задержкам: экспериментальный Continuous Processing (\~1 мс, но сильно ограничен по операциям) и новый в Databricks Real-time mode (паблик превью, миллисекундные p99 без переписывания кода, только в Databricks). Flink — потоковый движок “record-by-record”, рожденный для миллисекундных задержек и сложных stateful сценариев. (spark.apache.org, Microsoft Learn, Databricks)

Что обычно называют “real-time”

  • Hard real-time (жёсткие дедлайны в микросекундах/миллисекундах, детерминизм) — ни Spark, ни Flink под это не заточены.
  • Soft/near real-time — практично для бизнеса: задержки от \~10–500 мс до нескольких секунд, с фолт-толерантностью и exactly-once/at-least-once. Под это попадают и Spark, и Flink (с разными профилями задержек).

Модель исполнения и задержки

  • Spark Structured Streaming: микро-батчи по триггеру (например, каждые 1–10 с; можно настраивать до \~100 мс). Есть Continuous Processing (\~1 мс, experimental, поддерживает только map/фильтры, без агрегаций/джойнов) и Real-time mode в Databricks (паблик превью, p99 до единиц–десятков мс). (docs.databricks.com, spark.apache.org, Databricks)
  • Flink: истинный streaming “по событию”, с backpressure и чекпоинтами; для super-low-latency сценариев рекомендуют at-least-once (несколько мс), для exactly-once задержка возрастает из-за транзакций/чекпоинтов. (nightlies.apache.org)

Семантика времени/порядка и состояние

  • Оба движка поддерживают event-time и watermarks для работы с опоздавшими событиями. У Flink — это центральная модель (богатые стратегии watermarks, таймеры, fine-grained контроль). (nightlies.apache.org)
  • Состояние:
    • Spark — state store с чекпоинтами; есть RocksDB state store provider (встроен с Spark 3.2; на Databricks включается конфигом). (spark.apache.org, docs.databricks.com)
    • Flink — ключевое состояние (часто в RocksDB), инкрементальные чекпоинты, таймеры на ключах. (nightlies.apache.org)

Гарантии доставки/записи

  • Spark: end-to-end exactly-once достижим при “реплейбл” источниках и идемпотентных/транзакционных сингках (Kafka/Delta и т. п.) за счёт отслеживания оффсетов + WAL/чекпоинты (в микробатч-режиме). Continuous — только at-least-once. (spark.apache.org)
  • Flink: exactly-once через интеграцию чекпоинтов с 2-фазным коммитом в сингках (KafkaSink DeliveryGuarantee.EXACTLY_ONCE; общее API SupportsCommitter/Two-Phase Commit). (nightlies.apache.org)

Практическая «рыба»: когда что брать

Бери Flink, если нужно:

  • p99 < 100 мс на stateful-операциях (джойны/окна/таймеры), сложная работа с out-of-order, тонкая реакция на backpressure. (nightlies.apache.org)

Бери Spark, если:

  • основная экосистема уже на Spark (batch/SQL/ML), SLA — секунды/сотни мс, нужны единые API/каталог/оркестрация;
  • хочешь попробовать миллисекунды без ре-платформинга в Databricks (Real-time mode, превью). В OSS можно Continuous, но он сильно ограничен. (Databricks, spark.apache.org)

Вывод

  • Ответ на “Spark Streaming: near real-time или real-time?” — по умолчанию near-real-time. Реальный real-time (миллисекунды) возможен: (а) в OSS Spark — лишь для узкого класса задач через Continuous (at-least-once), (б) в Databricks Real-time mode (превью) — для более широкого набора задач без переписывания кода. (spark.apache.org, Databricks)
  • Flink — ваш выбор, если главная цель — милисекундная реакция на сложные event-time/состояние-тяжёлые пайплайны с гибкой обработкой out-of-order. (nightlies.apache.org)

Если хочешь, дам конкретный «чек-лист» под твой кейс (тип источников/синков, SLA p95/p99, объём/кардинальность состояния, требования к семантике).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
3
Q

Что знаешь про PyArrow? Как взаимодействует со Spark?

Теги: #wildberries

A

PyArrow — это Python-обёртка для Apache Arrow, формата колоночного хранения данных в памяти, который оптимизирован для быстрого доступа и передачи данных между процессами или системами.

В контексте Spark он используется в основном в двух местах:

Arrow-based оптимизация Pandas UDFs (Vectorized UDFs)
Когда мы используем Pandas UDF (а не обычные Python UDF), Spark сериализует данные в формате Arrow, что позволяет избежать затратных сериализаций в pickle и делает передачу данных между JVM и Python значительно быстрее.

toPandas() и fromPandas()
При вызове .toPandas() Spark тоже использует Arrow (если включено через spark.sql.execution.arrow.pyspark.enabled), что ускоряет конвертацию Spark DataFrame в Pandas DataFrame.

Важно упомянуть edge cases для собеса (показывает зрелость):

Arrow не поддерживает все типы Spark. Например, сложные nested типы или специфичные типы (MapType, ArrayType со сложной вложенностью) могут привести к падениям или отключению Arrow.

Arrow работает только с Batch-ориентированными задачами, для Streaming — не применим.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
4
Q

Функции трансформации и действия, в чём разница, примеры?

Теги: #Ярослав #Билайн, #rubbles

A

В Apache Spark существуют два основных типа операций: трансформации и действия. Они имеют разные характеристики и поведение при работе с RDD, DataFrame и Dataset.

Трансформации
Определение: Трансформации — это операции, которые преобразуют один RDD (или DataFrame, Dataset) в другой. Они ленивые (lazy), что означает, что они не выполняются немедленно, а откладываются до тех пор, пока не будет вызвано действие.
Примеры трансформаций:
map(func): Применяет функцию к каждому элементу RDD и возвращает новый RDD.
filter(func): Возвращает новый RDD, содержащий только элементы, для которых функция возвращает True.
flatMap(func): Применяет функцию к каждому элементу RDD и возвращает новый RDD, где каждый элемент может быть разделен на несколько элементов.
groupByKey(): Группирует значения по ключу и возвращает новый RDD с ключами и списками значений.

Действия
Определение: Действия — это операции, которые возвращают результат или производят побочный эффект. Они вызывают выполнение всех трансформаций, которые были применены к RDD до момента вызова действия.
Примеры действий:
collect(): Собирает все элементы RDD и возвращает их в виде списка.
count(): Возвращает количество элементов в RDD.
first(): Возвращает первый элемент RDD.
reduce(func): Объединяет элементы RDD с использованием указанной функции.
saveAsTextFile(path): Сохраняет RDD в виде текстового файла по указанному пути

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
5
Q

Что такое Spark Catalyst и как работает?

Теги: #Ярослав

A

Catalyst – это оптимизатор запросов в Spark SQL. Он строит деревья логического и физического плана, применяет правила оптимизации (pushdown, reordering джоинов и т.д.), а затем формирует оптимальный план выполнения. Основные фишки - Анализатор (Analyzer), Логический план (Logical Plan) Оптимизатор (Optimizer) Физический план (Physical Plan) Выбор плана и генерация кода.

Он проходит четыре основных этапа:

Analysis (Анализ)
Проверка синтаксиса и разрешение всех ссылок (например, имен таблиц и столбцов) до конкретных объектов. Если что-то не существует — ошибка на этом этапе.

Logical Optimization (Логическая оптимизация)
Применяются правила оптимизации, например:

    Упрощение выражений (WHERE 1=1 убирается)

    Удаление ненужных колонок (Project pruning)

    Push down фильтров (фильтры ближе к источнику)

Physical Planning (Физическое планирование)
Создание возможных физических планов (разные способы join, partitioning) и выбор лучшего по стоимости (Cost Model).

Code Generation (Whole-stage code generation)
Spark генерирует Java bytecode для выполнения, что значительно ускоряет исполнение (уменьшает количество вызовов JVM).
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

Какие есть коннекторы у Spark? Особенности коннектора к Greenplum?

Теги: #wildberries

A

Общие коннекторы
* JDBC: универсальный способ для подключения к реляционным базам (PostgreSQL, MySQL, Greenplum и т.д.).
* Kafka: чтение/запись стриминговых данных.
* HDFS/S3: работа с файлами в распределённом хранилище.
* Cassandra и прочие NoSQL-хранилища.

Greenplum
* Через JDBC: требуется соответствующий драйвер и указание url, table, user, password. Работает «из коробки», но не всегда максимально эффективно для больших данных.
* PXF (Parallel eXtensible Framework): специальный коннектор, который умеет параллельно выгружать и загружать данные в Greenplum. Он устанавливается на стороне Greenplum и позволяет Spark обращаться к Greenplum, используя параллельные потоки (каждый сегмент Greenplum может читать/записывать данные самостоятельно).

Основная особенность PXF — параллелизм: данные распределяются по сегментам Greenplum, а Spark читает и пишет в эти сегменты напрямую, что даёт высокую скорость обмена.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
7
Q

Spark: чему соответствует каждая job в коде?

Теги: #Cian

A

В Spark есть две ключевые концепции:
Трансформации (transformations) — «ленивые» операции (map, filter, join и т.д.), которые только формируют план вычислений (DAG), но сами не запускают обработку данных.

Действия (actions) — операции, которые действительно запускают вычисления (например, count, collect, show).

Каждый раз, когда в коде встречается action, Spark запускает новую job.
Одна job может включать в себя несколько stages — этапов выполнения, разбитых по границам shuffle (см. вопрос про stages).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
8
Q

Из-за чего job разделяется на одну и несколько stages?

Теги: #Cian

A

Spark разбивает job на stages, когда возникает операция, требующая shuffle – широкая трансформация.

  • Нarrow-трансформации (map, filter, etc.) не требуют shuffle: данные передаются по цепочке (pipe-line) в рамках одного executor.
  • Wide-трансформации (reduceByKey, join, groupBy и т.д.) вызывают shuffle, то есть нужно перегруппировать данные между executor’ами.

Как только в плане встречается shuffle, Spark завершает текущий этап (stage) и начинает новый после shuffle. Поэтому job может иметь один или несколько stages.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
9
Q

Какие есть типы джойнов в Spark (логические и физические)? Как они работают?

Теги: #Cian, #Билайн, #мир #Ярослав

A
  1. Логические типы join‑ов и их точная семантика
  • inner
    Возвращает строки, где условие истинно для обеих сторон.
    ‑ Дубликаты и порядок строк не гарантируются.
    ‑ Строка с NULL в ключе обычно «пролета­ет» (условие = даёт UNKNOWN, а не TRUE)([Apache Spark][1]).
  • left(left outer)
    Все строки левой, плюс столбцы правой, когда есть совпадение, иначеNULL.
    Часто используют как «enrich‑lookup» и как промежуточный этап в CDC‑процедурах — несовпавшие строки легко выловить через WHERE right.key IS NULL([Apache Spark][1]).
  • right(right outer)
    Симметричен левому; в Spark используется реже, потому что тот же эффект дают left после перестановки сторон([Apache Spark][1]).
  • full(full outer)
    Объединяет оба множества целиком; не‑совпавшие стороны получают NULL.
    Полезен в DI‑процессах («что исчезло / появилось между двумя снапшотами?»)([Apache Spark][1]).
  • cross
    Декартово произведение без условия. Spark требует либо явного crossJoin, либо конфигурации spark.sql.crossJoin.enabled=true. Кол‑во строк = N × M, поэтому план строго проверяется оптимизатором([Apache Spark][1]).
  • left_semi
    «Фильтр по существованию»: из левой берутся только те строки, что нашли пару справа, причём в результате остаются только левый набор столбцов.
    ‑ Используется вместо EXISTS, IN (…), «semi‑lookup» перед тяжёлым джойном.
    ‑ Дубликаты левой стороны сохраняются; правая сторона не выводится([Apache Spark][1]).
  • left_anti
    Антипод semi: возвращает строки левой, для которых пары нет. Это удобнее, чем LEFT JOIN … WHERE right.key IS NULL, потому что противодействует случайным дубликатам справа и короче пишется.
    Применяется для «NOT EXISTS», дедупликации «новых» записей и т. д.([Apache Spark][1]).

> Правых semi/anti нет; при необходимости меняют местами таблицы.

  1. Как Catalyst превращает логику в физику—алгоритмы, шагзашагом

2.1 BroadcastHash Join(Map side join)

  1. Оценивается размер меньшей стороны; если ≤ autoBroadcastJoinThreshold (10 MiB по умолчанию) или стоит хинт broadcast, Spark собирает эту часть на драйвер и рассылает executors как broadcast‑variable([Apache Spark][2]).
  2. В каждом task строится неизменяемая хеш‑таблица.
  3. Большая сторона потоково сканируется, ключ пробивается в хеше, совпадения сразу эмитятся.

Плюсы: нулевой shuffle → минимальный сетевой трафик; O(N) по большой стороне.
Минусы: риск OOM при недооценке объёма.

2.2 Sort‑Merge Join(SMJ)—«Сначала разложи, потом отсортируй, потом склей»

  1. Shuffle‑по ключу
    Spark пересылает строки обеих таблиц так, чтобы все записи с одним и тем же хэш ключом оказались вместе (хэш партицирование). У каждой строки берут join‑ключ, считают hash(key), затем по модулю числа партиций определяют «коробку». Все строки с одним и тем же ключом попадают в одну коробку (на один executor). Это и называется «шардинг по ключу».
  2. Sort‑внутри каждой «почты»
    Уже локально executors сортируют партициию по ключу. Сложность \~ N log N на партициию.
  3. Merge‑два курса
    Теперь в каждой партиции у нас два отсортированных списка (левая и правая таблица). Два указателя «бегут» синхронно, находят одинаковые ключи и сразу пишут результат. Это уже линейно (O(N+M)) — сами данные больше не перемещаются. ([SparkCodehub][1])

Когда брать SMJ

  • Ключевое условие — равенство (=).
  • Обе стороны слишком велики для broadcast.
  • Есть место на диск/SSD для временных файлов сортировки.
  • Плохо, если один ключ встречается миллион раз: возникнет «толстая» партиция; AQE обычно её дробит.

2.3 Shuffle Hash Join(SHJ)—«Разложи, построй хеш‑таблицу, пробивай»

  1. Shuffle‑по ключу
    Та же пересылка, что и в SMJ: строки разъезжаются к executors по hash(key).
  2. Выбери «малую» сторону
    Внутри партиции Spark смотрит, какая из двух половинок меньше (после фильтров). Её грузят в оперативку и строят хеш‑таблицу: ключ → список строк.
  3. Стриминг второй стороны
    Вторую (большую) таблицу читают построчно: для каждой строки ключ ищется в хеше, совпадения сразу пишутся в выход.

Если мини‑таблица неожиданно не влезла в память, Spark проливает её на диск частями («Grace‑loop») и повторяет шаг 3 по диапазонам хеша. Поэтому отдельной стратегии «Grace Hash» в плане нет — это резервный режим SHJ. ([Medium][2], [Medium][3])

Когда SHJ выгоден

  • Ключ‑равенство, как и в SMJ.
  • После фильтров хоть одна сторона помещается в ОЗУ одного task‑а.
  • Меньше работы, чем сортировка, но чувствителен к ООМ: хеш‑таблица живёт в RAM.

2.4 Broadcast Nested Loop Join(BNLJ)—«Раздай словарик и перебери»

  1. Малую сторону broadcast
    Spark целиком копирует маленькую таблицу в память каждого executor‑а (ограничение — размер ≤ autoBroadcastJoinThreshold, дефолт 10 МиБ, либо явный хинт broadcast).
  2. Для каждой строки большой — цикл по маленькой
    Исполняется буквальный вложенный for: берём строку из большой таблицы и сравниваем с каждой строкой маленькой, применяем условие (=, <, BETWEEN,UDF — любой предикат).
  3. Если условия нет → Cross
    Когда предикат опущен, Spark из того же плана делает чистый декартов продукт.

Такая стратегия включается автоматом для неравенственных (<, >, !=) джойнов или когда оптимизатор не смог использовать хеш/сорт‑мердж. Она проста, но сложностьO(N × b), где b—маленькая таблица, поэтому работает только при реально малых объёмах. ([Stack Overflow][4], [bigdatainrealworld.com][5])

2.5 CartesianProductExec(Cross Join) — «Перебираем всё со всем»

Никакого условия, никакой оптимизации:

  • Spark сознательно требует df.crossJoin(df2) или включённый флаг spark.sql.crossJoin.enabled=true, чтобы защититься от случайных ошибок.
  • Кол‑во строк = N × M.
  • Используется в тестах, при генерации комбинаций или если BNLJ был принудительно отключён. ([Stack Overflow][6])
  1. Оптимизации, влияющие на выбор стратегии
  • Bucket‑join— предрассортировка и бакетизация обеих таблиц: Spark может пропустить shuffle и сразу стартовать SMJ.
  • Skew‑join mitigation (AQE)— если во время выполнения партиция оказывается аномально тяжёлой, Spark дробит её и дублирует малую сторону; включается флагом spark.sql.adaptive.skewJoin.enabled([Apache Spark][2]).
  • DynamicPartitionPruning— подзапрос с semi/inner‑join отдаёт список ключей, по которым Spark отбрасывает лишние партиции до реального джойна.

Куда это всё «пришивается» к логике

Catalyst идёт сверху вниз:

  1. Пытается BHJ—если хотя бы одна сторона broadcast‑able.
  2. Если это екви‑джойн, проверяет SHJ vs SMJ (выбор зависит от spark.sql.join.preferSortMergeJoin, размера и флагов AQE).
  3. Если условия сложные—берёт BNLJ; если условий нет—Cartesian.
  4. После запуска AQE может динамически переключить SMJ → BHJ или включить skew‑split, если рантайм‑статистика это позволяет([Apache Spark][2]).

Как выбрать алгоритм?

1 — смотрим на размер данных:

Небольшая таблица → Broadcast Join

Большие таблицы → Sort-Merge или Shuffle Hash

2 — смотрим на структуру данных:

Отсортированы → Sort-Merge

Skewed-ключи → Skew Join

3 — инфраструктура:

Spark → используйте хинты (BROADCAST, MERGE)

Реляционная СУБД → опирайтесь на планировщик и индексы

❕Запомнить:

Nested Loop и Hash Join - база для СУБД.
Broadcast и Sort-Merge - основа для распределенных систем.
Всегда анализируйте план выполнения (explain в SQL, df.explain() в Spark) + метрики (память, сеть).
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
10
Q

client / cluster mode в спарке

Теги: #Cian

A

В client mode драйвер запускается локально, а executors – в кластере. В cluster mode драйвер также запускается внутри кластера (например, на Yarn или Kubernetes).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
11
Q

Параметры spark submit

Теги: #Ярослав

A

Основные:
* Spark Submit — это команда, используемая для запуска приложений Apache Spark на кластере. Она принимает множество параметров, которые позволяют настраивать поведение приложения и управлять ресурсами. Вот основные параметры spark-submit:

–class: указывает главный класс приложения.
–master: определяет мастер-узел Spark
–deploy-mode: режим развертывания приложения: client или cluster.
–name: задает имя приложения в интерфейсе управления Spark.
–executor-memory: определяет объем памяти для каждого исполнителя
–driver-memory: объем памяти для драйвера
–conf: позволяет задать дополнительные настройки Spark.
–executor-cores: количество ядер CPU для каждого исполнителя.
–num-executors: общее количество исполнителей
–files: передает дополнительные файлы в приложение, которые будут доступны в рабочей директории

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
12
Q

Чем кэширование отличается от broadcast join?

Теги: #мир

A

Кэширование (cache/persist)
1. Сохраняет результат вычислений (RDD/DataFrame) в оперативной памяти (или на диск, если не хватает памяти)
2. Позволяет переиспользовать уже рассчитанные данные при повторных обращениях, сокращая время вычислений
3. Типичное применение: несколько разных действий (actions) на одном и том же наборе данных

Broadcast Join
1. «Рассылает» (broadcast) маленькую таблицу/датасет на все узлы кластера, где она хранится в памяти
2. Ускоряет join с большой таблицей, поскольку не требует shuffle
3. Типичное применение: одна таблица очень мала, а другая – большая
Это два разных механизма оптимизации: кэширование помогает при многократном использовании одного набора данных, а broadcast join ускоряет соединение (join), когда один из датасетов невелик.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
13
Q

Что такое spill?

Теги: #Ярослав

A

Скорость спарка в основном завязана на вычислениях в оперативной памяти. Если данные не помещаются в ОЗУ, происходит spill - сохранение промежуточных результатов на диск, который в разы медленнее оперативной памяти. Может происходить как при вычислениях на executor’е, так и при сборе всех результатов на driver. Решение - увеличивать количество кусочков, на которые делятся данные (партиций) или увеличивать лимиты ОЗУ.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
14
Q

Чем отличаются RDD, DataFrame и Dataset? В чём особенности каждого?

Теги: #мир, #Билайн #Ярослав

A

RDD (Resilient Distributed Dataset) — низкоуровневая структура данных в Spark, которая предоставляет API для работы с распределенными данными. RDD immutable (неизменяемый) и поддерживает ленивые вычисления. Подходит для гибких, нестандартных операций с данными, но не имеет оптимизаций и схемы, что снижает производительность.

DataFrame — это схематизированный набор данных, представленный в виде таблицы с колонками и строками. DataFrame поддерживает API с SQL-подобными операциями, использует Catalyst-оптимизатор и Tungsten для выполнения запросов, что повышает производительность. DataFrame легче использовать для структурированных данных и анализа, но он менее гибок, чем RDD, и не поддерживает строгую типизацию.

Dataset — объединяет свойства RDD и DataFrame, добавляя строгую типизацию данных (type-safe). Dataset компилируется с проверкой типов, использует Catalyst и Tungsten, что улучшает производительность. Поддерживает SQL-подобный API и предоставляет методы RDD для трансформаций. Полноценно доступен только в Scala и Java, что ограничивает использование в Python.

Краткое сравнение
RDD: Гибкость, отсутствие схемы, нет оптимизаций. Подходит для нестандартных и сложных вычислений.
DataFrame: SQL-подобный API, автоматические оптимизации, высокая производительность. Предпочтителен для структурированных данных.
Dataset: Строгая типизация, производительность DataFrame, поддержка RDD API. Лучший выбор для больших типизированных данных в Scala и Java.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
15
Q

Какие бывают типы кэширования?

Теги: #мир

A

Memory Only
o Сохраняет данные в оперативной памяти, при нехватке – пересчитывает недостающие партиции.

Memory and Disk
o Пытается сохранить данные в память, а излишек складывает на диск.

Disk Only
o Сохраняет данные только на диск, не используя память.

Memory Only Serialized
o Хранит данные в памяти в сериализованном виде (например, с помощью Kryo), что экономит память, но требует затрат на сериализацию/десериализацию.

Memory and Disk Serialized
o Аналогично предыдущему пункту, но при нехватке памяти выгружает сериализованные данные на диск.
При выборе метода учитывают: объём данных, скорость доступа и избыточность вычислений (что «дешевле» – пересчитать или сохранить?).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
16
Q

Как выбрать количество ядер на executor? Как увеличение числа ядер влияет на память (OutOfMemory)?

Теги: #мир

A

Короткий ответ

Количество ядер (spark.executor.cores) — это прежде всего про параллелизм внутри одного JVM-процесса. Чем больше ядер, тем больше задач (tasks) могут выполняться одновременно в одном executor’е, а значит тем выше суммарное потребление его памяти. Если память, фактически нужная всем одновременным задачам и внутренним буферам Spark, превышает то, что выделено параметрами spark.executor.memory + spark.executor.memoryOverhead, JVM умирает с java.lang.OutOfMemoryError.

Ниже — разбор по пунктам, без упрощений и «магических» чисел.

  1. Что физически происходит, когда вы прибавляете ядра
  2. Каждое ядро ≈ один рабочий поток Spark
    Executor создаёт пул, и TaskScheduler отдает в него до spark.executor.cores задач.
  3. У каждой задачи свой набор структур:
    • объектные строчки RDD/Dataset, пока они не будут отброшены из mem-store;
    • массивы shuffle-буферов (Sort-Shuffle, Tungsten-shuffle writer);
    • off-heap страницы для сортировки (если включён spark.memory.offHeap.enabled);
    • в PySpark — отдельный Python-процесс и его heap.
  4. Память внутри JVM жёстко поделена:
    • Unified Memory (storage + execution) ≈ 60 % executor-heap по умолчанию;
      сюда лезут и кэш-блоки, и временные буферы shuffle, и агрегации.
    • Остальное — user-heap, метаданные классов, строчки, сериализованные объекты и др.
    • spark.executor.memoryOverhead резервируется за пределами Java-heap: native-буферы Netty, off-heap страницы, Python workers, Arrow, JNI-компоненты (Parquet, ORC, RocksDB и т. д.).
  5. Когда добавляете ядра, растёт только параллелизм, но не квота памяти.
    → «память на задачу» ≈ executorHeap / concurrentTasks.
    Если раньше одну тяжёлую задачу держали в 300 МБ и всё проходило, то после увеличения ядер вдвое та же задача получает лишь \~150 МБ, и при том же объёме данных начинается OOM.
  1. Практический алгоритм выбора числа ядер

Шаг 1. Определитесь, что лимитирует джобу

  • CPU-bound (чтение, форматирование, UDF-ы без крупной агрегации) — больше ядер окупается.
  • Memory-/shuffle-bound (wide joins, groupByKey на больших картах, строите крупный в-памяти кэш) — часто выгоднее меньше ядер и несколько executor’ов, чем монолит с 15 ядрами.

Шаг 2. Измерьте фактическое потребление памяти на задачу

  • Запустите job в Spark UI, посмотрите «Peak Execution Memory» на самом тяжёлом stage.
  • Умножьте на \~1.3–1.5 для запаса под GC фрагментацию и неучтённые структуры.

Шаг 3. Посчитайте безопасное executor.cores

safeCores = floor( heapBytes / (peakTaskMemBytes * safetyFactor) )
  • Для JVM-heap берите именно то, что выдаёт Runtime.maxMemory(), а не номинальное.
  • Если safeCores < 2 — увеличьте память или пересмотрите серилизацию/партиционирование.

Шаг 4. Проверьте ограничения инфраструктуры

  • YARN: yarn.scheduler.maximum-allocation-vcores, memory-mb; cgroups могут жестко убить процесс раньше Java-OOM.
  • Kubernetes: лимиты pod’а действуют сверху — heap не сможет расшириться.
  • Standalone/Mesos: смотрите, чтобы суммарное vcore-потребление не забило весь узел и не начался OS-swap.
  1. Нетривиальные эффекты, о которых часто забывают
  • GC ⇄ cores. Больше активных объектов одновременно → длиннее stop-the-world паузы у G1/ParallelGC. Явно настройте spark.executor.extraJavaOptions с G1 и -XX:MaxGCPauseMillis, иначе рост ядер приведёт к GC-бурстам и «фальшивым» OOM (timeouts).
  • PySpark × Arrow. spark.sql.execution.arrow.pyspark.enabled=true создаёт большой off-heap буфер внутри JVM + IPC-шару. При 8+ ядрах размер этой памяти практически линейно растёт.
  • Shuffle-spill на диск. Когда heap забит, Spark начинает сливать промежуточные сегменты на диск; если диск медленный, задача «застынет» и GC не освободит память вовремя — получите OOM с формулировкой «failed to allocate page». Не лечится добавлением ядер, только мемом или SSD/NVMe.
  • Skew (перекос по партициям). Если одна-две партиции съедают половину кучи, увеличение ядер ситуацию даже ухудшит, потому что другие потоки будут конкурировать за тот же heap и очередь spill-ов. Лечится salting, skewJoin, adaptiveExecution.
  1. Правила-“отсечки”, работающие в проде
  2. 2–5 ядер на executor на YARN — золотая середина для смешанных (CPU+shuffle) workloads.
  3. \~300 МБ heap-памяти на задачу — консервативная оценка для Java/Scala кода без больших UDF-объектов. Для PySpark считайте 400–500 МБ за счёт Python-heap.
  4. Не выходите за 8 ядер на executor, пока не доказали профилировщиком, что именно CPU простаивают.
  5. Никогда не масштабируйтесь только одним числом — всегда тройка: executor.instances, executor.cores, executor.memory(+Overhead) должна подбираться одновременно.

Итог

Увеличение spark.executor.cores полезно лишь тогда, когда ваш bottleneck действительно CPU и при этом у каждой задачи остаётся достаточно heap+off-heap-памяти с запасом на GC и сетевые буферы. Во всех остальных случаях вы быстрее упёрнётесь в OOM или длинные GC-паузы, чем получите прирост производительности. Сначала померьте память на задачу, прикиньте безопасное число ядер, а уже потом масштабируйте job.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
17
Q

Архитектура Spark - что под капотом?

Теги: #Ярослав

A

Компоненты архитектуры Spark:
Драйвер (Driver): запускает пользовательский код и управляет DAG, этапами, задачами.
Менеджер кластеров (Cluster Manager): управляет ресурсами в кластере (может быть YARN, Mesos или Standalone).
Воркеры (Worker Nodes): физические или виртуальные машины, на которых выполняются исполнители.
Исполнители (Executors): процессы на воркерах, выполняющие задачи и управляют памятью для RDD.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
18
Q

Что такое udf?

Теги: #Яроcлав

A

Что такое UDF в Spark

UDF (User‑Defined Function) — это функция, которую вы регистрируете в Spark SQL, чтобы выполнить логику, которой нет в стандартных выражениях org.apache.spark.sql.functions или SQL‑синтаксисе. Spark вызывает эту функцию для каждой строки (или батча строк), поэтому важно понимать, где она исполняется и сколько стоит перенос данных.

Три основные разновидности

  1. PythonUDF (scalar)
    Исполнение: в отдельном Python‑процессе на каждом экзекьютере через Py4J, тк сначала данные сериализются в пикл и потом передаются на питон.
    Когда применять: только если совсем нельзя выразить логику на SQL/Scala.
    Минусы: Проблема в том, что для исполнения Python UDF нужно перенести данные к коду и вернуть обратно. Каждую строку приходится копировать JVM ⇄ Python; Catalyst не оптимизирует, поэтому нет push‑down фильтров, project pruning и т.д.
  2. PandasUDF (vectorized)
    Исполнение: тоже в Python, но колонками (Arrow RecordBatch), что снижает накладные расходы на сериализацию на порядок.
    Хорошо подходит для: NumPy/Pandas‑логики, агрегатов со сложной математикой.
    Ограничения: поддерживаются не все типы (nested Struct/Map иногда падают); нельзя использовать в streaming.
  3. Scala/JavaUDF
    Исполнение: прямо в JVMExecutor’е, без межпроцессного копирования.
    Плюсы: быстрее Python‑вариантов.
    Минусы: Catalyst всё равно видит функцию как «чёрный ящик», значит оптимизации также теряются.

Как создать и вызвать (кратко)

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

scalar Python UDF
@udf(returnType=IntegerType())
def add_one(x):
return x + 1

df = spark.range(5).withColumn(“y”, add_one(“id”))
~~~

```python
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import LongType
from pyspark.sql.functions import pandas_udf

Pandas UDF
@pandas_udf(LongType())
def add_one_vec(s: pd.Series) -> pd.Series:
return s + 1

df = spark.range(5).withColumn(“y”, add_one_vec(“id”))
~~~

```scala
// Scala UDF
val addOne = udf((x: Long) => x + 1)
spark.range(5).withColumn(“y”, addOne($”id”))
~~~

Практические правила для продакшна

  1. Сначала ищите готовую функцию или выражение expr(...).
  2. Если нужна кастомная логика на JVM — пишите Scala/JavaUDF.
  3. Если без Python не обойтись — предпочитайте Pandas UDF и включайте Arrow:
    python
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
  4. Держите UDF stateless и идемпотентной; помечайте недетерминированные функции как .asNondeterministic().
  5. Тестируйте на реальном объёме данных: даже PandasUDF может просесть в скорости при гигабайтных батчах.

Итого: UDF — мощный, но дорогой инструмент. Используйте его, только когда стандартного Spark SQL недостаточно, выбирая вариант (Scala, Pandas, Python scalar) по наименьшему оверхеду для вашей задачи.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
19
Q

Как ограничить количество ресурсов в Спарк? Можно ли через YARN это сделать?

Теги: #мир

A

Как “подрезать” Spark‑приложение по ресурсам

  1. Границы, которые ставит сам Spark
  • Память и CPU на драйвер и исполнители
    Через spark-submit можно задать: --driver-memory, --driver-cores, --executor-memory, --executor-cores, --num-executors.
    Это превращается в YARN‑контейнеры, у которых чётко задан объём памяти и количество vCore‑ов.
  • Память “поверх” heap’а
    Есть параметр spark.yarn.executor.memoryOverhead. Он нужен, чтобы резервировать память под off‑heap‑выделения, Arrow, JNI, Netty и т.п. По умолчанию Spark сам выставляет минимум 384 МБ или 10% от executor-memory, но этого часто не хватает. Если не задать — YARN может прибить контейнер за превышение лимита, хотя GC‑логи будут чистыми.
  • CPU на задачу
    Через spark.task.cpus можно указать, сколько vCore требуется на одну task. Это полезно, если task тяжёлая — например, обучение модели, видеокодек, вызов C++‑библиотеки и т.п. Тогда можно уменьшить общее число task’ов, но дать им больше ресурсов.
  1. Что может ограничить YARN поверх Spark

Да, можно — и часто именно через YARN и ограничивают Spark‑приложения. Тут работает сразу несколько уровней:

  • На уровне узла (ноды)
    В конфиге YARN NodeManager задаются yarn.nodemanager.resource.memory-mb и cpu-vcores. Это лимит, который вообще доступен на данной машине. Больше нельзя ни одному приложению — ни Spark, ни Hive, ни чему другому.
  • На уровне очереди (CapacityScheduler или FairScheduler)
    В YARN очереди можно задавать capacity, max-capacity, user-limit-factor.
    Эти параметры управляют, сколько ресурсов (в % от общего пула) может получить очередь, и сколько один пользователь может забрать из этой очереди.
    Например, можно ограничить, чтобы Spark‑приложения конкретного пользователя не забирали больше 30% ресурсов.
  • На уровне контейнера
    Контейнер — это единица, которую запрашивает Spark для executor’а или драйвера. Spark сам выбирает размеры в зависимости от указанных флагов, но контейнер никогда не выйдет за рамки, которые позволяют очередь и ноды.

Практические нюансы

  • Если драйвер в client‑режиме
    Тогда он не запускается внутри YARN‑контейнера, а работает на вашей машине (или edge‑ноде). YARN на него не влияет — если у вас не настроены cgroups, драйвер может съесть сколько угодно памяти. Это может стать bottleneck.
  • Dynamic Allocation
    С параметром spark.dynamicAllocation.enabled=true Spark может автоматически запрашивать и отпускать executors. Всё равно действует ограничение maxExecutors, и оно не позволит вылезти за границы очереди или кластера.
  • Resource Profiles (Spark 3.1+)
    Позволяют задавать разные размеры executors для разных стадий. Например, для одной стадии давать 2 core и 4 гб, а для другой — 4 core и 8 гб. Это не обход лимитов — просто более гибкая работа в пределах разрешённых границ.
  • GPU / FPGA
    Для работы с видеокартами Spark использует параметры spark.executor.resource.gpu.amount и аналогичные. Чтобы YARN правильно выделял узлы с нужными ресурсами, используют метки (NodeLabels) и cgroups.
  • Проблемы с Off‑Heap памятью
    Если в логах нет GC, но YARN убивает контейнер — почти наверняка дело в том, что выделения шли в off‑heap. Это может быть Arrow, Netty, C‑библиотеки, Broadcast‑переменные. Чтобы этого избежать, увеличивают memoryOverhead.

Минимальная шпаргалка для ответа

> “Я ограничиваю ресурсы на трёх уровнях: внутри Spark — это executor‑memory, cores, task.cpus, dynamic allocation; на уровне YARN — через queue capacity и user‑limit‑factor; на уровне нод — через ресурсы NodeManager. В cluster‑режиме драйвер тоже попадает под лимиты, в client‑режиме — нет, и это отдельный риск. Также использую memoryOverhead, если есть Arrow или Off‑Heap‑нагрузка. Могу задавать resource profiles для разных стадий, а для GPU — выделяю ресурсы через node labels.”

Хочешь дополнить это кейсами с реальных проектов — например, как спасал джобу от OOM или как падал client‑драйвер?

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
20
Q

Как происходит выполнение (вычисления) в Spark? Что под капотом у Spark?

Теги: #Билайн, #rubbles

A

Коротко: почти в такой, но не «жёсткой линейкой». Есть фиксированный «скелет» шага за шагом, а внутри него — параллелизм и обратная связь (AQE), из-за чего часть вещей делается на лету. Ниже — что идёт строго по порядку, а что может накладываться/меняться.

Что идёт строго по порядку (скелет)

  1. Action → планирование на драйвере. Трансформации ленивы; действие (например, count, show, запись) триггерит оптимизацию/исполнение. Для DataFrame/Dataset это: логический план → оптимизированный → физический (Catalyst + выбор стратегии). Посмотреть можно через explain("extended"). (spark.apache.org)
  2. Разбиение на стадии по границам shuffle. Любая action-работа превращается в стадии, которые разделены распределёнными shuffle-операциями (wide deps). (spark.apache.org)
  3. DAGScheduler/TaskScheduler. Драйвер строит DAG стадий и подаёт готовые стадии как TaskSet в планировщик задач; затем задачи раздаются на executors. (archive.apache.org, spark.apache.org)
  4. Исполнение задач на executors.
    • Для первой стадии читаются исходники; для последующих — читаются shuffle-файлы предыдущих стадий.
    • На wide-границе задача пишет shuffle-выход (sort-based shuffle по умолчанию) на локальный диск; следующая стадия его читает. (spark.apache.org)

Итого: «Action → (Catalyst) план → стадии → задачи → shuffle-write/read → следующая стадия → … → результат» — это базовый, детерминированный каркас. (spark.apache.org)

Что НЕ строго по очереди (накладывается/меняется)

  • AQE меняет план в runtime. После старта запроса Spark может: схлопывать пост-shuffle партиции, конвертировать sort-merge join в broadcast/shuffled-hash, лечить skew. С 3.2 включено по умолчанию (spark.sql.adaptive.enabled=true). (archive.apache.org, downloads.apache.org, spark.apache.org)
  • Ресурсы и параллелизм. Исполнители могут быть подняты заранее или динамически; в пределах готовой стадии её задачи бегут параллельно; другие стадии этого же джоба ждут родителей, но другие джобы могут исполняться параллельно (конфигурируется политикой планировщика). (spark.apache.org)
  • Broadcast и обмен данными. Если выбран broadcast-join (или AQE сконвертировал в него), малая сторона рассылается executors без shuffle; это может подготавливаться параллельно с другими частями плана. (spark.apache.org)
  • UI/диагностика не влияет на порядок. UI на :4040 просто слушает события выполнения; порядок выше не меняет. (spark.apache.org)

Ответ на твой вопрос «строго ли в такой последовательности?»

  • Да, в терминах уровней порядок «Action → Catalyst-план → DAGScheduler (стадии) → TaskScheduler (задачи) → shuffle → следующая стадия» — верный. Это «скелет», без которого вычисление не состоится. (spark.apache.org, archive.apache.org)
  • Но нет, не «монотонной лентой»: executors/задачи работают параллельно, другие джобы могут накладываться, а AQE вправе переиграть часть физического плана уже во время бега. Это нормальная, задокументированная динамика движка. (spark.apache.org, archive.apache.org)

Зачем это знать? — Чтобы корректно читать план/UI и объяснять «почему медленно»: узкие места почти всегда на границах shuffle (чтение/запись, сеть, количество партиций) или из-за неверной стратегии join/скью. Понимание, что фиксировано, а что адаптивно, позволяет либо заранее выбрать лучшую стратегию (partitioning, hints), либо «дать AQE шанс» и проверить фактическую стратегию в UI/плане. (spark.apache.org)

Почему так устроено? — Жёсткий каркас гарантирует корректность/устойчивость (линейность стадий по зависимостям и восстановление по lineage), а адаптивные и параллельные части дают производительность и лучшую утилизацию кластера. (spark.apache.org)

Если хочешь, сожму это в один «постер»-чеклист (по шагам + где смотреть в UI) под твой кластер и версии Spark.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
21
Q

Как изменить степень параллелизма в Spark?

Теги: #Билайн #Ярослав

A

PySpark

0 · Что именно считается параллелизмом

  • Число одновременно исполняющихся задач (task).
    parallelism = min(кол‑во_партиций_на_стадии, executors× coresPerExecutor)

Меняем мы всегда либо количество партиций, либо количество доступных CPU‑ядер. Всё остальное — вариации вокруг этих двух рычагов.

1 · Меняем партиции в самом коде

```python
df = spark.read.parquet(path) # партиции придут «как лежат» в файлах

df = df.repartition(1600) # создаём 1600 новых шардов (shuffle)
df = df.repartition(“customer_id”) # hash‑partition по ключу (shuffle)

df_small = df.filter(“dt = ‘2025‑05‑12’”)
df_small = df_small.coalesce(200) # ужимаем без shuffle

rdd = sc.textFile(“hdfs:///data”, minPartitions=800)
rdd = rdd.repartition(1200) # RDD API то же самое
~~~

Нюансы

  • coalesce(N) без shuffle=True может только уменьшать N; при увеличении Spark создаст пустышки → параллелизм не вырастет.
  • При переходе RDD → DataFrame теряется кастомный Partitioner, если он был.
  • В Spark 3 включён AQE: spark.sql.adaptive.enabled=true. Тогда Spark умеет сам дробить «длинные» таски и схлопывать мелкие, но только после первого шаффла. То есть первоначальное repartition() всё равно важно.

2 · Меняем число одновременных задач через spark-submit

```bash
spark-submit \
–master yarn \
–deploy-mode cluster \
–num-executors 50 \
–executor-cores 4 \
–executor-memory 8g \
–conf spark.default.parallelism=800 \
–conf spark.sql.shuffle.partitions=1600 \
main.py
~~~

  • --num-executors × --executor-cores→ верхняя граница одновременно работающих тасков.
  • spark.default.parallelism читается ровно один раз при создании SparkContext; после старта приложения поменять нельзя. Используется, когда Spark «сам решает», сколько партиций создать (например reduceByKey без numPartitions).
  • spark.sql.shuffle.partitions определяет число выходных партиций каждой shuffle‑операции в SparkSQL / DataFrame API (по умолчанию200). Слишком маленькое значение →«хвостовые» таски; слишком большое →море мелких файлов.

Быстрый хак (попадает в мой шаблон скрипта)

```bash
CORES_TOTAL=$(($(nproc –all) * 4)) # 4 узла по nproc ядер
spark-submit \
–conf spark.sql.shuffle.partitions=$((CORES_TOTAL*2)) \

~~~

Даёт «чуть больше партиций, чем ядер»— почти всегда попадаю в sweet‑spot без микроскопа.

3 · Динамика во время выполнения

```bash
–conf spark.dynamicAllocation.enabled=true
–conf spark.shuffle.service.enabled=true
~~~

  • Spark самостоятельно добавит ещё executors, если очередь тасков длинная, и высвободит их, когда стадия закончится.
  • Важно: существующие RDD и shuffle‑файлы уже имеют фиксированное число партиций; dynamic allocation не меняет их. Поэтому после огромного coalesce(10) добавить 100экзекьюторов эффекта не даст.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
22
Q

Как бороться с перекосом данных (data skew)?

Теги: #Ярослав

A

Перекос данных (data skew) возникает, когда данные распределены по разделам (partitions) неравномерно, и некоторые из них получают существенно больше данных. Это приводит к увеличению времени выполнения задач и перегрузке памяти на отдельных узлах. Spark предлагает несколько подходов для борьбы с этим:

  1. Использование broadcast join
    При перекосе данных в джоинах с маленькой таблицей используйте broadcast join: небольшая таблица передается на все узлы, и перекос при соединении минимизируется.

Включите автоматический broadcast join с помощью spark.sql.autoBroadcastJoinThreshold.
5. Выборка ключей с перекосом и их обработка отдельно
Найдите ключи, вызывающие перекос, обработайте их отдельно (например, в отдельном DataFrame) и соедините результат с основными данными после выполнения операций.

  1. Агрегация данных перед джойном
    Если перекос возникает в джоинах, попробуйте сначала агрегировать данные по ключам до соединения, что может уменьшить количество данных и нагрузку.

Изменение логики

Пересмотреть гранулярность ключа (более высокая кардинальность).

Декомпозировать «горячие» ключи на отдельный пайплайн/батч.

AQE (Adaptive Query Execution) skew-join
В Spark 3+ можно включить автоматическое разбиение перекошенных shuffle-партиций на несколько меньших при sort-merge join:
spark.sql.adaptive.enabled=true и spark.sql.adaptive.skewJoin.enabled=true
Порог/фактор настраиваются (skewedPartitionFactor, skewedPartitionThresholdInBytes). Это позволяет динамически разрезать «жирный» reduce-partition на несколько задач.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
23
Q

От чего зависит нагрузка по вычислениям? Есть ли spill’ы? #Иннотех

A
  • Нагрузка по вычислениям зависит от:
    1. Объёма данных: чем больше данных, тем выше объём обработки.
    2. Типа операций:
    o Широкие трансформации (join, groupBy, reduceByKey) вызывают shuffle и требуют перераспределения данных.
    o Узкие трансформации (map, filter) работают в рамках текущих партиций.
  • Spill (пролив на диск) происходит, когда Spark не хватает памяти для хранения временных структур (например, при shuffle или сортировке).
  • Spark «выгружает» часть данных на диск, чтобы освободить оперативную память.
  • Spill может замедлять вычисления, поскольку доступ к диску медленнее, чем к памяти.
  • Проверить наличие spill’ов можно в логах Spark (или через Spark UI, в разделе Tasks/Stages видна информация о spill на диск).
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
24
Q

Какая разница между Coalesce и Repartition и в каких случаях и когда что юзать?

Теги: #Ярослав

A
  • coalesce и repartition — это методы переразбиения (reshuffling) данных в Spark, но с разными подходами и назначением.
  1. coalesce
    Используется для уменьшения количества разделов (partitions).
    Работает без шаффлинга (reshuffling) данных между узлами, если данные остаются на тех же узлах.
    Обычно быстрее и эффективнее, так как не требует передачи данных по сети.
    Идеален для оптимизации разделов, если данные стали меньше на более поздних стадиях обработки, например, после фильтрации.
    Пример использования:
    scala
    val reducedData = largeData.coalesce(4)
    Когда использовать:
    Если нужно уменьшить количество разделов.
    Для увеличения производительности на финальных этапах, когда данные уже обработаны и требуется только сохранить их в меньшем количестве разделов.
  2. repartition
    Может использоваться как для уменьшения, так и для увеличения числа разделов.
    Требует шаффлинга данных, что приводит к перераспределению по всем узлам кластера.
    Подходит для случаев, когда необходимо равномерно распределить данные или переразбить их для параллельной обработки, что актуально на ранних этапах анализа данных.

Пример использования:
scala
val repartitionedData = data.repartition(10)

Почему repartition — в начале/перед тяжёлыми шагами

  1. Ранняя балансировка: перед join/groupBy вы хотите равномерные партиции, иначе получите “хвосты” (stragglers). repartition(key) разносит ключи ровнее.
  2. Управление параллелизмом: увеличить число партиций до разумного уровня, чтобы загрузить все ядра кластера на первых дорогих стадиях.
  3. Снижение skew: если одна группа/ключ разрастается, хэш-repartition или range-repartition до агрегации сгладит перекос.

Почему coalesce — в конце/перед записью

  1. Нет лишнего shuffle: к концу плана shuffle уже был (join/agg/sort). Делать ещё один (через repartition) только ради меньшего числа файлов — лишние затраты. coalesce уменьшит партиции дёшево.
  2. Контроль маленьких файлов: перед write склеиваем партиции до целевого количества → меньше файлов и нагрузка на метаданные.
  3. Размер очевиден только в конце: именно к финишу вы знаете итоговый объём и можете рассчитать адекватное n для coalesce.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
25
Как удалить дубликаты из таблицы? Теги: #Иннотех
* В Spark SQL/DF: df.dropDuplicates(["cols"]), где cols – список колонок, по которым ищем дубликаты. * Можно использовать Window-функции и фильтр по row_number(), если нужна более гибкая логика.
26
Есть большой датафрейм, который нужно обработать, но он не помещается целиком в оперативку. Что делать? Теги: #wildberries
**«В Spark‑е сам по себе “слишком большой датафрейм” — это симптом того, что либо неправильно распределена нагрузка, либо выполняются действия, требующие хранить все данные одновременно. Решаю за счёт грамотного партиционирования, ранней фильтрации/проекции, правильного уровня `persist`, настройки памяти и, если нужно, горизонтального масштабирования кластера. Ни один из этих шагов не требует “всё в RAM”.»** --- 1. Проверяю, почему именно «не влезает» Сначала смотрю, где именно возникает проблема. Если взрыв памяти идёт на `collect`, `toPandas`, shuffle или join — значит, либо данные перетаскиваются на драйвер, либо Spark перегружает отдельные executor‑ы. Проверяю количество и баланс партиций через `df.rdd.getNumPartitions`, и если вижу сильный разброс, использую `repartition`, либо делаю salting на жирные ключи. Также смотрю, кэшируются ли данные в памяти — возможно, стоит использовать `persist(MEMORY_AND_DISK)` или вообще не кэшировать. Если читаю из CSV/JSON, перевожу в Parquet — это даст и сжатие, и predicate pushdown. Ещё полезно смотреть физический план через `df.explain(mode="formatted")`, чтобы понять, где Spark делает лишнюю работу. --- 2. Расчленяю большие задачи Первый шаг — ранняя фильтрация и проекция. Оставляю только нужные колонки и фильтрую по датам или другим условиям как можно раньше — Spark в этом случае прочитает меньше. Далее — чистое партиционирование на уровне источника. Например, если у меня Wildberries‑подобные данные, я делаю партиционирование по `region_id` и дате, чтобы Spark сам читал только нужные чанки. Это особенно важно при Parquet‑файлах — он умеет читать только нужные файлы по метаданным. Если объём данных всё равно велик, разбиваю задачу на куски по времени или категориям, обрабатываю их по отдельности и объединяю потом. Например, сначала по дням, затем финальная агрегация. Для state‑ful задач можно использовать checkpoint‑механику и структуру в стиле lakehouse. * Ранняя фильтрация и проекция:  df = (spark.read.parquet("/bronze/wb_parquet")    .select("event_date","region_id","sku","price","qty")    .filter("event_date >= '2025‑01‑01' AND region_id IN (50,77,99)")) * Храню данные партиционированными:  df.write.partitionBy("region_id","event_date").mode("append").parquet("/silver/wb_sales/") * Читаю с автоматическим pruning:  daily = spark.read.parquet("/silver/wb_sales/")\      .filter("region_id = 77 AND event_date = '2025‑05‑10'") * Если всё равно тяжело — чанк‑процессинг:  for day in days:   day_df = spark.read.parquet("/silver/wb_sales/")\        .filter(f"event_date = '{day}'")   processed = day_df.groupBy("sku").agg(sum("price*qty").alias("gmv"))   processed.write.mode("append").parquet("/gold/daily_gmv/") --- 3. Конфигурирую память и диск В конфигурации executors выделяю достаточно памяти, обычно от 4 до 8 ГБ. Если вижу GC overhead или частые spills, увеличиваю executor memory или перераспределяю ресурсы. Также можно настроить `spark.memory.fraction` и `storageFraction` для балансировки между кешированием и shuffle‑буферами. Если shuffle сильно перегружает файловую систему — перемещаю временные каталоги (`spark.local.dir`) на быстрый диск (например, SSD). Ещё важно выставить `spark.sql.shuffle.partitions` в разумное значение, зависящее от объёма данных, чтобы не плодить лишние tasks и мелкие файлы. Если есть мелкий справочник, можно увеличить `spark.sql.autoBroadcastJoinThreshold`, чтобы Spark мог использовать broadcast‑join. Пример spark‑submit:  --executor-memory 6g  --executor-cores 2  --num-executors 40  --conf spark.memory.fraction=0.6  --conf spark.memory.storageFraction=0.5  --conf spark.sql.shuffle.partitions=400  --conf spark.sql.autoBroadcastJoinThreshold=104857600  --conf spark.local.dir=/mnt/fastssd/spark_tmp При GC overhead или частом spilling увеличиваю память или дроблю задачу. --- 4. Меняю физический план, если join‑ы жирные Если один датасет — справочник, делаю broadcast join, например: ```scala dfBig.join(broadcast(dfSmall), Seq("product_id")) ``` Если оба датасета большие, заранее делаю `repartition` по ключу, чтобы shuffle был сбалансирован. Если вижу data skew — добавляю «соль» (salting) к самым частым значениям, а потом убираю её после join‑а. * Маленький справочник — broadcast join:  enriched = big_df.join(broadcast(dim_df), "product_id") * Два больших датасета — синхронно репартиционирую:  left = left_df.repartition("product_id")  right = right_df.repartition("product_id")  joined = left.join(right, "product_id") * Skew — солю ключ:  salted = skewed.withColumn("salt", (rand()*10).cast("int"))  salted = salted.repartition("product_id","salt")  joined = salted.join(right_df,"product_id").unionByName(others.join(right_df,"product_id")) --- 5. Обрабатываю крайние случаи Если драйвер падает из-за `collect()` — не использую его. Вместо этого сохраняю результат в файл, беру `take(100)` или использую `.show()`. Если зависают таски при shuffle — вероятно, перекос по ключам. Включаю `spark.sql.adaptive.skewJoin.enabled=true` и добавляю salting. Если выходит ошибка `Too many open files`, это значит, что Spark создал слишком много партиций — уменьшаю размер партиций или объединяю файлы заранее. Если spilling на диск происходит часто — включаю `spark.shuffle.spill.compress`, чтобы Spark использовал сжатие при записи временных данных. * Драйвер падает из‑за collect() → пишу результат в Parquet, делаю limit/show. * Завис shuffle → включаю  spark.sql.adaptive.enabled=true  spark.sql.adaptive.skewJoin.enabled=true * Too many open files → сокращаю maxPartitionBytes или заранее объединяю мелкие файлы. * Частый spilling → spark.shuffle.spill.compress=true и ZSTD. --- 6. Масштабирую, если ничего не помогает Если всё сделано правильно, но объём данных слишком велик — увеличиваю число executors, либо размер executors (вертикальное масштабирование). Практика показывает, что лучше иметь 1 CPU на 6–8 ГБ памяти. Также можно включить автоскейлинг, особенно если расчёты периодические, как это бывает при формировании отчётности в крупных интернет-магазинах вроде Wildberries. * Горизонтально — больше executors / K8s pods / EMR nodes. * Вертикально — до ~8 ГБ heap на ядро, дальше GC дороже. * Dynamic Allocation:  spark.dynamicAllocation.enabled=true  spark.dynamicAllocation.minExecutors=4  spark.dynamicAllocation.maxExecutors=60 --- Как это формулирую на собеседовании > «В Spark‑е датафрейм физически распределён по партициям; цель — сделать так, чтобы каждая партиция была 100–200 МБ и помещалась в память одного executor‑занятия. Сначала отфильтровываю и выбираю только нужные столбцы → Spark протолкнёт это к источнику благодаря column‑pruning и predicate push‑down. Если данные уже лежат в Parquet, просто читаю нужные партиции; если нет — пишу промежуточный слой. Затем проверяю план: нет ли `collect`, нет ли broadcast‑ов на гигабайт, нет ли свежего data skew. Кэширую `MEMORY_AND_DISK`, чтобы Spark сам spill‑ил, и при необходимости добавляю executors. Так мы обрабатываем десятки терабайт Wildberries‑логов без попытки втаскивать их целиком в RAM драйвера.» Хочешь, дам вариант для печати или в `.md`/`.txt` формате?
27
Что такое Shuffle, partitionBy, широкие и узкие трансформации? Теги: #Ярослав #Cian, #wildberries #Билайн, #rubbles
Как я объясняю эти понятия на собеседовании --- 1. Shuffle — «перетасовка» данных между узлами кластера * **Что это физически:** Spark разбивает данные на партиции, каждая‑— это файл в памяти/на диске исполнителя (executor). Когда логика запроса требует, чтобы записи с одинаковым ключом оказались в одной партиции, Spark **хэширует или ранжирует** ключи, переписывает данные во временные файлы, затем рассылает их по сети другим executors. * **За что платим:** * сетевой трафик (самый узкий канал в кластере); * сериализацию/десериализацию; * IO — данные почти всегда проливаются на диск, даже если хватает RAM, чтобы восстановиться после сбоя executor’а. * **Как понять, что shuffle случился:** в Spark UI появляется новая *stage*; план в DAG‑визуализаторе содержит «ShuffleExchange» (DataFrame API) или «shuffle read/write» (RDD). * **Чем опасен:** * длинные «хвосты» задач из‑за data skew (несбалансированное распределение ключей); * рост числа мелких файлов; * OOM при крупном ключе, если `spark.reducer.maxSizeInFlight`/`spark.shuffle.file.buffer` подобраны неверно. * **Типовые лекарства:** salting ключей, broadcast‑join вместо shuffle‑join, Adaptive Query Execution (AQE), аккуратный `repartition`, настройка `spark.sql.shuffle.partitions`, предварительная агрегация на маппере (`combineByKey`/`mapGroupsWithState`). --- 2. `partitionBy` — управление тем, *где* окажется каждая запись > В Spark есть **две** очень разные операции с таким именем. 1. **`RDD.partitionBy(partitioner)`** *Уровень выполнения* (runtime). Функция устанавливает партешёнер Pair‑RDD (`HashPartitioner`, `RangePartitioner`, кастомный), чтобы следующие операции `reduceByKey`, `groupByKey` и т. д. *не провоцировали новый shuffle*. По сути мы говорим планировщику: «Данные уже лежат как надо». 2. **`DataFrameWriter.partitionBy("col1", "col2")`** *Уровень хранилища*. Определяет каталожную разметку выходных файлов («…/col1=2025/col2=05/…»). Выполняется **после** всех вычислений; на производительность текущего запроса не влияет, но влияет на *следующие* чтения за счёт partition pruning. --- 3. Узкие (narrow) и широкие (wide) трансформации * **Критерий:** сколько партиций родительского RDD требуется, чтобы посчитать *одну* партицию дочернего. * *Narrow* — зависимость «один‑к‑одному» или «многие‑к‑одному» → никаких сетевых копий, вся работа остаётся внутри executor’а. * *Wide* — «многие‑ко‑многим» → обязательный shuffle. * **Почему важно:** именно широкие трансформации разбивают DAG на stages; оптимизация в Spark почти всегда сводится к сокращению их числа или к тому, чтобы делать их «по‑другому» (broadcast, local sort, combiners). Примеры (не все‑— только самые характерные) **Узкие** * `map`, `flatMap`, `filter`, `mapPartitions`, `sample` * `union` (если партиции уже согласованы) * `coalesce(num, shuffle = false)` — уменьшает число партиций без shuffle * `withColumn`, `drop`, `select` в Dataset/DataFrame — пока не трогаем ключ **Широкие** * `groupByKey`, `reduceByKey`, `aggregateByKey`, `countByKey` * `distinct`, `intersection`, `sortBy`, `orderBy` * `join` (кроме `broadcast join`) * `repartition`, `coalesce(..., shuffle = true)` * `pivot`, `cube`, `rollup` > **Edge‑case:** `groupByKey` всегда гоняет все значения ключа по сети; `reduceByKey` отправляет *уже частично агрегированные* значения, что даёт выигрыш на больших группах. --- 4. Что рассказываю из опыта эксплуатации * **Диагностика разбалансировки** — смотрю `Shuffle Read Size / Records` на таске; если медиана и 95‑й персентиль отличаются ×10, добавляю salting или `skewHint`. * **Контроль мелких файлов** — при массовых `repartition(1)` в ETL‑лейках ставлю `spark.sql.files.maxPartitionBytes` и включаю `optimizeWrite` в Delta Lake, чтобы Spark сам сливал спиллы. * **Особые ключи** — `null` часто попадает в отдельную партицию, что делает её самой «толстой». Помогает предварительный `filter`/`fillNull` или явный `partitionBy(new NullAwarePartitioner(n))`. * **Параметр `spark.sql.shuffle.partitions`** — ставлю ≈ 2‑3× число CPU‑ядер в кластере; меньше — недогрузка CPU, больше — слишком много мелких файлов и оверхед на task‑launch. * **AQE** (Spark 3+) — держу включённой. Она умеет: * динамически делить/сливать партиции после первой фазы shuffle; * переключать стратегию join на broadcast при подсчитанном размере. --- Короткий TL;DR, если просят «в двух словах» > **Shuffle** — когда Spark вынужден гонять данные между узлами, это дорого. > **partitionBy** — способ заранее расположить данные нужным образом (в памяти или на диске) и тем самым сэкономить на shuffle в будущем. > **Узкие трансформации** обходятся без сети; **широкие** вызывают shuffle. Чем меньше широких операций и чем лучше подготовлены данные, тем быстрее и дешевле пайплайн.
28
Как строится план запроса в Spark? Из чего он состоит? Как работает Catalyst Optimizer? Как выбирается оптимальный план? Теги: #ПетровичТех, #rubbles
Разберём пошагово: --- 1. Этапы построения плана запроса в Spark Когда вы пишете что-то вроде: ```python df.groupBy("city").agg(avg("salary")) ``` Spark проходит через несколько стадий: 1. **Unresolved Logical Plan** * Просто синтаксическое дерево (AST), где указаны операции (`Project`, `Aggregate`, `Filter`) и имена колонок/таблиц. * Здесь ещё не проверено, есть ли такие таблицы/поля, типы данных неизвестны. 2. **Analyzed Logical Plan** * После работы *Analyzer* имена таблиц и колонок проверены по каталогу (метаданные Hive Metastore или Spark Catalog). * Типы данных выведены, алиасы развёрнуты, все ссылки валидны. 3. **Optimized Logical Plan** * Работает **Catalyst Optimizer** (см. ниже). * Применяются эвристические правила (правило predicate pushdown, constant folding, удаление лишних проекций, переписывание join-ов и т.д.). * План всё ещё логический (что сделать, но не как). 4. **Physical Plan(s)** * *Planner* превращает логический план в набор возможных физических планов. * Здесь появляются конкретные стратегии выполнения: BroadcastHashJoin vs SortMergeJoin, HashAggregate vs SortAggregate, и т.п. 5. **Selected Physical Plan** * Выбирается оптимальный из кандидатов, на основе *cost model* (стоимость — число шффлов, размер данных, доступность broadcast, сортировки). * Это то, что реально будет исполнено воркерами. --- 2. Catalyst Optimizer: что он делает Catalyst — это модуль оптимизации, написанный на Scala с использованием *rule-based transformations*. Основные виды оптимизаций: * **Constant folding**: `SELECT 2 + 3` → `SELECT 5`. * **Predicate pushdown**: фильтры опускаются как можно ближе к источнику данных (`WHERE year=2023` → в паркетный скан). * **Projection pruning**: удаляются неиспользуемые колонки. * **Reordering joins**: если несколько join-ов, может менять порядок. * **Null propagation**: выражения с `NULL` упрощаются. * **Subquery elimination / simplification**: избавление от подзапросов. * **Filter / Limit pushdown** в DataSource API. Catalyst использует дерево правил (rule-based), а не только cost-based подход: каждое правило — это «pattern → rewrite». --- 3. Как выбирается оптимальный план После оптимизации логического плана Spark генерирует несколько физических кандидатов: * Пример: join можно сделать как `BroadcastHashJoin`, `ShuffleHashJoin`, `SortMergeJoin`. * Spark проверяет условия: * Если одна таблица мала (< spark.sql.autoBroadcastJoinThreshold), можно заbroadcastить. * Если обе большие — придётся Shuffle + Sort. * Если ключи отсортированы — можно использовать SortMergeJoin без доп. сортировки. Затем включается **Cost Model**: * Оценивает количество shuffle, сортировок, ширину данных, использование памяти. * Выбирает дешевый по «стоимости» план. Далее план закрепляется и исполняется. В Spark UI его можно увидеть в разделе **SQL → DAG Visualization** (там будет Physical Plan). --- ✅ **Кратко резюмировать как на собесе**: * План строится в несколько шагов: *Unresolved Logical → Analyzed Logical → Optimized Logical → Physical → Selected*. * Catalyst Optimizer — набор rule-based трансформаций: pushdown, pruning, переписывание выражений, join reordering. * Физический план строится с разными стратегиями (Broadcast, Shuffle, SortMerge). * Оптимальный выбирается по cost-model: минимизация shuffle, учёт размера таблиц, доступности broadcast. --- Хочешь, я сделаю это в двух версиях для собеса: **короткий (30–40 сек)** и **развёрнутый (2–3 мин)**?
29
Что такое pushdown-фильтр, как проверить, применился ли он? Теги: #wildberries
* Pushdown-фильтр – это механизм, когда Spark передаёт условия фильтрации на уровень источника данных (например, СУБД или файл-формат Parquet), чтобы: * Ограничить объём данных, которые нужно прочитать * Уменьшить сетевой и I/O трафик * Проверить: 1. Посмотреть физический план через df.explain(true) или spark.sql.explain(). o Если pushdown применяется, там будет указано, что фильтр был «протолкнут» (pushed down) к источнику. 2. Логи Spark/источника данных: можно увидеть, что выбираются уже отфильтрованные данные (не полный скан). 3. В некоторых коннекторах (например, JDBC) Spark строит SQL-запрос с условиями WHERE, если pushdown активирован.
30
Как исправить ошибку OOM (Out of memory)? Теги: #Ярослав
Для исправления ошибки OOM (Out of Memory) в Spark нужно оптимизировать использование памяти и вычислительных ресурсов. Основные подходы: 1. Настройка памяти Увеличьте память исполнителя с помощью spark.executor.memory (например, spark.executor.memory=4g). Это даст больше памяти для обработки данных. Настройте память драйвера: spark.driver.memory (например, spark.driver.memory=2g), если OOM возникает на уровне драйвера. Управляйте памятью для шаффлинга: spark.memory.fraction определяет долю памяти для хранения данных и вычислений. Увеличьте spark.memory.storageFraction, чтобы больше памяти выделить для промежуточных данных. 2. Оптимизация разделов (partitions) Увеличьте количество разделов при загрузке больших данных или выполнении тяжелых операций, например, data.repartition(200), чтобы уменьшить объем данных в каждом разделе и снизить нагрузку на память. Используйте coalesce для уменьшения числа разделов после фильтрации данных, что помогает избежать лишнего шаффлинга. 3. Уменьшение объема данных Фильтрация на ранних этапах: фильтруйте ненужные данные, чтобы уменьшить объем входных данных. Проекционные операции: используйте только нужные колонки, избегая работы с большими и ненужными полями. Настройка форматов хранения: выбирайте колоночные форматы данных (например, Parquet), которые потребляют меньше памяти. 4. Оптимизация шаффлинга Увеличьте число разделов для шаффлинга с помощью spark.sql.shuffle.partitions. Если возможно, используйте broadcast join для небольших таблиц в джоинах, чтобы избежать тяжелого шаффлинга. 5. Управление ресурсами кластера Увеличьте количество исполнителей и потоков (spark.executor.instances, spark.executor.cores), чтобы распределить нагрузку между узлами. Сжимайте данные в памяти с помощью spark.memory.compress=true и spark.rdd.compress=true.
31
Что такое persist в Spark и какие storage levels существуют? #тг
persist() позволяет дополнительно сообщить параметр storage level (MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY)
32
Назовите наименьшую исполнительную единицу в Spark #тг
Задача
33
За что отвечает следующий код: my_rdd = sc.textFile(‘my_rdd_file.csv’) my_rdd = my_rdd.coalesce(5) #тг
За создание набора RDD и объединение его разделов
34
В чем разница между cache() и persist() в спарке? #тг
Что отличается cache() — это просто алиас для persist() с уровнем хранения по умолчанию RDD → MEMORY_ONLY DataFrame / Dataset → MEMORY_AND_DISK (колоночный формат) Изменить уровень после такого вызова нельзя: нужно сначала unpersist(), потом persist(newLevel). persist(StorageLevel) — тот же механизм, но даёт выбрать, где и как держать партиции: память, память+диск, сериализовано / не сериализовано, off‑heap, репликация — любая комбинация, которую предоставляет StorageLevel. Почему вообще пользуемся df = spark.read.parquet("s3://...")\ .filter("event_type = 'click'")\ .withColumn("ts", to_timestamp("timestamp")) Кажется, что df — это уже "табличка", но на самом деле это просто логический план вычислений. Никаких данных ещё нет. Spark пересчитывает DAG при каждом action. Кеширование «приклеивает» уже посчитанные партиции к executor’ам, поэтому при повторном обращении мы: не ходим снова в HDFS/DB/объектное хранилище; не тратим CPU на одинаковые трансформации; ускоряем итеративные алгоритмы, повторяющиеся join‑ы, feature engineering и т. д. Когда это реально выгодно Объект используется ≥ 2 раз и его расчёт дороже, чем хранение. Памяти мало → берём MEMORY_AND_DISK или сериализованные уровни (*_SER). Кешируем после фильтра/агрегации, а не «сырые» данные. Не забываем unpersist() — иначе забьём память кластера. TL;DR cache() — шорткат для persist() с дефолтным уровнем. persist() даёт настроить этот уровень. Юзаем, чтобы не пересчитывать тяжёлые части DAG, когда данные нужны многократно.
35
Какой класс отвечает за создание Spark-сессии на рабочих узлах в кластере? #тг
SparkContext
36
Разница flatMap и map Spark #тг
Функция map в Spark - это функция преобразования, которая применяет заданную функцию к каждому элементу RDD (Resilient Distributed Dataset) и возвращает новый RDD. Функция принимает входной элемент и возвращает один выходной элемент. Функция flatMap в Spark также является функцией преобразования, которая применяет заданную функцию к каждому элементу RDD и возвращает новый RDD. flatMap результат разворачивается (flatten), т.е. результатом является плоский список, Map -- список списков. Где применяется на практике Токенизация текста: flatMap(str.split) Разворачивание вложенных структур: JSON с массивами, лог событий, списки покупок и т.п. Построение пар (ключ, значение) из одной строки (например, для reduceByKey) В DataFrame API В чистом DataFrame API flatMap как такового нет. Аналог делается через: explode() — если в колонке массив или вложенная структура; или selectExpr("explode(array_col)"). Нужна только модификация колонок без изменения числа строк → map (в DataFrame: withColumn/select/expr). Нужно расплющить массив/словарь/строки в много строк → flatMap / explode / posexplode.
37
Когда вам понадобится кэшировать DataFrame в Apache Spark?#тг
--- Короткий ответ — когда DataFrame дорого пересчитывать, а вы собираетесь обращаться к нему **больше одного раза** в течение того же Spark‑приложения. --- 1. Практические триггеры, по которым я решаю «надо cache()» 1. **Долгий, тяжёлый пайплайн до этого места** *Парсинг JSON‑логов, нормализация временных зон, вызов UDF‑ов — и всё это на сотнях гигабайт. Повторять такое даже дважды — растратить время кластера.* 2. **Одна и та же «чистая» таблица участвует в нескольких действиях (actions)** ```python clean = raw.selectExpr(...).cache() clean.write.mode("overwrite").partitionBy("day").parquet("/silver/logs") print(clean.count()) # вторая action уже из памяти clean.filter("status = 500").show(20) ``` 3. **Множественные последующие join‑ы** Если «справочная» таблица соединяется с разными фактовыми, выгодно закэшировать именно справочник, а не каждый join‑результат.(бродкаст) 5. **Интерактивная аналитика в notebook’е** Когда кручу разные ad‑hoc queries поверх одной очищенной выборки и хочу, чтобы всё реагировало за секунды, а не за минуты. 6. **Спарк‑SQL с включённым adaptive execution** Он может сам материализовать подзапрос, но если я **знаю**, что этот подзапрос reused, я явно делаю `df.persist()` — получаю предсказуемый план вместо «угадываний» оптимизатора. --- 2. Мини‑кейсы «из жизни» (никакого ML) 2.1. Дневная витрина кликов → несколько агрегатов *Схема:* сырые click‑stream‑файлы → ETL‑очистка (`df_clicks_clean`) → три отдельных отчёта (по стране, по девайсу, по рекламной кампании). Я делаю ```python df_clicks_clean.persist(StorageLevel.MEMORY_AND_DISK) ``` и после первой `count()` все три отчёта читают данные из памяти; суммарное время падает \~в 2–3 раза. 2.2. Базовый каталог товаров, который «ходит» по пайплайну Каталог обновляется раз в день, но сегодня на него ссылаются: * обогащение цен * построение витрины «рекомендации» * проверка правил качества данных Кладу каталог в кэш; downstream‑джобы становятся CPU‑, а не I/O‑bound. --- 3. Тонкости, о которых любят спрашивать * **StorageLevel** * `MEMORY_ONLY` — быстро, но OOM → пересчёт. * `MEMORY_AND_DISK` — сейф: то, что не влезло, пойдёт на локальный диск. * `MEMORY_AND_DISK_SER` — чуть медленнее (сериализация), зато меньше RAM. * **Первое действие материализует кэш.** Без `df.count()` или другой action данных в памяти нет. * **Unpersist** вручную, если DataFrame огромный и больше не нужен: `df.unpersist(blocking=True)`. * **Не кэшировать**: * один‑разовые DataFrame’ы; * лёгкие (фильтр + select) трансформации; * когда кластеру критичнее memory footprint, чем повторные shuffle’ы. * **Внутри Spark SQL**: `CACHE TABLE my_table` делает то же, но на уровне каталога/сессии. --- Итоговая формулировка для собеседования > «Кэширование (`cache()`/`persist()`) имеет смысл, когда стоимость пересчёта DataFrame заметно выше стоимости хранения, а сам DataFrame будет использован как минимум дважды. На практике я кеширую после тяжёлых ETL‑шагов, перед множеством независимых actions,‑или в итеративных алгоритмах (например, GraphFrames), выбирая `MEMORY_AND_DISK`, если объём близок к памяти. После того как результаты перестают быть нужны — делаю `unpersist()`, чтобы не держать RAM впустую.» Эта формулировка показывает: * понимание механики lineage и materialization; * знание API (`cache`, `persist`, уровни хранения, `unpersist`); * опыт решения реальных задач ETL/аналитики без ML. Удачи на собеседовании!
38
Какие факторы влияют на join в apache spark? #тг
1) размер датасета 2) условие соединения: =, >, < и тд 3) тип соединения: inner/cross/left
39
Сколько гигабайт памяти выделяется на каждую задачу в Spark? #HalltapeRoadmapDE
В Spark нет жёстко зашитого фиксированного размера памяти на каждую задачу (task). Вместо этого память конфигурируется на уровне экзекьюторов (executors), а уже внутри одного экзекьютора одновременно могут выполняться несколько задач — каждая задача использует часть общего пула памяти. Как это устроено? Executor Memory Основной параметр — spark.executor.memory: сколько всего памяти (heap) будет доступно одному экзекьютору. Например, 8 GB. Плюс есть Memory Overhead (например, spark.executor.memoryOverhead), которую Spark резервирует под системные нужды (JVM overhead, native buffers для shuffle, PySpark worker и т.д.). Количество CPU (cores) у экзекьютора Параметр spark.executor.cores определяет, сколько задач может выполняться параллельно на одном экзекьюторе. Если указано cores=4, значит на одном экзекьюторе одновременно могут бежать до 4 задач (каждой нужен хотя бы 1 CPU core). Соответственно, если есть 8 GB памяти и 4 cores, условно (но не строго!) можно прикинуть, что в среднем на каждую задачу придётся ~2 GB (не учитывая overhead). Пулы памяти внутри JVM В Spark 1.x / 2.x была модель Execution / Storage Memory (т.е. часть памяти для shuffle, часть для кэша и т.д.). В Spark 2.x+ есть параметры вроде spark.memory.fraction, spark.memory.storageFraction и т.д., которые управляют тем, какая доля от всей памяти экзекьютора может пойти под вычисления (execution) и под кэш (storage). На практике это значит, что все задачи экзекьютора делят общий Memory pool. Если одна задача на shuffle, sort или операцию join начнёт съедать слишком много памяти, другие могут получить меньше.
40
Что такое Adaptive query execution? #HalltapeRoadmapDE
Adaptive Query Execution (AQE) — это механизм, появившийся (по умолчанию) начиная с Spark 3.0 (частично был доступен экспериментально и в Spark 2.4), который позволяет динамически оптимизировать физический план выполнения во время работы job. Зачем это нужно? Не всегда точны планы, построенные на основе статистики до старта job: Данные могут быть распределены неравномерно (data skew). Статистика таблиц может быть устаревшей или неполной. При shuffle могут возникать очень «тяжёлые» партиции. AQE даёт Spark возможность «подглядывать» в реальные данные и корректировать план на лету, например: Combine shuffle partitions: если Spark видит, что после shuffle многие партиции мелкие, он может объединить их в меньшее количество более крупных партиций (уменьшая overhead). Split skewed partitions: если обнаружились одна-две «перегруженные» партиции (skew), Spark может автоматически разбить их на несколько более мелких, чтобы параллелить обработку. Динамически менять тип join’а (например, заменять sort-merge join на broadcast join, если одна таблица оказалась достаточно маленькой). Пример включения AQE spark.conf.set("spark.sql.adaptive.enabled", "true") spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true") ... Основная выгода Стабильность выполнения больших job, которые сталкиваются с непредсказуемым распределением данных. Улучшение производительности за счёт сокращения shuffle и более «умных» join’ов. Итого, AQE — это механизм «Adaptive Execution», который после начального планирования Spark’ом может «переигрывать» стратегии (shuffle, join, разбиение партиций) на основании фактических метаданных, полученных во время выполнения, что помогает лучше бороться с «skew» и неравномерными нагрузками.
41
Различия driver vs executor теги #x5
Различия **Driver** и **Executor** в Apache Spark — 5 ключевых аспектов с нюансами практического тюнинга 1. **Роль в архитектуре** * **Driver** — единая JVM‑процессия, где создаётся `SparkContext`, строится DAG, ведётся планировщик (TaskScheduler) и агрегируются результаты/метрики. * **Executors** — множество JVM‑процессов на воркерах; каждый держит собственный thread‑pool («task slots»), выполняет присланные задачи, кеширует партиции, пишет shuffle‑файлы. 2. **Число и размещение** * На одно приложение — **один** driver (за исключением High‑Availability Standby driver в YARN). * Executors масштабируются: статически (`--num-executors`) или динамически (`spark.dynamicAllocation.enabled=true`). * В client‑mode driver запускается на машине, где вы вызвали `spark-submit`; в cluster‑mode — внутри кластера, поэтому потеря вашего ноутбука не гробит job. 3. **Память и ядра** * `--driver-memory` покрывает метаданные DAG, Scheduler, сериализацию результатов → опасны «большие collect()/toPandas()». * `--executor-memory` и `--executor-cores` задают объём рабочего кэша (Storage) и параллелизма. Переизбыток памяти на executor’е часто ведёт к медленному Full GC, а не к ускорению. * Shuffle и Broadcast аллоцируются именно в памяти executors; driver хранит лишь «дескрипторы», поэтому чрезмерное число broadcast‑join’ов не бьёт по driver’у, но добивает executors. 4. **Отказоустойчивость** * Если **executor** умирает, driver помечает задачи как «failed» и назначает их заново (список задач хранится у driver’а). * Если **driver** падает, всё приложение считается упавшим; YARN / Kubernetes могут перезапустить driver, но Spark‑уровнем прогресс не сохраняется. * Ошибка в пользовательской логике (`NullPointerException` в RDD‑map) всплывает на executor’е, затем проксируется driver’у, и именно драйвер решает отменить весь stage. 5. **Практические настройки и нетривиальные ловушки** * Запрашивайте у executors чуть больше ядер, чем `spark.sql.shuffle.partitions/2`, чтобы не держать ядра без дела, но избегайте 1 executor = вся нода — крупные heap’ы хуже для GC. * Для driver’а: ограничивайте `spark.driver.maxResultSize`, иначе при action `collect()` на сотни МБ он рухнет с OOM без права реанимации. * В K8s с динамическим аллокатором важно выставить `spark.kubernetes.memoryOverhead` — иначе контейнеры executors киляет OOM‑killer ядра ещё до того, как Spark‑GC скажет своё слово. * Логи: драйвер пишет единственный «главный» лог, executors — каждый свой; при дебаге OOM на executors ищите `GC overhead` именно там, а не в driver‑логе. * Метрики: системные (`/metrics/prometheus`) у driver’а показывают состояние DAG и scheduler’а, а метрики executors отражают использование памяти и shuffle. Без обеих групп невозможно корректно автотюнить кеш и репликацию (Storage tab). Такое краткое, но насыщенное разграничение демонстрирует, что вы не только знаете схему Spark, но и сталкивались с реальными «production‑граблями». Удачи на собесе!
42
Различия между take() и collect() теги #x5
--- `collect()` * Возвращает **все элементы** DataFrame или RDD в **драйверскую программу** как список. * Используется только если **точно уверен**, что объём данных **влезет в память драйвера**. * Может вызвать **OOM (OutOfMemoryError)** на драйвере, если данных много. * Под капотом запускает job и **собирает все партиции** целиком. **Пример:** ```python df.collect() # всё собирает на драйвер ``` --- `take(n)` * Возвращает **первые `n` элементов** из DataFrame или RDD. * Безопаснее `collect()` при отладке или логировании, т.к. **запрашивает ограниченное количество данных**. * Под капотом **останавливает выполнение**, как только собрано `n` элементов — то есть **не все партиции** обрабатываются, а только нужные. * Быстрее и менее ресурсоёмко при выборке подмножества данных. **Пример:** ```python df.take(10) # возвращает первые 10 строк ``` --- Когда использовать * `collect()` — только если **нужны все данные на драйвере** и ты **уверен в размере данных**. * `take(n)` — при **отладке**, **просмотре примеров**, **логировании**, **выборке первых строк** — это безопаснее и быстрее. ---
43
Как работает dynamic allocation теги #x5
Dynamic Allocation в Spark: шпаргалка по настройке В рубрике вопросы с собеседований сегодня разбираем динамическую аллокацию ресурсов. Простыми словами, это автоматическое управление количеством рабочих нод (исполнителей) в зависимости от текущей нагрузки, чтобы эффективно использовать ресурсы кластера: добавлять их при пиках задач и освобождать, когда они не нужны. Как включить? spark.dynamicAllocation.enabled=true Главные параметры — минимум и максимум исполнителей: spark.dynamicAllocation.minExecutors=2 # ниже не опустится spark.dynamicAllocation.maxExecutors=20 # выше не поднимется — таймаут простоя: spark.dynamicAllocation.executorIdleTimeout=60s # удаляет исполнителя через 60 сек простоя Чего лучше избегать? ❌ executorIdleTimeout=10s → постоянное создание/удаление исполнителей. ❌ maxExecutors=1000 → риск перегрузить кластер. ❌ не включить Shuffle Service в YARN → ошибки при удалении (spark.shuffle.service.enabled=true) Когда Dynamic Allocation может быть неэффективен? короткие задачи (< 1-2 мин) → накладные расходы на масштабирование больше, чем выгода. фиксированная нагрузка → если всегда нужно 10 исполнителей, DA только мешает. частые микрозадачи → DA не успевает реагировать. зависимости от конкретных нод → например, данные закэшированы на определенных исполнителях. Как проверить работу? Spark UI → вкладка "Executors" смотрите, как меняется число исполнителей если не удаляются → проверьте Shuffle Service и таймауты Короче, dynamic allocation - это «умное» управление ресурсами. Используйте: если нагрузка меняется (пики/спады). Отключайте: если задачи короткие, фиксированные или требуют жесткого контроля.
44
Сколько партиций при чтении ? #x5
По дефолту 200, но зависит от shuffle.partitions
45
запуск на YARN vs Kubernetes
**YARN ― традиционный, “хардварно‑привязанный” менеджер из мира Hadoop; Kubernetes ― контейнерный, облачно‑нативный оркестратор. Оба Spark‑backend’а делают одно и то же — подают драйверу и executor’ам CPU/память, — но решают это по‑разному, и эти различия хорошо «продаются» на собеседовании.** --- 1. Архитектура и жизненный цикл приложения На YARN Spark создаёт контейнеры через ResourceManager и ApplicationMaster. Он жёстко интегрирован с HDFS, и контейнеры запускаются как процессы на выделенных узлах. На Kubernetes сначала запускается pod с драйвером, и уже он сам создаёт pod’ы с executors через Kubernetes API. Всё работает через контейнеры, в которых уже упакованы версии Spark, Python и нужные библиотеки. ### Что важно подчеркнуть на интервью * **Data locality.** YARN сидит рядом с HDFS, блоки читаются по 10 GbE, поэтому большие shuffle‑ы дешевле. В k8s вы чаще тянете данные через сеть (S3/HDFS‑remote) — проверяйте latency. * **Контейнеризация.** На k8s можно держать несколько версий Spark/Python в одном кластере; в YARN ― только одна сборка на всех. Это критично при AB‑тестах или миграции библиотек ([acceldata.io][4]). --- 2. Масштабирование и динамическое выделение ресурсов * **YARN**: классическая `spark.dynamicAllocation` + **External Shuffle Service**. Работает стабильно, но требует демона Shuffle Service на каждом узле. * **Kubernetes**: c Spark 3.4 драйвер может трекать shuffle‑данные сам (`spark.dynamicAllocation.shuffleTracking.enabled=true`), shuffle‑service не обязателен. Минус — если на pod’ах осталось shuffle‑state, они не «уснут», может случиться dead‑lock при полном использовании кластера ([spark.apache.org][5]). * Практика: на k8s включаю DRA только для “elastic” джоб, где переразмеривание действительно окупается; иначе фиксирую executors — поды стартуют \~5‑10 с дольше из‑за pull’а образа. Red flag, который любят спрашивать: *“Почему DRA в k8s ведёт себя менее предсказуемо, чем в YARN?”* — из‑за того, что Spark не владеет нодами, а лишь просит k8s о pod’ах; если кластер multi‑tenant, k8s может сразу отдать ресурсы другим workload’ам. ([Reddit][6]) --- 3. Безопасность и мультитенантность * **YARN**: Kerberos, токены, ACL‑ы на HDFS. * **Kubernetes**: Spark создаёт secret на job, но любой, кто видит pod’ы в namespace, увидит и secret → обязательно настраиваем RBAC/Network Policies, иначе лажаемся уже на pentest’е. ([spark.apache.org][7]) * Плюс: в k8s есть **Resource Quota** и **Namespace‑sandboxing** — удобно делить ресурсы по командам, чего YARN умеет только грубо через Capacity/Fair Scheduler. ([acceldata.io][4]) --- 4. Наблюдаемость и отладка * YARN — Web‑UI ResourceManager, логи в HDFS, Ambari/CM. * k8s — pod‑level метрики (`kubectl top`, Prometheus, Grafana) и общие лог‑стэки (Fluentd → ELK, CloudWatch). Гораздо тоньше видно CPU‑throttling, OOMKilled и т.д. ([Medium][3]) --- 5. Производительность и стоимость * Amazon на EMR/EKS мерил: \~5 % ускорения и до 61 % экономии \$\$ за счёт авто‑масштабирования и более плотной упаковки pod’ов. Главное — считать, а не верить на слово. ([Pepperdata][8]) --- 6. Edge‑кейсы, о которых часто «спотыкаются» 1. **Stateful shuffle** на k8s: большие shuffle‑файлы → нужно PVC или hostPath, иначе диск pod’а исчезает при рестарте. 2. **GPU or NIC passthrough**: в k8s есть `nvidia.com/gpu` ресурсы, в YARN нужно cloudera‑© патчи. 3. **Streaming + checkpoint**: в YARN держим HDFS, в k8s — либо remote HDFS, либо S3 + `FileSystem` impl; latency выше, ставим меньший micro‑batch. 4. **Клиентский режим (`--deploy-mode client`)**: в k8s придётся прокидывать порты или пользоваться `spark.connect`; в YARN достаточно ssh‑jump. --- 7. Когда что выбирать (правило 80/20) * **Остаёмся на YARN**, когда у вас уже есть тяжёлый on‑prem Hadoop‑стек, близость к HDFS критична, а кластер статичен и предсказуем. * **Идём в Kubernetes**, когда: * Нужно несколько версий Spark/питона параллельно; * Кластер живёт в облаке, где pay‑as‑you‑go → важно гасить idle‑ноды; * Требуются тонкие RBAC/Quota, CI/CD, Helm/Argo CD; * Нужна унификация с остальными микросервисами (единый observability‑стэк, policy‑engine и т.д.). --- Как сформулировать ответ на собесе > *«Я разворачивал Spark и на YARN‑кластере Cloudera, и в k8s‑кластере (EKS) через Spark‑Operator. > На практике вижу, что YARN выигрывает, когда нужен data‑locality к HDFS и стабильные SLA, а Kubernetes берёт своё контейнерной изоляцией, autoscaling‑ом и DevOps‑дружелюбностью. > Для elastic‑batch я запускаю Spark‑jobs как transient pods с dynamic allocation (shuffle‑tracking с 3.4+), логи складываю в S3 через fluent‑bit, а метрики — в Prometheus. > При больших shuffle‑ах подключаю PVC на executor‑pods. > Безопасность закрываю RBAC + NetworkPolicy, потому что secrets от Spark лежат прямо в pod‑env. > В результате Spark‑workload забирает ровно столько ресурсов, сколько нужно, и не держит простаивающий YARN‑кластер».* Такой спектр деталей показывает, что вы действительно работали с обоими подходами и понимаете подводные камни.
46
Нехватка памяти в Apache Spark: симптомы, причины и методы устранения #Я
Нехватка памяти в Apache Spark: причины, симптомы и методы устранения 1. **Java Heap Space (OutOfMemoryError) в драйвере** **Симптом** ``` java.lang.OutOfMemoryError: Java heap space ``` **Причины** * Слишком много создаваемых объектов в драйвере. * Большие DataFrame или коллекции, создаваемые в циклах. **Как лечить** * Увеличить память драйвера (параметр **--driver-memory**). Пример в spark-submit: ```bash spark-submit --driver-memory 8g ... ``` * Снизить количество создаваемых объектов. → Выносить тяжёлые вычисления и создание объектов на экзекьюторы. * Уменьшить количество датафреймов или колонок, особенно если они создаются в цикле. → Собирайте их в одну структуру или пересматривайте архитектуру вычислений. --- 2. **Container killed by YARN for exceeding memory limits** **Симптом** ``` Container killed by YARN for exceeding memory limits ``` **Причины** * Недостаточно памяти на экзекьюторах. * Плохая структура DAG → тяжёлые shuffle-операции. * Неправильный выбор количества и размера экзекьюторов. **Как лечить** 2.1 Перестраивать граф (DAG) **Где смотреть** * Spark UI → DAG visualization * DataFrame.explain() **Что делать** * Отложить shuffle-операции (группировки, сортировки) ближе к концу пайплайна. → Использовать оптимизацию Catalyst → lazy evaluation → избегать лишних действий до конца пайплайна. * Объединять DataFrame заранее, чтобы не делать лишних join/agg в середине. 2.2 Увеличить память экзекьюторов **Как** * Изменить **--executor-memory** Пример: ```bash spark-submit --executor-memory 16g ... ``` * Также обязательно тюнить **spark.yarn.executor.memoryOverhead** (по умолчанию \~10% от executor-memory). → Добавляет память для JVM overhead, native libraries и т.д. Пример в конфиге или spark-submit: ```bash --conf spark.yarn.executor.memoryOverhead=2048 ``` 2.3 Склеивать экзекьюторы **Идея** * Вместо 2 экзекьюторов по 2 ядра и 4 ГБ → сделать 1 экзекьютор с 4 ядрами и 8 ГБ. → Это сгладит потребление памяти разными задачами. **Где настраивать** ```bash --executor-cores 4 --executor-memory 8g ``` **Ограничения** * Не более 5 ядер на экзекьютор (иначе → деградация из-за I/O HDFS и проблем с GC). 2.4 Уменьшить параллелизм ```bash --conf spark.sql.shuffle.partitions=50 ``` * Снизить количество ядер на экзекьютор → меньше одновременно выполняемых задач. 2.5 Уменьшить размер партиций **Когда важно** * Если отдельная партиция не помещается в память. * При repartition: ```python df.repartition(100) ``` или coalesce (если можно объединять): ```python df.coalesce(50) ``` --- 2.6 Тюнинг сериализации и памяти **Как** * Использовать Kryo вместо Java-сериализации: ```python spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") ``` * Настроить фракции памяти: ```bash --conf spark.memory.fraction=0.6 # сколько от executor-memory для execution (остальное для storage) --conf spark.memory.storageFraction=0.5 # сколько от fraction для storage (остальное для execution) ``` * При проблемах с кэшем → использовать MEMORY\_ONLY\_SER вместо MEMORY\_ONLY: ```python df.persist(StorageLevel.MEMORY_ONLY_SER) ``` Это позволит сериализовать данные и снизить потребление памяти. --- ВАЖНО **Когда "огромные экзекьюторы" — это плохо:** * Если экзекьютор имеет много ядер (6+) и очень много памяти → GC становится bottleneck. * HDFS и другие источники → неэффективно работают с большим количеством потоков на одном узле. Рекомендация Cloudera (подтверждено практикой): **до 5 ядер и соответствующее соотношение памяти — оптимально**. - Если хочешь — могу прямо показать **готовый пример spark-submit команды**, которая реализует сразу почти все советы (memoryOverhead, Kryo, executor-memory и т.д.). Хочешь?
47
Основные проблемы в Apache Spark (помимо нехватки памяти и партиций): симптомы и методы решения getrejected
**долгий джойн** Симптом: Очень долго выполняются операции join или приложение падает из-за нехватки памяти Как лечить: ● Если один издата сетов намного меньше другого, то можно использовать broadcast ● Если дата сеты большие,то использовать фильтр Блума или бакетирование **1. Broadcast больших датасетов** **Симптом** * Джоба "зависла": * Нет активных тасков. * Дальнейшие вычисления не идут. **Причина** * Слишком большие датасеты (обычно больше нескольких сотен МБ) передаются через Broadcast. * Их распространение на экзекьюторы → перегружает сеть и память. **Как лечить** * **Не использовать Broadcast для больших датасетов.** → Вместо этого выполнять Join'ы через стандартные методы: * Broadcast join → только для маленьких таблиц (до 10-100 МБ). * Для больших → **Shuffle join** (Spark сам выберет, если убрать broadcast hint). **Где реализовать** * Убрать `.hint("broadcast")` * Убедиться, что Spark сам не выбрал Broadcast → смотреть в Spark UI → Physical Plan → BroadcastHashJoin. **Пример**: ```python df1.join(df2, "id") # Без .hint("broadcast"), Spark сам выберет оптимальный метод ``` --- **2. Переиспользование общих ресурсов (например, соединений)** **Симптом** * Тормоза на шаге **map**. **Причина** * Внутри `.map` создаётся объект (например, соединение) → на каждый элемент создаётся новый экземпляр → тормозит. **Как лечить** * Использовать **mapPartitions** вместо **map**. → На каждый partition создаётся один объект → многократное переиспользование. **Где реализовать** **Плохой пример (тормозит):** ```scala rdd.map { x => val conn = new Connection() // работа с conn conn.close() } ``` **Хороший пример (эффективно):** ```scala rdd.mapPartitions { iter => val conn = new Connection() val result = iter.map { x => // работа с conn x } conn.close() result } ``` **Важно**: * mapPartitions не всегда удобен, но для тяжёлых ресурсов (соединения, объекты с дорогой инициализацией) → must have. --- 3. **Проблемы сериализации (NotSerializableException)** **Симптом** ``` Task not serializable: java.io.NotSerializableException ``` **Причина** * Передача в таски несериализуемых объектов (например, драйверских классов или открытых соединений). **Как лечить** * Создавать несериализуемые объекты только на экзекьюторах (внутри map/mapPartitions), а не на драйвере. **Где реализовать** **Плохой пример (исключение):** ```scala val conn = new Connection() // создаётся на драйвере rdd.map { x => conn.query(x) } ``` **Хороший пример (нормально):** ```scala rdd.mapPartitions { iter => val conn = new Connection() // создаётся на экзекьюторе val result = iter.map(x => conn.query(x)) conn.close() result } ``` --- 4. **Проблемы сериализации 2 (Task of very large size)** **Симптом** ``` WARN TaskSetManager: Stage X contains a task of very large size (100500 KB). The maximum recommended task size is 100 KB. ``` **Причина** * Большие объекты попали в замыкание → их надо сериализовать и отправить на экзекьютор → таски становятся гигантскими. **Как лечить** * Убрать большие объекты из замыканий. **Варианты решения**: 4.1 Перенос больших коллекций в Broadcast ```python val broadcastVar = spark.sparkContext.broadcast(largeCollection) rdd.map { x => broadcastVar.value.get(x) } ``` 4.2 Создание больших объектов на экзекьюторах ```scala rdd.mapPartitions { iter => val bigObject = new BigObject() iter.map { x => bigObject.process(x) } } ``` **Где смотреть в реальности** * В Spark UI в стадии будет видно размер тасков (Task Size). * Можно логировать замыкания через IDE (inspect closure). ---
48
Как тестировать и деплоить спрак? #Я
**Тема:** Тестирование и деплой PySpark‑проекта (2024) --- 1. Тестирование 1.1 Пирамида тестирования в Spark‑проектах К классическим уровням (Unit → Integration → End‑to‑End) у дата‑инженеров добавляется верхний слой **Data Quality**. * **Unit‑тесты** валидируют отдельные функции и трансформации. * **Integration‑тесты** убеждаются, что модули корректно работают вместе и дружат с внешними сервисами. * **Data Quality‑тесты** гарантируют соблюдение схем, контрактов и бизнес‑ограничений самих данных, независимо от кода. 1.2 Unit Testing **Зачем:** * мгновенная обратная связь при изменениях; * ускорение разработки за счёт раннего обнаружения ошибок; * надёжная проверка крайних входных значений. **Инструменты:** * библиотека **chispa** с ассерт‑хелперами `assert_df_equality`, `assert_column_equality` и «approx»‑вариантами для float‑ов. **Лучшие практики:** * пишем два рода тестов: для «чистых» Python‑функций и для Spark‑трансформаций; * каждую трансформацию выносим в отдельную функцию, а не в длинную цепочку `.withColumn()` — это резко упрощает изоляцию при тестировании. 1.3 Integration Testing Есть два рабочих сценария. 1. **Изолированная проверка внешних систем** — эмулируем или поднимаем тестовый S3, Kafka, Postgres и проверяем подключения по отдельности. 2. **Полный прогон DAG** — триггерим пайплайн целиком в оркестраторе (например, Airflow), асинхронно следим за статусом каждого job: если хоть одна падает, CI‑билд отметается. 1.4 Data Quality Testing **Что проверяем:** * контракты источников (количество строк, типы, nullable/не‑nullable); * бизнес‑правила (уникальность ключей, диапазоны числовых значений, формат телефонов и т. д.). Эти тесты не отвечают за логику пайплайна; они «стоят сбоку» и валидируют саму «субстанцию» данных. **Инструменты:** * **Great Expectations** – гибко, запах Unit‑тестов данных; * **Soda‑Core** – дружит со Spark, легко встраивается задачей в Airflow. Пример рабочего цикла для Soda: объявляем YAML‑конфиг, в коде создаём `Scan`, привязываем Spark‑сессию, добавляем YAML и вызываем `scan.execute()`. --- 2. CI & Deployment 2.1 Где размещать Spark‑кластер * **Stand‑alone** — минимальная инфраструктура; гуд для дев‑окружения и лёгких прод‑нагрузок. * **YARN** — классические Hadoop‑кластеры. * **Kubernetes** — облачное‑родное решение, auto‑scaling pod‑ов driver/executor. 2.2 Режим деплоя (`deployMode`) * **client** — driver крутится локально (часто на Airflow‑шедулере), executors — в кластере. Удобен для дебага, но чувствителен к сетевым обрывам. * **cluster** — и driver, и executors запускаются внутри кластера; это production‑default: минимальная точка отказа. 2.3 Запуск Spark‑тасков из Airflow Доступны три популярных executor‑а: 1. **LocalExecutor** — шедулер и воркер на одной машине; простейший, но ресурсоёмкий. 2. **CeleryExecutor** — несколько воркеров + брокер (RabbitMQ/Redis); хорошо горизонтально масштабируется. 3. **KubernetesExecutor** — каждая задача в отдельном pod, получает изоляцию и auto‑scaling. **Чего избегать:** * `PythonOperator`, который прямо внутри запускает Spark, – получаем «тёплый кластер» в одном процессе. * `BashOperator`, исполняющий `spark-submit` на той же ноде Airflow. **Хорошие решения:** * заворачивать `spark-submit` в **KubernetesPodOperator**; * либо тонким баш‑таском запускать `spark-submit` через SSH на полноценном кластере. 2.4 Управление зависимостями 1. **Установка пакетов на все ноды** — устаревший и трудоёмкий подход. 2. **Передача `.zip/.egg/.py` через `--py-files`** — быстро для кода, но не переносит C‑расширения. 3. **conda‑pack** — создаём окружение `conda create …`, пакуем `conda-pack -o env.tar.gz`, передаём архив флагом `--archives`, а переменной `PYSPARK_PYTHON` указываем `./environment/bin/python`. 4. **venv‑pack** — тот же принцип для `python -m venv`. 5. **PEX** — собирает одну самодостаточную исполняемую «капсулу» `.pex`; удобно для небольших проектов. Важно: `PYSPARK_DRIVER_PYTHON` (локальный driver) и `PYSPARK_PYTHON` (executor‑ы) держите раздельно, особенно при cluster‑режиме. --- 3. Типичная схема CI/CD 1. Запускаем юнит‑тесты и проверки качества данных сразу после `git push`. 2. Если всё зелёное, из CI вызываем интеграционный тест: триггрим Airflow‑DAG и ждём его успешного завершения. 3. После этого собираем контейнер или архив с зависимостями и публикуем в registry. 4. Продакшен‑релиз (Helm‑чарт или `spark-submit` через Airflow) происходит только при зелёном CI и, как правило, после ручного `approval` в pipeline. --- Контрольный вопрос **Как в одном CI‑конвейере проверить корректность кодовой логики PySpark‑проекта, убедиться в качестве данных и затем надёжно задеплоить задачу в Kubernetes без проблем с зависимостями?** Ответ кроется в описанном конспекте: юнит‑ и Data‑quality‑тесты обеспечивают проверку кода и данных; интеграционный прогон DAG подтверждает совместимость компонентов; подготовка окружения через conda‑pack/venv‑pack/PEX устраняет «ад‑библиотек»; а Kubernetes‑ориентированный deploy‑mode = cluster плюс корректный Airflow‑executor гарантируют «безболезненный» вывод в прод.
49
Как мониторить спрак #я?
1. Зачем всё это Мониторинг — единственный способ узнать, тратит ли ваш кластер ресурсы впустую, висит ли задача в GC‑стопе или тихо отвалилась половина executors. Spark уже содержит всю телеметрию; остаётся правильно её «вытащить» и сохранить. --- 2. Основные компоненты **History Server (SHS)** * Читает event‑логи (`spark.eventLog.enabled=true`) и строит ретроспективный UI. * Полезен для пост‑морте м диагностики, но не для alert‑инга в реальном времени. * Скалируется горизонтально: несколько SHS могут читать одну директорию с логами (S3/HDFS). **Listener Bus** * Шина, через которую Spark публикует события в рантайме. * Можно подписаться пользовательским `SparkListener`‑ом и стримить JSON куда угодно. **Ключевые типы событий** (не исчерпывающий список): `SparkListenerApplicationStart/End`, `ExecutorAdded/Removed/Blacklisted`, `JobStart/End`, `StageCompleted`, `TaskEnd`, `ExecutorMetricsUpdate` и др. Эти события покрывают весь жизненный цикл приложения и дают точку для custom‑метрик. --- 3. MetricsSystem **Конфигурация** * Любая проперти, начинающаяся с `spark.metrics.conf.*`, переопределяет значения из `metrics.properties`. * Файл `metrics.properties` кладётся в `$SPARK_HOME/conf/`. Поддерживает wildcards: `*`, `driver`, `executor`, `master`, `applications`. **Sources (откуда берём метрики)** Spark уже поставляет десяток стандартных `MetricsSource`: JVM, BlockManager, Shuffle, DAGScheduler, Executor, ExecutorAllocationManager, AppStatus, Accumulators и др. Их достаточно для 90 % сценариев; экзотика внедряется собственным Source‑классом. **Sinks (куда пишем)** Встречаются как pull‑, так и push‑варианты: * Pull: `MetricsServlet` (JSON), `PrometheusServlet`, `JmxSink`. * Push: `ConsoleSink`, `CSVSink`, `Slf4jSink`, `GraphiteSink`, `StatsdSink`. Помните о сетевых дорогах: push работает за NAT, pull хорош для балансировки Prometheus‑ом. --- 4. Конкретные протоколы **JSON API** Собирается `MetricsServlet`. После параметров ``` *.sink.servlet.class = org.apache.spark.metrics.sink.MetricsServlet *.sink.servlet.path = /metrics/json ``` драйвер отдаёт метрики на `http://:4040/metrics/json/`. Формат прост: `gauges`, `counters`, `histograms`, `meters`, `timers` — содержимое удобно греп‑ать, но плохо агрегировать на несколько нод. **JMX** Добавьте ``` *.sink.jmx.class = org.apache.spark.metrics.sink.JmxSink ``` и цепляйтесь к стандартному порту JMX (или проксируйте через JMX‑экспортер — привычный способ скормить метрики Prometheus‑у без сабжового сервлета). **Prometheus** Начиная с Spark 3.2 есть `PrometheusServlet` («pull»). Настройка: ``` *.sink.prometheusServlet.class = org.apache.spark.metrics.sink.PrometheusServlet *.sink.prometheusServlet.path = /metrics/prometheus ``` После этого драйвер и каждый executor публикуют метрики в формате Prometheus‑text. Тонкость: если executors создаются динамически (dynamic allocation), придётся прокидывать сервис‑дискавери. Популярный подход — регистрировать executors в Consul через Spark‑плагин (пример CERNDB/SparkPlugins). **Graphite** Для «старой школы» или если в компании уже стоит графит‑стек: ``` *.sink.graphite.class = org.apache.spark.metrics.sink.GraphiteSink *.sink.graphite.host = graphite.example.com *.sink.graphite.port = 2003 *.sink.graphite.period = 10 *.sink.graphite.unit = seconds ``` Пушит метрики каждые `period` секунд. --- 5. Pull VS Push: что выбрать * **Pull** (Prometheus, servlet, JMX): удобно, если у вас единый сборщик и нужна центральная ретенция. Минус — ворох firewall‑правил, если executors динамически рождаются в k8s. * **Push** (Graphite, StatsD, Slf4j, CSV, Console): просто пробрасывает TCP/UDP наружу, не требует, чтобы сборщик «видел» ноды. Но нужно следить, чтобы приложение не «забило» сеть при большой детализации. --- 6. Как собрать рабочую систему мониторинга за час 1. **Создайте `metrics.properties`** в `$SPARK_HOME/conf/` на всех нодах или положите его в Docker‑образ. 2. Пропишите `spark.metrics.namespace=${spark.app.name}` — так метрики разных перезапусков одной задачи попадут под общий префикс вместо хаотичных `spark-app-`. 3. Включите нужный Sink, чаще всего PrometheusServlet. 4. Если кластер в Kubernetes — объявите ServiceMonitor (или `PodMonitor`) для `4040` драйвера и `404*/` executor‑портов. 5. В Grafana импортируйте готовый дашборд (есть официальные примеры) и добавьте alert‑правила. --- 7. Метрики, за которыми имеет смысл следить * **Активные задачи (`spark_driver_DAGScheduler_stage_runningStages`)** ­— всплеск без роста throughput намекает на узкое место. * **CPU executors (`executor.cpuTime`)** — ниже 60 % ? возможно, ждём I/O. * **Память executors (`executor.memoryUsed`)** — близко к `executor.memoryMax` → ждите OOM или слишком агрессивный spilling. * **`task-metrics.peakExecutionMemory`** — помогает ловить неявные broadcast‑join‑ы. * **I/O (`inputMetrics.recordsRead`, `shuffle.read.bytes`)** — резкий рост без CPU — проблема в источнике или сети. Полный список — в официальной доке § «Executor metrics» и «Task metrics». --- 8. Нестандартные кейсы и подводные камни * **Dynamic Allocation**: executors приходят и уходят, поэтому статический scrape‑конфиг ломается. Решение — Consul/SD‑плагин, либо push‑sink. * **Шум от короткоживущих приложений**: каждое spark‑submit делает новый `spark.app.id`, Grafana‑график превращается в радугу. Лечится `spark.metrics.namespace`. * **Особенности k8s**: port‑mapping + IP‑пересоздание pod‑ов → на графиках может появляться дыра. Scrape по метке `pod` + персистентные labels минимизируют прыжки. * **Многокластерные инсталляции**: держите отдельный `namespace` или префикс на каждый кластер, чтобы метрики не смешивались. --- Контрольный вопрос **«Как организовать сбор и экспонирование метрик Spark‑приложения так, чтобы Prometheus автоматически обнаруживал все executors, а Grafana могла алертить по превышению памяти executors без ручной переконфигурации при каждом запуске?»** Ответ найдёте в конспекте: ListenerBus и MetricsSystem дают данные; PrometheusServlet + Consul‑плагин обеспечивают динамическое discovery; `spark.metrics.namespace` стабилизирует имена; а alert‑инг строится поверх ключевых метрик памяти executors.
50
Что знаешь про спрак стримминг? #я
--- 0. Зачем вообще ещё один API Structured Streaming — это потоковый движок в составе Spark SQL, который делает ровно две вещи: 1. **Делает поток похожим на обычный DataFrame.** Пишешь тот же код, что и в batch‑ETL, только добавляешь `.readStream / .writeStream`. 2. **Прячет микробатчи и восстановление после сбоя.** Ты думаешь о бизнес‑логике, Spark думает о checkpoint‑ах, offsets и state‑файлах. --- 1. История и терминология (почему DStreams устарели) * **Spark Streaming (DStreams)** — API 2013 г. на RDD‑ах: один RDD на каждый микробатч. Нет Catalyst‑оптимизации, сложных join‑ов, коллбеков на SQL‑уровне. * **Structured Streaming** (stable c Spark 2.2) — поток как «бесконечная таблица». Всё развитие в 3.x идёт здесь: новые триггеры, watermark‑ы, RocksDB‑state, `dropDuplicatesWithinWatermark`, full‑outer stream‑stream join. --- 2. Модель выполнения: «бесконечная» таблица + инкрементальный план 1. Источник отдаёт событие → попадает в микробатч. 2. К микробатчу применяется тот же логический план, что и в batch. 3. Spark поддерживает лог прогресса и, если внезапно упали, «доигрывает» только недоделанные батчи. 4. В Spark 3.5 watermark может передаваться между операторами, а `dropDuplicatesWithinWatermark` экономит‑state при дедупликации. --- 3. Fault Tolerance и exactly‑once * **Checkpoint‑каталог**: offsets + снапшоты state‑store + метаданные плана. * **State Store** по‑умолчанию RocksDB (с WAL) → повторный запуск ≠ дубликаты. * **Exactly‑once end‑to‑end** = идемпотентный источник (Kafka transactional id) + идемпотентный sink (Delta Lake, Hudi, атомарный файловый commit). --- 4. Источники и приёмники *Источники*: Kafka, Kinesis, файловая папка, socket, rate‑generator. *Sink‑и*: файлы, Kafka, console/memory (отладка), плюс `foreach` или `foreachBatch`, где можно записать в ClickHouse, PostgreSQL, Redis — что угодно, лишь бы код был идемпотентным. --- 5. Триггеры и режимы вывода * **as‑fast‑as‑possible (default)** — Spark запускает батч сразу, как есть данные. * **ProcessingTime("5 s")** — фиксированный шаг; если предыдущий батч шёл 7 с, следующий стартует сразу после. * **Trigger.Once / AvailableNow** — прочитать всё накопленное и завершиться (удобно для backfill). * **Continuous** (эксперимент, \~1 мс задержки) — не микробатчи, а долговечные task‑и; практически ограничен простыми map/filter. Modes: * `append` — новые строки (insert‑only). * `update` — только изменённые агрегаты. * `complete` — переписываем всю агрегированную таблицу (часто при отсутствии watermark‑а). --- 6. Watermark‑ы, окна и stateful‑операции * `withWatermark("eventTime", "10 minutes")` говорит, сколько ждём опоздавших событий. * Стратегия для нескольких watermark‑ов (`min | max`) настраивается флагом `spark.sql.streaming.multipleWatermarkPolicy`. * Окна: **tumbling**, **sliding**, **session**. * API для кастомного стейта: `mapGroupsWithState`, `flatMapGroupsWithState`. * `dropDuplicatesWithinWatermark` — встроенный dedup c TTL, без ручного key‑value состояния. * Late‑data после watermark → либо отбрасывается, либо side‑output (если написал соответствующий sink). --- 7. Join‑ы * **stream‑static** — статическую сторону можно бродкастить. * **stream‑stream** — нужны watermark‑ы + ограниченное окно; c 3.4 есть full‑outer. * State хранится на обеих сторонах; размер = окно + watermark + запас, поэтому важно TTL. --- 8. Мониторинг, backpressure, отладка * `StreamingQueryListener` даёт JSON‑метрики (inputRowsPerSecond, processedRowsPerSecond, stateOperators). * Spark UI ➜ вкладка *Structured Streaming*: DAG, задержка, граф состояния. * У Kafka‑источника: `maxOffsetsPerTrigger` (upper bound) и `minOffsetsPerTrigger` (нижний bound) дают гибкий backpressure. * Смотри разницу между input‑ и processed‑rows, чтобы понять узкое место. --- 9. Хитрости и типичные кочки 1. Лишний `repartition` в стриме = shuffle каждый батч, бьёт по latency. 2. Outer join + late data → двойной state, TTL ≥ watermark + 2 × window. 3. Exactly‑once File sink возможен только там, где rename атомарен (в S3 — через manifest log). 4. Checkpoint‑путь должен быть в распределённом хранилище (S3/HDFS/PVC), иначе потеря ноды = потерянный state. 5. Политика multiple watermark‑ов `min` в 3.4 могла стопорить DAG; в 3.5 это исправлено и вынесено в конфиг. --- 10. Мини‑пример: Kafka → агрегация → ClickHouse через foreachBatch ```python from pyspark.sql import SparkSession from pyspark.sql.functions import window, col, sum import clickhouse_driver # фигурально – зависит от вашей обёртки spark = SparkSession.builder.appName("OrdersByRegion").getOrCreate() orders_raw = (spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "kafka:9092") .option("subscribe", "orders_topic") .load()) schema = "region STRING, amount DOUBLE, event_time TIMESTAMP" orders = (orders_raw .selectExpr("from_json(CAST(value AS STRING), '{}') AS data".format(schema)) .select("data.*")) agg = (orders .withWatermark("event_time", "15 minutes") .groupBy(window(col("event_time"), "1 hour"), col("region")) .agg(sum("amount").alias("total_amount"))) def write_to_ch(batch_df, batch_id): # один микробатч → одна транзакция conn = clickhouse_driver.Client(host="clickhouse") rows = batch_df.collect() # маленький пример; в проде лучше параллельный встав conn.execute("INSERT INTO orders_by_region VALUES", rows) (agg.writeStream .outputMode("update") .foreachBatch(write_to_ch) .option("checkpointLocation", "s3://chk/orders_by_region") .trigger(processingTime="5 minutes") .start() .awaitTermination()) ``` *Что видно*: `withWatermark` защищает от поздних событий, `outputMode="update"` даёт только дельту, `foreachBatch` гарантирует exactly‑once на уровне ClickHouse, потому что одна функция → одна транзакция. --- Итог *Structured Streaming* — это **«batch‑код, который случайно работает в real‑time»**. Запомни три вещи: 1. **Логический план тот же**, Spark сам будет инкрементально «догрывать» поток. 2. **Checkpoint + state‑store** = fault‑tolerance; храни их в устойчивом месте. 3. **Watermark‑ы** — граница терпения к опоздавшим событиям; без них любой окно‑join превращается в «вечный» state‑leak. Если освоишь эти основы, остальное сведётся к настройке *триггеров*, *мониторингу* и *причесыванию* редких углов вроде outer join c late data.
51
Расскажи про RDD. Что это? Зачем нужен? Где юзал? #я
# Простой RDD --- 📌 **Что такое RDD (Resilient Distributed Dataset)** **RDD — это основной низкоуровневый абстрактный тип данных в Spark.** Это **распределённая неизменяемая коллекция объектов**, которая: * **разделена на партиции (partitions)** — обработка идёт параллельно на разных узлах; * **immutable (неизменяемая)** — каждый раз создаётся новая копия после трансформации; * **ленивая (lazy evaluation)** — Spark строит DAG (граф вычислений) и запускает выполнение только при необходимости (при действии действия → action); * **fault-tolerant (устойчива к сбоям)** — восстановление партиций возможно за счёт lineage (цепочки операций, которые привели к данным). --- 📍 Почему придумали RDD? До Spark был Hadoop MapReduce → там всё очень тяжеловесно и медленно. RDD решили сделать **более удобной моделью данных для распределённых вычислений**. Так появилась концепция: * **Трансформаций (transformations)** — ленивые операции, создающие новые RDD. → примеры: `map`, `filter`, `flatMap`, `union`, `distinct`, `join`. * **Действий (actions)** — запуск DAG и получение результата. → примеры: `collect`, `count`, `first`, `take`, `reduce`, `saveAsTextFile`. **Очень важно!** → Трансформации ленивые → ничего не выполняется, пока ты не вызвал action. → Только в момент action Spark начинает "материализовывать" граф и выполнять задачи. --- 📍 Внутреннее устройство RDD **RDD состоит из:** * **partitions (разделы)** → по ним распределяются данные (обычно 128 мб). * **dependencies (зависимости)** → родительские RDD (lineage). * **partitioner (опционально)** → определяет, как данные разделяются (HashPartitioner, RangePartitioner). * **compute function** → функция для вычисления содержимого партиции. * **storage level** → кэширование и персистенция (`MEMORY_ONLY`, `MEMORY_AND_DISK` и т.п.). Lineage в Spark: Это часть объекта RDD → поле dependencies. Живёт в памяти драйвера → Master Node (Driver Program). Используется Spark DAGScheduler для планирования задач. Хранится до тех пор, пока RDD не уничтожен (и не уничтожена сессия Spark). Пример lineage (цепочки преобразований): ```text RDD1 (textFile) -> map -> RDD2 -> filter -> RDD3 -> count (action) ``` В любой момент Spark может "переиграть" RDD3, если, например, потерялся узел — просто заново выполнятся трансформации. lineage хранится в JVM-памяти Driver'а как цепочка объектов RDD и их зависимостей. --- 📍 Типы зависимостей (очень важно!) * **Narrow dependencies** → каждая партиция дочернего RDD зависит от одной или нескольких партиций родительского. → Примеры: `map`, `filter`. → Можно выполнять **параллельно без shuffle**. * **Wide dependencies** → дочерняя партиция зависит от многих родительских (нужно перемещать данные по сети → shuffle). → Примеры: `groupByKey`, `reduceByKey`, `join`. → Это уже дорого, нужно планировать. **Shuffle — узкое место Spark!** → поэтому в DataFrame и Catalyst стараются оптимизировать план, чтобы уменьшить wide dependencies. --- 📍 Когда использовать RDD (и почему иногда это необходимо) **Плюсы RDD:** * Полный контроль над вычислениями. * Можно работать с нестандартными структурами данных (не табличными). * Не нужно описывать схему. * Прямой доступ к партициям. * Легко интегрировать кастомные логики (например, сложные функции маппинга). **Минусы RDD:** * Нет оптимизации Catalyst → каждый шаг выполняется как есть → может быть **очень медленно**. * Нет автоматического управления схемой → вручную обрабатывать nullable, типы и т.д. * Нет возможности SQL. * Требует больше памяти и процессора. --- 📍 Очень важные нетривиальные моменты (их часто не знают даже senior) * **RDD не сериализует данные между трансформациями в JVM** → быстрее, но требует больше памяти. * **RDD может кешироваться или сохраняться (persist)**, но надо правильно выбирать уровень хранения. → плохой выбор → OOM (Out of Memory). * **Если потеряна партиция → lineage позволяет восстановить RDD**. * **Wide dependencies → shuffle → sort → spill на диск** → тут возникают узкие места в Spark job'ах. * **RDD не "видит" источников данных как табличные данные** → не может делать predicate pushdown. → Например, в Parquet RDD будет читать все данные, в DataFrame только нужные столбцы. --- 📍 Как создать RDD (Python пример) ```python rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5]) Применение map mapped_rdd = rdd.map(lambda x: x * 2) Действие print(mapped_rdd.collect()) ``` ```python # Чтение файла → RDD text_rdd = spark.sparkContext.textFile("hdfs://path/to/file.txt") Фильтрация errors_rdd = text_rdd.filter(lambda line: "ERROR" in line) Действие print(errors_rdd.count()) ``` --- 📌 Когда RDD действительно нужен 1️⃣ Нетабличные или произвольные бинарные данные Пример: У тебя в HDFS лежат 1000000 файлов в нестандартизированном бинарном формате (или просто gzip без четкой схемы). → DataFrame прочитать это не может (ему нужна schema). → RDD (sc.binaryFiles / sc.wholeTextFiles) — идеально: читает как (filename, bytes) и ты сам пишешь парсер в map. Реальный случай → обработка "сырых" данных с устройств (например, телеметрия с машин или IOT) в формате protobuf/avro, где schema часто меняется → RDD тут спасает. 3️⃣ Очень кастомная трансформация, которую SQL/DF не выразить Пример: Ты обрабатываешь сложный объект (вложенный JSON → в нём списки → в них мапы → в них ещё что-то). DataFrame поддерживает nested структуры, но сложные UDF внутри "ломают" оптимизацию → в итоге работает медленнее. → RDD + map → можешь написать произвольную функцию на чистом Python/Scala. Реальный случай → в AdTech pipeline обрабатываются "bid request" события → они очень вложенные → проще и быстрее писать в RDD.
52
Расскажи про dataframe. Что это? Зачем нужен? Где юзал? #я
🧩 Что такое DataFrame — коротко, но без потери деталей * **DataFrame в Spark — это объект‑обёртка над логическим планом запроса**. * В Scala/Java это класс `org.apache.spark.sql.Dataset[Row]`. * В PySpark — класс `pyspark.sql.dataframe.DataFrame`. * Он не «хранит» сами данные — только **инструкцию, как эти данные получить**. * Один и тот же физический набор данных могут представлять десятки разных DataFrame‑объектов (разные фильтры, агрегации), пока вы не вызовете Action. --- 1. Зачем нужен этот API‑слой и чем он выше RDD * DataFrame хранит **схему** (имена и типы колонок) → движок точно знает, что вы делаете. * Catalyst‑оптимизатор может: * вынести фильтры до чтения файла (predicate push‑down); * отбрасывать ненужные колонки (column pruning); * заменить shuffle‑join на broadcast‑join, если одна сторона маленькая; * в режиме AQE (Adaptive Query Execution) уменьшить число shuffle‑партиций «на лету» и перестроить план, если статистика изменилась. * Всё это невозможно на «сырых» RDD, потому что там нет схемы и семантики операций. --- 2. Как Spark превращает ваш код в выполняемый план 1. **Parser** строит начальный логический план. 2. **Analyzer** подставляет типы и проверяет, что все колонки существуют. 3. **Optimizer (Catalyst)** применяет правила: упрощение выражений, push‑down, reorder joins, constant folding, column pruning. 4. **Physical Planner** выбирает стратегии (BroadcastHashJoin, SortMergeJoin, ShuffleHashJoin), решает, где сортировать, где шардировать. 5. **Whole‑Stage CodeGen (часть Tungsten)** генерирует Java‑класс, который бежит в JVM без аллокаций объектов. 6. **Executors** исполняют код над своими партициями; lineage плана позволяет восстановить партицию при сбое. --- 3. Что физически лежит внутри партиции DataFrame * **In‑memory UnsafeRow** — компактный байтовый массив, где примитивы лежат без Object‑оболочек. * **ColumnarBatch** (векторное хранение, Arrow / Off‑heap) — для операций, которые выгодно делать блоками. * **Off‑heap страницы памяти** управляет Tungsten‑allocator, чтобы уменьшить давление на Java GC. * При `cache()` Spark складирует эти структуры в RAM; если места мало — страница «прольётся» на диск, но схема и статистика сохраняются. --- 4. Как DataFrame «ложится» в файлы при записи * Каждая Task, которая пишет партицию, создаёт **отдельный файл** (для Parquet — `part-0000*.parquet`). * Число файлов ≈ число выходных Task‑ов; можно влиять через `repartition(n)` / `coalesce(n)`. * При `partitionBy("dt")` Spark создаёт подкаталоги вида `dt=2025-05-07/part-000..` — это логические партиции таблицы, не путать с runtime‑partition‑ID. --- 5. Как писать «обычные запросы» * **DSL‑стиль** (PySpark/Scala): ```python from pyspark.sql import functions as F df = spark.read.json("/logs") result = (df.filter(df.level == "ERROR") .groupBy("service") .agg(F.count("*").alias("cnt")) .orderBy(F.desc("cnt")) .limit(20)) result.show() ``` * **SQL‑стиль**: ```python df.createOrReplaceTempView("logs") spark.sql(""" SELECT service, COUNT(*) AS cnt FROM logs WHERE level = 'ERROR' GROUP BY service ORDER BY cnt DESC LIMIT 20 """).show() ``` * Оба пути дают один и тот же план — проверьте `result.explain("formatted")`. --- 6. Ключевые возможности DataFrame, которых нет у RDD * **Predicate Push‑Down** — фильтр «("level" = 'ERROR')» выполняется в Parquet‑сканере, читаются только релевантные Row Group‑ы. * **Column Pruning** — если вам нужны `service` и `cnt`, лишние столбцы не читаются. * **Broadcast Join** — маленькая таблица (< авто‑порога или указана `broadcast`‑хинтом) пересылается целиком на каждый executor, избавляя от shuffle. * **AQE** — после первой волны task‑ов Spark может решить, что одна сторона join‑a всё‑таки мала, и переключить план. * **Whole‑Stage CodeGen** — Spark склеивает цепочку операторов в один большой циклический метод без виртуальных вызовов. --- 7. Нетривиальные «грабли» * **UDF / Pandas UDF** — Catalyst видит только вызов функции → теряются push‑down и vectorized‑I/O; в Python добавляется IPC‑овер‑хед. * **Data skew** — один «жирный» ключ тащит все записи в одну партицию → executor умирает; лечится salt‑ключами или `/*+ skew */`‑хинтом. * **Schema‑on‑Read кэшируется** — если внешняя таблица изменила схему, надо `spark.catalog.refreshTable`. * **caseSensitive** — настройка по умолчанию `false`; смешанный регистр колонок может «переехать» при сохранении в Hive. * **Checkpoint** обрывает lineage и физически записывает данные на HDFS; `cache` — нет, просто держит в памяти/на диске как часть DAG. --- 8. Как конвертировать туда‑сюда * DataFrame → RDD: `df.rdd` — теряете схему и Catalyst. * RDD → DataFrame: `rdd.toDF("col1", "col2")` — придётся явно задать типы колонок. * DataFrame → Dataset (Scala/Java): `df.as[CaseClass]` — получаем compile‑time type‑safety. --- 9. Что обязан знать Data Engineer 1. Читать и интерпретировать физический план (`explain("formatted")`). 2. Управлять shuffle‑параметрами (`spark.sql.shuffle.partitions`, `repartition`, `coalesce`). 3. Максимально избегать UDF, пользоваться встроенными функциями. 4. Настраивать кэш: уровень хранения (`MEMORY_AND_DISK_SER` часто безопаснее) и точку сброса. 5. Понимать, как AQE и Dynamic Partition Pruning меняют план во время выполнения. 6. Следить за skew и размером broadcast‑таблиц; при необходимости задавать хинты. --- 10. Мини‑шпаргалка пошаговой работы 1. **Чтение**: `df = spark.read.parquet("/src")`. 2. **Преобразования**: фильтры → выбор колонок → join‑ы → агрегации (без UDF). 3. **Проверка плана**: `df.explain("extended")`, ищем shuffle и join‑типы. 4. **Тестовая выборка**: `df.limit(1000).collect()` — проверка логики и схемы. 5. **Кэш, если нужно многократно**: `df.persist()`. 6. **Финальный Action**: `df.write.mode("overwrite").partitionBy("dt").parquet("/dest")`. --- 💬 Ответ на прямой вопрос «DataFrame — это отдельный объект или нет?» Да, **это полноценный объект в вашем приложении**, но он — **описание вычислений, а не сами данные**. * Пока не вызвано Action, в памяти лежит лишь логический план (дерево Catalyst). * Момент вызова `show`, `collect`, `write` и т. п. превращает план в задачи, которые бегут на кластере, читают/пишут реальные данные. * Поэтому два разных DataFrame, созданных из одного источника, могут ссылаться на одни и те же партиции на HDFS, но иметь разные фильтры и join‑ы. Теперь у вас есть **полный конспект**: весь материал из первого подробного ответа + пояснения «как работать» и «где лежат партиции» из второго, но без единой таблицы. Если нужно пример «живого» плана `explain` или показать, как выглядят `ColumnarBatch`‑структуры, дайте знать — разберём на коде. 🚦 Где обычно используется Spark (реальные сферы применения) 1️⃣ Batch processing (ETL / ELT) — это основное Что это: обработка больших объёмов данных из разных источников (базы, логи, файлы) и запись в другие системы (DWH, Data Lake, витрины). Где это в банке: Ежедневная агрегация транзакций (например, для антифрода или скоринговых моделей). Объединение разных источников в единую витрину (кластеризация клиентов, поведенческий анализ). Выгрузка в хранилища уровня DWH (Greenplum, Vertica, Snowflake и т.п.) для BI и аналитиков. Почему именно Spark: Обрабатывает десятки/сотни ТБ данных. Не нужно думать о распределении вручную → Spark сам делит задачи на executor-ы. 2️⃣ Stream processing (Structured Streaming) Что это: обработка данных "на лету" с низкой задержкой (до секунд или минут). Где это в банке: Мониторинг в реальном времени подозрительных операций (антифрод-системы). Реакция на рыночные события или ставки (маркетмейкинг, алго‑трейдинг). Потоки данных с API, брокеров или Kafka. Почему Spark: Обеспечивает exactly-once семантику. Можно строить на одном коде и batch, и stream pipelines (unified API). Можно ли RDD → DataFrame → Да, можно и делают это регулярно. Когда это нужно: Прочитал данные как RDD (например, нестандартный формат, лог файл), и хочешь использовать SQL или DataFrame API → нужен DataFrame. Выполнил кастомную тяжёлую обработку в RDD, и хочешь после этого красиво и удобно агрегировать или записывать в Parquet → нужен DataFrame.
53
какие есть действия и трансформации(широкие/узкие)? и что они делают? #я
1. RDD API («сырой» уровень коллекций) 1.1 Узкие трансформации — shuffle не нужен * **`map` / `flatMap` / `filter`** — построчно меняют, разворачивают или отбрасывают элементы; каждая запись остаётся в том же разделе. * **`mapPartitions`** — даёт итератор на целый раздел, поэтому можно пачкой обрабатывать и меньше сериализовать данных. * **`sample(withReplacement = false)`** — случайно выбирает часть строк, просто «выкидывая» их локально. * **`union`** — складывает два RDD без пересортировки. * **`coalesce(shuffle = false)`** — укрупняет разделы, просто «склеивая» файлы; сеть молчит. 1.2 Широкие трансформации — shuffle есть * **`reduceByKey`** — сначала локально агрегирует пары `(K,V)`, потом пересылает частичные результаты и завершает редукцию. * **`groupByKey`** — сразу шлёт все `(K,V)` по сети; самый дорогой вариант группировки. * **`join`** — выравнивает оба RDD по ключу и сращивает строки. * **`distinct`** — чтобы понять, есть ли дубликат, приходится перегруппировать все записи. * **`repartition` / `coalesce(shuffle = true)`** — полностью перераскладывают данные между разделами. * **`sortByKey`** — глобальная сортировка, требует shuffle. 1.3 Часто-используемые действия RDD * **`collect()`** — таскает весь результат в драйвер; опасно на больших объёмах. * **`count()` / `take(n)`** — быстро узнаём размер либо первую выборку строк. * **`saveAsTextFile(...)`** — стримит результат из исполнителей прямиком на диск или HDFS. --- 2. DataFrame / Dataset API (Spark SQL) 2.1 Узкие трансформации * **`select`, `withColumn`, `drop`, `alias`** — меняем набор и имена столбцов локально в каждом разделе. * **`filter / where`, `sample(withReplacement = false)`** — убираем строки или берём подвыборку без пересылки. * **`explode`, `transform`** — расширяем массивы или применяем функции высшего порядка; строки остаются на месте. * **`coalesce`** — уменьшаем число разделов без shuffle (удобно перед записью небольшого результата). 2.2 Широкие трансформации * **`groupBy(...).agg(...)`, `distinct`** — агрегация или глобальное удаление дубликатов требуют перегруппировки. * **`join`** — по умолчанию shuffle с обеих сторон; если одна из таблиц маленькая и Spark решит сделать **broadcast-join**, shuffle затронет только крупную сторону. * **`orderBy / sort`, `repartition`, `repartitionByRange`** — глобальная сортировка или переразбиение. * **Оконные функции `rank().over(...)`, `window`** — shuffle по `partition by`-колонкам, затем локальная сортировка внутри раздела. 2.3 Ключевые действия DataFrame/Dataset * **`show()` / `collect()` / `count()`** — первая команда, которая действительно выполняет план. * **`write.mode(...).save(...)`** — пишет результат в файлы, таблицы или Kafka и запускает расчёт. * **`foreach` / `foreachBatch`** — пользовательский побочный эффект; код исполняется на воркере, а не на драйвере. --- 3. Structured Streaming — главные «рулевые» методы * **`writeStream.start()`** — создаёт Query и запускает микробатчи (или Continuous Processing). * **`foreachBatch`** — даёт вам DataFrame каждого микробатча на обработку «как в офлайн-режиме». * **`awaitTermination()`** — блокирует поток, пока не остановите. * **`stop()`** — плавно закрывает Query и освобождает ресурсы. --- Как быстро понять «узкая» или «широкая» * **Узкая трансформация** — Spark обрабатывает данные там же, где они лежат; сеть не используется, промежуточных файлов нет. * **Широкая трансформация** — данные надо перегруппировать: Spark пишет часть разделов на диск, пересылает по сети и перечитывает их; это дорогая фаза shuffle, к ней относятся внимательно (агрегируют заранее, используют broadcast-join, регулируют число разделов). Этого короткого набора команд хватает для 90 % повседневных задач: видите знакомый оператор — сразу ясно, ждёт ли вас shuffle и какая «цена» у операции на кластере.
54
Как происходит солтинг и какая у него цена теги юрент
Что такое «солтинг» в Apache Spark **Проблема перекоса (data skew).** При `join`, `groupBy` или `reduceByKey` отдельные «горячие» ключи могут тянуть на себя непропорционально много строк. В Spark это приводит к медленным straggler-таскам, которые держат всю джобу. **Идея решения.** Мы добавляем к проблемному ключу случайный «соль» — целое число от 0 до S − 1. Один «жирный» ключ превращается в S более мелких, и Spark распределяет их по разным partition-ам. После вычислений соль убираем, восстанавливая исходный ключ. ```python from pyspark.sql import functions as F S = 10 # количество солей 1. «Малую» таблицу размножаем по всем солям small_salted = (small_df .withColumn("salt", F.expr(f"sequence(0,{S-1})")) .withColumn("salt", F.explode("salt"))) 2. «Большую» снабжаем случайной солью big_salted = big_df.withColumn("salt", (F.rand()*S).cast("int")) 3. Основной join; затем соль удаляем joined = (big_salted .join(small_salted, ["id", "salt"]) .drop("salt")) ``` --- Основные сценарии применения 1. **Join «большая × маленькая»** * Солим ключ на обеих сторонах. * Маленькая сторона дублируется S раз — это дешёвое broadcast-размножение. * Большая получает дополнительный столбец `salt` (4 байта на строку). 2. **Агрегации внутри одной таблицы** (`groupBy` / `reduceByKey`) * Соль добавляется только в этой таблице. * После агрегирования отдельные результаты «склеиваются» обратным преобразованием. 3. **Частичный перекос (несколько горячих ключей)** * Солится только проблемная подмножина. * Остальные ключи остаются как есть, что ограничивает рост данных. --- Накладные расходы * **Рост объёма данных**: малая таблица увеличивается примерно в S раз. * **Shuffle-трафик**: у большой таблицы добавляется 4-байтовое поле; рост трафика обычно невелик относительно исходного объёма. * **CPU**: поток задач для размноженной стороны растёт примерно пропорционально S. * **Планировщик**: число тасков множится; при S ≪ количества доступных ядер это незаметно, но при очень большом S возможен overhead на запуск JVM-тасков. * **Память в Spark UI «Storage»**: дублированные строки могут вытеснить другие датафреймы из кэша. На практике выигрыш от ликвидации straggler-тасков компенсирует эти издержки, если перекос действительно заметный. Правило пальцев: пусть `K = size_max / size_median`. Если K ≫ 1, берут S ≈ K⁄2, но не больше пары десятков, чтобы не «взорвать» план. --- Как подобрать количество солей S 1. **Измерьте перекос** в Spark UI: посмотрите размер самого большого шард-файла или длительность самых медленных тасков. 2. **Прикиньте S**: выберите число между 1 и 32, обычно S ≈ K⁄2 достаточно. 3. **Учтите ресурсы**: S не должен превышать общее число executor-ядер; иначе некоторые соли всё равно окажутся на тех же узлах. 4. **Перепроверьте план**: отключите broadcast-порог (`spark.sql.autoBroadcastJoinThreshold = -1`), чтобы убедиться, что Spark действительно выполняет join так, как вы предполагаете. --- Когда салтинг не помогает * Горячая сторона слишком велика, чтобы её можно было реплицировать. Тогда лучше **map-side aggregation**, **bucketing** или деление набора на префиксы до загрузки в Spark. * Включён **Adaptive Query Execution** (Spark 3.2+) с режимом `skewJoin`. Он может автоматически дробить перекошенные partition-ы; ручной салтинг только усложнит DAG. * **Стриминговые пайплайны** с точно-один-раз обработкой: недетерминизм соли усложняет гарантии. --- Итог Салтинг — это простой ручной приём для равномерного распределения нагрузки при тяжёлом перекосе ключей. Его цена линейно зависит от количества солей: растёт объём размноженной стороны, число тасков и план. Выбирайте S осмысленно, проверяйте Spark UI и ориентируйтесь не на «идеальный баланс», а на минимизацию общего wall-time при разумном потреблении ресурсов.
55
Зачем нужен шафл в спарке? теги юрент
Коротко: **shuffle в Spark — это перестановка (перераспределение) строк между executors/партициями по какому-то правилу (обычно по ключу), чтобы собрать вместе данные, которые должны обрабатываться совместно.** Без него невозможно корректно делать многие «глобальные» операции. Зачем он нужен (что без него нельзя) 1. **Свести все строки одного ключа в одно место.** `groupBy/agg`, `countByKey`, `distinct`, `dropDuplicates`, `window.partitionBy(...)` — результат по ключу зависит от всех строк этого ключа → нужно их собрать на одной партиции. 2. **Сопоставить строки из разных таблиц по ключу.** Обычный `join` (кроме broadcast-join) требует, чтобы строки с одинаковым join-ключом из обеих сторон оказались на одной партиции. 3. **Глобально упорядочить или переупорядочить данные.** `orderBy/sortBy`, `sortByKey`, `range partitioning`, `repartition(n)` — меняют схему партиционирования и/или количество партиций. 4. **Сменить partitioner.** Даже если ключи уже «рядом», но partitioner другой (или число партиций меняется), Spark должен перешардировать данные. Когда он возникает (типичные триггеры) * DataFrame/Dataset API: `groupBy/agg`, `dropDuplicates`, `distinct`, `orderBy`, `repartition`, любые joins без broadcast, `window.partitionBy`. * RDD API: `reduceByKey/aggregateByKey/groupByKey/join/cogroup/sortByKey/repartition`. * В планах Spark SQL это видно как **`Exchange` / `ShuffleExchangeExec`** и граница стадий (stage boundary). Как это работает под капотом (в двух словах) * **Shuffle write (map-side):** таски текущей стадии сериализуют, сортируют/хешируют по целевым партициям, пишут промежуточные файлы (часто со спиллами на диск). * **Shuffle read (reduce-side):** таски следующей стадии тянут по сети нужные им блоки со всех executors, объединяют их (часто с локальной сортировкой/агрегацией). * Отсюда стоимость: **сеть + диск + сериализация + аллокации памяти** и риск **data skew**. Цена shuffle (почему это «дорого») * **Сеть:** fan-out/fan-in между всеми executors. * **Диск:** спиллы и промежуточные файлы. * **CPU:** сериализация/десериализация, сортировка/хеш-агрегация. * **Планировщик:** больше тасков, больше оверхеда. * **Перекос ключей (skew):** горячие ключи забивают одну партицию → ступор всей джобе. Как уменьшать/избегать shuffle * **Broadcast join**, когда одна сторона маленькая: `broadcast(df_small)` или порог `spark.sql.autoBroadcastJoinThreshold`. * **Правильные агрегации:** предпочитать `reduceByKey/aggregateByKey` вместо `groupByKey` (есть map-side combine, меньше данных летит по сети). * **Настройка числа партиций:** * SQL: `spark.sql.shuffle.partitions` (по умолчанию 200, часто стоит уменьшать/увеличивать осознанно). * RDD: `reduceByKey(..., numPartitions)`, `repartition(n)` только когда нужно. * **AQE (Adaptive Query Execution):** `spark.sql.adaptive.enabled=true` — объединяет мелкие партиции, меняет стратегию join, умеет **skew join**. * **Salting для перекоса:** разбивать горячие ключи на под-ключи (это как раз ваш недавний вопрос) → нагрузка распределяется по нескольким reduce-таскам. * **Bucketing/согласованное партиционирование на диске:** когда обе таблицы **забакетированы по одному ключу и одному числу бакетов**, Spark может выполнить join **без shuffle** (или с минимальным). * **`sortWithinPartitions` вместо `orderBy`,** если нужна сортировка только внутри партиции (без глобального порядка). * **`coalesce(n, shuffle=false)`** для уменьшения партиций без shuffle, когда это допустимо. Быстрые ориентиры по операциям * **Точно шиффлит:** `orderBy`, обычный `join` крупных таблиц, `groupBy/agg`, `distinct`, `repartition`. * **Не шиффлит (или может не шиффлить):** `map/filter/withColumn`, `sortWithinPartitions`, `coalesce(n, shuffle=false)`, broadcast-join, bucketing-совместимый join. * **Осторожно:** `groupByKey` почти всегда хуже `reduceByKey/aggregateByKey` (больше данных улетает в сеть). Как увидеть shuffle * `df.explain("extended")` / Spark UI: ищите `Exchange`/`ShuffleExchangeExec` и границы стадий. В RDD — появление `ShuffledRDD` в lineage и разделение на stages в DAG. --- Мини-пример (идея map-side combine против «сырого» shuffle) ```python # Плохо: все значения летят по сети как есть rdd.groupByKey() \ .mapValues(lambda xs: sum(xs)) Лучше: предварительное суммирование на map-стороне уменьшает пересылку rdd.reduceByKey(lambda a, b: a + b) ``` Второй вариант делает предварительную агрегацию на каждой партиции и уже потом шлёт «сжатые» промежуточные суммы. --- **Итог:** shuffle — это необходимый механизм «свести, сопоставить, упорядочить» данные, без которого нет ни агрегатов, ни join’ов, ни глобального сортинга. Он дорогой, поэтому задача инженера — **минимизировать его частоту и объём** (broadcast, правильные агрегаты, AQE, bucketing, tuning партиций) и **смягчать перекос** (salting, skew join).
56
Какое ресур забивает шафл? теги юрент
Главное, что “ломает” кластер во время shuffle в Spark — это не процессоры, а I/O: * **Локальный диск.** Когда map-задачи сортируют и сбрасывают данные, они пишут тонны временных файлов. На медленных HDD время записи и merge-spill’ов быстро становится бутылочным горлышком. *Что делать:* NVMe / SSD, несколько `spark.local.dir`, достаточно свободного места. * **Сеть между узлами.** Reducer забирает кусочки от всех map-тасков, гоняя трафик “каждый-со-всеми”. При 1 GbE линк забивается почти сразу. *Что делать:* перейти на 10 GbE и выше (25 GbE или InfiniBand для реально больших джобов), при необходимости увеличить окно переноса — `spark.reducer.maxSizeInFlight`. * **CPU и память** обычно не лимит: компрессия съедает несколько ядер, но нагрузки нет, а off-heap-выделение памяти + динамический spill контролируют RAM. Коротко: если shuffle «тормозит», почти всегда упираетесь сперва в диск, потом в сеть; ускорение дисков и апгрейт сети дают максимальный выигрыш, тонкая настройка параметров лишь доп-тюнинг.
57
версии спарка. что нового?Точка
Окей, вот как я бы ответил «на собесе» — коротко, живо и по датам, но с реальными фичами. Что нового в Spark (по вехам) **Апрель 2023 — Spark 3.4** * Показали **Spark Connect** (первый прод-клиент для Python) — тонкий клиент, IDE-friendly подключение к удалённому Spark без тяжёлого драйвера. ([spark.apache.org][1]) * **Bloom-join включили по умолчанию** (AQE подмешивает bloom-фильтры для крупных джойнов → меньше лишнего шафла/IO). ([spark.apache.org][1]) * В SQL/DDL: ** **DEFAULT-значения** для колонок, ([spark.apache.org][1]) **Сентябрь 2024 — Spark 3.5** * В PySpark/SQL: **Python UDTF** (табличные UDF, вызываются из `FROM` и коллекцию таблицу а не скаляр) **Май 2025 — Spark 4.0** * Большой мажор: **ANSI-режим по умолчанию** (меньше «тихих» сюрпризов в SQL), ([spark.apache.org][3]) * В SQL: новый тип **`VARIANT`** (полуструктурка в одном столбце), **SQL UDF** ([spark.apache.org][3]) * В PySpark: **нативный `.plot()`** у DataFrame (под капотом Plotly) + **единый профайлинг UDF** и новый **Python Data Source API** — меньше плясок с `toPandas()` для EDA и диагностики. ([spark.apache.org][3], [Databricks][4]) * Spark Connect продолжили полировать; тонкий Python-клиент стал ещё легче (внешние обзоры подчёркивают малый размер), что упрощает деплой. ([spark.apache.org][5], [decube.io][6]) [1]: https://spark.apache.org/releases/spark-release-3-4-0.html " Spark Release 3.4.0 | Apache Spark " [2]: https://spark.apache.org/releases/spark-release-3-5-0.html " Spark Release 3.5.0 | Apache Spark " [3]: https://spark.apache.org/releases/spark-release-4-0-0.html " Spark Release 4.0.0 | Apache Spark " [4]: https://www.databricks.com/blog/introducing-apache-spark-40?utm_source=chatgpt.com "Introducing Apache Spark 4.0" [5]: https://spark.apache.org/docs/latest/spark-connect-overview.html?utm_source=chatgpt.com "Spark Connect Overview - Spark 4.0.0 Documentation" [6]: https://www.decube.io/post/apache-spark-4-release?utm_source=chatgpt.com "Apache Spark 4 | Comparison with previous version"
58
Шаффлинг с 1 экзекьютором возможен? Точка
Да. Даже если у кластера запущен ровно один executor, Spark всё-равно выполнит “wide”-операцию как shuffle-стадию: * задачи map-части запишут shuffle-файлы (hash-/sort-файлы) на лок-диск JVM этого executor’а; * задачи reduce-части тут же их прочитают. Сетевого обмена не будет, но все остальные расходы шафла (серилизация, сортировка, запись → чтение на диске, метаданные MapOutputTracker) сохраняются, так что выигрыш по времени обычно небольший. Если нужен совсем «без-shuffle» вариант, остаётся либо уменьшать количество партиций и работать c narrow-зависимостями, либо переходить в локальный режим `spark.master=local[*]`, где Spark оптимизирует часть накладных расходов.
59
Что такое Commiter? Точка
**Committer (точнее — Output Committer / FileCommitProtocol)** — это слой в Hadoop-экосистеме (MapReduce, Spark, Hive и т. д.), который атомарно «публикует» результаты задания: * Во время выполнения каждая task пишет данные во временный каталог/имя файла. * Когда task завершается успешно, committer вызывает `commitTask()` — переносит (часто «rename») её временные файлы в рабочий каталог job. * После всех task-ов драйвер вызывает `commitJob()`; если хотя бы одна task упала, вызывается `abortJob()` и временные файлы удаляются. Это гарантирует «всё-или-ничего» и решает конфликт спекулятивных задач. ([hadoop.apache.org][1], [hadoop.apache.org][2]) Почему вам должно быть не всё равно: 1. **Безопасность и целостность.** Если job рухнет посередине, старые данные в выходном каталоге не будут повреждены; частичные результаты не попадут «наружу». 2. **Стоимость на object-store.** Стандартный FileOutputCommitter массово «rename’ит» каталоги. На S3/ADLS это дорого: появляются лишние LIST/PUT операции, замедление ×2-×10. Поэтому у Hadoop есть альтернативы — “S3A magic / directory / manifest committers”, а Spark с версии 2.3 умеет подменять их через FileCommitProtocol, чтобы писать напрямую без rename. ([apache.github.io][3]) 3. **Семантика exactly-once.** Коммиттер отделяет «записал» от «опубликовал»; это позволяет Spark’у переливать данные idempotent-но и выдерживать падения на любом этапе. Итого: committer — маленький, но критичный механизм, который делает вывод распределённой задачи устойчивым и атомарным, а на облачных стораджах ещё и сильно влияет на производительность за счёт выбора правильного алгоритма commit. [1]: https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.html?utm_source=chatgpt.com "FileOutputCommitter (Apache Hadoop Main 3.4.1 API)" [2]: https://hadoop.apache.org/docs/r3.4.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/manifest_committer_protocol.html?utm_source=chatgpt.com "Manifest Committer Protocol - Apache Hadoop 3.4.1" [3]: https://apache.github.io/hadoop/hadoop-aws/tools/hadoop-aws/committers.html?utm_source=chatgpt.com "Committing work to S3 with the S3A Committers"
60
Memory model экзекьютора Точка
**Как Spark делит память внутри каждого executor’а (унифицированная схема, актуальна c Spark 1.6 и по 4.x)** * **JVM-heap** — основная область, на которую вы указываете `spark.executor.memory`. * ***Reserved Memory* ≈ 300 МБ** ­– фиксированный запас под системные структуры Spark; не настраивается. ([spark.apache.org][1]) * Остаток делится конфигурацией `spark.memory.fraction` (по умолчанию 0.6): * **Spark Memory (60 %)** — общее «озеро», из которого динамически берут: * **Execution Memory** — нуждается во времени shuffle-ов, join-ов, sort-ов, агрегатных хеш-таблиц; если кончается, данные проливают на диск. ([Medium][2]) * **Storage Memory** — кешированные RDD/DataFrame-блоки, broadcast-переменные. Её «иммунная» часть управляется `spark.memory.storageFraction` (по умолчанию 0.5). Когда Execution давит, Storage блоки могут быть вытеснены. ([spark.apache.org][3]) * **User Memory (40 %)** — всё, что Spark не трогает: UDF-объекты, сторонние библиотеки, драйверские метаданные. * **Off-heap / Unsafe** — опционально (`spark.memory.offHeap.enabled=true; …size`) хранит те же Execution/Storage структуры за пределами кучи, снижая нагрузку на GC. Входит в `memoryOverhead`. ([Medium][4]) * **Memory Overhead** (`spark.executor.memoryOverhead`, Yarn/K8s) — область вне JVM: нативные буферы, Python-worker-процессы, Akka, Arrow и т. д. Если запускаете PySpark или тяжёлые JNI-библиотеки, именно эту настройку увеличивают. ([blog.det.life][5]) **Ключевые выводы** * Один и тот же пул обслуживает и кэш, и runtime-алгоритмы — это повышает утилизацию памяти, но требует следить за shuffles; при переполнении всё равно будет spill. * Если видите GC-стоп-the-world или OutOfMemory, сначала проверяйте `memoryOverhead` (для Python) и реальное соотношение `execution vs storage` (UI > Storage tab). * При долгоживущих кэшах стоит снизить `spark.memory.storageFraction` или вынести их в off-heap. Так выглядит «модель памяти» одного executor’а — три основных слоя (Reserved → Spark Memory → User) плюс наружный Overhead/Off-heap, где Execution и Storage динамически делят общий кусок. [1]: https://spark.apache.org/docs/latest/tuning.html?utm_source=chatgpt.com "Tuning - Spark 4.0.0 Documentation" [2]: https://medium.com/analytics-vidhya/apache-spark-memory-management-49682ded3d42?utm_source=chatgpt.com "Apache Spark Memory Management" [3]: https://spark.apache.org/docs/latest/configuration.html?utm_source=chatgpt.com "Configuration - Spark 4.0.0 Documentation" [4]: https://medium.com/%40kiranvutukuri/executor-memory-in-apache-spark-part-06-6286d5d01ff4?utm_source=chatgpt.com "Executor Memory in Apache Spark(Part-06)" [5]: https://blog.det.life/memory-management-in-apache-spark-f6a3499c55e2?utm_source=chatgpt.com "Memory Management in Apache Spark - Data Engineer Things"
61
Бакетирование. Какие поля будешь бакетировать, какие не будешь? вб
**Что бакетирую** * Колонки-ключи для частых *equi-join* и `groupBy` (например `user_id`, `order_id`). * Высокая, но не экстремальная кардинальность: десятки тысяч – миллионы уникальных значений на сотни бакетов — это даёт равномерную нагрузку. * Значения стабильны (хэш сегодня = хэш завтра) и почти без `NULL`/heavy-skew. **Что *не* бакетирую** * Поля для диапазонных фильтров (`event_date`, `timestamp`) — здесь лучше партиционирование. * Низкая кардинальность или сильный перекос (`status`, `country_code`) — получится пара «толстых» бакетов и куча пустых. * Изменчивые или текстовые колонки, по которым редко делают точные join’ы; выгоды почти нет. --- Сноски 1. **Что это такое.** Бакетирование (bucketing) делит данные на фиксированное число файлов по хэшу одной или нескольких колонок; это позволяет Spark/Hive читать только нужные файлы и выполнять join без shuffle. ([Medium][1]) 2. **Как сделать.** * PySpark / Scala: `df.write.bucketBy(128, "user_id").sortBy("user_id").saveAsTable("t_users")` — кол-во бакетов и колонки задаются при записи. ([getorchestra.io][2]) * SQL: `CREATE TABLE … CLUSTERED BY (user_id) INTO 128 BUCKETS` — синтаксис одинаков в Spark 4.x и Hive. ([spark.apache.org][3]) 3. **Зачем нужно.** При одинаковом числе и наборе бакетов у двух таблиц Spark может сразу выполнить *bucket-join* без сетевого shuffle, а при выборках — пропустить лишние файлы (bucket pruning). ([kb.databricks.com][4]) 4. **Ограничения.** Число бакетов нельзя менять без полной перезаписи, а сама оптимизация работает только для таблиц, сохранённых через Catalog (managed или external). ([Medium][1]) [1]: https://sachin-s1dn.medium.com/understanding-sparks-bucketby-clause-252d54b395d9 "Understanding Spark’s BucketBy Clause | by Sachin D N | Medium" [2]: https://www.getorchestra.io/guides/spark-concepts-pyspark-sql-dataframewriter-bucketby-examples "Spark Concepts: pyspark.sql.DataFrameWriter.bucketBy Examples | Orchestra" [3]: https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-datasource.html "CREATE DATASOURCE TABLE - Spark 4.0.0 Documentation" [4]: https://kb.databricks.com/data/bucketing "How to improve performance with bucketing - Databricks"
62
Как зайти на spark ui и что мона тс увидеть? Ламодаа
Окей, коротко и по делу: где открыть Spark UI и что там реально полезно смотреть. Как зайти на Spark UI Зависит от того, где крутится драйвер приложения (именно у драйвера живёт UI). **Локально / на edge-ноде** * По умолчанию: `http://localhost:4040` (если порт занят — 4041, 4042, …). * В ноутбуках (pyspark / Jupyter / Zeppelin) — тот же порт на хосте, где запущена сессия. **YARN** * **client mode**: UI на машине, где запущен драйвер (edge). Открой `http://:4040`. Если снаружи — сделай SSH-тоннель: `ssh -N -L 4040:localhost:4040 user@edge-host` * **cluster mode**: драйвер стартует в контейнере на воркере. Идёшь в YARN RM UI → твоё приложение → ссылка вида `/proxy/application_/` ведёт на Spark UI через прокси. * Завершившиеся джобы смотри в **Spark History Server** (ниже). **Standalone** * Пока приложение живо: `http://:4040`. Если драйвер на воркере — доступ туда (или SSH-тоннель) обязателен. * Отдельно есть UI мастера (`http://:8080`) и воркеров (`:8081`), но это не Spark UI конкретной джобы. **Kubernetes** * UI висит в **driver-pod** на 4040. Быстрый способ: `kubectl port-forward 4040:4040` → `http://localhost:4040` (или поднимай Service/Ingress, если нужно шарить в команде). **История (завершённые приложения) — Spark History Server** * Подними history-server и включи event-логирование: ```bash # В spark-defaults.conf spark.eventLog.enabled=true spark.eventLog.dir=hdfs:///spark-history # или s3a://..., file://... # Старт sbin/start-history-server.sh # Открой http://:18080 ``` * Там будут все приложения, для которых писался event log (UI «как живой», но read-only). --- Что можно увидеть в Spark UI (и зачем) **Jobs** * Список джобов, статус, длительность, DAG-граф. * Быстро понять: какой джоб «завис», откуда тянется долго. **Stages** * Детализация по стадиям: время, вход/выход, **Shuffle Read/Write**, DAG стадии. * Смотри на **долгие стадии** и **большие shuffle** — индикаторы перекосов/плохих джоинов. **Tasks (внутри стадии)** * Распределения длительности/байтов по таскам, ошибки, **Spill (memory/disk)**, **GC time**. * Один сверхдолгий таск = **data skew**; высокий spill = нехватка памяти/слишком широкий join/agg. **Executors** * Память/CPU, активные/заваленные таски, **Storage Memory**, **Shuffle spill**, ссылки на логи executors. * Полезно ловить «токсичный» executor, падения и GC-петли. **SQL (Spark SQL / DataFrame)** * Список SQL-кверей с физпланами, метриками операторов. * В Spark 3 — видны эффекты **AQE** (Adaptive Query Execution): coalesce shuffle partitions, skew join handling. * Ищи узлы с largest input/shuffle, смотри **numPartitions** после AQE. **Storage** * Какие RDD/DF закэшированы, их объём в памяти/на диске. * Видишь, кеш работает или нет, и не забит ли memory pool. **Environment** * Точный набор конфигов (`spark.*`), версия, classpath. * Быстро проверить, что реально применилось (пароли скрываются). **Streaming** (Structured Streaming) * Для каждой query: **input rate**, **processing time**, **batch duration**, **state store** метрики (rows, memory, spills), **watermark/latency**. * Если растёт processing time/lag — смотри stateful операторы и spills. **Event Timeline** * Интерактивная шкала событий (jobs/stages/tasks). Удобно ловить узкие места по времени. --- Что я бы «чекал» в первую очередь 1. **Stages → Summary** * `Shuffle Read (Total)` и `Shuffle Spill (Disk)`. Большие значения → пересобери партиционирование/джоины. * «Long tail» в тасках (ящик с усами): одиночные долгие = **skew** → salting, skew join, AQE. 2. **Tasks** * Collected metrics: **GC time**, **Input/Output size**, **Locality Level** (слишком много `PROCESS_LOCAL` vs `ANY` может намекать на дисбаланс данных/распределение). * Ошибки/ретраи — что валится (OOM, fetch failure, task killed). 3. **SQL** * Физплан: где `BroadcastHashJoin` vs `SortMergeJoin`; убедись, что broadcast реально сработал (и не превышает `spark.sql.autoBroadcastJoinThreshold`). * AQE: coalesced partitions (слишком много маленьких → overhead; слишком мало больших → дисбаланс). * Метрики операторов: у какого — максимум по времени/байтам. 4. **Executors** * Сравни **Task Time** и **Failed Tasks** между executors. Отстающий executor = узкое место/проблема узла. * Storage memory vs cached data — переполнения → лишний кеш выключить/увеличить память. 5. **Streaming** * Рост **input rows/sec** без роста **processed rows/sec** → отставание. * **State operator memory** и **spill** → настроить watermark/state TTL, увеличить `stateStore` память/разбить ключи. --- Полезные конфиги (минимум) ```properties # UI spark.ui.enabled=true spark.ui.port=4040 spark.ui.reverseProxy=true # если ходишь через прокси / YARN proxy spark.ui.view.acls=you,teammate # доступ к UI spark.admin.acls=you История spark.eventLog.enabled=true spark.eventLog.dir=hdfs:///spark-history # или s3a://bucket/history SQL/AQE spark.sql.adaptive.enabled=true spark.sql.autoBroadcastJoinThreshold=128MB # подстрой под кластер ``` --- Если UI «не открывается» * Приложение уже завершилось → смотри **History Server :18080** (нужен включённый event log). * Драйвер не на твоей машине → зайди через YARN proxy / K8s port-forward / SSH-тоннель. * Порт занят → Spark перескочил на 4041/4042. Посмотри логи драйвера: там есть строка с точным URL. * Закрыт фаервол/ACL → попроси открыть 4040 (или пользуйся прокси/порт-форвардом). * UI отключён в конфиге (`spark.ui.enabled=false`) → включить и перезапустить. Если расскажешь, где у тебя Spark (YARN/K8s/Standalone, client/cluster), подкину точную команду/ссылку именно под твой сценарий.
63
Как открыть план запроса в Спарк и на что там обратить внимание? Х5
Как посмотреть план запроса в Spark — кратко и без табличек 1. **Прямо в коде** * Для DataFrame/Dataset: `df.explain()`, `df.explain("extended")` или, в Spark 3 +, `df.explain("formatted")`. * Получите цепочку Parsed → Analyzed → Optimized logical plan и физический план (операторы Spark SQL). 2. **SQL-ноутбук или консоль** * `EXPLAIN [EXTENDED|FORMATTED] SELECT …` выводит ту же информацию для “чистого” SQL. 3. **Во время выполнения** * Spark UI, вкладка **SQL**—виден DAG, объём shuffle, число задач, узловые ресурсы. * Вкладки **Jobs / Stages / DAG Visualization** показывают, где именно тратятся CPU и сеть. 4. **После окончания джоба** * Spark History Server открывает тот же интерфейс в пост-мортем-режиме. --- Ключевые элементы, на которые стоит смотреть в плане * **`Exchange` / `ShuffleExchange`** — каждая такая строчка = широкая зависимость (shuffle). Сеть+диск, потенциальный узкий момент. * **`RepartitionByExpression` / `Coalesce` / `Rebalance`** — явное изменение числа партиций; проверьте, не раздробили ли датасет на тысячи мелких файлов или наоборот. * **Join-операторы** * `BroadcastHashJoin` — дешёвый, если маленькая сторона < `spark.sql.autoBroadcastJoinThreshold` (по умолчанию 10 МБ). * `SortMergeJoin` — две сортировки и два shuffle. * `ShuffleHashJoin`, `SkewedJoin`, `AQEShuffledHashJoin` — смотрите, активен ли AQE. * **Push-down фильтров и колонок** * В формате `explain("formatted")` ищите строку `PushedFilters`. Если пусто, читается больше данных, чем нужно. * **`*(n)` (WholeStageCodegen)** * Символ `*()` значит, что Spark сгенерировал единый кусок JVM-кода. Отсутствие может указывать на UDF или другой блокирующий фактор. * **`InMemoryTableScan`** — план читает данные из кэша. Убедитесь, что кэш заполняется вовремя и помещается в RAM. * **Метки AQE** * В Spark 3 + ищите `adaptive = true`, `Post-Shuffle Partition Coalescer`, `SkewedJoin` — признак того, что план «подкручивается» на лету. --- Мини-чек-лист “быстрого аудита” 1. **Сколько shuffle-барьеров?** Чем их меньше, тем лучше. 2. **Размеры партиций после AQE** — в Spark UI смотрите “Final # Partitions”. Ориентир ≈ 128 МБ ± 20 %. 3. **Типы join-ов**: * Бродкастите то, что укладывается в память. * Если таблица чуть больше порога, попробуйте `hint("broadcast")`. 4. **Сортировка перед `SortMergeJoin`** — обе стороны должны быть уже отсортированы по ключу. 5. **UDF / UDAF** — Java / Scala-UDF выбивают оператор из codegen; попробуйте заменить SQL-эквивалентом или Spark SQL functions. 6. **Проверка push-down** — предикат вида `col = 5` пушится, `func(col) = 5` — нет. 7. **Дисбаланс (data skew)** — во вкладке **Stages** смотрите распределение времени задач; ищите `SkewedJoin` в плане. Лечится salting или AQE-опциями. --- Мини-пример на PySpark ```python df = (spark.read.parquet("s3://bucket/events") .filter(F.col("event_type") == "click") .join(dim_users.hint("broadcast"), "user_id") .groupBy("country") .agg(F.count("*").alias("cnt"))) df.explain("formatted") # самый удобочитаемый вид ``` --- **Рабочий процесс**: сперва читаем план → формируем гипотезы → подтверждаем их в Spark UI числами CPU-секунд, shuffle-байт и пр. Только после этого меняем конфигурации или код.
64
падает экзекьютор — что произойдёт?» как Spark восстановит потерянные данные? собес
Коротко: при падении **executor** Spark помечает его «потерянным», отменяет запущенные на нём таски и **перепланирует их на живые executors**. Данные не «восстанавливаются с диска кластера» магически — они **пересчитываются по lineage** (истории вычислений) либо берутся из уцелевших копий/чекпоинтов/shuffle-файлов. Что именно происходит 1. **Детект отказа** * Драйвер перестаёт получать heartbeats → `ExecutorLostFailure`. * Все таски этого executor’а помечаются failed; незавершённые **map-outputs** и кеш-блоки считаются потерянными. 2. **Повторный запуск задач** * Планировщик кладёт задачи обратно в очередь и **ретраит** их (по умолчанию до `spark.task.maxFailures=4`). * Если потеряны результаты map-стадии, **вся зависимая reduce-стадия** будет передёрнута, а недостающие map-партиции будут **пересчитаны**. 3. **Откуда берутся «потерянные данные»** * **RDD/DataFrame lineage**: Spark знает, как получить партицию из исходных данных и трансформаций, и просто делает это заново. * **Кеш/пPersist**: * `MEMORY_ONLY`, `DISK_ONLY`, `MEMORY_AND_DISK` — данные лежат локально; при падении узла **пропадают** и будут пересчитаны. * `…_2` (репликация) — есть копия на другом executor’е → берётся копия, пересчёт не нужен. * **Shuffle-файлы**: * Если включён **External Shuffle Service** (на YARN/Standalone), файлы переживают смерть executor’а → reduce-таски могут их **дочитать без пересчёта**. * Без внешнего сервиса (типично на Kubernetes) shuffle-файлы умирают вместе с pod → map-стадии **пересчитываются**. * **Broadcast**: переотправляется на новые executors автоматически. * **Checkpoint**: если для RDD/Structured Streaming сделан checkpoint в HDFS/S3, пересчёт идёт **от чекпоинта**, а не от самого начала. 4. **Structured Streaming** * Хранит **offset’ы и state** в checkpoint-директории. При падении executors движок **перезапускает задачи**, поднимает state из стора (на DFS) и читает источники **с последнего зафиксированного батча/эпохи**. * Семантика вывода зависит от sink: idempotent upsert/ACID-таблицы → без дублей; «сырой» файловый sink → возможны частичные файлы до job-commit (Spark их обычно не «засчитывает», но мусор в \_temporary может остаться до чистки). 5. **Крайние случаи и предохранители** * Много последовательных фейлов стадии → abort стадии/джоба (`spark.stage.maxConsecutiveAttempts`). * **Blacklisting** узлов/экзекьюторов (`spark.scheduler.blacklist.*`) снижает «тряску». * **Speculative execution** помогает, если был не отказ, а «залипший» медленный таск. * **Driver** — отдельная история: падение драйвера валит приложение (нужны рестарты/супервизия на уровне менеджера кластера). Практические выводы / что настроить * На проде включайте внешний **shuffle-service** там, где он доступен (или используйте надёжный удалённый/пуш-shuffle), иначе любой падший executor = **пересчёт map-стадий**. * Для важных кэшей используйте **репликацию** (`*_2`) либо **checkpoint** на DFS. * В стриминге всегда задавайте **checkpointLocation** и делайте **идемпотентные** (или транзакционные) записи в sink. * Подберите `spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`, включите **blacklisting**, следите за качеством дисков/сети: большинство «потерь» — это про **shuffle I/O**. * Про выходные файлы: используйте корректные **commit-протоколы** (для S3/облаков — манифест/атомарный commit), чтобы не оставлять «хвосты». Если нужно, разберу тот же сценарий «по слоям»: RDD vs DataFrame, batch vs streaming, YARN vs K8s — с конкретными параметрами и типичными логами ошибок (`FetchFailed`, `ExecutorLost`, `TaskKilled`).
65
В чем преимущество/недостаток сериализации данных при кэшировании? Магнит
Коротко: сериализация при кэшировании — это обмен памяти/сетевых байт на CPU и удобство работы с объектами. Она даёт более компактное хранение и переносимость, но увеличивает задержку из-за (де)сериализации и делает данные «непрозрачными» для операций на стороне кэша. Плюсы сериализованного кэша * **Экономия памяти.** Хранение «как байты» обычно заметно компактнее, чем набор живых объектных структур. В Spark это прямо указано: `MEMORY_ONLY_SER` «обычно более экономичен по памяти», но требует больше CPU при чтении. ([spark.apache.org][1]) * **Меньше давления на GC / off-heap.** В многоуровневых кэшах (Ehcache) всё вне heap хранится в сериализованном виде; off-heap позволяет держать большие объёмы и тем самым снижать влияние GC на приложение (ценой (де)сериализации). ([ehcache.org][2]) * **Меньше трафика по сети/диску.** Хорошая сериализация уменьшает объём передаваемых/записываемых данных — это ключевой рычаг производительности распределённых систем (см. раздел про сериализацию в Spark Tuning). ([spark.apache.org][3]) * **Языко- и сервис-нейтральность + эволюция схем.** Форматы вроде Avro/Protobuf/JSON Schema с реестром схем дают совместимость версий и контроль эволюции без «ручных» миграций ключей. ([docs.confluent.io][4]) Минусы сериализованного кэша * **Больше CPU и задержка на доступ.** Любое чтение/запись — это (де)сериализация; в Spark это прямо описано как «более CPU-интенсивно при чтении». ([spark.apache.org][1]) * **Ограниченные серверные операции.** Если вы кладёте объект в Redis как «строку-blob», сервер видит лишь байты — нельзя делать `HGET/HINCRBY` по полям, счётчики и выборки по атрибутам потребуют полного извлечения и парсинга на клиенте. (Строки — это произвольная байтовая последовательность; поля и инкременты доступны у `HASH`.) ([Redis][5]) * **Нет частичного чтения объекта.** Даже чтобы достать одно поле, надо десериализовать всю запись; Ehcache прямо отмечает, что чтение может требовать десериализации ключа/значения. ([ehcache.org][2]) * **Риски совместимости версий.** Меняя схему/класс, можно «сломать» старые байты в кэше, если не управлять совместимостью (решается реестром схем/режимами совместимости). ([docs.confluent.io][4]) * **Безопасность.** Опасно десериализовать недоверенные данные (классическая уязвимость — insecure deserialization для Java/Pickle и т. п.). ([owasp.org][6]) Когда что выбирать (практика) * **Нужна минимальная задержка, данные помещаются в память, операции по полям:** храните **десериализовано** / в структурном виде. Примеры: Spark `MEMORY_ONLY` для «горячих» RDD/DataFrame; в Redis — **HASH/SET/ZSET**, чтобы считать поля/делать инкременты на сервере. ([spark.apache.org][1], [Redis][7]) * **Памяти мало, а пересчитывать/пересылать дорого:** храните **сериализованно**. В Spark: `MEMORY_ONLY_SER` (или `MEMORY_AND_DISK_SER`) + быстрый сериализатор; это рекомендация самого Spark, если не влезает в `MEMORY_ONLY`. ([spark.apache.org][1]) * **Межсервисный/межъязыковой кэш:** берите Avro/Protobuf с управлением схемами (Confluent Schema Registry и т. п.). ([docs.confluent.io][4]) * **Большие объёмы в JVM-приложении и проблемы с GC:** используйте off-heap-слой кэша (он неизбежно сериализует; выигрываете в GC, теряете в CPU). ([ehcache.org][2]) --- Зачем? Чтобы **подогнать кэш под узкое место**: * если узкое место — **память/сеть**, сериализуйте (меньше байтов/объектов); * если узкое место — **CPU/латентность доступа**, храните в «родном» виде/структурах и используйте серверные операции. ([spark.apache.org][3], [Redis][7]) Почему? Потому что сериализация меняет **форму данных** (объекты → байты) и тем самым меняет профиль ресурсов: * байты **дешевле для памяти/сети**, но **дороже для процессора** при каждом доступе и «слепы» для серверной логики; * объекты **дороже по памяти/GC**, но **дешевле по CPU** при частых мелких доступах и позволяют выполнять операции «на месте». Эти соотношения подтверждаются официальной документацией Spark и Ehcache и моделью типов Redis. ([spark.apache.org][1], [ehcache.org][2], [Redis][5]) Если хочешь — дам микро-чеклист под твой стек (Spark/Redis/ClickHouse-согрев, anti-fraud стримы и т. п.) и под твои реальные паттерны доступа. [1]: https://spark.apache.org/docs/latest/rdd-programming-guide.html "RDD Programming Guide - Spark 4.0.0 Documentation" [2]: https://www.ehcache.org/documentation/3.4/tiering.html "Ehcache Tiering Options" [3]: https://spark.apache.org/docs/latest/tuning.html?utm_source=chatgpt.com "Tuning - Spark 4.0.0 Documentation" [4]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html?utm_source=chatgpt.com "Schema Evolution and Compatibility for Schema Registry ..." [5]: https://redis.io/docs/latest/develop/data-types/strings/ "Redis Strings | Docs" [6]: https://owasp.org/www-community/vulnerabilities/Deserialization_of_untrusted_data?utm_source=chatgpt.com "Deserialization of untrusted data" [7]: https://redis.io/docs/latest/develop/data-types/hashes/?utm_source=chatgpt.com "Redis hashes | Docs"
66
На каком языке написан фреймворк Spark? Магнит
Коротко: **ядро Apache Spark написано на Scala и работает на JVM; значительная часть кода — на Java.** Для пользователей есть API на Scala, Java, Python (PySpark), SQL и (исторически) R. ([GitHub][1], [spark.apache.org][2]) Зачем Знать “родной” язык движка помогает понимать внутренние ограничения/оптимизации (JVM, сборка мусора, сериализация, планировщик), выбирать корректные тюнинги и объяснять, почему Python-код в Spark — это обёртка над JVM-движком. ([spark.apache.org][3]) Почему это так Репозиторий Spark показывает доминирующую долю Scala в кодовой базе; официальные доки фиксируют, что Spark собирается и запускается на Java/Scala и предоставляет высокоуровневые API для Scala/Java/Python/R. Это подтверждает: внутренности — JVM/Scala/Java, а Python — клиентский API поверх движка. ([GitHub][1], [spark.apache.org][2]) [1]: https://github.com/apache/spark "GitHub - apache/spark: Apache Spark - A unified analytics engine for large-scale data processing" [2]: https://spark.apache.org/docs/latest/?utm_source=chatgpt.com "Overview - Spark 4.0.0 Documentation" [3]: https://spark.apache.org/docs/latest/api/python/user_guide/bugbusting.html?utm_source=chatgpt.com "Chapter 4: Bug Busting - Debugging PySpark - Apache Spark"
67
Как пробросить Python и Java библиотеки в PySpark? Магнит
Короткий ответ: *Java/Scala JAR’ы* подхватываем через `--packages` (Maven-координаты) или `--jars` (локальные/HTTP/HDFS). *Python код* (ваши модули) — через `--py-files`/`SparkContext.addPyFile()`/`SparkSession.addArtifact(pyfile=True)`. *Сторонние Python-зависимости* (pandas/pyarrow/numpy и т.п., особенно с native-кодом) — не через `--py-files`, а через упакованную среду (`conda-pack`, `venv-pack`) по `--archives` либо PEX по `--files` — иначе исполнителям просто нечего будет импортировать. ([spark.apache.org][1]) --- Практика: «как пробросить» 1) JAR’ы и Java/Scala библиотеки **Вариант А — из Maven:** ```bash spark-submit \ --packages org.apache.hadoop:hadoop-aws:3.3.4,com.github.luben:zstd-jni:1.5.6-5 \ app.py # при необходимости: # --repositories https://repo1.maven.org/maven2,https://repos.spark-packages.org ``` `--packages` подтянет артефакты с транзитивными зависимостями, добавит их в classpath драйвера и исполнителей. Можно указывать дополнительные репозитории через `--repositories`. Эти же флаги работают в `pyspark` и `spark-shell`. ([spark.apache.org][1]) **Вариант Б — локальные/удалённые JAR’ы:** ```bash spark-submit \ --jars hdfs:///libs/my-udf.jar,https://host/lib/metrics.jar \ app.py ``` Все JAR’ы из `--jars` автоматически разъедутся по исполнителям и попадут в classpath драйвера и executors. Поддерживаются `file:`, `hdfs:`, `http(s):`, `ftp:` и `local:` URI. ([spark.apache.org][1]) **Точечная настройка classpath (редко нужно):** * Только для драйвера: `--driver-class-path` или `spark.driver.extraClassPath`. * Для executors: `spark.executor.extraClassPath` (обычно не требуется; оставлено для совместимости). ([spark.apache.org][2], [archive.apache.org][3]) 2) Ваш Python-код (модули/пакеты) **Быстрая доставка модулей:** ```bash spark-submit \ --py-files mypkg.zip,utils.py,helpers.egg \ app.py ``` `--py-files` понимает `.py/.zip/.egg` и добавляет их в `sys.path` у executors. То же можно сделать программно: `SparkContext.addPyFile()` или `SparkSession.addArtifact("mypkg.zip", pyfile=True)`. **Важно:** колёса (`.whl`) этим способом НЕ поддерживаются. ([spark.apache.org][4]) 3) Сторонние Python-зависимости (включая native-части) Когда нужны `pandas`, `pyarrow`, `numpy`, драйверы с C/Fortran и т.п., используйте среду: **Вариант A — Conda + conda-pack → `--archives`:** ```bash conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack conda activate pyspark_conda_env conda pack -f -o pyspark_conda_env.tar.gz Дальше запускаем: export PYSPARK_PYTHON=./environment/bin/python spark-submit --archives pyspark_conda_env.tar.gz#environment app.py ``` *В кластерах YARN/K8s не выставляйте `PYSPARK_DRIVER_PYTHON` — он должен быть unset в cluster-режимах.* ([spark.apache.org][5]) **Вариант B — venv + venv-pack → `--archives`:** ```bash python -m venv pyspark_venv source pyspark_venv/bin/activate pip install pyarrow pandas venv-pack venv-pack -o pyspark_venv.tar.gz export PYSPARK_PYTHON=./environment/bin/python spark-submit --archives pyspark_venv.tar.gz#environment app.py ``` Тот же принцип; `venv-pack` делает переносимую архивную среду. ([spark.apache.org][5]) **Вариант C — PEX → `--files`:** ```bash pip install pex pyarrow pandas pex pyspark pyarrow pandas -o pyspark_pex_env.pex export PYSPARK_PYTHON=./pyspark_pex_env.pex spark-submit --files pyspark_pex_env.pex app.py ``` PEX — самодостатимый «интерпретатор» окружения; одинаковая версия Python должна стоять на всех узлах. ([spark.apache.org][5]) --- Частые мини-шпаргалки * **Интерактивно (`pyspark`):** ```bash pyspark --packages com.github.luben:zstd-jni:1.5.6-5 \ --py-files mypkg.zip ``` То же самое можно прописать в `spark-defaults.conf` (напр. `spark.jars.packages=...`, `spark.submit.pyFiles=...`) или передать через `--conf`. ([spark.apache.org][1]) * **Во время выполнения (без перезапуска job):** ```python spark.sparkContext.addPyFile("s3://bucket/mypkg.zip") spark.addArtifact("hdfs:///libs/my-udf.jar", pyfile=True) # PySpark 3.5+ ``` ([spark.apache.org][6]) * **Где JAR’ы оказываются?** Spark сам раздаёт их и включает в classpath и у драйвера, и у executors; на YARN очистку кэша делает фреймворк. ([spark.apache.org][1]) --- Типовые ошибки и как избежать * **«Положил `.whl` в `--py-files`, но модуль не импортится».** `--py-files` не поддерживает `whl`; используйте `--archives` (упакованную среду) или PEX. ([spark.apache.org][7]) * **«Pandas UDF падают: `ModuleNotFoundError: pyarrow`».** Убедитесь, что `pyarrow` есть на *каждом* executor — через conda/venv/PEX/образ Docker. ([spark.apache.org][5]) * **«Дописываю classpath руками, всё ломается».** Предпочитайте `--packages`/`--jars`; `spark.executor.extraClassPath` — крайняя мера. ([spark.apache.org][2]) --- Зачем и почему **Зачем разные механики?** * `--packages` решает доставку **JAR’ов** с транзитивными зависимостями из Maven-репозиториев — меньше ручной рутины и конфликтов. ([spark.apache.org][1]) * `--jars` нужен, когда артефакта нет в репозиториях/он приватный или его надо взять из HDFS/HTTP. ([spark.apache.org][1]) * `--py-files` — быстрый способ «подложить» **ваш** чисто-Python код (без native-частей). Это дёшево и надёжно. ([spark.apache.org][4]) * `--archives` (conda/venv-pack) или `--files` (PEX) — единственный корректный способ тащить **сторонние Python-библиотеки**, включая нативный код, чтобы окружение было одинаковым на драйвере и исполнителях. ([spark.apache.org][5]) **Почему так устроено?** * Исполнители — отдельные процессы/хосты, у них нет ваших `site-packages`. `--py-files` лишь расширяет `sys.path` архивами с `.py/.zip/.egg`; он не «ставит пакеты». Отсюда запрет на колёса и на бинарные расширения. ([spark.apache.org][7]) * Для JVM-мира Spark умеет сам подтягивать зависимости (Maven/Ivy), поэтому `--packages` удобнее и безопаснее, чем ручной classpath. ([spark.apache.org][1]) Если опишешь свою среду (Standalone/YARN/K8s, где запускаешь, есть ли Docker, нужны ли нативные библиотеки) — накидаю готовый шаблон команды и/или `spark-defaults.conf` под твой кейс. [1]: https://spark.apache.org/docs/latest/submitting-applications.html "Submitting Applications - Spark 4.0.0 Documentation" [2]: https://spark.apache.org/docs/latest/configuration.html?utm_source=chatgpt.com "Configuration - Spark 4.0.0 Documentation" [3]: https://archive.apache.org/dist/spark/docs/2.3.3/configuration.html?utm_source=chatgpt.com "Configuration - Spark 2.3.3 Documentation" [4]: https://spark.apache.org/docs/3.5.3/submitting-applications.html?utm_source=chatgpt.com "Submitting Applications - Spark 3.5.3 Documentation" [5]: https://spark.apache.org/docs/latest/api/python/tutorial/python_packaging.html "Python Package Management — PySpark 4.0.0 documentation" [6]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.addArtifact.html "pyspark.sql.SparkSession.addArtifact — PySpark 4.0.0 documentation" [7]: https://spark.apache.org/docs/3.5.3/api/python/user_guide/python_packaging.html?utm_source=chatgpt.com "Python Package Management — PySpark 3.5.3 ..."
68
Как выполняется процесс трансфера данных при работе Python UDF? Магнит
Ниже — как именно Spark гоняет данные между JVM и Python-процессом при выполнении **Python UDF** (подразумевается PySpark / Spark SQL). Как идет трансфер данных **Базовый (row-by-row) Python UDF (`udf()` без pandas):** 1. На каждом executor’e Spark запускает отдельный *Python worker*-процесс. 2. JVM-план вытаскивает нужные колонки и шлет их воркеру **пакетами** по локальному сокету/pipe; данные сериализуются **pickle** (batched pickling). 3. Python применяет UDF построчно к пакету, результаты **pickled** возвращаются назад в JVM, где дессиарилизуються, приклеиваются как новый столбец и план идет дальше. 4. Воркеры могут **переиспользоваться** между тасками, чтобы не пересоздавать процесс и не перетаскивать заново большие broadcast’ы (`spark.python.worker.reuse=true`). ([Apache Wiki][1], [Apache Spark][2]) **Vectorized / Pandas UDF (`pandas_udf`):** 1. Перед вызовом Python Spark конвертирует требуемые колонки в **Apache Arrow** (колонарный формат), режет на батчи. 2. Батчи Arrow отправляются в Python-воркер; внутри они превращаются в `pandas.Series`/`DataFrame`, функция отрабатывает **векторно по батчу**. 3. Результат обратно упаковывается в Arrow и возвращается в JVM. Это резко снижает накладные расходы на (де)сериализацию и копирования по сравнению с pickle. ([Apache Spark][3]) **Control-plane vs data-plane:** Связь драйверского Python-кода с JVM идет через **Py4J**, но **крупные данные UDF не гоняются через Py4J** — для них используются отдельные двунаправленные каналы (сокеты/пайпы) между JVM и Python-воркерами на executors. ([Apache Wiki][1]) **Современные оптимизации (Spark 3.5+ / 4.x):** В последних релизах появились **Arrow-optimized Python UDFs** (ускоренный путь даже для «обычных» UDF за счет Arrow-батчей), что уменьшает overhead против классических pickled UDF. ([Databricks][4]) Что реально пересылается * **Код UDF и зависимости** — доставляются на executors (например, через `--py-files`, `spark.submit.pyFiles`, `addFile`). * **Входные данные** — пачками (pickled или Arrow) из JVM в Python. * **Broadcast-переменные** — один раз в процесс Python; экономится при `spark.python.worker.reuse=true`. * **Результат** — батчи обратно (pickled или Arrow). ([Apache Spark][2]) Важные настройки и режимы * `spark.python.worker.reuse=true` — удерживает воркер живым между тасками, экономит время старта и пересылки больших broadcast’ов. ([Apache Spark][2]) * `spark.sql.execution.arrow.pyspark.enabled=true` — включает Arrow-путь для Pandas UDF и конверсий DF↔pandas. * Размер батчей для Arrow управляется через Arrow-параметры (напр., `maxRecordsPerBatch` в руководстве по Arrow). ([Apache Spark][3]) * Pandas UDF официально определены как «векторизованные UDF, использующие Arrow для трансфера и pandas для обработки». ([Apache Spark][5]) Краткая «схема» выполнения 1. Catalyst строит план → вставляет узел «выполнить Python UDF» (для Pandas UDF — Arrow-ветка). 2. Executor читает данные партиции → сериализует пакет → шлет Python-воркеру. 3. Python-воркер считает UDF (построчно для обычного UDF, векторно для Pandas UDF). 4. Возвращает пакет результатов → JVM приклеивает столбец → следующий оператор плана. ([Apache Wiki][1], [Apache Spark][3]) Почему это важно (узкие места) * **Pickle + GIL + межпроцессный hop** делают классические UDF медленными: много мелких (де)сериализаций и вызовов через границу языков. * **Arrow-колонарность + батчи** резко уменьшает число переходов и копий, раскрывая пропускную способность CPU/памяти. * Даже с Arrow UDF **хуже планируются**, чем нативные выражения Spark (теряется часть pushdown/whole-stage codegen), поэтому при возможности предпочитайте **встроенные функции** или SQL-выражения; Pandas UDF — второй выбор, обычные pickled UDF — как крайняя мера. ([Apache Spark][3]) --- Зачем Понимание механики трансфера позволяет правильно выбирать инструмент: где хватит встроенных функций, где оправдан Pandas UDF, а где обычный UDF убьет производительность из-за сериализации. Это напрямую влияет на SLA и стоимость кластера. ([Apache Spark][3]) Почему Потому что граница JVM↔Python — главный источник накладных расходов. Arrow минимизирует копирования и (де)сериализации (колонарный формат), `worker.reuse` сокращает прогрев окружения и пересылку broadcast’ов, а отказ от UDF в пользу нативных операторов вообще устраняет границу языков. ([Apache Spark][3]) > Если имелся в виду другой движок (не Spark), скажи какой — распишу его путь данных так же подробно. [1]: https://cwiki.apache.org/confluence/display/SPARK/PySpark%2BInternals/ "PySpark Internals - Spark - Apache Software Foundation" [2]: https://spark.apache.org/docs/latest/configuration.html "Configuration - Spark 4.0.0 Documentation" [3]: https://spark.apache.org/docs/latest/api/python/tutorial/sql/arrow_pandas.html "Apache Arrow in PySpark — PySpark 4.0.0 documentation" [4]: https://www.databricks.com/blog/arrow-optimized-python-udfs-apache-sparktm-35?utm_source=chatgpt.com "Arrow-optimized Python UDFs in Apache Spark™ 3.5" [5]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html "pyspark.sql.functions.pandas_udf — PySpark 4.0.0 documentation"