Skip to content

Commit

Permalink
Merge pull request #919 from b-editor/improve-dispatcher
Browse files Browse the repository at this point in the history
Dispatcherの機能追加
  • Loading branch information
yuto-trd authored Feb 6, 2024
2 parents c75dcab + 04df5e1 commit 73b0a59
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 128 deletions.
6 changes: 5 additions & 1 deletion src/Beutl.Engine/Rendering/RenderThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,9 @@ namespace Beutl.Rendering;

public static class RenderThread
{
public static Dispatcher Dispatcher { get; } = Dispatcher.Spawn();
public static Dispatcher Dispatcher { get; } = Dispatcher.Spawn(() =>
{
Thread.CurrentThread.Name = "Beutl.RenderThread";
Thread.CurrentThread.IsBackground = true;
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,41 +32,6 @@ public override void Load()
if (OperatingSystem.IsWindows())
{
DecoderRegistry.Register(GetDecoderInfo());

MFThread.Dispatcher.Invoke(() =>
{
Thread.CurrentThread.IsBackground = true;
Thread.CurrentThread.Name = "Beutl.MediaFoundation";
MediaManager.Startup();
});

if (Application.Current?.ApplicationLifetime is IControlledApplicationLifetime lifetime)
{
lifetime.Exit += OnApplicationExit;
}

AppDomain.CurrentDomain.UnhandledException += OnUnhandledException;
}
}

private void OnApplicationExit(object? sender, ControlledApplicationLifetimeExitEventArgs e)
{
Shutdown();
}

private void OnUnhandledException(object sender, UnhandledExceptionEventArgs e)
{
Shutdown();
}

private void Shutdown()
{
MFThread.Dispatcher.Invoke(() =>
{
MediaManager.Shutdown();
});
MFThread.Dispatcher.Stop();

AppDomain.CurrentDomain.UnhandledException -= OnUnhandledException;
}
}
47 changes: 45 additions & 2 deletions src/Beutl.Extensions.MediaFoundation/MFThread.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
using Beutl.Threading;
using Avalonia;
using Avalonia.Controls.ApplicationLifetimes;

using Beutl.Threading;

using SharpDX.MediaFoundation;

#if MF_BUILD_IN
namespace Beutl.Embedding.MediaFoundation.Decoding;
Expand All @@ -8,5 +13,43 @@ namespace Beutl.Extensions.MediaFoundation.Decoding;

public static class MFThread
{
public static Dispatcher Dispatcher { get; } = Dispatcher.Spawn();
static MFThread()
{
Dispatcher = Dispatcher.Spawn(() =>
{
Thread.CurrentThread.IsBackground = true;
Thread.CurrentThread.Name = "Beutl.MediaFoundation";
MediaManager.Startup();
});

if (Application.Current?.ApplicationLifetime is IControlledApplicationLifetime lifetime)
{
lifetime.Exit += OnApplicationExit;
}

AppDomain.CurrentDomain.UnhandledException += OnUnhandledException;
}

public static Dispatcher Dispatcher { get; }

private static void OnApplicationExit(object? sender, ControlledApplicationLifetimeExitEventArgs e)
{
Shutdown();
}

private static void OnUnhandledException(object sender, UnhandledExceptionEventArgs e)
{
Shutdown();
}

private static void Shutdown()
{
Dispatcher.Invoke(() =>
{
MediaManager.Shutdown();
});
Dispatcher.Shutdown();

AppDomain.CurrentDomain.UnhandledException -= OnUnhandledException;
}
}
126 changes: 71 additions & 55 deletions src/Beutl.Threading/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,53 @@ public class Dispatcher
[ThreadStatic]
private static Dispatcher? s_current;

private readonly QueueSynchronizationContext _synchronizationContext = new();
private Thread? _thread;
private readonly QueueSynchronizationContext _synchronizationContext;

private Dispatcher()
{
_synchronizationContext = new(this);
Thread = new Thread(Start);
Thread.TrySetApartmentState(ApartmentState.STA);
}

public static Dispatcher Current => s_current!;

public void Start()
public Thread Thread { get; private set; }

public bool HasShutdownStarted => _synchronizationContext.HasShutdownStarted;

public bool HasShutdownFinished => _synchronizationContext.HasShutdownFinished;

public event EventHandler<DispatcherUnhandledExceptionEventArgs>? UnhandledException
{
Dispatcher? oldDispatcher = s_current;
SynchronizationContext? oldSynchronizationContext = SynchronizationContext.Current;
add => _synchronizationContext.UnhandledException += value;
remove => _synchronizationContext.UnhandledException -= value;
}

try
{
s_current = this;
_thread = Thread.CurrentThread;
SynchronizationContext.SetSynchronizationContext(_synchronizationContext);
public event EventHandler? ShutdownStarted
{
add => _synchronizationContext.ShutdownStarted += value;
remove => _synchronizationContext.ShutdownStarted -= value;
}

_synchronizationContext.Start();
}
finally
{
s_current = oldDispatcher;
SynchronizationContext.SetSynchronizationContext(oldSynchronizationContext);
}
public event EventHandler? ShutdownFinished
{
add => _synchronizationContext.ShutdownFinished += value;
remove => _synchronizationContext.ShutdownFinished -= value;
}

public void Execute()
private void Start()
{
Dispatcher? oldDispatcher = s_current;
SynchronizationContext? oldSynchronizationContext = SynchronizationContext.Current;

try
{
s_current = this;
_thread = Thread.CurrentThread;
Thread = Thread.CurrentThread;
SynchronizationContext.SetSynchronizationContext(_synchronizationContext);

_synchronizationContext.Execute();
_synchronizationContext.Start();
}
finally
{
Expand All @@ -52,10 +62,16 @@ public void Execute()
}
}

[Obsolete("Use Shutdown.")]
public void Stop()
{
_synchronizationContext.Stop();
Debug.WriteLine($"'{_thread?.Name ?? _thread?.ManagedThreadId.ToString()}' を停止しました");
Shutdown();
}

public void Shutdown()
{
_synchronizationContext.Shutdown();
Debug.WriteLine($"'{Thread?.Name ?? Thread?.ManagedThreadId.ToString()}' を停止しました");
}

public bool CheckAccess()
Expand All @@ -72,13 +88,7 @@ public void VerifyAccess()
public static Dispatcher Spawn()
{
var dispatcher = new Dispatcher();
var thread = new Thread(() =>
{
dispatcher.Start();
});
thread.TrySetApartmentState(ApartmentState.STA);
thread.Start();

dispatcher.Thread.Start();
return dispatcher;
}

Expand All @@ -89,92 +99,98 @@ public static Dispatcher Spawn(Action operation)
return dispatcher;
}

