본문 바로가기
DevOps/Data-Pipeline

[개인 프로젝트] 데이터 파이프라인 구축기 (1)

by 민우's 코딩 2025. 9. 3.

 

안녕하세요! 오늘은 데이터 파이프라인에 대해서, 갑자기 관심을 가지게 된 이유, 구축기 까지 한번 블로그에 작성해보려고 합니다.

 

최근에 작년에 참여한 공모전에 다시 한 번 도전하게 되면서 날씨 정보를 이어 받아서 전송하는 데이터 파이프라인을 팀원이 개발하고 저는 인프라 포지션을 맡아서 Server 구성을 진행하고 있었습니다.

 

데이터 파이프라인을 구성해서 직접 인프라에서 돌려보면서 궁금한 점이 많이 생겼고 나도 한 번 개발해볼까...? 싶은 생각에 개념만 무작정 찾아보면서 공부하는 것보다 AI 시대에 직접 구성 해보는 것이 더 머리에 잘 남고 기억 될 것 같아서 한 번 파이프라인을 구축해보자! 라는 생각에 시작하게 되었습니다.

 

그럼 데이터 파이프라인이라는 것이 무엇인가?

데이터 파이프라인은 다양한 소스에서 원시 데이터를 수집, 변환 및 처리하여 저장소나 분석 도구로 전달하는 일련의 자동화된 과정

 

뭔가 쉽게 직역하자면 어떠한 데이터가 나왔을때 그 데이터들을 모아서 내 저장소나 분석하는 모델로 전달해주는

말 그대로의 파이프라인 입니다. 

 

이 데이터라는 것이 회사의 규모나 사이트의 규모가 커지게 되면 그만큼의 빅데이터가 생겨 날 것이고, 그만큼의 유저 수가 생긴 곳에서 데이터를 모아서 그 데이터들을 확인하며, 앞으로의 방향성 등을 정하는데 정말 많은 도움을 줄 것 입니다.

 

결국엔 데이터로 더 발전시키기 위해서 데이터 파이프라인을 통해 데이터를 추출하고 변환하여 의사결정을 해주는

아주 중요한 것이 바로 데이터 파이프라인입니다!

 

데이터 파이프라인의 일반적인 패턴은 총 두가지로 나뉩니다.

  • ETL / ELT
    • Extract (추출):  로드 및 변환을 준비하기 위해 다양한 소스에서 데이터를 수집 ,원본 데이터 소스에서 데이터를 뽑아냄
    • Transform (변환):  분석가, 시각화 도구 또는 파이프라인이 제공하는 모든 사용 사례에 유용하게 쓸 수 있게 각 소스 시스템의 원본 데이터를 결합하고 형식을 지정하는 단계 ,데이터를 예쁘게 변환
    • Transform (변환):  원본 데이터(ETL의 경우) 또는 완전히 변환된 데이터(ELT의 경우)를 최종 대상으로 가져옴, 데이터를 데이터 저장소에 저장

이것을 비유하자면 다음과 같이 생각하면 더 편하게 이해가 될 것 같습니다!

 

  • Extract (추출): 마트(데이터 소스)에서 필요한 재료(데이터)를 사 오는 과정입니다.
  • Transform (변환): 사 온 재료를 요리하기 좋게 다듬는(데이터 정제, 가공, 계산) 과정입니다. (e.g., 감자 껍질 벗기기, 양파 다지기)
  • Load (적재): 잘 다듬은 재료를 냉장고(데이터베이스, 웨어하우스)에 저장하거나, 바로 접시에 담아(사용자에게 보여주기) 내놓는 과정입니다.

ETL과 ELT는 중간의 순서만 바뀐 것이라고 생각하면 됩니다!

  • ETL (Extract → Transform → Load):
    • 재료를 사 와서(E), 미리 다 손질한 뒤에(T), 냉장고에 보관(L)하는 방식입니다. 전통적인 데이터 웨어하우스(DW)에서 많이 사용됩니다.
  • ELT (Extract → Load → Transform):
    • 재료를 사 와서(E), 손질하지 않고 통째로 일단 냉장고에 넣고(L), 요리하기 직전에 꺼내서 손질(T)하는 방식입니다. 클라우드 기반의 강력한 데이터 웨어하우스(e.g., Redshift, BigQuery)가 등장하면서 인기를 얻고 있습니다.

 

 

