Merge pull request #16 from entur/kafka-cluster-migration
Merging branch that should have been merged directly with master.
tysseng authored Jun 13, 2023
2 parents 6da7b4f + 3dbe587 commit f0c8246
Showing 5 changed files with 53 additions and 14 deletions.
6 changes: 6 additions & 0 deletions cron.yaml
@@ -5,3 +5,9 @@ cron:
url: /bff-kafka/keepalive
schedule: every 5 mins
target: bff-kafka

- description: "Can you feel a heartbeat, Doctor? No, he's dead, Jim."
# The url triggers a check of last heartbeat to see if we are up.
url: /bff-kafka/heartbeat
schedule: every 1 mins
target: bff-kafka
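
Not part of this commit, just for illustration: a minimal sketch of what the new cron entry exercises, assuming the /bff-kafka/heartbeat handler added to src/http.ts below. The file name and host name are hypothetical.

// heartbeat-probe.ts (hypothetical) — a manual version of the check the cron job triggers
const response = await fetch('https://bff-kafka.example.appspot.com/bff-kafka/heartbeat')
const body = await response.text()
if (response.status === 200) {
    // A consumer heartbeat has been seen within the threshold
    console.log(`OK: ${body}`)
} else {
    // src/http.ts answers 503 when the last heartbeat is too old
    console.error(`Dead: ${body}`)
}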
29 changes: 24 additions & 5 deletions src/http.ts
@@ -1,12 +1,31 @@
import http, { IncomingMessage, ServerResponse } from 'http'
import logger from './logger.js'
import { getLastHeartbeatString, isHeartBeating } from './monitoring.js'

// Minimal HTTP server that accepts any calls, used for keepalive, to prevent idle timeouts.
/**
* Minimal HTTP server that accepts any calls, used for keepalive, to prevent idle timeouts, and
* for monitoring
*/
const httpServer = http.createServer((request: IncomingMessage, response: ServerResponse) => {
logger.debug(`Received http request to ${request.url}`)
response.setHeader('Content-Type', 'text/html')
response.writeHead(200)
response.end('pong')
if (request.url?.includes('heartbeat')) {
response.setHeader('Content-Type', 'text/html')

if (isHeartBeating()) {
response.writeHead(200)
response.end(
`Kafka consumer heart is beating, last time was ${getLastHeartbeatString()}`,
)
} else {
response.writeHead(503)
response.end(
`Kafka consumer seems dead. Last heartbeat was at ${getLastHeartbeatString()}`,
)
}
} else {
response.setHeader('Content-Type', 'text/html')
response.writeHead(200)
response.end('pong')
}
})

export default (port = process.env.PORT || 80): void => {
19 changes: 10 additions & 9 deletions src/kafka.ts
@@ -1,8 +1,7 @@
// types for Snappy are missing, but we don't need them
// @ts-ignore
// @ts-ignore Types for Snappy are missing, but we don't need them
import SnappyCodec from 'kafkajs-snappy'
import kafkaJsLZ4 from 'kafkajs-lz4'
import { Kafka, CompressionTypes, CompressionCodecs } from 'kafkajs'
import LZ4Codec from 'kafkajs-lz4'
import kafkajs, { Kafka, CompressionTypes } from 'kafkajs'
import type { EachMessagePayload, Consumer } from 'kafkajs'
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'

@@ -15,16 +14,18 @@ import { WinstonLogCreator } from './kafkajsWinstonLogger.js'
import { getSecret } from './secrets.js'
import logger from './logger.js'

// Yeah, this doesn't look good. It tries to resolve a CommonJS vs ES Modules error
// that leads to a "TS2351: This expression is not constructable" error when doing
// new LZ4Codec() if it is imported directly.
// (See https://github.com/ajv-validator/ajv/issues/2132#issuecomment-1290409907)
const LZ4Codec = kafkaJsLZ4.default
// For some CommonJS-related reason we get the following if we try to import CompressionCodecs directly:
// SyntaxError: Named export 'CompressionCodecs' not found. The requested module 'kafkajs' is a CommonJS module,
// which may not support all module.exports as named exports.
// CommonJS modules can always be imported via the default export
// -- thus we have to do it in two steps.
const { CompressionCodecs } = kafkajs

// Kafkajs supports Gzip compression by default. LZ4-support is needed because
// some of the producers suddenly started publishing LZ4-compressed messages.
// Snappy is included because it seems fairly popular, and we want to prevent a
// future crash like the one we got from LZ4.
// @ts-ignore Ts says that LZ4Codec is not constructable, but it is.
CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

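
Not part of this commit: the comment above describes a general CommonJS-in-ESM workaround, sketched here in isolation with a hypothetical package name — import the default export first, then destructure the named export from it.

// interop-sketch.ts (hypothetical)
import cjsPackage from 'some-commonjs-package'
// Importing { namedExport } directly would fail with "Named export not found",
// so destructure it from the default export in a second step.
const { namedExport } = cjsPackage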
10 changes: 10 additions & 0 deletions src/monitoring.ts
@@ -0,0 +1,10 @@
const MINUTE = 60000
const threshold = 3 * MINUTE
let lastHeartbeat = Date.now()
export const updateLastHeartbeat = (): void => {
lastHeartbeat = Date.now()
}

export const isHeartBeating = (): boolean => Date.now() - lastHeartbeat < threshold

export const getLastHeartbeatString = (): string => new Date(lastHeartbeat).toISOString()
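
Not part of this commit: a small usage sketch of the module above, showing how server.ts and http.ts use its exports. Note that the 3-minute threshold here is wider than the 1-minute cron check in cron.yaml.

// usage-sketch.ts (hypothetical)
import { updateLastHeartbeat, isHeartBeating, getLastHeartbeatString } from './monitoring.js'

// Called whenever the Kafka consumer emits a heartbeat event (see server.ts below).
updateLastHeartbeat()

// Called by the /bff-kafka/heartbeat endpoint: true while the last heartbeat is
// less than three minutes old, false afterwards (the endpoint then answers 503).
if (!isHeartBeating()) {
    console.error(`No heartbeat since ${getLastHeartbeatString()}`)
}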
3 changes: 3 additions & 0 deletions src/server.ts
@@ -7,13 +7,16 @@ import { KAFKA_TOPICS } from './config.js'
import { connectToKafka, proxyToPubSub } from './kafka.js'
import { ENVIRONMENT } from './config.js'
import http from './http.js'
import { updateLastHeartbeat } from './monitoring.js'

logger.info(`Starting kafka to pub sub bridge, env is ${ENVIRONMENT}.`)

// This adds a keepalive endpoint to prevent GCP from killing the app.
http()

const { consumer, registry } = await connectToKafka()
consumer.on('consumer.crash', () => logger.error('Oh damn, the Kafka consumer crashed!'))
consumer.on('consumer.heartbeat', updateLastHeartbeat)

const topics = KAFKA_TOPICS.split(',').map((topic) => topic.trim())
try {
