Skip to content

Commit

Permalink
Merge pull request #33 from xzhayon/fix/fiber
Browse files Browse the repository at this point in the history
feat: speedup concurrency and catch interrupts
  • Loading branch information
xzhayon authored May 27, 2024
2 parents c2e0f92 + f98ef5b commit 4f9f942
Show file tree
Hide file tree
Showing 26 changed files with 518 additions and 456 deletions.
26 changes: 13 additions & 13 deletions src/Cause.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ import * as $Type from './Type'
import { Variant } from './Type'
import { FiberId } from './fiber/FiberId'

export type Cause<E> = Fail<E> | Die | Interrupt
export type Cause<E> = Die | Fail<E> | Interrupt

type _Cause<T extends string> = Variant<typeof uri, T>

export interface Fail<E> extends _Cause<'Fail'> {
readonly error: E
export interface Die extends _Cause<'Die'> {
readonly error: unknown
readonly fiberId: FiberId
}

export interface Die extends _Cause<'Die'> {
readonly error: unknown
export interface Fail<E> extends _Cause<'Fail'> {
readonly error: E
readonly fiberId: FiberId
}

Expand All @@ -24,14 +24,14 @@ export interface Interrupt extends _Cause<'Interrupt'> {
const uri = Symbol('Cause')
const _cause = $Type.variant(uri)

export function fail<E>(error: E, fiberId: FiberId): Cause<E> {
return { ..._cause('Fail'), error, fiberId }
}

export function die(error: unknown, fiberId: FiberId): Cause<never> {
return { ..._cause('Die'), error, fiberId }
}

export function fail<E>(error: E, fiberId: FiberId): Cause<E> {
return { ..._cause('Fail'), error, fiberId }
}

export function interrupt(fiberId: FiberId): Cause<never> {
return { ..._cause('Interrupt'), fiberId }
}
Expand All @@ -40,14 +40,14 @@ export function is(u: unknown): u is Cause<unknown> {
return $Struct.is(u) && $Struct.has(u, $Type.uri) && u[$Type.uri] === uri
}

export function isFail<E>(cause: Cause<E>): cause is Fail<E> {
return cause[$Type.tag] === 'Fail'
}

export function isDie(cause: Cause<any>): cause is Die {
return cause[$Type.tag] === 'Die'
}

export function isFail<E>(cause: Cause<E>): cause is Fail<E> {
return cause[$Type.tag] === 'Fail'
}

export function isInterrupt(cause: Cause<any>): cause is Interrupt {
return cause[$Type.tag] === 'Interrupt'
}
10 changes: 8 additions & 2 deletions src/Effector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@ import { And, Covariant, Invariant, IsAny } from './Type'
export type Effector<A, E = never, R = never> = Generator<
And<IsAny<R>, IsAny<E>> extends true
? any
: (R extends any ? Use<R> : never) | (E extends any ? Throw<E> : never),
:
| (R extends any ? Use<R> : never)
| (E extends any ? Throw<E> : never)
| void,
A
>
export type AsyncEffector<A, E = never, R = never> = AsyncGenerator<
And<IsAny<R>, IsAny<E>> extends true
? any
: (R extends any ? Use<R> : never) | (E extends any ? Throw<E> : never),
:
| (R extends any ? Use<R> : never)
| (E extends any ? Throw<E> : never)
| void,
A
>
export type AnyEffector<A, E = never, R = never> =
Expand Down
7 changes: 0 additions & 7 deletions src/Error.ts

This file was deleted.

117 changes: 0 additions & 117 deletions src/Fiber.ts

This file was deleted.

48 changes: 35 additions & 13 deletions src/Generator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as $Function from './Function'
import * as $Struct from './Struct'
import { OrLazy } from './Type'

export type AnyGenerator<Y = unknown, R = any, N = unknown> =
| Generator<Y, R, N>
Expand All @@ -24,19 +25,25 @@ export type NextOf<G extends AnyGenerator> = G extends AnyGenerator<

export type Generated<A> = A extends AnyGenerator ? ReturnOf<A> : A

export function is(u: unknown): u is AnyGenerator {
return (
$Struct.is(u) &&
$Struct.has(u, 'next') &&
$Function.is(u.next) &&
$Struct.has(u, 'return') &&
$Function.is(u.return) &&
$Struct.has(u, 'throw') &&
$Function.is(u.throw) &&
(($Struct.has(u, Symbol.iterator) && $Function.is(u[Symbol.iterator])) ||
($Struct.has(u, Symbol.asyncIterator) &&
$Function.is(u[Symbol.asyncIterator])))
)
export function* fromPromise<A>(promise: OrLazy<Promise<A>>) {
const _promise = $Function.is(promise) ? promise() : promise
let result: { value: A } | { error: unknown } | undefined
_promise
.then((value) => {
result = { value }
})
.catch((error: unknown) => {
result = { error }
})
while (result === undefined) {
yield undefined as void
}

if ('error' in result) {
throw result.error
}

return result.value
}

export function* sequence<G extends Generator>(
Expand Down Expand Up @@ -74,3 +81,18 @@ export function traverseAsync<A, G extends AnyGenerator>(
) {
return sequenceAsync(as.map(f))
}

export function is(u: unknown): u is AnyGenerator {
return (
$Struct.is(u) &&
$Struct.has(u, 'next') &&
$Function.is(u.next) &&
$Struct.has(u, 'return') &&
$Function.is(u.return) &&
$Struct.has(u, 'throw') &&
$Function.is(u.throw) &&
(($Struct.has(u, Symbol.iterator) && $Function.is(u[Symbol.iterator])) ||
($Struct.has(u, Symbol.asyncIterator) &&
$Function.is(u[Symbol.asyncIterator])))
)
}
120 changes: 0 additions & 120 deletions src/Promise.ts
Original file line number Diff line number Diff line change
@@ -1,123 +1,3 @@
import { AnyEffector, ContextOf, ErrorOf } from './Effector'
import * as $Error from './Error'
import * as $Exit from './Exit'
import * as $Type from './Type'
import { OrLazy } from './Type'
import * as $Backdoor from './effect/Backdoor'
import * as $Exception from './effect/Exception'
import * as $Interruption from './effect/Interruption'

export function is(u: unknown): u is Promise<unknown> {
return u instanceof Promise
}

export function all<G extends AnyEffector<any, any, any>>(
effectors: ReadonlyArray<OrLazy<G>>,
) {
return $Backdoor.exploit<ContextOf<G>>()(async function* (run) {
try {
return await Promise.all(
effectors.map(run).map((promise) =>
promise.then((exit) => {
if ($Exit.isFailure(exit)) {
throw exit
}

return exit.value
}),
),
)
} catch (exit) {
if (!$Exit.is(exit) || !$Exit.isFailure(exit)) {
throw new Error('Cannot find Promise failure', { cause: exit })
}

switch (exit.cause[$Type.tag]) {
case 'Die':
throw exit.cause.error
case 'Fail':
return yield* $Exception.raise(exit.cause.error as ErrorOf<G>)
case 'Interrupt':
return yield* $Interruption.interrupt()
}
}
})
}

export function any<G extends AnyEffector<any, any, any>>(
effectors: ReadonlyArray<OrLazy<G>>,
) {
return $Backdoor.exploit<ContextOf<G>>()(async function* (run) {
try {
return await Promise.any(
effectors.map(run).map((promise) =>
promise.then((exit) => {
if ($Exit.isFailure(exit)) {
throw exit
}

return exit.value
}),
),
)
} catch (error) {
if (!$Error.is(error)) {
throw new Error('Cannot find Promise error', { cause: error })
}

if (!$Error.isAggregate(error)) {
throw error
}

throw new AggregateError(
error.errors.map((exit) => {
if (!$Exit.is(exit) || !$Exit.isFailure(exit)) {
return new Error('Cannot find Promise failure', { cause: exit })
}

switch (exit.cause[$Type.tag]) {
case 'Die':
case 'Fail':
return exit.cause.error
case 'Interrupt':
return new Error(`Fiber "${exit.cause.fiberId}" interrupted`)
}
}),
error.message,
)
}
})
}

export function race<G extends AnyEffector<any, any, any>>(
effectors: ReadonlyArray<OrLazy<G>>,
) {
return $Backdoor.exploit<ContextOf<G>>()(async function* (run) {
try {
return await Promise.race(
effectors.map(run).map((promise) =>
promise.then((exit) => {
if ($Exit.isFailure(exit)) {
throw exit
}

return exit.value
}),
),
)
} catch (exit) {
if (!$Exit.is(exit) || !$Exit.isFailure(exit)) {
throw new Error('Cannot find Promise failure', { cause: exit })
}

switch (exit.cause[$Type.tag]) {
case 'Die':
throw exit.cause.error
case 'Fail':
return yield* $Exception.raise(exit.cause.error as ErrorOf<G>)
case 'Interrupt':
return yield* $Interruption.interrupt()
}
}
})
}
1 change: 1 addition & 0 deletions src/concurrency/Concurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { all, any, race } from './Fiber'
Loading

0 comments on commit 4f9f942

Please sign in to comment.