🪶 Finishing the AI Quant Trading Pipeline with Airflow
octo54 2025. 11. 3. 11:44
Automating everything from data collection to model deployment in one pass
In the previous post, we used MLflow to implement model versioning and automated deployment.
The last remaining step is to schedule and automate all of that work with Airflow, turning it into a complete MLOps system.
This post is the hands-on build: data collection → training → validation → deployment → report generation, tied together in a single Airflow DAG (Directed Acyclic Graph).
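Goal
“Every morning at 6:00, the quant model is retrained automatically, and if its performance has improved, it is deployed automatically.”
In other words, the aim is an environment in which the system refreshes its data and model on its own and keeps improving.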
1️⃣ Airflow environment setup
Installation
pip install apache-airflow==2.9.0
airflow db migrate  # `airflow db init` is deprecated since Airflow 2.7
airflow users create \
--username admin \
--password admin \
--firstname Quant \
--lastname AI \
--role Admin \
--email quant@ai.com
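Start the Airflow web UI (run the webserver and the scheduler in separate terminals, since each command stays in the foreground):
airflow webserver -p 8080
airflow scheduler
Open http://localhost:8080. Once the DAG file from the next section is in the dags folder, “quant_ai_pipeline” appears in the DAG list.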
2️⃣ Basic DAG structure
# dags/quant_ai_pipeline.py
import os
import subprocess
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "quant_ai",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def collect_data():
    # check=True raises CalledProcessError on a non-zero exit code, which fails the task
    subprocess.run(["python", "scripts/collect_factors.py"], check=True)


def train_model():
    subprocess.run(["python", "scripts/train_transformer.py"], check=True)


def validate_model():
    subprocess.run(["python", "scripts/validate_model.py"], check=True)


def deploy_model():
    # `mlflow models serve` runs until it is stopped, so it is started in the background.
    # A blocking subprocess.run() here would keep the task running and `notify` would never start.
    # For anything beyond a local demo, run the model server as a separate service instead.
    subprocess.Popen(
        ["mlflow", "models", "serve", "-m", "models:/quant_ai/Production", "-p", "6000"],
        start_new_session=True,
    )


def notify_slack(msg="Pipeline complete"):
    # SLACK_WEBHOOK_URL must be set in the environment of the Airflow worker
    requests.post(os.environ["SLACK_WEBHOOK_URL"], json={"text": msg}, timeout=10)


with DAG(
    "quant_ai_pipeline",
    default_args=default_args,
    description="AI Quant End-to-End MLOps Pipeline",
    # Every day at 06:00 (UTC unless a timezone is configured).
    # `schedule` replaces the deprecated `schedule_interval` argument.
    schedule="0 6 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    t1 = PythonOperator(task_id="collect_data", python_callable=collect_data)
    t2 = PythonOperator(task_id="train_model", python_callable=train_model)
    t3 = PythonOperator(task_id="validate_model", python_callable=validate_model)
    t4 = PythonOperator(task_id="deploy_model", python_callable=deploy_model)
    t5 = PythonOperator(
        task_id="notify",
        python_callable=notify_slack,
        op_kwargs={"msg": "✅ Today's pipeline run completed"},
    )

    # Linear chain: each task starts only after the previous one succeeds
    t1 >> t2 >> t3 >> t4 >> t5
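To make the `0 6 * * *` schedule concrete, the following standard-library sketch computes the next 06:00 trigger after a given moment. The helper `next_daily_run` is a hypothetical name used only for illustration; Airflow's scheduler evaluates the cron expression itself, and this sketch ignores timezones.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int = 6) -> datetime:
    """Return the first moment strictly after `now` that falls on hour:00."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed (or is exactly now), so use tomorrow's
        candidate += timedelta(days=1)
    return candidate
```

For example, calling it with 2025-11-03 11:44 yields 2025-11-04 06:00, while calling it with 2025-11-03 05:00 yields 06:00 on the same day.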
3️⃣ DAG steps explained
| Step | Tools | Role |
|---|---|---|
| collect_data | yfinance, PostgreSQL, Airflow XCom | Collect real-time factor data |
| train_model | MLflow, PyTorch | Train the Transformer or RL model |
| validate_model | Scikit-learn | Validate with MSE, R², Sharpe ratio, etc. |
| deploy_model | MLflow Serve, Docker | Deploy the model automatically |
| notify | Slack | Send the result notification |
The steps are linked by task dependencies: training starts only after data collection finishes, and deployment runs only after validation succeeds.
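The ordering implied by these dependencies can be illustrated with Python's standard `graphlib` module (Python 3.9+). This is only a sketch of the idea of a topological order over the five task names from the DAG; it is not how Airflow's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on,
# mirroring the chain collect_data >> train_model >> ... >> notify.
dependencies = {
    "collect_data": set(),
    "train_model": {"collect_data"},
    "validate_model": {"train_model"},
    "deploy_model": {"validate_model"},
    "notify": {"deploy_model"},
}

# static_order() yields every task after all of its dependencies
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because the chain is strictly linear, there is exactly one valid order, and it matches the order in which the tasks are declared.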
4️⃣ Passing data between steps with XCom
Airflow's XCom lets a task pass its output to a downstream task.
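def validate_model(**context):
    # The validation script is expected to print a single numeric score to stdout
    result = subprocess.run(
        ["python", "scripts/validate_model.py"],
        capture_output=True, text=True, check=True,
    )
    context["ti"].xcom_push(key="validation_score", value=result.stdout.strip())


def deploy_model(**context):
    # Pull the score explicitly from the validate_model task
    score = float(
        context["ti"].xcom_pull(task_ids="validate_model", key="validation_score")
    )
    if score > 0.8:
        subprocess.run(["python", "scripts/deploy_model.py"], check=True)
    else:
        print("Model performance below threshold; deployment skipped")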
This way the performance threshold is checked automatically, and only models that pass it are deployed.
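Converting raw stdout straight to a float breaks as soon as the validation script prints anything besides the score. The sketch below separates parsing from the deployment decision so both can be tested without Airflow. It assumes, as a convention not stated in the original, that the score is the last non-empty line of the output; `parse_score` and `should_deploy` are hypothetical helper names.

```python
def parse_score(stdout: str) -> float:
    """Take the last non-empty line of the script output as the score."""
    lines = [line.strip() for line in stdout.splitlines() if line.strip()]
    if not lines:
        raise ValueError("validation script produced no output")
    return float(lines[-1])


def should_deploy(stdout: str, threshold: float = 0.8) -> bool:
    """Deploy only when the parsed score is strictly above the threshold."""
    return parse_score(stdout) > threshold
```

With this split, `deploy_model` would only need to call `should_deploy` on the value pulled from XCom, and the 0.8 threshold becomes an explicit parameter.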
5️⃣ MLflow + Airflow integration
The Airflow DAG can record MLflow experiments automatically.
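def train_model():
    # Imported inside the task so the DAG file can be parsed without loading MLflow
    import mlflow

    mlflow.set_tracking_uri("http://localhost:5001")
    mlflow.set_experiment("quant_ai_pipeline")
    with mlflow.start_run(run_name="daily_train"):
        # Anything the script logs itself lands in this run only if it is given the run ID,
        # for example through the MLFLOW_RUN_ID environment variable.
        subprocess.run(["python", "scripts/train_transformer.py"], check=True)
        mlflow.log_param("source", "Airflow-DAG")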
In the MLflow UI, each Airflow run can then be tracked as a daily model experiment.
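Since every run is named `daily_train`, runs from different days look identical in the MLflow UI. One option, sketched here as an assumption rather than part of the original pipeline, is to suffix the run name with the date of the run. The helper `daily_run_name` is hypothetical; inside a task the date could be taken from the Airflow context (for example `context["logical_date"]`).

```python
from datetime import date, datetime
from typing import Union


def daily_run_name(day: Union[date, datetime], prefix: str = "daily_train") -> str:
    """Build a run name such as 'daily_train_2025-11-03'."""
    return f"{prefix}_{day:%Y-%m-%d}"
```

The result would be passed as `run_name=` to `mlflow.start_run`, so each day's training shows up under its own label.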
6️⃣ Real-time monitoring with Streamlit
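import pandas as pd
import requests
import streamlit as st

AIRFLOW_URL = "http://localhost:8080/api/v1/dags/quant_ai_pipeline/dagRuns"
# MLflow 2.x serves experiments/search; the older preview experiments/list endpoint was removed.
# Adjust the path if your MLflow server version differs.
MLFLOW_URL = "http://localhost:5001/api/2.0/mlflow/experiments/search"

st.title("Quant AI Pipeline Monitoring")

st.subheader("Airflow DAG status")
# Basic auth works only if the Airflow API has the basic_auth backend enabled
st.json(requests.get(AIRFLOW_URL, auth=("admin", "admin"), timeout=10).json())

st.subheader("Recent MLflow experiments")
data = requests.post(MLFLOW_URL, json={"max_results": 50}, timeout=10).json()
# The "experiments" key is absent when the server has no experiments
st.write(pd.DataFrame(data.get("experiments", [])))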
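This Streamlit dashboard is meant to bring the following together on one screen (the last item needs an additional query against the MLflow Model Registry):
- Airflow DAG run status
- MLflow experiment results
- The most recently deployed model version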
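Dumping the raw JSON is hard to read once a DAG has many runs. The sketch below condenses a `dagRuns` response into counts per state plus the latest run. It assumes the response shape of Airflow's stable REST API (a `dag_runs` list whose items carry `dag_run_id`, `state`, and `logical_date`); the sample payload is made up for illustration, and `summarize_dag_runs` is a hypothetical helper.

```python
from collections import Counter


def summarize_dag_runs(payload: dict) -> dict:
    """Condense a dagRuns API response into a few dashboard-friendly numbers."""
    runs = payload.get("dag_runs", [])
    states = Counter(run.get("state", "unknown") for run in runs)
    # ISO-8601 timestamps in the same timezone sort correctly as strings
    latest = max(runs, key=lambda run: run.get("logical_date") or "", default=None)
    return {
        "total": len(runs),
        "by_state": dict(states),
        "latest_run_id": latest.get("dag_run_id") if latest else None,
        "latest_state": latest.get("state") if latest else None,
    }


# Made-up sample shaped like an Airflow dagRuns response
sample = {
    "dag_runs": [
        {"dag_run_id": "scheduled__2025-11-01", "state": "success",
         "logical_date": "2025-11-01T06:00:00+00:00"},
        {"dag_run_id": "scheduled__2025-11-02", "state": "failed",
         "logical_date": "2025-11-02T06:00:00+00:00"},
    ],
    "total_entries": 2,
}
summary = summarize_dag_runs(sample)
```

In the dashboard, the returned dictionary could be shown with `st.metric` or `st.json` in place of the full response.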
7️⃣ Airflow + Slack notifications
def notify_slack(msg):
    requests.post(os.environ["SLACK_WEBHOOK_URL"], json={"text": msg}, timeout=10)


def on_failure_callback(context):
    # Airflow passes the task context; task_instance identifies the failed task
    notify_slack(f"Failed: {context['task_instance'].task_id}")


t1 = PythonOperator(
    task_id="collect_data",
    python_callable=collect_data,
    on_failure_callback=on_failure_callback,
)
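With the callback attached, a failure in that task sends a Slack alert immediately. To cover every step, put `on_failure_callback` in `default_args` so that all tasks in the DAG inherit it.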
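A failure alert is more useful when it also names the DAG and the attempt number. The following sketch builds the Slack payload as a plain function so it can be checked without a running Airflow instance. `build_failure_message` is a hypothetical helper, and the `SimpleNamespace` object is only a stand-in for the real task instance, which exposes `dag_id`, `task_id`, and `try_number`.

```python
from types import SimpleNamespace


def build_failure_message(context: dict) -> dict:
    """Turn an Airflow-style failure context into a Slack webhook payload."""
    ti = context["task_instance"]
    text = f"Failed: {ti.dag_id}.{ti.task_id} (try {ti.try_number})"
    return {"text": text}


# Stand-in for the context Airflow passes to on_failure_callback
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="quant_ai_pipeline", task_id="train_model", try_number=1
    )
}
payload = build_failure_message(fake_context)
print(payload)
```

The callback would then post this payload to the webhook URL instead of formatting the string inline.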
8️⃣ Running everything together with Docker Compose
version: "3.9"
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
  airflow:
    image: apache/airflow:2.9.0
    depends_on: [postgres]
    # The image needs an explicit command; `standalone` (local development only)
    # initializes the database and starts the webserver and scheduler.
    command: standalone
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      # Airflow 2.3+ reads the connection string from the [database] section
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL}
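Note that the short form of `depends_on` only orders container startup; it does not wait until PostgreSQL accepts connections. As a minimal sketch of a readiness check, the function below polls a TCP port until it opens or a timeout expires. `wait_for_port` is a hypothetical helper, and an open port is only a rough proxy for "the database is ready".

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

Inside the Compose network this could be called as `wait_for_port("postgres", 5432)` before starting Airflow; a Compose healthcheck on the `postgres` service is an alternative that achieves the same goal declaratively.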
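Summary
| Step | Description |
|---|---|
| 1 | Define the Airflow DAG and register its schedule |
| 2 | Record model experiments through the MLflow integration |
| 3 | Pass the validation result between tasks with XCom |
| 4 | Deploy automatically only the models that pass |
| 5 | Monitor in real time with Slack + Streamlit |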
Now, every morning before you get to work, the pipeline trains on new data, evaluates the model, and deploys it if needed.
That is the core of a **“self-evolving quant system.”**
Next post preview
The next post covers **“Practical Quant Strategy: Backtesting and a Real-Time Trading Data Synchronization System.”**
We will synchronize what the AI model has learned with live exchange data to complete a real-time portfolio tracking and risk management system.

