diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java index e44be343..98585c6e 100644 --- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java @@ -27,7 +27,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigDecimal; import java.math.BigInteger; +import java.math.RoundingMode; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; @@ -147,7 +149,9 @@ private float getRingFraction(List tokenRanges) { addressedTokens.add(distance(tokenRange.rangeStart, tokenRange.rangeEnd)); } // it is < 1 because it is a percentage - return addressedTokens.divide(partitioner.ringSize).floatValue(); + return new BigDecimal(addressedTokens) + .divide(new BigDecimal(partitioner.ringSize), 6, RoundingMode.HALF_UP) + .floatValue(); } /** Gets the list of token ranges that the table occupies on a given Cassandra node. */