Spark ์กฐ์ธ(join) ์ตœ์ ํ™”

This article is a translation of Spark: Join optimization.

์กฐ์ธ(join) ์—ฐ์‚ฐ์€ ํ…Œ์ด๋ธ”์„ ๊ฒฐํ•ฉํ•˜๋Š” ๊ณผ์ •์œผ๋กœ ๋น„์šฉ์ด ๋งŽ์ด ๋“ค๋ฉฐ, ๋ฐ์ดํ„ฐ ์…”ํ”Œ(shuffle)๊ณผ ์„ฑ๋Šฅ ๋ณ‘๋ชฉํ˜„์ƒ์„ ์ดˆ๋ž˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์กฐ์ธ์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์ „์—

๋ฐ์ดํ„ฐ ์ œํ•œ(Limit Data)

์กฐ์ธ ์ž‘์—… ์ „์— DataFrame์—์„œ ๋ถˆํ•„์š”ํ•œ ํ–‰(Row)๊ณผ ์—ด(Column)์„ ํ•„ํ„ฐ๋งํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ค„์ด์„ธ์š”.

  • ๋ฐ์ดํ„ฐ ์…”ํ”Œ ์ค‘ ์ „์†ก๋Ÿ‰ ๊ฐ์†Œ
  • Executor ์ฒ˜๋ฆฌ ์‹œ๊ฐ„ ๋‹จ์ถ•

from pyspark.sql.functions import col

# Baseline: join without limiting the data first
join_default = product.join(catalog, ['product_id'])

# Limit rows (predicate pushdown): keep only the rows that match a condition
# Return only products with a rating of 4.0 or higher
product_4star = product.filter(col('rating') >= 4.0)\
    .select('product_id', 'product_name')

# Limit columns (projection pushdown): select only the columns you need
# Scan only the product_id and category columns of the catalog dataset
catalog_ = catalog.select('product_id', 'category')

join_optimized = product_4star.join(catalog_, ['product_id'])

Using the Catalyst Optimizer

Use the high-level Spark APIs: work with DataFrame, Dataset, and Spark SQL. If you use DynamicFrames or RDDs instead, the Catalyst Optimizer cannot apply its optimizations.
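
To see what Catalyst actually planned, you can print a DataFrame's physical plan with explain(); a quick sketch using the join_optimized DataFrame from above:

# Print the physical plan chosen by Catalyst
# (e.g., BroadcastHashJoin or SortMergeJoin)
join_optimized.explain()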


Adaptive Query Execution (AQE): the Catalyst Optimizer uses runtime statistics collected while a Spark job runs to re-optimize the query plan.
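
AQE is enabled by default in recent Spark 3.x releases; a minimal sketch of turning it on explicitly, with the skew-join setting shown as an optional extra:

# Enable adaptive query execution and its skew-join handling
spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', 'true')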

Broadcast Hash Join

Broadcast Hash Join์€ ๋ฐ์ดํ„ฐ ์…”ํ”Œ(shuffling)์ด ํ•„์š” ์—†์œผ๋ฉฐ, ์ž‘์€ ํ…Œ์ด๋ธ”๊ณผ ํฐ ํ…Œ์ด๋ธ”์„ ์กฐ์ธํ•  ๋•Œ๋งŒ ์ ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ž‘์€ ํ…Œ์ด๋ธ”์ด ๋‹จ์ผ Spark ์‹คํ–‰๊ธฐ(executor)์˜ ๋ฉ”๋ชจ๋ฆฌ์— ์ ํ•ฉํ•œ ๊ฒฝ์šฐ Broadcast Hash Join์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์„ ๊ณ ๋ คํ•˜์„ธ์š”. ์ด ๋ฐฉ์‹์€ ์ž‘์€ RDD๋ฅผ ๊ฐ ์›Œ์ปค ๋…ธ๋“œ๋กœ ์ „๋‹ฌํ•œ ํ›„, ํฐ RDD์˜ ๊ฐ ํŒŒํ‹ฐ์…˜๊ณผ ๋งต-์‚ฌ์ด๋“œ ๊ฒฐํ•ฉ(map-side combining)์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

from pyspark.sql.functions import broadcast

# DataFrame
joined = large_df.join(broadcast(smaller_df), large_df['key'] == smaller_df['key'], how='inner')

-- SparkSQL
SELECT /*+ BROADCAST(t1) */ *
FROM t1 INNER JOIN t2 ON t1.key = t2.key;
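
Spark also broadcasts a table automatically when it is smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default). A hedged sketch of tuning it; the 100 MB value is illustrative only:

# Raise the automatic broadcast threshold to 100 MB (illustrative value)
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 100 * 1024 * 1024)

# Set it to -1 to disable automatic broadcast joins entirely
# spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)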

Shuffle Join

  • Shuffle hash join: joins two DataFrames without sorting them. It is a good fit when joining two tables that are small enough to fit in the Spark executors' memory. (A hint example follows this list.)
  • Sort-merge join: performs the join in three steps: shuffle the DataFrames → sort the data → join. It suits joins between large tables, but it can cause out-of-memory (OOM) errors and degrade performance. In those cases, bucketing (see the Bucketing section below) can make the join more efficient.
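
Spark normally chooses the shuffle join strategy itself, but on Spark 3.0+ you can request one with a join hint. A minimal sketch; the orders and returns DataFrames are hypothetical:

# Request a shuffle hash join (Spark 3.0+ hint)
joined_shj = orders.join(returns.hint('shuffle_hash'), 'product_id')

# Request a sort-merge join explicitly
joined_smj = orders.join(returns.hint('merge'), 'product_id')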
    # ๋ฒ„ํ‚ทํ™” ํ…Œ์ด๋ธ” ์ƒ์„ฑ
    df.write.bucketBy(30, 'orderId').sortBy('orderDate').saveAsTable('bucketedOrder')
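
A join benefits only when both sides are bucketed on the join key with the same bucket count. A hedged sketch; bucketedReturns is a hypothetical second table written the same way:

# Both tables were written with bucketBy(30, 'orderId'), so Spark can plan
# the sort-merge join without re-shuffling the data.
orders_b = spark.table('bucketedOrder')
returns_b = spark.table('bucketedReturns')  # hypothetical, bucketed identically
joined_bucketed = orders_b.join(returns_b, 'orderId')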
    ์กฐ์ธ ์ „ ์žฌํŒŒํ‹ฐ์…”๋‹(Repartition before Join)๋‘ RDD๊ฐ€ ๋™์ผํ•œ ํ‚ค์™€ ๋™์ผํ•œ ํŒŒํ‹ฐ์…”๋‹ ์ฝ”๋“œ๋กœ ํŒŒํ‹ฐ์…”๋‹๋˜์–ด ์žˆ๋‹ค๋ฉด, ์กฐ์ธํ•ด์•ผ ํ•  RDD ๋ ˆ์ฝ”๋“œ๊ฐ€ ๋™์ผํ•œ ์›Œ์ปค ๋…ธ๋“œ์— ์œ„์น˜ํ•  ๊ฐ€๋Šฅ์„ฑ์ด ๋†’์•„์ง‘๋‹ˆ๋‹ค. ์ด๋Š” ์…”ํ”Œ ํ™œ๋™๊ณผ ๋ฐ์ดํ„ฐ ๋ถˆ๊ท ํ˜•(skewness)์„ ์ค„์—ฌ ์„ฑ๋Šฅ์„ ํ–ฅ์ƒ์‹œํ‚ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
# Repartition both DataFrames on the join key before joining;
# N is the target number of partitions (choose it for your data and cluster)
return_ord = returns.repartition(N, 'product_id')
order_rep = order.repartition(N, 'product_id')

joined = return_ord.join(order_rep, 'product_id')