Intro to ETL Pipeline
Intro to ETL Pipeline
우선 ETL은 약자인데 무엇의 약자이냐면
ETL:
Extract
Transform
Load 의 약자이다.
그래서 ETL 파이프라인이 뭐냐고 하면, ETL 파이프라인은 데이터 파이프라인의 일부이다.
그럼 다시 데이터 파이프라인이란 무엇인가? 데이터 파이프라인이란 다양한 데이터 소스에서 원시 데이터를 수집하고 변환한 다음 분석을 위해 데이터 레이크나 데이터 웨어하우스같은 데이터 저장소로 이식하는 방법이다.
ETL Pipeline
Astro cli
Astro cli는 Airflow 세팅을 위한 일종의 프레임워크라고 보면 될 듯하다. 이 툴을 이용해서 초기화하면 웹 프레임워크 (리액트나 뷰처럼) 테스트부터 세팅에 필요한 기초적인 디렉토리 구조와 일부 코드들이 제공된다.
리눅스 환경에서 설치했으므로
curl -sSL install.astronomer.io | sudo bash -s
와 같이 설치해주었다.
설치 후에 초기화를 위해
~/coding/MLOps_study/week9 on main ?2 ────── max-env at 21:19:21 ─╮
❯ astro dev init ─╯
Initialized empty Astro project in /home/max/coding/MLOps_study/week9
astro dev init 을 진행해준다.
이를 진행해주면

위 사진과 같은 디렉토리 구조가 형성된다.
참고하는 강의(Complete MLOps Bootcamp With 10+ End to End ML Projects)에서 강사는
여담
API -> Transformation(python code) -> (JSON 포맷) -> LOAD -> DB 의 파이프라인을 소개하고 있고 본 강의에서는 DB를 컨테이너애서 사용하고자 하나, 한 가지 덧붙이자면 실제 DB를 운영하고자 한다면 도커 컨테이너는 그닥 좋은 선택은 아니다.
파이프라인을 테스트하는 용도로는 괜찮으나 도커 컨테이너는 기본적으로 데이터를 지속시키고 안정적으로 유지하기보단 분리된 환경의 프로그램을 틀을 이용해 빠르게 찍어내고 운영할 수 있는 것에 가깝다.
애초에 바인드 마운트를 쓰지 않는 이상 컨테이너가 삭제되면 데이터는 날아가고, --rm 옵션을 붙였다면 컨테이너는 중지되기만 해도 날아가는 가변적인 것이니...게다가 로컬에 의존하게 되는 바인드 마운트를 테스트 밖의 범위에서 쓰는 것도 바람직하지 않으니 실 운영에서는 DB서버를 컨테이너가 아닌 따로 두는 것을 권장하는 편이며, 현재도 그렇게 하고 있다. (성능 이슈도 있고)
그러나 학습이나 테스트용으로 여러가지 애플리케이션의 상호작용을 테스트하고자 한다면 컨테이너와 도커 컴포즈는 꽤 괜찮은 선택이다.
Airflow Hooks
Airflow에서 Hook이란 커스텀하게 로직을 구현하거나 외부 시스템의 푸시 작업 흐름을 만들 때 사용하는 것이다. Webhook에서의 hook과 의미적으로 크게 다르지는 않아보인다.
강의에서는 Postgres 접근을 위한 훅을 사용하는데 Mysql에서도 가능하다.
따로 더 말할 것은 없고 그냥 쉽게 접근하게 해주는 무언가라서
from airflow.providers.postgres.hooks.postgres import PostgresHook
이 친구를 잘 써주면 되는것이다.
파이프라인
파이프라인을 요약하자면
테이블이 없으면 만들고
API로 데이터를 추출해오고(추출 파이프라인, 여기서는 NASA API 사용)
필요한 정보로 적절히 변환해주고
DB로 적재해준다.(여기서는 적재 공간으로 컨테이너에서 구르는 postgres 사용)
그리고 데이터를 검증해주고
마지막으로 태스크 의존성을 검증하는 과정이다.
코드로 본다면
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago
import json
## Define the DAG
with DAG(
dag_id='nasa_apod_postgres',
start_date=days_ago(1),
schedule_interval='@daily',
catchup=False
) as dag:
## step 1: Create the table if it doesnt exists
@task
def create_table():
## initialize the Postgreshook
postgres_hook=PostgresHook(postgres_conn_id="my_postgres_connection")
## SQL query to create the table
create_table_query="""
CREATE TABLE IF NOT EXISTS apod_data (
id SERIAL PRIMARY KEY,
title VARCHAR(255),
explanation TEXT,
url TEXT,
date DATE,
media_type VARCHAR(50)
);
"""
## Execute the table creation query
postgres_hook.run(create_table_query)
## Step 2: Extract the NASA API Data(APOD)-Astronomy Picture of the Day[Extract pipeline]
## https://api.nasa.gov/planetary/apod?api_key=7BbRvxo8uuzas9U3ho1RwHQQCkZIZtJojRIr293q
extract_apod=SimpleHttpOperator(
task_id='extract_apod',
http_conn_id='nasa_api', ## Connection ID Defined In Airflow For NASA API
endpoint='planetary/apod', ## NASA API enpoint for APOD
method='GET',
data={"api_key":"{{ conn.nasa_api.extra_dejson.api_key}}"}, ## USe the API Key from the connection
response_filter=lambda response:response.json(), ## Convert response to json
)
## Step 3: Transform the data(Pick the information that i need to save)
@task
def transform_apod_data(response):
apod_data={
'title': response.get('title', ''),
'explanation': response.get('explanation', ''),
'url': response.get('url', ''),
'date': response.get('date', ''),
'media_type': response.get('media_type', '')
}
return apod_data
## step 4: Load the data into Postgres SQL
@task
def load_data_to_postgres(apod_data):
## Initialize the PostgresHook
postgres_hook=PostgresHook(postgres_conn_id='my_postgres_connection')
## Define the SQL Insert Query
insert_query = """
INSERT INTO apod_data (title, explanation, url, date, media_type)
VALUES (%s, %s, %s, %s, %s);
"""
## Execute the SQL Query
postgres_hook.run(insert_query,parameters=(
apod_data['title'],
apod_data['explanation'],
apod_data['url'],
apod_data['date'],
apod_data['media_type']
))
## step 5: Verify the data DBViewer
## step 6: Define the task dependencies
## Extract
create_table() >> extract_apod ## Ensure the table is create befor extraction
api_response=extract_apod.output
## Transform
transformed_data=transform_apod_data(api_response)
## Load
load_data_to_postgres(transformed_data)
이러한 상황이다.
이 코드는 astro cli를 이용해 구축해준 디렉토리에서 dags/etl.py에 위치하도록 두면 된다.
이 과정도 마찬가지로 AWS EC2에 배포할 수 있다.