public void Invoke(Action operation, DispatchPriority priority = DispatchPriority.Medium)
public void Invoke(Action operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
if (CheckAccess())
{
ct.ThrowIfCancellationRequested();
operation();
}
else
{
_synchronizationContext.Send(priority, operation);
_synchronizationContext.Send(priority, operation, ct).Wait(ct);
}
}

public T Invoke<T>(Func<T> operation, DispatchPriority priority = DispatchPriority.Medium)
public T Invoke<T>(Func<T> operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
if (CheckAccess())
{
ct.ThrowIfCancellationRequested();
return operation();
}
else
{
return InvokeAsync(operation, priority).Result;
Task<T> task = InvokeAsync(operation, priority, ct);
task.Wait(ct);
return task.Result;
}
}

public Task InvokeAsync(Action operation, DispatchPriority priority = DispatchPriority.Medium)
public Task InvokeAsync(Action operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
return _synchronizationContext.Send(priority, operation);
return _synchronizationContext.Send(priority, operation, ct);
}

public async Task InvokeAsync(Func<Task> operation, DispatchPriority priority = DispatchPriority.Medium)
public async Task InvokeAsync(Func<Task> operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
await await InvokeAsync<Task>(operation, priority);
await await InvokeAsync<Task>(operation, priority, ct);
}

