From 86aaf0cd9b96ffa4204d7799d38b59a6270e6220 Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Wed, 23 Oct 2024 17:49:39 +0700 Subject: [PATCH] fix: more datalake logs Signed-off-by: Alexander Onnikov --- server/datalake/src/client.ts | 95 ++++++++++++++++++++++++++--------- workers/datalake/src/blob.ts | 12 ++++- 2 files changed, 80 insertions(+), 27 deletions(-) diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts index acb5031b7b..807c63145a 100644 --- a/server/datalake/src/client.ts +++ b/server/datalake/src/client.ts @@ -58,7 +58,14 @@ export class Client { async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) - const response = await fetchSafe(ctx, url) + + let response + try { + response = await fetchSafe(ctx, url) + } catch (err) { + console.error('failed to get object', { workspace, objectName, err }) + throw err + } if (response.body == null) { ctx.error('bad datalake response', { objectName }) @@ -80,7 +87,13 @@ export class Client { Range: length !== undefined ? `bytes=${offset}-${offset + length - 1}` : `bytes=${offset}` } - const response = await fetchSafe(ctx, url, { headers }) + let response + try { + response = await fetchSafe(ctx, url, { headers }) + } catch (err) { + console.error('failed to get partial object', { workspace, objectName, err }) + throw err + } if (response.body == null) { ctx.error('bad datalake response', { objectName }) @@ -97,7 +110,13 @@ export class Client { ): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) - const response = await fetchSafe(ctx, url, { method: 'HEAD' }) + let response: Response + try { + response = await fetchSafe(ctx, url, { method: 'HEAD' }) + } catch (err) { + console.error('failed to stat object', { workspace, objectName, err }) + throw err + } const headers = response.headers const lastModified = Date.parse(headers.get('Last-Modified') ?? '') @@ -113,7 +132,12 @@ export class Client { async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) - await fetchSafe(ctx, url, { method: 'DELETE' }) + try { + await fetchSafe(ctx, url, { method: 'DELETE' }) + } catch (err) { + console.error('failed to delete object', { workspace, objectName, err }) + throw err + } } async putObject ( @@ -134,14 +158,20 @@ export class Client { ctx.warn('unknown object size', { workspace, objectName }) } } - if (size === undefined || size < 64 * 1024 * 1024) { - await ctx.with('direct-upload', {}, async (ctx) => { - await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata) - }) - } else { - await ctx.with('signed-url-upload', {}, async (ctx) => { - await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata) - }) + + try { + if (size === undefined || size < 64 * 1024 * 1024) { + await ctx.with('direct-upload', {}, async (ctx) => { + await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata) + }) + } else { + await ctx.with('signed-url-upload', {}, async (ctx) => { + await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata) + }) + } + } catch (err) { + console.error('failed to put object', { workspace, objectName, err }) + throw err } } @@ -170,14 +200,12 @@ export class Client { const result = (await response.json()) as BlobUploadResult[] if (result.length !== 1) { - ctx.error('bad datalake response', { workspace, objectName, result }) - throw new Error('Bad datalake response') + throw new Error('Bad datalake response: ' + result.toString()) } const uploadResult = result[0] if ('error' in uploadResult) { - ctx.error('error during blob upload', { workspace, objectName, error: uploadResult.error }) throw new Error('Upload failed: ' + uploadResult.error) } } @@ -201,26 +229,43 @@ export class Client { 'x-amz-meta-last-modified': metadata.lastModified.toString() } }) - await this.signObjectComplete(ctx, workspace, objectName) - } catch { + } catch (err) { await this.signObjectDelete(ctx, workspace, objectName) + throw new Error('Failed to upload via signed URL') } + + await this.signObjectComplete(ctx, workspace, objectName) } private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { - const url = this.getSignObjectUrl(workspace, objectName) - const response = await fetchSafe(ctx, url, { method: 'POST' }) - return await response.text() + try { + const url = this.getSignObjectUrl(workspace, objectName) + const response = await fetchSafe(ctx, url, { method: 'POST' }) + return await response.text() + } catch (err: any) { + ctx.error('failed to sign object', { workspace, objectName, err }) + throw new Error('Failed to sign URL') + } } private async signObjectComplete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { - const url = this.getSignObjectUrl(workspace, objectName) - await fetchSafe(ctx, url, { method: 'PUT' }) + try { + const url = this.getSignObjectUrl(workspace, objectName) + await fetchSafe(ctx, url, { method: 'PUT' }) + } catch (err: any) { + ctx.error('failed to complete signed url upload', { workspace, objectName, err }) + throw new Error('Failed to complete signed URL upload') + } } private async signObjectDelete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { - const url = this.getSignObjectUrl(workspace, objectName) - await fetchSafe(ctx, url, { method: 'DELETE' }) + try { + const url = this.getSignObjectUrl(workspace, objectName) + await fetchSafe(ctx, url, { method: 'DELETE' }) + } catch (err: any) { + ctx.error('failed to abort signed url upload', { workspace, objectName, err }) + throw new Error('Failed to abort signed URL upload') + } } private getSignObjectUrl (workspace: WorkspaceId, objectName: string): string { @@ -234,7 +279,7 @@ async function fetchSafe (ctx: MeasureContext, url: string, init?: RequestInit): try { response = await fetch(url, init) } catch (err: any) { - ctx.error('network error', { error: err }) + ctx.error('network error', { err }) throw new Error(`Network error ${err}`) } diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts index e4478e8bfa..7b79a184a0 100644 --- a/workers/datalake/src/blob.ts +++ b/workers/datalake/src/blob.ts @@ -124,11 +124,19 @@ export async function postBlobFormData (request: Request, env: Env, workspace: s const contentType = request.headers.get('Content-Type') if (contentType === null || !contentType.includes('multipart/form-data')) { console.error({ error: 'expected multipart/form-data' }) - return error(400, 'Expected multipart/form-data') + return error(400, 'expected multipart/form-data') } const sql = postgres(env.HYPERDRIVE.connectionString) - const formData = await request.formData() + + let formData: FormData + try { + formData = await request.formData() + } catch (err: any) { + const message = err instanceof Error ? err.message : String(err) + console.error({ error: 'failed to parse form data', message }) + return error(400, 'failed to parse form data') + } const files: [File, key: string][] = [] formData.forEach((value: any, key: string) => {