Skip to content

Commit

Permalink
fix(server): fix logs
Browse files Browse the repository at this point in the history
  • Loading branch information
HUAHUAI23 committed May 29, 2024
1 parent bf304e4 commit ddad699
Showing 1 changed file with 33 additions and 20 deletions.
53 changes: 33 additions & 20 deletions server/src/log/log.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
Query,
UseGuards,
Sse,
MessageEvent,
} from '@nestjs/common'
import http from 'http'
import { ApiBearerAuth, ApiOperation, ApiQuery, ApiTags } from '@nestjs/swagger'
Expand Down Expand Up @@ -103,7 +104,7 @@ export class LogController {
@Param('podName') podName: string,
@Query('containerName') containerName: string,
@Param('appid') appid: string,
) {
): Promise<Observable<MessageEvent>> {
if (!containerName) {
containerName = appid
}
Expand Down Expand Up @@ -136,19 +137,32 @@ export class LogController {
const logs = new Log(kc)

const streamsEnded = new Set<string>()

const timerId = setInterval(() => {
subscriber.next('\u200B' as unknown as MessageEvent)
}, 30000)
const k8sLogResponses: http.IncomingMessage[] = []
const podLogStreams: PassThrough[] = []

const destroyStream = () => {
combinedLogStream?.removeAllListeners()
combinedLogStream?.destroy()
clearInterval(timerId)
combinedLogStream.removeAllListeners()
combinedLogStream.destroy()

k8sLogResponses.forEach((response) => response.destroy())

podLogStreams.forEach((stream) => {
stream.removeAllListeners()
stream.destroy()
})
}

let idCounter = 1
combinedLogStream.on('data', (chunk) => {
subscriber.next(chunk.toString() as MessageEvent)
const dataString = chunk.toString()
const messageEvent: MessageEvent = {
id: idCounter.toString(),
data: dataString,
type: 'log',
}
idCounter++
console.log(dataString)
subscriber.next(messageEvent)
})

combinedLogStream.on('error', (error) => {
Expand All @@ -157,18 +171,18 @@ export class LogController {
destroyStream()
})

combinedLogStream.on('end', () => {
combinedLogStream.on('close', () => {
subscriber.complete()
destroyStream()
})

const fetchLog = async (podName: string) => {
let k8sResponse: http.IncomingMessage | undefined
const podLogStream = new PassThrough()
streamsEnded.add(podName)
podLogStreams.push(podLogStream)

try {
k8sResponse = await logs.log(
const k8sResponse: http.IncomingMessage = await logs.log(
namespaceOfApp,
podName,
containerName,
Expand All @@ -181,26 +195,25 @@ export class LogController {
tailLines: 1000,
},
)

k8sLogResponses.push(k8sResponse)

podLogStream.pipe(combinedLogStream, { end: false })

podLogStream.on('error', (error) => {
combinedLogStream.emit('error', error)
podLogStream.removeAllListeners()
podLogStream.destroy()
this.logger.error(`podLogStream error for pod ${podName}`, error)
destroyStream()
})

podLogStream.once('end', () => {
podLogStream.on('close', () => {
streamsEnded.delete(podName)
if (streamsEnded.size === 0) {
combinedLogStream.end()
combinedLogStream.emit('close')
}
})
} catch (error) {
this.logger.error(`Failed to get logs for pod ${podName}`, error)
subscriber.error(error)
k8sResponse?.destroy()
podLogStream.removeAllListeners()
podLogStream.destroy()
destroyStream()
}
}
Expand Down

0 comments on commit ddad699

Please sign in to comment.