-
Notifications
You must be signed in to change notification settings - Fork 0
/
Cluster.cs
295 lines (250 loc) · 11.6 KB
/
Cluster.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
using Azure;
using Azure.Core;
using Azure.ResourceManager;
using Azure.ResourceManager.Resources;
using Azure.ResourceManager.Resources.Models;
using Azure.ResourceManager.ServiceBus;
using Azure.ResourceManager.ServiceBus.Models;
using CloudWorker.Client.SDK.ARM;
using CloudWorker.MessageQueue;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace CloudWorker.Client.SDK;
/// <summary>
/// Operations available on a CloudWorker cluster.
/// </summary>
public interface ICluster
{
    /// <summary>
    /// Gets the string form of the cluster identifier.
    /// </summary>
    string Id { get; }

    /// <summary>
    /// Validates the cluster.
    /// </summary>
    /// <param name="token">Token used to cancel the operation.</param>
    Task ValidateAsync(CancellationToken token = default);

    /// <summary>
    /// Creates the cluster, or updates it when it already exists.
    /// </summary>
    /// <param name="token">Token used to cancel the operation.</param>
    Task CreateOrUpdateAsync(CancellationToken token = default);

    /// <summary>
    /// Retrieves the properties of the cluster.
    /// </summary>
    /// <param name="token">Token used to cancel the operation.</param>
    /// <returns>The properties of the cluster.</returns>
    Task<IClusterProperties> GetPropertiesAsync(CancellationToken token = default);

    /// <summary>
    /// Destroys the cluster.
    /// </summary>
    /// <param name="token">Token used to cancel the operation.</param>
    Task DestroyAsync(CancellationToken token = default);
}
/// <summary>
/// <see cref="ICluster"/> implementation that manages a cluster through Azure Resource Manager:
/// it deploys the starter ARM template at subscription scope, deletes the cluster's resource
/// groups, and reads cluster properties back from resource group tags.
/// </summary>
public class Cluster : ICluster
{
    private readonly TokenCredential _credential;

    private readonly ClusterId _clusterId;

    // Null when the instance was created for an existing cluster (by id only).
    private readonly ClusterConfig? _clusterConfig;

    private readonly ILogger? _logger;

    /// <summary>
    /// Gets the string form of the cluster identifier.
    /// </summary>
    public string Id => _clusterId.ToString();

    //NOTE: Some Azure RPs, like Service Bus, do not allow a resource name starting with a digit number.
    //So here a name prefix "clw-" (short for CloudWorker) is used.
    private string DeploymentName => $"clw-{_clusterId.ResourceId}-deployment";

    private string MessagingRgName => $"clw-{_clusterId.ResourceId}-messaging";

    private string ComputingRgName => $"clw-{_clusterId.ResourceId}-computing";

    //Service Bus RP requires that the name length should be less than 50 and it cannot end with -sb.
    private string ServiceBusName => $"clw-{_clusterId.ResourceId}-sbn";

    private string AppInsightsName => $"clw-{_clusterId.ResourceId}-appinsights";

    //NOTE: The following tags are defined in starter.bicep. Keep them up to date!
    private const string QueueTypeTag = "QueueType";
    private const string RequestQueueNameTag = "RequestQueueName";
    private const string ResponseQueueNameTag = "ResponseQueueName";
    private const string ServiceTag = "Service";

    /// <summary>
    /// Initializes an instance for creating a new cluster; a new cluster id is generated.
    /// </summary>
    /// <param name="credential">Credential used to authenticate against Azure Resource Manager.</param>
    /// <param name="clusterConfig">Configuration of the cluster to create.</param>
    /// <param name="logger">Optional logger.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="credential"/> or <paramref name="clusterConfig"/> is null.
    /// </exception>
    public Cluster(TokenCredential credential, ClusterConfig clusterConfig, ILogger<Cluster>? logger = null)
        : this(credential, clusterConfig, null, logger) {}

    /// <summary>
    /// Initializes an instance for working with an existing cluster. No cluster configuration is
    /// stored, so <see cref="CreateOrUpdateAsync"/> throws on such an instance.
    /// </summary>
    /// <param name="credential">Credential used to authenticate against Azure Resource Manager.</param>
    /// <param name="clusterId">String form of the cluster id, parsed by <c>ClusterId.FromString</c>.</param>
    /// <param name="logger">Optional logger.</param>
    /// <exception cref="ArgumentNullException"><paramref name="credential"/> is null.</exception>
    public Cluster(TokenCredential credential, string clusterId, ILogger<Cluster>? logger = null)
    {
        _credential = credential ?? throw new ArgumentNullException(nameof(credential));
        _clusterId = ClusterId.FromString(clusterId);
        _logger = logger;
        _logger?.LogInformation("Cluster ID: {id}", _clusterId);
    }

    /// <summary>
    /// Initializes an instance for creating or updating a cluster.
    /// </summary>
    /// <param name="credential">Credential used to authenticate against Azure Resource Manager.</param>
    /// <param name="clusterConfig">Configuration of the cluster; it is validated here.</param>
    /// <param name="clusterId">
    /// String form of the id of the cluster to create or update. When null, a new id is generated
    /// from the configured subscription id and a new GUID.
    /// </param>
    /// <param name="logger">Optional logger.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="credential"/> or <paramref name="clusterConfig"/> is null.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// The subscription id in <paramref name="clusterId"/> differs from the one in <paramref name="clusterConfig"/>.
    /// </exception>
    public Cluster(TokenCredential credential, ClusterConfig clusterConfig, string? clusterId = null, ILogger<Cluster>? logger = null)
    {
        if (credential == null)
        {
            throw new ArgumentNullException(nameof(credential));
        }
        if (clusterConfig == null)
        {
            throw new ArgumentNullException(nameof(clusterConfig));
        }

        clusterConfig.Validate();

        if (clusterId != null)
        {
            var clsId = ClusterId.FromString(clusterId);
            var subId = clusterConfig.SubScriptionId;
            if (subId != clsId.SubscriptionId)
            {
                throw new ArgumentException("SubScription IDs are inconsistent!");
            }
            //Use specified id to create or update a cluster
            _clusterId = clsId;
        }
        else
        {
            //Create a new id to create a new cluster
            _clusterId = new ClusterId(clusterConfig.SubScriptionId, Guid.NewGuid());
        }

        _clusterConfig = clusterConfig;
        _credential = credential;
        _logger = logger;
        _logger?.LogInformation("Cluster ID: {id}", _clusterId);
    }

    /// <summary>
    /// Deploys the starter ARM template at subscription scope in incremental mode and waits for
    /// the deployment to complete.
    /// </summary>
    /// <param name="token">Token used to cancel the template read and the deployment.</param>
    /// <exception cref="InvalidOperationException">
    /// The instance was constructed without a <c>ClusterConfig</c>.
    /// </exception>
    public async Task CreateOrUpdateAsync(CancellationToken token = default)
    {
        if (_clusterConfig == null)
        {
            var msg = "Cannot create a cluster without ClusterConfig!";
            _logger?.LogError(msg);
            throw new InvalidOperationException(msg);
        }

        _logger?.LogInformation("Create or update cluster {id}", _clusterId);

        try
        {
            //The template is expected next to the assembly, under ArmTemplates/.
            var baseDir = Path.GetDirectoryName(typeof(Cluster).Assembly.Location);
            var templateFile = Path.Join(baseDir, "ArmTemplates", "starter.json");
            _logger?.LogInformation("Use template {file}", templateFile);

            //Read asynchronously so that the calling thread is not blocked on disk I/O.
            var template = await File.ReadAllTextAsync(templateFile, token).ConfigureAwait(false);

            var parameters = NewTemplateParameters();
            var jsonOptions = new JsonSerializerOptions() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
            var parametersInJson = JsonSerializer.Serialize(parameters, jsonOptions);
            _logger?.LogDebug("Template parameters in JSON:\n{content}", parametersInJson);

            var deploymentProperties = new ArmDeploymentProperties(ArmDeploymentMode.Incremental)
            {
                Template = BinaryData.FromString(template),
                Parameters = BinaryData.FromString(parametersInJson)
            };
            var deploymentData = new ArmDeploymentContent(deploymentProperties) { Location = _clusterConfig.Location };

            var client = new ArmClient(_credential, _clusterConfig.SubScriptionId.ToString());
            var sub = client.GetDefaultSubscription();
            var deployments = sub.GetArmDeployments();

            _logger?.LogInformation("Create or update deployment {name}", DeploymentName);
            await deployments.CreateOrUpdateAsync(Azure.WaitUntil.Completed, DeploymentName, deploymentData, token).ConfigureAwait(false);
            _logger?.LogInformation("Finish deployment {name}", DeploymentName);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error when creating or updating cluster {id}.", _clusterId);
            throw;
        }
    }

