Airflow 튜토리얼 (1)

May 12, 2024
Airflow 튜토리얼 (1)
데이터 추출, 가공, 저장, 분석 파이프라인을 자동화하기 위해 위해 Airflow를 빠르게 배워보고자 강의를 시청했다. 강의는 Windows OS 기준으로 진행되었지만, 본 글은 Mac OS를 사용해 진행했다. 3강까지 진행하면서, 필요한 개념과 몇몇 이슈들을 해결한 과정을 함께 정리한다.

1. 환경설정

  • Prerequisite:
    • docker (airflow를 docker로 설치하는 방법으로 진행)
    • docker compose
    • vscode
  • 강의는 windows를 기준으로 진행하지만, 본 문서에서는 Mac 기준 진행함
  • 작업 폴더 내에 Dockerfile을 아래와 같이 작성한 후 이미지 빌드
    • 커맨드: docker build -t airflow-tutorial .
FROM apache/airflow:latest-python3.12 USER root RUN apt-get update && \\ apt-get -y install git && \\ apt-get clean USER airflow ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages" ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"
  • docker-compose.yaml 을 아래와 같이 작성하고 compose up
    • 커맨드: docker compose -f "docker-compose.yaml" up -d --build
version: '3' services: airflow_tutorial: image: airflow-tutorial:latest volumes: - ./airflow:/opt/airflow ports: - "8080:8080" command: airflow standalone
  • http://localhost:8080/ 로 접속해 로그인
    • id: admin
    • pass: standalone_admin_password.txt 파일에 적혀있음
  • 종료: docker compose down

2. DAG 기초

  • alrflow/dags/welcome_dag.py 파일 생성 후 아래와 같이 작성
from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from datetime import datetime import requests def print_welcome(): print('Welcome to Airflow!') def print_date(): print('Today is {}'.format(datetime.today().date())) def print_random_quote(): response = requests.get('<https://api.quotable.io/random>') quote = response.json()['content'] print('Quote of the day: "{}"'.format(quote)) dag = DAG( 'welcome_dag', default_args={'start_date': days_ago(1)}, schedule_interval='0 23 * * *', # 매일 오후 11시에 DAG 실행 catchup=False ) print_welcome_task = PythonOperator( task_id='print_welcome', python_callable=print_welcome, dag=dag ) print_date_task = PythonOperator( task_id='print_date', python_callable=print_date, dag=dag ) print_random_quote = PythonOperator( task_id='print_random_quote', python_callable=print_random_quote, dag=dag ) # Set the dependencies between the tasks print_welcome_task >> print_date_task >> print_random_quote
  • Airflow가 실행되어있는 상태애서 dag를 추가하고 조금 시간이 지나면 UI에서 DAG 확인 가능
    • UI 상에서 직접 트리거 가능
    • UI의 Logs 에서 결과 확인

3. Provider VS Operator VS Hooks

  • Coverage: Provider ⊃ Operator, Hooks
  • Operator: DAGs 내에서 개별 작업을 정의하고, 재사용 가능한 작업 단위를 제공
    • Action: 특정 작업을 실행
      • ex) PythonOperator, BashOperator 등
    • Transfer: 데이터를 옮김
      • ex) S3ToRedshiftOperator, LocalFilesystemToGCSOperator, S3ToGCSOperator 등
    • Sensor: 특정 조건이 충족될 때까지 대기
      • ex) S3KeySensor, RedshiftClusterSensor 등
    • 참고: Operator는 Blueprint (Python의 Class), Task는 Implementation (Python의 Object)에 해당
    • Operator 사용 예시
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator with DAG(dag_id='bash_operator_dag', start_date=datetime (2023, 8, 1), schedule_interval="@daily") as dag: task1 = BashOperator ( task_id='command_example', bash_command='echo "Airflow is running!"* ) task2 = BashOperator( task_id='execute_script', bash_command='/path/to/your/script.sh', env={'ENV_VAR': 'value'} ) first_task >> second_task
  • Hook: 외부 시스템과의 인터페이스를 추상화하는 클래스
    • Hook 사용 예시
from airflow.providers.postgres.hooks.postgres import PostgresHook pg_hook = PostgresHook(conn_id='my_postgres_connection') result = pg_hook.get_records 'SELECT * FROM table')

4. Task, DAGs, airflow.cfg

  • airflow.cfg 파일에서 airflow 운용을 위한 전반적인 설정을 커스텀할 수 있음
    • 예를 들어 smtp 서버나 dags folder 같은 것들
  • tasks 간 dependency를 만드는 방법 4 가지
