Skip to content

Commit

Permalink
bug: CE-491 Adding Jetstream prevent duplicate complaints (#312)
Browse files Browse the repository at this point in the history
Co-authored-by: afwilcox <[email protected]>
  • Loading branch information
barrfalk and afwilcox authored May 3, 2024
1 parent 2cec874 commit f57d8ce
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 189 deletions.
84 changes: 45 additions & 39 deletions .github/workflows/deploy-nats.yml
Original file line number Diff line number Diff line change
@@ -1,50 +1,56 @@
name: Deploy NATS to OpenShift

on:
pull_request:
pull_request:

jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Checkout
uses: actions/checkout@v2

- name: Set up OpenShift CLI
uses: redhat-actions/oc-login@v1
with:
openshift_server_url: ${{ vars.OC_SERVER }}
openshift_token: ${{ secrets.OC_TOKEN }}
namespace: ${{ vars.OC_NAMESPACE }}
- name: Set up OpenShift CLI
uses: redhat-actions/oc-login@v1
with:
openshift_server_url: ${{ vars.OC_SERVER }}
openshift_token: ${{ secrets.OC_TOKEN }}
namespace: ${{ vars.OC_NAMESPACE }}

- name: Add NATS Helm repo
run: helm repo add nats https://nats-io.github.io/k8s/helm/charts/

- name: Check if NATS release exists
id: check_release
run: |
RELEASE_NAME=${{ github.event.repository.name }}-${{ github.event.number }}-nats
if helm list -q | grep -q $RELEASE_NAME; then
echo "Release $RELEASE_NAME already exists. Skipping install."
echo "release_exists=true" >> $GITHUB_ENV
else
echo "Release $RELEASE_NAME does not exist. Proceeding with install."
echo "release_exists=false" >> $GITHUB_ENV
fi
- name: Deploy NATS using Helm
if: env.release_exists == 'false'
run: |
helm install ${{ github.event.repository.name }}-${{ github.event.number }}-nats nats/nats \
--set natsBox.enabled=false \
--set container.merge.resources.requests.cpu=25m \
--set container.merge.resources.limits.cpu=100m \
--set container.merge.resources.requests.memory=100Mi \
--set container.merge.resources.limits.memory=200Mi \
--set reloader.enabled=false \
--set replicaCount=1
- name: Add NATS Helm repo
run: helm repo add nats https://nats-io.github.io/k8s/helm/charts/

- name: Label NATS Deployment
run: |
RELEASE_NAME=${{ github.event.repository.name }}-${{ github.event.number }}-nats
oc label statefulset $RELEASE_NAME app=${{ github.event.repository.name }}-${{ github.event.number }}
- name: Check if NATS release exists
id: check_release
run: |
RELEASE_NAME=${{ github.event.repository.name }}-${{ github.event.number }}-nats
if helm list -q | grep -q $RELEASE_NAME; then
echo "Release $RELEASE_NAME already exists. Skipping install."
echo "release_exists=true" >> $GITHUB_ENV
else
echo "Release $RELEASE_NAME does not exist. Proceeding with install."
echo "release_exists=false" >> $GITHUB_ENV
fi
- name: Deploy NATS using Helm
if: env.release_exists == 'false'
run: |
helm install ${{ github.event.repository.name }}-${{ github.event.number }}-nats nats/nats \
--set config.jetstream.enabled=true \
--set config.jetstream.fileStore.pvc.size=200Mi \
--set config.jetstream.memoryStore.enabled=true \
--set config.jetstream.memoryStore.maxSize=250Mi \
--set cluster.enabled=true \
--set natsBox.enabled=false \
--set persistence.enabled=true \
--set persistence.size=200Mi \
--set container.merge.resources.requests.cpu=25m \
--set container.merge.resources.limits.cpu=100m \
--set container.merge.resources.requests.memory=50Mi \
--set container.merge.resources.limits.memory=250Mi \
--set reloader.enabled=false \
--set replicaCount=1
- name: Label NATS Deployment
run: |
RELEASE_NAME=${{ github.event.repository.name }}-${{ github.event.number }}-nats
oc label statefulset $RELEASE_NAME app=${{ github.event.repository.name }}-${{ github.event.number }}
3 changes: 2 additions & 1 deletion .github/workflows/pr-open.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ jobs:
- name: webeoc
file: webeoc/openshift.deploy.yml
overwrite: true
parameters: -p MIN_REPLICAS=1 -p MAX_REPLICAS=2
needs: [backend]
parameters: -p MIN_REPLICAS=2 -p MAX_REPLICAS=2
- name: database
file: database/openshift.deploy.yml
overwrite: false
Expand Down
5 changes: 5 additions & 0 deletions backend/db/migrations/R__0.14.0__CE-101.sql
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,11 @@ BEGIN
-- Truncate and uppercase the webEOC value, get rid of spaces, and truncate to 9 characters to ensure we have room for adding a number for uniqueness
truncated_code := UPPER(LEFT(regexp_replace(webeoc_value, '\s', '', 'g'), 10));

-- Return null if truncated_code is empty or null
IF truncated_code IS NULL OR truncated_code = '' THEN
RETURN NULL;
END IF;

-- Resolve the target code table and column name based on code_table_type
CASE code_table_type
WHEN 'reprtdbycd' THEN
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ services:
working_dir: "/app"
nats:
image: nats:latest # Using the latest official NATS image
command:
- "-js"
- "-DVV"
ports:
- "4222:4222" # Default NATS client port
- "8222:8222" # NATS monitoring port
Expand Down
41 changes: 14 additions & 27 deletions webeoc/openshift.deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,27 @@ objects:
cpu: ${CPU_LIMIT}
memory: ${MEMORY_LIMIT}
readinessProbe:
httpGet:
path: /health
port: 3002
scheme: HTTP
exec:
command:
- /usr/bin/env
- bash
- -c
- ls
initialDelaySeconds: 60
periodSeconds: 15
timeoutSeconds: 5
timeoutSeconds: 15
livenessProbe:
successThreshold: 1
failureThreshold: 3
httpGet:
path: /health
port: 3002
scheme: HTTP
exec:
command:
- /usr/bin/env
- bash
- -c
- ls
initialDelaySeconds: 60
periodSeconds: 30
timeoutSeconds: 5
timeoutSeconds: 15
- apiVersion: v1
kind: Service
metadata:
Expand All @@ -176,23 +180,6 @@ objects:
targetPort: 3002
selector:
deploymentconfig: ${NAME}-${ZONE}-${COMPONENT}
- apiVersion: route.openshift.io/v1
kind: Route
metadata:
labels:
app: ${NAME}-${ZONE}
name: ${NAME}-${ZONE}-${COMPONENT}
spec:
host: ${NAME}-${ZONE}-${COMPONENT}.${DOMAIN}
port:
targetPort: 3002-tcp
to:
kind: Service
name: ${NAME}-${ZONE}-${COMPONENT}
weight: 100
tls:
termination: edge
insecureEdgeTerminationPolicy: Redirect
- apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
Expand Down
26 changes: 18 additions & 8 deletions webeoc/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions webeoc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
},
"dependencies": {
"@nestjs/axios": "^3.0.1",
"@nestjs/cli": "^9.0.0",
"@nestjs/common": "^9.0.0",
"@nestjs/core": "^9.0.0",
"@nestjs/platform-express": "^9.0.0",
"@nestjs/schedule": "^4.0.0",
"@nestjs/cli": "^9.0.0",
"cron": "^3.1.6",
"date-fns": "^3.6.0",
"dotenv": "^16.3.1",
"nats": "^2.18.0",
"nats": "^2.23.0",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.2.0"
},
Expand Down
8 changes: 8 additions & 0 deletions webeoc/src/common/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
export const NATS_NEW_COMPLAINTS_TOPIC_NAME = "new_complaints";
export const NATS_NEW_COMPLAINTS_TOPIC_CONSUMER = "new_complaints_consumer";
export const NATS_UPDATED_COMPLAINTS_TOPIC_NAME = "updated_complaints";
export const NEW_STAGING_COMPLAINTS_TOPIC_NAME = "new_staging_complaints";
export const NEW_STAGING_COMPLAINTS_TOPIC_CONSUMER = "new_staging_complaints_consumer";
export const NATS_STREAM_NAME = "complaints_stream";

export const STAGING_API_ENDPOINT = "staging-complaint";
export const COMPLAINT_API_ENDPOINT = "complaint-internal";
export const NATS_QUEUE_GROUP_STAGING = "staging_complaints_queue_group";
export const NATS_QUEUE_GROUP_COMPLAINTS = "complaints_queue_group";
export const NATS_DURABLE_COMPLAINTS = "nats_durable_complaints";
export const NATS_DURABLE_STAGING = "nats_durable_staging";
export const NATS_DELIVER_SUBJECT = "complaints_deliver_subject";
51 changes: 35 additions & 16 deletions webeoc/src/complaints-publisher/complaints-publisher.service.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,64 @@
import { Injectable, Logger } from "@nestjs/common";
import { ClientProxy, ClientProxyFactory, Transport } from "@nestjs/microservices";
import { connect, headers, JetStreamClient, JSONCodec } from "nats";
import { NATS_NEW_COMPLAINTS_TOPIC_NAME, NEW_STAGING_COMPLAINTS_TOPIC_NAME } from "../common/constants";
import { Complaint } from "src/types/Complaints";

@Injectable()
export class ComplaintsPublisherService {
private client: ClientProxy;

private jsClient: JetStreamClient;
private readonly logger = new Logger(ComplaintsPublisherService.name);

constructor() {
this.client = ClientProxyFactory.create({
transport: Transport.NATS,
options: {
servers: [process.env.NATS_HOST],
},
this.initializeNATS();
}

private async initializeNATS() {
const nc = await connect({
servers: [process.env.NATS_HOST],
});
this.jsClient = nc.jetstream();
}

private codec = JSONCodec<Complaint>();

/**
* Publish meessage to topic to indicate that a new complaint is available from webeoc
* Publish message to JetStream to indicate that a new complaint is available from webeoc
* @param complaint
*/
async publishComplaintsFromWebEOC(complaint: Complaint): Promise<void> {
try {
this.client.emit(NATS_NEW_COMPLAINTS_TOPIC_NAME, complaint);
this.logger.log(`Complaint published: ${complaint.incident_number}`);
const msg = this.codec.encode(complaint);
const natsHeaders = headers(); // used to look for complaints that have already been submitted
natsHeaders.set("Nats-Msg-Id", `staged-${complaint.incident_number}`);
const ack = await this.jsClient.publish(NATS_NEW_COMPLAINTS_TOPIC_NAME, msg, { headers: natsHeaders });
if (!ack.duplicate) {
this.logger.debug(`New complaint: ${complaint.incident_number}`);
} else {
this.logger.debug(`Complaint already published: ${complaint.incident_number}`);
}
} catch (error) {
this.logger.error(`Error publishing complaint: ${error.message}`, error.stack);
throw error;
}
}

/**
*
* @param incident_number Publish message to topic to indicate that a new complaint was added to the staging table and is ready to be moved to the operation complaints tables
* Publish message to JetStream to indicate that a new complaint was added to the staging table
* @param incident_number
*/
async publishStagingComplaintInserted(complaint_identifier: string): Promise<void> {
async publishStagingComplaintInserted(incident_number: string): Promise<void> {
try {
this.client.emit(NEW_STAGING_COMPLAINTS_TOPIC_NAME, complaint_identifier);
this.logger.log(`Complaint ready to be moved to operational tables: ${complaint_identifier}`);
const natsHeaders = headers(); // used to look for complaints that have already been submitted
natsHeaders.set("Nats-Msg-Id", `complaint-${incident_number}`);
const ack = await this.jsClient.publish(NEW_STAGING_COMPLAINTS_TOPIC_NAME, incident_number, {
headers: natsHeaders,
});

if (!ack?.duplicate) {
this.logger.debug(`Complaint ready to be moved to operational tables: ${incident_number}`);
} else {
this.logger.debug(`Complaint already moved to operational: ${incident_number}`);
}
} catch (error) {
this.logger.error(`Error saving complaint to staging: ${error.message}`, error.stack);
throw error;
Expand Down
Loading

0 comments on commit f57d8ce

Please sign in to comment.