Skip to content

Commit

Permalink
CompletableEvent is dead, all Event
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 13, 2023
1 parent 5a4ac09 commit 31793f0
Show file tree
Hide file tree
Showing 44 changed files with 727 additions and 1,607 deletions.
99 changes: 0 additions & 99 deletions sandbox/ConsoleApp1/LiveList.cs

This file was deleted.

9 changes: 2 additions & 7 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@



var publisher = new Publisher<int>();
var publisher = new Publisher<int, R3.Unit>();

var d = publisher
.Where(x => true)
Expand Down Expand Up @@ -193,12 +193,7 @@

public static class Extensions
{
public static IDisposable WriteLine<T>(this Event<T> source)
{
return source.Subscribe(x => Console.WriteLine(x));
}

public static IDisposable WriteLine<T, U>(this CompletableEvent<T, U> source)
public static IDisposable WriteLine<T, U>(this Event<T, U> source)
{
return source.Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("COMPLETED"));
}
Expand Down
96 changes: 2 additions & 94 deletions src/R3/Event.cs
Original file line number Diff line number Diff line change
@@ -1,102 +1,10 @@
#pragma warning disable CS0618
#pragma warning disable CS0618

using System.Diagnostics;

namespace R3;

// similar as IObservable<T>
// IDisposable Subscribe(Subscriber<TMessage> subscriber)
public abstract class Event<TMessage>
{
[StackTraceHidden, DebuggerStepThrough]
public IDisposable Subscribe(Subscriber<TMessage> subscriber)
{
try
{
var subscription = SubscribeCore(subscriber);

if (SubscriptionTracker.TryTrackActiveSubscription(subscription, 2, out var trackableDisposable))
{
subscription = trackableDisposable;
}

subscriber.SourceSubscription.Disposable = subscription;
return subscriber; // return subscriber to make subscription chain.
}
catch
{
subscriber.Dispose(); // when SubscribeCore failed, auto detach caller subscriber
throw;
}
}

[StackTraceHidden, DebuggerStepThrough]
protected abstract IDisposable SubscribeCore(Subscriber<TMessage> subscriber);
}

// similar as IObserver<T> but no stop on OnError.
public abstract class Subscriber<TMessage> : IDisposable
{
#if DEBUG
[Obsolete("Only allow in Event<TMessage>.")]
#endif
internal SingleAssignmentDisposableCore SourceSubscription;

int calledDispose;

public bool IsDisposed => Volatile.Read(ref calledDispose) != 0;

[StackTraceHidden, DebuggerStepThrough]
public void OnNext(TMessage message)
{
if (IsDisposed) return;
try
{
OnNextCore(message);
}
catch (Exception ex)
{
OnErrorResume(ex);
}
}

protected abstract void OnNextCore(TMessage message);

[StackTraceHidden, DebuggerStepThrough]
public void OnErrorResume(Exception error)
{
if (IsDisposed) return;

try
{
OnErrorResumeCore(error);
}
catch (Exception ex)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(ex);
}
}

protected abstract void OnErrorResumeCore(Exception error);

[StackTraceHidden, DebuggerStepThrough]
public void Dispose()
{
if (Interlocked.Exchange(ref calledDispose, 1) != 0)
{
return;
}

DisposeCore(); // Dispose self
SourceSubscription.Dispose(); // Dispose attached parent
}

[StackTraceHidden, DebuggerStepThrough]
protected virtual void DisposeCore() { }
}

// similar as IObservable<T>
public abstract class CompletableEvent<TMessage, TComplete>
public abstract class Event<TMessage, TComplete>
{
[StackTraceHidden, DebuggerStepThrough]
public IDisposable Subscribe(Subscriber<TMessage, TComplete> subscriber)
Expand Down
147 changes: 147 additions & 0 deletions src/R3/EventSubscribeExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
using System.Diagnostics;

namespace R3;

public static class EventSubscribeExtensions
{
// TODO: with State

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, TComplete> source)
{
return source.Subscribe(NopSubscriber<TMessage, TComplete>.Instance);
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, Result<TComplete>> source)
{
return source.Subscribe(NopRSubscriber<TMessage, TComplete>.Instance);
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, TComplete> source, Action<TMessage> onNext)
{
return source.Subscribe(new AnonymousSubscriber<TMessage, TComplete>(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs<TComplete>.Nop));
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, Result<TComplete>> source, Action<TMessage> onNext)
{
return source.Subscribe(new AnonymousRSubscriber<TMessage, TComplete>(onNext, EventSystem.GetUnhandledExceptionHandler()));
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, TComplete> source, Action<TMessage> onNext, Action<TComplete> onComplete)
{
return source.Subscribe(new AnonymousSubscriber<TMessage, TComplete>(onNext, EventSystem.GetUnhandledExceptionHandler(), onComplete));
}

[DebuggerStepThrough]
public static IDisposable Subscribe<TMessage, TComplete>(this Event<TMessage, TComplete> source, Action<TMessage> onNext, Action<Exception> onErrorResume, Action<TComplete> onComplete)
{
return source.Subscribe(new AnonymousSubscriber<TMessage, TComplete>(onNext, onErrorResume, onComplete));
}
}

[DebuggerStepThrough]
internal sealed class NopSubscriber<TMessage, TComplete> : Subscriber<TMessage, TComplete>
{
public static readonly NopSubscriber<TMessage, TComplete> Instance = new();

private NopSubscriber()
{
}

[DebuggerStepThrough]
protected override void OnNextCore(TMessage message)
{
}

[DebuggerStepThrough]
protected override void OnErrorResumeCore(Exception error)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(error);
}

[DebuggerStepThrough]
protected override void OnCompletedCore(TComplete complete)
{
}
}

[DebuggerStepThrough]
internal sealed class NopRSubscriber<TMessage, TComplete> : Subscriber<TMessage, Result<TComplete>>
{
public static readonly NopRSubscriber<TMessage, TComplete> Instance = new();

private NopRSubscriber()
{
}

[DebuggerStepThrough]
protected override void OnNextCore(TMessage message)
{
}

[DebuggerStepThrough]
protected override void OnErrorResumeCore(Exception error)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(error);
}

[DebuggerStepThrough]
protected override void OnCompletedCore(Result<TComplete> complete)
{
if (complete.IsFailure)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(complete.Exception);
}
}
}

[DebuggerStepThrough]
internal sealed class AnonymousSubscriber<TMessage, TComplete>(Action<TMessage> onNext, Action<Exception> onErrorResume, Action<TComplete> onComplete) : Subscriber<TMessage, TComplete>
{
[DebuggerStepThrough]
protected override void OnNextCore(TMessage message)
{
onNext(message);
}

[DebuggerStepThrough]
protected override void OnErrorResumeCore(Exception error)
{
onErrorResume(error);
}

[DebuggerStepThrough]
protected override void OnCompletedCore(TComplete complete)
{
onComplete(complete);
}
}

[DebuggerStepThrough]
internal sealed class AnonymousRSubscriber<TMessage, TComplete>(Action<TMessage> onNext, Action<Exception> onErrorResume) : Subscriber<TMessage, Result<TComplete>>
{
[DebuggerStepThrough]
protected override void OnNextCore(TMessage message)
{
onNext(message);
}

[DebuggerStepThrough]
protected override void OnErrorResumeCore(Exception error)
{
onErrorResume(error);
}

[DebuggerStepThrough]
protected override void OnCompletedCore(Result<TComplete> complete)
{
if (complete.IsFailure)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(complete.Exception);
}
}
}
Loading

0 comments on commit 31793f0

Please sign in to comment.