Skip to content

Commit

Permalink
Refactor Variable CRUD methods in client (#16564)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaazzam authored Jan 1, 2025
1 parent 277731d commit 82f35ec
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 110 deletions.
124 changes: 14 additions & 110 deletions src/prefect/client/orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
ArtifactCollectionClient,
ArtifactCollectionAsyncClient,
)
from prefect.client.orchestration._variables.client import (
VariableClient,
VariableAsyncClient,
)

import prefect
import prefect.exceptions
Expand Down Expand Up @@ -54,8 +58,6 @@
LogCreate,
TaskRunCreate,
TaskRunUpdate,
VariableCreate,
VariableUpdate,
WorkPoolCreate,
WorkPoolUpdate,
WorkQueueCreate,
Expand Down Expand Up @@ -89,7 +91,6 @@
Parameter,
TaskRunPolicy,
TaskRunResult,
Variable,
Worker,
WorkerMetadata,
WorkPool,
Expand Down Expand Up @@ -244,7 +245,11 @@ def get_client(
)


class PrefectClient(ArtifactAsyncClient, ArtifactCollectionAsyncClient):
class PrefectClient(
ArtifactAsyncClient,
ArtifactCollectionAsyncClient,
VariableAsyncClient,
):
"""
An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).
Expand Down Expand Up @@ -2953,61 +2958,6 @@ async def get_scheduled_flow_runs_for_work_pool(
response.json()
)

async def create_variable(self, variable: VariableCreate) -> Variable:
"""
Creates an variable with the provided configuration.
Args:
variable: Desired configuration for the new variable.
Returns:
Information about the newly created variable.
"""
response = await self._client.post(
"/variables/",
json=variable.model_dump(mode="json", exclude_unset=True),
)
return Variable(**response.json())

async def update_variable(self, variable: VariableUpdate) -> None:
"""
Updates a variable with the provided configuration.
Args:
variable: Desired configuration for the updated variable.
Returns:
Information about the updated variable.
"""
await self._client.patch(
f"/variables/name/{variable.name}",
json=variable.model_dump(mode="json", exclude_unset=True),
)

async def read_variable_by_name(self, name: str) -> Optional[Variable]:
"""Reads a variable by name. Returns None if no variable is found."""
try:
response = await self._client.get(f"/variables/name/{name}")
return Variable(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
return None
else:
raise

async def delete_variable_by_name(self, name: str) -> None:
"""Deletes a variable by name."""
try:
await self._client.delete(f"/variables/name/{name}")
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise

async def read_variables(self, limit: Optional[int] = None) -> list[Variable]:
"""Reads all variables."""
response = await self._client.post("/variables/filter", json={"limit": limit})
return pydantic.TypeAdapter(list[Variable]).validate_python(response.json())

async def read_worker_metadata(self) -> dict[str, Any]:
"""Reads worker metadata stored in Prefect collection registry."""
response = await self._client.get("collections/views/aggregate-worker-metadata")
Expand Down Expand Up @@ -3436,7 +3386,11 @@ def __exit__(self, *_: object) -> NoReturn:
assert False, "This should never be called but must be defined for __enter__"


class SyncPrefectClient(ArtifactClient, ArtifactCollectionClient):
class SyncPrefectClient(
ArtifactClient,
ArtifactCollectionClient,
VariableClient,
):
"""
A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).
Expand Down Expand Up @@ -4316,53 +4270,3 @@ def read_block_document_by_name(
else:
raise
return BlockDocument.model_validate(response.json())

def create_variable(self, variable: VariableCreate) -> Variable:
"""
Creates an variable with the provided configuration.
Args:
variable: Desired configuration for the new variable.
Returns:
Information about the newly created variable.
"""
response = self._client.post(
"/variables/",
json=variable.model_dump(mode="json", exclude_unset=True),
)
return Variable(**response.json())

def update_variable(self, variable: VariableUpdate) -> None:
"""
Updates a variable with the provided configuration.
Args:
variable: Desired configuration for the updated variable.
Returns:
Information about the updated variable.
"""
self._client.patch(
f"/variables/name/{variable.name}",
json=variable.model_dump(mode="json", exclude_unset=True),
)

def read_variable_by_name(self, name: str) -> Optional[Variable]:
"""Reads a variable by name. Returns None if no variable is found."""
try:
response = self._client.get(f"/variables/name/{name}")
return Variable(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
return None
else:
raise

def delete_variable_by_name(self, name: str) -> None:
"""Deletes a variable by name."""
try:
self._client.delete(f"/variables/name/{name}")
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise
Empty file.
157 changes: 157 additions & 0 deletions src/prefect/client/orchestration/_variables/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import httpx

from prefect.client.orchestration.base import BaseAsyncClient, BaseClient
from prefect.exceptions import ObjectNotFound

if TYPE_CHECKING:
from prefect.client.schemas.actions import (
VariableCreate,
VariableUpdate,
)
from prefect.client.schemas.objects import (
Variable,
)


class VariableClient(BaseClient):
def create_variable(self, variable: "VariableCreate") -> "Variable":
"""
Creates an variable with the provided configuration.
Args:
variable: Desired configuration for the new variable.
Returns:
Information about the newly created variable.
"""
response = self._client.post(
"/variables/",
json=variable.model_dump(mode="json", exclude_unset=True),
)
from prefect.client.schemas.objects import Variable

return Variable.model_validate(response.json())

def read_variable_by_name(self, name: str) -> "Variable | None":
"""Reads a variable by name. Returns None if no variable is found."""
try:
response = self.request(
"GET", "/variables/name/{name}", path_params={"name": name}
)
from prefect.client.schemas.objects import Variable

return Variable(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return None
else:
raise

def read_variables(self, limit: int | None = None) -> list["Variable"]:
"""Reads all variables."""
response = self.request("POST", "/variables/filter", json={"limit": limit})
from prefect.client.schemas.objects import Variable

return Variable.model_validate_list(response.json())

def update_variable(self, variable: "VariableUpdate") -> None:
"""
Updates a variable with the provided configuration.
Args:
variable: Desired configuration for the updated variable.
Returns:
Information about the updated variable.
"""
self._client.patch(
f"/variables/name/{variable.name}",
json=variable.model_dump(mode="json", exclude_unset=True),
)
return None

def delete_variable_by_name(self, name: str) -> None:
"""Deletes a variable by name."""
try:
self.request(
"DELETE",
"/variables/name/{name}",
path_params={"name": name},
)
return None
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise ObjectNotFound(http_exc=e) from e
else:
raise


class VariableAsyncClient(BaseAsyncClient):
async def create_variable(self, variable: "VariableCreate") -> "Variable":
"""Creates a variable with the provided configuration."""
response = await self._client.post(
"/variables/",
json=variable.model_dump(mode="json", exclude_unset=True),
)
from prefect.client.schemas.objects import Variable

return Variable.model_validate(response.json())

async def read_variable_by_name(self, name: str) -> "Variable | None":
"""Reads a variable by name. Returns None if no variable is found."""
try:
response = await self.request(
"GET",
"/variables/name/{name}",
path_params={"name": name},
)
from prefect.client.schemas.objects import Variable

return Variable.model_validate(response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return None
else:
raise

async def read_variables(self, limit: int | None = None) -> list["Variable"]:
"""Reads all variables."""
response = await self.request(
"POST", "/variables/filter", json={"limit": limit}
)
from prefect.client.schemas.objects import Variable

return Variable.model_validate_list(response.json())

async def update_variable(self, variable: "VariableUpdate") -> None:
"""
Updates a variable with the provided configuration.
Args:
variable: Desired configuration for the updated variable.
Returns:
Information about the updated variable.
"""
await self.request(
"PATCH",
"/variables/name/{name}",
path_params={"name": variable.name},
json=variable.model_dump(mode="json", exclude_unset=True),
)
return None

async def delete_variable_by_name(self, name: str) -> None:
"""Deletes a variable by name."""
try:
await self.request(
"DELETE",
"/variables/name/{name}",
path_params={"name": name},
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise ObjectNotFound(http_exc=e) from e
else:
raise

0 comments on commit 82f35ec

Please sign in to comment.