public Task<T> InvokeAsync<T>(Func<T> operation, DispatchPriority priority = DispatchPriority.Medium)
public Task<T> InvokeAsync<T>(Func<T> operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
return _synchronizationContext.Send(priority, operation);
return _synchronizationContext.Send(priority, operation, ct);
}

public async Task<T> InvokeAsync<T>(Func<Task<T>> operation, DispatchPriority priority = DispatchPriority.Medium)
public async Task<T> InvokeAsync<T>(Func<Task<T>> operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
return await await InvokeAsync<Task<T>>(operation, priority);
return await await InvokeAsync<Task<T>>(operation, priority, ct);
}

public void Dispatch(Action operation, DispatchPriority priority = DispatchPriority.Medium)
public void Dispatch(Action operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
_synchronizationContext.Post(priority, operation);
_synchronizationContext.Post(priority, operation, ct);
}

public void Dispatch(Func<Task> operation, DispatchPriority priority = DispatchPriority.Medium)
public void Dispatch(Func<Task> operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
_synchronizationContext.Post(priority, () => operation());
_synchronizationContext.Post(priority, () => operation(), ct);
}

public void Run(Action operation, DispatchPriority priority = DispatchPriority.Medium)
public void Run(Action operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
if (CheckAccess())
{
operation();
if (!ct.IsCancellationRequested)
operation();
}
else
{
Dispatch(operation, priority);
Dispatch(operation, priority, ct);
}
}

public void Run(Func<Task> operation, DispatchPriority priority)
public void Run(Func<Task> operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
if (CheckAccess())
{
operation();
if (!ct.IsCancellationRequested)
operation();
}
else
{
Dispatch(operation, priority);
Dispatch(operation, priority, ct);
}
}

public void Schedule(TimeSpan delay, Action operation, DispatchPriority priority = DispatchPriority.Medium)
public void Schedule(TimeSpan delay, Action operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
_synchronizationContext.PostDelayed(DateTime.UtcNow + delay, priority, operation);
_synchronizationContext.PostDelayed(DateTime.UtcNow + delay, priority, operation, ct);
}

public void Schedule(TimeSpan delay, Func<Task> operation, DispatchPriority priority = DispatchPriority.Medium)
public void Schedule(TimeSpan delay, Func<Task> operation, DispatchPriority priority = DispatchPriority.Medium, CancellationToken ct = default)
{
_synchronizationContext.PostDelayed(DateTime.UtcNow + delay, priority, () => operation());
_synchronizationContext.PostDelayed(DateTime.UtcNow + delay, priority, () => operation(), ct);
}

public static YieldTask Yield(DispatchPriority priority = DispatchPriority.Low)
Expand Down
14 changes: 12 additions & 2 deletions src/Beutl.Threading/DispatcherOperation.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
namespace Beutl.Threading;

namespace Beutl.Threading;

internal sealed class DispatcherOperation
{
public DispatcherOperation(Action action, DispatchPriority priority)
public DispatcherOperation(Action action, DispatchPriority priority, CancellationToken ct)
{
Action = action;
Priority = priority;
Token = ct;
if (!ExecutionContext.IsFlowSuppressed())
{
ExecutionContext = ExecutionContext.Capture();
Expand All @@ -16,10 +18,18 @@ public DispatcherOperation(Action action, DispatchPriority priority)

public DispatchPriority Priority { get; }

public CancellationToken Token { get; }

public ExecutionContext? ExecutionContext { get; }

public void Run()
{
if (Token.IsCancellationRequested)
{
ExecutionContext?.Dispose();
return;
}

if (ExecutionContext is { } ctx)
{
ExecutionContext.Run(ctx, _ => Action(), null);
Expand Down
14 changes: 14 additions & 0 deletions src/Beutl.Threading/DispatcherUnhandledExceptionEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Beutl.Threading;

public class DispatcherUnhandledExceptionEventArgs(Exception exception) : EventArgs
{
public bool Handled { get; set; }

public Exception Exception { get; } = exception;
}
Loading

0 comments on commit 73b0a59

Please sign in to comment.