Skip to content

Commit

Permalink
[IngestionClient] Refactor - add ServiceBusClient via Dependency Inje…
Browse files Browse the repository at this point in the history
…ction instead of static clients (#2476)
  • Loading branch information
HenryvanderVegte authored Jul 15, 2024
1 parent 87130ab commit a943f96
Show file tree
Hide file tree
Showing 16 changed files with 186 additions and 81 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// <copyright file="ServiceBusClientName.cs" company="Microsoft Corporation">
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
// </copyright>

namespace Connector.Enums
{
public enum ServiceBusClientName
{
None = 0,
StartTranscriptionServiceBusClient,
FetchTranscriptionServiceBusClient,
CompletedTranscriptionServiceBusClient,
}
}
2 changes: 0 additions & 2 deletions samples/ingestion/ingestion-client/Connector/ModelIdentity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

namespace Connector
{
using System;

public sealed class ModelIdentity
{
public ModelIdentity(string self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

namespace Connector
{
using Newtonsoft.Json;

public class CompletedMessage
{
public CompletedMessage(string audioFileName, string jsonResportLocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ namespace Connector
{
using System.Collections.Generic;

using Connector.Serializable.Language.Conversations;

using Newtonsoft.Json;

public class SpeechTranscript
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

namespace Connector.Serializable.TranscriptionStartedServiceBusMessage
{
using System;

public class AudioFileInfo
{
public AudioFileInfo(string fileUrl, int retryCount, TextAnalyticsRequests textAnalyticsRequests, string fileName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ namespace FetchTranscription
{
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

using Connector;
using Connector.Database;

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

/// <summary>
Expand All @@ -19,6 +24,7 @@ public class FetchTranscription
{
private readonly IServiceProvider serviceProvider;
private readonly IStorageConnector storageConnector;
private readonly IAzureClientFactory<ServiceBusClient> serviceBusClientFactory;
private readonly ILogger<FetchTranscription> logger;

/// <summary>
Expand All @@ -27,11 +33,17 @@ public class FetchTranscription
/// <param name="serviceProvider">The service provider.</param>
/// <param name="logger">The FetchTranscription logger.</param>
/// <param name="storageConnector">Storage Connector dependency</param>
public FetchTranscription(IServiceProvider serviceProvider, ILogger<FetchTranscription> logger, IStorageConnector storageConnector)
/// <param name="serviceBusClientFactory">Azure client factory for service bus clients</param>
public FetchTranscription(
IServiceProvider serviceProvider,
ILogger<FetchTranscription> logger,
IStorageConnector storageConnector,
IAzureClientFactory<ServiceBusClient> serviceBusClientFactory)
{
this.serviceProvider = serviceProvider;
this.logger = logger;
this.storageConnector = storageConnector;
this.serviceBusClientFactory = serviceBusClientFactory;
}

/// <summary>
Expand All @@ -53,7 +65,9 @@ public async Task Run([ServiceBusTrigger("fetch_transcription_queue", Connection

var serviceBusMessage = TranscriptionStartedMessage.DeserializeMessage(message);

var transcriptionProcessor = new TranscriptionProcessor(this.serviceProvider, this.storageConnector);
var databaseContext = FetchTranscriptionEnvironmentVariables.UseSqlDatabase ? this.serviceProvider.GetRequiredService<IngestionClientDbContext>() : null;

var transcriptionProcessor = new TranscriptionProcessor(this.storageConnector, this.serviceBusClientFactory, databaseContext);

await transcriptionProcessor.ProcessTranscriptionJobAsync(serviceBusMessage, this.serviceProvider, this.logger).ConfigureAwait(false);
}
Expand Down
16 changes: 16 additions & 0 deletions samples/ingestion/ingestion-client/FetchTranscription/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ namespace FetchTranscription

using Connector;
using Connector.Database;
using Connector.Enums;

using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

Expand Down Expand Up @@ -44,6 +46,20 @@ public static void Main(string[] args)
s.AddSingleton(blobServiceClient);
s.AddSingleton(storageCredential);
s.AddTransient<IStorageConnector, StorageConnector>();

s.AddAzureClients(clientBuilder =>
{
clientBuilder.AddServiceBusClient(FetchTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString)
.WithName(ServiceBusClientName.StartTranscriptionServiceBusClient.ToString());
clientBuilder.AddServiceBusClient(FetchTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString)
.WithName(ServiceBusClientName.FetchTranscriptionServiceBusClient.ToString());

if (!string.IsNullOrWhiteSpace(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString))
{
clientBuilder.AddServiceBusClient(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString)
.WithName(ServiceBusClientName.CompletedTranscriptionServiceBusClient.ToString());
}
});
})
.Build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,44 @@ namespace FetchTranscription
using Connector.Serializable.TranscriptionStartedServiceBusMessage;

using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public class TranscriptionProcessor
{
private static readonly ServiceBusClient StartServiceBusClient = new ServiceBusClient(FetchTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString);
private readonly ServiceBusSender startTranscriptionServiceBusSender;

private static readonly ServiceBusSender StartServiceBusSender = StartServiceBusClient.CreateSender(ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString).EntityPath);
private readonly ServiceBusSender fetchTranscriptionServiceBusSender;

private static readonly ServiceBusClient FetchServiceBusClient = new ServiceBusClient(FetchTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString);

private static readonly ServiceBusSender FetchServiceBusSender = FetchServiceBusClient.CreateSender(ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString).EntityPath);

private static readonly ServiceBusClient CompletedServiceBusClient;

private static readonly ServiceBusSender CompletedServiceBusSender;

private readonly IServiceProvider serviceProvider;
private readonly ServiceBusSender completedTranscriptionServiceBusSender;

private readonly IngestionClientDbContext databaseContext;

private readonly IStorageConnector storageConnector;

#pragma warning disable CA1810
static TranscriptionProcessor()
public TranscriptionProcessor(
IStorageConnector storageConnector,
IAzureClientFactory<ServiceBusClient> serviceBusClientFactory,
IngestionClientDbContext databaseContext)
{
if (!string.IsNullOrEmpty(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString))
{
CompletedServiceBusClient = new ServiceBusClient(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString);
CompletedServiceBusSender = CompletedServiceBusClient.CreateSender(ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString).EntityPath);
}
}
#pragma warning restore CA1810

public TranscriptionProcessor(IServiceProvider serviceProvider, IStorageConnector storageConnector)
{
this.serviceProvider = serviceProvider;
this.storageConnector = storageConnector;
this.databaseContext = databaseContext;

ArgumentNullException.ThrowIfNull(serviceBusClientFactory, nameof(serviceBusClientFactory));
var startTranscriptionServiceBusClient = serviceBusClientFactory.CreateClient(ServiceBusClientName.StartTranscriptionServiceBusClient.ToString());
var startTranscriptionQueueName = ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString).EntityPath;
this.startTranscriptionServiceBusSender = startTranscriptionServiceBusClient.CreateSender(startTranscriptionQueueName);

var fetchTranscriptionServiceBusClient = serviceBusClientFactory.CreateClient(ServiceBusClientName.FetchTranscriptionServiceBusClient.ToString());
var fetchTranscriptionQueueName = ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString).EntityPath;
this.fetchTranscriptionServiceBusSender = fetchTranscriptionServiceBusClient.CreateSender(fetchTranscriptionQueueName);

if (FetchTranscriptionEnvironmentVariables.UseSqlDatabase)
if (!string.IsNullOrWhiteSpace(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString))
{
this.databaseContext = this.serviceProvider.GetRequiredService<IngestionClientDbContext>();
var completedTranscriptionServiceBusClient = serviceBusClientFactory.CreateClient(ServiceBusClientName.CompletedTranscriptionServiceBusClient.ToString());
var completedTranscriptionQueueName = ServiceBusConnectionStringProperties.Parse(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString).EntityPath;
this.completedTranscriptionServiceBusSender = completedTranscriptionServiceBusClient.CreateSender(completedTranscriptionQueueName);
}
}

Expand Down Expand Up @@ -97,11 +92,11 @@ public async Task ProcessTranscriptionJobAsync(TranscriptionStartedMessage servi
break;
case "Running":
log.LogInformation($"Transcription running, polling again after {messageDelayTime.TotalMinutes} minutes.");
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
break;
case "NotStarted":
log.LogInformation($"Transcription not started, polling again after {messageDelayTime.TotalMinutes} minutes.");
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
break;
}
}
Expand Down Expand Up @@ -221,7 +216,7 @@ private async Task ProcessFailedTranscriptionAsync(string transcriptionLocation,
};

var audioFileMessage = new Azure.Messaging.ServiceBus.ServiceBusMessage(JsonConvert.SerializeObject(serviceBusMessage));
await ServiceBusUtilities.SendServiceBusMessageAsync(StartServiceBusSender, audioFileMessage, log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
await ServiceBusUtilities.SendServiceBusMessageAsync(this.startTranscriptionServiceBusSender, audioFileMessage, log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
}
else
{
Expand Down Expand Up @@ -284,7 +279,7 @@ private async Task RetryOrFailJobAsync(TranscriptionStartedMessage message, stri
if (message.FailedExecutionCounter <= FetchTranscriptionEnvironmentVariables.RetryLimit || isThrottled)
{
log.LogInformation("Retrying..");
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, message.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, message.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
}
else
{
Expand Down Expand Up @@ -342,7 +337,7 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati
{
// If transcription analytics request is still running, re-queue message and get status again after X minutes:
log.LogInformation($"Transcription analytics requests still running for job {jobName} - re-queueing message.");
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, serviceBusMessage.CreateMessageString(), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
return;
}

Expand Down Expand Up @@ -418,7 +413,7 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati
log.LogInformation($"Added text analytics requests to service bus message - re-queueing message.");

// Poll for first time with TA request after 1 minute
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
await ServiceBusUtilities.SendServiceBusMessageAsync(this.fetchTranscriptionServiceBusSender, serviceBusMessage.CreateMessageString(), log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
return;
}

Expand Down Expand Up @@ -509,9 +504,9 @@ await this.databaseContext.StoreTranscriptionAsync(
}
}

if (!string.IsNullOrEmpty(FetchTranscriptionEnvironmentVariables.CompletedServiceBusConnectionString))
if (this.completedTranscriptionServiceBusSender != null)
{
await ServiceBusUtilities.SendServiceBusMessageAsync(CompletedServiceBusSender, JsonConvert.SerializeObject(completedMessages), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
await ServiceBusUtilities.SendServiceBusMessageAsync(this.completedTranscriptionServiceBusSender, JsonConvert.SerializeObject(completedMessages), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
}

var generalErrors = generalErrorsStringBuilder.ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@
},
"variables":
{
"Version": "v2.1.3",
"Version": "v2.1.4",
"AudioInputContainer": "audio-input",
"AudioProcessedContainer": "audio-processed",
"ErrorFilesOutputContainer": "audio-failed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

namespace StartTranscription
{
using System;

using Azure.Storage;
using Azure.Storage.Blobs;

using Connector;
using Connector.Enums;

using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

Expand Down Expand Up @@ -44,6 +44,14 @@ public static void Main(string[] args)
s.AddSingleton(blobServiceClient);
s.AddSingleton(storageCredential);
s.AddTransient<IStorageConnector, StorageConnector>();

s.AddAzureClients(clientBuilder =>
{
clientBuilder.AddServiceBusClient(StartTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString)
.WithName(ServiceBusClientName.StartTranscriptionServiceBusClient.ToString());
clientBuilder.AddServiceBusClient(StartTranscriptionEnvironmentVariables.FetchTranscriptionServiceBusConnectionString)
.WithName(ServiceBusClientName.FetchTranscriptionServiceBusClient.ToString());
});
})
.Build();

Expand Down
Loading

0 comments on commit a943f96

Please sign in to comment.