Skip to content

Commit

Permalink
Add SourceUrlCache
Browse files Browse the repository at this point in the history
  • Loading branch information
vplauzon committed Dec 26, 2024
1 parent e7fbf3a commit ebbbffc
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 75 deletions.
51 changes: 49 additions & 2 deletions code/KustoCopyConsole/Entity/InMemory/RowItemInMemoryCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public IEnumerable<RowItemBase> GetItems()
foreach (var block in sourceTableIteration.BlockMap.Values)
{
yield return block.RowItem;
foreach (var url in block.Urls)
{
yield return url.RowItem;
}

}
}
}
Expand All @@ -60,6 +65,8 @@ private IImmutableDictionary<TableIdentity, SourceTableCache> AppendItemToCache(
return AppendSourceTable(st);
case SourceBlockRowItem sb:
return AppendSourceBlock(sb);
case SourceUrlRowItem url:
return AppendSourceUrl(url);
default:
throw new NotSupportedException(
$"Not supported row item type: {item.GetType().Name}");
Expand All @@ -76,7 +83,7 @@ private IImmutableDictionary<TableIdentity, SourceTableCache> AppendSourceTable(
return _sourceTableMap.SetItem(
tableId,
_sourceTableMap[tableId].AppendIteration(
new SourceTableIterationCache(item)));
new SourceIterationCache(item)));
}
else
{
Expand All @@ -99,7 +106,47 @@ private IImmutableDictionary<TableIdentity, SourceTableCache> AppendSourceBlock(

return _sourceTableMap.SetItem(
tableId,
sourceTable.AppendIteration(sourceIteration.AppendBlock(item)));
sourceTable.AppendIteration(
sourceIteration.AppendBlock(new SourceBlockCache(item))));
}
else
{
throw new NotSupportedException("Iteration should come before block in logs");
}
}
else
{
throw new NotSupportedException("Table should come before block in logs");
}
}

