Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add fallbacks to restart streamWorker if kafka connection breaks #34

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ run: ## run Pokeshop API on Docker Compose and run Trace-based tests with Tracet

down: ## stop Pokeshop API on Docker Compose and run Trace-based tests with Tracetest
docker compose -f docker-compose.yml -f ./docker-compose.stream.yml -f ./tracetest/docker-compose.yml down

build/docker: # build docker image locally
docker build . -t kubeshop/demo-pokemon-api:latest
23 changes: 7 additions & 16 deletions api/src/services/stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ class InstrumentedKafkaStreamService<T> extends InstrumentedComponent implements
const headers = this.extractHeaders(message);
const parentContext = propagation.extract(context.active(), headers);

console.log('Extracting headers from message to get OTel data...')
console.log('Message headers: ', headers)
console.log('Context: ', parentContext)

const span = await createSpanFromContext(
`${this.topic} ${MessagingOperationValues.PROCESS}`,
parentContext,
Expand Down Expand Up @@ -109,10 +105,8 @@ class KafkaStreamService<T> implements StreamingService<T> {
brokers: [KAFKA_BROKER]
});

console.log(`Checking if need to create topic...`)
await this.waitForTopicCreation();

console.log(`Starting consumer for groupId 'test-group'...`)
this.consumer = this.client.consumer({ groupId: 'test-group' });
await this.consumer.connect();
} catch (ex) {
Expand All @@ -125,16 +119,17 @@ class KafkaStreamService<T> implements StreamingService<T> {
public async subscribe(callback: Function): Promise<void> {
const consumer = await this.connect();

console.log(`Subscribing consumer for topic '${this.topic}'...`)
await consumer.subscribe({ topic: this.topic, fromBeginning: true });

const { CRASH } = consumer.events;
await consumer.on(CRASH, () => {
// make the node process crash on purpose,
// so we can restart the worker
process.exit(-1);
});

await consumer.run({
eachMessage: async ({ message }) => {
console.log(`Consuming message...`);
console.log(`Message headers: ${message.headers?.toString()}`)
console.log(`Message key: ${message.key?.toString()}`)
console.log(`Message value: ${message.value?.toString()}`)

await callback(message);
},
})
Expand All @@ -145,21 +140,17 @@ class KafkaStreamService<T> implements StreamingService<T> {
return
}

console.log(`Connecting to Kafka admin to check topics...`);
const admin = this.client.admin()
await admin.connect()

while (true) {
const topics = await admin.listTopics()
console.log(`Topics registered for broker '${topics}' ...`);

if (topics.includes(this.topic)) {
console.log(`Topic '${this.topic}' exists.`);
await admin?.disconnect()
return
}

console.log(`Topic '${this.topic}' does not exists. Waiting for producer to create it.`);
await sleep(5_000); //wait for 5 seconds
}
}
Expand Down
1 change: 1 addition & 0 deletions docker-compose.stream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ services:
KAFKA_TOPIC: 'pokemon'
KAFKA_CLIENT_ID: 'streaming-worker'
REDIS_URL: cache
restart: on-failure
depends_on:
db:
condition: service_healthy
Expand Down
Loading