저는 오늘 바로 미니 ETL 파이프라인을 한 번 구성해보았습니다!

 

두 가지 버전으로 구성해보았는데 두 가지의 차이점을 표로 정리하면 다음과 같습니다.

구분 Airflow 파이프라인 인터랙티브 슬랙봇
목적 데이터의 기록 및 축적 실시간 조회 및 응답
실행 방식 스케줄 기반 (정해진 시간에 자동 실행) 이벤트 기반 (사용자 요청 시 즉시 실행)
데이터 저장 O (PostgreSQL에 저장) X (저장 없이 바로 응답)
응답 시간 느림 (배치 처리) 빠름 (실시간 처리)

 

 

1. Airflow 파이프라인

Airflow 활용한 파이프라인 아키텍처

 

이 파이프라인은 환율 정보를 기록하고 저장하는 것에 중점을 둔 파이프라인입니다.

Airflow DAG를 작성하고, 스케줄에 맞춰 API 데이터를 PostgreSQL에 저장하고, 슬랙으로 메시지를 보내는 과정입니다.

우선 Airflow, postgreSQL을 docker 환경을 이용해서 구축했습니다.

 

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.2/docker-compose.yaml'


docker-compose up -d

 

airflow yaml 파일을 다운로드하여 가져와주고 실행해줍니다!

 

그리고 나서 dags 폴더를 만들고 그 안에 오늘 실행할 pipeline python code를 작성해줍니다! (AI의 도움을 받아서..)

from __future__ import annotations

import pendulum
import requests
import psycopg2
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

# Airflow UI의 Admin -> Variables 에서 API_KEY 와 Redshift 접속 정보 등록 필요
API_KEY = Variable.get("exchange_rate_api_key") 
REDSHIFT_CONN = {
    "host": Variable.get("redshift_host"),
    "dbname": Variable.get("redshift_dbname"),
    "user": Variable.get("redshift_user"),
    "password": Variable.get("redshift_password"),
    "port": Variable.get("redshift_port", default_var=5439)
}

def get_exchange_rate(**context):
    """
    ExchangeRate-API로부터 USD->KRW 환율 정보를 가져오는 함수
    """
    url = f"https://v6.exchangerate-api.com/v6/{API_KEY}/pair/USD/KRW"
    response = requests.get(url)
    response.raise_for_status()  # 요청 실패 시 예외 발생
    data = response.json()
    
    print(f"API로부터 받은 데이터: {data}")

    # 다음 task(load_to_redshift)로 데이터 전달
    context['ti'].xcom_push(key='exchange_rate_data', value=data)


def load_to_redshift(**context):
    """
    이전 task에서 전달받은 데이터를 Redshift에 적재하는 함수
    """
    # 이전 task로부터 데이터 가져오기
    data = context['ti'].xcom_pull(key='exchange_rate_data', task_ids='get_exchange_rate_task')
    
    if not data:
        raise ValueError("API로부터 데이터를 가져오지 못했습니다.")

    base_code = data.get('base_code')
    target_code = data.get('target_code')
    conversion_rate = data.get('conversion_rate')
    last_update_utc = data.get('time_last_update_utc')

    # Redshift 연결
    conn = psycopg2.connect(**REDSHIFT_CONN)
    cur = conn.cursor()
    
    try:
        # 데이터 삽입 SQL
        insert_sql = """
        INSERT INTO raw_data.exchange_rate (base_code, target_code, conversion_rate, last_update_utc)
        VALUES (%s, %s, %s, %s);
        """
        cur.execute(insert_sql, (base_code, target_code, conversion_rate, last_update_utc))
        conn.commit()
        print("데이터 Redshift 적재 성공")
    except Exception as e:
        conn.rollback()
        print(f"데이터 적재 실패: {e}")
        raise
    finally:
        cur.close()
        conn.close()


with DAG(
    dag_id="exchange_rate_pipeline",
    start_date=pendulum.datetime(2024, 1, 1, tz="Asia/Seoul"),
    schedule="0 9 * * *",  # 매일 오전 9시에 실행
    catchup=False,
    tags=["mini-project"],
) as dag:
    get_exchange_rate_task = PythonOperator(
        task_id="get_exchange_rate_task",
        python_callable=get_exchange_rate,
    )

    load_to_redshift_task = PythonOperator(
        task_id="load_to_redshift_task",
        python_callable=load_to_redshift,
    )

    # Task 실행 순서 정의
    get_exchange_rate_task >> load_to_redshift_task

 

 