# 1. Bitshift operations (>>, <<) task1 >> task2 >> task3 # 2. set-upstream and set-downstream function task1.set_upstream(task2) task3.set_downstream(task2) # 3. Chain funetion: chain(task1, task2, task3) # 4. TaskFlow API: dependencies are automatically inferred based on the sequence of task function calls) task1() task2() task3()
  • 첫 번째 예제: dags/exchange_rate_pipeline.py 파일 작성 (여러 Operator 활용해보는 목적)
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.email_operator import EmailOperator from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta from hooks.clean_data import clean_data # Define or Instantiate DAG dag = DAG( 'exchange_rate_etl', start_date=datetime(2023, 10, 1), end_date=datetime(2023, 12, 31), schedule_interval='0 22 * * *', default_args={"retries": 2, "retry_delay": timedelta(minutes=5)}, catchup=False ) # Define or Instantiate Tasks download_task = BashOperator( task_id='download_file', bash_command='curl -o xrate.csv <https://data-api.ecb.europa.eu/service/data/EXR/M.USD.EUR.SP00.A?format=csvdata>', cwd='/tmp', dag=dag, ) clean_data_task = PythonOperator( task_id='clean_data', python_callable=clean_data, dag=dag, ) send_email_task = EmailOperator( task_id='send_email', to='sleekdatasolutions@gmail.com', subject='Exchange Rate Download - Successful', html_content='The Exchange Rate data has been successfully downloaded, cleaned, and loaded.', dag=dag, ) # Define Task Dependencies download_task >> clean_data_task >> send_email_task
  • airflow/plugins/hooks/clean_data.py 파일 작성
import os import pandas as pd from airflow.hooks.base_hook import BaseHook class CleanDataHook(BaseHook): def __init__(self, path='/tmp/xrate.csv'): self.path = path def clean_data(self): data = pd.read_csv(self.path, header=None) default_values = { int: 0, float: 0.0, str: '', } cleaned_data = data.fillna(value=default_values) now = pd.Timestamp.now() year = now.year month = now.month day = now.day data_dir = f'/opt/airflow/data/xrate_cleansed/{year}/{month}/{day}' os.makedirs(data_dir, exist_ok=True) cleaned_data.to_csv(f'{data_dir}/xrate.csv', index=False)
  • dockerfile 수정
FROM apache/airflow:latest-python3.12 USER root RUN apt-get update && \\ apt-get -y install git && \\ apt-get clean USER airflow COPY requirements.txt /tmp/requirements.txt RUN pip install -r /tmp/requirements.txt ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages" ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"
  • docker-compose.yaml 수정
version: '3' services: airflow_tutorial: image: airflow-tutorial:latest volumes: - ./airflow:/opt/airflow ports: - "8080:8080" command: airflow standalone my_smtp: image: ixdotai/smtp restart: always
  • airflow.cfg 수정
... smtp_host = my_smtp ...
  • requirements.txt 작성
pandas
  • 강의의 bytemark/smtp 이미지는 ARM 아키텍처 기반 운영체제에서 동작하지 않으니 수정하고 진행
  • smtp가 포함된 DAG를 실행시킨 후 docker compose down 시 리소스를 계속 사용 중이라는 메시지
    • compose up 시 다음과 같이 옵션 주기: docker compose -f "docker-compose.yaml" up -d --build --remove-orphans
  • 두 번째 예제: git_repo_dag.py 파일 작성 (connection 설정을 위한 예제)
from airflow import DAG from airflow.utils.dates import days_ago from airflow.providers.github.operators.github import GithubOperator from airflow.operators.dummy import DummyOperator import logging # Define the DAG dag = DAG( 'git_repo_dag', default_args={'start_date': days_ago(1)}, schedule_interval='0 21 * * *', catchup=False ) # Start Dummy Operator start = DummyOperator(task_id='start', dag=dag) # List GitRepository Tags list_repo_tags = GithubOperator( task_id="list_repo_tags", github_method="get_repo", github_method_args={"full_name_or_id": "ycseong07/minimal-streamlit-lecture"}, result_processor=lambda repo: logging.info(list(repo.get_tags())), dag=dag, ) # End Dummy Operator end = DummyOperator(task_id='end', dag=dag) # Define task dependencies start >> list_repo_tags >> end
  • requirements.txt 수정
pandas apache-airflow-providers-github
  • UI 상에서 Admin - Connections 에서 아래와 같이 connection 추가
    • Connection Id: github_default
    • Connection Type: Github
    • GitHub Access Token: xxxxx

이슈 리스트

  • bytemark/smtp 이미지가 Mac OS(ARM)에서 동작하지 않음
    • ixdotai/smtp 이미지로 대체
  • 이메일이 전송되지 않음. bytemark/smtp 이미지와 ixdotai/smtp 이미지의 동작방식이 다른 것 같은데, 이건 따로 구글 이메일로 발송 시키는 방법에 대한 자료가 많아 우선 패스함

References

Share article

데이터 쓰는 문덕배