DAG/Task/Operator 개념
Airflow는 무엇을 해결하는가
데이터 파이프라인은 보통 여러 단계로 구성된다. "매일 새벽 1시에 원천 DB에서 데이터를 긁어와, 변환 후, 리포트 테이블에 적재하라." 이 흐름을 cron 스크립트 몇 개로 관리하면 처음엔 쉽지만, 단계가 늘어나고 의존 관계가 생기며, 실패 재시도·알림·실행 이력이 필요해지는 순간 감당하기 어려워진다.
Apache Airflow는 이 "워크플로 오케스트레이션" 문제를 해결하는 플랫폼이다. 파이프라인을 Python 코드로 표현하고, 스케줄·의존성·재시도·모니터링을 한 곳에서 관리한다. 2014년 Airbnb에서 시작해 현재 Apache 최상위 프로젝트이며, 2025년 4월 Airflow 3.0이 출시됐다.
DAG — 파이프라인을 표현하는 그래프
DAG(Directed Acyclic Graph, 방향 비순환 그래프)는 Airflow의 핵심 단위다. Task들을 노드로, 실행 순서(의존 관계)를 방향 있는 엣지로 표현한다. "비순환(Acyclic)"이라는 점이 중요하다 — 사이클이 없어야 언젠가는 완료된다.
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime
with DAG(
dag_id="daily_etl",
schedule="@daily",
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:
extract = PythonOperator(task_id="extract", python_callable=run_extract)
transform = PythonOperator(task_id="transform", python_callable=run_transform)
load = PythonOperator(task_id="load", python_callable=run_load)
extract >> transform >> load주요 파라미터:
| 파라미터 | 역할 |
|---|---|
dag_id | DAG를 식별하는 고유 이름 |
schedule | 실행 주기 (cron 문자열 또는 프리셋) |
start_date | 스케줄이 시작되는 기준 시점 |
catchup | 과거 미실행 구간 자동 보완 여부 (기본 True) |
Task — 파이프라인의 실행 단위
Task는 DAG 안의 실행 단위다. Scheduler가 Task 인스턴스(TaskInstance)를 생성하고, 상태를 추적한다.
Task의 생명 주기:
none → scheduled → queued → running → success
↘ failed → up_for_retry하나의 Task가 실패하면 기본적으로 downstream Task들은 실행되지 않는다. depends_on_past=True를 설정하면 전날 같은 Task가 성공한 경우에만 오늘 실행된다.
Operator — Task의 실체
Operator는 Task의 타입이다. Operator를 DAG 코드에서 인스턴스화하면 Task가 된다. Airflow는 다양한 내장 Operator를 제공하며, Provider 패키지로 AWS·GCP·MySQL 등 외부 시스템 연동을 확장할 수 있다.
자주 쓰이는 Operator
PythonOperator— Python 함수를 실행. 가장 범용적.BashOperator— Bash 명령어를 실행.SQLExecuteQueryOperator— SQL을 실행.EmptyOperator— 아무 일도 하지 않는 플레이스홀더. 의존성 구조를 명확히 할 때 유용.- Sensors — 외부 조건이 충족될 때까지 대기하는 특수 Operator. 예:
FileSensor(파일 존재 확인),HttpSensor(API 응답 확인).
TaskFlow API — 현대적 방식
Airflow 2.0 이후 @task 데코레이터를 쓰는 TaskFlow 스타일이 권장된다. 함수를 그대로 Task로 만들고, 반환값이 자동으로 XCom에 실려 다음 Task로 전달된다.
from airflow.sdk import dag, task
@dag(schedule="@daily", start_date=datetime(2025, 1, 1), catchup=False)
def daily_etl():
@task
def extract() -> dict:
return {"rows": 1000}
@task
def transform(data: dict) -> dict:
return {"rows": data["rows"], "status": "clean"}
@task
def load(data: dict) -> None:
print(f"Loaded {data['rows']} rows")
load(transform(extract()))
daily_etl()함수 호출 구조가 의존 관계를 대체한다. >> 연산자를 별도로 쓰지 않아도 된다.
의존 관계 선언
classic Operator 방식에서 의존 관계는 두 가지로 선언한다.
# bitshift 연산자 (권장)
extract >> transform >> load
# 명시적 메서드
transform.set_upstream(extract)
load.set_downstream(transform)
# 팬아웃 / 팬인
extract >> [transform_a, transform_b] >> loadAirflow 아키텍처
(파싱·스케줄) ↓ Metadata DB
(상태 저장) ↓ Executor
(Task 배분)
(Task 실행) ↑ Webserver
(UI·API)
| 컴포넌트 | 역할 |
|---|---|
| Scheduler | DAG 파일을 파싱하고 실행 대상 Task를 Metadata DB에 등록 |
| Metadata DB | 모든 DAG·Task 상태, 실행 이력을 저장 (PostgreSQL / MySQL) |
| Executor | Task를 어디서 실행할지 결정 (Local / Celery / Kubernetes 등) |
| Worker | 실제로 Task를 실행하는 프로세스 |
| Webserver | 브라우저 UI 및 REST API 제공 |
Airflow 3.0부터 Task는 Metadata DB에 직접 접근하지 않는다. Task Execution Interface를 통해 Scheduler와 통신하므로 보안 모델이 강화됐다.
References
- https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html
- https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html
- https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html
- https://airflow.apache.org/blog/airflow-three-point-oh-is-here/
- https://www.astronomer.io/docs/learn/dags