← 스터디 홈
1편 · 약 8분

DAG/Task/Operator 개념

Airflow는 무엇을 해결하는가

데이터 파이프라인은 보통 여러 단계로 구성된다. "매일 새벽 1시에 원천 DB에서 데이터를 긁어와, 변환 후, 리포트 테이블에 적재하라." 이 흐름을 cron 스크립트 몇 개로 관리하면 처음엔 쉽지만, 단계가 늘어나고 의존 관계가 생기며, 실패 재시도·알림·실행 이력이 필요해지는 순간 감당하기 어려워진다.

Apache Airflow는 이 "워크플로 오케스트레이션" 문제를 해결하는 플랫폼이다. 파이프라인을 Python 코드로 표현하고, 스케줄·의존성·재시도·모니터링을 한 곳에서 관리한다. 2014년 Airbnb에서 시작해 현재 Apache 최상위 프로젝트이며, 2025년 4월 Airflow 3.0이 출시됐다.

DAG — 파이프라인을 표현하는 그래프

DAG(Directed Acyclic Graph, 방향 비순환 그래프)는 Airflow의 핵심 단위다. Task들을 노드로, 실행 순서(의존 관계)를 방향 있는 엣지로 표현한다. "비순환(Acyclic)"이라는 점이 중요하다 — 사이클이 없어야 언젠가는 완료된다.

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

with DAG(
    dag_id="daily_etl",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=run_extract)
    transform = PythonOperator(task_id="transform", python_callable=run_transform)
    load = PythonOperator(task_id="load", python_callable=run_load)

    extract >> transform >> load

주요 파라미터:

파라미터역할
dag_idDAG를 식별하는 고유 이름
schedule실행 주기 (cron 문자열 또는 프리셋)
start_date스케줄이 시작되는 기준 시점
catchup과거 미실행 구간 자동 보완 여부 (기본 True)

Task — 파이프라인의 실행 단위

Task는 DAG 안의 실행 단위다. Scheduler가 Task 인스턴스(TaskInstance)를 생성하고, 상태를 추적한다.

Task의 생명 주기:

none → scheduled → queued → running → success
                                    ↘ failed → up_for_retry

하나의 Task가 실패하면 기본적으로 downstream Task들은 실행되지 않는다. depends_on_past=True를 설정하면 전날 같은 Task가 성공한 경우에만 오늘 실행된다.

Operator — Task의 실체

Operator는 Task의 타입이다. Operator를 DAG 코드에서 인스턴스화하면 Task가 된다. Airflow는 다양한 내장 Operator를 제공하며, Provider 패키지로 AWS·GCP·MySQL 등 외부 시스템 연동을 확장할 수 있다.

자주 쓰이는 Operator

  • PythonOperator — Python 함수를 실행. 가장 범용적.
  • BashOperator — Bash 명령어를 실행.
  • SQLExecuteQueryOperator — SQL을 실행.
  • EmptyOperator — 아무 일도 하지 않는 플레이스홀더. 의존성 구조를 명확히 할 때 유용.
  • Sensors — 외부 조건이 충족될 때까지 대기하는 특수 Operator. 예: FileSensor(파일 존재 확인), HttpSensor(API 응답 확인).

TaskFlow API — 현대적 방식

Airflow 2.0 이후 @task 데코레이터를 쓰는 TaskFlow 스타일이 권장된다. 함수를 그대로 Task로 만들고, 반환값이 자동으로 XCom에 실려 다음 Task로 전달된다.

from airflow.sdk import dag, task

@dag(schedule="@daily", start_date=datetime(2025, 1, 1), catchup=False)
def daily_etl():
    @task
    def extract() -> dict:
        return {"rows": 1000}

    @task
    def transform(data: dict) -> dict:
        return {"rows": data["rows"], "status": "clean"}

    @task
    def load(data: dict) -> None:
        print(f"Loaded {data['rows']} rows")

    load(transform(extract()))

daily_etl()

함수 호출 구조가 의존 관계를 대체한다. >> 연산자를 별도로 쓰지 않아도 된다.

의존 관계 선언

classic Operator 방식에서 의존 관계는 두 가지로 선언한다.

# bitshift 연산자 (권장)
extract >> transform >> load

# 명시적 메서드
transform.set_upstream(extract)
load.set_downstream(transform)

# 팬아웃 / 팬인
extract >> [transform_a, transform_b] >> load

Airflow 아키텍처

사용자 DAG 파일 작성
DAGs 폴더 Python 파일
Scheduler
(파싱·스케줄)
Metadata DB
(상태 저장)
Executor
(Task 배분)
Worker
(Task 실행)
Webserver
(UI·API)
Airflow 핵심 컴포넌트와 데이터 흐름
컴포넌트역할
SchedulerDAG 파일을 파싱하고 실행 대상 Task를 Metadata DB에 등록
Metadata DB모든 DAG·Task 상태, 실행 이력을 저장 (PostgreSQL / MySQL)
ExecutorTask를 어디서 실행할지 결정 (Local / Celery / Kubernetes 등)
Worker실제로 Task를 실행하는 프로세스
Webserver브라우저 UI 및 REST API 제공

Airflow 3.0부터 Task는 Metadata DB에 직접 접근하지 않는다. Task Execution Interface를 통해 Scheduler와 통신하므로 보안 모델이 강화됐다.

References

  • https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html
  • https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html
  • https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html
  • https://airflow.apache.org/blog/airflow-three-point-oh-is-here/
  • https://www.astronomer.io/docs/learn/dags