Skip to content

Commit

Permalink
feat: added query caching (#44)
Browse files Browse the repository at this point in the history
* feat: added query caching

* style: applied prettier

* fix: fixed caching

* test: improved test script to not start a testing server if one is already running

* fix: fixed caching further, improved test

* fix: added comments about query caching

* fix: don't cache empty results, added test
  • Loading branch information
ErikBjare authored Nov 15, 2023
1 parent 8763962 commit d6eaeae
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 27 deletions.
118 changes: 104 additions & 14 deletions src/aw-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export class AWClient {

public controller: AbortController;

private queryCache: { [cacheKey: string]: object[] };
private heartbeatQueues: {
[bucketId: string]: {
isProcessing: boolean;
Expand All @@ -89,6 +90,10 @@ export class AWClient {
baseURL: this.baseURL + "/api",
timeout: options.timeout || 30000,
});

// Cache for queries, by timeperiod
// TODO: persist cache and add cache expiry/invalidation
this.queryCache = {};
}

private async _get(endpoint: string, params: object = {}) {
Expand Down Expand Up @@ -120,7 +125,7 @@ export class AWClient {
public async ensureBucket(
bucketId: string,
type: string,
hostname: string
hostname: string,
): Promise<{ alreadyExist: boolean }> {
try {
await this._post(`/0/buckets/${bucketId}`, {
Expand All @@ -145,7 +150,7 @@ export class AWClient {
public async createBucket(
bucketId: string,
type: string,
hostname: string
hostname: string,
): Promise<undefined> {
await this._post(`/0/buckets/${bucketId}`, {
client: this.clientname,
Expand All @@ -166,7 +171,7 @@ export class AWClient {
buckets[bucket].created = new Date(buckets[bucket].created);
if (buckets[bucket].last_updated) {
buckets[bucket].last_updated = new Date(
buckets[bucket].last_updated
buckets[bucket].last_updated,
);
}
});
Expand All @@ -177,7 +182,7 @@ export class AWClient {
const bucket = await this._get(`/0/buckets/${bucketId}`);
if (bucket.data === undefined) {
console.warn(
"Received bucket had undefined data, likely due to data field unsupported by server. Try updating your ActivityWatch server to get rid of this message."
"Received bucket had undefined data, likely due to data field unsupported by server. Try updating your ActivityWatch server to get rid of this message.",
);
bucket.data = {};
}
Expand All @@ -188,15 +193,15 @@ export class AWClient {
public async getEvent(bucketId: string, eventId: number): Promise<IEvent> {
// Get a single event by ID
const event = await this._get(
"/0/buckets/" + bucketId + "/events/" + eventId
"/0/buckets/" + bucketId + "/events/" + eventId,
);
event.timestamp = new Date(event.timestamp);
return event;
}

public async getEvents(
bucketId: string,
params: GetEventsOptions = {}
params: GetEventsOptions = {},
): Promise<IEvent[]> {
const events = await this._get("/0/buckets/" + bucketId + "/events", {
params,
Expand All @@ -210,7 +215,7 @@ export class AWClient {
public async countEvents(
bucketId: string,
startTime?: Date,
endTime?: Date
endTime?: Date,
) {
const params = {
starttime: startTime ? startTime.toISOString() : null,
Expand All @@ -229,7 +234,7 @@ export class AWClient {
// Insert multiple events, requires the events to not have IDs assigned
public async insertEvents(
bucketId: string,
events: IEvent[]
events: IEvent[],
): Promise<void> {
// Check that events don't have IDs
// To replace an event, use `replaceEvent`, which does the opposite check (requires ID)
Expand All @@ -249,7 +254,7 @@ export class AWClient {
// Replace multiple events, requires the events to have IDs assigned
public async replaceEvents(
bucketId: string,
events: IEvent[]
events: IEvent[],
): Promise<void> {
for (const event of events) {
if (event.id === undefined) {
Expand All @@ -273,13 +278,13 @@ export class AWClient {
public heartbeat(
bucketId: string,
pulsetime: number,
heartbeat: IEvent
heartbeat: IEvent,
): Promise<void> {
// Create heartbeat queue for bucket if not already existing
if (
!Object.prototype.hasOwnProperty.call(
this.heartbeatQueues,
bucketId
bucketId,
)
) {
this.heartbeatQueues[bucketId] = {
Expand All @@ -302,9 +307,19 @@ export class AWClient {
}

/* eslint-disable @typescript-eslint/no-explicit-any */
/**
* Queries the aw-server for data
*
* If cache is enabled, for each {query, timeperiod} it will return cached data if available,
* if a timeperiod spans the future it will not cache it.
*/
public async query(
timeperiods: (string | { start: Date; end: Date })[],
query: string[]
query: string[],
params: { cache?: boolean; cacheEmpty?: boolean } = {
cache: true,
cacheEmpty: false,
},
): Promise<any[]> {
const data = {
query,
Expand All @@ -314,14 +329,89 @@ export class AWClient {
: tp;
}),
};
return await this._post("/0/query/", data);

const cacheResults: any[] = [];
if (params.cache) {
// Check cache for each {timeperiod, query} pair
for (const timeperiod of data.timeperiods) {
// check if timeperiod spans the future
const stop = new Date(timeperiod.split("/")[1]);
const now = new Date();
if (now < stop) {
cacheResults.push(null);
continue;
}
// check cache
const cacheKey = JSON.stringify({ timeperiod, query });
if (
this.queryCache[cacheKey] &&
(params.cacheEmpty || this.queryCache[cacheKey].length > 0)
) {
cacheResults.push(this.queryCache[cacheKey]);
} else {
cacheResults.push(null);
}
}

// If all results were cached, return them
if (cacheResults.every((r) => r !== null)) {
//console.debug("Returning fully cached query results");
return cacheResults;
}
}

const timeperiodsNotCached = data.timeperiods.filter(
(_, i) => cacheResults[i] === null,
);

// Otherwise, query with remaining timeperiods
const queryResults = await this._post("/0/query/", {
...data,
timeperiods: timeperiodsNotCached,
});

if (params.cache) {
/*
if (cacheResults.every((r) => r === null)) {
console.debug("Returning uncached query results");
} else if (
cacheResults.some((r) => r === null) &&
cacheResults.some((r) => r !== null)
) {
console.debug("Returning partially cached query results");
}
*/

// Cache results
// NOTE: this also caches timeperiods that span the future,
// but this is ok since we check that when first checking the cache,
// and makes it easier to return all results from cache.
for (const [i, result] of queryResults.entries()) {
const cacheKey = JSON.stringify({
timeperiod: timeperiodsNotCached[i],
query,
});
this.queryCache[cacheKey] = result;
}

// Return all results from cache
return timeperiods.map((_, i) => {
const cacheKey = JSON.stringify({
timeperiod: data.timeperiods[i],
query,
});
return this.queryCache[cacheKey];
});
} else {
return queryResults;
}
}
/* eslint-enable @typescript-eslint/no-explicit-any */

private async send_heartbeat(
bucketId: string,
pulsetime: number,
data: IEvent
data: IEvent,
): Promise<IEvent> {
const url =
"/0/buckets/" + bucketId + "/heartbeat?pulsetime=" + pulsetime;
Expand Down
53 changes: 46 additions & 7 deletions src/test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe("Basic API usage", () => {
// Check that the event is correct
assert.equal(
replacedEvent.timestamp.toISOString(),
testevent.timestamp.toISOString()
testevent.timestamp.toISOString(),
);
assert.equal(replacedEvent.data.label, newLabel);

Expand Down Expand Up @@ -148,17 +148,22 @@ describe("Basic API usage", () => {
};

return awc.heartbeat(bucketId, 5, newEvent);
})
}),
);
const events = await awc.getEvents(bucketId);
assert.equal(events.length, 1);
});

it("Query", async () => {
const e1 = { ...testevent, timestamp: new Date("2022-01-01") };
const e2 = { ...testevent, timestamp: new Date("2022-01-02") };
const d1 = new Date("2022-01-01");
const d2 = new Date("2022-01-02");
const d3 = new Date("2022-01-03");
const e1 = { ...testevent, timestamp: d1 };
const e2 = { ...testevent, timestamp: d2 };
const e3 = { ...testevent, timestamp: d3 };
await awc.heartbeat(bucketId, 5, e1);
await awc.heartbeat(bucketId, 5, e2);
await awc.heartbeat(bucketId, 5, e3);

// Both these are valid timeperiod specs
const timeperiods = [
Expand All @@ -171,14 +176,48 @@ describe("Basic API usage", () => {
const resp_e2: IEvent = resp[0][1];
assert.equal(
e1.timestamp.toISOString(),
new Date(resp_e2.timestamp).toISOString()
new Date(resp_e2.timestamp).toISOString(),
);
assert.equal(e1.data.label, resp_e2.data.label);
assert.equal(
e2.timestamp.toISOString(),
new Date(resp_e1.timestamp).toISOString()
new Date(resp_e1.timestamp).toISOString(),
);
assert.equal(e2.data.label, resp[0][0].data.label);
assert.equal(e2.data.label, resp_e1.data.label);

// Run query again and check that the results are the same (correctly cached)
const resp2: IEvent[][] = await awc.query(timeperiods, query);
assert.deepEqual(resp, resp2);

// Add a timeperiod and query again, to check that partial cache works
const timeperiods2 = [
{ start: d1, end: d2 },
{ start: d2, end: d3 },
];
const resp3: IEvent[][] = await awc.query(timeperiods2, query);
assert.equal(2, resp3[0].length);
assert.equal(2, resp3[1].length);

// Query a timeperiod without events in the past,
// then add an event for the timeperiod, and query again.
// This is to check that we don't cache when the query returned nothing.
const timeperiods3 = [
{ start: new Date("1980-1-1"), end: new Date("1980-1-2") },
];
const resp4: IEvent[][] = await awc.query(timeperiods3, query);

// Check that the result is empty
assert.equal(0, resp4[0].length);

// Add an event for the timeperiod
await awc.heartbeat(bucketId, 5, {
...testevent,
timestamp: new Date("1980-1-1"),
});

// Query again and check that the result is not empty
const resp5: IEvent[][] = await awc.query(timeperiods3, query);
assert.equal(1, resp5[0].length);
});
});

Expand Down
21 changes: 15 additions & 6 deletions test.sh
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
#!/bin/bash
aw-server --testing --storage=memory &> /dev/null &
AWPID=$!

# Give aw-server some time to start
sleep 5
# if something is already running on port 5666, assume server already running
if lsof -Pi :5666 -sTCP:LISTEN -t >/dev/null ; then
echo "aw-server already running on port 5666"
else
SERVER_STARTED=1
aw-server --testing --storage=memory &> /dev/null &
AWPID=$!

# Give aw-server some time to start
sleep 5
fi

# Run tests
mocha ./out/test/*.js
EXITCODE=$?

# Shutdown AW
kill $AWPID
if [ $SERVER_STARTED ]; then
# Shutdown AW
kill $AWPID
fi

exit $EXITCODE

0 comments on commit d6eaeae

Please sign in to comment.