# MLFLOW Pipeline and Intro to aws

# MLFLOW Pipeline with DVC, MLFLOW, Dagshub

여기서는 MLflow, DagsHub, DVC를 이용해 모델을 트래킹하는 과정을 실제로 진행해 본다. 우선 결론부터 보면, 최종 디렉토리 구조는 다음과 같다.

```css

├── data
│   ├── processed
│   │   └── data.csv
│   └── raw
│       ├── data.csv
│       └── data.csv.dvc
├── dvc.lock
├── dvc.yaml
├── models
│   └── model.pkl
├── params.yaml
├── README.md
├── requirements.txt
└── src
    ├── evaluate.py
    ├── __init__.py
    ├── preprocess.py
    └── train.py

6 directories, 13 files
```

이러한 디렉토리 구조의 프로그램을 작성하고 이를 관리한다.

우선 `params.yaml`에 `preprocess` 단계의 입력·출력 데이터 경로와, `train` 단계의 데이터 경로, 모델 저장 경로 및 하이퍼파라미터를 정의한다.

결과적으로

```yaml
preprocess:
  input: data/raw/data.csv
  output: data/processed/data.csv

train:
  data: data/raw/data.csv
  model: models/model.pkl
  random_state: 42
  n_estimators: 100
  max_depth: 5
```

이러한 형태를 바탕으로 하고, 이 `yaml` 파일을 읽어와서 이후의 전처리와 학습에 사용하게 된다.

따라서 `preprocess.py` 파일에서

```python
import pandas as pd
import sys
import yaml
import os

# Load the "preprocess" section of params.yaml (looked up in the current
# working directory); the context manager closes the file once it is parsed.
with open("params.yaml", encoding="utf-8") as params_file:
    params = yaml.safe_load(params_file)["preprocess"]

def preprocess(input_path, output_path):
    """Copy the CSV at ``input_path`` to ``output_path`` without its header row.

    The input is parsed with pandas (its first row is taken as the column
    names) and written back with neither the header nor the index. The
    destination directory is created when it does not exist yet.

    Args:
        input_path: Path of the CSV file to read.
        output_path: Path of the CSV file to write.
    """
    data = pd.read_csv(input_path)

    # A bare filename has an empty dirname and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    data.to_csv(output_path, header=False, index=False)
    print(f"Preprocesses data saved to {output_path}")

if __name__ == "__main__":
    # Run the preprocessing step with the paths configured in params.yaml.
    source_csv, target_csv = params["input"], params["output"]
    preprocess(source_csv, target_csv)
```

이러한 식으로 `yaml`을 읽어와서 전처리하게 된다. 이 파일을 실행하면 처음의 전체 디렉토리 구조에서 봤던 `data/processed/data.csv`가 생성된다.

데이터의 전처리가 끝났다면 다음은 모델 차례다.

모델은 `train`과 `evaluate` 두 과정으로 진행되는데

`train.py`는

```python
import os
import pickle
from urllib.parse import urlparse

import dagshub
import mlflow
import pandas as pd
import yaml
from mlflow.models import infer_signature
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import GridSearchCV, train_test_split

# Initialize the DagsHub integration for this repository with MLflow enabled.
dagshub.init(
    repo_owner="maximizemaxwell",
    repo_name="MLOps-pipeline",
    mlflow=True,
)


def hyperparameter_tuning(X_train, y_train, param_grid, cv=3, random_state=None):
    """Grid-search a random forest over ``param_grid`` and return the search.

    Args:
        X_train: Training features.
        y_train: Training labels.
        param_grid: Mapping of ``RandomForestClassifier`` parameter names to
            the candidate values to try.
        cv: Number of cross-validation folds (3 by default, as before).
        random_state: Seed forwarded to the forest. ``None`` (the default)
            keeps the previous unseeded behaviour; pass an int to make the
            search reproducible.

    Returns:
        The fitted ``GridSearchCV`` object; callers read ``best_estimator_``
        and ``best_params_`` from it.
    """
    forest = RandomForestClassifier(random_state=random_state)
    search = GridSearchCV(
        estimator=forest,
        param_grid=param_grid,
        cv=cv,
        n_jobs=-1,
        verbose=2,
    )
    search.fit(X_train, y_train)
    return search


def train(data_path: str, model_path: str) -> None:
    """Tune a random forest on the CSV at ``data_path`` and track it in MLflow.

    The CSV is read without a header row; the last column is used as the
    label and every other column as a feature. Inside a single MLflow run the
    function grid-searches the hyperparameters, logs the hold-out accuracy,
    the best parameters, a confusion matrix and a classification report,
    logs the best model, and finally pickles that model to ``model_path``.

    Args:
        data_path: Header-less CSV file to train on.
        model_path: Destination file for the pickled best estimator.
    """
    # Load data
    data = pd.read_csv(data_path, header=None)
    X = data.iloc[:, :-1]
    y = data.iloc[:, -1]

    # Explicit tracking URI; per the original author's note this duplicates
    # what dagshub.init() already configures.
    mlflow.set_tracking_uri("https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow")

    with mlflow.start_run():
        # Train-test split (fixed seed, 20% held out)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        signature = infer_signature(X_train, y_train)

        # Define hyperparameter grid
        param_grid = {
            "n_estimators": [100, 200],
            "max_depth": [5, 10, None],
            "min_samples_split": [2, 5],
            "min_samples_leaf": [1, 2],
        }

        # Hyperparameter tuning
        grid_search = hyperparameter_tuning(X_train, y_train, param_grid)
        best_model = grid_search.best_estimator_

        # Evaluation on the held-out split
        y_pred = best_model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        print(f"Accuracy: {accuracy}")

        # Log the metric and all winning hyperparameters in one call
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_params(grid_search.best_params_)

        # Write the confusion matrix and classification report to text files
        # and upload each one as a run artifact.
        os.makedirs("artifacts", exist_ok=True)
        reports = {
            "artifacts/confusion_matrix.txt": str(confusion_matrix(y_test, y_pred)),
            "artifacts/classification_report.txt": classification_report(
                y_test, y_pred
            ),
        }
        for report_path, report_text in reports.items():
            with open(report_path, "w") as f:
                f.write(report_text)
            mlflow.log_artifact(report_path)

        # Model logging: the signature is attached in both cases (it used to
        # be dropped for remote tracking); the model is additionally
        # registered as "Best_Model" unless tracking uses a local file store.
        log_kwargs = {"signature": signature}
        if urlparse(mlflow.get_tracking_uri()).scheme != "file":
            log_kwargs["registered_model_name"] = "Best_Model"
        mlflow.sklearn.log_model(best_model, "model", **log_kwargs)

        # Save locally. A bare filename has an empty dirname, which
        # os.makedirs rejects, so only create the directory when there is one.
        model_dir = os.path.dirname(model_path)
        if model_dir:
            os.makedirs(model_dir, exist_ok=True)
        with open(model_path, "wb") as f:
            pickle.dump(best_model, f)
        print(f"Model saved to {model_path}")


if __name__ == "__main__":
    # Resolve params.yaml and the paths it contains against the project root
    # (the parent of this script's directory) so the script behaves the same
    # from any working directory. Previously params.yaml was looked up one
    # level above the working directory while the data/model paths were taken
    # relative to the working directory itself.
    project_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
    with open(os.path.join(project_root, "params.yaml")) as params_file:
        params = yaml.safe_load(params_file)["train"]
    train(
        os.path.join(project_root, params["data"]),
        os.path.join(project_root, params["model"]),
    )
```

과 같은 식으로 구성하였다. 참고한 유데미 강의의 코드와 dagshub연동 부분이 다르고 경로의 차이가 조금 있다.

이 코드를 실행하게 되면 `models/`디렉토리에 `pkl` 파일이 생성됨을 볼 수 있다.

코드를 실행하면

```txt
...
[CV] END max_depth=None, min_samples_leaf=1, min_samples_split=2, n_estimators=200; total time=   1.2s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=2, n_estimators=200; total time=   0.7s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=5, n_estimators=200; total time=   0.5s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=5, n_estimators=200; total time=   0.6s
[CV] END max_depth=None, min_samples_leaf=2, min_samples_split=5, n_estimators=200; total time=   0.5s
Accuracy: 0.7532467532467533
```

이런 식으로 Accuracy가 출력되는데, 돌아가서 dagshub의 experiment에 가보면

![](https://i.imgur.com/kvNIk1Q.png align="left")

이런 식으로 똑같이 기록된다.

그러면 다음으로 `evaluate.py`는

```python
import os
import pickle
from urllib.parse import urlparse

import dagshub
import mlflow
import pandas as pd
import yaml
from sklearn.metrics import accuracy_score

# Initialize the DagsHub integration for this repository with MLflow enabled
dagshub.init(repo_owner="maximizemaxwell", repo_name="MLOps-pipeline", mlflow=True)

# params.yaml sits one directory above this script
params_path = os.path.join(os.path.dirname(__file__), "..", "params.yaml")
# Read the "train" section; the context manager closes the file once parsed
with open(params_path) as params_file:
    params = yaml.safe_load(params_file)["train"]

# Data and model locations from params.yaml, resolved against that same
# parent directory
data_path = os.path.join(os.path.dirname(__file__), "..", params["data"])
model_path = os.path.join(os.path.dirname(__file__), "..", params["model"])


def evaluate(data_path, model_path):
    """Score the pickled model on the CSV at ``data_path`` and log the result.

    The CSV is read without a header row and labelled with the PIMA column
    names below; ``Outcome`` is the target and the remaining columns are the
    features. Every row of the file is scored (no hold-out split is made
    here). The accuracy is logged to MLflow as ``eval_accuracy`` and printed.

    Args:
        data_path: Header-less CSV file to evaluate on.
        model_path: Pickled model, as written by the training script.
    """
    # Column names (based on PIMA dataset)
    cols = [
        "Pregnancies",
        "Glucose",
        "BloodPressure",
        "SkinThickness",
        "Insulin",
        "BMI",
        "DiabetesPedigreeFunction",
        "Age",
        "Outcome",
    ]
    data = pd.read_csv(data_path, names=cols, header=None)
    X = data.drop(columns=["Outcome"])
    y = data["Outcome"]

    # Use your own DagsHub tracking URI
    mlflow.set_tracking_uri("https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow")

    # NOTE: unpickling can execute arbitrary code, so only load model files
    # you produced yourself. The context manager closes the file handle.
    with open(model_path, "rb") as model_file:
        model = pickle.load(model_file)

    predictions = model.predict(X)
    accuracy = accuracy_score(y, predictions)

    # Log to MLflow
    mlflow.log_metric("eval_accuracy", accuracy)
    print(f"Model accuracy: {accuracy:.4f}")


if __name__ == "__main__":
    # Evaluate the saved model using the module-level paths built from
    # params.yaml.
    evaluate(data_path=data_path, model_path=model_path)
```

이런 식으로 쓸 수 있겠고,

```txt
  warnings.warn(
Model accuracy: 0.9505
🏃 View run clumsy-goose-362 at: https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow/#/experiments/0/runs/ddc3b5707ec448ec8c465d41a0956487
🧪 View experiment at: https://dagshub.com/maximizemaxwell/MLOps-pipeline.mlflow/#/experiments/0
```

실행결과를 확인할 수 있다.

다시 dagshub으로 돌아가서

![](https://i.imgur.com/jJa9PnB.png align="left")

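evaluate 실험의 accuracy도 여기서 확인할 수 있다. 참고로 `evaluate.py`는 데이터를 나누지 않고 학습에 사용된 행까지 포함한 전체 데이터로 평가하므로, `train.py`에서 테스트 세트(20%)로 측정한 정확도(약 0.75)보다 높은 값(약 0.95)이 나온다.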

# MLFLOW with AWS

지금까지는 실험 추적과 버전 관리를 dagshub에 올렸으나, aws 클라우드에 데이터사이언스 프로젝트를 호스팅하여 똑같이 실험과정을 진행하고 싶을 수 있다.

실험 추적은 `S3 버킷`에 기록되고, 이것은 다시 `aws EC2` 인스턴스로 들어간다. 따라서 아래와 같은 `app.py`를 작성할 수 있다.

```python
import os
import sys

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse
import mlflow
from mlflow.models.signature import infer_signature
import mlflow.sklearn 

import logging

import os

# Export the remote tracking server URL through MLFLOW_TRACKING_URI.
os.environ.update(
    {"MLFLOW_TRACKING_URI": "http://ec2-54-158-152-207.compute-1.amazonaws.com:5000/"}
)

# Module-level logger; the root logger is configured at WARNING level.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.WARNING)

def eval_metrics(actual, pred):
    """Return ``(rmse, mae, r2)`` for the predictions ``pred`` against ``actual``.

    RMSE is the square root of the mean squared error; MAE and R2 come
    straight from the corresponding scikit-learn metric functions.
    """
    return (
        np.sqrt(mean_squared_error(actual, pred)),
        mean_absolute_error(actual, pred),
        r2_score(actual, pred),
    )


if __name__ == "__main__":
    # Data ingestion: read the red-wine quality dataset from the MLflow repo.
    csv_url = (
        "https://raw.githubusercontent.com/mlflow/mlflow/master/tests/datasets/winequality-red.csv"
    )

    try:
        data = pd.read_csv(csv_url, sep=";")
    except Exception:
        # Nothing below can run without the data: log the traceback and stop
        # here instead of failing later with a NameError on ``data``.
        logger.exception("Unable to download the data")
        sys.exit(1)

    # Split the data in train and test
    train, test = train_test_split(data)

    # "quality" is the target column; every other column is a feature.
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    # Optional CLI overrides: argv[1] -> alpha, argv[2] -> l1_ratio.
    alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
    l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5

    # Select the remote tracking server before the run is created, so the
    # run and everything logged into it share one destination.
    remote_server_uri = "http://ec2-54-158-152-207.compute-1.amazonaws.com:5000/"
    mlflow.set_tracking_uri(remote_server_uri)

    with mlflow.start_run():
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)

        predicted_qualities = lr.predict(test_x)
        rmse, mae, r2 = eval_metrics(test_y, predicted_qualities)

        print(f"Elasticnet model (alpha={alpha:f}, l1_ratio={l1_ratio:f}):")
        print(f"  RMSE: {rmse}")
        print(f"  MAE: {mae}")
        print(f"  R2: {r2}")

        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        # Record the model's input/output schema alongside the model.
        signature = infer_signature(train_x, lr.predict(train_x))

        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

        # Register the model under a name unless tracking uses a local
        # file store, in which case it is only logged.
        if tracking_url_type_store != "file":
            mlflow.sklearn.log_model(
                lr,
                "model",
                registered_model_name="ElasticnetWineModel",
                signature=signature,
            )
        else:
            mlflow.sklearn.log_model(lr, "model", signature=signature)
```
