SPARK Flashcards

(13 cards)

1
Q

Архитектура

A

Spark имеет несколько вариантов развёртывания, в основном используется либо YARN вместе с Hadoop или Kubernetes если Hadoop’а нет

Составляющие Spark:

1)Executor - Процесс на узле/ноде(Worker’е) Спарка, который непосредственно выполняет таски(обрабатывает данные) и отчитывается драйверу о выполнении. Тасок может быть несколько. При этом каждая таска требует 1 ядро процессора для выполнения

2)Cluster manager - Общий термин для систем управления ресурсами. В основном это YARN Resource Manager или Kubernetes(или что-то ещё), выдаёт спарку ресурсы(Процессор и оперативная память) под задачи
В случае запуска Spark Standalone(используется для локального исполнения) - в качестве Cluster Manager’а может выступать Spark Master

3)Driver - Процесс, “Голова” Спарка, здесь ваш код преобразуется в задачи, которые распределяются по узлам, отправляется запрос на ресурсы для обработки в Cluster Manager, создаёт SparkSession, строит DAG выполнения вашего запроса

4)Catalyst - Оптимизатор, работает для DataFrame API или DataSet API, формирует логический план, выбирает джоины и т.д. Ещё существует Tungsten - Низкоуровневый оптимизатор, который получает на вход план от Catalyst и оптимизирует его ещё сильнее

