Skip to content

Commit

Permalink
Upgraded to LiquidProjections.PollingEventStore.Sources 1.1.0 (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
IharBury authored and dennisdoomen committed Sep 14, 2017
1 parent bf33fbd commit acb4730
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 46 deletions.
6 changes: 3 additions & 3 deletions Src/LiquidProjections.NEventStore/.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
<releaseNotes/>
<dependencies>
<group>
<dependency id="NEventStore" version="[5.0, 6.0)"/>
<dependency id="LiquidProjections.Abstractions" version="[2.0, 3.0)"/>
</group>
<dependency id="NEventStore" version="[5.0, 6.0)"/>
<dependency id="LiquidProjections.Abstractions" version="[2.3, 3.0)"/>
</group>
</dependencies>
</metadata>
<files>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class PollingEventStoreAdapter : IDisposable
/// </summary>
private readonly LruCache<long, Transaction> transactionCacheByPreviousCheckpoint;

private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private CheckpointRequestTimestamp lastSuccessfulPollingRequestWithoutResults;

/// <summary>
Expand Down Expand Up @@ -173,11 +172,14 @@ private async Task<Page> LoadNextPageSequentially(long previousCheckpoint, strin
{
while (true)
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();

if (isDisposed)
{
return new Page(previousCheckpoint, new Transaction[0]);
#if DEBUG
LogProvider.GetLogger(typeof(PollingEventStoreAdapter)).Debug(() =>
$"Page loading for subscription {subscriptionId} cancelled because the adapter is disposed.");
#endif

throw new OperationCanceledException();
}

CheckpointRequestTimestamp effectiveLastExistingCheckpointRequest =
Expand Down Expand Up @@ -246,8 +248,22 @@ private Task<Page> TryLoadNextPageSequentiallyOrWaitForCurrentLoadingToFinish(lo
$"for a page after checkpoint {previousCheckpoint}.");
#endif

// Ignore result.
Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId);
if (isDisposed)
{
#if DEBUG
LogProvider.GetLogger(typeof(PollingEventStoreAdapter))
.Debug(() => $"The loader {loader.Id} is cancelled because the adapter is disposed.");
#endif

// If the adapter is disposed before the current task is set, we cancel the task
// so we do not touch the event store.
taskCompletionSource.SetCanceled();
}
else
{
// Ignore result.
Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId);
}
}
else
{
Expand Down Expand Up @@ -394,17 +410,23 @@ public void Dispose()
{
isDisposed = true;

cancellationTokenSource.Cancel();

foreach (Subscription subscription in subscriptions.ToArray())
{
subscription.Complete();
}

// New loading tasks are no longer started at this point.
// After the current loading task is finished, the event store is no longer used and can be disposed.
Task loaderToWaitFor = Volatile.Read(ref currentLoader);
loaderToWaitFor?.Wait();

cancellationTokenSource.Dispose();
try
{
loaderToWaitFor?.Wait();
}
catch (AggregateException)
{
// Ignore.
}

(eventStore as IDisposable)?.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ public void Start()
LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} has been started.");
#endif

SubscriptionInfo info = new SubscriptionInfo
var info = new SubscriptionInfo
{
Id = Id,
Subscription = this
Subscription = this,
CancellationToken = cancellationTokenSource.Token
};

Task = Task.Factory.StartNew(async () =>
Expand Down Expand Up @@ -162,43 +163,48 @@ public void Complete()

public void Dispose()
{
bool isDisposing;

lock (syncRoot)
{
isDisposing = !isDisposed;

if (isDisposing)
if (!isDisposed)
{
isDisposed = true;
}
}

if (isDisposing)
{
if (cancellationTokenSource != null)
{
// Wait for the task asynchronously.
Task.Run(() =>
{
if (cancellationTokenSource != null)
{
#if DEBUG
LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} is being stopped.");
LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} is being stopped.");
#endif
if (!cancellationTokenSource.IsCancellationRequested)
{
cancellationTokenSource.Cancel();
}
if (!cancellationTokenSource.IsCancellationRequested)
{
cancellationTokenSource.Cancel();
}
Task?.Wait();
cancellationTokenSource.Dispose();
}
try
{
Task?.Wait();
}
catch (AggregateException)
{
// Ignore.
}
lock (eventStoreAdapter.subscriptionLock)
{
eventStoreAdapter.subscriptions.Remove(this);
}
cancellationTokenSource.Dispose();
}
lock (eventStoreAdapter.subscriptionLock)
{
eventStoreAdapter.subscriptions.Remove(this);
}
#if DEBUG
LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} has been stopped.");
LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} has been stopped.");
#endif
});
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
<LangVersion>6</LangVersion>
</PropertyGroup>
<ItemGroup>
<Reference Include="LiquidProjections.Abstractions, Version=2.2.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\..\packages\LiquidProjections.Abstractions.2.2.0\lib\netstandard1.1\LiquidProjections.Abstractions.dll</HintPath>
<Reference Include="LiquidProjections.Abstractions, Version=2.3.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\..\packages\LiquidProjections.Abstractions.2.3.0\lib\netstandard1.1\LiquidProjections.Abstractions.dll</HintPath>
</Reference>
<Reference Include="NEventStore, Version=5.2.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\..\packages\NEventStore.5.2.0\lib\net40\NEventStore.dll</HintPath>
Expand Down
4 changes: 2 additions & 2 deletions Src/LiquidProjections.NEventStore/packages.config
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="LiquidProjections.Abstractions" version="2.2.0" targetFramework="net45" />
<package id="LiquidProjections.PollingEventStore.Sources" version="1.0.1" targetFramework="net45" />
<package id="LiquidProjections.Abstractions" version="2.3.0" targetFramework="net45" />
<package id="LiquidProjections.PollingEventStore.Sources" version="1.1.0" targetFramework="net45" />
<package id="Microsoft.CSharp" version="4.3.0" targetFramework="net45" />
<package id="NEventStore" version="5.2.0" targetFramework="net45" />
<package id="System.Dynamic.Runtime" version="4.3.0" targetFramework="net45" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
<Reference Include="FluentAssertions.Core, Version=4.19.3.0, Culture=neutral, PublicKeyToken=33f2691a05b67b6a, processorArchitecture=MSIL">
<HintPath>..\..\packages\FluentAssertions.4.19.3\lib\net45\FluentAssertions.Core.dll</HintPath>
</Reference>
<Reference Include="LiquidProjections.Abstractions, Version=2.2.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\..\packages\LiquidProjections.Abstractions.2.2.0\lib\netstandard1.1\LiquidProjections.Abstractions.dll</HintPath>
<Reference Include="LiquidProjections.Abstractions, Version=2.3.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\..\packages\LiquidProjections.Abstractions.2.3.0\lib\netstandard1.1\LiquidProjections.Abstractions.dll</HintPath>
</Reference>
<Reference Include="NEventStore, Version=5.2.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\..\packages\NEventStore.5.2.0\lib\net40\NEventStore.dll</HintPath>
Expand Down
2 changes: 1 addition & 1 deletion Tests/LiquidProjections.NEventStore.Specs/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<package id="Chill" version="3.0.1" targetFramework="net452" />
<package id="FakeItEasy" version="3.4.2" targetFramework="net452" />
<package id="FluentAssertions" version="4.19.3" targetFramework="net452" />
<package id="LiquidProjections.Abstractions" version="2.2.0" targetFramework="net452" />
<package id="LiquidProjections.Abstractions" version="2.3.0" targetFramework="net452" />
<package id="NEventStore" version="5.2.0" targetFramework="net452" />
<package id="xunit" version="2.2.0" targetFramework="net452" />
<package id="xunit.abstractions" version="2.0.1" targetFramework="net452" />
Expand Down

0 comments on commit acb4730

Please sign in to comment.