Skip to content

Commit

Permalink
SLA Support in CLI (#16558)
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanbhughes authored Jan 2, 2025
1 parent 6cbc709 commit 5389c71
Show file tree
Hide file tree
Showing 5 changed files with 668 additions and 17 deletions.
11 changes: 5 additions & 6 deletions src/prefect/_experimental/sla.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ class ServiceLevelAgreement(PrefectBaseModel, abc.ABC):

def set_deployment_id(self, deployment_id: UUID):
self._deployment_id = deployment_id
return self

@computed_field
@property
def owner_resource(self) -> str:
if not self._deployment_id:
raise ValueError(
"Deployment ID is not set. Please set using `set_deployment_id`."
)
return f"prefect.deployment.{self._deployment_id}"
def owner_resource(self) -> Union[str, None]:
if self._deployment_id:
return f"prefect.deployment.{self._deployment_id}"
return None


class TimeToCompletionSla(ServiceLevelAgreement):
Expand Down
86 changes: 86 additions & 0 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from yaml.error import YAMLError

import prefect
from prefect._experimental.sla import SlaTypes
from prefect._internal.compatibility.deprecated import (
generate_deprecation_message,
)
Expand All @@ -40,6 +41,7 @@
exit_with_error,
)
from prefect.cli.root import app, is_interactive
from prefect.client.base import ServerType
from prefect.client.schemas.actions import DeploymentScheduleCreate
from prefect.client.schemas.filters import WorkerFilter
from prefect.client.schemas.objects import ConcurrencyLimitConfig
Expand Down Expand Up @@ -350,6 +352,12 @@ async def deploy(
"--prefect-file",
help="Specify a custom path to a prefect.yaml file",
),
sla: List[str] = typer.Option(
None,
"--sla",
help="Experimental: One or more SLA configurations for the deployment. May be"
" removed or modified at any time. Currently only supported on Prefect Cloud.",
),
):
"""
Create a deployment to deploy a flow from this project.
Expand Down Expand Up @@ -405,6 +413,7 @@ async def deploy(
"triggers": trigger,
"param": param,
"params": params,
"sla": sla,
}
try:
deploy_configs, actions = _load_deploy_configs_and_actions(
Expand Down Expand Up @@ -734,6 +743,14 @@ async def _run_single_deploy(

await _create_deployment_triggers(client, deployment_id, triggers)

if sla_specs := _gather_deployment_sla_definitions(
options.get("sla"), deploy_config.get("sla")
):
slas = _initialize_deployment_slas(deployment_id, sla_specs)
await _create_slas(client, slas)
else:
slas = []

app.console.print(
Panel(
f"Deployment '{deploy_config['flow_name']}/{deploy_config['name']}'"
Expand Down Expand Up @@ -791,6 +808,7 @@ async def _run_single_deploy(
push_steps=push_steps or None,
pull_steps=pull_steps or None,
triggers=trigger_specs or None,
sla=sla_specs or None,
prefect_file=prefect_file,
)
app.console.print(
Expand Down Expand Up @@ -1737,3 +1755,71 @@ def _handle_deprecated_schedule_fields(deploy_config: Dict):
)

return deploy_config


def _gather_deployment_sla_definitions(
sla_flags: Union[List[str], None], existing_slas: Union[List[Dict[str, Any]], None]
) -> Union[List[Dict[str, Any]], None]:
"""Parses SLA flags from CLI and existing deployment config in `prefect.yaml`.
Prefers CLI-provided SLAs over config in `prefect.yaml`.
"""
if sla_flags:
sla_specs = []
for s in sla_flags:
try:
if s.endswith(".yaml"):
with open(s, "r") as f:
sla_specs.extend(yaml.safe_load(f).get("sla", []))
elif s.endswith(".json"):
with open(s, "r") as f:
sla_specs.extend(json.load(f).get("sla", []))
else:
sla_specs.append(json.loads(s))
except Exception as e:
raise ValueError(f"Failed to parse SLA: {s}. Error: {str(e)}")
return sla_specs

return existing_slas


def _initialize_deployment_slas(
deployment_id: UUID, sla_specs: List[Dict[str, Any]]
) -> Union[List[SlaTypes], None]:
"""Initializes SLAs for a deployment.
Args:
deployment_id: Deployment ID.
sla_specs: SLA specification dictionaries.
Returns:
List of SLAs.
"""
slas = [pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs]

for sla in slas:
sla.set_deployment_id(deployment_id)

return slas


async def _create_slas(
client: "PrefectClient",
slas: List[SlaTypes],
):
if client.server_type == ServerType.CLOUD:
exceptions = []
for sla in slas:
try:
await client.create_sla(sla)
except Exception as e:
app.console.print(
f"""Failed to create SLA: {sla.get("name")}. Error: {str(e)}""",
style="red",
)
exceptions.append((f"""Failed to create SLA: {sla.get('name')}""", e))
if exceptions:
raise ValueError("Failed to create one or more SLAs.", exceptions)
else:
raise ValueError(
"SLA configuration is currently only supported on Prefect Cloud."
)
5 changes: 5 additions & 0 deletions src/prefect/client/orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2967,6 +2967,11 @@ async def create_sla(self, sla: SlaTypes) -> UUID:
Returns:
the ID of the SLA in the backend
"""
if not sla.owner_resource:
raise ValueError(
"Deployment ID is not set. Please set using `set_deployment_id`."
)

response = await self._client.post(
"/slas/",
json=sla.model_dump(mode="json", exclude_unset=True),
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/deployments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ def _save_deployment_to_prefect_file(
push_steps: Optional[List[Dict]] = None,
pull_steps: Optional[List[Dict]] = None,
triggers: Optional[List[Dict]] = None,
sla: Optional[List[Dict]] = None,
prefect_file: Path = Path("prefect.yaml"),
):
"""
Expand Down Expand Up @@ -319,6 +320,9 @@ def _save_deployment_to_prefect_file(
if triggers and triggers != parsed_prefect_file_contents.get("triggers"):
deployment["triggers"] = triggers

if sla and sla != parsed_prefect_file_contents.get("sla"):
deployment["sla"] = sla

deployments = parsed_prefect_file_contents.get("deployments")
if deployments is None:
parsed_prefect_file_contents["deployments"] = [deployment]
Expand Down
Loading

0 comments on commit 5389c71

Please sign in to comment.