Data Engineering for AI, Explored Through Examples

This post is a summary of Data Engineering for AI: Practical Examples and Best Practices.

As AI-powered applications become more widespread, managing data for AI has grown into a complex discipline that demands specialized skills and robust frameworks. Let's walk through data engineering for AI with concrete examples.

1. Building Data Pipelines for AI Workloads

AI๋ฅผ ์œ„ํ•œ ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์€ ์œ ์—ฐ์„ฑ๊ณผ ํ™•์žฅ์„ฑ์ด ํ•„์š”ํ•˜๋ฉฐ, ์ข…์ข… ๋ฐฐ์น˜(batch)์™€ ์ŠคํŠธ๋ฆฌ๋ฐ(streaming) ๊ธฐ๋Šฅ์„ ๋ชจ๋‘ ์š”๊ตฌํ•ฉ๋‹ˆ๋‹ค. Apache Spark์™€ Kafka๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ํŒŒ์ดํ”„๋ผ์ธ ์˜ˆ์ œ๋ฅผ ์‚ดํŽด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

Example: Real-time data ingestion with Apache Spark and Kafka

from pyspark.sql import SparkSession  
from pyspark.sql.functions import from_json, col  
from pyspark.sql.types import StructType, StructField, StringType, DoubleType  
  
# Define the schema for incoming data  
schema = StructType([  
    StructField("id", StringType(), True),  
    StructField("timestamp", StringType(), True),  
    StructField("value", DoubleType(), True)  
])  
  
# Initialize Spark session  
spark = SparkSession.builder \  
    .appName("AI Data Pipeline") \  
    .getOrCreate()  
  
# Read streaming data from Kafka  
df = spark \  
    .readStream \  
    .format("kafka") \  
    .option("kafka.bootstrap.servers", "localhost:9092") \  
    .option("subscribe", "data-topic") \  
    .load()  
  
# Parse JSON data  
parsed_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")  
  
# Apply transformations for AI model  
transformed_df = parsed_df.withColumn("feature", col("value") * 2)  # Example feature engineering  
  
# Write to data sink  
query = transformed_df.writeStream \  
    .format("console") \  
    .outputMode("append") \  
    .start()  
  
query.awaitTermination()

์ด ์˜ˆ์‹œ์—์„œ๋Š”:

  • Data is ingested from a Kafka topic into Spark, where transformations are applied.
  • A simple feature transformation creates a new feature column that can be useful for an AI model in later stages.
  • The processed data can be written to the console, to a data lake, or fed directly to an AI model in real time (see the sketch after this list).

2. ๋ฐ์ดํ„ฐ ํ’ˆ์งˆ ๋ฐ ๊ด€์ธก์„ฑ

Ensuring data quality is critical for AI, and setting up automated quality checks can greatly reduce errors. Let's look at an example that uses Great Expectations to validate incoming data before it is passed to an AI model.

Great Expectations is an open-source, Python-based data quality framework. See https://github.com/great-expectations/great_expectations for details.

Example: Data validation with Great Expectations

from great_expectations.core.batch import BatchRequest  
from great_expectations.data_context import DataContext  
  
# Initialize Great Expectations context  
context = DataContext()  
  
# Define the data batch to validate  
batch_request = BatchRequest(  
    datasource_name="my_datasource",  
    data_connector_name="my_data_connector",  
    data_asset_name="my_table"  
)  
  
# Define a new expectation suite  
suite = context.create_expectation_suite("ai_data_quality_suite", overwrite_existing=True)  
  
# Add data expectations  
validator = context.get_validator(batch_request=batch_request, expectation_suite_name="ai_data_quality_suite")  
validator.expect_column_values_to_not_be_null("timestamp")  
validator.expect_column_values_to_be_in_set("status", ["active", "inactive"])  
  
# Run validation and check results  
results = validator.validate()  
if not results["success"]:  
    print("Data quality validation failed!")  
else:  
    print("Data quality validation passed!")

์ด ์˜ˆ์‹œ์—์„œ๋Š”:

  • Great Expectations validates key properties of the data, for example that timestamp values are not null and that the status column only contains allowed values.
  • These checks help catch data anomalies before they flow into the AI model training pipeline (see the sketch after this list).
  • Great Expectations is used here as a generic example; you could also choose other frameworks or tools such as Splunk, SignalFx, or something built in-house.
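
Building on the `results` object from the validation above, here is a minimal sketch of using the overall success flag as a gate, so a failing batch stops the pipeline before model training. The exception type and helper name are illustrative, not part of Great Expectations.

class DataQualityError(Exception):
    """Raised when an incoming batch fails its data quality checks."""

def gate_on_validation(results):
    # results["success"] is the same overall flag checked in the example above
    if not results["success"]:
        raise DataQualityError("Incoming data failed validation; aborting before training")

gate_on_validation(results)  # downstream training only runs if this does not raise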

3. ๋ฐ์ดํ„ฐ ์นดํƒˆ๋กœ๊ทธ ๋ฐ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ๊ด€๋ฆฌ

Building a data catalog with rich metadata helps data scientists understand the lineage, quality, and context of the data they use. With Apache Atlas, metadata can be cataloged programmatically.

Example: Data cataloging with the Apache Atlas API

Apache Atlas provides a REST API for managing metadata, including creating entities that represent datasets, tables, and lineage.

import requests  
import json  
  
# Define the entity details for a new dataset  
entity = {  
    "entities": [  
        {  
            "typeName": "hive_table",  
            "attributes": {  
                "name": "sales_data",  
                "qualifiedName": "sales_data@prod",  
                "description": "Sales data for AI model training",  
                "owner": "data_engineering_team",  
                "tableType": "MANAGED_TABLE",  
                "columns": [  
                    {"name": "timestamp", "dataType": "date"},  
                    {"name": "sale_amount", "dataType": "double"}  
                ]  
            }  
        }  
    ]  
}  
  
# Send a POST request to create the entity  
response = requests.post(  
    "http://atlas-server:21000/api/atlas/v2/entity",  
    headers={"Content-Type": "application/json"},  
    data=json.dumps(entity)  
)  
  
if response.status_code == 200:  
    print("Entity created in Apache Atlas")  
else:  
    print(f"Failed to create entity: {response.text}")# Define the entity details for a new dataset  
entity = {  
    "entities": [  
        {  
            "typeName": "hive_table",  
            "attributes": {  
                "name": "sales_data",  
                "qualifiedName": "sales_data@prod",  
                "description": "Sales data for AI model training",  
                "owner": "data_engineering_team",  
                "tableType": "MANAGED_TABLE",  
                "columns": [  
                    {"name": "timestamp", "dataType": "date"},  
                    {"name": "sale_amount", "dataType": "double"}  
                ]  
            }  
        }  
    ]  
}

์ด ์˜ˆ์‹œ์—์„œ๋Š”:

  • The sales_data table is cataloged with metadata about its schema, owner, and purpose, making the data used for AI training easier to track and understand (a read-back sketch follows this list).
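
As a follow-up sketch, the registered entity can be read back through Atlas's unique-attribute lookup endpoint to confirm it is in the catalog. The server URL and the basic-auth credentials below are placeholders for this illustration; adjust them to your deployment.

import requests

# Look up the cataloged table by its qualified name
response = requests.get(
    "http://atlas-server:21000/api/atlas/v2/entity/uniqueAttribute/type/hive_table",
    params={"attr:qualifiedName": "sales_data@prod"},
    auth=("admin", "admin")  # placeholder credentials
)

if response.status_code == 200:
    entity = response.json()["entity"]
    print(entity["attributes"]["description"])
else:
    print(f"Lookup failed: {response.status_code}")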

4. Data Partitioning and Indexing for Scalability

๋ฐ์ดํ„ฐ ํŒŒํ‹ฐ์…”๋‹๊ณผ ์ธ๋ฑ์‹ฑ์€ ํŠนํžˆ ๋ถ„์‚ฐ ์‹œ์Šคํ…œ์—์„œ ์„ฑ๋Šฅ์„ ํ–ฅ์ƒ์‹œํ‚ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. Delta Lake๋ฅผ ์‚ฌ์šฉํ•œ ํŒŒํ‹ฐ์…”๋‹๊ณผ ์ธ๋ฑ์‹ฑ ์˜ˆ์ œ๋ฅผ ์‚ดํŽด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

Example: Partitioning and indexing with Delta Lake

from delta.tables import DeltaTable  
from pyspark.sql import SparkSession  
  
# The session must be configured with the Delta Lake extensions for the "delta" format
spark = SparkSession.builder \
    .appName("Delta Lake Example") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Load data (with a header row) and write it as a Delta table partitioned by date
df = spark.read.csv("s3://my-bucket/data.csv", header=True, inferSchema=True)
df.write.format("delta").partitionBy("date").save("/delta/sales_data")
  
# Optimize and create Z-Order index on relevant columns  
delta_table = DeltaTable.forPath(spark, "/delta/sales_data")  
delta_table.optimize().executeZOrderBy("customer_id")

์ด ์˜ˆ์‹œ์—์„œ๋Š”:

  • Partitioning sales_data by date reduces the amount of data scanned, improving query performance.
  • Z-Order indexing on customer_id optimizes reads that filter on that column, making downstream AI processes (such as customer-specific models) faster (see the query sketch after this list).
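
Using the `spark` session and the Delta table written above, the sketch below shows a read-side query that benefits from this layout: the date filter prunes partitions and the customer_id filter benefits from Z-order clustering, so far fewer files are scanned. The literal filter values are placeholders.

# Read the partitioned, Z-ordered Delta table and filter on both columns
sales = spark.read.format("delta").load("/delta/sales_data")

customer_day = sales \
    .where("date = '2024-01-01'") \
    .where("customer_id = 'C-12345'")

customer_day.show()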

5. ๋ฐ์ดํ„ฐ ๋งˆ์Šคํ‚น ๋ฐ ์ต๋ช…ํ™”

๋ฏผ๊ฐํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ๋•Œ ์ต๋ช…ํ™”๋Š” ํ•„์ˆ˜์ ์ž…๋‹ˆ๋‹ค. ๋‹ค์Œ ์˜ˆ์‹œ๋Š” Python์˜ Faker ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์›๋ณธ ๋ฐ์ดํ„ฐ์…‹์˜ ๊ตฌ์กฐ์™€ ๋ถ„ํฌ๋ฅผ ์œ ์ง€ํ•˜๋ฉด์„œ ํ•ฉ์„ฑ๋œ ์ต๋ช…ํ™”๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

Example: Data masking with Python's Faker

from faker import Faker  
import pandas as pd  
  
fake = Faker()  
df = pd.DataFrame({  
    "customer_id": [fake.uuid4() for _ in range(100)],  
    "customer_name": [fake.name() for _ in range(100)],  
    "transaction_amount": [fake.random_number(digits=5) for _ in range(100)]  
})  
  
# Display anonymized data  
print(df.head())

์ด ์˜ˆ์‹œ์—์„œ๋Š”:

  • Faker generates fake UUIDs for customer IDs, synthetic names, and random transaction amounts.
  • This anonymized dataset can be used safely for AI model training and testing without putting customer privacy at risk.
  • Faker is used here as a generic example; there are various masking libraries such as PySyft, Presidio, and SDV, and you can also build custom functions (see the sketch after this list).
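
As a minimal sketch of the custom-function option, a keyed hash (HMAC-SHA256) can pseudonymize identifiers deterministically, so the same real ID always maps to the same masked value and joins across tables still work. The secret key below is a placeholder and should come from a secrets manager in practice.

import hmac
import hashlib
import pandas as pd

SECRET_KEY = b"replace-with-a-managed-secret"  # placeholder key

def pseudonymize(value: str) -> str:
    # Same input always yields the same pseudonym, without exposing the original value
    return hmac.new(SECRET_KEY, value.encode("utf-8"), hashlib.sha256).hexdigest()[:16]

customers = pd.DataFrame({"customer_id": ["alice@example.com", "bob@example.com"]})
customers["customer_id"] = customers["customer_id"].map(pseudonymize)
print(customers)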

Conclusion

AI๋ฅผ ์œ„ํ•œ ๋ฐ์ดํ„ฐ ๊ด€๋ฆฌ์—๋Š” ์ „ํ†ต์ ์ธ ๋ฐ์ดํ„ฐ ์—”์ง€๋‹ˆ์–ด๋ง ๊ธฐ์ˆ ๊ณผ AI ์—์„œ ๋ฐœ์ƒํ•˜๋Š” ๋ฌธ์ œ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ์ „๋ฌธ ๊ธฐ์ˆ ์ด ๊ฒฐํ•ฉ๋ฉ๋‹ˆ๋‹ค. ์ŠคํŠธ๋ฆฌ๋ฐ ์ˆ˜์ง‘, ๋ฐ์ดํ„ฐ ๊ฒ€์ฆ, ์นดํƒˆ๋กœ๊ทธํ™”, ํŒŒํ‹ฐ์…”๋‹, ๋งˆ์Šคํ‚น๊ณผ ๊ฐ™์€ ์˜ˆ์‹œ๋ฅผ ํ†ตํ•ด ์ด๋Ÿฌํ•œ ๊ธฐ์ˆ ๋“ค์ด ์–ด๋–ป๊ฒŒ AI ์›Œํฌ๋กœ๋“œ๋ฅผ ์ง€์›ํ•˜๋Š”์ง€ ์•Œ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. AI๋ฅผ ์œ„ํ•œ ๋ฐ์ดํ„ฐ ์—”์ง€๋‹ˆ์–ด๋ง์˜ ๋ถ„์•ผ๋Š” ๋ฐฉ๋Œ€ํ•˜๋ฉฐ, AI๊ฐ€ ๋ฐœ์ „ํ•จ์— ๋”ฐ๋ผ ์ด๋Ÿฌํ•œ ๋„๊ตฌ์™€ ๋ชจ๋ฒ” ์‚ฌ๋ก€์— ๋Œ€ํ•œ ์ˆ™๋ จ๋„๋ฅผ ์œ ์ง€ํ•˜๋Š” ๊ฒƒ์ด ์ค‘์š”ํ•ฉ๋‹ˆ๋‹ค.