SPARK Flashcards
(13 cards)
Архитектура
Spark имеет несколько вариантов развёртывания, в основном используется либо YARN вместе с Hadoop или Kubernetes если Hadoop’а нет
Составляющие Spark:
1)Executor - Процесс на узле/ноде(Worker’е) Спарка, который непосредственно выполняет таски(обрабатывает данные) и отчитывается драйверу о выполнении. Тасок может быть несколько. При этом каждая таска требует 1 ядро процессора для выполнения
2)Cluster manager - Общий термин для систем управления ресурсами. В основном это YARN Resource Manager или Kubernetes(или что-то ещё), выдаёт спарку ресурсы(Процессор и оперативная память) под задачи
В случае запуска Spark Standalone(используется для локального исполнения) - в качестве Cluster Manager’а может выступать Spark Master
3)Driver - Процесс, “Голова” Спарка, здесь ваш код преобразуется в задачи, которые распределяются по узлам, отправляется запрос на ресурсы для обработки в Cluster Manager, создаёт SparkSession, строит DAG выполнения вашего запроса
4)Catalyst - Оптимизатор, работает для DataFrame API или DataSet API, формирует логический план, выбирает джоины и т.д. Ещё существует Tungsten - Низкоуровневый оптимизатор, который получает на вход план от Catalyst и оптимизирует его ещё сильнее
5)Worker - ** Узлы/ноды** . Физически - это отдельные сервера, на которых запущен один или несколько процессов Executor . Именно по ним распределяются загруженные из ** hdfs** /** s3** /откуда-угодно-ещё данные
У Spark есть несколько библиотек-ответвлений:
Spark Streaming - потоковая микробатчевая обработка позволяющая делать near real-time расчёты
GeoSpark
MLlib
GraphX
Spark позволяет считать данные на нескольких кластерах (серверах) распределено и параллельно. Работает в оперативной памяти ( чем больше памяти, тем быстрее обрабатывает). Spark разбивает данные на кусочки (партиции) и раскидывает по разным серверам, где каждый сервер считывает свои кусочки данных (по этому это быстро)
Spark делит датасет на кусочки по 128 мб. Каждый кусок - партиция. Партиция летит на сервер. Сервер обрабатывает эти кусочки. Эти партиции бегают между серверами, где-то фильтруются, где-то джоинятся, где-то соединяются - происходит операция shuffle. Обработанные данные сохраняются в любой формат ( TXT, S3, Hadoop, Greenplum, Parquet)
Dataframe vs Dataset vs RDD
RDD (Resilient Distributed Dataset) - Низкоуровневая, неизменяемая распределенная коллекция объектов
В основном используется для обработки неструктурированных типов данных, в случае же если данные структурированы лучше использовать DataFrame
На текущий момент считается, что использовать только RDD - bad practice
DataFrame - Высокоуровневая, табличная структура с именованными столбцами и схемой (похоже на таблицы в SQL СУБД или Pandas)
Применяется в 90% случаев. Использует все главные фишки Spark’а, Catalyst и т.д., с ним просто работать по аналогии с Dataframe в Pandas или SQL таблицами. Умеет работать только с структурированными данными
Dataset (только Scala/Java) - Прокачанный DataFrame, нужен для очень ресурсоёмких операций, с помощью него можно достичь максимальной оптимизации, но нельзя использовать Python
Coalesce vs Repartition
Coalesce - Уменьшает количество партиций, не может их увеличить. При этом не гарантирует равномерности распределения данных, может возникнуть Data Skew. Но при этом не вызывает Shuffle, из-за чего быстрее
Repartition - Может как уменьшть количество партиций так и увеличивать. Гарантирует равномерность распределения данных, исключает Data Skew. Вызывает Shuffle из-за чего более медленнный
Coalesce
+Не вызывает Shuffle -> Быстрый
-Не может увеличивать кол-во партиций, только уменьшать
-Может возникнуть перекос данных
Repartition
+Может свободно менять количество партиций
+Гарантирует отсутствия возникновения перекоса после репартиционирования
-Вызывает Shuffle -> Медленный
Shuffle
Представьте себе:
У вас есть большая библиотека (ваши данные). Книги (данные) разбросаны по разным комнатам (рабочим узлам кластера).
Вам нужно собрать ВСЕ книги одного автора на одну полку. Например, все книги Толстого — в комнату 1, все книги Достоевского — в комнату 2 и т.д. Проблема: Книги Толстого изначально лежат не только в комнате 1, но и в комнатах 2, 3, 4… То же самое с другими авторами.
Сотрудники в каждой комнате (Executor’ы) начинают сортировать книги по авторам, которые есть у них.
Потом они отправляют книги Толстого в комнату 1, книги Достоевского в комнату 2, книги Пушкина в комнату 3 и т.д. через коридоры (сеть). Каждая комната получает книги только “своего” автора со всех остальных комнат.
Вот что такое Shuffle в Spark:
Это процесс физической перегруппировки данных между разными рабочими узлами (Executors) кластера.
Он происходит, когда для выполнения операции данные, относящиеся к одному “ключу” (key), должны оказаться вместе на одном узле.
Ключ (Key) — это то, по чему мы группируем или соединяем данные.
Shuffle = Пересылка данных между узлами по сети.
Происходит при операциях группировки (groupBy), соединения (join) и сортировки (orderBy).
Cache() и Persist()
Кэширование - процесс, необходимый в случае реализации повторяющихся задач, какие-то данные ожидают множественного использования
Cache() - Сохраняет данные в оперативной памяти. Если данные в память не поместятся, часть пересчитается перед новым вызовом
Persist() - гибкие настройка, для того, чтобы часть данных сохранить либо в памяти, либо на диске
Когда данных мало - используем Cache() и не паримся, если много - думаем как использовать Persist()
Чтобы не засорить кэшем память, когда закончили с повторяющимися задачами делаем :Unpersist() - высвобождение кэша из памяти
Основная идея (простыми словами)
Представьте, что вы работаете с большим набором данных:
Вы делаете сложные преобразования (фильтрация, агрегация, join) → это занимает 10 минут.
Потом вам нужно сделать 3 разных действия с результатом:
1.Сохранить в базу данных
2.Построить график
3.Отправить статистику в отчет
Без cache/persist: Spark будет пересчитывать всё с нуля каждый раз → 10 мин × 3 = 30 минут.
С cache/persist: Результат сохраняется в памяти/на диске → повторные действия берутся из кеша → общее время 10 мин + 1 мин + 1 мин = 12 минут.
Что такое spill
Spill – сохранение промежуточных результатов на диск, который в разы медленнее оперативной памяти.
Может происходить как при вычислениях на executor’е, так и при сборе всех результатов на driver. Решение – увеличивать количество кусочков, на которые делятся данные (партиций) или увеличивать лимиты ОЗУ
Трансформации, Действия, Ленивые вычисления
Трансформации преобразуют данные (агрегируют, фильтруют и пр.).
Действия возвращают результат (печатают в консоли первые 5 строк, сохраняют в файл и пр.).
Lazy evaluation – трансформации будут выполнены только в момент вызова Action (действия). Это нужно, чтобы можно было логически делить код на понятные шаги, а Spark “под капотом” менял порядок шагов и выполнял вместе. За это отвечает оптимизатор Catalyst
Такая оптимизация уменьшает количество чтений данных с диска, упрощает вычисления с сохранением логики, пропускает данные, которые не нужны для получения результата.
Широкие и узкие трансформации
Узкая трансформация = Работа в пределах одной полки (данные не нужно никуда переносить).
Широкая трансформация = Нужно собрать все книги одного автора со всех полок (требуется перемещение между полками).
Влияют на производительность, распределение данных и отказоустойчивость.
Широкие трансформации могут требовать передать данные по сети, узкие выполняются на одном узле над одной партицией
Зачастую оптимизация заключается в уменьшении числа широких операций до минимума
Широкие:
Intersection
Distinct
GroupByKey
ReduceByKey
Join
Repartition
Узкие:
Map
Filter
Union
Sample
Coalesce
Как бороться с перекосом данных (data skew)
Можно считать, что у нас перекос, если данных в одном узле больше чем в другом на 10%
1.Salting - находим ключ по которому распределены данные и видим сильный перекос в одном из узлов. Берём этот ключ с перекошенными данными и создаём из 1 ключа несколько новых путём добавления рандомных символов, перераспределяем(shuffle) с учётом новых “солёных” ключей, получаем более равномерное распределение данных.
Когда использовать: Для groupBy(), join(), distinct()
В случае, если вы делаете salting ключа перед join-ом двух таблиц, то вам нужно посолить одинаковым способом ключ джоина в обоих таблицах и по новому посоленому ключу уже джоинить
2.Broadcast Join - если мы джоиним маленькую табличку, то можем передать её целиком на каждый узел, по аналогии с Distributed Replicated в Greenplum. Уменьшает Shuffle и риск Data Skew соответственно
3.Repartition - увеличение числа партиций и равномерное распределение по ним(вызывает Shuffle)
4.Избегать взаимодействий с кривыми ключами, где много null, дубликатов или ещё каких-то косяков
Как исправить ошибку OOM (Out of memory)
Driver OOM:
Собираем слишком много данных на драйвер через collect()
1.столько не собирать
2.ставить лимиты
3.выделить больше памяти
Делаем слишком тяжёлый Broadcast
1.Не делать таких джоинов
2.Поставить ограничение на размер Broadcast-нутой таблицы
3.Выделить больше памяти
Executor OOM:
Убедиться, что spark.executor.memoryOverhead задан (обычно 10% от executor-memory) - запас памяти для обеспечения надёжности операций
Перекос данных:
1.Увеличить число партиций: repartition()
2.Исправить data skew: использовать Salting или Broadcast
Слишком мало партиций:
1.Увеличить число партиций: repartition() (или default другой задать)
Сделать Unpersist() если забыли, а также провести базовые логические оптимизации - фильтровать пораньше, группировать пораньше и т.д., как в SQL
Параллелизм**
В Spark параллелизм достигается на 3-х уровнях:
С точки зрения кластера: Добавляем или уменьшаем количество Worker Nodes(серверов) - влияет на количество партиций данных
В рамках каждого сервера (Worker): Создаём множество(или один) Executor’ов с ядрами
На последнем уровне под каждое ядро выделяется своя таска на Executor’е
Таким образом у нас есть 3 основных фактора Параллелизма:
Количество ядер процессора на каждом Executor
Количество Executor’ов(процессов на серверах)
Количество Worker-ов(серверов) - по ним делятся партиции данных
Оптимизация:
Соблюдаем баланс памяти и процессора, слишком много ядер могут просто перегрузить оперативную память, слишком малое же может наоборот неэффективно её расходовать (Google говорит использовать не менее 5 ядер на executor)
Избегаем data skew и shuffle в рамках партиций
Можно задавать дефолтный параллелизм или количество партиций с помощью:
spark.sql.shuffle.partitions
spark.default.parallelism - как я понял, для RDD, в случае DataFrame задаём partitions и всё