Spark: Join optimization๋ฅผ ๋ฒ์ญํ์์ต๋๋ค.
์กฐ์ธ(join) ์ฐ์ฐ์ ํ ์ด๋ธ์ ๊ฒฐํฉํ๋ ๊ณผ์ ์ผ๋ก ๋น์ฉ์ด ๋ง์ด ๋ค๋ฉฐ, ๋ฐ์ดํฐ ์ ํ(shuffle)๊ณผ ์ฑ๋ฅ ๋ณ๋ชฉํ์์ ์ด๋ํ ์ ์์ต๋๋ค.
์กฐ์ธ์ ์ํํ๊ธฐ ์ ์
๋ฐ์ดํฐ ์ ํ(Limit Data)
์กฐ์ธ ์์ ์ ์ DataFrame์์ ๋ถํ์ํ ํ(Row)๊ณผ ์ด(Column)์ ํํฐ๋งํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ค์ด์ธ์.
- ๋ฐ์ดํฐ ์ ํ ์ค ์ ์ก๋ ๊ฐ์
- Executor ์ฒ๋ฆฌ ์๊ฐ ๋จ์ถ
join_default = product.join(catalog, ['product_id'])
# ํ ์ ํ(Predicate Pushdown): ํน์ ์กฐ๊ฑด์ ๋ง๋ ํ๋ง ํํฐ๋ง
# ํ์ ์ด 4 ์ด์์ธ ์ ํ๋ง ๋ฐํ
product_4star = product.select('product_id', 'product_name')\
.filter(col('rating') > 3.9)
# ์ด ์ ํ(Projection Pushdown): ํ์ํ ์ด๋ง ์ ํ
# catalog ๋ฐ์ดํฐ์
์์ product_id์ category ์ด๋ง ์ค์บ
catalog_ = catalog.select('product_id', 'category')
join_optimized = product_4star.join(catalog_, ['product_id'])
Catalyst Optimizer ํ์ฉ
๊ณ ์์ค Spark API ์ฌ์ฉ: DataFrame, Dataset, SparkSQL์ ํ์ฉํ์ธ์. Dynamic Frame์ด๋ RDD๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ Catalyst Optimizer์ ์ต์ ํ๋ฅผ ์ ์ฉํ ์ ์์ต๋๋ค.
์ ์ํ ์ฟผ๋ฆฌ ์คํ(AQE, Adaptive Query Execution): Catalyst Optimizer๋ ์คํ ์ค์ธ Spark ์์
์์ ๋ฐํ์ ํต๊ณ๋ฅผ ํ์ฉํ์ฌ ์ฟผ๋ฆฌ๋ฅผ ์ต์ ํํฉ๋๋ค.
Broadcast Hash Join
Broadcast Hash Join์ ๋ฐ์ดํฐ ์ ํ(shuffling)์ด ํ์ ์์ผ๋ฉฐ, ์์ ํ ์ด๋ธ๊ณผ ํฐ ํ ์ด๋ธ์ ์กฐ์ธํ ๋๋ง ์ ์ฉํ ์ ์์ต๋๋ค.
์์ ํ ์ด๋ธ์ด ๋จ์ผ Spark ์คํ๊ธฐ(executor)์ ๋ฉ๋ชจ๋ฆฌ์ ์ ํฉํ ๊ฒฝ์ฐ Broadcast Hash Join์ ์ฌ์ฉํ๋ ๊ฒ์ ๊ณ ๋ คํ์ธ์. ์ด ๋ฐฉ์์ ์์ RDD๋ฅผ ๊ฐ ์์ปค ๋ ธ๋๋ก ์ ๋ฌํ ํ, ํฐ RDD์ ๊ฐ ํํฐ์ ๊ณผ ๋งต-์ฌ์ด๋ ๊ฒฐํฉ(map-side combining)์ ์ํํฉ๋๋ค.
from pySpark.sql.functions import broadcast
# DataFrame
joined = large_df.join(broadcast(smaller_df), right_df[key] == left_df[key], how='inner')
# SparkSQL
SELECT /*+ BROADCAST(t1) */
FROM t1 INNER JOIN t2 ON t1.key = t2.key;
Shuffle Join
- ์ ํ ํด์ ์กฐ์ธ(Shuffle Hash Join): Shuffle Hash Join์ ์ ๋ ฌ ์์ด ๋ ๊ฐ์ ๋ฐ์ดํฐ ํ๋ ์์ ์กฐ์ธํฉ๋๋ค. Spark ์คํ๊ธฐ(executor)์ ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅํ ์ ์๋ ๋ ๊ฐ์ ์์ ํ ์ด๋ธ์ ์กฐ์ธํ ๋ ์ ํฉํฉ๋๋ค.
- ์ ๋ ฌ-๋ณํฉ ์กฐ์ธ(Sort-Merge Join): Sort-Merge Join์ ๋ค์ ๊ณผ์ ์ ๊ฑฐ์ณ ์กฐ์ธ์ ์ํํฉ๋๋ค: ๋ฐ์ดํฐ ํ๋ ์์ ์
ํ(Shuffle) → ๋ฐ์ดํฐ๋ฅผ ์ ๋ ฌ(Sort) → ์กฐ์ธ ์ํ. ์ด ๋ฐฉ์์ ๋๊ท๋ชจ ํ
์ด๋ธ ์กฐ์ธ์ ์ ํฉํ์ง๋ง, ๋ฉ๋ชจ๋ฆฌ ๋ถ์กฑ(OOM) ๋ฌธ์ ์ ์ฑ๋ฅ ์ ํ๋ฅผ ์ด๋ํ ์ ์์ต๋๋ค. ์ด๋ฐ ๊ฒฝ์ฐ, ๋ฒํทํ(Bucketing)๋ฅผ ํตํด ์กฐ์ธ์ ํจ์จ์ฑ์ ๋์ผ ์ ์์ต๋๋ค.Bucketing๋ฒํทํ๋ ์กฐ์ธ ํค๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ฏธ๋ฆฌ ์
ํํ๊ณ ์ ๋ ฌํ ํ ์ค๊ฐ ํ
์ด๋ธ์ ์ ์ฅํฉ๋๋ค. ์ด๋ฅผ ํตํด ๋๊ท๋ชจ ํ
์ด๋ธ ์กฐ์ธ ์ ์
ํ๊ณผ ์ ๋ ฌ ๋น์ฉ์ ์ค์ผ ์ ์์ต๋๋ค.
์กฐ์ธ ์ ์ฌํํฐ์ ๋(Repartition before Join)๋ RDD๊ฐ ๋์ผํ ํค์ ๋์ผํ ํํฐ์ ๋ ์ฝ๋๋ก ํํฐ์ ๋๋์ด ์๋ค๋ฉด, ์กฐ์ธํด์ผ ํ RDD ๋ ์ฝ๋๊ฐ ๋์ผํ ์์ปค ๋ ธ๋์ ์์นํ ๊ฐ๋ฅ์ฑ์ด ๋์์ง๋๋ค. ์ด๋ ์ ํ ํ๋๊ณผ ๋ฐ์ดํฐ ๋ถ๊ท ํ(skewness)์ ์ค์ฌ ์ฑ๋ฅ์ ํฅ์์ํฌ ์ ์์ต๋๋ค.# ๋ฒํทํ ํ ์ด๋ธ ์์ฑ df.write.bucketBy(30, 'orderId').sortBy('orderDate').saveAsTable('bucketedOrder')
return_ord = returns.repartition(N, 'product_id') order_rep = order.repartition(N, 'product_id') joined = return_ord.join(order_rep, 'product_id')
'note' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
์์ ๋ก ์ดํด๋ณด๋ AI๋ฅผ ์ํ ๋ฐ์ดํฐ ์์ง๋์ด๋ง (0) | 2024.11.30 |
---|---|
Kubernetes HPA (Horizontal Pod Autoscaler) ์์๋ณด๊ธฐ (0) | 2024.11.27 |
Kubernetes์ Probe ์์๋ณด๊ธฐ (0) | 2024.11.25 |
๋์ปค ๋ฉํฐ์คํ ์ด์ง (0) | 2024.11.24 |
Apache Spark RDD(Resilient Distributed Dataset) (0) | 2024.11.22 |