private IImmutableDictionary<TableIdentity, SourceTableCache> AppendSourceUrl(
SourceUrlRowItem item)
{
var tableId = item.SourceTable;

if (_sourceTableMap.ContainsKey(tableId))
{
var sourceTable = _sourceTableMap[tableId];

if (sourceTable.IterationMap.ContainsKey(item.IterationId))
{
var sourceIteration = sourceTable.IterationMap[item.IterationId];

if(sourceIteration.BlockMap.ContainsKey(item.BlockId))
{
var block = sourceIteration.BlockMap[item.BlockId];

return _sourceTableMap.SetItem(
tableId,
sourceTable.AppendIteration(
sourceIteration.AppendBlock(
block.AppendUrl(new SourceUrlCache(item)))));
}
else
{
throw new NotSupportedException("Block should come before url in logs");
}
}
else
{
Expand Down
47 changes: 47 additions & 0 deletions code/KustoCopyConsole/Entity/InMemory/SourceBlockCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using KustoCopyConsole.Entity.RowItems;
using KustoCopyConsole.Entity.State;
using System.Collections.Immutable;

namespace KustoCopyConsole.Entity.InMemory
{
internal class SourceBlockCache : CacheBase<SourceBlockRowItem>
{
private SourceBlockCache(SourceBlockRowItem item, IImmutableList<SourceUrlCache> urls)
: base(item)
{
Urls = urls;
}

public SourceBlockCache(SourceBlockRowItem item)
: this(item, ImmutableArray<SourceUrlCache>.Empty)
{
}

public IImmutableList<SourceUrlCache> Urls { get; }

public SourceBlockCache AppendUrl(SourceUrlCache url)
{
var existingUrlCache = Urls
.Where(u => u.RowItem.Url == url.RowItem.Url)
.FirstOrDefault();

if (existingUrlCache != null)
{
var newUrls = Urls.Remove(existingUrlCache);

if (url.RowItem.State != SourceUrlState.Deleted)
{
newUrls = newUrls.Add(url);
}

return new SourceBlockCache(RowItem, newUrls);
}
else
{
var newUrls = Urls.Add(url);

return new SourceBlockCache(RowItem, newUrls);
}
}
}
}
30 changes: 30 additions & 0 deletions code/KustoCopyConsole/Entity/InMemory/SourceIterationCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using KustoCopyConsole.Entity.RowItems;
using System.Collections.Immutable;

namespace KustoCopyConsole.Entity.InMemory
{
internal class SourceIterationCache : CacheBase<SourceTableRowItem>
{
private SourceIterationCache(
SourceTableRowItem item,
IImmutableDictionary<long, SourceBlockCache> blockMap)
: base(item)
{
BlockMap = blockMap;
}

public SourceIterationCache(SourceTableRowItem item)
: this(item, ImmutableDictionary<long, SourceBlockCache>.Empty)
{
}

public IImmutableDictionary<long, SourceBlockCache> BlockMap { get; }

public SourceIterationCache AppendBlock(SourceBlockCache block)
{
var newBlockMap = BlockMap.SetItem(block.RowItem.BlockId, block);

return new SourceIterationCache(RowItem, newBlockMap);
}
}
}
12 changes: 0 additions & 12 deletions code/KustoCopyConsole/Entity/InMemory/SourceTableBlockCache.cs

This file was deleted.

10 changes: 5 additions & 5 deletions code/KustoCopyConsole/Entity/InMemory/SourceTableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace KustoCopyConsole.Entity.InMemory
internal class SourceTableCache
{
public SourceTableCache(
IImmutableDictionary<long, SourceTableIterationCache> iterationMap)
IImmutableDictionary<long, SourceIterationCache> iterationMap)
{
IterationMap = iterationMap;
}
Expand All @@ -15,14 +15,14 @@ public SourceTableCache(SourceTableRowItem iterationItem)
{
var iterationId = iterationItem.IterationId;

IterationMap = ImmutableDictionary<long, SourceTableIterationCache>
IterationMap = ImmutableDictionary<long, SourceIterationCache>
.Empty
.Add(iterationId, new SourceTableIterationCache(iterationItem));
.Add(iterationId, new SourceIterationCache(iterationItem));
}

public IImmutableDictionary<long, SourceTableIterationCache> IterationMap { get; }
public IImmutableDictionary<long, SourceIterationCache> IterationMap { get; }

public SourceTableCache AppendIteration(SourceTableIterationCache iteration)
public SourceTableCache AppendIteration(SourceIterationCache iteration)
{
var iterationId = iteration.RowItem.IterationId;

Expand Down
37 changes: 0 additions & 37 deletions code/KustoCopyConsole/Entity/InMemory/SourceTableIterationCache.cs

This file was deleted.

13 changes: 13 additions & 0 deletions code/KustoCopyConsole/Entity/InMemory/SourceUrlCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using KustoCopyConsole.Entity.RowItems;
using System.Collections.Immutable;

namespace KustoCopyConsole.Entity.InMemory
{
internal class SourceUrlCache : CacheBase<SourceUrlRowItem>
{
public SourceUrlCache(SourceUrlRowItem item)
: base(item)
{
}
}
}
50 changes: 50 additions & 0 deletions code/KustoCopyConsole/Entity/RowItems/SourceUrlRowItem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using KustoCopyConsole.Entity.State;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace KustoCopyConsole.Entity.RowItems
{
internal class SourceUrlRowItem : RowItemBase
{
public SourceUrlState State { get; set; }

public TableIdentity SourceTable { get; set; } = TableIdentity.Empty;

public long IterationId { get; set; }

public long BlockId { get; set; }

public string Url { get; set; } = string.Empty;

public override void Validate()
{
SourceTable.Validate();
if (IterationId < 1)
{
throw new InvalidDataException(
$"{nameof(IterationId)} should be positive but is {IterationId}");
}
if (BlockId < 1)
{
throw new InvalidDataException(
$"{nameof(BlockId)} should be positive but is {BlockId}");
}
if (!Uri.TryCreate(Url, UriKind.Absolute, out _))
{
throw new InvalidDataException($"{nameof(Url)} is invalid: {Url}");
}
}

public SourceUrlRowItem ChangeState(SourceUrlState newState)
{
var clone = (SourceUrlRowItem)Clone();

clone.State = newState;

return clone;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

namespace KustoCopyConsole.Runner
{
internal class SourceTableExportingRunner : RunnerBase
internal class SourceExportingRunner : RunnerBase
{
public SourceTableExportingRunner(
public SourceExportingRunner(
MainJobParameterization parameterization,
RowItemGateway rowItemGateway,
DbClientFactory dbClientFactory)
Expand All @@ -23,10 +23,39 @@ public async Task RunAsync(
DateTime ingestionTimeEnd,
CancellationToken ct)
{
if (!RowItemGateway.InMemoryCache
var blockItem = await EnsureBlockCreatedAsync(
sourceTableRowItem,
blockId,
ingestionTimeStart,
ingestionTimeEnd,
ct);

if (blockItem.State != SourceBlockState.Exported)
{
await CleanUrlsAsync(blockItem);
}
}

private async Task CleanUrlsAsync(SourceBlockRowItem blockItem)
{
await Task.CompletedTask;

throw new NotImplementedException();
}

private async Task<SourceBlockRowItem> EnsureBlockCreatedAsync(
SourceTableRowItem sourceTableRowItem,
long blockId,
DateTime ingestionTimeStart,
DateTime ingestionTimeEnd,
CancellationToken ct)
{
var blockMap = RowItemGateway.InMemoryCache
.SourceTableMap[sourceTableRowItem.SourceTable]
.IterationMap[sourceTableRowItem.IterationId]
.BlockMap.ContainsKey(blockId))
.BlockMap;

if (!blockMap.ContainsKey(blockId))
{
var newBlockItem = new SourceBlockRowItem
{
Expand All @@ -39,16 +68,12 @@ public async Task RunAsync(
};

await RowItemGateway.AppendAsync(newBlockItem, ct);
}

var blockItem = RowItemGateway.InMemoryCache
.SourceTableMap[sourceTableRowItem.SourceTable]
.IterationMap[sourceTableRowItem.IterationId]
.BlockMap[blockId]
.RowItem;

if (blockItem.State == SourceBlockState.Exporting)
return newBlockItem;
}
else
{
return blockMap[blockId].RowItem;
}
}
}
Expand Down
Loading

0 comments on commit ebbbffc

Please sign in to comment.