Pass "real" TaskInstance object with xcom methods in context to task execute functions #44481
Open
1 of 6 tasks
Labels: area:core, area:task-execution-interface-aip72 (AIP-72: Task Execution Interface (TEI) aka Task SDK), area:task-sdk
kaxil moved this from Todo to Next 2 Weeks in AIP-72 - Task Execution Interface and SDK (Nov 29, 2024)
kaxil moved this from Next 2 Weeks to In Progress in AIP-72 - Task Execution Interface and SDK (Dec 3, 2024)
The current values in the Context dict:
The plan is to:
As a goal, I am planning to get as much info as possible on the execution side itself, as opposed to getting it from the API server.
kaxil added a commit that referenced this issue (Dec 12, 2024):

Part of #44481. This adds some readily available context keys.
kaxil added three commits to astronomer/airflow that referenced this issue (one on Dec 12 and two on Dec 13, 2024).
ellisms pushed a commit to ellisms/airflow that referenced this issue (Dec 13, 2024).
kaxil added six commits to astronomer/airflow that referenced this issue (Dec 16, 2024).
kaxil added a commit that referenced this issue (Dec 16, 2024):

Part of #44481. This commit augments the TI context available in the Task Execution Interface with the one from the Execution API Server. In future PRs the following will be added:

- More methods on TI, like `ti.xcom_pull`, `ti.xcom_push`, etc.
- Lazy fetching of connections and variables
- Verifying that `get_current_context` is working
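One common way to make a `get_current_context()` helper work is for the task runner to store the context while `execute()` runs and clear it afterwards. The following is a minimal sketch that assumes a `contextvars`-based store. The `set_current_context` helper and the error message are hypothetical, and this is not necessarily how the Task SDK implements it.

```python
import contextlib
import contextvars
from typing import Any, Iterator, Mapping

# Holds the context of the task that is currently executing. A ContextVar
# (rather than a plain global) keeps values separate per thread and per asyncio task.
_current_context = contextvars.ContextVar("current_context")


@contextlib.contextmanager
def set_current_context(context: Mapping[str, Any]) -> Iterator[Mapping[str, Any]]:
    """Hypothetical helper the task runner would wrap around ``execute()``."""
    token = _current_context.set(context)
    try:
        yield context
    finally:
        # Restore the previous value so the context never leaks past the task.
        _current_context.reset(token)


def get_current_context() -> Mapping[str, Any]:
    """Return the context of the running task, or fail loudly outside of one."""
    try:
        return _current_context.get()
    except LookupError:
        raise RuntimeError("get_current_context() called outside of a running task") from None


# Usage: code called from inside execute() can reach the context without it
# being passed down explicitly.
def helper() -> str:
    return get_current_context()["task_id"]


with set_current_context({"task_id": "hello"}):
    print(helper())  # prints "hello"
```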
kaxil added a commit to astronomer/airflow that referenced this issue (Dec 17, 2024):

- Part of apache#44481. This adds some readily available context keys
- Create a minimal DagRun model and pass it to TIContext
- Pass TIRunContext from API Server & split TI run endpoint
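The split between context keys the task runner already has and data sent by the API server can be sketched as follows. Only the names `DagRun` and `TIRunContext` come from the commit message. Every field, the `build_context` helper, and the stand-in TI object are hypothetical and chosen for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class DagRun:
    # Hypothetical minimal DagRun model; the field names are assumptions.
    dag_id: str
    run_id: str
    logical_date: Optional[datetime]
    run_type: str


@dataclass(frozen=True)
class TIRunContext:
    # Hypothetical shape of what the API server returns when a TI starts running.
    dag_run: DagRun


def build_context(ti: Any, run_ctx: TIRunContext) -> Dict[str, Any]:
    """Merge keys the task runner already has with keys sent by the API server."""
    context: Dict[str, Any] = {
        # Readily available on the execution side: no API round trip needed.
        "ti": ti,
        "task_instance": ti,
        "run_id": ti.run_id,
        # Supplied by the API server in the run response.
        "dag_run": run_ctx.dag_run,
        "logical_date": run_ctx.dag_run.logical_date,
    }
    if run_ctx.dag_run.logical_date is not None:
        # "ds" follows Airflow's convention: the logical date as YYYY-MM-DD.
        context["ds"] = run_ctx.dag_run.logical_date.strftime("%Y-%m-%d")
    return context


# Usage with a stand-in TI object and a hand-built server response.
ti = SimpleNamespace(dag_id="example_dag", task_id="hello", run_id="manual__1")
run_ctx = TIRunContext(
    dag_run=DagRun(
        dag_id="example_dag",
        run_id="manual__1",
        logical_date=datetime(2024, 12, 17, tzinfo=timezone.utc),
        run_type="manual",
    )
)
context = build_context(ti, run_ctx)
print(context["ds"])  # prints "2024-12-17"
```

This design keeps the API response small: anything the runner can derive locally, such as the TI itself and its `run_id`, is never requested from the server. This matches the stated goal of getting as much as possible on the execution side.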
kaxil added a commit that referenced this issue (Dec 19, 2024):

Part of #44481.

- Added a minimal user-facing `Connection` object in the Task SDK definitions, for use in the DAG file.
- Added logic to get Connections in the context.

Also fixed some bugs along the way related to Connection parsing/serializing.

Now we have the following Connection-related objects:

- `ConnectionResponse` is auto-generated and tightly coupled with the API schema.
- `ConnectionResult` is runtime-specific and meant for internal communication between the Supervisor and the Task Runner.
- `Connection` is where the public-facing, user-relevant aspects are exposed, hiding internal details.

**Next up**:

- Same for XCom & Variable
- Implementation of `BaseHook.get_conn`

Tested it with this DAG:

```py
from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import dag


class CustomOperator(BaseOperator):
    def execute(self, context):
        import os

        os.environ["AIRFLOW_CONN_AIRFLOW_DB"] = "sqlite:///home/airflow/airflow.db"

        task_id = context["task_instance"].task_id
        print(f"Hello World {task_id}!")
        print(context)
        print(context["conn"].airflow_db)

        assert context["conn"].airflow_db.conn_id == "airflow_db"


@dag()
def super_basic_run():
    CustomOperator(task_id="hello")


super_basic_run()
```

For the case where a **connection is not found**: (screenshot)
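The test DAG sets `AIRFLOW_CONN_AIRFLOW_DB` and then reads `context["conn"].airflow_db`, so the lookup happens lazily on attribute access. The following minimal sketch makes several assumptions. It resolves connections only from `AIRFLOW_CONN_<CONN_ID>` environment variables and skips the Supervisor/Task Runner hop that the commit describes for `ConnectionResult`. The `Connection` fields, `ConnectionAccessor`, `ConnectionNotFound`, and the simplified URI parsing are all hypothetical.

```python
import os
from dataclasses import dataclass
from typing import Dict, Mapping, Optional
from urllib.parse import urlsplit


@dataclass(frozen=True)
class Connection:
    # Hypothetical user-facing connection; the fields are a simplified assumption.
    conn_id: str
    conn_type: str
    host: Optional[str] = None
    login: Optional[str] = None
    password: Optional[str] = None
    port: Optional[int] = None
    schema: Optional[str] = None


class ConnectionNotFound(LookupError):
    """Hypothetical error for a conn_id that cannot be resolved."""


def connection_from_uri(conn_id: str, uri: str) -> Connection:
    # Simplified URI parsing; Airflow's real parser handles more cases.
    parts = urlsplit(uri)
    return Connection(
        conn_id=conn_id,
        conn_type=parts.scheme,
        host=parts.hostname,
        login=parts.username,
        password=parts.password,
        port=parts.port,
        schema=parts.path.lstrip("/") or None,
    )


class ConnectionAccessor:
    """Resolves ``context["conn"].<conn_id>`` lazily, on first attribute access."""

    def __init__(self, environ: Optional[Mapping[str, str]] = None) -> None:
        self._environ = os.environ if environ is None else environ
        self._cache: Dict[str, Connection] = {}

    def __getattr__(self, conn_id: str) -> Connection:
        # Only called for names not found normally; never treat private names as conn_ids.
        if conn_id.startswith("_"):
            raise AttributeError(conn_id)
        if conn_id not in self._cache:
            uri = self._environ.get(f"AIRFLOW_CONN_{conn_id.upper()}")
            if uri is None:
                raise ConnectionNotFound(f"The conn_id `{conn_id}` isn't defined")
            self._cache[conn_id] = connection_from_uri(conn_id, uri)
        return self._cache[conn_id]


# Usage mirroring the test DAG: a connection defined via an environment variable.
context = {"conn": ConnectionAccessor({"AIRFLOW_CONN_AIRFLOW_DB": "sqlite:///home/airflow/airflow.db"})}
print(context["conn"].airflow_db.conn_id)  # prints "airflow_db"
```

Caching each resolved `Connection` means repeated accesses within one task, as in the DAG's `print` and `assert`, trigger only one lookup.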
kaxil added four commits to astronomer/airflow that referenced this issue (two on Dec 19 and two on Dec 20, 2024).
kaxil added a commit that referenced this issue (Dec 20, 2024):

It fixes the following bug:

```
{"timestamp":"2024-12-20T10:38:56.890735","logger":"task","error_detail":
[{"exc_type":"RecursionError","exc_value":"maximum recursion depth exceeded in comparison","syntax_error":null,"is_cause":false,"frames":
[
  {"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":382,"name":"main"},
  {"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":317,"name":"run"},
  {"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":378,"name":"wrapper"},
  {"filename":"/opt/airflow/providers/src/airflow/providers/standard/operators/python.py","lineno":182,"name":"execute"},
  {"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":660,"name":"__setattr__"},
  {"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":1126,"name":"_set_xcomargs_dependency"},
  {"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":132,"name":"apply_upstream_relationship"},
  {"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
  {"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":121,"name":"iter_xcom_references"},
  {"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
  ...
```

To reproduce, just run `tutorial_dag` or the following minimal DAG:

```python
import pendulum

from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator

with DAG(
    "sdk_tutorial_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    dag.doc_md = __doc__

    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )

    extract_task
```

I need this fix for #45075 (part of getting [Task Context working with AIP-72](#44481)).
kaxil added nine commits to astronomer/airflow that referenced this issue (one on Dec 20, one on Dec 23, and seven on Dec 24, 2024).
kaxil added a commit that referenced this issue (Dec 24, 2024):

Part of #44481. There is a lot of cleanup to do, but I wanted to get a basic DAG that uses XCom working first.

Example DAG used: `tutorial_dag`

```py
from __future__ import annotations

# [START tutorial]
# [START import_module]
import json
import textwrap

import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.providers.standard.operators.python import PythonOperator

# [END import_module]

# [START instantiate_dag]
with DAG(
    "tutorial_dag",
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={"retries": 2},
    # [END default_args]
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    # [END instantiate_dag]
    # [START documentation]
    dag.doc_md = __doc__
    # [END documentation]

    # [START extract_function]
    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)

    # [END extract_function]

    # [START transform_function]
    def transform(**kwargs):
        ti = kwargs["ti"]
        extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push("total_order_value", total_value_json_string)

    # [END transform_function]

    # [START load_function]
    def load(**kwargs):
        ti = kwargs["ti"]
        total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
        total_order_value = json.loads(total_value_string)

        print(total_order_value)

    # [END load_function]

    # [START main_flow]
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )
    extract_task.doc_md = textwrap.dedent(
        """\
    #### Extract task
    A simple Extract task to get data ready for the rest of the data pipeline.
    In this case, getting data is simulated by reading from a hardcoded JSON string.
    This data is then put into xcom, so that it can be processed by the next task.
    """
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )
    transform_task.doc_md = textwrap.dedent(
        """\
    #### Transform task
    A simple Transform task which takes in the collection of order data from xcom
    and computes the total order value.
    This computed value is then put into xcom, so that it can be processed by the next task.
    """
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )
    load_task.doc_md = textwrap.dedent(
        """\
    #### Load task
    A simple Load task which takes in the result of the Transform task, by reading it
    from xcom and instead of saving it to end user review, just prints it out.
    """
    )

    extract_task >> transform_task >> load_task
```
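The tutorial's `extract`, `transform`, and `load` callables only touch `ti.xcom_push` and `ti.xcom_pull`. Because of this, the XCom chain can be checked without a scheduler by calling them with a stand-in object. The following is a minimal sketch that uses a hypothetical dict-backed `FakeTI`, which is not the Task SDK's TaskInstance. The callables are restated compactly with the same behavior; `transform` uses `sum()` in place of the explicit loop.

```python
import json
from typing import Any, Dict, Tuple


class FakeTI:
    """Hypothetical stand-in for the Task SDK TI: XComs live in a shared dict."""

    def __init__(self, task_id: str, store: Dict[Tuple[str, str], Any]) -> None:
        self.task_id = task_id
        self._store = store

    def xcom_push(self, key: str, value: Any) -> None:
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids: str, key: str = "return_value") -> Any:
        return self._store.get((task_ids, key))


def extract(**kwargs):
    kwargs["ti"].xcom_push("order_data", '{"1001": 301.27, "1002": 433.21, "1003": 502.22}')


def transform(**kwargs):
    ti = kwargs["ti"]
    order_data = json.loads(ti.xcom_pull(task_ids="extract", key="order_data"))
    total_order_value = sum(order_data.values())
    ti.xcom_push("total_order_value", json.dumps({"total_order_value": total_order_value}))


def load(**kwargs):
    ti = kwargs["ti"]
    print(json.loads(ti.xcom_pull(task_ids="transform", key="total_order_value")))


# Run the tasks in dependency order (extract >> transform >> load) against one store.
store: Dict[Tuple[str, str], Any] = {}
for task_id, fn in [("extract", extract), ("transform", transform), ("load", load)]:
    fn(ti=FakeTI(task_id, store))

# 301.27 + 433.21 + 502.22, approximately 1236.7 (floating-point sum).
total = json.loads(store[("transform", "total_order_value")])["total_order_value"]
```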
kaxil added the area:task-sdk and area:task-execution-interface-aip72 (AIP-72: Task Execution Interface (TEI) aka Task SDK) labels (Dec 27, 2024)
The K8s system tests are running https://github.com/apache/airflow/blob/cd5ccf0abf5027cf457b36a8a2bee397dcb538b4/airflow/example_dags/example_xcom.py, and this is currently failing because we don't pass `ti` to the operator's `execute()` function. (We currently pass `task_instance`, not `ti`, but even passing it as `ti` wouldn't work, because the simple class we have for TI in the Task SDK doesn't have `xcom_pull` or `xcom_push` methods.)

Sub-tasks:

- `XComArg` #45112
- `BaseOperator.do_xcom_push` is True and if task returns a value #45230
- `ti.get_current_context` in Task SDK #45234
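The fix described in this issue has two parts. First, the same task-instance object is exposed under both `ti` and `task_instance`. Second, that object gets `xcom_push` and `xcom_pull`. The following is a minimal sketch under several assumptions. `RuntimeTaskInstance`, `run_task`, and the in-memory `xcom_store` are hypothetical; a real implementation would go through the Execution API instead. The choice that `xcom_pull()` without `task_ids` reads the current task's own XCom is also an assumption. The `return_value` push follows the `do_xcom_push` sub-task above.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, Mapping, Tuple, Union

XComKey = Tuple[str, str, str, str]  # (dag_id, run_id, task_id, key)


@dataclass
class RuntimeTaskInstance:
    """Hypothetical TI handed to execute(); XComs kept in an in-memory dict."""

    dag_id: str
    run_id: str
    task_id: str
    # Stands in for the Execution API; shared between the TIs of one run.
    xcom_store: Dict[XComKey, Any] = field(default_factory=dict)

    def xcom_push(self, key: str, value: Any) -> None:
        self.xcom_store[(self.dag_id, self.run_id, self.task_id, key)] = value

    def xcom_pull(
        self,
        task_ids: Union[str, Iterable[str], None] = None,
        key: str = "return_value",
    ) -> Any:
        # Assumption: omitting task_ids pulls this task's own XCom.
        if task_ids is None:
            task_ids = self.task_id
        if isinstance(task_ids, str):
            return self.xcom_store.get((self.dag_id, self.run_id, task_ids, key))
        # Several task_ids: one value per task, in the order given.
        return [self.xcom_store.get((self.dag_id, self.run_id, t, key)) for t in task_ids]


def run_task(
    execute: Callable[[Mapping[str, Any]], Any],
    ti: RuntimeTaskInstance,
    do_xcom_push: bool = True,
) -> Any:
    # The same object under both names, so code using either key keeps working.
    context = {"ti": ti, "task_instance": ti}
    result = execute(context)
    # Push a returned value under "return_value" when do_xcom_push is enabled.
    if do_xcom_push and result is not None:
        ti.xcom_push("return_value", result)
    return result


# Usage: one task returns a value, and a downstream task pulls it via context["ti"].
store: Dict[XComKey, Any] = {}
run_task(lambda ctx: [1, 2, 3], RuntimeTaskInstance("example_dag", "manual__1", "push", store))
pulled = run_task(
    lambda ctx: ctx["ti"].xcom_pull(task_ids="push"),
    RuntimeTaskInstance("example_dag", "manual__1", "pull", store),
    do_xcom_push=False,
)
print(pulled)  # prints [1, 2, 3]
```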