Skip to content

Commit

Permalink
Refactor transcription analytics status check (#1951)
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryvanderVegte authored May 22, 2023
1 parent d806061 commit 82568af
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 96 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// <copyright file="TranscriptionAnalyticsJobStatus.cs" company="Microsoft Corporation">
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
// </copyright>

namespace Connector.Enums
{
public enum TranscriptionAnalyticsJobStatus
{
None,
NotStarted,
Running,
Completed
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// <copyright file="TranscriptionStartedMessageExtensions.cs" company="Microsoft Corporation">
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
// </copyright>

namespace FetchTranscriptionFunction
{
using System;
using System.Linq;
using System.Threading.Tasks;

using Connector;
using Connector.Enums;

using Language;
using Microsoft.Extensions.Logging;

using TextAnalytics;

public static class TranscriptionStartedMessageExtensions
{
public static async Task<TranscriptionAnalyticsJobStatus> GetTranscriptionAnalyticsJobsStatusAsync(
this TranscriptionStartedMessage transcriptionStartedMessage,
TextAnalyticsProvider textAnalyticsProvider,
AnalyzeConversationsProvider analyzeConversationsProvider,
ILogger logger)
{
_ = transcriptionStartedMessage ?? throw new ArgumentNullException(nameof(transcriptionStartedMessage));

if (textAnalyticsProvider == null && analyzeConversationsProvider == null)
{
return TranscriptionAnalyticsJobStatus.None;
}

var textAnalyticsRequestCompleted = textAnalyticsProvider != null ? await textAnalyticsProvider.TextAnalyticsRequestsCompleted(transcriptionStartedMessage.AudioFileInfos).ConfigureAwait(false) : true;
var conversationalAnalyticsRequestCompleted = analyzeConversationsProvider != null ? await analyzeConversationsProvider.ConversationalRequestsCompleted(transcriptionStartedMessage.AudioFileInfos).ConfigureAwait(false) : true;

if (!textAnalyticsRequestCompleted || !conversationalAnalyticsRequestCompleted)
{
return TranscriptionAnalyticsJobStatus.Running;
}

// if the message already contains text analytics requests and all are in terminal state, treat jobs as completed:
var containsTextAnalyticsRequest = transcriptionStartedMessage.AudioFileInfos.Where(audioFileInfo => audioFileInfo.TextAnalyticsRequests != null).Any();
if (containsTextAnalyticsRequest)
{
return TranscriptionAnalyticsJobStatus.Completed;
}

return TranscriptionAnalyticsJobStatus.NotStarted;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,27 +323,17 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati
var textAnalyticsInfoProvided = !string.IsNullOrEmpty(textAnalyticsKey)
&& !string.IsNullOrEmpty(textAnalyticsRegion)
&& !textAnalyticsRegion.Equals("none", StringComparison.OrdinalIgnoreCase);

var conversationsAnalysisProvider = textAnalyticsInfoProvided ? new AnalyzeConversationsProvider(serviceBusMessage.Locale, textAnalyticsKey, textAnalyticsRegion, log) : null;

var textAnalyticsProvider = textAnalyticsInfoProvided ? new TextAnalyticsProvider(serviceBusMessage.Locale, textAnalyticsKey, textAnalyticsRegion, log) : null;

// Check if there is a text analytics request already running:
var containsTextAnalyticsRequest = serviceBusMessage.AudioFileInfos.Where(audioFileInfo => audioFileInfo.TextAnalyticsRequests != null).Any();
var transcriptionAnalyticsJobsStatus = await serviceBusMessage.GetTranscriptionAnalyticsJobsStatusAsync(textAnalyticsProvider, conversationsAnalysisProvider, log).ConfigureAwait(false);

if (containsTextAnalyticsRequest && textAnalyticsProvider != null)
if (transcriptionAnalyticsJobsStatus == TranscriptionAnalyticsJobStatus.Running)
{
var textAnalyticsRequestCompleted = await textAnalyticsProvider.TextAnalyticsRequestsCompleted(serviceBusMessage.AudioFileInfos).ConfigureAwait(false);

var conversationalAnalyticsRequestCompleted = await conversationsAnalysisProvider.ConversationalRequestsCompleted(serviceBusMessage.AudioFileInfos).ConfigureAwait(false);

// If text analytics request is still running, re-queue message and get status again after X minutes
if (!textAnalyticsRequestCompleted || !conversationalAnalyticsRequestCompleted)
{
log.LogInformation($"Text analytics request still running for job {jobName} - re-queueing message.");
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
return;
}
// If transcription analytics request is still running, re-queue message and get status again after X minutes:
log.LogInformation($"Transcription analytics requests still running for job {jobName} - re-queueing message.");
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, GetMessageDelayTime(serviceBusMessage.PollingCounter)).ConfigureAwait(false);
return;
}

var transcriptionFiles = await BatchClient.GetTranscriptionFilesAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false);
Expand Down Expand Up @@ -392,113 +382,104 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati
}
}

if (textAnalyticsProvider != null &&
(FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting != SentimentAnalysisSetting.None
|| FetchTranscriptionEnvironmentVariables.PiiRedactionSetting != PiiRedactionSetting.None
|| AnalyzeConversationsProvider.IsConversationalPiiEnabled()
|| AnalyzeConversationsProvider.IsConversationalSummarizationEnabled()))
if (transcriptionAnalyticsJobsStatus == TranscriptionAnalyticsJobStatus.Completed)
{
// If we already got text analytics requests in the transcript (containsTextAnalyticsRequest), add the results to the transcript.
// Otherwise, submit new text analytics requests.
if (containsTextAnalyticsRequest)
foreach (var speechTranscriptMapping in speechTranscriptMappings)
{
foreach (var speechTranscriptMapping in speechTranscriptMappings)
var speechTranscript = speechTranscriptMapping.Value;
var audioFileInfo = speechTranscriptMapping.Key;
var fileName = audioFileInfo.FileName;
if (FetchTranscriptionEnvironmentVariables.PiiRedactionSetting != PiiRedactionSetting.None)
{
var speechTranscript = speechTranscriptMapping.Value;
var audioFileInfo = speechTranscriptMapping.Key;
var fileName = audioFileInfo.FileName;
if (FetchTranscriptionEnvironmentVariables.PiiRedactionSetting != PiiRedactionSetting.None)
speechTranscript.RecognizedPhrases.ToList().ForEach(phrase =>
{
speechTranscript.RecognizedPhrases.ToList().ForEach(phrase =>
if (phrase.NBest != null && phrase.NBest.Any())
{
if (phrase.NBest != null && phrase.NBest.Any())
{
var firstNBest = phrase.NBest.First();
phrase.NBest = new[] { firstNBest };
}
});
}
var firstNBest = phrase.NBest.First();
phrase.NBest = new[] { firstNBest };
}
});
}

var textAnalyticsErrors = new List<string>();
var textAnalyticsErrors = new List<string>();

if (audioFileInfo.TextAnalyticsRequests.AudioLevelRequests?.Any() == true)
{
var audioLevelErrors = await textAnalyticsProvider.AddAudioLevelEntitiesAsync(audioFileInfo.TextAnalyticsRequests.AudioLevelRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false);
textAnalyticsErrors.AddRange(audioLevelErrors);
}
if (audioFileInfo.TextAnalyticsRequests.AudioLevelRequests?.Any() == true)
{
var audioLevelErrors = await textAnalyticsProvider.AddAudioLevelEntitiesAsync(audioFileInfo.TextAnalyticsRequests.AudioLevelRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false);
textAnalyticsErrors.AddRange(audioLevelErrors);
}

if (audioFileInfo.TextAnalyticsRequests.UtteranceLevelRequests?.Any() == true)
{
var utteranceLevelErrors = await textAnalyticsProvider.AddUtteranceLevelEntitiesAsync(audioFileInfo.TextAnalyticsRequests.UtteranceLevelRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false);
textAnalyticsErrors.AddRange(utteranceLevelErrors);
}
if (audioFileInfo.TextAnalyticsRequests.UtteranceLevelRequests?.Any() == true)
{
var utteranceLevelErrors = await textAnalyticsProvider.AddUtteranceLevelEntitiesAsync(audioFileInfo.TextAnalyticsRequests.UtteranceLevelRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false);
textAnalyticsErrors.AddRange(utteranceLevelErrors);
}

if (audioFileInfo.TextAnalyticsRequests.ConversationRequests?.Any() == true)
{
var conversationalAnalyticsErrors = await conversationsAnalysisProvider.AddConversationalEntitiesAsync(audioFileInfo.TextAnalyticsRequests.ConversationRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false);
textAnalyticsErrors.AddRange(conversationalAnalyticsErrors);
}
if (audioFileInfo.TextAnalyticsRequests.ConversationRequests?.Any() == true)
{
var conversationalAnalyticsErrors = await conversationsAnalysisProvider.AddConversationalEntitiesAsync(audioFileInfo.TextAnalyticsRequests.ConversationRequests.Select(request => request.Id), speechTranscript).ConfigureAwait(false);
textAnalyticsErrors.AddRange(conversationalAnalyticsErrors);
}

if (textAnalyticsErrors.Any())
{
var distinctErrors = textAnalyticsErrors.Distinct();
var errorMessage = $"File {(string.IsNullOrEmpty(fileName) ? "unknown" : fileName)}:\n{string.Join('\n', distinctErrors)}";
if (textAnalyticsErrors.Any())
{
var distinctErrors = textAnalyticsErrors.Distinct();
var errorMessage = $"File {(string.IsNullOrEmpty(fileName) ? "unknown" : fileName)}:\n{string.Join('\n', distinctErrors)}";

generalErrorsStringBuilder.AppendLine(errorMessage);
}
generalErrorsStringBuilder.AppendLine(errorMessage);
}
}
else
}
else if (transcriptionAnalyticsJobsStatus == TranscriptionAnalyticsJobStatus.NotStarted)
{
foreach (var speechTranscriptMapping in speechTranscriptMappings)
{
foreach (var speechTranscriptMapping in speechTranscriptMappings)
{
var speechTranscript = speechTranscriptMapping.Value;
var audioFileInfo = speechTranscriptMapping.Key;
var speechTranscript = speechTranscriptMapping.Value;
var audioFileInfo = speechTranscriptMapping.Key;

var fileName = audioFileInfo.FileName;
var fileName = audioFileInfo.FileName;

if (speechTranscript.RecognizedPhrases != null && speechTranscript.RecognizedPhrases.All(phrase => phrase.RecognitionStatus.Equals("Success", StringComparison.Ordinal)))
{
var textAnalyticsErrors = new List<string>();
if (speechTranscript.RecognizedPhrases != null && speechTranscript.RecognizedPhrases.All(phrase => phrase.RecognitionStatus.Equals("Success", StringComparison.Ordinal)))
{
var textAnalyticsErrors = new List<string>();

(var utteranceLevelJobIds, var utteranceLevelErrors) = await textAnalyticsProvider.SubmitUtteranceLevelRequests(
speechTranscript,
FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting).ConfigureAwait(false);
(var utteranceLevelJobIds, var utteranceLevelErrors) = await textAnalyticsProvider.SubmitUtteranceLevelRequests(
speechTranscript,
FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting).ConfigureAwait(false);

var utteranceLevelRequests = utteranceLevelJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running));
textAnalyticsErrors.AddRange(utteranceLevelErrors);
var utteranceLevelRequests = utteranceLevelJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running));
textAnalyticsErrors.AddRange(utteranceLevelErrors);

(var audioLevelJobIds, var audioLevelErrors) = await textAnalyticsProvider.SubmitAudioLevelRequests(
speechTranscript,
FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting,
FetchTranscriptionEnvironmentVariables.PiiRedactionSetting).ConfigureAwait(false);
(var audioLevelJobIds, var audioLevelErrors) = await textAnalyticsProvider.SubmitAudioLevelRequests(
speechTranscript,
FetchTranscriptionEnvironmentVariables.SentimentAnalysisSetting,
FetchTranscriptionEnvironmentVariables.PiiRedactionSetting).ConfigureAwait(false);

var audioLevelRequests = audioLevelJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running));
textAnalyticsErrors.AddRange(audioLevelErrors);
var audioLevelRequests = audioLevelJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running));
textAnalyticsErrors.AddRange(audioLevelErrors);

