Skip to main content

Command Palette

Search for a command to run...

Intro to ETL Pipeline

Updated
3 min read

Intro to ETL Pipeline

우선 ETL은 약자인데 무엇의 약자이냐면

ETL:

  • Extract

  • Transform

  • Load 의 약자이다.

그래서 ETL 파이프라인이 뭐냐고 하면, ETL 파이프라인은 데이터 파이프라인의 일부이다.

그럼 다시 데이터 파이프라인이란 무엇인가? 데이터 파이프라인이란 다양한 데이터 소스에서 원시 데이터를 수집하고 변환한 다음 분석을 위해 데이터 레이크데이터 웨어하우스같은 데이터 저장소로 이식하는 방법이다.

ETL Pipeline

Astro cli

Astro cli는 Airflow 세팅을 위한 일종의 프레임워크라고 보면 될 듯하다. 이 툴을 이용해서 초기화하면 웹 프레임워크 (리액트나 뷰처럼) 테스트부터 세팅에 필요한 기초적인 디렉토리 구조와 일부 코드들이 제공된다.

리눅스 환경에서 설치했으므로

curl -sSL install.astronomer.io | sudo bash -s

와 같이 설치해주었다.

설치 후에 초기화를 위해

  ~/coding/MLOps_study/week9 on   main ?2 ──────  max-env at  21:19:21 ─╮
❯ astro dev init                                                             ─╯
Initialized empty Astro project in /home/max/coding/MLOps_study/week9

astro dev init 을 진행해준다.

이를 진행해주면

위 사진과 같은 디렉토리 구조가 형성된다.

참고하는 강의(Complete MLOps Bootcamp With 10+ End to End ML Projects)에서 강사는

여담

API -> Transformation(python code) -> (JSON 포맷) -> LOAD -> DB 의 파이프라인을 소개하고 있고 본 강의에서는 DB를 컨테이너애서 사용하고자 하나, 한 가지 덧붙이자면 실제 DB를 운영하고자 한다면 도커 컨테이너는 그닥 좋은 선택은 아니다.

파이프라인을 테스트하는 용도로는 괜찮으나 도커 컨테이너는 기본적으로 데이터를 지속시키고 안정적으로 유지하기보단 분리된 환경의 프로그램을 틀을 이용해 빠르게 찍어내고 운영할 수 있는 것에 가깝다.

애초에 바인드 마운트를 쓰지 않는 이상 컨테이너가 삭제되면 데이터는 날아가고, --rm 옵션을 붙였다면 컨테이너는 중지되기만 해도 날아가는 가변적인 것이니...게다가 로컬에 의존하게 되는 바인드 마운트를 테스트 밖의 범위에서 쓰는 것도 바람직하지 않으니 실 운영에서는 DB서버를 컨테이너가 아닌 따로 두는 것을 권장하는 편이며, 현재도 그렇게 하고 있다. (성능 이슈도 있고)

그러나 학습이나 테스트용으로 여러가지 애플리케이션의 상호작용을 테스트하고자 한다면 컨테이너와 도커 컴포즈는 꽤 괜찮은 선택이다.

Airflow Hooks

Airflow에서 Hook이란 커스텀하게 로직을 구현하거나 외부 시스템의 푸시 작업 흐름을 만들 때 사용하는 것이다. Webhook에서의 hook과 의미적으로 크게 다르지는 않아보인다.

강의에서는 Postgres 접근을 위한 훅을 사용하는데 Mysql에서도 가능하다.

따로 더 말할 것은 없고 그냥 쉽게 접근하게 해주는 무언가라서

from airflow.providers.postgres.hooks.postgres import PostgresHook

이 친구를 잘 써주면 되는것이다.

파이프라인

파이프라인을 요약하자면

  1. 테이블이 없으면 만들고

  2. API로 데이터를 추출해오고(추출 파이프라인, 여기서는 NASA API 사용)

  3. 필요한 정보로 적절히 변환해주고

  4. DB로 적재해준다.(여기서는 적재 공간으로 컨테이너에서 구르는 postgres 사용)

  5. 그리고 데이터를 검증해주고

  6. 마지막으로 태스크 의존성을 검증하는 과정이다.

코드로 본다면

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago
import json


