AIP-72: Handle SIGTERM signal on Supervisor #44626

Open
wants to merge 2 commits into
base: main

kaxil
Member

@kaxil kaxil commented Dec 3, 2024

As part of AIP-72, this PR introduces proper signal handling for the supervisor process. The supervisor now intercepts SIGTERM signals to ensure that child processes (task processes) are terminated gracefully.

Without signal handling, terminating the supervisor process (e.g., via kill -TERM) could leave child processes running as orphaned tasks. This change ensures that when the supervisor is stopped, all managed subprocesses are also terminated cleanly.
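The forwarding idea described above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions, not the code in this PR: `make_forwarding_handler` and `run_supervised` are hypothetical names, the sketch uses `subprocess.Popen` rather than Airflow's `WatchedSubprocess`, and it assumes a POSIX system.

```python
import signal
import subprocess
import sys


def make_forwarding_handler(child):
    """Return a signal handler that relays the received signal to `child`.

    Hypothetical helper for illustration; not part of Airflow.
    """

    def handler(signum, frame):
        # Only signal the child if it is still running.
        if child.poll() is None:
            child.send_signal(signum)

    return handler


def run_supervised(cmd):
    """Start `cmd`, forward SIGTERM to it, and return its exit code."""
    child = subprocess.Popen(cmd)
    # Install the handler in the supervising process (main thread only).
    previous = signal.signal(signal.SIGTERM, make_forwarding_handler(child))
    try:
        # A child killed by a signal reports a negative return code.
        return child.wait()
    finally:
        # Restore whatever handler was installed before.
        signal.signal(signal.SIGTERM, previous)
```

With a handler like this in place, `kill -TERM <supervisor pid>` no longer ends the supervisor immediately; the child is terminated first and the supervisor can then observe its exit code and report a final state.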

Sample output when the supervisor receives SIGTERM (kill -TERM 138):

2024-12-09 09:28:53 [debug    ] DAG file parsed                [task] [task] file=/files/dags/example_bash_operator.py
2024-12-09 09:28:53 [warning  ] BashOperator.execute cannot be called outside TaskInstance! [airflow.task.operators.airflow.providers.standard.operators.bash.BashOperator] [task]
2024-12-09 09:28:53 [info     ] Tmp dir root location: /tmp    [airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook] [task]
2024-12-09 09:28:53 [info     ] Running command: ['/usr/bin/bash', '-c', 'sleep 10000000'] [airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook] [task]
2024-12-09 09:28:53 [info     ] Output:                        [airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook] [task]
[2024-12-09T09:28:58.945+0000] {_client.py:1026} INFO - HTTP Request: PUT http://localhost:9091/execution/task-instances/01938d9d-a579-7b8e-aa91-0a2093a318d5/heartbeat "HTTP/1.1 204 No Content"
[2024-12-09T09:29:04.176+0000] {_client.py:1026} INFO - HTTP Request: PUT http://localhost:9091/execution/task-instances/01938d9d-a579-7b8e-aa91-0a2093a318d5/heartbeat "HTTP/1.1 204 No Content"
[2024-12-09T09:29:09.229+0000] {_client.py:1026} INFO - HTTP Request: PUT http://localhost:9091/execution/task-instances/01938d9d-a579-7b8e-aa91-0a2093a318d5/heartbeat "HTTP/1.1 204 No Content"
2024-12-09 09:29:09 [error    ] Received termination signal in supervisor. Terminating watched subprocess [supervisor] process_pid=139 signal=15 supervisor_pid=138
2024-12-09 09:29:09 [debug    ] Task process exited            [supervisor] exit_code=<Negsignal.SIGTERM: -15>
2024-12-09 09:29:09 [info     ] Process exited                 [supervisor] exit_code=<Negsignal.SIGTERM: -15> pid=139 signal=SIGTERM
[2024-12-09T09:29:14.262+0000] {_client.py:1026} INFO - HTTP Request: PATCH http://localhost:9091/execution/task-instances/01938d9d-a579-7b8e-aa91-0a2093a318d5/state "HTTP/1.1 204 No Content"
2024-12-09 09:29:14 [info     ] Task finished                  [supervisor] duration=21.38433257100405 exit_code=<Negsignal.SIGTERM: -15> final_state=failed

vs without signal handling:

2024-12-09 09:34:47 [debug    ] Loaded DAG <DAG: example_bash_op> [airflow.models.dagbag.DagBag] [task]
2024-12-09 09:34:47 [debug    ] DAG file parsed                [task] [task] file=/files/dags/example_bash_operator.py
2024-12-09 09:34:47 [warning  ] BashOperator.execute cannot be called outside TaskInstance! [airflow.task.operators.airflow.providers.standard.operators.bash.BashOperator] [task]
2024-12-09 09:34:47 [info     ] Tmp dir root location: /tmp    [airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook] [task]
2024-12-09 09:34:47 [info     ] Running command: ['/usr/bin/bash', '-c', 'sleep 10000000'] [airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook] [task]
2024-12-09 09:34:47 [info     ] Output:                        [airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook] [task]
[2024-12-09T09:34:52.417+0000] {_client.py:1026} INFO - HTTP Request: PUT http://localhost:9091/execution/task-instances/01938d9d-a579-7b8e-aa91-0a2093a318d5/heartbeat "HTTP/1.1 204 No Content"
Terminated

The key difference is that in the first case the task is also set to the failed state, as it should be, whereas without signal handling the TI was left in the running state.
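For readers unfamiliar with the `exit_code=<Negsignal.SIGTERM: -15>` notation in the logs: on POSIX, a negative return code means the process was killed by that signal. A minimal sketch of interpreting such a code follows. `describe_exit` is a hypothetical helper, and the mapping of exit code 0 to "success" is an assumption for illustration; only the `-15` to `failed` case is shown in the logs above, and the supervisor's real logic may differ.

```python
import signal


def describe_exit(returncode):
    """Summarize a child's return code (hypothetical helper, not Airflow API)."""
    if returncode < 0:
        # Negative code: the child was terminated by signal number -returncode.
        name = signal.Signals(-returncode).name
        return {"exit_code": returncode, "signal": name, "final_state": "failed"}
    # Assumption: zero means success and any other code means failure.
    state = "success" if returncode == 0 else "failed"
    return {"exit_code": returncode, "signal": None, "final_state": state}
```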



@kaxil kaxil force-pushed the handle-supervisor-signals branch 4 times, most recently from 257e1c6 to 691d183 Compare December 9, 2024 09:00
@kaxil kaxil requested a review from ashb December 9, 2024 09:08
@kaxil kaxil marked this pull request as ready for review December 9, 2024 09:08
@kaxil
Member Author

kaxil commented Dec 9, 2024

I will follow up with a PR to handle signals for the actual task process.

# The actual signal sent to the task process is tested in `TestWatchedSubprocessKill` class
mock_kill.assert_called_once_with(signal.SIGTERM, force=True)
mock_logger.error.assert_called_once_with(
"Received termination signal in supervisor. Terminating watched subprocess",
Member Author

I should probably also assert that a final TI patch was called 🤔

Member Author

I can only do that if I call supervise instead of just proc._setup_signal_handlers().

Member Author

An alternative is:

    def test_supervisor_signal_handling(self, mocker, monkeypatch, captured_logs):
        """Verify that the supervisor correctly handles signals and terminates the task process."""
        mocker.patch("airflow.sdk.execution_time.supervisor.MIN_HEARTBEAT_INTERVAL", 0.01)
        mock_client = MagicMock(spec=sdk_client.Client)
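        # PID of the current (supervisor) process, captured before the task process is started
        supervisor_pid = os.getpid()

        def subprocess_main():
            # The task process sends SIGTERM to its supervisor
            os.kill(supervisor_pid, signal.SIGTERM)
            # It should not take 5 seconds. The process should terminate immediately
            sleep(5)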

        proc = WatchedSubprocess.start(
            path=os.devnull,
            ti=TaskInstance(id=TI_ID, task_id="b", dag_id="c", run_id="d", try_number=1),
            client=mock_client,
            target=subprocess_main,
        )

        rc = proc.wait()

        assert rc == -signal.SIGTERM
        assert proc.final_state == TerminalTIState.FAILED
        assert proc._exit_code == -signal.SIGTERM

        assert {
            "signal": signal.SIGTERM,
            "process_pid": proc.pid,
            "supervisor_pid": mocker.ANY,
            "event": "Received termination signal in supervisor. Terminating watched subprocess",
            "timestamp": mocker.ANY,
            "level": "error",
            "logger": "supervisor",
        } in captured_logs

        assert {
            "pid": proc.pid,
            "exit_code": -signal.SIGTERM,
            "signal": "SIGTERM",
            "event": "Process exited",
            "timestamp": mocker.ANY,
            "level": "info",
            "logger": "supervisor",
        } in captured_logs

        mock_client.task_instances.finish.assert_called_once_with(
            id=TI_ID, state=TerminalTIState.FAILED, when=mocker.ANY
        )

@kaxil kaxil force-pushed the handle-supervisor-signals branch from 691d183 to 3f1a533 Compare December 9, 2024 20:17
@kaxil
Member Author

kaxil commented Dec 9, 2024

Parking this for now to work on #44481. If someone wants to take it on (including propagating signals to the subprocess and its children), go for it.
