-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[REP] Virtual Cluster #49
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
enum SchedulingPolicy { | ||
PACK | ||
SPREAD | ||
STRICT_SPREAD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be good to specify/give examples for how this will support other non-placement group strategies, like NodeAffinity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't. This only supports pg strategies.
reps/2023-12-14-virtual-cluster.md
Outdated
// If specified, limit the consumption of these resources to | ||
// the specified values. | ||
// If not specified, the default value is infinite. | ||
map<string, double> flexible_resource_max |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be good to specify what the placement strategy will be between the flexible resource group and the fixed size nodes will be (i.e. that we will only support packing virtual nodes for now).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the strategy is within flexible resources: Ray will create as few flexible virtual nodes as possible for the given amount of flexible resources.
Do we really need a strategy between flexible nodes and fixed size nodes?
reps/2023-12-14-virtual-cluster.md
Outdated
|
||
With the introduction of virtual clusters, every Ray job runs in its own virtual cluster and only has access to resources inside that virtual cluster. Each virtual cluster has a spec that defines the min and max resources of the cluster. Min resources are minimal resources required for the job to run and they are atomically reserved for gang scheduling. If min resources cannot be reserved when there are not enough available resources, the job will be queued. With job queueing, we can implement different policies such as FIFO or priority-based queueing. Max resources are the autoscaling limit of the virtual cluster and the maximal resources can be used by the job. | ||
|
||
Virtual clusters can be nested and a Ray job can create sub-clusters to isolate separate parts of its application workload. For example, a Tune grid sweep job can create a sub-cluster for each of its nested Train workload. These possibly nested virtual clusters form a tree where the root is the entire physical cluster. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a bit strange that the root is a physical cluster. Shouldn't it be another virtual cluster?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can think of it's a virtual cluster that represents the entire physical cluster: each virtual node represents a physical node.
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
reps/2023-12-14-virtual-cluster.md
Outdated
// If specified, ensure we have at least this min amount | ||
// of resources before starting the cluster. | ||
// If not specified, the default value is 0. | ||
map<string, double> flexible_resource_min |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
map<string, double> flexible_resource_min | |
map<string, double> flex_resource_min |
nit: "flexible" is too long/hard to spell, prefer to abbreviate in these cases
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we just name it min_resources, max_resources?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we just name it min_resources, max_resources?
The semantics is different. flex_resource_min
doesn't include fixed node resources while min_resources
should include everything.
|
||
``` | ||
message VirtualCluster { | ||
// A virtual cluster consits of flexible resources and fixed size resources. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should add map<string, string> labels
that applies to both flex and fixed nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the use case for adding labels to flex nodes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need labels in order to schedule tasks within an specific virtual cluster in the first place, right? This is also for consistency with the physical node interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, flex nodes also have system labels (e.g. vc_id). I'm just not sure whether we need to expose a way for users to set custom labels for flex nodes.
So you are suggesting we have map<string, string> cluster_level_custom_labels
that applies to all virtual nodes of a virtual cluster. And later on fixed nodes can also set additional custom labels individually?
Or we have map<string, string> flex_nodes_custom_labels
that only applies to flex nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, if we have automatically added labels for vc_id then I agree this is more of a P1.
reps/2023-12-14-virtual-cluster.md
Outdated
ray.init(virtual_cluster=VirtualCluster(flexible_resource_min={"CPU": 1}, flexible_resource_max={"CPU": 8})) | ||
|
||
# The job uses 1 A100 GPU. | ||
ray.init(virtual_cluster=VirtualCluster(fixed_size_nodes=[FixedSizeNode(resources={"GPU": 1}, parent_node_selector={"accelerator_type": In("A100")})])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if there is no such node exactly matches the fixed size node resource? will we round up, shard the node, or fail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a follow up question is what happens if the required_nodes conflicts with flex_resource_max?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A physical node can be split into multiple fixed size virtual nodes.
a follow up question is what happens if the required_nodes conflicts with flex_resource_max?
flex_resource_max doesn't include fixed size node resources. Total virtual cluster resource is flex resource + fixed resource.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would flex_resource_max -> additional_resource make sense?
// together with flexible_resource_min | ||
// that will be atomically reserved when the | ||
// virtual cluster is created. | ||
repeated FixedSizeNodes fixed_size_nodes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would min_nodes be a better name?
reps/2023-12-14-virtual-cluster.md
Outdated
|
||
### General Motivation | ||
|
||
While users can run multiple Ray jobs within a single Ray cluster simultaneously, there is currently no sufficient isolation between jobs. Users also have no way to specify cross-job policies such as fairness. For example, if you run two Ray Tune jobs on the same cluster, they will both try to use all the cluster resources and compete for cluster resources in an ad-hoc mechanism, without notion of isolation, fairness, or priority. In order to properly support multi-tenancy, we need a machnism to share cluster resources between different jobs with isolation and certain cross-job policies. While [placement group](https://docs.ray.io/en/releases-2.9.0/ray-core/scheduling/placement-group.html) can solve some of the issues, its lack of nesting and autoscaling support makes it unusable for certain workloads. Virtual cluster is the mechanism we propose here and it supports isolation, nesting, and autoscaling. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add a concrete use case to explain why nesting is useful.
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
|
||
### General Motivation | ||
|
||
While users can run multiple Ray jobs within a single Ray cluster simultaneously, there is currently no sufficient isolation between jobs. Users also have no way to specify cross-job policies such as fairness. For example, if you run two Ray Tune jobs on the same cluster, they will both try to use all the cluster resources and compete for cluster resources in an ad-hoc mechanism, without notion of isolation, fairness, or priority. In order to properly support multi-tenancy, we need a machnism to share cluster resources between different jobs with isolation and certain cross-job policies. While [placement group](https://docs.ray.io/en/releases-2.9.0/ray-core/scheduling/placement-group.html) can solve some of the issues, its lack of nesting and autoscaling support makes it unusable for certain workloads. Virtual cluster is the mechanism we propose here and it supports (logical resources) isolation, nesting, and autoscaling. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it enough for a Virtual cluster to support only logical isolation? Whether it makes more sense for virtual cluster to support real resource isolation?
Virtual clusters with different min and max resources are autoscalable. When scaling up, virtual clusters will try to borrow more resources from their parent clusters. If their parents have available resources, the scaling up is instant. Otherwise, parents will try to borrow resources from their parents recursively and eventually this may cause the upscale of the physical cluster. When scaling down, virtual clusters will return resources to their parents recursively and eventually this may cause the downscale of the physical cluster. To ensure fairness and avoid starvation, the borrowed resources are associated with leases. Once leases expire and the tasks or actors that are currently using these resources finish, the borrowed resources will be returned back to their parent clusters so that parent clusters can lend these resources to potentially other child clusters that also need to be upscaled. | ||
|
||
Virtual nodes of a virtual cluster can be either fixed size or flexible/resizable. For a single virtual cluster, there can be multiple fixed-size virtual nodes but at most one flexible virtual node on a single physical node. The upscaling of a virtual cluster can be achieved by adding new virtual nodes or scaling up an existing flexible virtual node. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether preemption is supported ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚀
task.options( | ||
scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)).remote() | ||
|
||
# Create a virtual cluster with two virtual nodes that are packed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it clear that these are 2 ways of achieving the same thing.
# Existing Method: Placement Groups
...
# New Method with Virtual Cluster
...
ray.init(virtual_cluster=VirtualCluster( | ||
fixed_size_nodes=[ | ||
FixedSizeNodes( | ||
nodes=[ | ||
FixedSizeNode(resources={"GPU": 1}, labels={"bundle_index": "0"}), | ||
FixedSizeNode(resources={"GPU": 1}, labels={"bundle_index": "1"}) | ||
], | ||
scheduling_policy=STRICT_SPREAD | ||
) | ||
])) | ||
|
||
actors = [] | ||
for i in range(2): | ||
actors.append(Actor.options( | ||
num_gpus=1, | ||
node_labels={"bundle_index": In(str(i))}).remote()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, we're able to wait on a placement group to be ready:
pg = placement_group(...) # non-blocking
ray.get(pg.ready()) # blocking
Is the new API going to block on ray.init
until the fixed nodes are assigned to the right physical locations?
ray.init(vc=VirtualCluster()) # hangs until all fixed nodes are ready.
flex_resource_min={"CPU": 1}, | ||
flex_resource_max={"CPU": 10})) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, the entire cluster can use up to 10 CPUs, so both trials and both workers within each trainer are possibly competing over the same CPUs?
At the moment, CPU=1
is the default, so would you need to set a min_cpus
and max_cpus
for each node?
For example, how do I get this setup:
Trial 1:
Worker A: 1 GPU, 1-10 CPUs for last-mile preprocessing
Worker B: same^
Trial 2:
Worker A: same^
Worker B: same^
SplitCoordinator (and other Ray Data tasks): 1-10 CPUs, 1-4 GPUs depending on the autoscaling of the Ray Data pipeline stages
The Ray Data components are currently detached from the Ray Train/Tune placement group. How does placement_group_capture_child_tasks=None
translate to VirtualCluster
s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, would physical CPU isolation still be left to be controlled by the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trial 1: {fixed_nodes={2 GPU nodes}, flex={2..20 CPUs}}
Trial 2: {fixed_nodes={2 GPU nodes}, flex={2..20 CPUs}}
For the SplitCoordinator, I'm assuming you mean there is some separate preprocessing job that's run here before kicking off the training trials? In that case, you can put that in a another VC with {flex={1-10 CPU, 1-4 GPU}}.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SplitCoordinator
I'm mentioning is the thing executing the ray data + ray train streaming execution that gets run in the background during the training execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, then those should be part of the trial cluster, so this would be a reasonable config:
Trial 1: {fixed_nodes={2 GPU nodes}, flex={2..20 CPUs}}
Trial 2: {fixed_nodes={2 GPU nodes}, flex={2..20 CPUs}}
For more predictable performance you'd probably also want to assign a fixed number of CPUs instead of allowing 2..20 for flex, so probably {fixed_nodes={2 GPU nodes}, flex={10 CPUs}}
would be the way to go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Ray Data components are currently detached from the Ray Train/Tune placement group. How does [placement_group_capture_child_tasks=None](https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html#advanced-child-tasks-and-actors) translate to VirtualClusters?
To answer this question directly, by design we don't need to support detached resources for virtual clusters. The data tasks can use flex resources instead.
1. Deprecate the placement group API and use virtual cluster API directly. | ||
2. Keep the placement group API and only change the internal implementation to use virtual cluster. | ||
|
||
#### Examples |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another example to consider:
ML platform with priority-based pre-emption of distributed training workers
Overall Ray Cluster: 16 GPUs, 128 CPUs
Job 0 (P0): min_gpus=4, max_gpus=10, cpus=32
Worker * 10: each with 1 GPU
Job 1 (P1): min_gpus=2, max_gpus=8, cpus=1
Worker * 3: each with 1 GPU
Job 2 (P1): min_gpus=1, max_gpus=4, cpus=1
Worker * 3: each with 1 GPU
Job 3 (P2): min_gpus=1
Pending for something to free up.
It might make more sense to switch these priorities to GPU time in %. This arbitrary upscale/downscale of each independent training job (in its own virtual cluster) depends on elastic distributed training from Ray Train / another library.
This might be a little too much scope for the current REP, but I would like a future REP to enable this kind of pre-emption policy for each "parent" virtual cluster. The parent VirtualCluster should be able to drain some nodes of the child clusters to reclaim resources, as a form of "hard downscaling", rather than just waiting for task completion.
@jjyao
|
scheduling_policy=STRICT_SPREAD | ||
) | ||
]) | ||
with vc: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For nested virtualcluster case:
- Actor 1 create vc_1 , Actor 2 want to create actor_3 in vc_1. This case actor 2 don't have vc_1 handle. so can't use
with vc:
So i prefer virtual cluster hase name , options has avirtual_cluster
param.
Actor.options(virtual_cluster="vc_1").rmeote()
// If specified, ensure we have at least this min amount | ||
// of resources before starting the cluster. | ||
// If not specified, the default value is 0. | ||
map<string, double> flex_resource_min |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can virtual nodes use custom resources or node labels on physical nodes in addition to CPU/GPU resources?
Hi, thanks. I will reply to you soon when I'm back.
|
resources={"GPU": 1}, | ||
parent_node_selector={"accelerator_type": In("A100")} | ||
) | ||
])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Earlier, job queuing was mentioned, FIFO/priority-based queueing, which means that multiple jobs can run simultaneously on a virtual cluster. In this case, how can multiple ray.init calls specify the use of the same virtual cluster?
https://docs.google.com/document/d/1TJ3jHWVGGviJOQYYlB9yNjUUG8Hn54hYsNqnUrY4L14/edit#heading=h.8f25wuyjzzer