## Define the DAG
with DAG(
    dag_id='nasa_apod_postgres',
    start_date=days_ago(1),
    schedule_interval='@daily',
    catchup=False

) as dag:

    ## step 1: Create the table if it doesnt exists

    @task
    def create_table():
        ## initialize the Postgreshook
        postgres_hook=PostgresHook(postgres_conn_id="my_postgres_connection")

        ## SQL query to create the table
        create_table_query="""
        CREATE TABLE IF NOT EXISTS apod_data (
            id SERIAL PRIMARY KEY,
            title VARCHAR(255),
            explanation TEXT,
            url TEXT,
            date DATE,
            media_type VARCHAR(50)
        );


        """
        ## Execute the table creation query
        postgres_hook.run(create_table_query)


    ## Step 2: Extract the NASA API Data(APOD)-Astronomy Picture of the Day[Extract pipeline]
    ## https://api.nasa.gov/planetary/apod?api_key=7BbRvxo8uuzas9U3ho1RwHQQCkZIZtJojRIr293q
    extract_apod=SimpleHttpOperator(
        task_id='extract_apod',
        http_conn_id='nasa_api',  ## Connection ID Defined In Airflow For NASA API
        endpoint='planetary/apod', ## NASA API enpoint for APOD
        method='GET',
        data={"api_key":"{{ conn.nasa_api.extra_dejson.api_key}}"}, ## USe the API Key from the connection
        response_filter=lambda response:response.json(), ## Convert response to json
    )



    ## Step 3: Transform the data(Pick the information that i need to save)
    @task
    def transform_apod_data(response):
        apod_data={
            'title': response.get('title', ''),
            'explanation': response.get('explanation', ''),
            'url': response.get('url', ''),
            'date': response.get('date', ''),
            'media_type': response.get('media_type', '')

        }
        return apod_data


    ## step 4:  Load the data into Postgres SQL
    @task
    def load_data_to_postgres(apod_data):
        ## Initialize the PostgresHook
        postgres_hook=PostgresHook(postgres_conn_id='my_postgres_connection')

        ## Define the SQL Insert Query

        insert_query = """
        INSERT INTO apod_data (title, explanation, url, date, media_type)
        VALUES (%s, %s, %s, %s, %s);
        """

        ## Execute the SQL Query

        postgres_hook.run(insert_query,parameters=(
            apod_data['title'],
            apod_data['explanation'],
            apod_data['url'],
            apod_data['date'],
            apod_data['media_type']


        ))



    ## step 5: Verify the data DBViewer


    ## step 6: Define the task dependencies
    ## Extract
    create_table() >> extract_apod  ## Ensure the table is create befor extraction
    api_response=extract_apod.output
    ## Transform
    transformed_data=transform_apod_data(api_response)
    ## Load
    load_data_to_postgres(transformed_data)

이러한 상황이다.

이 코드는 astro cli를 이용해 구축해준 디렉토리에서 dags/etl.py에 위치하도록 두면 된다.

이 과정도 마찬가지로 AWS EC2에 배포할 수 있다.

9 views

More from this blog

락프리 데이터 구조와 알고리즘

여기서는 락프리 데이터 구조를 설명한다. 락프리(lock-free) 란 배타락을 이용하지 않고 처리를 수행하는 데이터 구조 및 그에 대한 조작 알고리즘을 총칭한다. 왜 락프리인가? 전통적인 동시성 제어 방법인 뮤텍스나 세마포어는 여러 문제점을 가지고 있다: 성능 저하: 락 경합(lock contention)으로 인한 대기 시간 데드락: 여러 스레드가 서로의 락을 기다리는 상황 우선순위 역전: 낮은 우선순위 스레드가 높은 우선순위 스레드를 ...

Jul 27, 20257 min read126

소프트웨어 트랜잭셔널 메모리

소프트웨어 트랜잭셔널 메모리 동시성 프로그래밍에서 공유 자원에 대한 안전한 접근은 항상 중요한 과제다. 전통적으로 뮤텍스 락과 같은 비관적 락(Negative Lock) 방식을 사용해왔다. 이 방식은 크리티컬 섹션에 진입하기 전에 반드시 락을 획득해야 하며, 락을 얻지 못하면 코드 실행 자체가 블록된다. 하지만 이와는 다른 접근 방식이 있다. 바로 낙관적 락(Optimistic Lock) 방식인데, 이는 "일단 실행하고 나중에 검증하자"는 철학...

Jul 20, 202517 min read263

공평한 배타 제어

공평한 배타 제어 여기서는 공평한 배타 제어에 대해 설명한다. 먼저 컨텐션(contention) 이라는 개념을 이해할 필요가 있다. 컨텐션이란 여러 스레드가 동시에 같은 락을 획득하려고 경쟁하는 상황을 말한다. 컨텐션이 높을수록 스레드들이 락을 기다리는 시간이 길어지고 성능이 저하된다. 이러한 컨텐션 상황은 시스템 아키텍처에 따라 더욱 복잡해질 수 있다. 특히 비균일 메모리 접근(Non-Uniform Memory Access, NUMA) 와 같...

Jul 13, 20259 min read21

KernelSnitch[논문 리뷰]

Paper 1. Intro 이 글은 NDSS 2025에서 발표된 KernelSnitch 논문을 소개이다. 이 연구는 커널의 평범한 데이터 구조체들이 가진 본질적인 특성이 어떻게 심각한 보안 취약점이 되는지를 보여준다. 핵심은 이러하다: "데이터 구조체의 크기에 따른 접근 시간 차이를 이용해 커널의 비밀 정보를 유출할 수 있다" 여기서는 커널 힙 포인터 유출에 집중해서 설명한다. 이 공격이 성공하면 KASLR을 우회하고 더 심각한 커널 익스플로...

Jul 11, 20257 min read131

멀티태스크와 액터 모델

멀티태스크 협조적/비협조적 멀티태스크 선점: 프로세스와의 협조 없이 수행하는 컨택스트 스위칭이라고는 하나, 결국 뺏어오는 게 가능하냐의 문제다. 협조적 멀티태스크(비선점형, cooperative): 각각의 프로세스가 자발적으로 컨택스트 스위칭을 수행하는 멀티태스크 방식. 장점: 멀티태스크 매커니즘을 구현하기 쉽다. 단점: 프로세스가 자발적으로 컨텍스트 스위칭을 해야하는데, 만약 버그가 발생하여 프로세스가 무한 루프에 빠지거나 정지하게 되면 그 ...

Jul 6, 20252 min read25
M

MaxLog

35 posts