Skip to main content

Command Palette

Search for a command to run...

MLFLOW Pipeline and Intro to aws

Updated
5 min read

MLFLOW Pipeline with DVC, MLFLOW, Dagshub

여기서는 MLFLOW, Dagshub, DVC를 이용해 모델을 트랙킹하는 것을 실제로 진행하는 과정을 보여준다. 우선 결론부터


├── data
│   ├── processed
│   │   └── data.csv
│   └── raw
│       ├── data.csv
│       └── data.csv.dvc
├── dvc.lock
├── dvc.yaml
├── models
│   └── model.pkl
├── params.yaml
├── README.md
├── requirements.txt
└── src
    ├── evaluate.py
    ├── __init__.py
    ├── preprocess.py
    └── train.py

6 directories, 13 files

이러한 디렉토리 구조의 프로그램을 작성하고 이를 관리한다.

우선 params.yamlpreprocess 관련 데이터셋과 train의 데이터셋과 파라미터를 집어넣는다.

결과적으로

preprocess:
  input: data/raw/data.csv
  output: data/processed/data.csv

train:
  data: data/raw/data.csv
  model: models/model.pkl
  random_state: 42
  n_estimators: 100
  max_depth: 5

이러한 형태를 바탕으로 하고, 이 yaml 파일을 읽어와서 이후의 전처리와 학습에 사용하게 된다.

따라서 preprocess.py 파일에서

import pandas as pd
import sys
import yaml
import os

## Load parameters from param.yaml

params=yaml.safe_load(open("params.yaml"))['preprocess']

def preprocess(input_path,output_path):
    data=pd.read_csv(input_path)

    os.makedirs(os.path.dirname(output_path),exist_ok=True)
    data.to_csv(output_path,header=None,index=False)
    print(f"Preprocesses data saved to {output_path}")

if __name__=="__main__":
    preprocess(params["input"],params["output"])

이러한 식으로 yaml을 읽어와서 전처리하게 된다. 이 파일을 실행하면 처음의 전체 디렉토리 구조에서 봤던 preprocessed/data.csv가 생성된다.

데이터의 전처리가 끝났다면 다음은 모델 차례다.

모델은 trainevaluate 두 과정으로 진행되는데

train.py

import os
import pickle
from urllib.parse import urlparse

import dagshub
import mlflow
import pandas as pd
import yaml
from mlflow.models import infer_signature
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import GridSearchCV, train_test_split

# Initialize dagshub integration
dagshub.init(repo_owner="maximizemaxwell", repo_name="MLOps-pipeline", mlflow=True)


def hyperparameter_tuning(X_train, y_train, param_grid):
    rf = RandomForestClassifier()
    grid_search = GridSearchCV(
        estimator=rf, param_grid=param_grid, cv=3, n_jobs=-1, verbose=2
    )
    grid_search.fit(X_train, y_train)
    return grid_search


def train(data_path: str, model_path: str):
    # Load data
    data = pd.read_csv(data_path, header=None)
    X = data.iloc[:, :-1]
    y = data.iloc[:, -1]

    # Set MLflow URI (redundant with dagshub.init, can skip if you trust dagshub setup)
    mlflow.set_tracking_uri("https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow")

    with mlflow.start_run():
        # Train-test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        signature = infer_signature(X_train, y_train)

        # Define hyperparameter grid
        param_grid = {
            "n_estimators": [100, 200],
            "max_depth": [5, 10, None],
            "min_samples_split": [2, 5],
            "min_samples_leaf": [1, 2],
        }

        # Hyperparameter tuning
        grid_search = hyperparameter_tuning(X_train, y_train, param_grid)
        best_model = grid_search.best_estimator_

        # Evaluation
        y_pred = best_model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        print(f"Accuracy: {accuracy}")

        # Logging metrics and parameters
        mlflow.log_metric("accuracy", accuracy)
        for key, val in grid_search.best_params_.items():
            mlflow.log_param(key, val)

        # Log confusion matrix and classification report as artifacts
        os.makedirs("artifacts", exist_ok=True)
        with open("artifacts/confusion_matrix.txt", "w") as f:
            f.write(str(confusion_matrix(y_test, y_pred)))
        with open("artifacts/classification_report.txt", "w") as f:
            f.write(classification_report(y_test, y_pred))

        mlflow.log_artifact("artifacts/confusion_matrix.txt")
        mlflow.log_artifact("artifacts/classification_report.txt")

        # Model logging
        if urlparse(mlflow.get_tracking_uri()).scheme != "file":
            mlflow.sklearn.log_model(
                best_model, "model", registered_model_name="Best_Model"
            )
        else:
            mlflow.sklearn.log_model(best_model, "model", signature=signature)

        # Save locally
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        with open(model_path, "wb") as f:
            pickle.dump(best_model, f)
        print(f"Model saved to {model_path}")


