Skip to content

Commit

Permalink
Implemented AzureLockFactory and added unit tests for AzureLock
Browse files Browse the repository at this point in the history
  • Loading branch information
tomlm committed Aug 21, 2024
1 parent 4d009b9 commit 14c7424
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 79 deletions.
145 changes: 113 additions & 32 deletions source/Lucene.Net.Store.Azure.Tests/AzureDirectoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Lucene.Net.Analysis.Standard;
using Lucene.Net.Documents;
Expand Down Expand Up @@ -98,15 +99,15 @@ public void TestReadAndWriteWithSubDirectory()
{
Trace.TraceInformation("Tests failed:\n{0}", x);
}

AssertFilesAreEqual(azureDirectory, expectedDirectory);
}

[TestMethod]
public void TestReadAndWriteWithTwoShardDirectories()
{
string containerName = $"{_containerRoot}/{GetMethodName()}";

var (azureDirectory1, expectedDirectory1) = Arrange($"{containerName}/shard1");
var (dog, cat, car) = InitializeCatalog(azureDirectory1, 1000, expectedDirectory1);

Expand All @@ -119,19 +120,17 @@ public void TestReadAndWriteWithTwoShardDirectories()
AssertFilesAreEqual(azureDirectory2, expectedDirectory2, "#1 shard2| ");

// delete all azureDirectory1 blobs
foreach (string file in azureDirectory1.ListAll())
{
foreach (string file in azureDirectory1.ListAll().Where(x => !x.EndsWith(".lock")))
azureDirectory1.DeleteFile(file);
foreach (string file in expectedDirectory1.ListAll())
expectedDirectory1.DeleteFile(file);
}

ValidateDirectory(azureDirectory2, dog2, cat2, car2);

foreach (string file in azureDirectory2.ListAll())
{
foreach (string file in azureDirectory2.ListAll().Where(x => !x.EndsWith(".lock")))
azureDirectory2.DeleteFile(file);
foreach (string file in expectedDirectory2.ListAll())
expectedDirectory2.DeleteFile(file);
}

AssertFilesAreEqual(azureDirectory1, expectedDirectory1, "#2 shard1| ");
AssertFilesAreEqual(azureDirectory2, expectedDirectory2, "#2 shard2| ");
Expand All @@ -142,7 +141,7 @@ public void TestReadAndWrite_WritingTwoConsecutiveTimes()
{
string containerName = $"{_containerRoot}/{GetMethodName()}";
var (azureDirectory, expectedDirectory) = Arrange(containerName);

var (dog, cat, car) = InitializeCatalog(azureDirectory, 500, expectedDirectory);
var (dog1, cat1, car1) = InitializeCatalog(azureDirectory, 500, expectedDirectory);
dog += dog1;
Expand Down Expand Up @@ -210,7 +209,7 @@ public void TestReadAndWriteWithTwoShardDirectories_WritingTwoConsecutiveTimes()

var (azureDirectory1, expectedDirectory1) = Arrange($"{containerName}/shard1");
var (azureDirectory2, expectedDirectory2) = Arrange($"{containerName}/shard2");

var (dog, cat, car) = InitializeCatalog(azureDirectory1, 500, expectedDirectory1);
var (dog1, cat1, car1) = InitializeCatalog(azureDirectory1, 500, expectedDirectory1);
dog += dog1;
Expand All @@ -229,25 +228,107 @@ public void TestReadAndWriteWithTwoShardDirectories_WritingTwoConsecutiveTimes()
AssertFilesAreEqual(azureDirectory2, expectedDirectory2, "#1 shard2| ");

// delete all azureDirectory1 blobs
foreach (string file in azureDirectory1.ListAll())
{
foreach (string file in azureDirectory1.ListAll().Where(x => !x.EndsWith(".lock")))
azureDirectory1.DeleteFile(file);
foreach (string file in expectedDirectory1.ListAll())
expectedDirectory1.DeleteFile(file);
}

ValidateDirectory(azureDirectory2, dog2, cat2, car2);

foreach (string file in azureDirectory2.ListAll())
{
foreach (string file in azureDirectory2.ListAll().Where(x => !x.EndsWith(".lock")))
azureDirectory2.DeleteFile(file);
foreach (string file in expectedDirectory2.ListAll())
expectedDirectory2.DeleteFile(file);
}


AssertFilesAreEqual(azureDirectory1, expectedDirectory1, "#2 shard1| ");
AssertFilesAreEqual(azureDirectory2, expectedDirectory2, "#2 shard2| ");

}


[TestMethod]
public void AzureLock_LockTest()
{
string containerName = $"{_containerRoot}/{GetMethodName()}";
var (azureDirectory, expectedDirectory) = Arrange(containerName);
var blobLock = azureDirectory.MakeLock("write.lock");
var fsLock = expectedDirectory.MakeLock("write.lock");
Assert.IsNotNull(fsLock);
Assert.IsNotNull(blobLock);

Assert.IsFalse(fsLock.IsLocked());
Assert.IsFalse(blobLock.IsLocked());

Assert.IsTrue(fsLock.Obtain());
Assert.IsTrue(blobLock.Obtain());

Assert.IsTrue(fsLock.IsLocked());
Assert.IsTrue(blobLock.IsLocked());

fsLock.Dispose();
blobLock.Dispose();

Assert.IsFalse(fsLock.IsLocked());
Assert.IsFalse(blobLock.IsLocked());

AssertFilesAreEqual(azureDirectory, expectedDirectory);
}

[TestMethod]
public async Task AzureLock_StressTest()
{
// run THREADCOUNT threads each attempting to grab and release the lock ITERATIONS times
var THREADCOUNT = 50;
var ITERATIONS = 5;
var DELAY = 10;
string containerName = $"{_containerRoot}/{GetMethodName()}";
var (azureDirectory, expectedDirectory) = Arrange(containerName);
Random rnd = new Random();
int[] locks = new int[THREADCOUNT];
int[] releases = new int[THREADCOUNT];
List<Task> tasks = new List<Task>();
for (int i = 0; i < THREADCOUNT; i++)
{
var blobLock = new AzureLock("write.lock", azureDirectory);
var instance = i;
tasks.Add(Task.Run(async () =>
{
for (int y = 0; y < ITERATIONS; y++)
{
while (!blobLock.Obtain())
{
await Task.Delay(rnd.Next(DELAY) + DELAY);
}
// we have lock
locks[instance]++;
await Task.Delay(rnd.Next(DELAY) + DELAY);
blobLock.Dispose();
releases[instance]++;
}
}));
}
await Task.WhenAll(tasks);
for (int i = 0; i < THREADCOUNT; i++)
{
Assert.AreEqual(ITERATIONS, locks[i]);
Assert.AreEqual(ITERATIONS, releases[i]);
}
}


[TestMethod]
public void AzureLockTest()
{
string containerName = $"{_containerRoot}/{GetMethodName()}";
var (azureDirectory, expectedDirectory) = Arrange(containerName);
var factory = azureDirectory.LockFactory;
var testLock = factory.MakeLock("write.lock");
var testLock2 = factory.MakeLock("write.lock");
var testLock3 = factory.MakeLock("write3.lock");
Assert.AreSame(testLock, testLock2);
Assert.AreNotSame(testLock, testLock3);
}

[TestMethod]
public void CanListAllFileNames_InFlatContainer()
{
Expand Down Expand Up @@ -358,7 +439,7 @@ private string GetMethodName([CallerMemberName] string methodName = null)
{
return methodName;
}

private void TestListingFilesOfDirectory(string containerName, string expectedFileNames, int numberOfSimulatedIndexWrites = 1)
{
var connectionString = _connectionString ?? "UseDevelopmentStorage=true";
Expand All @@ -373,48 +454,48 @@ private void TestListingFilesOfDirectory(string containerName, string expectedFi
}

// Act
var actual = azureDirectory.ListAll();
var actual = azureDirectory.ListAll().Where(x => !x.EndsWith(".lock"));

// Assert
var actualFileNames = string.Join("\n", actual);
Assert.AreEqual(expectedFileNames, actualFileNames);
}

private void AssertFilesAreEqual(AzureDirectory azureDirectory, FSDirectory expcetedDirectory, string messagePrefix = null)
{
var cacheDirectory = azureDirectory.CacheDirectory;
var cachedFiles = cacheDirectory.ListAll().OrderBy(x => x).ToList();
var azureFiles = azureDirectory.ListAll().OrderBy(x => x).ToList();
var azureFiles = azureDirectory.ListAll().OrderBy(x => x).Where(x => !x.EndsWith(".lock")).ToList();
var expectedFiles = expcetedDirectory.ListAll().OrderBy(x => x).ToList();

var prefix = messagePrefix ?? string.Empty;

Assert.AreEqual(azureFiles.Count, cachedFiles.Intersect(azureFiles).Count(), $"{prefix}files contained in azure directory must exist in cache");
Assert.AreEqual(string.Join("\n", azureFiles), string.Join("\n",expectedFiles), $"{prefix}files contained in azure directory and expected directory differ");
Assert.AreEqual(string.Join("\n", azureFiles), string.Join("\n", expectedFiles), $"{prefix}files contained in azure directory and expected directory differ");

var errors = new List<string>();

foreach (var f in azureFiles.FilterSiFiles())
{
using var actualFile = azureDirectory.OpenInput(f, new IOContext());
using var expectedFile = expcetedDirectory.OpenInput(f, new IOContext());
using var cachedFile = cacheDirectory.OpenInput(f, new IOContext());

byte[] actualData = new byte[actualFile.Length];
actualFile.ReadBytes(actualData, 0, (int)actualFile.Length);
byte[] expectedData = new byte[expectedFile.Length];
expectedFile.ReadBytes(expectedData, 0, (int)expectedFile.Length);
byte[] cachedData = new byte[cachedFile.Length];
cachedFile.ReadBytes(cachedData, 0, (int)cachedFile.Length);

if (expectedFile.Length != actualFile.Length)
errors.Add($"{prefix}the FSDirectory and azure files '{f}' differ in length (actual: {actualFile.Length}, expected: {expectedFile.Length})");
else if(!actualData.SequenceEqual(expectedData))
else if (!actualData.SequenceEqual(expectedData))
errors.Add($"{prefix}the FSDirectory and azure files '{f}' differ in their content (actual MD5: {Convert.ToBase64String(MD5.HashData(actualData))}, expected MD5: {Convert.ToBase64String(MD5.HashData(expectedData))})");

if (cachedFile.Length != actualFile.Length)
errors.Add($"{prefix}the cached- and azure files '{f}' differ in length (actual: {actualFile.Length}, cachedFile: {cachedFile.Length})");
else if(!actualData.SequenceEqual(cachedData))
else if (!actualData.SequenceEqual(cachedData))
errors.Add($"{prefix}the cached- and azure files '{f}' differ in their content (actual MD5: {Convert.ToBase64String(MD5.HashData(actualData))}, cached MD5: {Convert.ToBase64String(MD5.HashData(cachedData))})");
}

Expand Down Expand Up @@ -464,12 +545,12 @@ private static (int dog, int cat, int car) InitializeCatalog(AzureDirectory azur
var dog = 0;
var cat = 0;
var car = 0;

// if we are passed a reference directory, we also write to it so we can compare the files later
var referenceIndexWriter = referenceDirectory != null
var referenceIndexWriter = referenceDirectory != null
? new IndexWriter(referenceDirectory, new IndexWriterConfig(Lucene.Net.Util.LuceneVersion.LUCENE_48, new StandardAnalyzer(Lucene.Net.Util.LuceneVersion.LUCENE_48)))
: null;

try
{
using (var indexWriter = new IndexWriter(azureDirectory, indexWriterConfig))
Expand Down
45 changes: 8 additions & 37 deletions source/Lucene.Net.Store.Azure/AzureDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@

namespace Lucene.Net.Store.Azure
{
public class AzureDirectory : Directory
public class AzureDirectory : BaseDirectory
{
private BlobServiceClient _blobClient;
private readonly string containerName;
private readonly string subDirectory;

private readonly Dictionary<string, AzureLock> _locks = new Dictionary<string, AzureLock>();
private LockFactory _lockFactory = new NativeFSLockFactory();
private readonly Dictionary<string, AzureIndexOutput> _nameCache = new Dictionary<string, AzureIndexOutput>();

public override LockFactory LockFactory => _lockFactory;

public AzureDirectory(string storageAccount) :
this(storageAccount, null, null)
{
Expand Down Expand Up @@ -46,7 +42,7 @@ public AzureDirectory(
public AzureDirectory(
string storageAccount,
string catalog,
Directory cacheDirectory)
Directory cacheDirectory)
{
if (storageAccount == null)
throw new ArgumentNullException("storageAccount");
Expand All @@ -61,6 +57,9 @@ public AzureDirectory(

_blobClient = new BlobServiceClient(storageAccount);
_initCacheDirectory(cacheDirectory);

// default lock factory is AzureLockFactory
SetLockFactory(new AzureLockFactory(this));
}

/// <summary>
Expand Down Expand Up @@ -114,9 +113,11 @@ public void ClearCache()
public override string[] ListAll()
{
var prefix = string.IsNullOrEmpty(this.subDirectory) ? null : this.subDirectory + "/";

return BlobContainer.GetBlobsByHierarchy(delimiter: "/", prefix: prefix)
.Where(x => x.IsBlob)
.Select(x => x.Blob.Name.Split('/').Last()).ToArray();
.Select(x => x.Blob.Name.Split('/').Last())
.ToArray();
}

/// <summary>Returns true if a file with the given name exists. </summary>
Expand Down Expand Up @@ -188,31 +189,6 @@ public override IndexInput OpenInput(string name, IOContext context)
}
}

/// <summary>Construct a {@link Lock}.</summary>
/// <param name="name">the name of the lock file
/// </param>
public override Lock MakeLock(string name)
{
lock (_locks)
{
if (!_locks.ContainsKey(name))
{
_locks.Add(name, new AzureLock(name, this));
}
return _locks[name];
}
}

public override void ClearLock(string name)
{
lock (_locks)
{
if (_locks.ContainsKey(name))
{
_locks[name].BreakLock();
}
}
}

/// <summary>Closes the store. </summary>
protected override void Dispose(bool disposing)
Expand All @@ -221,11 +197,6 @@ protected override void Dispose(bool disposing)
_blobClient = null;
}

public override void SetLockFactory(LockFactory lockFactory)
{
_lockFactory = lockFactory;
}

/// <summary>Creates a new, empty file in the directory with the given name.
/// Returns a stream writing this file.
/// </summary>
Expand Down
Loading

0 comments on commit 14c7424

Please sign in to comment.