Spark Flashcards
(68 cards)
Что такое Spark? Зачем нужен? Где использовали?
Теги: #wildberries
Spark – это распределённая вычислительная платформа для быстрой обработки больших данных (batch/stream). Используется для ETL, аналитики, machine learning и т.д. Применяется во множестве компаний (Netflix, Uber, e-commerce), где важна высокая скорость анализа данных.
Spark Streaming: near real-time или real-time? Сравнение Spark vs Flink
Теги: #wildberries
Коротко: по умолчанию Spark Structured Streaming — это near-real-time (микробатчи с типичными задержками от сотен миллисекунд до секунд). В Spark есть два пути к меньшим задержкам: экспериментальный Continuous Processing (\~1 мс, но сильно ограничен по операциям) и новый в Databricks Real-time mode (паблик превью, миллисекундные p99 без переписывания кода, только в Databricks). Flink — потоковый движок “record-by-record”, рожденный для миллисекундных задержек и сложных stateful сценариев. (spark.apache.org, Microsoft Learn, Databricks)
Что обычно называют “real-time”
- Hard real-time (жёсткие дедлайны в микросекундах/миллисекундах, детерминизм) — ни Spark, ни Flink под это не заточены.
- Soft/near real-time — практично для бизнеса: задержки от \~10–500 мс до нескольких секунд, с фолт-толерантностью и exactly-once/at-least-once. Под это попадают и Spark, и Flink (с разными профилями задержек).
Модель исполнения и задержки
- Spark Structured Streaming: микро-батчи по триггеру (например, каждые 1–10 с; можно настраивать до \~100 мс). Есть Continuous Processing (\~1 мс, experimental, поддерживает только map/фильтры, без агрегаций/джойнов) и Real-time mode в Databricks (паблик превью, p99 до единиц–десятков мс). (docs.databricks.com, spark.apache.org, Databricks)
- Flink: истинный streaming “по событию”, с backpressure и чекпоинтами; для super-low-latency сценариев рекомендуют at-least-once (несколько мс), для exactly-once задержка возрастает из-за транзакций/чекпоинтов. (nightlies.apache.org)
Семантика времени/порядка и состояние
- Оба движка поддерживают event-time и watermarks для работы с опоздавшими событиями. У Flink — это центральная модель (богатые стратегии watermarks, таймеры, fine-grained контроль). (nightlies.apache.org)
-
Состояние:
- Spark — state store с чекпоинтами; есть RocksDB state store provider (встроен с Spark 3.2; на Databricks включается конфигом). (spark.apache.org, docs.databricks.com)
- Flink — ключевое состояние (часто в RocksDB), инкрементальные чекпоинты, таймеры на ключах. (nightlies.apache.org)
Гарантии доставки/записи
- Spark: end-to-end exactly-once достижим при “реплейбл” источниках и идемпотентных/транзакционных сингках (Kafka/Delta и т. п.) за счёт отслеживания оффсетов + WAL/чекпоинты (в микробатч-режиме). Continuous — только at-least-once. (spark.apache.org)
-
Flink: exactly-once через интеграцию чекпоинтов с 2-фазным коммитом в сингках (KafkaSink
DeliveryGuarantee.EXACTLY_ONCE
; общее APISupportsCommitter
/Two-Phase Commit). (nightlies.apache.org)
Практическая «рыба»: когда что брать
Бери Flink, если нужно:
- p99 < 100 мс на stateful-операциях (джойны/окна/таймеры), сложная работа с out-of-order, тонкая реакция на backpressure. (nightlies.apache.org)
Бери Spark, если:
- основная экосистема уже на Spark (batch/SQL/ML), SLA — секунды/сотни мс, нужны единые API/каталог/оркестрация;
- хочешь попробовать миллисекунды без ре-платформинга в Databricks (Real-time mode, превью). В OSS можно Continuous, но он сильно ограничен. (Databricks, spark.apache.org)
Вывод
- Ответ на “Spark Streaming: near real-time или real-time?” — по умолчанию near-real-time. Реальный real-time (миллисекунды) возможен: (а) в OSS Spark — лишь для узкого класса задач через Continuous (at-least-once), (б) в Databricks Real-time mode (превью) — для более широкого набора задач без переписывания кода. (spark.apache.org, Databricks)
- Flink — ваш выбор, если главная цель — милисекундная реакция на сложные event-time/состояние-тяжёлые пайплайны с гибкой обработкой out-of-order. (nightlies.apache.org)
Если хочешь, дам конкретный «чек-лист» под твой кейс (тип источников/синков, SLA p95/p99, объём/кардинальность состояния, требования к семантике).
Что знаешь про PyArrow? Как взаимодействует со Spark?
Теги: #wildberries
PyArrow — это Python-обёртка для Apache Arrow, формата колоночного хранения данных в памяти, который оптимизирован для быстрого доступа и передачи данных между процессами или системами.
В контексте Spark он используется в основном в двух местах:
Arrow-based оптимизация Pandas UDFs (Vectorized UDFs) Когда мы используем Pandas UDF (а не обычные Python UDF), Spark сериализует данные в формате Arrow, что позволяет избежать затратных сериализаций в pickle и делает передачу данных между JVM и Python значительно быстрее. toPandas() и fromPandas() При вызове .toPandas() Spark тоже использует Arrow (если включено через spark.sql.execution.arrow.pyspark.enabled), что ускоряет конвертацию Spark DataFrame в Pandas DataFrame.
Важно упомянуть edge cases для собеса (показывает зрелость):
Arrow не поддерживает все типы Spark. Например, сложные nested типы или специфичные типы (MapType, ArrayType со сложной вложенностью) могут привести к падениям или отключению Arrow. Arrow работает только с Batch-ориентированными задачами, для Streaming — не применим.
Функции трансформации и действия, в чём разница, примеры?
Теги: #Ярослав #Билайн, #rubbles
В Apache Spark существуют два основных типа операций: трансформации и действия. Они имеют разные характеристики и поведение при работе с RDD, DataFrame и Dataset.
Трансформации
Определение: Трансформации — это операции, которые преобразуют один RDD (или DataFrame, Dataset) в другой. Они ленивые (lazy), что означает, что они не выполняются немедленно, а откладываются до тех пор, пока не будет вызвано действие.
Примеры трансформаций:
map(func): Применяет функцию к каждому элементу RDD и возвращает новый RDD.
filter(func): Возвращает новый RDD, содержащий только элементы, для которых функция возвращает True.
flatMap(func): Применяет функцию к каждому элементу RDD и возвращает новый RDD, где каждый элемент может быть разделен на несколько элементов.
groupByKey(): Группирует значения по ключу и возвращает новый RDD с ключами и списками значений.
Действия
Определение: Действия — это операции, которые возвращают результат или производят побочный эффект. Они вызывают выполнение всех трансформаций, которые были применены к RDD до момента вызова действия.
Примеры действий:
collect(): Собирает все элементы RDD и возвращает их в виде списка.
count(): Возвращает количество элементов в RDD.
first(): Возвращает первый элемент RDD.
reduce(func): Объединяет элементы RDD с использованием указанной функции.
saveAsTextFile(path): Сохраняет RDD в виде текстового файла по указанному пути
Что такое Spark Catalyst и как работает?
Теги: #Ярослав
Catalyst – это оптимизатор запросов в Spark SQL. Он строит деревья логического и физического плана, применяет правила оптимизации (pushdown, reordering джоинов и т.д.), а затем формирует оптимальный план выполнения. Основные фишки - Анализатор (Analyzer), Логический план (Logical Plan) Оптимизатор (Optimizer) Физический план (Physical Plan) Выбор плана и генерация кода.
Он проходит четыре основных этапа:
Analysis (Анализ) Проверка синтаксиса и разрешение всех ссылок (например, имен таблиц и столбцов) до конкретных объектов. Если что-то не существует — ошибка на этом этапе. Logical Optimization (Логическая оптимизация) Применяются правила оптимизации, например: Упрощение выражений (WHERE 1=1 убирается) Удаление ненужных колонок (Project pruning) Push down фильтров (фильтры ближе к источнику) Physical Planning (Физическое планирование) Создание возможных физических планов (разные способы join, partitioning) и выбор лучшего по стоимости (Cost Model). Code Generation (Whole-stage code generation) Spark генерирует Java bytecode для выполнения, что значительно ускоряет исполнение (уменьшает количество вызовов JVM).
Какие есть коннекторы у Spark? Особенности коннектора к Greenplum?
Теги: #wildberries
Общие коннекторы
* JDBC: универсальный способ для подключения к реляционным базам (PostgreSQL, MySQL, Greenplum и т.д.).
* Kafka: чтение/запись стриминговых данных.
* HDFS/S3: работа с файлами в распределённом хранилище.
* Cassandra и прочие NoSQL-хранилища.
Greenplum
* Через JDBC: требуется соответствующий драйвер и указание url, table, user, password. Работает «из коробки», но не всегда максимально эффективно для больших данных.
* PXF (Parallel eXtensible Framework): специальный коннектор, который умеет параллельно выгружать и загружать данные в Greenplum. Он устанавливается на стороне Greenplum и позволяет Spark обращаться к Greenplum, используя параллельные потоки (каждый сегмент Greenplum может читать/записывать данные самостоятельно).
Основная особенность PXF — параллелизм: данные распределяются по сегментам Greenplum, а Spark читает и пишет в эти сегменты напрямую, что даёт высокую скорость обмена.
Spark: чему соответствует каждая job в коде?
Теги: #Cian
В Spark есть две ключевые концепции:
Трансформации (transformations) — «ленивые» операции (map, filter, join и т.д.), которые только формируют план вычислений (DAG), но сами не запускают обработку данных.
Действия (actions) — операции, которые действительно запускают вычисления (например, count, collect, show).
Каждый раз, когда в коде встречается action, Spark запускает новую job.
Одна job может включать в себя несколько stages — этапов выполнения, разбитых по границам shuffle (см. вопрос про stages).
Из-за чего job разделяется на одну и несколько stages?
Теги: #Cian
Spark разбивает job на stages, когда возникает операция, требующая shuffle – широкая трансформация.
- Нarrow-трансформации (map, filter, etc.) не требуют shuffle: данные передаются по цепочке (pipe-line) в рамках одного executor.
- Wide-трансформации (reduceByKey, join, groupBy и т.д.) вызывают shuffle, то есть нужно перегруппировать данные между executor’ами.
Как только в плане встречается shuffle, Spark завершает текущий этап (stage) и начинает новый после shuffle. Поэтому job может иметь один или несколько stages.
Какие есть типы джойнов в Spark (логические и физические)? Как они работают?
Теги: #Cian, #Билайн, #мир #Ярослав
- Логические типы join‑ов и их точная семантика
-
inner
Возвращает строки, где условие истинно для обеих сторон.
‑ Дубликаты и порядок строк не гарантируются.
‑ Строка сNULL
в ключе обычно «пролетает» (условие=
даётUNKNOWN
, а неTRUE
)([Apache Spark][1]). -
left(left outer)
Все строки левой, плюс столбцы правой, когда есть совпадение, иначеNULL
.
Часто используют как «enrich‑lookup» и как промежуточный этап в CDC‑процедурах — несовпавшие строки легко выловить черезWHERE right.key IS NULL
([Apache Spark][1]). -
right(right outer)
Симметричен левому; в Spark используется реже, потому что тот же эффект даютleft
после перестановки сторон([Apache Spark][1]). -
full(full outer)
Объединяет оба множества целиком; не‑совпавшие стороны получаютNULL
.
Полезен в DI‑процессах («что исчезло / появилось между двумя снапшотами?»)([Apache Spark][1]). -
cross
Декартово произведение без условия. Spark требует либо явногоcrossJoin
, либо конфигурацииspark.sql.crossJoin.enabled=true
. Кол‑во строк = N × M, поэтому план строго проверяется оптимизатором([Apache Spark][1]). -
left_semi
«Фильтр по существованию»: из левой берутся только те строки, что нашли пару справа, причём в результате остаются только левый набор столбцов.
‑ Используется вместоEXISTS
,IN (…)
, «semi‑lookup» перед тяжёлым джойном.
‑ Дубликаты левой стороны сохраняются; правая сторона не выводится([Apache Spark][1]). -
left_anti
Антипод semi: возвращает строки левой, для которых пары нет. Это удобнее, чемLEFT JOIN … WHERE right.key IS NULL
, потому что противодействует случайным дубликатам справа и короче пишется.
Применяется для «NOT EXISTS», дедупликации «новых» записей и т. д.([Apache Spark][1]).
> Правых semi/anti нет; при необходимости меняют местами таблицы.
- Как Catalyst превращает логику в физику—алгоритмы, шагзашагом
2.1 BroadcastHash Join(Map side join)
- Оценивается размер меньшей стороны; если ≤
autoBroadcastJoinThreshold
(10 MiB по умолчанию) или стоит хинтbroadcast
, Spark собирает эту часть на драйвер и рассылает executors как broadcast‑variable([Apache Spark][2]). - В каждом task строится неизменяемая хеш‑таблица.
- Большая сторона потоково сканируется, ключ пробивается в хеше, совпадения сразу эмитятся.
Плюсы: нулевой shuffle → минимальный сетевой трафик; O(N) по большой стороне.
Минусы: риск OOM при недооценке объёма.
2.2 Sort‑Merge Join(SMJ)—«Сначала разложи, потом отсортируй, потом склей»
-
Shuffle‑по ключу
Spark пересылает строки обеих таблиц так, чтобы все записи с одним и тем же хэш ключом оказались вместе (хэш партицирование). У каждой строки берут join‑ключ, считают hash(key), затем по модулю числа партиций определяют «коробку». Все строки с одним и тем же ключом попадают в одну коробку (на один executor). Это и называется «шардинг по ключу». -
Sort‑внутри каждой «почты»
Уже локально executors сортируют партициию по ключу. Сложность \~N log N
на партициию. -
Merge‑два курса
Теперь в каждой партиции у нас два отсортированных списка (левая и правая таблица). Два указателя «бегут» синхронно, находят одинаковые ключи и сразу пишут результат. Это уже линейно (O(N+M)
) — сами данные больше не перемещаются. ([SparkCodehub][1])
Когда брать SMJ
- Ключевое условие — равенство (
=
). - Обе стороны слишком велики для broadcast.
- Есть место на диск/SSD для временных файлов сортировки.
- Плохо, если один ключ встречается миллион раз: возникнет «толстая» партиция; AQE обычно её дробит.
2.3 Shuffle Hash Join(SHJ)—«Разложи, построй хеш‑таблицу, пробивай»
-
Shuffle‑по ключу
Та же пересылка, что и в SMJ: строки разъезжаются к executors поhash(key)
. -
Выбери «малую» сторону
Внутри партиции Spark смотрит, какая из двух половинок меньше (после фильтров). Её грузят в оперативку и строят хеш‑таблицу:ключ → список строк
. -
Стриминг второй стороны
Вторую (большую) таблицу читают построчно: для каждой строки ключ ищется в хеше, совпадения сразу пишутся в выход.
Если мини‑таблица неожиданно не влезла в память, Spark проливает её на диск частями («Grace‑loop») и повторяет шаг 3 по диапазонам хеша. Поэтому отдельной стратегии «Grace Hash» в плане нет — это резервный режим SHJ. ([Medium][2], [Medium][3])
Когда SHJ выгоден
- Ключ‑равенство, как и в SMJ.
- После фильтров хоть одна сторона помещается в ОЗУ одного task‑а.
- Меньше работы, чем сортировка, но чувствителен к ООМ: хеш‑таблица живёт в RAM.
2.4 Broadcast Nested Loop Join(BNLJ)—«Раздай словарик и перебери»
-
Малую сторону broadcast
Spark целиком копирует маленькую таблицу в память каждого executor‑а (ограничение — размер ≤autoBroadcastJoinThreshold
, дефолт 10 МиБ, либо явный хинтbroadcast
). -
Для каждой строки большой — цикл по маленькой
Исполняется буквальный вложенныйfor
: берём строку из большой таблицы и сравниваем с каждой строкой маленькой, применяем условие (=
,<
,BETWEEN
,UDF — любой предикат). -
Если условия нет → Cross
Когда предикат опущен, Spark из того же плана делает чистый декартов продукт.
Такая стратегия включается автоматом для неравенственных (<
, >
, !=
) джойнов или когда оптимизатор не смог использовать хеш/сорт‑мердж. Она проста, но сложностьO(N × b)
, где b
—маленькая таблица, поэтому работает только при реально малых объёмах. ([Stack Overflow][4], [bigdatainrealworld.com][5])
2.5 CartesianProductExec(Cross Join) — «Перебираем всё со всем»
Никакого условия, никакой оптимизации:
- Spark сознательно требует
df.crossJoin(df2)
или включённый флагspark.sql.crossJoin.enabled=true
, чтобы защититься от случайных ошибок. - Кол‑во строк =
N × M
. - Используется в тестах, при генерации комбинаций или если BNLJ был принудительно отключён. ([Stack Overflow][6])
- Оптимизации, влияющие на выбор стратегии
- Bucket‑join— предрассортировка и бакетизация обеих таблиц: Spark может пропустить shuffle и сразу стартовать SMJ.
-
Skew‑join mitigation (AQE)— если во время выполнения партиция оказывается аномально тяжёлой, Spark дробит её и дублирует малую сторону; включается флагом
spark.sql.adaptive.skewJoin.enabled
([Apache Spark][2]). - DynamicPartitionPruning— подзапрос с semi/inner‑join отдаёт список ключей, по которым Spark отбрасывает лишние партиции до реального джойна.
Куда это всё «пришивается» к логике
Catalyst идёт сверху вниз:
- Пытается BHJ—если хотя бы одна сторона broadcast‑able.
- Если это екви‑джойн, проверяет SHJ vs SMJ (выбор зависит от
spark.sql.join.preferSortMergeJoin
, размера и флагов AQE). - Если условия сложные—берёт BNLJ; если условий нет—Cartesian.
- После запуска AQE может динамически переключить SMJ → BHJ или включить skew‑split, если рантайм‑статистика это позволяет([Apache Spark][2]).
Как выбрать алгоритм?
1 — смотрим на размер данных:
Небольшая таблица → Broadcast Join
Большие таблицы → Sort-Merge или Shuffle Hash
2 — смотрим на структуру данных:
Отсортированы → Sort-Merge
Skewed-ключи → Skew Join
3 — инфраструктура:
Spark → используйте хинты (BROADCAST, MERGE)
Реляционная СУБД → опирайтесь на планировщик и индексы
❕Запомнить:
Nested Loop и Hash Join - база для СУБД. Broadcast и Sort-Merge - основа для распределенных систем. Всегда анализируйте план выполнения (explain в SQL, df.explain() в Spark) + метрики (память, сеть).
client / cluster mode в спарке
Теги: #Cian
В client mode драйвер запускается локально, а executors – в кластере. В cluster mode драйвер также запускается внутри кластера (например, на Yarn или Kubernetes).
Параметры spark submit
Теги: #Ярослав
Основные:
* Spark Submit — это команда, используемая для запуска приложений Apache Spark на кластере. Она принимает множество параметров, которые позволяют настраивать поведение приложения и управлять ресурсами. Вот основные параметры spark-submit:
–class: указывает главный класс приложения.
–master: определяет мастер-узел Spark
–deploy-mode: режим развертывания приложения: client или cluster.
–name: задает имя приложения в интерфейсе управления Spark.
–executor-memory: определяет объем памяти для каждого исполнителя
–driver-memory: объем памяти для драйвера
–conf: позволяет задать дополнительные настройки Spark.
–executor-cores: количество ядер CPU для каждого исполнителя.
–num-executors: общее количество исполнителей
–files: передает дополнительные файлы в приложение, которые будут доступны в рабочей директории
Чем кэширование отличается от broadcast join?
Теги: #мир
Кэширование (cache/persist)
1. Сохраняет результат вычислений (RDD/DataFrame) в оперативной памяти (или на диск, если не хватает памяти)
2. Позволяет переиспользовать уже рассчитанные данные при повторных обращениях, сокращая время вычислений
3. Типичное применение: несколько разных действий (actions) на одном и том же наборе данных
Broadcast Join
1. «Рассылает» (broadcast) маленькую таблицу/датасет на все узлы кластера, где она хранится в памяти
2. Ускоряет join с большой таблицей, поскольку не требует shuffle
3. Типичное применение: одна таблица очень мала, а другая – большая
Это два разных механизма оптимизации: кэширование помогает при многократном использовании одного набора данных, а broadcast join ускоряет соединение (join), когда один из датасетов невелик.
Что такое spill?
Теги: #Ярослав
Скорость спарка в основном завязана на вычислениях в оперативной памяти. Если данные не помещаются в ОЗУ, происходит spill - сохранение промежуточных результатов на диск, который в разы медленнее оперативной памяти. Может происходить как при вычислениях на executor’е, так и при сборе всех результатов на driver. Решение - увеличивать количество кусочков, на которые делятся данные (партиций) или увеличивать лимиты ОЗУ.
Чем отличаются RDD, DataFrame и Dataset? В чём особенности каждого?
Теги: #мир, #Билайн #Ярослав
RDD (Resilient Distributed Dataset) — низкоуровневая структура данных в Spark, которая предоставляет API для работы с распределенными данными. RDD immutable (неизменяемый) и поддерживает ленивые вычисления. Подходит для гибких, нестандартных операций с данными, но не имеет оптимизаций и схемы, что снижает производительность.
DataFrame — это схематизированный набор данных, представленный в виде таблицы с колонками и строками. DataFrame поддерживает API с SQL-подобными операциями, использует Catalyst-оптимизатор и Tungsten для выполнения запросов, что повышает производительность. DataFrame легче использовать для структурированных данных и анализа, но он менее гибок, чем RDD, и не поддерживает строгую типизацию.
Dataset — объединяет свойства RDD и DataFrame, добавляя строгую типизацию данных (type-safe). Dataset компилируется с проверкой типов, использует Catalyst и Tungsten, что улучшает производительность. Поддерживает SQL-подобный API и предоставляет методы RDD для трансформаций. Полноценно доступен только в Scala и Java, что ограничивает использование в Python.
Краткое сравнение
RDD: Гибкость, отсутствие схемы, нет оптимизаций. Подходит для нестандартных и сложных вычислений.
DataFrame: SQL-подобный API, автоматические оптимизации, высокая производительность. Предпочтителен для структурированных данных.
Dataset: Строгая типизация, производительность DataFrame, поддержка RDD API. Лучший выбор для больших типизированных данных в Scala и Java.
Какие бывают типы кэширования?
Теги: #мир
Memory Only
o Сохраняет данные в оперативной памяти, при нехватке – пересчитывает недостающие партиции.
Memory and Disk
o Пытается сохранить данные в память, а излишек складывает на диск.
Disk Only
o Сохраняет данные только на диск, не используя память.
Memory Only Serialized
o Хранит данные в памяти в сериализованном виде (например, с помощью Kryo), что экономит память, но требует затрат на сериализацию/десериализацию.
Memory and Disk Serialized
o Аналогично предыдущему пункту, но при нехватке памяти выгружает сериализованные данные на диск.
При выборе метода учитывают: объём данных, скорость доступа и избыточность вычислений (что «дешевле» – пересчитать или сохранить?).
Как выбрать количество ядер на executor? Как увеличение числа ядер влияет на память (OutOfMemory)?
Теги: #мир
Короткий ответ
Количество ядер (spark.executor.cores
) — это прежде всего про параллелизм внутри одного JVM-процесса. Чем больше ядер, тем больше задач (tasks) могут выполняться одновременно в одном executor’е, а значит тем выше суммарное потребление его памяти. Если память, фактически нужная всем одновременным задачам и внутренним буферам Spark, превышает то, что выделено параметрами spark.executor.memory + spark.executor.memoryOverhead
, JVM умирает с java.lang.OutOfMemoryError
.
Ниже — разбор по пунктам, без упрощений и «магических» чисел.
- Что физически происходит, когда вы прибавляете ядра
-
Каждое ядро ≈ один рабочий поток Spark
Executor создаёт пул, и TaskScheduler отдает в него доspark.executor.cores
задач. -
У каждой задачи свой набор структур:
- объектные строчки RDD/Dataset, пока они не будут отброшены из mem-store;
- массивы shuffle-буферов (Sort-Shuffle, Tungsten-shuffle writer);
- off-heap страницы для сортировки (если включён
spark.memory.offHeap.enabled
); - в PySpark — отдельный Python-процесс и его heap.
-
Память внутри JVM жёстко поделена:
-
Unified Memory (storage + execution) ≈ 60 % executor-heap по умолчанию;
сюда лезут и кэш-блоки, и временные буферы shuffle, и агрегации. - Остальное — user-heap, метаданные классов, строчки, сериализованные объекты и др.
-
spark.executor.memoryOverhead
резервируется за пределами Java-heap: native-буферы Netty, off-heap страницы, Python workers, Arrow, JNI-компоненты (Parquet, ORC, RocksDB и т. д.).
-
Unified Memory (storage + execution) ≈ 60 % executor-heap по умолчанию;
-
Когда добавляете ядра, растёт только параллелизм, но не квота памяти.
→ «память на задачу» ≈executorHeap / concurrentTasks
.
Если раньше одну тяжёлую задачу держали в 300 МБ и всё проходило, то после увеличения ядер вдвое та же задача получает лишь \~150 МБ, и при том же объёме данных начинается OOM.
- Практический алгоритм выбора числа ядер
Шаг 1. Определитесь, что лимитирует джобу
- CPU-bound (чтение, форматирование, UDF-ы без крупной агрегации) — больше ядер окупается.
- Memory-/shuffle-bound (wide joins, groupByKey на больших картах, строите крупный в-памяти кэш) — часто выгоднее меньше ядер и несколько executor’ов, чем монолит с 15 ядрами.
Шаг 2. Измерьте фактическое потребление памяти на задачу
- Запустите job в Spark UI, посмотрите «Peak Execution Memory» на самом тяжёлом stage.
- Умножьте на \~1.3–1.5 для запаса под GC фрагментацию и неучтённые структуры.
Шаг 3. Посчитайте безопасное executor.cores
safeCores = floor( heapBytes / (peakTaskMemBytes * safetyFactor) )
- Для JVM-heap берите именно то, что выдаёт
Runtime.maxMemory()
, а не номинальное. - Если safeCores < 2 — увеличьте память или пересмотрите серилизацию/партиционирование.
Шаг 4. Проверьте ограничения инфраструктуры
-
YARN:
yarn.scheduler.maximum-allocation-vcores
,memory-mb
; cgroups могут жестко убить процесс раньше Java-OOM. - Kubernetes: лимиты pod’а действуют сверху — heap не сможет расшириться.
- Standalone/Mesos: смотрите, чтобы суммарное vcore-потребление не забило весь узел и не начался OS-swap.
- Нетривиальные эффекты, о которых часто забывают
-
GC ⇄ cores. Больше активных объектов одновременно → длиннее stop-the-world паузы у G1/ParallelGC. Явно настройте
spark.executor.extraJavaOptions
с G1 и-XX:MaxGCPauseMillis
, иначе рост ядер приведёт к GC-бурстам и «фальшивым» OOM (timeouts). -
PySpark × Arrow.
spark.sql.execution.arrow.pyspark.enabled=true
создаёт большой off-heap буфер внутри JVM + IPC-шару. При 8+ ядрах размер этой памяти практически линейно растёт. - Shuffle-spill на диск. Когда heap забит, Spark начинает сливать промежуточные сегменты на диск; если диск медленный, задача «застынет» и GC не освободит память вовремя — получите OOM с формулировкой «failed to allocate page». Не лечится добавлением ядер, только мемом или SSD/NVMe.
-
Skew (перекос по партициям). Если одна-две партиции съедают половину кучи, увеличение ядер ситуацию даже ухудшит, потому что другие потоки будут конкурировать за тот же heap и очередь spill-ов. Лечится
salting
,skewJoin
,adaptiveExecution
.
- Правила-“отсечки”, работающие в проде
- 2–5 ядер на executor на YARN — золотая середина для смешанных (CPU+shuffle) workloads.
- \~300 МБ heap-памяти на задачу — консервативная оценка для Java/Scala кода без больших UDF-объектов. Для PySpark считайте 400–500 МБ за счёт Python-heap.
- Не выходите за 8 ядер на executor, пока не доказали профилировщиком, что именно CPU простаивают.
-
Никогда не масштабируйтесь только одним числом — всегда тройка:
executor.instances
,executor.cores
,executor.memory(+Overhead)
должна подбираться одновременно.
Итог
Увеличение spark.executor.cores
полезно лишь тогда, когда ваш bottleneck действительно CPU и при этом у каждой задачи остаётся достаточно heap+off-heap-памяти с запасом на GC и сетевые буферы. Во всех остальных случаях вы быстрее упёрнётесь в OOM или длинные GC-паузы, чем получите прирост производительности. Сначала померьте память на задачу, прикиньте безопасное число ядер, а уже потом масштабируйте job.
Архитектура Spark - что под капотом?
Теги: #Ярослав
Компоненты архитектуры Spark:
Драйвер (Driver): запускает пользовательский код и управляет DAG, этапами, задачами.
Менеджер кластеров (Cluster Manager): управляет ресурсами в кластере (может быть YARN, Mesos или Standalone).
Воркеры (Worker Nodes): физические или виртуальные машины, на которых выполняются исполнители.
Исполнители (Executors): процессы на воркерах, выполняющие задачи и управляют памятью для RDD.
Что такое udf?
Теги: #Яроcлав
Что такое UDF в Spark
UDF (User‑Defined Function) — это функция, которую вы регистрируете в Spark SQL, чтобы выполнить логику, которой нет в стандартных выражениях org.apache.spark.sql.functions
или SQL‑синтаксисе. Spark вызывает эту функцию для каждой строки (или батча строк), поэтому важно понимать, где она исполняется и сколько стоит перенос данных.
Три основные разновидности
-
PythonUDF (scalar)
Исполнение: в отдельном Python‑процессе на каждом экзекьютере через Py4J, тк сначала данные сериализются в пикл и потом передаются на питон.
Когда применять: только если совсем нельзя выразить логику на SQL/Scala.
Минусы: Проблема в том, что для исполнения Python UDF нужно перенести данные к коду и вернуть обратно. Каждую строку приходится копировать JVM ⇄ Python; Catalyst не оптимизирует, поэтому нет push‑down фильтров, project pruning и т.д. -
PandasUDF (vectorized)
Исполнение: тоже в Python, но колонками (Arrow RecordBatch), что снижает накладные расходы на сериализацию на порядок.
Хорошо подходит для: NumPy/Pandas‑логики, агрегатов со сложной математикой.
Ограничения: поддерживаются не все типы (nested Struct/Map иногда падают); нельзя использовать в streaming. -
Scala/JavaUDF
Исполнение: прямо в JVMExecutor’е, без межпроцессного копирования.
Плюсы: быстрее Python‑вариантов.
Минусы: Catalyst всё равно видит функцию как «чёрный ящик», значит оптимизации также теряются.
Как создать и вызвать (кратко)
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
scalar Python UDF
@udf(returnType=IntegerType())
def add_one(x):
return x + 1
df = spark.range(5).withColumn(“y”, add_one(“id”))
~~~
```python
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import LongType
from pyspark.sql.functions import pandas_udf
Pandas UDF
@pandas_udf(LongType())
def add_one_vec(s: pd.Series) -> pd.Series:
return s + 1
df = spark.range(5).withColumn(“y”, add_one_vec(“id”))
~~~
```scala
// Scala UDF
val addOne = udf((x: Long) => x + 1)
spark.range(5).withColumn(“y”, addOne($”id”))
~~~
Практические правила для продакшна
-
Сначала ищите готовую функцию или выражение
expr(...)
. - Если нужна кастомная логика на JVM — пишите Scala/JavaUDF.
-
Если без Python не обойтись — предпочитайте Pandas UDF и включайте Arrow:
python spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
- Держите UDF stateless и идемпотентной; помечайте недетерминированные функции как
.asNondeterministic()
. - Тестируйте на реальном объёме данных: даже PandasUDF может просесть в скорости при гигабайтных батчах.
Итого: UDF — мощный, но дорогой инструмент. Используйте его, только когда стандартного Spark SQL недостаточно, выбирая вариант (Scala, Pandas, Python scalar) по наименьшему оверхеду для вашей задачи.
Как ограничить количество ресурсов в Спарк? Можно ли через YARN это сделать?
Теги: #мир
Как “подрезать” Spark‑приложение по ресурсам
- Границы, которые ставит сам Spark
-
Память и CPU на драйвер и исполнители
Черезspark-submit
можно задать:--driver-memory
,--driver-cores
,--executor-memory
,--executor-cores
,--num-executors
.
Это превращается в YARN‑контейнеры, у которых чётко задан объём памяти и количество vCore‑ов. -
Память “поверх” heap’а
Есть параметрspark.yarn.executor.memoryOverhead
. Он нужен, чтобы резервировать память под off‑heap‑выделения, Arrow, JNI, Netty и т.п. По умолчанию Spark сам выставляет минимум 384 МБ или 10% отexecutor-memory
, но этого часто не хватает. Если не задать — YARN может прибить контейнер за превышение лимита, хотя GC‑логи будут чистыми. -
CPU на задачу
Черезspark.task.cpus
можно указать, сколько vCore требуется на одну task. Это полезно, если task тяжёлая — например, обучение модели, видеокодек, вызов C++‑библиотеки и т.п. Тогда можно уменьшить общее число task’ов, но дать им больше ресурсов.
- Что может ограничить YARN поверх Spark
Да, можно — и часто именно через YARN и ограничивают Spark‑приложения. Тут работает сразу несколько уровней:
-
На уровне узла (ноды)
В конфиге YARN NodeManager задаютсяyarn.nodemanager.resource.memory-mb
иcpu-vcores
. Это лимит, который вообще доступен на данной машине. Больше нельзя ни одному приложению — ни Spark, ни Hive, ни чему другому. -
На уровне очереди (CapacityScheduler или FairScheduler)
В YARN очереди можно задаватьcapacity
,max-capacity
,user-limit-factor
.
Эти параметры управляют, сколько ресурсов (в % от общего пула) может получить очередь, и сколько один пользователь может забрать из этой очереди.
Например, можно ограничить, чтобы Spark‑приложения конкретного пользователя не забирали больше 30% ресурсов. -
На уровне контейнера
Контейнер — это единица, которую запрашивает Spark для executor’а или драйвера. Spark сам выбирает размеры в зависимости от указанных флагов, но контейнер никогда не выйдет за рамки, которые позволяют очередь и ноды.
Практические нюансы
-
Если драйвер в client‑режиме
Тогда он не запускается внутри YARN‑контейнера, а работает на вашей машине (или edge‑ноде). YARN на него не влияет — если у вас не настроены cgroups, драйвер может съесть сколько угодно памяти. Это может стать bottleneck. -
Dynamic Allocation
С параметромspark.dynamicAllocation.enabled=true
Spark может автоматически запрашивать и отпускать executors. Всё равно действует ограничениеmaxExecutors
, и оно не позволит вылезти за границы очереди или кластера. -
Resource Profiles (Spark 3.1+)
Позволяют задавать разные размеры executors для разных стадий. Например, для одной стадии давать 2 core и 4 гб, а для другой — 4 core и 8 гб. Это не обход лимитов — просто более гибкая работа в пределах разрешённых границ. -
GPU / FPGA
Для работы с видеокартами Spark использует параметрыspark.executor.resource.gpu.amount
и аналогичные. Чтобы YARN правильно выделял узлы с нужными ресурсами, используют метки (NodeLabels) и cgroups. -
Проблемы с Off‑Heap памятью
Если в логах нет GC, но YARN убивает контейнер — почти наверняка дело в том, что выделения шли в off‑heap. Это может быть Arrow, Netty, C‑библиотеки, Broadcast‑переменные. Чтобы этого избежать, увеличиваютmemoryOverhead
.
Минимальная шпаргалка для ответа
> “Я ограничиваю ресурсы на трёх уровнях: внутри Spark — это executor‑memory, cores, task.cpus, dynamic allocation; на уровне YARN — через queue capacity и user‑limit‑factor; на уровне нод — через ресурсы NodeManager. В cluster‑режиме драйвер тоже попадает под лимиты, в client‑режиме — нет, и это отдельный риск. Также использую memoryOverhead, если есть Arrow или Off‑Heap‑нагрузка. Могу задавать resource profiles для разных стадий, а для GPU — выделяю ресурсы через node labels.”
Хочешь дополнить это кейсами с реальных проектов — например, как спасал джобу от OOM или как падал client‑драйвер?
Как происходит выполнение (вычисления) в Spark? Что под капотом у Spark?
Теги: #Билайн, #rubbles
Коротко: почти в такой, но не «жёсткой линейкой». Есть фиксированный «скелет» шага за шагом, а внутри него — параллелизм и обратная связь (AQE), из-за чего часть вещей делается на лету. Ниже — что идёт строго по порядку, а что может накладываться/меняться.
Что идёт строго по порядку (скелет)
-
Action → планирование на драйвере. Трансформации ленивы; действие (например,
count
,show
, запись) триггерит оптимизацию/исполнение. Для DataFrame/Dataset это: логический план → оптимизированный → физический (Catalyst + выбор стратегии). Посмотреть можно черезexplain("extended")
. (spark.apache.org) - Разбиение на стадии по границам shuffle. Любая action-работа превращается в стадии, которые разделены распределёнными shuffle-операциями (wide deps). (spark.apache.org)
- DAGScheduler/TaskScheduler. Драйвер строит DAG стадий и подаёт готовые стадии как TaskSet в планировщик задач; затем задачи раздаются на executors. (archive.apache.org, spark.apache.org)
-
Исполнение задач на executors.
- Для первой стадии читаются исходники; для последующих — читаются shuffle-файлы предыдущих стадий.
- На wide-границе задача пишет shuffle-выход (sort-based shuffle по умолчанию) на локальный диск; следующая стадия его читает. (spark.apache.org)
Итого: «Action → (Catalyst) план → стадии → задачи → shuffle-write/read → следующая стадия → … → результат» — это базовый, детерминированный каркас. (spark.apache.org)
Что НЕ строго по очереди (накладывается/меняется)
-
AQE меняет план в runtime. После старта запроса Spark может: схлопывать пост-shuffle партиции, конвертировать sort-merge join в broadcast/shuffled-hash, лечить skew. С 3.2 включено по умолчанию (
spark.sql.adaptive.enabled=true
). (archive.apache.org, downloads.apache.org, spark.apache.org) - Ресурсы и параллелизм. Исполнители могут быть подняты заранее или динамически; в пределах готовой стадии её задачи бегут параллельно; другие стадии этого же джоба ждут родителей, но другие джобы могут исполняться параллельно (конфигурируется политикой планировщика). (spark.apache.org)
- Broadcast и обмен данными. Если выбран broadcast-join (или AQE сконвертировал в него), малая сторона рассылается executors без shuffle; это может подготавливаться параллельно с другими частями плана. (spark.apache.org)
-
UI/диагностика не влияет на порядок. UI на
:4040
просто слушает события выполнения; порядок выше не меняет. (spark.apache.org)
Ответ на твой вопрос «строго ли в такой последовательности?»
- Да, в терминах уровней порядок «Action → Catalyst-план → DAGScheduler (стадии) → TaskScheduler (задачи) → shuffle → следующая стадия» — верный. Это «скелет», без которого вычисление не состоится. (spark.apache.org, archive.apache.org)
- Но нет, не «монотонной лентой»: executors/задачи работают параллельно, другие джобы могут накладываться, а AQE вправе переиграть часть физического плана уже во время бега. Это нормальная, задокументированная динамика движка. (spark.apache.org, archive.apache.org)
Зачем это знать? — Чтобы корректно читать план/UI и объяснять «почему медленно»: узкие места почти всегда на границах shuffle (чтение/запись, сеть, количество партиций) или из-за неверной стратегии join/скью. Понимание, что фиксировано, а что адаптивно, позволяет либо заранее выбрать лучшую стратегию (partitioning, hints), либо «дать AQE шанс» и проверить фактическую стратегию в UI/плане. (spark.apache.org)
Почему так устроено? — Жёсткий каркас гарантирует корректность/устойчивость (линейность стадий по зависимостям и восстановление по lineage), а адаптивные и параллельные части дают производительность и лучшую утилизацию кластера. (spark.apache.org)
Если хочешь, сожму это в один «постер»-чеклист (по шагам + где смотреть в UI) под твой кластер и версии Spark.
Как изменить степень параллелизма в Spark?
Теги: #Билайн #Ярослав
PySpark
0 · Что именно считается параллелизмом
-
Число одновременно исполняющихся задач (task).
parallelism = min(кол‑во_партиций_на_стадии, executors× coresPerExecutor)
Меняем мы всегда либо количество партиций, либо количество доступных CPU‑ядер. Всё остальное — вариации вокруг этих двух рычагов.
1 · Меняем партиции в самом коде
```python
df = spark.read.parquet(path) # партиции придут «как лежат» в файлах
df = df.repartition(1600) # создаём 1600 новых шардов (shuffle)
df = df.repartition(“customer_id”) # hash‑partition по ключу (shuffle)
df_small = df.filter(“dt = ‘2025‑05‑12’”)
df_small = df_small.coalesce(200) # ужимаем без shuffle
rdd = sc.textFile(“hdfs:///data”, minPartitions=800)
rdd = rdd.repartition(1200) # RDD API то же самое
~~~
Нюансы
-
coalesce(N)
безshuffle=True
может только уменьшать N; при увеличении Spark создаст пустышки → параллелизм не вырастет. - При переходе RDD → DataFrame теряется кастомный
Partitioner
, если он был. - В Spark 3 включён AQE:
spark.sql.adaptive.enabled=true
. Тогда Spark умеет сам дробить «длинные» таски и схлопывать мелкие, но только после первого шаффла. То есть первоначальноеrepartition()
всё равно важно.
2 · Меняем число одновременных задач через spark-submit
```bash
spark-submit \
–master yarn \
–deploy-mode cluster \
–num-executors 50 \
–executor-cores 4 \
–executor-memory 8g \
–conf spark.default.parallelism=800 \
–conf spark.sql.shuffle.partitions=1600 \
main.py
~~~
-
--num-executors × --executor-cores
→ верхняя граница одновременно работающих тасков. -
spark.default.parallelism
читается ровно один раз при созданииSparkContext
; после старта приложения поменять нельзя. Используется, когда Spark «сам решает», сколько партиций создать (напримерreduceByKey
безnumPartitions
). -
spark.sql.shuffle.partitions
определяет число выходных партиций каждой shuffle‑операции в SparkSQL / DataFrame API (по умолчанию200). Слишком маленькое значение →«хвостовые» таски; слишком большое →море мелких файлов.
Быстрый хак (попадает в мой шаблон скрипта)
```bash
CORES_TOTAL=$(($(nproc –all) * 4)) # 4 узла по nproc ядер
spark-submit \
–conf spark.sql.shuffle.partitions=$((CORES_TOTAL*2)) \
…
~~~
Даёт «чуть больше партиций, чем ядер»— почти всегда попадаю в sweet‑spot без микроскопа.
3 · Динамика во время выполнения
```bash
–conf spark.dynamicAllocation.enabled=true
–conf spark.shuffle.service.enabled=true
~~~
- Spark самостоятельно добавит ещё executors, если очередь тасков длинная, и высвободит их, когда стадия закончится.
-
Важно: существующие RDD и shuffle‑файлы уже имеют фиксированное число партиций; dynamic allocation не меняет их. Поэтому после огромного
coalesce(10)
добавить 100экзекьюторов эффекта не даст.
Как бороться с перекосом данных (data skew)?
Теги: #Ярослав
Перекос данных (data skew) возникает, когда данные распределены по разделам (partitions) неравномерно, и некоторые из них получают существенно больше данных. Это приводит к увеличению времени выполнения задач и перегрузке памяти на отдельных узлах. Spark предлагает несколько подходов для борьбы с этим:
- Использование broadcast join
При перекосе данных в джоинах с маленькой таблицей используйте broadcast join: небольшая таблица передается на все узлы, и перекос при соединении минимизируется.
Включите автоматический broadcast join с помощью spark.sql.autoBroadcastJoinThreshold.
5. Выборка ключей с перекосом и их обработка отдельно
Найдите ключи, вызывающие перекос, обработайте их отдельно (например, в отдельном DataFrame) и соедините результат с основными данными после выполнения операций.
- Агрегация данных перед джойном
Если перекос возникает в джоинах, попробуйте сначала агрегировать данные по ключам до соединения, что может уменьшить количество данных и нагрузку.
Изменение логики
Пересмотреть гранулярность ключа (более высокая кардинальность).
Декомпозировать «горячие» ключи на отдельный пайплайн/батч.
AQE (Adaptive Query Execution) skew-join
В Spark 3+ можно включить автоматическое разбиение перекошенных shuffle-партиций на несколько меньших при sort-merge join:
spark.sql.adaptive.enabled=true и spark.sql.adaptive.skewJoin.enabled=true
Порог/фактор настраиваются (skewedPartitionFactor, skewedPartitionThresholdInBytes). Это позволяет динамически разрезать «жирный» reduce-partition на несколько задач.
От чего зависит нагрузка по вычислениям? Есть ли spill’ы? #Иннотех
- Нагрузка по вычислениям зависит от:
1. Объёма данных: чем больше данных, тем выше объём обработки.
2. Типа операций:
o Широкие трансформации (join, groupBy, reduceByKey) вызывают shuffle и требуют перераспределения данных.
o Узкие трансформации (map, filter) работают в рамках текущих партиций. - Spill (пролив на диск) происходит, когда Spark не хватает памяти для хранения временных структур (например, при shuffle или сортировке).
- Spark «выгружает» часть данных на диск, чтобы освободить оперативную память.
- Spill может замедлять вычисления, поскольку доступ к диску медленнее, чем к памяти.
- Проверить наличие spill’ов можно в логах Spark (или через Spark UI, в разделе Tasks/Stages видна информация о spill на диск).
Какая разница между Coalesce и Repartition и в каких случаях и когда что юзать?
Теги: #Ярослав
- coalesce и repartition — это методы переразбиения (reshuffling) данных в Spark, но с разными подходами и назначением.
- coalesce
Используется для уменьшения количества разделов (partitions).
Работает без шаффлинга (reshuffling) данных между узлами, если данные остаются на тех же узлах.
Обычно быстрее и эффективнее, так как не требует передачи данных по сети.
Идеален для оптимизации разделов, если данные стали меньше на более поздних стадиях обработки, например, после фильтрации.
Пример использования:
scala
val reducedData = largeData.coalesce(4)
Когда использовать:
Если нужно уменьшить количество разделов.
Для увеличения производительности на финальных этапах, когда данные уже обработаны и требуется только сохранить их в меньшем количестве разделов. - repartition
Может использоваться как для уменьшения, так и для увеличения числа разделов.
Требует шаффлинга данных, что приводит к перераспределению по всем узлам кластера.
Подходит для случаев, когда необходимо равномерно распределить данные или переразбить их для параллельной обработки, что актуально на ранних этапах анализа данных.
Пример использования:
scala
val repartitionedData = data.repartition(10)
Почему repartition
— в начале/перед тяжёлыми шагами
-
Ранняя балансировка: перед
join
/groupBy
вы хотите равномерные партиции, иначе получите “хвосты” (stragglers).repartition(key)
разносит ключи ровнее. - Управление параллелизмом: увеличить число партиций до разумного уровня, чтобы загрузить все ядра кластера на первых дорогих стадиях.
- Снижение skew: если одна группа/ключ разрастается, хэш-repartition или range-repartition до агрегации сгладит перекос.
Почему coalesce
— в конце/перед записью
-
Нет лишнего shuffle: к концу плана shuffle уже был (join/agg/sort). Делать ещё один (через
repartition
) только ради меньшего числа файлов — лишние затраты.coalesce
уменьшит партиции дёшево. -
Контроль маленьких файлов: перед
write
склеиваем партиции до целевого количества → меньше файлов и нагрузка на метаданные. -
Размер очевиден только в конце: именно к финишу вы знаете итоговый объём и можете рассчитать адекватное
n
для coalesce.