ぽこやかざん雑記

データエンジニア / 下町モルモット / 広島カープファン / 深夜の馬鹿力 / おくやま

AirflowのXComsの記述方法あれこれ

Apache Airflow の XComsについて、いざコードで記述してみる時に手が止まらないようにするためのメモ。
具体的には、以下の4パターンでコードの記述方法が変わってくるので、その違いについて書いていく。

  1. @taskデコレータを使って、明示的にxcom_pushしたものをpullする
  2. @taskデコレータを使って、PythonOperatorのreturn値をpullする
  3. @taskデコレータを使わずに、明示的にxcom_pushしたものをpullする
  4. @taskデコレータを使わずに、PythonOperatorのreturn値をpullする

参考

AirflowのCustom XCom Backend触ってみた

関連情報

XComsとは

タスク間でデータを受け渡す仕組み。

以下公式ドキュメントの引用。

XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.

ざっくり言うと、データを渡す側で xcom_push、データを受け取る側で xcom_pull をすることでデータの受け渡しが可能。

@task デコレータ(@dag デコレータ)について

Apache Airflow 2.0以降で導入されたもので、DAGの定義やタスクの作成をより直感的で簡潔に行うことができる。
詳しくは こちらを参照。
具体的な書き方については以下の公式の使用例がわかりやすい。

1. @taskデコレータを使って、明示的にxcom_pushしたものをpullする

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1))
def my_dag_1():
    @task
    def push_task():
        from airflow.models import TaskInstance
        ti = TaskInstance.current()
        ti.xcom_push(key='explicit_key', value='Explicit Value with @task decorator')

    @task
    def pull_task():
        from airflow.models import TaskInstance
        ti = TaskInstance.current()
        value = ti.xcom_pull(task_ids='push_task', key='explicit_key')
        print(value)

    push_task()
    pull_task()

2. @taskデコレータを使って、PythonOperatorのreturn値をpullする

ポイントとしては以下の2点:

  • PythonOperator 含め、ほとんどのオペレータではreturn値をXComsの return_value というキー自動的にプッシュする
  • xcom_pullは、キーが渡されない場合デフォルトで return_value キーの値を取得する

つまり、return値をpullする場合は明示的にpushする必要はないし、pull時にキーを指定しなくても良い。

@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1))
def my_dag_2():
    @task
    def push_task():
        return 'Value returned with @task decorator'

    @task
    def pull_task(data_from_push_task):
        print(data_from_push_task)

    data = push_task()
    pull_task(data)

3. @taskデコレータを使わずに、明示的にxcom_pushしたものをpullする

from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def push_function(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='explicit_key', value='Explicit Value without @task decorator')

def pull_function(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(task_ids='push_function', key='explicit_key')
    print(value)

with DAG('my_dag_3', schedule_interval='@daily', start_date=datetime(2023, 1, 1)) as dag:
    push_task = PythonOperator(
        task_id='push_function',
        python_callable=push_function,
        provide_context=True
    )

    pull_task = PythonOperator(
        task_id='pull_function',
        python_callable=pull_function,
        provide_context=True
    )

    push_task >> pull_task

4. @taskデコレータを使わずに、PythonOperatorのreturn値をpullする

ポイントは2と同じ。

def push_value_function():
    return 'Value returned without @task decorator'

def pull_value_function(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(task_ids='push_value_function')
    print(value)

with DAG('my_dag_4', schedule_interval='@daily', start_date=datetime(2023, 1, 1)) as dag:
    push_value_task = PythonOperator(
        task_id='push_value_function',
        python_callable=push_value_function
    )

    pull_value_task = PythonOperator(
        task_id='pull_value_function',
        python_callable=pull_value_function,
        provide_context=True
    )

    push_value_task >> pull_value_task