Skip to content

Commit

Permalink
Workspace Deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
spugachev committed Oct 10, 2023
1 parent be5d438 commit 9c148e9
Show file tree
Hide file tree
Showing 16 changed files with 378 additions and 60 deletions.
5 changes: 2 additions & 3 deletions lib/chatbot-api/functions/api-handler/routes/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ def file_upload(workspace_id: str):
if extension not in allowed_extensions:
raise genai_core.types.CommonError("Invalid file extension")

result = genai_core.upload.generate_presigned_post(
workspace_id, request.fileName)
result = genai_core.upload.generate_presigned_post(workspace_id, request.fileName)

return {"ok": True, "data": result}

Expand Down Expand Up @@ -147,7 +146,7 @@ def add_document(workspace_id: str, document_type: str):
crawler_properties={
"follow_links": request.followLinks,
"limit": request.limit,
}
},
)

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def lambda_handler(event, context: LambdaContext):
workspace_id = event["workspace_id"]
document_id = event["document_id"]
response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
file_content = response['Body'].read().decode('utf-8')
file_content = response["Body"].read().decode("utf-8")
data = json.loads(file_content)

iteration = data["iteration"]
Expand All @@ -31,8 +31,7 @@ def lambda_handler(event, context: LambdaContext):
follow_links = data["follow_links"]
limit = data["limit"]

logger.info(
f"Processing document {document_id} in workspace {workspace_id}")
logger.info(f"Processing document {document_id} in workspace {workspace_id}")
logger.info(f"Workspace: {workspace}")
logger.info(f"Document: {document}")
logger.info(f"Limit: {limit}")
Expand All @@ -55,7 +54,9 @@ def lambda_handler(event, context: LambdaContext):
result["iteration"] = iteration
result["crawler_job_id"] = crawler_job_id