    /// <summary>
    /// Builds the parameters of the starter template from the cluster configuration and the
    /// resource names derived from the cluster id.
    /// </summary>
    /// <returns>The parameters to pass to the starter template.</returns>
    /// <exception cref="NotImplementedException">The configured service type is <c>ServiceType.Custom</c>.</exception>
    private StarterParameters NewTemplateParameters()
    {
        Debug.Assert(_clusterConfig != null);

        if (_clusterConfig.Service == ServiceType.Custom)
        {
            throw new NotImplementedException();
        }

        var parameters = new StarterParameters()
        {
            Location = ArmParamValue<string>.Create(_clusterConfig.Location),
            //The value is a machine-readable identifier, so the lowering must not depend on the current culture.
            Service = ArmParamValue<string>.Create(_clusterConfig.Service.ToString().ToLowerInvariant()),
            EnvironmentVariables = ArmParamValue<IEnumerable<SecureEnvironmentVariable>>.Create(_clusterConfig.EnvironmentVariables),
            FileShares = ArmParamValue<IEnumerable<FileShareMount>>.Create(_clusterConfig.FileShares),
            MessagingRgName = ArmParamValue<string>.Create(MessagingRgName),
            ComputingRgName = ArmParamValue<string>.Create(ComputingRgName),
            ServiceBusName = ArmParamValue<string>.Create(ServiceBusName),
            AppInsightsName = ArmParamValue<string>.Create(AppInsightsName),
            NodeCount = ArmParamValue<int>.Create(_clusterConfig.NodeCount),
            NodeOptions = ArmParamValue<NodeOptions>.Create(_clusterConfig.NodeOptions),
            ServiceBusQueueOptions = ArmParamValue<ARM.ServiceBusQueueOptions>.Create(_clusterConfig.ServiceBusQueueOptions)
        };
        return parameters;
    }

    //TODO: Remove the method or provide real implementation
    /// <summary>
    /// Currently a no-op: returns an already completed task.
    /// </summary>
    /// <param name="token">Not used by the current implementation.</param>
    public Task ValidateAsync(CancellationToken token = default)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Deletes the messaging and computing resource groups of the cluster concurrently and waits
    /// for both deletions to complete.
    /// </summary>
    /// <param name="token">Token used to cancel the deletions.</param>
    public async Task DestroyAsync(CancellationToken token = default)
    {
        _logger?.LogInformation("Destroy cluster {id}", _clusterId);

        try
        {
            var client = new ArmClient(_credential, _clusterId.SubscriptionId.ToString());
            var sub = client.GetDefaultSubscription();

            //Forward the token so that the caller can actually cancel the deletions.
            var qTask = DeleteResourceGroupAsync(sub, MessagingRgName, token);
            var cTask = DeleteResourceGroupAsync(sub, ComputingRgName, token);
            await Task.WhenAll(qTask, cTask).ConfigureAwait(false);

            _logger?.LogInformation("Cluster {id} is destroyed.", _clusterId);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error when destroying cluster {id}.", _clusterId);
            throw;
        }
    }

    /// <summary>
    /// Deletes a resource group and waits for completion. A 404 response is logged as a warning
    /// and otherwise ignored, so deleting a missing group is not an error.
    /// </summary>
    /// <param name="subscription">Subscription that contains the resource group.</param>
    /// <param name="rgName">Name of the resource group to delete.</param>
    /// <param name="token">Token used to cancel the operation.</param>
    private async Task DeleteResourceGroupAsync(SubscriptionResource subscription, string rgName, CancellationToken token = default)
    {
        _logger?.LogInformation("Delete resource group {name} of subscription {id}", rgName, subscription.Id);

        try
        {
            ResourceGroupResource rg = await subscription.GetResourceGroupAsync(rgName, token).ConfigureAwait(false);
            await rg.DeleteAsync(Azure.WaitUntil.Completed, cancellationToken: token).ConfigureAwait(false);
        }
        catch (RequestFailedException ex) when (ex.Status == 404)
        {
            _logger?.LogWarning("Resource group {name} is not found in subscription {id}.", rgName, subscription.Id);
        }
    }

