Skip to content

Commit

Permalink
fix(server&web): fix logs disorder issue (#1982)
Browse files Browse the repository at this point in the history
  • Loading branch information
HUAHUAI23 authored May 30, 2024
1 parent 07019af commit b2f0115
Show file tree
Hide file tree
Showing 8 changed files with 11,766 additions and 4,980 deletions.
5,626 changes: 5,626 additions & 0 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

77 changes: 49 additions & 28 deletions server/src/log/log.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
Query,
UseGuards,
Sse,
MessageEvent,
} from '@nestjs/common'
import http from 'http'
import { ApiBearerAuth, ApiOperation, ApiQuery, ApiTags } from '@nestjs/swagger'
Expand Down Expand Up @@ -103,7 +104,7 @@ export class LogController {
@Param('podName') podName: string,
@Query('containerName') containerName: string,
@Param('appid') appid: string,
) {
): Promise<Observable<MessageEvent>> {
if (!containerName) {
containerName = appid
}
Expand All @@ -114,12 +115,7 @@ export class LogController {

if (!podNameList.includes(podName) && podName !== 'all') {
return new Observable<MessageEvent>((subscriber) => {
subscriber.next(
JSON.stringify({
error: 'podName not exist',
}) as unknown as MessageEvent,
)
subscriber.complete()
subscriber.error(new Error('podName not exist'))
})
}

Expand All @@ -136,19 +132,34 @@ export class LogController {
const logs = new Log(kc)

const streamsEnded = new Set<string>()

const timerId = setInterval(() => {
subscriber.next('\u200B' as unknown as MessageEvent)
}, 30000)
const k8sLogResponses: http.IncomingMessage[] = []
const podLogStreams: PassThrough[] = []

const destroyStream = () => {
combinedLogStream?.removeAllListeners()
combinedLogStream?.destroy()
clearInterval(timerId)
combinedLogStream.removeAllListeners()
combinedLogStream.destroy()

k8sLogResponses.forEach((response) => {
response.removeAllListeners()
response.destroy()
})

podLogStreams.forEach((stream) => {
stream.removeAllListeners()
stream.destroy()
})
}

let idCounter = 1
combinedLogStream.on('data', (chunk) => {
subscriber.next(chunk.toString() as MessageEvent)
const dataString = chunk.toString()
const messageEvent: MessageEvent = {
id: idCounter.toString(),
data: dataString,
type: 'log',
}
idCounter++
subscriber.next(messageEvent)
})

combinedLogStream.on('error', (error) => {
Expand All @@ -157,18 +168,18 @@ export class LogController {
destroyStream()
})

combinedLogStream.on('end', () => {
combinedLogStream.on('close', () => {
subscriber.complete()
destroyStream()
})

const fetchLog = async (podName: string) => {
let k8sResponse: http.IncomingMessage | undefined
const podLogStream = new PassThrough()
streamsEnded.add(podName)
podLogStreams.push(podLogStream)

try {
k8sResponse = await logs.log(
const k8sResponse: http.IncomingMessage = await logs.log(
namespaceOfApp,
podName,
containerName,
Expand All @@ -181,26 +192,33 @@ export class LogController {
tailLines: 1000,
},
)

k8sLogResponses.push(k8sResponse)

podLogStream.pipe(combinedLogStream, { end: false })

podLogStream.on('error', (error) => {
combinedLogStream.emit('error', error)
podLogStream.removeAllListeners()
podLogStream.destroy()
subscriber.error(error)
this.logger.error(`podLogStream error for pod ${podName}`, error)
destroyStream()
})

podLogStream.once('end', () => {
k8sResponse.on('close', () => {
streamsEnded.delete(podName)
if (streamsEnded.size === 0) {
combinedLogStream.end()
combinedLogStream.emit('close')
}
})

podLogStream.on('close', () => {
streamsEnded.delete(podName)
if (streamsEnded.size === 0) {
combinedLogStream.emit('close')
}
})
} catch (error) {
this.logger.error(`Failed to get logs for pod ${podName}`, error)
subscriber.error(error)
k8sResponse?.destroy()
podLogStream.removeAllListeners()
podLogStream.destroy()
this.logger.error(`Failed to get logs for pod ${podName}`, error)
destroyStream()
}
}
Expand All @@ -212,8 +230,11 @@ export class LogController {
} else {
fetchLog(podName)
}

// Clean up when the client disconnects
return () => destroyStream()
return () => {
destroyStream()
}
})
}
}
11 changes: 11 additions & 0 deletions web/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"@codingame/monaco-vscode-typescript-basics-default-extension": "~1.82.3",
"@emotion/react": "^11.11.0",
"@emotion/styled": "^11.11.0",
"@microsoft/fetch-event-source": "^2.0.1",
"@monaco-editor/react": "^4.6.0",
"@patternfly/react-log-viewer": "^5.0.0",
"@sentry/integrations": "^7.73.0",
Expand Down
Loading

0 comments on commit b2f0115

Please sign in to comment.