Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event loop thread gets blocked during disconnection/reconnection #2937

Open
leisurelyrcxf opened this issue Aug 1, 2024 · 3 comments
Open

Comments

@leisurelyrcxf
Copy link

leisurelyrcxf commented Aug 1, 2024

Bug Report

Current Behavior

event loop thread gets blocked during disconnection/reconnection

Input Code

public class MultiThreadSyncGet {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultiThreadSyncGet.class);

    private static final int LOOP_NUM = 10_000_000;

    private static final int BATCH_SIZE = 1000;

    private static final int DIGIT_NUM = 9;

    private static final String KEY_FORMATTER = String.format("key-%%0%dd", DIGIT_NUM);

    static {
        // noinspection ConstantValue
        LettuceAssert.assertState(DIGIT_NUM >= String.valueOf(LOOP_NUM).length() + 1, "digit num is not large enough");
    }

    void test() {
        RedisURI uri = RedisURI.create("redis.dev-d-okex.svc.dev.local", 6379);
        uri.setCredentialsProvider(new StaticCredentialsProvider(null, "123qweasd!@#".toCharArray()));
        try (RedisClient redisClient = RedisClient.create(uri)) {
            final ClientOptions.Builder optsBuilder = ClientOptions.builder()
                    .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(60)).build());
            redisClient.setOptions(optsBuilder.build());
            final StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(ByteArrayCodec.INSTANCE);
            connection.setAutoFlushCommands(false);

            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    logger.error("interrupted", e);
                }
                connection.close();
            }).start();

            for (int j = 0; j < LOOP_NUM && connection.isOpen(); j++) {
                for (int i = 0; i < BATCH_SIZE; i++) {
                    connection.async().get(genKey(j));
                }
                connection.flushCommands();
            }
        }
    }

    private byte[] genKey(int j) {
        return String.format(KEY_FORMATTER, j).getBytes();
    }

    public static void main(String[] args) {
        new MultiThreadSyncGet().test();
        logger.info("=====================================");
    }

}

I manipulate network using MacOS's Network Link Conditioner, set Downlink Delay to 1000ms, then set debug breakpoint at stack.remove(command)

 public void operationComplete(Future<Void> future) {

            try {
                if (!future.isSuccess()) {
                    stack.remove(command);
                }
            } finally {
                recycle();
            }
        }

Then evaluate

new ArrayList<>(stack).indexOf(command)

got 3971.

evaluate

stack.size()

got 85000.

we can conclude the compute complexity to remove all the failed-to-flush commands is O(3971*(85000-3971)), which is super big. If there are more successfully flushed commands before the first failed-to-flush command, the complexity could be even higher.

Expected behavior/code

Thread not blocked

Environment

  • Lettuce version(s): git_sha_dee8020d92e6bff562cef723dcacdea714b89982
  • Redis version: 7.0.0

Possible Solution

Use HashIndexedQueue instead which provides O(1) complexity to remove an element from the queue.
This is due to ArrayDeque#remove is O(n), if there are lots of tasks failing at AddToStask#operationComplete, then the event loop thread may get blocked for too long

Additional context

problematic code:

    static class AddToStack implements GenericFutureListener<Future<Void>> {
       ...

        AddToStack(Recycler.Handle<AddToStack> handle) {
            this.handle = handle;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void operationComplete(Future<Void> future) {

            try {
                if (!future.isSuccess()) {
                    stack.remove(command);
                }
            } finally {
                recycle();
            }
        }
}
@tishun
Copy link
Collaborator

tishun commented Aug 5, 2024

@leisurelyrcxf could you help me understand better the scenario you are addressing?
If you could provide a stack trace that explains it it would be perfect.

@tishun tishun added status: waiting-for-feedback We need additional information before we can continue and removed for: team-attention An issue we need to discuss as a team to make progress labels Aug 5, 2024
@leisurelyrcxf
Copy link
Author

leisurelyrcxf commented Aug 6, 2024

hi @tishun thx for the reply!

I updated the description (see the Details section) and stably reproduce method. Here is the callstack

operationComplete:1065, CommandHandler$AddToStack (io.lettuce.core.protocol)
notifyListener0:590, DefaultPromise (io.netty.util.concurrent)
notifyListeners0:583, DefaultPromise (io.netty.util.concurrent)
notifyListenersNow:559, DefaultPromise (io.netty.util.concurrent)
notifyListeners:492, DefaultPromise (io.netty.util.concurrent)
setValue0:636, DefaultPromise (io.netty.util.concurrent)
setFailure0:629, DefaultPromise (io.netty.util.concurrent)
tryFailure:118, DefaultPromise (io.netty.util.concurrent)
tryFailure:64, PromiseNotificationUtil (io.netty.util.internal)
safeFail:754, ChannelOutboundBuffer (io.netty.channel)
remove0:339, ChannelOutboundBuffer (io.netty.channel)
failFlushed:691, ChannelOutboundBuffer (io.netty.channel)
close:735, AbstractChannel$AbstractUnsafe (io.netty.channel)
close:620, AbstractChannel$AbstractUnsafe (io.netty.channel)
close:1352, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeClose:755, AbstractChannelHandlerContext (io.netty.channel)
access$1200:61, AbstractChannelHandlerContext (io.netty.channel)
run:738, AbstractChannelHandlerContext$11 (io.netty.channel)
runTask$$$capture:173, AbstractEventExecutor (io.netty.util.concurrent)
runTask:-1, AbstractEventExecutor (io.netty.util.concurrent)
 - Async stack trace
addTask:-1, SingleThreadEventExecutor (io.netty.util.concurrent)
execute:836, SingleThreadEventExecutor (io.netty.util.concurrent)
execute0:827, SingleThreadEventExecutor (io.netty.util.concurrent)
execute:817, SingleThreadEventExecutor (io.netty.util.concurrent)
safeExecute:1181, AbstractChannelHandlerContext (io.netty.channel)
close:735, AbstractChannelHandlerContext (io.netty.channel)
close:560, AbstractChannelHandlerContext (io.netty.channel)
close:957, DefaultChannelPipeline (io.netty.channel)
close:244, AbstractChannel (io.netty.channel)
closeAsync:606, DefaultEndpoint (io.lettuce.core.protocol)
closeAsync:152, CommandExpiryWriter (io.lettuce.core.protocol)
closeAsync:164, RedisChannelHandler (io.lettuce.core)
close:142, RedisChannelHandler (io.lettuce.core)
lambda$test$0:52, MultiThreadSyncGet (bench)
run:840, Thread (java.lang)

@tishun tishun added status: waiting-for-triage and removed status: waiting-for-feedback We need additional information before we can continue labels Aug 7, 2024
@tishun
Copy link
Collaborator

tishun commented Aug 7, 2024

Awesome, thanks for clarifying, let me get back to you after I spend some time thinking about this.
Specific implementation aside this seems to be a meaningful thing to fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants