diff --git a/lib/events.ts b/lib/events.ts index df38e6eb..9b0ff23e 100644 --- a/lib/events.ts +++ b/lib/events.ts @@ -1,7 +1,6 @@ // deno-lint-ignore-file no-explicit-any ban-types import { createSignal } from "./signal.ts"; import { resource } from "./instructions.ts"; -import { first } from "./mod.ts"; import type { Operation, Stream, Subscription } from "./types.ts"; type FN = (...any: any[]) => any; @@ -32,7 +31,13 @@ export function once< T extends EventTarget, K extends EventList | (string & {}), >(target: T, name: K): Operation> { - return first(on(target, name)); + return { + *[Symbol.iterator]() { + let subscription = yield* on(target, name).subscribe(); + let next = yield* subscription.next(); + return next.value; + }, + }; } /** diff --git a/lib/filter.ts b/lib/filter.ts deleted file mode 100644 index 45948454..00000000 --- a/lib/filter.ts +++ /dev/null @@ -1,33 +0,0 @@ -import type { Operation, Stream } from "./types.ts"; - -export interface Predicate { - (a: A): Operation; -} - -export function filter(predicate: Predicate) { - return function (stream: Stream): Stream { - return { - subscribe() { - return { - *[Symbol.iterator]() { - let subscription = yield* stream.subscribe(); - - return { - *next() { - while (true) { - let next = yield* subscription.next(); - - if (next.done) { - return next; - } else if (yield* predicate(next.value)) { - return next; - } - } - }, - }; - }, - }; - }, - }; - }; -} diff --git a/lib/first.ts b/lib/first.ts deleted file mode 100644 index e0ffbc49..00000000 --- a/lib/first.ts +++ /dev/null @@ -1,13 +0,0 @@ -import type { Operation, Stream } from "./types.ts"; - -export function first(stream: Stream): Operation; -export function* first( - stream: Stream, -): Operation { - let subscription = yield* stream.subscribe(); - let result = yield* subscription.next(); - - if (!result.done) { - return result.value; - } -} diff --git a/lib/map.ts b/lib/map.ts deleted file mode 100644 index ae946557..00000000 --- a/lib/map.ts +++ /dev/null @@ -1,27 +0,0 @@ -import type { Operation, Stream } from "./types.ts"; - -export function map(op: (a: A) => Operation) { - return function (stream: Stream): Stream { - return { - subscribe() { - return { - *[Symbol.iterator]() { - let subscription = yield* stream.subscribe(); - - return { - *next() { - let next = yield* subscription.next(); - - if (next.done) { - return next; - } else { - return { ...next, value: yield* op(next.value) }; - } - }, - }; - }, - }; - }, - }; - }; -} diff --git a/lib/mod.ts b/lib/mod.ts index e2b0e0f9..6f3b4189 100644 --- a/lib/mod.ts +++ b/lib/mod.ts @@ -5,13 +5,9 @@ export * from "./instructions.ts"; export * from "./call.ts"; export * from "./run.ts"; export * from "./sleep.ts"; -export * from "./first.ts"; export * from "./async.ts"; export * from "./abort-signal.ts"; export * from "./result.ts"; -export * from "./map.ts"; -export * from "./filter.ts"; -export * from "./pipe.ts"; export * from "./lift.ts"; export * from "./events.ts"; export * from "./main.ts"; diff --git a/lib/pipe.ts b/lib/pipe.ts deleted file mode 100644 index ba75b51d..00000000 --- a/lib/pipe.ts +++ /dev/null @@ -1,273 +0,0 @@ -// deno-lint-ignore no-explicit-any -type Func = (...arg: any) => any; - -export function pipe(a: A): A; -export function pipe(a: A, ab: (a: A) => B): B; -export function pipe(a: A, ab: (a: A) => B, bc: (b: B) => C): C; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, -): D; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, -): E; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, -): F; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, -): G; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, -): H; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, -): I; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, -): J; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, -): K; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, -): L; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, -): M; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, -): N; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, -): O; - -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, -): P; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, - pq: (p: P) => Q, -): Q; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, - pq: (p: P) => Q, - qr: (q: Q) => R, -): R; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, - pq: (p: P) => Q, - qr: (q: Q) => R, - rs: (r: R) => S, -): S; -export function pipe< - A, - B, - C, - D, - E, - F, - G, - H, - I, - J, - K, - L, - M, - N, - O, - P, - Q, - R, - S, - T, ->( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, - pq: (p: P) => Q, - qr: (q: Q) => R, - rs: (r: R) => S, - st: (s: S) => T, -): T; -export function pipe(a: unknown, ...fns: Func[]): unknown { - for (const fn of fns) { - a = fn(a); - } - - return a; -} diff --git a/test/stream-combinators.test.ts b/test/stream-combinators.test.ts deleted file mode 100644 index 9e8e38e8..00000000 --- a/test/stream-combinators.test.ts +++ /dev/null @@ -1,129 +0,0 @@ -import { beforeEach, describe, expect, expectType, it } from "./suite.ts"; - -import type { Channel } from "../mod.ts"; -import { createChannel, filter, lift, map, pipe, run } from "../mod.ts"; -import type { Subscription } from "../lib/types.ts"; - -describe("Stream combinators", () => { - let channel: Channel; - - beforeEach(() => { - channel = createChannel(); - }); - - it("lets you map", () => - run(function* () { - let upCase = map(function* (item: string) { - return item.toUpperCase(); - }); - - let subscription = yield* upCase(channel).subscribe(); - - expectType>(subscription); - - yield* channel.send("foo"); - - let next = yield* subscription.next(); - - expect(next.done).toBe(false); - expect(next.value).toBe("FOO"); - - yield* channel.close("var"); - - expect(yield* subscription.next()).toEqual({ - done: true, - value: "var", - }); - })); - - it("lets you filter", () => - run(function* () { - let longs = filter(function* (a: string) { - return a.length > 3; - }); - - let subscription = yield* longs(channel).subscribe(); - - expectType>(subscription); - - yield* channel.send("no"); - yield* channel.send("way"); - yield* channel.send("good"); - - let next = yield* subscription.next(); - - expect(next.done).toBe(false); - expect(next.value).toBe("good"); - - yield* channel.close("var"); - - expect(yield* subscription.next()).toEqual({ - done: true, - value: "var", - }); - })); - - it("lets you map and filter in combination", () => - run(function* () { - let shorts = filter(function* (a: string) { - return a.length < 4; - }); - - let length = map(function* (item: string) { - return item.length; - }); - - let subscription = yield* pipe(channel, shorts, length).subscribe(); - - expectType>(subscription); - - yield* channel.send("too long"); - yield* channel.send("too long 2"); - yield* channel.send("too long 3"); - yield* channel.send("foo"); - - let next = yield* subscription.next(); - - expect(next.done).toBe(false); - expect(next.value).toBe("foo".length); - - yield* channel.close("var"); - - expect(yield* subscription.next()).toEqual({ - done: true, - value: "var", - }); - })); - - it("lets you pass an ordinary function for a predicate", () => - run(function* () { - let upCase = map(lift((item: string) => { - return item.toUpperCase(); - })); - - let shorts = filter(lift((a: string) => { - return a.length < 4; - })); - - let subscription = yield* pipe(channel, shorts, upCase).subscribe(); - - expectType>(subscription); - - yield* channel.send("too long"); - yield* channel.send("too long 2"); - yield* channel.send("too long 3"); - yield* channel.send("foo"); - - let next = yield* subscription.next(); - - expect(next.done).toBe(false); - expect(next.value).toBe("FOO"); - - yield* channel.close("var"); - - expect(yield* subscription.next()).toEqual({ - done: true, - value: "var", - }); - })); -}); diff --git a/www/docs/collections.mdx b/www/docs/collections.mdx index 4a559771..a6de13f7 100644 --- a/www/docs/collections.mdx +++ b/www/docs/collections.mdx @@ -17,7 +17,7 @@ They are: |-----------------|-----------------------| |`AsyncIterator` | `Subscription` | |`AsyncIterable` | `Stream` | -|`for await` | `for yield* each` | +|`for await` | `for yield*` | If you're familiar with how to use these constructs, then you will be right at @@ -98,129 +98,6 @@ As you can see, the channel can have multiple subscribers and sending a message to the channel adds it to each active subscription. If the channel does not have any active subscriptions, then sending a message to it does nothing. -## Transforming Streams - -Because streams are completely stateless, it means that they can be transformed -into other streams using nothing but simple functions. For example, we can apply -several stream transformations in sequence by using the built-in [`map()`][map] -and [`filter()`][filter] functions. - -Let's look at an example of this in action by using `map()`: - -``` javascript -import { main, map, createChannel } from 'effection'; - -await main(function*() { - let channel = createChannel(); - let textStream = map(value => value.text)(channel); - let uppercaseStream = map(value => value.toUpperCase())(textStream); - - let subscription = yield* uppercaseStream.susbscribe(); - - yield* channel.send({ text: 'hello' }); - yield* channel.send({ text: 'world' }); - - yield* subscription.next(); - //=> { done: false, value: 'HELLO' } - - yield* subscription.next(); - //=> { done: false, value: 'WORLD' } -}); -``` - -If we unpack this a bit, we can see that we're creating a new `Stream` called -`textStream` using the `map` function on `stream`. It derives its values by -pulling the `text` property from each value it sees. - -We then use `map` again on `textStream` to create `uppercaseStream`, which -converts each value into uppercase. - -When we subscribe to `uppercaseStream` and send a value to the channel, we can -see that all of our transformations are applied. - -The `filter()` function can be used to restrict the values emitted by a -stream. The following uses a regular expression to select only those values -that include the string "ello" somewhere inside them. - -``` javascript -import { main, filter, createChannel } from 'effection'; - -await main(function*() { - let channel = createChannel(); - let elloStream = filter(value => value.match(/ello/))(channel); - - let subscription = yield* elloStream.subscribe(); - - yield* channel.send('hello'); - - // our filtered stream skips over this value - yield* channel.send('world'); - - yield* channel.send('jello'); - - console.log(yield* subscription.next()); - //=> { done: false, value: 'hello' } - - console.log(yield* subscription.next()); - //=> { done: false, value: 'jello' } -}); -``` - -When it is run, it prints out "hello" and "jello", but skips "world". - -## Piping Streams - -But what's up with that weird `map(fn)(stream)` and -`filter(fn)(stream)` syntax anyway? It might have even thrown you for -a loop the first time you saw it. Why don't we just pass the stream -that we want to transform as a second argument, like `map(fn, -stream)`? - -The reason is so that we can use `map()`, `filter()` and functions like them to -very rapidly compose into streaming pipelines. For example, we could re-write -the text transformation above using the [`pipe()`][pipe] utility function. - - -``` javascript -import { main, pipe, map, createChannel } from 'effection'; - -await main(function*() { - let channel = createChannel(); - - let messages = pipe( - channel, - map(value => value.text), - map(value => value.toUpperCase()), - ); - - let subscription = yield* messages.subscribe(); - - yield* channel.send({ text: 'hello' }); - yield* channel.send({ text: 'world' }); - - yield* subscription.next(); - //=> { done: false, value: 'HELLO' } - - yield* subscription.next(); - //=> { done: false, value: 'WORLD' } -}); -``` - -We can do this because invoking `map(fn)` returns a function that -takes a stream as its argument, and returns a stream. This allows us -to chain together any number of these stream-to-stream functions into -a pipeline to create new streams that apply each transformation in -sequence. - ->💡The `pipe()` function that comes with Effection is just a utility for -> function composition that is provided for convenience. You can just as -> easily use [`fp-ts#pipe`][fp-ts.pipe] or [`lodash#flow`][lodash.flow] - -In fact, any function that takes a stream and returns a stream can be used in -such a way, not just `map()`, and `filter()`. This means you can write your own -stream combinators, just so long as they return a function that takes a stream -and returns as stream. - ## for yield* each The entire point of having a stream is to consume values from it, and we @@ -288,86 +165,12 @@ await main(function*() { }); ``` -Another way of consuming values from a stream is to use the `first`. This is -handy if you want to just the very next item from a stream of events. - -``` javascript -import { main, first, createChannel, spawn, sleep } from 'effection'; - -await main(function*() { - let channel = createChannel(); - - yield* spawn(function*() { - yield sleep(1000); - yield* channel.send('hello'); - yield* sleep(1000); - yield* channel.send('world'); - }); - - let value = yield* first(channel); - console.log(value); // logs 'hello' -}); -``` - -As you can see here, once we send any value to the Stream, `first()` -returns that value. Now you might be tempted to call `first()` -multiple times, like this: - -``` javascript -// THIS IS NOT IDEAL -import { main, createChannel, spawn, sleep } from 'effection'; - -await main(function*() { - let channel = createChannel(); - - yield* spawn(function*() { - yield* sleep(1000); - yield* channel.send('hello'); - yield* sleep(1000); - yield* channel.send('world'); - }); - - let firstValue = yield* first(channel); - console.log(firstValue); // logs 'hello' - - let secondValue = yield* first(channel); - console.log(secondValue); // logs 'world' -}); -``` - -And while this works, it has a problem that becomes apparent if we slightly -change the order we do things in: - -``` javascript -// THIS IS NOT IDEAL -imchannel { main, createChannel, spawn, sleep } from 'effection'; - -await main(function*() { - let channel = createChannel(); - - yield* spawn(function*() { - yield* sleep(1000); - yield* channel.send('hello'); - yield* channel.send('world'); - }); - - let firstValue = yield* first(channel); - console.log(firstValue); // logs 'hello' - - yield* sleep(1000); - - // will block forever! We missed the message! - let secondValue = yield* first(channel); - - // we never get here! - console.log(secondValue); -}); -``` +## Why Subscriptions? -This makes it very clear why Subscriptions are necessary. It is -because unlike listening for one-off events, a subscription guarantees -that we can never miss any messages. Here we can see how Subscriptions -are more resilient: +Unlike listening for one-off events, a subscription guarantees +that we can never miss any messages. We can see how a subscription +will not drop a message even though it may receive deliveries even while +the consumer is doing other things: ``` javascript imchannel { main, createChannel, spawn, sleep } from 'effection';