Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REP] Virtual Cluster #49

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open

[REP] Virtual Cluster #49

wants to merge 13 commits into from

Conversation

jjyao
Copy link
Contributor

@jjyao jjyao commented Dec 17, 2023

Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
reps/2023-12-14-virtual-cluster.md Outdated Show resolved Hide resolved
reps/2023-12-14-virtual-cluster.md Outdated Show resolved Hide resolved
reps/2023-12-14-virtual-cluster.md Outdated Show resolved Hide resolved
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
enum SchedulingPolicy {
PACK
SPREAD
STRICT_SPREAD
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be good to specify/give examples for how this will support other non-placement group strategies, like NodeAffinity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't. This only supports pg strategies.

// If specified, limit the consumption of these resources to
// the specified values.
// If not specified, the default value is infinite.
map<string, double> flexible_resource_max
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be good to specify what the placement strategy will be between the flexible resource group and the fixed size nodes will be (i.e. that we will only support packing virtual nodes for now).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the strategy is within flexible resources: Ray will create as few flexible virtual nodes as possible for the given amount of flexible resources.

Do we really need a strategy between flexible nodes and fixed size nodes?

reps/2023-12-14-virtual-cluster.md Outdated Show resolved Hide resolved
reps/2023-12-14-virtual-cluster.md Show resolved Hide resolved

With the introduction of virtual clusters, every Ray job runs in its own virtual cluster and only has access to resources inside that virtual cluster. Each virtual cluster has a spec that defines the min and max resources of the cluster. Min resources are minimal resources required for the job to run and they are atomically reserved for gang scheduling. If min resources cannot be reserved when there are not enough available resources, the job will be queued. With job queueing, we can implement different policies such as FIFO or priority-based queueing. Max resources are the autoscaling limit of the virtual cluster and the maximal resources can be used by the job.

Virtual clusters can be nested and a Ray job can create sub-clusters to isolate separate parts of its application workload. For example, a Tune grid sweep job can create a sub-cluster for each of its nested Train workload. These possibly nested virtual clusters form a tree where the root is the entire physical cluster.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a bit strange that the root is a physical cluster. Shouldn't it be another virtual cluster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can think of it's a virtual cluster that represents the entire physical cluster: each virtual node represents a physical node.

Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
@jjyao jjyao marked this pull request as ready for review December 24, 2023 21:03
// If specified, ensure we have at least this min amount
// of resources before starting the cluster.
// If not specified, the default value is 0.
map<string, double> flexible_resource_min
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
map<string, double> flexible_resource_min
map<string, double> flex_resource_min

nit: "flexible" is too long/hard to spell, prefer to abbreviate in these cases

Copy link
Contributor

@scv119 scv119 Jan 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just name it min_resources, max_resources?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just name it min_resources, max_resources?

The semantics is different. flex_resource_min doesn't include fixed node resources while min_resources should include everything.

reps/2023-12-14-virtual-cluster.md Outdated Show resolved Hide resolved

```
message VirtualCluster {
// A virtual cluster consits of flexible resources and fixed size resources.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should add map<string, string> labels that applies to both flex and fixed nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use case for adding labels to flex nodes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need labels in order to schedule tasks within an specific virtual cluster in the first place, right? This is also for consistency with the physical node interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, flex nodes also have system labels (e.g. vc_id). I'm just not sure whether we need to expose a way for users to set custom labels for flex nodes.

So you are suggesting we have map<string, string> cluster_level_custom_labels that applies to all virtual nodes of a virtual cluster. And later on fixed nodes can also set additional custom labels individually?

Or we have map<string, string> flex_nodes_custom_labels that only applies to flex nodes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, if we have automatically added labels for vc_id then I agree this is more of a P1.

reps/2023-12-14-virtual-cluster.md Outdated Show resolved Hide resolved
reps/2023-12-14-virtual-cluster.md Show resolved Hide resolved
reps/2023-12-14-virtual-cluster.md Outdated Show resolved Hide resolved
ray.init(virtual_cluster=VirtualCluster(flexible_resource_min={"CPU": 1}, flexible_resource_max={"CPU": 8}))

# The job uses 1 A100 GPU.
ray.init(virtual_cluster=VirtualCluster(fixed_size_nodes=[FixedSizeNode(resources={"GPU": 1}, parent_node_selector={"accelerator_type": In("A100")})]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if there is no such node exactly matches the fixed size node resource? will we round up, shard the node, or fail?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a follow up question is what happens if the required_nodes conflicts with flex_resource_max?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A physical node can be split into multiple fixed size virtual nodes.

a follow up question is what happens if the required_nodes conflicts with flex_resource_max?

flex_resource_max doesn't include fixed size node resources. Total virtual cluster resource is flex resource + fixed resource.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would flex_resource_max -> additional_resource make sense?

// together with flexible_resource_min
// that will be atomically reserved when the
// virtual cluster is created.
repeated FixedSizeNodes fixed_size_nodes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would min_nodes be a better name?

@stephanie-wang stephanie-wang self-assigned this Jan 2, 2024

### General Motivation

While users can run multiple Ray jobs within a single Ray cluster simultaneously, there is currently no sufficient isolation between jobs. Users also have no way to specify cross-job policies such as fairness. For example, if you run two Ray Tune jobs on the same cluster, they will both try to use all the cluster resources and compete for cluster resources in an ad-hoc mechanism, without notion of isolation, fairness, or priority. In order to properly support multi-tenancy, we need a machnism to share cluster resources between different jobs with isolation and certain cross-job policies. While [placement group](https://docs.ray.io/en/releases-2.9.0/ray-core/scheduling/placement-group.html) can solve some of the issues, its lack of nesting and autoscaling support makes it unusable for certain workloads. Virtual cluster is the mechanism we propose here and it supports isolation, nesting, and autoscaling.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a concrete use case to explain why nesting is useful.

reps/2023-12-14-virtual-cluster.md Show resolved Hide resolved
jjyao added 4 commits January 10, 2024 06:26
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>

### General Motivation

While users can run multiple Ray jobs within a single Ray cluster simultaneously, there is currently no sufficient isolation between jobs. Users also have no way to specify cross-job policies such as fairness. For example, if you run two Ray Tune jobs on the same cluster, they will both try to use all the cluster resources and compete for cluster resources in an ad-hoc mechanism, without notion of isolation, fairness, or priority. In order to properly support multi-tenancy, we need a machnism to share cluster resources between different jobs with isolation and certain cross-job policies. While [placement group](https://docs.ray.io/en/releases-2.9.0/ray-core/scheduling/placement-group.html) can solve some of the issues, its lack of nesting and autoscaling support makes it unusable for certain workloads. Virtual cluster is the mechanism we propose here and it supports (logical resources) isolation, nesting, and autoscaling.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it enough for a Virtual cluster to support only logical isolation? Whether it makes more sense for virtual cluster to support real resource isolation?

Virtual clusters with different min and max resources are autoscalable. When scaling up, virtual clusters will try to borrow more resources from their parent clusters. If their parents have available resources, the scaling up is instant. Otherwise, parents will try to borrow resources from their parents recursively and eventually this may cause the upscale of the physical cluster. When scaling down, virtual clusters will return resources to their parents recursively and eventually this may cause the downscale of the physical cluster. To ensure fairness and avoid starvation, the borrowed resources are associated with leases. Once leases expire and the tasks or actors that are currently using these resources finish, the borrowed resources will be returned back to their parent clusters so that parent clusters can lend these resources to potentially other child clusters that also need to be upscaled.

Virtual nodes of a virtual cluster can be either fixed size or flexible/resizable. For a single virtual cluster, there can be multiple fixed-size virtual nodes but at most one flexible virtual node on a single physical node. The upscaling of a virtual cluster can be achieved by adding new virtual nodes or scaling up an existing flexible virtual node.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether preemption is supported ?

Copy link
Contributor

@justinvyu justinvyu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀

task.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)).remote()

# Create a virtual cluster with two virtual nodes that are packed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it clear that these are 2 ways of achieving the same thing.

# Existing Method: Placement Groups

...

# New Method with Virtual Cluster

...

Comment on lines +247 to +262
ray.init(virtual_cluster=VirtualCluster(
fixed_size_nodes=[
FixedSizeNodes(
nodes=[
FixedSizeNode(resources={"GPU": 1}, labels={"bundle_index": "0"}),
FixedSizeNode(resources={"GPU": 1}, labels={"bundle_index": "1"})
],
scheduling_policy=STRICT_SPREAD
)
]))

actors = []
for i in range(2):
actors.append(Actor.options(
num_gpus=1,
node_labels={"bundle_index": In(str(i))}).remote())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, we're able to wait on a placement group to be ready:

pg = placement_group(...)  # non-blocking

ray.get(pg.ready())  # blocking

Is the new API going to block on ray.init until the fixed nodes are assigned to the right physical locations?

ray.init(vc=VirtualCluster())  # hangs until all fixed nodes are ready.

Comment on lines +286 to +287
flex_resource_min={"CPU": 1},
flex_resource_max={"CPU": 10}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the entire cluster can use up to 10 CPUs, so both trials and both workers within each trainer are possibly competing over the same CPUs?

At the moment, CPU=1 is the default, so would you need to set a min_cpus and max_cpus for each node?

For example, how do I get this setup:

Trial 1:
    Worker A: 1 GPU, 1-10 CPUs for last-mile preprocessing
    Worker B: same^
Trial 2:
    Worker A: same^
    Worker B: same^
SplitCoordinator (and other Ray Data tasks): 1-10 CPUs, 1-4 GPUs depending on the autoscaling of the Ray Data pipeline stages

The Ray Data components are currently detached from the Ray Train/Tune placement group. How does placement_group_capture_child_tasks=None translate to VirtualClusters?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, would physical CPU isolation still be left to be controlled by the user?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trial 1: {fixed_nodes={2 GPU nodes}, flex={2..20 CPUs}}
Trial 2: {fixed_nodes={2 GPU nodes}, flex={2..20 CPUs}}

For the SplitCoordinator, I'm assuming you mean there is some separate preprocessing job that's run here before kicking off the training trials? In that case, you can put that in a another VC with {flex={1-10 CPU, 1-4 GPU}}.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SplitCoordinator I'm mentioning is the thing executing the ray data + ray train streaming execution that gets run in the background during the training execution.

https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/iterator/stream_split_iterator.py#L47

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, then those should be part of the trial cluster, so this would be a reasonable config:

Trial 1: {fixed_nodes={2 GPU nodes}, flex={2..20 CPUs}}
Trial 2: {fixed_nodes={2 GPU nodes}, flex={2..20 CPUs}}

For more predictable performance you'd probably also want to assign a fixed number of CPUs instead of allowing 2..20 for flex, so probably {fixed_nodes={2 GPU nodes}, flex={10 CPUs}} would be the way to go.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Ray Data components are currently detached from the Ray Train/Tune placement group. How does [placement_group_capture_child_tasks=None](https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html#advanced-child-tasks-and-actors) translate to VirtualClusters?

To answer this question directly, by design we don't need to support detached resources for virtual clusters. The data tasks can use flex resources instead.

1. Deprecate the placement group API and use virtual cluster API directly.
2. Keep the placement group API and only change the internal implementation to use virtual cluster.

#### Examples
Copy link
Contributor

@justinvyu justinvyu Jan 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another example to consider:

ML platform with priority-based pre-emption of distributed training workers

Overall Ray Cluster: 16 GPUs, 128 CPUs
    Job 0 (P0): min_gpus=4, max_gpus=10, cpus=32
        Worker * 10: each with 1 GPU
    Job 1 (P1): min_gpus=2, max_gpus=8, cpus=1
        Worker * 3: each with 1 GPU
    Job 2 (P1): min_gpus=1, max_gpus=4, cpus=1
        Worker * 3: each with 1 GPU
    Job 3 (P2): min_gpus=1
        Pending for something to free up. 

It might make more sense to switch these priorities to GPU time in %. This arbitrary upscale/downscale of each independent training job (in its own virtual cluster) depends on elastic distributed training from Ray Train / another library.

This might be a little too much scope for the current REP, but I would like a future REP to enable this kind of pre-emption policy for each "parent" virtual cluster. The parent VirtualCluster should be able to drain some nodes of the child clusters to reclaim resources, as a form of "hard downscaling", rather than just waiting for task completion.

@larrylian
Copy link
Contributor

@jjyao
The two prerequisite tasks that need to be completed are:

  1. Move the NodeLabelSchedulingStrategy to node_label.
  2. Support dynamic Node Label.
    Should we consider starting the design for these now?

scheduling_policy=STRICT_SPREAD
)
])
with vc:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For nested virtualcluster case:

  1. Actor 1 create vc_1 , Actor 2 want to create actor_3 in vc_1. This case actor 2 don't have vc_1 handle. so can't use with vc:
    So i prefer virtual cluster hase name , options has a virtual_cluster param.
Actor.options(virtual_cluster="vc_1").rmeote()

// If specified, ensure we have at least this min amount
// of resources before starting the cluster.
// If not specified, the default value is 0.
map<string, double> flex_resource_min
Copy link

@MissiontoMars MissiontoMars Jan 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can virtual nodes use custom resources or node labels on physical nodes in addition to CPU/GPU resources?

@larrylian
Copy link
Contributor

larrylian commented Jan 25, 2024 via email

resources={"GPU": 1},
parent_node_selector={"accelerator_type": In("A100")}
)
]))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier, job queuing was mentioned, FIFO/priority-based queueing, which means that multiple jobs can run simultaneously on a virtual cluster. In this case, how can multiple ray.init calls specify the use of the same virtual cluster?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants