Skip to content

Commit

Permalink
feat: adds initial pipeline support (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
eaddingtonwhite authored Apr 26, 2024
1 parent e1b0c8c commit 0e6d738
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 16 deletions.
115 changes: 115 additions & 0 deletions src/Pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import Commander from 'ioredis/built/utils/Commander';
import {Command} from 'ioredis';
import {MomentoRedisAdapter} from './momento-redis-adapter';
import {ArgumentType} from 'ioredis/built/Command';
import {CommandParameter} from 'ioredis/built/types';

interface Pipeline {
length: number;
}

/**
* Pipeline class is an object passed back when a user calls client.pipeline()
* it extends the base IORedis Commander interface and overrides the sendMessage
* function so that you can then chain multiple redis commands together and under
* the hood will batch up all the chained commands and then execute them in
* parallel.
*/
class Pipeline extends Commander<{type: 'pipeline'}> {
promise: Promise<[error: Error | null, result: unknown][] | null>;
resolve: (result: [error: Error | null, result: unknown][] | null) => void;
reject: (error: Error) => void;

private _queue: Array<Command> = [];
private _result: Array<unknown> = [];

momentoAdapter: MomentoRedisAdapter;

constructor(public redis: MomentoRedisAdapter) {
super();
this.momentoAdapter = redis;

this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});

// eslint-disable-next-line @typescript-eslint/no-this-alias
const _this = this;
Object.defineProperty(this, 'length', {
get: function () {
return _this._queue.length;
},
});
}

sendCommand(command: Command): unknown {
command.pipelineIndex = this._queue.length;
command.promise = this.invokeMomentoRedisClient(command.name, command.args);
this._queue.push(command);
return this;
}

addBatch(commands: Array<Array<unknown>>) {
for (let i = 0; i < commands.length; ++i) {
// Parse input cmd in Array format ex: ["get", "foo"]
const command = commands[i];
const commandName = command[0] as string;
const args = command.slice(1) as ArgumentType[];

// Invoke Command
const cmdToQueue = new Command(commandName, args);
cmdToQueue.promise = this.invokeMomentoRedisClient(
cmdToQueue.name,
cmdToQueue.args
);

// Push command with promise to queue
cmdToQueue.pipelineIndex = this._queue.length;
this._queue.push(cmdToQueue);
}
return this;
}

// Invoke function on momento adaptor
invokeMomentoRedisClient(
name: string,
args: CommandParameter[]
): Promise<unknown> {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
if (this.momentoAdapter[name] === undefined) {
throw new Error(`Un-Supported Command Passed: ${name}`);
}
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
return this.momentoAdapter[name](...args); // eslint-disable-line @typescript-eslint/no-unsafe-return
}

exec(): Promise<[error: Error | null, result: unknown][] | null> {
if (!this._queue.length) {
this.resolve([]);
}

// eslint-disable-next-line @typescript-eslint/no-this-alias
const _this = this;
// eslint-disable-next-line @typescript-eslint/no-floating-promises
execPipeline();
return this.promise;

async function execPipeline() {
for (let i = 0; i < _this._queue.length; ++i) {
_this._result.push([null, await _this._queue[i].promise]);
}

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
_this.resolve(_this._result);
return _this.promise;
}
}
}

export default Pipeline;
24 changes: 23 additions & 1 deletion src/momento-redis-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import {
CacheUpdateTtl,
MomentoErrorCode,
} from '@gomomento/sdk';
import {RedisKey} from 'ioredis';
import {Command, RedisKey} from 'ioredis';
import * as zstd from '@mongodb-js/zstd';
import {ChainableCommander} from 'ioredis/built/utils/RedisCommander';
import Pipeline from './Pipeline';

const TEXT_DECODER = new TextDecoder();

Expand Down Expand Up @@ -181,11 +183,20 @@ export interface MomentoIORedis {
): Promise<(string | null)[]>;

flushdb(): Promise<'OK'>;

flushdb(async: 'ASYNC'): Promise<'OK'>;

flushdb(sync: 'SYNC'): Promise<'OK'>;

unlink(...args: [...keys: RedisKey[]]): Promise<number>;

// TODO using type <any> here causes lint errors see if way to tighten up
// TODO currently we pass back ChainableCommander here which means we dont get
// TODO compile time checks on what methods are supported by the MomentoIORedis
// TODO interface. We could try defining our own ChainableMomento interface
// TODO instead here potentially
pipeline(commands?: Array<Array<any>>): ChainableCommander;

quit(): Promise<'OK'>;
}

Expand Down Expand Up @@ -824,6 +835,17 @@ export class MomentoRedisAdapter
return 'OK';
}
}