    //TODO: Get property for monitoring/AppInsights URL
    /// <summary>
    /// Reads the queue and service properties of the cluster concurrently, validates both and
    /// returns them combined.
    /// </summary>
    /// <param name="token">Token used to cancel the operation.</param>
    /// <returns>The queue and service properties of the cluster.</returns>
    public async Task<IClusterProperties> GetPropertiesAsync(CancellationToken token = default)
    {
        try
        {
            var client = new ArmClient(_credential, _clusterId.SubscriptionId.ToString());
            var sub = client.GetDefaultSubscription();

            var qTask = GetQueuePropertiesAsync(sub, token);
            var sTask = GetServicePropertiesAsync(sub, token);
            await Task.WhenAll(qTask, sTask).ConfigureAwait(false);

            //Both tasks have completed at this point; awaiting them only unwraps the results.
            var queueProperties = await qTask.ConfigureAwait(false);
            var serviceProperties = await sTask.ConfigureAwait(false);

            queueProperties.Validate();
            serviceProperties.Validate();

            return new ClusterProperties()
            {
                QueueProperties = queueProperties,
                ServiceProperties = serviceProperties
            };
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error when getting properties of cluster {id}.", _clusterId);
            throw;
        }
    }

    /// <summary>
    /// Reads the queue properties from the tags of the messaging resource group. When the queue
    /// type is Service Bus, the primary connection string of the namespace authorization rule
    /// "RootManageSharedAccessKey" is fetched as well.
    /// </summary>
    /// <param name="subscription">Subscription that contains the messaging resource group.</param>
    /// <param name="token">Token used to cancel the operation.</param>
    /// <returns>The queue properties found.</returns>
    private async Task<QueueProperties> GetQueuePropertiesAsync(SubscriptionResource subscription, CancellationToken token = default)
    {
        ResourceGroupResource rg = await subscription.GetResourceGroupAsync(MessagingRgName, token).ConfigureAwait(false);

        var queueProperties = new QueueProperties();
        foreach (var tag in rg.Data.Tags)
        {
            switch (tag.Key)
            {
                case QueueTypeTag:
                    queueProperties.QueueType = tag.Value;
                    break;
                case RequestQueueNameTag:
                    queueProperties.RequestQueueName = tag.Value;
                    break;
                case ResponseQueueNameTag:
                    queueProperties.ResponseQueueName = tag.Value;
                    break;
            }
        }

        //TODO: Support getting connection string for Storage Queue
        if (!ServiceBusQueue.QueueType.Equals(queueProperties.QueueType, StringComparison.OrdinalIgnoreCase))
        {
            return queueProperties;
        }

        ServiceBusNamespaceResource sb = await rg.GetServiceBusNamespaceAsync(ServiceBusName, token).ConfigureAwait(false);
        ServiceBusNamespaceAuthorizationRuleResource rule = await sb.GetServiceBusNamespaceAuthorizationRuleAsync("RootManageSharedAccessKey", token).ConfigureAwait(false);
        ServiceBusAccessKeys keys = await rule.GetKeysAsync(token).ConfigureAwait(false);
        queueProperties.ConnectionString = keys.PrimaryConnectionString;
        return queueProperties;
    }

    /// <summary>
    /// Reads the service properties from the tags of the computing resource group.
    /// </summary>
    /// <param name="subscription">Subscription that contains the computing resource group.</param>
    /// <param name="token">Token used to cancel the operation.</param>
    /// <returns>The service properties found.</returns>
    private async Task<ServiceProperties> GetServicePropertiesAsync(SubscriptionResource subscription, CancellationToken token = default)
    {
        ResourceGroupResource rg = await subscription.GetResourceGroupAsync(ComputingRgName, token).ConfigureAwait(false);

        var serviceProperties = new ServiceProperties();
        foreach (var tag in rg.Data.Tags)
        {
            switch (tag.Key)
            {
                case ServiceTag:
                    serviceProperties.Service = tag.Value;
                    break;
            }
        }
        return serviceProperties;
    }
}