(참고) Airflow 환경
- Airflow: 2.9.2
- Python: 3.12.4
DAG (Python) 생성
Airflow 설치 시 만들었던, /dag 디렉토리에 python dag 파일을 생성하면 됩니다.
30초 후에 자동으로 리프레시 되기도 하고, 저같은 경우에는 container 를 restart 해서 바로 반영해주기도 합니다.
docker restart airflow
dag 파일이 생성되고 정상적으로 로드되면 airflow scheduler 가 dag 파일을 해석하여 queue 에 전달하게 됩니다.
airflow worker 는 queue 에 쌓인 task 를 수행하게 됩니다
# dags/dag_example.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# DAG 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 6, 29),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
dag = DAG(
'simple_bash_dag',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# 첫 번째 Bash 작업
task1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
# 두 번째 Bash 작업
task2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=dag,
)
# 세 번째 Bash 작업
task3 = BashOperator(
task_id='print_hello',
bash_command='echo "Hello World"',
dag=dag,
)
# 작업 순서 정의
task1 >> task2 >> task3
DAG 상세 설명
- default_args: DAG와 태스크의 default arguments를 정의해 줍니다.
- dag: DAG 인스턴스를 생성하고, 'simple_bash_dag'라는 이름으로 DAG를 생성합니다. 하루에 한 번 실행되도록 스케줄링을 걸어줍니다.
- task1, task2, task3: 각각 BashOperator를 사용하여 Bash 명령어를 실행하는 3개의 task를 만들어줍니다.
from airflow.operators.bash import BashOperator
BashOperator 를 import 해야 사용할 수 있다.
'print_date' task는 현재 날짜를 출력하고, 'sleep' task는 5초 동안 대기하며, 'print_hello' task는 "Hello World"를 간단하게 출력하는 예제입니다.
- task1 >> task2 >> task3: 태스크의 실행 순서를 만들어 주고 Airflow Web UI에서 확인할 수 있습니다.
Airflow Web UI 확인
print_date >> sleep >> print_hello 순으로 task 가 수행되는 것을 'Graph' 에서 확인할 수 있습니다.
DAG 수동 수행
우측 상단의 'Trigger DAG' 버튼을 눌러주시면 스케줄링과 무관하게 수동으로 DAG 를 수행하게 됩니다.
'Devops > Airflow' 카테고리의 다른 글
Airflow 에 Slack API 연동하기 (0) | 2024.06.29 |
---|