if __name__ == "__main__":
    params = yaml.safe_load(open("../params.yaml"))["train"]
    train(params["data"], params["model"])

과 같은 식으로 구성하였다. 참고한 유데미 강의의 코드와 dagshub연동 부분이 다르고 경로의 차이가 조금 있다.

이 코드를 실행하게 되면 models/디렉토리에 pkl 파일이 생성됨을 볼 수 있다.

코드를 실행하면

...
[CV] END max_depth=None, min_samples_leaf=1, min_samples_split=2, n_estimators=200; total time=   1.2s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=2, n_estimators=200; total time=   0.7s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=5, n_estimators=200; total time=   0.5s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=5, n_estimators=200; total time=   0.6s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=5, n_estimators=200; total time=   0.5s
Accuracy: 0.7532467532467533

이런 식으로 Accuracy가 출력되는데, 돌아가서 dagshub의 experiment에 가보면

이런 식으로 똑같이 기록된다.

그러면 다음으로 evaluate.py

import os
import pickle
from urllib.parse import urlparse

import dagshub
import mlflow
import pandas as pd
import yaml
from sklearn.metrics import accuracy_score

# Initialize dagshub integration
dagshub.init(repo_owner="maximizemaxwell", repo_name="MLOps-pipeline", mlflow=True)
# Get absolute path to params.yaml (in project root)
params_path = os.path.join(os.path.dirname(__file__), "..", "params.yaml")
params = yaml.safe_load(open(params_path))["train"]

# Full data/model path
data_path = os.path.join(os.path.dirname(__file__), "..", params["data"])
model_path = os.path.join(os.path.dirname(__file__), "..", params["model"])


def evaluate(data_path, model_path):
    # Column names (based on PIMA dataset)
    cols = [
        "Pregnancies",
        "Glucose",
        "BloodPressure",
        "SkinThickness",
        "Insulin",
        "BMI",
        "DiabetesPedigreeFunction",
        "Age",
        "Outcome",
    ]
    data = pd.read_csv(data_path, names=cols, header=None)
    X = data.drop(columns=["Outcome"])
    y = data["Outcome"]

    # Use your own DagsHub tracking URI
    mlflow.set_tracking_uri("https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow")

    model = pickle.load(open(model_path, "rb"))

    predictions = model.predict(X)
    accuracy = accuracy_score(y, predictions)

    # Log to MLflow
    mlflow.log_metric("eval_accuracy", accuracy)
    print(f"Model accuracy: {accuracy:.4f}")


if __name__ == "__main__":
    evaluate(data_path, model_path)

이런 식으로 쓸 수 있겠고,

  warnings.warn(
Model accuracy: 0.9505
🏃 View run clumsy-goose-362 at: https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow/#/experiments/0/runs/ddc3b5707ec448ec8c465d41a0956487
🧪 View experiment at: https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow/#/experiments/0

실행결과를 확인할 수 있다.

다시 dagshub으로 돌아가서

evaluate 실험도 여기서 accuracy를 확인할 수 있다.

MLFLOW with AWS

지금까지는 실험 추적과 버전 관리를 dagshub에 올렸으나, aws 클라우드에 데이터사이언스 프로젝트를 호스팅하여 똑같이 실험과정을 진행하고 싶을 수 있다.

실험 추적은 S3 버킷에 기록되고, 이것은 다시 aws EC2 인스턴스로 들어간다. 따라서 아래와 같은 app.py를 작성할 수 있다.

import os
import sys

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse
import mlflow
from mlflow.models.signature import infer_signature
import mlflow.sklearn 

import logging

import os

os.environ["MLFLOW_TRACKING_URI"]="http://ec2-54-158-152-207.compute-1.amazonaws.com:5000/"

logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)

def eval_metrics(actual,pred):
    rmse=np.sqrt(mean_squared_error(actual,pred))
    mae=mean_absolute_error(actual,pred)
    r2=r2_score(actual,pred)
    return rmse,mae,r2


