Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #115 Concurrency issue when adding documents. #116

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Exceptionless.DateTimeExtensions;
using Foundatio.Caching;
using Foundatio.Lock;
using Foundatio.Parsers.ElasticQueries;
using Foundatio.Parsers.ElasticQueries.Extensions;
using Foundatio.Repositories.Elasticsearch.Extensions;
Expand All @@ -30,19 +31,22 @@ public class DailyIndex : VersionedIndex
private TimeSpan? _maxIndexAge;
protected readonly Func<object, DateTime> _getDocumentDateUtc;
protected readonly string[] _defaultIndexes;
private readonly CacheLockProvider _ensureIndexLock;
private readonly Dictionary<DateTime, object> _ensuredDates = new();

public DailyIndex(IElasticConfiguration configuration, string name, int version = 1, Func<object, DateTime> getDocumentDateUtc = null)
: base(configuration, name, version)
{
AddAlias(Name);
_frozenAliases = new Lazy<IReadOnlyCollection<IndexAliasAge>>(() => _aliases.AsReadOnly());
_aliasCache = new ScopedCacheClient(configuration.Cache, "alias");
_ensureIndexLock = new CacheLockProvider(configuration.Cache, configuration.MessageBus, configuration.LoggerFactory);
_getDocumentDateUtc = getDocumentDateUtc;
_defaultIndexes = new[] { Name };
HasMultipleIndexes = true;

if (_getDocumentDateUtc != null)
_getDocumentDateUtc = (document) =>
_getDocumentDateUtc = document =>
{
var date = getDocumentDateUtc(document);
return date != DateTime.MinValue ? date : DefaultDocumentDateFunc(document);
Expand Down Expand Up @@ -131,13 +135,19 @@ protected override DateTime GetIndexDate(string index)
return DateTime.MaxValue;
}

private readonly Dictionary<DateTime, object> _ensuredDates = new();
protected async Task EnsureDateIndexAsync(DateTime utcDate)
{
utcDate = utcDate.Date;
if (_ensuredDates.ContainsKey(utcDate))
return;

await using var indexLock = await _ensureIndexLock.AcquireAsync($"Index:{GetVersionedIndex(utcDate)}", TimeSpan.FromMinutes(1)).AnyContext();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It kind of feels like we should be locking on the index name here, but this allows us to go wide. I'm wondering if we should also change this to a concurrent dictionary.

if (indexLock is null)
throw new Exception("Unable to acquire index lock");

if (_ensuredDates.ContainsKey(utcDate))
return;

var indexExpirationUtcDate = GetIndexExpirationDate(utcDate);
if (Configuration.TimeProvider.GetUtcNow().UtcDateTime > indexExpirationUtcDate)
throw new ArgumentException($"Index max age exceeded: {indexExpirationUtcDate}", nameof(utcDate));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Foundatio.Repositories.Elasticsearch.Tests.Repositories;
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models;
using Foundatio.Repositories.Models;
using Foundatio.Utility;
using Microsoft.Extensions.Time.Testing;
using Xunit;
using Xunit.Abstractions;

namespace Foundatio.Repositories.Elasticsearch.Tests;

public sealed class DailyRepositoryTests : ElasticRepositoryTestBase
{
private readonly IFileAccessHistoryRepository _fileAccessHistoryRepository;

public DailyRepositoryTests(ITestOutputHelper output) : base(output)
{
_fileAccessHistoryRepository = new FileAccessHistoryRepository(_configuration.DailyFileAccessHistory);
}

public override async Task InitializeAsync()
{
await base.InitializeAsync();
await RemoveDataAsync();
}

[Fact]
public async Task AddAsyncWithCustomDateIndex()
{
var utcNow = new DateTime(2023, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { Path = "path1", AccessedDateUtc = utcNow }, o => o.ImmediateConsistency());
Assert.NotNull(history?.Id);

var result = await _fileAccessHistoryRepository.FindOneAsync(f => f.Id(history.Id));
Assert.Equal("file-access-history-daily-v1-2023.01.01", result.Data.GetString("index"));
}

[Fact]
public async Task AddAsyncWithCurrentDateViaDocumentsAdding()
{
_configuration.TimeProvider = new FakeTimeProvider(new DateTimeOffset(2023, 02, 1, 0, 0, 0, TimeSpan.Zero));

try
{
// NOTE: This has to be async handler as there is no way to remove a sync handler.
_fileAccessHistoryRepository.DocumentsAdding.AddHandler(OnDocumentsAdding);

var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { Path = "path2" }, o => o.ImmediateConsistency());
Assert.NotNull(history?.Id);

var result = await _fileAccessHistoryRepository.FindOneAsync(f => f.Id(history.Id));
Assert.Equal("file-access-history-daily-v1-2023.02.01", result.Data.GetString("index"));
}
finally
{
_fileAccessHistoryRepository.DocumentsAdding.RemoveHandler(OnDocumentsAdding);
}
}

private Task OnDocumentsAdding(object sender, DocumentsEventArgs<FileAccessHistory> arg)
{
foreach (var document in arg.Documents)
{
if (document.AccessedDateUtc == DateTime.MinValue || document.AccessedDateUtc > _configuration.TimeProvider.GetUtcNow().UtcDateTime)
document.AccessedDateUtc = _configuration.TimeProvider.GetUtcNow().UtcDateTime;
}

return Task.CompletedTask;
}

[Fact]
public async Task CanAddAsync()
{
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { AccessedDateUtc = DateTime.UtcNow });
Assert.NotNull(history?.Id);
}

[Fact]
public Task AddAsyncConcurrentUpdates()
{
return Parallel.ForEachAsync(Enumerable.Range(0, 50), async (i, _) =>
{
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { AccessedDateUtc = DateTime.UtcNow });
Assert.NotNull(history?.Id);
});
}
}
10 changes: 4 additions & 6 deletions tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -120,10 +120,8 @@ public async Task GetByDateBasedIndexAsync()

