Skip to content

Commit

Permalink
Added optimistic check for deletes
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Sep 27, 2024
1 parent d12912b commit b9b1e59
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 34 deletions.
76 changes: 58 additions & 18 deletions src/packages/pongo/src/core/collection/pongoCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,13 @@ export const pongoCollection = <
schema,
}: PongoCollectionOptions<ConnectorType>): PongoCollection<T> => {
const sqlExecutor = pool.execute;
const command = async (sql: SQL, options?: CollectionOperationOptions) =>
(await transactionExecutorOrDefault(db, options, sqlExecutor)).command(sql);
const command = async <Result extends QueryResultRow = QueryResultRow>(
sql: SQL,
options?: CollectionOperationOptions,
) =>
(
await transactionExecutorOrDefault(db, options, sqlExecutor)
).command<Result>(sql);

const query = async <T extends QueryResultRow>(
sql: SQL,
Expand Down Expand Up @@ -165,13 +170,17 @@ export const pongoCollection = <
): Promise<PongoUpdateResult> => {
await ensureCollectionCreated(options);

const result = await command(
const result = await command<UpdateSqlResult>(
SqlFor.updateOne(filter, update, options),
options,
);
return result.rowCount
? { acknowledged: true, modifiedCount: result.rowCount }
: { acknowledged: false, modifiedCount: 0 };
? {
acknowledged: result.rows[0]!.modified! > 0,
modifiedCount: result.rows[0]!.modified!,
matchedCount: result.rows[0]!.matched!,
}
: { acknowledged: false, modifiedCount: 0, matchedCount: 0 };
},
upsertOne: async (
filter: PongoFilter<T>,
Expand All @@ -180,13 +189,17 @@ export const pongoCollection = <
): Promise<PongoUpdateResult> => {
await ensureCollectionCreated(options);

const result = await command(
const result = await command<UpdateSqlResult>(
SqlFor.upsertOne(filter, update, options),
options,
);
return result.rowCount
? { acknowledged: true, modifiedCount: result.rowCount }
: { acknowledged: false, modifiedCount: 0 };
? {
acknowledged: true,
modifiedCount: result.rowCount,
matchedCount: result.rowCount,
}
: { acknowledged: false, modifiedCount: 0, matchedCount: 0 };
},
replaceOne: async (
filter: PongoFilter<T>,
Expand All @@ -195,13 +208,17 @@ export const pongoCollection = <
): Promise<PongoUpdateResult> => {
await ensureCollectionCreated(options);

const result = await command(
const result = await command<UpdateSqlResult>(
SqlFor.replaceOne(filter, document, options),
options,
);
return result.rowCount
? { acknowledged: true, modifiedCount: result.rowCount }
: { acknowledged: false, modifiedCount: 0 };
? {
acknowledged: result.rows[0]!.modified! > 0,
modifiedCount: result.rows[0]!.modified!,
matchedCount: result.rows[0]!.matched!,
}
: { acknowledged: false, modifiedCount: 0, matchedCount: 0 };
},
updateMany: async (
filter: PongoFilter<T>,
Expand All @@ -212,22 +229,30 @@ export const pongoCollection = <

const result = await command(SqlFor.updateMany(filter, update), options);
return result.rowCount
? { acknowledged: true, modifiedCount: result.rowCount }
: { acknowledged: false, modifiedCount: 0 };
? {
acknowledged: true,
modifiedCount: result.rowCount,
matchedCount: result.rowCount,
}
: { acknowledged: false, modifiedCount: 0, matchedCount: 0 };
},
deleteOne: async (
filter?: PongoFilter<T>,
options?: DeleteOneOptions,
): Promise<PongoDeleteResult> => {
await ensureCollectionCreated(options);

const result = await command(
const result = await command<DeleteSqlResult>(
SqlFor.deleteOne(filter ?? {}, options),
options,
);
return result.rowCount
? { acknowledged: true, deletedCount: result.rowCount }
: { acknowledged: false, deletedCount: 0 };
? {
acknowledged: result.rows[0]!.deleted! > 0,
deletedCount: result.rows[0]!.deleted!,
matchedCount: result.rowCount,
}
: { acknowledged: false, deletedCount: 0, matchedCount: 0 };
},
deleteMany: async (
filter?: PongoFilter<T>,
Expand All @@ -236,9 +261,14 @@ export const pongoCollection = <
await ensureCollectionCreated(options);

const result = await command(SqlFor.deleteMany(filter ?? {}), options);

return result.rowCount
? { acknowledged: true, deletedCount: result.rowCount }
: { acknowledged: false, deletedCount: 0 };
? {
acknowledged: true,
deletedCount: result.rowCount,
matchedCount: result.rowCount,
}
: { acknowledged: false, deletedCount: 0, matchedCount: 0 };
},
findOne: async (
filter?: PongoFilter<T>,
Expand Down Expand Up @@ -405,3 +435,13 @@ export type PongoCollectionSQLBuilder = {
rename: (newName: string) => SQL;
drop: () => SQL;
};

type UpdateSqlResult = {
matched: number | null;
modified: number | null;
};

type DeleteSqlResult = {
matched: number | null;
deleted: number | null;
};
2 changes: 2 additions & 0 deletions src/packages/pongo/src/core/typing/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ export interface PongoInsertManyResult {

export interface PongoUpdateResult {
acknowledged: boolean;
matchedCount: number;
modifiedCount: number;
}

Expand All @@ -404,6 +405,7 @@ export interface PongoUpdateManyResult {

export interface PongoDeleteResult {
acknowledged: boolean;
matchedCount: number;
deletedCount: number;
}

Expand Down
112 changes: 96 additions & 16 deletions src/packages/pongo/src/postgres/sqlBuilder/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ export const postgresSQLBuilder = (
`WITH cte AS (
SELECT
_id,
1 as matched,
CASE WHEN _version = %L THEN 1 ELSE 0 END AS version_matched,
CASE WHEN _version = %L THEN 1 ELSE 0 END AS matched,
1 as modified,
FROM %I %s LIMIT 1
)
UPDATE %I
SET data = %s
FROM cte
WHERE %I._id = cte._id AND %I._version = %L
RETURNING cte.matched, cte.version_matched;`,
RETURNING cte.matched, cte.modified;`,
expectedVersion,
collectionName,
where(filterQuery),
Expand All @@ -113,14 +113,14 @@ export const postgresSQLBuilder = (
SELECT
_id,
1 as matched,
1 as version_matched
1 as modified
FROM %I %s LIMIT 1
)
UPDATE %I
SET data = %s
FROM cte
WHERE %I._id = cte._id
RETURNING cte.matched, cte.version_matched;`,
RETURNING cte.matched, cte.modified;`,
collectionName,
where(filterQuery),
collectionName,
Expand Down Expand Up @@ -160,21 +160,60 @@ export const postgresSQLBuilder = (
document: WithoutId<T>,
options?: ReplaceOneOptions,
): SQL => {
const expectedVersionUpdate = options?.expectedVersion
? { _version: expectedVersionValue(options.expectedVersion) }
const expectedVersion = options?.expectedVersion
? expectedVersionValue(options.expectedVersion)
: null;

const expectedVersionUpdate = expectedVersion
? { _version: expectedVersionValue(expectedVersion) }
: {};

const filterQuery = constructFilterQuery<T>({
...expectedVersionUpdate,
...filter,
});

return sql(
`UPDATE %I SET data = %L || jsonb_build_object('_id', data->>'_id') %s;`,
collectionName,
JSON.stringify(document),
where(filterQuery),
);
return expectedVersion
? sql(
`WITH cte AS (
SELECT
_id,
CASE WHEN _version = %L THEN 1 ELSE 0 END AS matched,
1 as modified,
FROM %I %s LIMIT 1
)
UPDATE %I
SET data = %L || jsonb_build_object('_id', data->>'_id')
FROM cte
WHERE %I._id = cte._id AND %I._version = %L
RETURNING cte.matched, cte.modified;`,
expectedVersion,
collectionName,
where(filterQuery),
collectionName,
JSON.stringify(document),
collectionName,
expectedVersion,
)
: sql(
`WITH cte AS (
SELECT
_id,
1 as matched,
1 as modified
FROM %I %s LIMIT 1
)
UPDATE %I
SET data = %L || jsonb_build_object('_id', data->>'_id')
FROM cte
WHERE %I._id = cte._id
RETURNING cte.matched, cte.modified;`,
collectionName,
where(filterQuery),
collectionName,
JSON.stringify(document),
collectionName,
);
},
updateMany: <T>(filter: PongoFilter<T>, update: PongoUpdate<T>): SQL => {
const filterQuery = constructFilterQuery(filter);
Expand All @@ -188,16 +227,57 @@ export const postgresSQLBuilder = (
);
},
deleteOne: <T>(filter: PongoFilter<T>, options?: DeleteOneOptions): SQL => {
const expectedVersionUpdate = options?.expectedVersion
? { _version: expectedVersionValue(options.expectedVersion) }
const expectedVersion = options?.expectedVersion
? expectedVersionValue(options.expectedVersion)
: null;

const expectedVersionUpdate = expectedVersion
? { _version: expectedVersionValue(expectedVersion) }
: {};

const filterQuery = constructFilterQuery<T>({
...expectedVersionUpdate,
...filter,
});

return sql('DELETE FROM %I %s;', collectionName, where(filterQuery));
return expectedVersion
? sql(
`WITH cte AS (
SELECT
_id,
CASE WHEN _version = %L THEN 1 ELSE 0 END AS matched,
1 as deleted,
FROM %I %s LIMIT 1
)
DELETE FROM %I
USING cte
WHERE %I._id = cte._id AND %I._version = %L
RETURNING cte.matched, cte.deleted;`,
expectedVersion,
collectionName,
where(filterQuery),
collectionName,
collectionName,
collectionName,
expectedVersion,
)
: sql(
`WITH cte AS (
SELECT
_id,
1 as matched,
1 as deleted
FROM %I %s LIMIT 1
)
DELETE FROM %I
USING cte
WHERE %I._id = cte._id
RETURNING cte.matched, cte.deleted;`,
collectionName,
where(filterQuery),
collectionName,
collectionName,
);
},
deleteMany: <T>(filter: PongoFilter<T>): SQL => {
const filterQuery = constructFilterQuery(filter);
Expand Down

0 comments on commit b9b1e59

Please sign in to comment.