
🪶 Building a Complete AI Quant Trading Pipeline with Airflow

Automating everything from data collection to model deployment, end to end

In the previous post, we implemented model versioning and automated deployment with MLflow.
The last remaining step is
to schedule and automate all of that work with Airflow, turning it into a complete MLOps system.

์˜ค๋Š˜์€ ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘ → ํ•™์Šต → ๊ฒ€์ฆ → ๋ฐฐํฌ → ๋ฆฌํฌํŠธ ์ƒ์„ฑ์„
ํ•˜๋‚˜์˜ Airflow DAG(Directed Acyclic Graph)๋กœ ๋ฌถ๋Š” ์‹ค์ „ ๊ตฌ์ถ• ํŽธ์ž…๋‹ˆ๋‹ค.


🎯 Goal

“๋งค์ผ ์•„์นจ 6์‹œ, ์ž๋™์œผ๋กœ ํ€€ํŠธ ๋ชจ๋ธ์ด ํ•™์Šต๋˜๊ณ ,
์„ฑ๋Šฅ์ด ๊ฐœ์„ ๋˜๋ฉด ์ž๋™์œผ๋กœ ๋ฐฐํฌ๋˜๋Š” ์‹œ์Šคํ…œ”

In other words, the goal is an environment where the model keeps itself current by retraining on fresh data every day.


โš™๏ธ 1๏ธโƒฃ Airflow ํ™˜๊ฒฝ ๊ตฌ์„ฑ

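Installation

pip install "apache-airflow==2.9.0" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.11.txt"
# (replace 3.11 with your Python version; Airflow 2.9 supports Python 3.8 to 3.12)
airflow db migrate   # `airflow db init` has been deprecated since Airflow 2.7
airflow users create \
    --username admin \
    --password admin \
    --firstname Quant \
    --lastname AI \
    --role Admin \
    --email quant@ai.com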

Start the Airflow web server and the scheduler, each in its own terminal:

airflow webserver -p 8080
airflow scheduler

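👉 Open http://localhost:8080. Once the DAG file from the next section is saved in the DAGs folder ($AIRFLOW_HOME/dags, i.e. ~/airflow/dags by default), the “quant_ai_pipeline” DAG appears in the list.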


๐Ÿงฑ 2๏ธโƒฃ DAG ๊ธฐ๋ณธ ๊ตฌ์กฐ

๋ฐ˜์‘ํ˜•
# dags/quant_ai_pipeline.py
import os
import subprocess
import sys
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "quant_ai",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Script paths are relative to the worker's working directory
# (typically /opt/airflow in the official Docker image); use absolute paths if unsure.
def run_script(path):
    # sys.executable runs the script with the same interpreter Airflow uses;
    # check=True turns a non-zero exit code into a task failure
    subprocess.run([sys.executable, path], check=True)

def collect_data():
    run_script("scripts/collect_factors.py")

def train_model():
    run_script("scripts/train_transformer.py")

def validate_model():
    run_script("scripts/validate_model.py")

def deploy_model():
    # `mlflow models serve` never exits, so subprocess.run would block this task
    # (and the notify task after it) forever. Start it detached instead; in production,
    # run the server as its own long-lived service (e.g. a container) and only reload it here.
    subprocess.Popen(
        ["mlflow", "models", "serve", "-m", "models:/quant_ai/Production", "-p", "6000"],
        start_new_session=True,
    )

def notify_slack(msg="Pipeline complete"):
    url = os.getenv("SLACK_WEBHOOK_URL")
    if url:  # do nothing when no webhook is configured
        requests.post(url, json={"text": f"🚀 {msg}"}, timeout=10)

with DAG(
    "quant_ai_pipeline",
    default_args=default_args,
    description="AI Quant End-to-End MLOps Pipeline",
    schedule="0 6 * * *",  # every day at 06:00 (UTC unless a timezone is configured)
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:

    t1 = PythonOperator(task_id="collect_data", python_callable=collect_data)
    t2 = PythonOperator(task_id="train_model", python_callable=train_model)
    t3 = PythonOperator(task_id="validate_model", python_callable=validate_model)
    t4 = PythonOperator(task_id="deploy_model", python_callable=deploy_model)
    t5 = PythonOperator(
        task_id="notify",
        python_callable=notify_slack,
        op_kwargs={"msg": "✅ Today's pipeline is complete"},
    )

    t1 >> t2 >> t3 >> t4 >> t5
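Two settings in this DAG are easy to misread: the cron string fires in UTC unless a timezone is configured, and under Airflow's default cron timetable the run that fires at 06:00 on a given day covers the previous 24 hours, so its logical date (`ds`) is the day before. The stdlib-only sketch below models just this daily 06:00 case; `next_fire_time` and `data_interval` are hypothetical helpers for illustration, not Airflow's scheduler code.

```python
from datetime import datetime, time, timedelta, timezone

RUN_AT = time(6, 0)  # the 06:00 slot from the cron expression "0 6 * * *"


def next_fire_time(now: datetime) -> datetime:
    """Next moment strictly after `now` when a daily 06:00 UTC cron fires."""
    now = now.astimezone(timezone.utc)
    candidate = datetime.combine(now.date(), RUN_AT, tzinfo=timezone.utc)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


def data_interval(fire_time: datetime) -> tuple[datetime, datetime]:
    """Interval covered by the run that fires at `fire_time` (daily cron timetable).

    The run's logical date (`ds`) is the interval start, i.e. the previous day.
    """
    return fire_time - timedelta(days=1), fire_time


if __name__ == "__main__":
    fire = next_fire_time(datetime(2025, 1, 1, 7, 30, tzinfo=timezone.utc))
    start, end = data_interval(fire)
    print(fire.isoformat())  # 2025-01-02T06:00:00+00:00
    print(start.date())      # 2025-01-01 -> the run's ds
```

If the pipeline should fire at 06:00 local time instead, the usual approach is a timezone-aware `start_date` (for example created with pendulum), which Airflow then uses as the DAG's timezone.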

๐Ÿ“Š 3๏ธโƒฃ DAG ๋‹จ๊ณ„๋ณ„ ์„ค๋ช…

๋‹จ๊ณ„ ์—ญํ•  ๋„๊ตฌ

collect_data yfinance, PostgreSQL, Airflow XCom ์‹ค์‹œ๊ฐ„ ํŒฉํ„ฐ ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘
train_model MLflow, PyTorch Transformer ๋˜๋Š” RL ๋ชจ๋ธ ํ•™์Šต
validate_model Scikit-learn MSE, R², Sharpe ๋“ฑ ๊ฒ€์ฆ
deploy_model MLflow Serve, Docker ๋ชจ๋ธ ์ž๋™ ๋ฐฐํฌ
notify Slack ๊ฒฐ๊ณผ ์•Œ๋ฆผ ์ „์†ก
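`scripts/validate_model.py` is not shown in the post. Assuming it scores return predictions against realized returns, a dependency-free sketch of the three metrics in the table might look like this. scikit-learn's `mean_squared_error` and `r2_score` compute the same MSE and R²; the Sharpe ratio here is annualized with an assumed 252 trading days and an assumed zero risk-free rate.

```python
import math
from statistics import mean, stdev


def mse(y_true, y_pred):
    """Mean squared error between realized and predicted values."""
    return mean((t - p) ** 2 for t, p in zip(y_true, y_pred))


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    y_bar = mean(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - y_bar) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


def sharpe(daily_returns, risk_free_daily=0.0, periods_per_year=252):
    """Annualized Sharpe ratio of daily strategy returns (sample std dev)."""
    excess = [r - risk_free_daily for r in daily_returns]
    return mean(excess) / stdev(excess) * math.sqrt(periods_per_year)


if __name__ == "__main__":
    y_true = [0.010, -0.020, 0.015, 0.005]
    y_pred = [0.012, -0.018, 0.010, 0.004]
    print(f"MSE={mse(y_true, y_pred):.6f}  R2={r2(y_true, y_pred):.3f}")
    print(f"Sharpe={sharpe([0.001, -0.002, 0.003, 0.0005, 0.0015]):.2f}")
```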

๊ฐ ๋‹จ๊ณ„๋Š” ์˜์กด ๊ด€๊ณ„(task dependency) ๋กœ ์—ฐ๊ฒฐ๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.
์ฆ‰, ๋ฐ์ดํ„ฐ๊ฐ€ ์ˆ˜์ง‘๋˜์–ด์•ผ ํ•™์Šต์ด ์‹œ์ž‘๋˜๊ณ ,
๊ฒ€์ฆ ํ†ต๊ณผ ํ›„์—๋งŒ ๋ฐฐํฌ๊ฐ€ ์ง„ํ–‰๋ฉ๋‹ˆ๋‹ค.
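Under Airflow's default trigger rule (`all_success`), a task in a linear chain runs only if its upstream task succeeded; otherwise it is marked `upstream_failed` and never executes. The toy simulation below is a plain-Python model, not Airflow code, and it ignores retries. It shows why `check=True` matters: a validation script that exits non-zero raises `CalledProcessError`, which fails `validate_model` and keeps `deploy_model` and `notify` from running.

```python
from typing import Callable


def run_chain(tasks: list[tuple[str, Callable[[], None]]]) -> dict[str, str]:
    """Run tasks in order; once one fails, mark every later task upstream_failed."""
    states: dict[str, str] = {}
    failed = False
    for task_id, fn in tasks:
        if failed:
            states[task_id] = "upstream_failed"  # skipped without being executed
            continue
        try:
            fn()
            states[task_id] = "success"
        except Exception:
            states[task_id] = "failed"
            failed = True
    return states


def failing_validation():
    # Stands in for subprocess.run(..., check=True) raising on a non-zero exit
    raise RuntimeError("validation script exited with a non-zero status")


if __name__ == "__main__":
    ok = lambda: None
    print(run_chain([
        ("collect_data", ok),
        ("train_model", ok),
        ("validate_model", failing_validation),
        ("deploy_model", ok),
        ("notify", ok),
    ]))
```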


๐Ÿงฉ 4๏ธโƒฃ XCom์œผ๋กœ ๋‹จ๊ณ„ ๊ฐ„ ๋ฐ์ดํ„ฐ ์ „๋‹ฌ

Airflow์˜ XCom์„ ํ™œ์šฉํ•˜๋ฉด ์ด์ „ ๋‹จ๊ณ„์˜ ์ถœ๋ ฅ์„ ๋‹ค์Œ ํƒœ์Šคํฌ์— ๋„˜๊ธธ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

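import subprocess
import sys

def validate_model(**context):
    # validate_model.py is expected to print the score as the last line of stdout
    result = subprocess.run(
        [sys.executable, "scripts/validate_model.py"],
        capture_output=True, text=True, check=True,
    )
    score = float(result.stdout.strip().splitlines()[-1])
    context["ti"].xcom_push(key="validation_score", value=score)

def deploy_model(**context):
    # Name the producing task explicitly; xcom_pull returns None if nothing was pushed
    score = context["ti"].xcom_pull(task_ids="validate_model", key="validation_score")
    if score is not None and score > 0.8:
        subprocess.run([sys.executable, "scripts/deploy_model.py"], check=True)
    else:
        print("🚫 Model below the performance threshold; deployment skipped")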

๐Ÿ‘‰ ์ด๋ ‡๊ฒŒ ํ•˜๋ฉด ์„ฑ๋Šฅ ๊ธฐ์ค€์„ ์ž๋™์œผ๋กœ ํ‰๊ฐ€ํ•˜๊ณ ,
์„ฑ๊ณตํ•œ ๋ชจ๋ธ๋งŒ ๋ฐฐํฌ๋ฉ๋‹ˆ๋‹ค.
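Both tasks depend on a convention the post leaves implicit: `validate_model.py` must end its stdout with a number `float()` can parse, and `xcom_pull` returns `None` when nothing was pushed. Pulling that logic into plain functions makes it testable without Airflow. `parse_score`, `should_deploy`, and the "score on the last line" convention are assumptions for this sketch; 0.8 is the post's threshold, and the post does not say which metric it applies to.

```python
def parse_score(stdout: str) -> float:
    """Read the score from the last non-empty line of the script's stdout.

    Assumes validate_model.py prints any log lines first and the score last.
    """
    lines = [line.strip() for line in stdout.splitlines() if line.strip()]
    if not lines:
        raise ValueError("validate_model.py printed nothing")
    return float(lines[-1])


def should_deploy(score, threshold: float = 0.8) -> bool:
    """Deploy only when a score exists and strictly exceeds the threshold."""
    return score is not None and score > threshold


if __name__ == "__main__":
    out = "loading model...\nscoring 252 days\n0.83\n"
    score = parse_score(out)
    print(score, should_deploy(score))  # 0.83 True
```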


โ˜๏ธ 5๏ธโƒฃ MLflow + Airflow ํ†ตํ•ฉ

Airflow DAG์—์„œ MLflow ์‹คํ—˜ ๊ธฐ๋ก์„ ์ž๋™ํ™”ํ•ฉ๋‹ˆ๋‹ค.

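import os, subprocess, sys

def train_model(**context):
    import mlflow
    mlflow.set_tracking_uri("http://localhost:5001")
    mlflow.set_experiment("quant_ai_pipeline")
    # One run per Airflow logical date, e.g. "daily_train_2025-01-01"
    with mlflow.start_run(run_name=f"daily_train_{context['ds']}") as run:
        mlflow.log_param("source", "Airflow-DAG")
        # The script runs in its own process; MLFLOW_RUN_ID makes its start_run() resume this run
        env = {**os.environ, "MLFLOW_TRACKING_URI": "http://localhost:5001",
               "MLFLOW_RUN_ID": run.info.run_id}
        subprocess.run([sys.executable, "scripts/train_transformer.py"], check=True, env=env)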

MLflow UI์—์„œ ๊ฐ Airflow ์‹คํ–‰์„ “ํ•˜๋ฃจ ๋‹จ์œ„ ๋ชจ๋ธ ์‹คํ—˜”์œผ๋กœ ๊ด€๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.


๐Ÿ“ˆ 6๏ธโƒฃ Streamlit ์‹ค์‹œ๊ฐ„ ๋ชจ๋‹ˆํ„ฐ๋ง

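import pandas as pd
import requests
import streamlit as st

AIRFLOW_URL = "http://localhost:8080/api/v1/dags/quant_ai_pipeline/dagRuns"
# The old experiments/list endpoint is gone in MLflow 2.x; experiments/search replaces it
MLFLOW_URL = "http://localhost:5001/api/2.0/mlflow/experiments/search"

st.title("📊 Quant AI Pipeline Monitoring")
st.subheader("Airflow DAG status")
# Basic auth works only if the basic_auth API backend is enabled
# (AIRFLOW__API__AUTH_BACKENDS); the default session backend rejects it
airflow_resp = requests.get(AIRFLOW_URL, auth=("admin", "admin"), timeout=10)
airflow_resp.raise_for_status()
st.json(airflow_resp.json())

st.subheader("Recent MLflow experiments")
mlflow_resp = requests.post(MLFLOW_URL, json={"max_results": 100}, timeout=10)
mlflow_resp.raise_for_status()
st.write(pd.DataFrame(mlflow_resp.json().get("experiments", [])))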

This Streamlit dashboard brings the following onto a single screen:

  • Airflow DAG run status
  • MLflow experiment results
  • The most recently deployed model version (not covered by the snippet above; it needs an extra query against the MLflow Model Registry)
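`st.json` dumps the raw response, which gets long once the DAG has run for a few weeks. Here is a sketch of condensing it first, assuming the Airflow 2 REST response shape (`dag_runs` entries with `dag_run_id`, `state`, and `logical_date`). `summarize_dag_runs` is a hypothetical helper, and picking the latest run by comparing `logical_date` strings assumes every timestamp uses the same ISO 8601 format.

```python
from collections import Counter


def summarize_dag_runs(payload: dict) -> dict:
    """Condense an Airflow /dagRuns response into state counts plus the latest run.

    Assumes the shape {"dag_runs": [{"dag_run_id", "state", "logical_date", ...}],
    "total_entries": N}.
    """
    runs = payload.get("dag_runs", [])
    counts = Counter(run.get("state") for run in runs)
    latest = max(runs, key=lambda r: r.get("logical_date") or "", default=None)
    return {
        "counts": dict(counts),
        "latest_run_id": latest["dag_run_id"] if latest else None,
        "latest_state": latest["state"] if latest else None,
    }


if __name__ == "__main__":
    sample = {"dag_runs": [
        {"dag_run_id": "scheduled__2025-01-01T06:00:00+00:00", "state": "success",
         "logical_date": "2025-01-01T06:00:00+00:00"},
        {"dag_run_id": "scheduled__2025-01-02T06:00:00+00:00", "state": "failed",
         "logical_date": "2025-01-02T06:00:00+00:00"},
    ], "total_entries": 2}
    print(summarize_dag_runs(sample))
```

In the dashboard, the summary could then feed a widget such as `st.metric("Latest run", summary["latest_state"])` instead of the full JSON dump.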

๐Ÿ” 7๏ธโƒฃ Airflow + Slack ์•Œ๋ฆผ

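import os
from datetime import timedelta
import requests

def notify_slack(msg):
    url = os.getenv("SLACK_WEBHOOK_URL")
    if url:  # do nothing when no webhook is configured
        requests.post(url, json={"text": f"📢 {msg}"}, timeout=10)

def on_failure_callback(context):
    ti = context["task_instance"]
    notify_slack(f"❌ Failed: {ti.dag_id}.{ti.task_id}")

# Setting the callback in default_args applies it to every task in the DAG
default_args = {
    "owner": "quant_ai",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": on_failure_callback,
}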

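Register the callback in default_args (rather than on a single operator) and it applies to every task, so whichever stage fails, once its retries are exhausted, a Slack alert is sent immediately.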
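A bare task ID in the alert still leaves you digging through the UI. As a sketch, the callback can build a richer message from the same `context` dict: `dag_id`, `task_id`, and `log_url` are attributes of Airflow's `TaskInstance` and `ds` is a standard context key, while `failure_message` and `slack_payload` are hypothetical helpers. A `SimpleNamespace` stands in for the task instance so the formatting can be checked without Airflow.

```python
from types import SimpleNamespace


def failure_message(context: dict) -> str:
    """Build a Slack text from an Airflow failure-callback context."""
    ti = context["task_instance"]
    return (
        f"❌ {ti.dag_id}.{ti.task_id} failed (ds={context.get('ds')})\n"
        f"{ti.log_url}"
    )


def slack_payload(text: str) -> dict:
    """Incoming-webhook body: a JSON object with a "text" field."""
    return {"text": text}


if __name__ == "__main__":
    fake_ti = SimpleNamespace(dag_id="quant_ai_pipeline", task_id="train_model",
                              log_url="http://localhost:8080/log")
    print(slack_payload(failure_message({"task_instance": fake_ti, "ds": "2025-01-01"})))
```

Inside Airflow, the callback would then reduce to `def on_failure_callback(context): notify_slack(failure_message(context))`.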


🧠 8️⃣ Running Everything with Docker Compose

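# docker-compose.yml (the top-level "version" key is obsolete in Compose v2 and omitted)
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
  airflow:
    image: apache/airflow:2.9.0
    depends_on:
      postgres:
        condition: service_healthy   # wait until Postgres accepts connections
    # "standalone" migrates the DB, creates an admin user (password printed in the logs)
    # and starts the webserver and scheduler in one container; for local development only
    command: standalone
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts   # pipeline scripts referenced by the DAG
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      # The connection string lives in the [database] section since Airflow 2.3
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      # Lets the Streamlit dashboard call the REST API with basic auth
      AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
      # (AIRFLOW__WEBSERVER__RBAC is an Airflow 1.10 setting; 2.x always uses RBAC)
      SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL}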

📌 Summary

| Step | Description |
|---|---|
| 1 | Define the Airflow DAG and register its schedule |
| 2 | Record model experiments through the MLflow integration |
| 3 | Pass validation results between tasks via XCom |
| 4 | Deploy only models that pass validation |
| 5 | Monitor in real time with Slack + Streamlit |

์ด์ œ ๋งค์ผ ์•„์นจ, ๋‹น์‹ ์ด ์ถœ๊ทผํ•˜๊ธฐ ์ „์—
AI๊ฐ€ ์•Œ์•„์„œ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๋ฅผ ํ•™์Šตํ•˜๊ณ , ๋ชจ๋ธ์„ ํ‰๊ฐ€ํ•˜๊ณ , ํ•„์š”ํ•˜๋ฉด ๋ฐฐํฌํ•ฉ๋‹ˆ๋‹ค.

That is the core of a **“self-evolving quant system.”**


📘 Coming Up Next

๋‹ค์Œ ํŽธ์—์„œ๋Š” **“์‹ค์ „ ํ€€ํŠธ ํŽ€๋“œ ์šด์˜ ์ „๋žต – ๋ฐฑํ…Œ์ŠคํŠธ์™€ ์‹ค์‹œ๊ฐ„ ๊ฑฐ๋ž˜ ๋ฐ์ดํ„ฐ ๋™๊ธฐํ™” ์‹œ์Šคํ…œ”**์„ ๋‹ค๋ฃน๋‹ˆ๋‹ค.
AI ๋ชจ๋ธ์ด ํ•™์Šตํ•œ ๊ฒฐ๊ณผ๋ฅผ ์‹ค์ œ ๊ฑฐ๋ž˜์†Œ ๋ฐ์ดํ„ฐ์™€ ๋™๊ธฐํ™”ํ•˜์—ฌ
์‹ค์‹œ๊ฐ„ ํฌํŠธํด๋ฆฌ์˜ค ์ถ”์  ๋ฐ ์œ„ํ—˜ ๊ด€๋ฆฌ ์‹œ์Šคํ…œ์„ ์™„์„ฑํ•  ์˜ˆ์ •์ž…๋‹ˆ๋‹ค.


※ This post is part of the Coupang Partners program, and the author receives a commission as a result.