Spark Flashcards

(21 cards)

1
Q

Модель памяти в Spark. Как на executor распределяется? Какая память за что отвечает?

A

Spark поддерживает Java, R и Python, основным языком реализации самого фреймворка является Scala. Поэтому все операции выполняются внутри JVM, даже если пользовательский код написан на Python или R. Среда выполнения фреймворка разделяет пространство кучи (Heap) JVM в драйвере и исполнителях на 4 разные части

Execution Memory - используемая структурами данных во время shuffle-операций, при которых данные перемешиваются – т.е. соединение, группировка и агрегирование;

– используется для выполнения shuffle-операций, агрегатов, сортировок, joins и любых промежуточных вычислений.
– когда данных слишком много — может происходить spill на диск.
– размер задаётся через spark.memory.fraction * heapSize и spark.memory.storageFraction (он же “threshold” между execution и storage).

Storage Memory зарезервирована для кэширования данных
– кеширование (cache()/persist(MEMORY_ONLY)), broadcast-переменные и RDD persist.
– если Storage не использует всю отведённую область, избыточная часть может «перетекать» в Execution и наоборот, до порогов.

Reserved Memory зарезервированная память для внутренних целей фреймворка.

– небольшой фрагмент (обычно ~300 MB) резервируется под JVM, агрегации, внутренние буферы и native код.

User Memory для хранения структур данных, созданных и управляемых пользовательским кодом.

Unified = heap – Reserved, из которых
* Execution Memory (~30%) для вычислений (shuffle, join, sort)
* Storage Memory (~30%) для кеша и broadcast
* оставшиеся 40% (heap – unified) недоступны под Spark и служат под Java-овер­хеды.

Кроме JVM Heap, есть еще два сегмента памяти, к которым обращается Spark:
память вне кучи (Off-Heap Memory) – сегмент за пределами JVM, который иногда используется виртуальной машиной Java, например, для метода intern(), гарантирующего что все строки с одинаковым содержимым, совместно используют одну и ту же память. Память вне кучи также может использоваться для хранения сериализованных датафреймов и RDD.

Внешняя память процесса (External Process Memory), которую используют программы на PySpark и SparkR в рамках процессов Python и R вне JVM.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
2
Q

Что можно посмотреть в Spark UI?

A

Кол-во выделенных ресурсов, план запроса
Какие executor, jobы

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
3
Q

Как распределяется память между экзекьюторами?

A

Spark не делит память между executor’ами напрямую. Ты указываешь, сколько памяти нужно каждому, и сколько executor’ов запустить.
Далее менеджер ресурсов (например, YARN) размещает executor’ы по доступным нодам.
Executor обрабатывает только свою часть данных, хранит свои таски и использует выделенную память независимо.
Если перекос данных или плохо настроено количество executor’ов/памяти — возможны OOM, spills, простаивание.

Ты задаёшь параметры:

–executor-memory: сколько памяти получит каждый executor
–num-executors: сколько executor’ов всего
–executor-cores: сколько ядер у каждого executor

Spark сам не делит память между executor’ами — он просит у YARN (или другого ResourceManager) выделить N контейнеров по M памяти.
ResourceManager уже решает, есть ли такие ресурсы на нодах и куда их можно поставить.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
4
Q

Что такое спиллы (spills) и как их избежать?

A

Скорость спарка в основном завязана на вычислениях в оперативной памяти. Если данные не помещаются в ОЗУ, происходит spill – сохранение промежуточных результатов на диск, который в разы медленнее оперативной памяти.

Может происходить как при вычислениях на executor’е, так и при сборе всех результатов на driver. Решение – увеличивать количество кусочков, на которые делятся данные (партиций) или увеличивать лимиты ОЗУ
Как узнать о спиле?
1. Посмотреть в Spark UI
Spark UI → Executors → Metrics
– В секции “Executor Summary” есть колонки “Memory Spill” и “Disk Spill” (или даже более детально “Shuffle Spill (Memory)” и “Shuffle Spill (Disk)”). Если там ненулевые значения — значит, часть данных не уместилась в память и была сброшена на диск.
2. Логи драйвера и executors
Spilling block rdd_5_3 to disk
Shuffle spill: memory = 50.3 MB, disk = 123.7 MB

