diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index ef7304b1623..7dc1ec8d57e 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -1059,4 +1059,29 @@ public interface ConfigurationKeys { * The constant META_PREFIX */ String META_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + "metadata."; + + /** + * The constant RATE_LIMIT_PREFIX. + */ + String RATE_LIMIT_PREFIX = SERVER_PREFIX + "ratelimit."; + + /** + * The constant RATE_LIMIT_BUCKET_TOKEN_SECOND_NUM. + */ + String RATE_LIMIT_BUCKET_TOKEN_SECOND_NUM = RATE_LIMIT_PREFIX + "bucketTokenSecondNum"; + + /** + * The constant RATE_LIMIT_ENABLE. + */ + String RATE_LIMIT_ENABLE = RATE_LIMIT_PREFIX + "enable"; + + /** + * The constant RATE_LIMIT_BUCKET_TOKEN_MAX_NUM. + */ + String RATE_LIMIT_BUCKET_TOKEN_MAX_NUM = RATE_LIMIT_PREFIX + "bucketTokenMaxNum"; + + /** + * The constant RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM. + */ + String RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM = RATE_LIMIT_PREFIX + "bucketTokenInitialNum"; } diff --git a/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java b/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java index f56ad5cad43..dd2279d0ec4 100644 --- a/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java +++ b/compatible/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java @@ -16,8 +16,6 @@ */ package io.seata.tm.api; -import java.util.concurrent.TimeUnit; - import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -28,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + /** * The type Default failure handler. */ @@ -77,6 +77,11 @@ public void onRollbacking(GlobalTransaction tx, Throwable originalException) { SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS); } + @Override + public void onBeginRateLimitedFailure(org.apache.seata.tm.api.GlobalTransaction globalTransaction, Throwable cause) { + LOGGER.warn("Failed to begin transaction due to RateLimit. ", cause); + } + protected class CheckTimerTask implements TimerTask { private final GlobalTransaction tx; diff --git a/core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java b/core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java new file mode 100644 index 00000000000..f11a7158506 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.event; + +public class RateLimitEvent implements Event { + + /** + * The Trace id. + */ + private String traceId; + + /** + * The Limit type (like GlobalBeginFailed). + */ + private String limitType; + + /** + * The Application id. + */ + private String applicationId; + + /** + * The Client id. + */ + private String clientId; + + /** + * The Server ip address and port. + */ + private String serverIpAddressAndPort; + + public String getTraceId() { + return traceId; + } + + public void setTraceId(String traceId) { + this.traceId = traceId; + } + + public String getLimitType() { + return limitType; + } + + public void setLimitType(String limitType) { + this.limitType = limitType; + } + + public String getApplicationId() { + return applicationId; + } + + public void setApplicationId(String applicationId) { + this.applicationId = applicationId; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getServerIpAddressAndPort() { + return serverIpAddressAndPort; + } + + public void setServerIpAddressAndPort(String serverIpAddressAndPort) { + this.serverIpAddressAndPort = serverIpAddressAndPort; + } + + public RateLimitEvent(String traceId, String limitType, String applicationId, String clientId, String serverIpAddressAndPort) { + this.traceId = traceId; + this.limitType = limitType; + this.applicationId = applicationId; + this.clientId = clientId; + this.serverIpAddressAndPort = serverIpAddressAndPort; + } + + @Override + public String toString() { + return "RateLimitEvent{" + + "traceId='" + traceId + '\'' + + ", limitType='" + limitType + '\'' + + ", applicationId='" + applicationId + '\'' + + ", clientId='" + clientId + '\'' + + ", serverIpAddressAndPort='" + serverIpAddressAndPort + '\'' + + '}'; + } +} diff --git a/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java b/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java index 16a2e899dc6..0bbb1b3108c 100644 --- a/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java +++ b/core/src/main/java/org/apache/seata/core/exception/TransactionExceptionCode.java @@ -31,7 +31,6 @@ public enum TransactionExceptionCode { * BeginFailed */ BeginFailed, - /** * Lock key conflict transaction exception code. */ @@ -140,7 +139,12 @@ public enum TransactionExceptionCode { /** * Broken transaction exception code. */ - Broken; + Broken, + + /** + * BeginFailedRateLimited + */ + BeginFailedRateLimited; /** diff --git a/core/src/main/java/org/apache/seata/core/protocol/ResultCode.java b/core/src/main/java/org/apache/seata/core/protocol/ResultCode.java index d3338eb7f10..f3582286d35 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/ResultCode.java +++ b/core/src/main/java/org/apache/seata/core/protocol/ResultCode.java @@ -32,7 +32,12 @@ public enum ResultCode { * Success result code. */ // Success - Success; + Success, + + /** + * Rate limited result code. + */ + RateLimited; /** * Get result code. diff --git a/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java b/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java index b1c7ef3e741..2fc2d7f89e1 100644 --- a/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java +++ b/core/src/test/java/org/apache/seata/core/protocol/ResultCodeTest.java @@ -29,8 +29,9 @@ class ResultCodeTest { void getByte() { Assertions.assertEquals(ResultCode.Failed, ResultCode.get((byte) 0)); Assertions.assertEquals(ResultCode.Success, ResultCode.get((byte) 1)); + Assertions.assertEquals(ResultCode.RateLimited, ResultCode.get((byte) 2)); Assertions.assertThrows(IllegalArgumentException.class, () -> { - ResultCode.get((byte) 2); + ResultCode.get((byte) 3); }); } @@ -38,14 +39,16 @@ void getByte() { void getInt() { Assertions.assertEquals(ResultCode.Failed, ResultCode.get(0)); Assertions.assertEquals(ResultCode.Success, ResultCode.get(1)); + Assertions.assertEquals(ResultCode.RateLimited, ResultCode.get(2)); Assertions.assertThrows(IllegalArgumentException.class, () -> { - ResultCode.get(2); + ResultCode.get(3); }); } @Test void values() { - Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success}, ResultCode.values()); + Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success, ResultCode.RateLimited}, + ResultCode.values()); } @Test diff --git a/dependencies/pom.xml b/dependencies/pom.xml index 6b93148e613..0505059763b 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -68,6 +68,7 @@ 6.3.0 1.0.0 1.82 + 8.1.0 1.21 1.10.12 1.7.1 @@ -612,6 +613,11 @@ ${jcommander.version} + + com.bucket4j + bucket4j_jdk8-core + ${bucket4j.version} + io.grpc grpc-testing diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java index 288bd5dd782..4fa53248d7b 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java @@ -16,13 +16,6 @@ */ package org.apache.seata.integration.tx.api.interceptor.handler; -import java.lang.reflect.Method; -import java.util.LinkedHashSet; -import java.util.Set; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import com.google.common.eventbus.Subscribe; import org.apache.seata.common.exception.ShouldNeverHappenException; import org.apache.seata.common.thread.NamedThreadFactory; @@ -59,6 +52,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Method; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.apache.seata.common.DefaultValues.DEFAULT_DISABLE_GLOBAL_TRANSACTION; import static org.apache.seata.common.DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_TM_DEGRADE_CHECK; @@ -248,6 +248,10 @@ public TransactionInfo getTransactionInfo() { succeed = false; failureHandler.onBeginFailure(globalTransaction, cause); throw cause; + case BeginFailedRateLimited: + succeed = false; + failureHandler.onBeginRateLimitedFailure(globalTransaction, cause); + throw cause; case CommitFailure: succeed = false; failureHandler.onCommitFailure(globalTransaction, cause); diff --git a/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java b/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java index 507bafafee6..bbc8d532cbd 100644 --- a/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java +++ b/metrics/seata-metrics-api/src/main/java/org/apache/seata/metrics/IdConstants.java @@ -25,6 +25,8 @@ public interface IdConstants { String SEATA_EXCEPTION = "seata.exception"; + String SEATA_RATE_LIMIT = "seata.rate.limit"; + String APP_ID_KEY = "applicationId"; String GROUP_KEY = "group"; @@ -79,4 +81,9 @@ public interface IdConstants { String STATUS_VALUE_AFTER_ROLLBACKED_KEY = "AfterRollbacked"; + String LIMIT_TYPE_KEY = "limitType"; + + String CLIENT_ID_KEY = "clientId"; + + String SERVER_IP_ADDRESS_AND_PORT_KEY = "serverIpAddressAndPort"; } diff --git a/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java b/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java index 869f38bf1a6..30bb2321a99 100644 --- a/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java +++ b/saga/seata-saga-spring/src/main/java/org/apache/seata/saga/engine/tm/DefaultSagaTransactionalTemplate.java @@ -16,15 +16,15 @@ */ package org.apache.seata.saga.engine.tm; -import java.util.List; - import org.apache.seata.common.exception.FrameworkErrorCode; +import org.apache.seata.common.util.StringUtils; import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.exception.TransactionExceptionCode; import org.apache.seata.core.model.BranchStatus; import org.apache.seata.core.model.BranchType; import org.apache.seata.core.model.GlobalStatus; -import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.apache.seata.core.rpc.ShutdownHook; +import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; import org.apache.seata.rm.DefaultResourceManager; import org.apache.seata.rm.RMClient; @@ -39,7 +39,6 @@ import org.apache.seata.tm.api.transaction.TransactionHook; import org.apache.seata.tm.api.transaction.TransactionHookManager; import org.apache.seata.tm.api.transaction.TransactionInfo; -import org.apache.seata.common.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -49,6 +48,8 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; +import java.util.List; + /** * Template of executing business logic with a global transaction for SAGA mode */ @@ -92,6 +93,10 @@ public GlobalTransaction beginTransaction(TransactionInfo txInfo) throws Transac tx.begin(txInfo.getTimeOut(), txInfo.getName()); triggerAfterBegin(tx); } catch (TransactionException txe) { + if (TransactionExceptionCode.BeginFailedRateLimited.equals(txe.getCode())) { + throw new TransactionalExecutor.ExecutionException(tx, txe, + TransactionalExecutor.Code.BeginFailedRateLimited); + } throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java index 5ec088d43ff..166123ee4fc 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java @@ -69,6 +69,7 @@ public interface StarterConstants { String SERVER_PREFIX = SEATA_PREFIX + ".server"; + String SERVER_RATELIMIT_PREFIX = SERVER_PREFIX + ".ratelimit"; String SERVER_UNDO_PREFIX = SERVER_PREFIX + ".undo"; String SERVER_RAFT_PREFIX = SERVER_PREFIX + ".raft"; String SERVER_RECOVERY_PREFIX = SERVER_PREFIX + ".recovery"; diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java index 9749a003d81..ff459bf9190 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataServerEnvironmentPostProcessor.java @@ -17,6 +17,8 @@ package org.apache.seata.spring.boot.autoconfigure; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.seata.spring.boot.autoconfigure.properties.server.ServerRateLimitProperties; import org.apache.seata.spring.boot.autoconfigure.properties.server.store.StoreProperties; import org.apache.seata.spring.boot.autoconfigure.properties.server.MetricsProperties; import org.apache.seata.spring.boot.autoconfigure.properties.server.ServerProperties; @@ -38,6 +40,7 @@ import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.PROPERTY_BEAN_MAP; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RAFT_PREFIX; +import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RATELIMIT_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RECOVERY_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_UNDO_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SESSION_PREFIX; @@ -81,6 +84,7 @@ public static void init() { PROPERTY_BEAN_MAP.put(SERVER_RAFT_PREFIX, ServerRaftProperties.class); PROPERTY_BEAN_MAP.put(SESSION_PREFIX, SessionProperties.class); PROPERTY_BEAN_MAP.put(STORE_PREFIX, StoreProperties.class); + PROPERTY_BEAN_MAP.put(SERVER_RATELIMIT_PREFIX, ServerRateLimitProperties.class); } } diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerRateLimitProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerRateLimitProperties.java new file mode 100644 index 00000000000..87c1317e3be --- /dev/null +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/ServerRateLimitProperties.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.spring.boot.autoconfigure.properties.server; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RATELIMIT_PREFIX; + +@Component +@ConfigurationProperties(prefix = SERVER_RATELIMIT_PREFIX) +public class ServerRateLimitProperties { + /** + * whether enable server rate limit + */ + private boolean enable; + + /** + * limit token number of bucket per second + */ + private Integer bucketTokenSecondNum; + + /** + * limit token max number of bucket + */ + private Integer bucketTokenMaxNum; + + /** + * limit token initial number of bucket + */ + private Integer bucketTokenInitialTime; + + public boolean isEnable() { + return enable; + } + + public void setEnable(boolean enable) { + this.enable = enable; + } + + public Integer getBucketTokenSecondNum() { + return bucketTokenSecondNum; + } + + public void setBucketTokenSecondNum(Integer bucketTokenSecondNum) { + this.bucketTokenSecondNum = bucketTokenSecondNum; + } + + public Integer getBucketTokenMaxNum() { + return bucketTokenMaxNum; + } + + public void setBucketTokenMaxNum(Integer bucketTokenMaxNum) { + this.bucketTokenMaxNum = bucketTokenMaxNum; + } + + public Integer getBucketTokenInitialTime() { + return bucketTokenInitialTime; + } + + public void setBucketTokenInitialTime(Integer bucketTokenInitialTime) { + this.bucketTokenInitialTime = bucketTokenInitialTime; + } +} diff --git a/serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/resultCode.proto b/serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/resultCode.proto index 4bd9ff3bd11..ef541700d16 100644 --- a/serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/resultCode.proto +++ b/serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/resultCode.proto @@ -29,4 +29,5 @@ enum ResultCodeProto { Success = 1; + RateLimited = 2; } \ No newline at end of file diff --git a/serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/transactionExceptionCode.proto b/serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/transactionExceptionCode.proto index 6cb20b0905c..663bb88a83c 100644 --- a/serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/transactionExceptionCode.proto +++ b/serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/transactionExceptionCode.proto @@ -130,4 +130,8 @@ enum TransactionExceptionCodeProto { */ FailedStore = 17; + /** + * BeginFailedRateLimited + */ + BeginFailedRateLimited = 18; } \ No newline at end of file diff --git a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractResultMessageCodec.java b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractResultMessageCodec.java index 3eedf496872..d6c30176c2e 100644 --- a/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractResultMessageCodec.java +++ b/serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/AbstractResultMessageCodec.java @@ -16,13 +16,13 @@ */ package org.apache.seata.serializer.seata.protocol; -import java.nio.ByteBuffer; - import io.netty.buffer.ByteBuf; import org.apache.seata.common.util.StringUtils; import org.apache.seata.core.protocol.AbstractResultMessage; import org.apache.seata.core.protocol.ResultCode; +import java.nio.ByteBuffer; + /** * The type Abstract result message codec. * @@ -41,7 +41,7 @@ public void encode(T t, ByteBuf out) { String resultMsg = abstractResultMessage.getMsg(); out.writeByte(resultCode.ordinal()); - if (resultCode == ResultCode.Failed) { + if (resultCode == ResultCode.Failed || resultCode == ResultCode.RateLimited) { if (StringUtils.isNotEmpty(resultMsg)) { String msg; if (resultMsg.length() > Short.MAX_VALUE) { @@ -64,7 +64,7 @@ public void decode(T t, ByteBuffer in) { ResultCode resultCode = ResultCode.get(in.get()); abstractResultMessage.setResultCode(resultCode); - if (resultCode == ResultCode.Failed) { + if (resultCode == ResultCode.Failed || resultCode == ResultCode.RateLimited) { short len = in.getShort(); if (len > 0) { byte[] msg = new byte[len]; diff --git a/server/pom.xml b/server/pom.xml index 63d34056a5d..da49b6d764e 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -329,6 +329,12 @@ jackson-mapper-asl ${jackson-mapper.version} + + + + com.bucket4j + bucket4j_jdk8-core + diff --git a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java index ddfd5f35d66..995f8cf4207 100644 --- a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java +++ b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java @@ -16,17 +16,6 @@ */ package org.apache.seata.server.coordinator; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - import io.netty.channel.Channel; import org.apache.commons.lang.time.DateFormatUtils; import org.apache.seata.common.DefaultValues; @@ -66,6 +55,7 @@ import org.apache.seata.core.rpc.netty.NettyRemotingServer; import org.apache.seata.server.AbstractTCInboundHandler; import org.apache.seata.server.metrics.MetricsPublisher; +import org.apache.seata.server.ratelimit.RateLimiterHandler; import org.apache.seata.server.session.BranchSession; import org.apache.seata.server.session.GlobalSession; import org.apache.seata.server.session.SessionCondition; @@ -76,6 +66,17 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import static org.apache.seata.common.Constants.ASYNC_COMMITTING; import static org.apache.seata.common.Constants.COMMITTING; import static org.apache.seata.common.Constants.RETRY_COMMITTING; @@ -196,6 +197,11 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran private final ThreadPoolExecutor branchRemoveExecutor; + private static final boolean RATE_LIMIT_ENABLE = ConfigurationFactory.getInstance().getBoolean( + org.apache.seata.common.ConfigurationKeys.RATE_LIMIT_ENABLE, false); + + private RateLimiterHandler rateLimiterHandler; + private RemotingServer remotingServer; private final DefaultCore core; @@ -226,6 +232,10 @@ protected DefaultCoordinator(RemotingServer remotingServer) { } else { branchRemoveExecutor = null; } + // create server rate limter + if (RATE_LIMIT_ENABLE) { + rateLimiterHandler = RateLimiterHandler.getInstance(); + } } public static DefaultCoordinator getInstance(RemotingServer remotingServer) { @@ -641,7 +651,10 @@ public AbstractResultMessage onRequest(AbstractMessage request, RpcContext conte } AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request; transactionRequest.setTCInboundHandler(this); - + AbstractResultMessage resultMessage = processRateLimit(request, context); + if (resultMessage != null) { + return resultMessage; + } return transactionRequest.handle(context); } @@ -762,4 +775,14 @@ private void doRemove(BranchSession bt) { } } } + + private AbstractResultMessage processRateLimit(AbstractMessage request, RpcContext context) { + if (RATE_LIMIT_ENABLE) { + AbstractResultMessage resultMessage = rateLimiterHandler.handle(request, context); + if (resultMessage != null) { + return resultMessage; + } + } + return null; + } } diff --git a/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java b/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java index 18787594f8f..6a125ac5b3a 100644 --- a/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java +++ b/server/src/main/java/org/apache/seata/server/metrics/MeterIdConstants.java @@ -109,4 +109,12 @@ public interface MeterIdConstants { Id SUMMARY_EXP = new Id(IdConstants.SEATA_EXCEPTION) .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC) .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY); + + Id COUNTER_RATE_LIMIT = new Id(IdConstants.SEATA_RATE_LIMIT) + .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC) + .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_COUNTER); + + Id SUMMARY_RATE_LIMIT = new Id(IdConstants.SEATA_RATE_LIMIT) + .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_TC) + .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY); } diff --git a/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java b/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java index d7dc1beef13..709e99b650d 100644 --- a/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java +++ b/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java @@ -18,8 +18,10 @@ import org.apache.seata.core.event.EventBus; import org.apache.seata.core.event.GlobalTransactionEvent; +import org.apache.seata.core.event.RateLimitEvent; import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.server.event.EventBusManager; +import org.apache.seata.server.ratelimit.RateLimitInfo; import org.apache.seata.server.session.GlobalSession; /** @@ -94,4 +96,14 @@ public static void postSessionDoingEvent(final GlobalSession globalSession, Stri globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(), globalSession.getBeginTime(), null, status, retryGlobal, retryBranch)); } + + /** + * Post rate limit event. + * + * @param rateLimitInfo the rate limit info + */ + public static void postRateLimitEvent(RateLimitInfo rateLimitInfo) { + EVENT_BUS.post(new RateLimitEvent(rateLimitInfo.getTraceId(), rateLimitInfo.getLimitType(), rateLimitInfo.getApplicationId(), + rateLimitInfo.getClientId(), rateLimitInfo.getServerIpAddressAndPort())); + } } diff --git a/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java b/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java index 2a937d0b98c..53fc29d1893 100644 --- a/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java +++ b/server/src/main/java/org/apache/seata/server/metrics/MetricsSubscriber.java @@ -16,14 +16,10 @@ */ package org.apache.seata.server.metrics; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - import com.google.common.eventbus.Subscribe; import org.apache.seata.core.event.ExceptionEvent; import org.apache.seata.core.event.GlobalTransactionEvent; +import org.apache.seata.core.event.RateLimitEvent; import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.metrics.Id; import org.apache.seata.metrics.registry.Registry; @@ -31,8 +27,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + import static org.apache.seata.metrics.IdConstants.APP_ID_KEY; +import static org.apache.seata.metrics.IdConstants.CLIENT_ID_KEY; import static org.apache.seata.metrics.IdConstants.GROUP_KEY; +import static org.apache.seata.metrics.IdConstants.LIMIT_TYPE_KEY; +import static org.apache.seata.metrics.IdConstants.SERVER_IP_ADDRESS_AND_PORT_KEY; import static org.apache.seata.metrics.IdConstants.STATUS_VALUE_AFTER_COMMITTED_KEY; import static org.apache.seata.metrics.IdConstants.STATUS_VALUE_AFTER_ROLLBACKED_KEY; @@ -193,6 +197,20 @@ public void exceptionEventForMetrics(ExceptionEvent event) { .withTag(APP_ID_KEY, event.getName())).increase(1); } + @Subscribe + public void recordRateLimitEventForMetrics(RateLimitEvent event) { + registry.getCounter(MeterIdConstants.COUNTER_RATE_LIMIT + .withTag(LIMIT_TYPE_KEY, event.getLimitType()) + .withTag(APP_ID_KEY, event.getApplicationId()) + .withTag(CLIENT_ID_KEY, event.getClientId()) + .withTag(SERVER_IP_ADDRESS_AND_PORT_KEY, event.getServerIpAddressAndPort())).increase(1); + registry.getSummary(MeterIdConstants.SUMMARY_RATE_LIMIT + .withTag(LIMIT_TYPE_KEY, event.getLimitType()) + .withTag(APP_ID_KEY, event.getApplicationId()) + .withTag(CLIENT_ID_KEY, event.getClientId()) + .withTag(SERVER_IP_ADDRESS_AND_PORT_KEY, event.getServerIpAddressAndPort())).increase(1); + } + @Override public boolean equals(Object obj) { return this.getClass().getName().equals(obj.getClass().getName()); diff --git a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimitInfo.java b/server/src/main/java/org/apache/seata/server/ratelimit/RateLimitInfo.java new file mode 100644 index 00000000000..9d46ac3bf77 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/ratelimit/RateLimitInfo.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.ratelimit; + +import org.apache.seata.common.util.UUIDGenerator; + +/** + * The type Rate limit info. + */ +public class RateLimitInfo { + + /** + * The constant ROLE_TC. + */ + public static final String GLOBAL_BEGIN_FAILED = "globalBeginFailed"; + + /** + * The Trace id. + */ + private String traceId; + + /** + * The Limit type (like GlobalBeginFailed). + */ + private String limitType; + + /** + * The Application id. + */ + private String applicationId; + + /** + * The Client id. + */ + private String clientId; + + /** + * The Server ip address and port. + */ + private String serverIpAddressAndPort; + + private RateLimitInfo() { + } + + public static RateLimitInfo generateRateLimitInfo(String applicationId, String type, + String clientId, String serverIpAddressAndPort) { + RateLimitInfo rateLimitInfo = new RateLimitInfo(); + rateLimitInfo.setTraceId(String.valueOf(UUIDGenerator.generateUUID())); + rateLimitInfo.setLimitType(type); + rateLimitInfo.setApplicationId(applicationId); + rateLimitInfo.setClientId(clientId); + rateLimitInfo.setServerIpAddressAndPort(serverIpAddressAndPort); + return rateLimitInfo; + } + + public String getTraceId() { + return traceId; + } + + public void setTraceId(String traceId) { + this.traceId = traceId; + } + + public String getLimitType() { + return limitType; + } + + public void setLimitType(String limitType) { + this.limitType = limitType; + } + + public String getApplicationId() { + return applicationId; + } + + public void setApplicationId(String applicationId) { + this.applicationId = applicationId; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getServerIpAddressAndPort() { + return serverIpAddressAndPort; + } + + public void setServerIpAddressAndPort(String serverIpAddressAndPort) { + this.serverIpAddressAndPort = serverIpAddressAndPort; + } + + @Override + public String toString() { + return "RateLimitInfo{" + + "traceId='" + traceId + '\'' + + ", limitType='" + limitType + '\'' + + ", applicationId='" + applicationId + '\'' + + ", clientId='" + clientId + '\'' + + ", serverIpAddressAndPort='" + serverIpAddressAndPort + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiter.java b/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiter.java new file mode 100644 index 00000000000..d62f42c5760 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.ratelimit; + +/** + * RateLimiter + */ +public interface RateLimiter { + /** + * check whether the request can pass + * + * @return the boolean + */ + boolean canPass(); +} diff --git a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiterHandler.java b/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiterHandler.java new file mode 100644 index 00000000000..ca1d46c9efe --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiterHandler.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.ratelimit; + +import org.apache.seata.common.XID; +import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.core.exception.TransactionExceptionCode; +import org.apache.seata.core.protocol.AbstractMessage; +import org.apache.seata.core.protocol.AbstractResultMessage; +import org.apache.seata.core.protocol.ResultCode; +import org.apache.seata.core.protocol.transaction.GlobalBeginRequest; +import org.apache.seata.core.protocol.transaction.GlobalBeginResponse; +import org.apache.seata.core.rpc.RpcContext; +import org.apache.seata.server.metrics.MetricsPublisher; + +/** + * RateLimiterHandler + */ +public class RateLimiterHandler { + + private static volatile RateLimiterHandler instance; + + private final RateLimiter rateLimiter; + + public RateLimiterHandler(RateLimiter rateLimiter) { + this.rateLimiter = rateLimiter; + } + + private RateLimiterHandler() { + rateLimiter = EnhancedServiceLoader.load(RateLimiter.class); + } + + public static RateLimiterHandler getInstance() { + if (instance == null) { + synchronized (RateLimiterHandler.class) { + if (instance == null) { + instance = new RateLimiterHandler(); + } + } + } + return instance; + } + + public AbstractResultMessage handle(AbstractMessage request, RpcContext rpcContext) { + if (request instanceof GlobalBeginRequest) { + if (!rateLimiter.canPass()) { + GlobalBeginResponse response = new GlobalBeginResponse(); + response.setTransactionExceptionCode(TransactionExceptionCode.BeginFailedRateLimited); + response.setResultCode(ResultCode.RateLimited); + RateLimitInfo rateLimitInfo = RateLimitInfo.generateRateLimitInfo(rpcContext.getApplicationId(), + RateLimitInfo.GLOBAL_BEGIN_FAILED, rpcContext.getClientId(), XID.getIpAddressAndPort()); + MetricsPublisher.postRateLimitEvent(rateLimitInfo); + response.setMsg(String.format("TransactionException[rate limit exception, rate limit info: %s]", rateLimitInfo)); + return response; + } + } + return null; + } +} diff --git a/server/src/main/java/org/apache/seata/server/ratelimit/TokenBucketLimiter.java b/server/src/main/java/org/apache/seata/server/ratelimit/TokenBucketLimiter.java new file mode 100644 index 00000000000..4f20e71bbcc --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/ratelimit/TokenBucketLimiter.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.ratelimit; + +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.executor.Initialize; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.loader.Scope; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; + +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Bucket; +import io.github.bucket4j.Refill; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; + +/** + * TokenBucketLimiter based on Bucket4j + */ +@LoadLevel(name = "token-bucket-limiter", scope = Scope.SINGLETON) +public class TokenBucketLimiter implements RateLimiter, Initialize { + private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucketLimiter.class); + + /** + * whether enable server rate limit + */ + private boolean enable; + + /** + * limit token number of bucket per second + */ + private Integer bucketTokenSecondNum; + + /** + * limit token max number of bucket + */ + private Integer bucketTokenMaxNum; + + /** + * limit token initial number of bucket + */ + private Integer bucketTokenInitialNum; + + /** + * the Bucket + */ + private Bucket bucket; + + public TokenBucketLimiter() {} + + public TokenBucketLimiter(boolean enable, Integer bucketTokenSecondNum, + Integer bucketTokenMaxNum, Integer bucketTokenInitialNum) { + this.enable = enable; + this.bucketTokenSecondNum = bucketTokenSecondNum; + this.bucketTokenMaxNum = bucketTokenMaxNum; + this.bucketTokenInitialNum = bucketTokenInitialNum; + initBucket(); + } + + @Override + public void init() { + final Configuration config = ConfigurationFactory.getInstance(); + this.enable = config.getBoolean(ConfigurationKeys.RATE_LIMIT_ENABLE); + if (this.enable) { + String tokenSecondNum = config.getConfig(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_SECOND_NUM); + if (StringUtils.isBlank(tokenSecondNum)) { + throw new IllegalArgumentException("rate limiter tokenSecondNum is blank"); + } + String tokenMaxNum = config.getConfig(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_MAX_NUM); + if (StringUtils.isBlank(tokenMaxNum)) { + throw new IllegalArgumentException("rate limiter tokenMaxNum is blank"); + } + String tokenInitialNum = config.getConfig(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM); + if (StringUtils.isBlank(tokenInitialNum)) { + throw new IllegalArgumentException("rate limiter tokenInitialNum is blank"); + } + this.bucketTokenSecondNum = Integer.parseInt(tokenSecondNum); + this.bucketTokenMaxNum = Integer.parseInt(tokenMaxNum); + this.bucketTokenInitialNum = Integer.parseInt(tokenInitialNum); + initBucket(); + LOGGER.info("TokenBucketLimiter init success, tokenSecondNum: {}, tokenMaxNum: {}, tokenInitialNum: {}", + this.bucketTokenSecondNum, this.bucketTokenMaxNum, this.bucketTokenInitialNum); + } + } + + @Override + public boolean canPass() { + return bucket.tryConsume(1); + } + + private void initBucket() { + Bandwidth limit = Bandwidth.classic(this.bucketTokenMaxNum, Refill.greedy(this.bucketTokenSecondNum, + Duration.ofSeconds(1))); + Bucket bucket = Bucket.builder().addLimit(limit).build(); + if (this.bucketTokenInitialNum > 0) { + bucket.addTokens(this.bucketTokenInitialNum); + } + this.bucket = bucket; + } + +} diff --git a/server/src/main/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter b/server/src/main/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter new file mode 100644 index 00000000000..333d353c12e --- /dev/null +++ b/server/src/main/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.seata.server.ratelimit.TokenBucketLimiter \ No newline at end of file diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml index 059312ae856..d97e167ee58 100644 --- a/server/src/main/resources/application.example.yml +++ b/server/src/main/resources/application.example.yml @@ -159,6 +159,11 @@ seata: session: branch-async-queue-size: 5000 #branch async remove queue size enable-branch-async-remove: false #enable to asynchronous remove branchSession + ratelimit: + enable: false + bucketTokenSecondNum: 1 + bucketTokenMaxNum: 1 + bucketTokenInitialNum: 1 store: # support: file 、 db 、 redis 、 raft mode: file diff --git a/server/src/main/resources/application.raft.example.yml b/server/src/main/resources/application.raft.example.yml index 6c96b9e9fac..c64954a20b5 100644 --- a/server/src/main/resources/application.raft.example.yml +++ b/server/src/main/resources/application.raft.example.yml @@ -114,6 +114,11 @@ seata: session: branch-async-queue-size: 5000 #branch async remove queue size enable-branch-async-remove: false #enable to asynchronous remove branchSession + ratelimit: + enable: true + bucketTokenSecondNum: 1 + bucketTokenMaxNum: 1 + bucketTokenInitialNum: 1 store: # support: file mode: file diff --git a/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java b/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java new file mode 100644 index 00000000000..fd772092406 --- /dev/null +++ b/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.ratelimiter; + +import org.apache.seata.core.protocol.transaction.GlobalBeginRequest; +import org.apache.seata.core.rpc.RpcContext; +import org.apache.seata.server.ratelimit.RateLimiter; +import org.apache.seata.server.ratelimit.RateLimiterHandler; +import org.apache.seata.server.ratelimit.TokenBucketLimiter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; + +/** + * RateLimiterHandlerTest + */ +@SpringBootTest +public class RateLimiterHandlerTest { + + /** + * Logger for TokenBucketLimiterTest + **/ + private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterHandlerTest.class); + + private static RateLimiterHandler rateLimiterHandler; + + @Test + public void testHandlePass() { + RateLimiter rateLimiter = new TokenBucketLimiter(true, 1, + 10, 10); + rateLimiterHandler = new RateLimiterHandler(rateLimiter); + GlobalBeginRequest request = new GlobalBeginRequest(); + RpcContext rpcContext = new RpcContext(); + Assertions.assertNull(rateLimiterHandler.handle(request, rpcContext)); + } + + @Test + public void testHandleNotPass() { + RateLimiter rateLimiter = new TokenBucketLimiter(true, 1, + 1, 0); + rateLimiterHandler = new RateLimiterHandler(rateLimiter); + GlobalBeginRequest request = new GlobalBeginRequest(); + RpcContext rpcContext = new RpcContext(); + rateLimiterHandler.handle(request, rpcContext); + Assertions.assertNotNull(rateLimiterHandler.handle(request, rpcContext)); + } + +} diff --git a/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java b/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java new file mode 100644 index 00000000000..9bac6220705 --- /dev/null +++ b/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.ratelimiter; + +import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.server.ratelimit.RateLimiter; +import org.apache.seata.server.ratelimit.TokenBucketLimiter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.util.StopWatch; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * TokenBucketLimiterTest + */ +@SpringBootTest +public class TokenBucketLimiterTest { + + /** + * Logger for TokenBucketLimiterTest + **/ + private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucketLimiterTest.class); + + @Test + public void testPerformanceOfTokenBucketLimiter() throws InterruptedException { + RateLimiter rateLimiter = new TokenBucketLimiter(true, 1, + 10, 10); + int threads = 10; + final int count = 100; + final CountDownLatch cnt = new CountDownLatch(count * threads); + + final ThreadPoolExecutor service1 = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, + new SynchronousQueue(), new NamedThreadFactory("test1", false)); + AtomicInteger totalPass = new AtomicInteger(); + AtomicInteger totalReject = new AtomicInteger(); + StopWatch totalStopWatch = new StopWatch(); + totalStopWatch.start(); + for (int i = 0; i < threads; i++) { + service1.execute(() -> { + int pass = 0; + int reject = 0; + StopWatch w = new StopWatch(); + w.start(); + for (int u = 0; u < count; u++) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + boolean result = rateLimiter.canPass(); + if (result) { + pass++; + totalPass.getAndIncrement(); + } else { + reject++; + totalReject.getAndIncrement(); + } + cnt.countDown(); + } + w.stop(); + LOGGER.info("total time:{}ms, pass:{}, reject:{}", w.getLastTaskTimeMillis(), pass, reject); + }); + } + cnt.await(); + totalStopWatch.stop(); + LOGGER.info("total time:{}ms, total pass:{}, total reject:{}", totalStopWatch.getLastTaskTimeMillis(), + totalPass.get(), totalReject.get()); + Assertions.assertNotEquals(0, totalReject.get()); + } +} diff --git a/server/src/test/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter b/server/src/test/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter new file mode 100644 index 00000000000..333d353c12e --- /dev/null +++ b/server/src/test/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.seata.server.ratelimit.TokenBucketLimiter \ No newline at end of file diff --git a/server/src/test/resources/file.conf b/server/src/test/resources/file.conf index 422c6dd5836..ef58245ae72 100644 --- a/server/src/test/resources/file.conf +++ b/server/src/test/resources/file.conf @@ -56,6 +56,12 @@ server { #schedule delete expired undo_log in milliseconds logDeletePeriod = 86400000 } + ratelimit { + enable = false + bucketTokenSecondNum = 1 + bucketTokenMaxNum = 1 + bucketTokenInitialNum = 1 + } } ## metrics settings metrics { diff --git a/tm/src/main/java/org/apache/seata/tm/DefaultTransactionManager.java b/tm/src/main/java/org/apache/seata/tm/DefaultTransactionManager.java index 602a037d764..75769399a97 100644 --- a/tm/src/main/java/org/apache/seata/tm/DefaultTransactionManager.java +++ b/tm/src/main/java/org/apache/seata/tm/DefaultTransactionManager.java @@ -51,6 +51,9 @@ public String begin(String applicationId, String transactionServiceGroup, String request.setTransactionName(name); request.setTimeout(timeout); GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); + if (response.getResultCode() == ResultCode.RateLimited) { + throw new TmTransactionException(TransactionExceptionCode.BeginFailedRateLimited, response.getMsg()); + } if (response.getResultCode() == ResultCode.Failed) { throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg()); } diff --git a/tm/src/main/java/org/apache/seata/tm/api/DefaultFailureHandlerImpl.java b/tm/src/main/java/org/apache/seata/tm/api/DefaultFailureHandlerImpl.java index c008238182a..7ad4beeec01 100644 --- a/tm/src/main/java/org/apache/seata/tm/api/DefaultFailureHandlerImpl.java +++ b/tm/src/main/java/org/apache/seata/tm/api/DefaultFailureHandlerImpl.java @@ -16,8 +16,6 @@ */ package org.apache.seata.tm.api; -import java.util.concurrent.TimeUnit; - import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.logger.StackTraceLogger; @@ -29,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + /** * The type Default failure handler. * @@ -57,6 +57,11 @@ public void onBeginFailure(GlobalTransaction tx, Throwable cause) { LOGGER.warn("Failed to begin transaction. ", cause); } + @Override + public void onBeginRateLimitedFailure(org.apache.seata.tm.api.GlobalTransaction globalTransaction, Throwable cause) { + LOGGER.warn("Failed to begin transaction due to RateLimit. ", cause); + } + @Override public void onCommitFailure(GlobalTransaction tx, Throwable cause) { LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause); diff --git a/tm/src/main/java/org/apache/seata/tm/api/FailureHandler.java b/tm/src/main/java/org/apache/seata/tm/api/FailureHandler.java index c7c0e83a919..4c0a33af5ab 100644 --- a/tm/src/main/java/org/apache/seata/tm/api/FailureHandler.java +++ b/tm/src/main/java/org/apache/seata/tm/api/FailureHandler.java @@ -30,6 +30,14 @@ public interface FailureHandler { */ void onBeginFailure(T tx, Throwable cause); + /** + * On begin rate limited failure + * + * @param globalTransaction + * @param cause + */ + void onBeginRateLimitedFailure(GlobalTransaction globalTransaction, Throwable cause); + /** * On commit failure. * @@ -53,5 +61,4 @@ public interface FailureHandler { * @param originalException the originalException */ void onRollbacking(T tx, Throwable originalException); - } diff --git a/tm/src/main/java/org/apache/seata/tm/api/TransactionalExecutor.java b/tm/src/main/java/org/apache/seata/tm/api/TransactionalExecutor.java index 736c303ff16..05f25ae03ff 100644 --- a/tm/src/main/java/org/apache/seata/tm/api/TransactionalExecutor.java +++ b/tm/src/main/java/org/apache/seata/tm/api/TransactionalExecutor.java @@ -53,6 +53,12 @@ enum Code { // BeginFailure, + /** + * Begin failure of rate limited code. + */ + // + BeginFailedRateLimited, + /** * Commit failure code. */ diff --git a/tm/src/main/java/org/apache/seata/tm/api/TransactionalTemplate.java b/tm/src/main/java/org/apache/seata/tm/api/TransactionalTemplate.java index 5b25963e7b5..f8f0f5b9186 100644 --- a/tm/src/main/java/org/apache/seata/tm/api/TransactionalTemplate.java +++ b/tm/src/main/java/org/apache/seata/tm/api/TransactionalTemplate.java @@ -16,8 +16,6 @@ */ package org.apache.seata.tm.api; -import java.util.List; - import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.exception.ShouldNeverHappenException; @@ -35,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + /** * Template of executing business logic with a global transaction. * @@ -309,6 +309,10 @@ private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) thro tx.begin(txInfo.getTimeOut(), txInfo.getName()); triggerAfterBegin(); } catch (TransactionException txe) { + if (TransactionExceptionCode.BeginFailedRateLimited.equals(txe.getCode())) { + throw new TransactionalExecutor.ExecutionException(tx, txe, + TransactionalExecutor.Code.BeginFailedRateLimited); + } throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);