Skip to content

Commit

Permalink
chore: fail faster if syncing events is timing out
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody committed Jun 13, 2024
1 parent 433e45b commit 49ef34a
Showing 1 changed file with 31 additions and 29 deletions.
60 changes: 31 additions & 29 deletions suite/src/__tests__/fast/sync-events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import fetch from 'cross-fetch'
import { randomCID, StreamID } from '@ceramicnetwork/streamid'
import { ReconEvent, ReconEventInput, randomEvents } from '../../utils/rustCeramicHelpers.js'

const delay = utilities.delay
const delayMs = utilities.delayMs
// Environment variables
const CeramicUrls = String(process.env.CERAMIC_URLS).split(',')
const READ_EVENTS_TIMEOUT_MS = 60 * 1000

async function registerInterest(url: string, model: StreamID): Promise<void> {
const response = await fetch(url + `/ceramic/interests/model/${model.toString()}`, {
Expand Down Expand Up @@ -60,16 +61,23 @@ async function getEventData(url: string, eventId: string): Promise<ReconEvent> {
return response.json()
}

async function readEvents(url: string, resumeToken: String, numExpectedEvents: number) {
async function readEvents(
url: string,
resumeToken: String,
numExpectedEvents: number,
timeoutMs = READ_EVENTS_TIMEOUT_MS,
) {
const events = []
console.log(
`readEvents: ${url} starting at ${resumeToken}, waiting for ${numExpectedEvents} events`,
)
var startTime = Date.now()
while (events.length < numExpectedEvents) {
if (Date.now() - startTime > 60000) {
if (Date.now() - startTime > timeoutMs) {
// if it took more than a minute, quit
console.warn(`readEvents: timeout after 60 seconds`)
console.warn(
`readEvents: timeout after ${timeoutMs} millis waiting for ${numExpectedEvents} but only ${events.length} events found`,
)
break
}

Expand All @@ -83,6 +91,8 @@ async function readEvents(url: string, resumeToken: String, numExpectedEvents: n
const eventWithData = await getEventData(url, event.id)
events.push(eventWithData)
}

await delayMs(100)
}
return sortModelEvents(events) // sort so that tests are stable
}
Expand All @@ -100,31 +110,23 @@ function sortModelEvents(events: ReconEvent[]): ReconEvent[] {
}

// Wait up till retries seconds for all urls to have at least count events
async function waitForEventCount(
urls: string[],
count: number,
retries: number,
resumeTokens: string[],
) {
async function waitForEventCount(urls: string[], count: number, resumeTokens: string[]) {
if (urls.length !== resumeTokens.length) {
throw new Error('The lengths of urls and resumeTokens arrays must be equal')
}
for (let r = 0; r < retries; r++) {
let all_good = true
for (let i = 0; i < urls.length; i++) {
let url = urls[i]
let events = await readEvents(url, resumeTokens[i], count)
if (events.length < count) {
all_good = false
break
}
}
if (all_good) {
return
let all_good = true
for (let i = 0; i < urls.length; i++) {
let url = urls[i]
let events = await readEvents(url, resumeTokens[i], count)
if (events.length < count) {
all_good = false
break
}
await delay(1)
}
throw new Error(`waitForEventCount: timeout after ${retries} retries`)
if (all_good) {
return
}
throw new Error(`waitForEventCount: timeout`)
}

describe('sync events', () => {
Expand Down Expand Up @@ -153,7 +155,7 @@ describe('sync events', () => {
await registerInterest(url, modelID)
}
const sortedModelEvents = sortModelEvents(modelEvents)
await waitForEventCount(CeramicUrls, modelEvents.length, 10, resumeTokens)
await waitForEventCount(CeramicUrls, modelEvents.length, resumeTokens)

// Use a sorted expected value for stable tests
// Validate each node got the events, including the first node
Expand All @@ -177,7 +179,7 @@ describe('sync events', () => {
}
await writeEvents(firstNodeUrl, modelEvents)

await waitForEventCount(CeramicUrls, modelEvents.length, 10, resumeTokens)
await waitForEventCount(CeramicUrls, modelEvents.length, resumeTokens)

// Use a sorted expected value for stable tests
const sortedModelEvents = sortModelEvents(modelEvents)
Expand Down Expand Up @@ -209,7 +211,7 @@ describe('sync events', () => {
// Write the second half of the data
await writeEvents(firstNodeUrl, secondHalf)

await waitForEventCount(CeramicUrls, modelEvents.length, 10, resumeTokens)
await waitForEventCount(CeramicUrls, modelEvents.length, resumeTokens)

// Use a sorted expected value for stable tests
const sortedModelEvents = sortModelEvents(modelEvents)
Expand Down Expand Up @@ -242,7 +244,7 @@ describe('sync events', () => {
writeEvents(secondNodeUrl, secondHalf),
])

await waitForEventCount(CeramicUrls, modelEvents.length, 10, resumeTokens)
await waitForEventCount(CeramicUrls, modelEvents.length, resumeTokens)

// Use a sorted expected value for stable tests
const sortedModelEvents = sortModelEvents(modelEvents)
Expand Down Expand Up @@ -294,7 +296,7 @@ describe('sync events', () => {
}
await Promise.all(writes)

await waitForEventCount(CeramicUrls, allEvents.length, 20, resumeTokens)
await waitForEventCount(CeramicUrls, allEvents.length, resumeTokens)

// Use a sorted expected value for stable tests
const sortedModelEvents = sortModelEvents(allEvents)
Expand Down

0 comments on commit 49ef34a

Please sign in to comment.