Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ Return to Stream as Operation #859

Merged
merged 3 commits into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function subscribe<T, R>(iter: AsyncIterator<T, R>): Subscription<T, R> {
*/
export function stream<T, R>(iterable: AsyncIterable<T, R>): Stream<T, R> {
return {
*subscribe() {
*[Symbol.iterator]() {
return subscribe(iterable[Symbol.asyncIterator]());
},
};
Expand Down
6 changes: 3 additions & 3 deletions lib/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ export interface Channel<T, TClose> extends Stream<T, TClose> {
*
* yield* channel.send('too early'); // the channel has no subscribers yet!
*
* let subscription1 = yield* channel.subscribe();
* let subscription2 = yield* channel.subscribe();
* let subscription1 = yield* channel;
* let subscription2 = yield* channel;
*
* yield* channel.send('hello');
* yield* channel.send('world');
Expand All @@ -58,6 +58,6 @@ export function createChannel<T, TClose = void>(): Channel<T, TClose> {
return {
send: lift(signal.send),
close: lift(signal.close),
subscribe: signal.subscribe,
[Symbol.iterator]: signal[Symbol.iterator],
};
}
2 changes: 1 addition & 1 deletion lib/each.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { createContext } from "./context.ts";
export function each<T>(stream: Stream<T, unknown>): Operation<Iterable<T>> {
return {
*[Symbol.iterator]() {
let subscription = yield* stream.subscribe();
let subscription = yield* stream;
let current = yield* subscription.next();
let stack = yield* EachStack.get();
if (!stack) {
Expand Down
32 changes: 14 additions & 18 deletions lib/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export function once<
>(target: T, name: K): Operation<EventTypeFromEventTarget<T, K>> {
return {
*[Symbol.iterator]() {
let subscription = yield* on(target, name).subscribe();
let subscription = yield* on(target, name);
let next = yield* subscription.next();
return next.value;
},
Expand All @@ -55,23 +55,19 @@ export function on<
T extends EventTarget,
K extends EventList<T> | (string & {}),
>(target: T, name: K): Stream<EventTypeFromEventTarget<T, K>, never> {
return {
subscribe() {
return resource(function* (provide) {
let { send, subscribe } = createSignal<Event>();
return resource(function* (provide) {
let signal = createSignal<Event>();

target.addEventListener(name, send);
target.addEventListener(name, signal.send);

try {
yield* provide(
yield* subscribe() as Operation<
Subscription<EventTypeFromEventTarget<T, K>, never>
>,
);
} finally {
target.removeEventListener(name, send);
}
});
},
};
try {
yield* provide(
yield* signal as Operation<
Subscription<EventTypeFromEventTarget<T, K>, never>
>,
);
} finally {
target.removeEventListener(name, signal.send);
}
});
}
8 changes: 4 additions & 4 deletions lib/signal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ export interface Signal<T, TClose> extends Stream<T, TClose> {
* ```javascript
* export function useActions(pattern: ActionPattern): Stream<AnyAction, void> {
* return {
* *subscribe() {
* *[Symbol.iterator]() {
* const actions = yield* ActionContext;
* yield* QueueFactory.set(() => createFilterQueue(matcher(pattern));
* return yield* actions.subscribe();
* return yield* actions;
* }
* }
* }
Expand Down Expand Up @@ -116,7 +116,7 @@ export const SignalQueueFactory = createContext(
export function createSignal<T, TClose = never>(): Signal<T, TClose> {
let subscribers = new Set<Queue<T, TClose>>();

let useSubscription = resource<Subscription<T, TClose>>(function* (provide) {
let subscribe = resource<Subscription<T, TClose>>(function* (provide) {
let newQueue = yield* SignalQueueFactory;
let queue = newQueue<T, TClose>();
subscribers.add(queue);
Expand All @@ -140,5 +140,5 @@ export function createSignal<T, TClose = never>(): Signal<T, TClose> {
}
}

return { send, close, subscribe: () => useSubscription };
return { ...subscribe, send, close };
}
5 changes: 1 addition & 4 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,7 @@ export interface Scope {
*
* @see https://frontside.com/effection/docs/collections#stream
*/
//export type Stream<T, TReturn> = Operation<Subscription<T, TReturn>>;
export interface Stream<T, TReturn> {
subscribe(): Operation<Subscription<T, TReturn>>;
}
export type Stream<T, TReturn> = Operation<Subscription<T, TReturn>>;

/**
* The Effection equivalent of an [`AsyncIterator`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncIterator)
Expand Down
19 changes: 10 additions & 9 deletions test/channel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ describe("Channel", () => {
$afterEach(() => close());

it("does not use the same event twice when serially subscribed to a channel", function* () {
let { subscribe, ...input } = createChannel<string, void>();
let input = createChannel<string, void>();

let actual: string[] = [];
function* channel() {
yield* sleep(10);
Expand All @@ -29,11 +30,11 @@ describe("Channel", () => {
function* root() {
yield* spawn(channel);

let subscription = yield* subscribe();
let subscription = yield* input;
let result = yield* subscription.next();
actual.push(result.value as string);

subscription = yield* subscribe();
subscription = yield* input;
result = yield* subscription.next();
actual.push(result.value as string);
}
Expand All @@ -51,7 +52,7 @@ describe("Channel", () => {

describe("sending a message", () => {
it("receives message on subscription", function* () {
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
yield* channel.send("hello");
let result = yield* subscription.next();
expect(result.done).toEqual(false);
Expand All @@ -61,7 +62,7 @@ describe("Channel", () => {

describe("blocking on next", () => {
it("receives message on subscription done", function* () {
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
let result = yield* spawn(() => subscription.next());
yield* sleep(10);
yield* channel.send("hello");
Expand All @@ -71,7 +72,7 @@ describe("Channel", () => {

describe("sending multiple messages", () => {
it("receives messages in order", function* () {
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
let { send } = channel;
yield* send("hello");
yield* send("foo");
Expand All @@ -86,7 +87,7 @@ describe("Channel", () => {
it("receives message on subscribable end", function* () {
let channel = createChannel();

let subscription = yield* channel.subscribe();
let subscription = yield* channel;

yield* channel.send("hello");

Expand All @@ -101,7 +102,7 @@ describe("Channel", () => {
describe("without argument", () => {
it("closes subscriptions", function* () {
let channel = createChannel();
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
yield* channel.send("foo");
yield* channel.close();
expect(yield* subscription.next()).toEqual({
Expand All @@ -118,7 +119,7 @@ describe("Channel", () => {
describe("with close argument", () => {
it("closes subscriptions with the argument", function* () {
let channel = createChannel<string, number>();
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
yield* channel.send("foo");
yield* channel.close(12);

Expand Down
22 changes: 11 additions & 11 deletions test/each.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ import { createChannel, each, run, spawn, suspend } from "../mod.ts";
describe("each", () => {
it("can be used to iterate a stream", async () => {
await run(function* () {
let { subscribe, ...input } = createChannel<string, void>();
let channel = createChannel<string, void>();
let actual = [] as string[];
yield* spawn(function* () {
for (let value of yield* each({ subscribe })) {
for (let value of yield* each(channel)) {
actual.push(value);
yield* each.next();
}
});

yield* input.send("one");
yield* input.send("two");
yield* input.send("three");
yield* channel.send("one");
yield* channel.send("two");
yield* channel.send("three");

expect(actual).toEqual(["one", "two", "three"]);
});
Expand Down Expand Up @@ -53,30 +53,30 @@ describe("each", () => {

it("handles context correctly if you break out of a loop", async () => {
await expect(run(function* () {
let { subscribe, ...input } = createChannel<string>();
let channel = createChannel<string>();

yield* spawn(function* () {
for (let _ of yield* each({ subscribe })) {
for (let _ of yield* each(channel)) {
break;
}
// we're out of the loop, each.next() should be invalid.
yield* each.next();
});

yield* input.send("hello");
yield* channel.send("hello");
yield* suspend();
})).rejects.toHaveProperty("name", "IterationError");
});

it("throws an error if you forget to invoke each.next()", async () => {
await expect(run(function* () {
let { subscribe, ...input } = createChannel<string>();
let channel = createChannel<string>();
yield* spawn(function* () {
for (let _ of yield* each({ subscribe })) {
for (let _ of yield* each(channel)) {
_;
}
});
yield* input.send("hello");
yield* channel.send("hello");
yield* suspend();
})).rejects.toHaveProperty("name", "IterationError");
});
Expand Down
38 changes: 18 additions & 20 deletions www/docs/collections.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ await main(function*() {
// the channel has no subscribers yet!
yield* channel.send('too early');

let subscription1 = yield* channel.subscribe();
let subscription2 = yield* channel.subscribe();
let subscription1 = yield* channel;
let subscription2 = yield* channel;

yield* send('hello');
yield* send('world');
Expand Down Expand Up @@ -178,7 +178,7 @@ imchannel { main, createChannel, spawn, sleep } from 'effection';
await main(function*() {
let channel = createChannel();

let subscription = yield* channel.subscribe();
let subscription = yield* channel;

yield* spawn(function*() {
yield* sleep(1000);
Expand Down Expand Up @@ -216,7 +216,7 @@ import { main, sleep, spawn, createChannel } from "effection";
await main(function*() {
let channel = createChannel();

let subscription = yield* channel.subscribe();
let subscription = yield* channel;

yield* spawn(function*() {
yield* channel.send('hello');
Expand Down Expand Up @@ -344,8 +344,8 @@ function* logAndCancel(button) {
```

It turns out that [resources][resources] are just what we need to make
this happen. If you recall, a [`Stream`][stream] is just has a
subscribe() [`Operation`][operation] that returns a
this happen. If you recall, a [`Stream`][stream] is just an
[`Operation`][operation] that returns a
[`Subscription`][subscription]. So the simplest way to implement such
an operation is as a _resource that provides a subscription_.

Expand All @@ -358,20 +358,18 @@ Armed with resources, we can now implement our hypothetical `clicksOn` utility.
import { resource } from "effection"

export function clicksOn(button) {
return {
subscribe: () => resource(function*(provide) {
let clicks = createSignal();
try {
button.addEventListener("click", clicks.send);

let subscription = yield* clicks.subscribe();
yield* provide(subscription);

} finally {
button.removeEventListener("click", send);
}
}),
};
return resource(function*(provide) {
let clicks = createSignal();
try {
button.addEventListener("click", clicks.send);

let subscription = yield* clicks.subscribe();
yield* provide(subscription);

} finally {
button.removeEventListener("click", send);
}
});
}
```

Expand Down
Loading