A Spark job loads an e-commerce fact table (≈120 million rows, 50 columns) each day. Because the same vendor file can be delivered several times in one day, the raw table contains duplicate rows that are identical on every column except load_ts (the ingestion timestamp). The engineering team must build a cleansed DataFrame that
keeps only the earliest load_ts for each unique combination of order_id and line_id,
produces identical results on every rerun, regardless of partitioning, and
avoids the wide aggregations that can exhaust executor memory on very large tables. They must also save all removed rows to duplicates_audit for compliance review.
Which PySpark approach best satisfies these requirements?
Cache the raw DataFrame and run dropDuplicates(["order_id","line_id"]); use a subtract to obtain duplicates_audit.
Call distinct() on the entire raw DataFrame so that duplicates disappear automatically; the difference between the two DataFrames becomes duplicates_audit.
Generate rn = row_number().over(Window.partitionBy("order_id","line_id").orderBy("load_ts")), keep rows where rn == 1, then anti-join this result back to the raw DataFrame to create duplicates_audit.
Run groupBy("order_id","line_id").agg(min("load_ts").alias("load_ts")) and inner-join the result to the raw DataFrame; the non-matching rows form duplicates_audit.
A window specification that partitions by order_id and line_id and orders by load_ts deterministically retains exactly one record per business key. row_number() assigns a sequence within each key group, and filtering on row_number == 1 keeps the row with the earliest timestamp. An anti-join back to the raw DataFrame then isolates the discarded rows for duplicates_audit by finding the rows that are not present in the cleansed result.
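A minimal sketch of this pattern, assuming the raw table has already been loaded as a DataFrame named raw_df (a hypothetical name; the duplicates_audit table name comes from the scenario):

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank the copies of each business key, earliest load_ts first
key_window = Window.partitionBy("order_id", "line_id").orderBy("load_ts")

ranked = raw_df.withColumn("rn", F.row_number().over(key_window))

# Keep only the earliest row per (order_id, line_id)
cleansed = ranked.filter(F.col("rn") == 1).drop("rn")

# Rows of the raw table that did not survive are the discarded duplicates;
# because duplicates differ only in load_ts, joining on key + load_ts is enough
duplicates_audit = raw_df.join(
    cleansed,
    on=["order_id", "line_id", "load_ts"],
    how="left_anti",
)

duplicates_audit.write.mode("append").saveAsTable("duplicates_audit")
```

Because the window applies an explicit orderBy within each key, reruns keep the same survivor regardless of how the input happens to be partitioned.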
The dropDuplicates method keeps the first row it encounters for each key after the hash shuffle; which row survives is not deterministic, and for a very wide table the implicit first() aggregation it performs over every non-key column drives up executor memory use.
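For contrast, a sketch of the dropDuplicates-plus-subtract option (same hypothetical raw_df), with the non-determinism noted in comments:

```python
# Which row survives for each (order_id, line_id) depends on physical ordering
# after the shuffle, so it can change from run to run
deduped = raw_df.dropDuplicates(["order_id", "line_id"])

# subtract compares all 50 columns row by row; because the surviving row above
# is arbitrary, the audit set is also unstable across reruns
duplicates_audit = raw_df.subtract(deduped)
```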
groupBy("order_id", "line_id").agg(min("load_ts")) returns only the two key columns and the minimum timestamp, so an additional join back to the raw table is required to recover the other 47 columns, adding shuffle cost and complexity.
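A sketch of the groupBy-and-join option (again using the hypothetical raw_df) shows the extra joins needed to restore the dropped columns and build the audit set:

```python
from pyspark.sql import functions as F

# The aggregate keeps only the two keys and the minimum timestamp
earliest = raw_df.groupBy("order_id", "line_id").agg(
    F.min("load_ts").alias("load_ts")
)

# A second shuffle-heavy join is needed just to get the other 47 columns back
cleansed = raw_df.join(
    earliest, on=["order_id", "line_id", "load_ts"], how="inner"
)

# And yet another join (anti) to isolate the removed rows for the audit table
duplicates_audit = raw_df.join(
    earliest, on=["order_id", "line_id", "load_ts"], how="left_anti"
)
```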
distinct() would remove nothing, because the duplicate rows differ in load_ts and are therefore not exact duplicates across all 50 columns.
Therefore the window-based row_number solution is the only option that meets all functional and performance requirements.