![Airflow 튜토리얼 (1)](https://image.inblog.dev?url=https%3A%2F%2Finblog.ai%2Fapi%2Fog%3Ftitle%3DAirflow%2520%25ED%258A%259C%25ED%2586%25A0%25EB%25A6%25AC%25EC%2596%25BC%2520%281%29%26logoUrl%3Dhttps%253A%252F%252Finblog.ai%252Finblog_logo.png%26blogTitle%3D%25EB%258D%25B0%25EC%259D%25B4%25ED%2584%25B0%2520%25EC%2593%25B0%25EB%258A%2594%2520%25EB%25AC%25B8%25EB%258D%2595%25EB%25B0%25B0&w=2048&q=75)
데이터 추출, 가공, 저장, 분석 파이프라인을 자동화하기 위해 위해 Airflow를 빠르게 배워보고자 강의를 시청했다. 강의는 Windows OS 기준으로 진행되었지만, 본 글은 Mac OS를 사용해 진행했다. 3강까지 진행하면서, 필요한 개념과 몇몇 이슈들을 해결한 과정을 함께 정리한다.
1. 환경설정
- Prerequisite:
- docker (airflow를 docker로 설치하는 방법으로 진행)
- docker compose
- vscode
- 강의는 windows를 기준으로 진행하지만, 본 문서에서는 Mac 기준 진행함
- 작업 폴더 내에 Dockerfile을 아래와 같이 작성한 후 이미지 빌드
- 커맨드:
docker build -t airflow-tutorial .
FROM apache/airflow:latest-python3.12 USER root RUN apt-get update && \\ apt-get -y install git && \\ apt-get clean USER airflow ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages" ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"
- docker-compose.yaml 을 아래와 같이 작성하고 compose up
- 커맨드:
docker compose -f "docker-compose.yaml" up -d --build
version: '3' services: airflow_tutorial: image: airflow-tutorial:latest volumes: - ./airflow:/opt/airflow ports: - "8080:8080" command: airflow standalone
- http://localhost:8080/ 로 접속해 로그인
- id: admin
- pass: standalone_admin_password.txt 파일에 적혀있음
- 종료:
docker compose down
2. DAG 기초
- alrflow/dags/welcome_dag.py 파일 생성 후 아래와 같이 작성
from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from datetime import datetime import requests def print_welcome(): print('Welcome to Airflow!') def print_date(): print('Today is {}'.format(datetime.today().date())) def print_random_quote(): response = requests.get('<https://api.quotable.io/random>') quote = response.json()['content'] print('Quote of the day: "{}"'.format(quote)) dag = DAG( 'welcome_dag', default_args={'start_date': days_ago(1)}, schedule_interval='0 23 * * *', # 매일 오후 11시에 DAG 실행 catchup=False ) print_welcome_task = PythonOperator( task_id='print_welcome', python_callable=print_welcome, dag=dag ) print_date_task = PythonOperator( task_id='print_date', python_callable=print_date, dag=dag ) print_random_quote = PythonOperator( task_id='print_random_quote', python_callable=print_random_quote, dag=dag ) # Set the dependencies between the tasks print_welcome_task >> print_date_task >> print_random_quote
- Airflow가 실행되어있는 상태애서 dag를 추가하고 조금 시간이 지나면 UI에서 DAG 확인 가능
- UI 상에서 직접 트리거 가능
- UI의 Logs 에서 결과 확인
3. Provider VS Operator VS Hooks
- Coverage: Provider ⊃ Operator, Hooks
- Provider: 외부 시스템과의 연동을 가능하게 하는 플러그인 형태의 모듈
- ex) Amazon Web Services (AWS) Provider는 AWS 서비스들을 사용하기 위한 Operators와 Hooks를 제공
- Provider 참고: https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html
- Operator: DAGs 내에서 개별 작업을 정의하고, 재사용 가능한 작업 단위를 제공
- Action: 특정 작업을 실행
- ex) PythonOperator, BashOperator 등
- Transfer: 데이터를 옮김
- ex) S3ToRedshiftOperator, LocalFilesystemToGCSOperator, S3ToGCSOperator 등
- Sensor: 특정 조건이 충족될 때까지 대기
- ex) S3KeySensor, RedshiftClusterSensor 등
- 참고: Operator는 Blueprint (Python의 Class), Task는 Implementation (Python의 Object)에 해당
- Operator 사용 예시
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator with DAG(dag_id='bash_operator_dag', start_date=datetime (2023, 8, 1), schedule_interval="@daily") as dag: task1 = BashOperator ( task_id='command_example', bash_command='echo "Airflow is running!"* ) task2 = BashOperator( task_id='execute_script', bash_command='/path/to/your/script.sh', env={'ENV_VAR': 'value'} ) first_task >> second_task
- Hook: 외부 시스템과의 인터페이스를 추상화하는 클래스
- Hook 사용 예시
from airflow.providers.postgres.hooks.postgres import PostgresHook pg_hook = PostgresHook(conn_id='my_postgres_connection') result = pg_hook.get_records 'SELECT * FROM table')
- Operators, Hooks 참고: https://airflow.apache.org/docs/apache-airflow-providers/operators-and-hooks-ref/index.html
4. Task, DAGs, airflow.cfg
- airflow.cfg 파일에서 airflow 운용을 위한 전반적인 설정을 커스텀할 수 있음
- 예를 들어 smtp 서버나 dags folder 같은 것들
- tasks 간 dependency를 만드는 방법 4 가지
# 1. Bitshift operations (>>, <<) task1 >> task2 >> task3 # 2. set-upstream and set-downstream function task1.set_upstream(task2) task3.set_downstream(task2) # 3. Chain funetion: chain(task1, task2, task3) # 4. TaskFlow API: dependencies are automatically inferred based on the sequence of task function calls) task1() task2() task3()
- 첫 번째 예제: dags/exchange_rate_pipeline.py 파일 작성 (여러 Operator 활용해보는 목적)
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.email_operator import EmailOperator from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta from hooks.clean_data import clean_data # Define or Instantiate DAG dag = DAG( 'exchange_rate_etl', start_date=datetime(2023, 10, 1), end_date=datetime(2023, 12, 31), schedule_interval='0 22 * * *', default_args={"retries": 2, "retry_delay": timedelta(minutes=5)}, catchup=False ) # Define or Instantiate Tasks download_task = BashOperator( task_id='download_file', bash_command='curl -o xrate.csv <https://data-api.ecb.europa.eu/service/data/EXR/M.USD.EUR.SP00.A?format=csvdata>', cwd='/tmp', dag=dag, ) clean_data_task = PythonOperator( task_id='clean_data', python_callable=clean_data, dag=dag, ) send_email_task = EmailOperator( task_id='send_email', to='sleekdatasolutions@gmail.com', subject='Exchange Rate Download - Successful', html_content='The Exchange Rate data has been successfully downloaded, cleaned, and loaded.', dag=dag, ) # Define Task Dependencies download_task >> clean_data_task >> send_email_task
- airflow/plugins/hooks/clean_data.py 파일 작성
import os import pandas as pd from airflow.hooks.base_hook import BaseHook class CleanDataHook(BaseHook): def __init__(self, path='/tmp/xrate.csv'): self.path = path def clean_data(self): data = pd.read_csv(self.path, header=None) default_values = { int: 0, float: 0.0, str: '', } cleaned_data = data.fillna(value=default_values) now = pd.Timestamp.now() year = now.year month = now.month day = now.day data_dir = f'/opt/airflow/data/xrate_cleansed/{year}/{month}/{day}' os.makedirs(data_dir, exist_ok=True) cleaned_data.to_csv(f'{data_dir}/xrate.csv', index=False)
- dag parameter 참고
- dockerfile 수정
FROM apache/airflow:latest-python3.12 USER root RUN apt-get update && \\ apt-get -y install git && \\ apt-get clean USER airflow COPY requirements.txt /tmp/requirements.txt RUN pip install -r /tmp/requirements.txt ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages" ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"
- docker-compose.yaml 수정
version: '3' services: airflow_tutorial: image: airflow-tutorial:latest volumes: - ./airflow:/opt/airflow ports: - "8080:8080" command: airflow standalone my_smtp: image: ixdotai/smtp restart: always
- airflow.cfg 수정
... smtp_host = my_smtp ...
- requirements.txt 작성
pandas
- 강의의 bytemark/smtp 이미지는 ARM 아키텍처 기반 운영체제에서 동작하지 않으니 수정하고 진행
- smtp가 포함된 DAG를 실행시킨 후 docker compose down 시 리소스를 계속 사용 중이라는 메시지
- compose up 시 다음과 같이 옵션 주기:
docker compose -f "docker-compose.yaml" up -d --build --remove-orphans
- 두 번째 예제: git_repo_dag.py 파일 작성 (connection 설정을 위한 예제)
from airflow import DAG from airflow.utils.dates import days_ago from airflow.providers.github.operators.github import GithubOperator from airflow.operators.dummy import DummyOperator import logging # Define the DAG dag = DAG( 'git_repo_dag', default_args={'start_date': days_ago(1)}, schedule_interval='0 21 * * *', catchup=False ) # Start Dummy Operator start = DummyOperator(task_id='start', dag=dag) # List GitRepository Tags list_repo_tags = GithubOperator( task_id="list_repo_tags", github_method="get_repo", github_method_args={"full_name_or_id": "ycseong07/minimal-streamlit-lecture"}, result_processor=lambda repo: logging.info(list(repo.get_tags())), dag=dag, ) # End Dummy Operator end = DummyOperator(task_id='end', dag=dag) # Define task dependencies start >> list_repo_tags >> end
- requirements.txt 수정
pandas apache-airflow-providers-github
- UI 상에서 Admin - Connections 에서 아래와 같이 connection 추가
- Connection Id: github_default
- Connection Type: Github
- GitHub Access Token: xxxxx
이슈 리스트
- bytemark/smtp 이미지가 Mac OS(ARM)에서 동작하지 않음
- ixdotai/smtp 이미지로 대체
- plugins/clean_data.py 를 import해오지 못하는 문제
- plugins/hooks 폴더 내에 clean_data.py 파일을 작성하고, customized hook으로 사용할 수 있도록 구조화
- dockerfile 내에 plugins 폴더를 PYTHONPATH 로 추가\
- 참고: https://stackoverflow.com/questions/43907813/cant-import-airflow-plugins
- 이메일이 전송되지 않음. bytemark/smtp 이미지와 ixdotai/smtp 이미지의 동작방식이 다른 것 같은데, 이건 따로 구글 이메일로 발송 시키는 방법에 대한 자료가 많아 우선 패스함
References
Share article