Spark Flashcards
Архитектура
Apache Spark
- Вычислительный движок и набор библиотек для
параллельной обработки большого объёма структурированных или неструктурированных данных
Написан на Java
Spark имеет несколько вариантов развёртывания, в основном используется либо YARN вместе с Hadoop или Kubernetes если Hadoop’а нет
Составляющие Spark:
Executor
- Процесс на узле/ноде(Worker’е) Спарка, который непосредственно выполняет таски(обрабатывает данные) и отчитывается драйверу о выполнении. Тасок может быть несколько. При этом каждая таска требует 1 ядро процессора для выполнения
Cluster manager
- Общий термин для систем управления ресурсами. В основном это YARN Resource Manager или Kubernetes(или что-то ещё), выдаёт спарку ресурсы(Процессор и оперативная память) под задачи
В случае запуска Spark Standalone
(используется для локального исполнения) - в качестве Cluster Manager’а может выступать Spark Master
Driver
- Процесс, “Голова” Спарка, здесь ваш код преобразуется в задачи, которые распределяются по узлам, отправляется запрос на ресурсы для обработки в Cluster Manager, создаёт SparkSession, строит DAG выполнения вашего запроса
Catalyst
- Оптимизатор, работает для DataFrame API или DataSet API, формирует логический план, выбирает джоины
Worker
- ** Узлы/ноды** . Физически - это отдельные сервера, на которых запущен один или несколько процессов Executor . Именно по ним распределяются загруженные из ** hdfs** /** s3** /откуда-угодно-ещё данные
У Spark есть несколько библиотек-ответвлений:Spark Streaming
- потоковая микробатчевая обработка позволяющая делать near real-time расчёты
Следующие 3, вероятно, вам никогда не пригодятся:GeoSpark
- библиотека для работы с гео-данными(Гексагоны!)MLlib
- подготовка и обработка данных для ML GraphX
- обработка графов
Имеет большое количество удобных интеграций с различными базами данных, объектными хранилищами и т.д.: JDBC, Кастомные opensource коннекторы(например, для GP), нативная поддержка S3 и HDFS
В основном запросы на Spark пишутся на Python, но также есть и Java/Scala - на них возможна работа с DataSet API
Трансформации, Действия, Ленивые вычисления
Трансформации преобразуют данные (агрегируют, фильтруют и пр.).
Действия возвращают результат (печатают в консоли первые 5 строк, сохраняют в файл и пр.).
Lazy evaluation
– трансформации будут выполнены только в момент вызова Action (действия). Это нужно, чтобы можно было логически делить код на понятные шаги, а Spark “под капотом” менял порядок шагов и выполнял вместе. За это отвечает оптимизатор Catalyst
Такая оптимизация уменьшает количество чтений данных с диска, упрощает вычисления с сохранением логики, пропускает данные, которые не нужны для получения результата.
Shuffle, partitionBy, широкие и узкие трансформации
Spark выполняет вычисления на кластере – группе серверов(нод/узлов), соединённых сетью. Передача данных по сети (shuffle) это дополнительный шаг, который нужен не для всех трансформаций. Но для того, чтобы вернуть правильный результат, иногда это необходимо
Например, прежде чем вычислять средний чек по магазинам за месяц, нужно все данные за этот один месяц переместить на один узел, а уже потом агрегировать. И так по каждому месяцу данные физически перераскладываются по разным серверам. Контролировать распределение данных можно через partitionBy
Широкие трансформации могут требовать передать данные по сети, узкие выполняются на одном узле над одной партицией
Зачастую оптимизация заключается в уменьшении числа широких операций до теоретического минимума
Широкие:Intersection
Distinct
GroupByKey
ReduceByKey
Join
Repartition
Узкие:Map
Filter
Union
Sample
Coalesce
(в случае уменьшения числа партиций, например до 1 партиции на ноду)
Что такое spill
Скорость спарка в основном завязана на вычислениях в оперативной памяти. Если данные не помещаются в ОЗУ, происходит spill
– сохранение промежуточных результатов на диск, который в разы медленнее оперативной памяти.
Может происходить как при вычислениях на executor’е, так и при сборе всех результатов на driver. Решение – увеличивать количество кусочков, на которые делятся данные (партиций) или увеличивать лимиты ОЗУ
Dataframe vs Dataset vs RDD
RDD (Resilient Distributed Dataset)
- Низкоуровневая, неизменяемая распределенная коллекция объектов
Существует с момента появления Spark
В основном используется для обработки неструктурированных типов данных, в случае же если данные структурированы лучше использовать DataFrame
DataFrame
- Высокоуровневая, табличная структура с именованными столбцами и схемой (похоже на таблицы в SQL СУБД или Pandas)
Добавили позднее, в Spark 1.3
Применяется в 90% случаев. Использует все главные фишки Spark’а, Catalyst и т.д., с ним просто работать по аналогии с Dataframe в Pandas или SQL таблицами. Умеет работать только с структурированными данными
Dataset
(только Scala/Java) - Прокачанный DataFrame, нужен для очень ресурсоёмких операций, с помощью него можно достичь максимальной оптимизации, но нельзя использовать Python
Был реализован в Spark 1.6
На текущий момент считается, что использовать только RDD - bad practice
Что происходит под капотом при запуске джобы
Пайплайн джобы на Spark:
- Driver принимает ваш код, запускает SparkSession, подключается к Cluster Manager’у, подрубает Executor’ы с учётом выделенных ресурсов, составляется DAG обработки данных
- Catalyst оптимизирует логический план, DAG разбивается на конкретные таски и стадии, таски распределяются по Executor’ам
- Executor’ы читают данные из источника и распределяет их между узлами(фильтры могут применяться уже тут)
- Executor’ы проводят все необходимые вычисления и возвращают данные на Driver
- Джоба заканчивается, ресурсы освобождаются(если не указан persist()), закрывается SparkSession
Coalesce vs Repartition
Так или иначе обе функции влияют на количество партиций, по которым будут распределены данные
Coalesce
- Уменьшает количество партиций, не может их увеличить. При этом не гарантирует равномерности распределения данных, может возникнуть Data Skew. Но при этом не вызывает Shuffle, из-за чего быстрее
Repartition
- Может как уменьшть количество партиций так и увеличивать. Гарантирует равномерность распределения данных, исключает Data Skew. Вызывает Shuffle из-за чего более медленнный
Coalesce
+Не вызывает Shuffle -> Быстрый
-Не может увеличивать кол-во партиций, только уменьшать
-Может возникнуть перекос данныхRepartition
+Может свободно менять количество партиций
+Гарантирует отсутствия возникновения перекоса после репартиционирования
-Вызывает Shuffle -> Медленный
Параллелизм
В Spark параллелизм достигается на 3-х уровнях:
С точки зрения кластера: Добавляем или уменьшаем количество Worker Nodes(серверов) - влияет на количество партиций данных
В рамках каждого сервера (Worker): Создаём множество(или один) Executor’ов с ядрами
На последнем уровне под каждое ядро выделяется своя таска на Executor’е
Таким образом у нас есть 3 основных фактора Параллелизма:
Количество ядер
процессора на каждом Executor
Количество Executor’ов
(процессов на серверах)
Количество Worker-ов
(серверов) - по ним делятся партиции данных
Оптимизация:
Соблюдаем баланс памяти и процессора, слишком много ядер могут просто перегрузить оперативную память, слишком малое же может наоборот неэффективно её расходовать (Google говорит использовать не менее 5 ядер на executor)
Избегаем data skew и shuffle в рамках партиций
Можно задавать дефолтный параллелизм или количество партиций с помощью:spark.sql.shuffle.partitions
spark.default.parallelism
- как я понял, для RDD, в случае DataFrame задаём partitions и всё
UDF
Пользовательские функции, написанные в Spark. Обычно плохо оптимизируются движком при анализе кода, являются плохой практикой. Старайся избегать в production коде, если на Python, только если никак нельзя обойтись стандартными методами и типами данных.
Проблема в том, что для исполнения Python UDF
нужно перенести данные к коду и вернуть обратно. Тот же Spark SQL обычно выполняется напрямую на данных (переносит код к данным), поэтому может быть в разы быстрее.
UDF на Scala спрашивают редко. Тоже будет медленнее встроенных трансформаций, но лучше, чем PySpark UDF, т.к. работает напрямую.
Как исправить ошибку OOM (Out of memory)
Ищем виновника, обычно их два:
Driver OOM:
- Собираем слишком много данных на драйвер через collect()
столько не собирать
ставить лимиты
выделить больше памяти - Делаем слишком тяжёлый Broadcast
Не делать таких джоинов
Поставить ограничение на размер Broadcast-нутой таблицы
Выделить больше памяти
Executor OOM:
Убедиться, что spark.executor.memoryOverhead задан (обычно 10% от executor-memory) - запас памяти для обеспечения надёжности операций
- Перекос данных
Увеличить число партиций: repartition()
Исправить data skew:
использовать Salting или Broadcast - Слишком мало партиций
Увеличить число партиций: repartition() (или default другой задать)
В теории ещё можно уменьшить количество ядер, использовать Checkpoint() для надёжности, Сделать Unpersist() если забыли, а также провести базовые логические оптимизации - фильтровать пораньше, группировать пораньше и т.д., как в SQL
Как бороться с перекосом данных (data skew)
Можно считать, что у нас перекос, если данных в одном узле больше чем в другом на 10%
Для решения этой проблемы есть В основном 4 варианта:
Salting
- находим ключ по которому распределены данные и видим сильный перекос в одном из узлов. Берём этот ключ с перекошенными данными и создаём из 1 ключа несколько новых путём добавления рандомных символов, перераспределяем(shuffle) с учётом новых “солёных” ключей, получаем более равномерное распределение данных.
В случае, если вы делаете salting ключа перед join-ом двух таблиц, то вам нужно посолить одинаковым способом ключ джоина в обоих таблицах и по новому посоленому ключу уже джоинить
Broadcast Join
- если мы джоиним маленькую табличку, то можем передать её целиком на каждый узел, по аналогии с Distributed Replicated в Greenplum. Уменьшает Shuffle и риск Data Skew соответственно
Repartition
- увеличение числа партиций и равномерное распределение по ним(вызывает Shuffle)
Избегать взаимодействий с кривыми ключами, где много null, дубликатов или ещё каких-то косяков
Параметры запуска джобы (spark submit)
Spark-submit
- операция, запускающая Spark-джобу, соответственно в ней же можно производить определённые настройки с:Выбором кластера:
–master: URL кластера (yarn, k8s://…, local[*], spark://host:port).
–deploy-mode: client или cluster
Управением ресурсами
–executor-memory: Память на Executor (например, 4g).
–executor-cores: Число ядер на Executor.
–driver-memory: Память драйвера (например, 2g).
–total-executor-cores: Общее число ядер в кластере (Standalone).Указыванием параллелизма
spark.sql.shuffle.partitions
spark.default.parallelism
Весь список можно посмотреть через spark-submit –help
Запоминать все параметры не надо, достаточно будет указать пару конкретных(мастер, дефолт partitions, driver-memory) и в целом, что можно сделать через spark-submit
Cache() и Persist()
Кэширование - процесс, необходимый в случае реализации повторяющихся задач, если какие-то данные ожидают множественного использования
Cache()
- Сохраняет данные в памяти. Если данные в память не поместятся, часть пересчитается перед новым вызовом
Persist()
- Позволяет использовать более гибкие настройки (Storage Level), для того, чтобы часть данных сохранить либо в памяти, либо на диске
Когда данных мало - используем Cache() и не паримся, если много - думаем как использовать Persist()
Чтобы не засорить кэшем память, когда закончили с повторяющимися задачами делаем
Unpersist()
- высвобождение кэша из памяти