5)Worker - ** Узлы/ноды** . Физически - это отдельные сервера, на которых запущен один или несколько процессов Executor . Именно по ним распределяются загруженные из ** hdfs** /** s3** /откуда-угодно-ещё данные

У Spark есть несколько библиотек-ответвлений:
Spark Streaming - потоковая микробатчевая обработка позволяющая делать near real-time расчёты
GeoSpark
MLlib
GraphX

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
2
Q
A

Spark позволяет считать данные на нескольких кластерах (серверах) распределено и параллельно. Работает в оперативной памяти ( чем больше памяти, тем быстрее обрабатывает). Spark разбивает данные на кусочки (партиции) и раскидывает по разным серверам, где каждый сервер считывает свои кусочки данных (по этому это быстро)
Spark делит датасет на кусочки по 128 мб. Каждый кусок - партиция. Партиция летит на сервер. Сервер обрабатывает эти кусочки. Эти партиции бегают между серверами, где-то фильтруются, где-то джоинятся, где-то соединяются - происходит операция shuffle. Обработанные данные сохраняются в любой формат ( TXT, S3, Hadoop, Greenplum, Parquet)

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
3
Q

Dataframe vs Dataset vs RDD

A

RDD (Resilient Distributed Dataset) - Низкоуровневая, неизменяемая распределенная коллекция объектов
В основном используется для обработки неструктурированных типов данных, в случае же если данные структурированы лучше использовать DataFrame
На текущий момент считается, что использовать только RDD - bad practice

DataFrame - Высокоуровневая, табличная структура с именованными столбцами и схемой (похоже на таблицы в SQL СУБД или Pandas)
Применяется в 90% случаев. Использует все главные фишки Spark’а, Catalyst и т.д., с ним просто работать по аналогии с Dataframe в Pandas или SQL таблицами. Умеет работать только с структурированными данными

Dataset (только Scala/Java) - Прокачанный DataFrame, нужен для очень ресурсоёмких операций, с помощью него можно достичь максимальной оптимизации, но нельзя использовать Python

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
4
Q

Coalesce vs Repartition

A

Coalesce - Уменьшает количество партиций, не может их увеличить. При этом не гарантирует равномерности распределения данных, может возникнуть Data Skew. Но при этом не вызывает Shuffle, из-за чего быстрее

Repartition - Может как уменьшть количество партиций так и увеличивать. Гарантирует равномерность распределения данных, исключает Data Skew. Вызывает Shuffle из-за чего более медленнный

Coalesce
+Не вызывает Shuffle -> Быстрый
-Не может увеличивать кол-во партиций, только уменьшать
-Может возникнуть перекос данных
Repartition
+Может свободно менять количество партиций
+Гарантирует отсутствия возникновения перекоса после репартиционирования
-Вызывает Shuffle -> Медленный

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
5
Q

Shuffle

A

Представьте себе:

У вас есть большая библиотека (ваши данные). Книги (данные) разбросаны по разным комнатам (рабочим узлам кластера).
Вам нужно собрать ВСЕ книги одного автора на одну полку. Например, все книги Толстого — в комнату 1, все книги Достоевского — в комнату 2 и т.д. Проблема: Книги Толстого изначально лежат не только в комнате 1, но и в комнатах 2, 3, 4… То же самое с другими авторами.

Сотрудники в каждой комнате (Executor’ы) начинают сортировать книги по авторам, которые есть у них.

Потом они отправляют книги Толстого в комнату 1, книги Достоевского в комнату 2, книги Пушкина в комнату 3 и т.д. через коридоры (сеть). Каждая комната получает книги только “своего” автора со всех остальных комнат.

Вот что такое Shuffle в Spark:
Это процесс физической перегруппировки данных между разными рабочими узлами (Executors) кластера.

Он происходит, когда для выполнения операции данные, относящиеся к одному “ключу” (key), должны оказаться вместе на одном узле.

Ключ (Key) — это то, по чему мы группируем или соединяем данные.
Shuffle = Пересылка данных между узлами по сети.

Происходит при операциях группировки (groupBy), соединения (join) и сортировки (orderBy).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

Cache() и Persist()

A

Кэширование - процесс, необходимый в случае реализации повторяющихся задач, какие-то данные ожидают множественного использования

Cache() - Сохраняет данные в оперативной памяти. Если данные в память не поместятся, часть пересчитается перед новым вызовом

Persist() - гибкие настройка, для того, чтобы часть данных сохранить либо в памяти, либо на диске

Когда данных мало - используем Cache() и не паримся, если много - думаем как использовать Persist()

Чтобы не засорить кэшем память, когда закончили с повторяющимися задачами делаем :Unpersist() - высвобождение кэша из памяти

Основная идея (простыми словами)
Представьте, что вы работаете с большим набором данных:

Вы делаете сложные преобразования (фильтрация, агрегация, join) → это занимает 10 минут.

Потом вам нужно сделать 3 разных действия с результатом:

1.Сохранить в базу данных

2.Построить график

3.Отправить статистику в отчет

Без cache/persist: Spark будет пересчитывать всё с нуля каждый раз → 10 мин × 3 = 30 минут.
С cache/persist: Результат сохраняется в памяти/на диске → повторные действия берутся из кеша → общее время 10 мин + 1 мин + 1 мин = 12 минут.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
7
Q

Что такое spill

A

Spill – сохранение промежуточных результатов на диск, который в разы медленнее оперативной памяти.

Может происходить как при вычислениях на executor’е, так и при сборе всех результатов на driver. Решение – увеличивать количество кусочков, на которые делятся данные (партиций) или увеличивать лимиты ОЗУ

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
8
Q

Трансформации, Действия, Ленивые вычисления

A

Трансформации преобразуют данные (агрегируют, фильтруют и пр.).

Действия возвращают результат (печатают в консоли первые 5 строк, сохраняют в файл и пр.).

Lazy evaluation – трансформации будут выполнены только в момент вызова Action (действия). Это нужно, чтобы можно было логически делить код на понятные шаги, а Spark “под капотом” менял порядок шагов и выполнял вместе. За это отвечает оптимизатор Catalyst

Такая оптимизация уменьшает количество чтений данных с диска, упрощает вычисления с сохранением логики, пропускает данные, которые не нужны для получения результата.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
9
Q

Широкие и узкие трансформации

A

Узкая трансформация = Работа в пределах одной полки (данные не нужно никуда переносить).

Широкая трансформация = Нужно собрать все книги одного автора со всех полок (требуется перемещение между полками).

Влияют на производительность, распределение данных и отказоустойчивость.
Широкие трансформации могут требовать передать данные по сети, узкие выполняются на одном узле над одной партицией
Зачастую оптимизация заключается в уменьшении числа широких операций до минимума

Широкие:
Intersection
Distinct
GroupByKey
ReduceByKey
Join
Repartition

Узкие:
Map
Filter
Union
Sample
Coalesce

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
10
Q

Как бороться с перекосом данных (data skew)

A

Можно считать, что у нас перекос, если данных в одном узле больше чем в другом на 10%

1.Salting - находим ключ по которому распределены данные и видим сильный перекос в одном из узлов. Берём этот ключ с перекошенными данными и создаём из 1 ключа несколько новых путём добавления рандомных символов, перераспределяем(shuffle) с учётом новых “солёных” ключей, получаем более равномерное распределение данных.
Когда использовать: Для groupBy(), join(), distinct()

В случае, если вы делаете salting ключа перед join-ом двух таблиц, то вам нужно посолить одинаковым способом ключ джоина в обоих таблицах и по новому посоленому ключу уже джоинить

2.Broadcast Join - если мы джоиним маленькую табличку, то можем передать её целиком на каждый узел, по аналогии с Distributed Replicated в Greenplum. Уменьшает Shuffle и риск Data Skew соответственно

3.Repartition - увеличение числа партиций и равномерное распределение по ним(вызывает Shuffle)

4.Избегать взаимодействий с кривыми ключами, где много null, дубликатов или ещё каких-то косяков

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
11
Q

Как исправить ошибку OOM (Out of memory)

A

Driver OOM:

Собираем слишком много данных на драйвер через collect()
1.столько не собирать
2.ставить лимиты
3.выделить больше памяти
Делаем слишком тяжёлый Broadcast
1.Не делать таких джоинов
2.Поставить ограничение на размер Broadcast-нутой таблицы
3.Выделить больше памяти

Executor OOM:

Убедиться, что spark.executor.memoryOverhead задан (обычно 10% от executor-memory) - запас памяти для обеспечения надёжности операций

Перекос данных:
1.Увеличить число партиций: repartition()
2.Исправить data skew: использовать Salting или Broadcast
Слишком мало партиций:
1.Увеличить число партиций: repartition() (или default другой задать)

Сделать Unpersist() если забыли, а также провести базовые логические оптимизации - фильтровать пораньше, группировать пораньше и т.д., как в SQL

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
12
Q

Параллелизм**

A

В Spark параллелизм достигается на 3-х уровнях:

С точки зрения кластера: Добавляем или уменьшаем количество Worker Nodes(серверов) - влияет на количество партиций данных

В рамках каждого сервера (Worker): Создаём множество(или один) Executor’ов с ядрами

На последнем уровне под каждое ядро выделяется своя таска на Executor’е

Таким образом у нас есть 3 основных фактора Параллелизма:

Количество ядер процессора на каждом Executor

Количество Executor’ов(процессов на серверах)

Количество Worker-ов(серверов) - по ним делятся партиции данных

Оптимизация:

Соблюдаем баланс памяти и процессора, слишком много ядер могут просто перегрузить оперативную память, слишком малое же может наоборот неэффективно её расходовать (Google говорит использовать не менее 5 ядер на executor)

Избегаем data skew и shuffle в рамках партиций

Можно задавать дефолтный параллелизм или количество партиций с помощью:
spark.sql.shuffle.partitions
spark.default.parallelism - как я понял, для RDD, в случае DataFrame задаём partitions и всё

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
13
Q
A
How well did you know this?
1
Not at all
2
3
4
5
Perfectly