Airflow 에 Slack API 연동 방법
연동 방법으로는 크게 3가지 방법이 있는 것 같습니다. 아래 방법 중 세 번째 방법으로 연동을 해보았습니다.
- 직접 Slack Python Class / Method 를 생성하여 호출
- Airflow Connection - HTTP webhook / Slack Incoming Webhook 연계
- Airflow Connection - Slack API 연계
Airflow Connection - Slack API 연계
Airflow 메뉴 중 하나인 'Admin' 에서 Connections 메뉴을 제공합니다.
Add a new record 를 눌러줍니다.
- Connection Id: slack_api_default *이름을 동일하게 설정해주셔야합니다
- Connection Type: Slack API
- Slack API Token: { Your Slack API }
을 입력해주고 'Save' 버튼을 눌러줍니다
DAG Operator 및 callback 구현
기존의 구글링 했을 때는 SlackAPIPostOperator 메소드 내에서 token 를 입력을 했어야 했습니다.
직접 token 을 입력해보았을 때, 에러가 발생하고 현재는 지원하지 않는 것 같습니다.
airflow.providers.slack.operators.slack — apache-airflow-providers-slack Documentation
SlackAPIPostOperator 와 send_slack_notification 의 차이는 찾아보고 있는데 아직까지는 확실히 모르겠습니다
SlackAPIPostOperator: Slack API Operators — apache-airflow-providers-slack Documentation
send_slack_notifiacation: How-to Guide for Slack notifications — apache-airflow-providers-slack Documentation
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.providers.slack.notifications.slack import send_slack_notification
from utils.alert import SlackAlert
# DAG 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 6, 29),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'on_success_callback': [
send_slack_notification(
text="The DAG {{ dag.dag_id }} succeeded",
channel="#alarm"
)
],
}
# DAG 정의
dag = DAG(
'Slack_test',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# 첫 번째 Bash 작업
task1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
# 세 번째 Bash 작업
task2 = BashOperator(
task_id='print_hello',
bash_command='echo "Hello World"',
dag=dag,
)
slack_alert = SlackAPIPostOperator(
task_id="slack_alert",
channel='#alarm',
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": (
"*<https://github.com/apache/airflow|Apache Airflow™>* "
"is an open-source platform for developing, scheduling, "
"and monitoring batch-oriented workflows."
),
}
}
],
text="Fallback message",
)
# 작업 순서 정의
task1 >> task2 >> slack_alert
심화 - Slack Alert Utils 구현
DAG 메인 코드
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.providers.slack.notifications.slack import send_slack_notification
from utils.alert import SlackAlert
alert = SlackAlert('#alarm')
# DAG 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 6, 29),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'on_success_callback': alert.slack_fail_alert
}
# DAG 정의
dag = DAG(
'Slack_test',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# 첫 번째 Bash 작업
task1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
# 세 번째 Bash 작업
task2 = BashOperator(
task_id='print_hello',
bash_command='echo "Hello World"',
dag=dag,
)
# 작업 순서 정의
task1 >> task2
SlackAlert Utils 코드
connection 에 저장한 변수는 아래와 같이 불러 올 수 있습니다.
from airflow.hooks.base_hook import BaseHook
slack_token = BaseHook.get_connection('slack_api_default').password
from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
class SlackAlert:
def __init__(self, channel):
self.slack_channel = channel
self.slack_token = BaseHook.get_connection('slack_api_default').password
def slack_fail_alert(self, context):
alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=self.slack_channel,
text="""
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url
)
)
return alert.execute(context=context)
'Devops > Airflow' 카테고리의 다른 글
Airflow DAG 1초 예제 (3) | 2024.06.29 |
---|