pipeline(commands?: Command[][]): ChainableCommander {
const pipeline = new Pipeline(this);
// This is behavior of silently not adding commands and returning empty
// pipeline if an array is not passed is ported from IORedis initial
// implementation. Trying to keep behavior the same for now.
if (Array.isArray(commands)) {
pipeline.addBatch(commands);
}
return pipeline;
}
}

async function decompress(compressed: Uint8Array): Promise<string> {
Expand Down
15 changes: 4 additions & 11 deletions test/get-set-delete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,16 @@ describe('setex', () => {
});

describe('unlink', () => {
it('happy path set get update unlink test', async () => {
it('happy path set get and unlink item', async () => {
const key = v4();
const value1 = v4();
const value2 = v4();
const value = v4();
// Set initial key value
await client.set(key, value1);
await client.set(key, value);

// Get value
let result = await client.get(key);
expect(result).toEqual(value1);

// Update value
await client.set(key, value2);
expect(result).toEqual(value);

// Read updated value
result = await client.get(key);
expect(result).toEqual(value2);
// Unlink key aka "Delete"
await client.unlink(key);

Expand Down
8 changes: 4 additions & 4 deletions test/integration-setup.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import {v4} from 'uuid';
import {CacheClientProps} from '@gomomento/sdk/dist/src/cache-client-props';
import {
CreateCache,
CacheClient,
CacheConfiguration,
Configurations,
CreateCache,
CredentialProvider,
DeleteCache,
MomentoErrorCode,
CacheClient,
CredentialProvider,
CacheConfiguration,
} from '@gomomento/sdk';
import {MomentoIORedis, MomentoRedisAdapter, NewIORedisWrapper} from '../src';

Expand Down
136 changes: 136 additions & 0 deletions test/pipeline.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import {SetupIntegrationTest} from './integration-setup';
import {v4} from 'uuid';

const {client} = SetupIntegrationTest();

describe('pipelines', () => {
it('should be able to run with chaining commands commands', async () => {
const key1 = v4();
const key2 = v4();
const value1 = v4();
const value2 = v4();
const dictionaryName = v4();

await client
.pipeline()
.hset(dictionaryName, key1, value1)
.hset(dictionaryName, key2, value2)
.set(key1, value1)
.set(key2, value2)
.exec();

const results = await client
.pipeline()
.hget(dictionaryName, key1)
.hget(dictionaryName, key2)
.get(key1)
.get(key2)
.exec();

expect(results).toEqual([
// IORedis defines exec response as [err, result]
[null, value1],
[null, value2],
[null, value1],
[null, value2],
]);
});
it('should be able to run with batch load commands', async () => {
const key1 = v4();
const key2 = v4();
const value1 = v4();
const value2 = v4();
const dictionaryName = v4();

await client
.pipeline([
['hset', dictionaryName, key1, value1],
['hset', dictionaryName, key2, value2],
['set', key1, value1],
['set', key2, value2],
])
.exec();

const results = await client
.pipeline([
['hget', dictionaryName, key1],
['hget', dictionaryName, key2],
['get', key1],
['get', key2],
])
.exec();

expect(results).toEqual([
// IORedis defines exec response as [err, result]
[null, value1],
[null, value2],
[null, value1],
[null, value2],
]);
});

it('you should be able to mix commands', async () => {
const key1 = v4();
const key2 = v4();
const key3 = v4();
const key4 = v4();
const value1 = v4();
const value2 = v4();
const value3 = v4();
const value4 = v4();
const dictionaryName = v4();

await client
.pipeline([
['hset', dictionaryName, key1, value1],
['hset', dictionaryName, key2, value2],
['set', key3, value3],
['set', key4, value4],
])
.hset(dictionaryName, key1, value1)
.hset(dictionaryName, key2, value2)
.set(key3, value3)
.set(key4, value4)
.exec();

const results = await client
.pipeline([
['hget', dictionaryName, key1],
['hget', dictionaryName, key2],
['get', key3],
['get', key4],
])
.hget(dictionaryName, key1)
.hget(dictionaryName, key2)
.get(key3)
.get(key4)
.exec();

expect(results).toEqual([
// IORedis defines exec response as [err, result]
[null, value1],
[null, value2],
[null, value3],
[null, value4],
[null, value1],
[null, value2],
[null, value3],
[null, value4],
]);
});
it('throws an error when using an un supported command', async () => {
try {
await client.pipeline([['UNSUPPORTED_CMD']]).exec();
} catch (err) {
if (process.env.MOMENTO_ENABLED === 'true') {
expect(err).toEqual(
new Error('Un-Supported Command Passed: UNSUPPORTED_CMD')
);
} else {
expect(err).toEqual(
new TypeError("Cannot read properties of undefined (reading 'apply')")
);
}
}
});
});

0 comments on commit 0e6d738

Please sign in to comment.