Skip to content

Commit

Permalink
Create a place to react to events
Browse files Browse the repository at this point in the history
  • Loading branch information
thewilkybarkid committed Sep 26, 2024
1 parent f1f6775 commit 255d419
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 22 deletions.
53 changes: 32 additions & 21 deletions src/Feedback/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Array, Context, Data, Effect, pipe, type Record } from 'effect'
import { Array, Context, Data, Effect, pipe, PubSub, type Record } from 'effect'
import type { Orcid } from 'orcid-id-ts'
import type { Uuid } from 'uuid-ts'
import { EventStore } from '../Context.js'
import type { FeedbackCommand } from './Commands.js'
import { DecideFeedback } from './Decide.js'
import type { FeedbackError } from './Errors.js'
import type { FeedbackEvent } from './Events.js'
import { EvolveFeedback } from './Evolve.js'
import * as Queries from './Queries.js'
import {
Expand All @@ -21,6 +22,8 @@ export * from './Events.js'
export * from './Evolve.js'
export * from './State.js'

export class FeedbackEvents extends Context.Tag('FeedbackEvents')<FeedbackEvents, PubSub.PubSub<FeedbackEvent>>() {}

export class UnableToHandleCommand extends Data.TaggedError('UnableToHandleCommand')<{ cause?: Error }> {}

export class HandleFeedbackCommand extends Context.Tag('HandleFeedbackCommand')<
Expand All @@ -31,27 +34,35 @@ export class HandleFeedbackCommand extends Context.Tag('HandleFeedbackCommand')<
}) => Effect.Effect<void, UnableToHandleCommand | FeedbackError>
>() {}

export const makeHandleFeedbackCommand: Effect.Effect<typeof HandleFeedbackCommand.Service, never, EventStore> =
Effect.gen(function* () {
const eventStore = yield* EventStore

return ({ feedbackId, command }) =>
Effect.gen(function* () {
const { events, latestVersion } = yield* eventStore.getEvents(feedbackId)

const state = Array.reduce(events, new FeedbackNotStarted() as FeedbackState, (state, event) =>
EvolveFeedback(state)(event),
)

yield* pipe(DecideFeedback(state)(command), Effect.andThen(eventStore.commitEvent(feedbackId, latestVersion)))
}).pipe(
Effect.catchTags({
FailedToCommitEvent: cause => new UnableToHandleCommand({ cause }),
FailedToGetEvents: cause => new UnableToHandleCommand({ cause }),
ResourceHasChanged: cause => new UnableToHandleCommand({ cause }),
}),
export const makeHandleFeedbackCommand: Effect.Effect<
typeof HandleFeedbackCommand.Service,
never,
EventStore | FeedbackEvents
> = Effect.gen(function* () {
const eventStore = yield* EventStore
const feedbackEvents = yield* FeedbackEvents

return ({ feedbackId, command }) =>
Effect.gen(function* () {
const { events, latestVersion } = yield* eventStore.getEvents(feedbackId)

const state = Array.reduce(events, new FeedbackNotStarted() as FeedbackState, (state, event) =>
EvolveFeedback(state)(event),
)
})

yield* pipe(
DecideFeedback(state)(command),
Effect.tap(eventStore.commitEvent(feedbackId, latestVersion)),
Effect.andThen(event => PubSub.publish(feedbackEvents, event)),
)
}).pipe(
Effect.catchTags({
FailedToCommitEvent: cause => new UnableToHandleCommand({ cause }),
FailedToGetEvents: cause => new UnableToHandleCommand({ cause }),
ResourceHasChanged: cause => new UnableToHandleCommand({ cause }),
}),
)
})

export class UnableToQuery extends Data.TaggedError('UnableToQuery')<{ cause?: Error }> {}

Expand Down
21 changes: 20 additions & 1 deletion src/Program.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { FetchHttpClient } from '@effect/platform'
import { type Array, Effect, flow, Layer, Match, pipe, Runtime } from 'effect'
import { type Array, Console, Effect, flow, Layer, Match, pipe, PubSub, Queue, Runtime } from 'effect'
import type { ReadonlyNonEmptyArray } from 'fp-ts/lib/ReadonlyNonEmptyArray.js'
import { DeprecatedLoggerEnv, DeprecatedSleepEnv, EventStore, ExpressConfig } from './Context.js'
import { makeDeprecatedSleepEnv } from './DeprecatedServices.js'
Expand Down Expand Up @@ -122,6 +122,16 @@ const setUpFetch = Layer.effect(

export const Program = pipe(
WebApp,
Layer.merge(
Layer.scopedDiscard(
Effect.gen(function* () {
const feedbackEvents = yield* Feedback.FeedbackEvents
const dequeue = yield* PubSub.subscribe(feedbackEvents)

yield* Queue.take(dequeue).pipe(Effect.andThen(Console.log))
}),
),
),
Layer.provide(getPrereview),
Layer.provide(getPreprint),
Layer.provide(Layer.effect(Feedback.HandleFeedbackCommand, Feedback.makeHandleFeedbackCommand)),
Expand All @@ -132,6 +142,15 @@ export const Program = pipe(
),
),
Layer.provide(Layer.effect(Feedback.GetFeedback, Feedback.makeGetFeedback)),
Layer.provide(
Layer.scoped(
Feedback.FeedbackEvents,
Effect.acquireRelease(
pipe(PubSub.unbounded<Feedback.FeedbackEvent>(), Effect.tap(Effect.logDebug('Feedback events started'))),
flow(PubSub.shutdown, Effect.tap(Effect.logDebug('Feedback events stopped'))),
),
),
),
Layer.provide(Layer.effect(EventStore, LibsqlEventStore.make)),
Layer.provide(setUpFetch),
Layer.provide(Layer.effect(Uuid.GenerateUuid, Uuid.make)),
Expand Down

0 comments on commit 255d419

Please sign in to comment.