From 82568af5148182d4bf724bcfa5ec324606e7a2b9 Mon Sep 17 00:00:00 2001 From: Henry van der Vegte Date: Mon, 22 May 2023 14:48:56 +0200 Subject: [PATCH] Refactor transcription analytics status check (#1951) --- .../Enums/TranscriptionAnalyticsJobStatus.cs | 15 ++ .../TranscriptionStartedMessageExtensions.cs | 53 ++++++ .../TranscriptionProcessor.cs | 169 ++++++++---------- .../Setup/ArmTemplateBatch.json | 2 +- .../Setup/ArmTemplateRealtime.json | 2 +- 5 files changed, 145 insertions(+), 96 deletions(-) create mode 100644 samples/ingestion/ingestion-client/Connector/Enums/TranscriptionAnalyticsJobStatus.cs create mode 100644 samples/ingestion/ingestion-client/FetchTranscription/Extensions/TranscriptionStartedMessageExtensions.cs diff --git a/samples/ingestion/ingestion-client/Connector/Enums/TranscriptionAnalyticsJobStatus.cs b/samples/ingestion/ingestion-client/Connector/Enums/TranscriptionAnalyticsJobStatus.cs new file mode 100644 index 000000000..4109ca3bd --- /dev/null +++ b/samples/ingestion/ingestion-client/Connector/Enums/TranscriptionAnalyticsJobStatus.cs @@ -0,0 +1,15 @@ +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE.md file in the project root for full license information. +// + +namespace Connector.Enums +{ + public enum TranscriptionAnalyticsJobStatus + { + None, + NotStarted, + Running, + Completed + } +} diff --git a/samples/ingestion/ingestion-client/FetchTranscription/Extensions/TranscriptionStartedMessageExtensions.cs b/samples/ingestion/ingestion-client/FetchTranscription/Extensions/TranscriptionStartedMessageExtensions.cs new file mode 100644 index 000000000..722e85dc6 --- /dev/null +++ b/samples/ingestion/ingestion-client/FetchTranscription/Extensions/TranscriptionStartedMessageExtensions.cs @@ -0,0 +1,53 @@ +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE.md file in the project root for full license information. +// + +namespace FetchTranscriptionFunction +{ + using System; + using System.Linq; + using System.Threading.Tasks; + + using Connector; + using Connector.Enums; + + using Language; + using Microsoft.Extensions.Logging; + + using TextAnalytics; + + public static class TranscriptionStartedMessageExtensions + { + public static async Task GetTranscriptionAnalyticsJobsStatusAsync( + this TranscriptionStartedMessage transcriptionStartedMessage, + TextAnalyticsProvider textAnalyticsProvider, + AnalyzeConversationsProvider analyzeConversationsProvider, + ILogger logger) + { + _ = transcriptionStartedMessage ?? throw new ArgumentNullException(nameof(transcriptionStartedMessage)); + + if (textAnalyticsProvider == null && analyzeConversationsProvider == null) + { + return TranscriptionAnalyticsJobStatus.None; + } + + var textAnalyticsRequestCompleted = textAnalyticsProvider != null ? await textAnalyticsProvider.TextAnalyticsRequestsCompleted(transcriptionStartedMessage.AudioFileInfos).ConfigureAwait(false) : true; + var conversationalAnalyticsRequestCompleted = analyzeConversationsProvider != null ? await analyzeConversationsProvider.ConversationalRequestsCompleted(transcriptionStartedMessage.AudioFileInfos).ConfigureAwait(false) : true; + + if (!textAnalyticsRequestCompleted || !conversationalAnalyticsRequestCompleted) + { + return TranscriptionAnalyticsJobStatus.Running; + } + + // if the message already contains text analytics requests and all are in terminal state, treat jobs as completed: + var containsTextAnalyticsRequest = transcriptionStartedMessage.AudioFileInfos.Where(audioFileInfo => audioFileInfo.TextAnalyticsRequests != null).Any(); + if (containsTextAnalyticsRequest) + { + return TranscriptionAnalyticsJobStatus.Completed; + } + + return TranscriptionAnalyticsJobStatus.NotStarted; + } + } +} diff --git a/samples/ingestion/ingestion-client/FetchTranscription/TranscriptionProcessor.cs b/samples/ingestion/ingestion-client/FetchTranscription/TranscriptionProcessor.cs index 91d831dd1..e53313cfd 100644 --- a/samples/ingestion/ingestion-client/FetchTranscription/TranscriptionProcessor.cs +++ b/samples/ingestion/ingestion-client/FetchTranscription/TranscriptionProcessor.cs @@ -323,27 +323,17 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati var textAnalyticsInfoProvided = !string.IsNullOrEmpty(textAnalyticsKey) && !string.IsNullOrEmpty(textAnalyticsRegion) && !textAnalyticsRegion.Equals("none", StringComparison.OrdinalIgnoreCase); - var conversationsAnalysisProvider = textAnalyticsInfoProvided ? new AnalyzeConversationsProvider(serviceBusMessage.Locale, textAnalyticsKey, textAnalyticsRegion, log) : null; - var textAnalyticsProvider = textAnalyticsInfoProvided ? new TextAnalyticsProvider(serviceBusMessage.Locale, textAnalyticsKey, textAnalyticsRegion, log) : null; - // Check if there is a text analytics request already running: - var containsTextAnalyticsRequest = serviceBusMessage.AudioFileInfos.Where(audioFileInfo => audioFileInfo.TextAnalyticsRequests != null).Any(); + var transcriptionAnalyticsJobsStatus = await serviceBusMessage.GetTranscriptionAnalyticsJobsStatusAsync(textAnalyticsProvider, conversationsAnalysisProvider, log).ConfigureAwait(false); - if (containsTextAnalyticsRequest && textAnalyticsProvider != null) + if (transcriptionAnalyticsJobsStatus == TranscriptionAnalyticsJobStatus.Running) { - var textAnalyticsRequestCompleted = await textAnalyticsProvider.TextAnalyticsRequestsCompleted(serviceBusMessage.AudioFileInfos).ConfigureAwait(false); - - var conversationalAnalyticsRequestCompleted = await conversationsAnalysisProvider.ConversationalRequestsCompleted(serviceBusMessage.AudioFileInfos).ConfigureAwait(false); - - // If text analytics request is still running, re-queue message and get status again after X minutes - if (!textAnalyticsRequestCompleted || !conversationalAnalyticsRequestCompleted) - { - log.LogInformation($"Text analytics request still running for job {jobName} - re-queueing message."); - await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false); - return; - } + // If transcription analytics request is still running, re-queue message and get status again after X minutes: + log.LogInformation($"Transcription analytics requests still running for job {jobName} - re-queueing message."); + await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false); + return; } var transcriptionFiles = await BatchClient.GetTranscriptionFilesAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); @@ -392,113 +382,104 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati } } - if (textAnalyticsProvider != null && - (FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting != SentimentAnalysisSetting.None - || FetchTranscriptionEnvironmentVariables.PiiRedactionSetting != PiiRedactionSetting.None - || AnalyzeConversationsProvider.IsConversationalPiiEnabled() - || AnalyzeConversationsProvider.IsConversationalSummarizationEnabled())) + if (transcriptionAnalyticsJobsStatus == TranscriptionAnalyticsJobStatus.Completed) { - // If we already got text analytics requests in the transcript (containsTextAnalyticsRequest), add the results to the transcript. - // Otherwise, submit new text analytics requests. - if (containsTextAnalyticsRequest) + foreach (var speechTranscriptMapping in speechTranscriptMappings) { - foreach (var speechTranscriptMapping in speechTranscriptMappings) + var speechTranscript = speechTranscriptMapping.Value; + var audioFileInfo = speechTranscriptMapping.Key; + var fileName = audioFileInfo.FileName; + if (FetchTranscriptionEnvironmentVariables.PiiRedactionSetting != PiiRedactionSetting.None) { - var speechTranscript = speechTranscriptMapping.Value; - var audioFileInfo = speechTranscriptMapping.Key; - var fileName = audioFileInfo.FileName; - if (FetchTranscriptionEnvironmentVariables.PiiRedactionSetting != PiiRedactionSetting.None) + speechTranscript.RecognizedPhrases.ToList().ForEach(phrase => { - speechTranscript.RecognizedPhrases.ToList().ForEach(phrase => + if (phrase.NBest != null && phrase.NBest.Any()) { - if (phrase.NBest != null && phrase.NBest.Any()) - { - var firstNBest = phrase.NBest.First(); - phrase.NBest = new[] { firstNBest }; - } - }); - } + var firstNBest = phrase.NBest.First(); + phrase.NBest = new[] { firstNBest }; + } + }); + } - var textAnalyticsErrors = new List(); + var textAnalyticsErrors = new List(); - if (audioFileInfo.TextAnalyticsRequests.AudioLevelRequests?.Any() == true) - { - var audioLevelErrors = await textAnalyticsProvider.AddAudioLevelEntitiesAsync(audioFileInfo.TextAnalyticsRequests.AudioLevelRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false); - textAnalyticsErrors.AddRange(audioLevelErrors); - } + if (audioFileInfo.TextAnalyticsRequests.AudioLevelRequests?.Any() == true) + { + var audioLevelErrors = await textAnalyticsProvider.AddAudioLevelEntitiesAsync(audioFileInfo.TextAnalyticsRequests.AudioLevelRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false); + textAnalyticsErrors.AddRange(audioLevelErrors); + } - if (audioFileInfo.TextAnalyticsRequests.UtteranceLevelRequests?.Any() == true) - { - var utteranceLevelErrors = await textAnalyticsProvider.AddUtteranceLevelEntitiesAsync(audioFileInfo.TextAnalyticsRequests.UtteranceLevelRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false); - textAnalyticsErrors.AddRange(utteranceLevelErrors); - } + if (audioFileInfo.TextAnalyticsRequests.UtteranceLevelRequests?.Any() == true) + { + var utteranceLevelErrors = await textAnalyticsProvider.AddUtteranceLevelEntitiesAsync(audioFileInfo.TextAnalyticsRequests.UtteranceLevelRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false); + textAnalyticsErrors.AddRange(utteranceLevelErrors); + } - if (audioFileInfo.TextAnalyticsRequests.ConversationRequests?.Any() == true) - { - var conversationalAnalyticsErrors = await conversationsAnalysisProvider.AddConversationalEntitiesAsync(audioFileInfo.TextAnalyticsRequests.ConversationRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false); - textAnalyticsErrors.AddRange(conversationalAnalyticsErrors); - } + if (audioFileInfo.TextAnalyticsRequests.ConversationRequests?.Any() == true) + { + var conversationalAnalyticsErrors = await conversationsAnalysisProvider.AddConversationalEntitiesAsync(audioFileInfo.TextAnalyticsRequests.ConversationRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false); + textAnalyticsErrors.AddRange(conversationalAnalyticsErrors); + } - if (textAnalyticsErrors.Any()) - { - var distinctErrors = textAnalyticsErrors.Distinct(); - var errorMessage = $"File {(string.IsNullOrEmpty(fileName) ? "unknown" : fileName)}:\n{string.Join('\n', distinctErrors)}"; + if (textAnalyticsErrors.Any()) + { + var distinctErrors = textAnalyticsErrors.Distinct(); + var errorMessage = $"File {(string.IsNullOrEmpty(fileName) ? "unknown" : fileName)}:\n{string.Join('\n', distinctErrors)}"; - generalErrorsStringBuilder.AppendLine(errorMessage); - } + generalErrorsStringBuilder.AppendLine(errorMessage); } } - else + } + else if (transcriptionAnalyticsJobsStatus == TranscriptionAnalyticsJobStatus.NotStarted) + { + foreach (var speechTranscriptMapping in speechTranscriptMappings) { - foreach (var speechTranscriptMapping in speechTranscriptMappings) - { - var speechTranscript = speechTranscriptMapping.Value; - var audioFileInfo = speechTranscriptMapping.Key; + var speechTranscript = speechTranscriptMapping.Value; + var audioFileInfo = speechTranscriptMapping.Key; - var fileName = audioFileInfo.FileName; + var fileName = audioFileInfo.FileName; - if (speechTranscript.RecognizedPhrases != null && speechTranscript.RecognizedPhrases.All(phrase => phrase.RecognitionStatus.Equals("Success", StringComparison.Ordinal))) - { - var textAnalyticsErrors = new List(); + if (speechTranscript.RecognizedPhrases != null && speechTranscript.RecognizedPhrases.All(phrase => phrase.RecognitionStatus.Equals("Success", StringComparison.Ordinal))) + { + var textAnalyticsErrors = new List(); - (var utteranceLevelJobIds, var utteranceLevelErrors) = await textAnalyticsProvider.SubmitUtteranceLevelRequests( - speechTranscript, - FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting).ConfigureAwait(false); + (var utteranceLevelJobIds, var utteranceLevelErrors) = await textAnalyticsProvider.SubmitUtteranceLevelRequests( + speechTranscript, + FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting).ConfigureAwait(false); - var utteranceLevelRequests = utteranceLevelJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running)); - textAnalyticsErrors.AddRange(utteranceLevelErrors); + var utteranceLevelRequests = utteranceLevelJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running)); + textAnalyticsErrors.AddRange(utteranceLevelErrors); - (var audioLevelJobIds, var audioLevelErrors) = await textAnalyticsProvider.SubmitAudioLevelRequests( - speechTranscript, - FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting, - FetchTranscriptionEnvironmentVariables.PiiRedactionSetting).ConfigureAwait(false); + (var audioLevelJobIds, var audioLevelErrors) = await textAnalyticsProvider.SubmitAudioLevelRequests( + speechTranscript, + FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting, + FetchTranscriptionEnvironmentVariables.PiiRedactionSetting).ConfigureAwait(false); - var audioLevelRequests = audioLevelJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running)); - textAnalyticsErrors.AddRange(audioLevelErrors); + var audioLevelRequests = audioLevelJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running)); + textAnalyticsErrors.AddRange(audioLevelErrors); - (var conversationJobIds, var conversationErrors) = await conversationsAnalysisProvider.SubmitAnalyzeConversationsRequestAsync(speechTranscript).ConfigureAwait(false); + (var conversationJobIds, var conversationErrors) = await conversationsAnalysisProvider.SubmitAnalyzeConversationsRequestAsync(speechTranscript).ConfigureAwait(false); - var conversationalRequests = conversationJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running)); - textAnalyticsErrors.AddRange(conversationErrors); + var conversationalRequests = conversationJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running)); + textAnalyticsErrors.AddRange(conversationErrors); - audioFileInfo.TextAnalyticsRequests = new TextAnalyticsRequests(utteranceLevelRequests, audioLevelRequests, conversationalRequests); + audioFileInfo.TextAnalyticsRequests = new TextAnalyticsRequests(utteranceLevelRequests, audioLevelRequests, conversationalRequests); - if (textAnalyticsErrors.Any()) - { - var distinctErrors = textAnalyticsErrors.Distinct(); - var errorMessage = $"File {(string.IsNullOrEmpty(fileName) ? "unknown" : fileName)}:\n{string.Join('\n', distinctErrors)}"; + if (textAnalyticsErrors.Any()) + { + var distinctErrors = textAnalyticsErrors.Distinct(); + var errorMessage = $"File {(string.IsNullOrEmpty(fileName) ? "unknown" : fileName)}:\n{string.Join('\n', distinctErrors)}"; - generalErrorsStringBuilder.AppendLine(errorMessage); - } + generalErrorsStringBuilder.AppendLine(errorMessage); } } + } - log.LogInformation($"Added text analytics requests to service bus message - re-queueing message."); + log.LogInformation($"Added text analytics requests to service bus message - re-queueing message."); - // Poll for first time with TA request after 1 minute - await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, TimeSpan.FromMinutes(1)).ConfigureAwait(false); - return; - } + // Poll for first time with TA request after 1 minute + await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, TimeSpan.FromMinutes(1)).ConfigureAwait(false); + return; } foreach (var speechTranscriptMapping in speechTranscriptMappings) diff --git a/samples/ingestion/ingestion-client/Setup/ArmTemplateBatch.json b/samples/ingestion/ingestion-client/Setup/ArmTemplateBatch.json index c93c80789..cc64569d5 100644 --- a/samples/ingestion/ingestion-client/Setup/ArmTemplateBatch.json +++ b/samples/ingestion/ingestion-client/Setup/ArmTemplateBatch.json @@ -227,7 +227,7 @@ } }, "variables": { - "Version": "v2.0.5", + "Version": "v2.0.6", "AudioInputContainer": "audio-input", "AudioProcessedContainer": "audio-processed", "ErrorFilesOutputContainer": "audio-failed", diff --git a/samples/ingestion/ingestion-client/Setup/ArmTemplateRealtime.json b/samples/ingestion/ingestion-client/Setup/ArmTemplateRealtime.json index cb5d7a7b2..a481d8ac9 100644 --- a/samples/ingestion/ingestion-client/Setup/ArmTemplateRealtime.json +++ b/samples/ingestion/ingestion-client/Setup/ArmTemplateRealtime.json @@ -123,7 +123,7 @@ } }, "variables": { - "Version": "v2.0.5", + "Version": "v2.0.6", "AudioInputContainer": "audio-input", "AudioProcessedContainer": "audio-processed", "ErrorFilesOutputContainer": "audio-failed",