Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: more datalake logs #7019

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 70 additions & 25 deletions server/datalake/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,14 @@ export class Client {

async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
const url = this.getObjectUrl(ctx, workspace, objectName)
const response = await fetchSafe(ctx, url)

let response
try {
response = await fetchSafe(ctx, url)
} catch (err) {
console.error('failed to get object', { workspace, objectName, err })
throw err
}

if (response.body == null) {
ctx.error('bad datalake response', { objectName })
Expand All @@ -80,7 +87,13 @@ export class Client {
Range: length !== undefined ? `bytes=${offset}-${offset + length - 1}` : `bytes=${offset}`
}

const response = await fetchSafe(ctx, url, { headers })
let response
try {
response = await fetchSafe(ctx, url, { headers })
} catch (err) {
console.error('failed to get partial object', { workspace, objectName, err })
throw err
}

if (response.body == null) {
ctx.error('bad datalake response', { objectName })
Expand All @@ -97,7 +110,13 @@ export class Client {
): Promise<StatObjectOutput | undefined> {
const url = this.getObjectUrl(ctx, workspace, objectName)

const response = await fetchSafe(ctx, url, { method: 'HEAD' })
let response: Response
try {
response = await fetchSafe(ctx, url, { method: 'HEAD' })
} catch (err) {
console.error('failed to stat object', { workspace, objectName, err })
throw err
}

const headers = response.headers
const lastModified = Date.parse(headers.get('Last-Modified') ?? '')
Expand All @@ -113,7 +132,12 @@ export class Client {

async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getObjectUrl(ctx, workspace, objectName)
await fetchSafe(ctx, url, { method: 'DELETE' })
try {
await fetchSafe(ctx, url, { method: 'DELETE' })
} catch (err) {
console.error('failed to delete object', { workspace, objectName, err })
throw err
}
}

async putObject (
Expand All @@ -134,14 +158,20 @@ export class Client {
ctx.warn('unknown object size', { workspace, objectName })
}
}
if (size === undefined || size < 64 * 1024 * 1024) {
await ctx.with('direct-upload', {}, async (ctx) => {
await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata)
})
} else {
await ctx.with('signed-url-upload', {}, async (ctx) => {
await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata)
})

try {
if (size === undefined || size < 64 * 1024 * 1024) {
await ctx.with('direct-upload', {}, async (ctx) => {
await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata)
})
} else {
await ctx.with('signed-url-upload', {}, async (ctx) => {
await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata)
})
}
} catch (err) {
console.error('failed to put object', { workspace, objectName, err })
throw err
}
}

Expand Down Expand Up @@ -170,14 +200,12 @@ export class Client {

const result = (await response.json()) as BlobUploadResult[]
if (result.length !== 1) {
ctx.error('bad datalake response', { workspace, objectName, result })
throw new Error('Bad datalake response')
throw new Error('Bad datalake response: ' + result.toString())
}

const uploadResult = result[0]

if ('error' in uploadResult) {
ctx.error('error during blob upload', { workspace, objectName, error: uploadResult.error })
throw new Error('Upload failed: ' + uploadResult.error)
}
}
Expand All @@ -201,26 +229,43 @@ export class Client {
'x-amz-meta-last-modified': metadata.lastModified.toString()
}
})
await this.signObjectComplete(ctx, workspace, objectName)
} catch {
} catch (err) {
await this.signObjectDelete(ctx, workspace, objectName)
throw new Error('Failed to upload via signed URL')
}

await this.signObjectComplete(ctx, workspace, objectName)
}

private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {
const url = this.getSignObjectUrl(workspace, objectName)
const response = await fetchSafe(ctx, url, { method: 'POST' })
return await response.text()
try {
const url = this.getSignObjectUrl(workspace, objectName)
const response = await fetchSafe(ctx, url, { method: 'POST' })
return await response.text()
} catch (err: any) {
ctx.error('failed to sign object', { workspace, objectName, err })
throw new Error('Failed to sign URL')
}
}

private async signObjectComplete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'PUT' })
try {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'PUT' })
} catch (err: any) {
ctx.error('failed to complete signed url upload', { workspace, objectName, err })
throw new Error('Failed to complete signed URL upload')
}
}

private async signObjectDelete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'DELETE' })
try {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'DELETE' })
} catch (err: any) {
ctx.error('failed to abort signed url upload', { workspace, objectName, err })
throw new Error('Failed to abort signed URL upload')
}
}

private getSignObjectUrl (workspace: WorkspaceId, objectName: string): string {
Expand All @@ -234,7 +279,7 @@ async function fetchSafe (ctx: MeasureContext, url: string, init?: RequestInit):
try {
response = await fetch(url, init)
} catch (err: any) {
ctx.error('network error', { error: err })
ctx.error('network error', { err })
throw new Error(`Network error ${err}`)
}

Expand Down
12 changes: 10 additions & 2 deletions workers/datalake/src/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,19 @@ export async function postBlobFormData (request: Request, env: Env, workspace: s
const contentType = request.headers.get('Content-Type')
if (contentType === null || !contentType.includes('multipart/form-data')) {
console.error({ error: 'expected multipart/form-data' })
return error(400, 'Expected multipart/form-data')
return error(400, 'expected multipart/form-data')
}

const sql = postgres(env.HYPERDRIVE.connectionString)
const formData = await request.formData()

let formData: FormData
try {
formData = await request.formData()
} catch (err: any) {
const message = err instanceof Error ? err.message : String(err)
console.error({ error: 'failed to parse form data', message })
return error(400, 'failed to parse form data')
}

const files: [File, key: string][] = []
formData.forEach((value: any, key: string) => {
Expand Down