Databricks Data Engineer Professional Certification Flashcards

https://www.databricks.com/sites/default/files/2025-02/databricks-certified-data-engineer-professional-exam-guide-1-mar-2025.pdf (59 cards)

1
Q

Explain how Delta Lake uses the transaction log and cloud object storage to guarantee atomicity and durability

A
  • The transaction log only records transactions that execute fully and completely.
  • Databricks inherits the durability guarantees of the cloud object storage on which the data is stored.

https://docs.databricks.com/aws/en/lakehouse/acid

2
Q

Describe how Delta Lake’s Optimistic Concurrency Control provides isolation, and which transactions might conflict

A

By checking for conflicts only at commit time rather than throughout the entire transaction. Transactions conflict when they concurrently read or modify the same data files (for example, UPDATE, DELETE, MERGE, or OPTIMIZE operations touching the same partitions); blind appends and operations on disjoint partitions generally do not conflict.

https://docs.databricks.com/aws/en/optimizations/isolation-level

3
Q

Describe basic functionality of Delta clone.

A
  • A deep clone copies metadata and data, including stream and copy into metadata
  • A shallow clone copies metadata (excluding stream and copy into metadata) only, and retains a reference to the original data files

https://docs.databricks.com/aws/en/delta/clone

4
Q

What is a Spark watermark?

A
  • In stream processing, a watermark is an Apache Spark feature that can define a time-based threshold for processing data when performing stateful operations such as aggregations.
  • Spark waits to close and output the windowed aggregation until the max event time seen, minus the specified watermark, is greater than the upper bound of the window.
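A minimal PySpark sketch of a watermark on a windowed aggregation (the streaming table `events` and its `event_time`/`device_id` columns are illustrative assumptions):

```python
from pyspark.sql import functions as F

# Hypothetical streaming source with an `event_time` timestamp column
events = spark.readStream.table("events")

windowed_counts = (events
    .withWatermark("event_time", "10 minutes")                   # tolerate up to 10 minutes of lateness
    .groupBy(F.window("event_time", "5 minutes"), "device_id")   # 5-minute tumbling windows per device
    .count())
```
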
5
Q

What is Bloom Filtering?

A

A Bloom filter index is a space-efficient data structure that enables data skipping on chosen columns, particularly for fields containing arbitrary text.

CREATE BLOOMFILTER INDEX
ON TABLE table_name
FOR COLUMNS(column_name OPTIONS (fpp=0.1, numItems=5000))

Databricks Bloom filter indexes consist of a data skipping index for each data file. The Bloom filter index can be used to determine that a column value is definitively not in the file, or that it is probably in the file. Before reading a file Databricks checks the index file, and the file is read only if the index indicates that the file might match a data filter.

Predictive I/O outperforms bloom filters.

6
Q

What is predictive optimization?

A

Predictive optimization removes the need to manually manage maintenance operations for Unity Catalog managed tables on Databricks.

With predictive optimization enabled, Databricks automatically does the following:

  • Identifies tables that would benefit from maintenance operations and queues these operations to run.
  • Collects statistics when data is written to a managed table.
  • Runs maintenance operations as necessary, eliminating both unnecessary maintenance runs and the burden of tracking and troubleshooting performance.
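Predictive optimization is enabled at the catalog or schema level; a minimal sketch, assuming a Unity Catalog catalog named `main` and the required admin privileges:

```python
# Assumption: catalog `main` exists and the caller has privileges to manage it
spark.sql("ALTER CATALOG main ENABLE PREDICTIVE OPTIMIZATION")

# Schemas can inherit the catalog-level setting
spark.sql("ALTER SCHEMA main.sales INHERIT PREDICTIVE OPTIMIZATION")
```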

https://docs.databricks.com/aws/en/optimizations/predictive-optimization

7
Q

What is ZOrdering?

A

Z-ordering is a technique to colocate related information in the same set of files. This co-locality is automatically used by Delta Lake on Databricks data-skipping algorithms. This behavior dramatically reduces the amount of data that Delta Lake on Databricks needs to read. To Z-order data, you specify the columns to order on in the ZORDER BY clause:

OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)

https://docs.databricks.com/aws/en/delta/data-skipping#what-is-z-ordering

8
Q

What is predictive I/O?

A

Predictive I/O improves scanning performance by applying deep learning techniques to do the following:

  • Determine the most efficient access pattern to read the data and only scan the data that is actually needed.
  • Eliminate the decoding of columns and rows that are not required to generate query results.
  • Calculate the probabilities of the search criteria in selective queries matching a row. As queries run, we use these probabilities to anticipate where the next matching row would occur and only read that data from cloud storage.
9
Q

What is Liquid Clustering?

A

Liquid clustering replaces table partitioning and ZORDER to simplify data layout decisions and optimize query performance. It provides the flexibility to redefine clustering keys without rewriting existing data, allowing data layout to evolve alongside analytic needs over time.

-- Create a new table with liquid clustering
CREATE TABLE table1(col0 INT, col1 string) CLUSTER BY (col0);
-- Alter an existing table
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)

https://docs.databricks.com/aws/en/delta/clustering

10
Q

Implement Delta tables optimized for Databricks SQL service

A
11
Q

When should I partition a Delta Lake table?

A

Partitioning can speed up your queries if you filter, join, aggregate, or merge on the partition column(s), since it helps Spark skip unnecessary data partitions (i.e., subfolders) at scan time.

  • Databricks recommends not partitioning tables smaller than 1TB and letting ingestion time clustering take effect automatically. Ingestion time clustering clusters data by ingestion order by default for all tables.
  • Partition by a column only if you expect the data in each partition to be at least 1GB.
  • Always choose a low cardinality column (for example, year or date) as the partition column.
  • You can also take advantage of Delta’s generated columns feature when choosing the partition column (see the sketch below). Generated columns are a special type of column whose values are automatically generated based on a user-specified function over other columns in the Delta table.
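A minimal sketch of partitioning on a generated date column (the `events` table and its columns are illustrative):

```python
# Illustrative table: partition on a DATE generated from an event TIMESTAMP
spark.sql("""
  CREATE TABLE IF NOT EXISTS events (
    id BIGINT,
    event_time TIMESTAMP,
    event_date DATE GENERATED ALWAYS AS (CAST(event_time AS DATE))
  )
  USING DELTA
  PARTITIONED BY (event_date)
""")
```

Delta Lake can then derive partition filters on event_date from predicates on event_time.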

https://www.databricks.com/discover/pages/optimize-data-workloads-guide#intro

12
Q

How do I set a table’s target file size?

A

In cases where the default file size targeted by Auto-optimize (128MB) or Optimize (1GB) isn’t working for you, you can fine-tune it to your requirements. Set the target file size with the delta.targetFileSize table property; Auto-optimize and Optimize will then bin-pack files toward the specified size instead.
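A minimal sketch (the table name `my_table` and the 64MB target are illustrative):

```python
# Target roughly 64 MB data files for a hypothetical table `my_table`
spark.sql("ALTER TABLE my_table SET TBLPROPERTIES ('delta.targetFileSize' = '64mb')")
```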

13
Q

How can I avoid data shuffling using a broadcast hash join?

A

To entirely avoid data shuffling, broadcast one of the two tables or DataFrames (the smaller one) that are being joined together. The table is broadcast by the driver, which copies it to all worker nodes.

If you’re running a driver with a lot of memory (32GB+), you can safely raise the broadcast thresholds to something like 200MB

set spark.sql.autoBroadcastJoinThreshold = 209715200;
set spark.databricks.adaptive.autoBroadcastJoinThreshold = 209715200;

Using hints:

SELECT /*+ BROADCAST(t) */ * FROM <table-name> t

https://www.databricks.com/discover/pages/optimize-data-workloads-guide#data-shuffling

14
Q

How can I control data shuffling using a Shuffle hash join over sort-merge join?

A
set spark.sql.join.preferSortMergeJoin = false

In most cases Spark chooses a sort-merge join (SMJ) when it can’t broadcast a table. Sort-merge joins are the most expensive join type. A shuffle-hash join (SHJ) has been found to be faster in some circumstances (but not all) because it does not require the extra sorting step of SMJ. The setting above advises Spark that you prefer SHJ over SMJ, and Spark will then try to use SHJ wherever possible. Note that this does not mean Spark will always choose SHJ over SMJ; it simply registers your preference.

https://www.databricks.com/discover/pages/optimize-data-workloads-guide#data-shuffling

15
Q

What is Databricks Cost Based Optimizer (CBO)?

A

Spark SQL can use a cost-based optimizer (CBO) to improve query plans. This is especially useful for queries with multiple joins. For this to work it is critical to collect table and column statistics and keep them up to date.

To get the full benefit of the CBO it is important to collect both column statistics and table statistics. You can use the ANALYZE TABLE command to manually collect statistics.
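A minimal sketch of collecting statistics for the CBO (the table name is illustrative):

```python
# Collect table-level and column-level statistics used by the cost-based optimizer
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR ALL COLUMNS")
```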

https://docs.databricks.com/aws/en/optimizations/cbo

16
Q

What is Data Spilling, why does it happen, and how do I get rid of it?

A

Data spilling happens when the memory available to a Spark task’s CPU core is insufficient for the amount of data it must process; some of that data is spilled to disk, which is inefficient.

AQE auto-tuning:

Spark AQE has a feature called autoOptimizeShuffle (AOS), which can automatically find the right number of shuffle partitions (so long as compression is not excessive):

set spark.sql.shuffle.partitions=auto

Manually fine-tune

-- in SQL
set spark.sql.shuffle.partitions = 2*<number of total worker cores in cluster>

# in PySpark
spark.conf.set("spark.sql.shuffle.partitions", 2*<number of total worker cores in cluster>)
# or
spark.conf.set("spark.sql.shuffle.partitions", 2*sc.defaultParallelism)

https://www.databricks.com/discover/pages/optimize-data-workloads-guide#data-spilling

17
Q

Identify and remediate data skewness

A

If most of the Spark tasks in a shuffle stage finish quickly but one or two hang for a long time, that’s an indication of skew.

You can remediate by:
* Filter skewed values, if possible (such as nulls)
* In the case where you are able to identify the table, the column, and preferably also the values that are causing data skew, then you can explicitly tell Spark about it using skew hints so that Spark can try to resolve it for you:

    SELECT /*+ SKEW('table', 'column_name', (value1, value2)) */ * FROM table
	

* AQE skew optimization (enabled by default in Spark 3+)
* Salting: append random integers to skewed column values (see the sketch below)
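A salting sketch, assuming a skewed fact DataFrame `fact` joined to `dim` on `customer_id` (all names are illustrative):

```python
from pyspark.sql import functions as F

SALT_BUCKETS = 16  # number of salt values; tune to the severity of the skew

# Add a random salt to the skewed side, and replicate the other side across all salt values
fact_salted = fact.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))
dim_salted = dim.withColumn("salt", F.explode(F.sequence(F.lit(0), F.lit(SALT_BUCKETS - 1))))

# Joining on the key plus the salt spreads the hot key across many partitions
joined = fact_salted.join(dim_salted, ["customer_id", "salt"])
```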

https://www.databricks.com/discover/pages/optimize-data-workloads-guide#data-skewness

18
Q

What operations can cause “data explosion”?

A
  • The EXPLODE function
  • Joins
19
Q

Contrast different strategies for partitioning data (e.g. identify proper partitioning columns to use)

A
  • Partitioning works well only for low or known cardinality fields (for example, date fields or physical locations), but not for fields with high cardinality such as timestamps
  • Z-order works for all fields, including high cardinality fields and fields that may grow infinitely (for example, timestamps or the customer ID in a transactions or orders table)
  • Most tables can leverage ingestion time clustering to avoid needing to worry about Z-order and partition tuning.
  • Databricks recommends all partitions contain at least a gigabyte of data. Tables with fewer, larger partitions tend to outperform tables with many smaller partitions.
  • Partitions can be beneficial for very large tables. Many performance enhancements around partitioning focus on very large tables (hundreds of terabytes or greater).

https://docs.databricks.com/aws/en/tables/partitions

20
Q

Describe and distinguish partition hints: coalesce, repartition, repartition by range, and rebalance

A
  • COALESCE ( part_num ): Reduce the number of partitions to the specified number of partitions. It takes a partition number as a parameter.
  • REPARTITION ( { part_num | [ part_num , ] column_name [ , ...] } ): Repartition to the specified number of partitions using the specified partitioning expressions. It takes a partition number, column names, or both as parameters.
  • REPARTITION_BY_RANGE ( part_num [, column_name [, ...] ] | column_name [, ...] ): Repartition to the specified number of partitions using the specified partitioning expressions. It takes column names and an optional partition number as parameters.
  • REBALANCE [ ( column_name [, ...] ) ]: The REBALANCE hint can be used to rebalance the query result output partitions, so that every partition is of a reasonable size (not too small and not too big). It can take column names as parameters, and try its best to partition the query result by these columns. This is a best-effort: if there are skews, Spark will split the skewed partitions, to make these partitions not too big. This hint is useful when you need to write the result of this query to a table, to avoid too small/big files. This hint is ignored if AQE is not enabled.
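Hedged usage examples (the table and column names are illustrative; each hint changes only how the result is partitioned, not which rows are returned):

```python
spark.sql("SELECT /*+ COALESCE(4) */ * FROM t")
spark.sql("SELECT /*+ REPARTITION(8, customer_id) */ * FROM t")
spark.sql("SELECT /*+ REPARTITION_BY_RANGE(8, order_date) */ * FROM t")
spark.sql("SELECT /*+ REBALANCE(customer_id) */ * FROM t")
```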

https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-qry-select-hints

21
Q

Articulate how to write PySpark DataFrames to disk while manually controlling the size of individual part-files

A

df.write.option("maxRecordsPerFile", 50).save("/tmp/foo")

22
Q

Articulate multiple strategies for updating 1+ records in a spark table (Type 1)

A

MERGE, JOIN+OVERWRITE, UPDATE
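A minimal sketch of the MERGE and UPDATE approaches (the `target` table and `updates` source are illustrative):

```python
# Type 1 upsert: update matching rows in place, insert the rest
spark.sql("""
  MERGE INTO target t
  USING updates u
  ON t.id = u.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")

# Targeted in-place UPDATE of existing rows
spark.sql("UPDATE target SET status = 'inactive' WHERE last_seen < '2024-01-01'")
```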

23
Q

Implement common design patterns unlocked by Structured Streaming and Delta Lake.

A
24
Q

Explore and tune state information using stream-static joins and Delta Lake

A

The static side of the join is defined as a Delta table.

In stream-static joins, state information refers to the data that needs to be maintained across micro-batches to perform the join. Delta Lake simplifies state management by:
* Storing State in Delta Tables: The static dataset is stored in a Delta table, which is efficiently managed and updated.
* Handling Updates: Delta Lake supports upserts and deletes, making it easy to update the static dataset without disrupting the streaming job.
Example: If the product catalog (static dataset) is updated, Delta Lake ensures that the changes are reflected in the next micro-batch of the streaming job.

https://medium.com/@sujathamudadla1213/section-2-data-processing-batch-processing-incremental-processing-and-optimization-subtopic-is-12fe3e77da2b

25
Q

Implement stream-static joins

A

```python
static_df = spark \
    .read \
    .format("delta") \
    .load("/path/to/delta/product_catalog")

enriched_df = streaming_df.join(static_df, "product_id", "left")

query = enriched_df \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start("/path/to/enriched_sales")
```

26
Q

How do Stream to Stream Joins Work?

A

When performing a stream-stream join, Spark buffers past inputs as streaming state for both input streams, so that it can match every future input with past inputs. This state can be limited by using watermarks.

https://www.databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html

27
Q

Implement necessary logic for deduplication using Spark Structured Streaming

A

Note that below we use event_time as the watermarking column and do not include the event_time column as a deduplication key in the dropDuplicates function.

```python
volume_path = '/Volumes/mt_asqs/demo/checkpoint_paths'

stream_df = (
    spark.readStream.format("delta")
    .table("mt_asqs.demo.dd_source")
    .withWatermark("event_time", "5 minutes")
    .dropDuplicates(["event_id"])
)

(
    stream_df.writeStream
    .format("delta")
    .option("checkpointLocation", f"{volume_path}/dd_test3")
    .toTable("mt_asqs.demo.dd_output")
)
```

dropDuplicatesWithinWatermark does exactly what you would expect it to do; use it for deduplication when you know the time window within which you expect duplicate records in your stream.

```python
stream_df = (
    spark.readStream.format("delta")
    .table("mt_asqs.demo.ddww_source")
    .withWatermark("event_time", "5 minutes")
    .dropDuplicatesWithinWatermark(["event_id"])
)

volume_path = '/Volumes/mt_asqs/demo/checkpoint_paths'

(
    stream_df.writeStream
    .format("delta")
    .option("checkpointLocation", f"{volume_path}/ddww_test")
    .toTable("mt_asqs.demo.ddww_output")
)
```

https://community.databricks.com/t5/technical-blog/deep-dive-streaming-deduplication/ba-p/105062

28
Q

Enable CDF on Delta Lake tables and re-design data processing steps to process CDC output instead of incremental feed from normal Structured Streaming read

A

Enable the change data feed per table:

```sql
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true);

ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
```

...or for all new tables:

```text
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
```

To read the changes:

```python
# as a stream
(spark.readStream
    .option("readChangeFeed", "true")
    .option("startingVersion", 76)
    .table("source_table"))

# versions as ints or longs
(spark.read
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .option("endingVersion", 10)
    .table("myDeltaTable"))

# timestamps as formatted timestamps
(spark.read
    .option("readChangeFeed", "true")
    .option("startingTimestamp", '2021-04-21 05:45:46')
    .option("endingTimestamp", '2021-05-21 12:00:00')
    .table("myDeltaTable"))

# providing only the startingVersion/timestamp
(spark.read
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("myDeltaTable"))
```

```sql
-- version as ints or longs, e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10);

-- timestamps as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00');

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0);
```

https://docs.databricks.com/aws/en/delta/delta-change-data-feed#enable
https://docs.databricks.com/aws/en/delta/delta-change-data-feed

29
Q

Leverage CDF to easily propagate deletes

A

**Python** (old syntax):

```python
apply_changes(
    target = "user_silver",
    source = "cdf_user_bronze",
    keys = ["user_id"],
    sequence_by = struct('user_timestamp', '_commit_version'),
    except_column_list = ["_change_type", "_commit_version", "_commit_timestamp"],
    apply_as_deletes = expr("_change_type = 'delete'"),
    stored_as_scd_type = 1
)
```

**SQL:**

```sql
CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
```

https://docs.databricks.com/aws/en/dlt-ref/dlt-sql-ref-apply-changes-into

30
Q

Demonstrate how proper partitioning of data allows for simple archiving or deletion of data

A

Example: Partition by date or a date part to be able to archive or delete data older than a certain date.

`ALTER TABLE <table_name> SET TBLPROPERTIES(delta.timeUntilArchived = 'X days');`

https://docs.databricks.com/aws/en/optimizations/archive-delta#queries-optimized-for-archived-data

31
Q

Articulate how “smalls” (tiny files, scanning overhead, over-partitioning, etc.) induce performance problems into Spark queries

A

Scanning and metadata overhead, plus under-utilization of CPU/memory resources due to small task sizes.

https://medium.com/@sujathamudadla1213/section-2-data-processing-batch-processing-incremental-processing-and-optimization-subtopic-is-36f4af9d46f7

32
Q

Describe the objective of data transformations during promotion from bronze to silver

A

* Cleaned, filtered, and enriched data.
* Data is transformed into a more structured and usable format.
* Acts as a single source of truth for downstream analytics.

https://medium.com/@sujathamudadla1213/section-3-data-modeling-subtopic-is-describe-the-objective-of-data-transformations-during-5e1414dc4739

33
Q

Discuss how Change Data Feed (CDF) addresses past difficulties propagating updates and deletes within Lakehouse architecture

A

CDF simplifies the process by eliminating the need for a watermark column, as it directly provides the changes (inserts, updates, deletes).

34
Q

Apply Delta Lake clone to learn how shallow and deep clone interact with source/target tables.

A

* A deep clone copies metadata and data, including stream and `copy into` metadata
* A shallow clone copies metadata (excluding stream and `copy into` metadata) only, and retains a reference to the original data files

35
Q

Design a multiplex bronze table to avoid common pitfalls when trying to productionalize streaming workloads.

A

A multiplex bronze table is a single table that consolidates data from multiple sources or streams while avoiding these common pitfalls. It uses metadata columns to differentiate between sources, handle schema evolution, and ensure data quality.

**Key Features:**
* Source Identification: Each record includes metadata about its source (e.g., source_id, source_timestamp).
* Schema Flexibility: Supports schema evolution by storing data in a flexible format (e.g., JSON or Avro).
* Deduplication: Uses unique identifiers or metadata to detect and remove duplicates.
* Partitioning: Optimizes query performance by partitioning data (e.g., by date or source).

36
Q

Implement best practices when streaming data from multiplex bronze tables.

A

* Use schema enforcement to ensure that incoming data adheres to a predefined structure. This prevents malformed data from entering the bronze layer.
* Partition by Source or Topic: Partition the bronze table by source or topic to improve query performance and simplify data management.
* Time-Based Partitioning: Use time-based partitioning (e.g., by day or hour) for time-series data.
* Remove Duplicates: Use unique identifiers (e.g., event_id) to deduplicate data during ingestion.
* Use micro-batching to balance latency and throughput.
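A hedged sketch of streaming one topic out of a multiplex bronze table (the table name `bronze_multiplex`, its `topic`/`value` columns, and `orders_schema` are illustrative assumptions):

```python
from pyspark.sql import functions as F

# Assumed multiplex bronze table with `topic` and `value` columns;
# `orders_schema` is an assumed StructType for the `orders` payload
bronze_df = spark.readStream.table("bronze_multiplex")

orders_silver = (bronze_df
    .filter(F.col("topic") == "orders")
    .select(F.from_json(F.col("value").cast("string"), orders_schema).alias("v"))
    .select("v.*"))
```
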
37
Q

Apply incremental processing, quality enforcement, and deduplication to process data from bronze to silver

A

* MERGE from **CDC**:

```sql
MERGE INTO silver_table AS target
USING bronze_table AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```

* **Watermarking:** Use a timestamp column (e.g., event_time) to identify new records.
* **Auto Loader:**

```python
df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .load("path/to/bronze")
```

38
Q

Make informed decisions about how to enforce data quality based on strengths and limitations of various approaches in Delta Lake

A

* Pipeline/Job:
  * Schema Enforcement: `spark.readStream.schema(schema).json("path/to/data")`
  * Constraints: `df.filter(col("value").isNotNull() & (col("value") > 0))`
* Delta Lake Constraints (table level):
  * CHECK and NOT NULL Constraints
* Data Quality Metrics and Monitoring:

```python
from databricks import lakehouse_monitoring

lakehouse_monitoring.create_monitor(
    table_name="my_table",
    metrics=["null_count", "value_distribution"]
)
```

* Data deduplication: `df.dropDuplicates(["id"])`

https://medium.com/@sujathamudadla1213/section-3-data-modeling-subtopic-is-make-informed-decisions-about-how-to-enforce-data-b480b58a8999

39
Q

Implement tables avoiding issues caused by lack of foreign key constraints

A
40
Q

Add constraints to Delta Lake tables to prevent bad data from being written

A

```sql
ALTER TABLE employees ADD CONSTRAINT valid_age CHECK (age >= 18 AND age <= 65);
```

41
Q

Describe the possible behaviors for a DLT (Delta Live Tables) expectation

A

* **EXPECT (default):** Records violating the expectation are retained, but the violation is logged as a data quality issue.
* **EXPECT ... ON VIOLATION DROP ROW:** Records violating the expectation are dropped from the target dataset.
* **EXPECT ... ON VIOLATION FAIL UPDATE:** The pipeline fails immediately when a record violates the expectation.

https://learn.microsoft.com/en-us/azure/databricks/dlt/expectations

42
Q

Describe the normal forms

A

Levels of Normalization:
* **First Normal Form (1NF):** Eliminate duplicate columns and ensure atomic values (no array, table, or object column data types).
* **Second Normal Form (2NF):** Remove partial dependencies (all non-key attributes depend on the entire primary key).
* **Third Normal Form (3NF):** Remove transitive dependencies (non-key attributes depend only on the primary key).

https://en.wikipedia.org/wiki/First_normal_form
https://en.wikipedia.org/wiki/Second_normal_form
https://en.wikipedia.org/wiki/Third_normal_form

43
Q

Implement lookup tables and describe the trade-offs for normalized data models and dimension tables using Delta Lake with streaming and batch workloads.

A
44
Q

Describe SCD Types 0-4

A

* **SCD Type 0:** No changes permitted (durable).
* **SCD Type 1:** No history is maintained; changes are updated in place.
* **SCD Type 2:** History is maintained; changes are upserted as new rows.
* **SCD Type 3:** Tracks changes using additional columns for previous values.
* **SCD Type 4:** History is kept in a separate history table.

45
Q

Define the 3 Dynamic View Functions

A

* **`current_user()`:** Returns the current user's email address.
* **`is_account_group_member()`:** Returns TRUE if the current user is a member of a specific account-level group. Recommended for use in dynamic views against Unity Catalog data.
* **`is_member()`:** Returns TRUE if the current user is a member of a specific workspace-level group. This function is provided for compatibility with the existing Hive metastore. Avoid using it with views against Unity Catalog data, because it does not evaluate account-level group membership.
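A hedged example of a dynamic view using one of these functions (the view, table, and group names are illustrative):

```python
# Only members of the assumed `auditors` group see raw email addresses
spark.sql("""
  CREATE OR REPLACE VIEW sales_redacted AS
  SELECT
    user_id,
    CASE WHEN is_account_group_member('auditors') THEN email ELSE 'REDACTED' END AS email,
    amount
  FROM sales_raw
""")
```
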
46
Q

Describe the elements in the Spark UI to aid in performance analysis, application debugging, and tuning of Spark applications.

A

Use the Jobs Timeline to identify major issues:
* Look at the longest stage
* Look for skew or spill
* Determine if the longest stage is I/O bound
* Look for other causes of slow stage runtime

47
Q

Inspect event timelines and metrics for stages and jobs performed on a cluster

A

* **Event Timeline:** The event timeline provides a visual representation of the sequence of events (e.g., task execution, shuffling, I/O operations) during the execution of a job. It helps identify bottlenecks (e.g., long-running tasks, excessive shuffling) and resource contention (e.g., CPU, memory).
* **Metrics:** Metrics are quantitative measurements of cluster and job performance, such as:
  * CPU usage
  * Memory usage
  * Disk I/O
  * Network I/O
  * Task duration
  * Shuffle read/write sizes
* **Stages and Jobs:** A job is a high-level action (e.g., count, save) submitted to the cluster. A stage is a set of tasks that can be executed in parallel. Stages are separated by shuffle boundaries.

48
Q

Draw conclusions from information presented in the Spark UI, Ganglia UI, and the Cluster UI to assess performance problems and debug failing applications.

A

Event Timeline:
* Task Execution: Identify long-running tasks or tasks with high resource usage.
* Shuffling: Look for stages with excessive shuffle read/write sizes.
* Gaps: Gaps in the timeline indicate idle time, which may be caused by resource contention or scheduling delays.
* A stage with many small tasks may indicate data skew.
* A stage with long shuffle times may indicate network bottlenecks.

Cluster Metrics:
* CPU Usage: High CPU usage may indicate computationally intensive tasks.
* Memory Usage: High memory usage may lead to Out of Memory (OOM) errors.
* Disk I/O: High disk I/O may indicate excessive spilling to disk.
* Network I/O: High network I/O may indicate excessive shuffling.

Task Metrics:
* Task Duration: Long task durations may indicate inefficient code or resource contention.
* Shuffle Read/Write Sizes: Large shuffle sizes may indicate inefficient data partitioning.

https://learn.microsoft.com/en-us/azure/databricks/optimizations/spark-ui-guide/
https://spark.apache.org/docs/latest/web-ui.html
https://www.youtube.com/watch?v=y3VCWVbzAKA
https://medium.com/@sujathamudadla1213/826467ab0f27

49
Q

Design systems that control for cost and latency SLAs for production streaming jobs.

A

a. Optimize Data Ingestion
* Use Efficient Formats: Use columnar formats like Parquet or ORC for storage and Avro or Protobuf for streaming.
* Compress Data: Apply compression (e.g., Snappy, GZIP) to reduce data size and network overhead.
* Batch Small Records: Group small records into larger batches to reduce the number of I/O operations.

b. Scale Resources Dynamically
* Autoscaling: Use autoscaling features (e.g., AWS Auto Scaling, Databricks Autoscaling) to adjust resources based on workload.
* Spot Instances: Use spot instances or preemptible VMs for non-critical workloads to reduce costs.

c. Tune Processing Logic
* Parallelism: Increase parallelism by partitioning data and using multiple workers.
* Windowing: Use windowed aggregations to process data in chunks instead of processing each record individually.
* State Management: Optimize state management (e.g., use RocksDB for efficient state storage).

d. Monitor and Optimize
* Metrics Collection: Collect metrics (e.g., latency, throughput, resource usage) to identify bottlenecks.
* Cost Monitoring: Use cloud cost management tools (e.g., AWS Cost Explorer, Azure Cost Management) to track spending.
* Alerting: Set up alerts for SLA violations (e.g., latency exceeding a threshold or cost overruns).

e. Fault Tolerance and Recovery
* Checkpointing: Use checkpointing to save the state of the streaming job periodically, enabling fast recovery from failures.
* Idempotent Processing: Design jobs to handle duplicate records without causing data corruption.
* Dead Letter Queues: Route failed records to a dead letter queue for later reprocessing.

https://medium.com/@sujathamudadla1213/section-6-testing-deployment-9747b10d3f18

50
Q

Deploy and monitor streaming and batch jobs

A

**Streaming Jobs:**
* **Input Rate:** Volume of data being ingested.
* **Processing Time:** Time taken to process each micro-batch.
* **Latency:** Delay between data arrival and processing.
* **Backpressure:** Indicates if the system is struggling to keep up with the input rate.

**Batch Jobs:**
* **Job Duration:** Time taken to complete the job.
* **Resource Utilization:** CPU, memory, and disk usage.
* **Data Volume:** Amount of data processed.
* **Success/Failure Rate:** Percentage of jobs that succeed or fail.

https://medium.com/@sujathamudadla1213/section-5-monitoring-logging-subtopic-deploy-and-monitor-streaming-and-batch-jobs-edbd77b4783b

51
Q

Adapt a notebook dependency pattern to use Python file dependencies

A
52
Q

Adapt Python code maintained as Wheels to direct imports using relative paths

A
53
Q

Repair and rerun failed jobs

A
54
Q

Create Jobs based on common use cases and patterns

A
55
Q

Create a multi-task job with multiple dependencies

A
56
Q

Design systems that control for cost and latency SLAs for production streaming jobs.

A

* Right-size clusters
* Micro-batching
* Data compression (Snappy)
* Cost monitoring (AWS Cost Explorer)
* Fault tolerance and recovery: enable checkpointing
* Optimize queries to reduce data skew
* Databricks Lakehouse Monitoring

57
Q

Configure the Databricks CLI and execute basic commands to interact with the workspace and clusters.

A

```bash
pip install databricks-cli
databricks configure --token
```

...or set environment variables.

https://docs.databricks.com/aws/en/dev-tools/cli/authentication

58
Q

Execute commands from the CLI to deploy and monitor Databricks jobs.

A

```bash
databricks jobs create --json-file job_config.json
databricks jobs reset --job-id 123 --json-file updated_job_config.json
databricks jobs get-run --run-id 456
databricks jobs get-run-output --run-id 456
# ...etc.
```

59
Q

Use REST API to clone a job, trigger a run, and export the run output

A

Endpoints:
* jobs/get
* jobs/create
* jobs/run-now
* jobs/runs/export
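A hedged sketch of the clone, run, and export flow against the Jobs 2.1 REST API using the Python requests library (the workspace host, token, and job ID 123 are placeholders):

```python
import requests

HOST = "https://<workspace-host>"                               # placeholder workspace URL
HEADERS = {"Authorization": "Bearer <personal-access-token>"}   # placeholder token

# Clone: read an existing job's settings, then create a new job from them
settings = requests.get(f"{HOST}/api/2.1/jobs/get", headers=HEADERS,
                        params={"job_id": 123}).json()["settings"]
settings["name"] += " (clone)"
new_job_id = requests.post(f"{HOST}/api/2.1/jobs/create", headers=HEADERS,
                           json=settings).json()["job_id"]

# Trigger a run of the cloned job
run_id = requests.post(f"{HOST}/api/2.1/jobs/run-now", headers=HEADERS,
                       json={"job_id": new_job_id}).json()["run_id"]

# Export the run output (notebook views) once the run has completed
export = requests.get(f"{HOST}/api/2.1/jobs/runs/export", headers=HEADERS,
                      params={"run_id": run_id}).json()
```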