Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add arun_deployment #16605

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion flows/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
import subprocess
import sys
from pathlib import Path
from uuid import UUID

import anyio

import prefect
from prefect import Flow
from prefect.client.schemas.objects import FlowRun
from prefect.deployments import run_deployment


async def read_flow_run(flow_run_id):
async def read_flow_run(flow_run_id: UUID):
    """Fetch the flow run with the given id using a fresh async Prefect client.

    Args:
        flow_run_id: Unique identifier of the flow run to read.

    Returns:
        The flow run object returned by `client.read_flow_run`.
    """
    # Open a short-lived client so the connection is cleaned up on exit.
    async with prefect.get_client() as client:
        return await client.read_flow_run(flow_run_id)

Expand All @@ -26,6 +29,7 @@ def main():
source="https://github.com/PrefectHQ/prefect-recipes.git",
entrypoint="flows-starter/hello.py:hello",
)
assert isinstance(flow_instance, Flow)

flow_instance.deploy(
name="demo-deployment",
Expand All @@ -34,6 +38,7 @@ def main():
)

flow_run = run_deployment("hello/demo-deployment", timeout=0)
assert isinstance(flow_run, FlowRun)

subprocess.check_call(
[
Expand All @@ -47,6 +52,7 @@ def main():
)

flow_run = anyio.run(read_flow_run, flow_run.id)
assert flow_run.state is not None
assert flow_run.state.is_completed(), flow_run.state

finally:
Expand Down
12 changes: 6 additions & 6 deletions src/prefect/_internal/compatibility/async_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ def _is_acceptable_callable(

def async_dispatch(
async_impl: Union[
Callable[P, Coroutine[Any, Any, R]],
"classmethod[type[Any], P, Coroutine[Any, Any, R]]",
Callable[..., Coroutine[Any, Any, R]],
"classmethod[type[Any], ..., Coroutine[Any, Any, R]]",
Comment on lines +66 to +67
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is because the sync and async versions of run_deployment need to take different types of clients. i could make them both Optional[Any] but thought that would be more unclear and this way it pushes folks who care about typing to use the explicit async one

],
) -> Callable[[Callable[P, R]], Callable[P, Union[R, Coroutine[Any, Any, R]]]]:
) -> Callable[[Callable[..., R]], Callable[..., Union[R, Coroutine[Any, Any, R]]]]:
"""
Decorator that dispatches to either sync or async implementation based on context.

Expand All @@ -76,11 +76,11 @@ def async_dispatch(
if not _is_acceptable_callable(async_impl):
raise TypeError("async_impl must be an async function")
if isinstance(async_impl, classmethod):
async_impl = cast(Callable[P, Coroutine[Any, Any, R]], async_impl.__func__)
async_impl = cast(Callable[..., Coroutine[Any, Any, R]], async_impl.__func__)

def decorator(
sync_fn: Callable[P, R],
) -> Callable[P, Union[R, Coroutine[Any, Any, R]]]:
sync_fn: Callable[..., R],
) -> Callable[..., Union[R, Coroutine[Any, Any, R]]]:
@wraps(sync_fn)
def wrapper(
*args: P.args,
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/blocks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,7 @@ async def delete(

await client.delete_block_document(block_document.id)

def __new__(cls: Type[Self], **kwargs) -> Self:
def __new__(cls: Type[Self], **kwargs: Any) -> Self:
"""
Create an instance of the Block subclass type if a `block_type_slug` is
present in the data payload.
Expand Down Expand Up @@ -1438,8 +1438,8 @@ def model_dump(
self,
*,
mode: Union[Literal["json", "python"], str] = "python",
include: "IncEx" = None,
exclude: "IncEx" = None,
include: Optional["IncEx"] = None,
exclude: Optional["IncEx"] = None,
context: Optional[Dict[str, Any]] = None,
by_alias: bool = False,
exclude_unset: bool = False,
Expand Down
5 changes: 3 additions & 2 deletions src/prefect/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@


if TYPE_CHECKING:
from .flow_runs import run_deployment
from .flow_runs import arun_deployment, run_deployment
from .base import initialize_project
from .runner import deploy

_public_api: dict[str, tuple[str, str]] = {
"initialize_project": (__spec__.parent, ".base"),
"run_deployment": (__spec__.parent, ".flow_runs"),
"arun_deployment": (__spec__.parent, ".flow_runs"),
"deploy": (__spec__.parent, ".runner"),
}

# Declare API for type-checkers
__all__ = ["initialize_project", "deploy", "run_deployment"]
__all__ = ["initialize_project", "deploy", "run_deployment", "arun_deployment"]


def __getattr__(attr_name: str) -> object:
Expand Down
220 changes: 215 additions & 5 deletions src/prefect/deployments/flow_runs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
from uuid import UUID
Expand All @@ -6,6 +7,8 @@
import pendulum

import prefect
from prefect._internal.compatibility.async_dispatch import async_dispatch
from prefect.client.orchestration import PrefectClient, SyncPrefectClient, get_client
from prefect.client.schemas import FlowRun
from prefect.client.utilities import inject_client
from prefect.context import FlowRunContext, TaskRunContext
Expand All @@ -16,11 +19,11 @@
from prefect.telemetry.run_telemetry import (
LABELS_TRACEPARENT_KEY,
)
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.asyncutils import run_coro_as_sync
from prefect.utilities.slugify import slugify
from prefect.utilities.timeout import timeout as timeout_context

if TYPE_CHECKING:
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.objects import FlowRun

prefect.client.schemas.StateCreate.model_rebuild(
Expand All @@ -32,9 +35,8 @@
logger = get_logger(__name__)


@sync_compatible
@inject_client
async def run_deployment(
async def arun_deployment(
name: Union[str, UUID],
client: Optional["PrefectClient"] = None,
parameters: Optional[dict[str, Any]] = None,
Expand All @@ -49,7 +51,7 @@ async def run_deployment(
job_variables: Optional[dict[str, Any]] = None,
) -> "FlowRun":
"""
Create a flow run for a deployment and return it after completion or a timeout.
Asynchronous method to create a flow run for a deployment and return it after completion or a timeout.

By default, this function blocks until the flow run finishes executing.
Specify a timeout (in seconds) to wait for the flow run to execute before
Expand Down Expand Up @@ -192,3 +194,211 @@ async def run_deployment(
await anyio.sleep(poll_interval)

return flow_run


def _ensure_call_client_method_sync(
    client: Union["PrefectClient", "SyncPrefectClient"],
    method: str,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """
    Call a client method and return its result synchronously.

    Args:
        client: Either an async `PrefectClient` or a `SyncPrefectClient`.
        method: Name of the client method to invoke.
        *args: Positional arguments forwarded to the method.
        **kwargs: Keyword arguments forwarded to the method.

    Returns:
        The method's return value, resolved synchronously.

    Raises:
        ValueError: If `client` is neither a `PrefectClient` nor a
            `SyncPrefectClient`.
    """
    # Dispatch on the client type BEFORE calling the method: calling it on an
    # async client would create a coroutine that is never awaited (emitting a
    # RuntimeWarning and discarding any eagerly-started work).
    if isinstance(client, SyncPrefectClient):
        # Sync client methods return plain values; call directly.
        return getattr(client, method)(*args, **kwargs)
    elif isinstance(client, PrefectClient):
        # TODO: this is not ideal, but just use a sync client context for now
        # since we can't run async client methods with run_coro_as_sync
        # due to event loop binding issues
        from prefect.context import SyncClientContext

        with SyncClientContext.get_or_create() as client_ctx:
            sync_client = client_ctx.client
            return getattr(sync_client, method)(*args, **kwargs)
    else:
        # Fail fast on unexpected client types without invoking anything.
        raise ValueError(f"Invalid client type: {type(client)}")


@async_dispatch(arun_deployment)
def run_deployment(
    name: Union[str, UUID],
    client: Optional["PrefectClient"] = None,
    parameters: Optional[dict[str, Any]] = None,
    scheduled_time: Optional[datetime] = None,
    flow_run_name: Optional[str] = None,
    timeout: Optional[float] = None,
    poll_interval: Optional[float] = 5,
    tags: Optional[Iterable[str]] = None,
    idempotency_key: Optional[str] = None,
    work_queue_name: Optional[str] = None,
    as_subflow: Optional[bool] = True,
    job_variables: Optional[dict[str, Any]] = None,
) -> "FlowRun":
    """
    Create a flow run for a deployment and return it after completion or a timeout.

    By default, this function blocks until the flow run finishes executing.
    Specify a timeout (in seconds) to wait for the flow run to execute before
    returning flow run metadata. To return immediately, without waiting for the
    flow run to execute, set `timeout=0`.

    Note that if you specify a timeout, this function will return the flow run
    metadata whether or not the flow run finished executing.

    If called within a flow or task, the flow run this function creates will
    be linked to the current flow run as a subflow. Disable this behavior by
    passing `as_subflow=False`.

    Args:
        name: The deployment id or deployment name in the form:
            `"flow name/deployment name"`
        parameters: Parameter overrides for this flow run. Merged with the deployment
            defaults.
        scheduled_time: The time to schedule the flow run for, defaults to scheduling
            the flow run to start now.
        flow_run_name: A name for the created flow run
        timeout: The amount of time to wait (in seconds) for the flow run to
            complete before returning. Setting `timeout` to 0 will return the flow
            run metadata immediately. Setting `timeout` to None will allow this
            function to poll indefinitely. Defaults to None.
        poll_interval: The number of seconds between polls
        tags: A list of tags to associate with this flow run; tags can be used in
            automations and for organizational purposes.
        idempotency_key: A unique value to recognize retries of the same run, and
            prevent creating multiple flow runs.
        work_queue_name: The name of a work queue to use for this run. Defaults to
            the default work queue for the deployment.
        as_subflow: Whether to link the flow run as a subflow of the current
            flow or task run.
        job_variables: A dictionary of dot delimited infrastructure overrides that
            will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
            `namespace='prefect'`

    Returns:
        The created flow run's metadata; if a nonzero/None `timeout` was given,
        the metadata reflects the last polled state.

    Raises:
        ValueError: If `timeout` is negative.
    """
    # Fall back to a sync client when none is supplied; an async client may
    # still be passed and is handled by `_ensure_call_client_method_sync`.
    _client = client or get_client(sync_client=True)

    if timeout is not None and timeout < 0:
        raise ValueError("`timeout` cannot be negative")

    if scheduled_time is None:
        scheduled_time = pendulum.now("UTC")

    parameters = parameters or {}

    deployment_id = None

    if isinstance(name, UUID):
        deployment_id = name
    else:
        # A string that parses as a UUID is treated as a deployment id;
        # anything else is treated as a "flow name/deployment name" string.
        try:
            deployment_id = UUID(name)
        except ValueError:
            pass

    if deployment_id:
        deployment = _ensure_call_client_method_sync(
            _client, "read_deployment", deployment_id=deployment_id
        )
    else:
        deployment = _ensure_call_client_method_sync(
            _client, "read_deployment_by_name", name
        )

    flow_run_ctx = FlowRunContext.get()
    task_run_ctx = TaskRunContext.get()
    if as_subflow and (flow_run_ctx or task_run_ctx):
        # TODO: this logic can likely be simplified by using `Task.create_run`
        from prefect.utilities._engine import dynamic_key_for_task_run
        from prefect.utilities.engine import collect_task_run_inputs

        # This was called from a flow. Link the flow run as a subflow.
        task_inputs = {
            k: run_coro_as_sync(collect_task_run_inputs(v))
            for k, v in parameters.items()
        }

        if deployment_id:
            # Looked up by id, so the human-readable name must be rebuilt
            # from the owning flow's name.
            flow = _ensure_call_client_method_sync(
                _client, "read_flow", deployment.flow_id
            )
            deployment_name = f"{flow.name}/{deployment.name}"
        else:
            deployment_name = name

        # Generate a task in the parent flow run to represent the result of the subflow
        dummy_task = Task(
            name=deployment_name,
            fn=lambda: None,
            version=deployment.version,
        )
        # Override the default task key to include the deployment name
        dummy_task.task_key = f"{__name__}.run_deployment.{slugify(deployment_name)}"
        flow_run_id = (
            flow_run_ctx.flow_run.id
            if flow_run_ctx
            else task_run_ctx.task_run.flow_run_id
        )
        dynamic_key = (
            dynamic_key_for_task_run(flow_run_ctx, dummy_task)
            if flow_run_ctx
            else task_run_ctx.task_run.dynamic_key
        )
        parent_task_run = _ensure_call_client_method_sync(
            _client,
            "create_task_run",
            task=dummy_task,
            flow_run_id=flow_run_id,
            dynamic_key=dynamic_key,
            task_inputs=task_inputs,
            state=Pending(),
        )
        parent_task_run_id = parent_task_run.id
    else:
        parent_task_run_id = None

    # Propagate the tracing context, if any, from the parent flow run.
    if flow_run_ctx and flow_run_ctx.flow_run:
        traceparent = flow_run_ctx.flow_run.labels.get(LABELS_TRACEPARENT_KEY)
    else:
        traceparent = None

    trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {}

    flow_run = _ensure_call_client_method_sync(
        _client,
        "create_flow_run_from_deployment",
        deployment.id,
        parameters=parameters,
        state=Scheduled(scheduled_time=scheduled_time),
        name=flow_run_name,
        tags=tags,
        idempotency_key=idempotency_key,
        parent_task_run_id=parent_task_run_id,
        work_queue_name=work_queue_name,
        job_variables=job_variables,
        labels=trace_labels,
    )

    flow_run_id = flow_run.id

    # `timeout=0` means "fire and forget": skip polling entirely.
    if timeout == 0:
        return flow_run

    try:
        # `timeout=None` lets timeout_context poll indefinitely.
        with timeout_context(timeout):
            while True:
                flow_run = _ensure_call_client_method_sync(
                    _client, "read_flow_run", flow_run_id
                )
                flow_state = flow_run.state
                if flow_state and flow_state.is_final():
                    return flow_run
                time.sleep(poll_interval)
    except TimeoutError:
        # On timeout, return the most recently polled (possibly unfinished)
        # flow run metadata rather than raising.
        logger.warning(
            f"Flow run {flow_run_id} did not complete within {timeout} seconds"
        )

    return flow_run
Loading
Loading