운영(재시도·SLA·알림)
왜 운영 설정이 중요한가
DAG를 만드는 것보다 운영에서 살아남게 하는 것이 더 어렵다. 네트워크가 순간 끊기고, 외부 API가 503을 반환하고, 새벽 배치가 SLA를 넘기는 일은 모두 예측 가능한 사고다. Airflow는 이 세 가지 상황을 각각 재시도(Retry), 데드라인 경보(Deadline Alert/SLA), 콜백·알림(Callback/Notifier) 으로 대응할 수 있는 도구를 제공한다.
재시도(Retry)
기본 파라미터
Task 인스턴스가 실패하면 Airflow는 retries 횟수만큼 다시 실행한다. 재시도 간격은 retry_delay로 지정한다.
from datetime import timedelta
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
with DAG(
dag_id="retry_demo",
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5),
},
) as dag:
fetch = PythonOperator(task_id="fetch_api", python_callable=call_external_api)default_args에 설정하면 DAG 안 모든 Task에 적용된다. Task 정의에서 같은 파라미터를 쓰면 개별 Task에만 적용(override)된다.
지수 백오프(Exponential Backoff)
외부 API 장애처럼 잠깐의 대기가 도움이 되는 상황에서는 지수 백오프를 쓴다. 재시도마다 대기 시간이 두 배씩 늘어나며, max_retry_delay로 상한을 제한할 수 있다.
default_args = {
"retries": 5,
"retry_delay": timedelta(minutes=1),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=60),
}1차 실패 후 1분, 2차 실패 후 2분, 3차 4분, 4차 8분, 5차 16분 — 상한인 60분을 넘으면 60분으로 고정된다.
재시도 횟수를 어떻게 정하나
| 상황 | 권장 retries | 이유 |
|---|---|---|
| 멱등(idempotent) API 호출 | 3~5 | 일시적 오류 커버 |
| DB INSERT (중복 위험) | 0~1 | 멱등성 보장 후 늘릴 것 |
| 매우 긴 ML 학습 Task | 1~2 | 비용 대비 효과 |
| 단순 echo / 검증 Task | 0 | 실패를 빠르게 알리는 게 낫다 |
콜백(Callback)
콜백은 Task나 DAG의 상태 전환 시점에 호출되는 Python 함수다. 알림·집계·외부 연동 등 모든 사이드이펙트를 여기에 모은다.
Task 레벨 콜백
| 파라미터 | 호출 시점 |
|---|---|
on_failure_callback | Task 최종 실패(재시도 소진 후) |
on_retry_callback | 재시도 시작 직전 |
on_success_callback | Task 성공 |
on_skipped_callback | Task skip |
def alert_on_failure(context):
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
log_url = context["task_instance"].log_url
print(f"FAILED: {dag_id}.{task_id} — {log_url}")
process = PythonOperator(
task_id="process",
python_callable=do_work,
on_failure_callback=alert_on_failure,
on_retry_callback=lambda ctx: print("retrying..."),
)context 딕셔너리에는 dag, task_instance, dag_run, logical_date 등 모든 실행 메타데이터가 들어 있다.
DAG 레벨 콜백
DAG 런 전체의 성공·실패를 한 곳에서 처리하고 싶을 때는 DAG에 콜백을 걸면 된다.
with DAG(
dag_id="etl_pipeline",
on_failure_callback=send_slack_alert,
on_success_callback=update_dashboard,
):
...Notifier: 재사용 가능한 알림 객체
Airflow 2.6+에서 BaseNotifier를 구현한 Notifier 클래스를 콜백 자리에 직접 전달할 수 있다. Slack, PagerDuty, Opsgenie 등의 Provider가 기본 Notifier를 제공한다.
from airflow.providers.slack.notifications.slack import SlackNotifier
with DAG(
dag_id="etl_pipeline",
on_failure_callback=SlackNotifier(
slack_conn_id="slack_default",
text="💥 {{ dag_run.dag_id }} 실패 — {{ task_instance.task_id }}",
channel="#airflow-alerts",
),
):
...여러 Notifier를 리스트로 전달하면 동시에 모두 실행된다.
on_failure_callback=[email_notifier, slack_notifier, pagerduty_notifier]SLA와 Deadline Alerts
Airflow 2.x: sla 파라미터
Airflow 2.x에서는 Task에 sla(Service Level Agreement) 값을 지정해 DAG 스케줄 시작 기준으로 허용 경과 시간을 정의했다.
# Airflow 2.x 방식
with DAG(
dag_id="daily_report",
sla_miss_callback=on_sla_miss,
):
process = PythonOperator(
task_id="process",
python_callable=run_report,
sla=timedelta(hours=2), # 스케줄 기준 2시간 이내 완료해야 함
)주의: sla 파라미터는 Task의 실행 시간 제한이 아니라 DAG 스케줄 시작 기준 상대 시간이다. 실행 시간을 제한하려면 execution_timeout을 써야 한다.
Airflow 3.x: deadline + DeadlineAlert
Airflow 3.0에서 sla / sla_miss_callback이 제거되고, Deadline Alerts(AIP-86) 로 대체됐다. DAG 선언부에 deadline 파라미터로 설정한다.
from airflow.sdk import DAG, DeadlineAlert, DeadlineReference, AsyncCallback
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
from datetime import timedelta
with DAG(
dag_id="daily_etl",
schedule="@daily",
deadline=DeadlineAlert(
reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
interval=timedelta(hours=2),
callback=AsyncCallback(
SlackWebhookNotifier,
kwargs={"text": "⏰ {{ dag_run.dag_id }} 데드라인 초과!"},
),
),
):
...reference는 기준 시각이고 interval은 허용 여유 시간이다. 기준 + 여유 시간 이내에 DAG 런이 완료되지 않으면 callback이 호출된다.
(logical_date) → interval=2h 경과 →
execution_timeout vs deadline
| 설정 | 단위 | 동작 |
|---|---|---|
execution_timeout | Task | 지정 시간 초과 시 Task를 강제 종료 |
deadline (Airflow 3) | DAG 런 | 지정 시간 초과 시 알림만 발송 (런 자체는 계속 실행) |
이메일 알림
Airflow에는 SMTP 기반 이메일 발송 기능이 내장돼 있다.
# airflow.cfg
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = your@gmail.com
smtp_password = app-specific-password
smtp_port = 587
smtp_mail_from = airflow@example.comwith DAG(
dag_id="critical_etl",
default_args={
"email": ["oncall@example.com"],
"email_on_failure": True,
"email_on_retry": False,
},
):
...Airflow 3.0에서는
AIRFLOW__EMAIL__*환경변수 일부가 제거됐다. SendGrid·Amazon SES 같은 서드파티 이메일 Provider Notifier를 사용하는 쪽으로 전환이 권장된다.
운영 체크리스트
(DAG 전체 실패) deadline=DeadlineAlert
(시간 초과 경보)
retry_exponential_backoff execution_timeout
(Task 강제 종료) on_failure_callback
(Task 실패 알림)
| 항목 | 권장 값 |
|---|---|
retries | 외부 의존성 있는 Task: 3; 내부 계산 Task: 0~1 |
retry_delay | 짧은 작업: 1~5분; 외부 API: 5~30분 |
retry_exponential_backoff | 외부 API 장애 대응 시 True |
execution_timeout | 기대 실행 시간의 2~3배 |
on_failure_callback | Slack / PagerDuty Notifier 연결 |
deadline (Airflow 3) | 비즈니스 SLA 기반 설정 |
References
- https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/callbacks.html
- https://airflow.apache.org/docs/apache-airflow/stable/howto/deadline-alerts.html
- https://airflow.apache.org/docs/apache-airflow/stable/howto/sla-to-deadlines.html
- https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=323488182
- https://www.getorchestra.io/guides/airflow-concepts-airflow-sla-and-retries
- https://reintech.io/blog/error-handling-retry-strategies-airflow
- https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/notifications/slack_notifier_howto_guide.html