await _configuration.DailyLogEvents.ConfigureAsync();


// TODO: Fix this once https://github.com/elastic/elasticsearch-net/issues/3829 is fixed in beta2
//var indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
//Assert.Empty(indexes);
var indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
Assert.Empty(indexes);

var alias = await _client.Indices.GetAliasAsync(_configuration.DailyLogEvents.Name);
_logger.LogRequest(alias);
Expand All @@ -142,7 +140,7 @@ public async Task GetByDateBasedIndexAsync()
Assert.True(alias.IsValid);
Assert.Equal(2, alias.Indices.Count);

var indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
Assert.Equal(2, indexes.Count);

await repository.RemoveAllAsync(o => o.ImmediateConsistency());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public sealed class MonthlyRepositoryTests : ElasticRepositoryTestBase

public MonthlyRepositoryTests(ITestOutputHelper output) : base(output)
{
_fileAccessHistoryRepository = new FileAccessHistoryRepository(_configuration);
_fileAccessHistoryRepository = new FileAccessHistoryRepository(_configuration.MonthlyFileAccessHistory);
}

public override async Task InitializeAsync()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Foundatio.Repositories.Elasticsearch.Configuration;
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models;
using Nest;

namespace Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration.Indexes;

public sealed class DailyFileAccessHistoryIndex : DailyIndex<FileAccessHistory>
{
public DailyFileAccessHistoryIndex(IElasticConfiguration configuration) : base(configuration, "file-access-history-daily", 1, d => ((FileAccessHistory)d).AccessedDateUtc)
{
}

public override CreateIndexDescriptor ConfigureIndex(CreateIndexDescriptor idx)
{
return base.ConfigureIndex(idx.Settings(s => s.NumberOfReplicas(0).NumberOfShards(1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public MyAppElasticConfiguration(IQueue<WorkItemData> workItemQueue, ICacheClien
AddIndex(DailyLogEvents = new DailyLogEventIndex(this));
AddIndex(MonthlyLogEvents = new MonthlyLogEventIndex(this));
AddIndex(ParentChild = new ParentChildIndex(this));
AddIndex(DailyFileAccessHistory = new DailyFileAccessHistoryIndex(this));
AddIndex(MonthlyFileAccessHistory = new MonthlyFileAccessHistoryIndex(this));
AddCustomFieldIndex(replicas: 0);
}
Expand Down Expand Up @@ -95,5 +96,6 @@ protected override void ConfigureSettings(ConnectionSettings settings)
public DailyLogEventIndex DailyLogEvents { get; }
public MonthlyLogEventIndex MonthlyLogEvents { get; }
public ParentChildIndex ParentChild { get; }
public DailyFileAccessHistoryIndex DailyFileAccessHistory { get; }
public MonthlyFileAccessHistoryIndex MonthlyFileAccessHistory { get; }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration;
using Foundatio.Repositories.Elasticsearch.Configuration;
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models;

namespace Foundatio.Repositories.Elasticsearch.Tests.Repositories;
Expand All @@ -7,7 +7,11 @@ public interface IFileAccessHistoryRepository : ISearchableRepository<FileAccess

public class FileAccessHistoryRepository : ElasticRepositoryBase<FileAccessHistory>, IFileAccessHistoryRepository
{
public FileAccessHistoryRepository(MyAppElasticConfiguration elasticConfiguration) : base(elasticConfiguration.MonthlyFileAccessHistory)
public FileAccessHistoryRepository(DailyIndex<FileAccessHistory> dailyIndex) : base(dailyIndex)
{
}

public FileAccessHistoryRepository(MonthlyIndex<FileAccessHistory> monthlyIndex) : base(monthlyIndex)
{
}
}
Loading