-
Notifications
You must be signed in to change notification settings - Fork 0
/
Session.cs
110 lines (89 loc) · 3.69 KB
/
Session.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
using Azure.Core;
using CloudWorker.MessageQueue;
using Microsoft.Extensions.Logging;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CloudWorker.Client.SDK;
/// <summary>
/// Contract for a client-side session: it exposes the session identifier, the
/// properties of the associated cluster, and factories for message queue clients.
/// </summary>
public interface ISession
{
    /// <summary>Gets the identifier of this session.</summary>
    string Id { get; }

    /// <summary>Gets the properties of the cluster associated with this session.</summary>
    IClusterProperties ClusterProperties { get; }

    /// <summary>Creates a message queue client intended for sending.</summary>
    /// <returns>A new <see cref="IMessageQueue"/> instance.</returns>
    IMessageQueue CreateSender();

    /// <summary>Creates a message queue client intended for receiving.</summary>
    /// <returns>A new <see cref="IMessageQueue"/> instance.</returns>
    IMessageQueue CreateReceiver();
}
/// <summary>
/// Configuration accepted by <see cref="Session.CreateOrUpdateAsync"/>.
/// Adds no members of its own on top of <see cref="ClusterConfig"/>.
/// </summary>
public class SessionConfig : ClusterConfig
{
}
/// <summary>
/// <see cref="ISession"/> implementation backed by an <see cref="ICluster"/>.
/// Instances cannot be constructed directly; use <see cref="CreateOrUpdateAsync"/> or
/// <see cref="GetAsync"/>, both of which fetch the cluster properties before handing
/// the session back to the caller.
/// </summary>
public class Session : ISession
{
    // Lease value handed to the queue client; its unit is whatever
    // ServiceBusQueueOptions.MessageLease expects (not visible from this file).
    //TODO: Make it configurable in ClusterConfig
    private const int DefaultMessageLease = 60;

    private readonly ICluster _cluster;
    private readonly ILoggerFactory? _loggerFactory;

    // Populated by GetClusterPropertiesAsync, which every public factory calls.
    private IClusterProperties? _properties;

    private Session(ICluster cluster, ILoggerFactory? loggerFactory = null)
    {
        _cluster = cluster;
        _loggerFactory = loggerFactory;
    }

    /// <summary>Gets the identifier of the underlying cluster.</summary>
    public string Id => _cluster.Id;

    /// <summary>Gets the cluster properties loaded when the session was created.</summary>
    /// <exception cref="InvalidOperationException">The properties have not been loaded.</exception>
    public IClusterProperties ClusterProperties => _properties ??
        throw new InvalidOperationException("GetClusterPropertiesAsync must be called before getting ClusterProperties.");

