Skip to content

Commit

Permalink
fix: remove prefixes from listStream method parameters (#6480)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Onnikov <[email protected]>
  • Loading branch information
aonnikov authored Sep 4, 2024
1 parent 3be2de0 commit 60444eb
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 42 deletions.
2 changes: 1 addition & 1 deletion models/core/src/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ async function migrateBlobData (exAdapter: StorageAdapterEx, client: MigrationCl
if (!(await adapter.exists(ctx, client.workspaceId))) {
continue
}
const blobs = await adapter.listStream(ctx, client.workspaceId, '')
const blobs = await adapter.listStream(ctx, client.workspaceId)
const bulk = new Map<Ref<Blob>, Blob>()
try {
const push = async (force: boolean): Promise<void> => {
Expand Down
15 changes: 5 additions & 10 deletions packages/storage/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export interface StorageAdapter {

listBuckets: (ctx: MeasureContext) => Promise<BucketInfo[]>
remove: (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]) => Promise<void>
listStream: (ctx: MeasureContext, workspaceId: WorkspaceId, prefix?: string) => Promise<BlobStorageIterator>
listStream: (ctx: MeasureContext, workspaceId: WorkspaceId) => Promise<BlobStorageIterator>
stat: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<Blob | undefined>
get: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<Readable>
put: (
Expand Down Expand Up @@ -114,15 +114,11 @@ export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx {

async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {}

async list (ctx: MeasureContext, workspaceId: WorkspaceId, prefix?: string | undefined): Promise<ListBlobResult[]> {
async list (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<ListBlobResult[]> {
return []
}

async listStream (
ctx: MeasureContext,
workspaceId: WorkspaceId,
prefix?: string | undefined
): Promise<BlobStorageIterator> {
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
return {
next: async (): Promise<ListBlobResult | undefined> => {
return undefined
Expand Down Expand Up @@ -203,11 +199,10 @@ export async function removeAllObjects (
export async function objectsToArray (
ctx: MeasureContext,
storage: StorageAdapter,
workspaceId: WorkspaceId,
prefix?: string
workspaceId: WorkspaceId
): Promise<ListBlobResult[]> {
// We need to list all files and delete them
const iterator = await storage.listStream(ctx, workspaceId, prefix)
const iterator = await storage.listStream(ctx, workspaceId)
const bulk: ListBlobResult[] = []
while (true) {
const obj = await iterator.next()
Expand Down
6 changes: 1 addition & 5 deletions server/core/src/__tests__/memAdapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,7 @@ export class MemStorageAdapter implements StorageAdapter {
}
}

async listStream (
ctx: MeasureContext,
workspaceId: WorkspaceId,
prefix?: string | undefined
): Promise<BlobStorageIterator> {
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
const files = Array.from(this.files.values()).filter((it) => it.workspace === workspaceId.name)
return {
next: async () => {
Expand Down
10 changes: 2 additions & 8 deletions server/core/src/server/aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,8 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, objectNames as Ref<Blob>[])
}

async listStream (
ctx: MeasureContext,
workspaceId: WorkspaceId,
prefix?: string | undefined
): Promise<BlobStorageIterator> {
const data = await this.dbAdapter.findStream<Blob>(ctx, workspaceId, DOMAIN_BLOB, {
_id: { $regex: `${prefix ?? ''}.*` }
})
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
const data = await this.dbAdapter.findStream<Blob>(ctx, workspaceId, DOMAIN_BLOB, {})
return {
next: async (): Promise<ListBlobResult | undefined> => {
return await data.next()
Expand Down
6 changes: 1 addition & 5 deletions server/datalake/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,7 @@ export class DatalakeService implements StorageAdapter {
}

@withContext('listStream')
async listStream (
ctx: MeasureContext,
workspaceId: WorkspaceId,
prefix?: string | undefined
): Promise<BlobStorageIterator> {
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
throw new Error('not supported')
}

Expand Down
8 changes: 2 additions & 6 deletions server/minio/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,7 @@ export class MinioService implements StorageAdapter {
}

@withContext('listStream')
async listStream (
ctx: MeasureContext,
workspaceId: WorkspaceId,
prefix?: string | undefined
): Promise<BlobStorageIterator> {
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
let hasMore = true
let stream: BucketStream<BucketItem> | undefined
let done = false
Expand All @@ -199,7 +195,7 @@ export class MinioService implements StorageAdapter {
next: async (): Promise<ListBlobResult | undefined> => {
try {
if (stream === undefined && !done) {
const rprefix = rootPrefix !== undefined ? rootPrefix + (prefix ?? '') : prefix ?? ''
const rprefix = rootPrefix ?? ''
stream = this.client.listObjects(this.getBucketId(workspaceId), rprefix, true)
stream.on('end', () => {
stream?.destroy()
Expand Down
10 changes: 3 additions & 7 deletions server/s3/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,7 @@ export class S3Service implements StorageAdapter {
}

@withContext('listStream')
async listStream (
ctx: MeasureContext,
workspaceId: WorkspaceId,
prefix?: string | undefined
): Promise<BlobStorageIterator> {
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
let hasMore = true
const buffer: ListBlobResult[] = []
let token: string | undefined
Expand All @@ -248,7 +244,7 @@ export class S3Service implements StorageAdapter {
if (hasMore && buffer.length === 0) {
const res = await this.client.listObjectsV2({
Bucket: this.getBucketId(workspaceId),
Prefix: rootPrefix !== undefined ? rootPrefix + (prefix ?? '') : prefix ?? '',
Prefix: rootPrefix ?? '',
ContinuationToken: token
})
if (res.IsTruncated === true) {
Expand All @@ -273,7 +269,7 @@ export class S3Service implements StorageAdapter {
}
}
} catch (err: any) {
ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name, prefix })
ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name })
}
if (buffer.length > 0) {
return buffer.shift()
Expand Down

0 comments on commit 60444eb

Please sign in to comment.