MLflow Pipeline and Intro to AWS
MLflow Pipeline with DVC, MLflow, DagsHub
여기서는 MLflow, DagsHub, DVC를 이용해 모델을 트래킹하는 과정을 실제로 진행해 본다. 우선 결론부터 말하면, 최종 프로젝트의 디렉토리 구조는 다음과 같다.
├── data
│   ├── processed
│   │   └── data.csv
│   └── raw
│       ├── data.csv
│       └── data.csv.dvc
├── dvc.lock
├── dvc.yaml
├── models
│   └── model.pkl
├── params.yaml
├── README.md
├── requirements.txt
└── src
    ├── evaluate.py
    ├── __init__.py
    ├── preprocess.py
    └── train.py
6 directories, 13 files
이러한 디렉토리 구조의 프로그램을 작성하고 이를 관리한다.
우선 params.yaml에 preprocess 단계의 입력/출력 데이터 경로와, train 단계의 데이터 경로 및 하이퍼파라미터를 넣는다.
결과적으로 params.yaml은 다음과 같다.
# Pipeline parameters read by src/preprocess.py ("preprocess") and
# src/train.py / src/evaluate.py ("train"). Paths are relative to the
# project root.
preprocess:
  input: data/raw/data.csv
  output: data/processed/data.csv

train:
  # NOTE(review): train reads the raw CSV, not preprocess.output -- confirm
  # this is intended; otherwise the preprocess stage has no consumer.
  data: data/raw/data.csv
  model: models/model.pkl
  # NOTE(review): train.py only reads "data" and "model"; the values below
  # are currently unused (the model is tuned with a hard-coded grid).
  random_state: 42
  n_estimators: 100
  max_depth: 5
이 yaml 파일을 읽어 이후의 전처리와 학습 단계에서 사용한다.
따라서 preprocess.py는 다음과 같이 작성한다.
import pandas as pd
import sys
import yaml
import os
## Load parameters from params.yaml
# The path is relative to the current working directory, so run this script
# from the directory that contains params.yaml. The `with` block closes the
# file handle, which the bare open() call previously leaked.
with open("params.yaml", encoding="utf-8") as _params_file:
    params = yaml.safe_load(_params_file)["preprocess"]
def preprocess(input_path, output_path):
    """Read the CSV at ``input_path`` and write it to ``output_path``.

    The input is read with pandas' default header inference (its first row
    becomes the column names). The output is written without a header row or
    index, so downstream readers should use ``header=None`` (as train.py does).

    Args:
        input_path: Path of the source CSV file.
        output_path: Destination CSV path; missing parent directories are
            created.
    """
    data = pd.read_csv(input_path)
    output_dir = os.path.dirname(output_path)
    # os.makedirs("") raises FileNotFoundError, so only create a directory
    # when output_path actually contains one.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    data.to_csv(output_path, header=None, index=False)
    print(f"Preprocesses data saved to {output_path}")
if __name__ == "__main__":
    # Both paths come from the "preprocess" section of params.yaml.
    preprocess(input_path=params["input"], output_path=params["output"])
이렇게 yaml에서 경로를 읽어 전처리를 수행한다. 이 파일을 실행하면 앞서 본 디렉토리 구조의 data/processed/data.csv가 생성된다.
데이터의 전처리가 끝났다면 다음은 모델 차례다.
모델은 train과 evaluate 두 단계로 진행되는데, 먼저 train.py는 다음과 같다.
import os
import pickle
from urllib.parse import urlparse
import dagshub
import mlflow
import pandas as pd
import yaml
from mlflow.models import infer_signature
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import GridSearchCV, train_test_split
# Hook this script up to the DagsHub repository; mlflow=True makes DagsHub
# the MLflow tracking backend for the runs logged below.
dagshub.init(
    repo_name="MLOps-pipeline",
    repo_owner="maximizemaxwell",
    mlflow=True,
)
def hyperparameter_tuning(X_train, y_train, param_grid, *, cv=3, random_state=None):
    """Grid-search a RandomForestClassifier over ``param_grid``.

    Args:
        X_train: Training features.
        y_train: Training labels.
        param_grid: Mapping of RandomForestClassifier parameter names to lists
            of candidate values, passed straight to GridSearchCV.
        cv: Number of cross-validation folds (default 3, the previous
            hard-coded value).
        random_state: Seed for the random forest. The default ``None`` keeps
            the previous unseeded behavior; pass an int for reproducible runs.

    Returns:
        The fitted GridSearchCV object; use ``best_estimator_`` and
        ``best_params_`` for the winning model and its parameters.
    """
    rf = RandomForestClassifier(random_state=random_state)
    grid_search = GridSearchCV(
        estimator=rf, param_grid=param_grid, cv=cv, n_jobs=-1, verbose=2
    )
    grid_search.fit(X_train, y_train)
    return grid_search
def train(data_path: str, model_path: str):
    """Tune a random forest, log the run to MLflow, and pickle the best model.

    Args:
        data_path: CSV file without a header row; the last column is the
            label and all other columns are features.
        model_path: Where the best estimator is pickled; missing parent
            directories are created.
    """
    # Load data (no header row; the last column is the target).
    data = pd.read_csv(data_path, header=None)
    X = data.iloc[:, :-1]
    y = data.iloc[:, -1]

    # Redundant with dagshub.init(mlflow=True), but keeps the target tracking
    # server explicit.
    mlflow.set_tracking_uri("https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow")

    with mlflow.start_run():
        # Hold out 20% for evaluating the tuned model.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        signature = infer_signature(X_train, y_train)

        param_grid = {
            "n_estimators": [100, 200],
            "max_depth": [5, 10, None],
            "min_samples_split": [2, 5],
            "min_samples_leaf": [1, 2],
        }
        grid_search = hyperparameter_tuning(X_train, y_train, param_grid)
        best_model = grid_search.best_estimator_

        # Evaluate on the held-out split.
        y_pred = best_model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        print(f"Accuracy: {accuracy}")

        # Log the metric and the winning hyperparameters (one batched call).
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_params(grid_search.best_params_)

        # Log confusion matrix and classification report as artifacts.
        os.makedirs("artifacts", exist_ok=True)
        with open("artifacts/confusion_matrix.txt", "w") as f:
            f.write(str(confusion_matrix(y_test, y_pred)))
        with open("artifacts/classification_report.txt", "w") as f:
            f.write(classification_report(y_test, y_pred))
        mlflow.log_artifact("artifacts/confusion_matrix.txt")
        mlflow.log_artifact("artifacts/classification_report.txt")

        # Register the model only when tracking to a non-file store. Log the
        # signature in both branches so the registered model also records its
        # input/output schema (previously it was dropped in this branch).
        if urlparse(mlflow.get_tracking_uri()).scheme != "file":
            mlflow.sklearn.log_model(
                best_model,
                "model",
                signature=signature,
                registered_model_name="Best_Model",
            )
        else:
            mlflow.sklearn.log_model(best_model, "model", signature=signature)

        # Save locally. os.makedirs("") raises, so skip it for a bare filename.
        model_dir = os.path.dirname(model_path)
        if model_dir:
            os.makedirs(model_dir, exist_ok=True)
        with open(model_path, "wb") as f:
            pickle.dump(best_model, f)
        print(f"Model saved to {model_path}")
if __name__ == "__main__":
    # Resolve params.yaml and the relative paths inside it against the
    # project root (the parent of this file's directory), the same way
    # evaluate.py does. Previously "../params.yaml" was CWD-relative while the
    # data/model paths assumed a different directory, so no single working
    # directory made both valid.
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    with open(os.path.join(project_root, "params.yaml"), encoding="utf-8") as f:
        params = yaml.safe_load(f)["train"]
    train(
        os.path.join(project_root, params["data"]),
        os.path.join(project_root, params["model"]),
    )
위와 같이 구성하였다. 참고한 Udemy 강의의 코드와는 DagsHub 연동 부분이 다르고, 파일 경로도 조금 다르다.
이 코드를 실행하면 models/ 디렉토리에 model.pkl 파일이 생성된다.
코드를 실행하면
...
[CV] END max_depth=None, min_samples_leaf=1, min_samples_split=2, n_estimators=200; total time= 1.2s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=2, n_estimators=200; total time= 0.7s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=5, n_estimators=200; total time= 0.5s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=5, n_estimators=200; total time= 0.6s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=5, n_estimators=200; total time= 0.5s
Accuracy: 0.7532467532467533
이렇게 Accuracy가 출력되고, DagsHub 저장소의 Experiments 탭에 가 보면

같은 값이 그대로 기록되어 있다.
다음으로 evaluate.py는 아래와 같다.
import os
import pickle
from urllib.parse import urlparse
import dagshub
import mlflow
import pandas as pd
import yaml
from sklearn.metrics import accuracy_score
# Initialize dagshub integration (mlflow=True routes MLflow logging to DagsHub).
dagshub.init(repo_owner="maximizemaxwell", repo_name="MLOps-pipeline", mlflow=True)

# Resolve everything against the project root (the parent of this file's
# directory) so the script works from any working directory.
_project_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
params_path = os.path.join(_project_root, "params.yaml")
# `with` closes the file handle that the bare open() call used to leak.
with open(params_path, encoding="utf-8") as _params_file:
    params = yaml.safe_load(_params_file)["train"]
# Full data/model path
data_path = os.path.join(_project_root, params["data"])
model_path = os.path.join(_project_root, params["model"])
def evaluate(data_path, model_path):
    """Score the pickled model on ``data_path`` and log ``eval_accuracy`` to MLflow.

    NOTE(review): the module-level ``data_path`` comes from the same
    ``train.data`` entry that train.py splits for training. The resulting
    accuracy therefore includes rows the model was trained on, and is
    optimistic compared with train.py's held-out ``accuracy``.

    Args:
        data_path: Headerless CSV whose columns follow the PIMA layout below,
            with the label in the last column ("Outcome").
        model_path: Path of the pickled scikit-learn model written by train.py.
    """
    # Column names (based on PIMA dataset)
    cols = [
        "Pregnancies",
        "Glucose",
        "BloodPressure",
        "SkinThickness",
        "Insulin",
        "BMI",
        "DiabetesPedigreeFunction",
        "Age",
        "Outcome",
    ]
    data = pd.read_csv(data_path, names=cols, header=None)
    X = data.drop(columns=["Outcome"])
    y = data["Outcome"]

    # Use your own DagsHub tracking URI
    mlflow.set_tracking_uri("https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow")

    # SECURITY: pickle.load can execute arbitrary code; only load model files
    # produced by this pipeline's own train step. `with` closes the handle.
    with open(model_path, "rb") as f:
        model = pickle.load(f)

    # train.py fits on a header=None frame (integer column labels), so the
    # model has no feature names. Passing a plain array avoids scikit-learn's
    # "X has feature names" mismatch warning; predictions are unchanged.
    predictions = model.predict(X.to_numpy())
    accuracy = accuracy_score(y, predictions)

    # Log to MLflow inside an explicit run so the run is closed here instead
    # of being left open as an implicit run until interpreter exit.
    with mlflow.start_run():
        mlflow.log_metric("eval_accuracy", accuracy)
        print(f"Model accuracy: {accuracy:.4f}")
if __name__ == "__main__":
    # Evaluate using the module-level paths resolved from params.yaml.
    evaluate(data_path=data_path, model_path=model_path)
이런 식으로 작성할 수 있고, 실행하면 아래와 같은
warnings.warn(
Model accuracy: 0.9505
🏃 View run clumsy-goose-362 at: https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow/#/experiments/0/runs/ddc3b5707ec448ec8c465d41a0956487
🧪 View experiment at: https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow/#/experiments/0
실행 결과를 확인할 수 있다.
다시 DagsHub으로 돌아가 보면

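evaluate 실행에서 기록한 eval_accuracy도 여기서 확인할 수 있다. 다만 evaluate.py는 train.py가 학습에 사용한 행을 포함한 전체 데이터셋으로 평가한다. 따라서 이 값(0.9505)은 train 단계에서 홀드아웃 테스트셋으로 측정한 정확도(약 0.753)보다 낙관적으로 나온다는 점에 주의해야 한다.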
MLflow with AWS
지금까지는 실험 추적과 버전 관리를 DagsHub에서 했지만, AWS 클라우드에 데이터 사이언스 프로젝트를 호스팅하여 같은 실험 과정을 진행하고 싶을 수도 있다.
이 경우 EC2 인스턴스에서 MLflow 트래킹 서버를 실행해 실험 기록을 이 서버로 보내고, 모델 같은 아티팩트는 S3 버킷에 저장하도록 구성한다. 이를 위해 아래와 같은 app.py를 작성할 수 있다.
import os
import sys
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse
import mlflow
from mlflow.models.signature import infer_signature
import mlflow.sklearn
import logging
import os
# Point MLflow at the tracking server running on EC2 through the
# MLFLOW_TRACKING_URI environment variable.
os.environ.update(
    MLFLOW_TRACKING_URI="http://ec2-54-158-152-207.compute-1.amazonaws.com:5000/"
)

# logging.WARNING is the canonical name of the logging.WARN alias (same level).
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
def eval_metrics(actual, pred):
    """Return the regression metrics ``(rmse, mae, r2)`` of ``pred`` against ``actual``."""
    mse = mean_squared_error(actual, pred)
    # Tuple elements are evaluated left to right: RMSE, then MAE, then R^2.
    return np.sqrt(mse), mean_absolute_error(actual, pred), r2_score(actual, pred)
if __name__ == "__main__":
    ## Data Ingestion-Reading the dataset-- wine quality dataset
    csv_url = (
        "https://raw.githubusercontent.com/mlflow/mlflow/master/tests/datasets/winequality-red.csv"
    )
    try:
        data = pd.read_csv(csv_url, sep=";")
    except Exception:
        # Nothing below can run without `data`. Log the failure and re-raise
        # instead of continuing into a NameError.
        logger.exception("Unable to download the data")
        raise

    # Split the data in train and test
    train, test = train_test_split(data)
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    # Optional CLI overrides: python app.py <alpha> <l1_ratio>
    alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
    l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5

    ## For the remote server AWS we need to do the setup.
    # Set the tracking URI before starting the run, so the run and everything
    # logged to it target the same server.
    remote_server_uri = "http://ec2-54-158-152-207.compute-1.amazonaws.com:5000/"
    mlflow.set_tracking_uri(remote_server_uri)

    with mlflow.start_run():
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)

        predicted_qualities = lr.predict(test_x)
        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        print("Elasticnet model (alpha={:f}, l1_ratio={:f}):".format(alpha, l1_ratio))
        print(" RMSE: %s" % rmse)
        print(" MAE: %s" % mae)
        print(" R2: %s" % r2)

        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        # Record the model's input/output schema alongside the model.
        signature = infer_signature(train_x, lr.predict(train_x))

        # Register the model only when tracking to a non-file store.
        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
        if tracking_url_type_store != "file":
            mlflow.sklearn.log_model(
                lr,
                "model",
                signature=signature,
                registered_model_name="ElasticnetWineModel",
            )
        else:
            mlflow.sklearn.log_model(lr, "model", signature=signature)

