From d403231958128aac8e5edf5eab052ebff8034c64 Mon Sep 17 00:00:00 2001
From: Pablo Murillo
Date: Thu, 14 Sep 2023 17:38:39 +0200
Subject: [PATCH 1/7] skip haYPoint=endpoint when update periodic pings

---
 .../wfm/topology/ping/bolt/FlowFetcher.java | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/FlowFetcher.java b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/FlowFetcher.java
index be1ebdd5616..52a648a869c 100644
--- a/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/FlowFetcher.java
+++ b/src-java/ping-topology/ping-storm-topology/src/main/java/org/openkilda/wfm/topology/ping/bolt/FlowFetcher.java
@@ -150,6 +150,11 @@ private void updatePeriodicHaPingHeap(Tuple input) throws PipelineException {
         if (periodicHaPingCommand.isEnable()) {
             haFlowRepository.findById(haFlowId)
                     .flatMap(haFlow -> {
+                        if (hasHasOneHaSubFlowWithYPointEqualsEndpoint(haFlow)) {
+                            log.warn("Temporarily disabled. HA-flow {} has a sub-flow whose endpoint switch equals "
+                                    + "the Y-point switch", haFlowId);
+                            return Optional.empty();
+                        }
                         haFlowRepository.detach(haFlow);
                         return getFlowWithTransitEncapsulation(haFlow);
                     })
@@ -185,6 +190,7 @@ private Set getFlowsWithTransitEncapsulation() {
 
     private Set getHaFlowsWithTransitEncapsulation() {
         return haFlowRepository.findWithPeriodicPingsEnabled().stream()
+                .filter(haFlow -> !hasHasOneHaSubFlowWithYPointEqualsEndpoint(haFlow))
                 .peek(haFlowRepository::detach)
                 .map(this::getFlowWithTransitEncapsulation)
                 .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
@@ -341,10 +347,7 @@ public void handleOnDemandHaFlowRequest(Tuple input) throws PipelineException {
             return;
         }
 
-        SwitchId yPointSwitchId = haFlow.getForwardPath().getYPointSwitchId();
-        long yEqualsEndpointCount = haFlow.getHaSubFlows().stream()
-                .filter(subFlow -> subFlow.getEndpointSwitchId().equals(yPointSwitchId)).count();
-        if (yEqualsEndpointCount == 1) {
+        if (hasHasOneHaSubFlowWithYPointEqualsEndpoint(haFlow)) {
             emitOnDemandHaFlowResponse(input, request, format(
                     "Temporarily disabled. HA-flow %s has a sub-flow whose endpoint switch equals the Y-point switch",
                     request.getHaFlowId()));
@@ -366,6 +369,12 @@ public void handleOnDemandHaFlowRequest(Tuple input) throws PipelineException {
         emit(input, pingContext, pullContext(input));
     }
 
+    private boolean hasHasOneHaSubFlowWithYPointEqualsEndpoint(HaFlow haFlow) {
+        SwitchId yPointSwitchId = haFlow.getForwardPath().getYPointSwitchId();
+        return haFlow.getHaSubFlows().stream()
+                .filter(subFlow -> subFlow.getEndpointSwitchId().equals(yPointSwitchId)).count() == 1;
+    }
+
     private Optional getFlowWithTransitEncapsulation(Flow flow) {
         if (!flow.isOneSwitchFlow()) {
             Optional yFlowId = yFlowRepository.findYFlowId(flow.getFlowId());

From a93a3e23facfae0b75bbc6f80f58ad789a2e8387 Mon Sep 17 00:00:00 2001
From: Dmitrii Beliakov
Date: Mon, 16 Oct 2023 11:22:25 +0200
Subject: [PATCH 2/7] Add a check whether a migrate script completed
 successfully. Add error messages and update docs about how to run migrations
 manually.
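
The script now removes any stale /kilda/flag/migration.* files on start, exits
with a non-zero code when liquibase fails, and creates the
/kilda/flag/migration.ok flag file on success. A dependent service can gate its
startup on that flag; below is a minimal sketch of such a wait loop (the
timeout value and the 5-second poll interval are illustrative assumptions, not
part of this change):

```shell script
# Minimal sketch: wait until the migration container reports success via the
# flag file. TIMEOUT_SECONDS and the poll interval are illustrative values.
TIMEOUT_SECONDS=300
elapsed=0
while [ ! -f /kilda/flag/migration.ok ]; do
    if [ "${elapsed}" -ge "${TIMEOUT_SECONDS}" ]; then
        echo "DB migrations did not complete within ${TIMEOUT_SECONDS}s" >&2
        exit 1
    fi
    sleep 5
    elapsed=$((elapsed + 5))
done
echo "DB migrations completed, starting the dependent service"
```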
---
 docker/db-mysql-migration/migrate-develop.sh | 34 ++++++++++++++-----
 .../db-mysql-migration/migrations/README.md  | 32 ++++++++---------
 2 files changed, 41 insertions(+), 25 deletions(-)

diff --git a/docker/db-mysql-migration/migrate-develop.sh b/docker/db-mysql-migration/migrate-develop.sh
index 07873e12e24..6157037482b 100755
--- a/docker/db-mysql-migration/migrate-develop.sh
+++ b/docker/db-mysql-migration/migrate-develop.sh
@@ -3,13 +3,29 @@ set -e
 
 cd /liquibase/changelog
 
-liquibase \
-  --headless=true --defaultsFile=/liquibase/liquibase.docker.properties \
-  --username="${KILDA_MYSQL_USER}" \
-  --password="${KILDA_MYSQL_PASSWORD}" \
-  --url="${KILDA_MYSQL_JDBC_URL}" \
-  update --changelog-file="root.yaml"
-
-echo "All migrations have been applied/verified"
+rm -f /kilda/flag/migration.*
+
+echo "******\nStart liquibase update using URL: ${KILDA_MYSQL_JDBC_URL}\n******"
+
+if ! liquibase \
+  --headless=true --defaultsFile=/liquibase/liquibase.docker.properties \
+  --username="${KILDA_MYSQL_USER}" \
+  --password="${KILDA_MYSQL_PASSWORD}" \
+  --url="${KILDA_MYSQL_JDBC_URL}" \
+  update --changelog-file="root.yaml";
+then
+  echo "******\nmigrate-develop.sh: DB migrations failure.\n******"
+  exit 1
+fi
+
+echo "******\nmigrate-develop.sh: All migrations have been applied/verified.\n******"
 touch /kilda/flag/migration.ok
-exec sleep infinity
+if [ -z "${NO_SLEEP}" ]
+then
+  echo "Set sleep infinity"
+  exec sleep infinity
+else
+  echo "The migrate script completed"
+  exit 0
+fi
+
diff --git a/docker/db-mysql-migration/migrations/README.md b/docker/db-mysql-migration/migrations/README.md
index 9ddfdc9b034..25392b31a48 100644
--- a/docker/db-mysql-migration/migrations/README.md
+++ b/docker/db-mysql-migration/migrations/README.md
@@ -34,7 +34,7 @@ chunk into `root.yaml` file:
   001-feature-ABC.yaml
 ```
 
-Tag for rollback operation (during rollback everithing that was applied after this tag will be rolled back)
+Tag for rollback operation (during rollback everything that was applied after this tag will be rolled back)
 ```yaml
 changeSet:
   id: tag-for-some-migration
   tag: 000-migration
 ```
 
-To start DB update by hands you need to build migration container
+To start the DB update manually, you need to build a migration image and execute the migration script. Optionally, you
+can execute liquibase with arbitrary parameters.
+
+To create an image, navigate to (TODO)
 ```shell script
 docker-compose build db_mysql_migration
 ```
 
-And execute following command (for DB on some foreign host):
+To execute the migration script, run the following command (you can override other environment variables as well). If the
+`NO_SLEEP` parameter is set, the script exits normally on completion; otherwise it sleeps infinitely to keep the container running:
 ```shell script
-docker run \
- --volume=/etc/resolv.conf:/etc/resolv.conf --rm --network=host \
- -e INSTALL_MYSQL=true \
- open-kilda_db_mysql_migration:latest \
- --username="kilda" \
- --password="password" \
- --url="jdbc:mysql://mysql.pendev/kilda" \
- update --changelog-file="root.yaml"
+docker run --volume=/etc/resolv.conf:/etc/resolv.conf --rm --network=host \
+-e KILDA_MYSQL_JDBC_URL="jdbc:mysql://localhost:8101/kilda" \
+-e NO_SLEEP=true \
+--entrypoint=/kilda/migrate-develop.sh \
+kilda/db_mysql_migration:latest
 ```
 
-For rollback changes up to some specific tag, execute command
+To execute liquibase manually, for example to roll back changes up to some specific tag, run the following command:
 ```shell script
 docker run \
  --volume=/etc/resolv.conf:/etc/resolv.conf --rm --network=host \
- -e INSTALL_MYSQL=true \
- open-kilda_db_mysql_migration:latest \
+ kilda/db_mysql_migration:latest \
  --username="kilda" \
- --password="password" \
+ --password="kilda" \
- --url="jdbc:mysql://mysql.pendev/kilda" \
+ --url="jdbc:mysql://localhost:8101/kilda" \
  rollback --changelog-file="root.yaml" --tag="some-specific-tag"
 ```

From b4797a18e55c260a2ff693ce0f7a78903248337c Mon Sep 17 00:00:00 2001
From: Yuliia Miroshnychenko
Date: Tue, 17 Oct 2023 14:50:43 +0200
Subject: [PATCH 3/7] [TEST]: 5224: Ha-Flow: Ping: Updating switch triplet
 selection

---
 .../helpers/TopologyHelper.groovy            | 12 ++++++
 .../spec/flows/haflows/HaFlowPingSpec.groovy | 42 ++++++++++++-------
 2 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/src-java/testing/functional-tests/src/main/groovy/org/openkilda/functionaltests/helpers/TopologyHelper.groovy b/src-java/testing/functional-tests/src/main/groovy/org/openkilda/functionaltests/helpers/TopologyHelper.groovy
index 0547111c7bf..c360a4e5ac8 100644
--- a/src-java/testing/functional-tests/src/main/groovy/org/openkilda/functionaltests/helpers/TopologyHelper.groovy
+++ b/src-java/testing/functional-tests/src/main/groovy/org/openkilda/functionaltests/helpers/TopologyHelper.groovy
@@ -207,6 +207,18 @@ class TopologyHelper {
         }
     }
 
+    SwitchTriplet findSwitchTripletWithSharedEpInTheMiddleOfTheChain() {
+        def pairSharedEpAndEp1 = getAllSwitchPairs().neighbouring().random()
+        //the shared endpoint should be in the middle of the switch chain to deploy an HA-flow without a shared path
+        def pairEp1AndEp2 = getAllSwitchPairs().neighbouring().excludePairs([pairSharedEpAndEp1]).includeSwitch(pairSharedEpAndEp1.src).random()
+        Switch thirdSwitch = pairSharedEpAndEp1.src == pairEp1AndEp2.dst ?
+                pairEp1AndEp2.src : pairEp1AndEp2.dst
+        return switchTriplets.find {
+            it.shared.dpId == pairSharedEpAndEp1.src.dpId
+                    && ((it.ep1.dpId == pairSharedEpAndEp1.dst.dpId && it.ep2.dpId == thirdSwitch.dpId)
+                    || (it.ep1.dpId == thirdSwitch.dpId && it.ep2.dpId == pairSharedEpAndEp1.dst.dpId))
+        }
+    }
+
     SwitchTriplet findSwitchTripletForHaFlowWithProtectedPaths() {
         return switchTriplets.find {
             if (!SwitchTriplet.ALL_ENDPOINTS_DIFFERENT(it)) {
diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/haflows/HaFlowPingSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/haflows/HaFlowPingSpec.groovy
index 41c98c37f4b..5d00ae4123e 100644
--- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/haflows/HaFlowPingSpec.groovy
+++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/haflows/HaFlowPingSpec.groovy
@@ -57,18 +57,17 @@ class HaFlowPingSpec extends HealthCheckSpecification {
     @Tags([LOW_PRIORITY])
     def "Able to turn off periodic pings on a HA-flow"() {
         given: "An HA-flow with periodic pings turned on"
-        def swT = topologyHelper.findSwitchTripletWithDifferentEndpoints()
+        def swT = topologyHelper.findSwitchTripletWithSharedEpInTheMiddleOfTheChain()
         def haFlowRequest = haFlowHelper.randomHaFlow(swT).tap { it.periodicPings = true }
         def haFlow = haFlowHelper.addHaFlow(haFlowRequest)
+        def paths = northboundV2.getHaFlowPaths(haFlow.haFlowId)
+        assert !paths.sharedPath.forward, "The HA-flow has a shared path, i.e. one of the sub-flows ends on the Y-point, and ping is disabled for such HA-flows"
         assert northboundV2.getHaFlow(haFlow.haFlowId).periodicPings
         wait(STATS_LOGGING_TIMEOUT) {
             assert flowStats.latencyOf(haFlow.getSubFlows().get(0).getFlowId()).get(LATENCY, REVERSE).hasNonZeroValues()
-        }
-
-        when: "Turn off periodic pings"
         def updatedHaFlow = haFlowHelper.partialUpdateHaFlow(
                 haFlow.haFlowId, HaFlowPatchPayload.builder().periodicPings(false).build())
@@ -91,19 +90,24 @@
     }
 
     @Tags([LOW_PRIORITY])
-    def "Unable to ping HA-flow via periodic pings if ISL is broken"() {
+    def "Unable to ping one of the HA-subflows via periodic pings if the related ISL is broken"() {
         given: "Pinned HA-flow with periodic pings turned on which won't be rerouted after ISL fails"
-        def swT = topologyHelper.findSwitchTripletWithDifferentEndpoints()
+        def swT = topologyHelper.findSwitchTripletWithSharedEpInTheMiddleOfTheChain()
         def haFlowRequest = haFlowHelper.randomHaFlow(swT).tap {
             it.periodicPings = true
             it.pinned = true
         }
         def haFlow = haFlowHelper.addHaFlow(haFlowRequest)
         assert northboundV2.getHaFlow(haFlow.haFlowId).periodicPings
+
         def paths = northboundV2.getHaFlowPaths(haFlow.haFlowId)
-        def islToFail = pathHelper.getInvolvedIsls(PathHelper.convert(paths.subFlowPaths[0].forward)).first()
+        assert !paths.sharedPath.forward, "The HA-flow has a shared path, i.e. one of the sub-flows ends on the Y-point, and ping is disabled for such HA-flows"
+        String subFlowWithBrokenIsl = paths.subFlowPaths.first().flowId
+        def islToFail = pathHelper.getInvolvedIsls(PathHelper.convert(paths.subFlowPaths.first().forward)).first()
+
+        String subFlowWithActiveIsl = paths.subFlowPaths.flowId.find { it != subFlowWithBrokenIsl }
 
-        when: "Fail an HA-flow ISL (bring switch port down)"
+        when: "Fail an ISL of one of the HA-subflows (bring switch port down)"
         antiflap.portDown(islToFail.srcSwitch.dpId, islToFail.srcPort)
         wait(WAIT_OFFSET) {
             northbound.getLink(islToFail).state == FAILED
         }
         def afterFailTime = new Date().getTime()
@@ -111,13 +115,19 @@
         then: "Periodic pings are still enabled"
         northboundV2.getHaFlow(haFlow.haFlowId).periodicPings
 
-        and: "Metrics for HA-subflows have 'error' in tsdb"
+        and: "Metrics for the HA-subflow with the broken ISL have 'error' status in tsdb"
         wait(pingInterval + WAIT_OFFSET * 2, 2) {
-            withPool {
-                [haFlow.subFlows*.flowId, [FORWARD, REVERSE]].combinations().eachParallel {
-                    String flowId, Direction direction ->
-                        flowStats.latencyOf(flowId).get(LATENCY, direction, ERROR).hasNonZeroValuesAfter(afterFailTime)
-                }
+            [FORWARD, REVERSE].each { Direction direction ->
+                def stats = flowStats.latencyOf(subFlowWithBrokenIsl).get(LATENCY, direction, ERROR)
+                assert stats != null && stats.dataPoints.keySet().find { it >= afterFailTime }
+            }
+        }
+
+        and: "Metrics for the HA-subflow with the active ISL have 'success' status in tsdb"
+        wait(pingInterval + WAIT_OFFSET * 2, 2) {
+            [FORWARD, REVERSE].each { Direction direction ->
+                def stats = flowStats.latencyOf(subFlowWithActiveIsl).get(LATENCY, direction, SUCCESS)
+                assert stats != null && stats.hasNonZeroValuesAfter(afterFailTime)
+            }
         }
@@ -130,12 +140,14 @@
 
     def "Able to turn on periodic pings on a HA-flow"() {
         when: "Create a HA-flow with periodic pings turned on"
-        def swT = topologyHelper.findSwitchTripletWithDifferentEndpoints()
+        def swT = topologyHelper.findSwitchTripletWithSharedEpInTheMiddleOfTheChain()
         def beforeCreationTime = new Date().getTime()
         def haFlowRequest = haFlowHelper.randomHaFlow(swT).tap { it.periodicPings = true }
         def haFlow = haFlowHelper.addHaFlow(haFlowRequest)
+        def paths = northboundV2.getHaFlowPaths(haFlow.haFlowId)
+        assert !paths.sharedPath.forward, "The HA-flow has a shared path, i.e. one of the sub-flows ends on the Y-point, and ping is disabled for such HA-flows"
 
         then: "Periodic pings are really enabled"
         northboundV2.getHaFlow(haFlow.haFlowId).periodicPings

From 6889c675100fa9a624f9f37610e19a501e8a4901 Mon Sep 17 00:00:00 2001
From: pkazlenka
Date: Fri, 20 Oct 2023 11:54:44 +0200
Subject: [PATCH 4/7] #5390: [TEST] Attempt to fix several flaky tests
 increasing waiting intervals

Implements #5390

* ISL RTT stats test gives more time for the last stats to be collected before turning stats off
* Flow stats test gives more time for the last stats to be collected after swapping the path
* Storm restart spec now waits longer for links discovery in post-test
* The whole run now waits longer for the lab to be operational

---
 .../functionaltests/HealthCheckSpecification.groovy         | 2 +-
 .../functionaltests/spec/server42/Server42IslRttSpec.groovy | 6 ++++--
 .../functionaltests/spec/stats/FlowStatSpec.groovy          | 2 +-
 .../functionaltests/spec/xresilience/StormLcmSpec.groovy    | 2 +-
 4 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/HealthCheckSpecification.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/HealthCheckSpecification.groovy
index 98026a74b60..5ff79657979 100644
--- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/HealthCheckSpecification.groovy
+++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/HealthCheckSpecification.groovy
@@ -21,7 +21,7 @@ import org.springframework.beans.factory.annotation.Value
 
 @Slf4j
 class HealthCheckSpecification extends HealthCheckBaseSpecification {
-    private static final int WAIT_FOR_LAB_TO_BE_OPERATIONAL = 120 //sec
+    private static final int WAIT_FOR_LAB_TO_BE_OPERATIONAL = 210 //sec
     static Throwable healthCheckError
     static boolean healthCheckRan = false
     @Value('${health_check.verifier:true}')
diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/server42/Server42IslRttSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/server42/Server42IslRttSpec.groovy
index 5ae91ee2df9..d708d47ea10 100644
--- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/server42/Server42IslRttSpec.groovy
+++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/server42/Server42IslRttSpec.groovy
@@ -2,6 +2,7 @@ package org.openkilda.functionaltests.spec.server42
 
 import org.openkilda.functionaltests.model.stats.IslStats
 import org.springframework.beans.factory.annotation.Autowired
+import spock.lang.Unroll
 
 import static com.shazam.shazamcrest.matcher.Matchers.sameBeanAs
 import static groovyx.gpars.GParsPool.withPool
@@ -58,7 +59,8 @@ class Server42IslRttSpec extends HealthCheckSpecification {
     int statsWaitSeconds = 4
 
     @Tags([LOW_PRIORITY])
-    def "ISL RTT stats are #testLabel available only if both global and switch toggles are 'on'"() {
+    @Unroll
+    def "ISL RTT stats are available only if both global and switch toggles are 'on'"() {
         given: "An active ISL with both switches having server42"
         def server42switchesDpIds = topology.getActiveServer42Switches()*.dpId
         def isl = topology.islsForActiveSwitches.find {
@@ -658,7 +660,7 @@ class Server42IslRttSpec extends HealthCheckSpecification {
 
     void checkIslRttStats(Isl isl, Date checkpointTime, Boolean statExist) {
         //wait till near real-time stats arrive to TSDB
-        sleep(statsWaitSeconds * 1000)
+        sleep((statsWaitSeconds + 1) * 1000)
         def stats = islStats.of(isl).get(ISL_RTT, SERVER_42)
         if (statExist) {
             assert stats.hasNonZeroValuesAfter(checkpointTime.getTime())
diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/stats/FlowStatSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/stats/FlowStatSpec.groovy
index d2ee0e3f101..1b096c77ed9 100644
--- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/stats/FlowStatSpec.groovy
+++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/stats/FlowStatSpec.groovy
@@ -97,7 +97,7 @@ class FlowStatSpec extends HealthCheckSpecification {
         }
 
         and: "Wait till stats from old main path are collected"
-        Wrappers.wait(WAIT_OFFSET, 3) {
+        Wrappers.wait(statsRouterRequestInterval, 3) {
             statsHelper."force kilda to collect stats"()
             def newStats = flowStats.of(flow.getFlowId())
             assert newStats.get(FLOW_RAW_BYTES, srcSwitchId, mainForwardCookie).getNewestTimeStamp() ==
diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/xresilience/StormLcmSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/xresilience/StormLcmSpec.groovy
index 152aaaf2add..7dacaa60f0c 100644
--- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/xresilience/StormLcmSpec.groovy
+++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/xresilience/StormLcmSpec.groovy
@@ -140,7 +140,7 @@ class StormLcmSpec extends HealthCheckSpecification {
         !networkDeployed && wfmManipulator.deployTopology("network")
         srcBlockData && lockKeeper.reviveSwitch(islUnderTest.srcSwitch, srcBlockData)
         dstBlockData && lockKeeper.reviveSwitch(islUnderTest.dstSwitch, dstBlockData)
-        Wrappers.wait(WAIT_OFFSET + discoveryInterval) {
+        Wrappers.wait(WAIT_OFFSET + discoveryTimeout) {
             assert database.getIsls(topology.getIsls()).every {it.status == IslStatus.ACTIVE}
             assert northbound.getAllLinks().every {it.state == IslChangeType.DISCOVERED}
         }

From e20ecd92aa359d2761d08556314447f90580bcfb Mon Sep 17 00:00:00 2001
From: Yuliia Miroshnychenko
Date: Tue, 24 Oct 2023 15:03:39 +0200
Subject: [PATCH 5/7] [TEST]: Server42: Isl Rtt: Fixing refactoring issue

---
 .../functionaltests/spec/server42/Server42IslRttSpec.groovy | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/server42/Server42IslRttSpec.groovy b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/server42/Server42IslRttSpec.groovy
index 5ae91ee2df9..d0e5e55495e 100644
--- a/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/server42/Server42IslRttSpec.groovy
+++ b/src-java/testing/functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/server42/Server42IslRttSpec.groovy
@@ -670,7 +670,7 @@ class Server42IslRttSpec extends HealthCheckSpecification {
 
     void verifyLatencyValueIsCorrect(Isl isl) {
         def t = new Date()
         t.setSeconds(t.getSeconds() - 600) //kilda_latency_update_time_range: 600
-        def stats = islStats.of(isl).get(ISL_RTT).getDataPoints()
+        def stats = islStats.of(isl).get(ISL_RTT, SERVER_42).getDataPoints()
         def expected = stats.values().average()
         def actual = northbound.getLink(isl).latency
         assert Math.abs(expected - actual) <= expected * 0.25

From c2c6f2816fde00d3698d2ae4a63619349ab033bd Mon Sep 17 00:00:00 2001
From: Sergey Nikitin
Date: Fri, 27 Oct 2023 00:36:40 +0200
Subject: [PATCH 6/7] Store datapoints into special storage to save memory

A new Storage class was introduced in OpenTsdbTopology to save RAM. It stores
datapoint keys as flat strings.
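
Roughly, the change replaces the previous HashMap keyed by a (metric, tags)
object with a map from a flat string key (the metric name plus the sorted tags)
to a small (value, time) holder, so full Datapoint objects are no longer
retained. A sketch of the resulting behaviour (the metric name and tag values
below are illustrative, not taken from real traffic):

```java
// Illustrative usage of the new Storage; the metric name and tags are made up.
Map<String, String> tags = new HashMap<>();
tags.put("src_switch", "SW1");
tags.put("dst_switch", "SW2");

Storage storage = new Storage();
storage.add(new Datapoint("sdn.isl.latency", System.currentTimeMillis(), tags, 42));

// Internally the entry is stored under the flat string key
// "sdn.isl.latency_dst_switch:SW2_src_switch:SW1" (tags sorted by name),
// and only the (value, time) pair is kept as the map value.
Storage.DatapointValue previous = storage.get(
        new Datapoint("sdn.isl.latency", System.currentTimeMillis(), tags, 42));
```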
It saves about 40% of memory on average.
---
 .../opentsdb/bolts/OpenTsdbFilterBolt.java    |  41 ++++---
 .../wfm/topology/opentsdb/models/Storage.java | 103 ++++++++
 .../OpenTsdbFilterBoltTest.java               |   3 +-
 .../topology/opentsdb/models/StorageTest.java | 242 ++++++++++++++++++
 4 files changed, 364 insertions(+), 25 deletions(-)
 create mode 100644 src-java/opentsdb-topology/opentsdb-storm-topology/src/main/java/org/openkilda/wfm/topology/opentsdb/models/Storage.java
 rename src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/{bolt => bolts}/OpenTsdbFilterBoltTest.java (98%)
 create mode 100644 src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/models/StorageTest.java

diff --git a/src-java/opentsdb-topology/opentsdb-storm-topology/src/main/java/org/openkilda/wfm/topology/opentsdb/bolts/OpenTsdbFilterBolt.java b/src-java/opentsdb-topology/opentsdb-storm-topology/src/main/java/org/openkilda/wfm/topology/opentsdb/bolts/OpenTsdbFilterBolt.java
index 0f4866c1d23..0ce54248409 100644
--- a/src-java/opentsdb-topology/opentsdb-storm-topology/src/main/java/org/openkilda/wfm/topology/opentsdb/bolts/OpenTsdbFilterBolt.java
+++ b/src-java/opentsdb-topology/opentsdb-storm-topology/src/main/java/org/openkilda/wfm/topology/opentsdb/bolts/OpenTsdbFilterBolt.java
@@ -16,8 +16,9 @@
 package org.openkilda.wfm.topology.opentsdb.bolts;
 
 import org.openkilda.messaging.info.Datapoint;
+import org.openkilda.wfm.topology.opentsdb.models.Storage;
+import org.openkilda.wfm.topology.opentsdb.models.Storage.DatapointValue;
 
-import lombok.Value;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.task.OutputCollector;
@@ -30,7 +31,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -43,7 +43,7 @@ public class OpenTsdbFilterBolt extends BaseRichBolt {
 
     public static final Fields STREAM_FIELDS = new Fields(FIELD_ID_DATAPOINT);
 
-    private Map storage = new HashMap<>();
+    private final Storage storage = new Storage();
 
     private OutputCollector collector;
 
     @Override
@@ -63,11 +63,14 @@ public void execute(Tuple tuple) {
 
         if (isTickTuple(tuple)) {
             // opentsdb using current epoch time (date +%s) in seconds
-            long now = System.currentTimeMillis();
-            storage.entrySet().removeIf(entry -> now - entry.getValue().getTime() > MUTE_IF_NO_UPDATES_MILLIS);
+            int initialSize = storage.size();
+            storage.removeOutdated(MUTE_IF_NO_UPDATES_MILLIS);
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Removed {} outdated datapoints from the storage", initialSize - storage.size());
+            }
 
             if (LOGGER.isTraceEnabled()) {
-                LOGGER.trace("storage after clean tuple: {}", storage.toString());
+                LOGGER.trace("storage after clean tuple: {}", storage);
             }
 
             collector.ack(tuple);
@@ -100,27 +103,27 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
     private void addDatapoint(Datapoint datapoint) {
         LOGGER.debug("adding datapoint: {}", datapoint);
         LOGGER.debug("storage.size: {}", storage.size());
-        storage.put(new DatapointKey(datapoint.getMetric(), datapoint.getTags()), datapoint);
+        storage.add(datapoint);
 
         if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("addDatapoint storage: {}", storage.toString());
+            LOGGER.trace("addDatapoint storage: {}", storage);
         }
     }
 
     private boolean isUpdateRequired(Datapoint datapoint) {
         boolean update = true;
-        Datapoint prevDatapoint = storage.get(new DatapointKey(datapoint.getMetric(), datapoint.getTags()));
+        DatapointValue prevDatapointValue = storage.get(datapoint);
 
-        if (prevDatapoint != null) {
+        if (prevDatapointValue != null) {
             if (LOGGER.isTraceEnabled()) {
                 LOGGER.trace("prev: {} cur: {} equals: {} time_delta: {}",
-                        prevDatapoint,
+                        prevDatapointValue,
                         datapoint,
-                        prevDatapoint.getValue().equals(datapoint.getValue()),
-                        datapoint.getTime() - prevDatapoint.getTime()
+                        prevDatapointValue.getValue().equals(datapoint.getValue()),
+                        datapoint.getTime() - prevDatapointValue.getTime()
                 );
             }
-            update = !prevDatapoint.getValue().equals(datapoint.getValue())
-                    || datapoint.getTime() - prevDatapoint.getTime() >= MUTE_IF_NO_UPDATES_MILLIS;
+            update = !prevDatapointValue.getValue().equals(datapoint.getValue())
+                    || datapoint.getTime() - prevDatapointValue.getTime() >= MUTE_IF_NO_UPDATES_MILLIS;
         }
         return update;
     }
@@ -136,12 +139,4 @@ private boolean isTickTuple(Tuple tuple) {
 
     private Values makeDefaultTuple(Datapoint datapoint) {
         return new Values(datapoint);
     }
-
-    @Value
-    private static class DatapointKey {
-
-        private String metric;
-
-        private Map tags;
-    }
 }
diff --git a/src-java/opentsdb-topology/opentsdb-storm-topology/src/main/java/org/openkilda/wfm/topology/opentsdb/models/Storage.java b/src-java/opentsdb-topology/opentsdb-storm-topology/src/main/java/org/openkilda/wfm/topology/opentsdb/models/Storage.java
new file mode 100644
index 00000000000..9d90c7bcc6a
--- /dev/null
+++ b/src-java/opentsdb-topology/opentsdb-storm-topology/src/main/java/org/openkilda/wfm/topology/opentsdb/models/Storage.java
@@ -0,0 +1,103 @@
+/* Copyright 2023 Telstra Open Source
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openkilda.wfm.topology.opentsdb.models;
+
+import org.openkilda.messaging.info.Datapoint;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.ToString;
+import lombok.Value;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+@ToString
+public class Storage implements Serializable {
+    public static final String NULL_KEY = "null";
+    public static final char TAG_KEY_DELIMITER = '_';
+    public static final char TAG_VALUE_DELIMITER = ':';
+    public static final String NULL_TAG = "NULL_TAG";
+
+    private final Map map;
+
+    public Storage() {
+        this.map = new HashMap<>();
+    }
+
+    public void add(Datapoint datapoint) {
+        map.put(createKey(datapoint), createValue(datapoint));
+    }
+
+    public DatapointValue get(Datapoint datapoint) {
+        return map.get(createKey(datapoint));
+    }
+
+    public void removeOutdated(long ttlInMillis) {
+        long now = System.currentTimeMillis();
+        map.entrySet().removeIf(entry -> now - entry.getValue().getTime() > ttlInMillis);
+    }
+
+    public int size() {
+        return map.size();
+    }
+
+    @VisibleForTesting
+    static String createKey(Datapoint datapoint) {
+        if (datapoint == null) {
+            return NULL_KEY;
+        }
+        StringBuilder key = new StringBuilder();
+        key.append(datapoint.getMetric());
+
+        if (datapoint.getTags() != null) {
+            SortedMap sortedTags = getSortedTags(datapoint);
+            for (Entry entry : sortedTags.entrySet()) {
+                key.append(TAG_KEY_DELIMITER);
+                key.append(entry.getKey());
+                key.append(TAG_VALUE_DELIMITER);
+                key.append(entry.getValue());
+            }
+        }
+        return key.toString();
+    }
+
+    private static DatapointValue createValue(Datapoint datapoint) {
+        if (datapoint == null) {
+            return null;
+        }
+        return new DatapointValue(datapoint.getValue(), datapoint.getTime());
+    }
+
+    private static SortedMap getSortedTags(Datapoint datapoint) {
+        SortedMap sortedTags = new TreeMap<>();
+        for (Entry entry : datapoint.getTags().entrySet()) {
+            String key = Optional.ofNullable(entry.getKey()).orElse(NULL_TAG);
+            sortedTags.put(key, entry.getValue());
+        }
+        return sortedTags;
+    }
+
+    @Value
+    public static class DatapointValue implements Serializable {
+        Number value;
+        Long time;
+    }
+}
diff --git a/src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/bolt/OpenTsdbFilterBoltTest.java b/src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/bolts/OpenTsdbFilterBoltTest.java
similarity index 98%
rename from src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/bolt/OpenTsdbFilterBoltTest.java
rename to src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/bolts/OpenTsdbFilterBoltTest.java
index d86afec91c8..f5a7eaac3a4 100644
--- a/src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/bolt/OpenTsdbFilterBoltTest.java
+++ b/src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/bolts/OpenTsdbFilterBoltTest.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package org.openkilda.wfm.topology.opentsdb.bolt;
+package org.openkilda.wfm.topology.opentsdb.bolts;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.storm.Constants.SYSTEM_COMPONENT_ID;
@@ -30,7 +30,6 @@
 
 import org.openkilda.messaging.info.Datapoint;
 import org.openkilda.messaging.info.InfoData;
-import org.openkilda.wfm.topology.opentsdb.bolts.OpenTsdbFilterBolt;
 
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
diff --git a/src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/models/StorageTest.java b/src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/models/StorageTest.java
new file mode 100644
index 00000000000..226ba888776
--- /dev/null
+++ b/src-java/opentsdb-topology/opentsdb-storm-topology/src/test/java/org/openkilda/wfm/topology/opentsdb/models/StorageTest.java
@@ -0,0 +1,242 @@
+/* Copyright 2023 Telstra Open Source
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openkilda.wfm.topology.opentsdb.models;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.openkilda.wfm.topology.opentsdb.models.Storage.NULL_TAG;
+import static org.openkilda.wfm.topology.opentsdb.models.Storage.createKey;
+
+import org.openkilda.messaging.info.Datapoint;
+import org.openkilda.wfm.topology.opentsdb.models.Storage.DatapointValue;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class StorageTest {
+
+    public static final String KEY_1 = "key1";
+    public static final String KEY_2 = "key2";
+    public static final String KEY_3 = "key3";
+    public static final String VALUE_1 = "value1";
+    public static final String VALUE_2 = "value2";
+    public static final String METRIC_1 = "metric1";
+    public static final String METRIC_2 = "metric2";
+    public static final String METRIC_3 = "metric3";
+    private static final Random random = new Random();
+
+    @Test
+    public void testStorageUniqueness() {
+        int size = 1000;
+        List datapoints = new ArrayList<>();
+        Storage storage = new Storage();
+        for (int i = 0; i < size; i++) {
+            Datapoint datapoint = createDatapoint(i);
+            datapoints.add(datapoint);
+            storage.add(datapoint);
+        }
+        assertEquals(size, storage.size());
+
+        for (Datapoint datapoint : datapoints) {
+            assertDataPointValue(datapoint, storage.get(datapoint));
+        }
+    }
+
+    @Test
+    public void testStorageKeyNullDatapoint() {
+        assertEquals(Storage.NULL_KEY, createKey(null));
+    }
+
+    @Test
+    public void testStorageKeyEmptyDatapoint() {
+        assertEquals("null", createKey(new Datapoint(null, null, null, null)));
+    }
+
+    @Test
+    public void testStorageKeyNullTag() {
+        String key = createKey(createDatapoint(METRIC_1, createTags(null, VALUE_1, KEY_2, null)));
+        assertEquals(METRIC_1 + "_" + NULL_TAG + ":" + VALUE_1 + "_" + KEY_2 + ":null", key);
+    }
+
+    @Test
+    public void testStorageKey() {
+        String key = createKey(createDatapoint(METRIC_2, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2)));
+        assertEquals(METRIC_2 + "_" + KEY_1 + ":" + VALUE_1 + "_" + KEY_2 + ":" + VALUE_2, key);
+    }
+
+    @Test
+    public void testStorageDifferentTagOrder() {
+        Datapoint datapoint1 = createDatapoint(METRIC_1, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 1);
+        Datapoint datapoint2 = createDatapoint(METRIC_1, createTags(KEY_2, VALUE_2, KEY_1, VALUE_1), 2);
+        assertEquals(createKey(datapoint1), createKey(datapoint2));
+
+        Storage storage = createStorage(datapoint1, datapoint2);
+        assertEquals(1, storage.size());
+        assertDataPointValue(datapoint2, storage.get(datapoint1));
+    }
+
+    @Test
+    public void testStorageKeyDifferentTags() {
+        Datapoint datapoint1 = createDatapoint(METRIC_1, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 1);
+        Datapoint datapoint2 = createDatapoint(METRIC_1, createTags(KEY_1, VALUE_1, KEY_3, VALUE_2), 2);
+        assertNotEquals(createKey(datapoint1), createKey(datapoint2));
+
+        Storage storage = createStorage(datapoint1, datapoint2);
+        assertEquals(2, storage.size());
+
+        assertDataPointValue(datapoint1, storage.get(datapoint1));
+        assertDataPointValue(datapoint2, storage.get(datapoint2));
+    }
+
+    @Test
+    public void testStorageKeyDifferentValues() {
+        Datapoint datapoint1 = createDatapoint(METRIC_1, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 1);
+        Datapoint datapoint2 = createDatapoint(METRIC_1, createTags(KEY_1, VALUE_1, KEY_2, VALUE_1), 2);
+        assertNotEquals(createKey(datapoint1), createKey(datapoint2));
+
+        Storage storage = createStorage(datapoint1, datapoint2);
+        assertEquals(2, storage.size());
+
+        assertDataPointValue(datapoint1, storage.get(datapoint1));
+        assertDataPointValue(datapoint2, storage.get(datapoint2));
+    }
+
+    @Test
+    public void testStorageKeyDifferentMetrics() {
+        Datapoint datapoint1 = createDatapoint(METRIC_1, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 1);
+        Datapoint datapoint2 = createDatapoint(METRIC_2, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 2);
+        assertNotEquals(createKey(datapoint1), createKey(datapoint2));
+
+        Storage storage = createStorage(datapoint1, datapoint2);
+        assertEquals(2, storage.size());
+
+        assertDataPointValue(datapoint1, storage.get(datapoint1));
+        assertDataPointValue(datapoint2, storage.get(datapoint2));
+    }
+
+    @Test
+    public void testStorageGetNoneExistent() {
+        Datapoint datapoint1 = createDatapoint(METRIC_1, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 1);
+        Datapoint datapoint2 = createDatapoint(METRIC_2, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 2);
+
+        Storage storage = createStorage(datapoint1);
+        assertEquals(1, storage.size());
+
+        assertDataPointValue(datapoint1, storage.get(datapoint1));
+        assertNull(storage.get(datapoint2));
+    }
+
+    @Test
+    public void testRemoveOutdatedAll() {
+        long now = System.currentTimeMillis();
+        Datapoint datapoint1 = createDatapoint(METRIC_1, now, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 1);
+        Datapoint datapoint2 = createDatapoint(METRIC_2, now - 1_000, createTags(KEY_1, VALUE_1, KEY_3, VALUE_2), 2);
+        Datapoint datapoint3 = createDatapoint(METRIC_3, now - 2_000, createTags(KEY_1, VALUE_1, KEY_3, VALUE_2), 2);
+
+        Storage storage = createStorage(datapoint1, datapoint2, datapoint3);
+        assertEquals(3, storage.size());
+
+        storage.removeOutdated(-500);
+        assertEquals(0, storage.size());
+
+        assertNull(storage.get(datapoint1));
+        assertNull(storage.get(datapoint2));
+        assertNull(storage.get(datapoint3));
+    }
+
+    @Test
+    public void testRemoveTwoOutdated() {
+        long now = System.currentTimeMillis();
+        Datapoint datapoint1 = createDatapoint(METRIC_1, now, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 1);
+        Datapoint datapoint2 = createDatapoint(METRIC_2, now - 1_000, createTags(KEY_1, VALUE_1, KEY_3, VALUE_2), 2);
+        Datapoint datapoint3 = createDatapoint(METRIC_3, now - 2_000, createTags(KEY_1, VALUE_1, KEY_3, VALUE_2), 2);
+
+        Storage storage = createStorage(datapoint1, datapoint2, datapoint3);
+        assertEquals(3, storage.size());
+
+        storage.removeOutdated(500);
+        assertEquals(1, storage.size());
+
+        assertDataPointValue(datapoint1, storage.get(datapoint1));
+        assertNull(storage.get(datapoint2));
+        assertNull(storage.get(datapoint3));
+    }
+
+
+    @Test
+    public void testRemoveNoneOutdated() {
+        long now = System.currentTimeMillis();
+        Datapoint datapoint1 = createDatapoint(METRIC_1, now, createTags(KEY_1, VALUE_1, KEY_2, VALUE_2), 1);
+        Datapoint datapoint2 = createDatapoint(METRIC_2, now - 1_000, createTags(KEY_1, VALUE_1, KEY_3, VALUE_2), 2);
+        Datapoint datapoint3 = createDatapoint(METRIC_3, now - 2_000, createTags(KEY_1, VALUE_1, KEY_3, VALUE_2), 3);
+
+        Storage storage = createStorage(datapoint1, datapoint2, datapoint3);
+        assertEquals(3, storage.size());
+
+        storage.removeOutdated(5_000);
+        assertEquals(3, storage.size());
+
+        assertDataPointValue(datapoint1, storage.get(datapoint1));
+        assertDataPointValue(datapoint2, storage.get(datapoint2));
+        assertDataPointValue(datapoint3, storage.get(datapoint3));
+    }
+
+    private static Map createTags(String key1, String value1, String key2, String value2) {
+        Map map = new HashMap<>();
+        map.put(key1, value1);
+        map.put(key2, value2);
+        return map;
+    }
+
+    private static Datapoint createDatapoint(String metric, Long time, Map tags, long value) {
+        return new Datapoint(metric, time, tags, value);
+    }
+
+    private static Datapoint createDatapoint(String metric, Map tags, long value) {
+        return createDatapoint(metric, random.nextLong(), tags, value);
+    }
+
+    private static Datapoint createDatapoint(String metric, Map tags) {
+        return createDatapoint(metric, random.nextLong(), tags, random.nextInt());
+    }
+
+    private static Datapoint createDatapoint(int number) {
+        Map tags = new HashMap<>();
+        for (int i = 0; i < 5; i++) {
+            tags.put("key" + i, "value" + i);
+        }
+        return new Datapoint("some.metric." + number, (long) number, tags, number);
+    }
+
+    private static Storage createStorage(Datapoint... datapoints) {
+        Storage storage = new Storage();
+        for (Datapoint datapoint : datapoints) {
+            storage.add(datapoint);
+        }
+        return storage;
+    }
+
+    private static void assertDataPointValue(Datapoint expectedDatapoint, DatapointValue actualValue) {
+        assertEquals(expectedDatapoint.getValue(), actualValue.getValue());
+        assertEquals(expectedDatapoint.getTime(), actualValue.getTime());
+    }
+}

From abaf26a61440c43630964c462ed763158b6b646c Mon Sep 17 00:00:00 2001
From: Sergey Nikitin
Date: Fri, 27 Oct 2023 15:14:54 +0200
Subject: [PATCH 7/7] Update CHANGELOG.md (release 1.145.2)

---
 CHANGELOG.md | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5354607839a..dbb07500e76 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,22 @@
 # Changelog
 
-## v1.145.1 (24/10/2023)
+## v1.145.2 (30/10/2023)
+
+### Bug Fixes:
+- [#5381](https://github.com/telstra/open-kilda/pull/5381) skip haYPoint=endpoint when update periodic pings
+
+### Improvements:
+- [#5444](https://github.com/telstra/open-kilda/pull/5444) Add a check whether a migrate script completed successfully. [**configuration**]
+- [#5447](https://github.com/telstra/open-kilda/pull/5447) [TEST]: 5224: Ha-Flow: Ping: Updating switch triplet selection
+- [#5453](https://github.com/telstra/open-kilda/pull/5453) #5390: [TEST] Attempt to fix several flaky tests increasing waiting intervals (Issue: [#5390](https://github.com/telstra/open-kilda/issues/5390)) [**tests**]
+- [#5454](https://github.com/telstra/open-kilda/pull/5454) [TEST]: Server42: Isl Rtt: Fixing refactoring issue
+- [#5456](https://github.com/telstra/open-kilda/pull/5456) Store datapoints into special storage to save memory
+
+For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.145.1...v1.145.2).
+
+---
+
+## v1.145.1 (26/10/2023)
 
 ### Bug Fixes:
 - [#5445](https://github.com/telstra/open-kilda/pull/5445) Do not write false 'flow not found' log if monitoring is disabled