XCom과 태스크 통신
태스크는 왜 기본적으로 격리되어 있나
Airflow는 Task를 독립적인 실행 단위로 설계했다. Task는 서로 다른 프로세스·컨테이너·머신에서 실행될 수 있고, 같은 파이썬 메모리 공간을 공유하지 않는다. 이 격리 덕분에 재시도·분산 실행·실패 격리가 가능하지만, 한 Task의 결과를 다음 Task에 넘기려면 명시적인 통신 채널이 필요하다.
그 채널이 XCom(Cross-Communication)이다.
XCom이란
XCom은 Task 사이에 소량의 데이터를 전달하는 키-값 저장소다. 기본 구현(BaseXCom)은 Airflow의 Metadata DB에 XCom을 저장하고, 다른 Task는 DB를 통해 그 값을 읽어 간다.
XCom의 식별자:
| 필드 | 의미 |
|---|---|
key | 사용자가 정한 이름 (기본값: "return_value") |
task_id | 값을 push한 Task |
dag_id | 해당 DAG |
run_id | 해당 DAG Run |
같은 DAG 안에서 run_id가 일치하면 Task 사이에 값을 주고받을 수 있다.
명시적 방식: xcom_push / xcom_pull
classic Operator 방식에서는 TaskInstance 객체를 통해 직접 push/pull한다.
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime
def push_fn(ti):
ti.xcom_push(key="row_count", value=42_000)
def pull_fn(ti):
count = ti.xcom_pull(task_ids="extract", key="row_count")
print(f"받은 행 수: {count}")
with DAG("xcom_demo", schedule=None, start_date=datetime(2025, 1, 1)) as dag:
push_task = PythonOperator(task_id="extract", python_callable=push_fn)
pull_task = PythonOperator(task_id="load", python_callable=pull_fn)
push_task >> pull_taskxcom_pull의 주요 파라미터:
| 파라미터 | 설명 |
|---|---|
task_ids | 읽어올 Task의 id (str 또는 list) |
key | XCom 키 (기본값: "return_value") |
dag_id | 외부 DAG에서 읽을 때 지정 |
TaskFlow 방식: 자동 XCom
@task 데코레이터를 쓰면 함수의 반환값이 자동으로 return_value 키로 push되고, 다음 Task의 인수로 자동 pull된다. 코드가 훨씬 간결해진다.
from airflow.sdk import dag, task
from datetime import datetime
@dag(schedule=None, start_date=datetime(2025, 1, 1))
def xcom_taskflow():
@task
def extract() -> dict:
return {"rows": 42_000, "source": "orders"}
@task
def transform(data: dict) -> dict:
return {"rows": data["rows"], "clean": True}
@task
def load(data: dict) -> None:
print(f"적재 완료: {data['rows']}행")
load(transform(extract()))
xcom_taskflow()함수 인수·반환값의 타입 힌트가 있으면 Airflow가 자동으로 직렬화 방식을 결정한다. @dataclass·@attr.define으로 정의된 객체도 지원된다.
XCom 데이터 흐름
(extract) ↓ return {"rows": 42000} ↓ xcom_push
key="return_value"
(XCom 저장소) ↓ dag_id / run_id / task_id / key
→ value (직렬화)
task_ids="extract" ↓ {"rows": 42000} ↓ Task B
(transform)
크기 제한과 주의사항
XCom의 기본 저장소는 Metadata DB이므로 용량 제한이 엄격하다.
- MySQL 백엔드: BLOB 타입, 기본 최대 64KB
- PostgreSQL 백엔드: BYTEA 타입, 사실상 무제한이지만 수십 MB는 비권장
XCom은 DataFrame, 대용량 JSON, 파일 내용을 주고받는 용도가 아니다. 소량의 제어 데이터(행 수, 상태 플래그, 파일 경로, 설정값)를 전달하는 데 적합하다.
대용량 데이터를 전달해야 한다면 데이터를 S3·GCS 같은 외부 스토리지에 저장하고 경로만 XCom으로 전달하는 패턴이 일반적이다.
오브젝트 스토리지 XCom 백엔드
Airflow는 apache-airflow-providers-common-io 패키지를 통해 XCom을 오브젝트 스토리지(S3, GCS, Azure Blob 등)에 저장하는 커스텀 백엔드를 제공한다.
# airflow.cfg
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://my-conn@my-bucket/xcom
xcom_objectstorage_threshold = 1048576 # 1MB 이상은 S3에 저장threshold 미만의 값은 DB에, 이상은 오브젝트 스토리지에 저장하는 하이브리드 방식이다. DB에는 오브젝트 스토리지 경로가 참조로 남는다.
Airflow 3.0 변화점
Airflow 3.0에서 Task는 Metadata DB에 직접 접근할 수 없고, Task Execution Interface(API)를 통해 Scheduler와만 통신한다. 이 변화로 XCom도 영향을 받는다.
ti.xcom_pull(task_ids=None)의 동작이 바뀌었다. 2.x에서는 upstream Task 전체를 검색했지만, 3.0에서는 현재 Task 자신이 push한 값만 반환한다. 기존 DAG를 3.0으로 마이그레이션할 때 주의가 필요하다.- TaskFlow에서는 이 변화의 영향을 거의 받지 않는다. 명시적
task_ids를 지정하면 동작이 동일하다.
흔한 실수와 해결책
실수 1: 여러 Task에서 같은 key를 push
# 위험: extract와 validate가 모두 "result" 키로 push
# load가 어느 값을 가져올지 불명확해진다
ti.xcom_pull(task_ids=["extract", "validate"], key="result")
# -> list로 반환되므로 인덱싱 필요실수 2: 대용량 데이터 직접 전달
# 나쁜 예
@task
def extract() -> pd.DataFrame:
return pd.read_csv("huge.csv") # 수백 MB → DB 폭발
# 좋은 예
@task
def extract() -> str:
df = pd.read_csv("huge.csv")
path = "s3://bucket/tmp/data.parquet"
df.to_parquet(path)
return path # 경로만 전달References
- https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html
- https://airflow.apache.org/docs/apache-airflow-providers-common-io/stable/xcom_backend.html
- https://aravindc42.medium.com/airflow-3-xcoms-passing-data-between-tasks-the-quick-guide-aa269e42482c
- https://www.restack.io/docs/airflow-knowledge-apache-xcom-size-limit
- https://github.com/apache/airflow/issues/51821