Skip to content

Commit

Permalink
redis datastructor auto created
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjunfeng committed Nov 15, 2024
1 parent d60d0a4 commit 395024f
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
Expand Down Expand Up @@ -60,6 +61,9 @@ public class GridRedisListAddCommandHandler implements GridRedisCommandHandler {

/** Kernel context. */
protected final GridKernalContext ctx;

protected CollectionConfiguration cfg = new CollectionConfiguration();


/**
* Handler constructor.
Expand All @@ -71,30 +75,26 @@ public class GridRedisListAddCommandHandler implements GridRedisCommandHandler {
public GridRedisListAddCommandHandler(IgniteLogger log, GridKernalContext ctx) {
this.log = log;
this.ctx = ctx;
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
}

/** {@inheritDoc} */
@Override public Collection<GridRedisCommand> supportedCommands() {
return SUPPORTED_COMMANDS;
}


}

@Override
public IgniteInternalFuture<GridRedisMessage> handleAsync(GridNioSession ses, GridRedisMessage msg) {
assert msg != null;

if (msg.messageSize() < 3) {
msg.setResponse(GridRedisProtocolParser.toGenericError("Wrong number of arguments"));
return new GridFinishedFuture<>(msg);
// throw new GridRedisGenericException("Wrong number of arguments");
return new GridFinishedFuture<>(msg);
}

GridRedisCommand cmd = msg.command();

String queueName = msg.cacheName()+"-"+msg.key();
CollectionConfiguration cfg = new CollectionConfiguration();
cfg.setBackups(1);
String queueName = msg.cacheName()+"-"+msg.key();

if(cmd == LPUSH || cmd == LPUSHX) {
IgniteQueue<String> list = ctx.grid().queue(queueName,0,cfg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.rest.handlers.redis.GridRedisCommandHandler;
Expand Down Expand Up @@ -55,6 +57,8 @@ public class GridRedisListPopCommandHandler implements GridRedisCommandHandler {

/** Kernel context. */
protected final GridKernalContext ctx;

protected CollectionConfiguration cfg = new CollectionConfiguration();

/**
* Handler constructor.
Expand All @@ -66,6 +70,7 @@ public class GridRedisListPopCommandHandler implements GridRedisCommandHandler {
public GridRedisListPopCommandHandler(IgniteLogger log, GridKernalContext ctx) {
this.log = log;
this.ctx = ctx;
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
}

/** {@inheritDoc} */
Expand All @@ -77,20 +82,14 @@ public GridRedisListPopCommandHandler(IgniteLogger log, GridKernalContext ctx) {

@Override
public IgniteInternalFuture<GridRedisMessage> handleAsync(GridNioSession ses, GridRedisMessage msg) {
assert msg != null;

if (msg.messageSize() < 3) {
msg.setResponse(GridRedisProtocolParser.toGenericError("Wrong number of arguments"));
return new GridFinishedFuture<>(msg);
// throw new GridRedisGenericException("Wrong number of arguments");
}
assert msg != null;

GridRedisCommand cmd = msg.command();

String queueName = msg.cacheName()+"-"+msg.key();
String value = null;
if(cmd == LPOP || cmd == BLPOP) {
IgniteQueue<String> list = ctx.grid().queue(queueName,0,null);
IgniteQueue<String> list = ctx.grid().queue(queueName,0,cfg);
value = list.poll();
if(value==null && cmd == BLPOP) {
value = list.take();
Expand All @@ -101,13 +100,13 @@ else if(cmd == RPOP || cmd == BRPOP) {
throw new UnsupportedOperationException("RPOP or BRPOP not supported for ignite queue!");
}
else if(cmd == SPOP) {
IgniteSet<String> list = ctx.grid().set(queueName, null);
IgniteSet<String> list = ctx.grid().set(queueName, cfg);
if(!list.isEmpty()) {
value = list.iterator().next();
}
}
else if(cmd == ZPOPMAX) {
IgniteSet<ScoredItem<String>> list = ctx.grid().set(queueName,null);
IgniteSet<ScoredItem<String>> list = ctx.grid().set(queueName,cfg);

double max = Double.MIN_VALUE;
for(ScoredItem<String> item: list) {
Expand All @@ -120,7 +119,7 @@ else if(cmd == ZPOPMAX) {
list.remove(new ScoredItem<String>(value,max));
}
else if(cmd == ZPOPMIN) {
IgniteSet<ScoredItem<String>> list = ctx.grid().set(queueName,null);
IgniteSet<ScoredItem<String>> list = ctx.grid().set(queueName,cfg);

double min = Double.MAX_VALUE;
for(ScoredItem<String> item: list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.rest.handlers.redis.GridRedisCommandHandler;
Expand Down Expand Up @@ -53,6 +55,8 @@ public class GridRedisListRemCommandHandler implements GridRedisCommandHandler {

/** Kernel context. */
protected final GridKernalContext ctx;

protected CollectionConfiguration cfg = new CollectionConfiguration();

/**
* Handler constructor.
Expand All @@ -64,6 +68,7 @@ public class GridRedisListRemCommandHandler implements GridRedisCommandHandler {
public GridRedisListRemCommandHandler(IgniteLogger log, GridKernalContext ctx) {
this.log = log;
this.ctx = ctx;
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
}

/** {@inheritDoc} */
Expand All @@ -77,8 +82,7 @@ public IgniteInternalFuture<GridRedisMessage> handleAsync(GridNioSession ses, Gr

if (msg.messageSize() < 3) {
msg.setResponse(GridRedisProtocolParser.toGenericError("Wrong number of arguments"));
return new GridFinishedFuture<>(msg);
// throw new GridRedisGenericException("Wrong number of arguments");
return new GridFinishedFuture<>(msg);
}

GridRedisCommand cmd = msg.command();
Expand All @@ -88,7 +92,7 @@ public IgniteInternalFuture<GridRedisMessage> handleAsync(GridNioSession ses, Gr
if(cmd == LSET) {
int pos = Integer.parseInt(msg.aux(2));
String value = msg.aux(3);
IgniteQueue<String> list = ctx.grid().queue(queueName,0,null);
IgniteQueue<String> list = ctx.grid().queue(queueName,0,cfg);
if(pos<0) {
pos = list.size() + pos;
}
Expand All @@ -108,7 +112,7 @@ public IgniteInternalFuture<GridRedisMessage> handleAsync(GridNioSession ses, Gr
else if(cmd == LREM) {
int count = Integer.parseInt(msg.aux(2));
String value = msg.aux(3);
IgniteQueue<String> list = ctx.grid().queue(queueName,0,null);
IgniteQueue<String> list = ctx.grid().queue(queueName,0,cfg);
Iterator<String> it = list.iterator();
int n = 0;
while(it.hasNext()) {
Expand All @@ -124,7 +128,7 @@ else if(cmd == LREM) {
}
else if(cmd == SREM) {
List<String> keys = msg.aux();
IgniteSet<String> list = ctx.grid().set(queueName, null);
IgniteSet<String> list = ctx.grid().set(queueName, cfg);
Iterator<String> it = list.iterator();
int n = 0;
while(it.hasNext()) {
Expand All @@ -138,7 +142,7 @@ else if(cmd == SREM) {
}
else if(cmd == ZREM) {
List<String> keys = msg.aux();
IgniteSet<ScoredItem<String>> list = ctx.grid().set(queueName,null);
IgniteSet<ScoredItem<String>> list = ctx.grid().set(queueName,cfg);
Iterator<ScoredItem<String>> it = list.iterator();
int n = 0;
while(it.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.rest.handlers.redis.GridRedisCommandHandler;
Expand Down Expand Up @@ -59,6 +61,8 @@ public class GridRedisListsCommandHandler implements GridRedisCommandHandler {

/** Kernel context. */
protected final GridKernalContext ctx;

protected CollectionConfiguration cfg = new CollectionConfiguration();

/**
* Handler constructor.
Expand All @@ -70,6 +74,7 @@ public class GridRedisListsCommandHandler implements GridRedisCommandHandler {
public GridRedisListsCommandHandler(IgniteLogger log, GridKernalContext ctx) {
this.log = log;
this.ctx = ctx;
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
}

/** {@inheritDoc} */
Expand All @@ -82,17 +87,12 @@ public GridRedisListsCommandHandler(IgniteLogger log, GridKernalContext ctx) {
@Override
public IgniteInternalFuture<GridRedisMessage> handleAsync(GridNioSession ses, GridRedisMessage msg) {
assert msg != null;

if (msg.messageSize() < 3) {
msg.setResponse(GridRedisProtocolParser.toGenericError("Wrong number of arguments"));
return new GridFinishedFuture<>(msg);
// throw new GridRedisGenericException("Wrong number of arguments");
}

GridRedisCommand cmd = msg.command();

String queueName = msg.cacheName()+"-"+msg.key();
IgniteQueue<String> list = ctx.grid().queue(queueName,0,null);
IgniteQueue<String> list = ctx.grid().queue(queueName,0,cfg);

if(cmd == LPOS) {
String value = msg.aux(2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.rest.handlers.redis.GridRedisCommandHandler;
Expand Down Expand Up @@ -62,6 +64,8 @@ public class GridRedisSetsCommandHandler implements GridRedisCommandHandler {

/** Kernel context. */
protected final GridKernalContext ctx;

protected CollectionConfiguration cfg = new CollectionConfiguration();

/**
* Handler constructor.
Expand All @@ -73,6 +77,7 @@ public class GridRedisSetsCommandHandler implements GridRedisCommandHandler {
public GridRedisSetsCommandHandler(IgniteLogger log, GridKernalContext ctx) {
this.log = log;
this.ctx = ctx;
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
}

/** {@inheritDoc} */
Expand All @@ -85,17 +90,11 @@ public GridRedisSetsCommandHandler(IgniteLogger log, GridKernalContext ctx) {
@Override
public IgniteInternalFuture<GridRedisMessage> handleAsync(GridNioSession ses, GridRedisMessage msg) {
assert msg != null;

if (msg.messageSize() < 3) {
msg.setResponse(GridRedisProtocolParser.toGenericError("Wrong number of arguments"));
return new GridFinishedFuture<>(msg);
// throw new GridRedisGenericException("Wrong number of arguments");
}

GridRedisCommand cmd = msg.command();

String queueName = msg.cacheName()+"-"+msg.key();
IgniteSet<String> list = ctx.grid().set(queueName, null);
IgniteSet<String> list = ctx.grid().set(queueName, cfg);

if(cmd == SISMEMBER) {
String query = msg.aux(2);
Expand Down Expand Up @@ -156,7 +155,7 @@ else if(cmd == SDIFF) {
HashSet<String> result = new HashSet<>(list);
for(String key2: othersKeys) {
String queueName2 = msg.cacheName()+"-"+key2;
IgniteSet<String> list2 = ctx.grid().set(queueName2, null);
IgniteSet<String> list2 = ctx.grid().set(queueName2, cfg);
if(list2!=null) {
result.removeAll(list2);
}
Expand All @@ -168,15 +167,15 @@ else if(cmd == SINTER) {
HashSet<String> result = new HashSet<>(list);
for(String key2: othersKeys) {
String queueName2 = msg.cacheName()+"-"+key2;
IgniteSet<String> list2 = ctx.grid().set(queueName2, null);
IgniteSet<String> list2 = ctx.grid().set(queueName2, cfg);
if(list2!=null) {
result.retainAll(list2);
}
}
msg.setResponse(GridRedisProtocolParser.toArray(result));
}
else if(cmd == SCARD) {
msg.setResponse(GridRedisProtocolParser.toInteger(list.size()));
msg.setResponse(GridRedisProtocolParser.toInteger(list.size()));
}

return new GridFinishedFuture<>(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.rest.handlers.redis.GridRedisCommandHandler;
Expand Down Expand Up @@ -62,6 +64,8 @@ public class GridRedisSortedSetsCommandHandler implements GridRedisCommandHandle

/** Kernel context. */
protected final GridKernalContext ctx;

protected CollectionConfiguration cfg = new CollectionConfiguration();

/**
* Handler constructor.
Expand All @@ -73,6 +77,7 @@ public class GridRedisSortedSetsCommandHandler implements GridRedisCommandHandle
public GridRedisSortedSetsCommandHandler(IgniteLogger log, GridKernalContext ctx) {
this.log = log;
this.ctx = ctx;
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
}

/** {@inheritDoc} */
Expand All @@ -85,17 +90,12 @@ public GridRedisSortedSetsCommandHandler(IgniteLogger log, GridKernalContext ctx
@Override
public IgniteInternalFuture<GridRedisMessage> handleAsync(GridNioSession ses, GridRedisMessage msg) {
assert msg != null;

if (msg.messageSize() < 3) {
msg.setResponse(GridRedisProtocolParser.toGenericError("Wrong number of arguments"));
return new GridFinishedFuture<>(msg);
// throw new GridRedisGenericException("Wrong number of arguments");
}

GridRedisCommand cmd = msg.command();

String queueName = msg.cacheName()+"-"+msg.key();
IgniteSet<ScoredItem<String>> list = ctx.grid().set(queueName,null);
String queueName = msg.cacheName()+"-"+msg.key();

IgniteSet<ScoredItem<String>> list = ctx.grid().set(queueName,cfg);


if(cmd==ZRANK || cmd==ZREVRANK) {
Expand Down Expand Up @@ -227,7 +227,12 @@ else if(cmd == ZSCAN) {
msg.setResponse(GridRedisProtocolParser.toArray(List.of(n,result)));
}
else if(cmd == ZCARD) {
msg.setResponse(GridRedisProtocolParser.toInteger(list.size()));
if(list==null) {
msg.setResponse(GridRedisProtocolParser.toInteger(0));
}
else {
msg.setResponse(GridRedisProtocolParser.toInteger(list.size()));
}
}
return new GridFinishedFuture<>(msg);
}
Expand Down

0 comments on commit 395024f

Please sign in to comment.