Как избежать:
Увеличить executor memory
Увеличить spark.sql.shuffle.partitions, чтобы снизить нагрузку на партицию
Использовать persist() или broadcast() вручную
Настроить join’ы (например, avoid sort-merge join)
Следить за размером данных в одной партиции (перекосы = больше spill)

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
5
Q

Какие есть виды джойнов (физических операторов) в Spark?

A

Broadcast Hash Join
Когда применяется: один из датафреймов «маленький» и помещается в память всех executors.

Sort-Merge Join
Когда применяется: большие данные, где хочется минимизировать потребление памяти или когда Spark не может выбрать hash-join (например, когда есть внешние соединения).

Shuffle Hash Join
Когда применяется: ни один из DF не маленький, но доступна память для хеширования одного из них после shuffle.

Cartesian Join / Nested Loop Join
Когда применяется: нет ключей для обычного join, или нужен полный декарт.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

Опиши ситуацию при которой может быть перекос данных? Как его обнаружить и устранить?Какие этапы обработки данных в Spark чаще всего приводят к перекосу данных (чтение, запись, шаффл)?

A

Можно считать, что у нас перекос, если данных в одном узле больше чем в другом на 10%

Причины: Shuffle-операции (groupBy, reduceByKey, join, distinct) — требуют перераспределения данных.
Чтение из источников с неравномерным партиционированием (JDBC, Kafka).
Запись в partitionBy, если одна партиция содержит почти все данные.

Симптомы: В Spark UI одна-две задачи (Task) в Stage занимают намного больше времени, чем остальные.

Как обнаружить: Смотрим распределение времени и объёма данных по задачам в Spark UI → Stage → Tasks.

Решение:
Salting - находим ключ по которому распределены данные и видим сильный перекос в одном из узлов. Берём этот ключ с перекошенными данными и создаём из 1 ключа несколько новых путём добавления рандомных символов, перераспределяем(shuffle) с учётом новых “солёных” ключей, получаем более равномерное распределение данных.

В случае, если вы делаете salting ключа перед join-ом двух таблиц, то вам нужно посолить одинаковым способом ключ джоина в обоих таблицах и по новому посоленому ключу уже джоинить

Broadcast Join - если мы джоиним маленькую табличку, то можем передать её целиком на каждый узел, по аналогии с Distributed Replicated в Greenplum. Уменьшает Shuffle и риск Data Skew соответственно

Repartition - увеличение числа партиций и равномерное распределение по ним(вызывает Shuffle)

Избегать взаимодействий с кривыми ключами, где много null, дубликатов или ещё каких-то косяков

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
7
Q

Что такое Job, Stage и Task в Spark? В чем их разница? Почему нельзя все делать в тасках? Как они разграничиваются между собой? Зачем несколько джоб почему такая группировка у них?

A

