Spark Flashcards
(18 cards)
❓Apache Spark là gì ?
Apache Spark là framework xử lý dữ liệu phân tán, thường được dùng cho dữ liệu lớn chạy in-memory → nhanh hơn Hadoop nhiều lần.
⚡ Vì sao Spark quan trọng?
- Nhanh: Xử lý dữ liệu trên RAM, không phải đĩa.
- Đa năng: Hỗ trợ batch, stream, ML, graph.
-
Đa ngôn ngữ: Viết code bằng Python, Scala, Java.
Dễ tích hợp: Kết nối với HDFS, S3, Kafka…
—
✅ Tóm gọn:
Spark = “Nhanh, mạnh, đa năng” → phù hợp cho mọi bài toán xử lý dữ liệu lớn.
❓ MapReduce là gì?
MapReduce là mô hình xử lý dữ liệu phân tán → chia nhỏ công việc lớn, chạy song song trên nhiều máy → ghép kết quả lại thành một.
⚙️ Cơ chế hoạt động:
- Chia nhỏ job lớn → thành task nhỏ
- Phân tán các task đó lên nhiều máy
- Tổng hợp kết quả từ nhiều máy thành 1 output duy nhất
🔧 Gồm 2 hàm chính:
🔹 Map
:
- Input: Dữ liệu thô từ người dùng
-
Output: Danh sách cặp
key-value
→ ví dụ: đếm từ → (word
, 1)
🔹 Reduce
:
-
Input: Output từ
Map
- Xử lý: Shuffle + Aggregate theo key
-
Output: Kết quả cuối cùng (ví dụ:
word → count
)
✅ Tóm gọn:
> MapReduce = Chia nhỏ (Map) + Gom nhóm + Tổng hợp (Reduce)
→ Giải pháp cũ nhưng nền tảng cho Big Data (như Hadoop).
❓ Lazy Evaluation là gì?
Trong quá trình làm việc với Spark, mình thấy cơ chế trì hoãn thực thi rất quan trọng để tối ưu hiệu suất và tiết kiệm tài nguyên. Thay vì thực thi từng phép biến đổi ngay lập tức, Spark gom tất cả các bước biến đổi (transformations) thành một kế hoạch xử lý (DAG) và chỉ thực thi khi gặp action.
Kinh nghiệm thực tế mình rút ra:
- Việc này giúp giảm thiểu việc tính toán thừa và I/O không cần thiết. Mình thường gom nhiều bước biến đổi lại rồi mới gọi action một lần để Spark tối ưu toàn bộ pipeline cùng lúc.
- Tránh gọi nhiều action trung gian như count()
, collect()
vì mỗi action sẽ kích hoạt thực thi lại toàn bộ pipeline, gây tốn thời gian và tài nguyên.
- Mình hay dùng df.explain()
và Spark UI để kiểm tra DAG và kế hoạch thực thi, từ đó phát hiện các điểm chưa tối ưu như shuffle nhiều hoặc filter không được pushdown.
- Catalyst Optimizer tự động tối ưu DAG, đẩy filter xuống nguồn dữ liệu, gộp các phép biến đổi liên tiếp, giúp giảm I/O và tăng tốc độ xử lý.
- Nhờ hiểu và tận dụng cơ chế này, mình đã cải thiện đáng kể hiệu suất các job ETL và truy vấn phức tạp, đồng thời tiết kiệm tài nguyên cluster.
Tóm lại, lazy evaluation không chỉ là khái niệm lý thuyết mà là công cụ thực tế giúp mình viết code Spark hiệu quả, dễ bảo trì và tối ưu tài nguyên trong môi trường xử lý dữ liệu lớn.
❓ Predicate Pushdown là gì ?
Predicate Pushdown là kỹ thuật mà Spark cố gắng đẩy điều kiện lọc xuống ngay tầng đọc dữ liệu (như Parquet, ORC, hoặc database) thay vì đọc toàn bộ dữ liệu rồi mới lọc. Điều này giúp giảm đáng kể lượng dữ liệu được tải vào bộ nhớ, tiết kiệm tài nguyên và tăng tốc độ xử lý.
Kinh nghiệm của mình:
- Trong các dự án thực tế, mình luôn ưu tiên viết các filter rõ ràng, đơn giản để Spark có thể pushdown được. Ví dụ, tránh dùng các hàm phức tạp hoặc UDF trong filter vì chúng thường không được hỗ trợ pushdown.
- Mình cũng thường kiểm tra Spark UI hoặc log để xác nhận filter có được đẩy xuống datasource hay không. Nếu không, mình sẽ điều chỉnh lại code hoặc cấu hình datasource.
- Một lưu ý quan trọng là predicate pushdown phát huy hiệu quả nhất khi kết hợp với partition pruning và sử dụng các định dạng file columnar như Parquet hoặc ORC.
- Khi làm việc với JDBC datasource, mình thường kiểm tra câu SQL sinh ra để đảm bảo filter được áp dụng ngay ở database, tránh tải dữ liệu thừa về Spark.
- Nhờ áp dụng predicate pushdown hợp lý, mình đã giảm đáng kể thời gian chạy các job ETL từ hàng giờ xuống còn vài chục phút, đồng thời giảm áp lực lên cluster.
Tóm lại, predicate pushdown không chỉ là một tính năng lý thuyết mà là một phần rất quan trọng trong tối ưu hiệu năng Spark, và kinh nghiệm thực tế cho thấy việc hiểu rõ cách hoạt động và áp dụng đúng sẽ giúp bạn tiết kiệm rất nhiều tài nguyên và thời gian.
❓Hệ Thống Xử Lý Dữ Liệu Phân Tán Spark Là Gì?
Cấu trúc Master – Worker – Cluster Manager: Kinh nghiệm thực tế dễ hiểu
- Master – Người chỉ huy trung tâm
- Master giống như “đội trưởng” của cả hệ thống, có nhiệm vụ giao việc cho các worker, theo dõi tiến độ, và xử lý khi có lỗi xảy ra.
- Trong thực tế, Master phải luôn “đứng mũi chịu sào”, nếu Master bị quá tải hoặc mất kết nối thì toàn bộ hệ thống sẽ bị ảnh hưởng, nên cần đảm bảo Master có cấu hình mạnh, có backup hoặc cơ chế dự phòng để tránh downtime.
- Master cũng cần có công cụ giám sát trực quan để đội vận hành dễ dàng biết được tình trạng các worker, task đang chạy đến đâu.
- Worker – Những người lính thực thi
- Worker là những máy tính thực sự làm việc, nhận việc từ Master rồi xử lý dữ liệu.
- Trong thực tế, worker có thể có cấu hình khác nhau, có node mạnh, node yếu; có node đang bận việc khác. Nên việc Master phân bổ công việc cần dựa trên khả năng thực tế của từng worker để tránh node yếu bị quá tải.
- Worker cần có khả năng báo lỗi kịp thời và tự động khởi động lại task khi gặp sự cố, đồng thời gửi báo cáo tiến độ về Master thường xuyên.
- Cluster Manager – Bộ điều phối tài nguyên thông minh
- Cluster Manager giống như “bộ não” quản lý toàn bộ tài nguyên (CPU, RAM, mạng) của các worker trong cluster.
- Nó giúp tự động khởi động hoặc tắt các worker, cân bằng tải, và phân bổ tài nguyên sao cho hiệu quả nhất.
- Trong thực tế, Cluster Manager giúp hệ thống linh hoạt mở rộng hoặc thu nhỏ quy mô nhanh chóng theo nhu cầu xử lý, đồng thời đảm bảo không có node nào bị thiếu hoặc thừa tài nguyên quá nhiều.
- Một số kinh nghiệm thực tế quan trọng
- Phân chia dữ liệu hợp lý: Dữ liệu nên được chia nhỏ vừa phải, không quá nhỏ gây overhead quản lý, cũng không quá lớn làm mất cân bằng tải.
- Giám sát tài nguyên liên tục: Cần có hệ thống giám sát để theo dõi CPU, RAM, I/O của từng worker và trạng thái task, từ đó điều chỉnh phân bổ công việc cho phù hợp.
- Xử lý lỗi nhanh: Khi worker bị lỗi hoặc mất kết nối, Master và Cluster Manager phải nhanh chóng phân phối lại công việc cho worker khác, tránh gián đoạn.
- Tối ưu mạng và lưu trữ: Đảm bảo dữ liệu được lưu trữ và truyền tải hiệu quả, giảm thiểu độ trễ mạng, ưu tiên xử lý dữ liệu ngay trên node chứa dữ liệu (data locality).
- Tự động hóa vận hành: Sử dụng container và công cụ orchestration giúp triển khai, cập nhật worker nhanh chóng, giảm lỗi vận hành.
Tóm lại
- Master: giao việc, giám sát, xử lý lỗi, cần mạnh và có dự phòng.
- Worker: thực thi công việc, cần giám sát tài nguyên, tự báo lỗi và tái khởi động task.
- Cluster Manager: quản lý tài nguyên, cân bằng tải, tự động mở rộng và thu nhỏ cluster.
Vận hành thành công hệ thống phân tán là sự phối hợp nhịp nhàng giữa ba thành phần này, đảm bảo công việc được xử lý nhanh, ổn định và linh hoạt theo nhu cầu thực tế.
Nếu ví von, hệ thống này giống như một đội bóng:
- Master là huấn luyện viên phân công chiến thuật,
- Worker là các cầu thủ trên sân thi đấu,
- Cluster Manager là ban huấn luyện và hậu cần đảm bảo cầu thủ có đủ sức khỏe và trang thiết bị để thi đấu tốt.
❓ Vòng đời ứng dụng Apache Spark
Dưới đây là phiên bản đã được format lại rõ ràng và dễ đọc hơn, giữ nguyên toàn bộ nội dung như bạn yêu cầu:
✅ Quy trình chạy ứng dụng Spark kèm kinh nghiệm thực tế
1. Submit ứng dụng
-
Lý thuyết:
Người dùng chạy lệnhspark-submit
để gửi ứng dụng lên cluster. Driver được tạo ra và gửi yêu cầu tài nguyên (CPU, RAM) tới Cluster Manager (YARN, Kubernetes, Mesos,…). -
Kinh nghiệm:
- Khi submit, mình thường chú ý cấu hình
--num-executors
,--executor-memory
,--executor-cores
sao cho phù hợp với workload, tránh lãng phí tài nguyên hoặc gây nghẽn. - Ngoài ra, mình hay dùng
--conf spark.dynamicAllocation.enabled=true
để cluster tự động điều chỉnh executor theo tải, giúp tiết kiệm tài nguyên. - Cần kiểm tra kỹ các tham số môi trường và dependency (jar, file config) để tránh lỗi runtime.
- Khi submit, mình thường chú ý cấu hình
2. Khởi tạo SparkSession và cấp phát Executors
-
Lý thuyết:
Driver khởi tạo SparkSession để thiết lập môi trường làm việc và yêu cầu Cluster Manager cấp phát các Executor. -
Kinh nghiệm:
- SparkSession là điểm bắt đầu cho mọi thao tác, mình thường cấu hình thêm các tham số như
spark.sql.shuffle.partitions
để tối ưu số lượng partitions cho shuffle, tránh gây tắc nghẽn hoặc làm việc không hiệu quả. - Khi chạy trên YARN, mình theo dõi logs của Driver để đảm bảo các executors được cấp phát đúng như cấu hình, tránh trường hợp thiếu tài nguyên gây delay hoặc crash.
- SparkSession là điểm bắt đầu cho mọi thao tác, mình thường cấu hình thêm các tham số như
3. Phân chia công việc (Job → Stage → Task)
-
Lý thuyết:
Khi gặp thao tác action (ví dụ:count()
,collect()
), Spark tạo job, chia thành stage và task.Trong Spark, mỗi job được tạo ra khi bạn gọi một action (nhưcount()
,collect()
,save()
…), và một job sẽ được chia thành nhiều stage.#### Số lượng stage trong một job phụ thuộc vào kiểu transformations bạn sử dụng:-
Narrow transformations (chuyển đổi hẹp) như
map()
,filter()
,union()
không yêu cầu shuffle dữ liệu giữa các partition, nên các bước này có thể thực thi trong cùng một stage. -
Wide transformations (chuyển đổi rộng) như
reduceByKey()
,groupByKey()
,join()
yêu cầu shuffle dữ liệu giữa các partition, tạo ra ranh giới stage (stage boundary) mới, tức là bắt đầu một stage mới.
- Một job có thể gồm nhiều stage.
- Mỗi stage là một chuỗi các transformations hẹp có thể thực thi liên tục mà không cần shuffle.
- Mỗi lần gặp một wide transformation (shuffle), Spark sẽ kết thúc stage hiện tại và bắt đầu stage mới.
map() → filter() → reduceByKey() → map() → collect()
, thì sẽ có 2 stage:- Stage 1:
map() → filter()
(narrow transformations) - Stage 2:
reduceByKey() → map()
(bắt đầu sau shuffle của reduceByKey)
-
Narrow transformations (chuyển đổi hẹp) như
-
Kinh nghiệm:
- Hiểu rõ cách Spark phân chia công việc giúp mình tối ưu code, ví dụ hạn chế shuffle không cần thiết (tránh join hoặc groupBy không cần thiết) vì shuffle rất tốn kém tài nguyên và thời gian.
- Mình thường debug bằng cách bật log ở mức DEBUG hoặc sử dụng UI Spark để xem DAG, giúp phát hiện bottleneck hoặc task bị phân bổ không đều (skew).
- Khi gặp data skew, mình áp dụng kỹ thuật như salting keys hoặc tăng số partitions để cân bằng tải.
4. Thực thi task trên Executor và gộp kết quả
-
Lý thuyết:
Driver gửi task đến Executor để xử lý song song, Executor thực thi transformations, tự động retry nếu lỗi, kết quả được gộp lại. -
Kinh nghiệm:
- Trong thực tế, mình hay gặp lỗi task fail do OOM (Out Of Memory) hoặc timeout, nên thường điều chỉnh
spark.executor.memory
hoặcspark.network.timeout
để phù hợp. - Việc sử dụng checkpoint hoặc persist đúng cách giúp giảm recomputation khi retry task, tăng hiệu quả.
- Với các action như
collect()
, mình rất hạn chế dùng trên tập dữ liệu lớn vì dễ gây tràn bộ nhớ Driver, thay vào đó ưu tiên dùngtake()
hoặc ghi trực tiếp ra storage. - Khi ghi dữ liệu, mình chọn format file phù hợp (Parquet, ORC) và phân vùng dữ liệu để tối ưu truy vấn sau này.
- Trong thực tế, mình hay gặp lỗi task fail do OOM (Out Of Memory) hoặc timeout, nên thường điều chỉnh
5. Kết thúc ứng dụng
-
Lý thuyết:
Driver gửi tín hiệu giải phóng tài nguyên, Executors dừng, tài nguyên trả lại cluster, ứng dụng kết thúc. -
Kinh nghiệm:
- Mình luôn theo dõi trạng thái ứng dụng qua Spark UI hoặc công cụ quản lý cluster để đảm bảo tài nguyên được giải phóng đúng cách, tránh rò rỉ tài nguyên gây ảnh hưởng đến các ứng dụng khác.
- Nếu ứng dụng chạy lâu hoặc có nhiều job liên tục, mình cân nhắc dùng Dynamic Allocation hoặc cấu hình timeout hợp lý để tránh giữ executor không cần thiết.
- Trong môi trường production, mình cũng xây dựng alert để phát hiện sớm các job bị treo hoặc lỗi, giúp giảm downtime.
🔍 Tóm lại:
Việc hiểu quy trình Spark là nền tảng, nhưng quan trọng hơn là kinh nghiệm vận hành, tối ưu và xử lý sự cố thực tế mới giúp bạn trở thành người dùng Spark chuyên nghiệp.
❓Cơ Chế Ưu Tiên Cấu Hình Trong Apache Spark ?
🔝 Thứ Tự Ưu Tiên Cấu Hình trong Spark
> Code > CLI > spark-defaults.conf
Đây là nguyên tắc vàng để xác định giá trị cấu hình nào được áp dụng cuối cùng khi chạy ứng dụng Spark.
1️⃣ SparkSession (Code level) – Ưu tiên cao nhất
- Cấu hình qua
.config()
khi tạoSparkSession
sẽ ghi đè toàn bộ cấu hình bên ngoài. - Dùng khi cần chắc chắn ứng dụng chạy đúng config bất kể môi trường.
📌 Ví dụ:
```python
spark = SparkSession.builder \
.appName(“App”) \
.config(“spark.executor.memory”, “8g”) \
.getOrCreate()
~~~
→ spark.executor.memory = 8g
sẽ vượt lên trên cả CLI và file config.
2️⃣ CLI (spark-submit
) – Ưu tiên trung bình
- Các option truyền trực tiếp bằng
--conf
khi submit app cũng ghi đèspark-defaults.conf
, nhưng thấp hơn config trong code.
📌 Ví dụ:
```bash
spark-submit \
–conf spark.executor.memory=6g \
my_app.py
~~~
→ Sẽ dùng 6g
trừ khi trong code đã đặt 8g
.
3️⃣ File cấu hình spark-defaults.conf
– Ưu tiên thấp nhất
- Chứa các cấu hình mặc định được Spark load khi khởi động
- Có thể đặt tại:
$SPARK_HOME/conf/spark-defaults.conf
📌 Ví dụ:
spark.executor.memory 4g spark.executor.cores 2
→ Chỉ áp dụng nếu không bị override bởi CLI hoặc code
⚠️ Ngoại lệ cần nhớ:
- Một số cấu hình liên quan đến deploy mode, master URL (VD:
--master
,--deploy-mode
) chỉ có thể đặt qua CLI, không thể override trong code. - Với Spark chạy trên YARN/Kubernetes, việc hiểu rõ ai kiểm soát tài nguyên là rất quan trọng để tránh xung đột.
🧠 Ghi nhớ nhanh:
Code > CLI > spark-defaults.conf
> 👉 Cấu hình càng gần code gốc → càng có quyền ưu tiên cao
❓4 Phương Thức Xử Lý Dữ Liệu Trong Spark là gì ?
1. RDD (Resilient Distributed Dataset)
Là tầng thấp nhất, nền tảng cốt lõi của Spark. RDD không có schema, rất linh hoạt, phù hợp khi bạn cần kiểm soát chi tiết, xử lý logic phức tạp hoặc streaming. Tuy nhiên, code thường dài, khó debug và dễ sai nếu chưa quen.
Dùng khi: Cần thao tác dữ liệu phức tạp, tùy chỉnh sâu.
2. DataFrame
Xây dựng trên RDD nhưng có schema cố định, cho phép Spark tự động tối ưu qua Catalyst Optimizer. API dễ dùng, hỗ trợ nhiều ngôn ngữ (Python, Scala, Java, R). Thích hợp cho ETL, phân tích dữ liệu dạng bảng, và pipeline machine learning.
Dùng khi: Làm ETL, exploratory analysis, xử lý dữ liệu có cấu trúc.
3. DataSet
Kết hợp ưu điểm của RDD (type safety) và DataFrame (hiệu suất cao). Có kiểm tra kiểu dữ liệu ngay khi compile (Scala/Java). Dùng Catalyst để tối ưu nhưng thêm tính an toàn kiểu dữ liệu.
Dùng khi: Viết ứng dụng Scala/Java cần performance và type safety.
4. Spark SQL
Cho phép truy vấn dữ liệu bằng cú pháp SQL quen thuộc, tích hợp tốt với các công cụ BI qua JDBC/ODBC. Hỗ trợ dữ liệu có cấu trúc và bán cấu trúc (JSON, Parquet), tận dụng Catalyst và AQE để tối ưu.
Dùng khi: Team quen SQL, làm báo cáo, truy vấn ad-hoc, trực quan hóa dữ liệu.
Tóm tắt nhanh:
- RDD: linh hoạt, kiểm soát thấp, phức tạp
- DataFrame: dễ dùng, hiệu suất cao, tự tối ưu
- DataSet: type-safe, hiệu suất, compile-time check
- Spark SQL: SQL quen thuộc, tích hợp BI, dễ truy vấn
❓Catalyst Optimizer là gì ?
Catalyst Optimizer là bộ máy tối ưu truy vấn cốt lõi của Spark SQL, giúp biến các câu lệnh SQL hoặc DataFrame thành kế hoạch thực thi hiệu quả mà không cần phải chỉnh tay. Từ kinh nghiệm thực tế, Catalyst giúp mình tiết kiệm rất nhiều thời gian và tài nguyên khi xử lý dữ liệu lớn.
Quá trình tối ưu của Catalyst gồm 4 bước chính:
-
Parsing & Analysis
Khi mình viết câu lệnh SQL hoặc DataFrame, Spark sẽ chuyển nó thành một cây kế hoạch logic chưa rõ ràng (unresolved logical plan). Catalyst sẽ phân tích, xác định schema, tên bảng, cột, hàm dựa trên catalog để tạo ra kế hoạch logic đã được resolve, đảm bảo câu lệnh hợp lệ và đầy đủ thông tin. -
Logical Optimization
Ở bước này, Catalyst áp dụng hàng loạt quy tắc (rule-based) để đơn giản hóa và tối ưu kế hoạch logic. Ví dụ:- Predicate Pushdown: Đẩy điều kiện lọc xuống gần nguồn dữ liệu để giảm lượng dữ liệu đọc vào. Mình hay thấy nó giúp giảm đáng kể I/O khi đọc file Parquet hoặc ORC.
- Column Pruning: Chỉ đọc những cột cần thiết, tránh đọc dữ liệu thừa.
- Constant Folding: Tính trước các biểu thức hằng số để giảm tính toán khi chạy.
- Filter Simplification: Đơn giản hóa các điều kiện filter phức tạp.
- Join Reordering: Sắp xếp lại thứ tự join để giảm shuffle và data movement.
-
Physical Planning
Catalyst lựa chọn thuật toán thực thi phù hợp như Broadcast Hash Join, Sort Merge Join hay Nested Loop Join dựa trên chi phí ước tính (cost-based). Đây là bước quyết định hiệu suất thực tế của truy vấn. -
Code Generation
Cuối cùng, Catalyst sử dụng kỹ thuật whole-stage code generation, biên dịch truy vấn thành bytecode Java tối ưu, giúp tận dụng CPU hiệu quả hơn.
Kinh nghiệm thực tế của mình với Catalyst:
- Catalyst giúp tự động tối ưu rất nhiều, mình chỉ cần viết truy vấn rõ ràng, Catalyst sẽ lo phần tối ưu phức tạp.
- Khi làm việc với dữ liệu lớn, việc Catalyst đẩy filter xuống nguồn dữ liệu và chỉ đọc cột cần thiết giúp giảm I/O rất nhiều, tăng tốc độ xử lý.
- Catalyst còn giúp tối ưu join tự động, đặc biệt là reorder join và chọn thuật toán join phù hợp, giúp giảm shuffle và tăng hiệu suất.
- Mình cũng tận dụng tính năng cost-based optimization (CBO) bằng cách cập nhật thống kê dữ liệu để Catalyst chọn kế hoạch tốt hơn.
Tóm lại, Catalyst Optimizer là “bộ não” giúp Spark SQL chạy nhanh và hiệu quả mà không cần mình phải tối ưu thủ công từng bước. Hiểu được cách Catalyst hoạt động giúp mình viết truy vấn tốt hơn và biết cách tận dụng các tính năng của Spark.
❓AQE là gì?
Adaptive Query Execution (AQE) là một tính năng rất hữu ích mà mình thường bật khi chạy Spark, đặc biệt với các pipeline dữ liệu lớn và phức tạp. Thay vì dựa hoàn toàn vào thống kê tĩnh trước khi chạy, AQE cho phép Spark tự động điều chỉnh kế hoạch thực thi dựa trên dữ liệu thực tế thu thập được trong quá trình chạy.
Từ kinh nghiệm thực tế, AQE giúp mình giải quyết được nhiều vấn đề phổ biến như:
- Chuyển đổi join động: Ví dụ, Spark có thể chọn Sort Merge Join ban đầu vì dự đoán bảng lớn, nhưng khi chạy thực tế, nếu bảng nhỏ hơn nhiều do filter, AQE sẽ tự chuyển sang Broadcast Hash Join nhanh hơn, tiết kiệm shuffle và tài nguyên.
- Gộp partition nhỏ: Trước đây, nhiều partition nhỏ gây overhead lớn cho scheduler và tăng áp lực bộ nhớ. AQE tự động gộp các partition nhỏ lại sao cho mỗi partition có kích thước hợp lý, giảm số lượng task và tăng hiệu suất.
- Xử lý data skew: Khi phát hiện một số partition quá lớn, AQE sẽ chia nhỏ hoặc nhân bản các task này để tránh nghẽn executor, giảm tình trạng treo job hoặc disk spill.
Điểm mạnh của AQE là giúp giảm thiểu việc phải điều chỉnh thủ công các tham số cấu hình và giảm rủi ro do dữ liệu thay đổi hoặc thống kê không chính xác. Mình thường bật AQE bằng cách set spark.sql.adaptive.enabled=true
và theo dõi qua Spark UI để đảm bảo các tối ưu được áp dụng hiệu quả.
Tóm lại, AQE giúp Spark trở nên “thông minh” hơn, tự động thích nghi với dữ liệu và workload thực tế, từ đó cải thiện hiệu suất và độ ổn định của pipeline mà không cần nhiều tuning thủ công.
❓Sort Merge Join (SMJ) là gì ?
Khi làm việc với join trong Spark, đặc biệt là với dữ liệu lớn, mình thường gặp và sử dụng nhiều nhất là Sort Merge Join (SMJ). Đây là chiến lược join mặc định khi bảng không đủ nhỏ để dùng broadcast join.
Cách SMJ hoạt động thực tế như sau:
- Đầu tiên, Spark sẽ shuffle cả hai bảng theo khóa join để đảm bảo các bản ghi có cùng key nằm chung một partition. Đây là bước tốn tài nguyên nhất vì dữ liệu phải truyền qua mạng.
- Sau shuffle, trong mỗi partition, Spark sẽ sort dữ liệu theo khóa join. Việc sort này giúp dữ liệu được sắp xếp thứ tự, tạo điều kiện thuận lợi cho bước tiếp theo.
- Tiếp theo, Spark dùng thuật toán merge với 2 con trỏ chạy song song trên 2 bảng đã sort. Nếu key của bảng A nhỏ hơn bảng B, con trỏ A tiến, ngược lại con trỏ B tiến, còn nếu bằng nhau thì ghép bản ghi và cả hai con trỏ cùng tiến. Thuật toán này rất hiệu quả vì chỉ cần quét một lần, giảm độ phức tạp từ O(n*m) xuống O(n+m).
Kinh nghiệm thực tế khi dùng SMJ:
- Mình luôn chú ý tối ưu số lượng partition shuffle (
spark.sql.shuffle.partitions
) để tránh tạo quá nhiều task nhỏ hoặc task quá lớn, giúp cân bằng tải và tăng tốc độ join. - Nếu phát hiện data skew (một số key có lượng dữ liệu quá lớn), mình sẽ áp dụng kỹ thuật như salting hoặc điều chỉnh partition để phân phối đều hơn, tránh executor bị nghẽn.
- SMJ phù hợp với các join quy mô lớn, các kiểu join inner, left, right, full outer, và hỗ trợ tốt các trường hợp dữ liệu không thể broadcast.
- Mình thường theo dõi Spark UI để kiểm tra chi tiết các bước shuffle, sort, merge, từ đó phát hiện các bottleneck và điều chỉnh cấu hình hoặc logic xử lý.
Tóm lại:
- Sort Merge Join là giải pháp mặc định, ổn định và scalable cho join dữ liệu lớn trong Spark.
- Nó bao gồm shuffle dữ liệu theo key, sort trong partition và merge bằng thuật toán 2 con trỏ.
- Cần tối ưu partition, xử lý data skew và theo dõi Spark UI để đảm bảo hiệu suất.
❓ Giải Thích Ngắn Gọn, Kỹ Thuật, Dễ Nhớ về Spark Join ?
🔹 1. Build Side
- Định nghĩa: Bảng nhỏ hơn trong phép join, được Spark load vào bộ nhớ để tạo HashedRelation.
- Vai trò: Dữ liệu này sẽ được broadcast hoặc giữ trong bộ nhớ để tra cứu nhanh khi join.
-
Dễ nhớ:
Build side = bảng nhỏ
, là nơi “xây dựng” bảng băm.
🔹 2. Stream Side
- Định nghĩa: Bảng lớn hơn, được scan từng dòng để dò tìm (probe) vào bảng băm.
- Vai trò: Là bảng “được dò” để ghép dữ liệu.
-
Dễ nhớ:
Stream side = bảng lớn
, “chạy từng dòng để tìm”.
🔹 3. Probe
- Định nghĩa: Hành động tra từng dòng của stream side vào bảng băm từ build side.
- Vai trò: Là bước chính để thực hiện join hiệu quả.
-
Dễ nhớ:
Probe = dò dòng stream vào bảng băm
.
🔹 4. HashedRelation
- Định nghĩa: Là bảng băm được tạo từ build side theo khóa join.
- Vai trò: Cho phép tra cứu cực nhanh khi join.
-
Dễ nhớ:
HashedRelation = bảng băm trong bộ nhớ
.
🔹 5. Shuffle
- Định nghĩa: Quá trình phân phối lại dữ liệu theo key để các bản ghi giống nhau nằm chung partition.
- Vai trò: Đảm bảo dữ liệu join được local mà không phải qua mạng.
-
Dễ nhớ:
Shuffle = xáo trộn để gom key lại
.
🔹 6. Broadcast
- Định nghĩa: Gửi toàn bộ bảng nhỏ từ driver đến mọi executor.
- Vai trò: Tránh shuffle bảng lớn, giúp join nhanh.
-
Dễ nhớ:
Broadcast = phát sóng bảng nhỏ
.
🔹 7. Equi-Join
-
Định nghĩa: Join dựa trên điều kiện bằng (
a.id = b.id
). -
Dễ nhớ:
Equi-Join = join bằng nhau
.
🔹 8. Non-Equi Join
-
Định nghĩa: Join với điều kiện khác bằng, như
a.value > b.value
. -
Dễ nhớ:
Non-Equi = join không bằng
.
🔹 9. Local Shuffle Reader
- Định nghĩa: Đọc dữ liệu shuffle từ đĩa local thay vì qua mạng.
-
Dễ nhớ:
Local Shuffle Reader = đọc shuffle tại chỗ
.
🔹 10. Data Skew
- Định nghĩa: Dữ liệu phân bố không đều, một số key quá nhiều bản ghi.
-
Dễ nhớ:
Data Skew = lệch dữ liệu
, gây nghẽn.
🔹 11. Shuffle Partition Coalescing
- Định nghĩa: Gộp các partition nhỏ sau shuffle thành partition lớn.
-
Dễ nhớ:
Coalescing = gộp nhỏ thành lớn
.
🧠 Vì Sao Sort
Giúp Merge Join
Nhanh Hơn?
- Sau shuffle, Spark sort dữ liệu theo khóa join trong từng partition.
- Spark sử dụng 2 con trỏ duyệt qua 2 bảng đã sort:
- Nếu
key A < key B
→ tiến con trỏ A - Nếu
key B < key A
→ tiến con trỏ B - Nếu
key A = key B
→ ghép bản ghi & cả hai cùng tiến
- Nếu
- Đây là thuật toán giống merge trong merge sort, chỉ cần quét một lần.
- Giảm độ phức tạp từ O(n * m) → O(n + m) ⇒ tiết kiệm đáng kể CPU và RAM.
❓Executor Memory ?
- Executor Memory 4GB → Thực tế dùng ~3.8GB (trừ 300MB reserved).
-
Memory chia làm 2 phần:
- Unified Memory (60%): Dùng cho execution (join, shuffle) và storage (cache, broadcast).
- User Memory (40%): Dùng cho UDF, biến tạm, PySpark subprocess.
-
Vấn đề phổ biến:
- OOM khi join lớn → tăng executor.memory hoặc giảm storageFraction (ví dụ 0.3).
- Cache bị mất → tăng storageFraction (ví dụ 0.7), dùng MEMORY_AND_DISK_SER.
- PySpark OOM dù heap còn → tăng executor.memoryOverhead ≥ 1024MB.
- GC nhiều → bật off-heap memory.
- Shuffle spill → giảm shuffle partitions, xử lý data skew.
-
Best practices:
- Join-heavy: ưu tiên execution memory (giảm storageFraction).
- Cache-heavy: ưu tiên storage memory (tăng storageFraction).
- PySpark: tăng memoryOverhead.
- Giảm shuffle partitions nếu shuffle nhỏ.
- Luôn theo dõi Spark UI để phát hiện bottleneck.
- Gọi
.unpersist()
sau khi dùng cache.
❓ Disk Spill là gì ?
Disk Spill trong Spark xảy ra khi bộ nhớ RAM không đủ để xử lý các tác vụ như shuffle, sort, join, groupBy… nên Spark buộc phải ghi dữ liệu tạm xuống đĩa, gây giảm hiệu suất nghiêm trọng.
Từ kinh nghiệm của mình, để giảm thiểu disk spill, mình thường làm những việc sau:
-
Tăng dung lượng RAM cho executor
Mình điều chỉnh tham sốspark.executor.memory
sao cho đủ lớn để chứa dữ liệu trung gian. Nếu cluster có thể mở rộng, mình cũng cân nhắc tăng số node hoặc core để có nhiều bộ nhớ hơn. Ngoài ra, mình ưu tiên dùng Kryo Serializer thay vì Java Serializer để giảm kích thước object trong bộ nhớ. -
Tối ưu partition dữ liệu
Mình luôn đảm bảo mỗi partition có kích thước hợp lý, thường khoảng 128MB. Partition quá lớn dễ gây thiếu bộ nhớ, partition quá nhỏ lại tạo overhead nhiều task nhỏ, làm tăng khả năng disk spill. Việc này giúp cân bằng tải trên các executor, giảm nguy cơ spill. -
Giảm shuffle không cần thiết
Mình hạn chế dùng các phép toán wide transformation phức tạp không cần thiết, hoặc tối ưu logic để giảm shuffle, ví dụ dùngbroadcast join
khi join với bảng nhỏ, tránh shuffle toàn bộ dữ liệu. -
Xử lý data skew
Khi phát hiện data skew (một số partition quá lớn), mình áp dụng kỹ thuật salting để phân tán dữ liệu đều hơn, tránh executor bị quá tải và phải spill. -
Dùng
coalesce()
sau filter
Sau khi lọc dữ liệu giảm kích thước, mình dùngcoalesce()
để giảm số partition, tránh tạo nhiều partition trống gây lãng phí tài nguyên và overhead scheduling, đồng thời giảm áp lực GC. -
Quản lý output file
Trước khi ghi file, mình dùngrepartition()
để tránh tạo nhiều file nhỏ (small files) gây overhead cho hệ thống lưu trữ như HDFS hoặc S3. Đồng thời, mình dùngpartitionBy()
khi ghi file để phân vùng dữ liệu theo key logic (ví dụ ngày, vùng địa lý), giúp truy vấn downstream hiệu quả hơn. -
Theo dõi và tuning qua Spark UI
Mình thường xuyên theo dõi các chỉ số spill (disk và memory) trong Spark UI để phát hiện sớm bottleneck. Nếu thấy spill nhiều, mình sẽ điều chỉnh lại cấu hình bộ nhớ, partition hoặc logic xử lý.
Tóm lại, disk spill là dấu hiệu hệ thống thiếu bộ nhớ hoặc thao tác xử lý chưa tối ưu. Kinh nghiệm của mình là kết hợp tăng bộ nhớ executor, tối ưu partition, giảm shuffle, xử lý skew và quản lý output file để giảm spill, từ đó cải thiện hiệu suất và ổn định pipeline Spark.
❓Tối Ưu Hóa Repartition trong Apache Spark ?
Giảm skew khi groupBy
Khi xử lý dữ liệu lớn từ 100GB đến 1TB, kinh nghiệm của mình là luôn bắt đầu bằng việc giảm thiểu dữ liệu xử lý càng sớm càng tốt. Mình thường dùng filter()
ngay từ đầu pipeline để loại bỏ dữ liệu không cần thiết, tránh cho Spark phải shuffle và tính toán trên dữ liệu thừa, điều này giúp tiết kiệm rất nhiều tài nguyên và thời gian. Đồng thời, mình ưu tiên lưu dữ liệu ở định dạng columnar như Parquet hoặc ORC vì chúng giảm kích thước file vật lý và giúp Spark đọc dữ liệu hiệu quả hơn.
Về phân vùng dữ liệu, mình không để mặc định mà điều chỉnh số lượng partition sao cho mỗi partition có kích thước khoảng 128MB, vừa đủ để tận dụng tối đa tài nguyên mà không gây overhead do nhiều file nhỏ. Khi thực hiện join giữa hai dataset, mình luôn kiểm tra xem chúng đã được phân vùng theo cùng key chưa. Nếu chưa, mình chủ động dùng repartition(key)
để đồng bộ partition, tránh shuffle toàn bộ dữ liệu khi join. Ví dụ, với join theo user_id
, mình sẽ repartition cả hai dataset theo user_id
trước khi join.
Một vấn đề mình hay gặp là data skew, đặc biệt khi dùng các phép toán như groupBy
hoặc flatMap
. Khi đó, một số partition có thể quá tải dữ liệu, làm chậm toàn bộ job. Mình xử lý bằng kỹ thuật salting, tức là thêm một giá trị ngẫu nhiên vào key để phân tán dữ liệu đều hơn, sau đó mới groupBy theo key gốc. Ngoài ra, khi cần giảm số partition, mình ưu tiên dùng coalesce()
thay vì repartition()
để tránh shuffle toàn phần gây tốn kém.
Về cấu hình Spark, mình không để mặc định spark.sql.shuffle.partitions
là 200 mà điều chỉnh dựa trên kích thước dữ liệu và tài nguyên cluster. Công thức mình hay dùng là lấy giá trị lớn nhất giữa số partition dựa trên kích thước dữ liệu (khoảng 128MB/partition) và số core nhân đôi của cluster. Ví dụ, với 10GB dữ liệu và cluster có 40 core, mình đặt khoảng 80 partition để cân bằng hiệu suất và tránh overhead.
Cuối cùng, mình luôn theo dõi hiệu suất qua Spark UI và các công cụ giám sát như CloudWatch để phát hiện sớm các bottleneck như executor bị thiếu bộ nhớ, task chạy lâu hoặc shuffle quá nhiều. Từ đó, mình điều chỉnh lại cấu hình tài nguyên, tối ưu logic xử lý hoặc phân vùng lại dữ liệu cho phù hợp.
Tóm lại, kinh nghiệm của mình là giảm dữ liệu thừa sớm, tối ưu phân vùng và join, xử lý data skew bằng salting, điều chỉnh tham số shuffle phù hợp và giám sát chặt chẽ trong quá trình chạy để đảm bảo pipeline vừa ổn định vừa tối ưu chi phí.
❓ Tối Ưu Hóa Apache Spark Cho Dữ Liệu Lớn Trên 100GB ?
-
Phát triển và test trên local với dữ liệu mẫu nhỏ trước
Mình thường chạy Spark ở chế độ local trên máy dev với subset dữ liệu để debug logic và test các bước xử lý. Việc này giúp nhanh chóng phát hiện lỗi, thay vì chạy thẳng trên cluster tốn kém. -
Triển khai trên cluster nhỏ hoặc môi trường staging
Sau khi logic ổn, mình deploy lên cluster nhỏ (ví dụ EMR 3-5 node) để test với dữ liệu lớn hơn, đồng thời tối ưu cấu hình Spark như số lượng executor, bộ nhớ, số core. Thường mình điều chỉnhspark.sql.shuffle.partitions
để tránh quá nhiều hoặc quá ít partition gây tắc nghẽn. -
Chia dữ liệu hợp lý, tránh skew
Kinh nghiệm mình thấy là dữ liệu cần được phân vùng (partition) theo key có tính phân tán tốt, tránh trường hợp một executor phải xử lý quá nhiều dữ liệu (data skew), gây chậm toàn bộ job. Nếu phát hiện skew, mình sẽ dùng kỹ thuật như salting key hoặc repartition lại. -
Tận dụng caching và broadcast join khi phù hợp
Với bảng nhỏ, mình thường broadcast để tránh shuffle dữ liệu lớn, giúp join nhanh hơn. Còn với dữ liệu trung gian được dùng nhiều lần, mình cache hoặc persist để giảm thời gian tính toán lại. -
Theo dõi và điều chỉnh tài nguyên qua Spark UI và CloudWatch
Trong quá trình chạy, mình luôn theo dõi executor memory usage, task duration, shuffle read/write để phát hiện bottleneck. Ví dụ nếu thấy nhiều task bị GC quá lâu hoặc executor bị OOM, mình sẽ tăngspark.executor.memory
hoặc giảm kích thước partition. -
Sử dụng định dạng dữ liệu columnar như Parquet và Delta Lake
Mình ưu tiên lưu dữ liệu dưới dạng Parquet để giảm dung lượng và tăng tốc truy vấn. Nếu dự án có yêu cầu versioning hoặc truy vấn phức tạp, mình dùng Delta Lake để tận dụng tính năng transaction và time travel. -
Tối ưu cấu hình Spark theo workload
Mỗi job có đặc thù khác nhau, mình không dùng cấu hình cứng mà thường test và điều chỉnh tham số nhưspark.executor.instances
,spark.executor.cores
,spark.memory.fraction
dựa trên đặc điểm dữ liệu và tài nguyên cluster.
Tóm lại, mình luôn bắt đầu từ local với dữ liệu nhỏ để phát triển, rồi test trên cluster nhỏ để tối ưu, tránh chạy trực tiếp trên cluster lớn với dữ liệu thật vì rất tốn kém và khó debug. Việc phân vùng dữ liệu, tối ưu join, caching và giám sát tài nguyên là những điểm mình chú trọng để xử lý dữ liệu lớn hiệu quả.
Shuffle Hash Join (SHJ) là gì ?
Shuffle Hash Join (SHJ) là chiến lược join được sử dụng khi cả hai bảng đều có kích thước trung bình hoặc lớn, không đủ nhỏ để broadcast. Đây là giải pháp nằm giữa Broadcast Hash Join và Sort Merge Join, giúp giảm chi phí sort nhưng vẫn cần shuffle dữ liệu.
Cách hoạt động thực tế của SHJ:
- Spark sẽ shuffle cả hai bảng theo khóa join để các bản ghi có cùng key được gom về cùng partition trên các executor.
- Trong mỗi partition, Spark sẽ chọn bảng nhỏ hơn (theo partition) để xây dựng bảng băm (HashedRelation) trong bộ nhớ.
- Sau đó, Spark sẽ quét (probe) từng dòng của bảng lớn hơn trong partition đó, dò tìm key tương ứng trong bảng băm để ghép dữ liệu.
- Quá trình này diễn ra song song trên tất cả partition và executor, tận dụng tốt khả năng phân tán của cluster.
Kinh nghiệm thực tế khi dùng SHJ:
- SHJ rất hiệu quả khi dữ liệu có key phân bố đều và mỗi partition của bảng nhỏ có thể vừa vặn trong bộ nhớ executor. Nếu partition build side quá lớn, dễ gây OOM hoặc phải spill xuống đĩa, làm giảm hiệu suất.
- Mình thường theo dõi kỹ kích thước partition và bộ nhớ executor để đảm bảo các partition build side không vượt quá khả năng xử lý.
- Nếu phát hiện data skew, mình áp dụng kỹ thuật như salting hoặc tăng số partition (
spark.sql.shuffle.partitions
) để phân phối dữ liệu đều hơn, tránh executor bị nghẽn. - SHJ không cần sort dữ liệu trong partition, nên giảm được chi phí CPU so với Sort Merge Join, giúp tiết kiệm thời gian chạy job.
- Tuy nhiên, SHJ vẫn cần shuffle dữ liệu qua mạng, nên nếu workload bị giới hạn bởi băng thông mạng, hiệu quả có thể giảm.
- Mình cũng lưu ý SHJ chỉ hỗ trợ equi-join và các kiểu join phổ biến như inner, left, right, semi, anti join. Với non-equi join hoặc cross join, Spark sẽ dùng các phương pháp khác.
- Để kích hoạt SHJ, có thể dùng hint
hint("SHUFFLE_HASH")
hoặc điều chỉnh cấu hìnhspark.sql.join.preferSortMergeJoin
thànhfalse
, nhưng thường mình để Spark tự chọn dựa trên thống kê dữ liệu.
Tóm lại, kinh nghiệm của mình khi dùng Shuffle Hash Join là:
- Dùng khi cả hai bảng đều lớn, không thể broadcast.
- Đảm bảo partition build side nhỏ vừa đủ trong bộ nhớ executor.
- Theo dõi và xử lý data skew kịp thời.
- Tận dụng ưu điểm không cần sort, giảm CPU so với Sort Merge Join.
- Theo dõi Spark UI để phát hiện OOM hoặc spill, từ đó điều chỉnh cấu hình.
Broadcast Join là gì ?
Broadcast Join trong Apache Spark là gì?
Broadcast Join (còn gọi là Broadcast Hash Join) là một kỹ thuật join rất hiệu quả khi một trong hai bảng tham gia join có kích thước nhỏ (thường nhỏ hơn ngưỡng mặc định 10MB, có thể cấu hình qua spark.sql.autoBroadcastJoinThreshold
).
Cách hoạt động thực tế:
- Bảng nhỏ hơn (Build Side) được Spark broadcast, tức là gửi toàn bộ dữ liệu của bảng này tới tất cả các executor trong cluster.
- Bảng lớn hơn (Stream Side) được phân vùng và mỗi executor sẽ thực hiện join cục bộ giữa phần dữ liệu của bảng lớn và bản sao bảng nhỏ đã được broadcast.
- Vì bảng nhỏ được gửi đến mọi node, nên không cần shuffle dữ liệu bảng lớn, giúp giảm rất nhiều chi phí truyền dữ liệu qua mạng.
Ưu điểm:
- Rất nhanh vì tránh shuffle dữ liệu lớn.
- Phù hợp với các phép equi-join.
- Thường dùng khi một bảng rất nhỏ (ví dụ bảng dimension) join với bảng lớn (ví dụ bảng fact).
- Spark có thể tự động chuyển sang Broadcast Join khi kích thước bảng nhỏ đủ nhỏ, hoặc bạn có thể chủ động dùng hàm
broadcast()
trong API để ép Spark broadcast bảng nhỏ.
Ví dụ đơn giản trong PySpark:
```python
from pyspark.sql.functions import broadcast
joined_df = big_df.join(broadcast(small_df), “join_key”)
~~~
Ở đây, small_df
sẽ được broadcast tới tất cả executor, giúp join nhanh hơn rất nhiều.
Khi nào không dùng Broadcast Join?
- Khi cả hai bảng đều lớn, broadcast bảng nhỏ sẽ tốn bộ nhớ và mạng, làm chậm job hoặc gây lỗi OOM.
- Không áp dụng được cho các phép join không phải equi-join hoặc full outer join.
Tóm lại: Broadcast Join là kỹ thuật join tối ưu khi có một bảng nhỏ, giúp tránh shuffle bảng lớn, tăng tốc độ xử lý và giảm tài nguyên tiêu thụ. Đây là kỹ thuật mình thường ưu tiên dùng khi có điều kiện phù hợp.