← 스터디 홈
3편 · 약 9분

XCom과 태스크 통신

태스크는 왜 기본적으로 격리되어 있나

Airflow는 Task를 독립적인 실행 단위로 설계했다. Task는 서로 다른 프로세스·컨테이너·머신에서 실행될 수 있고, 같은 파이썬 메모리 공간을 공유하지 않는다. 이 격리 덕분에 재시도·분산 실행·실패 격리가 가능하지만, 한 Task의 결과를 다음 Task에 넘기려면 명시적인 통신 채널이 필요하다.

그 채널이 XCom(Cross-Communication)이다.

XCom이란

XCom은 Task 사이에 소량의 데이터를 전달하는 키-값 저장소다. 기본 구현(BaseXCom)은 Airflow의 Metadata DB에 XCom을 저장하고, 다른 Task는 DB를 통해 그 값을 읽어 간다.

XCom의 식별자:

필드의미
key사용자가 정한 이름 (기본값: "return_value")
task_id값을 push한 Task
dag_id해당 DAG
run_id해당 DAG Run

같은 DAG 안에서 run_id가 일치하면 Task 사이에 값을 주고받을 수 있다.

명시적 방식: xcom_push / xcom_pull

classic Operator 방식에서는 TaskInstance 객체를 통해 직접 push/pull한다.

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

def push_fn(ti):
    ti.xcom_push(key="row_count", value=42_000)

def pull_fn(ti):
    count = ti.xcom_pull(task_ids="extract", key="row_count")
    print(f"받은 행 수: {count}")

with DAG("xcom_demo", schedule=None, start_date=datetime(2025, 1, 1)) as dag:
    push_task = PythonOperator(task_id="extract", python_callable=push_fn)
    pull_task = PythonOperator(task_id="load",    python_callable=pull_fn)

    push_task >> pull_task

xcom_pull의 주요 파라미터:

파라미터설명
task_ids읽어올 Task의 id (str 또는 list)
keyXCom 키 (기본값: "return_value")
dag_id외부 DAG에서 읽을 때 지정

TaskFlow 방식: 자동 XCom

@task 데코레이터를 쓰면 함수의 반환값이 자동으로 return_value 키로 push되고, 다음 Task의 인수로 자동 pull된다. 코드가 훨씬 간결해진다.

from airflow.sdk import dag, task
from datetime import datetime

@dag(schedule=None, start_date=datetime(2025, 1, 1))
def xcom_taskflow():

    @task
    def extract() -> dict:
        return {"rows": 42_000, "source": "orders"}

    @task
    def transform(data: dict) -> dict:
        return {"rows": data["rows"], "clean": True}

    @task
    def load(data: dict) -> None:
        print(f"적재 완료: {data['rows']}행")

    load(transform(extract()))

xcom_taskflow()

함수 인수·반환값의 타입 힌트가 있으면 Airflow가 자동으로 직렬화 방식을 결정한다. @dataclass·@attr.define으로 정의된 객체도 지원된다.

XCom 데이터 흐름

Task A
(extract)
return {"rows": 42000} xcom_push
key="return_value"
Metadata DB
(XCom 저장소)
dag_id / run_id / task_id / key
→ value (직렬화)
xcom_pull
task_ids="extract"
{"rows": 42000} Task B
(transform)
XCom을 통한 태스크 간 데이터 전달 흐름

크기 제한과 주의사항

XCom의 기본 저장소는 Metadata DB이므로 용량 제한이 엄격하다.

  • MySQL 백엔드: BLOB 타입, 기본 최대 64KB
  • PostgreSQL 백엔드: BYTEA 타입, 사실상 무제한이지만 수십 MB는 비권장

XCom은 DataFrame, 대용량 JSON, 파일 내용을 주고받는 용도가 아니다. 소량의 제어 데이터(행 수, 상태 플래그, 파일 경로, 설정값)를 전달하는 데 적합하다.

대용량 데이터를 전달해야 한다면 데이터를 S3·GCS 같은 외부 스토리지에 저장하고 경로만 XCom으로 전달하는 패턴이 일반적이다.

오브젝트 스토리지 XCom 백엔드

Airflow는 apache-airflow-providers-common-io 패키지를 통해 XCom을 오브젝트 스토리지(S3, GCS, Azure Blob 등)에 저장하는 커스텀 백엔드를 제공한다.

# airflow.cfg
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://my-conn@my-bucket/xcom
xcom_objectstorage_threshold = 1048576   # 1MB 이상은 S3에 저장

threshold 미만의 값은 DB에, 이상은 오브젝트 스토리지에 저장하는 하이브리드 방식이다. DB에는 오브젝트 스토리지 경로가 참조로 남는다.

Airflow 3.0 변화점

Airflow 3.0에서 Task는 Metadata DB에 직접 접근할 수 없고, Task Execution Interface(API)를 통해 Scheduler와만 통신한다. 이 변화로 XCom도 영향을 받는다.

  • ti.xcom_pull(task_ids=None)의 동작이 바뀌었다. 2.x에서는 upstream Task 전체를 검색했지만, 3.0에서는 현재 Task 자신이 push한 값만 반환한다. 기존 DAG를 3.0으로 마이그레이션할 때 주의가 필요하다.
  • TaskFlow에서는 이 변화의 영향을 거의 받지 않는다. 명시적 task_ids를 지정하면 동작이 동일하다.

흔한 실수와 해결책

실수 1: 여러 Task에서 같은 key를 push

# 위험: extract와 validate가 모두 "result" 키로 push
# load가 어느 값을 가져올지 불명확해진다
ti.xcom_pull(task_ids=["extract", "validate"], key="result")
# -> list로 반환되므로 인덱싱 필요

실수 2: 대용량 데이터 직접 전달

# 나쁜 예
@task
def extract() -> pd.DataFrame:
    return pd.read_csv("huge.csv")   # 수백 MB → DB 폭발

# 좋은 예
@task
def extract() -> str:
    df = pd.read_csv("huge.csv")
    path = "s3://bucket/tmp/data.parquet"
    df.to_parquet(path)
    return path                       # 경로만 전달

References

  • https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html
  • https://airflow.apache.org/docs/apache-airflow-providers-common-io/stable/xcom_backend.html
  • https://aravindc42.medium.com/airflow-3-xcoms-passing-data-between-tasks-the-quick-guide-aa269e42482c
  • https://www.restack.io/docs/airflow-knowledge-apache-xcom-size-limit
  • https://github.com/apache/airflow/issues/51821