Skip to content

Commit

Permalink
fix: datalake fixes and perftest
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Onnikov <[email protected]>
  • Loading branch information
aonnikov committed Oct 22, 2024
1 parent c1fe925 commit 215ffa9
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 23 deletions.
11 changes: 6 additions & 5 deletions common/config/rush/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion server/datalake/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5",
"@types/node-fetch": "~2.6.2"
"@types/node-fetch": "~2.6.2",
"ts-node": "^10.8.0"
},
"dependencies": {
"@hcengineering/core": "^0.6.32",
Expand Down
10 changes: 4 additions & 6 deletions server/datalake/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ type BlobUploadResult = BlobUploadSuccess | BlobUploadError

/** @public */
export class Client {
private readonly endpoint: string

constructor (host: string, port?: number) {
this.endpoint = port !== undefined ? `${host}:${port}` : host
}
constructor (private readonly endpoint: string) {}

getObjectUrl (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): string {
const path = `/blob/${workspace.name}/${encodeURIComponent(objectName)}`
Expand Down Expand Up @@ -81,7 +77,9 @@ export class Client {
): Promise<Readable> {
const url = this.getObjectUrl(ctx, workspace, objectName)
const headers = {
Range: `bytes=${offset}-${length ?? ''}`
Range: length !== undefined
? `bytes=${offset}-${offset + length - 1}`
: `bytes=${offset}`
}

const response = await fetchSafe(ctx, url, { headers })
Expand Down
14 changes: 3 additions & 11 deletions server/datalake/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class DatalakeService implements StorageAdapter {
static config = 'datalake'
client: Client
constructor (readonly opt: DatalakeConfig) {
this.client = new Client(opt.endpoint, opt.port)
this.client = new Client(opt.endpoint)
}

async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
Expand Down Expand Up @@ -167,23 +167,15 @@ export class DatalakeService implements StorageAdapter {
}

export function processConfigFromEnv (storageConfig: StorageConfiguration): string | undefined {
let endpoint = process.env.DATALAKE_ENDPOINT
const endpoint = process.env.DATALAKE_ENDPOINT
if (endpoint === undefined) {
return 'DATALAKE_ENDPOINT'
}

let port = 80
const sp = endpoint.split(':')
if (sp.length > 1) {
endpoint = sp[0]
port = parseInt(sp[1])
}

const config: DatalakeConfig = {
kind: 'datalake',
name: 'datalake',
endpoint,
port
endpoint
}
storageConfig.storages.push(config)
storageConfig.default = 'datalake'
Expand Down
102 changes: 102 additions & 0 deletions server/datalake/src/perfTest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { MeasureMetricsContext, generateId } from '@hcengineering/core'
import type { StorageConfiguration } from '@hcengineering/server-core'
import { DatalakeService, processConfigFromEnv, type DatalakeConfig } from '.'

/** Number of bytes in one megabyte (1024 * 1024); used to size ranged reads. */
const MB = 1024 * 1024

// Build the storage configuration from the environment.
// processConfigFromEnv returns the name of the environment variable that is missing
// (e.g. 'DATALAKE_ENDPOINT'); otherwise it pushes a datalake entry into `config.storages`.
const config: StorageConfiguration = { default: 'minio', storages: [] }
const minioConfigVar = processConfigFromEnv(config)
if (minioConfigVar !== undefined || config.storages[0] === undefined) {
  console.error('No Datalake config env is configured:' + minioConfigVar)
  // NOTE: the previous `it.skip(...)` call was removed. `it` is a jest global that is never
  // imported here, so outside of a jest run it raised a ReferenceError before the
  // intended exit below could happen.
  process.exit(1)
}

/** Measurement context passed to every storage call made by the test. */
const toolCtx = new MeasureMetricsContext('test', {})

/** Datalake storage adapter under test, built from the first configured storage entry. */
const storageService = new DatalakeService({ ...(config.storages[0] as DatalakeConfig) })

/**
 * Reads a stream to its end and discards the data.
 *
 * Async iteration is used instead of waiting for the 'end' event only: when the stream
 * fails, the loop throws, so the returned promise rejects instead of staying pending forever.
 *
 * @param stream - the stream (or any async iterable) to exhaust
 */
async function consumeStream (stream: AsyncIterable<unknown>): Promise<void> {
  for await (const _chunk of stream) {
    // Data is intentionally dropped: only the transfer time matters, and keeping
    // every chunk would just hold the whole object in memory for no reason.
  }
}

/**
 * Runs the datalake performance scenario against `storageService`:
 *
 * 1. uploads `objectCount` zero-filled objects of `sz` megabytes each;
 * 2. downloads each object with `read`;
 * 3. downloads each object as a stream with `get`;
 * 4. downloads each object in 1 MB ranges with `partial`.
 *
 * Per-object timings and the overall throughput of every phase are printed to the console.
 * The objects are written into a workspace with a freshly generated id.
 *
 * @returns a promise that resolves when all phases are finished and rejects on the
 * first failed storage call or stream error
 */
async function doTest (): Promise<void> {
  // Number of objects handled by every phase.
  const objectCount = 10
  // Size of a single object, in megabytes.
  const sz = 10

  const ws1 = { name: generateId() }
  await storageService.make(toolCtx, ws1)

  // Prints the throughput of a phase that moved `sz * objectCount` megabytes since `startedAt`.
  const logThroughput = (label: string, startedAt: number): void => {
    const elapsedMs = Date.now() - startedAt
    console.log(`${label} performance: ${Math.round((sz * objectCount * 1000 * 100) / elapsedMs) / 100} mb per second`)
  }

  /// /////// Uploads
  console.log(`upload ${sz}mb test`)
  let st1 = Date.now()
  // The same zero-filled buffer is uploaded under every object name.
  const stream = Buffer.alloc(sz * MB)
  for (let i = 0; i < objectCount; i++) {
    const st = Date.now()
    await storageService.put(toolCtx, ws1, `testObject.${i}`, stream, 'application/octet-stream', stream.length)
    console.log('upload time', Date.now() - st)
  }
  logThroughput('upload', st1)

  /// // Downloads 1: whole object through `read`
  console.log(`download ${sz}mb test`)
  st1 = Date.now()
  for (let i = 0; i < objectCount; i++) {
    const st = Date.now()
    await storageService.read(toolCtx, ws1, `testObject.${i}`)
    console.log('download time', Date.now() - st)
  }
  logThroughput('download', st1)

  /// Downloads 2: whole object as a stream through `get`
  console.log(`download ${sz}mb stream test`)
  st1 = Date.now()
  for (let i = 0; i < objectCount; i++) {
    const st = Date.now()
    const readable = await storageService.get(toolCtx, ws1, `testObject.${i}`)
    await consumeStream(readable)
    console.log('download time 2', Date.now() - st)
  }
  logThroughput('download', st1)

  /// Downloads 3: every object fetched as consecutive 1 MB ranges through `partial`
  console.log('download partial test')
  st1 = Date.now()
  for (let i = 0; i < objectCount; i++) {
    const st = Date.now()
    // `part` must not reuse the name `i`: shadowing the object index made every outer
    // iteration read chunk N of object N instead of all chunks of object `i`.
    for (let part = 0; part < sz; part++) {
      const readable = await storageService.partial(toolCtx, ws1, `testObject.${i}`, part * MB, MB)
      await consumeStream(readable)
    }
    console.log('download time 3', Date.now() - st)
  }
  logThroughput('download', st1)
}

// Script entry point: run the scenario, report completion only once it has actually
// finished, and signal failure through the process exit code.
void doTest()
  .then(() => {
    // Logged after the promise settles; printing it synchronously would claim completion
    // while the uploads and downloads are still in flight.
    console.log('done')
  })
  .catch((err) => {
    console.error(err)
    // Make a failed run visible to callers (CI, shell scripts) without cutting off pending output.
    process.exitCode = 1
  })

0 comments on commit 215ffa9

Please sign in to comment.