diff --git a/docker-compose.dev.yaml b/docker-compose.dev.yaml index 95956913..b4e44108 100644 --- a/docker-compose.dev.yaml +++ b/docker-compose.dev.yaml @@ -126,12 +126,14 @@ services: restart: always rabbitmq: - image: rabbitmq + build: + context: services/rabbitmq + dockerfile: Dockerfile ports: - 5672:5672 - 15672:15672 env_file: - - envs/rabbitmq.env + - services/rabbitmq/.env volumes: - rabbitmq-data:/var/lib/rabbitmq healthcheck: diff --git a/services/item/src/events/consume-transaction-auto-completed-event.ts b/services/item/src/events/consume-transaction-auto-completed-event.ts new file mode 100644 index 00000000..574b89e4 --- /dev/null +++ b/services/item/src/events/consume-transaction-auto-completed-event.ts @@ -0,0 +1,59 @@ +import { ItemStatus } from "@/types"; +import { itemsCollection, transactionsCollection } from "@/utils/db"; +import { channel, delayedExchange } from "./init"; + +export async function consumeTransactionAutoCompletedEvent() { + const { queue } = await channel.assertQueue( + "transaction.transaction.auto-completed", + ); + + await channel.bindQueue(queue, delayedExchange, "transaction.auto-completed"); + + channel.consume(queue, async (message) => { + if (!message) { + return; + } + + const autoCompletedTransactionId = JSON.parse( + message.content.toString(), + ) as string; + + const transaction = await transactionsCollection.findOne({ + id: autoCompletedTransactionId, + }); + + if (!transaction) { + channel.ack(message); + return; + } + + if (transaction.completedAt || transaction.cancelledAt) { + channel.ack(message); + return; + } + + await itemsCollection.updateOne( + { + id: transaction.item.id, + }, + { + $set: { + status: ItemStatus.Sold, + }, + }, + ); + + await transactionsCollection.updateOne( + { + id: autoCompletedTransactionId, + }, + { + $set: { + completedAt: new Date(), + }, + }, + ); + + channel.ack(message); + }); +} diff --git a/services/item/src/events/init.ts b/services/item/src/events/init.ts index 865361a3..23e51f48 100644 --- a/services/item/src/events/init.ts +++ b/services/item/src/events/init.ts @@ -13,3 +13,13 @@ export const { exchange: itemExchange } = await channel.assertExchange( "item", "topic", ); + +export const { exchange: delayedExchange } = await channel.assertExchange( + "delayed", + "x-delayed-message", + { + arguments: { + "x-delayed-type": "topic", + }, + }, +); diff --git a/services/item/src/events/publish-transaction-auto-completed-event.ts b/services/item/src/events/publish-transaction-auto-completed-event.ts new file mode 100644 index 00000000..5267f1f0 --- /dev/null +++ b/services/item/src/events/publish-transaction-auto-completed-event.ts @@ -0,0 +1,15 @@ +import { channel, delayedExchange } from "./init"; + +export function publishTransactionAutoCompletedEvent(transactionId: string) { + channel.publish( + delayedExchange, + "transaction.auto-completed", + Buffer.from(JSON.stringify(transactionId)), + { + persistent: true, + headers: { + "x-delay": 1000 * 60 * 60 * 24 * 14, + }, + }, + ); +} diff --git a/services/item/src/index.ts b/services/item/src/index.ts index 5fca222f..c69f7a5f 100644 --- a/services/item/src/index.ts +++ b/services/item/src/index.ts @@ -7,6 +7,7 @@ import { logger } from "hono/logger"; import { secureHeaders } from "hono/secure-headers"; import { consumeAccountDeletedEvent } from "./events/consume-account-deleted-event"; import { consumeAccountUpdatedEvent } from "./events/consume-account-updated-event"; +import { consumeTransactionAutoCompletedEvent } from "./events/consume-transaction-auto-completed-event"; import { itemsController } from "./items/controller"; import { itemPacksController } from "./items/packs/controller"; import { globalErrorHandler } from "./middleware/global-error-handler"; @@ -74,5 +75,6 @@ app.notFound(globalNotFoundHandler); // Start consuming RabbitMQ events. await consumeAccountUpdatedEvent(); await consumeAccountDeletedEvent(); +await consumeTransactionAutoCompletedEvent(); export default app; diff --git a/services/item/src/items/states.ts b/services/item/src/items/states.ts index 1fc5d1a0..52edf35e 100644 --- a/services/item/src/items/states.ts +++ b/services/item/src/items/states.ts @@ -1,3 +1,4 @@ +import { publishTransactionAutoCompletedEvent } from "@/events/publish-transaction-auto-completed-event"; import * as transactionsRepository from "@/transactions/repository"; import { ItemStatus, type Item, type SimplifiedAccount } from "@/types"; import { HTTPException } from "hono/http-exception"; @@ -46,7 +47,7 @@ const forSaleToDealt: Transition = async ({ item, actor, buyer }) => { throw new HTTPException(409, { message: "Transaction already exists" }); } - await transactionsRepository.create({ + const transactionId = await transactionsRepository.create({ item: { id: item.id, name: item.name, @@ -55,6 +56,8 @@ const forSaleToDealt: Transition = async ({ item, actor, buyer }) => { seller: item.seller, buyer: buyer, }); + + publishTransactionAutoCompletedEvent(transactionId); }; const dealtToSold: Transition = async ({ item, actor }) => { diff --git a/services/item/src/transactions/repository.ts b/services/item/src/transactions/repository.ts index 82a572f2..f7bd03ad 100644 --- a/services/item/src/transactions/repository.ts +++ b/services/item/src/transactions/repository.ts @@ -13,8 +13,10 @@ export async function findOne(filter: Filter) { type CreateDto = Pick; export async function create(dto: CreateDto) { + const id = crypto.randomUUID(); + await transactionsCollection.insertOne({ - id: crypto.randomUUID(), + id, buyer: dto.buyer, seller: dto.seller, item: dto.item, @@ -22,6 +24,8 @@ export async function create(dto: CreateDto) { completedAt: null, cancelledAt: null, }); + + return id; } export async function complete(id: string) { diff --git a/services/item/tests/items/update-status.test.ts b/services/item/tests/items/update-status.test.ts index e0d8b808..8e22b277 100644 --- a/services/item/tests/items/update-status.test.ts +++ b/services/item/tests/items/update-status.test.ts @@ -1,4 +1,5 @@ import { publishItemUpdatedEvent } from "@/events/publish-item-updated-event"; +import { publishTransactionAutoCompletedEvent } from "@/events/publish-transaction-auto-completed-event"; import { ItemStatus, ItemType, type Item } from "@/types"; import { itemsCollection, transactionsCollection } from "@/utils/db"; import { afterAll, describe, expect, it, mock } from "bun:test"; @@ -16,6 +17,9 @@ describe("for sale -> dealt", () => { mock.module("@/events/publish-item-updated-event", () => ({ publishItemUpdatedEvent: mock(), })); + mock.module("@/events/publish-transaction-auto-completed-event", () => ({ + publishTransactionAutoCompletedEvent: mock(), + })); const item: Item = { id: crypto.randomUUID(), @@ -68,6 +72,10 @@ describe("for sale -> dealt", () => { }), ).toEqual(1); expect(publishItemUpdatedEvent).toHaveBeenCalledTimes(1); + expect(publishTransactionAutoCompletedEvent).toHaveBeenCalledTimes(1); + expect(publishTransactionAutoCompletedEvent).toHaveBeenLastCalledWith( + expect.any(String), + ); }); it("fails if buyer is not given", async () => { diff --git a/services/item/tests/preload.ts b/services/item/tests/preload.ts index 7ff7c1f4..6f1f7867 100644 --- a/services/item/tests/preload.ts +++ b/services/item/tests/preload.ts @@ -8,6 +8,10 @@ mock.module("@/events/consume-account-deleted-event", () => ({ consumeAccountDeletedEvent: mock(), })); +mock.module("@/events/consume-transaction-auto-completed-event", () => ({ + consumeTransactionAutoCompletedEvent: mock(), +})); + mock.module("@/events/publish-item-updated-event", () => ({ publishItemUpdatedEvent: mock(), })); @@ -15,3 +19,7 @@ mock.module("@/events/publish-item-updated-event", () => ({ mock.module("@/events/publish-item-deleted-event", () => ({ publishItemDeletedEvent: mock(), })); + +mock.module("@/events/publish-transaction-auto-completed-event", () => ({ + publishTransactionAutoCompletedEvent: mock(), +})); diff --git a/services/rabbitmq/Dockerfile b/services/rabbitmq/Dockerfile new file mode 100644 index 00000000..5c86d285 --- /dev/null +++ b/services/rabbitmq/Dockerfile @@ -0,0 +1,11 @@ +FROM rabbitmq + +WORKDIR / + +RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/* + +RUN curl -fsSL https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v4.0.2/rabbitmq_delayed_message_exchange-4.0.2.ez -o $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-4.0.2.ez + +RUN chown rabbitmq:rabbitmq $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-4.0.2.ez + +RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange