Skip to content

Commit

Permalink
fix: more datalake logs (#7019)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Onnikov <[email protected]>
  • Loading branch information
aonnikov authored and haiodo committed Oct 25, 2024
1 parent f3f6ce6 commit 8d217bf
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 27 deletions.
95 changes: 70 additions & 25 deletions server/datalake/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,14 @@ export class Client {

async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
const url = this.getObjectUrl(ctx, workspace, objectName)
const response = await fetchSafe(ctx, url)

let response
try {
response = await fetchSafe(ctx, url)
} catch (err) {
console.error('failed to get object', { workspace, objectName, err })
throw err
}

if (response.body == null) {
ctx.error('bad datalake response', { objectName })
Expand All @@ -80,7 +87,13 @@ export class Client {
Range: length !== undefined ? `bytes=${offset}-${offset + length - 1}` : `bytes=${offset}`
}

const response = await fetchSafe(ctx, url, { headers })
let response
try {
response = await fetchSafe(ctx, url, { headers })
} catch (err) {
console.error('failed to get partial object', { workspace, objectName, err })
throw err
}

if (response.body == null) {
ctx.error('bad datalake response', { objectName })
Expand All @@ -97,7 +110,13 @@ export class Client {
): Promise<StatObjectOutput | undefined> {
const url = this.getObjectUrl(ctx, workspace, objectName)

const response = await fetchSafe(ctx, url, { method: 'HEAD' })
let response: Response
try {
response = await fetchSafe(ctx, url, { method: 'HEAD' })
} catch (err) {
console.error('failed to stat object', { workspace, objectName, err })
throw err
}

const headers = response.headers
const lastModified = Date.parse(headers.get('Last-Modified') ?? '')
Expand All @@ -113,7 +132,12 @@ export class Client {

async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getObjectUrl(ctx, workspace, objectName)
await fetchSafe(ctx, url, { method: 'DELETE' })
try {
await fetchSafe(ctx, url, { method: 'DELETE' })
} catch (err) {
console.error('failed to delete object', { workspace, objectName, err })
throw err
}
}

async putObject (
Expand All @@ -134,14 +158,20 @@ export class Client {
ctx.warn('unknown object size', { workspace, objectName })
}
}
if (size === undefined || size < 64 * 1024 * 1024) {
await ctx.with('direct-upload', {}, async (ctx) => {
await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata)
})
} else {
await ctx.with('signed-url-upload', {}, async (ctx) => {
await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata)
})

try {
if (size === undefined || size < 64 * 1024 * 1024) {
await ctx.with('direct-upload', {}, async (ctx) => {
await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata)
})
} else {
await ctx.with('signed-url-upload', {}, async (ctx) => {
await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata)
})
}
} catch (err) {
console.error('failed to put object', { workspace, objectName, err })
throw err
}
}

Expand Down Expand Up @@ -170,14 +200,12 @@ export class Client {

const result = (await response.json()) as BlobUploadResult[]
if (result.length !== 1) {
ctx.error('bad datalake response', { workspace, objectName, result })
throw new Error('Bad datalake response')
throw new Error('Bad datalake response: ' + result.toString())
}

const uploadResult = result[0]

if ('error' in uploadResult) {
ctx.error('error during blob upload', { workspace, objectName, error: uploadResult.error })
throw new Error('Upload failed: ' + uploadResult.error)
}
}
Expand All @@ -201,26 +229,43 @@ export class Client {
'x-amz-meta-last-modified': metadata.lastModified.toString()
}
})
await this.signObjectComplete(ctx, workspace, objectName)
} catch {
} catch (err) {
await this.signObjectDelete(ctx, workspace, objectName)
throw new Error('Failed to upload via signed URL')
}

await this.signObjectComplete(ctx, workspace, objectName)
}

private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {
const url = this.getSignObjectUrl(workspace, objectName)
const response = await fetchSafe(ctx, url, { method: 'POST' })
return await response.text()
try {
const url = this.getSignObjectUrl(workspace, objectName)
const response = await fetchSafe(ctx, url, { method: 'POST' })
return await response.text()
} catch (err: any) {
ctx.error('failed to sign object', { workspace, objectName, err })
throw new Error('Failed to sign URL')
}
}

private async signObjectComplete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'PUT' })
try {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'PUT' })
} catch (err: any) {
ctx.error('failed to complete signed url upload', { workspace, objectName, err })
throw new Error('Failed to complete signed URL upload')
}
}

private async signObjectDelete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'DELETE' })
try {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'DELETE' })
} catch (err: any) {
ctx.error('failed to abort signed url upload', { workspace, objectName, err })
throw new Error('Failed to abort signed URL upload')
}
}

private getSignObjectUrl (workspace: WorkspaceId, objectName: string): string {
Expand All @@ -234,7 +279,7 @@ async function fetchSafe (ctx: MeasureContext, url: string, init?: RequestInit):
try {
response = await fetch(url, init)
} catch (err: any) {
ctx.error('network error', { error: err })
ctx.error('network error', { err })
throw new Error(`Network error ${err}`)
}

Expand Down
12 changes: 10 additions & 2 deletions workers/datalake/src/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,19 @@ export async function postBlobFormData (request: Request, env: Env, workspace: s
const contentType = request.headers.get('Content-Type')
if (contentType === null || !contentType.includes('multipart/form-data')) {
console.error({ error: 'expected multipart/form-data' })
return error(400, 'Expected multipart/form-data')
return error(400, 'expected multipart/form-data')
}

const sql = postgres(env.HYPERDRIVE.connectionString)
const formData = await request.formData()

let formData: FormData
try {
formData = await request.formData()
} catch (err: any) {
const message = err instanceof Error ? err.message : String(err)
console.error({ error: 'failed to parse form data', message })
return error(400, 'failed to parse form data')
}

const files: [File, key: string][] = []
formData.forEach((value: any, key: string) => {
Expand Down

0 comments on commit 8d217bf

Please sign in to comment.