ML/MLOps Engineer
PySpark ํ”„๋กœ๋•์…˜ ํ™˜๊ฒฝ์—์„œ OutOfMemory(OOM) ๋ฌธ์ œ ํ•ด๊ฒฐํ•˜๊ธฐ

These notes summarize Resolving OutOfMemory (OOM) Errors in PySpark: Best Practices for Optimizing Spark Jobs in Production.

์บ์‹ฑ, ์žฌํŒŒํ‹ฐ์…”๋‹, ์ตœ์ ํ™”๋œ ์กฐ์ธ, ์†์ƒ๋œ ๋ ˆ์ฝ”๋“œ ์ฒ˜๋ฆฌ, ๋ฐ์ดํ„ฐ ๋ถˆ๊ท ํ˜• ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๋Š” ๋“ฑ์˜ ์ตœ์ ํ™”๋ฅผ ์ ์šฉํ–ˆ์Œ์—๋„ ๋ถˆ๊ตฌํ•˜๊ณ  PySpark ํ”„๋กœ๋•์…˜ ํ™˜๊ฒฝ์—์„œ OutOfMemory(OOM) ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•˜๋Š” ๊ฒฝ์šฐ ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ์˜ ๋ถˆํ™•์‹ค์„ฑ์ด๋‚˜ ๋น„ํšจ์œจ์ ์ธ ๋ฉ”๋ชจ๋ฆฌ ๊ด€๋ฆฌ์— ๊ธฐ์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•œ ๋ฐฉ๋ฒ•์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค:

1. ์ŠคํŒŒํฌ ์„ค์ • ์กฐ์ • (๋ฉ”๋ชจ๋ฆฌ ๊ด€๋ฆฌ)

  • Executor ๋ฉ”๋ชจ๋ฆฌ ์ฆ๊ฐ€: ํ˜„์žฌ Executor ๋ฉ”๋ชจ๋ฆฌ๊ฐ€ ๋Œ€๋Ÿ‰์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ์— ๋ถ€์กฑํ•˜๋‹ค๋ฉด, ๋ฉ”๋ชจ๋ฆฌ ํ• ๋‹น๋Ÿ‰์„ ๋Š˜๋ ค์•ผ ํ•ฉ๋‹ˆ๋‹ค.
    spark.conf.set("spark.executor.memory", "8g")  # ํ•„์š”์— ๋”ฐ๋ผ ์กฐ์ •
    spark.conf.set("spark.driver.memory", "4g")
  • Adjust executor cores: allocating too many cores per executor can put excessive pressure on memory, so tune this value carefully.
    spark.conf.set("spark.executor.cores", "2")
     
  •  ๋””์Šคํฌ๋กœ ๋ฐ์ดํ„ฐ ์˜คํ”„๋กœ๋“œ: ๋” ํฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋””์Šคํฌ ๊ธฐ๋ฐ˜ ์ €์žฅ์†Œ๋กœ ์˜คํ”„๋กœ๋“œํ•˜๋ฉด ๋ฉ”๋ชจ๋ฆฌ ๋ถ€์กฑ ์˜ค๋ฅ˜๋ฅผ ๋ฐฉ์ง€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
    df.persist(StorageLevel.DISK_ONLY)
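  • A minimal sketch, assuming the settings are applied when the session is created: spark.executor.memory, spark.driver.memory, and spark.executor.cores are static configurations, so calling spark.conf.set on an already running session does not change them; they are normally passed to SparkSession.builder (or spark-submit). The app name below is illustrative.
    from pyspark.sql import SparkSession

    # Static memory/core settings must be supplied before the session starts.
    spark = (
        SparkSession.builder
        .appName("oom-tuning-example")          # hypothetical app name
        .config("spark.executor.memory", "8g")
        .config("spark.driver.memory", "4g")
        .config("spark.executor.cores", "2")
        .getOrCreate()
    )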

2. Enable Dynamic Resource Allocation

  • Enabling dynamic resource allocation, which lets Spark scale the number of executors with the workload, helps when unexpectedly large volumes of data need to be processed (a session-creation sketch follows this list).
    spark.conf.set("spark.dynamicAllocation.enabled", "true")
    spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
    spark.conf.set("spark.dynamicAllocation.maxExecutors", "100")  # adjust to the cluster size
    

3. ์ ์‘ํ˜• ์ฟผ๋ฆฌ ์‹คํ–‰(AQE) ํ™œ์„ฑํ™”

  • Enabling AQE lets Spark optimize the query plan dynamically based on runtime data metrics and adjust or coalesce shuffle partitions, which handles data skew effectively (Spark 3 equivalents are sketched after this list).
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "64MB")  # shuffle optimization
    

4. Enforce a Schema for Unstructured Data

  • If the structure of incoming data is uncertain or highly variable, defining the schema explicitly instead of relying on schema inference optimizes memory usage.
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    schema = StructType([
        StructField("col1", StringType(), True),
        StructField("col2", IntegerType(), True),
        # define the remaining fields
    ])
    
    df = spark.read.schema(schema).json("path/to/data")  # read with the explicit schema
    

5. Adjust the Number of Partitions

  • ๋ฐ์ดํ„ฐ ํฌ๊ธฐ์— ๋”ฐ๋ผ DataFrame์˜ ํŒŒํ‹ฐ์…˜์„ ์กฐ์ •ํ•˜์—ฌ ๋ฉ”๋ชจ๋ฆฌ์™€ ์„ฑ๋Šฅ ๊ฐ„ ๊ท ํ˜•์„ ๋งž์ถฅ๋‹ˆ๋‹ค. ํŒŒํ‹ฐ์…˜ ์ˆ˜๊ฐ€ ๋„ˆ๋ฌด ์ ์œผ๋ฉด OOM ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๊ณ , ๋„ˆ๋ฌด ๋งŽ์œผ๋ฉด ์˜ค๋ฒ„ํ—ค๋“œ๊ฐ€ ์ฆ๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.
    df = df.repartition(200, "column_name")  # ํด๋Ÿฌ์Šคํ„ฐ ํฌ๊ธฐ ๋ฐ ๋ฐ์ดํ„ฐ ์–‘์— ๋”ฐ๋ผ ์กฐ์ •
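  • A minimal sketch: checking the current partition count first helps decide whether to repartition at all, and coalesce() avoids a full shuffle when only reducing partitions. The threshold below is illustrative, not a recommendation.
    # Inspect the current partitioning before changing it.
    current_partitions = df.rdd.getNumPartitions()

    if current_partitions > 400:                 # illustrative threshold
        df = df.coalesce(200)                    # shrink without a full shuffle
    else:
        df = df.repartition(200, "column_name")  # redistribute by key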
    

6. ๋ฐ์ดํ„ฐ ๋ถˆ๊ท ํ˜• ๋™์  ์ฒ˜๋ฆฌ

  • ๋ฐ์ดํ„ฐ ๋ถˆ๊ท ํ˜• ๋ฌธ์ œ๊ฐ€ ์—ฌ์ „ํžˆ ์กด์žฌํ•œ๋‹ค๋ฉด, Salting ๊ธฐ๋ฒ•์ด๋‚˜ ๋น„๋Œ€์นญ ์กฐ์ธ ์ตœ์ ํ™”๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํŠน์ • Executor์— ๋ฉ”๋ชจ๋ฆฌ ๊ณผ๋ถ€ํ•˜๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š๋„๋ก ํ•ฉ๋‹ˆ๋‹ค.
    import pyspark.sql.functions as F
    
    df1 = df1.withColumn("join_key_salted", F.concat(F.col("join_key"), F.lit("_"), F.rand()))
    df2 = df2.withColumn("join_key_salted", F.concat(F.col("join_key"), F.lit("_"), F.rand()))
    
    df_joined = df1.join(df2, "join_key_salted", "left")
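  • A minimal sketch for confirming skew before salting: counting rows per join key and inspecting the largest groups shows whether a handful of keys dominate the distribution.
    import pyspark.sql.functions as F

    # Keys with disproportionately large counts are the ones worth salting.
    key_counts = df1.groupBy("join_key").count().orderBy(F.desc("count"))
    key_counts.show(10)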
    

7. ๋Œ€๊ทœ๋ชจ DataFrame ์บ์‹œ ์ œํ•œ

  • ๋Œ€๊ทœ๋ชจ DataFrame์„ ์บ์‹œํ•˜๋Š” ๊ฒฝ์šฐ ํ•„์š”์„ฑ์„ ๋‹ค์‹œ ๊ฒ€ํ† ํ•˜์‹ญ์‹œ์˜ค. ๋ฉ”๋ชจ๋ฆฌ์— ํฐ ๋ฐ์ดํ„ฐ์…‹์„ ์บ์‹œํ•˜๋ฉด OOM ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํ•„์š”ํ•˜๋‹ค๋ฉด ๋””์Šคํฌ์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค
    df.persist(StorageLevel.MEMORY_AND_DISK)
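  • A minimal sketch: releasing the cache once downstream work is finished keeps long-running jobs from holding large datasets in executor memory. The aggregation and output path below are illustrative only.
    from pyspark import StorageLevel

    df.persist(StorageLevel.MEMORY_AND_DISK)
    summary = df.groupBy("col1").count()                        # hypothetical downstream work
    summary.write.mode("overwrite").parquet("path/to/output")   # hypothetical output path
    df.unpersist()                                              # free the cached blocks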
    

8. ๋Œ€๊ทœ๋ชจ DataFrame ์กฐ์ธ ์ตœ์ ํ™”

  • ์กฐ์ธ์„ ์‚ฌ์šฉํ•  ๋•Œ OOM ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค๋ฉด, ์ž‘์€ ํ…Œ์ด๋ธ”์—๋Š” Broadcast Join์„ ํ™œ์šฉํ•˜๊ณ , ์กฐ์ธ ํ‚ค๊ฐ€ ๊ณ ๋ฅด๊ฒŒ ๋ถ„์‚ฐ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค.
    from pyspark.sql.functions import broadcast
    
    large_df = spark.read.parquet("path_to_large_df")
    small_df = spark.read.parquet("path_to_small_df")
    
    df_join = large_df.join(broadcast(small_df), "join_key", "left")
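  • A minimal sketch: Spark also broadcasts small tables on its own when they fall under spark.sql.autoBroadcastJoinThreshold (about 10 MB by default), so the threshold can be raised for tables slightly above that limit; the value below is illustrative.
    # Raise the automatic broadcast threshold to roughly 50 MB.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))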
    

9. ์ŠคํŒŒํฌ ์ž‘์—… ์ถ”์  ๋ฐ ๋ชจ๋‹ˆํ„ฐ๋ง

  • Use the Spark UI to monitor memory usage, task execution, and data shuffling. Identify stages that consume excessive memory or introduce skew, and optimize the pipeline accordingly (an event-logging sketch follows this item).
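  • A minimal sketch, assuming a Spark History Server is available: enabling event logging at session creation lets completed jobs be inspected offline with the same views as the live Spark UI. The log directory below is a placeholder.
    from pyspark.sql import SparkSession

    # Event logs let the History Server replay memory, task, and shuffle metrics
    # for finished applications.
    spark = (
        SparkSession.builder
        .config("spark.eventLog.enabled", "true")
        .config("spark.eventLog.dir", "hdfs:///spark-event-logs")  # placeholder path
        .getOrCreate()
    )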

10. Consider a Partitioning Strategy

  • ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ์…‹์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์ž‘์—…์—์„œ๋Š” HDFS, S3, Delta Lake์™€ ๊ฐ™์€ ๋ฐ์ดํ„ฐ ์†Œ์Šค์— ๋Œ€ํ•ด ์ ์ ˆํ•œ ํŒŒํ‹ฐ์…”๋‹์„ ์‚ฌ์šฉํ•˜์—ฌ ์ฝ๋Š” ๋ฐ์ดํ„ฐ ์–‘์„ ์ œํ•œํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์ ์ ˆํžˆ ํŒŒํ‹ฐ์…”๋‹๋œ ๋ฐ์ดํ„ฐ๋Š” Spark๊ฐ€ ์ž‘์—…์— ํ•„์š”ํ•œ ๋ฐ์ดํ„ฐ๋งŒ ์ฒ˜๋ฆฌํ•˜๋„๋ก ๋•์Šต๋‹ˆ๋‹ค.
    df.write.partitionBy("partition_column").parquet("path_to_data")
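  • A minimal sketch: when reading the partitioned output back, filtering on the partition column lets Spark prune partitions and scan only the matching files. The filter value is illustrative.
    import pyspark.sql.functions as F

    # Only the matching partition directories are scanned thanks to partition pruning.
    df = (
        spark.read.parquet("path_to_data")
        .filter(F.col("partition_column") == "2024-01-01")  # illustrative value
    )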
    

Conclusion

์ŠคํŒŒํฌ์—์„œ OutOfMemory(OOM) ์˜ค๋ฅ˜๋Š” ์ฃผ๋กœ ์ž˜๋ชป๋œ ๋ฉ”๋ชจ๋ฆฌ ์„ค์ •, ๋ฐ์ดํ„ฐ ๋ถˆ๊ท ํ˜•, ๋น„ํšจ์œจ์ ์ธ ํŒŒํ‹ฐ์…”๋‹ ์ „๋žต์œผ๋กœ ์ธํ•ด ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค. ๋ฉ”๋ชจ๋ฆฌ ํŠœ๋‹ ๊ธฐ์ˆ , ์Šคํ‚ค๋งˆ ๊ฐ•์ œ ์ ์šฉ, ํšจ์œจ์ ์ธ ํŒŒํ‹ฐ์…”๋‹์„ ํ†ตํ•ด ์ด๋Ÿฌํ•œ ๋ฌธ์ œ๋ฅผ ์™„ํ™”ํ•˜๊ณ , ์ŠคํŒŒํฌ ์ž‘์—…์„ ๋” ์›ํ™œํ•˜๊ณ  ๋น ๋ฅด๊ฒŒ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.


๋ฐ์ดํ„ฐ์˜ ํŠน์„ฑ๊ณผ ํด๋Ÿฌ์Šคํ„ฐ ํ™˜๊ฒฝ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํŒŒ์ดํ”„๋ผ์ธ์„ ๋ชจ๋‹ˆํ„ฐ๋งํ•˜๊ณ  ์กฐ์ •ํ•˜๋ฉด OOM ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ณ  ์˜ˆ์ธก ๋ถˆ๊ฐ€๋Šฅํ•œ ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ์—๋„ ์•ˆ์ •์ ์ธ ํ”„๋กœ๋•์…˜ ํ™˜๊ฒฝ์„ ์œ ์ง€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.