이제 Airflow에 접속하여 DAGs안에서 위에서 만든 DAG를 실행해주면 다음과 같은 결과가 나오게 되고!

Airflow 성공

 

Slack 알림
PostgreSQL에 저장된 데이터들

 

이렇게 첫 번째 Airflow를 활용한 ETL 데이터 파이프라인 구축을 완료했습니다!

 

 

 

 

2. 실시간 인터랙티브 봇 형식으로 만들기

 

그럼 데이터를 실시간으로 필요로하여 이벤트 기반으로 요청하는 상황으로 가정하면 실시간 처리로 해야 빠르게 데이터를 받아볼 수 있을테니 그 방식으로도 한번 만들어보자는 생각에 2번째 방법도 진행하였습니다!

 

Slack에서 요청하면 응답하는 봇 아키텍처

 

빠르게 Slack API 설정들을 진행해줍니다..

 

그리고 물어보면 답변해주는 실시간 응답해주는 파이썬 애플리케이션을 개발합니다.

 

import os
import requests
from dotenv import load_dotenv
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler

# .env 파일에서 환경 변수 로드
load_dotenv()

# 슬랙 앱 초기화
app = App(
    token=os.environ.get("SLACK_BOT_TOKEN")
)

# 봇이 멘션(@)되었을 때 실행될 함수
@app.event("app_mention")
def handle_app_mention_events(body, say):
    """봇을 태그하면 버튼이 있는 메시지를 보냅니다."""
    say(
        blocks=[
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "안녕하세요! 무엇을 도와드릴까요?"
                }
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {
                            "type": "plain_text",
                            "text": "오늘의 환율 알아보기 💵"
                        },
                        "action_id": "check_exchange_rate" # 이 버튼의 고유 ID
                    }
                ]
            }
        ],
        text="무엇을 도와드릴까요?"
    )

# 버튼이 클릭되었을 때 실행될 함수
@app.action("check_exchange_rate")
def handle_check_exchange_rate(ack, body, say, logger):
    """환율 알아보기 버튼을 누르면 API를 호출하여 결과를 알려줍니다."""
    ack() # 슬랙에게 버튼 클릭을 잘 받았다고 알림 (3초 이내 필수)
    
    api_key = os.environ.get("EXCHANGERATE_API_KEY")
    url = f"https://v6.exchangerate-api.com/v6/{api_key}/pair/USD/KRW"
    
    try:
        response = requests.get(url)
        response.raise_for_status()  # 요청 실패 시 예외 발생
        data = response.json()
        rate = data.get("conversion_rate")
        
        # 원래 메시지가 있던 곳에 스레드로 답변을 달아줍니다.
        thread_ts = body['message']['ts']
        say(text=f"오늘의 USD/KRW 환율은 *{rate:,.2f}원* 입니다.", thread_ts=thread_ts)

    except requests.exceptions.RequestException as e:
        logger.error(f"API 요청 실패: {e}")
        thread_ts = body['message']['ts']
        say(text="환율 정보를 가져오는 데 실패했습니다. 잠시 후 다시 시도해주세요.", thread_ts=thread_ts)


# 애플리케이션 시작
if __name__ == "__main__":
    # Socket Mode를 사용하려면 앱 설정에서 활성화해야 합니다.
    # (Settings > Socket Mode > Enable Socket Mode)
    handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"]) # SLACK_APP_TOKEN 은 xapp-... 으로 시작
    handler.start()

 

이 애플리케이션을 실행해주면 다음과 같이 원할때 실시간으로 환율 정보를 제공 받을 수 있습니다!

 

보시다시피 직접 슬랙 봇을 태그하면 환율정보를 알게 해주는 버튼을 클릭하면 환율 정보가 나옵니다!

이 방식은 첫 번째 방식과 다르게 실시간으로 정보를 제공 받기 위한 용도로 강하기 때문에 배치 처리가 아닌 실시간 처리이며 따라서 데이터베이스에 정보를 저장하지 않았습니다.

 

 

 

 

 

 

 

오늘 이 두 가지 방식의 데이터 파이프라인을 경험해보면서 ETL의 기본 개념, 배치 처리와 실시간 처리의 차이점 등을 직접 실시해보는 좋은 개인 프로젝트였습니다.