Spark строит DAG (Directed Acyclic Graph) операций и делит выполнение на уровни абстракции:

  1. Job
    Это единица исполнения, которая запускается при action (count(), collect(), save() и т.д.)
    В коде может быть 1 DataFrame, но 5 action → будет 5 Job’ов
  2. Stage
    Stage — этап внутри Job, состоящий из набора задач, которые можно выполнить без пересылки (shuffle). Spark разбивает DAG трансформаций на несколько Stage по границам shuffle–операций (например, groupBy, join
  3. Task
    Минимальная единица работы внутри Stage. За каждый partition данных создаётся свой Task. Если у Stage 100 партиций, то он порождает 100 задач, которые выполняются параллельно на executors.

Spark обрабатывает одним ядром executor’а одну партицию

Почему нельзя всё делать в тасках?
Потому что таски работают в рамках одного Stage — без shuffle
Когда нужны перегруппировки данных между executor’ами (например, join, groupBy) — нужен shuffle, а это уже Stage boundary
Между Stage’ами — синхронизация, сериализация, переразбивка данных, и это невозможно сделать внутри одной таски
Почему несколько Job и как они группируются?
Каждое действие (action) запускает новый Job, даже если оно применяется к тому же самому DataFrame
Spark ленивый: transformations (select, filter, withColumn) копятся, но реально выполняются только при action
Job состоит из Stage’ов, Stage — из Task’ов → такая структура нужна для планирования, распределения, фолт-толерантности

Фолт-толерантность — это способность Spark автоматически восстанавливаться при сбоях, перезапуская неудачные задачи на других узлах, используя DAG-план и lineage.
Это ключевая причина, почему Spark надёжно работает в распределённых кластерах с десятками или сотнями машин.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
8
Q

Task и partitions это одно и тоже?

A

Partition — это логическая часть данных в RDD/DataFrame, а Task — это исполняемая единица, которая обрабатывает одну партицию.
Количество Task’ов в Stage всегда равно количеству партиций на этом этапе.
То есть: partition — данные, task — работа над этими данными.
В Spark партиция (partition) — это логическое разделение RDD (Resilient Distributed Dataset) на более мелкие части, каждая из которых может обрабатываться параллельно разными рабочими узлами кластера. Таска (task), в свою очередь, — это конкретное вычисление, которое выполняется одним рабочим узлом на одной или нескольких партициях RDD.
Другими словами, RDD разбивается на партиции, и на каждой партиции выполняется один или несколько тасков.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
9
Q

Когда количество партиций в Spark меняется?

A

Можем поменять через repartition
При shuffle операции
Любая трансформация, требующая перекладывания данных между executors, разрывает исходные партиции и создаёт новые по умолчанию в размере, заданном параметром spark.sql.shuffle.partitions (обычно 200). К таким операциям относятся groupBy, reduceByKey, join, distinct, repartition (без указания числа – по умолчанию), coalesce (если явно указать меньшее число) и др.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
10
Q

Если есть DataFrame с 1000 партиций. И я группирую его по какому-то полю и считаю count. Какое количество партиций получится на выходе после shuffle?

A

После groupBy(…).count() число партиций в результатирующем DataFrame будет равно значению настройки
spark.conf.get(“spark.sql.shuffle.partitions”)
— по умолчанию это 200. Если вы переопределяли эту опцию (например, до 500), то и после shuffle получите ровно столько партиций.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
11
Q

Таска обрабатывает один блок данных или одну партицию? Какая связь с блоком данных в HDFS? Мы хотим разбить на 10 файлов. Что мы пишем в Spark?repartition 10. Получается партиция это один блок?

A

Одна таска в Spark всегда обрабатывает одну партицию.
А вот партиция может содержать один или несколько HDFS-блоков — они не равны напрямую.

Важно:
HDFS делит файлы на блоки фиксированного размера (например, 128 МБ).
Spark делит данные на партиции, и по умолчанию — по одному на блок, но не обязательно.
Partition — это логическая единица внутри Spark, а блок — физическая единица на диске (HDFS).

Смотря какого размера. Если больше чем 128, то нет

Получается, партиция = блок?
Нет.
Но: При чтении из HDFS — по умолчанию Spark создаёт по одной партиции на каждый блок.
Но это настраиваемо — можно изменить число партиций при чтении (minPartitions, spark.sql.files.maxPartitionBytes и др.)

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
12
Q

Как работает партицирование в Spark?

A

Partition (партиция) — это некая часть исходных данных, полученная в ходе разбиения датасета в Apache Spark или Hive. Процедура разбиения на партиции называется партицированием (partitioning). Благодаря делению на партиции появляется возможность обрабатывать большие данные.

Apache Spark поддерживает два вида партиций: в оперативной памяти в виде фрейма данных (DataFrame) и на диске в виде файла:
Партиция в памяти выполняется с помощью вызовов repartition или coalesce.
Партиция на диске выполняется с помощью вызова partitionBy (это аналогично партициям в Hive).

При создании партиций в оперативной памяти датасет будет просто разделен на равные части. Вызов repartition также выполнит перетасовку (shuffle). А вызов coalesce, который умеет только уменьшать число партиций, может этого не делать, поскольку он более оптимизирован.

При создании партиций на диске они будут сгенерированы на основе уникальных значений столбца(ов), по которому(ым) происходит разбиение. Иными словами, partitionBy работает как groupBy, только вместо групп на диске создаются файлы с партициями.

К плюсам партиций относятся: быстрый доступ к данным; возможность производить операции на меньших датасетах.

К минусам партиций относятся: наличие большого числа партиций (операции ввода-вывода медленные); не решают проблему неравномерности данных (партиции могут быть разного размера, например, количество записей с населением Китая и Ватикана разное).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
13
Q

Что такое shuffle и как он влияет на производительность?

A

Shuffle — это пересылка данных между executor’ами по сети.
Возникает при:
groupBy, join, repartition, distinct, orderBy, reduceByKey

Почему это дорого:
Данные сериализуются
Пишутся на диск (spills)
Пересылаются по сети
Потом считываются заново

Это один из самых затратных этапов в Spark — надо его по возможности избегать или контролировать (например, не делать лишние repartition, использовать broadcast join, salting).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
14
Q

Если нужно потюнить какие-то параметры? Что собираешься править если не работает джоба или не хватает скорости?
Как джоба может не работать?

A

Количество партиций:
spark.sql.shuffle.partitions — по дефолту 200
меньше, если данных мало; больше — если перекос и тормоза

Параметры памяти:
–executor-memory, –executor-cores
spark.memory.fraction, spark.memory.storageFraction

Выбор join’ов:
Включаю spark.sql.autoBroadcastJoinThreshold
Или вручную broadcast(df)

Кэширование:
df.cache() если reused
persist(StorageLevel.MEMORY_AND_DISK) если большие

Как Spark job может “не работать”?

Ошибка на executor’е:
OutOfMemoryError (heap или off-heap)
сериализация (Pickle, Kryo)
пользовательская UDF кинула ошибку

Driver умер:
нехватка памяти (–driver-memory)
большой результат в .collect()

Stage завис на долгой таске:
чаще всего из-за перекоса данных или спилл на диск

Проблемы с источниками данных:
плохой split на HDFS
нестабильный S3 / JDBC

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
15
Q

Checkpoint / localCheckpoint в Spark

A

В Spark каждый RDD/DataFrame хранит не только данные, но и линейку преобразований (lineage) — цепочку операций от исходных источников до текущего результата. При сбое Spark может «переиграть» части DAG-а, используя lineage, но если он становится слишком длинным или сложным, это может приводить к избыточным пересчётам и даже к OOM. Здесь на помощь приходят два механизма — checkpoint() и localCheckpoint().

checkpoint()
Сохраняет RDD/DataFrame физически в надёжном хранилище (обычно HDFS).
Разрывает lineage: после чекпоинта Spark «забудет» старый DAG-участок и при сбое восстановится только от состояния на диске.

Плюсы
Устойчивость: при падении драйвера/executor-а не придётся заново прогонять весь lineage.
Разрыв долгого DAG: упрощает граф зависимостей и уменьшает требования к памяти.

Минусы
Медленнее, чем localCheckpoint(): данные записываются в HDFS, проходят shuffle и блокировка.
Нужна доступность HDFS и права на запись.

localCheckpoint()
Копирует RDD/DataFrame в локальное временное хранилище executors (memory/disk), не трогая HDFS.
Тоже разрывает lineage, но без гарантии долговечности — если executor упадёт, локальный чекпоинт пропадёт.

Плюсы
Быстрее: не идёт запись в HDFS, нет сетевых операций.
Удобно для промежуточного сброса внутри одного Job, когда нужен разрыв DAG, но не критична надёжность.

Минусы
Ненадёжность: при падении executor-а данные потеряются, и lineage вернётся.
Локальность: используется только в рамках текущего приложения и пока жив executor.
checkpoint() — для постоянного сохранения и разрыва DAG-а в HDFS, когда нужен надёжный recovery.

localCheckpoint() — для быстрой оптимизации памяти внутри одного приложения, без записи в HDFS, но без долговечности.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
16
Q

А когда на driver нужно добавлять ресурсы? Есть такое вообще?

A

Если ты используешь .collect(), .toPandas(), .take(), .head() — всё возвращается на Driver
Если ты делаешь join и сам Driver строит план для большой схемы (особенно в SQL)
Если ты делаешь Spark с Pandas UDF → иногда обрабатывается на Driver, особенно при ошибках

В таких случаях:
–driver-memory 8g
–driver-cores 2
Driver это конечная точка входа. Когда мы вызываем action. Данные возвращаются на driver и если данных больше чем оперативы, то может упасть. если собрали 2гб, а на driver 1гб то можем упасть

17
Q

Что такое ленивые вычисления? Зачем они нужны? Как они работают?

A

Lazy evaluation – трансформации будут выполнены только в момент вызова Action (действия). Это нужно, чтобы можно было логически делить код на понятные шаги, а Spark “под капотом” менял порядок шагов и выполнял вместе. За это отвечает оптимизатор Catalyst

Такая оптимизация уменьшает количество чтений данных с диска, упрощает вычисления с сохранением логики, пропускает данные, которые не нужны для получения результата.

Трансформации преобразуют данные (агрегируют, фильтруют и пр.).

Действия возвращают результат (печатают в консоли первые 5 строк, сохраняют в файл и пр.).

18
Q

Когда на UI мы можем увидеть, что появилась новая джоба?

A

Как только Spark начинает выполнять action (действие) — например:
.count(), .collect(), .write(), .show(), .save()

любое другое действие, которое триггерит выполнение DAG

Трансформации (например, .filter(), .select(), .withColumn()) — ленивые и не создают job до тех пор, пока не вызван action.

Что мы видим в UI:
На вкладке Jobs появляется новая строка с ID (например, Job 1, Job 2).
В колонке Status видно RUNNING, потом SUCCEEDED или FAILED.
Вкладки Stages, Tasks, Executors показывают подробности о выполнении.

19
Q

У нас есть вкладка джоб внутри спарковая джоба. По какому правилу мы можем понять, пролистать код и что вот строчка и что после нее в UI будет новая джоба в вкладке джоб

A

Spark ленивый — трансформации (select, withColumn, filter, join, groupBy) только накапливаются.
Новая Job в UI появляется только когда ты вызываешь действие (action).

20
Q

Разница action и transformation? Как Spark делает? Допустим мы пишем джоин и появится ли джоба как написали джоин?

A

Строится даг, оптимизатором это оптимизируется, строится логически, оптимизировано логически, потом физический план
Мы построили даг. До момента того пока не был вызван action, у нас ничего не считается

По какому правилу в коде понять, где начнётся новая Job?
Как только ты видишь action, например:.collect()
.count()
.show()
.write.parquet()
.foreach()
.save()
С этого места Spark запускает новую Job, и она появится в UI во вкладке Jobs.

Transformation — ленивое преобразование данных (не исполняется сразу)
select, filter, map, withColumn, join, groupBy
Action — триггер на исполнение DAG
collect, count, show, write, take
Будет ли Job, если написать только join?
Нет.
Пока ты написал только df1.join(df2, …) — ничего не произойдёт.
Job появится только при вызове action.

21
Q

Что такое Adaptive Query Execution (AQE)?

A

Это механизм, который динамически оптимизирует план выполнения запроса во время выполнения, а не только при компиляции.

Что может делать AQE:
менять тип join’ов (например, переключиться с sort-merge на broadcast)
сливать мелкие shuffle partition’ы
перераспределять данные, если дисбаланс

Работает с Spark 3.x и выше. Помогает исправлять ошибки планирования, которые нельзя было предугадать до выполнения (например, неожиданный объем данных в join).