Data Engineering for AI: Practical Examples and Best Practices (opens new window)๋ฅผ ์ฝ๊ณ ์ ๋ฆฌํ์ต๋๋ค.
AI ๊ธฐ๋ฐ ์ ํ๋ฆฌ์ผ์ด์ ์ด ์ ์ ๋ ๋๋ฆฌ ์ฌ์ฉ๋จ์ ๋ฐ๋ผ, AI๋ฅผ ์ํ ๋ฐ์ดํฐ ๊ด๋ฆฌ๋ ์ ๋ฌธ์ ์ธ ๊ธฐ์ ๊ณผ ๊ฐ๋ ฅํ ํ๋ ์์ํฌ๋ฅผ ์๊ตฌํ๋ ๋ณต์กํ ์์ ์ผ๋ก ๋ฐ์ ํ์ต๋๋ค. AI ๋ฐ์ดํฐ ์์ง๋์ด๋ง์ ์์ ๋ฅผ ํตํด ํจ๊ป ์์๋ด ๋๋ค.
1. AI ์ํฌ๋ก๋๋ฅผ ์ํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ
AI๋ฅผ ์ํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ์ ์ฐ์ฑ๊ณผ ํ์ฅ์ฑ์ด ํ์ํ๋ฉฐ, ์ข ์ข ๋ฐฐ์น(batch)์ ์คํธ๋ฆฌ๋ฐ(streaming) ๊ธฐ๋ฅ์ ๋ชจ๋ ์๊ตฌํฉ๋๋ค. Apache Spark์ Kafka๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ํ์ดํ๋ผ์ธ ์์ ๋ฅผ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
์์: Apache Spark์ Kafka๋ฅผ ํ์ฉํ ์ค์๊ฐ ๋ฐ์ดํฐ ์์ง
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# Define the schema for incoming data
schema = StructType([
StructField("id", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("value", DoubleType(), True)
])
# Initialize Spark session
spark = SparkSession.builder \
.appName("AI Data Pipeline") \
.getOrCreate()
# Read streaming data from Kafka
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "data-topic") \
.load()
# Parse JSON data
parsed_df = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
# Apply transformations for AI model
transformed_df = parsed_df.withColumn("feature", col("value") * 2) # Example feature engineering
# Write to data sink
query = transformed_df.writeStream \
.format("console") \
.outputMode("append") \
.start()
query.awaitTermination()
์ด ์์์์๋:
- Kafka ํ ํฝ์์ Spark๋ก ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ ๋ณํ ์์ ์ ์ํํฉ๋๋ค.
- ๊ฐ๋จํ ํผ์ณ ๋ณํ(feature transformation)์ ์ ์ฉํ์ฌ ์๋ก์ด ํผ์ณ ์ด(feature column)์ ์์ฑํ๋ฉฐ, ์ด๋ ์ดํ ๋จ๊ณ์์์ AI ๋ชจ๋ธ์ ์ ์ฉํ ์ ์์ต๋๋ค.
- ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ๋ ์ฝ์, ๋ฐ์ดํฐ ๋ ์ดํฌ, ๋๋ ์ค์๊ฐ์ผ๋ก AI ๋ชจ๋ธ์ ์ง์ ์ถ๋ ฅํ ์ ์์ต๋๋ค.
2. ๋ฐ์ดํฐ ํ์ง ๋ฐ ๊ด์ธก์ฑ
AI๋ฅผ ์ํด ๋ฐ์ดํฐ ํ์ง์ ๋ณด์ฅํ๋ ๊ฒ์ ๋งค์ฐ ์ค์ํ๋ฉฐ, ์๋ํ๋ ํ์ง ๊ฒ์ฌ๋ฅผ ์ค์ ํ๋ฉด ์ค๋ฅ๋ฅผ ํฌ๊ฒ ์ค์ผ ์ ์์ต๋๋ค. Great Expectations๋ฅผ ์ฌ์ฉํ์ฌ AI ๋ชจ๋ธ๋ก ๋ฐ์ดํฐ๊ฐ ์ ๋ฌ๋๊ธฐ ์ ์ ์ ์ ๋ฐ์ดํฐ๋ฅผ ๊ฒ์ฆํ๋ ์๋ฅผ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
great expectations๋ python์ ๊ธฐ๋ฐ์ผ๋กํ ๋ฐ์ดํฐ ํ์ง ํ๊ฐ ์คํ์์ค ์ ๋๋ค. https://github.com/great-expectations/great_expectations ๋ฅผ ์ฐธ๊ณ ํ์ธ์
์์: Great Expectations๋ฅผ ํ์ฉํ ๋ฐ์ดํฐ ๊ฒ์ฆ
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContext
# Initialize Great Expectations context
context = DataContext()
# Define the data batch to validate
batch_request = BatchRequest(
datasource_name="my_datasource",
data_connector_name="my_data_connector",
data_asset_name="my_table"
)
# Define a new expectation suite
suite = context.create_expectation_suite("ai_data_quality_suite", overwrite_existing=True)
# Add data expectations
validator = context.get_validator(batch_request=batch_request, expectation_suite_name="ai_data_quality_suite")
validator.expect_column_values_to_not_be_null("timestamp")
validator.expect_column_values_to_be_in_set("status", ["active", "inactive"])
# Run validation and check results
results = validator.validate()
if not results["success"]:
print("Data quality validation failed!")
else:
print("Data quality validation passed!")
์ด ์์์์๋:
- Great Expectations๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ์ ์ฃผ์ ํน์ฑ๋ค์ ๊ฒ์ฆํฉ๋๋ค. ์๋ฅผ ๋ค์ด, ๋น์ด ์์ง ์์ ํ์์คํฌํ ๊ฐ์ด๋ ์ํ ์ด(status column)์์ ํ์ฉ ๊ฐ๋ฅํ ์ํ๋ฅผ ํ์ธํฉ๋๋ค.
- ์ด๋ฌํ ๊ฒ์ฆ์ AI ๋ชจ๋ธ ํ์ต ํ์ดํ๋ผ์ธ์ ๋ฐ์ดํฐ ์ด์๊ฐ์ด ์ ์ ๋๊ธฐ ์ ์ ๋ฌธ์ ๋ฅผ ๋ฐ๊ฒฌํ๋ ๋ฐ ๋์์ ์ค๋๋ค.
- Great Expectations๋ฅผ ์ผ๋ฐ์ ์ธ ์๋ก ์ ์ํ์ง๋ง, Splunk, SignalFx ๋๋ ์์ฒด ๊ฐ๋ฐํ ๋๊ตฌ์ ๊ฐ์ ๋ค๋ฅธ ํ๋ ์์ํฌ/๋๊ตฌ๋ฅผ ์ ํํ ์๋ ์์ต๋๋ค.
3. ๋ฐ์ดํฐ ์นดํ๋ก๊ทธ ๋ฐ ๋ฉํ๋ฐ์ดํฐ ๊ด๋ฆฌ
ํ๋ถํ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ํฌํจํ ๋ฐ์ดํฐ ์นดํ๋ก๊ทธ๋ฅผ ์์ฑํ๋ฉด ๋ฐ์ดํฐ ๊ณผํ์๊ฐ ์ฌ์ฉํ๋ ๋ฐ์ดํฐ์ ๊ณ๋ณด(lineage), ํ์ง, ๋งฅ๋ฝ(context)์ ์ดํดํ๋ ๋ฐ ๋์์ ์ค๋๋ค. Apache Atlas๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฉํ๋ฐ์ดํฐ๋ฅผ ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ผ๋ก ์นดํ๋ก๊ทธํํ ์ ์์ต๋๋ค.
์์: Apache Atlas API๋ฅผ ํ์ฉํ ๋ฐ์ดํฐ ์นดํ๋ก๊ทธํ
Apache Atlas๋ ๋ฐ์ดํฐ์ , ํ ์ด๋ธ, ๊ณ๋ณด๋ฅผ ๋ํ๋ด๋ ์ํฐํฐ๋ฅผ ์์ฑํ๋ ๋ฑ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ๊ด๋ฆฌํ ์ ์๋ REST API๋ฅผ ์ ๊ณตํฉ๋๋ค.
import requests
import json
# Define the entity details for a new dataset
entity = {
"entities": [
{
"typeName": "hive_table",
"attributes": {
"name": "sales_data",
"qualifiedName": "sales_data@prod",
"description": "Sales data for AI model training",
"owner": "data_engineering_team",
"tableType": "MANAGED_TABLE",
"columns": [
{"name": "timestamp", "dataType": "date"},
{"name": "sale_amount", "dataType": "double"}
]
}
}
]
}
# Send a POST request to create the entity
response = requests.post(
"http://atlas-server:21000/api/atlas/v2/entity",
headers={"Content-Type": "application/json"},
data=json.dumps(entity)
)
if response.status_code == 200:
print("Entity created in Apache Atlas")
else:
print(f"Failed to create entity: {response.text}")# Define the entity details for a new dataset
entity = {
"entities": [
{
"typeName": "hive_table",
"attributes": {
"name": "sales_data",
"qualifiedName": "sales_data@prod",
"description": "Sales data for AI model training",
"owner": "data_engineering_team",
"tableType": "MANAGED_TABLE",
"columns": [
{"name": "timestamp", "dataType": "date"},
{"name": "sale_amount", "dataType": "double"}
]
}
}
]
}
์ด ์์์์๋:
- sales_data ํ ์ด๋ธ์ ์คํค๋ง, ์์ ์, ๋ชฉ์ ์ ๋ํ ๋ฉํ๋ฐ์ดํฐ๋ก ์นดํ๋ก๊ทธํํ์ฌ AI ํ์ต์ ์ฌ์ฉ๋๋ ๋ฐ์ดํฐ๋ฅผ ์ถ์ ํ๊ณ ์ดํดํ๊ธฐ ์ฝ๊ฒ ๋ง๋ญ๋๋ค.
4. ํ์ฅ์ฑ์ ์ํ ๋ฐ์ดํฐ ํํฐ์ ๋ ๋ฐ ์ธ๋ฑ์ฑ
๋ฐ์ดํฐ ํํฐ์ ๋๊ณผ ์ธ๋ฑ์ฑ์ ํนํ ๋ถ์ฐ ์์คํ ์์ ์ฑ๋ฅ์ ํฅ์์ํฌ ์ ์์ต๋๋ค. Delta Lake๋ฅผ ์ฌ์ฉํ ํํฐ์ ๋๊ณผ ์ธ๋ฑ์ฑ ์์ ๋ฅผ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
์์: Delta Lake๋ฅผ ํ์ฉํ ํํฐ์ ๋๊ณผ ์ธ๋ฑ์ฑ
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Delta Lake Example") \
.getOrCreate()
# Load data and write with partitioning
df = spark.read.csv("s3://my-bucket/data.csv")
df.write.format("delta").partitionBy("date").save("/delta/sales_data")
# Optimize and create Z-Order index on relevant columns
delta_table = DeltaTable.forPath(spark, "/delta/sales_data")
delta_table.optimize().executeZOrderBy("customer_id")
์ด ์์์์๋:
- sales_data๋ฅผ ๋ ์ง๋ณ๋ก ํํฐ์ ๋ํ์ฌ ๋ฐ์ดํฐ ์ค์บ ํฌ๊ธฐ๋ฅผ ์ค์ฌ ์ฟผ๋ฆฌ ์ฑ๋ฅ์ ํฅ์์ํต๋๋ค.
- customer_id์ Z-Order ์ธ๋ฑ์ฑ์ ์ ์ฉํ์ฌ ํด๋น ์ด์ ๋ํ ์ฝ๊ธฐ ์ฑ๋ฅ์ ์ต์ ํํ๊ณ , downstream AI ํ๋ก์ธ์ค(์: ๊ณ ๊ฐ ๋ง์ถคํ ๋ชจ๋ธ)๋ฅผ ๋ ๋น ๋ฅด๊ฒ ๋ง๋ญ๋๋ค.
5. ๋ฐ์ดํฐ ๋ง์คํน ๋ฐ ์ต๋ช ํ
๋ฏผ๊ฐํ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ๋ ์ต๋ช ํ๋ ํ์์ ์ ๋๋ค. ๋ค์ ์์๋ Python์ Faker ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ์ฌ ์๋ณธ ๋ฐ์ดํฐ์ ์ ๊ตฌ์กฐ์ ๋ถํฌ๋ฅผ ์ ์งํ๋ฉด์ ํฉ์ฑ๋ ์ต๋ช ํ๋ ๋ฐ์ดํฐ๋ฅผ ์์ฑํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
์์: Python์ Faker๋ฅผ ์ฌ์ฉํ ๋ฐ์ดํฐ ๋ง์คํน
from faker import Faker
import pandas as pd
fake = Faker()
df = pd.DataFrame({
"customer_id": [fake.uuid4() for _ in range(100)],
"customer_name": [fake.name() for _ in range(100)],
"transaction_amount": [fake.random_number(digits=5) for _ in range(100)]
})
# Display anonymized data
print(df.head())
์ด ์์์์๋:
- Faker๊ฐ ๊ณ ๊ฐ ID๋ฅผ ์ํ ๊ฐ์ง UUID, ํฉ์ฑ๋ ์ด๋ฆ, ์์์ ๊ฑฐ๋ ๊ธ์ก์ ์์ฑํฉ๋๋ค.
- ์ด ์ต๋ช ํ๋ ๋ฐ์ดํฐ์ ์ ๊ณ ๊ฐ ๊ฐ์ธ์ ๋ณด๋ฅผ ์ํ์ ๋น ๋จ๋ฆฌ์ง ์๊ณ AI ๋ชจ๋ธ ํ์ต๊ณผ ํ ์คํธ์ ์์ ํ๊ฒ ์ฌ์ฉํ ์ ์์ต๋๋ค.
- Faker๋ฅผ ์ผ๋ฐ์ ์ธ ์๋ก ์ฌ์ฉํ์ง๋ง, PySyft, Presidio, SDV ๋ฑ๊ณผ ๊ฐ์ ๋ค์ํ ๋ง์คํน ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ ์์ต๋๋ค. ์ฌ์ฉ์ ์ ์ ํจ์๋ฅผ ๊ตฌ์ถํ ์๋ ์์ต๋๋ค.
๊ฒฐ๋ก
AI๋ฅผ ์ํ ๋ฐ์ดํฐ ๊ด๋ฆฌ์๋ ์ ํต์ ์ธ ๋ฐ์ดํฐ ์์ง๋์ด๋ง ๊ธฐ์ ๊ณผ AI ์์ ๋ฐ์ํ๋ ๋ฌธ์ ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํ ์ ๋ฌธ ๊ธฐ์ ์ด ๊ฒฐํฉ๋ฉ๋๋ค. ์คํธ๋ฆฌ๋ฐ ์์ง, ๋ฐ์ดํฐ ๊ฒ์ฆ, ์นดํ๋ก๊ทธํ, ํํฐ์ ๋, ๋ง์คํน๊ณผ ๊ฐ์ ์์๋ฅผ ํตํด ์ด๋ฌํ ๊ธฐ์ ๋ค์ด ์ด๋ป๊ฒ AI ์ํฌ๋ก๋๋ฅผ ์ง์ํ๋์ง ์ ์ ์์ต๋๋ค. AI๋ฅผ ์ํ ๋ฐ์ดํฐ ์์ง๋์ด๋ง์ ๋ถ์ผ๋ ๋ฐฉ๋ํ๋ฉฐ, AI๊ฐ ๋ฐ์ ํจ์ ๋ฐ๋ผ ์ด๋ฌํ ๋๊ตฌ์ ๋ชจ๋ฒ ์ฌ๋ก์ ๋ํ ์๋ จ๋๋ฅผ ์ ์งํ๋ ๊ฒ์ด ์ค์ํฉ๋๋ค.
'note' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
PySpark ํ๋ก๋์ ํ๊ฒฝ์์ OutOfMemory(OOM) ๋ฌธ์ ํด๊ฒฐํ๊ธฐ (0) | 2024.12.12 |
---|---|
SparkSession๊ณผ SparkContext ์ดํดํ๊ธฐ (0) | 2024.12.04 |
Kubernetes HPA (Horizontal Pod Autoscaler) ์์๋ณด๊ธฐ (0) | 2024.11.27 |
Spark ์กฐ์ธ(join) ์ต์ ํ (0) | 2024.11.26 |
Kubernetes์ Probe ์์๋ณด๊ธฐ (0) | 2024.11.25 |