    /// <summary>Fetches the cluster properties and caches them on this session.</summary>
    private async Task GetClusterPropertiesAsync()
    {
        // NOTE(review): the caller's CancellationToken is not forwarded here because
        // GetPropertiesAsync is invoked without one; confirm whether ICluster offers
        // a token-accepting overload.
        _properties = await _cluster.GetPropertiesAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Builds a Service Bus queue client for either the request or the response queue.
    /// </summary>
    /// <param name="sender">
    /// <c>true</c> to target the request queue, <c>false</c> to target the response queue.
    /// </param>
    /// <returns>A new <see cref="ServiceBusQueue"/> configured from the cluster's queue properties.</returns>
    /// <exception cref="InvalidOperationException">The cluster properties have not been loaded.</exception>
    /// <exception cref="NotImplementedException">
    /// The cluster has no queue properties, or its queue type is not the Service Bus queue type.
    /// </exception>
    //TODO: Support Storage queue
    private IMessageQueue CreateQueueClient(bool sender)
    {
        // Use the guarded accessor instead of Debug.Assert: the assert is compiled out
        // of release builds, which would turn a missing value into a NullReferenceException.
        var queueProperties = ClusterProperties.QueueProperties;
        var queueType = queueProperties?.QueueType;

        // A missing QueueProperties yields a null queue type and is reported the same
        // way as an unsupported one.
        if (queueProperties is null || !ServiceBusQueue.QueueType.Equals(queueType))
        {
            throw new NotImplementedException($"Queue type {queueType} is not supported.");
        }

        var queueOptions = new ServiceBusQueueOptions()
        {
            QueueType = queueType,
            ConnectionString = queueProperties.ConnectionString,
            QueueName = sender ? queueProperties.RequestQueueName : queueProperties.ResponseQueueName,
            MessageLease = DefaultMessageLease,
            RetryOnThrottled = true
        };

        var logger = _loggerFactory?.CreateLogger<ServiceBusQueue>();
        return new ServiceBusQueue(queueOptions, logger);
    }

    /// <summary>Creates a queue client bound to the cluster's request queue.</summary>
    /// <returns>A new <see cref="IMessageQueue"/> instance.</returns>
    public IMessageQueue CreateSender()
    {
        return CreateQueueClient(true);
    }

    /// <summary>Creates a queue client bound to the cluster's response queue.</summary>
    /// <returns>A new <see cref="IMessageQueue"/> instance.</returns>
    public IMessageQueue CreateReceiver()
    {
        return CreateQueueClient(false);
    }

    /// <summary>Wraps <paramref name="cluster"/> in a session and loads its properties.</summary>
    private static async Task<Session> LoadAsync(ICluster cluster, ILoggerFactory? loggerFactory)
    {
        var session = new Session(cluster, loggerFactory);
        await session.GetClusterPropertiesAsync().ConfigureAwait(false);
        return session;
    }

    /// <summary>
    /// Creates or updates the cluster described by <paramref name="sessionConfig"/> and
    /// returns a session with its cluster properties loaded.
    /// </summary>
    /// <param name="credential">Credential passed to the <see cref="Cluster"/>.</param>
    /// <param name="sessionConfig">Configuration passed to the <see cref="Cluster"/>.</param>
    /// <param name="sessionId">Optional session identifier passed to the <see cref="Cluster"/>.</param>
    /// <param name="loggerFactory">Optional factory used to create loggers for the cluster and queue clients.</param>
    /// <param name="token">Token passed to the create-or-update call.</param>
    /// <returns>The ready-to-use session.</returns>
    public static async Task<Session> CreateOrUpdateAsync(TokenCredential credential, SessionConfig sessionConfig, string? sessionId = null,
        ILoggerFactory? loggerFactory = null, CancellationToken token = default)
    {
        var logger = loggerFactory?.CreateLogger<Cluster>();
        var cluster = new Cluster(credential, sessionConfig, sessionId, logger);
        await cluster.CreateOrUpdateAsync(token).ConfigureAwait(false);
        return await LoadAsync(cluster, loggerFactory).ConfigureAwait(false);
    }

    /// <summary>
    /// Validates the cluster identified by <paramref name="sessionId"/> and returns a
    /// session with its cluster properties loaded.
    /// </summary>
    /// <param name="credential">Credential passed to the <see cref="Cluster"/>.</param>
    /// <param name="sessionId">Session identifier passed to the <see cref="Cluster"/>.</param>
    /// <param name="loggerFactory">Optional factory used to create loggers for the cluster and queue clients.</param>
    /// <param name="token">Token passed to the validation call.</param>
    /// <returns>The ready-to-use session.</returns>
    public static async Task<Session> GetAsync(TokenCredential credential, string sessionId,
        ILoggerFactory? loggerFactory = null, CancellationToken token = default)
    {
        var logger = loggerFactory?.CreateLogger<Cluster>();
        var cluster = new Cluster(credential, sessionId, logger);
        await cluster.ValidateAsync(token).ConfigureAwait(false);
        return await LoadAsync(cluster, loggerFactory).ConfigureAwait(false);
    }

    /// <summary>Destroys the cluster identified by <paramref name="sessionId"/>.</summary>
    /// <param name="credential">Credential passed to the <see cref="Cluster"/>.</param>
    /// <param name="sessionId">Session identifier passed to the <see cref="Cluster"/>.</param>
    /// <param name="loggerFactory">Optional factory used to create the cluster logger.</param>
    /// <param name="token">Token passed to the destroy call.</param>
    public static async Task DestroyAsync(TokenCredential credential, string sessionId,
        ILoggerFactory? loggerFactory = null, CancellationToken token = default)
    {
        var logger = loggerFactory?.CreateLogger<Cluster>();
        var cluster = new Cluster(credential, sessionId, logger);
        await cluster.DestroyAsync(token).ConfigureAwait(false);
    }
}