Config-Driven Data Standardization Framework using Spark (Translation)

This post is a translation of the Medium article Config-Driven Data Standardization Framework using Spark.

 

https://medium.com/@pallavisinha12/config-driven-data-standardization-framework-using-spark-12aa7c52fae1


์˜ค๋Š˜๋‚  ์กฐ์ง์€ ๋‹ค์–‘ํ•œ ์›์ฒœ ๋ฐ์ดํ„ฐ ์†Œ์Šค์—์„œ ๋ฐฉ๋Œ€ํ•œ ์–‘์˜ ์›์‹œ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•ฉ๋‹ˆ๋‹ค. ์ด ๋ฐ์ดํ„ฐ๋Š” ์ข…์ข… ์ผ๊ด€์„ฑ์ด ์—†๊ณ , ๊ตฌ์กฐ๊ฐ€ ๋ถˆ์™„์ „ํ•˜๋ฉฐ ๋ถ„์„๊ณผ ์˜์‚ฌ ๊ฒฐ์ •์„ ์œ„ํ•ด์„œ ์ƒ๋‹นํ•œ ์ „์ฒ˜๋ฆฌ๋ฅผ ํ•„์š”๋กœ ํ•ฉ๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š” ๊ฐ•๋ ฅํ•œ ๋ฐ์ดํ„ฐ ํ‘œ์ค€ํ™” ํ”„๋กœ์„ธ์Šค๊ฐ€ ํ•„์ˆ˜์ ์ž…๋‹ˆ๋‹ค.

We will first briefly explain why data standardization matters, then look at the advantages of a config-driven approach, and finally walk through the step-by-step development of the framework. We will also see how you can extend the framework to your own use cases and why that is useful.

 

๋ฐ์ดํ„ฐ ํ‘œ์ค€ํ™”๋ž€?

๋ฐ์ดํ„ฐ ํ‘œ์ค€ํ™”๋Š” ๋‹ค์–‘ํ•œ ์†Œ์Šค์—์„œ ์ˆ˜์ง‘๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ณตํ†ต๋œ ํ˜•์‹์œผ๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ๊ณผ์ •์ž…๋‹ˆ๋‹ค. ์—ฌ๊ธฐ์—๋Š” ๋ช…๋ช… ๊ทœ์น™์„ ์ผ์น˜์‹œํ‚ค๊ณ , ๋ฐ์ดํ„ฐ ์œ ํ˜•์„ ์ •๋ ฌํ•˜๋ฉฐ, ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ๋ฅผ ์ถ”๊ฐ€ํ•˜๋Š” ๋“ฑ์˜ ์ž‘์—…์ด ํฌํ•จ๋˜์–ด ๋ฐ์ดํ„ฐ ๊ฐ„ ์ผ๊ด€์„ฑ์„ ๋ณด์žฅํ•ฉ๋‹ˆ๋‹ค.

 

Why Standardize Raw Data?

1. ๋ฐ์ดํ„ฐ ์ œํ’ˆ ๊ฐ„ ์ผ๊ด€์„ฑ: ํ‘œ์ค€ํ™”๋Š” ์„œ๋กœ ๋‹ค๋ฅธ ์†Œ์Šค์—์„œ ์ˆ˜์ง‘๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ๋™์ผํ•œ ๊ตฌ์กฐ๋ฅผ ๋”ฐ๋ฅด๋„๋ก ํ•˜์—ฌ ํ†ตํ•ฉ ๋ฐ ๋ถ„์„์ด ์šฉ์ดํ•˜๊ฒŒ ๋งŒ๋“ญ๋‹ˆ๋‹ค.
2. ์˜ค๋ฅ˜ ๊ฐ์†Œ: ์ผ๊ด€๋œ ๋ช…๋ช… ๊ทœ์น™, ์˜ฌ๋ฐ”๋ฅธ ๋ฐ์ดํ„ฐ ์œ ํ˜• ๋ฐ ๋ณ€ํ™˜์„ ์ ์šฉํ•จ์œผ๋กœ์จ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๋ฐ ๋ถ„์„ ๊ณผ์ •์—์„œ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋Š” ์˜ค๋ฅ˜๋ฅผ ํฌ๊ฒŒ ์ค„์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
3. ๋ฐ์ดํ„ฐ ํ™œ์šฉ์„ฑ ํ–ฅ์ƒ: ํ‘œ์ค€ํ™”๋œ ๋ฐ์ดํ„ฐ๋Š” ๋‹ค์–‘ํ•œ ๋ถ„์„ ๋ฐ ๋ณด๊ณ  ๋ชฉ์ ์œผ๋กœ ๋” ์‰ฝ๊ฒŒ ์ ‘๊ทผํ•˜๊ณ  ํ™œ์šฉํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ๋” ๋‚˜์€ ํ†ต์ฐฐ๋ ฅ๊ณผ ๋น„์ฆˆ๋‹ˆ์Šค ๊ฒฐ์ •์„ ์ด๋Œ์–ด๋ƒ…๋‹ˆ๋‹ค.
4. ํ™•์žฅ์„ฑ: ๋ฐ์ดํ„ฐ ์–‘์ด ์ฆ๊ฐ€ํ•จ์— ๋”ฐ๋ผ ํ‘œ์ค€ํ™”๋œ ๋ฐ์ดํ„ฐ ์ œํ’ˆ์€ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ ์†Œ์Šค์™€ ์ปฌ๋Ÿผ์„ ์ถ”๊ฐ€ํ•  ๋•Œ ๋” ์‰ฝ๊ฒŒ ๊ด€๋ฆฌํ•˜๊ณ  ํ™•์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

Config-Driven Data Standardization Framework

The goal of this framework is to define rules and mappings in a configuration file instead of writing custom ingestion and transformation scripts for every data product. This decouples the standardization logic from the application code, improving the flexibility and maintainability of the system.

In this tutorial we will use JSON as the config file format, and the code will use PySpark and Spark SQL. The data products are stored in Delta format.

Key advantages of the config-driven approach
1. Flexibility: Standardization rules can be modified easily without changing application code.
2. Scalability: Spark handles large datasets efficiently.
3. Maintainability: The transformation logic is centralized in config files, making it easy to maintain and update.

The capabilities we will build are:
- Updating column names
- Transforming data
- Updating column data types
- Updating column description metadata
- Adding new columns

Before we begin, it is worth noting that data standardization can cover more than what this post demonstrates. It may include other processes that improve data integrity and usability, such as adding data quality checks, integrating additional metadata, and versioning data products.

 

Setting Up the Environment

We will use Databricks Community Edition to write the code and the Databricks File System (DBFS) to store the data. However, since the framework takes paths as arguments, you can store the files elsewhere, such as an S3 bucket or ADLS.
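If you are not on Databricks, a SparkSession with Delta Lake support can also be created locally. Below is a minimal sketch, assuming the delta-spark pip package is installed (on Databricks, `spark` is already preconfigured and this step is unnecessary):

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Enable the Delta Lake SQL extension and catalog on a local SparkSession
builder = (
    SparkSession.builder.appName("data-standardization")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()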

Config File Structure

The config file structure used in this demo is as follows:

- `data_product_name`: name to assign to the data product (DP) after standardization
- `raw_data_product_name`: name of the raw data product
- `schema`:
  - `source_columns` (columns taken directly from the raw data product):
    - `raw_name`: column name in the raw data product
    - `standardized_name`: standardized name for the raw column
    - `data_type`: data type to cast the column to
    - `sql_transformation`: transformation rule written in Spark SQL
  - `new_columns` (columns created by joining with other data products):
    - `name`: name of the new column to create
    - `data_type`: data type to cast the column to
    - `sql_transformation`: transformation rule written in Spark SQL
- `column_sequence_order`: order in which the columns should appear in the final data product
- `metadata` (metadata to assign after all columns have been added):
  - `column_descriptions`: column descriptions

 


Below is the raw data product, supplier, that we will standardize.

supplier (Raw data product)



ํ‘œ์ค€ํ™”๋œ ๋ฐ์ดํ„ฐ ์ œํ’ˆ ์˜ˆ์‹œ: Product. ์•„๋ž˜๋Š” Product๋ผ๋Š” ๋‹ค๋ฅธ ํ‘œ์ค€ํ™”๋œ ๋ฐ์ดํ„ฐ ์ œํ’ˆ์ž…๋‹ˆ๋‹ค. ์ƒˆ๋กœ์šด ์ปฌ๋Ÿผ์„ ์ถ”๊ฐ€ํ•˜๊ธฐ ์œ„ํ•ด ์ด๋ฅผ ์‚ฌ์šฉํ•  ๊ฒƒ์ž…๋‹ˆ๋‹ค.

Product (Other Standardized Data Product)

JSON config file for supplier:

{
    "data_product_name" : "Product_Supplier",
    "raw_data_product_name" : "supplier",
    "schema" : {
        "source_columns" : [
            {
                "raw_name" : "sup_id",
                "standardized_name" : "Supplier_ID",
                "data_type" : "string",
                "sql_transformation" : "CONCAT('SUP', '-' , sup_id)"
            },
            {
                "raw_name" : "name",
                "standardized_name" : "Supplier_Name",
                "data_type" : "string",
                "sql_transformation" : ""
            },
            {
                "raw_name" : "price",
                "standardized_name" : "Purchase_Price",
                "data_type" : "int",
                "sql_transformation" : ""
            },
            {
                "raw_name" : "prod_name",
                "standardized_name" : "Product_Name",
                "data_type" : "string",
                "sql_transformation" : ""
            },
            {
                "raw_name" : "quantity",
                "standardized_name" : "Purchase_Quantity",
                "data_type" : "int",
                "sql_transformation" : ""
            },
            {
                "raw_name" : "",
                "standardized_name" : "Total_Cost",
                "data_type" : "int",
                "sql_transformation" : "price * quantity"
            }
        ],
        "new_columns" : [
            {
                "name" : "Product_ID",
                "data_type" : "string",
                "sql_transformation" : "MERGE INTO delta.`{temp_std_dp_path}` dest USING delta.`dbfs:/FileStore/project/Product` src ON dest.Product_Name = src.Product_Name WHEN MATCHED THEN UPDATE SET dest.Product_ID = src.Product_ID"
            }
        ]
    },
    "column_sequence_order" : [
        "Supplier_ID", "Supplier_Name", "Product_ID", "Product_Name", "Purchase_Price", "Purchase_Quantity", "Total_Cost"
    ],
    "metadata" : {
        "column_descriptions" : {
            "Supplier_ID" : "์ œํ’ˆ์˜ ๊ณต๊ธ‰์—…์ฒด์— ๋Œ€ํ•œ ๊ณ ์œ  ์‹๋ณ„์ž",
            "Supplier_Name" : "๊ณต๊ธ‰์—…์ฒด์˜ ์ด๋ฆ„",
            "Purchase_Price" : "๊ณต๊ธ‰์—…์ฒด๊ฐ€ ์ œํ’ˆ์„ ํŒ๋งคํ•˜๋Š” ๊ฐ€๊ฒฉ",
            "Product_Name" : "์ œํ’ˆ์˜ ์ด๋ฆ„",
            "Purchase_Quantity" : "๊ณต๊ธ‰์—…์ฒด๊ฐ€ ๋ณด์œ ํ•œ ์ œํ’ˆ์˜ ์ˆ˜๋Ÿ‰",
            "Total_Cost" : "์ฃผ์–ด์ง„ ๊ตฌ๋งค ๊ฐ€๊ฒฉ์œผ๋กœ ํŠน์ • ์ˆ˜๋Ÿ‰์˜ ์ œํ’ˆ์„ ๊ตฌ๋งคํ•˜๋Š” ๋ฐ ์†Œ์š”๋œ ์ด ๊ธˆ์•ก",
            "Product_ID" : "์ œํ’ˆ์— ๋Œ€ํ•œ ๊ณ ์œ  ์‹๋ณ„์ž"
        }
    }
}

Save the raw data product (supplier) and the Product data product in Delta format, and upload the JSON config file to an appropriate path.
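For reference, the two input Delta tables can be created with sample rows like the following sketch (the values are invented for illustration; the column names follow the config above):

# Hypothetical sample rows for the raw supplier data product
supplier_df = spark.createDataFrame(
    [(1, "Acme Corp", 20, "Laptop", 100),
     (2, "Globex", 5, "Mouse", 500)],
    ["sup_id", "name", "price", "prod_name", "quantity"],
)
supplier_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/project/supplier")

# Hypothetical standardized Product data product, used later for the join
product_df = spark.createDataFrame(
    [("PRD-1", "Laptop"), ("PRD-2", "Mouse")],
    ["Product_ID", "Product_Name"],
)
product_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/project/Product")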

We will follow a full load (truncate-load) process: all steps are performed in a temporary or staging area, and the result then overwrites the actual standardized data product path.

 

Developing the Framework

We start by defining the interface of the Config Reader. The Config Reader class is implemented so that it can accommodate different config file structures. For example, this tutorial uses a JSON config; to use a YAML config instead, you would simply create a new Config Reader class for that format (a sketch follows the ConfigReader implementation below).

from abc import ABC, abstractmethod
from pyspark.sql import DataFrame

class ConfigReaderContract(ABC):
    @abstractmethod
    def read_source_columns_schema(self) -> DataFrame:
        pass
    @abstractmethod
    def read_new_columns_schema(self) -> DataFrame:
        pass
    @abstractmethod
    def read_column_descriptions_metadata(self) -> dict:
        pass
    @abstractmethod
    def read_column_sequence_order(self) -> list[str]:
        pass

Implementing the ConfigReader Class

from pyspark.sql.functions import explode

class ConfigReader(ConfigReaderContract):
    def __init__(self, config_path):
        # multiLine is needed because the config is a pretty-printed JSON file
        self.config_df = spark.read.option("multiLine", True).json(config_path)

    def read_source_columns_schema(self):
        exploded_df = self.config_df.select(explode(self.config_df["schema"].source_columns).alias("source_columns"))
        source_columns_schema_df = exploded_df.selectExpr(
            "source_columns.raw_name as raw_name",
            "source_columns.standardized_name as standardized_name",
            "source_columns.data_type as data_type",
            "source_columns.sql_transformation as sql_transformation"
        )
        return source_columns_schema_df

    def read_new_columns_schema(self):
        exploded_df = self.config_df.select(explode(self.config_df["schema"].new_columns).alias("new_columns"))
        new_columns_schema_df = exploded_df.selectExpr(
            "new_columns.name as name",
            "new_columns.data_type as data_type",
            "new_columns.sql_transformation as sql_transformation"
        )
        return new_columns_schema_df
    
    def read_column_descriptions_metadata(self):
        metadata_df = self.config_df.select("metadata.column_descriptions").alias("column_descriptions")
        descriptions_row_obj = metadata_df.first()["column_descriptions"]
        return descriptions_row_obj.asDict()
    
    def read_column_sequence_order(self):
        return list(self.config_df.first()["column_sequence_order"])
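To illustrate how the contract supports other formats, here is a hedged sketch of a YAML-based reader (hypothetical: it assumes the same schema expressed in YAML and the pyyaml package; note that open() needs a locally mounted path, e.g. /dbfs/... on Databricks):

import yaml  # assumes the pyyaml package is available

class YamlConfigReader(ConfigReaderContract):
    def __init__(self, config_path):
        # Plain-file read; use a /dbfs/... mount path on Databricks
        with open(config_path) as f:
            self.config = yaml.safe_load(f)

    def read_source_columns_schema(self):
        return spark.createDataFrame(self.config["schema"]["source_columns"])

    def read_new_columns_schema(self):
        return spark.createDataFrame(self.config["schema"]["new_columns"])

    def read_column_descriptions_metadata(self):
        return self.config["metadata"]["column_descriptions"]

    def read_column_sequence_order(self):
        return self.config["column_sequence_order"]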

 

Implementing the DataStandardizer Class

class DataStandardizer:

    def __init__(self, raw_dp_path, temp_std_dp_path, std_dp_path):
        self.raw_dp_path = raw_dp_path
        self.temp_std_dp_path = temp_std_dp_path
        self.std_dp_path = std_dp_path

    def create_temp_std_dp_with_source_columns(self, source_columns_schema_df):
        # Build one SELECT expression per source column (rename + cast, or an
        # arbitrary Spark SQL transformation) and materialize the result as
        # the temp standardized table.
        source_columns_schema_df.createOrReplaceTempView("source_columns_config_table")
        select_query_sql = f"""
            SELECT 
                concat(
                    "SELECT ", 
                    array_join(collect_list(select_expression), ", "), 
                    " FROM delta.`{self.raw_dp_path}`"
                ) as select_query 
            FROM (
                SELECT 
                    CASE
                        WHEN sql_transformation = "" THEN concat("CAST(", concat("`", raw_name, "`"), " AS ", data_type, ") AS ", standardized_name)
                        ELSE concat("CAST(", sql_transformation, " AS ", data_type, ") AS ", standardized_name)
                    END as select_expression 
                FROM source_columns_config_table
            )
        """
        df = spark.sql(select_query_sql)
        select_query = df.first()["select_query"]
        create_sql_query = f"CREATE OR REPLACE TABLE delta.`{self.temp_std_dp_path}` as ( " + select_query + ")"
        spark.sql(create_sql_query)

    def add_new_columns_in_temp_std_dp(self, new_columns_schema_df):
        # Add each configured new column, then run its SQL transformation
        # (e.g. a MERGE) with the {temp_std_dp_path} placeholder resolved.
        new_columns_schema_df_rows = new_columns_schema_df.collect()        
        for row in new_columns_schema_df_rows:
            add_new_columns_sql = f"ALTER TABLE delta.`{self.temp_std_dp_path}` ADD COLUMN {row['name']} {row['data_type']}"   
            sql_transformation = row["sql_transformation"].replace("{temp_std_dp_path}", self.temp_std_dp_path)  
            spark.sql(add_new_columns_sql)  
            spark.sql(sql_transformation)      
    
    def update_column_descriptions_metadata(self, column_descriptions_dict):
        # Attach a COMMENT to every column of the temp Delta table.
        for column_name, description in column_descriptions_dict.items():
            column_description_update_sql = f"ALTER TABLE delta.`{self.temp_std_dp_path}` CHANGE COLUMN {column_name} COMMENT '{description}';"
            spark.sql(column_description_update_sql)
        
    def move_data_to_std_dp(self, column_sequence_order):
        # Reorder the columns and overwrite the final standardized data product.
        temp_std_df = spark.read.format("delta").load(self.temp_std_dp_path)
        temp_std_df = temp_std_df.select(column_sequence_order)
        temp_std_df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(self.std_dp_path)

    def run(self, config_reader):
        print("Raw df : ")
        raw_df = spark.read.format("delta").load(self.raw_dp_path)
        display(raw_df)

        source_columns_schema_df = config_reader.read_source_columns_schema()
        self.create_temp_std_dp_with_source_columns(source_columns_schema_df)

        new_columns_schema_df = config_reader.read_new_columns_schema()
        self.add_new_columns_in_temp_std_dp(new_columns_schema_df)

        column_descriptions_dict = config_reader.read_column_descriptions_metadata()
        self.update_column_descriptions_metadata(column_descriptions_dict)

        column_sequence_order = config_reader.read_column_sequence_order()
        self.move_data_to_std_dp(column_sequence_order)

        print("Standardized df : ")
        std_df = spark.read.format("delta").load(self.std_dp_path)
        display(std_df)

        print("Schema information for Standardized df : ")
        std_df.printSchema()    
        display(spark.sql(f"DESCRIBE TABLE delta.`{self.std_dp_path}`"))

The `DataStandardizer` class uses three attributes:
- `raw_dp_path`: path of the raw data product
- `temp_std_dp_path`: temporary path used for the standardization work
- `std_dp_path`: path of the final standardized data product

The methods work as follows:
1. `create_temp_std_dp_with_source_columns`: creates the initial version of the standardized data product from the columns taken directly from the raw data product (a worked example of the generated query follows this list).
2. `add_new_columns_in_temp_std_dp`: adds the new columns obtained by joining with other data products to the temporary standardized data product.
3. `update_column_descriptions_metadata`: updates the description of each column.
4. `move_data_to_std_dp`: copies the data from the temporary/staging area to the final standardized data product path.
5. `run`: orchestrates all of the steps above; it accepts a `config_reader` instance implementing the `ConfigReaderContract` interface.
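To make the dynamic SQL concrete: for the sample supplier config, create_temp_std_dp_with_source_columns assembles roughly the following Spark SQL (column order may vary, since collect_list does not guarantee ordering), which is then wrapped in a CREATE OR REPLACE TABLE statement against the temp path:

SELECT
    CAST(CONCAT('SUP', '-' , sup_id) AS string) AS Supplier_ID,
    CAST(`name` AS string) AS Supplier_Name,
    CAST(`price` AS int) AS Purchase_Price,
    CAST(`prod_name` AS string) AS Product_Name,
    CAST(`quantity` AS int) AS Purchase_Quantity,
    CAST(price * quantity AS int) AS Total_Cost
FROM delta.`dbfs:/FileStore/project/supplier`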

ํ‘œ์ค€ํ™”๋œ ๋ฐ์ดํ„ฐ ์ œํ’ˆ ์‹คํ–‰

๋‹ค์Œ์€ `supplier` ์›๋ณธ ๋ฐ์ดํ„ฐ ์ œํ’ˆ์—์„œ ํ”„๋ ˆ์ž„์›Œํฌ๋ฅผ ์‹คํ–‰ํ•˜๋Š” ์ฝ”๋“œ์ž…๋‹ˆ๋‹ค.

# Define paths
raw_dp_path = "dbfs:/FileStore/project/supplier"
std_dp_path = "dbfs:/FileStore/project/Product_Supplier"
temp_std_dp_path = "dbfs:/FileStore/project/Product_Supplier_temp"
config_path = "dbfs:/FileStore/project/supplier_config.json"

# Initialize the config reader and the data standardizer
config_reader = ConfigReader(config_path)
data_standardizer = DataStandardizer(
    raw_dp_path=raw_dp_path,
    temp_std_dp_path=temp_std_dp_path,
    std_dp_path=std_dp_path
)

# Run the DataStandardizer
data_standardizer.run(config_reader)

 

As shown in the output above, the standardized DP is created with every column specified in the config file, along with a description for each column.

For example, the sup_id column of the raw DP has been renamed to Supplier_ID, and the required prefix (SUP) has been prepended to each value. Its description metadata, "Unique identifier for the supplier of the product", has also been applied. A Total_Cost column has been added by multiplying the raw DP's price and quantity, and the new Product_ID column has been added through a join with the other data product, Product.

The code used in this blog post is available in this GitHub repository.

Conclusion

A config-driven data standardization framework built on Spark is a powerful way to turn raw data into high-quality, standardized data products. Managing the standardization rules through config files provides flexibility, consistency, and ease of maintenance, and rules can be adjusted dynamically without code changes. The framework can also be extended easily with data quality checks, data product versioning, and other metadata enhancements, making it highly scalable and broadly applicable.
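As one example of those extension points, a data quality check could be bolted on after run() completes. The sketch below is hypothetical (the rule set and failure handling would depend on your use case):

def validate_standardized_dp(std_dp_path, non_null_columns):
    # Hypothetical check: fail if any key column contains NULLs
    std_df = spark.read.format("delta").load(std_dp_path)
    for column_name in non_null_columns:
        null_count = std_df.filter(std_df[column_name].isNull()).count()
        if null_count > 0:
            raise ValueError(f"{column_name} contains {null_count} NULL value(s)")

validate_standardized_dp(std_dp_path, ["Supplier_ID", "Product_ID"])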

---

Reference

Config-Driven Data Standardization Framework using Spark, Medium
https://medium.com/@pallavisinha12/config-driven-data-standardization-framework-using-spark-12aa7c52fae1