From eea309cd2cb3153caa4364f8404711427df0aca8 Mon Sep 17 00:00:00 2001 From: Charles Lowell Date: Wed, 13 Dec 2023 10:12:38 -0600 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A5Remove=20stream=20combinators?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stream transformations are not our core competency. If Effection is measured by that yard stick, it'll come up short because what we have is 5% implementation of a not-strongly-opinionated stream helpers library. One could very reasonably ask "well where is flatMap(), merge(), reduce(), reduceRight()" and on and on. The answer is that we aren't in that business, and it's better to not play that game at all, than to enter the race and get lapped 20 times. Our mantra is "Structured Concurrency and Effects for JavaScript". It's a minimalist approach that seeks to excel via maximum compatibility and minimum friction over the long term. Having these as part of our API surface feels like it runs counter to that principle. In the spirit of keeping things "core", remove first(), filter(), map(), pipe(), and associated documentation. They should be published as a separate library where the optimal implementation (if there is one) can be discovered. --- lib/events.ts | 9 +- lib/filter.ts | 33 ---- lib/first.ts | 13 -- lib/map.ts | 27 ---- lib/mod.ts | 4 - lib/pipe.ts | 273 -------------------------------- test/stream-combinators.test.ts | 129 --------------- www/docs/collections.mdx | 209 +----------------------- 8 files changed, 13 insertions(+), 684 deletions(-) delete mode 100644 lib/filter.ts delete mode 100644 lib/first.ts delete mode 100644 lib/map.ts delete mode 100644 lib/pipe.ts delete mode 100644 test/stream-combinators.test.ts diff --git a/lib/events.ts b/lib/events.ts index df38e6eb5..9b0ff23e1 100644 --- a/lib/events.ts +++ b/lib/events.ts @@ -1,7 +1,6 @@ // deno-lint-ignore-file no-explicit-any ban-types import { createSignal } from "./signal.ts"; import { resource } from "./instructions.ts"; -import { first } from "./mod.ts"; import type { Operation, Stream, Subscription } from "./types.ts"; type FN = (...any: any[]) => any; @@ -32,7 +31,13 @@ export function once< T extends EventTarget, K extends EventList | (string & {}), >(target: T, name: K): Operation> { - return first(on(target, name)); + return { + *[Symbol.iterator]() { + let subscription = yield* on(target, name).subscribe(); + let next = yield* subscription.next(); + return next.value; + }, + }; } /** diff --git a/lib/filter.ts b/lib/filter.ts deleted file mode 100644 index 459484543..000000000 --- a/lib/filter.ts +++ /dev/null @@ -1,33 +0,0 @@ -import type { Operation, Stream } from "./types.ts"; - -export interface Predicate { - (a: A): Operation; -} - -export function filter(predicate: Predicate) { - return function (stream: Stream): Stream { - return { - subscribe() { - return { - *[Symbol.iterator]() { - let subscription = yield* stream.subscribe(); - - return { - *next() { - while (true) { - let next = yield* subscription.next(); - - if (next.done) { - return next; - } else if (yield* predicate(next.value)) { - return next; - } - } - }, - }; - }, - }; - }, - }; - }; -} diff --git a/lib/first.ts b/lib/first.ts deleted file mode 100644 index e0ffbc49d..000000000 --- a/lib/first.ts +++ /dev/null @@ -1,13 +0,0 @@ -import type { Operation, Stream } from "./types.ts"; - -export function first(stream: Stream): Operation; -export function* first( - stream: Stream, -): Operation { - let subscription = yield* stream.subscribe(); - let result = yield* subscription.next(); - - if (!result.done) { - return result.value; - } -} diff --git a/lib/map.ts b/lib/map.ts deleted file mode 100644 index ae9465572..000000000 --- a/lib/map.ts +++ /dev/null @@ -1,27 +0,0 @@ -import type { Operation, Stream } from "./types.ts"; - -export function map(op: (a: A) => Operation) { - return function (stream: Stream): Stream { - return { - subscribe() { - return { - *[Symbol.iterator]() { - let subscription = yield* stream.subscribe(); - - return { - *next() { - let next = yield* subscription.next(); - - if (next.done) { - return next; - } else { - return { ...next, value: yield* op(next.value) }; - } - }, - }; - }, - }; - }, - }; - }; -} diff --git a/lib/mod.ts b/lib/mod.ts index e2b0e0f9e..6f3b41899 100644 --- a/lib/mod.ts +++ b/lib/mod.ts @@ -5,13 +5,9 @@ export * from "./instructions.ts"; export * from "./call.ts"; export * from "./run.ts"; export * from "./sleep.ts"; -export * from "./first.ts"; export * from "./async.ts"; export * from "./abort-signal.ts"; export * from "./result.ts"; -export * from "./map.ts"; -export * from "./filter.ts"; -export * from "./pipe.ts"; export * from "./lift.ts"; export * from "./events.ts"; export * from "./main.ts"; diff --git a/lib/pipe.ts b/lib/pipe.ts deleted file mode 100644 index ba75b51d6..000000000 --- a/lib/pipe.ts +++ /dev/null @@ -1,273 +0,0 @@ -// deno-lint-ignore no-explicit-any -type Func = (...arg: any) => any; - -export function pipe(a: A): A; -export function pipe(a: A, ab: (a: A) => B): B; -export function pipe(a: A, ab: (a: A) => B, bc: (b: B) => C): C; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, -): D; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, -): E; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, -): F; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, -): G; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, -): H; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, -): I; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, -): J; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, -): K; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, -): L; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, -): M; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, -): N; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, -): O; - -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, -): P; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, - pq: (p: P) => Q, -): Q; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, - pq: (p: P) => Q, - qr: (q: Q) => R, -): R; -export function pipe( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, - pq: (p: P) => Q, - qr: (q: Q) => R, - rs: (r: R) => S, -): S; -export function pipe< - A, - B, - C, - D, - E, - F, - G, - H, - I, - J, - K, - L, - M, - N, - O, - P, - Q, - R, - S, - T, ->( - a: A, - ab: (a: A) => B, - bc: (b: B) => C, - cd: (c: C) => D, - de: (d: D) => E, - ef: (e: E) => F, - fg: (f: F) => G, - gh: (g: G) => H, - hi: (h: H) => I, - ij: (i: I) => J, - jk: (j: J) => K, - kl: (k: K) => L, - lm: (l: L) => M, - mn: (m: M) => N, - no: (n: N) => O, - op: (o: O) => P, - pq: (p: P) => Q, - qr: (q: Q) => R, - rs: (r: R) => S, - st: (s: S) => T, -): T; -export function pipe(a: unknown, ...fns: Func[]): unknown { - for (const fn of fns) { - a = fn(a); - } - - return a; -} diff --git a/test/stream-combinators.test.ts b/test/stream-combinators.test.ts deleted file mode 100644 index 9e8e38e89..000000000 --- a/test/stream-combinators.test.ts +++ /dev/null @@ -1,129 +0,0 @@ -import { beforeEach, describe, expect, expectType, it } from "./suite.ts"; - -import type { Channel } from "../mod.ts"; -import { createChannel, filter, lift, map, pipe, run } from "../mod.ts"; -import type { Subscription } from "../lib/types.ts"; - -describe("Stream combinators", () => { - let channel: Channel; - - beforeEach(() => { - channel = createChannel(); - }); - - it("lets you map", () => - run(function* () { - let upCase = map(function* (item: string) { - return item.toUpperCase(); - }); - - let subscription = yield* upCase(channel).subscribe(); - - expectType>(subscription); - - yield* channel.send("foo"); - - let next = yield* subscription.next(); - - expect(next.done).toBe(false); - expect(next.value).toBe("FOO"); - - yield* channel.close("var"); - - expect(yield* subscription.next()).toEqual({ - done: true, - value: "var", - }); - })); - - it("lets you filter", () => - run(function* () { - let longs = filter(function* (a: string) { - return a.length > 3; - }); - - let subscription = yield* longs(channel).subscribe(); - - expectType>(subscription); - - yield* channel.send("no"); - yield* channel.send("way"); - yield* channel.send("good"); - - let next = yield* subscription.next(); - - expect(next.done).toBe(false); - expect(next.value).toBe("good"); - - yield* channel.close("var"); - - expect(yield* subscription.next()).toEqual({ - done: true, - value: "var", - }); - })); - - it("lets you map and filter in combination", () => - run(function* () { - let shorts = filter(function* (a: string) { - return a.length < 4; - }); - - let length = map(function* (item: string) { - return item.length; - }); - - let subscription = yield* pipe(channel, shorts, length).subscribe(); - - expectType>(subscription); - - yield* channel.send("too long"); - yield* channel.send("too long 2"); - yield* channel.send("too long 3"); - yield* channel.send("foo"); - - let next = yield* subscription.next(); - - expect(next.done).toBe(false); - expect(next.value).toBe("foo".length); - - yield* channel.close("var"); - - expect(yield* subscription.next()).toEqual({ - done: true, - value: "var", - }); - })); - - it("lets you pass an ordinary function for a predicate", () => - run(function* () { - let upCase = map(lift((item: string) => { - return item.toUpperCase(); - })); - - let shorts = filter(lift((a: string) => { - return a.length < 4; - })); - - let subscription = yield* pipe(channel, shorts, upCase).subscribe(); - - expectType>(subscription); - - yield* channel.send("too long"); - yield* channel.send("too long 2"); - yield* channel.send("too long 3"); - yield* channel.send("foo"); - - let next = yield* subscription.next(); - - expect(next.done).toBe(false); - expect(next.value).toBe("FOO"); - - yield* channel.close("var"); - - expect(yield* subscription.next()).toEqual({ - done: true, - value: "var", - }); - })); -}); diff --git a/www/docs/collections.mdx b/www/docs/collections.mdx index 4a559771e..a6de13f79 100644 --- a/www/docs/collections.mdx +++ b/www/docs/collections.mdx @@ -17,7 +17,7 @@ They are: |-----------------|-----------------------| |`AsyncIterator` | `Subscription` | |`AsyncIterable` | `Stream` | -|`for await` | `for yield* each` | +|`for await` | `for yield*` | If you're familiar with how to use these constructs, then you will be right at @@ -98,129 +98,6 @@ As you can see, the channel can have multiple subscribers and sending a message to the channel adds it to each active subscription. If the channel does not have any active subscriptions, then sending a message to it does nothing. -## Transforming Streams - -Because streams are completely stateless, it means that they can be transformed -into other streams using nothing but simple functions. For example, we can apply -several stream transformations in sequence by using the built-in [`map()`][map] -and [`filter()`][filter] functions. - -Let's look at an example of this in action by using `map()`: - -``` javascript -import { main, map, createChannel } from 'effection'; - -await main(function*() { - let channel = createChannel(); - let textStream = map(value => value.text)(channel); - let uppercaseStream = map(value => value.toUpperCase())(textStream); - - let subscription = yield* uppercaseStream.susbscribe(); - - yield* channel.send({ text: 'hello' }); - yield* channel.send({ text: 'world' }); - - yield* subscription.next(); - //=> { done: false, value: 'HELLO' } - - yield* subscription.next(); - //=> { done: false, value: 'WORLD' } -}); -``` - -If we unpack this a bit, we can see that we're creating a new `Stream` called -`textStream` using the `map` function on `stream`. It derives its values by -pulling the `text` property from each value it sees. - -We then use `map` again on `textStream` to create `uppercaseStream`, which -converts each value into uppercase. - -When we subscribe to `uppercaseStream` and send a value to the channel, we can -see that all of our transformations are applied. - -The `filter()` function can be used to restrict the values emitted by a -stream. The following uses a regular expression to select only those values -that include the string "ello" somewhere inside them. - -``` javascript -import { main, filter, createChannel } from 'effection'; - -await main(function*() { - let channel = createChannel(); - let elloStream = filter(value => value.match(/ello/))(channel); - - let subscription = yield* elloStream.subscribe(); - - yield* channel.send('hello'); - - // our filtered stream skips over this value - yield* channel.send('world'); - - yield* channel.send('jello'); - - console.log(yield* subscription.next()); - //=> { done: false, value: 'hello' } - - console.log(yield* subscription.next()); - //=> { done: false, value: 'jello' } -}); -``` - -When it is run, it prints out "hello" and "jello", but skips "world". - -## Piping Streams - -But what's up with that weird `map(fn)(stream)` and -`filter(fn)(stream)` syntax anyway? It might have even thrown you for -a loop the first time you saw it. Why don't we just pass the stream -that we want to transform as a second argument, like `map(fn, -stream)`? - -The reason is so that we can use `map()`, `filter()` and functions like them to -very rapidly compose into streaming pipelines. For example, we could re-write -the text transformation above using the [`pipe()`][pipe] utility function. - - -``` javascript -import { main, pipe, map, createChannel } from 'effection'; - -await main(function*() { - let channel = createChannel(); - - let messages = pipe( - channel, - map(value => value.text), - map(value => value.toUpperCase()), - ); - - let subscription = yield* messages.subscribe(); - - yield* channel.send({ text: 'hello' }); - yield* channel.send({ text: 'world' }); - - yield* subscription.next(); - //=> { done: false, value: 'HELLO' } - - yield* subscription.next(); - //=> { done: false, value: 'WORLD' } -}); -``` - -We can do this because invoking `map(fn)` returns a function that -takes a stream as its argument, and returns a stream. This allows us -to chain together any number of these stream-to-stream functions into -a pipeline to create new streams that apply each transformation in -sequence. - ->💡The `pipe()` function that comes with Effection is just a utility for -> function composition that is provided for convenience. You can just as -> easily use [`fp-ts#pipe`][fp-ts.pipe] or [`lodash#flow`][lodash.flow] - -In fact, any function that takes a stream and returns a stream can be used in -such a way, not just `map()`, and `filter()`. This means you can write your own -stream combinators, just so long as they return a function that takes a stream -and returns as stream. - ## for yield* each The entire point of having a stream is to consume values from it, and we @@ -288,86 +165,12 @@ await main(function*() { }); ``` -Another way of consuming values from a stream is to use the `first`. This is -handy if you want to just the very next item from a stream of events. - -``` javascript -import { main, first, createChannel, spawn, sleep } from 'effection'; - -await main(function*() { - let channel = createChannel(); - - yield* spawn(function*() { - yield sleep(1000); - yield* channel.send('hello'); - yield* sleep(1000); - yield* channel.send('world'); - }); - - let value = yield* first(channel); - console.log(value); // logs 'hello' -}); -``` - -As you can see here, once we send any value to the Stream, `first()` -returns that value. Now you might be tempted to call `first()` -multiple times, like this: - -``` javascript -// THIS IS NOT IDEAL -import { main, createChannel, spawn, sleep } from 'effection'; - -await main(function*() { - let channel = createChannel(); - - yield* spawn(function*() { - yield* sleep(1000); - yield* channel.send('hello'); - yield* sleep(1000); - yield* channel.send('world'); - }); - - let firstValue = yield* first(channel); - console.log(firstValue); // logs 'hello' - - let secondValue = yield* first(channel); - console.log(secondValue); // logs 'world' -}); -``` - -And while this works, it has a problem that becomes apparent if we slightly -change the order we do things in: - -``` javascript -// THIS IS NOT IDEAL -imchannel { main, createChannel, spawn, sleep } from 'effection'; - -await main(function*() { - let channel = createChannel(); - - yield* spawn(function*() { - yield* sleep(1000); - yield* channel.send('hello'); - yield* channel.send('world'); - }); - - let firstValue = yield* first(channel); - console.log(firstValue); // logs 'hello' - - yield* sleep(1000); - - // will block forever! We missed the message! - let secondValue = yield* first(channel); - - // we never get here! - console.log(secondValue); -}); -``` +## Why Subscriptions? -This makes it very clear why Subscriptions are necessary. It is -because unlike listening for one-off events, a subscription guarantees -that we can never miss any messages. Here we can see how Subscriptions -are more resilient: +Unlike listening for one-off events, a subscription guarantees +that we can never miss any messages. We can see how a subscription +will not drop a message even though it may receive deliveries even while +the consumer is doing other things: ``` javascript imchannel { main, createChannel, spawn, sleep } from 'effection';