Medium ์ Config-Driven Data Standardization Framework using Spark ๋ฅผ ๋ฒ์ญํ ๊ธ์ ๋๋ค.
Spark๋ฅผ ์ฌ์ฉํ Config-Driven ๋ฐ์ดํฐ ํ์คํ ํ๋ ์์ํฌ
์ค๋๋ ์กฐ์ง์ ๋ค์ํ ์์ฒ ๋ฐ์ดํฐ ์์ค์์ ๋ฐฉ๋ํ ์์ ์์ ๋ฐ์ดํฐ๋ฅผ ์์งํฉ๋๋ค. ์ด ๋ฐ์ดํฐ๋ ์ข
์ข
์ผ๊ด์ฑ์ด ์๊ณ , ๊ตฌ์กฐ๊ฐ ๋ถ์์ ํ๋ฉฐ ๋ถ์๊ณผ ์์ฌ ๊ฒฐ์ ์ ์ํด์ ์๋นํ ์ ์ฒ๋ฆฌ๋ฅผ ํ์๋ก ํฉ๋๋ค. ์ด๋ฌํ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด์๋ ๊ฐ๋ ฅํ ๋ฐ์ดํฐ ํ์คํ ํ๋ก์ธ์ค๊ฐ ํ์์ ์
๋๋ค.
๋จผ์ ๋ฐ์ดํฐ ํ์คํ์ ์ค์์ฑ์ ๊ฐ๋จํ ์ค๋ช
ํ๊ณ , ๊ทธ ํ ์ค์ ๊ธฐ๋ฐ ์ ๊ทผ ๋ฐฉ์์ ์ฅ์ ์ ์ดํด๋ณด๋ฉฐ, ๋ง์ง๋ง์ผ๋ก ์ด ํ๋ ์์ํฌ์ ๋จ๊ณ๋ณ ๊ฐ๋ฐ ๊ณผ์ ์ ๋ค๋ฃน๋๋ค. ๋ํ ์ด ํ๋ ์์ํฌ๋ฅผ ์ฌ์ฉ์์ ์ฌ์ฉ ์ฌ๋ก์ ์ด๋ป๊ฒ ํ์ฅํ ์ ์์ผ๋ฉฐ, ์ด๋ค ์ ์ฉํ ์ ์ด ์๋์ง๋ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
๋ฐ์ดํฐ ํ์คํ๋?
๋ฐ์ดํฐ ํ์คํ๋ ๋ค์ํ ์์ค์์ ์์ง๋ ๋ฐ์ดํฐ๋ฅผ ๊ณตํต๋ ํ์์ผ๋ก ๋ณํํ๋ ๊ณผ์ ์ ๋๋ค. ์ฌ๊ธฐ์๋ ๋ช ๋ช ๊ท์น์ ์ผ์น์ํค๊ณ , ๋ฐ์ดํฐ ์ ํ์ ์ ๋ ฌํ๋ฉฐ, ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐํ๋ ๋ฑ์ ์์ ์ด ํฌํจ๋์ด ๋ฐ์ดํฐ ๊ฐ ์ผ๊ด์ฑ์ ๋ณด์ฅํฉ๋๋ค.
์์ ๋ฐ์ดํฐ๋ฅผ ํ์คํํด์ผ ํ๋ ์ด์
1. ๋ฐ์ดํฐ ์ ํ ๊ฐ ์ผ๊ด์ฑ: ํ์คํ๋ ์๋ก ๋ค๋ฅธ ์์ค์์ ์์ง๋ ๋ฐ์ดํฐ๊ฐ ๋์ผํ ๊ตฌ์กฐ๋ฅผ ๋ฐ๋ฅด๋๋ก ํ์ฌ ํตํฉ ๋ฐ ๋ถ์์ด ์ฉ์ดํ๊ฒ ๋ง๋ญ๋๋ค.
2. ์ค๋ฅ ๊ฐ์: ์ผ๊ด๋ ๋ช
๋ช
๊ท์น, ์ฌ๋ฐ๋ฅธ ๋ฐ์ดํฐ ์ ํ ๋ฐ ๋ณํ์ ์ ์ฉํจ์ผ๋ก์จ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฐ ๋ถ์ ๊ณผ์ ์์ ๋ฐ์ํ ์ ์๋ ์ค๋ฅ๋ฅผ ํฌ๊ฒ ์ค์ผ ์ ์์ต๋๋ค.
3. ๋ฐ์ดํฐ ํ์ฉ์ฑ ํฅ์: ํ์คํ๋ ๋ฐ์ดํฐ๋ ๋ค์ํ ๋ถ์ ๋ฐ ๋ณด๊ณ ๋ชฉ์ ์ผ๋ก ๋ ์ฝ๊ฒ ์ ๊ทผํ๊ณ ํ์ฉํ ์ ์์ผ๋ฉฐ, ๋ ๋์ ํต์ฐฐ๋ ฅ๊ณผ ๋น์ฆ๋์ค ๊ฒฐ์ ์ ์ด๋์ด๋
๋๋ค.
4. ํ์ฅ์ฑ: ๋ฐ์ดํฐ ์์ด ์ฆ๊ฐํจ์ ๋ฐ๋ผ ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ์ ์๋ก์ด ๋ฐ์ดํฐ ์์ค์ ์ปฌ๋ผ์ ์ถ๊ฐํ ๋ ๋ ์ฝ๊ฒ ๊ด๋ฆฌํ๊ณ ํ์ฅํ ์ ์์ต๋๋ค.
์ค์ (config) ๊ธฐ๋ฐ ๋ฐ์ดํฐ ํ์คํ ํ๋ ์์ํฌ
์ด ํ๋ ์์ํฌ์ ๋ชฉํ๋ ๊ฐ ๋ฐ์ดํฐ ์ ํ์ ๋ํด ๋ง์ถคํ ์์ง ๋ฐ ๋ณํ ์คํฌ๋ฆฝํธ๋ฅผ ์์ฑํ๋ ๋์ , ์ค์ ํ์ผ์ ์ฌ์ฉํ์ฌ ๊ท์น๊ณผ ๋งคํ์ ์ ์ํ๋ ๋ฐฉ์์ ์ฌ์ฉํฉ๋๋ค. ์ด๋ฅผ ํตํด ํ์คํ ๋ก์ง์ด ์ ํ๋ฆฌ์ผ์ด์
์ฝ๋์์ ๋ถ๋ฆฌ๋๋ฉฐ, ์์คํ
์ ์ ์ฐ์ฑ๊ณผ ์ ์ง ๊ด๋ฆฌ์ฑ์ ํฅ์์ํต๋๋ค.
์ด ํํ ๋ฆฌ์ผ์์๋ ์ค์ ํ์ผ ํ์์ผ๋ก JSON์ ์ฌ์ฉํ๊ณ , ์ฝ๋๋ PySpark์ Spark SQL์ ํ์ฉํ ๊ฒ์
๋๋ค. ๋ฐ์ดํฐ ์ ํ์ Delta ํ์์ผ๋ก ์ ์ฅ๋ฉ๋๋ค.
์ค์ ๊ธฐ๋ฐ ์ ๊ทผ ๋ฐฉ์์ ์ฃผ์ ์ฅ์
1. ์ ์ฐ์ฑ: ์ ํ๋ฆฌ์ผ์ด์
์ฝ๋๋ฅผ ๋ณ๊ฒฝํ์ง ์๊ณ ๋ ํ์คํ ๊ท์น์ ์ฝ๊ฒ ์์ ํ ์ ์์ต๋๋ค.
2. ํ์ฅ์ฑ: Spark๋ฅผ ์ฌ์ฉํ์ฌ ๋๊ท๋ชจ ๋ฐ์ดํฐ ์ธํธ๋ฅผ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
3. ์ ์ง ๋ณด์์ฑ: ๋ณํ ๋ก์ง์ด ์ค์ ํ์ผ์ ์ค์ ์ง์คํ๋์ด ์์ด ์ ์ง ๊ด๋ฆฌ์ ์
๋ฐ์ดํธ๊ฐ ์ฉ์ดํฉ๋๋ค.
์ฐ๋ฆฌ๊ฐ ๊ตฌ์ถํ ๊ธฐ๋ฅ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค:
- ์ปฌ๋ผ ์ด๋ฆ ์
๋ฐ์ดํธ
- ๋ฐ์ดํฐ ๋ณํ
- ์ปฌ๋ผ ๋ฐ์ดํฐ ์ ํ ์
๋ฐ์ดํธ
- ์ปฌ๋ผ ์ค๋ช
๋ฉํ๋ฐ์ดํฐ ์
๋ฐ์ดํธ
- ์๋ก์ด ์ปฌ๋ผ ์ถ๊ฐ
์์ํ๊ธฐ ์ ์, ๋ฐ์ดํฐ ํ์คํ๋ ์ด ๊ฒ์๊ธ์์ ๋ค๋ฃจ๋ ๊ฒ ์ด์์ ํ๋์ ํฌ๊ดํ ์ ์์์ ์ธ๊ธํ๊ณ ์ถ์ต๋๋ค. ๋ฐ์ดํฐ ํ์ง ๊ฒ์ฆ ์ถ๊ฐ, ์ถ๊ฐ ๋ฉํ๋ฐ์ดํฐ ํตํฉ, ๋ฐ์ดํฐ ์ ํ ๋ฒ์ ๊ด๋ฆฌ ๋ฑ ๋ฐ์ดํฐ ๋ฌด๊ฒฐ์ฑ๊ณผ ์ฌ์ฉ์ฑ์ ๋์ด๋ ๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ํฌํจ๋ ์ ์์ต๋๋ค.
ํ๊ฒฝ ์ค์ ํ๊ธฐ
์ฐ๋ฆฌ๋ ์ฝ๋ ์์ฑ์ Databricks Community Edition์ ์ฌ์ฉํ ๊ฒ์ด๋ฉฐ, ๋ฐ์ดํฐ ์ ์ฅ์ ์ํด Databricks File System(DBFS)์ ์ฌ์ฉํ ๊ฒ์ ๋๋ค. ๊ทธ๋ฌ๋ ํ๋ ์์ํฌ๋ ๊ฒฝ๋ก๋ฅผ ์ธ์๋ก ๋ฐ๊ธฐ ๋๋ฌธ์ S3 ๋ฒํท, ADLS ๋ฑ ๋ค๋ฅธ ๊ณณ์ ํ์ผ์ ์ ์ฅํด๋ ๊ด์ฐฎ์ต๋๋ค.
์ค์ ํ์ผ ๊ตฌ์กฐ ์ค๋ช
์ด ๋ฐ๋ชจ์์ ์ฌ์ฉํ ์ค์ ํ์ผ ๊ตฌ์กฐ๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค:
์ค์ ํ์ผ ๊ตฌ์กฐ
- `data_product_name`: ํ์คํ ํ ํ ๋นํ ๋ฐ์ดํฐ ์ ํ(DP)์ ์ด๋ฆ
- `raw_data_product_name`: ์๋ณธ ๋ฐ์ดํฐ ์ ํ์ ์ด๋ฆ
- `schema`:
- `source_columns`: (์๋ณธ ๋ฐ์ดํฐ ์ ํ์์ ์ง์ ๊ฐ์ ธ์จ ์ปฌ๋ผ)
- `raw_name`: ์๋ณธ ๋ฐ์ดํฐ ์ ํ์ ์ปฌ๋ผ ์ด๋ฆ
- `standardized_name`: ์๋ณธ ์ปฌ๋ผ์ ํ์คํ๋ ์ด๋ฆ
- `data_type`: ๋ณํํ ์ปฌ๋ผ ๋ฐ์ดํฐ ์ ํ
- `sql_transformation`: Spark SQL๋ก ์์ฑ๋ ๋ณํ ๊ท์น
- `new_columns`: (๋ค๋ฅธ ๋ฐ์ดํฐ ์ ํ๊ณผ ์กฐ์ธ์ ํตํด ์์ฑ๋ ์ปฌ๋ผ)
- `name`: ์์ฑ๋ ์๋ก์ด ์ปฌ๋ผ ์ด๋ฆ
- `data_type`: ๋ณํํ ์ปฌ๋ผ์ ๋ฐ์ดํฐ ์ ํ
- `sql_transformation`: Spark SQL๋ก ์์ฑ๋ ๋ณํ ๊ท์น
- `metadata` (๋ชจ๋ ์ปฌ๋ผ์ด ์ถ๊ฐ๋ ํ ํ ๋นํ ๋ฉํ๋ฐ์ดํฐ)
- `column_descriptions`: ์ปฌ๋ผ ์ค๋ช
์๋ณธ ๋ฐ์ดํฐ ์ ํ ์์: supplier. ์๋๋ ํ์คํํ ์๋ณธ ๋ฐ์ดํฐ ์ ํ supplier์
๋๋ค.
ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ ์์: Product. ์๋๋ Product๋ผ๋ ๋ค๋ฅธ ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ์
๋๋ค. ์๋ก์ด ์ปฌ๋ผ์ ์ถ๊ฐํ๊ธฐ ์ํด ์ด๋ฅผ ์ฌ์ฉํ ๊ฒ์
๋๋ค.
supplier์ JSON ์ค์ ํ์ผ
{
"data_product_name" : "Product_Supplier",
"raw_data_product_name" : "supplier",
"schema" : {
"source_columns" : [
{
"raw_name" : "sup_id",
"standardized_name" : "Supplier_ID",
"data_type" : "string",
"sql_transformation" : "CONCAT('SUP', '-' , sup_id)"
},
{
"raw_name" : "name",
"standardized_name" : "Supplier_Name",
"data_type" : "string",
"sql_transformation" : ""
},
{
"raw_name" : "price",
"standardized_name" : "Purchase_Price",
"data_type" : "int",
"sql_transformation" : ""
},
{
"raw_name" : "prod_name",
"standardized_name" : "Product_Name",
"data_type" : "string",
"sql_transformation" : ""
},
{
"raw_name" : "quantity",
"standardized_name" : "Purchase_Quantity",
"data_type" : "int",
"sql_transformation" : ""
},
{
"raw_name" : "",
"standardized_name" : "Total_Cost",
"data_type" : "int",
"sql_transformation" : "price * quantity"
}
],
"new_columns" : [
{
"name" : "Product_ID",
"data_type" : "string",
"sql_transformation" : "MERGE INTO delta.`{temp_std_dp_path}` dest USING delta.`dbfs:/FileStore/project/Product` src ON dest.Product_Name = src.Product_Name WHEN MATCHED THEN UPDATE SET dest.Product_ID = src.Product_ID"
}
]
},
"column_sequence_order" : [
"Supplier_ID", "Supplier_Name", "Product_ID", "Product_Name", "Purchase_Price", "Purchase_Quantity", "Total_Cost"
],
"metadata" : {
"column_descriptions" : {
"Supplier_ID" : "์ ํ์ ๊ณต๊ธ์
์ฒด์ ๋ํ ๊ณ ์ ์๋ณ์",
"Supplier_Name" : "๊ณต๊ธ์
์ฒด์ ์ด๋ฆ",
"Purchase_Price" : "๊ณต๊ธ์
์ฒด๊ฐ ์ ํ์ ํ๋งคํ๋ ๊ฐ๊ฒฉ",
"Product_Name" : "์ ํ์ ์ด๋ฆ",
"Purchase_Quantity" : "๊ณต๊ธ์
์ฒด๊ฐ ๋ณด์ ํ ์ ํ์ ์๋",
"Total_Cost" : "์ฃผ์ด์ง ๊ตฌ๋งค ๊ฐ๊ฒฉ์ผ๋ก ํน์ ์๋์ ์ ํ์ ๊ตฌ๋งคํ๋ ๋ฐ ์์๋ ์ด ๊ธ์ก",
"Product_ID" : "์ ํ์ ๋ํ ๊ณ ์ ์๋ณ์"
}
}
}
์๋ณธ ๋ฐ์ดํฐ ์ ํ(supplier)๊ณผ Product ๋ฐ์ดํฐ ์ ํ์ Delta ํ์์ผ๋ก ์ ์ฅํ๊ณ JSON ์ค์ ํ์ผ์ ์ ์ ํ ๊ฒฝ๋ก์ ์
๋ก๋ํฉ๋๋ค.
์ฐ๋ฆฌ๋ ํ ๋ก๋ ํ๋ก์ธ์ค(ํ ๋ก๋: ๋ฐ์ดํฐ๋ฅผ ์ด๊ธฐํํ ํ ์ฌ๋ก๋) full load process (truncate-load)๋ฅผ ๋ฐ๋ฅผ ๊ฒ์
๋๋ค. ๋ฐ๋ผ์ ๋ชจ๋ ๋จ๊ณ๋ ์์ ๋๋ ์คํ
์ด์ง ์์ญ์์ ์ํ๋๋ฉฐ, ์ดํ ์ค์ ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ ๊ฒฝ๋ก์ ๋ฎ์ด์ฐ๊ฒ ๋ฉ๋๋ค.
ํ๋ ์์ํฌ ๊ฐ๋ฐ
Config Reader์ ์ธํฐํ์ด์ค๋ฅผ ์ ์ํฉ๋๋ค. Config Reader ํด๋์ค๋ ์ค์ ํ์ผ ๊ตฌ์กฐ๊ฐ ๋ค๋ฅผ ๊ฒฝ์ฐ์๋ ์ฌ์ฉํ ์ ์๋๋ก ๊ตฌํ๋ ๊ฒ์ ๋๋ค. ์๋ฅผ ๋ค์ด, ์ด ํํ ๋ฆฌ์ผ์์๋ JSON ์ค์ ์ ์ฌ์ฉํ์ง๋ง, YAML ์ค์ ์ ์ฌ์ฉํ๋ ค๋ฉด ํด๋น ํ์์ ๋ง์ถฐ ์๋ก์ด Config Reader ํด๋์ค๋ฅผ ์์ฑํ๋ฉด ๋ฉ๋๋ค.
class ConfigReaderContract(ABC):
@abstractmethod
def read_source_columns_schema(self) -> spark.DataFrame:
pass
@abstractmethod
def read_new_columns_schema(self) -> spark.DataFrame:
pass
@abstractmethod
def read_column_descriptions_metadata(self) -> dict:
pass
@abstractmethod
def read_column_sequence_order(self) -> list[str]:
pass
ConfigReader ํด๋์ค ๊ตฌํ
class ConfigReader(ConfigReaderContract):
def __init__(self, config_path):
self.config_df = spark.read.option("multiLine", True).json(config_path)
def read_source_columns_schema(self):
exploded_df = self.config_df.select(explode(self.config_df["schema"].source_columns).alias("source_columns"))
source_columns_schema_df = exploded_df.selectExpr(
"source_columns.raw_name as raw_name",
"source_columns.standardized_name as standardized_name",
"source_columns.data_type as data_type",
"source_columns.sql_transformation as sql_transformation"
)
return source_columns_schema_df
def read_new_columns_schema(self):
exploded_df = self.config_df.select(explode(self.config_df["schema"].new_columns).alias("new_columns"))
new_columns_schema_df = exploded_df.selectExpr(
"new_columns.name as name",
"new_columns.data_type as data_type",
"new_columns.sql_transformation as sql_transformation"
)
return new_columns_schema_df
def read_column_descriptions_metadata(self):
metadata_df = self.config_df.select("metadata.column_descriptions").alias("column_descriptions")
descriptions_row_obj = metadata_df.first()["column_descriptions"]
return descriptions_row_obj.asDict()
def read_column_sequence_order(self):
return list(self.config_df.first()["column_sequence_order"])
DataStandardizer ํด๋์ค ๊ตฌํ
class DataStandardizer:
def __init__(self, raw_dp_path, temp_std_dp_path, std_dp_path):
self.raw_dp_path = raw_dp_path
self.temp_std_dp_path = temp_std_dp_path
self.std_dp_path = std_dp_path
def create_temp_std_dp_with_source_columns(self, source_columns_schema_df):
source_columns_schema_df.createOrReplaceTempView("source_columns_config_table")
select_query_sql = f"""
SELECT
concat(
"SELECT ",
array_join(collect_list(select_expression), ", "),
" FROM delta.`{self.raw_dp_path}`"
) as select_query
FROM (
SELECT
CASE
WHEN sql_transformation = "" THEN concat("CAST(", concat("`", raw_name, "`"), " AS ", data_type, ") AS ", standardized_name)
ELSE concat("CAST(", sql_transformation, " AS ", data_type, ") AS ", standardized_name)
END as select_expression
FROM source_columns_config_table
)
"""
df = spark.sql(select_query_sql)
select_query = df.first()["select_query"]
create_sql_query = f"CREATE OR REPLACE TABLE delta.`{self.temp_std_dp_path}` as ( " + select_query + ")"
spark.sql(create_sql_query)
def add_new_columns_in_temp_std_dp(self, new_columns_schema_df):
new_columns_schema_df_rows = new_columns_schema_df.collect()
for row in new_columns_schema_df_rows:
add_new_columns_sql = f"ALTER TABLE delta.`{self.temp_std_dp_path}` ADD COLUMN {row['name']} {row['data_type']}"
sql_transformation = row["sql_transformation"].replace("{temp_std_dp_path}", self.temp_std_dp_path)
spark.sql(add_new_columns_sql)
spark.sql(sql_transformation)
def update_column_descriptions_metadata(self, column_descriptions_dict):
for column_name, description in column_descriptions_dict.items():
column_description_update_sql = f"ALTER TABLE delta.`{self.temp_std_dp_path}` CHANGE COLUMN {column_name} COMMENT '{description}';"
spark.sql(column_description_update_sql)
def move_data_to_std_dp(self, column_sequence_order):
temp_std_df = spark.read.format("delta").load(self.temp_std_dp_path)
temp_std_df = temp_std_df.select(column_sequence_order)
temp_std_df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(self.std_dp_path)
def run(self, config_reader):
print("Raw df : ")
raw_df = spark.read.format("delta").load(self.raw_dp_path)
display(raw_df)
source_columns_schema_df = config_reader.read_source_columns_schema()
self.create_temp_std_dp_with_source_columns(source_columns_schema_df)
new_columns_schema_df = config_reader.read_new_columns_schema()
self.add_new_columns_in_temp_std_dp(new_columns_schema_df)
column_descriptions_dict = config_reader.read_column_descriptions_metadata()
self.update_column_descriptions_metadata(column_descriptions_dict)
column_sequence_order = config_reader.read_column_sequence_order()
self.move_data_to_std_dp(column_sequence_order)
print("Standardized df : ")
std_df = spark.read.format("delta").load(self.std_dp_path)
display(std_df)
print("Schema information for Standardized df : ")
std_df.printSchema()
display(spark.sql(f"DESCRIBE TABLE delta.`{self.std_dp_path}`"))
`DataStandardizer` ํด๋์ค๋ ์ธ ๊ฐ์ง ์์ฑ์ ์ฌ์ฉํฉ๋๋ค:
- `raw_dp_path`: ์๋ณธ ๋ฐ์ดํฐ ์ ํ ๊ฒฝ๋ก
- `temp_std_dp_path`: ํ์คํ ์์
์ ์ํ ์์ ๊ฒฝ๋ก
- `std_dp_path`: ์ต์ข
ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ ๊ฒฝ๋ก
๊ฐ ๋ฉ์๋์ ๋ํ ์ค๋ช
์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค:
1. create_temp_std_dp_with_source_columns — ์๋ณธ ๋ฐ์ดํฐ ์ ํ์์ ์ง์ ๊ฐ์ ธ์จ ์ปฌ๋ผ์ผ๋ก ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ์ ์ด๊ธฐ ๋ฒ์ ์ ์์ฑํฉ๋๋ค.
2. add_new_columns_in_temp_std_dp — ๋ค๋ฅธ ๋ฐ์ดํฐ ์ ํ๊ณผ ์กฐ์ธํ์ฌ ์ป์ ์๋ก์ด ์ปฌ๋ผ์ ์์ ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ์ ์ถ๊ฐํฉ๋๋ค.
3. update_column_descriptions_metadata — ๊ฐ ์ปฌ๋ผ์ ๋ํ ์ค๋ช
์ ์
๋ฐ์ดํธํฉ๋๋ค.
4. move_data_to_std_dp— ์์/์คํ
์ด์ง ์์ญ์์ ์ต์ข
ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ ๊ฒฝ๋ก๋ก ๋ฐ์ดํฐ๋ฅผ ๋ณต์ฌํฉ๋๋ค.
5. run — ์ด ๋ฉ์๋๋ ์์ ๋ชจ๋ ๋จ๊ณ๋ฅผ ์กฐ์ ํ๋ฉฐ, `ConfigReaderContract` ํด๋์ค๋ฅผ ์ฌ์ฉํ์ฌ ๊ตฌํ๋ `config_reader` ์ธ์คํด์ค๋ฅผ ๋ฐ์๋ค์
๋๋ค.
ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ ์คํ
๋ค์์ `supplier` ์๋ณธ ๋ฐ์ดํฐ ์ ํ์์ ํ๋ ์์ํฌ๋ฅผ ์คํํ๋ ์ฝ๋์ ๋๋ค.
# ๊ฒฝ๋ก ์ ์
raw_dp_path = "dbfs:/FileStore/project/supplier"
std_dp_path = "dbfs:/FileStore/project/Product_Supplier"
temp_std_dp_path = "dbfs:/FileStore/project/Product_Supplier_temp"
config_path = "dbfs:/FileStore/project/supplier_config.json"
# ์ค์ ๋ฆฌ๋ ๋ฐ ๋ฐ์ดํฐ ํ์คํ ์ด๊ธฐํ
config_reader = ConfigReader(config_path)
data_standardizer = DataStandardizer(
raw_dp_path=raw_dp_path,
temp_std_dp_path=temp_std_dp_path,
std_dp_path=std_dp_path
)
# DataStandardizer ํด๋์ค ์คํ
data_standardizer.run(config_reader)
์ ์ด๋ฏธ์ง์์ ๋ณผ ์ ์๋ฏ์ด, ํ์คํ๋ DP๋ ์ค์ ํ์ผ์ ์ธ๊ธ๋ ๋ชจ๋ ์ด๊ณผ ๊ฐ ์ด์ ๋ํ ์ค๋ช ๊ณผ ํจ๊ป ์์ฑ๋ฉ๋๋ค.
์๋ฅผ ๋ค์ด, ์๋ณธ DP์ sup_id ์ด์ Supplier_ID๋ก ์ด๋ฆ์ด ๋ณ๊ฒฝ๋์์ผ๋ฉฐ, ๊ฐ ๊ฐ ์์ ํ์ํ ์ ๋์ฌ(SUP)๊ฐ ์ถ๊ฐ๋์์ต๋๋ค. ์ค๋ช ๋ฉํ๋ฐ์ดํฐ์ธ "์ ํ ๊ณต๊ธ์ ์ฒด์ ๊ณ ์ ์๋ณ์"๋ ์ ๋ฐ์ดํธ๋์์ต๋๋ค. ๋ํ, ์๋ณธ DP์ ๊ฐ๊ฒฉ๊ณผ ์๋์ ๊ณฑํ์ฌ Total_Cost ์ด์ด ์ถ๊ฐ๋์์ต๋๋ค. ์๋ก์ด ์ด์ธ Product_ID๋ ๋ค๋ฅธ ๋ฐ์ดํฐ ์ ํ์ธ Product์ ์กฐ์ธ์ ํตํด ์ถ๊ฐ๋์์ต๋๋ค.
์ด ๋ธ๋ก๊ทธ์์ ์ฌ์ฉ๋ ์ฝ๋๋ ์ด GitHub ์ ์ฅ์์์ ํ์ธํ ์ ์์ต๋๋ค.
๊ฒฐ๋ก
Spark๋ฅผ ํ์ฉํ ์ค์ ๊ธฐ๋ฐ ๋ฐ์ดํฐ ํ์คํ ํ๋ ์์ํฌ๋ ์๋ณธ ๋ฐ์ดํฐ๋ฅผ ๊ณ ํ์ง์ ํ์คํ๋ ๋ฐ์ดํฐ ์ ํ์ผ๋ก ๋ณํํ๋ ๋ฐ ๋งค์ฐ ์ ์ฉํ ๋ฐฉ๋ฒ์
๋๋ค. ์ค์ ํ์ผ์ ํตํด ํ์คํ ๊ท์น์ ๊ด๋ฆฌํจ์ผ๋ก์จ ์ ์ฐ์ฑ, ์ผ๊ด์ฑ, ์ ์ง ๊ด๋ฆฌ ์ฉ์ด์ฑ์ ์ ๊ณตํ๋ฉฐ, ์ฝ๋๋ฅผ ๋ณ๊ฒฝํ์ง ์๊ณ ๋ ๋์ ์ผ๋ก ๊ท์น์ ์กฐ์ ํ ์ ์์ต๋๋ค. ๋ํ, ๋ฐ์ดํฐ ํ์ง ๊ฒ์ฆ, ๋ฐ์ดํฐ ์ ํ ๋ฒ์ ๊ด๋ฆฌ ๋ฐ ๊ธฐํ ๋ฉํ๋ฐ์ดํฐ ํฅ์ ๊ธฐ๋ฅ์ ์ฝ๊ฒ ํ์ฅํ ์ ์์ด ๋งค์ฐ ํ์ฅ ๊ฐ๋ฅํ๊ณ ์ ์ฉ ๋ฒ์๊ฐ ๋์ต๋๋ค.
---
reference
'note' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
virtualbox ubuntu 22.04์ elastic search ์ค์น ํ ์๊ฒฉ ์์ฒญ ํ ์คํธ (0) | 2024.10.10 |
---|---|
Backend for Frontend (BFF) Architecture ๋ฒ์ญ (0) | 2024.10.02 |
Docker ๋น๋ ์๊ฐ์ 40% ๋จ์ถํ ๋ฐฉ๋ฒ (๋ฒ์ญ) (0) | 2024.05.01 |
[LangChain] LangChain์ ํ์ฉํ ๋ฌธ์ ๊ธฐ๋ฐ ์ฑ๋ด ๋ง๋ค๊ธฐ (0) | 2024.03.20 |
2๋ ๋์ Kubernetes์์ Airflow๋ฅผ ์คํํ๊ณ ๋ฐฐ์ด ๊ฒ (๋ฒ์ญ) (0) | 2024.02.29 |