
데이터 추출, 가공, 저장, 분석 파이프라인을 자동화하기 위해 위해 Airflow를 빠르게 배워보고자 강의를 시청했다. 강의는 Windows OS 기준으로 진행되었지만, 본 글은 Mac OS를 사용해 진행했다. 3강까지 진행하면서, 필요한 개념과 몇몇 이슈들을 해결한 과정을 함께 정리한다.
1. 환경설정
- Prerequisite:
- docker (airflow를 docker로 설치하는 방법으로 진행)
- docker compose
- vscode
- 강의는 windows를 기준으로 진행하지만, 본 문서에서는 Mac 기준 진행함
- 작업 폴더 내에 Dockerfile을 아래와 같이 작성한 후 이미지 빌드
- 커맨드:
docker build -t airflow-tutorial .
FROM apache/airflow:latest-python3.12
USER root
RUN apt-get update && \\
apt-get -y install git && \\
apt-get clean
USER airflow
ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages"
ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"
- docker-compose.yaml 을 아래와 같이 작성하고 compose up
- 커맨드:
docker compose -f "docker-compose.yaml" up -d --build
version: '3'
services:
airflow_tutorial:
image: airflow-tutorial:latest
volumes:
- ./airflow:/opt/airflow
ports:
- "8080:8080"
command: airflow standalone
- http://localhost:8080/ 로 접속해 로그인
- id: admin
- pass: standalone_admin_password.txt 파일에 적혀있음
- 종료:
docker compose down
2. DAG 기초
- alrflow/dags/welcome_dag.py 파일 생성 후 아래와 같이 작성
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import datetime
import requests
def print_welcome():
print('Welcome to Airflow!')
def print_date():
print('Today is {}'.format(datetime.today().date()))
def print_random_quote():
response = requests.get('<https://api.quotable.io/random>')
quote = response.json()['content']
print('Quote of the day: "{}"'.format(quote))
dag = DAG(
'welcome_dag',
default_args={'start_date': days_ago(1)},
schedule_interval='0 23 * * *', # 매일 오후 11시에 DAG 실행
catchup=False
)
print_welcome_task = PythonOperator(
task_id='print_welcome',
python_callable=print_welcome,
dag=dag
)
print_date_task = PythonOperator(
task_id='print_date',
python_callable=print_date,
dag=dag
)
print_random_quote = PythonOperator(
task_id='print_random_quote',
python_callable=print_random_quote,
dag=dag
)
# Set the dependencies between the tasks
print_welcome_task >> print_date_task >> print_random_quote
- Airflow가 실행되어있는 상태애서 dag를 추가하고 조금 시간이 지나면 UI에서 DAG 확인 가능
- UI 상에서 직접 트리거 가능
- UI의 Logs 에서 결과 확인
3. Provider VS Operator VS Hooks
- Coverage: Provider ⊃ Operator, Hooks
- Provider: 외부 시스템과의 연동을 가능하게 하는 플러그인 형태의 모듈
- ex) Amazon Web Services (AWS) Provider는 AWS 서비스들을 사용하기 위한 Operators와 Hooks를 제공
- Provider 참고: https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html
- Operator: DAGs 내에서 개별 작업을 정의하고, 재사용 가능한 작업 단위를 제공
- Action: 특정 작업을 실행
- ex) PythonOperator, BashOperator 등
- Transfer: 데이터를 옮김
- ex) S3ToRedshiftOperator, LocalFilesystemToGCSOperator, S3ToGCSOperator 등
- Sensor: 특정 조건이 충족될 때까지 대기
- ex) S3KeySensor, RedshiftClusterSensor 등
- 참고: Operator는 Blueprint (Python의 Class), Task는 Implementation (Python의 Object)에 해당
- Operator 사용 예시
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(dag_id='bash_operator_dag', start_date=datetime (2023, 8, 1),
schedule_interval="@daily") as dag:
task1 = BashOperator (
task_id='command_example',
bash_command='echo "Airflow is running!"*
)
task2 = BashOperator(
task_id='execute_script',
bash_command='/path/to/your/script.sh',
env={'ENV_VAR': 'value'}
)
first_task >> second_task
- Hook: 외부 시스템과의 인터페이스를 추상화하는 클래스
- Hook 사용 예시
from airflow.providers.postgres.hooks.postgres import PostgresHook
pg_hook = PostgresHook(conn_id='my_postgres_connection')
result = pg_hook.get_records 'SELECT * FROM table')
- Operators, Hooks 참고: https://airflow.apache.org/docs/apache-airflow-providers/operators-and-hooks-ref/index.html
4. Task, DAGs, airflow.cfg
- airflow.cfg 파일에서 airflow 운용을 위한 전반적인 설정을 커스텀할 수 있음
- 예를 들어 smtp 서버나 dags folder 같은 것들
- tasks 간 dependency를 만드는 방법 4 가지
# 1. Bitshift operations (>>, <<)
task1 >> task2 >> task3
# 2. set-upstream and set-downstream function
task1.set_upstream(task2)
task3.set_downstream(task2)
# 3. Chain funetion:
chain(task1, task2, task3)
# 4. TaskFlow API: dependencies are automatically inferred based on the sequence of task function calls)
task1()
task2()
task3()
- 첫 번째 예제: dags/exchange_rate_pipeline.py 파일 작성 (여러 Operator 활용해보는 목적)
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from hooks.clean_data import clean_data
# Define or Instantiate DAG
dag = DAG(
'exchange_rate_etl',
start_date=datetime(2023, 10, 1),
end_date=datetime(2023, 12, 31),
schedule_interval='0 22 * * *',
default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
catchup=False
)
# Define or Instantiate Tasks
download_task = BashOperator(
task_id='download_file',
bash_command='curl -o xrate.csv <https://data-api.ecb.europa.eu/service/data/EXR/M.USD.EUR.SP00.A?format=csvdata>',
cwd='/tmp',
dag=dag,
)
clean_data_task = PythonOperator(
task_id='clean_data',
python_callable=clean_data,
dag=dag,
)
send_email_task = EmailOperator(
task_id='send_email',
to='sleekdatasolutions@gmail.com',
subject='Exchange Rate Download - Successful',
html_content='The Exchange Rate data has been successfully downloaded, cleaned, and loaded.',
dag=dag,
)
# Define Task Dependencies
download_task >> clean_data_task >> send_email_task
- airflow/plugins/hooks/clean_data.py 파일 작성
import os
import pandas as pd
from airflow.hooks.base_hook import BaseHook
class CleanDataHook(BaseHook):
def __init__(self, path='/tmp/xrate.csv'):
self.path = path
def clean_data(self):
data = pd.read_csv(self.path, header=None)
default_values = {
int: 0,
float: 0.0,
str: '',
}
cleaned_data = data.fillna(value=default_values)
now = pd.Timestamp.now()
year = now.year
month = now.month
day = now.day
data_dir = f'/opt/airflow/data/xrate_cleansed/{year}/{month}/{day}'
os.makedirs(data_dir, exist_ok=True)
cleaned_data.to_csv(f'{data_dir}/xrate.csv', index=False)
- dag parameter 참고
- dockerfile 수정
FROM apache/airflow:latest-python3.12
USER root
RUN apt-get update && \\
apt-get -y install git && \\
apt-get clean
USER airflow
COPY requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt
ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages"
ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"
- docker-compose.yaml 수정
version: '3'
services:
airflow_tutorial:
image: airflow-tutorial:latest
volumes:
- ./airflow:/opt/airflow
ports:
- "8080:8080"
command: airflow standalone
my_smtp:
image: ixdotai/smtp
restart: always
- airflow.cfg 수정
...
smtp_host = my_smtp
...
- requirements.txt 작성
pandas
- 강의의 bytemark/smtp 이미지는 ARM 아키텍처 기반 운영체제에서 동작하지 않으니 수정하고 진행
- smtp가 포함된 DAG를 실행시킨 후 docker compose down 시 리소스를 계속 사용 중이라는 메시지
- compose up 시 다음과 같이 옵션 주기:
docker compose -f "docker-compose.yaml" up -d --build --remove-orphans
- 두 번째 예제: git_repo_dag.py 파일 작성 (connection 설정을 위한 예제)
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.github.operators.github import GithubOperator
from airflow.operators.dummy import DummyOperator
import logging
# Define the DAG
dag = DAG(
'git_repo_dag',
default_args={'start_date': days_ago(1)},
schedule_interval='0 21 * * *',
catchup=False
)
# Start Dummy Operator
start = DummyOperator(task_id='start', dag=dag)
# List GitRepository Tags
list_repo_tags = GithubOperator(
task_id="list_repo_tags",
github_method="get_repo",
github_method_args={"full_name_or_id": "ycseong07/minimal-streamlit-lecture"},
result_processor=lambda repo: logging.info(list(repo.get_tags())),
dag=dag,
)
# End Dummy Operator
end = DummyOperator(task_id='end', dag=dag)
# Define task dependencies
start >> list_repo_tags >> end
- requirements.txt 수정
pandas
apache-airflow-providers-github
- UI 상에서 Admin - Connections 에서 아래와 같이 connection 추가
- Connection Id: github_default
- Connection Type: Github
- GitHub Access Token: xxxxx
이슈 리스트
- bytemark/smtp 이미지가 Mac OS(ARM)에서 동작하지 않음
- ixdotai/smtp 이미지로 대체
- plugins/clean_data.py 를 import해오지 못하는 문제
- plugins/hooks 폴더 내에 clean_data.py 파일을 작성하고, customized hook으로 사용할 수 있도록 구조화
- dockerfile 내에 plugins 폴더를 PYTHONPATH 로 추가\
- 참고: https://stackoverflow.com/questions/43907813/cant-import-airflow-plugins
- 이메일이 전송되지 않음. bytemark/smtp 이미지와 ixdotai/smtp 이미지의 동작방식이 다른 것 같은데, 이건 따로 구글 이메일로 발송 시키는 방법에 대한 자료가 많아 우선 패스함
References
Share article