Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Remove old lineage stuff #45260

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

jason810496
Copy link
Contributor

closes: #44983


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@jason810496
Copy link
Contributor Author

When attempting to remove the lineage logic from the core module, I noticed that it causes failures in tests related to OpenLineage listener capturing hook-level lineage (#41482).

For example, removing apply_lineage and prepare_lineage from BaseOperator:

@prepare_lineage
def pre_execute(self, context: Any):
"""Execute right before self.execute() is called."""
if self._pre_execute_hook is None:
return
ExecutionCallableRunner(
self._pre_execute_hook,
context_get_outlet_events(context),
logger=self.log,
).run(context)
def execute(self, context: Context) -> Any:
"""
Derive when creating an operator.
Context is the same dictionary used as when rendering jinja templates.
Refer to get_template_context for more context.
"""
raise NotImplementedError()
@apply_lineage
def post_execute(self, context: Any, result: Any = None):
"""
Execute right after self.execute() is called.
It is passed the execution context and any results returned by the operator.
"""
if self._post_execute_hook is None:
return
ExecutionCallableRunner(
self._post_execute_hook,
context_get_outlet_events(context),
logger=self.log,
).run(context, result)

Results in the following test failure:

FAILED providers/tests/openlineage/extractors/test_manager.py::test_extractor_manager_gets_data_from_pythonoperator - assert 0 == 1
 +  where 0 = len([])
 +    where [] = HookLineage(inputs=[], outputs=[]).outputs

def test_extractor_manager_gets_data_from_pythonoperator(session, dag_maker, hook_lineage_collector):

It seems OpenLineage is still coupled with the lineage module and might need to be moved to compact.lineage for now (or to the OpenLineage module in a future PR ).

After some experimentation, implementing an on_load callback in the OpenLineageProviderPlugin to monkey-patch the core module at runtime prevents the OpenLineage test failures, even with the lineage module removed from the core.

Based on https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html#interface

I’m not sure if this is a suitable long-term solution for maintaining OpenLineage compatibility while cleaning up the lineage module ?

cc @Lee-W @uranusjr

@jason810496 jason810496 force-pushed the remove-old-lineage-stuff branch from 7975d5a to 727a6c1 Compare December 28, 2024 13:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Remove old lineage stuff
1 participant