iteration_object_key = f"{workspace_id}/{document_id}/crawler/{crawler_job_id}/{iteration}.json"
iteration_object_key = (
f"{workspace_id}/{document_id}/crawler/{crawler_job_id}/{iteration}.json"
)
s3_client.put_object(
Bucket=PROCESSING_BUCKET_NAME,
Key=iteration_object_key,
Expand Down
3 changes: 2 additions & 1 deletion lib/rag-engines/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ export class RagEngines extends Construct {
const workspaces = new Workspaces(this, "Workspaces", {
shared: props.shared,
config: props.config,
dataImport,
ragDynamoDBTables: tables,
auroraPgVector: auroraPgVector ?? undefined,
openSearch: openSearchVector ?? undefined,
openSearchVector: openSearchVector ?? undefined,
kendraRetrieval: kendraRetrieval ?? undefined,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import { SystemConfig } from "../../shared/types";
import { Shared } from "../../shared";
import { RagDynamoDBTables } from "../rag-dynamodb-tables";
import { OpenSearchVector } from "../opensearch-vector";
import { KendraRetrieval } from "../kendra-retrieval";
import { AuroraPgVector } from "../aurora-pgvector";
import { DataImport } from "../data-import";
import { RagDynamoDBTables } from "../rag-dynamodb-tables";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as logs from "aws-cdk-lib/aws-logs";
import * as iam from "aws-cdk-lib/aws-iam";

export interface DeleteWorkspaceProps {
readonly config: SystemConfig;
readonly shared: Shared;
readonly dataImport: DataImport;
readonly ragDynamoDBTables: RagDynamoDBTables;
readonly auroraPgVector?: AuroraPgVector;
readonly openSearch?: OpenSearchVector;
readonly openSearchVector?: OpenSearchVector;
readonly kendraRetrieval?: KendraRetrieval;
}

Expand All @@ -43,16 +46,26 @@ export class DeleteWorkspace extends Construct {
props.shared.commonLayer.layer,
props.shared.pythonSDKLayer,
],
timeout: cdk.Duration.minutes(5),
timeout: cdk.Duration.minutes(15),
logRetention: logs.RetentionDays.ONE_WEEK,
environment: {
...props.shared.defaultEnvironmentVariables,
AURORA_DB_SECRET_ID: props.auroraPgVector?.database.secret
?.secretArn as string,
UPLOAD_BUCKET_NAME: props.dataImport.uploadBucket.bucketName,
PROCESSING_BUCKET_NAME: props.dataImport.processingBucket.bucketName,
WORKSPACES_TABLE_NAME:
props.ragDynamoDBTables.workspacesTable.tableName,
WORKSPACES_BY_OBJECT_TYPE_INDEX_NAME:
props.ragDynamoDBTables.workspacesByObjectTypeIndexName,
DOCUMENTS_TABLE_NAME:
props.ragDynamoDBTables?.documentsTable.tableName ?? "",
DOCUMENTS_BY_COMPOUND_KEY_INDEX_NAME:
props.ragDynamoDBTables?.documentsByCompountKeyIndexName ?? "",
DEFAULT_KENDRA_S3_DATA_SOURCE_BUCKET_NAME:
props.kendraRetrieval?.kendraS3DataSourceBucket?.bucketName ?? "",
OPEN_SEARCH_COLLECTION_ENDPOINT:
props.openSearchVector?.openSearchCollectionEndpoint ?? "",
},
}
);
Expand All @@ -64,7 +77,37 @@ export class DeleteWorkspace extends Construct {
);
}

if (props.openSearchVector) {
deleteFunction.addToRolePolicy(
new iam.PolicyStatement({
actions: [
"aoss:APIAccessAll",
"aoss:DescribeIndex",
"aoss:DeleteIndex",
],
resources: [props.openSearchVector.openSearchCollection.attrArn],
})
);

props.openSearchVector.addToAccessPolicy(
"delete-workspace",
[deleteFunction.role?.roleArn],
[
"aoss:DeleteIndex",
"aoss:DescribeIndex",
"aoss:ReadDocument",
"aoss:WriteDocument",
]
);
}

props.dataImport.uploadBucket.grantReadWrite(deleteFunction);
props.dataImport.processingBucket.grantReadWrite(deleteFunction);
props.kendraRetrieval?.kendraS3DataSourceBucket?.grantReadWrite(
deleteFunction
);
props.ragDynamoDBTables.workspacesTable.grantReadWriteData(deleteFunction);
props.ragDynamoDBTables.documentsTable.grantReadWriteData(deleteFunction);

const handleError = new tasks.DynamoUpdateItem(this, "HandleError", {
table: props.ragDynamoDBTables.workspacesTable,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import genai_core.types
import genai_core.workspaces
import genai_core.aurora.delete
import genai_core.opensearch.delete
import genai_core.kendra.delete
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext

Expand All @@ -8,3 +12,15 @@
@logger.inject_lambda_context(log_event=True)
def lambda_handler(event, context: LambdaContext):
    """Delete the workspace named in the event using its engine-specific routine.

    Args:
        event: Invocation payload; must contain a "workspace_id" key.
        context: Lambda context object (not used by the handler body).

    Raises:
        genai_core.types.CommonError: If the workspace lookup returns None, or
            if the workspace's "engine" value is not aurora, opensearch or kendra.
    """
    workspace = genai_core.workspaces.get_workspace(event["workspace_id"])
    if workspace is None:
        raise genai_core.types.CommonError("Workspace not found")

    # Read the engine once and leave as soon as the matching routine has run.
    engine = workspace["engine"]
    if engine == "aurora":
        genai_core.aurora.delete.delete_aurora_workspace(workspace)
        return
    if engine == "opensearch":
        genai_core.opensearch.delete.delete_open_search_workspace(workspace)
        return
    if engine == "kendra":
        genai_core.kendra.delete.delete_kendra_workspace(workspace)
        return

    raise genai_core.types.CommonError("Workspace engine not supported")
11 changes: 7 additions & 4 deletions lib/rag-engines/workspaces/index.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import { Construct } from "constructs";
import { SystemConfig } from "../../shared/types";
import { Shared } from "../../shared";
import { DeleteWorkspace } from "./delete-wrokspace";
import { RagDynamoDBTables } from "../rag-dynamodb-tables";
import { DeleteWorkspace } from "./delete-workspace";
import { AuroraPgVector } from "../aurora-pgvector";
import { OpenSearchVector } from "../opensearch-vector";
import { KendraRetrieval } from "../kendra-retrieval";
import { DataImport } from "../data-import";
import { RagDynamoDBTables } from "../rag-dynamodb-tables";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";

export interface WorkkspacesProps {
readonly config: SystemConfig;
readonly shared: Shared;
readonly dataImport: DataImport;
readonly ragDynamoDBTables: RagDynamoDBTables;
readonly auroraPgVector?: AuroraPgVector;
readonly openSearch?: OpenSearchVector;
readonly openSearchVector?: OpenSearchVector;
readonly kendraRetrieval?: KendraRetrieval;
}

Expand All @@ -26,9 +28,10 @@ export class Workspaces extends Construct {
const workflow = new DeleteWorkspace(this, "DeleteWorkspace", {
config: props.config,
shared: props.shared,
dataImport: props.dataImport,
ragDynamoDBTables: props.ragDynamoDBTables,
auroraPgVector: props.auroraPgVector,
openSearch: props.openSearch,
openSearchVector: props.openSearchVector,
kendraRetrieval: props.kendraRetrieval,
});

Expand Down
71 changes: 71 additions & 0 deletions lib/shared/layers/python-sdk/python/genai_core/aurora/delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import os
import boto3
import genai_core.utils.delete_files_with_prefix
from psycopg2 import sql
from genai_core.aurora.connection import AuroraConnection

# Bucket and table names come from the process environment; a missing required
# variable raises KeyError when this module is imported.
PROCESSING_BUCKET_NAME = os.environ["PROCESSING_BUCKET_NAME"]
UPLOAD_BUCKET_NAME = os.environ["UPLOAD_BUCKET_NAME"]
WORKSPACES_TABLE_NAME = os.environ["WORKSPACES_TABLE_NAME"]
# Read with .get(), so this is None (instead of an import-time error) when unset.
DOCUMENTS_TABLE_NAME = os.environ.get("DOCUMENTS_TABLE_NAME")

# Value used for the "object_type" key attribute when deleting the workspace item.
WORKSPACE_OBJECT_TYPE = "workspace"

# DynamoDB resource created once at import time and shared by this module.
dynamodb = boto3.resource("dynamodb")


def _query_document_keys(documents_table, workspace_id: str) -> list:
    """Return the primary keys of every documents-table item for *workspace_id*.

    Follows ``LastEvaluatedKey`` pagination until the query is exhausted and
    keeps only the key attributes, since that is all a delete needs.
    """
    # Imported explicitly: `import boto3` alone does not guarantee that the
    # `boto3.dynamodb.conditions` submodule is loaded and reachable as an
    # attribute.
    from boto3.dynamodb.conditions import Key

    query_args = {"KeyConditionExpression": Key("workspace_id").eq(workspace_id)}
    keys = []
    while True:
        response = documents_table.query(**query_args)
        keys.extend(
            {
                "workspace_id": item["workspace_id"],
                "document_id": item["document_id"],
            }
            for item in response["Items"]
        )

        last_evaluated_key = response.get("LastEvaluatedKey")
        if not last_evaluated_key:
            return keys
        query_args["ExclusiveStartKey"] = last_evaluated_key


def delete_aurora_workspace(workspace: dict):
    """Delete an Aurora-backed workspace and the records that belong to it.

    Steps, in order:
      1. Call ``delete_files_with_prefix`` on the upload and processing
         buckets with the workspace id as the prefix.
      2. Drop the workspace's table (the workspace id with dashes removed).
      3. Delete every item of the documents table whose ``workspace_id``
         matches.
      4. Delete the workspace item itself from the workspaces table.

    Args:
        workspace: Workspace record; must contain a "workspace_id" key.
    """
    workspace_id = workspace["workspace_id"]
    for bucket_name in (UPLOAD_BUCKET_NAME, PROCESSING_BUCKET_NAME):
        genai_core.utils.delete_files_with_prefix.delete_files_with_prefix(
            bucket_name, workspace_id
        )

    # The identifier is composed with psycopg2.sql so it is quoted safely.
    table_name = sql.Identifier(workspace_id.replace("-", ""))
    # NOTE(review): the connection is opened with autocommit=False and no
    # explicit commit is issued here - confirm that AuroraConnection commits
    # on exit, otherwise the DROP may be rolled back when the connection closes.
    with AuroraConnection(autocommit=False) as cursor:
        cursor.execute(
            sql.SQL("DROP TABLE IF EXISTS {table};").format(table=table_name)
        )

    workspaces_table = dynamodb.Table(WORKSPACES_TABLE_NAME)
    documents_table = dynamodb.Table(DOCUMENTS_TABLE_NAME)

    document_keys = _query_document_keys(documents_table, workspace_id)

    # batch_writer buffers the requests and sends them in batches itself, so a
    # single writer replaces the manual slicing into groups of 25.
    with documents_table.batch_writer() as batch:
        for key in document_keys:
            batch.delete_item(Key=key)

    print(f"Deleted {len(document_keys)} items.")

    response = workspaces_table.delete_item(
        Key={"workspace_id": workspace_id, "object_type": WORKSPACE_OBJECT_TYPE},
    )

    print(f"Delete Item succeeded: {response}")
3 changes: 1 addition & 2 deletions lib/shared/layers/python-sdk/python/genai_core/chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ def add_chunks(
)
chunk_ids = [uuid.uuid4() for _ in chunks]

store_chunks_on_s3(workspace_id, document_id,
document_sub_id, chunk_ids, chunks)
store_chunks_on_s3(workspace_id, document_id, document_sub_id, chunk_ids, chunks)

if engine == "aurora":
result = genai_core.aurora.chunks.add_chunks_aurora(
Expand Down
Loading

0 comments on commit 9c148e9

Please sign in to comment.