Resolving OutOfMemory (OOM) Errors in PySpark: Best Practices for Optimizing Spark Jobs in Production (opens new window)๋ฅผ ์ฝ๊ณ ์ ๋ฆฌํ์์ต๋๋ค.
์บ์ฑ, ์ฌํํฐ์ ๋, ์ต์ ํ๋ ์กฐ์ธ, ์์๋ ๋ ์ฝ๋ ์ฒ๋ฆฌ, ๋ฐ์ดํฐ ๋ถ๊ท ํ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋ ๋ฑ์ ์ต์ ํ๋ฅผ ์ ์ฉํ์์๋ ๋ถ๊ตฌํ๊ณ PySpark ํ๋ก๋์ ํ๊ฒฝ์์ OutOfMemory(OOM) ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ ๊ฒฝ์ฐ ๋ฐ์ดํฐ ๊ตฌ์กฐ์ ๋ถํ์ค์ฑ์ด๋ ๋นํจ์จ์ ์ธ ๋ฉ๋ชจ๋ฆฌ ๊ด๋ฆฌ์ ๊ธฐ์ธํ ์ ์์ต๋๋ค. ์ด๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํ ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค:
1. ์คํํฌ ์ค์ ์กฐ์ (๋ฉ๋ชจ๋ฆฌ ๊ด๋ฆฌ)
- Executor ๋ฉ๋ชจ๋ฆฌ ์ฆ๊ฐ: ํ์ฌ Executor ๋ฉ๋ชจ๋ฆฌ๊ฐ ๋๋์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ์ ๋ถ์กฑํ๋ค๋ฉด, ๋ฉ๋ชจ๋ฆฌ ํ ๋น๋์ ๋๋ ค์ผ ํฉ๋๋ค.
spark.conf.set("spark.executor.memory", "8g") # ํ์์ ๋ฐ๋ผ ์กฐ์ spark.conf.set("spark.driver.memory", "4g")
- Executor ์ฝ์ด ์กฐ์ : Executor๋น ๋๋ฌด ๋ง์ ์ฝ์ด๋ฅผ ํ ๋นํ๋ฉด ๋ฉ๋ชจ๋ฆฌ ๊ณผ๋ถํ๊ฐ ๋ฐ์ํ ์ ์์ผ๋ฏ๋ก ์ ์ ํ ์กฐ์ ํฉ๋๋ค.
spark.conf.set("spark.executor.cores", "2")
- ๋์คํฌ๋ก ๋ฐ์ดํฐ ์คํ๋ก๋: ๋ ํฐ ๋ฐ์ดํฐ๋ฅผ ๋์คํฌ ๊ธฐ๋ฐ ์ ์ฅ์๋ก ์คํ๋ก๋ํ๋ฉด ๋ฉ๋ชจ๋ฆฌ ๋ถ์กฑ ์ค๋ฅ๋ฅผ ๋ฐฉ์งํ ์ ์์ต๋๋ค.
df.persist(StorageLevel.DISK_ONLY)
2. ๋์ ์์ ํ ๋น ํ์ฑํ
- ์์ ๋ถํ์ ๋ฐ๋ผ Executor ์๋ฅผ ์กฐ์ ํ ์ ์๋ ๋์ ์์ ํ ๋น์ ํ์ฑํํ๋ฉด, ์์์น ๋ชปํ ๋๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ๋์์ด ๋ฉ๋๋ค.
-
spark.conf.set("spark.dynamicAllocation.enabled", "true") spark.conf.set("spark.dynamicAllocation.minExecutors", "1") spark.conf.set("spark.dynamicAllocation.maxExecutors", "100") # ํด๋ฌ์คํฐ ํฌ๊ธฐ์ ๋ง๊ฒ ์กฐ์
3. ์ ์ํ ์ฟผ๋ฆฌ ์คํ(AQE) ํ์ฑํ
- AQE๋ฅผ ํ์ฑํํ๋ฉด ๋ฐํ์ ๋ฐ์ดํฐ ๋ฉํธ๋ฆญ์ ๋ฐ๋ผ ์ฟผ๋ฆฌ ํ๋์ ๋์ ์ผ๋ก ์ต์ ํํ๊ณ , ์
ํ ํํฐ์
์ ์กฐ์ ํ๊ฑฐ๋ ํตํฉํ์ฌ ๋ฐ์ดํฐ ๋ถ๊ท ํ์ ํจ๊ณผ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
spark.conf.set("spark.sql.adaptive.enabled", "true") spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "64MB") # ์ ํ ์ต์ ํ
4. ๋น์ ํ ๋ฐ์ดํฐ์ ๋ํ ์คํค๋ง ๊ฐ์ ์ ์ฉ
- ๋ค์ด์ค๋ ๋ฐ์ดํฐ์ ๊ตฌ์กฐ๊ฐ ๋ถํ์คํ๊ฑฐ๋ ๋ณ๋์ด ์ฌํ ๊ฒฝ์ฐ, ์คํค๋ง ์ถ๋ก ๋์ ์คํค๋ง๋ฅผ ๋ช
์์ ์ผ๋ก ์ ์ํ๋ฉด ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ์ ์ต์ ํํ ์ ์์ต๋๋ค.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType schema = StructType([ StructField("col1", StringType(), True), StructField("col2", IntegerType(), True), # ๋๋จธ์ง ์คํค๋ง ์ ์ ]) df = spark.read.schema(schema).json("path/to/data") # ์คํค๋ง ์ง์
5. ํํฐ์ ์ ์กฐ์
- ๋ฐ์ดํฐ ํฌ๊ธฐ์ ๋ฐ๋ผ DataFrame์ ํํฐ์
์ ์กฐ์ ํ์ฌ ๋ฉ๋ชจ๋ฆฌ์ ์ฑ๋ฅ ๊ฐ ๊ท ํ์ ๋ง์ถฅ๋๋ค. ํํฐ์
์๊ฐ ๋๋ฌด ์ ์ผ๋ฉด OOM ์ค๋ฅ๊ฐ ๋ฐ์ํ ์ ์๊ณ , ๋๋ฌด ๋ง์ผ๋ฉด ์ค๋ฒํค๋๊ฐ ์ฆ๊ฐํฉ๋๋ค.
df = df.repartition(200, "column_name") # ํด๋ฌ์คํฐ ํฌ๊ธฐ ๋ฐ ๋ฐ์ดํฐ ์์ ๋ฐ๋ผ ์กฐ์
6. ๋ฐ์ดํฐ ๋ถ๊ท ํ ๋์ ์ฒ๋ฆฌ
- ๋ฐ์ดํฐ ๋ถ๊ท ํ ๋ฌธ์ ๊ฐ ์ฌ์ ํ ์กด์ฌํ๋ค๋ฉด, Salting ๊ธฐ๋ฒ์ด๋ ๋น๋์นญ ์กฐ์ธ ์ต์ ํ๋ฅผ ์ฌ์ฉํ์ฌ ํน์ Executor์ ๋ฉ๋ชจ๋ฆฌ ๊ณผ๋ถํ๊ฐ ๋ฐ์ํ์ง ์๋๋ก ํฉ๋๋ค.
import pyspark.sql.functions as F df1 = df1.withColumn("join_key_salted", F.concat(F.col("join_key"), F.lit("_"), F.rand())) df2 = df2.withColumn("join_key_salted", F.concat(F.col("join_key"), F.lit("_"), F.rand())) df_joined = df1.join(df2, "join_key_salted", "left")
7. ๋๊ท๋ชจ DataFrame ์บ์ ์ ํ
- ๋๊ท๋ชจ DataFrame์ ์บ์ํ๋ ๊ฒฝ์ฐ ํ์์ฑ์ ๋ค์ ๊ฒํ ํ์ญ์์ค. ๋ฉ๋ชจ๋ฆฌ์ ํฐ ๋ฐ์ดํฐ์
์ ์บ์ํ๋ฉด OOM ์ค๋ฅ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค. ํ์ํ๋ค๋ฉด ๋์คํฌ์ ์ ์ฅํฉ๋๋ค
df.persist(StorageLevel.MEMORY_AND_DISK)
8. ๋๊ท๋ชจ DataFrame ์กฐ์ธ ์ต์ ํ
- ์กฐ์ธ์ ์ฌ์ฉํ ๋ OOM ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ค๋ฉด, ์์ ํ
์ด๋ธ์๋ Broadcast Join์ ํ์ฉํ๊ณ , ์กฐ์ธ ํค๊ฐ ๊ณ ๋ฅด๊ฒ ๋ถ์ฐ๋์๋์ง ํ์ธํฉ๋๋ค.
from pyspark.sql.functions import broadcast large_df = spark.read.parquet("path_to_large_df") small_df = spark.read.parquet("path_to_small_df") df_join = large_df.join(broadcast(small_df), "join_key", "left")
9. ์คํํฌ ์์ ์ถ์ ๋ฐ ๋ชจ๋ํฐ๋ง
- Spark UI๋ฅผ ์ฌ์ฉํด ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋, ์์ ์คํ, ๋ฐ์ดํฐ ์ ํ๋ง์ ๋ชจ๋ํฐ๋งํฉ๋๋ค. ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๊ณผ๋ํ๊ฒ ์ฌ์ฉํ๋ ๋จ๊ณ๋ ๋ถ๊ท ํ์ ์ ๋ฐํ๋ ์์๋ฅผ ์๋ณํ๊ณ , ํ์ดํ๋ผ์ธ์ ์ต์ ํํฉ๋๋ค.
10. ํํฐ์ ๋ ์ ๋ต ๊ณ ๋ ค
- ๋๊ท๋ชจ ๋ฐ์ดํฐ์
์ ์ฒ๋ฆฌํ๋ ์์
์์๋ HDFS, S3, Delta Lake์ ๊ฐ์ ๋ฐ์ดํฐ ์์ค์ ๋ํด ์ ์ ํ ํํฐ์
๋์ ์ฌ์ฉํ์ฌ ์ฝ๋ ๋ฐ์ดํฐ ์์ ์ ํํด์ผ ํฉ๋๋ค. ์ ์ ํ ํํฐ์
๋๋ ๋ฐ์ดํฐ๋ Spark๊ฐ ์์
์ ํ์ํ ๋ฐ์ดํฐ๋ง ์ฒ๋ฆฌํ๋๋ก ๋์ต๋๋ค.
df.write.partitionBy("partition_column").parquet("path_to_data")
๊ฒฐ๋ก
์คํํฌ์์ OutOfMemory(OOM) ์ค๋ฅ๋ ์ฃผ๋ก ์๋ชป๋ ๋ฉ๋ชจ๋ฆฌ ์ค์ , ๋ฐ์ดํฐ ๋ถ๊ท ํ, ๋นํจ์จ์ ์ธ ํํฐ์ ๋ ์ ๋ต์ผ๋ก ์ธํด ๋ฐ์ํฉ๋๋ค. ๋ฉ๋ชจ๋ฆฌ ํ๋ ๊ธฐ์ , ์คํค๋ง ๊ฐ์ ์ ์ฉ, ํจ์จ์ ์ธ ํํฐ์ ๋์ ํตํด ์ด๋ฌํ ๋ฌธ์ ๋ฅผ ์ํํ๊ณ , ์คํํฌ ์์ ์ ๋ ์ํํ๊ณ ๋น ๋ฅด๊ฒ ์คํํ ์ ์์ต๋๋ค.
๋ฐ์ดํฐ์ ํน์ฑ๊ณผ ํด๋ฌ์คํฐ ํ๊ฒฝ์ ๊ธฐ๋ฐ์ผ๋ก ํ์ดํ๋ผ์ธ์ ๋ชจ๋ํฐ๋งํ๊ณ ์กฐ์ ํ๋ฉด OOM ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ณ ์์ธก ๋ถ๊ฐ๋ฅํ ๋ฐ์ดํฐ ๊ตฌ์กฐ์๋ ์์ ์ ์ธ ํ๋ก๋์
ํ๊ฒฝ์ ์ ์งํ ์ ์์ต๋๋ค.
'note' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Python: Parallelism(๋ณ๋ ฌ์ฒ๋ฆฌ) Vs Concurrency(๋์์ฑ) Vs Threading(์ค๋ ๋ฉ) (0) | 2024.12.12 |
---|---|
SparkSession๊ณผ SparkContext ์ดํดํ๊ธฐ (0) | 2024.12.04 |
์์ ๋ก ์ดํด๋ณด๋ AI๋ฅผ ์ํ ๋ฐ์ดํฐ ์์ง๋์ด๋ง (0) | 2024.11.30 |
Kubernetes HPA (Horizontal Pod Autoscaler) ์์๋ณด๊ธฐ (0) | 2024.11.27 |
Spark ์กฐ์ธ(join) ์ต์ ํ (0) | 2024.11.26 |