if __name__=="__main__":

    ## Data Ingestion-Reading the dataset-- wine quality dataset

    csv_url=(
        "https://raw.githubusercontent.com/mlflow/mlflow/master/tests/datasets/winequality-red.csv"
    )


    try:
        data=pd.read_csv(csv_url,sep=";")
    except Exception as e:
        logger.exception("Unable to download the data")

    # Split the data in train and test

    train,test=train_test_split(data)

    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
    l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5

    with mlflow.start_run():
        lr=ElasticNet(alpha=alpha,l1_ratio=l1_ratio,random_state=42)
        lr.fit(train_x,train_y)

        predicted_qualities = lr.predict(test_x)
        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        print("Elasticnet model (alpha={:f}, l1_ratio={:f}):".format(alpha, l1_ratio))
        print("  RMSE: %s" % rmse)
        print("  MAE: %s" % mae)
        print("  R2: %s" % r2)

        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)


        ## For the remote server AWS we need to do the setup

        remote_server_uri="http://ec2-54-158-152-207.compute-1.amazonaws.com:5000/"
        mlflow.set_tracking_uri(remote_server_uri)

        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme


        if tracking_url_type_store!="file":
            mlflow.sklearn.log_model(
                lr,"model",registered_model_name="ElasticnetWineModel"
            )
        else:
            mlflow.sklearn.log_model(lr,"model")
9 views

More from this blog

락프리 데이터 구조와 알고리즘

여기서는 락프리 데이터 구조를 설명한다. 락프리(lock-free) 란 배타락을 이용하지 않고 처리를 수행하는 데이터 구조 및 그에 대한 조작 알고리즘을 총칭한다. 왜 락프리인가? 전통적인 동시성 제어 방법인 뮤텍스나 세마포어는 여러 문제점을 가지고 있다: 성능 저하: 락 경합(lock contention)으로 인한 대기 시간 데드락: 여러 스레드가 서로의 락을 기다리는 상황 우선순위 역전: 낮은 우선순위 스레드가 높은 우선순위 스레드를 ...

Jul 27, 20257 min read126

소프트웨어 트랜잭셔널 메모리

소프트웨어 트랜잭셔널 메모리 동시성 프로그래밍에서 공유 자원에 대한 안전한 접근은 항상 중요한 과제다. 전통적으로 뮤텍스 락과 같은 비관적 락(Negative Lock) 방식을 사용해왔다. 이 방식은 크리티컬 섹션에 진입하기 전에 반드시 락을 획득해야 하며, 락을 얻지 못하면 코드 실행 자체가 블록된다. 하지만 이와는 다른 접근 방식이 있다. 바로 낙관적 락(Optimistic Lock) 방식인데, 이는 "일단 실행하고 나중에 검증하자"는 철학...

Jul 20, 202517 min read263

공평한 배타 제어

공평한 배타 제어 여기서는 공평한 배타 제어에 대해 설명한다. 먼저 컨텐션(contention) 이라는 개념을 이해할 필요가 있다. 컨텐션이란 여러 스레드가 동시에 같은 락을 획득하려고 경쟁하는 상황을 말한다. 컨텐션이 높을수록 스레드들이 락을 기다리는 시간이 길어지고 성능이 저하된다. 이러한 컨텐션 상황은 시스템 아키텍처에 따라 더욱 복잡해질 수 있다. 특히 비균일 메모리 접근(Non-Uniform Memory Access, NUMA) 와 같...

Jul 13, 20259 min read21

KernelSnitch[논문 리뷰]

Paper 1. Intro 이 글은 NDSS 2025에서 발표된 KernelSnitch 논문을 소개이다. 이 연구는 커널의 평범한 데이터 구조체들이 가진 본질적인 특성이 어떻게 심각한 보안 취약점이 되는지를 보여준다. 핵심은 이러하다: "데이터 구조체의 크기에 따른 접근 시간 차이를 이용해 커널의 비밀 정보를 유출할 수 있다" 여기서는 커널 힙 포인터 유출에 집중해서 설명한다. 이 공격이 성공하면 KASLR을 우회하고 더 심각한 커널 익스플로...

Jul 11, 20257 min read131

멀티태스크와 액터 모델

멀티태스크 협조적/비협조적 멀티태스크 선점: 프로세스와의 협조 없이 수행하는 컨택스트 스위칭이라고는 하나, 결국 뺏어오는 게 가능하냐의 문제다. 협조적 멀티태스크(비선점형, cooperative): 각각의 프로세스가 자발적으로 컨택스트 스위칭을 수행하는 멀티태스크 방식. 장점: 멀티태스크 매커니즘을 구현하기 쉽다. 단점: 프로세스가 자발적으로 컨텍스트 스위칭을 해야하는데, 만약 버그가 발생하여 프로세스가 무한 루프에 빠지거나 정지하게 되면 그 ...

Jul 6, 20252 min read25
M

MaxLog

35 posts