Pass "real" TaskInstance object with xcom methods in context to task execute functions #44481

Open
1 of 6 tasks
ashb opened this issue Nov 29, 2024 · 3 comments
Labels
area:core area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk

Comments

@ashb ashb converted this from a draft issue Nov 29, 2024
@ashb ashb self-assigned this Nov 29, 2024
@dosubot dosubot bot added the area:core label Nov 29, 2024
@kaxil kaxil moved this from Todo to Next 2 Weeks in AIP-72 - Task Execution Interface and SDK Nov 29, 2024
@kaxil kaxil assigned kaxil and unassigned ashb Dec 3, 2024
@kaxil kaxil moved this from Next 2 Weeks to In Progress in AIP-72 - Task Execution Interface and SDK Dec 3, 2024
kaxil commented Dec 10, 2024

The current values in the Context dict:

  • # NOTE: If you add to this dict, make sure to also update the following:
    # * Context in airflow/utils/context.pyi
    # * KNOWN_CONTEXT_KEYS in airflow/utils/context.py
    # * Table in docs/apache-airflow/templates-ref.rst
    context: dict[str, Any] = {
        "conf": conf,
        "dag": dag,
        "dag_run": dag_run,
        "data_interval_end": timezone.coerce_datetime(data_interval.end),
        "data_interval_start": timezone.coerce_datetime(data_interval.start),
        "outlet_events": OutletEventAccessors(),
        "ds": ds,
        "ds_nodash": ds_nodash,
        "expanded_ti_count": expanded_ti_count,
        "inlets": task.inlets,
        "inlet_events": InletEventsAccessors(task.inlets, session=session),
        "logical_date": logical_date,
        "macros": macros,
        "map_index_template": task.map_index_template,
        "outlets": task.outlets,
        "params": validated_params,
        "prev_data_interval_start_success": get_prev_data_interval_start_success(),
        "prev_data_interval_end_success": get_prev_data_interval_end_success(),
        "prev_start_date_success": get_prev_start_date_success(),
        "prev_end_date_success": get_prev_end_date_success(),
        "run_id": task_instance.run_id,
        "task": task,
        "task_instance": task_instance,
        "task_instance_key_str": f"{task.dag_id}__{task.task_id}__{ds_nodash}",
        "test_mode": task_instance.test_mode,
        "ti": task_instance,
        "triggering_asset_events": lazy_object_proxy.Proxy(get_triggering_events),
        "ts": ts,
        "ts_nodash": ts_nodash,
        "ts_nodash_with_tz": ts_nodash_with_tz,
        "var": {
            "json": VariableAccessor(deserialize_json=True),
            "value": VariableAccessor(deserialize_json=False),
        },
        "conn": ConnectionAccessor(),
    }
  • Template ref: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

The plan is to:

  • Get certain key:value pairs from the API Server, passed back in its response to the request that starts the task (sent with TIEnterRunningPayload)
  • Augment them with key:value pairs from the Task Execution Interface (the execution side of the Task SDK)

The goal is to derive as much of this information as possible on the execution side itself, rather than fetching it from the API server.
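
The two-step plan above can be sketched as a merge of a server-provided payload with keys computed locally. This is only an illustration under assumptions, not the Task SDK's code: `ServerRunContext`, `LocalTaskInstance`, and `build_context` are hypothetical names, and which keys come from which side is a guess based on the dict listed earlier.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass
class ServerRunContext:
    """Hypothetical stand-in for what the API server returns when a TI starts."""

    run_id: str
    logical_date: datetime


@dataclass
class LocalTaskInstance:
    """Hypothetical stand-in for what the execution side already knows."""

    dag_id: str
    task_id: str


def build_context(server: ServerRunContext, ti: LocalTaskInstance) -> dict[str, Any]:
    # Step 1: keys that (in this sketch) only the API server can supply.
    context: dict[str, Any] = {
        "run_id": server.run_id,
        "logical_date": server.logical_date,
    }
    # Step 2: keys derived on the execution side, with no extra API call.
    ds = server.logical_date.strftime("%Y-%m-%d")
    ds_nodash = ds.replace("-", "")
    context.update(
        {
            "ds": ds,
            "ds_nodash": ds_nodash,
            "task_instance": ti,
            "ti": ti,
            "task_instance_key_str": f"{ti.dag_id}__{ti.task_id}__{ds_nodash}",
        }
    )
    return context


example_context = build_context(
    ServerRunContext(
        run_id="manual__2024-12-10",
        logical_date=datetime(2024, 12, 10, tzinfo=timezone.utc),
    ),
    LocalTaskInstance(dag_id="super_basic_run", task_id="hello"),
)
```

Keeping the server payload small and deriving the date strings locally is one way to read the stated goal; the real split between the two sides may differ.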


kaxil commented Dec 11, 2024

kaxil added a commit that referenced this issue Dec 12, 2024
Part of #44481. This adds some readily available context keys.
kaxil added a commit to astronomer/airflow that referenced this issue Dec 12, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 13, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 13, 2024
ellisms pushed a commit to ellisms/airflow that referenced this issue Dec 13, 2024
Part of apache#44481. This adds some readily available context keys.
kaxil added a commit to astronomer/airflow that referenced this issue Dec 16, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 16, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 16, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 16, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 16, 2024
Part of apache#44481

This commit augments the TI context available in the Task Execution Interface with the one from the Execution API Server.

In future PRs the following will be added:

- More methods on TI, such as ti.xcom_pull and ti.xcom_push
- Lazy fetching of connections and variables
- Verifying that "get_current_context" works
kaxil added a commit that referenced this issue Dec 16, 2024
Part of #44481

This commit augments the TI context available in the Task Execution Interface with the one from the Execution API Server.

In future PRs the following will be added:

- More methods on TI, such as ti.xcom_pull and ti.xcom_push
- Lazy fetching of connections and variables
- Verifying that "get_current_context" works
kaxil added a commit to astronomer/airflow that referenced this issue Dec 17, 2024
Part of apache#44481. This adds some readily available context keys.

Create a minimal DagRun model and pass it to TIContext

Pass TIRunContext from API Server & split TI run endpoint
kaxil added a commit that referenced this issue Dec 19, 2024
part of #44481

- Added a minimal, user-facing Connection object to the Task SDK definitions for use in the DAG file
- Added logic to get Connections in the context, and fixed some bugs along the way related to Connection parsing/serializing


We now have the following Connection-related objects:
- `ConnectionResponse` is auto-generated and tightly coupled with the API schema.
- `ConnectionResult` is runtime-specific and meant for internal communication between Supervisor & Task Runner.
- `Connection` class here is where the public-facing, user-relevant aspects are exposed, hiding internal details.
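
The separation between the three objects can be pictured roughly as below. This is a sketch under assumptions, not the actual classes: the field lists are invented for illustration, and `from_response` and `to_user_connection` are hypothetical helper names.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class ConnectionResponse:
    """API layer: mirrors the API schema (auto-generated in the real code base)."""

    conn_id: str
    conn_type: str
    host: Optional[str] = None


@dataclass
class ConnectionResult:
    """Runtime layer: internal message between Supervisor and Task Runner."""

    conn_id: str
    conn_type: str
    host: Optional[str] = None

    @classmethod
    def from_response(cls, resp: ConnectionResponse) -> ConnectionResult:
        # Hypothetical conversion from the API model to the internal message.
        return cls(conn_id=resp.conn_id, conn_type=resp.conn_type, host=resp.host)


@dataclass
class Connection:
    """Public layer: the only object DAG authors are meant to see."""

    conn_id: str
    conn_type: str
    host: Optional[str] = None


def to_user_connection(result: ConnectionResult) -> Connection:
    # Hypothetical conversion that hides the internal message type from users.
    return Connection(conn_id=result.conn_id, conn_type=result.conn_type, host=result.host)


resp = ConnectionResponse(conn_id="airflow_db", conn_type="sqlite", host="/home/airflow/airflow.db")
user_conn = to_user_connection(ConnectionResult.from_response(resp))
```

With explicit conversions at each boundary, a change to the API schema or to the internal message format would not have to leak into the user-facing class.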

**Next up**:

- Same for XCom & Variable
- Implementation of BaseHook.get_conn

Tested it with a DAG:

<img width="1711" alt="image" src="https://github.com/user-attachments/assets/14d28fb7-f6c5-4fbe-b226-46873af2d0f3" />

DAG:

```py
from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import dag


class CustomOperator(BaseOperator):
    def execute(self, context):
        import os
        os.environ["AIRFLOW_CONN_AIRFLOW_DB"] = "sqlite:///home/airflow/airflow.db"
        task_id = context["task_instance"].task_id
        print(f"Hello World {task_id}!")
        print(context)
        print(context["conn"].airflow_db)
        assert context["conn"].airflow_db.conn_id == "airflow_db"


@dag()
def super_basic_run():
    CustomOperator(task_id="hello")


super_basic_run()

```

For the case where a **connection is not found**:

<img width="1435" alt="image" src="https://github.com/user-attachments/assets/7c5e0cb4-6ed4-41aa-9a57-e5641adce954" />
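
Attribute-style access such as `context["conn"].airflow_db`, together with the not-found case, could be modelled with a lazy accessor along these lines. This is a minimal sketch and not the Task SDK's accessor: `LazyConnectionAccessor`, `ConnectionNotFound`, and the `fetch` callback are hypothetical, and a plain dict stands in for the connection object.

```python
from __future__ import annotations

from typing import Callable, Optional


class ConnectionNotFound(Exception):
    """Hypothetical error type used only in this sketch."""


class LazyConnectionAccessor:
    """Resolve `accessor.<conn_id>` on first attribute access and cache the result."""

    def __init__(self, fetch: Callable[[str], Optional[dict]]) -> None:
        self._fetch = fetch
        self._cache: dict[str, dict] = {}

    def __getattr__(self, conn_id: str) -> dict:
        # __getattr__ runs only when normal lookup fails. Private names are
        # rejected so a missing _cache or _fetch can never recurse into here.
        if conn_id.startswith("_"):
            raise AttributeError(conn_id)
        if conn_id not in self._cache:
            found = self._fetch(conn_id)
            if found is None:
                raise ConnectionNotFound(f"Connection {conn_id!r} not found")
            self._cache[conn_id] = found
        return self._cache[conn_id]


calls: list[str] = []


def fake_fetch(conn_id: str) -> Optional[dict]:
    # Stand-in for a request to the supervisor or API server.
    calls.append(conn_id)
    return {"conn_id": conn_id} if conn_id == "airflow_db" else None


conn = LazyConnectionAccessor(fake_fetch)
```

Nothing is fetched until a task actually touches a connection, and repeated access to the same `conn_id` reuses the cached value.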
kaxil added a commit to astronomer/airflow that referenced this issue Dec 19, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 19, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 20, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 20, 2024
kaxil added a commit that referenced this issue Dec 20, 2024
It fixes the following bug:

```python
{"timestamp":"2024-12-20T10:38:56.890735","logger":"task","error_detail":
[{"exc_type":"RecursionError","exc_value":"maximum recursion depth exceeded in comparison","syntax_error":null,"is_cause":false,"frames":
[
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":382,"name":"main"},
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":317,"name":"run"},
	{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":378,"name":"wrapper"},
	{"filename":"/opt/airflow/providers/src/airflow/providers/standard/operators/python.py","lineno":182,"name":"execute"},
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":660,"name":"__setattr__"},
	{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":1126,"name":"_set_xcomargs_dependency"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":132,"name":"apply_upstream_relationship"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":121,"name":"iter_xcom_references"},
	{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
	...
```

To reproduce, run `tutorial_dag` or the following minimal DAG:

```python
import pendulum

from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator

with DAG(
    "sdk_tutorial_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    dag.doc_md = __doc__

    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )

    extract_task
```

I need this fix for #45075 (part of getting [Task Context working with AIP-72](#44481))
kaxil added a commit to astronomer/airflow that referenced this issue Dec 20, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 23, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 24, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 24, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 24, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 24, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 24, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 24, 2024
kaxil added a commit to astronomer/airflow that referenced this issue Dec 24, 2024
kaxil added a commit that referenced this issue Dec 24, 2024
Part of #44481

There is a lot of cleanup to do, but I wanted to get a basic DAG that uses XCom working first.

Example DAG used: `tutorial_dag`

```py

from __future__ import annotations

# [START tutorial]
# [START import_module]
import json
import textwrap

import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.providers.standard.operators.python import PythonOperator

# [END import_module]

# [START instantiate_dag]
with DAG(
    "tutorial_dag",
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={"retries": 2},
    # [END default_args]
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    # [END instantiate_dag]
    # [START documentation]
    dag.doc_md = __doc__
    # [END documentation]

    # [START extract_function]
    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)

    # [END extract_function]

    # [START transform_function]
    def transform(**kwargs):
        ti = kwargs["ti"]
        extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push("total_order_value", total_value_json_string)

    # [END transform_function]

    # [START load_function]
    def load(**kwargs):
        ti = kwargs["ti"]
        total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
        total_order_value = json.loads(total_value_string)

        print(total_order_value)

    # [END load_function]

    # [START main_flow]
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )
    extract_task.doc_md = textwrap.dedent(
        """\
    #### Extract task
    A simple Extract task to get data ready for the rest of the data pipeline.
    In this case, getting data is simulated by reading from a hardcoded JSON string.
    This data is then put into xcom, so that it can be processed by the next task.
    """
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )
    transform_task.doc_md = textwrap.dedent(
        """\
    #### Transform task
    A simple Transform task which takes in the collection of order data from xcom
    and computes the total order value.
    This computed value is then put into xcom, so that it can be processed by the next task.
    """
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )
    load_task.doc_md = textwrap.dedent(
        """\
    #### Load task
    A simple Load task which takes in the result of the Transform task, by reading it
    from xcom and instead of saving it to end user review, just prints it out.
    """
    )

    extract_task >> transform_task >> load_task
```
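
The XCom hand-off between `extract` and `transform` can also be exercised without a running Airflow by passing a stand-in `ti`. The sketch below assumes a hypothetical `InMemoryTI` class backed by a shared dict; it mimics only the `xcom_push` and `xcom_pull` calls used in the DAG above and says nothing about how the Task SDK stores XComs.

```python
from __future__ import annotations

import json
from typing import Any


class InMemoryTI:
    """Hypothetical stand-in for a task instance; XComs live in a shared dict."""

    def __init__(self, task_id: str, store: dict[tuple[str, str], Any]) -> None:
        self.task_id = task_id
        self._store = store

    def xcom_push(self, key: str, value: Any) -> None:
        # Values are keyed by (pushing task, key), like the DAG's usage implies.
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids: str, key: str) -> Any:
        return self._store.get((task_ids, key))


def extract(**kwargs):
    ti = kwargs["ti"]
    ti.xcom_push("order_data", '{"1001": 301.27, "1002": 433.21, "1003": 502.22}')


def transform(**kwargs):
    ti = kwargs["ti"]
    order_data = json.loads(ti.xcom_pull(task_ids="extract", key="order_data"))
    total = sum(order_data.values())
    ti.xcom_push("total_order_value", json.dumps({"total_order_value": total}))


# Run the two callables in dependency order against one shared store.
store: dict[tuple[str, str], Any] = {}
extract(ti=InMemoryTI("extract", store))
transform(ti=InMemoryTI("transform", store))
result = json.loads(store[("transform", "total_order_value")])
```

Because the callables only rely on `kwargs["ti"]`, the same functions run unchanged under this stand-in and under a real task instance.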

---
<img width="1703" alt="image" src="https://github.com/user-attachments/assets/10025ef4-0410-4c2a-9bb6-1e68f51a8805" />

<img width="1710" alt="image" src="https://github.com/user-attachments/assets/201b61c0-3998-4b06-b0d4-2145120321f8" />

---
<img width="1721" alt="image" src="https://github.com/user-attachments/assets/dd9c50e3-20c5-4762-99f9-c02a8c16732e" />
@kaxil kaxil added area:task-sdk area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK labels Dec 27, 2024