(var conversationJobIds, var conversationErrors) = await conversationsAnalysisProvider.SubmitAnalyzeConversationsRequestAsync(speechTranscript).ConfigureAwait(false);
(var conversationJobIds, var conversationErrors) = await conversationsAnalysisProvider.SubmitAnalyzeConversationsRequestAsync(speechTranscript).ConfigureAwait(false);

var conversationalRequests = conversationJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running));
textAnalyticsErrors.AddRange(conversationErrors);
var conversationalRequests = conversationJobIds?.Select(jobId => new TextAnalyticsRequest(jobId, TextAnalyticsRequestStatus.Running));
textAnalyticsErrors.AddRange(conversationErrors);

audioFileInfo.TextAnalyticsRequests = new TextAnalyticsRequests(utteranceLevelRequests, audioLevelRequests, conversationalRequests);
audioFileInfo.TextAnalyticsRequests = new TextAnalyticsRequests(utteranceLevelRequests, audioLevelRequests, conversationalRequests);

if (textAnalyticsErrors.Any())
{
var distinctErrors = textAnalyticsErrors.Distinct();
var errorMessage = $"File {(string.IsNullOrEmpty(fileName) ? "unknown" : fileName)}:\n{string.Join('\n', distinctErrors)}";
if (textAnalyticsErrors.Any())
{
var distinctErrors = textAnalyticsErrors.Distinct();
var errorMessage = $"File {(string.IsNullOrEmpty(fileName) ? "unknown" : fileName)}:\n{string.Join('\n', distinctErrors)}";

generalErrorsStringBuilder.AppendLine(errorMessage);
}
generalErrorsStringBuilder.AppendLine(errorMessage);
}
}
}

log.LogInformation($"Added text analytics requests to service bus message - re-queueing message.");
log.LogInformation($"Added text analytics requests to service bus message - re-queueing message.");

// Poll for first time with TA request after 1 minute
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
return;
}
// Poll for first time with TA request after 1 minute
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchServiceBusSender, serviceBusMessage.CreateMessageString(), log, TimeSpan.FromMinutes(1)).ConfigureAwait(false);
return;
}

foreach (var speechTranscriptMapping in speechTranscriptMappings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@
}
},
"variables": {
"Version": "v2.0.5",
"Version": "v2.0.6",
"AudioInputContainer": "audio-input",
"AudioProcessedContainer": "audio-processed",
"ErrorFilesOutputContainer": "audio-failed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
}
},
"variables": {
"Version": "v2.0.5",
"Version": "v2.0.6",
"AudioInputContainer": "audio-input",
"AudioProcessedContainer": "audio-processed",
"ErrorFilesOutputContainer": "audio-failed",
Expand Down

0 comments on commit 82568af

Please sign in to comment.